[spark] RDD 编程指南(翻译)

Overview

从高层次来看,每个 Spark 应用程序都包含一个driver program,该程序运行用户的main方法并在集群上执行各种并行操作。

Spark 提供的主要抽象是 resilient distributed dataset(RDD),它是跨集群节点分区的元素集合,可以并行操作。RDD 是通过从 Hadoop 文件系统中的文件开始创建的。用户还可以要求 Spark 将 RDD 持久保存在内存中,从而使其能够在并行操作中高效地重用。最后,RDD 会自动从节点故障中恢复。

Spark 中提供的第二抽象是 shared variables ,他可以用在并行操作中。默认情况下,当 Spark 将函数作为一组任务(task)在不同节点上并行运行时,它会将函数中使用的每个变量的副本携带给每个任务。有时,变量需要在任务之间共享或者在driver program和任务之间共享。Spark 支持两种类型的 shared variables,一是 broadcast variables 可用于在所有节点的内存中缓存一个值,二是 accumulators

,他是仅“added”的变量,例如counters和sums。

Resilient Distributed Datasets (RDDs)

Spark围绕 RDD 的概念展开,RDD是可以并行操作的元素的容错集合。有两种方法可以创建RDD:在driver program中并行化现有集合,或者引用外部存储系统中的数据集,例如共享文件系统、HDFS、HBase或任何提供Hadoop InputFormat的数据源。

Parallelized Collections

并行化集合是通过在驱动程序中的现有集合上调用JavaSparkContext的并行化方法创建的。集合的元素被复制以形成可以并行操作的分布式数据集。例如,以下是如何创建一个包含数字1到5的并行化集合:

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

一旦创建,分布式数据集(distData)就可以并行操作。并行集合的一个重要参数是将数据集划分为的分区数量。 Spark 将为集群的每个分区(partition)运行一个任务(task),任务将分配给节点执行。可以手动指定分区数或使用默认值。

RDD Operations

RDD 支持两种类型的操作:

  • transformations(从现有dataset创建新dataset)。例如,map 是一种transformations,它将每个dataset每个元素传递给函数并返回表示结果的新 RDD
  • actions(在对dataset运行计算后将值返回给driver program)。例如,reduce 是一个使用某个函数聚合 RDD 的所有元素并将最终结果返回给driver program的操作

Spark中的所有transformations都是 lazy 的,因为它们不会立即计算结果。相反,它们只记住应用于某些基本 dataset(例如文件)的transformations。只有当操作需要将结果返回给driver program时,transformations 才会被计算。这样的设计使得Spark能够更高效地运行。例如,我们可以意识到,通过map创建的dataset将在reduce中使用,并且只将reduce的结果返回给driver program,而不是更大的映射dataset。

默认情况下,每次对transform后的RDD运行操作时,都可能会被重新计算。但是,您也可以使用持久(或缓存)方法将RDD持久化在内存中,在这种情况下,Spark将保留集群中的元素,以便在您下次查询时更快地访问它。还支持在磁盘上持久化RDD,或跨多个节点复制。如下图所示,如果不cache/persist 任何内容,那么每次您需要输出时(当您调用诸如“count”之类的操作时),都会从磁盘读取数据并完成操作。您可以在读取后进行缓存,然后所有其他操作都会跳过读取并从缓存的数据开始。

在这里插入图片描述

为了说明 RDD 基础知识,请看下面的简单程序:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

第一行定义了来自外部文件的基本RDD。该数据集没有加载到内存中,也没有以其他方式对其进行操作:行只是指向文件的指针。第二行将lineLengths定义为map transformation的结果。同样,由于 lazy,lineLengths不会立即计算。最后,我们运行 reduce,这是一个操作。此时,Spark将计算分解为在不同机器上运行的任务,每台机器都运行其 map 和本地数据的 reduce,只将其答案返回给driver program。

如果 lineLengths 可能被再次使用,可以增加下面代码

lineLengths.persist(StorageLevel.MEMORY_ONLY());

在reduce之前,这会导致lineLengths在第一次计算后保存在内存中。

Understanding closures

关于Spark,更难的事情之一是理解跨集群执行代码时变量和方法的范围和生命周期。修改超出其范围的变量的RDD操作可能是混淆的常见来源。在下面的示例中,我们将查看使用foreach()增加计数器的代码,但其他操作也可能出现类似的问题。

Example

考虑下面简单的计算,将RDD元素sum。根据是否在同一JVM中执行,它的行为可能会有所不同。一个常见的例子是在本地模式下运行Spark(–master=local[n])与将Spark应用程序部署到集群(例如,通过Spark提交到YARN):

