【SparkSQL】SparkSQL函数定义(重点:定义UDF函数、使用窗口函数)

【大家好,我是爱干饭的猿,本文重点介绍SparkSQL 定义UDF函数、SparkSQL 使用窗口函数。

后续会继续分享其他重要知识点总结,如果喜欢这篇文章,点个赞👍,关注一下吧】

上一篇文章:《【SparkSQL】DataFrame入门(重点:df代码操作、数据清洗API、通过JDBC读写数据库)》

4. SparkSQL函数定义

4.1 SparkSQL 定义UDF函数

无论Hive还是SparkSQL分析处理数据时,往往需要使用函数,SparkSQL模块本身自带很多实现公共功能的函数,在pyspark.sql.unctions中。SparkSQL与Hive一样支持定义函数:UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛。

回顾Hive中自定义函数有三种类型:

  • 第一种:UDF (User-Defined-Function)函数
    • 一对一的关系,输入一个值经过函数以后输出一个值;
    • 在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;
  • 第二种:UDAF(User-Defined Aggregation Function)聚合函数
    • 多对一的关系,输入多个值输出一个值,通常与groupBy联合使用;
  • 第三种:UDTF (User-Defined Table-Generating Functions)函数
    • —对多的关系,输入一个值输出多个值(一行变为多行);
    • 用户自定义生成函数,有点像flatMap;

目前来说Spark框架各个版本及各种语言对自定义函数的支持:

在这里插入图片描述

在SparkSQL中,目前仅仅支持UDF函数和UDAF函数,目前Python仅支持UDF

定义方式有2种:

  1. sparksession.udf.register()
    注册的UDF可以用于DSL和SQL
    返回值用于DSL风格,传参内给的名字用于SQL风格

  2. pyspark.sql.functions.udf
    仅能用于DSL风格

  • 方式1语法:
    udf对象 = sparksession.udf.register(参数1,参数2,参数3)
    参数1:UDF名称,可用于SQL风格
    参数2:被注册成UDF的方法名
    参数3:声明UDF的返回值类型
    udf对象: 返回值对象,是一个UDF对象,可用于DSL风格

  • 方式2语法:
    udf对象 = F.udf(参数1, 参数2)
    参数1:被注册成UDF的方法名
    参数2:声明UDF的返回值类型
    udf对象: 返回值对象,是一个UDF对象,可用于DSL风格

  • 其中F是:
    from pyspark.sql import functions as F
    其中,被注册成UDF的方法名是指具体的计算方法,如:
    def add(x, y): x + y
    add就是将要被注册成UDF的方法名

1. 构建一个Interger返回值类型的UDF

# coding:utf8
import time

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    # 构建一个RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8]).map(lambda x:[x])
    df = rdd.toDF(["num"])

    # TODO 1: 方式1 sparksession.udf.register(), DSL和SQL风格均可以使用
    # UDF的处理函数
    def num_ride_10(num):
        return num * 10
    # 参数1: 注册的UDF的名称, 这个udf名称, 仅可以用于 SQL风格
    # 参数2: UDF的处理逻辑, 是一个单独的方法
    # 参数3: 声明UDF的返回值类型, 注意: UDF注册时候, 必须声明返回值类型, 并且UDF的真实返回值一定要和声明的返回值一致
    # 返回值对象: 这是一个UDF对象, 仅可以用于 DSL 语法
    # 当前这种方式定义的UDF, 可以通过参数1的名称用于SQL风格, 通过返回值对象用户DSL风格
    udf2 = spark.udf.register("udf1", num_ride_10, IntegerType())

    # 1.1 SQL风格中使用
    # selectExpr 以SELECT的表达式执行, 表达式 SQL风格的表达式(字符串)
    # select方法, 接受普通的字符串字段名, 或者返回值是Column对象的计算
    df.selectExpr("udf1(num)").show()

    # 1.2 DSL 风格中使用
    # 返回值UDF对象 如果作为方法使用, 传入的参数 一定是Column对象
    df.select(udf2(df['num'])).show()

    # TODO 2: 方式2注册, 仅能用于DSL风格
    udf3 = F.udf(num_ride_10, IntegerType())
    df.select(udf3(df['num'])).show()

2. 构建一个ArrayType(数字\list)类型的返回值UDF

