Spark(2)-基础tranform算子(一)

一、算子列表

编号名称
1map算子
2flatMap算子
3filter算子
4mapPartitions算子
5mapPartitionsWithIndex算子
6keys算子
7values算子
8mapValues算子
9flatMaplValues算子
10union算子
11reducedByKey算子
12combineByKey算子
13groupByKey算子
14foldByKey算子
15aggregateByKey算子
16ShuffledRDD算子
17distinct算子
18partitionBy算子

 二、代码示例

package sparkCore


import org.apache.hadoop.mapreduce.task.reduce.Shuffle
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.{Aggregator, HashPartitioner, SparkConf, SparkContext, TaskContext}

/**
 * spark基本算子
 */


object basi_transform_02 {
  def main(args: Array[String]): Unit = {


    val conf: SparkConf = new SparkConf().setAppName("transform").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    sc.setLogLevel("WARN")

    //1. map算子
    val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7),2)
    val map_rdd: RDD[Int] = rdd1.map(_ * 2)
    println("*****1. map算子************")
    map_rdd.foreach(println(_))

    //2.flatMap算子
    println("*****2.flatMap算子************")
    val arr: Array[String] = Array(
      "Hive python spark",
      "Java Hello Word"
    )

    val rdd2: RDD[String] = sc.makeRDD(arr, 2)
    val flatMap_rdd: RDD[String] = rdd2.flatMap(_.split(" "))

    flatMap_rdd.foreach(println(_))

    //3.filter算子
    println("*****3.filter算子***********")
    val rdd3: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 4, 5, 4, 4, 3, 10))
    val filter_rdd :RDD[Int]= rdd3.filter(_ % 2 == 0)
    filter_rdd.foreach(println(_))

    //4. mapPartitions算子:将数据以分区的形式返回,进行map操作,一个分区对应一个迭代器
    // 应用场景: 比如在进行数据库操作时,在操作数据之前,需要通过JDBC方式连接数据库,如果使用map,那每条数据处理之前
    //         都需要连接一次数据库,效率显然很低.如果使用mapPartitions,则每个分区连接一次即可
    println("*****4. mapPartitions算子**********")
    val rdd4: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 4, 5, 4, 4, 3, 10),2)
    val mapParition_rdd: RDD[Int] = rdd4.mapPartitions(iter => {
      print("模拟数据库连接操作")
      iter.map(_ * 2)
    })

    mapParition_rdd.foreach(println(_))


    //5. mapPartitionsWithIndex算子,类似于mapPartitions,不过有两个参数
    //  第一个参数是分区索引,第二个是对应的迭代器
    // 注意:函数返回的是一个迭代器
    println("*****5. mapPartitionsWithIndex算子**********")
    val rdd5: RDD[Int] = sc.parallelize(List(10, 20, 30, 40, 60),2)
    val mapPartitionWithIndex_Rdd: RDD[String] = rdd5.mapPartitionsWithIndex((index, it) => {
      it.map(e => s"partition:$index,val:$e")
    })

    mapPartitionWithIndex_Rdd.foreach(println(_))

    //6.keys算子: RDD中的数据是【对偶元组】类型,返回【对偶元组】的全部key
    println("*****6.keys算子**********")
    val lst: List[(String, Int)] = List(
      ("spark", 1), ("spark", 3), ("hive", 2),
      ("Java", 1), ("Scala", 3), ("Python", 2)
    )

    val rdd6: RDD[(String, Int)] = sc.parallelize(lst)
    val keysRdd: RDD[String] = rdd6.keys
    keysRdd.foreach(println(_))

    //7.values: RDD中的数据是【对偶元组】类型,返回【对偶元组】的全部value
    println("*****7.values算子**********")
    val values_RDD: RDD[Int] = rdd6.values
    values_RDD.foreach(println(_))

    //8.mapValues: RDD中的数据为对偶元组类型, 将value进行计算,然后与原Key进行组合返回(即返回的仍然是元组)
    println("*****8.mapValues算子**********")
    val lst2: List[(String, Int)] = List(
      ("Hello", 1), ("world", 2),
      ("I", 2), ("love", 3), ("you", 2)
    )

    val rdd8: RDD[(String, Int)] = sc.parallelize(lst2, 2)
    val mapValues_rdd: RDD[(String, Int)] = rdd8.mapValues(_ * 10)
    mapValues_rdd.foreach(println(_))

    //9.flatMaplValues:RDD是对偶元组,将value应用传入flatMap打平后,再与key组合
    println("*****9.flatMaplValues算子**********")
    // ("spark","1 2 3") => ("spark",1),("spark",2),("spark",3)
    val lst3: List[(String,String )] = List(
      ("Hello", "1 2 3"), ("world", "4 5 6"),
    )

    val rdd9: RDD[(String, String)] = sc.parallelize(lst3)
    // 第一个_是指初始元组中的value;第二个_是指value拆分后的每一个值(转换成整数)
    val flatMapValues: RDD[(String, Int)] = rdd9.flatMapValues(_.split(" ").map(_.toInt))
    flatMapValues.foreach(println(_))

    //10.union:将两个类型一样的RDD合并到一起,返回一个新的RDD,新的RDD分区数量是两个RDD分区数量之和
    println("*****10.union算子**********")
    val union_rdd1 = sc.parallelize(List(1, 2, 3), 2)
    val union_rdd2 = sc.parallelize(List(4, 5, 6), 3)
    val union_rdd: RDD[Int] = union_rdd1.union(union_rdd2)
    union_rdd.foreach(println(_))

    //11.reducedByKey,在每个分区中进行局部分组聚合,然后将每个分区聚合的结果从上游拉到下游再进行全局分组聚合
    println("*****11.reducedByKey算子**********")
    val lst4: List[(String, Int)] = List(
      ("spark", 1), ("spark", 1), ("hive", 3),
      ("Python", 1), ("Java", 1), ("Scala", 3),
      ("flink", 1), ("Mysql", 1), ("hive", 3)
    )

    val rdd11: RDD[(String, Int)] = sc.parallelize(lst4, 2)
    val reduced_rdd: RDD[(String, Int)] = rdd11.reduceByKey(_ + _)
    reduced_rdd.foreach(println(_))

    //12.combineByKey:相比reducedByKey更底层的方法,后者分区内和分区之间相同Key对应的value值计算逻辑相同,但是前者可以分别定义不同的
    //   的计算逻辑.combineByKey 需要传入三个函数作为参数:
    // 其中第一个函数:key在上游分区第一次出现时,对应的value该如何处理
    // 第二个函数:分区内相同key对应value的处理逻辑
    // 第三个函数: 分区间相同Key对应value的处理逻辑
    println("*****12.combineByKey算子**********")
    val f1 = (v:Int) => {
      val stage = TaskContext.get().stageId()
      val partition = TaskContext.getPartitionId()
      println(s"f1 function invoked in stage: $stage,partiton:$partition")
      v
    }


    //分区内相同key对应的value使用乘积
    val f2 = (a:Int,b:Int) => {
      val stage = TaskContext.get().stageId()
      val partition = TaskContext.getPartitionId()
      println(s"f2 function invoked in stage: $stage,partiton:$partition")
      a * b
    }

    //分区间相同key对应的value使用加法
    val f3 = (m:Int,n:Int) => {
      val stage = TaskContext.get().stageId()
      val partition = TaskContext.getPartitionId()
      println(s"f3 function invoked in stage: $stage,partiton:$partition")
      m + n
    }

    val rdd12: RDD[(String, Int)] = sc.parallelize(lst4,2)
    val combineByKey_rdd: RDD[(String, Int)] = rdd12.combineByKey(f1, f2, f3)
    combineByKey_rdd.foreach(println(_))

    //13.groupByKey:按key进行分组,返回的是(key,iter(value集合)
    println("*****13.groupByKey算子**********")
    val rdd13: RDD[(String, Int)] = sc.parallelize(lst4, 3)
    val groupByKey_rdd: RDD[(String, Iterable[Int])] = rdd13.groupByKey()
    groupByKey_rdd.foreach(println(_))

    //14.foldByKey:每个分区应⽤⼀次初始值,先在每个进⾏局部聚合,然后再全局聚合(注意全局聚合的时候,初始值并不会被用到)
    // 局部聚合的逻辑与全局聚合的逻辑相同
    println("*****14.foldByKey算子**********")
    val lst5: List[(String, Int)] = List(
      ("maple", 1), ("kelly", 1), ("Avery", 1),
      ("maple", 1), ("kelly", 1), ("Avery", 1)
    )


    val rdd14: RDD[(String, Int)] = sc.parallelize(lst5)
    val foldByKey_rdd: RDD[(String, Int)] = rdd14.foldByKey(1)(_ + _)
    foldByKey_rdd.foreach(println(_))

    //15.aggregateByKey:foldByKey,并且可以指定初始值,每个分区应⽤⼀次初始值,传⼊两个函数,分别是局部聚合的计算逻辑
    // 和全局聚合的逻辑
    println("*****15.aggregateByKey算子**********")
    val rdd15: RDD[(String, Int)] = sc.parallelize(lst5)
    val aggregateByKey_rdd: RDD[(String, Int)] = rdd15.aggregateByKey(1)(_ + _,_ * _ )
    aggregateByKey_rdd.foreach(print(_))

    //16 ShuffledRDD:reduceByKey、combineByKey、aggregateByKey、foldByKey底层都是使⽤的ShuffledRDD,
    // 并且 mapSideCombine = true
    println("*****16.ShuffledRDD算子**********")
    val rdd16: RDD[(String, Int)] = sc.parallelize(lst5,2)

    val partitioner = new HashPartitioner(rdd16.partitions.length)
    // 对rdd16按照指定分区器进行分区
    // String是rdd16中Key的数据类型,第一个Int是rdd16中value的数据类型,第二个Int是中间结果的数据类型(当然前提是传入聚合器-里面包含计算逻辑
    // [可以据此知晓中间结果的数据类型])
    val shuffledRDD: ShuffledRDD[String, Int, Int] = new ShuffledRDD[String, Int, Int](rdd16,partitioner)
    // 设置一个聚合器: 指定rdd16的计算逻辑(包含三个函数,分别是分区内一个key对应value的处理逻辑;分区内相同key对应value计算逻辑
    // 和分区间相同Key对应value计算逻辑)
    val aggregator: Aggregator[String, Int, Int] = new Aggregator[String, Int, Int](f1, f2, f3)
    // 给shuffledRDD设置聚合器
    shuffledRDD.setAggregator(aggregator)
    shuffledRDD.setMapSideCombine(true) // 设置Map端聚合
    println(shuffledRDD.collect().toList)

    // 17.distinct算子:对RDD元素进行去重
    println("*****17.distinct算子**********")
    val lst6: Array[String] = Array(
      "spark", "spark", "hive",
      "Python", "Python", "Java"
    )

    val rdd17: RDD[String] = sc.parallelize(lst6)
    val distinct_rdd: RDD[String] = rdd17.distinct()
    println(distinct_rdd.collect().toList)

    // 18.partitionBy: 按照指定的分区器进行分区(底层使用的是ShuffleRDD)
    println("***** 18.partitionBy算子**********")
    val rdd18: RDD[(String,Int)] = sc.parallelize(lst5,2)
    val partitioner2 = new HashPartitioner(rdd18.partitions.length)
    val partitioned_rdd: RDD[(String, Int)] = rdd18.partitionBy(partitioner2)
    println(partitioned_rdd.collect().toList)
    sc.stop()
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/426556.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

