【Spark】RDD转换算子

目录

map

mapPartitions

mapPartitionsWithIndex

flatMap

glom

groupBy

shuffle

filter

sample

distinct

coalesce

repartition

sortBy

ByKey

intersection

union

subtract

zip

partitionBy

reduceByKey

groupByKey

reduceByKey 和 groupByKey 的区别

aggregateByKey

foldByKey

combineByKey

reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别

join

leftOuterJoin

cogroup


完成永远比完美更重要

Value类型

map

def map[U: ClassTag](f: T => U): RDD[U]
将处理的数据 逐条 进行映射转换(A => B),这里的转换可以是类型的转换,也可以是值的转换。
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD1: RDD[Int] = dataRDD.map(
 num => {
    num * 2
 }
)
val dataRDD2: RDD[String] = dataRDD1.map(
 num => {
     "" + num
 }
)
实例
从服务器日志数据 apache.log 中获取用户请求 URL 资源路径
apache.log
83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png
83.149.9.216 - - 17/05/2015:10:05:43 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png
83.149.9.216 - - 17/05/2015:10:05:47 +0000 GET /presentations/logstash-monitorama-2013/plugin/highlight/highlight.js
83.149.9.216 - - 17/05/2015:10:05:12 +0000 GET /presentations/logstash-monitorama-2013/plugin/zoom-js/zoom.js
83.149.9.216 - - 17/05/2015:10:05:07 +0000 GET /presentations/logstash-monitorama-2013/plugin/notes/notes.js
83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png
83.149.9.216 - - 17/05/2015:10:05:57 +0000 GET /presentations/logstash-monitorama-2013/css/fonts/Roboto-Bold.ttf
83.149.9.216 - - 17/05/2015:10:05:50 +0000 GET /presentations/logstash-monitorama-2013/css/fonts/Roboto-Regular.ttf
83.149.9.216 - - 17/05/2015:10:05:24 +0000 GET /presentations/logstash-monitorama-2013/images/frontend-response-codes.png
83.149.9.216 - - 17/05/2015:10:05:50 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard.png
83.149.9.216 - - 17/05/2015:10:05:46 +0000 GET /presentations/logstash-monitorama-2013/images/Dreamhost_logo.svg
83.149.9.216 - - 17/05/2015:10:05:11 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard2.png
83.149.9.216 - - 17/05/2015:10:05:19 +0000 GET /presentations/logstash-monitorama-2013/images/apache-icon.gif
83.149.9.216 - - 17/05/2015:10:05:33 +0000 GET /presentations/logstash-monitorama-2013/images/nagios-sms5.png
83.149.9.216 - - 17/05/2015:10:05:00 +0000 GET /presentations/logstash-monitorama-2013/images/redis.png
83.149.9.216 - - 17/05/2015:10:05:25 +0000 GET /presentations/logstash-monitorama-2013/images/elasticsearch.png
83.149.9.216 - - 17/05/2015:10:05:59 +0000 GET /presentations/logstash-monitorama-2013/images/logstashbook.png
83.149.9.216 - - 17/05/2015:10:05:30 +0000 GET /presentations/logstash-monitorama-2013/images/github-contributions.png
83.149.9.216 - - 17/05/2015:10:05:53 +0000 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 - - 17/05/2015:10:05:24 +0000 GET /presentations/logstash-monitorama-2013/images/1983_delorean_dmc-12-pic-38289.jpeg
83.149.9.216 - - 17/05/2015:10:05:54 +0000 GET /presentations/logstash-monitorama-2013/images/simple-inputs-filters-outputs.jpg
83.149.9.216 - - 17/05/2015:10:05:33 +0000 GET /presentations/logstash-monitorama-2013/images/tiered-outputs-to-inputs.jpg
83.149.9.216 - - 17/05/2015:10:05:56 +0000 GET /favicon.ico
24.236.252.67 - - 17/05/2015:10:05:40 +0000 GET /favicon.ico
93.114.45.13 - - 17/05/2015:10:05:14 +0000 GET /articles/dynamic-dns-with-dhcp/
93.114.45.13 - - 17/05/2015:10:05:04 +0000 GET /reset.css
93.114.45.13 - - 17/05/2015:10:05:45 +0000 GET /style2.css
93.114.45.13 - - 17/05/2015:10:05:14 +0000 GET /favicon.ico
93.114.45.13 - - 17/05/2015:10:05:17 +0000 GET /images/jordan-80.png
93.114.45.13 - - 17/05/2015:10:05:21 +0000 GET /images/web/2009/banner.png
package com.qihang.bigdata.spark.core.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MapTest")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.textFile("datas/apache.log")

    //长String => 短String
    val mapRDD: RDD[String] = rdd.map(
      line => {
        val data = line.split(" ")
        data(6)
      }
    )

    mapRDD.collect().foreach(println)

    sc.stop()

  }
}
 1. rdd的计算一个分区内的数据是一个一个执行逻辑
    只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据。
    分区内数据的执行是有序的。
 2. 不同分区数据计算是无序的。

