PySpark(四)PySpark SQL、Catalyst优化器、Spark SQL的执行流程、Spark新特性

目录

PySpark SQL

基础

SparkSession对象

DataFrame入门

 DataFrame构建

DataFrame代码风格

 DSL

SQL

SparkSQL Shuffle 分区数目

 DataFrame数据写出

Spark UDF

Catalyst优化器 

Spark SQL的执行流程

Spark新特性

自适应查询(SparkSQL)

动态合并

动态调整Join策略

动态优化倾斜Join 

动态分区裁剪(SparkSQL)


PySpark SQL

基础

PySpark SQL与Hive的异同

Hive和Spark 均是:“分布式SQL计算引擎”
均是构建大规模结构化数据计算的绝佳利器,同时SparkSQL拥有更好的性能。
目前,企业中使用Hive仍旧居多,但SparkSQL将会在很近的未来替代Hive成为分布式SQL计算市场的顶级

这里的重点是:Spark SQL能支持SQL和其他代码混合执行,自由度更高,且其是内存计算,更快。但是其没有元数据管理,然而它最终还是会作用到Hive层面,可以调用Hive的Metasotre

SparkSQL的基本对象是DataFrame,其特点及与其他对象的区别为: 

 SparkSQL 其实有3类数据抽象对象

  • SchemaRDD对象 (已废弃)
  • DataSet对象: 可用于Java、Scala语言
  • DataFrame对象:可用于Java、Scala、Python、R

SparkSession对象

 在RDD阶段,程序的执行入口对象是: SparkContext
在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象
SparkSession对象可以:
-用于SparkSQL编程作为入口对象
- 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext

from pyspark.sql import SparkSession
if __name__ == '__main__':
    spark =  SparkSession.builder.appName('lmx').master('local[*]').getOrCreate()
    sc = spark.sparkContext

DataFrame入门

DataFrame的组成如下
在结构层面
StructType对象描述整个DataFrame的表结构

StructField对象描述一个列的信息
在数据层面
Row对象记录一行数据
Column对象记录一列数据并包含列的信息

 DataFrame构建

1、用RDD进行构建

rdd的结构要求为:[[xx,xx],[xx,xx]]

spark.createDataFrame(rdd,schema=[])

    spark =  SparkSession.builder.appName('lmx').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    rdd = sc.textFile('data/input/sql/people.txt').map(lambda x:x.split(',')).map(lambda x:[x[0],int(x[1])])
    print(rdd.collect())
    # [['Michael', 29], ['Andy', 30], ['Justin', 19]]
    df = spark.createDataFrame(rdd,schema=['name','age'])
    df.printSchema()#打印表结构
    df.show()#打印表
#     root
#     | -- name: string(nullable=true)
#     | -- age: long(nullable=true)
# 
# +-------+---+
# | name | age |
# +-------+---+
# | Michael | 29 |
# | Andy | 30 |
# | Justin | 19 |
# +-------+---+

2、利用StructType进行创建

需要先引入StructType,StringType,IntegerType等构建schema

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType
if __name__ == '__main__':
    spark =  SparkSession.builder.appName('lmx').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    rdd = sc.textFile('data/input/sql/people.txt').map(lambda x:x.split(',')).map(lambda x:[x[0],int(x[1])])
#构建schema    
schema =StructType().add("name",StringType(),nullable=False).\
        add('age',IntegerType(),nullable=True)
    df = spark.createDataFrame(rdd,schema=schema)
    df.printSchema()
    df.show()

3、toDF将rdd转换为df

下面展示了两种方式

    # 只设定列名,列的数据结构则是内部自己判断
    df = rdd.toDF(['name','age'])
    df.printSchema()
    # root
    # | -- name: string(nullable=true)
    # | -- age: long(nullable=true)
    # 设定列名和数据类型
    schema =StructType().add("name",StringType(),nullable=False).\
        add('age',IntegerType(),nullable=True)
    df = rdd.toDF(schema=schema)
    df.printSchema()
    # root
    # | -- name: string(nullable=false)
    # | -- age: integer(nullable=true)