int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);
Local vs. cluster modes

上述代码的行为是未定义的,可能无法按预期工作。为了执行作业,Spark将RDD操作的处理分解为任务,每个任务都由执行器执行。在执行之前,Spark计算任务的 closures。closures 是执行器在RDD上执行计算(在本例中为foreach())时必须可见的变量和方法。此closures 被序列化并发送给每个执行器。

发送给每个excutor的closure中的变量现在是副本,因此,当在foreach函数中引用counter时,它不再是driver program上的count。driver program的内存中仍然有一个counter,但不再对excutor可见!

在本地模式下,在某些情况下,foreach 函数实际上将在与driver program相同的 JVM 中执行,并且将引用相同的原始counter,并且可能会更新它。

为了确保在这些场景中定义良好的行为,应该使用 Accumulator.。Spark中的 Accumulator专门用于提供一种机制,用于在集群中跨worker node 执行时安全地更新变量.

一般来说,closure——像循环或本地定义的方法这样的构造——不应该被用来改变一些全局状态。Spark不定义或保证对从闭包外部引用的对象的更改行为。执行此操作的一些代码可能在本地模式下工作,但这只是偶然的,这样的代码在分布式模式下不会按预期运行。如果需要一些全局聚合,请使用Accumulator。

Printing elements of an RDD

另一个常见的习惯用法是尝试使用rdd. foreach(println)或rdd.map(println)打印出RDD的元素。在单台机器上,这将生成预期的输出并打印RDD的所有元素。但是,在集群模式下,excutor 调用的stdout的输出现在写入执行程序的stdout,而不是driver program上的stdout,因此driver program上的stdout不会显示这些!要在驱动程序上打印所有元素,可以使用 collect() 首先将RDD带到 driver program 节点,因此使用:rdd.collect().foreach(println).

Working with Key-Value Pairs

虽然大多数Spark操作适用于包含任何类型对象的RDD,但一些特殊操作仅适用于键值对的RDD。最常见的是分布式“shuffle”操作,例如通过键对元素进行分组或聚合,reduceByKey和sortByKey等。

Shuffle operations

Spark中的某些操作会触发称为shuffle的事件。shuffle是Spark重新分配数据的机制,以便在分区之间以不同的方式分组。这通常涉及跨executor和机器复制数据,这使得shuffle成为一项复杂且成本高昂的操作。

为了理解在shuffle过程中会发生什么,我们可以考虑一个例子,这个例子中有一个reduceByKey 操作,它生成一个新的RDD,其中一个键的所有值都被组合成一个tuple,这个tuple就是键和对与该键相关的所有值执行一个reduce函数的结果。挑战在于,单个键的所有值不一定都位于同一分区,甚至同一台机器上,但它们必须位于同一位置才能计算结果。

对于大多数操作,Spark不会自动地将数据重新分布到特定的节点或分区以满足特定操作的需要。相反,每个任务通常只处理一个分区内的数据。然而,对于像reduceByKey这样的操作,Spark需要将具有相同键(key)的所有值(value)聚合在一起以进行计算。这意味着,如果这些值分布在不同的分区中,Spark必须执行一个全局的重组操作(all-to-all operation),这个过程被称为shuffle。在shuffle过程中,Spark会执行以下步骤:

  1. 读取所有分区的数据,以找出每个键对应的所有值。
  2. 将具有相同键的值跨分区传输到相同的节点,以便可以对它们进行聚合。
  3. 在每个节点上,对每个键的所有值进行最终的聚合计算,得到每个键的最终结果。

可能导致随机播放的操作包括repartition operations like repartition and coalesce‘ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

Performance Impact

Shuffle是一项昂贵的操作,因为它涉及磁盘I/O、数据序列化和网络I/O。为了组织shuffle的数据,Spark生成一组任务——map 任务来组织数据,以及一组reduce任务来聚合数据。这个术语来自MapReduce,与Spark的map和reduce操作没有直接关系。

从内部来看,单个map任务的结果保存在内存中直到内存放不下。然后,根据目标分区对它们进行排序并写入单个文件。在reduce端,任务读取相关的排序block

某些shuffle操作可能会消耗大量的堆内存,因为它们使用内存中的数据结构在传输数据之前或之后组织数据。具体来说,reduceByKey和aggregateByKey在map端创建这些结构,而’ByKey操作在reduce端生成这些结构。当数据在内存放不下时,spark会将这些数据spill到磁盘,从而导致磁盘IO的额外开销和垃圾回收机制的增加。