# coding:utf8
import time

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    # 构建一个RDD
    rdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])
    df = rdd.toDF(["line"])

    # 注册UDF, UDF的执行函数定义
    def split_line(data):
        return data.split(" ")  # 返回值是一个Array对象
    # TODO 1: 方式1 构建UDF
    udf2 = spark.udf.register("udf1", split_line, ArrayType(StringType()))

    # 1. DLS风格
    df.select(udf2(df['line'])).show()
    # 2. SQL风格
    df.createTempView("lines")
    spark.sql("select udf1(line) from lines").show(truncate=False)

    # TODO 2: 方式2的形式构建UDF
    udf3 = F.udf(split_line, ArrayType(StringType()))
    df.select(udf3(df['line'])).show(truncate=False)

3. 构建一个字典类型的返回值的UDF

# coding:utf8
import string
import time

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    # 假设 有三个数字  1 2 3  我们传入数字 ,返回数字所在序号对应的 字母 然后和数字结合形成dict返回
    # 比如传入1 我们返回 {"num":1, "letters": "a"}
    rdd = sc.parallelize([[1], [2], [3]])
    df = rdd.toDF(["num"])

    # 注册UDF
    def process(data):
        return {"num": data, "letters": string.ascii_letters[data]}

    """
    UDF的返回值是字典的话, 需要用StructType来接收
    """
    udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True).\
                              add("letters", StringType(), nullable=True))

    df.selectExpr("udf1(num)").show(truncate=False)
    df.select(udf1(df['num'])).show(truncate=False)

4. 以mapPartitions API 完成UDAF构建

# coding:utf8
from pyspark.sql import SparkSession


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
    df = rdd.map(lambda x: [x]).toDF(['num'])

    # 折中的方式 就是使用RDD的mapPartitions 算子来完成聚合操作
    # 如果用mapPartitions API 完成UDAF聚合, 一定要单分区
    single_partition_rdd = df.rdd.repartition(1)

    def process(iter):
        sum = 0
        for row in iter:
            sum += row['num']

        return [sum]    # 一定要嵌套list, 因为mapPartitions方法要求的返回值是list对象


    print(single_partition_rdd.mapPartitions(process).collect())

注意
使用UDF两种方式的注册均可以
唯一需要注意的就是:返回值类型—定要有合适的类型来声明

  • 返回int 可以用IntergerType
  • 返回值小数,可以用FolatType或者DoubleType
  • 返回数组list可用ArrayType描述
  • 返回字典可用StructType描述

这些Spark内置的数据类型均存储在:pyspark.sql.types包中

4.2 SparkSQL 使用窗口函数

  • 介绍
    开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。
    开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用GROUP BY子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。

  • 聚合函数和开窗函数
    聚合函数是将多行变成一行,count,avg…开窗函数是将一行变成多行;
    聚合函数如果要显示其他的列必须将列加入到group by中开窗函数可以不使用group by,直接将所有信息显示出来

  • 开窗函数分类

    1. 聚合开窗函数
      聚合函数(列)OVER(选项),这里的选项可以是PARTITION BY子句,但不可以是ORDER BY子句。
    2. 排序开窗函数
      排序函数(列)OVER(选项),这里的选项可以是ORDER BY子句,也可以是OVER(PARTITION BY子句ORDER BY子句),但不可以是PARTITION BY子句。
    3. 分区类型NTILE的窗口函数

代码演示:

# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([
        ("张三", 'class_1', 99),
        ("王五", 'class_2', 35),
        ("王三", 'class_3', 57)
         ])
    schema = StructType().add("name", StringType()).\
        add("class", StringType()).\
        add("score", IntegerType())
    df = rdd.toDF(schema)

    df.createTempView("stu")

    # 1. T0D0聚合窗口函数的演示
    spark.sql("""
        SELECT *, AVG(score) OVER() as avg_score from stu
        """).show()

    # 2. T0D0排序相关的窗口函数计算
    # RAKN over,DENSE_RANK over ROW_NUABER over
    spark.sql("""
        SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) As row_number_rank, 
        DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) As dense_rank, 
        RANK() OVER(ORDER BY score) AS rank 
        FROM stu
        """).show()

    # 3. TOD0 NTILE
    spark.sql("""
    SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
    """).show()

