【Spark精讲】Spark与MapReduce对比

目录

对比总结

MapReduce流程

​编辑

MapTask流程

ReduceTask流程

MapReduce原理

阶段划分

Map shuffle

Partition

Collector

Sort

Spill

Merge

Reduce shuffle

Copy

Merge Sort


对比总结

  1. Map端读取文件:都是需要通过split概念来进行逻辑切片,概念相同,底层具体实现和参数略有差异;
  2. 业务逻辑实现方式:MapReduce引擎是通过用户自定义实现Mapper和Reducer类来实现业务逻辑的;而Spark提供了丰富的算子以及上层DataFrame、DataSet的抽象;
  3. 计算引擎:MapReduce是基于标准的map reduce计算理念的实现,map任务和reduce任务数据交换通过读写文件的方式进行的,运行效率低;Spark是基于内存的(RDD),同时Spark的计算引擎是基于DAG方式实现的(类似Tez),除了shuffle会通过文件进行数据交换外,每个阶段的计算是基于内存的,运行效率高,基于内存有利于进行性能优化,同时提供缓存和广播机制有利于计算结果复用,适合算法迭代计算,此外不但可以使用堆内内存还可以使用堆外内存;
  4. 部署方式:都可以基于YARN,MapReduce基于YARN实现了MRAppMaster,Spark基于YARN实现了ApplicationMaster;
  5. Shuffle方式:MapReduce采用了基于排序的数据聚集策略(按Key排序),而该策略是不可定制的,不可以使用其他数据聚集算法(如Hash聚集);而Spark默认也需要排序,但一定条件下也可以不进行排序(ByPass和Tungsten方式)。
  6. 运行方式:MapReduce的Job是基于JVM进程的;而Spark的Driver和Executor是JVM进程,Task运行是基于线程的。

MapReduce流程

MapTask流程

  1. Read阶段:Map Task通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。
  2. Map阶段:该阶段主要是讲解析的key/value交给用户编写的map()函数处理,并产生一些列新的key/value。
  3. Collect阶段:在用户编写的map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分片(通过调用Partitioner),并写入一个环形内存缓冲区中。
  4. Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要主要的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
  5. Combine阶段:当所有数据处理完成后,Map Task对所有临时文件进行一次合并,以确保最终只是生成一个数据文件。

ReduceTask流程

  1. Shuffle阶段:也称之为Copy阶段。Reduce Task从各个Map Task远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
  2. Merge阶段:在远程拷贝数据的同时,Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
  3. Sort阶段:按照MapReduce语义,用户编写的reduce()函数输入数据是按Key进行聚集的一组数据。为了将Key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个Map Task已经实现对自己的处理结果进行了局部排序。因此Reduce Task只需对所有数据进行一次归并排序即可。
  4. Reduce阶段:在该阶段中,Reduce Task将每组数据依次交给用户编写的reduce()函数处理。
  5. Write阶段:reduce()函数将计算结果写到HDFS上。

MapReduce原理

阶段划分

我们知道MapReduce计算模型主要由三个阶段构成:Map、shuffle、Reduce。

  1. Map是映射,负责数据的过滤分法,将原始数据转化为键值对;
  2. Reduce是合并,将具有相同key值的value进行处理后再输出新的键值对作为最终结果。
  3. Shuffle:为了让Reduce可以并行处理Map的结果,必须对Map的输出进行一定的排序与分割,然后再交给对应的Reduce,而这个将Map输出进行进一步整理并交给Reduce的过程就是Shuffle。

整个MR的大致过程如下:

Map和Reduce操作需要我们自己定义相应Map类和Reduce类,以完成我们所需要的化简、合并操作,而shuffle则是系统自动帮我们实现的,了解shuffle的具体流程能帮助我们编写出更加高效的Mapreduce程序。

Shuffle过程包含在Map和Reduce两端,即Map shuffle和Reduce shuffle。

Map shuffle

在Map端的shuffle过程是对Map的结果进行分区、排序、分割,然后将属于同一划分(分区)的输出合并在一起并写在磁盘上,最终得到一个分区有序的文件,分区有序的含义是map输出的键值对按分区进行排列,具有相同partition值的键值对存储在一起,每个分区里面的键值对又按key值进行升序排列(默认),其流程大致如下:

Partition

对于map输出的每一个键值对,系统都会给定一个partition,partition值默认是通过计算key的hash值后对Reduce task的数量取模获得。如果一个键值对的partition值为1,意味着这个键值对会交给第一个Reducer处理。

