大数据分析与应用实验任务八

大数据分析与应用实验任务八

实验目的

  • 进一步熟悉pyspark程序运行方式;
  • 熟练掌握pysaprk RDD基本操作相关的方法、函数。

实验任务

进入pyspark实验环境,在图形界面的pyspark命令行窗口中完成下列任务:

在实验环境中自行选择路径新建以自己姓名拼音命名的文件夹,后续代码中涉及的文件请保存到该文件夹下(需要时文件夹中可以创建新的文件夹)。

一、 参考书上例子,理解并完成RDD常用操作(4.1.2节内容);
1.转换操作
(1)filter(func)

filter(func)操作会筛选出满足函数 func 的元素,并返回一个新的数据集。例如:

lines = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt") 
linesWithSpark = lines.filter(lambda line: "Spark" in line) 
linesWithSpark.foreach(print) 

image-20231116111150747

(2)map(func)

map(func)操作将每个元素传递到函数 func 中,并将结果返回为一个新的数据集。例如:

data = [1,2,3,4,5] 
rdd1 = sc.parallelize(data) 
rdd2 = rdd1.map(lambda x:x+10) 
rdd2.foreach(print) 

image-20231116111301428

下面是另外一个实例:

lines = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt") 
words = lines.map(lambda line:line.split(" ")) 
words.foreach(print) 

image-20231116111354416

(3)flatMap(func)

flatMap(func)与 map()相似,但每个输入元素都可以映射到 0 或多个输出结果。例如:

lines = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt") 
words = lines.flatMap(lambda line:line.split(" ")) 
words.foreach(print) 

image-20231116111511592

(4)groupByKey()

groupByKey()应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集。下面给

出一个简单实例,代码如下:

words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1),("is",1),("fast",1),("Spark",1),("is",1),("better",1)]) 
words1 = words.groupByKey() 
words1.foreach(print) 

image-20231116111553622

(5)reduceByKey(func)

reduceByKey(func)应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每

个值是将每个 key 传递到函数 func 中进行聚合后得到的结果。这里给出一个简单实例,代码如下:

words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1),("is",1),("fast",1),("Spark",1),("is",1),("better",1)]) 
words1 = words.reduceByKey(lambda a,b:a+b) 
words1.foreach(print)

image-20231116111642594

2.行动操作
rdd = sc.parallelize([1,2,3,4,5]) 
rdd.count() 
rdd.first() 
rdd.take(3) 
rdd.reduce(lambda a,b:a+b) 
rdd.collect() 
rdd.foreach(lambda elem:print(elem))

image-20231116111735249

3.惰性机制
lineslzy = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt") 
lineLengths = lineslzy.map(lambda s:len(s)) 
totalLength = lineLengths.reduce(lambda a,b:a+b) 
print(totalLength)

image-20231116111910718

二、 参考书上例子,理解并完成键值对RDD常用操作(4.2.2节内容);

常用的键值对转换操作包括 reduceByKey(func)、groupByKey()、keys、values、sortByKey()、sortBy()、mapValues(func)、join()和 combineByKey 等。

1.reduceByKey(func)
pairRDD = sc.parallelize([("Hadoop",1),("Spark",1),("Hive",1),("Spark",1),("罗忠烨",1)]) 
pairRDD.reduceByKey(lambda a,b:a+b).foreach(print) 

image-20231116112042871

2.groupByKey()
list = [("spark",1),("spark",2),("hadoop",3),("hadoop",5)] 
pairRDD = sc.parallelize(list) 
pairRDD.groupByKey() 
pairRDD.groupByKey().foreach(print)

image-20231116112213649

对于一些操作,既可以通过 reduceByKey()得到结果,也可以通过组合使用 groupByKey()和 map()操作得到结果,二者是“殊途同归”,下面是一个实例:

words = ["one", "two", "two", "three", "three", "three"] 
wordPairsRDD = sc.parallelize(words).map(lambda word:(word, 1)) 
wordCountsWithReduce = wordPairsRDD.reduceByKey(lambda a,b:a+b) 
wordCountsWithReduce.foreach(print) 
wordCountsWithGroup = wordPairsRDD.groupByKey().map(lambda t:(t[0],sum(t[1]))) 
wordCountsWithGroup.foreach(print) 

image-20231116121259280

3.keys
list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)] 
pairRDD = sc.parallelize(list) 
pairRDD.keys().foreach(print)

image-20231116112613497