mapPartitions

def mapPartitions[U: ClassTag](
 f: Iterator[T] => Iterator[U],
 preservesPartitioning: Boolean = false): RDD[U]
将待处理的数据 以分区为单位 发送到计算节点进行处理,这里的处理是指可以进行任意的处
理,哪怕是过滤数据。(将一个分区的迭代器传入 => 处理后整个分区的迭代器)
val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
 datas => {
     datas.filter(_==2)
 }
)
实例
获取每个数据分区的最大值
package com.qihang.bigdata.spark.core.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MapTest")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    val mpRDD = rdd.mapPartitions(
      iter => {
        List(iter.max).iterator // 需要返回迭代器,用List()封装
      }
    )

    mpRDD.collect().foreach(println(_))

    sc.stop()

  }
}
可以以分区为单位进行数据转换操作
但是会将整个分区的数据加载到内存进行引用
如果处理完的数据是不会被释放掉,存在对象的引用。
在内存较小,数据量较大的场合下,容易出现内存溢出。
map mapPartitions 的区别
数据处理角度
Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子
是以分区为单位进行批处理操作。
功能的角度
Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。
MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,
所以可以增加或减少数据
性能的角度
Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处
理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能
不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作。

mapPartitionsWithIndex

def mapPartitionsWithIndex[U: ClassTag](
 f: (Int, Iterator[T]) => Iterator[U],
 preservesPartitioning: Boolean = false): RDD[U]
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处
理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
val dataRDD1 = dataRDD.mapPartitionsWithIndex(
 (index, datas) => {
     datas.map(index, _)
 }
)

实例

获取第二个分区的数据

package com.qihang.bigdata.spark.core.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MapTest")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    val mpiRDD = rdd.mapPartitionsWithIndex(
      (index, iter) => {
        if (index == 1) { //0,1 从零开始,index==1为第二个分区
          iter
        } else {
          Nil.iterator  // 
        }
      }
    )

    mpiRDD.collect().foreach(println(_))

    sc.stop()

  }
}

将数据和所在分区合并成元组

        val rdd = sc.makeRDD(List(1,2,3,4))

        val mpiRDD = rdd.mapPartitionsWithIndex(
            (index, iter) => {
                // 1,   2,    3,   4
                //(0,1)(2,2),(4,3),(6,4)
                iter.map(
                    num => {
                        (index, num)
                    }
                )
            }
        )

flatMap

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
 
val dataRDD = sparkContext.makeRDD(List(
 List(1,2),List(3,4)
),1)
val dataRDD1 = dataRDD.flatMap(
 list => list
)

glom

def glom(): RDD[Array[T]]
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    // List => Int
    // Int => Array
    val glomRDD: RDD[Array[Int]] = rdd.glom()

    glomRDD.collect().foreach(data => println(data.mkString(",")))

案例

