Spark 核心API

核心 API

spark core API 指的是 spark 预定义好的算子。无论是 spark streaming 或者 Spark SQL 都是基于这些最基础的 API 构建起来的。理解这些核心 API 也是写出高效 Spark 代码的基础。

Transformation

转化类的算子是最多的,学会使用这些算子就应付多数的数据加工需求了。他们有啥呢?可以如下分发:

  1. 转化算子: map、flatMap、filter
  2. 聚合算子:reduceByKey、reducerBy、groupBy、groupByKey、conbinerByKey、mapValues、flatMapValues
  3. 连接算子: cogroup、join、union、leftOuterJoin、rightOuterJoin、union
  4. 排序算子:sortBy、sortByKey

看起来好多,其实就这四种数据加工操作。他们之间又有实现上依赖关系。如下图所示:
函数的依赖关系

转化算子

在做数据加工的时候,我经常会将某个字段的值进行加工,例如,格式化日期、正则匹配、数据计算、逻辑判断、过滤。 都可以使用转化算子进行加工。举个例子,将过来出 158 开头的手机号,显示出来的电话中间四位替换为*


import org.apache.spark.{SparkConf, SparkContext}

object CSDN {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName(CSDN.getClass.getCanonicalName)
    val sc = new SparkContext(conf)
    sc.parallelize(List("15899887112"
      , "15799887112"
      , "15999887152"
      , "15799887192"
    )).filter(x => x.startsWith("158"))
    .map(x => x.substring(0 , 3) + "****" + x.substring(7 , x.length))
    .foreach(println);
  }

}

总结一下,map 做的事情就是 A -> B ,filter 是过滤的功能。

flatMap 的功能比较难理解,他是这样的,A -> [B , B , B] ,flatMap 返回的是一个数组。还是用一个例子来说明吧。有如下例子,

groupplayer
LakersJames,Davis
CelticsAtum,Borrow

转化为

playergroup
JamesLakers
DavisLakers
AtumCeltics
BorrowCeltics

代码是:


    val conf = new SparkConf().setMaster("local").setAppName(CSDN.getClass.getCanonicalName)
    val sc = new SparkContext(conf)
    sc.parallelize(List(
      ("Lakers" , "James,Davis")
      ,("Celtics" , "Atum,Borrow")
    )).flatMap(x => {
      x._2.split(",").map(xx => (x._1 , xx))
    }).foreach(println)

还有两个和 map 和 flatMap 长的差不多的,分别是 mapValue 和 flatMapValues 两个函数。这两个函数是 PairRDDFunctions 的匿名类中的函数,从 PairRDDFunctions 的名称中可以知道,PairRDDFunctions 是真的键值对的,也就是说 RDD 中的数据是键值对的时候,我们可以调 PairRDDFunctions 的函数,scala 这个功能好像类的被动技能。这是对 RDD 功能一种扩展。说了写废话,还是说回 mapValue 和 flatMapValue ,当这个两个算子接收到 我们字段的函数后,作用到的是 key-value 的 value 上面的, map 和 flapMap 是作用到整个数据上的。例如,我们的数据是 ( James , 37) ,我自定义的函数是 self_define_function , map 和 flatMap 的效果是 self_define_function((James , 37)) , 而 mapValue 和 flatMapValues 则是 (James , self_define_function(value))。

聚合算子

聚合算子包括 combinerByKeyWithClassTag、reduceBykey、reduceBy,然后把数据连接启动的算子:cogroup、join、leftOuterJoin、rightOuterJoin,还有 union 这几个东西。

combinerByKeyWithClassTag 是一个基础类,当明白了它,reduceByKey 和 reduceBy 都会明白了。conbinerByKey 和 Accumulator(累加器) 的计算逻辑一样的。就看一下它的入参吧。

combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)

createCombiner : 是一个函数,此函数的入参是 V 返回的是一个 C , V 和 C 是泛型。此函数的功能是创建一个初始值。
mergeValue :也是一个函数,此函数的入参是 C 和 V 返回的是 V ,此函数会接收各个分区每条数据 V ,然后经过加工,返回的还是一个 C 。
mergeCombiner: 又是一个函数,它是合并各个分区 combiner 后的值。
partitioner: 是分区器,它是用来位每条记录计算分区用的。
mapSideCombiner:这个是设置是否在 shuffle 的过程执行,执行 map-side 的局部聚合。
serializer:是数据序列化器,数据在不同的通过网络间传输的时候,需将数据序列化后传输的,这样可以提高效率。