4、基于pandas构建 

    dfp = pd.DataFrame({
        "id":[1,2,3],
        'score':[99,98,100]
    })
    df = spark.createDataFrame(dfp)
    df.printSchema()
    df.show()
    # root
    # | -- id: long(nullable=true)
    # | -- score: long(nullable=true)
    # 
    # +---+-----+
    # | id | score |
    # +---+-----+
    # | 1 | 99 |
    # | 2 | 98 |
    # | 3 | 100 |
    # +---+-----+

5、通过文件读取创造

在读取json和parquet文件时不需要设定schema,因为文件已经自带

而读取csv时,还需要使用.option设定 header等参数 

这里说一下parquet文件

parquet:是Spark中常用的一种列式存储文件格式
和Hive中的ORC差不多,他俩都是列存储格式
parquet对比普通的文本文件的区别:

  • parquet 内置schema(列名列类型 是否为空)
  • 存储是以列作为存储格式
  • 存储是序列化存储在文件中的(有压缩属性体积小)

DataFrame代码风格

 DataFrame支持两种风格进行编程,分别是DSL风格SQL风格
DSL语法风格
DSL称之为:领域特定语言
其实就是指DataFrame的特有API
DSL风格意思就是以调用API的方式来处理Data比如: df.where0.limit0
SQL语法风格
SQL风格就是使用SQL语句处理DataFrame的数据比如: spark.sql(“SELECT*FROM xxx)

 DSL

其实就是用其内置的API处理数据,举例:

    df.select('id','subject').show()
    df.where('subject="语文"').show()
    df.select('id','subject').where('subject="语文"').show()
    df.groupBy('subject').count().show()

API其实跟SQL类似,这里不详细说明了,个人感觉不如直接写SQL语句

SQL

DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sgl0来执行SQL语句查询,结果返回一个DataFrame。如果想使用SQL风格的语法,需要将DataFrame注册成表采用如下的方式:

    df.createTempView('tmp') #创建临时视图
    df.createGlobalTempView('global_tmp')#创建全局试图
    # 全局表: 跨SparkSession对象使用在一个程序内的多个SparkSession中均可调用查询前带上前缀:global_tmp
    df.createOrReplaceTempView('repalce_tmp')#创建临时表,如果存在则替换

然后使用spark.sql的形式书写sql代码

    spark.sql('select * from tmp where subject = "语文"').show()
    spark.sql('select id,score from repalce_tmp where score>90').show()
    spark.sql('select subject,max(score) from global_temp.global_tmp group by subject').show()

SparkSQL Shuffle 分区数目

 原因: 在SparkSQL中当Job中产生Shufle时,默认的分区数 spark.sql.shufle,partitions 为200,在实际项目中要合理的设置。
在代码中可以设置:

spark =  SparkSession.builder.appName('lmx').\
master('local[*]').config('spark.sql.shufle,partitions',2).\
getOrCreate()

spark.sqL.shuffle.partitions 参数指的是,在sql计算中,shuffle算子阶段默认的分区数是200

对于集群模式来说,200个默认也算比较合适

如在Local下运行,200个很多,在调度上会带宋限外的损耗,所以在Local下建议修改比较低, 比如2\4\10均可,这个参数和Spark RDD中设置并行度的参数是相互独立的

 DataFrame数据写出

统一API:

下面提供两种方法,分别写出为json和csv

    spark.sql(
        'select user_id,avg(score) avg_score from tmp group by user_id order by avg_score desc'
    ).write.mode('overwrite').format('json').save('data/output/1t')

    spark.sql(
        'select user_id,avg(score) avg_score from tmp group by user_id order by avg_score desc'
    ).write.mode('overwrite').format('csv')\
        .option('header',True)\
        .option('sep',';')\
        .save('data/output/csv')