计算机网络-网络安全(一)

1.网络安全威胁和漏洞类型: 窃听 假冒 重放 流量分析 破环完整 病毒 木马 诽谤 非授权访问 拒绝服务 漏洞:物理、软件、不兼容、其他等。 2.网络安全信息数据五大特征: 完整性&…

kettle下载及安装

JDK下载 安装kettle之前需要安装JDK JDK下载链接:JDK下载 配置环境变量: 新建系统变量:变量值为JDK安装路径 Path新增: kettle下载 链接地址:PDI(kettle) 点击下载 同意 Click here to a…

模拟集成电路设计:Bandgap电路设计及版图实现

模拟集成电路设计 Bandgap电路设计及版图实现 一、目的: 1、熟悉模拟集成电路设计的基本流程,实现Bandgap电路设计; 2、熟悉Linux系统及Cadence Virtuoso icfb设计、仿真软件的使用方法。 二、原理: 1、设计目标:…

Vmware esxi虚拟主机状态无效,无法注销重启等操作修复解决

问题 装有ESXI系统的服务器在强制关机启动后,显示虚拟机状态是无效的,并且无法进行任何操作。 解决办法 对出问题的虚拟机重新注册 1、开启esxi系统的ssh功能 2、取消注册出问题的虚拟机 找到问题的虚拟机 [rootlocalhost:~] vim-cmd vmsvc/getal…