下面

partition1创建C
mergeValue将V累计到C上
partition2创建C
partition3创建C
combiner1获取累计值
combiner2获取累计值
mergeCombiner将combiner1和combiner2合并
获得结果

此算子是 PairRDDFunctions 的,所以它是处理 key-value 类型数据的算子。以 word count 为例子。
这需要假设我设置了 mapSideCombine = true 从可以的。

val conf = new SparkConf().setMaster("local[*]").setAppName("");
val sc = new SparkContext(conf)
sc.parallelize(List(
"Java","Spark","Scala","Python","Shell","Lisp"
)).map((_,1)).
combineByKeyWithClassTag(
 (x:Int) => x
,(x:Int,y:Int) => x + y
,(x:Int,y:Int) => x + y
,new HashPartitioner(3)
,true
,null
)
.foreach(println)

在上面的的 combinerBykeyWithClassTag 的用户相当于 reduceByKey(+) 的用法,里面的 + 其实 (x:Int,y:Int) => x + y 的简写。

搞聚合的还有一个 groupByKey 和 groupBy() 这两个东西,既然咱们知道了 combinerByKeyWithClassTag 这个函数,其实通过看源码就可以看到 groupByKey 的功能了。

// 进入 PairRDDFunction 对象的 groupByKey 方法里面
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    // 底层就是使用的 combinerByKeyWithClassTag 这个函数
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

从源码中,可以看到 groupByKey 底层还是用的 combineByKeyWithClassTag,我来看看它里面三个非常重要的函数:

  1. val createCombiner = (v: V) => CompactBuffer(v) 这是初始化 combiner 函数,返回的是一个 CompactBuffer ,这是一个底层保存是数组,这个看以看成是一个 list 。
  2. val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v,这个是 mergeValue 的函数,它的做法是将 value 的值放到 CompactBuff 列表的。
  3. val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2,这是 mergeCombiner 的函数,此函数是将两个 CompactBuffer 合并成一个 CompactBuffer 中。

这样算下来,groupByKey 其实是将相同 key 下面的 value 放入到一个 CompactBuffer 中,然后然后在像求什么值,在进行计算就行了。可以使用 mapValues 此函数。这个函数也是 PairRDDFunction 的。

现在再来 groupBy 吧,上源码:

// 打开 RDD 的 groupBy 方法
  def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
      : RDD[(K, Iterable[T])] = withScope {
    val cleanF = sc.clean(f)
    // 先使用 map 将 RDD 转化为一个 PairRDD ,然后就可以使用 groupByKey 了
    this.map(t => (cleanF(t), t)).groupByKey(p)
  }

从代码中可以看到,先是将 RDD 转为 PairRDD ,然后将再使用 groupBykey。转化为 PairRDD 中,使用到 f 这个我们自定义的函数,此函数接收一个 RDD 中的数据,然后返回的是个 key 值,f 其实是定义 key 的函数。

下面看一个例子,

playergroup
JamesLakers
DavisLakers
AtumCeltics
BorrowCeltics

转化为

groupplayer
LakersJames,Davis
CelticsAtum,Borrow

代码为:

    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val value = sc.parallelize(List(("Lakers", "James")
      , ("Lakers", "Davis")
      , ("Celtics", "Atum")
      , ("Celtics", "Borrow")
    ))
    value
      .groupByKey()
      .mapValues(x => x.mkString(","))
      .foreach(x => println(s"key: ${x._1} , value:${x._2}"))
    sc.stop()

// 第二种写法

    val value:RDD[(String,String)] = sc.parallelize(List(("Lakers", "James")
      , ("Lakers", "Davis")
      , ("Celtics", "Atum")
      , ("Celtics", "Borrow")
    ))
    value
      .groupBy(x=>x._1)
      .mapValues(x => x.mkString(","))
      .foreach(x => println(s"key: ${x._1} , value:${x._2}"))
    sc.stop()
排序算子

排序算子比较少,就两个一个 sortByKey ,另外一个就是 sortBy ,先来看 sortByKey 。
sortByKey 也是一个 PairRDDFunction 的函数,其处理的是 key-value 中的 key ,也就是根据 key 值来进行的排序,看一个例子吧。

    sc.parallelize(List("A" , "C" , "B" , "E" , "F" ))
      .map((_,0))
      // 这里必须设置分区数量为1,否则,打印出来的元素就不排序了。
      .sortByKey(true , 1)
    sc.stop()