其他的一些方法: 

SparkSQL中读取数据和写出数据 - 知乎

不过这里似乎不能自己命名导出的数据文件

Spark UDF

无论Hive还是SparKSQL分析处理数据时,往往需要使用函数,SparkSQL模块本身自带很多实现公共功能的函数,在pyspark.sql.functions中SparkSQL与Hive一样支持定义函数:UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛。回顾Hive中自定义函数有三种类型:
第一种:UDF(User-Defined-Function)函数.
一对一的关系,输入一个值经过函数以后输出一个值;
在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;

第二种:UDAF(User-Defined Aggregation Function)聚合函数

多对一的关系,输入多个值输出一个值,通常与groupBy联合使用;

第三种:UDTF(User-DefinedTable-Generating Functions)函数

一对多的关系,输入一个值输出多个值(一行变为多行),用户自定义生成函数,有点像flatMap;

在SparkSQL中,目前仅仅支持UDF函数和UDAF函数,目前Python仅支持UDF 

UDF有两种定义方式

方式1语法
udf对象=sparksession.udfregister(参数1,参数2,参数3)

参数1:UDF名称,可用于SQL风格

参数2:被注册成UDF的方法名
参数3:声明UDF的返回值类型

udf对象:返回值对象,是一个UDF对象,可用于DSL风格
方式2语法

from pyspark.sql import functions as F

udf对象 = F.udf(参数1,参数2)

参数1:被注册成UDF的方法名

参数2:声明UDF的返回值类型

udf对象:返回值对象,是一个UDF对象,可用于DSL风格

举例:

    def double_score(num):
        return 2*num

    udf1 = spark.udf.register('udf_1',double_score,IntegerType())
    # dsl风格
    df.select(udf1(df['score'])).show()
    # sql风格
    df.selectExpr('udf_1(score)').show()
    # sql风格2
    df.createTempView('tmp')
    spark.sql("select udf_1(score) from tmp").show()

    udf2 = F.udf(double_score,IntegerType())
    df.select(udf2(df['score'])).show()

当返回值是数组时,需要定义数组内部数据的数据类型:ArrayType(StringType())

    spark =  SparkSession.builder.appName('lmx').master('local[*]').config('spark.sql.shufle,partitions',2).getOrCreate()
    sc = spark.sparkContext

    rdd=sc.parallelize([['i love you'],['i like you']])
    df = rdd.toDF(['ifo'])
    def func(num):
        return num.split(' ')
    udf = spark.udf.register('udf_sql',func,ArrayType(StringType()))

    # dsl风格
    df.select(udf(df['ifo'])).show()

当返回值是字典时,需要使用StructType(),且定义每个列的名字(需要跟函数返回值的列名一样)和数据类型

    rdd=sc.parallelize([[1],[2],[3],[4],[5]])
    df = rdd.toDF(['ifo'])
    df.show()
    def func(num):
        return {'num':num,'num1':num+10}
    udf = spark.udf.register('udf_sql',func,StructType().\
                             add('num',IntegerType(),nullable=False).\
                             add('num1',IntegerType(),nullable=False))
    df.select(udf(df['ifo'])).show()

Catalyst优化器 

RDD的执行流程为:

代码 ->DAG调度器逻辑任务 ->Task调度器任务分配和管理监控 ->Worker干活

SparkSQL会对写完的代码,执行“自动优化”,既Catalyst优化器,以提升代码运行效率,避免开发者水平影响到代码执行效率。 (RDD代码不会,是因为RDD的数据对象太过复杂,无法被针对性的优化)

加入优化的SparkSQL大致架构为:

1.API 层简单的说就是 Spark 会通过一些 API 接受 SQL 语句

2.收到 SQL 语句以后,将其交给 Catalyst,Catalyst 负责解析 SQL,生成执行计划等

