【Spark精讲】一文讲透Spark RDD

MapReduce的缺陷

MR虽然在编程接口的种类和丰富程度上已经比较完善了,但这些系统普遍都缺乏操作分布式内存的接口抽象,导致很多应用在性能上非常低效 。 这些应用的共同特点是需要在多个并行操 作之间重用工作数据集 ,典型的场景就是机器学习和图应用中常用的迭代算法 (每一步对数据 执行相似的函数) 。

RDD

RDD是只读的。

RDD五大属性:①分区、②依赖、③计算函数、④分区器、⑤首选运行位置。

RDD 则是直接在编程接口层面提供了一种高度受限的共享内存模型,如图下图所示。 RDD 是 Spark 的核心数据结构,全称是弹性分布式数据集 (Resilient Distributed Dataset),其本质是一种分布式的内存抽象,表示一个只读的数据分区( Partition)集合 。一个 RDD 通常只能通过其他的 RDD转换而创建。 RDD 定义了各种丰富的转换操作(如 map、 join和 filter等),通过这些转换操作,新的 RDD 包含了如何从其他 RDD 衍生所必需的信息,这些信息构成了 RDD 之间的依赖关系( Dependency) 。 依赖具体分为两种, 一种是窄依赖, RDD 之间分区是一一对应的;另一种是宽依赖,下游 RDD 的每个分区与上游 RDD (也称之为父 RDD)的每个分区都有关,是多对多的关系 。 窄依赖中的所有转换操作可以通过类似管道(Pipeline)的方式全部执行,宽依赖意味着数据需要在不同节点之间 Shuffle 传输 。

RDD计算的时候会通过一个 compute函数得到每个分区的数据。 若 RDD是通过已有的文件系统构建的,则 compute 函数读取指定文件系统中的数据;如果 RDD 是通过其他 RDD 转换而来的,则 compute 函数执行转换逻辑,将其他 RDD 的数据进行转换。 RDD 的操作算子包括两 类, 一类是 transformation,用来将 RDD 进行转换,构建 RDD 的依赖关系;另一类称为 action, 用来触发 RDD 的计算,得到 RDD 的相关计算结果或将 RDD 保存到文件系统中。

在 Spark 中, RDD 可以创建为对象 ,通过对象上的各种方法调用来对 RDD 进行转换 。 经过一系列的 transformation逻辑之后,就可以调用 action来触发 RDD 的最终计算。 通常来讲, action 包括多种方式,可以 是 向应用程序返回结果( show、 count 和 collect等),也可以是向存 储系统保存数据(saveAsTextFile等)。 在Spark中,只有遇到 action,才会真正地执行 RDD 的计算(注:这被称为惰性计算,英文为 LazyEvqluation),这样在运行时可以通过管道的方式传输多个转换 。

总结而言,基于 RDD 的计算任务可描述为从稳定的物理存储(如分布式文件系统 HDFS) 中加载记录,记录被传入由一组确定性操作构成的 DAG (有向无环图),然后写回稳定存储。 RDD还可以将数据集缓存到内存中,使得在多个操作之间可以很方便地重用数据集。 总的来讲,RDD 能够很方便地支持 MapReduce 应用、关系型数据处理、流式数据处理(Stream Processing) 和迭代型应用(图计算、机器学习等)。

在容错性方面,基于 RDD 之间的依赖, 一个任务流可以描述为 DAG。 在实际执行的时候, RDD 通过 Lineage 信息(血缘关系)来完成容错,即使出现数据分区丢失,也可以通过 Lineage 信息重建分区。 如果在应用程序中多次使用同一个 RDD,则可以将这个 RDD 缓存起来,该 RDD 只有在第一次计算的时候会根据 Lineage 信息得到分区的数据,在后续其他地方用到这个 RDD 的时候,会直接从缓存处读取而不用再根据 Lineage信息计算,通过重用达到提升性能的目的 。 虽然 RDD 的 Lineage 信息可以天然地实现容错(当 RDD 的某个分区数据计算失败或丢 失时,可以通过 Lineage信息重建),但是对于长时间迭代型应用来说,随着迭代的进行,RDD 与 RDD之间的 Lineage信息会越来越长,一旦在后续迭代过程中出错,就需要通过非常长的 Lineage信息去重建,对性能产生很大的影响。 为此,RDD 支持用 checkpoint机制将数据保存到持久化的存储中,这样就可以切断之前的 Lineage信息,因为 checkpoint后的 RDD不再需要知道它的父 RDD ,可以从 checkpoint 处获取数据。

DAG