其实,sortBy 就是基于 sortByKey ,来看看源码就知道了。

  def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
    this.keyBy[K](f)
        .sortByKey(ascending, numPartitions)
        .values
  }

keyBy(func) 可以理解为 RDD.map(x => (func(x) , x)) 其实就是将 RDD 转化为一个 PairRDD , 这样就能用 sortByKey 了,最后把 PairRDD 转化为原来的 RDD 。

连接类的算子

首先要讲的就是 cogroup 算子,它也是一个基础的算子,像 join、lelfOuterJoin、rightOuterJoin、intersection、fullOuterjoin 都是依赖 cogroup 实现的。

看一下 congroup 实现订单表和商品维表关联取出商品价格的情况。

    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val order:RDD[String] = sc.parallelize(List("order1,product1,1", "order1,product2,2", "order1,product3,4"))
    val product:RDD[String] = sc.parallelize(List("product1,10", "product2,30", "product3,87"))
    val productTuple:RDD[(String,String)] = product.map(x => {
      val strings = x.split(",")
      (strings(0), strings(1))
    })
    val orderTuple:RDD[(String,String)] = order.map(x => {
      val strings = x.split(",")
      (strings(1), x)
    })
    orderTuple
      .cogroup(productTuple)
      .mapValues(x => {
          x._1.map(xx => {
            val strings = xx.split(",")
            var rs = ""
            var price:Int = 0
            if(!x._2.isEmpty){
              price = x._2.head.toInt
              rs = xx.concat(s",${price.toInt},${price.toInt*strings(2).toInt}")
            }
            rs
          })
        }).values.foreach(println)
   sc.stop()

再看一下 join 的实现吧

// join 算子是 PairRDDFunction 的
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues( pair =>
      // 笛卡尔积
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
    )
  }

从上面的代码可以知道,是对 pair 进行笛卡尔积操作,而且前后都不为 Seq() ,也就是不为空。

再来看一下 fullOuterJoin 吧。

  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Option[V], Option[W]))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues {
      case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))
      case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))
      case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
    }
  }

从上面的代码里面,发现 fullOuterJoin 的实现和 join 实现差不多,但是多了对应左右列表为空的处理。这和 SQL 这的 full join 的语义是相同的。这个的 case 是 scala 里面的偏函数,在 scala 的源码中应用非常的广泛。

知道了 join 和 fullOuerJoin ,就再来看看 leftOuterJoin 和 rightOuterJoin :

// leftOuterJoin 也是 PairRDDFuntion 的函数
  def leftOuterJoin[W](
      other: RDD[(K, W)],
      partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues { pair =>
      if (pair._2.isEmpty) {
        pair._1.iterator.map(v => (v, None))
      } else {
        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
      }
    }
  }
// rightOuterJoin 也是 PairRDDFunction 的函数
  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Option[V], W))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues { pair =>
      if (pair._1.isEmpty) {
        pair._2.iterator.map(w => (None, w))
      } else {
        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
      }
    }
  }

从上面的代码展示来看看,[left | right]OuterJoin 和 SQL 中的 left join 和 right join 的语义也是相同的。
在这里可以得到一个结论,join、leftOuterJoin、rightOuterJoin、fullOuterJoin 的计算效率其实是相同的,都取决于 cogroup 的效率。

最后看一个 RDD 中的函数 intersection

def intersection(other: RDD[T]): RDD[T] = withScope {
    this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
        .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
        .keys
  }

从上面的代码中可以看到,是去掉了左右没有关联到的数据。这和 SQL 里面的 inner join 的语义是一致的。其实是和 join 逻辑相似,但是并没有将左右边的元素进行笛卡尔积的计算。

在实现维表关联的场景下,还有一个重要的算子,就是 broadcast 算子。来看一个例子。

    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
    val sc = new SparkContext(conf)
    //设置文件切分大小
    sc.hadoopConfiguration.setLong("fs.local.block.size",128*1024*1024)
    //数据合并,有大量的数据移动
    val productRDD: RDD[(String, String)] = sc.parallelize(List("product1;10"))
      .map { line =>
        val field = line.split(";")
        (field(0), line)
      }
    //广播变量
    val productBC: Broadcast[collection.Map[String, String]] = sc.broadcast(productRDD.collectAsMap())
    //map task 完成数据准备
    val orderInfoRDD: RDD[(String, String)] = sc.parallelize(List("order1;10;product1"))
      .map { line =>
        val field = line.split(";")
        (field(2), line)
      }
    //map 端的join
    val resultRDD: RDD[(String, (String, String))] = orderInfoRDD.map {
      case (pid, orderInfo) => {
        val product: collection.Map[String, String] = productBC.value
        (pid, (orderInfo, product.getOrElse(pid, null)))
      }
    }

    resultRDD.foreach(println)

    Thread.sleep(100)

    sc.stop()