4.values
list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1),("luozhongyeTop1",1)] 
pairRDD = sc.parallelize(list) 
pairRDD.values().foreach(print)

image-20231116112822976

5.sortByKey()
list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)] 
pairRDD = sc.parallelize(list) 
pairRDD.foreach(print) 
pairRDD.sortByKey().foreach(print)

image-20231116113006599

6.sortBy()
d1 = sc.parallelize([("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9),("luozhongye",1)]) 
d1.reduceByKey(lambda a,b:a+b).sortByKey(False).collect()

image-20231116113102891

sortByKey(False)括号中的参数 False 表示按照降序排序,如果没有提供参数 False,则默认采用升序排序(即参数取值为 True)。从排序后的效果可以看出,所有键值对都按照 key 的降序进行了排序,因此输出[(‘g’, 21), (‘f’, 29), (‘e’, 17), (‘d’, 9), (‘c’, 27), (‘b’, 38), (‘a’, 42)]。但是,如果要根据 21、29、17 等数值进行排序,就无法直接使用 sortByKey()来实现,这时可以使用 sortBy(),代码如下:

d1 = sc.parallelize([("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)]) 
d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x,False).collect() 
d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[0],False).collect() 
d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[1],False).collect() 

image-20231116113232289

7.mapValues(func)
list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)] 
pairRDD = sc.parallelize(list) 
pairRDD1 = pairRDD.mapValues(lambda x:x+1) 
pairRDD1.foreach(print)

image-20231116113332961

8.join()
pairRDD1 = sc.parallelize([("spark",1),("spark",2),("hadoop",3),("hadoop",5)]) 
pairRDD2 = sc.parallelize([("spark","fast")]) 
pairRDD3 = pairRDD1.join(pairRDD2) 
pairRDD3.foreach(print)

image-20231116113434643

9.combineByKey

(1)createCombiner:在第一次遇到 key 时创建组合器函数,将 RDD 数据集中的 V 类型值转换成 C 类型值(V => C);

(2)mergeValue:合并值函数,再次遇到相同的 key 时,将 createCombiner 的 C 类型值与这次传入的 V 类型值合并成一个 C 类型值(C,V)=>C;

(3)mergeCombiners:合并组合器函数,将 C 类型值两两合并成一个 C 类型值;

(4)partitioner:使用已有的或自定义的分区函数,默认是 HashPartitioner;

(5)mapSideCombine:是否在 map 端进行 Combine 操作,默认为 true。

下面通过一个实例来解释如何使用 combineByKey 操作。假设有一些销售数据,数据采用键值对的形式,即<公司,当月收入>,要求使用 combineByKey 操作求出每个公司的总收入和每月平均收入,并保存在本地文件中。

为了实现该功能,可以创建一个代码文件“/root/Desktop/luozhongye/Combinelzy.py”,并输入如下代码:

#!/usr/bin/env python3 
from pyspark import SparkConf, SparkContext 

conf = SparkConf().setMaster("local").setAppName("Combine ") 

sc = SparkContext(conf = conf) 

data = sc.parallelize([("company-1",88),("company-1",96),("company-1",85),("company-2",94),("company-2",86),("company-2",74),("company-3",86),("company-3",88),("company-3",92)],3) 

res = data.combineByKey(lambda income:(income,1),lambda acc,income:(acc[0]+income, acc[1]+1),lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1])).map(lambda x:(x[0],x[1][0],x[1][0]/float(x[1][1])))

res.repartition(1).saveAsTextFile("file:///root/Desktop/luozhongye/combineresultlzy")

执行如下命令运行该程序:

cd /root/Desktop/luozhongye 
/usr/local/spark/bin/spark-submit Combinelzy.py

image-20231116113910011

三、 逐行理解并运行4.4.1实例“求TOP值”。

假设在某个目录下有若干个文本文件,每个文本文件里面包含了很多行数据,每行数据由 4 个字段的值构成,不同字段值之间用逗号隔开,4 个字段分别为 orderid、userid、payment 和 productid,要求求出 Top N 个 payment 值。如下为一个样例文件 file0lzy.txt:

1,1768,50,155 
2,1218,600,211 
3,2239,788,242 
4,3101,28,599 
5,4899,290,129 
6,3110,54,1201 
7,4436,259,877 
8,2369,7890,27

实现上述功能的代码文件“/root/Desktop/luozhongye/TopN.py”的内容如下:

