Spark SQL进阶

DataFrame详解

清洗相关API

  • 去重API
    在这里插入图片描述
  • 删除空缺值的API
    在这里插入图片描述
  • 替换缺失值的API
    在这里插入图片描述
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .appName('sparksql_etl_api')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    init_df = spark.read.csv(
        path='file:///export/data/clear_data.csv',
        sep=',',
        encoding='UTF-8',
        header="True",
        inferSchema=True
    )

    init_df.printSchema()

    # 3- 数据处理
    # 3.1- 删除重复数据:dropDuplicates
    """
        dropDuplicates总结:用来删除重复数据。如果没有指定参数subset,那么要比对行中的所有字段内容,
            如果全部相同,就认为是重复数据,会被删除;如果有指定参数subset,那么只比对subset中指定的字段范围
            
    """
    init_df.dropDuplicates().show()
    init_df.dropDuplicates(subset=["id","name"]).show()

    # 指定不存在的字段会报错:AnalysisException: Cannot resolve column name "id2" among (id, name, age, address)
    # init_df.dropDuplicates(subset=["id2","name"]).show()


    print("-"*30)
    # 3.2- 删除缺失值数据:dropna
    """
        dropna(thresh,subset):删除缺失值数据.
            1- 如果不传递任何参数,只要有任意一个字段值为null,那么就删除整行数据
            2- 如果只指定了subset,那么空值的检查,就只会限定在subset指定的范围内
            3- 如果只指定了thresh,那么空值检查的这些字段中,至少需要有thresh(>=thresh)个字段的值不为空,才不会被删除
    """
    init_df.dropna().show()
    init_df.dropna(subset=["id","name"]).show()
    init_df.dropna(thresh=1,subset=["name","age","address"]).show()
    init_df.dropna(thresh=2,subset=["name","age","address"]).show()
    init_df.dropna(thresh=2).show()

    print("-" * 30)
    # 3.3- 替换缺失值数据:fillna
    """
        fillna(value,subset):替换缺失值数据
            value:必须要传递参数.是用来填充缺失值的
            subset:限定缺失值替换范围
        注意:
            1-value如果不是字典,那么只会替换字段类型匹配的空值
            2-最常用的是value传递字典的形式
    """
    init_df.fillna(value=999).show()
    init_df.fillna(value=999,subset=["id","name"]).show()
    init_df.fillna(value={"id":111,"name":"未知姓名","age":100,"address":"北京"}).show()

    # 4- 数据输出
    # 5- 释放资源
    spark.stop()
总结:
1- dropDuplicates总结:用来删除重复数据。如果没有指定参数subset,那么要比对行中的所有字段内容,
如果全部相同,就认为是重复数据,会被删除;如果有指定参数subset,那么只比对subset中指定的字段范围
            
2- dropna(thresh,subset):删除缺失值数据.
   2.1- 如果不传递任何参数,只要有任意一个字段值为null,那么就删除整行数据
   2.2- 如果只指定了subset,那么空值的检查,就只会限定在subset指定的范围内
   2.3- 如果只指定了thresh,那么空值检查的这些字段中,至少需要有thresh(>=thresh)个字段的值不为空,才不会被删除
            
3- fillna(value,subset):替换缺失值数据
   3.1- value:必须要传递参数.是用来填充缺失值的
   3.1- subset:限定缺失值替换范围
   
   注意:
   3.1- value如果不是字典,那么只会替换字段类型匹配的空值
   3.2- 最常用的是value传递字典的形式

Spark SQL的Shuffle分区设置

Spark SQL底层本质上还是Spark的RDD程序,认为 Spark SQL组件就是一款翻译软件,用于将SQL/DSL翻译为Spark RDD程序, 执行运行

​ Spark SQL中同样也是存在shuffle的分区的,在执行shuffle分区后, shuffle分区数量默认为 200个,但是实际中, 一般都是需要调整这个分区的, 因为当数据量比较少的数据, 200个分区相对来说比较大一些, 但是当数据量比较大的时候, 200个分区显得比较小

如何调整shuffle分区数量? spark.sql.shuffle.partitions

方案一(不推荐):  直接修改spark的配置文件spark-defaults.conf。全局设置,默认值为200。设置为: 
		spark.sql.shuffle.partitions     20