广播是非常好的优化方式,他会将维表的一个副本复制到各个分区里面,然后就可以和进行拉宽做了。

控制类的算子

控制类算子是一类和优化相关的算子。
控制类算子
例如,当我们重复利用一些计算结果的时候,可以将中间的计算结果保存到缓存中。例如,我计算某个商品在某些城市的销售额,我们希望计算每天、每周、每月的销售额,我们希望计算这些,怎么计算呢?我们可以计算先计算出每天的销售额,然后在这个基础上计算出每周和每月的销售额。

    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init)
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
     val dimDataRDD:RDD[String] = sc.parallelize(List(
           "2024-02-29,1,02"
          ,"2024-03-01,2,03"
          ,"2024-03-01,2,03"
          ,"2024-03-02,2,03"
          ,"2024-03-03,2,03"
          ,"2024-03-04,2,03"
      ))
    val dimDateBroadcast = sc.broadcast(dimDataRDD.map(x => {
      val strings = x.split(",")
      (strings(0), x)
    }).collectAsMap())
    val orderRDD:RDD[String] = sc.parallelize(List(
        "2024-03-01,order1,product1,city1,10"
        , "2024-03-01,order1,product1,city3,11"
        , "2024-03-01,order1,product1,city4,11"
        , "2024-03-01,order1,product1,city5,34"
        , "2024-03-01,order1,product1,city3,13"
        , "2024-03-01,order1,product1,city3,33"
        , "2024-03-02,order1,product1,city3,19"
        , "2024-03-02,order1,product1,city4,13"
        , "2024-03-02,order1,product1,city5,34"
        , "2024-03-02,order1,product1,city3,19"
        , "2024-03-02,order1,product1,city1,38"
        , "2024-03-03,order1,product1,city5,34"
        , "2024-03-03,order1,product1,city3,19"
        , "2024-03-03,order1,product1,city1,38"
        , "2024-03-04,order1,product1,city5,34"
        , "2024-03-04,order1,product1,city3,19"
        , "2024-03-04,order1,product1,city1,38"
        , "2024-02-29,order1,product1,city1,38"
      , "2024-02-28,order1,product1,city1,12"
    ))
    val byDay = orderRDD.map(x => {
      val strings = x.split(",")
      ((strings(0), strings(3)), strings(4).toInt)
    }).reduceByKey((s: Int, x: Int) => {
      s + x
    })
    println("====by day city sum(amt)===")
    byDay.foreach(println)
    val byDayWithMonthAndWeek = byDay.map(x => {
      val dimss = dimDateBroadcast.value
      val rs = dimss.getOrElse(x._1._1, Nil) match {
        case Nil => ("", "")
        case str: String => {
          val strings = str.split(",")
          (strings(1), strings(2))
        }
      }
      //日期+城市 周 月
      (x._1, rs._1, rs._2, x._2)
    })
    byDayWithMonthAndWeek.cache()

    println("====by city , week  sum(amt)===")
    byDayWithMonthAndWeek
      .keyBy(x => (x._1._2 ,x._2))
      .combineByKey(
        (a:((String,String) , String , String , Int)) => a._4
        ,(s:Int , x:((String,String) , String , String , Int))=>{
         s + x._4
      },
        (x:Int , y:Int) => {
          x + y
        }
      ).foreach(println)
    println("====by city , month  sum(amt)===")
    byDayWithMonthAndWeek.keyBy(x => (x._1._2 ,x._3))
      .combineByKey(
        (a:((String,String) , String , String , Int)) => a._4
        ,(s:Int , x:((String,String) , String , String , Int))=>{
          s + x._4
        },
        (x:Int , y:Int) => {
          x + y
        }
      ).foreach(println)

上面的代码完成了最初的逻辑。我们来看看里面的执行过程。

总体的结果
从上面的图中可以看到,有四个 job 执行,从函数来看,

