Spark_SQL函数定义(定义UDF函数、使用窗口函数)

                    一、UDF函数定义

        (1)函数定义

        (2)Spark支持定义函数

        (3)定义UDF函数

                (4)定义返回Array类型的UDF

        (5)定义返回字典类型的UDF

二、窗口函数

        (1)开窗函数简述

        (2)窗口函数的语法


一、UDF函数定义

        (1)函数定义

        无论Hive还是SparkSQL分析处理数据时,往往需要使用函数,SparkSQL模块本身自带很多实现公共功能的函数,在pyspark.sql.functions中。SparkSQL与Hive一样支持定义函数:UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛。
        Hive中自定义函数有三种类型:

        第一种:UDF(User-Defined_-function)函数

                · 一对一的关系,输入一个值经过函数以后输出一个值;

                · 在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;

        第二种:UDAF(User-Defined Aggregation Function)聚合函数

                · 多对一的关系,输入多个值输出一个值,通常于groupBy联合使用;

        第三种:UDTF(User-Defined Table-Generating Functions)函数

                · 一对多的关系,输入一个值输出多个值(一行变多为行);

                · 用户自定义生成函数,有点像flatMap;

        (2)Spark支持定义函数

        目前来说Spark框架各个版本及各种语言对自定义函数的支持:在SparkSQL中,目前仅仅支持UDF函数和UDAF函数,目前Python仅支持UDF。

Spark版本及支持函数定义
Apache Spark VersionSpark SQL UDF(Python,Java,Scala)Spark SQL UDAF(Java,Scala)Spark SQL UDF(R)Hive UDF,UDAF,UDTF
1.1-1.4
1.5experimental
1.6
2.0
        (3)定义UDF函数

        ①sparksession.udf.register()

        注册的UDF可以用于DSL和SQL,返回值用于DSL风格,传参内给的名字用于SQL风格。

        ②pyspark.sql.functions.udf

        仅能用于DSL风格

        其中F是:from pyspark.sql import functions as F。其中,被注册为UDF的方法名是指具体的计算方法,如:def add(x, y): x + y  。 add就是将要被注册成UDF的方法名

# cording:utf8
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # 构建一个RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x:[x])
    df = rdd.toDF(['num'])

    # TODO 1:方式1 sparksession.udf.register(),DSL和SQL风格均可使用
    # UDF的处理函数
    def num_ride_10(num):
        return num * 10
    # 参数1:注册的UDF的名称,这个UDF名称,仅可以用于SQL风格
    # 参数2:UDF的处理逻辑,是一个单独定义的方法
    # 参数3:声明UDF的返回值类型,注意:UDF注册时候,必要声明返回值类型,并且UDF的真实返回值一定要和声明的返回值一致
    # 当前这种方式定义的UDF,可以通过参数1的名称用于SQL风格,通过返回值对象用户的DSL风格
    udf2 = spark.udf.register('udf1', num_ride_10, IntegerType())

    # SQL风格中使用
    # selectExpr 以SELECT的表达式执行,表达式SQL风格的表达式(字符串)
    # select方法,接受普通的字符串字段名,或者返回值时Column对象的计算
    df.selectExpr('udf1(num)').show()

    # DSL 风格使用
    # 返回值UDF对象,如果作为方法使用,传入的参数一定是Column对象
    df.select(udf2(df['num'])).show()

    # TODO 2:方式2注册,仅能用于DSL风格
    udf3 = F.udf(num_ride_10, IntegerType())
    df.select(udf3(df['num'])).show()

        方式1结果:

        方式2结果:

                (4)定义返回Array类型的UDF

        注意:数组或者list类型,可以使用spark的ArrayType来描述即可。

        注意:声明ArrayType要类似这样::ArrayType(StringType()),在ArrayType中传入数组内的数据类型。