顾名思义,DAG 是一种“图”,图计算模型的应用由来已久,早在上个世纪就被应用于数据库系统(Graph databases)的实现中。任何一个图都包含两种基本元素:节点(Vertex)和边(Edge),节点通常用于表示实体,而边则代表实体间的关系。

DAG,有向无环图,Directed Acyclic Graph的缩写,常用于建模。Spark中使用DAG对RDD的关系进行建模,描述了RDD的依赖关系,这种关系也被称之为lineage,RDD的依赖关系使用Dependency维护,参考Spark RDD之Dependency,DAG在Spark中的对应的实现为DAGScheduler。
基础概念
介绍DAGScheduler中的一些概念,有助于理解后续流程。

  • Job:调用RDD的一个action,如count,即触发一个Job,spark中对应实现为ActiveJob,DAGScheduler中使用集合activeJobs和jobIdToActiveJob维护Job
  • Stage:代表一个Job的DAG,会在发生shuffle处被切分,切分后每一个部分即为一个Stage,Stage实现分为ShuffleMapStage和ResultStage,一个Job切分的结果是0个或多个ShuffleMapStage加一个ResultStage
  • TaskSet:一组Task
  • Task:最终被发送到Executor执行的任务,和stage的ShuffleMapStage和ResultStage对应,其实现分为ShuffleMapTask和ResultTask

把 DAG 图反向解析成多个阶段,每个阶段中包含多个任务,每个任务会被任务调度器分发给工作节点上的 Executor 上执行。

Web UI上DAG举例 

Checkpoint

RDD的依赖

checkpoint先了解一下RDD的依赖,比如计算wordcount:

scala>  sc.textFile("hdfs://leen:8020/user/hive/warehouse/tools.db/cde_prd").flatMap(_.split("\\\t")).map((_,1)).reduceByKey(_+_);
res0: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:28
 
scala> res0.toDebugString
res1: String = 
(2) ShuffledRDD[4] at reduceByKey at <console>:28 []
 +-(2) MapPartitionsRDD[3] at map at <console>:28 []
    |  MapPartitionsRDD[2] at flatMap at <console>:28 []
    |  hdfs://leen:8020/user/hive/warehouse/tools.db/cde_prd MapPartitionsRDD[1] at textFile at <console>:28 []
    |  hdfs://leen:8020/user/hive/warehouse/tools.db/cde_prd HadoopRDD[0] at textFile at <console>:28 []

1、在textFile读取hdfs的时候就会先创建一个HadoopRDD,其中这个RDD是去读取hdfs的数据key为偏移量value为一行数据,因为通常来讲偏移量没有太大的作用所以然后会将HadoopRDD转化为MapPartitionsRDD,这个RDD只保留了hdfs的数据。
2、flatMap 产生一个RDD MapPartitionsRDD
3、map 产生一个RDD MapPartitionsRDD
4、reduceByKey 产生一个RDD ShuffledRDD

如何建立checkPoint

1、首先需要用sparkContext设置hdfs的checkpoint的目录,如果不设置使用checkpoint会抛出异常:

scala> res0.checkpoint
org.apache.spark.SparkException: Checkpoint directory has not been set in the SparkContext
 
scala> sc.setCheckpointDir("hdfs://leen:8020/checkPointDir")

sc.setCheckpointDir("hdfs://leen:8020/checkPointDir")
执行了上面的代码,hdfs里面会创建一个目录:
/checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d

2、然后执行checkpoint

scala> res0.checkpoint
1

发现hdfs中还是没有数据,说明checkpoint也是个transformation的算子。

scala> res0.count()
INFO ReliableRDDCheckpointData: Done checkpointing RDD 4 to hdfs://leen:8020/checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4, new parent is RDD 5
res5: Long = 73689
1
2
3
hive > dfs -du -h /checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4;
147    147    /checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4/_partitioner
1.2 M  1.2 M  /checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4/part-00000
1.2 M  1.2 M  /checkPointDir/9ae90c62-a7ff-442a-bbf0-e5c8cdd7982d/rdd-4/part-00001

但是执行的时候相当于走了两次流程,前面计算了一遍,然后checkpoint又会计算一次,所以一般我们先进行cache然后做checkpoint就会只走一次流程,checkpoint的时候就会从刚cache到内存中取数据写入hdfs中,如下:

rdd.cache()
rdd.checkpoint()
rdd.collect