3.Catalyst 的输出应该是 RDD 的执行计划

4.最终交由集群运行 

 Catalyst优化器主要分为四个步骤

1、解析sql,生成AST(抽象语法树)

2、在 AST 中加入元数据信息,做这一步主要是为了一些优化,例如 col=col 这样的条件

以上面的图为例:

  • score.id → id#1#L 为 score.id 生成 id 为1,类型是 Long
  • score.math_score→math_score#2#L为 score.math_score 生成 id 为 2,类型为 Long
  • people.id→id#3#L为 people.id 生成 id 为3,类型为 Long
  • people.age→age#4#L为 people.age 生成 id 为 4,类型为 Long 

3、对已经加入元数据的 AST,输入优化器,进行优化,主要包含两种常见的优化:

谓词下推(Predicate Pushdown)\ 断言下推:将逻辑判断 提前到前面,以减少shuffle阶段的数据量。

以上面的demo举例,可以先进行people.age>10的判断再进行Join等操作。

列值裁剪(Column Pruning):将加载的列进行裁剪,尽量减少被处理数据的宽度

以上面的demo举例,由于只select了score和id,所以开始的时候,可以只保留这两个列,由于parquet是按列存储的,所以很适合这个操作

4、上面的过程生成的 AST 其实最终还没办法直接运行,这个 AST 叫做 逻辑计划,结束后,需要生成 物理计划,从而生成 RDD 来运行

Spark SQL的执行流程

如此,Spark SQL的执行流程为: 

1.提交SparkSQL代码
2.catalyst优化
        a.生成原始AST语法数
        b.标记AST元数据
        c.进行断言下推和列值裁剪 以及其它方面的优化作用在AST上
        d.将最终AST得到,生成执行计划
        e.将执行计划翻译为RDD代码
3.Driver执行环境入口构建(SparkSession)
4.DAG 调度器规划逻辑任务
5.TASK 调度区分配逻辑任务到具体Executor上工作并监控管理任务
6.Worker干活

Spark新特性

自适应查询(SparkSQL)(AQE)

即:Adaptive Query Execution

由于缺乏或者不准确的数据统计信息(元数据)和对成本的错误估算(执行计划调度)导致生成的初始执行计划不理想

在Spark3.x版本提供Adaptive Query Execution自适应查询技术 通过在”运行时”对查询执行计划进行优化, 允许Planner在运行时执行可选计划,这些可选计划将会基于运行时数据统 计进行动态优化, 从而提高性能,其开启方式为:

set spark.sql.adaptive.enabled = true;

Adaptive Query Execution AQE主要提供了三个自适应优化:

动态合并shuffle分区

即:Dynamically coalescing shuffle partitions 

 可以动态调整shuffle分区的数量。用户可以在开始时设置相对较多的shuffle分区数,AQE会在运行时将相邻的小分区 合并为较大的分区

动态调整Join策略

即:Dynamically switching join strategies 

此优化可以在一定程度上避免由于缺少统计信息或着错误估计大小(当然也可能两种情况同时存在),而导致执行计 划性能不佳的情况。这种自适应优化可以在运行时sort merge join转换成broadcast hash join,从而进一步提升性能

其实就是将小的表设置为 广播表,使得所有大的表都能获得全部的小表,减少了后续的网络传输

动态优化倾斜Join 

shuffle时将过于大的数据分成与其他数据分区大小相似的n个分区,已实现数据分区均衡

 skew joins可能导致负载的极端不平衡,并严重降低性能。在AQE从shuffle文件统计信息中检测到任何倾斜后,它可 以将倾斜的分区分割成更小的分区,并将它们与另一侧的相应分区连接起来。这种优化可以并行化倾斜处理,获得更 好的整体性能。

触发条件: 1. 分区大小> spark.sql.adaptive.skewJoin.skewedPartitionFactor (default=10) * "median partition size(中位数分区大小)"

