Spark---累加器和广播变量

文章目录

  • 1.累加器实现原理
  • 2.自定义累加器
  • 3.广播变量

1.累加器实现原理

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。

    //建立与Spark框架的连接
    val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件
    val context = new SparkContext(wordCount) //读取配置文件

    val dataRdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4),2)
    var sum=0
    dataRdd.foreach(num=>sum+=num)

    println(sum)
    context.stop()

运行结果:
在这里插入图片描述
我们预期是想要实现数据的累加,开始数据从Driver被传输到了Executor中进行计算,但是每个分区在累加数据完成之后并没有将计算结果返回到Driver端,所以导致最后的结果与预期的不一致。
在这里插入图片描述
对上述代码使用累加器

    val dataRdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4))
    val sum = context.longAccumulator("sum")
    dataRdd.foreach(num=>{
      //使用累加器
      sum.add(num)
    })

    //获取累加器的值
    println(sum.value)

运行结果:
在这里插入图片描述
由此可见,在使用了累加器之后,每个Executor在开始都会获得这个累加器变量,每个Executor在执行完成后,累加器会将每个Executor中累加器变量的值聚合到Driver端。
在这里插入图片描述

Spark提供了多种类型的累加器,以下是其中的一些:
在这里插入图片描述

2.自定义累加器

用户可以通过继承AccumulatorV2来自定义累加器。需求:自定义累加器实现WordCount案例。

AccumulatorV2[IN,OUT]中:
IN:输入数据的类型
OUT:输出数据类型

在这里插入图片描述
WordCount案例实现完整代码:

package bigdata.wordcount.leijiaqi

import bigdata.wordcount.leijiaqi
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
 * 使用累加器完成WordCount案例
 */
object Spark_addDemo {
  def main(args: Array[String]): Unit = {
    //建立与Spark框架的连接
    val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件
    val context = new SparkContext(wordCount) //读取配置文件

    val dataRDD: RDD[String] = context.textFile("D:\\learnSoftWare\\IdeaProject\\Spark_Demo\\Spark_Core\\src\\main\\com.mao\\datas\\1.txt")

    //创建累加器对象
    val wordCountAccumulator = new WordCountAccumulator
    //向Spark中进行注册
    context.register(wordCountAccumulator,"wordCountAccumulator")

    //实现累加
    dataRDD.foreach(word => {
      wordCountAccumulator.add(word)
    })
    //获取累加结果,打印在控制台上
    println(wordCountAccumulator.value)

    //关闭链接
    context.stop()
  }

}

class WordCountAccumulator extends  AccumulatorV2[String,mutable.Map[String,Long]]
{

  //定义一个map用于存储累加后的结果
  var map: mutable.Map[String, Long] =mutable.Map[String,Long]()

  //累加器是否为初始状态
  override def isZero: Boolean = {
    map.isEmpty
  }

  //复制累加器
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    new WordCountAccumulator()
  }

  //重置累加器
  override def reset(): Unit = {
    map.clear()
  }

  //向累加器添加数据IN
  override def add(word: String): Unit = {
    // 查询 map 中是否存在相同的单词
    // 如果有相同的单词,那么单词的数量加 1
    // 如果没有相同的单词,那么在 map 中增加这个单词
    val newValue = map.getOrElse(word, 0L) + 1
    map.update(word,newValue)
  }

  //合并累加器
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    var map1=this.map
    var map2=other.value

    //合并两个map
    map2.foreach({
      case (word,count)=>{
        val newValue = map1.getOrElse(word,0L)+count
        map1.update(word,newValue)
      }
    })
  }

  //返回累加器的结果(OUT)
  override def value: mutable.Map[String, Long] = this.map
}
}

运行结果:
在这里插入图片描述

3.广播变量

在Apache Spark中,广播变量(Broadcast Variables)是一种特殊类型的变量,用于优化数据共享。当Spark应用程序在集群中的多个节点上运行时,每个节点都需要访问相同的数据集。如果数据集很大,那么将这些数据发送到每个节点可能会非常耗时和低效。 广播变量提供了一种方法,可以在集群中的所有节点上共享数据集的一个只读副本 ,从而避免了在每个节点上重复发送数据。