基于JavaWeb实现的药店管理系统

一、系统架构 前端:jsp | layui | jquery | css 后端:spring | springmvn | mybatis 环境:jdk1.8 | mysql 二、代码及数据库 三、功能介绍 01. 登录 02. 首页 03. 药品管理 04. 销售管理-销售记录管理 05. 销售管理-退…

AI蠕虫病毒威胁升级,揭示AI安全新危机

一组研究人员成功研发出首个能够通过电子邮件客户端窃取数据、传播恶意软件以及向他人发送垃圾邮件的AI蠕虫,并在使用流行的大规模语言模型(LLMs)的测试环境中展示了其按设计功能运作的能力。基于他们的研究成果,研究人员向生成式…

Unreal触屏和鼠标控制旋转冲突问题

Unreal触屏和鼠标控制旋转冲突问题 鼠标控制摄像机旋转添加Input轴计算旋转角度通过轴事件控制旋转 问题和原因问题原因 解决办法增加触摸控制旋转代码触屏操作下屏蔽鼠标轴响应事件 鼠标控制摄像机旋转 通过Mouse X和Mouse Y控制摄像机旋转。 添加Input轴 计算旋转角度 通过…

Python推导式大全与实战:精通列表、字典、集合和生成器推导式【第115篇—python:推导式】

