Spark---累加器

1.累加器实现原理

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。

    //建立与Spark框架的连接
    val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件
    val context = new SparkContext(wordCount) //读取配置文件

    val dataRdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4),2)
    var sum=0
    dataRdd.foreach(num=>sum+=num)

    println(sum)
    context.stop()

运行结果:
在这里插入图片描述
我们预期是想要实现数据的累加,开始数据从Driver被传输到了Executor中进行计算,但是每个分区在累加数据完成之后并没有将计算结果返回到Driver端,所以导致最后的结果与预期的不一致。
在这里插入图片描述
对上述代码使用累加器

    val dataRdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4))
    val sum = context.longAccumulator("sum")
    dataRdd.foreach(num=>{
      //使用累加器
      sum.add(num)
    })

    //获取累加器的值
    println(sum.value)

运行结果:
在这里插入图片描述
由此可见,在使用了累加器之后,每个Executor在开始都会获得这个累加器变量,每个Executor在执行完成后,累加器会将每个Executor中累加器变量的值聚合到Driver端。
在这里插入图片描述

Spark提供了多种类型的累加器,以下是其中的一些:
在这里插入图片描述

2.自定义累加器

用户可以通过继承AccumulatorV2来自定义累加器。需求:自定义累加器实现WordCount案例。

AccumulatorV2[IN,OUT]中:
IN:输入数据的类型
OUT:输出数据类型

在这里插入图片描述
WordCount案例实现完整代码:

package bigdata.wordcount.leijiaqi

import bigdata.wordcount.leijiaqi
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
 * 使用累加器完成WordCount案例
 */
object Spark_addDemo {
  def main(args: Array[String]): Unit = {
    //建立与Spark框架的连接
    val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件
    val context = new SparkContext(wordCount) //读取配置文件

    val dataRDD: RDD[String] = context.textFile("D:\\learnSoftWare\\IdeaProject\\Spark_Demo\\Spark_Core\\src\\main\\com.mao\\datas\\1.txt")

    //创建累加器对象
    val wordCountAccumulator = new WordCountAccumulator
    //向Spark中进行注册
    context.register(wordCountAccumulator,"wordCountAccumulator")

    //实现累加
    dataRDD.foreach(word => {
      wordCountAccumulator.add(word)
    })
    //获取累加结果,打印在控制台上
    println(wordCountAccumulator.value)

    //关闭链接
    context.stop()
  }

}

class WordCountAccumulator extends  AccumulatorV2[String,mutable.Map[String,Long]]
{

  //定义一个map用于存储累加后的结果
  var map: mutable.Map[String, Long] =mutable.Map[String,Long]()

  //累加器是否为初始状态
  override def isZero: Boolean = {
    map.isEmpty
  }

  //复制累加器
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    new WordCountAccumulator()
  }

  //重置累加器
  override def reset(): Unit = {
    map.clear()
  }

  //向累加器添加数据IN
  override def add(word: String): Unit = {
    // 查询 map 中是否存在相同的单词
    // 如果有相同的单词,那么单词的数量加 1
    // 如果没有相同的单词,那么在 map 中增加这个单词
    val newValue = map.getOrElse(word, 0L) + 1
    map.update(word,newValue)
  }

  //合并累加器
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    var map1=this.map
    var map2=other.value

    //合并两个map
    map2.foreach({
      case (word,count)=>{
        val newValue = map1.getOrElse(word,0L)+count
        map1.update(word,newValue)
      }
    })
  }

  //返回累加器的结果(OUT)
  override def value: mutable.Map[String, Long] = this.map
}
}

运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/318537.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

wpf使用Popup封装数据筛选框

(关注博主后,在“粉丝专栏”,可免费阅读此文) 类似于DevExpress控件的功能 这是DevExpress的winform筛选样式,如下: 这是DevExpress的wpf筛选样式,如下: 这是Excel的筛选样式,如下: 先看效果 本案例使用wpf原生控件封装,功能基本上都满足,只是颜色样式没有写…

扫描电镜技术在材料科学中的应用及发展趋势

在材料科学领域,扫描电镜技术扮演着极为重要的角色,广泛应用于多种材料形态结构、界面状况、损伤机制和材料性能预测的研究。本文将深入探讨扫描电镜技术的结构、主要性能、工作原理、试样制备技术以及在不同领域的应用。 第一部分:扫描电镜…

【现代密码学】笔记4--消息认证码与抗碰撞哈希函数《introduction to modern cryphtography》