Shuffle还会在磁盘上生成大量中间文件。从Spark 1.3开始,这些文件将被保留,直到相应的RDD不再使用并被垃圾收集掉。这样做是为了在重新计算lineage时不需要重新创建随机文件。垃圾收集可能只在很长一段时间后发生,如果应用程序保留对这些RDD的引用,或者如果GC不经常启动。这意味着长时间运行的Spark作业可能会消耗大量磁盘空间。临时存储目录在配置Spark上下文时由park. local.dir配置参数指定。

RDD Persistence

Spark中最重要的功能之一是跨操作在内存中持久化(或缓存)dataset。当您持久化RDD时,每个节点都将其计算的任何分区存储在内存中,并在该dataset(或从该dataset派生的dataset)的其他操作中重用它们。这使得未来的操作更快(通常超过10倍)。缓存是迭代算法和快速交互使用的关键工具。

您可以使用RDD上的persist() cache()方法将其标记为持久化。第一次在操作中计算时,它将保存在节点的内存中。Spark的缓存是容错的——如果RDD的任何分区丢失,它将使用最初创建它的转换自动重新计算。

此外,每个持久化的RDD都可以使用不同的storage level,来存储,例如,允许您将数据集持久化在磁盘上,将其持久化在内存中,但作为序列化的Java对象(以节省空间),跨节点复制它。cache()方法是使用默认存储级别的简写,即StorageLevel.MEMORY_ONLY(在内存中存储反序列化的对象)。

Spark还会在shuffle操作中自动持久化一些中间数据(例如,reduceByKey),即使用户没有调用persist。这样做是为了避免在shuffle期间节点发生故障时重新计算整个input。如果用户计划重用新生成的RDD,我们仍然建议他们在生成的RDD上调用persist。

Which Storage Level to Choose?

Spark的存储级别旨在在内存使用和CPU效率之间提供不同的权衡。我们建议通过以下过程来选择一个:

  • 如果您的RDD适合默认存储级别(MEMORY_ONLY),请保持这样。这是CPU效率最高的选项,允许RDD上的操作尽可能快地运行。
  • 如果没有,请尝试使用MEMORY_ONLY_SER并选择一个快速序列化库,以使对象更加节省空间,但访问速度仍然相当快。(Java和Scala)
  • 不要spill到磁盘,除非计算数据集的函数很重,或者它们过滤了大量数据。否则,重新计算分区可能与从磁盘读取分区一样快。
  • 如果您想要快速故障恢复(例如,如果使用Spark处理来自Web应用程序的请求),请使用replicated 的存储级别。所有存储级别都通过重新计算丢失的数据提供完全的容错能力,但复制的存储级别允许您继续在RDD上运行任务,而无需等待重新计算丢失的分区。

Removing Data

Spark会自动监视每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式删除旧的数据分区。如果您想手动删除RDD而不是等待它从缓存中删除,请使用RDD.unpersist()方法。请注意,此方法默认不阻塞。要在释放资源之前阻塞,请在调用此方法时指定blocking=true。

Shared Variables

通常,当传递给Spark操作(如map或reduce)的函数在远程集群节点上执行时,它会在函数中使用的所有变量的单独副本上工作。这些变量被复制到每台机器上,并且远程机器上的变量的更新不会传播回driver program。跨任务支持通用的读写共享变量将是低效的。然而,Spark确实为两种常见的使用模式提供了两种有限类型的共享变量:broadcast variables and accumulators.

Broadcast Variables

广播变量允许程序员将只读变量缓存在每台机器上,而不是将其副本与task一起发送。例如,它们可以用来以有效的方式为每个节点提供大型输入数据集的副本,减少了数据传输的开销从task粒度下降到节点粒度。Spark还尝试使用有效的广播算法来分发广播变量,以降低通信成本。

Spark action通过一组stage执行,由分布式“shuffle”操作分隔。Spark自动广播每个stage内task所需的公共数据。以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前进行反序列化。这意味着显式创建广播变量仅在跨多个stage的task需要相同数据或以反序列化形式缓存数据很重要时才有用。

广播变量是通过调用SparkContext.broadcast(v)从变量v创建的。广播变量是v的包装器,可以通过调用value方法访问它的值。下面的代码显示了这一点:

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]

创建广播变量后,应该在集群上运行的任何函数中使用它而不是值v,这样v就不会多次发送到节点。此外,对象v在广播后不应该被修改,以确保所有节点都获得相同的广播变量值(例如,如果变量稍后传送到新加入的节点)。