我们知道每一个Reduce的输出都是有序的,但是将所有Reduce的输出合并到一起却并非是全局有序的,如果要做到全局有序,我们该怎么做呢?最简单的方式,只设置一个Reduce task,但是这样完全发挥不出集群的优势,而且能应对的数据量也很受限。最佳的方式是自己定义一个Partitioner,用输入数据的最大值除以系统Reduce task数量的商作为分割边界,也就是说分割数据的边界为此商的1倍、2倍至numPartitions-1倍,这样就能保证执行partition后的数据是整体有序的。

另一种需要我们自己定义一个Partitioner的情况是各个Reduce task处理的键值对数量极不平衡。对于某些数据集,由于很多不同的key的hash值都一样,导致这些键值对都被分给同一个Reducer处理,而其他的Reducer处理的键值对很少,从而拖延整个任务的进度。当然,编写自己的Partitioner必须要保证具有相同key值的键值对分发到同一个Reducer。

Collector

Map的输出结果是由collector处理的,每个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。

这个数据结构其实就是个字节数组,叫Kvbuffer,名如其义,但是这里面不光放置了数据,还放置了一些索引数据,给放置索引数据的区域起了一个Kvmeta的别名,在Kvbuffer的一块区域上穿了一个IntBuffer(字节序采用的是平台自身的字节序)的马甲。数据区域和索引数据区域在Kvbuffer中是相邻不重叠的两个区域,用一个分界点来划分两者,分界点不是亘古不变的,而是每次Spill之后都会更新一次。初始的分界点是0,数据的存储方向是向上增长,索引数据的存储方向是向下增长,如图所示:

Kvbuffer的存放指针bufindex是一直闷着头地向上增长,比如bufindex初始值为0,一个Int型的key写完之后,bufindex增长为4,一个Int型的value写完之后,bufindex增长为8。

索引是对在kvbuffer中的键值对的索引,是个四元组,包括:value的起始位置、key的起始位置、partition值、value的长度,占用四个Int长度,Kvmeta的存放指针Kvindex每次都是向下跳四个“格子”,然后再向上一个格子一个格子地填充四元组的数据。比如Kvindex初始位置是-4,当第一个键值对写完之后,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的长度,然后Kvindex跳到-8位置,等第二个键值对和索引写完之后,Kvindex跳到-12位置。

Kvbuffer的大小可以通过io.sort.mb设置,默认大小为100M。但不管怎么设置,Kvbuffer的容量都是有限的,键值对和索引不断地增加,加着加着,Kvbuffer总有不够用的那天,那怎么办?把数据从内存刷到磁盘上再接着往内存写数据,把Kvbuffer中的数据刷到磁盘上的过程就叫Spill,多么明了的叫法,内存中的数据满了就自动地spill到具有更大空间的磁盘。

关于Spill触发的条件,也就是Kvbuffer用到什么程度开始Spill,还是要讲究一下的。如果把Kvbuffer用得死死得,一点缝都不剩的时候再开始Spill,那Map任务就需要等Spill完成腾出空间之后才能继续写数据;如果Kvbuffer只是满到一定程度,比如80%的时候就开始Spill,那在Spill的同时,Map任务还能继续写数据,如果Spill够快,Map可能都不需要为空闲空间而发愁。两利相衡取其大,一般选择后者。Spill的门限可以通过io.sort.spill.percent,默认是0.8。

Spill这个重要的过程是由Spill线程承担,Spill线程从Map任务接到“命令”之后就开始正式干活,干的活叫SortAndSpill,原来不仅仅是Spill,在Spill之前还有个颇具争议性的Sort。

Sort

当Spill触发后,SortAndSpill先把Kvbuffer中的数据按照partition值和key两个关键字升序排序,移动的只是索引数据,排序结果是Kvmeta中数据按照partition为单位聚集在一起,同一partition内的按照key有序。

Spill

Spill线程为这次Spill过程创建一个磁盘文件:从所有的本地目录中轮训查找能存储这么大空间的目录,找到之后在其中创建一个类似于“spill12.out”的文件。Spill线程根据排过序的Kvmeta挨个partition的把数据吐到这个文件中,一个partition对应的数据吐完之后顺序地吐下个partition,直到把所有的partition遍历完。一个partition在文件中对应的数据也叫段(segment)。在这个过程中如果用户配置了combiner类,那么在写之前会先调用combineAndSpill(),对结果进行进一步合并后再写出。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。那哪些场景才能使用Combiner呢?Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。

