(一)PySpark3:安装教程及RDD编程(非常详细)

目录

一、pyspark介绍

二、PySpark安装

三、RDD编程

1、创建RDD

2、常用Action操作

①collect

②take

③takeSample

④first

⑤count

⑥reduce

⑦foreach

⑧countByKey

⑨saveAsTextFile

3、常用Transformation操作

①map

②filter

③flatMap

④sample

⑤distinct

⑥subtract

⑦union

⑧intersection

⑨cartesian

⑩sortBy

⑪zip

⑫zipWithIndex

4、常用Transformation操作(键值对)

①reduceByKey

②groupByKey

③sortByKey

④join / leftOuterJoin / rightOuterJoin

⑤cogroup

⑥subtractByKey

⑦foldByKey

5、分区操作

①glom

②HashPartitioner

③mapPartitions / mapPartitionsWithIndex

④coalesce

⑤repartition

⑥partitionBy

6、缓存操作

①cache

②persist

③checkpoint

7、共享变量

①broadcast

②accumulator

四、总结


一、pyspark介绍

Apache Spark是一个用于大数据处理的开源分布式计算框架,而PySpark则是Spark的Python 实现。PySpark允许使用Python编程语言来利用Spark的强大功能,使得开发人员能够利用Python的易用性和灵活性进行大规模数据处理和分析。


PySpark与Spark-Scala的对比:

1、语言选择:
PySpark: 使用简洁而易学的Python作为编程语言,这使得PySpark学习难度大大降低。
Spark-Scala: 使用Scala作为主要编程语言。Scala是一门运行在Java虚拟机上的多范式编程语言,更接近Java,并具有强大的面向对象和函数式编程特性,但是其学习曲线较陡。

2、性能:
PySpark:由于Python是解释型语言,相比Scala的原生Spark可能会有性能上的一些损失。但通过PySpark的DataFrame和优化技术也可以显著提高性能。
Spark-Scala:使用Scala编写的Spark应用程序可能在性能上略优于PySpark,因为Scala是一门静态类型语言,而且运行在Java虚拟机上。

4、生态系统支持:
PySpark:可与Python的生态系统(如NumPy、Pandas)以及其他大数据工具和库进行集成。
Spark-Scala:由于运行在JVM上,可以利用Java生态系统,但Scala本身的生态系统相对较小。

5、机器学习支持:
PySpark: 提供了MLlib库,支持在分布式环境中进行大规模的机器学习任务。
Spark-Scala: 同样支持MLlib,但在API的使用上可能相对繁琐一些。

总体而言,PySpark强于数据分析,Spark-Scala强于性能。如果应用场景有非常多的可视化和机器学习算法需求,推荐使用pyspark,可以更好地和python中的相关库配合使用。

前置知识:

1、熟悉Spark RDD原理,了解RDD常用算子
2、具有Python编码能力,熟悉Python中numpy, pandas库的基本用法。
3、了解机器学习算法原理,如逻辑回归、决策树等等
4、需要安装:JDK、Anaconda

二、PySpark安装

首先安装spark,本文使用的安装文件为:spark-3.2.1-bin-hadoop3.2。即Spark版本为3.2.1,Hadoop可不安装,对本文后续代码运行无影响。

百度云链接如下:

链接:https://pan.baidu.com/s/1GmPZBoBtSZWJtPHqm-DhwA?pwd=bcm5 
提取码:bcm5

将下载的安装文件解压到指定目录即可,例如我的目录:D:\bigdata\spark-3.2.1-bin-hadoop3.2

配置系统变量:

此电脑-右键点击“属性”-高级系统设置-环境变量-系统变量

#系统变量新建
SPARK_HOME D:\bigdata\spark-3.2.1-bin-hadoop3.2 #换成你的解压目录
PYSPARK_DEIVER_PYTHON_OPTS notebook
PYSPARK_DEIVER_PYTHON ipython
PYTHONPATH %SPARK_HOME%\python\lib\py4j;%SPARK_HOME%\python\lib\pyspark

#path添加
%SPARK_HOME%\bin

修改配置文件:
在解压路径目录conf下,复制文件spark-env.sh.template,修改文件名为spark-env.sh

修改配置文件spark-env.sh,在文件末尾添加以下代码:

#D:\Anaconda3换成你的anaconda安装目录
export PYSPARK_PYTHON=D:\Anaconda3
export PYSPARK_DRIVER_PYTHON=D:\Anaconda3
export PYSPARK_SUBMIT_ARGS='--master local[*]'
#local[*]  是利用所有的资源

以上步骤完成,spark已经安装完成。

接下来在Anaconda创建虚拟环境,安装相关python库。需要注意,Python安装的pyspark版本必须与前面安装的spark版本一致。

#创建虚拟环境
conda create -n spark python=3.8

#进入虚拟环境
conda activate spark

#安装相关包
pip install pyspark==3.2.1 findspark pyhive notebook pandas

三、RDD编程