计算所有分区最大值求和(分区内取最大值,分区间最大值求和)
        val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)

        // 【1,2】,【3,4】
        // 【2】,【4】
        // 【6】
        val glomRDD: RDD[Array[Int]] = rdd.glom()

        val maxRDD: RDD[Int] = glomRDD.map(
            array => {
                array.max
            }
        )
        println(maxRDD.collect().sum)

groupBy

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1 = dataRDD.groupBy(
 _%2
)

shuffle

将数据根据指定的规则进行分组 , 分区默认不变,但是数据会被 打乱重新组合 ,我们将这样
的操作称之为 shuffle 。极限情况下,数据可能被分在同一个分区中
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组,多个组可以放在一个分区里。
所以分区数和分组数无关。 

实例

List("Hello", "hive", "hbase", "Hadoop") 根据单词首写字母进行分组。
        val rdd  = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2)

        // 分组和分区没有必然的关系
        val groupRDD = rdd.groupBy(_.charAt(0))

        groupRDD.collect().foreach(println)

filter

def filter(f: T => Boolean): RDD[T]
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出
数据倾斜
val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4
),1)
val dataRDD1 = dataRDD.filter(_%2 == 0)

实例

从服务器日志数据 apache.log 中获取 2015 5 17 日的请求路径
83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png
83.149.9.216 - - 17/05/2015:10:05:43 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png
83.149.9.216 - - 17/05/2015:10:05:47 +0000 GET /presentations/logstash-monitorama-2013/plugin/highlight/highlight.js
83.149.9.216 - - 17/05/2015:10:05:12 +0000 GET /presentations/logstash-monitorama-2013/plugin/zoom-js/zoom.js
83.149.9.216 - - 17/05/2015:10:05:07 +0000 GET /presentations/logstash-monitorama-2013/plugin/notes/notes.js
83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png
83.149.9.216 - - 17/05/2015:10:05:57 +0000 GET /presentations/logstash-monitorama-2013/css/fonts/Roboto-Bold.ttf
83.149.9.216 - - 17/05/2015:10:05:50 +0000 GET /presentations/logstash-monitorama-2013/css/fonts/Roboto-Regular.ttf
83.149.9.216 - - 17/05/2015:10:05:24 +0000 GET /presentations/logstash-monitorama-2013/images/frontend-response-codes.png
83.149.9.216 - - 17/05/2015:10:05:50 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard.png
83.149.9.216 - - 17/05/2015:10:05:46 +0000 GET /presentations/logstash-monitorama-2013/images/Dreamhost_logo.svg
83.149.9.216 - - 17/05/2015:10:05:11 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard2.png
83.149.9.216 - - 17/05/2015:10:05:19 +0000 GET /presentations/logstash-monitorama-2013/images/apache-icon.gif
83.149.9.216 - - 17/05/2015:10:05:33 +0000 GET /presentations/logstash-monitorama-2013/images/nagios-sms5.png
83.149.9.216 - - 17/05/2015:10:05:00 +0000 GET /presentations/logstash-monitorama-2013/images/redis.png
83.149.9.216 - - 17/05/2015:10:05:25 +0000 GET /presentations/logstash-monitorama-2013/images/elasticsearch.png
83.149.9.216 - - 17/05/2015:10:05:59 +0000 GET /presentations/logstash-monitorama-2013/images/logstashbook.png
83.149.9.216 - - 17/05/2015:10:05:30 +0000 GET /presentations/logstash-monitorama-2013/images/github-contributions.png
83.149.9.216 - - 17/05/2015:10:05:53 +0000 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 - - 17/05/2015:10:05:24 +0000 GET /presentations/logstash-monitorama-2013/images/1983_delorean_dmc-12-pic-38289.jpeg
83.149.9.216 - - 17/05/2015:10:05:54 +0000 GET /presentations/logstash-monitorama-2013/images/simple-inputs-filters-outputs.jpg
83.149.9.216 - - 17/05/2015:10:05:33 +0000 GET /presentations/logstash-monitorama-2013/images/tiered-outputs-to-inputs.jpg
83.149.9.216 - - 17/05/2015:10:05:56 +0000 GET /favicon.ico
24.236.252.67 - - 17/05/2015:10:05:40 +0000 GET /favicon.ico
93.114.45.13 - - 17/05/2015:10:05:14 +0000 GET /articles/dynamic-dns-with-dhcp/
93.114.45.13 - - 17/05/2015:10:05:04 +0000 GET /reset.css
93.114.45.13 - - 17/05/2015:10:05:45 +0000 GET /style2.css
93.114.45.13 - - 17/05/2015:10:05:14 +0000 GET /favicon.ico
93.114.45.13 - - 17/05/2015:10:05:17 +0000 GET /images/jordan-80.png
93.114.45.13 - - 17/05/2015:10:05:21 +0000 GET /images/web/2009/banner.png
        val rdd = sc.textFile("datas/apache.log")

        val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(
            line => {
                val datas = line.split(" ")
                val time = datas(3)
                //time.substring(0, )
                val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
                val date: Date = sdf.parse(time)
                val sdf1 = new SimpleDateFormat("HH")
                val hour: String = sdf1.format(date)
                (hour, 1)
            }
        ).groupBy(_._1)
        timeRDD.map{
            case ( hour, iter ) => {
                (hour, iter.size)
            }
        }.collect.foreach(println)