所有的partition对应的数据都放在这个文件里,虽然是顺序存放的,但是怎么直接知道某个partition在这个文件中存放的起始位置呢?强大的索引又出场了。有一个三元组记录某个partition对应的数据在这个文件中的索引:起始位置、原始数据长度、压缩之后的数据长度,一个partition对应一个三元组。然后把这些索引信息存放在内存中,如果内存中放不下了,后续的索引信息就需要写到磁盘文件中了:从所有的本地目录中轮训查找能存储这么大空间的目录,找到之后在其中创建一个类似于“spill12.out.index”的文件,文件中不光存储了索引数据,还存储了crc32的校验数据。spill12.out.index不一定在磁盘上创建,如果内存(默认1M空间)中能放得下就放在内存中,即使在磁盘上创建了,和spill12.out文件也不一定在同一个目录下。每一次Spill过程就会最少生成一个out文件,有时还会生成index文件,Spill的次数也烙印在文件名中。索引文件和数据文件的对应关系如下图所示:

在Spill线程如火如荼的进行SortAndSpill工作的同时,Map任务不会因此而停歇,而是一无既往地进行着数据输出。Map还是把数据写到kvbuffer中,那问题就来了:只顾着闷头按照bufindex指针向上增长,kvmeta只顾着按照Kvindex向下增长,是保持指针起始位置不变继续跑呢,还是另谋它路?如果保持指针起始位置不变,很快bufindex和Kvindex就碰头了,碰头之后再重新开始或者移动内存都比较麻烦,不可取。Map取kvbuffer中剩余空间的中间位置,用这个位置设置为新的分界点,bufindex指针移动到这个分界点,Kvindex移动到这个分界点的-16位置,然后两者就可以和谐地按照自己既定的轨迹放置数据了,当Spill完成,空间腾出之后,不需要做任何改动继续前进。分界点的转换如下图所示:

Map任务总要把输出的数据写到磁盘上,即使输出数据量很小在内存中全部能装得下,在最后也会把数据刷到磁盘上。

Merge

Map任务如果输出数据量很大,可能会进行好几次Spill,out文件和Index文件会产生很多,分布在不同的磁盘上。最后把这些文件进行合并的merge过程闪亮登场。

Merge过程怎么知道产生的Spill文件都在哪了呢?从所有的本地目录上扫描得到产生的Spill文件,然后把路径存储在一个数组里。Merge过程又怎么知道Spill的索引信息呢?没错,也是从所有的本地目录上扫描得到Index文件,然后把索引信息存储在一个列表里。到这里,又遇到了一个值得纳闷的地方。在之前Spill过程中的时候为什么不直接把这些信息存储在内存中呢,何必又多了这步扫描的操作?特别是Spill的索引数据,之前当内存超限之后就把数据写到磁盘,现在又要从磁盘把这些数据读出来,还是需要装到更多的内存中。之所以多此一举,是因为这时kvbuffer这个内存大户已经不再使用可以回收,有内存空间来装这些数据了。(对于内存空间较大的土豪来说,用内存来省却这两个io步骤还是值得考虑的。)

然后为merge过程创建一个叫file.out的文件和一个叫file.out.Index的文件用来存储最终的输出和索引,一个partition一个partition的进行合并输出。对于某个partition来说,从索引列表中查询这个partition对应的所有索引信息,每个对应一个段插入到段列表中。也就是这个partition对应一个段列表,记录所有的Spill文件中对应的这个partition那段数据的文件名、起始位置、长度等等。

然后对这个partition对应的所有的segment进行合并,目标是合并成一个segment。当这个partition对应很多个segment时,会分批地进行合并:先从segment列表中把第一批取出来,以key为关键字放置成最小堆,然后从最小堆中每次取出最小的输出到一个临时文件中,这样就把这一批段合并成一个临时的段,把它加回到segment列表中;再从segment列表中把第二批取出来合并输出到一个临时segment,把其加入到列表中;这样往复执行,直到剩下的段是一批,输出到最终的文件中。最终的索引数据仍然输出到Index文件中。

Reduce shuffle

在Reduce端,shuffle主要分为复制Map输出、排序合并两个阶段。

Copy