【现代密码学】笔记4--消息认证码与抗碰撞哈希函数《introduction to modern cryphtography》 写在最前面4 消息认证码与抗碰撞哈希函数MAC概念回顾(是的,我忘记这些缩写是什么了。。)MAC的定义适应性CMA(Chosen Message Attack&a…

Android json功能解析

1. 简介 JAVAScript Object Notation是一种轻量级的数据交换格式具有良好的可读和便于快速编写的特性。业内主流技术为其提供了完整的解决方案(有点类似于正则表达式 ,获得了当今大部分语言的支持)。  JSON采用兼容性很高的文本格式&#xf…

Python | 四、链表

为什么需要链表 在Python中,引入链表这一结构没有像C等语言那样有很多好处,因为Python里的列表和字符串结构已经十分灵活且大小可变,仍保留的好处如下: 列表、字符串等结构是连续存储的,因此如果有一块较小的内存区域…

QuEra 10,000个物理量子位和100个逻辑量子位的量子计算机2026

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗?订阅我们的简报,深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同,从行业内部的深度分析和实用指南中受益。不要错过这个机会,成为AI领…

mongoose6.0版以上操作mongodb数据库的基本使用

1、介绍 Mongoose 是一个对象文档模型库,官网 http://www.mongoosejs.net/ 2、作用 方便使用代码操作 mongodb 数据库 3、使用流程 3.1、链接数据库 //1. 安装 mongoose---> npm install mongoose --save//2. 导入 mongoose const mongoose require(&quo…

网络安全B模块(笔记详解)- 网络渗透测试

LAND网络渗透测试 1.进入虚拟机操作系统:BT5中的/root目录,完善该目录下的land.py文件,填写该文件当中空缺的Flag1字符串,将该字符串作为Flag值(形式:Flag1字符串)提交;(land.py脚本功能见该任务第6题) 输入flag sendp(packet) Flag:sendp(packet) 2.进入虚拟机操作…

鸿蒙Harmony-PersistentStorage--持久化存储UI状态储详解

用简单的心境,对待复杂的人生,方能看淡得失,从容入世,潇洒自如,心变得简单了,世界也就简单了 目录 一,定义 二,限制条件 三,使用 一,定义 LocalStorage和App…

Open3D AABB包围盒计算与使用(19)

Open3D AABB包围盒计算与使用(19) 一、算法速览二、算法实现1.代码2.结果少年听雨歌楼上。红烛昏罗帐。壮年听雨客舟中。江阔云低、断雁叫西风。 而今听雨僧庐下。鬓已星星也。悲欢离合总无情。一任阶前、点滴到天明。 一、算法速览 AABB包围盒就是将点云用一个各条边沿着坐…

Android Studio 虚拟机 Unknown Error 解决

前言 尝试了网上很多解决方式,但很遗憾,都没效果; 于是我就想啊🤔,虚拟机属于SDK的一部分,那有没有一种可能,是SDK出了问题; 于是我就换了新的SDK,结果 ---- 完美解决…

halcon学习-blob分析统计木材个数

本文用到的主要算子简单介绍如下: 1、矩形结构开运算opening_rectangle1(); 2、圆形结构腐蚀运算erosion_circle(); 3、统计非连通区域个数count_obj(); 4、合并非连通区域concat_obj(); *读取图像 read_image(image,../wood.jpg) *图像转灰度 rgb1_to_gray(image,…

爬虫补环境jsdom、proxy、Selenium案例:某条

声明: 该文章为学习使用,严禁用于商业用途和非法用途,违者后果自负,由此产生的一切后果均与作者无关 一、简介 爬虫逆向补环境的目的是为了模拟正常用户的行为,使爬虫看起来更像是一个真实的用户在浏览网站。这样可以…

《最新出炉》系列入门篇-Python+Playwright自动化测试-9-页面(page)

1.简介 通过前边的讲解和学习,细心认真地小伙伴或者童鞋们可能发现在Playwright中,没有Element这个概念,只有Page的概念,Page不仅仅指的是某个页面,例如页面间的跳转等,还包含了所有元素、事件的概念&#…

大数据仓库开发规范示例

大数据仓库开发规范示例 一、前提概要二、数仓分层原则及定义2.1 数仓分层原则2.2 数仓分层定义 三、数仓公共开发规范3.1 分层调用规范3.2 数据类型规范3.3 数据冗余规范3.4 NULL字段处理规范3.5 公共字段规范3.6 数据表处理规范3.7 事实表划分规范 四、数仓各层开发规范4.1 分…

Linux配置JAR包为服务实现自启动

一、实现bash脚本 1.1 绘图工具 绘图需安装idea的插件plantUML-Integration 只需要上图一个就可以,别的也不需要装。 启动服务的逻辑如下 关闭服务的逻辑如下 1.2 逻辑实现 在/root路径下创建entrance文件,实现逻辑如下 #!/usr/bin/env bash # 2>…

【120版本】最新谷歌浏览器驱动下载地址

在使用selenium时可能会遇到谷歌浏览器和谷歌驱动器版本不一致的问题,并且国内可以搜到的谷歌浏览器下载地址里面最新的驱动器只有114版本的,但目前谷歌浏览器最新版本是120。所以这里记录下最新版本120谷歌驱动器下载地址: Chrome for Test…

spark中Rdd依赖和SparkSQL介绍--学习笔记

1,RDD的依赖 1.1概念 rdd的特性之一 相邻rdd之间存在依赖关系(因果关系) 窄依赖 每个父RDD的一个Partition最多被子RDD的一个Partition所使用 父rdd和子rdd的分区是一对一(多对一) 触发窄依赖的算子 map()&…

强化学习应用(七):基于Q-learning的无人机物流路径规划研究(提供Python代码)

一、Q-learning简介 Q-learning是一种强化学习算法,用于解决基于马尔可夫决策过程(MDP)的问题。它通过学习一个价值函数来指导智能体在环境中做出决策,以最大化累积奖励。 Q-learning算法的核心思想是通过不断更新一个称为Q值的…

【每日小bug】mybatis plus id注解错误导致的问题

插入数据 id不为自增 指定了主键,没有指定自增。会导致出现 修改如上 报错 Data truncation: Out of range value for column ‘id’ at row 1 数据库是bigint,java中是Integer。 修改如上
最新文章