sample

def sample(
withReplacement: Boolean,
fraction:Double,
seed: Long = Utils.random.nextLong): RDD[T]
根据指定的规则从数据集中 抽取 数据
  // sample算子需要传递三个参数
        // 1. 第一个参数表示,抽取数据后是否将数据返回 true(放回),false(丢弃)
        // 2. 第二个参数表示,
        //         如果抽取不放回的场合:数据源中每条数据被抽取的概率,基准值的概念
        //         如果抽取放回的场合:表示数据源中的每条数据被抽取的可能次数
        // 3. 第三个参数表示,抽取数据时随机算法的种子
        //                    如果不传递第三个参数,那么使用的是当前系统时间
val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4
),1)
// 抽取数据不放回(伯努利算法)
// 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不
要
// 第一个参数:抽取的数据是否放回,false:不放回
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
// 第三个参数:随机数种子
val dataRDD1 = dataRDD.sample(false, 0.5)
// 抽取数据放回(泊松算法)
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
// 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
// 第三个参数:随机数种子
val dataRDD2 = dataRDD.sample(true, 2)

防止数据倾斜。

distinct

def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
将数据集中重复的数据去重
//源码 
// map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
        // (1, null),(2, null),(3, null),(4, null),(1, null),(2, null),(3, null),(4, null)
        // (1, null)(1, null)(1, null)
        // (null, null) => null
        // (1, null) => 1
val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),1)
val dataRDD1 = dataRDD.distinct()

coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false,
 partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
 (implicit ord: Ordering[T] = null)
 : RDD[T]
根据数据量 缩减分区 ,用于大数据集过滤后,提高小数据集的执行效率
spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少
分区的个数,减小任务调度
val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),6)
val dataRDD1 = dataRDD.coalesce(2)

扩大分区
 // coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义,不起作用。
        // 所以如果想要实现扩大分区的效果,需要使用shuffle操作
        // spark提供了一个简化的操作
        // 缩减分区:coalesce,如果想要数据均衡,可以采用shuffle
        // 扩大分区:repartition, 底层代码调用的就是coalesce,而且肯定采用shuffle
        //val newRDD: RDD[Int] = rdd.coalesce(3, true)

repartition

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true 。无论是将分区数多的
RDD 转换为分区数少的 RDD ,还是将分区数少的 RDD 转换为分区数多的 RDD repartition
操作都可以完成,因为无论如何都会经 shuffle 过程。
val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.repartition(4)

sortBy

def sortBy[K](
 f: (T) => K,
ascending: Boolean = true,
 numPartitions: Int = this.partitions.length)
 (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理
的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一
致。中间存在 shuffle 的过程。
val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)

ByKey

双Value类型

intersection

def intersection(other: RDD[T]): RDD[T]

union