方案二(常用,推荐使用): 在客户端通过submit命令提交的时候, 动态设置shuffle的分区数量。部署、上线的时候、基于spark-submit提交运行的时候
	./spark-submit --conf "spark.sql.shuffle.partitions=20"

方案三(比较常用): 在代码中设置。主要在测试环境中使用, 但是一般在部署上线的时候, 会删除。优先级也是最高的。一般的使用场景是,当你的数据量未来不会发生太大的波动。
	 sparkSession.conf.set('spark.sql.shuffle.partitions',20)
import time

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    start = time.time()

    # 1- 创建SparkSession顶级对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('sparksql_wordcount_demo')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    init_df = spark.read.text(
        paths='hdfs://node1:8020/input/words.txt'
    )

    # 创建临时视图
    init_df.createTempView('words')

    # init_df.show()
    # init_df.printSchema()

    # 3- 数据处理
    # 3.1- SQL方式:子查询
    spark.sql("""
        select
            word,count(1) as cnt,max(word) as max_word,min(word) as min_word
        from (
            select
                explode(split(value,' ')) as word
            from words
        ) group by word
    """).show()

    print("-"*50)

    # 3.2- SQL方式:侧视图
    spark.sql("""
        select
            word,count(1) as cnt
        from words
        -- 侧视图
        lateral view explode(split(value,' ')) t as word
        group by word
    """).show()


    print("DSL运行结果")
    print("-" * 50)
    # 3.3- DSL方式一
    """
        DSL方式总结:
            withColumnRenamed(参数1,参数2):给字段重命名操作。参数1是旧字段名,参数2是新字段名
            agg():推荐使用,更加通用。执行聚合操作。如果有多个聚合,聚合之间使用逗号分隔即可
            withColumn(参数1,参数2):用来产生新列。参数1是新列的名称;参数2是新列数据的来源
    """
    init_df.select(
        F.explode(F.split('value',' ')).alias('word')
    ).groupBy('word').count().show()

    print("-" * 50)
    init_df.select(
        F.explode(F.split('value', ' ')).alias('word')
    ).groupBy('word').count().withColumnRenamed('count','cnt').show()

    print("-" * 50)
    # 3.3- DSL方式二
    init_df.select(
        F.explode(F.split('value', ' ')).alias('word')
    ).groupBy('word').agg(
        F.count('word').alias('cnt'),
        F.max('word').alias('max_word')
    ).show()

    print("-" * 50)
    # 3.4- DSL方式三
    init_df.withColumn(
        'word',
        F.explode(F.split('value', ' '))
    ).groupBy('word').agg(
        F.count('word').alias('cnt')
    ).show()

    run_time = time.time() - start
    print("运行耗时:",run_time)
    time.sleep(1000)

    # 4- 数据输出
    # 5- 释放资源
    spark.stop()

数据写出操作

统一的输出语法:
在这里插入图片描述

对应的简写API格式如下,以CSV为例:
init_df.write.csv(
    path='存储路径',
    mode='模式',
    header=True,
    sep='\001',
    encoding='UTF-8'
)

输出到文件中 json csv orc text …

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("csv方式读取文件")

    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions","1")\
        .appName('数据输出到文件系统')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    init_df = spark.read.csv(
        path='file:///export/data/stu.txt',
        sep=' ',
        encoding='UTF-8',
        header="True",
        inferSchema=True
    )

    # 3- 数据处理
    result = init_df.where('age>=20')

    # 4- 数据输出
    result.show()
    result.printSchema()

    # 数据输出到文件系统:简单API
    """
        常用参数说明:
            1- path:指定结果数据输出路径。支持本地文件系统和HDFS文件系统
            2- mode:当输出目录中文件已经存在的时候处理办法
                2.1- append:追加。如果文件已经存在,那么继续在该目录下产生新的文件
                2.2- overwrite:覆盖。如果文件已经存在,那么就先将已有的文件清除,再写入进去
                2.3- ignore:忽略。如果文件已经存在,那么不执行任何操作
                2.4- error:报错。如果文件已经存在,那么直接报错。会报错AnalysisException: path file:xxx already exists.
            3- sep:字段间的分隔符
            4- header:数据输出的时候,是否要将字段名称输出到文件的第一行。推荐设置为True
            5- encoding:文件输出的编码方式
    """
    result.write.csv(
        path="file:///export/data/output/",
        mode='ignore',
        sep=',',
        header=True,
        encoding="UTF-8"
    )

    # 数据输出到文件系统:复杂API
    """
        设置mode,需要单独调用mode()方法
    """
    result.write\
        .format('json')\
        .mode("overwrite")\
        .option("encoding","UTF-8")\
        .save('file:///export/data/output_json/')

    # 5- 释放资源
    spark.stop()