要释放广播变量复制到执行器上的资源,请调用.unpersist()。如果广播之后再次使用,它将被重新广播。要永久释放广播变量使用的所有资源,请调用.destroy()。之后广播变量就不能使用了。请注意,这些方法默认情况下不会阻塞。要阻塞直到资源被释放,请在调用它们时指定blocking=true。

Accumulators

Accumulators是仅通过关联和交换运算“added”的变量,因此可以有效地支持并行。它们可用于实现计数器(如在 MapReduce 中)或求和。 Spark 原生支持数字类型的累加器,程序员可以添加对新类型的支持。

作为用户,您可以创建命名或未命名的累加器。如下图所示,修改该累加器的阶段将在Web UI中显示一个命名累加器(在本例中为计数器)。Spark在“任务”表中显示由任务修改的每个累加器的值。

Accumulators in the Spark UI

然后,在集群上运行的task可以使用add方法add到Accumulators。但是,他们无法读取其值。只有driver program可以使用其value方法读取累加器的值。

LongAccumulator accum = jsc.sc().longAccumulator();

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10

对于仅在action中执行的Accumulators更新,Spark保证每个任务对Accumulators的更新只会应用一次,即重新启动的任务不会更新值。在transformations中,用户应该知道,如果重新执行任务或作业阶段,每个任务的更新可能会应用不止一次。

累加器不会改变 Spark 的惰性求值模型。如果它们是在 RDD 的操作中更新的,则只有当 RDD 作为action的一部分进行计算时,它们的值才会更新。因此,在像 map() 这样的惰性转换中进行累加器更新时,不能保证执行。下面的代码片段演示了这个属性:

LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.

reference

https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/415247.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C++】结构体类

文章目录 问题提出一、结构体1.1结构体的声明1.1.1正常定义的结构体1.1.2在声明结构体的同时声明变量1.1.3typedef1.1.4成员变量 1.2结构体成员变量的使用1.2.1成员运算符 .1.2.2成员运算符 -> 1.3内存对齐1.3.1什么是内存对齐1.3.2内存对齐原则1.3.3结构体成员的定义顺序 1…

ISP代理是什么?怎么用?

在跨境出海业务中&#xff0c;代理IP对于您的在线任务至关重要&#xff0c;尤其是对于那些运行多个帐户的人来说。为您的帐户选择正确类型的代理对于确保帐户安全非常重要&#xff0c;劣质的IP容易使账号遭受封号风险。IPFoxy的多种代理IP类型应用范围各有侧重&#xff0c;其中…

Java 小项目开发日记 02(用户接口的开发)

Java 小项目开发日记 02&#xff08;用户接口的开发&#xff09; 项目目录 配置文件&#xff08;pom.xml&#xff09; <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation&q…

2007-2022年上市公司绿色化转型数据(仅结果)

2007-2022年上市公司绿色化转型数据&#xff08;仅结果&#xff09; 1、时间&#xff1a;2007-2022年 2、范围&#xff1a;上市公司 3、来源&#xff1a;上市公司年报、上市公司社会责任报告、上市公司网站信息 4、指标&#xff1a;证券代码、年份、绿色化转型 5、方法说明…

【JAVA日志】关于日志系统的架构讨论

目录 1.日志系统概述 2.环境搭建 3.应用如何推日志到MQ 4.logstash如何去MQ中取日志 5.如何兼顾分布式链路追踪 1.日志系统概述 关于日志系统&#xff0c;其要支撑的核心能力无非是日志的存储以及查看&#xff0c;最好的查看方式当然是实现可视化。目前市面上有成熟的解决…

【Go语言】Go语言中的数组

Go语言中的数组 1 数组的初始化和定义 在 Go 语言中&#xff0c;数组是固定长度的、同一类型的数据集合。数组中包含的每个数据项被称为数组元素&#xff0c;一个数组包含的元素个数被称为数组的长度。 在 Go 语言中&#xff0c;你可以通过 [] 来标识数组类型&#xff0c;但…

瑞_Redis_Redis命令

文章目录 1 Redis命令Redis数据结构Redis 的 key 的层级结构1.0 Redis通用命令1.0.1 KEYS1.0.2 DEL1.0.3 EXISTS1.0.4 EXPIRE1.0.5 TTL 1.1 String类型1.1.0 String类型的常见命令1.1.1 SET 和 GET1.1.2 MSET 和 MGET1.1.3 INCR和INCRBY和DECY1.1.4 SETNX1.1.5 SETEX 1.2 Hash类…

Java+SpringBoot+Vue+MySQL:狱内罪犯危险性评估系统全栈开发

✍✍计算机毕业编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java、…

5.WEB渗透测试-前置基础知识-常用的dos命令