# cording:utf8
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # 构建一个RDD
    rdd = sc.parallelize([['hadoop spark flink'], ['hadoop flink java']])
    df = rdd.toDF(['line'])

    # 注册UDF,UDF的执行函数定义
    def split_line(data):
        return data.split(' ')

    # TODO 1:方式1 后见UDF
    udf2 = spark.udf.register('udf1', split_line, ArrayType(StringType()))

    # DLS 风格
    df.select(udf2(df['line'])).show()

    # SQL风格
    df.createTempView('lines')
    spark.sql('SELECT udf1(line) FROM lines').show(truncate=False)

    # TODO 2:方式的形式构建UDF
    udf3 = F.udf(split_line, ArrayType(StringType()))
    df.select(udf3(df['line'])).show(truncate=False)

        

        (5)定义返回字典类型的UDF

        注意:字典类型返回值,可以用StructType来进行描述,StructType是—个普通的Spark支持的结构化类型.
        只是可以用在:
                · DF中用于描述Schema
                · UDF中用于描述返回值是字典的数据

# cording:utf8
import string
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # 假设 有三个数字: 1 2 3 在传入数字,返回数字所在序号对应的 字母 然后和数字结合组成dict返回
    # 例:传入1 返回{'num':1, 'letters': 'a'}
    rdd = sc.parallelize([[1], [2], [3]])
    df = rdd.toDF(['num'])

    # 注册UDF
    def process(data):
        return {'num': data, 'letters': string.ascii_letters[data]}

    '''
    UDF返回值是字典的话,需要用StructType来接收
    '''
    udf1 = spark.udf.register('udf1', process, StructType().add('num', IntegerType(), nullable=True).\
                              add('letters', StringType(), nullable=True))
    # SQL风格
    df.selectExpr('udf1(num)').show(truncate=False)
    # DSL风格
    df.select(udf1(df['num'])).show(truncate=False)

        (6)通过RDD构建UDAF函数

# cording:utf8
import string
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
    df = rdd.map(lambda x: [x]).toDF(['num'])

    # 方法:使用RDD的mapPartitions 算子来完成聚合操作
    # 如果用mapPartitions API 完成UDAF聚合,一定要单分区
    single_partition_rdd = df.rdd.repartition(1)

    def process(iter):
        sum = 0
        for row in iter:
            sum += row['num']

        return [sum]    # 一定要嵌套list,因为mapPartitions方法要求返回值是list对象

    print(single_partition_rdd.mapPartitions(process).collect())

二、窗口函数

        (1)开窗函数简述

        ●介绍

        开窗函数的引入是为了既显示聚集前的数据又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。 开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用GROUP BY子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。

        ●聚合函数和开窗函数

        聚合函数是将多行变成一行,count,avg...

        开窗函数是将一行变成多行;

        聚合函数如果要显示其他的列必须将列加入到group by中,开窗函数可以不使用group by,直接将所有信息显示出来。

        ●开窗函数分类

        1.聚合开窗函数 聚合函数(列)OVER(选项),这里的选项可以是PARTITION BY子句,但不可以是ORDER BY子句

        2.排序开窗函数 排序函数(列)OVER(选项),这里的选项可以是ORDER BY子句,也可以是OVER(PARTITION BY子句ORDER BY子句),但不可以是PARTITION BY子句。

        3.分区类型NTILE的窗口函数

        (2)窗口函数的语法

        窗口函数的语法:

# cording:utf8
import string
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([
        ('张三', 'class_1', 99),
        ('王五', 'class_2', 35),
        ('王三', 'class_3', 57),
        ('王久', 'class_4', 12),
        ('王丽', 'class_5', 99),
        ('王娟', 'class_1', 90),
        ('王军', 'class_2', 91),
        ('王俊', 'class_3', 33),
        ('王君', 'class_4', 55),
        ('王珺', 'class_5', 66),
        ('郑颖', 'class_1', 11),
        ('郑辉', 'class_2', 33),
        ('张丽', 'class_3', 36),
        ('张张', 'class_4', 79),
        ('黄凯', 'class_5', 90),
        ('黄开', 'class_1', 90),
        ('黄恺', 'class_2', 90),
        ('王凯', 'class_3', 11),
        ('王凯杰', 'class_1', 11),
        ('王开杰', 'class_2', 3),
        ('王景亮', 'class_3', 99)])
    schema = StructType().add('name', StringType()).\
        add('class', StringType()).\
        add('score', IntegerType())
    df = rdd.toDF(schema)
    # 创建表
    df.createTempView('stu')

    # TODO 1:聚合窗口函数的演示
    spark.sql('''
        SELECT *, AVG(score) over() AS avg_socre FROM stu
    ''').show()

    # TODO 2: 排序相关的窗口函数计算
    # RANK over, DENSE_RANK over, ROW_NUMBER over
    spark.sql('''
        SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank,
        DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,
        RANK() OVER(ORDER BY score) AS RANK
        FROM stu
    ''').show()

    # TODO NTILE
    spark.sql('''
        SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
    ''').show()

        TODO1结果:

        TODO2结果展示:

        TODO3结果展示:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/104105.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于数字电路交通灯信号灯控制系统设计-单片机设计