0 号代表广播流
1 号代表了输出的是天、城市粒度下销售额
2 号代表输出的是城市、周粒度下的销售额汇总
3 号代表了输出的是城市、月粒度下的销售额汇总

咱们一个一个点进去看看
0 号 job
1号 job

2号代表的task
3代表的task

从 2 和 3 中可以看到前面 Stage6 和 stage3 都是 skip 的,在 Stage7 和 Stage8 中的 map 是有一个绿色的标识的,此标识就是代表了使用缓存。如果讲 cache 去掉的话。在来看看效果。

在这里插入图片描述
在这里插入图片描述
从上面的图中,可以看到 Stage7 和 Stage4 中的 map 没有绿色的标识了,说明缓存已经没了。

下面一个是 persist(), 我们先一个 cache() 的源码:

  def cache(): this.type = persist()
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

从上面的代码,可以知道,cache 是 persist 实现的,而且 persist 里面可以设置不同的保存级别:

  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val DISK_ONLY_3 = new StorageLevel(true, false, false, false, 3)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

它起的名字还是见名知意的。所以它可以保存缓存到内存、磁盘、堆外,并且可以序列化。
其实这些都不够安全,最安全的办法就是缓存保存到 HDFS 中。这样就最保险了。也是就有 checkpoint()
spark 的 checkpoint 值是将中间结果缓存,达到中间数据重复使用的效果,和 Flink 对比,Flink 的 checkpint 本质是一种分布式事务,可以协调各个算子完成同一批数据处理,通过checkpoin 就能实现 exactly-once 的语义,但是 spark 就不能这样。

下一个重要的控制算子就是 Accumulator ,它是一个全局性的累计器。可以保存全局性的累计值。下面是是用 accumulator 实现的 workcount。

object TestAccumulate {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test_accumulate").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val value:RDD[String] = sc.makeRDD(List("Hi,", "Hello", "ww", "hhh"))
    val sum:LongAccumulator = sc.longAccumulator("sum")
    val accu:MyAccumulator = new MyAccumulator()
    sc.register(accu , name = "accu")
    value.foreach(x => {
        accu.add(x)
        sum.add(1)
    })
    val value1: mutable.Map[String, Int] = accu.value
    println(s"size ${value1.size}")
    println(s"sum ${sum.value}")
    for( (key ,value) <- value1){
      println(s"key is ${key} , and value is ${value}")
    }
    sc.stop()
  }

}
class MyAccumulator extends AccumulatorV2[String , mutable.Map[String , Int]]{
  // 定义一个集合,来记录以 H 开头的字符串的个数
  var map:mutable.Map[String , Int] = mutable.Map[String , Int]()

  override def isZero: Boolean = true

  override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
      val rs:MyAccumulator = new MyAccumulator()
      rs.map = this.map
      rs
  }

  override def reset(): Unit = {
    this.map.clear()
  }

  override def add(v: String): Unit = {
    if(v.startsWith("H")){
      this.map.put(v , map.getOrElse(v , 0) + 1)
    }
  }

  override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
    val map1:mutable.Map[String , Int] = map
    val map2:mutable.Map[String , Int] = other.value
    map = map1.foldLeft(map2) {
      (map2, kv) => {
        map2(kv._1) = map2.getOrElse(kv._1, 0) + kv._2
        map2
      }
    }
  }

  override def value: mutable.Map[String, Int] = this.map
}

Accumulator 的计算逻辑和 combinerByKey 的逻辑十分的相似。Spark 还为我们预定义了三个累加器,longAccumulator、doubleAcculator、collectionAccumulator 三个,其实现方式也是继承了AccumulatorV2 类,然后,在 SparkContext 中注册就可以使用了。

coalesce 是将 RDD 中的分区重新划分分区,这个的作用可以处理数据倾斜的问题,其实数据倾斜的根源就是在于分区中有多有的少,我们可以使用 key 值的组合,然后重新分区达到各个分区数据量差不多的情况。

action 算子

action 算子是真正出发计算的算子,在 action 算子之前, 所有的算子就像流水线上的一个工序,按照我们想要的结果设置好了加工模具,action 才能够决一个 job 的开始。一个 Spark 任务中,可以有多个 job ,一个 job 里面可以多个 Stage 。

action 算子

其中,kafka 是后来使用 KafkaUtil 加入的 Kafka 的消费算子,其他都是 RDD 中自带的算子,这些算子中的共同特点是源码中都使用了 SparkContext#runJob()
下面来看看 foreach 函数:

  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