内容参考于&#xff1a; 易锦网校会员专享课 上一篇内容&#xff1a;4.WEB渗透测试-前置基础知识-快速搭建渗透环境&#xff08;下&#xff09;-CSDN博客 常用的100个CMD指令 1.gpedit.msc—–组策略 2. sndrec32——-录音机 3. Nslookup——-IP地址侦测器 &#xff0c;是一个…

ruoyi框架学习

RBAC模型 数据字典 拦截器 token没有&#xff0c;submit&#xff0c;request.js中&#xff0c;前端前置拦截器&#xff0c;响应拦截器 后台 注解

35岁了,还能转行做鸿蒙开发吗?

随着互联网行业的蓬勃发展时&#xff0c;不止从何时网上开始就有了&#xff1a;“程序员30岁危机、35岁中年危机”这种类似的话题&#xff0c;可以说影响了不少程序员。 人们一般常说的是三十而立&#xff0c;一个人应该对生活、职业、个人信仰等方面有了明确的认识和规划&…

【Linux】HTTP协议

目录 预备知识 认识url urlencode和urldecode http和https的区别 http request 和 http response http request格式: http reponse格式&#xff1a; HTTP的请求方法 HTTP的状态码 HTTP常见Header cookie文件 cookie是什么 问题 解决方案 预备知识 认识url 平时…

pytorch 数据集处理以及模型训练

1.基础类说明 为了统一数据的加载和处理代码&#xff0c;pytorch提供了两个类&#xff0c;用来处理数据加载&#xff1a; torch.utils.data.DataLoader torch.utils.data.Dataset 通过这两个类&#xff0c;可以使数据集加载和预处理代码&#xff0c;与模型训练代码脱钩…

10.网络游戏逆向分析与漏洞攻防-游戏网络架构逆向分析-接管游戏发送数据的操作

内容参考于&#xff1a;易道云信息技术研究院VIP课 上一个内容&#xff1a;接管游戏连接服务器的操作 码云地址&#xff08;master 分支&#xff09;&#xff1a;染指/titan 码云版本号&#xff1a;00820853d5492fa7b6e32407d46b5f9c01930ec6 代码下载地址&#xff0c;在 ti…

RTF文件格式解析(二)图像问题

图片 一个RTF文件可以包含由其他应用创建的图象。这些图象可以是16进制(默认的)或2进制格式。图象属于目标引用&#xff0c;由\pict 控制字开始。如后面的例子中将描述的&#xff0c;\pict关键字应在\*\shppict引用控制关键字之后。一个图象引用具有如下语法&#xff1a; <p…

frp 内网穿透 linux部署版

frp 内网穿透 linux部署版 前提安装 frp阿里云服务器配置测试服务器配置访问公网 前提 使用 frp&#xff0c;您可以安全、便捷地将内网服务暴露到公网&#xff0c;通过访问公网 IP 直接可以访问到内网的测试环境。准备如下&#xff1a; 公网 IP已部署好的测试服务 IP:端口号阿…

v68.指针

1.取地址运算 1.1 1.2 打印出变量的地址&#xff0c;需要使用 %p&#xff0c;注意后面加运算符 & 。注意输出地址的代码格式。%p会把这个值作地址来输出&#xff0c;输出的结果前面会加0x&#xff0c;并且以16进制的方式来输出地址 注意int 的大小是否和地址大小相同取决…

嵌入式 Linux 下的 LVGL 移植

目录 准备创建工程修改配置修改 lv_drv_conf.h修改 lv_conf.h修改 main.c修改 Makefile 编译运行更多内容 LVGL&#xff08;Light and Versatile Graphics Library&#xff0c;轻量级通用图形库&#xff09;是一个轻量化的、开源的、在嵌入式系统中广泛使用的图形库&#xff0c…

算法:动态规划全解(上)

一、动态规划初识 1.介绍 动态规划&#xff0c;英文&#xff1a;Dynamic Programming&#xff0c;简称DP&#xff0c;如果某一问题有很多重叠子问题&#xff0c;使用动态规划是最有效的。所以动态规划中每一个状态一定是由上一个状态推导出来的。 例如&#xff1a;有N件物品…

逆向案例三:动态xhr包中AES解密的一般步骤,以精灵数据为例

补充知识&#xff1a;进行AES解密需要知道四个关键字&#xff0c;即密钥key,向量iv,模式mode,填充方式pad 一般网页AES都是16位的&#xff0c;m3u8视频加密一般是AES-128格式 网页链接:https://www.jinglingshuju.com/articles 进行抓包结果返回的是密文&#xff1a; 一般思…