**单片机设计介绍,1617基于数字电路交通灯信号灯控制系统设计(仿真电路,论文报告 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序文档 六、 文章目录 一 概要 交通灯控制系统在城市交通控制中发挥着重要的作用&#xf…

Unsatisfied dependency expressed through bean property ‘sqlSessionTemplate‘;

代码没有问题,但是启动运行报错 2023-10-25 16:59:38.165 INFO 228964 --- [ main] c.h.h.HailiaowenanApplication : Starting HailiaowenanApplication on ganluhua with PID 228964 (D:\ganluhua\code\java\hailiao-java\target\classes …

【CSS】伪类和伪元素

伪类 :hover:悬停active:激活focus:获取焦点:link:未访问(链接):checked:勾选(表单)first-child:第一个子元素nth-child():指定索引的子元素&…

电脑软件:推荐一款非常强大的pdf阅读编辑软件

目录 一、软件简介 二、功能介绍 1、界面美观,打开速度快 2、可直接编辑pdf 3、非常强大好用的注释功能 4、很好用的页面组织和提取功能 5、PDF转word效果非常棒 6、强大的OCR功能 三、软件特色 四、软件下载 pdf是日常办公非常常见的文档格式,…

MOS管特性及其几种常用驱动电路详解,电子工程师手把手教你

在电子工程中,MOS管(金属氧化物半导体场效应管)是一种非常重要的半导体元件。 在这篇文章中,我们将深入探讨MOS管的特性,以及几种常用的驱动电路的工作原理和设计方法。无论你是初学者还是经验丰富的电子工程师&#…

Unity - 导出的FBX模型,无法将 vector4 保存在 uv 中(使用 Unity Mesh 保存即可)

文章目录 目的问题解决方案验证保存为 Unity Mesh 结果 - OK保存为 *.obj 文件结果 - not OK,但是可以 DIY importer注意References 目的 备忘,便于日后自己索引 问题 为了学习了解大厂项目的效果: 上周为了将 王者荣耀的 杨玉环 的某个皮肤…

音频类型识别方案-audioset_tagging

audioset_tagging github上开源的音频识别模型,可以识别音频文件的类型并打分给出标签占比,如图 echo off set CHECKPOINT_PATH"module/Cnn14_mAP0.431.pth" set MODEL_TYPE"Cnn14" set CUDA_VISIBLE_DEVICES0 python pytorch\in…

ROS笔记之visualization_msgs-Marker学习

ROS笔记之visualization_msgs-Marker学习 code review! 文章目录 ROS笔记之visualization_msgs-Marker学习一.line_strip例程二.line_list例程一二.line_list例程二二.TEXT_VIEW_FACING例程三.附CMakeLists.txt和package.xml五.关于odom、base_link和map坐标系六.关于visualiz…

idea免费插件分享

分享一些在开发中常用到的idea插件,都是一些我自己常用的,希望对各位程序员有帮助吧。 1、Chinese Language 汉化插件:中文语言包将为您的 IntelliJ IDEA, AppCode, CLion, DataGrip, GoLand, PyCharm, PhpStorm, RubyMine, WebStorm, 和Rid…

【python笔记】小甲鱼

P3 查看内置函数 dir(__builtins__) P4 变量名命名规则: 1、变量名不能以数字打头; 2、变量名可以是中文 字符串可以是: 1、单引号:文本中存在双引号时使用单引号 2、双引号:文本中存在单引号时使用双引号 当…

Postman的高级使用,傻瓜式学习【上】

目录 前言 1、小白使用Postman是不是这样的? 2、管理测试用例 2.1、创建用例集collections 3、用例集的导出导入 4、再次认识Postman ​编辑 5、Authrization授权 6、Pre-request Script 前置脚本 7、Tests 断言 Postman中常用的断言: 1&…

Python+playwright 实现Web UI自动化

实现Web UI自动化 技术:Pythonplaywright 目标:自动打开百度浏览器,并搜索“亚运会 金牌榜” 需安装:Playwright (不用安装浏览器驱动) # 使用浏览器,并可视化打开 browser playwright.ch…

计算机网络_03_tcp/ip四层模型

文章目录 1.为什么会有tcp/ip?2.tcp/ip是什么?3.为什么会有tcp/ip四层模型?4.tcp/ip四层模型介绍 1.为什么会有tcp/ip? 早期的计算机(计算机网络没有出现之前)几乎都是各自为战, 各种操作系统厂家百花齐放, 市面上的大部分计算机使用的都是不同的操作系统, 为每个人提供定…

解决“您点击的链接已过期”;The Link You Followed Has Expired的问题

今天WP碰到一个坑。无论发布文章还是更新插件、更换主题都是这么一种状态“您点击的链接已过期”;The Link You Followed Has Expired 百度出来的答案都是修改post_max_size 方法1. 通过functions.php文件修复 这种方法更容易,只需将以下代码添加到Wor…

(九)QVTKOpenGLNativeWidget同时显示点云和模型

一、加载点云 pcl::PointCloud<pcl::PointXYZ>::Ptr cloud(new pcl::PointCloud<pcl::PointXYZ>); //创建点云指针QString fileName QFileDialog::getOpenFileName(this, "Open PointCloud", ".", "Open PCD files(*.pcd)");if(f…

[c语言]深入返回值为函数指针的函数

之前写过个好玩代码 c语言返回值为函数指针的函数 一、发现 #include<stdio.h>int (*drink(void)) (void) {static int i;i;printf("(%d)\n", i);return (int(*)(void))drink; }int main() {drink()();return 0; }这个代码定义了一个返回值为函数指针的函数&…

Kafka-Java一:Spring实现kafka消息的简单发送

目录 写在前面 一、创建maven项目 二、引入依赖 2.1、maven项目创建完成后&#xff0c;需要引入以下依赖 2.2、创建工程目录 三、创建生产者 3.1、创建生产者&#xff0c;同步发送消息 3.2、创建生产者&#xff0c;异步发送消息 四、同步发送消息和异步发送消息的区别…

【计算机毕设案例推荐】高校学术研讨信息管理系统小程序SpringBoot+Vue+小程序

前言&#xff1a;我是IT源码社&#xff0c;从事计算机开发行业数年&#xff0c;专注Java领域&#xff0c;专业提供程序设计开发、源码分享、技术指导讲解、定制和毕业设计服务 项目名 基于SpringBoot的高校学术研讨信息管理系统小程序 技术栈 SpringBoot小程序VueMySQLMaven 文…

reqable(小黄鸟)+雷电抓包安卓APP

x 下载证书保存到雷电模拟器根目录(安装位置) 为什么? Android7以上&#xff0c;系统允许每个应用可以定义自己的可信CA集&#xff0c;部分的应用默认只会信任系统预装的CA证书&#xff0c;而不会信任用户安装的证书&#xff0c;之前的方法安装Burp/Fiddler证书都是用户证书…

sklearn-6算法链与管道

思想类似于pipeline&#xff0c;将多个处理步骤连接起来。 看个例子&#xff0c;如果用MinMaxScaler和训练模型&#xff0c;需要反复执行fit和tranform方法&#xff0c;很繁琐&#xff0c;然后还要网格搜索&#xff0c;交叉验证 1 预处理进行参数选择 对于放缩的数据&#x…
最新文章