在源码中,在checkpoint的时候强烈建议先进行cache,并且当你checkpoint执行成功了,那么前面所有的RDD依赖都会被销毁,如下:

 /**
   * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
   * directory set with `SparkContext#setCheckpointDir` and all references to its parent
   * RDDs will be removed. This function must be called before any job has been
   * executed on this RDD. It is strongly recommended that this RDD is persisted in
   * memory, otherwise saving it on a file will require recomputation.
   */
 
  def checkpoint(): Unit = RDDCheckpointData.synchronized {
    // NOTE: we use a global lock here due to complexities downstream with ensuring
    // children RDD partitions point to the correct parent partitions. In the future
    // we should revisit this consideration.
    if (context.checkpointDir.isEmpty) {
      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
    } else if (checkpointData.isEmpty) {
      checkpointData = Some(new ReliableRDDCheckpointData(this))
    }
  }

RDD依赖被销毁

scala> res0.toDebugString
res6: String = 
(2) ShuffledRDD[4] at reduceByKey at <console>:28 []
 |  ReliableCheckpointRDD[5] at count at <console>:30 []

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/276466.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

uniapp中uview组件库丰富LoadingPage 加载页

目录 基本使用 #显示或隐藏 #文字内容 #动画模式 #动画图片 #文字颜色 #文字大小 #图标大小 2.0.32 #背景颜色 #图标颜色 API #Props 基本使用 <template><view><u-loading-page></u-loading-page></view> </template>#显示或…

golang第五卷---包以及常用内置包归纳

包以及常用内置包归纳 包的概念math包time包sync包 Go 语言官方的包文档网站&#xff1a;包文档 包的概念 Go语言是使用包来组织源代码的&#xff0c;包&#xff08;package&#xff09;是多个 Go 源码的集合&#xff0c;是一种高级的代码复用方案。 任何源代码文件必须属于某…

Spark 集群搭建

文章目录 搭建前准备安装搭建解压并重命名环境变量配置配置文件yarn-site.xmlspark-env.sh 官网求 π(PI) 案例启动spark-shell通过浏览器查看显示查看 Spark 的网页信息展示 搭建前准备 下载地址&#xff1a;Index of /dist/spark (apache.org) 配置好 hadoop 环境&#xff…

2022年全国职业院校技能大赛(高职组)“云计算”赛项赛卷①第二场次:容器云

2022年全国职业院校技能大赛&#xff08;高职组&#xff09; “云计算”赛项赛卷1 第二场次&#xff1a;容器云&#xff08;40分&#xff09; 目录 2022年全国职业院校技能大赛&#xff08;高职组&#xff09; “云计算”赛项赛卷1 第二场次&#xff1a;容器云&#xff08…

开源预约挂号平台 - 从0到上线

文章目录 开源预约挂号平台 - 从0到上线演示地址源码地址可以学到的技术前端技术后端技术部署上线开发工具其他技术业务功能 项目讲解前端创建项目 - 安装PNPM - 使用VSCODE - 安装插件首页顶部与底部 - 封装组建 - 使用scss左右布局中间内容部分路由 - vue-routerBANNER- 走马…

阿里云30个公共云地域、89个可用区、5个金融云和政务云地域

阿里云基础设施目前已面向全球四大洲&#xff0c;公共云地域开服运营30个公共云地域、89个可用区&#xff0c;此外还拥有5个金融云、政务云地域&#xff0c;并且致力于持续的新地域规划和建设&#xff0c;从而更好的满足用户多样化的业务和场景需求。伴随着基础设施的加速投入和…

【机器学习】深度学习概论(一)

经典的机器学习算法与深度学习对比 一、机器学习面临的挑战 1.1 机器学习算法用于各种应用问题时所利用的典型特征 1.2 采用人工特征的机器学习算法处理流程 1.3 人工设计特征面临的问题 二、 深度学习技术 2.1 采用受限玻尔兹曼机和逐层训练的方法训练深层网络 2.2 自动编码器…

首次使用TypeScript,报错:无法重新声明块级范围变量(声明变量报错)

前几天在书写TypeScript代码时&#xff0c;出现了声明变量报错的情况&#xff0c;具体情况如下&#xff1a; let arr: number; arr 10; console.log(arr);报错如下&#xff1a; 解决方案&#xff1a; 在配置文件tsconfig.json中&#xff0c;配置如下代码&#xff1a; { &q…

js_常用事件演示

✨前言✨ 1.如果代码对您有帮助 欢迎点赞&#x1f44d;收藏⭐哟 后面如有问题可以私信评论哟&#x1f5d2;️ 2.博主后面将持续更新哟&#x1f618;&#x1f389;文章目录 &#x1f354;一、在JavaScript中什么是事件&#xff1f;&#x1f35f;二、为什么要使用事件&#x…