def union(other: RDD[T]): RDD[T]

subtract

def subtract(other: RDD[T]): RDD[T]

zip

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
        // Can't zip RDDs with unequal numbers of partitions: List(2, 4)
        // 两个数据源要求分区数量要保持一致
        // Can only zip RDDs with same number of elements in each partition
        // 两个数据源要求分区中数据数量保持一致
        val rdd1 = sc.makeRDD(List(1,2,3,4,5,6),2)
        val rdd2 = sc.makeRDD(List(3,4,5,6),2)

        val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
        println(rdd6.collect().mkString(","))
        // 交集,并集和差集要求两个数据源数据类型保持一致
        // 拉链操作两个数据源的类型可以不一致

        val rdd1 = sc.makeRDD(List(1,2,3,4))
        val rdd2 = sc.makeRDD(List(3,4,5,6))
        val rdd7 = sc.makeRDD(List("3","4","5","6"))

        // 交集 : 【3,4】
        val rdd3: RDD[Int] = rdd1.intersection(rdd2)
        //val rdd8 = rdd1.intersection(rdd7)
        println(rdd3.collect().mkString(","))

        // 并集 : 【1,2,3,4,3,4,5,6】
        val rdd4: RDD[Int] = rdd1.union(rdd2)
        println(rdd4.collect().mkString(","))

        // 差集 : 【1,2】
        val rdd5: RDD[Int] = rdd1.subtract(rdd2)
        println(rdd5.collect().mkString(","))

        // 拉链 : 【1-3,2-4,3-5,4-6】
        val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
        val rdd8 = rdd1.zip(rdd7)
        println(rdd6.collect().mkString(","))

Key-Value类型

partitionBy

def partitionBy(partitioner: Partitioner): RDD[(K, V)]
将数据按照指定 Partitioner 重新进行分区。 Spark 默认的分区器是 HashPartitioner
        val rdd = sc.makeRDD(List(1,2,3,4),2)

        val mapRDD:RDD[(Int, Int)] = rdd.map((_,1))
        // RDD => PairRDDFunctions
        // 隐式转换(二次编译)

        // partitionBy根据指定的分区规则对数据进行重分区
import org.apache.spark.HashPartitioner
        val newRDD = mapRDD.partitionBy(new HashPartitioner(2))
        newRDD.partitionBy(new HashPartitioner(2))

        newRDD.saveAsTextFile("output")

reduceByKey

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
可以将数据按照相同的 Key Value 进行聚合
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.reduceByKey(_+_)
val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)
reduceByKey分区内和分区间计算规则相同。

groupByKey

def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K,
将数据源的数据根据 key value 进行分组
val dataRDD1 =
 sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.groupByKey()
val dataRDD3 = dataRDD1.groupByKey(2)
val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))

reduceByKey groupByKey 的区别

shuffle 的角度 reduceByKey groupByKey 都存在 shuffle 的操作,但是 reduceByKey
可以在 shuffle 前对分区内相同 key 的数据进行预聚合( combine )功能,这样会减少落盘的
数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题, reduceByKey 性能比较
高。
从功能的角度 reduceByKey 其实包含分组和聚合的功能。 GroupByKey 只能分组,不能聚
合,所以在分组聚合的场合下,推荐使用 reduceByKey ,如果仅仅是分组而不需要聚合。那
么还是只能使用 groupByKey

aggregateByKey

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
 combOp: (U, U) => U): RDD[(K, U)]
将数据根据 不同的规则 进行分区内计算和分区间计算
val dataRDD1 =
 sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 =
 dataRDD1.aggregateByKey(0)(_+_,_+_)

实例

取出每个分区内 相同 key 的最大值然后分区间相加
// TODO : 取出每个分区内相同 key 的最大值然后分区间相加
// aggregateByKey 算子是函数柯里化,存在两个参数列表
// 1. 第一个参数列表中的参数表示初始值
// 2. 第二个参数列表中含有两个参数
// 2.1 第一个参数表示分区内的计算规则
// 2.2 第二个参数表示分区间的计算规则
val rdd =
 sc.makeRDD(List(
 ("a",1),("a",2),("c",3),
 ("b",4),("c",5),("c",6)
 ),2)