2. 分区大小> spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default = 256MB )
 

动态分区裁剪(SparkSQL)

即:Dynamic Partition Pruning

当优化器在编译时无法识别可跳过的分区时,可以使用"动态分区裁剪",即基于运行时推断的信息来进一步进行分区 裁剪。这在星型模型中很常见,星型模型是由一个或多个并且引用了任意数量的维度表的事实表组成。在这种连接操 作中,我们可以通过识别维度表过滤之后的分区来裁剪从事实表中读取的分区。

Spark SQL深入分析之动态分区裁剪(Dynamic Partition Pruning) - 知乎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/383678.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Android---Jetpack Compose学习003

Compose 状态。本文将探索如何在使用 Jetpack Compose 时使用和考虑状态,为此,我们需要构建一个 TODO 应用,我们将构建一个有状态界面,其中会显示可修改的互动式 TODO 列表。 状态的定义。在科学技术中,指物质系统所处…

【XR806开发板试用】轻松连上华为云实现物联网

本文为极术社区XR806试用活动文章。 一.开始 偶然的机会在网上看到了鸿蒙开发板的试用,作为一个"老鸿蒙"岂能放弃这个机会,报名之后不出意料地得到了使用名额,在此感谢极术社区. 收到开发板之后其实还有点失望了,就那么一个小小的核心板,其他啥也没有,连一根数据线…

图灵日记--MapSet字符串常量池反射枚举Lambda表达式泛型

目录 搜索树概念实现性能分析和 java 类集的关系 搜索概念及场景模型 Map的使用Map常用方法 Set的说明常见方法说明 哈希表冲突-避免-负载因子调节冲突-解决-闭散列冲突-解决-开散列/哈希桶冲突严重时的解决办法 实现和 java 类集的关系 字符串常量池String对象创建intern方法 …

MongoDB 与 mongo-express docker 安装

MongoDB 和 mongo-express 与 MySQL 不同,MongoDB 为 NoSQL 数据库,MongoDB 中没有 table ,schema 概念,取而代之的 collection,其中 collection 存储的为 BSON 格式,是一种类似于 JSON 的用于存储 k-v 键…

每日五道java面试题之java基础篇(五)

第一题. final、finally、finalize 的区别? final ⽤于修饰变量、⽅法和类:final 修饰的类不可被继承;修饰的⽅法不可被重写;修饰的变量不可变。finally 作为异常处理的⼀部分,它只能在 try/catch 语句中,…

PyTorch深度学习实战(26)——多对象实例分割

PyTorch深度学习实战(26)——多对象实例分割 0. 前言1. 获取并准备数据2. 使用 Detectron2 训练实例分割模型3. 对新图像进行推断小结系列链接 0. 前言 我们已经学习了多种图像分割算法,在本节中,我们将学习如何使用 Detectron2 …

Netty Review - NioEventLoopGroup源码解析

文章目录 概述类继承关系源码分析小结 概述 EventLoopGroup bossGroup new NioEventLoopGroup(1); EventLoopGroup workerGroup new NioEventLoopGroup();这段代码是在使用Netty框架时常见的用法,用于创建两个不同的EventLoopGroup实例,一个用于处理连…

【计算几何】确定两条连续线段向左转还是向右转

确定两条连续线段向左转还是向右转 目录 一、说明二、算法2.1 两点的叉积2.2 两个段的叉积 三、旋转方向判别3.1 左转3.2 右转3.3 共线判别 一、说明 如果是作图,或者是判别小车轨迹。为了直观地了解,从当前点到下一个点过程中,什么是左转、…

树莓派4B(Raspberry Pi 4B)使用docker搭建阿里巴巴sentinel服务

树莓派4B(Raspberry Pi 4B)使用docker搭建阿里巴巴sentinel服务 由于国内访问不了docker hub,而国内镜像仓库又没有适配树莓派ARM架构的sentinel镜像,所以我们只能退而求其次——自己动手构建镜像。本文基于Ubuntu,Jav…