西软云XMS 反序列化RCE漏洞复现

0x01 产品简介 西软云XMS是基于云平台数据中心开发的支持多酒店、多语言、多平台的酒店管理系统。致力于以新一代云架构为国内四,五星级中高端酒店提供灵活、高度整合酒店业务,助力酒店智能转型升级。 0x02 漏洞概述 西软云XMS /fox-invoker/FoxLookupInvoker接口处存在反…

Erlang、RabbitMQ下载与安装教程(windows超详细)

目录 安装Erlang 1.首先安装RabbitMQ需要安装Erlang环境 2.点击下载好的.exe文件进行傻瓜式安装,一直next即可 3.配置Erlang环境变量 安装RabbitMQ 1.给出RabbitMQ官网下载址&#xff1a;Installing on Windows — RabbitMQ&#xff0c;找到 2.配置RabbitMQ环境变量&#xff0…

国际物流公司科普_集装箱种类区分和介绍_箱讯科技

集装箱运输的不断发展&#xff0c;为适应装载不同种类货物的需要&#xff0c;因而出现了不同种类的集装箱。今天和大家一起来总结一下。 按使用材料分类 根据箱子主体部件&#xff08;侧壁、端壁、箱顶等&#xff09;采用什么材料&#xff0c;就叫做什么材料制造的集装箱&…

机器学习的任务

根据学习任务的不同&#xff0c;机器学习算法大致可分为3类&#xff1a;监督学习、无监督学习和半监督学习。 监督学习&#xff08;Supervised Learning&#xff09; 在监督学习中&#xff0c;我们有一组带有标签&#xff08;即已知输出&#xff09;的训练样本作为输入。目标是…

Baumer工业相机堡盟工业相机如何通过BGAPI SDK实现Raw格式的图像保存(C++)

Baumer工业相机堡盟工业相机如何通过BGAPI SDK实现Raw格式的图像保存&#xff08;C&#xff09; Baumer工业相机Baumer工业相机通过SDK实现Raw格式的图像保存的技术背景通过SDK获取相机信息的代码分析Baumer工业相机回调函数里保存原始图像数据Baumer保存Raw图像格式重要核心代…

用Html和js和layui写一个简单猜拳小游戏

简单学习技术&#xff0c;写了一个小游戏&#xff0c;用html和js写一个简单的小游戏。玩家点击按钮出拳&#xff0c;玩家胜利结果显示绿色&#xff0c;玩家输了结果显示红色&#xff0c;平局结果显示蓝色。 页面效果&#xff1a; 代码&#xff1a; <!DOCTYPE html> <…

低代码平台快速开发CRM 可灵活自定义的CRM软件

白码低代码平台以其简化和加速应用程序开发的方法而闻名&#xff0c;无需大量编写代码&#xff0c;只需通过可视化界面和配置来构建应用程序。在快速开发CRM方面&#xff0c;白码低代码平台具有许多优势和应用。 白码低代码平台快速搭建CRM 快速开发是白码低代码平台的一大优势…

最新AI系统ChatGPT网站系统源码,Midjourney绘画,GPT语音对话+ChatFile文档对话总结+DALL-E3文生图+思维导图一站式解决方案

一、前言 SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作Ch…

MD5算法

一、引言 MD5&#xff08;Message-Digest Algorithm 5&#xff09;是一种广泛应用的密码散列算法&#xff0c;由Ronald L. Rivest于1991年提出。MD5算法主要用于对任意长度的消息进行加密&#xff0c;将消息压缩成固定长度的摘要&#xff08;通常为128位&#xff09;。在密码学…

算法设计与分析期末上机板子——课内题目题意与题解分析+课外知识点总结!

真正的模板&#xff01;&#xff01;&#xff01; 文章目录 课内堆实现C语言矩阵连乘E1D连分数计算C3A-钢管切割&#xff1a;动态规划C3C-流水线调度&#xff1a;动态规划C3E-矩阵连乘效率&#xff1a;区间动态规划C3F-导弹轰炸&#xff08;小偷问题&#xff09;&#xff1a;动…

flutter dio使用proxyman抓包进行网络调试

证书 wifi 手机和电脑连上同一个wifi&#xff0c;并且手机wifi使用代理&#xff0c;代理地址为电脑的ip和proxyman设置的监听端口 代码 import package:dio/dio.dart; import package:dio/io.dart; import dart:io;class ProxyUtil {static String proxyIP "";st…