Reduce任务通过HTTP向各个Map任务拖取它所需要的数据。Map任务成功完成后,会通知父TaskTracker状态已经更新,TaskTracker进而通知JobTracker(这些通知在心跳机制中进行)。所以,对于指定作业来说,JobTracker能记录Map输出和TaskTracker的映射关系。Reduce会定期向JobTracker获取Map的输出位置,一旦拿到输出位置,Reduce任务就会从此输出对应的TaskTracker上复制输出到本地,而不会等到所有的Map任务结束。

Merge Sort

Copy过来的数据会先放入内存缓冲区中,如果内存缓冲区中能放得下这次数据的话就直接把数据写到内存中,即内存到内存merge。Reduce要向每个Map去拖取数据,在内存中每个Map对应一块数据,当内存缓存区中存储的Map数据占用空间达到一定程度的时候,开始启动内存中merge,把内存中的数据merge输出到磁盘上一个文件中,即内存到磁盘merge。在将buffer中多个map输出合并写入磁盘之前,如果设置了Combiner,则会化简压缩合并的map输出。Reduce的内存缓冲区可通过mapred.job.shuffle.input.buffer.percent配置,默认是JVM的heap size的70%。内存到磁盘merge的启动门限可以通过mapred.job.shuffle.merge.percent配置,默认是66%。

当属于该reducer的map输出全部拷贝完成,则会在reducer上生成多个文件(如果拖取的所有map数据总量都没有内存缓冲区,则数据就只存在于内存中),这时开始执行合并操作,即磁盘到磁盘merge,Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。最终Reduce shuffle过程会输出一个整体有序的数据块。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/248492.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

dbeaver导出数据为excel格式

dbeaver导出excel 目前数据的可选择只有这几种 恰好没有我们需要的excel模式,而我们需要数据一般都需要excel的 所以我们可以通过以下步骤得到我们的excel格式的数据集: 1.直接选csv模式,至于csv有陌生的小伙伴可以理解:CSV(Comma-Separated Values)是…

同时获取el-select的label和value

ui如下: 需求如下: 在点击确认的时候,将id和name都传给一个接口,但是ui只用展示name,name用v-model绑定给input框,但是id不知道怎么传给后端。 解决方法如下: vue中elementUi的el-select同时…

解决kernel32.dll丢失的修复方式,kernel32.dll预防错误的方法

kernel32.dll文件是电脑中的一个重要文件,如果电脑出现kernel32.dll丢失的错误提示,那么电脑中的一些程序将不能正常使用,那么出现这样的问题有什么解决办法呢?那么今天就和大家说说解决kernel32.dll丢失的修复方式。 一.kernel32…

MySql踩坑记录 DATE_FORMAT函数

使用场景:进行某一日期范围内的数据查询 查询结果:空,查询不到符合条件的数据。 Sql展示: SELECTa.dt_plan AS planDate,a.sd_status AS planStatus FROMc_hpl_plan a WHEREa.id_phr 6576727112f1a21849639530 -- 第一种写法…

STM32 寄存器配置笔记——I2C 读写AT24C02 EEPROM

一、简介 本文主要介绍STM32F10xx系列如何使用软件模拟I2C总线读写AT24C02的EEPROM数据。 二、概述 I2C协议是一种用于同步、半双工、串行总线(由单片机时钟线、单数据交换器数据线组成)上的协议。规定了总线空闲状态、起始条件、停止条件、数据有效性、字节格式、响应确认信号…

c# 为什么修改Font导致Location 变化

搜索引擎、各种人工智能,只有这个帮我解决了问题 然后我发现了这个 我就奇怪,一行行调试代码,最终发现设置Font,Location就变了,完全想不通

Linux 创建分区

要求 分一个区就行,用 ext4 文件系统,挂到 /data 目录。 查看 lsblk sdb 没有分区 创建分区 [rootlocalhost ~]# ll /dev/sd* brw-rw----. 1 root disk 8, 0 2月 27 15:10 /dev/sda brw-rw----. 1 root disk 8, 1 2月 27 15:10 /dev/sda1 brw-rw-…

【vue】jenkins打前端包时报错:第 8 行:cd: dist: 没有那个文件或目录

问题描述 jenkins打前端包时报错:第 8 行💿 dist: 没有那个文件或目录 Jenkins中 “Execute shell” 配置的脚本: echo $PATH node -v npm -v npm config set registry http://ued.edtsoft.com/ npm install npm run build:prod cd dist rm…

PostgreSQL向量数据插件--pgvector安装(附PostgreSQL安装)