第一步,初始化Spark环境,创建一个Spark应用程序:

import findspark
import pyspark 
from pyspark import SparkContext, SparkConf
findspark.init()

conf = SparkConf().setAppName("test").setMaster("local[4]")
sc = SparkContext(conf=conf)

以上代码中,首先创建一个SparkConf对象,用于配置Spark应用程序。通过setAppName设置应用程序的名称为"test",通过setMaster设置运行模式为本地模式,使用4个本地线程。

随后,创建一个SparkContext对象,它是与Spark集群通信的主要入口点。一旦SparkContext被创建,就可以使用它来执行各种分布式计算任务。

1、创建RDD

RDD(Resilient Distributed Dataset):是 Spark 中的核心数据结构,代表分布在集群节点上的不可变、弹性(可容错)、可并行计算的数据集。RDD 可以分为多个分区,每个分区可以在集群中的不同节点上进行并行处理。

①用textFile方法加载本地或者集群文件系统中的数据

#从本地文件系统中加载数据
file = "./data/test.txt"
rdd = sc.textFile(file,3)

#从集群文件系统中加载数据
file = "hdfs://localhost:9000/user/data/test.txt"
#也可以省去hdfs://localhost:9000
rdd = sc.textFile(file,3)

②用parallelize方法将Driver中的数据结构并行化成RDD。

rdd = sc.parallelize(range(1,11),2)

2、常用Action操作

其主要特点如下:

触发计算:Action操作是Spark计算的触发点,当调用 Action操作时,Spark将执行整个RDD的计算流程,并生成最终结果。这与Transformation操作的惰性计算形成对比。

输出结果:Action操作生成非惰性结果,即它们会立即执行计算并返回实际的结果。可以将计算结果返回到本地驱动程序,也可以将结果写入外部存储系统(如 HDFS、数据库等),另外还可以将结果缓存到驱动程序或本地内存中(但对于大型数据集来说可能会导致内存问题)。

①collect

collect()是一个action操作,用于从RDD中收集所有元素到Driver节点,形成一个本地的数据集(数组)。

rdd = sc.parallelize(range(1,11),2)
rdd.collect()

运行结果:

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

注意:collect()操作将整个RDD中的数据收集到Driver节点的内存中,在大规模数据集上执行该操作可能导致内存不足或性能问题。
因此,collect()操作适用于小规模的结果集,用于调试和查看数据。对于大规模数据集,更常见的做法是使用转换操作和行动操作组合来实现分布式计算,最后将结果写到外部存储或以其他方式处理。

②take

rdd.take(n)用于获取RDD中的前n个元素。不同于collect(),take(n)只取前n个元素,因此在处理大规模数据时更为高效。

rdd = sc.parallelize(range(1,11),2)
rdd.take(4)

输出结果:

[1, 2, 3, 4]
takeSample

rdd.takeSample(withReplacement, num, seed=None)用于从RDD中获取指定数量的随机样本。

参数说明:
withReplacement:布尔值,表示是否允许采样时元素的重复抽取。如果为 True,则允许重复抽取;如果为 False,则不允许。
num:要获取的样本数量。
seed:可选的种子值,用于控制随机数生成。如果提供了相同的种子值,多次调用takeSample将产生相同的样本。

rdd = sc.parallelize(range(1,11),2)
rdd.takeSample(False,5,0)

输出结果:

[8, 9, 2, 6, 4]
first

first()用于获取RDD中的第一个元素,对于大型数据集的性能较好。

rdd = sc.parallelize(range(1,11),2)
rdd.first()

输出结果:

1
⑤count

count()用于获取RDD中元素的总数量,以了解数据规模。

rdd = sc.parallelize(range(1,11),2)
rdd.count()

输出结果:

10
⑥reduce

reduce()用于对RDD中的元素进行规约操作,两两结合进行某种操作后继续与下一个元素结合,直到规约成一个最终的结果。reduce()通常用于执行可以并行化的可交换和可结合的操作,例如对数字进行加法或求和。这样的操作可以在每个分区上并行执行,然后合并结果。

#计算0+1+2+3+4+5+6+7+8+9
rdd = sc.parallelize(range(10),5) 
rdd.reduce(lambda x,y:x+y)

输出结果:

45
⑦foreach

rdd.foreach()用于对RDD中的每个元素应用指定的函数,与map不同,foreach 是一个行动操作,它会在每个分区上并行地对每个元素执行给定的函数。

rdd = sc.parallelize(range(10),5) 
accum = sc.accumulator(0)
rdd.foreach(lambda x:accum.add(x))
print(accum.value)

输出结果:

45

在以上代码中,通过sc.accumulator()创建了一个累加器,并初始化其值为0。然后使用rdd.foreach()对RDD中的每个元素执行匿名函数,该函数将元素的值累加到累加器中。由于累加器是在分布式环境中共享的,因此每个节点上的累加器都能够更新。

countByKey

rdd.countByKey()用于统计 (key, value) 对的RDD中每个key的出现次数。