Python推导式大全与实战:精通列表、字典、集合和生成器推导式 Python语言以其简洁、优雅的语法而闻名,其中推导式是其独特之处之一。推导式是一种在一行代码中构建数据结构的强大方式,它涵盖了列表、字典、集合和生成器。本篇博客将全面介绍…

Python实现BIAS工具判断信号:股票技术分析的工具系列(4)

Python实现BIAS工具判断信号:股票技术分析的工具系列(4) 介绍算法解释 代码rolling函数介绍完整代码data代码BIAS.py 介绍 在股票技术分析中,BIAS(乖离率)是一种常用的技术指标,用于判断股票价…

sparse transformer 常见稀疏注意力

参考: https://zhuanlan.zhihu.com/p/259591644 主要就是降低transformer自注意力模块的复杂度 复杂度主要就是 Q K^T影响的,稀疏注意力就是在Q点乘K的转置这模块做文章 下列式一些sparse transformer稀疏注意力方法 a、transformer原始的 &#xff0…

文献阅读:The Unreasonable Effectiveness of Easy Training Data for Hard Tasks

文献阅读:The Unreasonable Effectiveness of Easy Training Data for Hard Tasks 1. 文章简介2. 方法介绍 1. 数据集难易度分析2. 模型训练前后变化 3. 实验考察 & 结论 1. 实验设计 1. 使用数据集2. 使用模型 2. 实验结果 1. 数据集难度分析2. 在Easy数据集下…

Excel MATCH函数 两张顺序不同表格,统一排序

目录 一. 背景二. 添加辅助列,使用MATCH函数生成排序条件三. 效果 一. 背景 有如下图所示的两张表格,分别记录着同一批人的1月份和2月份的工资。表格A和表格B中的姓名列相同,工资列数据不同现在要求参考表格A中的姓名列对表格B中的数据进行排序&#xf…

2024.3.1

1.TCP机械臂测试 代码&#xff1a; #include <myhead.h>#define SER_IP "192.168.43.185" //服务器ip #define SER_PORT 8888 //服务器端口号#define CLI_IP "192.168.153.128" //客户端IP #define CLI_PORT 9999 //客户端端口号…

使用AC自动机实现敏感词过滤(java)

主要分成2部分 trie树的构建&#xff08;前缀树&#xff0c;字典树&#xff09;fail指针的构建 1. trie 树 同一层级不会有重复的字符敏感词的最后一个字符会标记&#xff0c;并携带敏感词的长度 2. fail 指针的构建 fail 指针是指在某个分支匹配失败后&#xff0c;重新…

碰撞的小球(Colliding balls)

效果如下&#xff1a; 代码: #include <bits/stdc.h> #include <graphics.h>//必须库 #include <time.h> using namespace std; int main() {initgraph(650,400);//背景图大小circle(100,100,40);fillcircle(200,200,10);//球的数据srand(time(NULL));int …

Leetcoder Day37| 动态规划part04 背包问题

01背包理论基础 面试掌握01背包&#xff0c;完全背包和重背包就够用了。 背包问题的理论基础重中之重是01背包&#xff0c;一定要理解透&#xff01; 01 背包 有n件物品和一个最多能背重量为w 的背包。第i件物品的重量是weight[i]&#xff0c;得到的价值是value[i] 。每件物品…

[Redis]——Redis命令手册set、list、sortedset

&#x1f333;List类型常见命令 LPUSH / RPUSH [KEY] [element] …… 向列表左侧或者右侧插入一个或多个元素 LPOP / RPOP [key] 删除左边或者右边第一个元素 LRANGE [key] start end 返回索引start到end的元素&#xff08;索引从0开始&#xff09; BLPOP / BRPOP [key] [等…

Flink 定义 Temporal Table 的两种方式:Temporal Table DDL 和 Temporal Table Function

博主历时三年精心创作的《大数据平台架构与原型实现&#xff1a;数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行&#xff0c;点击《重磅推荐&#xff1a;建大数据平台太难了&#xff01;给我发个工程原型吧&#xff01;》了解图书详情&#xff0c;…

小程序环形进度条爬坑

在做微信小程序的时候&#xff0c;发现用canvas做的环形进度条&#xff0c;在带滚动条的view里面显示有闪动、显示不全的问题&#xff0c;后面改成echart-weixin的pie图实现了&#xff0c;option配置如下 // 表示进度的百分比 var progressValue 70;option {series: [{type: …

GC机制以及Golang的GC机制详解

要了解Golang的GC机制,就需要了解什么事GC,以及GC有哪几种实现方式 一.什么是GC 当一个电脑上的动态内存不再需要时&#xff0c;就应该予以释放&#xff0c;以让出内存&#xff0c;这种内存资源管理&#xff0c;称为垃圾回收&#xff08;Garbage Collection&#xff09;&#x…
最新文章