下面来说是 KafkaUtils 这是 Spark 扩展,它的功能是让 Spark 可以消费和生产 Kafka 里面的数据,这样 Spark 就能处理流式计算了。

数据源算子

数据源算子是设置数据源的算子。

在这里插入图片描述

textfile 是从文件系统中取出数据,可以是 disk 中,或者从 HDFS 中拉出来。
parallism 是可以从 List 消费数据,这个算子经常用来测试功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/441486.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

勒索软件事件手册:综合指南

近年来&#xff0c;勒索软件攻击的频率和复杂程度都急剧增加。这些攻击的影响可能是毁灭性的&#xff0c;从经济损失到严重的运营中断。 这就是为什么对于希望防范这种网络安全威胁的企业来说&#xff0c; 强大的勒索软件事件响应手册是不可谈判的。 本指南旨在深入了解勒索软…

【工作实践-07】uniapp关于单位rpx坑

问题&#xff1a;在浏览器页面退出登录按钮上“退出登录”字样消失&#xff0c;而在手机端页面正常;通过查看浏览器页面的HTML代码&#xff0c;发现有“退出登录”这几个字&#xff0c;只不过由于样式问题&#xff0c;这几个字被挤到看不见了。 样式代码中有一行为&#xff1a…

UI自动化测试使用场景及脚本录制

经常有人会问&#xff0c;什么样的项目才适合进行UI自动化测试呢&#xff1f;UI自动化测试相当于模拟手工测试&#xff0c;通过程序去操作页面上的控件。而在实际测试过程中&#xff0c;经常会遇到无法找到控件&#xff0c;或者因控件定义变更而带来的维护成本等问题。 哪些场…

设计高并发系统的关键策略

✨✨谢谢大家捧场&#xff0c;祝屏幕前的小伙伴们每天都有好运相伴左右&#xff0c;一定要天天开心哦&#xff01;✨✨ &#x1f388;&#x1f388;作者主页&#xff1a; 喔的嘛呀&#x1f388;&#x1f388; 目录 引言 一. 架构设计 1. 微服务架构 2. 分布式架构 3. 负…

VR全景技术在VR看房中有哪些应用,能带来哪些好处

引言&#xff1a; 随着科技的不断发展&#xff0c;虚拟现实&#xff08;VR&#xff09;技术在房地产行业中的应用也越来越广泛。其中&#xff0c;VR全景技术在VR看房中的运用尤为突出。今天&#xff0c;让我们一起深入探讨VR全景技术在VR看房中的应用及其带来的种种好处。 一、…

智慧灯杆-智慧城市照明现状分析(2)

作为城市照明的主体,城市道路照明伴随着我国城市建设的高速发展,获得了快速的增长。国家统计局数据显示,从2004年至2014年,我国城市道路照明灯数量由1053.15万盏增加到3000万盏以上,年均复合增长率超过11%,城市道路照明行业保持持续快速发展的趋势。 近几年,随着中国路灯…

钡铼技术R40工业路由器连接工业控制系统实现远程监控

钡铼技术的R40工业路由器是一款专为现代工业控制系统设计的高性能设备&#xff0c;它通过其先进的连接功能和丰富的接口&#xff0c;使得远程监控和管理成为可能。本文将从产品参数的角度出发&#xff0c;深入探讨R40工业路由器如何连接工业控制系统以实现远程监控。 1. R40工…

Windows 安装 Xinference

Windows 安装 Xinference 0. 引言1. 创建虚拟环境2. 安装 pytorch3. 安装 llama_cpp_python4. 安装 chatglm-cpp5. 安装 Xinference6. 设置 model 路径7. 启动 Xinference8. 查看 Cluster Information 0. 引言 Xorbits Inference&#xff08;Xinference&#xff09;是一个性能…

最新基于R语言lavaan结构方程模型(SEM)技术

原文链接&#xff1a;最新基于R语言lavaan结构方程模型&#xff08;SEM&#xff09;技术https://mp.weixin.qq.com/s?__bizMzUzNTczMDMxMg&mid2247596681&idx4&sn08753dd4d3e7bc492d750c0f06bba1b2&chksmfa823b6ecdf5b278ca0b94213391b5a222d1776743609cd3d14…

Leetcode3070. 元素和小于等于 k 的子矩阵的数目