pairRdd = sc.parallelize([("hello",1),("world",4),("hello",9),("something",16)]) 
pairRdd.countByKey()

输出结果:

defaultdict(int, {'hello': 2, 'world': 1, 'something': 1})
saveAsTextFile

saveAsTextFile()用于将RDD的内容保存为文本文件,即将分布式数据集的结果写入到本地文件系统或分布式文件系统(如HDFS)中。

#saveAsTextFile保存rdd成text文件到本地
text_file = "./test/rdd.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)

#重新读入会被解析文本
rdd_loaded = sc.textFile(text_file)
rdd_loaded.collect()

3、常用Transformation操作

Transformation操作是对RDD进行变换的操作,它们不会立即执行,而是构建了一个表示要在RDD上执行的操作的执行计划。Transformation操作是为了支持分布式计算而设计的。它们在整个集群上并行运行,并利用RDD的不可变性和分区的概念来实现高效的分布式处理。通过构建逻辑执行计划,Spark可以优化计算并在整个集群上分布计算任务,以提高性能。

其主要特点如下:

惰性计算:当应用一个Transformation操作时,Spark只是记录了该操作的存在,并没有实际执行计算。实际的计算将会在Action操作触发时进行。

生成新的RDD:由于RDD一旦创建就不能被修改,所以Transformation操作通常生成一个新的RDD,而不是修改原始的RDD。

窄/宽依赖:Transformation操作可以分为窄依赖和宽依赖。
窄依赖指每个父分区中的数据仅依赖于该父分区的数据,例如map操作。
宽依赖指某个父分区的数据可能依赖于多个父分区的数据,例如groupByKey和reduceByKey操作,这会导致数据的重新分区,因此可能引起数据的Shuffle。

①map

rdd.map()用于对RDD中的每个元素应用一个指定的函数,并返回一个包含应用函数后结果的RDD。rdd.map()接受一个函数作为参数(可以是lambda匿名函数),该函数将被应用到RDD中的每个元素。

rdd = sc.parallelize(range(10),3)
rdd.map(lambda x:x**2).collect()

输出结果:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
②filter

rdd.filter()用于对RDD中的元素进行过滤,并返回新RDD,其中包含满足指定条件的原始RDD中的元素。

rdd = sc.parallelize(range(10),3)
rdd.filter(lambda x:x>5).collect()

输出结果:

[6, 7, 8, 9]
flatMap

rdd.flatMap()即在rdd.map()的基础上将所有的结果扁平化为一个新的RDD。

rdd = sc.parallelize(["hello world","hello China"])
print(rdd.map(lambda x:x.split(" ")).collect())
print(rdd.flatMap(lambda x:x.split(" ")).collect())

输出结果:

[['hello', 'world'], ['hello', 'China']]
['hello', 'world', 'hello', 'China']
sample

rdd.sample()用于从RDD每个分区按照比例随机抽样一部分元素,生成新的RDD。

参数如下:

withReplacement:是否放回抽样。true-有放回,false-无放回
fraction:期望样本的大小作为RDD大小的一部分。fraction范围在[0,1],即表示选择每个元素的概率。fraction大于1时,即表示选择每个元素的期望次数。
seed:随机数生成器的种子。

rdd = sc.parallelize(range(10),2)

#每个元素被抽到的概率为0.5,但输出的元素不一定是5个
print(rdd.sample(withReplacement=False, fraction=0.5,seed=0).collect())

#每个元素被抽到的期望次数是2,但输出的元素不一定是20个
print(rdd.sample(withReplacement=True, fraction=2,seed=0).collect())

输出结果:

[1, 4, 5, 7]
[0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 3, 4, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 8, 9]

⑤distinct

rdd.distinct()对原始RDD进行去重。

rdd = sc.parallelize([1,1,2,2,3,3,4,5])
rdd.distinct().collect()

输出结果:

[4, 1, 5, 2, 3]
⑥subtract

rdd1.subtract(rdd2)用于计算两个RDD的差集,返回在rdd1中但不在rdd2中的元素。

rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])
rdd1.subtract(rdd2).collect()

输出结果:

[1, 2, 3]
union

rdd1.union(rdd2)用于计算两个RDD的并集,需要注意返回结果中可能带有重复元素

rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])
rdd1.union(rdd2).collect()

输出结果:

[1, 2, 3, 4, 5, 4, 5, 6, 7, 8]
intersection

rdd1.intersection(rdd2)用于计算两个RDD的交集。

rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])
rdd1.intersection(rdd2).collect()

输出结果:

[4, 5]
cartesian

rdd1.cartesian(rdd2)用于计算两个RDD的笛卡尔积。

rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize(['a', 'b', 'c'])
rdd1.cartesian(rdd2).collect()

输出结果:

[(1, 'a'),
 (1, 'b'),
 (1, 'c'),
 (2, 'a'),
 (2, 'b'),
 (2, 'c'),
 (3, 'a'),
 (3, 'b'),
 (3, 'c')]