4.3 总结

  1. SparkSQL支持UDF和UDAF定义,但在Python中,暂时只能定义UDF(通过rdd的mapPartitions算子模拟实现udtf:通过返回array或者dict类型来模拟实现)
  2. UDF定义支持2种方式, 1:使用SparkSession对象构建. 2: 使用functions包中提供的UDF API构建. 要注意, 方式1可用DSL和SQL风格, 方式2 仅可用于DSL风格
  3. SparkSQL支持窗口函数使用, 常用SQL中的窗口函数均支持, 如聚合窗口\排序窗口\NTILE分组窗口等

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/207574.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++相关闲碎记录(2)

1、误用shared_ptr int* p new int; shared_ptr<int> sp1(p); shared_ptr<int> sp2(p); //error // 通过原始指针两次创建shared_ptr是错误的shared_ptr<int> sp1(new int); shared_ptr<int> sp2(sp1); //ok 如果对C相关闲碎记录(1)中记录的shar…

【前缀和]LeetCode1862:向下取整数对和

本文涉及的基础知识点 C算法&#xff1a;前缀和、前缀乘积、前缀异或的原理、源码及测试用例 包括课程视频 作者推荐 动态规划LeetCode2552&#xff1a;优化了6版的1324模式 题目 给你一个整数数组 nums &#xff0c;请你返回所有下标对 0 < i, j < nums.length 的 …

一文带你了解网络安全简史

网络安全简史 1. 上古时代1.1 计算机病毒的理论原型1.2 早期计算机病毒1.3 主要特点 2. 黑客时代2.1 计算机病毒的大流行2.2 知名计算机病毒2.3 主要特点 3. 黑产时代3.1 网络威胁持续升级3.2 代表性事件3.3 主要特点 4 高级威胁时代4.1 高级威胁时代到来4.2 著名的APT组织4.3 …

Python之Requests模块简介与安装

Requests模块简介 在python的标准库中&#xff0c;虽然提供了urllib,utllib2,httplib&#xff0c;但是做接口测试&#xff0c;requests使用更加方便快捷&#xff0c;正如官方说的&#xff0c;“让HTTP服务人类”。 Requests是用python语言基于urllib编写的&#xff0c;采用的是…

利用异或、取反、自增bypass_webshell_waf

目录 引言 利用异或 介绍 eval与assert 蚁剑连接 进阶题目 利用取反 利用自增 引言 有这样一个waf用于防御我们上传的文件&#xff1a; function fun($var): bool{$blacklist ["\$_", "eval","copy" ,"assert","usort…

折扣因子的变化图(Python)

var 3 var_list [3] for _ in range(50):var * .95var_list.append(var)import matplotlib.pyplot as plt import numpy as np plt.plot(np.arange(len(var_list)), var_list, linewidth1) plt.show()

美丽的时钟