Every day a Leetcode 题目来源&#xff1a;3070. 元素和小于等于 k 的子矩阵的数目 解法1&#xff1a;二维前缀和 二维前缀和的模板题。 代码&#xff1a; /** lc appleetcode.cn id3070 langcpp** [3070] 元素和小于等于 k 的子矩阵的数目*/// lc codestart// 二维前缀和…

Web3探索加密世界:什么是Web3钱包?

随着加密货币和区块链技术的发展&#xff0c;人们越来越多地开始探索Web3世界&#xff0c;这个世界以去中心化、安全和开放性为特征。在这个新兴的数字化领域中&#xff0c;Web3钱包成为了一个关键的概念和工具。但是&#xff0c;什么是Web3钱包&#xff1f;它有什么特点&#…

二、TensorFlow结构分析(3)

目录 1、张量 1.1 张量的类型 1.2 张量的阶 1.3 创建张量的指令 2、张量的变换 3、张量的数学运算 TF数据流图图与TensorBoard会话张量Tensor变量OP高级API 1、张量 1.1 张量的类型 1.2 张量的阶 def tensor_demo():# 张量的演示tensor1 tf.constant(4.0)tensor2 tf.co…

IPSEC VPN安全介绍以及相关实验

目录 一、IPSEC相关的安全服务 二、IPSEC的安全协议 三、实验 IPSEC一组协议集合&#xff0c;用于确保在IP网络上进行通信时的安全性和保密性。它提供了一种标准化的方法&#xff0c;用于对IP数据包进行加密、身份验证和完整性保护。IPSEC通常用于建立虚拟私人网络VPN连接&am…

课时58:流程控制_基础知识_流程基础

2.1.1 流程基础 学习目标 这一节&#xff0c;我们从 基础知识、简单实践、小结 三个方面来学习。 基础知识 编程逻辑 编程语言的目的是通过风格化的编程思路将代码写出来后&#xff0c;实现项目功能的。为了实现功能&#xff0c;我们通过在代码层面通过一些代码逻辑来实现…

Dubbo 和 Zookeeper 的关系

Dubbo 和 Zookeeper 的关系 Zookeeper的作用 zookeeper用来注册服务和进行负载均衡&#xff0c;哪一个服务由哪一个机器来提供必需让调用者知道&#xff0c;简 单来说就是ip地址和服务名称的对应关系。当然也可以通过硬编码的方式把这种对应关系在调用方 业务代码中实现&#…

20240308-1-校招前端面试常见问题CSS

校招前端面试常见问题【3】——CSS 1、盒模型 Q&#xff1a;请简述一下 CSS 盒模型&#xff1f; W3C 模式&#xff1a;盒子宽widthpaddingbordermargin 怪异模式&#xff1a;盒子宽widthmargin Q&#xff1a;inline、block、inline-block 元素的区别&#xff1f; inline&am…

图片编辑器tui-image-editor

提示&#xff1a;图片编辑器tui-image-editor 文章目录 前言一、安装tui-image-editor二、新建components/ImageEditor.vue三、修改App.vue四、效果五、遇到问题 this.getResolve is not a function总结 前言 需求&#xff1a;图片编辑器tui-image-editor 一、安装tui-image-ed…

Jmeter二次开发实现rsa加密

jmeter函数助手提供了大量的函数&#xff0c;像 counter、digest、random、split、strLen&#xff0c;这些函数在接口测试、性能测试中大量被使用&#xff0c;但是大家在实际工作&#xff0c;形形色色的测试需求不同&#xff0c;导致jmeter自带或者扩展插件给我们提供的函数无法…

分布式定时任务调度xxl-job

1. xxl-job基本介绍 1.1 Quartz的体系结构 Quartz中最重要的三个对象:Job&#xff08;作业&#xff09;、Trigger&#xff08;触发器&#xff09;、Scheduler&#xff08;调度器&#xff09;。 xxl-job的调度原理:调度线程在一个while循环中不断地获取一定数量的即将触发的Tr…

从数据处理到3D PDF生成:交互式3D PDF生成引擎HOOPS Publish的工作原理

在当今数字化时代&#xff0c;3D技术在各个行业中扮演着重要角色&#xff0c;从制造业到医疗保健&#xff0c;再到建筑设计。为了更好地共享、演示和交互展示3D模型数据&#xff0c;HOOPS Publish作为一款强大的3D引擎&#xff0c;专门用于生成交互式的3D PDF文件。本文将深入探…
最新文章