sortBy

rdd.sortBy()用于对RDD中的元素按照指定的排序键进行排序。

参数如下:

rdd.sortBy(keyfunc, ascending=True, numPartitions=None)
keyfunc用于从 RDD 的每个元素中提取用于排序的键,可以是lambda匿名函数。
ascending表示排序的顺序。 True为升序,False为降序。
numPartitions表示返回结果RDD的分区数。

data = [(1, 'apple'), (3, 'orange'), (2, 'banana'), (4, 'grape')]
rdd = sc.parallelize(data)
rdd.sortBy(lambda x: x[0], ascending=True,numPartitions=2).collect()

输出结果:

[(1, 'apple'), (2, 'banana'), (3, 'orange'), (4, 'grape')]
⑪zip

rdd1.zip(rdd2)按照拉链方式将两个RDD中的元素一对一地合并成元组,效果类似python的zip函数,需要两个RDD具有相同的分区,每个分区元素数量相同。

rdd1 = sc.parallelize([1, 2, 3, 4, 5],2)
rdd2 = sc.parallelize(['a', 'b', 'c', 'd', 'e'],2)
rdd1.zip(rdd2).collect()

输出结果:

[(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]
zipWithIndex

rdd.zipWithIndex()用于将RDD中的每个元素与其在 RDD 中的索引位置一对一地合并成元组。

rdd = sc.parallelize([10, 20, 30, 40, 50])
rdd.zipWithIndex().collect()

输出结果:

[(10, 0), (20, 1), (30, 2), (40, 3), (50, 4)]

4、常用Transformation操作(键值对)

PairRDD中的元素是键值对,Spark提供了针对键值对的一系列转换和操作,使得对数据进行分组、聚合和排序等操作更加方便。

reduceByKey

rdd.reduceByKey()用于对RDD进行聚合的Transformation操作,将具有相同键的所有值根据提供的聚合函数进行合并。

参数如下:
reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash>)
func: 聚合函数,接受两个参数,用于将相同键的值进行合并。
numPartitions: 可选参数,用于指定返回结果RDD的分区数。
partitionFunc: 可选参数,用于指定分区函数,默认为哈希分区函数。

data = [('x', 3), ('y', 5), ('x', 2), ('y', 1), ('x', 1), ('z', 4)]
rdd = sc.parallelize(data)
rdd.reduceByKey(lambda x, y: x + y).collect()

输出结果:

[('y', 6), ('x', 6), ('z', 4)]
groupByKey

rdd.groupByKey()用于对PairRDD进行分组的Transformation操作,将相同键的所有值放在一个迭代器中并返回新RDD,适用于不涉及值的聚合操作,只需按键进行分组的情况。不过groupByKey()可能导致数据倾斜,特别是当某个键对应的值非常多时。另外,生成的结果是以键值对形式的迭代器,存在大量数据时可能导致内存溢出。

data = [('x', 3), ('y', 5), ('x', 2), ('y', 1), ('x', 1), ('z', 4)]
rdd = sc.parallelize(data)
print(rdd.groupByKey().collect())

for key, values in rdd.groupByKey().collect():
    print(f"{key}: {list(values)}")

输出结果:

[('y', <pyspark.resultiterable.ResultIterable object at 0x00000259F5816C10>), ('x', <pyspark.resultiterable.ResultIterable object at 0x00000259F574C790>), ('z', <pyspark.resultiterable.ResultIterable object at 0x00000259F574C490>)]
y: [5, 1]
x: [3, 2, 1]
z: [4]
sortByKey

rdd.sortByKey()用于对PairRDD进行按键排序的Transformation操作,按照键进行排序,并返回一个新RDD。sortByKey()可能导致数据倾斜,特别是当某个键对应的值非常多时。在数据量庞大时,可能会影响性能,因为需要将数据在不同分区间移动以进行排序。

参数如下:
ascending:表示排序的顺序。True为升序(默认),False为降序。

data = [(3, 'x'), (5, 'y'), (2, 'z'), (4, 's')]
rdd = sc.parallelize(data)
rdd.sortByKey().collect()

输出结果:

[(2, 'z'), (3, 'x'), (4, 's'), (5, 'y')]
join / leftOuterJoin / rightOuterJoin

rdd1.join(rdd2)用于对两个PairRDD进行连接的Transformation操作,根据键将两个 PairRDD 中的元素进行连接。类似SQL中的inner join。

rdd1.leftOuterJoin(rdd2)、rdd1.rightOuterJoin(rdd2)分别是左关联、右关联。如果另一侧PairRDD 中没有匹配的键,则对应位置的值为None。

data1 = [('Tom', 18), ('Jerry', 19), ('Alice', 17)]
data2 = [('Tom', 'male'), ('Bob', 'male'), ('Alice', 'female')]
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)
print(rdd1.join(rdd2).collect())
print(rdd1.leftOuterJoin(rdd2).collect())
print(rdd1.rightOuterJoin(rdd2).collect())