springboot169基于vue的工厂车间管理系统的设计

基于VUE的工厂车间管理系统设计与实现 摘 要 社会发展日新月异,用计算机应用实现数据管理功能已经算是很完善的了,但是随着移动互联网的到来,处理信息不再受制于地理位置的限制,处理信息及时高效,备受人们的喜爱。本…

书生谱语-大语言模型测试demo

课程内容简介 1.作业 demo1 demo2 demo3 demo4

Makefile编译原理 make 中的路径搜索_1

一.make中的路径搜索 问题:在实际的工程项目中,所有的源文件和头文件都放在同一个文件夹中吗? 实验1 : VPATH 引子 mhrubuntu:~/work/makefile1/17$ ll total 28 drwxrwxr-x 4 mhr mhr 4096 Apr 22 00:46 ./ drwxrwxr-x 7 mhr m…

《UE5_C++多人TPS完整教程》学习笔记10 ——《P11 设置加入游戏会话(Setup for Joining Sessions)》

本文为B站系列教学视频 《UE5_C多人TPS完整教程》 —— 《P11 设置加入游戏会话(Setup for Joining Sessions)》 的学习笔记,该系列教学视频为 Udemy 课程 《Unreal Engine 5 C Multiplayer Shooter》 的中文字幕翻译版,UP主&…

Python远程控制工具的使用

本节我们对所编写的远程控制工具的功能进行测试。首先开启主控端程序, 如下所示: 接下来打开被控端程序。当被控端打开时,主控端会收到被控端的连接请 求。 开启被控端程序: 主控端接收到连接请求并显示被控端主机的信息&#xff…

MySQL-----DCL基础操作

▶ DCL简介 DCL英文全称是Data ControlLanguage(数据控制语言),用来管理数据库用户、控制数据库的访问权限。 DCL--管理用户 ▶ 查询用户 use mysql; select * from user; ▶ 创建用户 ▶ 语法 create user 用户名主机名 identified by 密码 设置为在任意主机上访问…

Z-Stack一直卡在HAL_BOARD_INIT();

原因是Debugger没有配置好,因为默认是Simulator,不是TI的驱动,所以仿真出现一直卡在 HAL_BOARD_INIT(); 的情况,解决方法就是将Simulator改为Texas Instruments 改成下面的样子

MySQL篇----第二十篇

系列文章目录 文章目录 系列文章目录前言一、NULL 是什么意思二、主键、外键和索引的区别?三、你可以用什么来确保表格里的字段只接受特定范围里的值?四、说说对 SQL 语句优化有哪些方法?(选择几条)前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍…

Educational Codeforces Round 135 (Rated for Div. 2)C. Digital Logarithm(思维)

文章目录 题目链接题意题解代码 题目链接 C. Digital Logarithm 题意 给两个长度位 n n n的数组 a a a、 b b b,一个操作 f f f 定义操作 f f f为, a [ i ] f ( a [ i ] ) a [ i ] a[i]f(a[i])a[i] a[i]f(a[i])a[i]的位数 求最少多少次操作可以使 …

单片机学习笔记---串口向电脑发送数据电脑通过串口控制LED

目录 串口向电脑发送数据 每隔一秒串口就发送一个递增的数给电脑 电脑通过串口控制LED 波特率的具体计算 HEX模式和文本模式 前两节是本节的理论基础,这节开始代码演示! 串口向电脑发送数据 接下来先开始演示一下串口单向发送一个数字给电脑&…

【Git】上传本地文件到Git(以Windows环境为例)

Git 的下载参考:Git 安装及配置 一、Git 上传的整体流程 1、工作区 > 本地仓库 将本地文件上传到Git,需要先上传到本地仓库,然后再上传到远程仓库。要上传文件到本地仓库,不是直接拷贝进去的,而是需要通过命令一步…