// 0:("a",1),("a",2),("c",3) => (a,10)(c,10)
// => (a,10)(b,10)(c,20)
// 1:("b",4),("c",5),("c",6) => (b,10)(c,10)
val resultRDD =
 rdd.aggregateByKey(10)(
 (x, y) => math.max(x,y),
 (x, y) => x + y
 )
resultRDD.collect().foreach(println)

foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.foldByKey(0)(_+_)

combineByKey

def combineByKey[C](
 createCombiner: V => C,
 mergeValue: (C, V) => C,
 mergeCombiners: (C, C) => C): RDD[(K, C)]
最通用的对 key-value rdd 进行聚集操作的聚集函数( aggregation function )。类似于
aggregate() combineByKey() 允许用户返回值的类型与输入不一致。
实例
将数据 List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
求每个 key 的平 均值
val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), 
("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
 (_, 1),
 (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

reduceByKeyfoldByKeyaggregateByKeycombineByKey 的区别

reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相 同
AggregateByKey :相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规
则可以不相同
CombineByKey: 当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区
内和分区间计算规则不相同。
/*
        reduceByKey:

             combineByKeyWithClassTag[V](
                 (v: V) => v, // 第一个值不会参与计算
                 func, // 分区内计算规则
                 func, // 分区间计算规则
                 )

        aggregateByKey :

            combineByKeyWithClassTag[U](
                (v: V) => cleanedSeqOp(createZero(), v), // 初始值和第一个key的value值进行的分区内数据操作
                cleanedSeqOp, // 分区内计算规则
                combOp,       // 分区间计算规则
                )

        foldByKey:

            combineByKeyWithClassTag[V](
                (v: V) => cleanedFunc(createZero(), v), // 初始值和第一个key的value值进行的分区内数据操作
                cleanedFunc,  // 分区内计算规则
                cleanedFunc,  // 分区间计算规则
                )

        combineByKey :

            combineByKeyWithClassTag(
                createCombiner,  // 相同key的第一条数据进行的处理函数
                mergeValue,      // 表示分区内数据的处理函数
                mergeCombiners,  // 表示分区间数据的处理函数
                )

         */

join

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
在类型为 (K,V) (K,W) RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的
(K,(V,W)) RDD
存在笛卡儿积
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
rdd.join(rdd1).collect().foreach(println)

leftOuterJoin

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
类似于 SQL 语句的左外连接
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)

cogroup

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))
在类型为 (K,V) (K,W) RDD 上调用,返回一个 (K,(Iterable<V>,Iterable<W>)) 类型的 RDD
    val rdd1 = sc.makeRDD(List(
      ("a", 1), ("b", 2) , ("c", 3)
    ))

    val rdd2 = sc.makeRDD(List(
      ("a", 4), ("b", 5), ("c", 6), ("c", 7)
    ))

    // cogroup : connect + group (分组,连接)
    val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)

    cgRDD.collect().foreach(println)



(a,(CompactBuffer(1),CompactBuffer(4)))
(b,(CompactBuffer(2),CompactBuffer(5)))
(c,(CompactBuffer(3),CompactBuffer(6, 7)))

案例

package com.qihang.bigdata.spark.core.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MapTest")
    val sc = new SparkContext(sparkConf)

    val dataRDD = sc.textFile("datas/agent.log")



    val mapRDD = dataRDD.map(
      line => {
        val datas = line.split(" ")
        ((datas(1), datas(4)), 1)
      }
    )

    val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_ + _)
    //如果map中的数据有特定的格式, 可以用模式匹配简化
    //case 特定格式
    //https://www.cnblogs.com/JYB2021/p/16199184.html
    val newMapRDD: RDD[(String, (String, Int))] = reduceRDD.map {
      case ((prv, ad), sum) =>
        (prv, (ad, sum))
    }

    val groupRDD: RDD[(String, Iterable[(String, Int)])] = newMapRDD.groupByKey()

    //只对 value 进行迭代 用mapValues
    val resultRDD: RDD[(String, List[(String, Int)])] = groupRDD.mapValues(
      iter => {
        iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
      }
    )

    resultRDD.collect().foreach(println)

    sc.stop()

  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/35670.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C#学习之路-常量