输出结果:

[('Tom', (18, 'male')), ('Alice', (17, 'female'))]
[('Jerry', (19, None)), ('Tom', (18, 'male')), ('Alice', (17, 'female'))]
[('Tom', (18, 'male')), ('Bob', (None, 'male')), ('Alice', (17, 'female'))]
cogroup

rdd1.cogroup(rdd2)用于对两个PairRDD进行先后两次分组连接的Transformation操作,相当于对rdd1、rdd2分别进行goupByKey,再对两个结果进行groupByKey。

rdd1 = sc.parallelize([("a", 1), ("a", 2), ("b", 3)])
rdd2 = sc.parallelize([("a", 3), ("b", 4)])
[(x, tuple(map(list, y))) for x, y in sorted(list(rdd1.cogroup(rdd2).collect()))]

输出结果:

[('a', ([1, 2], [3])), ('b', ([3], [4]))]
subtractByKey

rdd1.subtractByKey(rdd2)用于对两个PairRDD求差集的Transformation操作,即返回在rdd1中而不在rdd2中的元素。

rdd1 = sc.parallelize([("x", 1), ("y", 2), ("z", 3)])
rdd2 = sc.parallelize([("x", 3), ("y", 4)])
rdd1.subtractByKey(rdd2).collect()

输出结果:

[('z', 3)]
foldByKey

foldByKey的操作和reduceByKey类似,但是foldByKey可以提供一个初始值

data = [('x', 1), ('x', 2), ('x', 3),  ('y', 1), ('y', 2), ('z', 1)]
rdd = sc.parallelize(data)
print(rdd.foldByKey(0, lambda x, y: x + y).collect())
print(rdd.foldByKey(1, lambda x, y: x + y).collect())

输出结果:

[('y', 3), ('x', 6), ('z', 1)]
[('y', 5), ('x', 8), ('z', 2)]

5、分区操作

RDD分区操作主要分为调整分区与转换分区操作。

调整分区操作用于调整已有RDD的分区结构,不改变数据的物理位置,仅影响分区元数据,通常性能开销较小。例如:分区数调整,即调整现有RDD的分区数,但不移动数据。

转换分区操作改变了RDD的分区结构,通常是在数据上执行Transformation操作,产生一个新的RDD,其分区数可能发生变化。例如通过指定的分区数或者使用一些具体的分区算法,重新组织数据分区。在数据的重新组织过程中可能涉及跨分区的数据移动,通常伴随着性能开销。

①glom

rdd.glom()用于将每个分区的数据转换为一个数组,是Transformation操作。适用于需要对每个分区进行整体操作的场景。

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data, 3)
rdd.glom().collect()

输出结果:

[[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
②HashPartitioner

rdd.HashPartitioner()用于对PairRDD进行哈希分区,即根据键的哈希值将数据划分到不同的分区中。

data = [('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', 5)]
rdd1 = sc.parallelize(data)
rdd2 = rdd1.partitionBy(2)

for partition, data in enumerate(rdd2.glom().collect()):
    print(f"Partition {partition}: {data}")

输出结果:

Partition 0: [('b', 2), ('c', 3), ('d', 4)]
Partition 1: [('a', 1), ('e', 5)]
mapPartitions / mapPartitionsWithIndex

rdd.mapPartitions()是Transformation操作,用于对RDD的每个分区执行一个自定义映射函数,该函数可以处理分区内的所有元素,而不是一次仅处理一个元素。mapPartitions能够减少通信开销,因为映射操作是在每个分区内进行的,适用于需要对整个分区进行批量操作的场景,而不适用于需要考虑跨分区元素之间关系的场景。

rdd.mapPartitionsWithIndex()类似于mapPartitions,但提供了分区索引信息,允许更细粒度的控制。

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data, 3)
print(rdd.mapPartitions(lambda x:(i * 2 for i in x)).collect())
print(rdd.mapPartitionsWithIndex(lambda index,x:((index, i*2) for i in x)).collect())

输出结果:

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
[(0, 2), (0, 4), (0, 6), (1, 8), (1, 10), (1, 12), (2, 14), (2, 16), (2, 18), (2, 20)]
④coalesce

coalesce()用于减少分区数的Transformation操作,可以尽量避免数据迁移,提升效率。

参数如下:

coalesce(numPartitions, shuffle=False)
numPartitions:新的分区数。
shuffle:是否进行数据洗牌,默认为False。当设置为 True 时,将触发数据洗牌操作,否则只是简单地减小分区数。

rdd = sc.parallelize(range(20),10)
print(rdd.glom().collect())
print(rdd.coalesce(2,shuffle=False).glom().collect())
print(rdd.coalesce(2,shuffle=True).glom().collect())

输出结果:

[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10, 11], [12, 13], [14, 15], [16, 17], [18, 19]]
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]]
[[0, 1, 4, 5, 6, 7, 12, 13, 16, 17], [2, 3, 8, 9, 10, 11, 14, 15, 18, 19]]
⑤repartition

