FlinkAPI开发之自定义函数UDF

案例用到的测试数据请参考文章:
Flink自定义Source模拟数据流
原文链接:https://blog.csdn.net/m0_52606060/article/details/135436048

概述

用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。
用户自定义函数分为:函数类、匿名函数、富函数类

函数类(Function Classes)

Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。
需求:用来从用户的订单数据中筛选订单金额大于50的内容:

方式一:通过匿名类来实现FilterFunction接口:

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DemoTest {
    public static void main(String[] args) throws Exception {
        //创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        environment.setParallelism(1);
        //调用Flink自定义Source
        // TODO: 2024/1/6 订单数据
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        // TODO: 2024/1/7 实现自定义接口FilterFunction
        DataStream<Orders> streamOperator = ordersDataStreamSource.filter(new FilterFunction<Orders>() {
            @Override
            public boolean filter(Orders orders) throws Exception {
                //过滤金额大于10000元的订单
                if (orders.getOrder_amount() > 50) {
                    return true;
                } else {
                    return false;
                }

            }
        });
        streamOperator.print();
        environment.execute();
    }
}

在这里插入图片描述

方式二: 实现FilterFunction接口

import com.zxl.bean.Orders;
import org.apache.flink.api.common.functions.FilterFunction;

public class OrderFilter implements FilterFunction<Orders> {
    @Override
    public boolean filter(Orders orders) throws Exception {
        //过滤金额大于10000元的订单
        if (orders.getOrder_amount() > 50) {
            return true;
        } else {
            return false;
        }
    }
}
import com.zxl.Functions.OrderFilter;
import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DemoTest {
    public static void main(String[] args) throws Exception {
        //创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        environment.setParallelism(1);
        //调用Flink自定义Source
        // TODO: 2024/1/6 订单数据
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        // TODO: 2024/1/7 返回类型记得修改为 DataStream
        DataStream<Orders> operator = ordersDataStreamSource.filter(new OrderFilter());
        operator.print();
        environment.execute();
    }
}

在这里插入图片描述

方式三:采用匿名函数(Lambda)

//创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        environment.setParallelism(1);
        //调用Flink自定义Source
        // TODO: 2024/1/6 订单数据
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        // TODO: 2024/1/7 函数使用Lambda表达式,不需要进行类型声明
        DataStream<Orders> streamOperator = ordersDataStreamSource.filter(orders -> orders.getOrder_amount() > 50);
        streamOperator.print();
        environment.execute();

在这里插入图片描述

富函数类(Rich Function Classes)

“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function有生命周期的概念。典型的生命周期方法有:
open()方法,是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。
close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用。

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DemoTest {
    public static void main(String[] args) throws Exception {
        //创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        environment.setParallelism(1);
        //调用Flink自定义Source
        // TODO: 2024/1/6 订单数据
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        ordersDataStreamSource.print();
        // TODO: 2024/1/7 接口类型第一个是传入类型,第二个是输出类型
        DataStream<String> operator = ordersDataStreamSource.map(new RichMapFunction<Orders, String>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");
            }

            @Override
            public String map(Orders orders) throws Exception {
                return orders.getOrder_date().toString()+"字符串";
            }

            @Override
            public void close() throws Exception {
                super.close();
                System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");
            }
        });
        operator.print();
        environment.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/299391.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

论文浅尝 | 以词-词关系进行分类的统一命名实体识别

笔记整理&#xff1a;曹旭东&#xff0c;东南大学硕士&#xff0c;研究方向为知识图谱构建、自然语言处理 链接&#xff1a;https://arxiv.org/abs/2112.10070 1. 动机 在以前的工作中&#xff0c;命名实体识别&#xff08;NER&#xff09;涉及的主要问题有三种类型&#xff0c…

每日一题——LeetCode1051.高度检查器

方法一 sort排序&#xff1a; 创建一个元素和heights一模一样的expect数组 &#xff0c;将expect数组从小到大进行排序&#xff0c;比较heights和expect相同位置不同的元素个数 var heightChecker function(heights) {var expect [],count0for(const n of heights){expect.…

1、Excel工作场景和知识点总结

参考&#xff1a; 戴师兄–戴你玩转数据分析 Excel发挥战斗力的场景 地量级数据的存储 我们日常所用的各种数据表格&#xff0c;基本都以excel的.xlsx或者.xls格式进行存储。并且因为大家电脑上都有excel&#xff0c;这就使excel的通用性很高(我用excel做好一个表发给你&#x…

消息队列-RocketMQ-概览与搭建

RocketMQ 领域模型 RockeMQ整体结构预览 RocketMQ 中的一些概念 Topic&#xff1a;主题&#xff0c;可以理解为类别、分类的概念 MessageQueue&#xff1a;消息队列&#xff0c;存储数据的一个容器&#xff08;队列索引数据&#xff09;&#xff0c;默认每个 Topic 下有 4 个队…

登录验证

JWT Json Web Token 定义了一种简洁的&#xff0c;自包含的格式&#xff0c;用于在通信双方以json数据格式安全的传输信息。由于数字签名的存在&#xff0c;这些信息是可靠的 组成 第一部分 header 头&#xff1a;记录令牌类型&#xff0c;签名算法等 第二部分 Payload 有效载荷…

servlet+jdbc+jsp实现登录界面的验证(基于MVC思想)