# !/usr/bin/env python3 
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc = SparkContext(conf=conf)
lines = sc.textFile("file:///root/Desktop/luozhongye/file0.txt")
result1 = lines.filter(lambda line: (len(line.strip()) > 0) and (len(line.split(",")) ==
                                                                 4))
result2 = result1.map(lambda x: x.split(",")[2])
result3 = result2.map(lambda x: (int(x), ""))
result4 = result3.repartition(1)
result5 = result4.sortByKey(False)
result6 = result5.map(lambda x: x[0])
result7 = result6.take(5)
for a in result7:
	print(a)

image-20231116121013386
= result1.map(lambda x: x.split(“,”)[2])
result3 = result2.map(lambda x: (int(x), “”))
result4 = result3.repartition(1)
result5 = result4.sortByKey(False)
result6 = result5.map(lambda x: x[0])
result7 = result6.take(5)
for a in result7:
print(a)

![在这里插入图片描述](https://img-blog.csdnimg.cn/a3c32d0053e2481c83318d6c27bb68bb.png)


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/152595.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

非对口专业测试人,婉拒猎头、放弃6份高薪offer,你敢信?

从非对口的国贸专业&#xff0c;步入测试之路&#xff1b;从红色旅游小城湘潭&#xff0c;迈入国际化都市上海。“明确方向-及时实践-谨慎选择-踏实扎根-计划未来”。她的每一步&#xff0c;都走得格外坚定有力......话不多说&#xff0c;让我们一起来看看这位小姐姐的成长故事…

PyTorch:张量与矩阵

PyTorch 是一个基于 Python 的科学计算包&#xff0c;专门针对深度学习研究&#xff0c;提供了丰富的工具和库。在 PyTorch 中&#xff0c;张量&#xff08;tensor&#xff09;是深度学习的核心数据结构&#xff0c;它可以看作是可以进行自动微分的多维数组。张量不仅可以代表标…

DSVPN简介

定义 动态智能VPN&#xff08;Dynamic Smart Virtual Private Network&#xff09;&#xff0c;简称DSVPN&#xff0c;是一种在Hub-Spoke组网方式下为公网地址动态变化的分支之间建立VPN隧道的解决方案。 目的 越来越多的企业希望建立Hub-Spoke方式的IPSec VPN网络将企业总部…

Linux学习第42天:Linux RS232/485/GPS 驱动实验:天外来客

Linux版本号4.1.15 芯片I.MX6ULL 大叔学Linux 品人间百味 思文短情长 Linux的学习笔记今天更新到了第42天。鉴于国往笔记内容整理中出现的问题&#xff0c;我尽量按照平时学习时笔记的要求进行优化。尽量不再大段大段的贴代码。而是…

解决Tomcat中文乱码

cmd乱码如图&#xff1a; idea中运行Tomcat控制台出现乱码&#xff1a; 解决办法&#xff1a; 找到两个idea的vmoptions配置文件&#xff0c;在文件中追加-Dfile.encodingUTF-8 -Dfile.encodingUTF-8保存退出。 重启idea重新运行Tomcat&#xff1a; maven、tomcat 超级详…

什么是 SSL?SSL/TLS是如何工作的?HTTP和HTTPS有什么区别?

SSL 代表安全套接字层&#xff0c;是指用于加密、保护和验证互联网上之通信的协议。尽管 SSL 在一段时间前已被称为 TLS&#xff08;传输层安全性&#xff09;的更新协议代替&#xff0c;但“SSL”仍是该技术的常用术语。 SSL/TLS 的主要用例是保护客户端和服务器之间的通信安…

解决requests库中session.verify参数失效的问题

在使用requests库进行HTTP请求时&#xff0c;如果在环境变量中设置了’REQUESTS_CA_BUNDLE’&#xff0c;并且在session对象中设置了verify参数为False&#xff0c;那么API请求会使用环境变量中的值而不是session对象中的值。这是因为在requests库中&#xff0c;当session对象中…

Find My婴儿车|苹果Find My技术与婴儿车结合,智能防丢,全球定位

婴儿车是一种为婴儿户外活动提供便利而设计的工具车&#xff0c;是宝宝最喜爱的散步交通工具&#xff0c;更是妈妈带宝宝上街购物时的必须品。随着现在三胎的放开&#xff0c;婴儿车市场已经迎来上升的趋势。 在智能化加持下&#xff0c;防丢功能的加入使得人们日益关心物品的…

深度学习YOLO图像视频足球和人体检测 - python opencv 计算机竞赛

文章目录 0 前言1 课题背景2 实现效果3 卷积神经网络4 Yolov5算法5 数据集6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习YOLO图像视频足球和人体检测 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非…

TEMU要求提交RSL Report 铅镉RSL邻苯项目化学物质检测报告

TEMU要求提交RSL Report 铅镉RSL邻苯项目化学物质检测报告 如果您在亚马逊上销售商品&#xff0c;则必须遵守所有适用的欧盟和地方法律法规&#xff0c;以及适用于这些商品和商品信息的亚马逊政策。要在亚马逊上销售某些商品&#xff0c;( xxdu2016 )您需要向我们提供 REACH 符…

mybatis-plus3.5.3.1 支持不同数据源sql适配

mybatis-plus3.5.3.1 支持不同数据源sql适配 背景 最近公司要求支持国产数据库达梦&#xff0c;人大金仓&#xff0c;高斯等数据库&#xff0c;这些数据库与mysql的语法有一些差异&#xff0c;需要做一些兼容操作。 解决问题 1.不同数据库分页不同 2.支持通过参数控制执行…

Spi机制的必要性

SpringBoot 为啥单独加载类路径下spring.factories文件中的类&#xff1f; SpringBoot 应用运行过程中存在两种类型的类初始化&#xff1a;一部分为已经提前装载到IOC容器中的bean&#xff0c;另一部分则为实时new的bean。 IOC容器中的bean包含&#xff1a;启动类所在包路径下…

Docker的3主3从redis集群配置(扩容和缩容配置)

3主3从redis集群配置 1、关闭防火墙启动docker后台服务 systemctl start docker2、新建6个docker容器redis实例 docker run -d --name redis-node-1 --net host --privilegedtrue -v /data/redis/share/redis-node-1:/data redis:6.0.8 --cluster-enabled yes --appendonly …

基于Vue+SpringBoot的城市桥梁道路管理系统 开源项目

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统展示四、核心代码4.1 查询城市桥梁4.2 新增城市桥梁4.3 编辑城市桥梁4.4 删除城市桥梁4.5 查询单个城市桥梁 五、免责说明 一、摘要 1.1 项目介绍 基于VueSpringBootMySQL的城市桥梁道路管理系统&#xff0c;支持…

小型机房380V断电报警门磁开关状态检测远程控制RTU

在现代社会中&#xff0c;小型机房起到了至关重要的作用&#xff0c;为各种系统和设备提供稳定的电力供应。然而&#xff0c;由于各种原因&#xff0c;如供电故障、设备故障或非法侵入等&#xff0c;机房的正常运行可能会受到威胁。为了保障机房的安全和可靠性&#xff0c;我们…

了解七大经典排序算法,看这一篇就足够了!!!

✏️✏️✏️好&#xff0c;那么今天给大家分享一下七大经典排序算法&#xff01; 清风的CSDN博客 &#x1f61b;&#x1f61b;&#x1f61b;希望我的文章能对你有所帮助&#xff0c;有不足的地方还请各位看官多多指教&#xff0c;大家一起学习交流&#xff01; 动动你们发财的…

java Could not resolve placeholder

1、参考&#xff1a;https://blog.csdn.net/yu1812531/article/details/123466616 2、配置文件: 3、在application.properties中设置要使用的配置文件

最简单的测试Jquery-jquery是否正常工作的代码

01-运行后在页面上显示“jQuery is working!” 代码如下&#xff1a; <!DOCTYPE html> <html> <head><meta charset"UTF-8"><title>it is title</title><meta name"viewport" content"widthdevice-width,in…

小程序游戏、App游戏与H5游戏:三种不同的游戏开发与体验方式

在当今数字化的时代&#xff0c;游戏开发者面临着多种选择&#xff0c;以满足不同用户群体的需求。小程序游戏、App游戏和H5游戏是三种流行的游戏开发和发布方式&#xff0c;它们各自具有独特的特点和适用场景。 小程序游戏&#xff1a;轻巧便捷的社交体验 小程序游戏是近年来…

小米手环8pro重新和手机配对解决办法

如果更换了手机&#xff0c;那么小米手环8pro是无法和新手机自动连接的。 但是在新手机上直接连接又连接不上&#xff0c;搜索蓝牙根本找不到手环的蓝牙。 解决办法就是&#xff1a; 把手环恢复出厂&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; 是的&…