rdd.repartition()用于重新分区的Transformation操作,可以增加或减少分区数,通过shuffle来重新组织数据。允许动态调整RDD的分区数,可在数据分布不均匀时提高计算性能。

对比:coalesce在已有分区基础上尽量减少数据shuffle,而repartition会创建新分区并且使用full shuffle。

rdd = sc.parallelize(range(25),25)
print(rdd.glom().collect())
print(rdd.repartition(5).glom().collect())
print(rdd.coalesce(5).glom().collect())

输出结果:

[[0], [1], [2], [3], [4], [5], [6], [7], [8], [9], [10], [11], [12], [13], [14], [15], [16], [17], [18], [19], [20], [21], [22], [23], [24]]
[[6, 8, 11, 15, 20, 21], [0, 9, 16, 18, 24], [2, 3, 7, 14, 19, 22], [1], [4, 5, 10, 12, 13, 17, 23]]
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14], [15, 16, 17, 18, 19], [20, 21, 22, 23, 24]]
⑥partitionBy

rdd.partitionBy()用于对PairRDD重新分区,是Transformation操作,可以根据指定的分区器对键值对数据进行重新分区,以更好地控制数据的分布。

data = [('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', 5)]
pair_rdd = sc.parallelize(data)
hash_partitioned_rdd = pair_rdd.partitionBy(2)
for partition, data in enumerate(hash_partitioned_rdd.glom().collect()):
    print(f"Partition {partition}: {data}")

输出结果:

Partition 0: [('b', 2), ('c', 3), ('d', 4)]
Partition 1: [('a', 1), ('e', 5)]

6、缓存操作

如果多个任务在计算过程中共享同一个RDD作为中间数据,通过对其进行缓存,将其存储在内存中,可以显著加快计算速度。但是对RDD的缓存并不会立即生效,而是在该RDD第一次被计算出来时才会进行缓存。在不再需要某个RDD时,可以使用unpersist来释放缓存,而这个操作是立即执行的。这样可以有效地管理内存资源,避免不必要的缓存。
另一方面,缓存在提高计算速度的同时,并不会切断RDD的血缘依赖关系。因为缓存的数据可能存在某些分区的节点发生故障的情况,例如内存溢出或者节点损坏。在这种情况下,可以根据血缘关系重新计算受影响分区的数据,确保计算的正确性。
如果需要切断血缘关系,可以使用checkpoint来设置检查点,将RDD保存到磁盘中。与缓存类似,对RDD进行checkpoint同样不会立即生效,而是在该RDD第一次被计算出来时才会保存成检查点。通常,checkpoint适用于一些计算代价非常高昂的中间结果,或者在重复计算结果不可保证完全一致的情况下(例如使用zipWithIndex算子)。
对RDD进行缓存是优化Spark计算性能的有效手段,但需要根据具体情况灵活运用,以确保计算的准确性和效率。

①cache

rdd.cache()用于将RDD的计算结果缓存到内存中,以便在后续操作中重用,可以显著提高迭代算法等需要多次使用同一数据集的性能。rdd.cache使用存储级别MEMORY_ONLY,意味着如果内存不足,Spark可能会根据缓存数据的大小和可用内存的情况进行动态调整,例如将一部分或全部缓存的数据移除,以腾出内存供其他操作使用。

a = sc.parallelize(range(10000),5)
a.cache()
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_a

print(mean_a)

输出结果:

4999.5
②persist

rdd.persist()用于将RDD中间结果缓存到内存或磁盘中,以便在后续操作中重用。与cache不同,persist允许用户指定不同的存储级别,以更灵活地管理缓存。存储级别即不同的数据缓存的位置和策略,可以是MEMORY_ONLY、DISK_ONLY、MEMORY_AND_DISK(默认)等。

rdd.persist()写入磁盘的文件是临时文件,应用执行完成后就会被删除,可以使用rdd.unpersist()立即释放缓存。

from  pyspark.storagelevel import StorageLevel
a = sc.parallelize(range(10000),5)
a.persist(StorageLevel.MEMORY_AND_DISK)
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_a

a.unpersist() 
print(mean_a)

输出结果:

4999.5
③checkpoint

rdd.checkpoint()用于将RDD中间结果写入磁盘。由于血缘依赖过长会造成容错成本过高,可以中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重新执行程序,减少开销。需要注意的是,checkpoint操作并不会马上被执行,而是在执行Action操作时才被触发。另外,checkpoint路径保存的文件是永久存在的,不会随着应用的结束而被删除。

sc.setCheckpointDir("./data/checkpoint/")
rdd = sc.parallelize(["a","b","c","d"],2)
rdd_idx = rdd.zipWithIndex() 
rdd_idx.checkpoint() 
rdd_idx.take(3)

输出结果:

[('a', 0), ('b', 1), ('c', 2)]

7、共享变量