PostgreSQL向量数据插件--pgvector安装 一、版本二、数据库安装1. 在官网下载PostgreSQL14.0的安装包2.增加用户postgres3.解压安装 三、pgvector安装1. 从github上克隆下来2. 安装pgvector插件3. 开始使用pgvector启用pgsql命令行创建扩展 本文为本人在安装pgvector中踩过的坑…

Github2023-12-15 开源项目日报 Top10

根据Github Trendings的统计,今日(2023-12-15统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量TypeScript项目3非开发语言项目3JavaScript项目1Python项目1Rust项目1PHP项目1 基于项目的学习 创建周期&am…

【通用】Linux,VSCode,IDEA,Eclipse等资源相对位置

正文 不论是 IDEA、Linux、VSCode、cmd等等吧,都遵循这个规则: 如果以斜杠开头,表示从根开始找: IDEA的根是classpath(classpath就是项目被编译后,位于 target下的 classes文件夹,或者位于ta…

QT笔记(节选)具体图片等下载资源

QT笔记(节选)具体图片等下载资源 根据b站视频做的笔记: https://www.bilibili.com/video/BV1g4411H78N?p44&spm_id_frompageDriver&vd_sourcea3e6a48ccd3d7d1f969f662653ed68c9 qt是一个跨平台的c图形用户界面应用程序框架&#x…

编程导航算法通关村——算法基础

目录 1. 时间复杂度 1.1. 时间复杂度概念 1.2. 几种常见的阶 1.2.1. 常数阶 O(1) 1.2.2. 线性阶 O(n) 1.2.3. 平方阶 (n) 1.2.4. 对数阶 O(logn) 2. 最坏情况和平均情况 3. 空间复杂度 1. 时间复杂度 1.1. 时间复杂度概念 当我们说算法的时间复杂度时,我们…

【动手学深度学习】(十四)数据增广+微调

文章目录 一、数据增强1.理论知识2.代码 二、微调1.理论知识 一、数据增强 1.理论知识 增加一个已有数据集,使得有更多的多样性 在语言里面加入各种不同的背景噪音改变图片的颜色和形状 使用增强数据训练 翻转 左右翻转上下翻转 不总是可行 切割 从图片中切…

【数据结构和算法】判断子序列

其他系列文章导航 Java基础合集数据结构与算法合集 设计模式合集 多线程合集 分布式合集 ES合集 文章目录 其他系列文章导航 文章目录 前言 一、题目描述 二、题解 2.1 方法一:双指针 三、代码 3.1 方法一:双指针 3.1.1 Java易懂版:…

解决Chrome同一账号在不同设备无法自动同步书签的问题

文章目录 一、问题与原因?2. 解决办法 一、问题与原因? 1.问题 使用谷歌Chrome浏览器比较头疼的问题就是:使用同一个Google账号,办公电脑与家用电脑的数据无法同步。比如:办公电脑中的书签、浏览记录等数据&#xff0…

drf入门规范

一 Web应用模式 在开发Web应用中,有两种应用模式: 1.1 前后端不分离 1.2 前后端分离 二 API接口 为了在团队内部形成共识、防止个人习惯差异引起的混乱,我们需要找到一种大家都觉得很好的接口实现规范,而且这种规范能够让后端写…

Tomcat部署与调优

目录 前瞻 什么是tomcat? 什么是servlet? 什么是JSP? Tomcat功能组件结构 Container结构分析 Tomcat请求过程 Tomcat服务部署 1.关闭防火墙,将安装 Tomcat 所需软件包传到/opt目录下 2.安装JDK 3.设置JDK环境变量 4.安装启动Tomc…

1130 - Host “WIN-CA4FHERGO9J‘ is not allowed to connect to this MySQL server

1、知识小课堂 1.1 Mysql MySQL是一个关系型数据库管理系统,由瑞典MySQL AB公司开发,属于Oracle旗下产品。它是最流行的关系型数据库管理系统之一,在WEB应用方面,MySQL是最好的RDBMS (Relational Database Management System&am…

[每周一更]-(第27期):HTTP压测工具之wrk

[补充完善往期内容] wrk是一款简单的HTTP压测工具,托管在Github上,https://github.com/wg/wrkwrk 的一个很好的特性就是能用很少的线程压出很大的并发量. 原因是它使用了一些操作系统特定的高性能 io 机制, 比如 select, epoll, kqueue 等. 其实它是复用了 redis 的 ae 异步事…