常用参数说明:
    1- path:指定结果数据输出路径。支持本地文件系统和HDFS文件系统
    2- mode:当输出目录中文件已经存在的时候处理办法
        2.1- append:追加。如果文件已经存在,那么继续在该目录下产生新的文件
        2.2- overwrite:覆盖。如果文件已经存在,那么就先将已有的文件清除,再写入进去
        2.3- ignore:忽略。如果文件已经存在,那么不执行任何操作
        2.4- error:报错。如果文件已经存在,那么直接报错。会报错AnalysisException: path 	
        			file:xxx already exists.
        
    3- sep:字段间的分隔符
    4- header:数据输出的时候,是否要将字段名称输出到文件的第一行。推荐设置为True
    5- encoding:文件输出的编码方式

将结果数据基于JDBC方案, 输出到关系型数据库, 例如说: MySql

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("数据输出到数据库")

    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions","1")\
        .appName('sparksql_database')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    init_df = spark.read.csv(
        path='file:///export/data/stu.txt',
        sep=' ',
        encoding='UTF-8',
        header="True",
        inferSchema=True
    )

    # 3- 数据处理
    result = init_df.where('age>=20')

    # 4- 数据输出
    result.show()
    result.printSchema()

    # 数据输出到数据
    """
        创建数据库命令:create database text character set utf8;
    """
    result.write.jdbc(
        url='jdbc:mysql://node1:3306/text?useUnicode=true&characterEncoding=utf-8',
        table='student',
        mode='append',
        properties={ 'user' : 'root', 'password' : '123456' }
    )

    # 5- 释放资源
    spark.stop()

运行结果截图:
在这里插入图片描述
可能出现的错误一:
在这里插入图片描述

原因:  缺少连接MySQL数据库的驱动

数据库的驱动包, 一般都是一些Jar包

如何放置【mysql-connector-java-5.1.41.jar】驱动包呢?  
	1- 放置位置一: 当spark-submit提交的运行环境为Spark集群环境的时候,以及运行模式为local, 默认从 spark的jars目录下加载相关的jar包,
		目录位置: /export/server/spark/jars
	
	2- 放置位置二: 当我们使用pycharm运行代码的时候, 基于python的环境来运行的, 需要在python的环境中可以加载到此jar包
		目录位置: 
			/root/anaconda3/lib/python3.8/site-packages/pyspark/jars/
	
	3- 放置位置三: 当我们提交选择的on yarn模式 需要保证此jar包在HDFS上对应目录下
		hdfs的spark的jars目录下:  hdfs://node1:8020/spark/jars
		

	请注意: 以上三个位置, 主要是用于放置一些 spark可能会经常使用的jar包, 对于一些不经常使用的jar包, 在后续spark-submit 提交运行的时候, 会有专门的处理方案:  spark-submit --jars  ....

可能出现的错误二:
在这里插入图片描述
在这里插入图片描述

原因:将中文输出到了数据表中
解决办法:
1- 数据库连接要加上:useUnicode=true&characterEncoding=utf-8
2- 创建数据库的时候需要指定编码character set utf8

常见DSL代码