在这里插入图片描述
广播变量也叫分布式只读变量,它可以将闭包数据发送到每个Executor的内存中来达到共享的目的,Executor其实就相当于一个JVM,在启动的时候会自动分配内存。

    //建立与Spark框架的连接
    val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件
    val context = new SparkContext(wordCount) //读取配置文件

    val rdd1 = context.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)), 4)
    val list = List(("a", 4), ("b", 5), ("c", 6), ("d", 7))
    // 声明广播变量
    val broadcast: Broadcast[List[(String, Int)]] = context.broadcast(list)
    val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
      case (key, num) => {
        var num2 = 0
        // 使用广播变量
        for ((k, v) <- broadcast.value) {
          if (k == key) {
            num2 = v
          }
        }
        (key, (num, num2))
      }
    }

    resultRDD.collect().foreach(print)
    
    context.stop()

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/328667.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

利用HTML和CSS实现的浮动布局

代码如下 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title><style>*{m…

Python武器库开发-武器库篇之Whois信息收集模块化(四十五)

Python武器库开发-武器库篇之Whois信息收集模块化(四十五) 我们在进行渗透的时候&#xff0c;需要进行全面的信息收集&#xff0c;除了主动信息收集之外&#xff0c;我们还经常会进行被动信息收集&#xff0c;Whois信息收集就是其中的一种,我们可以利用一些网站进行Whois信息收…

区间预测 | Matlab实现LSTM-Adaboost-ABKDE的集成学习长短期记忆神经网络自适应带宽核密度估计多变量回归区间预测

区间预测 | Matlab实现LSTM-Adaboost-ABKDE的集成学习长短期记忆神经网络自适应带宽核密度估计多变量回归区间预测 目录 区间预测 | Matlab实现LSTM-Adaboost-ABKDE的集成学习长短期记忆神经网络自适应带宽核密度估计多变量回归区间预测效果一览基本介绍程序设计参考资料 效果一…

Java--HashMap中put()方法是如何实现的

一、什么是HashMap HashMap是Java中常用的数据结构之一&#xff0c;它基于哈希表实现&#xff0c;提供了快速的键值对存取能力。在HashMap中&#xff0c;put方法是最常用的方法之一&#xff0c;用于将键值对存储到HashMap中。本文将深入探究HashMap的put方法的实现原理&#x…

vue 作用域插槽、具名插槽

在子组件中使用了具名插槽<slot name"qwe" :games"games" x"我是x" y"我是y"></slot>&#xff0c; 相应的在父组件中使用<template #qwe"data">的语法来接收插槽内容。 这个语法是ok的&#xff0c; 但…

微信使用wx.getLocation

1&#xff0c;小程序管理后台 -「开发」-「开发管理」-「接口设置」” 中完成权限申请&#xff1b; 2&#xff0c;需在 app.json 中声明其需调用的地理位置相关接口 "permission": {"scope.userLocation": { "desc": "你的位置信息将用于小…

如何在 Python3 中使用变量

介绍 变量是一个重要的编程概念&#xff0c;值得掌握。它们本质上是在程序中用于表示值的符号。 本教程将涵盖一些变量基础知识&#xff0c;以及如何在您创建的 Python 3 程序中最好地使用它们。 理解变量 从技术角度来说&#xff0c;变量是将存储位置分配给与符号名称或标…

M1卡和CPU卡学习

目录介绍 01.整体概述介绍 1.1 项目背景说明1.2 IC智能卡说明1.3 基础概念介绍 02.CPU卡基础学习 2.1 CPU操作流程2.2 读卡器与卡交互指令2.3 APDU指令格式2.4 APDU指令组成2.5 发送指令详细剖析2.6 响应指令详细解析 03.M1卡基础学习 3.1 什么是M1卡3.2 M1卡数据结构3.3 M1扇…

晶振线路匹配需要进哪一些测试

晶振线路匹配的测试对于确保晶振性能的稳定性和可靠性至关重要&#xff0c;那么晶振线路匹配需要进哪一些测试呢? 晶振线路匹配测试是确保晶振性能稳定性和可靠性的关键环节。为了全面评估晶振的性能&#xff0c;需要进行一系列的测试&#xff0c;包括负载电容测试、驱动电平…

HarmonyOS 转场动画

今天 我们来说 组件内转场动画 我们可以先编写代码如下 Entry Component struct Index {State flag:boolean true;build() {Column({space: 30}) {Button("转换").onClick(()> {})Image("https://img1.baidu.com/it/u1699929707,733321099&fm253&…

nxp s32k144芯片使用J-LINK程序刷写

1.nxp s32k144 (1)打开软件&#xff1a;J-Flash V6.30j (2)新建工程&#xff1a;file->new project (3)选择芯片型号和 target interface (4)可以保存芯片和接口配置 (5)打开程序&#xff1a;File->open data file &#xff08;6&#xff09;程序刷写&#xff1a;T…

【UE5】交互式展厅数字博物馆交互是开发实战课程

长久以来&#xff0c;我们总是不断被初学者问到类似这样的问题&#xff1a;如何从头到尾做一个交互式程序开发项目&#xff1f;本套课程尝试对这个问题进行解答。 课程介绍视频如下 【UE5】数字展厅交互式开发全流程 【谁适合学习这门课】 本套课程面向初学者&#xff0c;满足…

【加强版】小学数学出题,加减乘除混合运算,支持自定义数字,一键打印

在线预览&#xff1a;在线HTML代码预览和运行工具 - UU在线工具 复制下面代码后到该地址预览即可 注意&#xff1a;在线预览不能打印。如需打印&#xff0c;在电脑本地上新建文本文档&#xff0c;粘贴代码后保存&#xff0c;然后把文件后缀改为.html运行&#xff0c;出题点击…

【Java JVM】栈帧

执行引擎是 Java 虚拟机核心的组成部分之一。 在《Java虚拟机规范》中制定了 Java 虚拟机字节码执行引擎的概念模型, 这个概念模型成为各大发行商的 Java 虚拟机执行引擎的统一外观 (Facade)。 不同的虚拟机的实现中, 通常会有 解释执行 (通过解释器执行)编译执行 (通过即时编…

rust跟我学二:模块编写与使用

图为RUST吉祥物 大家好,我是get_local_info作者带剑书生,这里用一篇文章讲解get_local_info中模块的使用。 首先,先要了解get_local_info是什么? get_local_info是一个获取linux系统信息的rust三方库,并提供一些常用功能,目前版本0.2.4。详细介绍地址:[我的Rust库更新]g…

【物联网】物联网设备和应用程序涉及协议的概述

物联网设备和应用程序涉及协议的概述。帮助澄清IoT层技术栈和头对头比较。 物联网涵盖了广泛的行业和用例&#xff0c;从单一受限制的设备扩展到大量跨平台部署嵌入式技术和实时连接的云系统。 将它们捆绑在一起是许多传统和新兴的通信协议&#xff0c;允许设备和服务器以新的…

小程序样例1:简单待办列表

基本功能&#xff1a; 显示所有待办列表&#xff08;点击不同的文本进行显示&#xff09; 没完成的待办 已完成的待办 新建待办test 清除待办foo 代码js文件&#xff1a; //index.js //获取应用实例 const app getApp(); Page({data: {todo: ,todos: [{"id": 1474…

鸿蒙Harmony-列表组件(List)详解

不要和别人比生活&#xff0c;每个人阶段不同&#xff0c;追求不同&#xff0c;活法自然也不同。只要今天的你能比昨天的你快乐一点点&#xff0c;那你就是自己人生赢家。 目录 一&#xff0c;定义 二&#xff0c;布局与约束 2.1 布局 2.2 约束 三&#xff0c;开发布局 3.1 设置…

LabVIEW模拟荧光显微管滑动实验

LabVIEW模拟荧光显微管滑动实验 在现代生物医学研究中&#xff0c;对微观生物过程的精准模拟和观察至关重要。本案例展示了如何利用LabVIEW软件和专业硬件平台&#xff0c;创新地模拟荧光显微管在滑动实验中的动态行为&#xff0c;这一过程不仅提升了实验效率&#xff0c;还为…

探寻爬虫世界01:HTML页面结构

文章目录 一、引言&#xff08;一&#xff09;背景介绍&#xff1a;选择爬取51job网站数据的原因&#xff08;二&#xff09;目标与需求明确&#xff1a;爬取51job网站数据的目的与用户需求 二、网页结构探索&#xff08;一&#xff09;51job网页结构分析1、页面组成&#xff1…
最新文章