案例绘制一个时钟 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>美丽的时钟</title><script language"javascript">window.onloadfunction(){var clockdocument.getElementById("clock"…

你需要知道所有设计模式吗?

后续我会详细展开设计模式 &#x1d5d7;&#x1d5fc; &#x1d5ec;&#x1d5fc;&#x1d602; &#x1d5e1;&#x1d5f2;&#x1d5f2;&#x1d5f1; &#x1d5e7;&#x1d5fc; &#x1d5de;&#x1d5fb;&#x1d5fc;&#x1d604; &#x1d5d4;&#x1d5f9;&…

溜冰场电脑收银系统软件会员管理操作教程,佳易王溜冰场会员卡管理软件下载

溜冰场电脑收银系统软件会员管理操作教程&#xff0c;佳易王溜冰场会员卡管理软件下载 一、软件 部分功能简介&#xff1a; 1、会员信息登记 &#xff1a;可以直接使用手机号登记&#xff0c;也可以使用实体卡片&#xff0c;推荐用手机号即可。 2、会员卡类型 &#xff1a;可…

Redis:事务操作

目录 Redis事务定义相关命令事务的错误处事务冲突的问题Redis事务三特性 Redis事务定义 redis事务是一个单独的隔离操作&#xff0c;事务中的所有命令都会序列化、按顺序地执行&#xff0c;事务在执行的过程中&#xff0c;不会被其他客户端发送来的命令请求所打断。 redis事务…

HTAP 还可以这么玩?丨TiDB 在 IoT 智慧园区的应用

作者&#xff1a;某物联网公司设施云平台负责人 用户简介&#xff1a;我们是一家提供全链智慧园区整体解决方案的物联网公司&#xff0c;致力于打造可持续发展的智慧园区。 基础设施平台简介 基础设施平台是集团一线作业人员日常工作中高度依赖的重要系统&#xff0c;涵盖了各…

涉密计算机违规外联原因及防范措施

高度信息化的时代&#xff0c;涉密计算机违规外联已成为一种严重的安全威胁。涉密计算机违规外联是指涉密计算机通过互联网、电子邮件等方式与外部计算机或网络进行连接&#xff0c;导致机密信息泄露或被恶意攻击。 为了应对这一问题&#xff0c;本文将探讨涉密计算机违规外联的…

WPF实战项目十九(客户端):修改RestSharp的引用

修改HttpRestClient&#xff0c;更新RestSharp到110.2.0&#xff0c;因为106版本和110版本的代码不一样&#xff0c;所以需要修改下代码 using Newtonsoft.Json; using RestSharp; using System; using System.Threading.Tasks; using WPFProjectShared;namespace WPFProject.S…

wps备份功能 救了我一命

感谢wps备份功能 救了我一命 文章目录 感谢wps备份功能 救了我一命**&#x1f4dd;场景回现&#xff0c;往后再不干了**&#x1f9e3;灵光一现&#x1f4c7;备注中心的设置流程&#x1f58a;️最后总结 &#x1f4dd;场景回现&#xff0c;往后再不干了 小&#x1f42e;今天接到…

理解BatchNormalization层的作用

深度学习 文章目录 深度学习前言一、“Internal Covariate Shift”问题二、BatchNorm的本质思想三、训练阶段如何做BatchNorm四、BatchNorm的推理(Inference)过程五、BatchNorm的好处六、机器学习中mini-batch和batch有什么区别 前言 Batch Normalization作为最近一年来DL的重…

Spring Task

1 介绍 Spring Task 是Spring框架提供的任务调度工具&#xff0c;可以按照约定的时间自动执行某个代码逻辑。 定位&#xff1a;定时任务框架 作用&#xff1a;定时自动执行某段Java代码 为什么要在Java程序中使用Spring Task&#xff1f; 应用场景&#xff1a; 1). 信用卡…

MyBatis-Plus动态表名使用selectPage方法不生效问题解析与解决

文章目录 MyBatis-Plus动态表名简介selectPage方法不生效的问题解决方案&#xff1a;SqlParser注解与BaseMapper的selectPage方法示例代码实体类Mapper接口Service层Controller层 总结 &#x1f389;MyBatis-Plus动态表名使用selectPage方法不生效问题解析与解决 ☆* o(≧▽≦)…

【每日OJ —— 144. 二叉树的前序遍历】

每日OJ —— 144. 二叉树的前序遍历 1.题目&#xff1a;144. 二叉树的前序遍历2.方法讲解2.1.算法讲解2.2.代码实现2.3.提交通过展示 1.题目&#xff1a;144. 二叉树的前序遍历 2.方法讲解 2.1.算法讲解 1.首先如果在每次每个节点遍历的时候都去为数组开辟空间&#xff0c;这样…

linux基础五:linux 系统(进程状态2:)

linux 系统 一.进程状态&#xff1a;1.睡眠状态(sleep)&#xff1a;2.磁盘休眠状态(disk sleep)&#xff1a;3.停止状态(stoped --- T)&#xff1a;4.死亡状态&#xff1a;5.控制状态&#xff08;t&#xff09; 二.僵尸进程和孤儿进程&#xff1a;1.僵尸状态&#xff1a;2.孤儿…

同城服务足浴按摩软件系统开发方案;

足浴按摩软件是一款线上系统小程序&#xff0c;旨在方便用户在线预约按摩师、选择适合自己的服务项目和时间&#xff0c;并在家中或指定地点享受按摩服务。这款上门按摩小程序为用户提供了便利和个性化的按摩服务体验。 用户可以通过手机随时随地通过足浴按摩软件预约按摩师&am…
最新文章