分类格式含义示例
API/方法select查询字段select(‘id1’, ‘id2’)
where对数据过滤where(‘avg_score>3’)
groupBy对数据分组groupBy(‘userid’)
orderBy对数据排序orderBy(‘cnt’, ascending=False)
limit取前几条数据orderBy(‘cnt’, ascending=False).limit(1)
agg聚合操作,里面可以写多个聚合表达式agg(F.round(F.avg(‘score’), 2).alias(‘avg_score’))
show打印数据init_df.show()
printSchema打印数据的schema信息,也就是元数据信息init_df.printSchema()
alias对字段取别名F.count(‘movieid’).alias(‘cnt’)
join关联2个DataFrameetl_df.join(avg_score_dsl_df, ‘movieid’)
withColumn基于目前的数据产生一个新列init_df.withColumn(‘word’,F.explode(F.split(‘value’, ’ ')))
dropDuplicates删除重复数据init_df.dropDuplicates(subset=[“id”,“name”])
dropna删除缺失值init_df.dropna(thresh=2,subset=[“name”,“age”,“address”])
fillna替换缺失值init_df.fillna(value={“id”:111,“name”:“未知姓名”,“age”:100,“address”:“北京”})
first取DataFrame中的第一行数据
over创建一个窗口列
函数avg计算均值
count计数
col将字段包装成Column对象,一般用于对新列的包装
round保留小数位
desc降序排序
row_number行号。从1开始编号
窗口partitionBy对数据分区
orderBy对数据排序orderBy(F.desc(‘pv’))
1- 什么使用使用select实现聚合,什么时候使用groupBy+agg/select实现聚合?
   如果不需要对数据分组,那么可以直接使用select实现聚合;如果有分组操作,需要使用groupBy+agg/select,推荐使用agg
        
2- first()总结
   如果某个DataFrame中只有一行数据,并且不使用join来对比数据,那么一般需要使用first()明确指定和第一行进行比较
    
3- F.col()总结
   对于在计算过程中临时产生的字段,需要使用F.col()封装成Column对象
  • API/方法:是由DataFrame来调用
  • 函数:需要先通过import pyspark.sql.functions as F导入,使用F调用。Spark SQL内置提供的函数https://spark.apache.org/docs/3.1.2/api/sql/index.html
  • 窗口:需要先通过from pyspark.sql import Window导入

Spark SQL函数定义

窗口函数

在Spark SQL中使用窗口函数案例:
需求是找出每个cookie中pv排在前3位的数据,也就是分组取TOPN问题

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window as win

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config('spark.sql.shuffle.partitions',1)\
        .appName('sparksql_win_function')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    init_df = spark.read.csv(
        path='file:///export/data/cookie.txt',
        schema='cookie string,datestr string,pv int',
        sep=',',
        encoding='UTF-8'
    )

    init_df.createTempView('win_data')
    init_df.show()
    init_df.printSchema()

    # 3- 数据处理
    # SQL
    spark.sql("""
        select 
            cookie,datestr,pv
        from (
            select
                cookie,datestr,pv,
                row_number() over (partition by cookie order by pv desc) as rn
            from win_data
        ) tmp where rn<=3
    """).show()

    # DSL
    """
        select:注意点,结果中需要看到哪几个字段,就要明确写出来
    """
    init_df.select(
        "cookie","datestr","pv",
        F.row_number().over(win.partitionBy('cookie').orderBy(F.desc('pv'))).alias('rn')
    ).where('rn<=3').select("cookie","datestr","pv").show()

    # 4- 数据输出
    # 5- 释放资源
    spark.stop()

运行结果截图:
在这里插入图片描述

SQL函数分类

SQL函数,主要分为以下三大类:

  • UDF函数:用户自定义函数
    • 特点:一对一,输入一个得到一个
    • 例如:split() substr()
  • UDAF函数:用户自定义聚合函数
    • 特点:多对一,输入多个得到一个
    • 例如:sum() avg() count() min()
  • UDTF函数:用户自定义表数据生成函数
    • 特点:一对多,输入一个得到多个
    • 例如:explode()

在SQL中提供的所有的内置函数,都是属于以上三类中某一类函数

有这么多的内置函数,为啥还需要自定义函数呢?

为了扩充函数功能。在实际使用中,并不能保证所有的操作函数都已经提前的内置好了。很多基于业务处理的功能,其实并没有提供对应的函数,提供的函数更多是以公共功能函数。此时需要进行自定义,来扩充新的功能函数

在这里插入图片描述

1- SparkSQL原生的时候,Python只能开发UDF函数
2- SparkSQL借助其他第三方组件,Python可以开发UDF、UDAF函数

Spark SQL原生存在的问题:大量的序列化和反序列
在这里插入图片描述

虽然Python支持自定义UDF函数,但是其效率并不是特别的高效。因为在使用的时候,传递一行处理一行,返回一行的方式。这样会带来非常大的序列化的开销的问题,导致原生UDF函数效率不好
	
早期解决方案: 基于Java/Scala来编写自定义UDF函数,然后基于python调用即可
	
目前主要的解决方案: 引入Arrow框架,可以基于内存来完成数据传输工作,可以大大的降低了序列化的开销,提供传输的效率,解决原生的问题。同时还可以基于pandas的自定义函数,利用pandas的函数优势完成各种处理操作

Spark原生自定义UDF函数

自定义函数流程:

第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可

第二步: 将Python函数注册到Spark SQL中
	注册方式一: udf对象 = sparkSession.udf.register(参数1,参数2,参数3)
		参数1: 【UDF函数名称】,此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范
		参数2: 【自定义的Python函数】,表示将哪个Python的函数注册为Spark SQL的函数
		参数3: 【UDF函数的返回值类型】。用于表示当前这个Python的函数返回的类型
		udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用
	
		说明: 如果通过方式一来注册函数, 【可以用在SQL和DSL】
	
	注册方式二:  udf对象 = F.udf(参数1,参数2)
		参数1: Python函数的名称,表示将那个Python的函数注册为Spark SQL的函数
		参数2: 返回值的类型。用于表示当前这个Python的函数返回的类型
		udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用
		
		说明: 如果通过方式二来注册函数,【仅能用在DSL中】
		
	注册方式三:  语法糖写法  @F.udf(returnType=返回值类型)  放置到对应Python的函数上面
		说明: 实际是方式二的扩展。如果通过方式三来注册函数,【仅能用在DSL中】
		
第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可

Pandas的UDF函数

Apache Arrow框架基本介绍

Apache Arrow是Apache旗下的一款顶级的项目。是一个跨平台的在内存中以列式存储的数据层,它的设计目标就是作为一个跨平台的数据层,来加快大数据分析项目的运行效率

​ Pandas 与 Spark SQL 进行交互的时候,建立在Apache Arrow上,带来低开销 高性能的UDF函数

​ Arrow并不会自动使用,在某些情况下,需要配置 以及在代码中需要进行小的更改才可以使用

前提:服务器上已经安装pyspark
然后执行
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark[sql]

如何使用呢?  默认不会自动启动的, 一般建议手动配置

sparkSession.conf.set('spark.sql.execution.arrow.pyspark.enabled',True)

在这里插入图片描述

基于Arrow完成Pandas DataFrame和Spark DataFrame互转

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("基于Arrow完成Pandas DataFrame和Spark DataFrame互转")

    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .appName('dataframe')\
        .master('local[*]')\
        .getOrCreate()

    # 手动开启Arrow框架
    spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

    # 2- 数据输入
    init_df = spark.createDataFrame(
        data=[(1, '张三', '广州'), (2, '李四', '深圳')],
        schema='id int,name string,address string'
    )

    # 3- 数据处理
    # sparksql dataframe -> pandas dataframe
    pd_df = init_df.toPandas()
    print(type(pd_df),pd_df)

    new_pd_df = pd_df[pd_df['id']==2]

    # pandas dataframe -> sparksql dataframe
    spark_df = spark.createDataFrame(data=new_pd_df)
    spark_df.show()
    spark_df.printSchema()

    # 4- 数据输出
    # 5- 释放资源
    spark.stop()

使用场景:

1- Spark的DataFrame -> Pandas的DataFrame:当大数据处理到后期的时候,可能数据量会越来越少,这样可以考虑使用单机版的Pandas来做后续数据的分析

2- Pandas的DataFrame -> Spark的DataFrame:当数据量达到单机无法高效处理的时候,或者需要和其他大数据框架集成的时候,可以转成Spark中的DataFrame

总结:
Pandas的DataFrame -> Spark的DataFrame: spark.createDataFrame(data=pandas_df)
Spark的DataFrame -> Pandas的DataFrame: init_df.toPandas()

基于Pandas完成UDF函数

基于Pandas的UDF函数来转换为Spark SQL的UDF函数进行使用。底层是基于Arrow框架来完成数据传输,允许向量化(可以充分利用计算机CPU性能)操作。

Pandas的UDF函数其实本质上就是Python的函数,只不过函数的传入数据类型为Pandas的类型

基于Pandas的UDF可以使用自定义UDF函数和自定义UDAF函数

自定义函数流程:

第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可

第二步: 将Python函数包装成Spark SQL的函数
	注册方式一: udf对象 = spark.udf.register(参数1, 参数2)
		参数1: UDF函数名称。此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范
		参数2: Python函数的名称。表示将哪个Python的函数注册为Spark SQL的函数
		使用: udf对象只能在DSL中使用。参数1指定的名称只能在SQL中使用
		注意: 如果编写的是UDAF函数,那么注册方式一需要配合注册方式三,一起使用
		
	注册方式二: udf对象 = F.pandas_udf(参数1, 参数2)
		参数1: 自定义的Python函数。表示将哪个Python的函数注册为Spark SQL的函数
		参数2: UDF函数的返回值类型。用于表示当前这个Python的函数返回的类型对应到Spark SQL的数据类型
		udf对象: 返回值对象,是一个UDF对象。仅能用在DSL中使用
	
	注册方式三: 语法糖写法  @F.pandas_udf(returnType=返回值Spark SQL的数据类型)  放置到对应Python的函数上面
		说明: 实际是方式一的扩展。仅能用在DSL中使用
	
第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可
自定义UDF函数

自定义Python函数的要求:SeriesToSeries

  • 表示:第一步中创建自定义Python函数的时候,输入参数的类型和返回值类型必须都是Pandas中的Series类型
  • 需求:完成a列和b列的求和计算操作
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as F

# 绑定指定的Python解释器
from pyspark.sql.types import IntegerType

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .appName('pandas_udf')\
        .master('local[*]')\
        .getOrCreate()

    # 手动开启Arrow框架
    spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

    # 2- 数据输入
    init_df = spark.createDataFrame(
        data=[(1,2),(2,3),(3,4)],
        schema='num1 int,num2 int'
    )

    init_df.createTempView('tmp')

    # 3- 数据处理
    # 3.1- 自定义Python函数
    """
        1- num1:pd.Series用来限定输入的参数类型是Pandas中的Series对象
        2-  -> pd.Series用来限定返回值类型是Pandas中的Series对象
    """
    def my_sum(num1:pd.Series, num2:pd.Series) -> pd.Series:
        return num1+num2

    # 3.2- 注册进SparkSQL。注册方式一
    dsl_my_sum = spark.udf.register('sql_my_sum',my_sum)

    # 3.3- 使用
    # SQL
    spark.sql("""
        select
            num1,num2,
            sql_my_sum(num1,num2) as result
        from tmp
    """).show()

    # DSL
    init_df.select(
        "num1",
        "num2",
        dsl_my_sum("num1", "num2").alias("result")
    ).show()


    # 注册方式二
    dsl2_my_sum = F.pandas_udf(my_sum,IntegerType())

    # DSL
    init_df.select(
        "num1",
        "num2",
        dsl2_my_sum("num1", "num2").alias("result")
    ).show()

    # 注册方式三
    @F.pandas_udf(IntegerType())
    def my_sum_candy(num1:pd.Series, num2:pd.Series) -> pd.Series:
        return num1+num2

    # DSL
    init_df.select(
        "num1",
        "num2",
        my_sum_candy("num1", "num2").alias("result")
    ).show()

    # 4- 数据输出
    # 5- 释放资源
    spark.stop()

运行结果截图:
在这里插入图片描述

自定义UDAF函数

自定义Python函数的要求:Series To 标量

  • 表示:自定义函数的输入数据类型是Pandas中的Series对象,返回值数据类型是标量数据类型。也就是Python中的数据类型,例如:int、float、bool、list…
  • 需求:对某一列数据计算平均值的操作
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as F

# 绑定指定的Python解释器
from pyspark.sql.types import IntegerType, FloatType

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .appName('pandas_udaf')\
        .master('local[*]')\
        .getOrCreate()

    # 手动开启Arrow框架
    spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

    # 2- 数据输入
    init_df = spark.createDataFrame(
        data=[(1,2),(2,3),(3,3)],
        schema='num1 int,num2 int'
    )

    init_df.createTempView('tmp')

    # 3- 数据处理
    # 3.1- 自定义Python函数
    """
        UDAF对自定义Python函数的要求:输入数据的类型必须是Pandas中的Series对象,返回值类型必须是Python中的标量数据类型
    """
    @F.pandas_udf(returnType=FloatType())
    def my_avg(num2_col:pd.Series) -> float:
        print(type(num2_col))
        print(num2_col)
        # 计算平均值
        return num2_col.mean()

    # 3.2- 注册进SparkSQL。注册方式一
    dsl_my_avg = spark.udf.register('sql_my_avg',my_avg)

    # 3.3- 使用
    # SQL
    spark.sql("""
        select
            sql_my_avg(num2) as result
        from tmp
    """).show()

    # DSL
    init_df.select(dsl_my_avg("num2").alias("result")).show()

    # 4- 数据输出
    # 5- 释放资源
    spark.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/311662.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

函数——自制函数(c++)

今天进入自制函数。 自制函数&#xff0c;需要自己定义其功能。比如&#xff0c;设置一个没有参数没有返回值的积木&#xff0c;叫“aaa”。那么&#xff0c;如果想要运行“aaa”&#xff0c;就需要以下代码&#xff1a; void aaa(); 告诉系统有“aaa”…

强化学习的数学原理学习笔记 - Actor-Critic

文章目录 概览&#xff1a;RL方法分类Actor-CriticBasic actor-critic / QAC&#x1f7e6;A2C (Advantage actor-critic)Off-policy AC&#x1f7e1;重要性采样&#xff08;Importance Sampling&#xff09;Off-policy PGOff-policy AC &#x1f7e6;DPG (Deterministic AC) 本…

【自控实验】1. 线性系统串联超前校正实验

本科课程实验报告&#xff0c;有太多公式和图片了&#xff0c;干脆直接转成图片了 仅分享和记录&#xff0c;不保证全对 串联超前校正实验&#xff1a;频域设计计算(校正装置)&#xff0c;时域观察验证(校正结果) 使用matlab中的simulink进行仿真

String有没有最大长度限制?

大家都用过String字符串&#xff0c;有的人可能还不知道它的长度在某些方面是有一些限制。 public String(byte bytes[], int offset, int length);这是java.lang.String中的一个构造函数&#xff0c;可以看到它的长度是int类型&#xff0c;int的最大取值是2^31-1.但是我们却不…

python股票分析挖掘预测技术指标知识跳空缺口指标详解(5)

本人股市多年的老韭菜&#xff0c;各种股票分析书籍&#xff0c;技术指标书籍阅历无数&#xff0c;萌发想法&#xff0c;何不自己开发个股票预测分析软件&#xff0c;选择python因为够强大&#xff0c;它提供了很多高效便捷的数据分析工具包。 我们已经初步的接触与学习其中数…

Next.js 学习笔记(五)——渲染

渲染 渲染将你编写的代码转换到用户界面。React 和 Next.js 允许你创建混合 web 应用程序&#xff0c;其中部分代码可以在服务器或客户端上呈现。本节将帮助你了解这些渲染环境、策略和运行时之间的差异。 基本知识 首先&#xff0c;下列对熟悉三个基本的网络概念是有帮助的…

哈希-力扣454.四数相加Ⅱ

题目 给你四个整数数组 nums1、nums2、nums3 和 nums4 &#xff0c;数组长度都是 n &#xff0c;请你计算有多少个元组 (i, j, k, l) 能满足&#xff1a; 0 < i, j, k, l < nnums1[i] nums2[j] nums3[k] nums4[l] 0 示例 1&#xff1a; 输入&#xff1a;nums1 [1…

UI功能6大流程、接口测试8大流程这些你真的全会了吗?

在讲接口流程测试之前&#xff0c;首先需要给大家申明下&#xff1a;接口测试对于测试人员而言&#xff0c;非常非常重要&#xff0c;懂功能测试接口测试&#xff0c;就能在企业中拿到一份非常不错的薪资。 这么重要的接口测试&#xff0c;一般也是面试笔试必问。为方便大家更…

Java并发之互斥一:管程

1、简单聊聊什么是管程模型 &#xff08;共享资源&#xff09;&#xff1a;定义一个共享变量&#xff0c;可以理解锁&#xff0c;令牌这类的东西&#xff08;互斥访问共享资源&#xff09;&#xff1a;获取这个锁、令牌的时候是排好队的&#xff0c;只允许单线程访问&#xff…

LeetCode刷题---最小栈

解题思路&#xff1a; 该题通过辅助栈的方式来解决 定义数据栈(用于实现正常操作流程)和辅助栈(用于获取最小元素)。 push:首先将数据push进数据栈中&#xff0c;此时再判断辅助栈是否为空或者当前数据是否小于辅助栈中的最小元素(栈顶元素)&#xff0c;如果条件成立&#xff0…

什么是广告联盟?国内哪些广告联盟?广告联盟如何赚取收益?

开发者想要对接广告联盟获得广告变现收益&#xff0c;就要了解广告联盟的优势&#xff0c;以及广告联盟是如何获取收益的。 一、什么是广告联盟&#xff1f; 广告联盟是一种在线广告服务模式&#xff0c;将广告主和流量主联系在一起。通过广告联盟平台的技术服务&#xff0c;…

用友U8流程审批效率-SQLServer+SSRS

文章目录 @[TOC]1、 需求及效果1.1 需求1.2 效果2、 思路及SQL语句3、实现折叠明细表4、结语1、 需求及效果 1.1 需求 想要查看U8的审批流程,查看流程在哪个节点或人停留的时间,这个单据整个流程走下来需要的时间。可以更加直观方便的查看审批效率 1.2 效果 采用了SSRS上…

NLP论文阅读记录 - wos | 01 使用深度学习对资源匮乏的语言进行抽象文本摘要

文章目录 前言0、论文摘要一、Introduction1.1目标问题1.2相关的尝试1.3本文贡献 二.相关工作三.本文方法四 实验效果4.1数据集4.2 对比模型4.3实施细节4.4评估指标4.5 实验结果4.6 细粒度分析 五 总结思考 前言 Abstractive text summarization of lowresourced languages usi…

企业级进销存管理系统

框架&#xff1a; 进销存管理系统&#xff0c;采用SpringBootShiroMyBatisEasyUI 项目采用Maven构建&#xff0c;数据库文件存放在 sql/jxc.sql 截图 运行项目部分截图&#xff0c; 登录界面&#xff0c;用户名admin&#xff0c;密码admin123 当前库存查询&#xff0c; 进…

C/C++ 堆排序

个人主页&#xff1a;仍有未知等待探索-CSDN博客 专题分栏&#xff1a;数据结构_仍有未知等待探索的博客-CSDN博客 欢迎大家来指教&#xff01; 一、前言 今天要介绍的是堆排序。 首先什么是堆&#xff1f;简而言之&#xff0c;堆就是二叉树的数组形式&#xff0c;用数组来存…

c++最值查找

目录 min和max函数 min_element和max_element 例 nth_element函数 例 例题 题目描述 输入描述 输出描述 解 min和max函数 只能传入两个值或一个列表 时间复杂度为O(1),数组O(n)&#xff0c;n为元素个数 min_element和max_element min_element(st,ed)返回地址[st,…

企业泛域名SSL证书

SSL数字证书是一种电子证书&#xff0c;它由CA认证机构颁发&#xff0c;提供了加密连接和身份验证的功能。SSL数字证书广泛应用于各种在线服务&#xff0c;如网页浏览、电子邮件、在线银行等。它能够提供安全的连接&#xff0c;保护用户的个人信息和交易数据不被窃取或篡改。企…

如果你还忍受校园网,那么一定要看它!随身WiFi哪个品牌最靠谱 ?高人气随身WiFi第一名

很多学生党反映校园网价格太贵&#xff0c;贵就算了&#xff0c;还非常不好用&#xff0c;真的很泪崩。 对于学生党来说&#xff0c;一个性价比极高的随身WiFi才是正正好。 但是很多学生党对随身WiFi并不了解&#xff0c;那么下面我就回答几个学生党最关心的问题。一、价格合…

JVM类加载器打破双亲委派机制

欢迎大家关注我的微信公众号&#xff1a; 传送门&#xff1a;从JDK源码级别剖析JVM类加载器 目录 打破双亲委派机制 Tomcat打破双亲委派机制 Tomcat自定义加载器详解 模拟实现Tomcat打破双亲委派 上一篇文章讲到了jvm类加载器的双亲委派机制&#xff0c;本文就来讲…
最新文章