一、MVC的概念 MVC是模型(Model)和视图(View)以及控制器(Controller)的简写&#xff0c;是一种将数据、界面显示和业务 逻辑进行分离的组织方式&#xff0c;这样在改进界面及用户交互时&#xff0c;不需要重新编写业务逻辑&#xff0c;从而提高了 代码的可维护性。 M&#xf…

基于JavaWeb+SSM+Vue基于微信小程序的消防隐患在线举报系统的设计与实现

基于JavaWebSSMVue基于微信小程序的消防隐患在线举报系统的设计与实现 源码获取入口KaiTi 报告Lun文目录前言主要技术系统设计功能截图订阅经典源码专栏Java项目精品实战案例《500套》 源码获取 源码获取入口 KaiTi 报告 1.1 题目背景 随着信息化飞速发展&#xff0c;互联网不…

RabbitMQ(八)消息的序列化

目录 一、为什么需要消息序列化&#xff1f;二、常用的消息序列化方式1&#xff09;Java原生序列化&#xff08;默认&#xff09;2&#xff09;JSON格式3&#xff09;Protobuf 格式4&#xff09;Avro 格式5&#xff09;MessagePack 格式 三、总结 RabbitMQ 是一个强大的消息中间…

网络层协议及IP编址

0x00 前言 本节为网络层协议及IP编址内容 IP地址的范围&#xff1a;0.0.0.0-255.255.255.255 IP分为网络位以及主机位。子网划分就是向主机位借位。 网络层协议 IPICMP&#xff08;internet Control message protocol&#xff09;IPX IP协议的作用 为网络层的设备提供逻…

程序语言相关知识——偏向Eigen矩阵

1 查看 Eigen库表示的矩阵 方法 1.1 列矩阵x在监视中&#xff0c;这样查看&#xff0c;数值右侧的圈圈 可用于更新数值 随程序 1.2 比较全的方法&#xff1a;来自于知乎&#xff1a;https://zhuanlan.zhihu.com/p/625334009?utm_id0 1.3 eigen的用法&#xff1a;https://ww…

linux高级管理——Squid代理

一、squid服务基础&#xff1a; 1.1缓存代理的概述&#xff1a; 代理的工作机制 当客户机通过代理来请求Web页面时&#xff0e;指定的代理服务器会先检查自己的缓存&#xff0c;如果缓存中已经有客户机需要的页面&#xff0c;则直接将缓存中的页面内容反馈给客户机:如果缓存中…

整合事务,名词,概念

1、MySQL是单进程多线程&#xff08;而Oracle等是多进程&#xff09;&#xff0c;也就是说MySQL实例在系 统上表现就是一个服务进程&#xff0c;即进程&#xff0c;&#xff08;通过多种方法可以创建多实例&#xff0c;再安装一个端口号不同的mysql&#xff0c;或者通过workben…

滞回比较器(施密特触发器)在软件中的应用-电池电压显示

1、单限比较器和滞回比较器的区别 在单限比较器中&#xff0c;输入电压在阈值电压附近的任何微小变化&#xff0c;都会引起输出电压的跃变。不管这种微小变化是来源于输入电压还是来源于外部干扰。因此&#xff0c;虽然单限比较器很灵敏&#xff0c;但是抗干扰能力差。在单限比…

学习Redis缓存

学习Redis缓存 NoSQL和SQL的区别缓存缓存作用缓存成本添加Redis缓存 Redis特征Redis中数据结构Redis通用命令String类型Key的层级格式Hash类型Redis的Java客户端 NoSQL和SQL的区别 缓存 缓存就是数据交换的缓冲区&#xff0c;是存储数据的临时地方&#xff0c;一般读写性比较高…

10款有趣的前端源码分享(附效果图及在线演示)

分享10款非常有趣的前端特效源码 其中包含css动画特效、js原生特效、svg特效以及小游戏等 下面我会给出特效样式图或演示效果图 但你也可以点击在线预览查看源码的最终展示效果及下载源码资源 自毁按钮动画特效 自毁按钮动画特效 点击打开盒子可以点击自毁按钮 进而会出现自毁…

使用Python+selenium3.0实现第一个自动化测试脚本

这篇文章主要介绍了使用Pythonselenium实现第一个自动化测试脚本&#xff0c;文中通过示例代码介绍的非常详细&#xff0c;对大家的学习或者工作具有一定的参考学习价值&#xff0c;需要的朋友们下面随着小编来一起学习学习吧 最近在学web自动化&#xff0c;记录一下学习过程。…

Linux驱动学习—中断

1、中断基础概念 1.1 什么是中断 CPU在正常运行期间&#xff0c;由外部或者内部引起的时间&#xff0c;让CPU停下当前正在运行的程序&#xff0c;转而去执行触发他的中断所对应的程序&#xff0c;这就是中断。 响应中断的过程&#xff1a; <1>中断请求 <2>中断…

Spring Cloud之OpenFeign异常处理

简易原理图 原理基于请求头传递错误消息&#xff0c;利用aop和全局异常拦截机制实现。 服务提供者 远程调用本地方法b&#xff0c;throw异常出来FeignExceptionAspect AOP拦截处理异常到请求头中&#xff0c;继续throwGlobalExceptionHandler处理&#xff0c;返回响应Respons…

1868_C语言单向链表的实现

Grey 全部学习内容汇总&#xff1a; GitHub - GreyZhang/c_basic: little bits of c. 1868_C语言中简单的链表实现 简单整理一下链表的实现&#xff0c;这一次结合前面看到的一些代码简单修改做一个小结。 主题由来介绍 以前工作之中链表的使用其实不多&#xff0c;主要是…