C# 常量 常量是固定值&#xff0c;程序执行期间不会改变。常量可以是任何基本数据类型&#xff0c;比如整数常量、浮点常量、字符常量或者字符串常量&#xff0c;还有枚举常量。 常量可以被当作常规的变量&#xff0c;只是它们的值在定义后不能被修改。 整数常量 整数常量可…

跨境平台做测评、采退、Lu卡、lu货要怎么做安全?

大家好&#xff0c;我是珑哥测评&#xff0c;今天和大家聊聊比较小众的圈子&#xff0c;也就是测评衍生出来的分支&#xff0c;采购和退款。因为最近也有很多客户咨询这个问题&#xff0c;由于沃尔玛风控升级了&#xff0c;很多客户下不成功的问题。 大家都知道无论是做测评还是…

从源码全面解析 dubbo 服务端服务调用的来龙去脉

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱敲代码的小黄&#xff0c;独角兽企业的Java开发工程师&#xff0c;CSDN博客专家&#xff0c;阿里云专家博主&#x1f4d5;系列专栏&#xff1a;Java设计模式、Spring源码系列、Netty源码系列、Kafka源码系列、JUC源码…

Docker自学记录笔记

安装联系Docker命令 1. 搜索镜像 docker search nagin 2. 下载镜像 3. 启动nginx 强调文本 强调文本 加粗文本 加粗文本 标记文本 删除文本 引用文本 H2O is是液体。 210 运算结果是 1024. 插入链接与图片 链接: link. 图片: 带尺寸的图片: 居中的图片: 居中并…

OpenCV的remap实现图像垂直翻转

以下是完整的代码: #include <opencv2/highgui/highgui.hpp> #include <opencv2/imgproc/imgproc.hpp> #include <iostream>int main() {

Redis10大性能优化点(上)

1.Redis真的变慢了吗&#xff1f; 对 Redis 进行基准性能测试 例如&#xff0c;我的机器配置比较低&#xff0c;当延迟为 2ms 时&#xff0c;我就认为 Redis 变慢了&#xff0c;但是如果你的硬件配置比较高&#xff0c;那么在你的运行环境下&#xff0c;可能延迟是 0.5ms 时就…

flutter:实现一个简单的appBar上的搜索框、一个简单的搜索历史