共享变量主要用于在分布式计算中实现在任务之间共享数据,以提高性能和降低网络开销。广播变量(broadcast variables)和累加器(accumulators)是两个重要的分布式计算工具,用于在集群上共享数据和累积结果。

①broadcast

当需要在所有工作节点之间共享较小的只读数据集时,使用广播变量可以避免将该数据集多次传输到各个节点。这可以有效减少网络开销,提高性能。典型的应用场景包括在所有节点上使用相同的配置参数、字典或者映射表等。并且可以避免任务间重复传输,如果一个RDD需要在多个任务中使用,而且这个RDD的数据较小,使用广播变量可以避免在不同任务之间多次传输相同的数据。

#广播变量 broadcast 不可变,在所有节点可读
broads = sc.broadcast(100)
rdd = sc.parallelize(range(10))
print(rdd.map(lambda x:x+broads.value).collect())
print(broads.value)

输出结果:

[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
100
②accumulator

累加器主要用于执行在分布式任务中的“添加”和“合并”操作,通常用于聚合和计数等操作。
例如,可以用累加器来计算在整个集群上发生的某个特定事件的总次数;计算所有节点上某个变量的总和或平均值。

#累加器 只能在Driver上可读,在其它节点只能进行累加
total = sc.accumulator(0)
rdd = sc.parallelize(range(10),3)
rdd.foreach(lambda x:total.add(x))
total.value

输出结果:

45

四、总结

总的来说,PySpark适合初学者入门学习,由于python门槛不高,易于掌握,可以通过PySpark了解Spark的运行机制以及RDD算子的使用。但如果是需要几百台服务器才能运行的任务场景, PySpark的UDF(User Defined Functions)的性能差距肯定比不过Spark-Scala。

至于选什么语言,取决于业务需求。如果是处理简单的数据清洗聚合,且数据量非常大,用Scala会有性能优势,可以节约计算资源。如果需要处理较为复杂的算法模型,依赖于各种第三方包,那么使用Python会更好。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/357697.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SQL报错注入

君衍. 一、sqllabs第五关报错注入updatexml报错注入原理及思路 二、常见的报错函数三、floor报错注入原理1、概念2、函数回顾2.1 rand函数2.2 floor(rand(0)*2)函数2.3 group by函数2.4 count(*)函数2.5 函数综合报错 3、报错分析4、总结 一、sqllabs第五关报错注入 之前我在这…

Linux系列之查看cpu、内存、磁盘使用情况

查看磁盘空间 df命令用于显示磁盘分区上的可使用的磁盘空间。默认显示单位为KB。可以利用该命令来获取硬盘被占用了多少空间&#xff0c;目前还剩下多少空间等信息。使用df -h命令&#xff0c;加个-h参数是为了显示GB MB KB单位&#xff0c;这样更容易查看 Filesystem …

使用Hugging Face下载ImageNet

1. 创建Hugging Face账号&#xff0c;在个人中心的setting部分&#xff0c;生成Access Token 2. 使用在python命令行中运行&#xff1a; pip install datasets 此时需要输入token 3. 新建一个python文件 from datasets import load_dataset dset load_dataset(imagenet-1k…

http-server开启一个服务器

前言 在写前端页面中&#xff0c;经常会在浏览器运行HTML页面&#xff0c;从本地文件夹中直接打开的一般都是file协议&#xff0c;当代码中存在http或https的链接时&#xff0c;HTML页面就无法正常打开&#xff0c;为了解决这种情况&#xff0c;需要在在本地开启一个本地的服务…

vuex store,mutations,getters,actions

文章目录 1.vuex概述2.构建vuex【多组件数据共享】环境Son1.vueSon2.vueApp.vue 3.创建一个空仓库4.如何提供&访问vuex的数据①核心概念 - state状态1.通过store直接访问2.通过辅助函数简化代码 ②核心概念 - mutations&#xff08;粗略&#xff09; 5.核心概念 - mutation…

如何在 VM 虚拟机中安装 Kail Linux 2023.4 操作系统保姆级教程(附链接)

一、VMware Workstation 虚拟机 先得安装 VM 虚拟机&#xff0c;没有的可以参考这篇文章安装 VM 虚拟机 如何在 VM 虚拟机中安装 Win10 操作系统保姆级教程&#xff08;附链接&#xff09;https://eclecticism.blog.csdn.net/article/details/135713915 二、Kail 镜像 进入…

树控件、下拉框、文本框常用测试用例

01 控件的测试外观操作 1&#xff09;项目中的所有树是否风格一致 2&#xff09;树结构的默认状态是怎样的。比如默认树是否是展开&#xff0c;是展开几级&#xff1f; 是否有默认的焦点&#xff1f;默认值是什么&#xff1f;展开的节点图标和颜色&#xff1f; 3&#xff09…

关于如何将Win幻兽帕鲁服务端存档转化为单人本地存档的一种方法(无损转移)

本文转自博主的个人博客&#xff1a;https://blog.zhumengmeng.work,欢迎大家前往查看。 原文链接&#xff1a;点我访问 **起因&#xff1a;**最近大火的开放世界缝合体游戏幻兽帕鲁的大火也是引起了博主的注意&#xff0c;然后博主和周边小伙伴纷纷入手&#xff0c;博主也是利…

谷歌人工智能视频生成器-LUMIERE(未开源)

Google重磅发布视频生成模型Lumiere 据说后续会开源 亮点1.支持文本到视频与图像到视频 亮点2.画风迁移 亮点3.运动蒙版 亮点4.视频编辑 亮点5.视频修复 谷歌视频模型可以生成80帧的片段&#xff01;不仅画质好、质量高&#xff0c;而且时长更长。 视频局部编辑 这项功能可以…

07.领域驱动设计:了解3种常见微服务架构模型的对比和分析

目录 1、概述 2、整洁架构 3、六边形架构 4、三种微服务架构模型的对比和分析 5、从三种架构模型看中台和微服务设计 5.1 中台建设要聚焦领域模型 5.2 微服务要有合理的架构分层 5.2.1 项目级微服务 5.2.2 企业级中台微服务 5.3 应用和资源的解耦与适配 6、总结 1、概…

C语言-动态内存申请

一、动态分配内存的概述 在数组一章中&#xff0c;介绍过数组的长度是预先定义好的&#xff0c;在整个程序中固定不变&#xff0c;但是在实际的编程中&#xff0c;往往会发生这种情况&#xff0c;即所需的内存空间取决于实际输入的数据&#xff0c;而无法预先确定 。为了解决…

一、创建Vue3项目

1. 下载 node.js 下载地址&#xff1a;https://nodejs.org/zh-cn 优先选择 16 版本; node -v || node -version 可以检查本地 node.js 版本 2. 设置淘宝镜像源 npm config set registry https://registry.npmmirror.com/ 设置淘宝镜像源 npm config get registry 查看当前镜像…

常见的网络安全威胁和防护方法

随着数字化转型和新兴技术在各行业广泛应用&#xff0c;网络安全威胁对现代企业的业务运营和生产活动也产生了日益深远的影响。常见的网络安全威胁通常有以下几种&#xff1a; 1. 钓鱼攻击 攻击者伪装成合法的实体&#xff08;如银行、电子邮件提供商、社交媒体平台等&#xf…

解析PDF二维码:数字时代文件管理的创新之道

随着数字时代的来临&#xff0c;文件管理方式正经历着翻天覆地的变革。在这个变革的浪潮中&#xff0c;PDF二维码作为一种创新的技术手段&#xff0c;正逐渐引起人们的关注。本文将深入探讨PDF二维码的概念、应用领域以及在文件管理中的前景。 一、PDF二维码的概念 PDF二维码…

mcu专用看门狗复位芯片(如MAX706)

mcu专用看门狗复位芯片&#xff08;如MAX706&#xff09; 为什么要使用电压复位芯片RESET引脚WDO引脚MR引脚WDI引脚 国产替代型号应用电路1 推荐电路&#xff08;用一个跳线帽使能/关闭看门狗功能&#xff0c;调试MCU时防止看门狗芯片随便触发复位功能&#xff09;&#xff0c;…

Linux操作系统概述

操作系统&#xff08;Operating System&#xff09;的定义 操作系统&#xff0c;是指直接管理系统硬件和资源&#xff08;如 CPU、内存和存储空间&#xff09;的软件。 操作系统的基本功能 ①统一管理计算机资源&#xff1a;处理器资源&#xff0c;IO设备资源&#xff0c;存储…

Flutter的安装与环境配置

一、下载安装Futter&#xff1a; 1、Flutter中文文档&#xff1a; 安装和环境配置 - Flutter 中文文档 - Flutter 中文开发者网站 - Flutter 2、下载 Futter SDK&#xff1a; Flutter中文文档 里面有&#xff0c;下载完成之后找个文件夹解压出来&#xff0c;最好不要将 Flu…

MybatisPlus应用参数类型不一致导致索引失效

业务场景介绍 在电商项目中&#xff0c;有一个商品表【t_goods】和一个商品sku表【t_goods_sku】,具体表结构如下所示&#xff1a; CREATE TABLE t_goods (id bigint NOT NULL AUTO_INCREMENT COMMENT 主键id,brand_id varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_…

Java链表(2)

&#x1f435;本篇文章将对双向链表进行讲解&#xff0c;模拟实现双向链表的常用方法 一、什么是双向链表 双向链表在指针域上相较于单链表&#xff0c;每一个节点多了一个指向前驱节点的引用prev以及多了指向最后一个节点的引用last&#xff1a; 二、双向链表的模拟实现 首先…

Python实现时间序列分析AR定阶自回归模型(ar_select_order算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 时间序列分析中&#xff0c;AR定阶自回归模型&#xff08;AR order selection&#xff09;是指确定自回…
最新文章