搜索框 效果图 代码 import package:flutter/material.dart;class NovelSearch extends StatefulWidget {overrideState<StatefulWidget> createState() > _NovelSearchState(); }class _NovelSearchState extends State<NovelSearch> {String searchVal ;o…

Debian 12 静态IP / 固定IP的设置

环境&#xff1a;Debian 12 amd64-lxde 局域网&#xff1a;PT925E电信光猫 手机APP 网络管家 一般用动态IP就可以了&#xff0c;但如果软件环境比较小众&#xff0c;问题就随之而来。起始问题&#xff1a;路由器无法解析设备名和IP&#xff0c;网络管家也不让设置固定IP&…

基于深度学习的高精度水果检测识别系统(PyTorch+Pyside6+YOLOv5模型)

摘要&#xff1a;基于深度学习的高精度水果&#xff08;苹果、香蕉、葡萄、橘子、菠萝和西瓜&#xff09;检测识别系统可用于日常生活中或野外来检测与定位水果目标&#xff0c;利用深度学习算法可实现图片、视频、摄像头等方式的水果目标检测识别&#xff0c;另外支持结果可视…

Python教程(2)——开发python常用的IDE

为什么需要IDE 在理解IDE之前&#xff0c;我们先做以下的实验&#xff0c;新建一个文件&#xff0c;输入以下代码 total_sum 0 for x in range(1,101):total_sum x print(total_sum)非常非常简单的一个程序&#xff0c;主要就是计算1加到100的值&#xff0c;我们将它重命名…

SpringBoot第19讲:SpringBoot 如何保证接口幂等

SpringBoot第19讲&#xff1a;SpringBoot 如何保证接口幂等 在以SpringBoot开发Restful接口时&#xff0c;如何防止接口的重复提交呢&#xff1f; 本文是SpringBoot第19讲&#xff0c;主要介绍接口幂等相关的知识点&#xff0c;并实践常见基于Token实现接口幂等。 文章目录 Spr…

动态规划之343 整数拆分(第6道)

题目&#xff1a; 给定一个正整数 n &#xff0c;将其拆分为 k 个 正整数 的和&#xff08; k > 2 &#xff09;&#xff0c;并使这些整数的乘积最大化。 返回 你可以获得的最大乘积 。 示例&#xff1a; 解法&#xff1a; 其实可以从1开始遍历 j &#xff0c;然后有两种…

通用分页【下】(将分页封装成标签)

目录 一、debug调试 1、什么是debug调试&#xff1f; 2、debug调试步骤 3、实践 二、分页的核心 三、优化 分页工具类 编写servlet jsp代码页面&#xff1a; 分页工具类PageBean完整代码 四、分页标签 jsp代码 编写标签 tld文件 助手类 改写servlet 解析&…

CTFshow-pwn入门-栈溢出pwn49(静态链接pwn-mprotect函数的应用)

pwn49 首先我们先将pwn文件下载下来&#xff0c;然后赋上可执行权限&#xff0c;再来查看pwn文件的保护信息。 chomd x pwn checksec pwn file pwn我们可以看到这是一个32位的pwn文件&#xff0c;并且保护信息开启了NX和canary&#xff0c;也就是堆栈不可执行且有canary。最最…

html案例2

效果 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-width, initia…

redis如何实现持久化

RDB快照 RDB是一种快照存储持久化方式&#xff0c;具体就是将Redis某一时刻的内存数据保存到硬盘的文件当中&#xff0c;默认保存的文件名为dump.rdb&#xff0c;而在Redis服务器启动时&#xff0c;会重新加载dump.rdb文件的数据到内存当中恢复数据。 开启RDB持久化方式 开启…

缓存更新策略,先更新数据库还是缓存呢?

学了这么多&#xff0c;相信大家对缓存更新的策略都已经有了清晰的认识。最后稍稍总结一下。 缓存更新的策略主要分为三种&#xff1a; Cache aside Cache aside Cache aside也就是旁路缓存&#xff0c;是比较常用的缓存策略。 &#xff08;1&#xff09;读请求常见流程 应…

亿级日活业务稳如磐石 华为云发布性能测试服务CodeArts PerfTest

HDC期间可参与华为云PaaS生态抽奖活动&#xff0c;活动链接在文末 计算机软件作为人类逻辑智慧的伟大结晶之一&#xff0c;已经渗透到了人类社会的各个角落。早期的计算机发展对硬件有很强的依赖性&#xff0c;只有少数的个人或者机构才能拥有软件这种“奢侈品”。但随着软件行…

java 并发 随笔7 ThreadLocal源码走读

0. 刚刚见了下老朋友&#xff0c;桌球撞起来的感觉很爽 可以看到 Thread 是内部是维护了局部变量的(thread-local-map) 1. 源码走读 很多的细节都在代码块中备注了 package java.lang;// 现在回来起来&#xff0c;很多经验不太丰富的人之所以在接触、学习java.lang.thread的…

TiDB(1):TiDB简介

1 从MySQL到TiDB 1.1 场景引入 假设现在有一个高速发展的互联网公司,核心业务库MySQL的数据量已经近亿行,且还在不断增长中,公司对于数据资产较为重视,所有数据要求多副本保存至少5年,且除了有对历史数据进行统计分析的离线报表业务外,还有一些针对用户数据实时查询的需求,如用…