【自定义Source、Sink】Flink自定义Source、Sink对redis进行读写操作

使用ParameterTool读取配置文件

Flink读取参数的对象

  1. Commons-cli: Apache提供的,需要引入依赖
  2. ParameterTool:Flink内置

ParameterTool 比 Commons-cli 使用上简便;

ParameterTool能避免Jar包的依赖冲突

建议使用第二种

使用ParameterTool对象可以直接获取配置文件中的信息,需要如下依赖

        <!-- Flink基础依赖 【ParameterTool类 在该依赖中】 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
        </dependency>
        <!-- Flink流批处理依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        </dependency>

Java读取资源的方式

  1. Class.getResourceAsStream(Path):Path 必须以 “/”,表示从ClassPath的根路径读取资源
  2. Class.getClassLoader().getResourceAsStream(Path):Path 无须以 “/”,默认从ClassPath的根路径读取资源

推荐使用第2种,以类加载器的方式获取静态资源文件,不要通过ClassPath的相对路径查找

最基本的工具类

public class ParameterUtil {
    	// 创建 ParameterTool 对象
        public static ParameterTool getParameters() {

        // 读取 resources 文件夹下 "flink.properties" 文件
        InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);

        try {
            return ParameterTool.fromPropertiesFile(inputStream);
        } catch (Exception e) {
            throw new FlinkPropertiesException(FlinkPropertiesExceptionInfo.PROPERTIES_NULL);
        }
    }
}

image-20231209095849541

可以通过 ParameterUtil.getParameters().get("redis.port") 直接读取key对应的value值

Flink写入Redis方式

  1. 继承RichSinkFunction (Flink-Stream)
  2. 使用第3方的包 (Apache-Bahir-Flink)

Apache-Bahir-Flink 的 Redis-Connector的缺点:

  1. 使用Jedis, 没有使用Lettuce
  2. 没有对 Flink Table/SQL Api 的支持

不少基于bahir二开的例子解决了上述问题

gitee地址:https://gitee.com/jeff-zou/flink-connector-redis?_from=gitee_search

github地址:https://github.com/apache/bahir-flink

bahir 集成了许多连接器,其中就包含Redis

image-20231209103659812

Flink官网上也可以看到bahir的影子

image-20231209104014483

方便起见,接下来就基于bahir,Flink写入Redis集群

基于巴希尔(Bahir)-Flink写入Redis集群

引入connector连接器依赖

        <!-- Flink-Connector-Redis -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
        </dependency>

依赖版本定义在父模块中

image-20231209100449996

实现RedisMapper接口自定义Sink

首先实现RedisMapper接口并指定泛型——处理元素的类型

/**
 * 基于apache bachir flink的RedisSink,作用于Redis String数据类型
 */
public class RedisSinkByBahirWithString implements RedisMapper<Tuple2<String, String>> {

    /**
     * 指定Redis的命令
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        /* **********************
         *
         * 如果Redis的数据类型是 hash 或 z-Set
         * RedisCommandDescription 的构造方法必须传入 additionalKey
         * additionalKey就是Redis的键
         *
         * *********************/
        return new RedisCommandDescription(RedisCommand.SET);
    }

    /**
     * 从数据流里获取Key值
     */
    @Override
    public String getKeyFromData(Tuple2<String, String> input) {
        return input.f0;
    }

    /**
     * 从数据流里获取Value值
     */
    @Override
    public String getValueFromData(Tuple2<String, String> input) {
        return input.f1;
    }
}

写入Redis工具类

public class RedisWriteUtil {

    /* **********************
     *
     * FlinkJedisClusterConfig:集群模式
     * FlinkJedisPoolConfig:单机模式
     * FlinkJedisSentinelConfig:哨兵模式
     *
     * *********************/

    // Jedis配置
    private static final FlinkJedisClusterConfig JEDIS_CONF;

    static {
        ParameterTool parameterTool = ParameterUtil.getParameters();
        String host = parameterTool.get("redis.host");
        String port = parameterTool.get("redis.port");

        /* **********************
         *
         * InetSocketAddress 是Java的套接字
         *
         * *********************/
        InetSocketAddress inetSocketAddress = new InetSocketAddress(host, Integer.parseInt(port));

        Set<InetSocketAddress> set = new HashSet<>();
        set.add(inetSocketAddress);
        JEDIS_CONF = new FlinkJedisClusterConfig
                .Builder()
                .setNodes(set)
                .build();
    }


    /**
     * 基于Bahir写入Redis,Redis的数据是String类型
     */
    public static void writeByBahirWithString(DataStream<Tuple2<String, String>> input) {
        input.addSink(new RedisSink<>(JEDIS_CONF, new RedisSinkByBahirWithString()));
    }

}

测试一下

class RedisWriteUtilTest {

    @DisplayName("测试基于Bahir写入Redis,Redis数据类型是String类型")
    @Test
    void writeByBahirWithString() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        DataStreamSource<Tuple2<String, String>> dataStream = env.fromElements(Tuple2.of("k", "v"));
        RedisWriteUtil.writeByBahirWithString(dataStream);

        env.execute();
    }
}

非常完美!写入成功

image-20231209105850707

Flink读取Redis方式

  1. 继承RichSourceFunction (实现自定义Source)
  2. 继承RichParallelSourceFunction (实现自定义Source)【可以指定并行度】
  3. 实现SourceFunction接口 (实现自定义Source)

RichParallelSourceFunction 和 RichSourceFunction区别

RichParallelSourceFunction 可以设置并行度

RichParallelSourceFunction 和 RichSourceFunction 代码是可以互相套用

RichParallelSourceFunction 默认的并行度是cpu 的 核心数(core数)

RichSourceFunction 的并行度只能是1

继承RichSourceFunction类-Flink读取Redis集群

前置准备

定义枚举类

Redis数据类型枚举类

@Getter
public enum RedisDataType {

    STRING,
    HASH,
    LIST,
    SET,
    SORTED_SET,
    ;

    RedisDataType() {
    }
}

定义Redis命令的枚举类,便于Source判断操作

@Getter
public enum RedisCommand {

    // get string
    GET(RedisDataType.STRING);

    private final RedisDataType redisDataType;

    RedisCommand(RedisDataType redisDataType) {
        this.redisDataType = redisDataType;
    }
}

Jedis配置类

bahir依赖中自带jedis依赖一般不用,自行引入jedis,jedis依赖版本要与巴希尔中jedis版本保持一致

image-20231209111800457

public class JedisConf {

    public static JedisCluster getJedisCluster() throws IOException {

        ParameterTool parameterTool =
                ParameterUtil.getParameters();
        String host = parameterTool.get("redis.host");
        String port = parameterTool.get("redis.port");

        /* **********************
         * Jedis对象
         *
         * JedisPool : 用于redis单机版
         * JedisCluster: 用于redis集群
         *
         * JedisCluster对象能够自动发现正常的redis节点
         *
         * *********************/

        HostAndPort hostAndPort = new HostAndPort(
                host,
                Integer.parseInt(port)
        );
        Set<HostAndPort> nodes = new HashSet<>();
        nodes.add(hostAndPort);

        return new JedisCluster(nodes);

    }
}

封装Jedis对象的redis方法

封装Jedis对象的redis方法,方便统一调用和维护

public class JedisBuilder {

    private JedisCluster jedis = null;

    public JedisBuilder(JedisCluster jedisCluster) {
        this.jedis = jedisCluster;
    }

    public void close() {
        if (this.jedis != null) {
            this.jedis.close();
        }
    }

    /**
     * Redis的Get方法
     */
    public String get(String key) {
        return jedis.get(key);
    }
}

自定义Source

Redis数据的映射对象

@Data
@AllArgsConstructor
@NoArgsConstructor
public class RedisPO implements Serializable {

    private String data;
    
}

Flink 自定义Redis Source读取Redis

/* **********************
 * 【富函数类】 比函数类提供了更多函数生命周期,提供了获取上下文的方法
 * 富函数类通常是抽象类
 * *********************/
public class RedisSource extends RichSourceFunction<RedisPO> {

    /**
     * Jedis对象
     */
    private JedisBuilder jedisBuilder;

    /**
     * Redis命令枚举对象
     */
    private final RedisCommand redisCommand;

    /**
     * redis key
     */
    private final String key;

    public RedisSource(RedisCommand redisCommand, String key) {
        this.redisCommand = redisCommand;
        this.key = key;
    }

    /**
     * volatile 修饰的变量,它的更新都会通知其他线程.
     */
    private volatile boolean isRunning = true;

    /**
     * Redis的连接初始化
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        JedisCluster jedisCluster = JedisConf.getJedisCluster();
        jedisBuilder = new JedisBuilder(jedisCluster);
    }

    /**
     * Redis数据的读取
     */
    @Override
    public void run(SourceContext<RedisPO> output) throws Exception {

        /* **********************
         *
         * 一直监听Redis数据的读取
         *
         * *********************/

        String data = null;
        // while (isRunning) {

        switch (redisCommand.getRedisDataType()) {
            case STRING:
                data = jedisBuilder.get(key);
        }

        output.collect(new RedisPO(data));
        // }

    }
    
    @Override
    public void cancel() {
        this.isRunning = false;
    }

}

读取Redis工具类

public class RedisReadUtil {

    public static DataStream<RedisPO> read(
            StreamExecutionEnvironment env,
            RedisCommand redisCommand,
            String key) {
        return env.addSource(new RedisSource(redisCommand, key));
    }
}

测试一下

class RedisReadUtilTest {

    @DisplayName("测试自定义Source读取Redis,Redis数据类型是String类型")
    @Test
    void testReadByCustomSourceWithString() throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<RedisPO> dataStream = RedisReadUtil.read(
                env,
                RedisCommand.GET,
                "k"
        );

        dataStream.print();
        env.execute();
    }
}

测试成功!

image-20231209113539037

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/232616.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

STL(七)(map篇)

### 这里重点学习map ### 在实际做题过程中,multimap几乎用不到### unordered_map拥有极好的平均时间复杂度和极差的最坏时间复杂度,所以他的时间复杂度是不稳定的,unordered_map一般用不到,要做一个了解 1.map map是一种关联容器,用于存储一组键值对(key-value pairs),其中每…

鸿蒙开发组件之Slider

一、Slider控件是鸿蒙开发中的滑动条组建&#xff0c;初始化方式 Slider({min:0, //最小值max:100,//最大值value:30,//默认值step:10,//步长&#xff0c;每次滑动的差值style:SliderStyle.OutSet, //滑块的样式&#xff0c;默认outsetdirection:Axis.Horizontal, //水平方式的…

Transformer 简介

Transformer 是 Google 在 2017 年底发表的论文 Attention Is All You Need 中所提出的 seq2seq 模型。Transformer 模型的核心是 Self-Attention 机制&#xff0c;能够处理输入序列中的每个元素&#xff0c;并能计算其与序列中其他元素的交互关系的方法&#xff0c;从而能够更…

【自定义Source、Sink】Flink自定义Source、Sink对ClickHouse进行读和批量写操作

ClickHouse官网文档 Flink 读取 ClickHouse 数据两种驱动 ClickHouse 官方提供Clickhouse JDBC.【建议使用】第3方提供的Clickhouse JDBC. ru.yandex.clickhouse.ClickHouseDriver ru.yandex.clickhouse.ClickHouseDriver.现在是没有维护 ClickHouse 官方提供Clickhouse JDBC…

【小沐学Python】Python实现语音识别(SpeechRecognition)

文章目录 1、简介2、安装和测试2.1 安装python2.2 安装SpeechRecognition2.3 安装pyaudio2.4 安装pocketsphinx&#xff08;offline&#xff09;2.5 安装Vosk &#xff08;offline&#xff09;2.6 安装Whisper&#xff08;offline&#xff09; 3 测试3.1 命令3.2 fastapi3.3 go…

【数据结构】——排序篇(上)

前言&#xff1a;前面我们已经学过了许许多多的排序方法&#xff0c;如冒泡排序&#xff0c;选择排序&#xff0c;堆排序等等&#xff0c;那么我们就来将排序的方法总结一下。 我们的排序方法包括以下几种&#xff0c;而快速排序和归并排序我们后面进行详细的讲解。 直接插入…

C#注册表技术及操作

目录 一、注册表基础 1.Registry和RegistryKey类 &#xff08;1&#xff09;Registry类 &#xff08;2&#xff09;RegistryKey类 二、在C#中操作注册表 1.读取注册表中的信息 &#xff08;1&#xff09;OpenSubKey()方法 &#xff08;2&#xff09;GetSubKeyNames()…

2-Spring

2-Spring 文章目录 2-Spring项目源码地址Spring概述Spring特点&#xff08;优点&#xff09;Spring相关学习网站基于Maven的Spring框架导入Spring的组成及拓展 Spring-IOC--原型理解IOC-原型--示例开发示例-常规开发示例-Set函数&#xff08;IOC原型&#xff09;开发示例-对比思…

Python-pdf工具自制(合并、拆分、删除)

pdf工具&#xff0c;之前写的合并工具有点麻烦&#xff0c;使用PyQt5库重写合并拆分和删除指定页面的程序 实现如图&#xff1a; 代码&#xff1a; import sysimport osfrom PyQt5.QtWidgets import QApplication, QMainWindow, QPushButton, QVBoxLayout, QWidget, QFileDia…

新版Android Studio 正则表达式匹配代码注释,删除注释,删除全部注释,IntelliJ IDEA 正则表达式匹配代码注释

正则表达式匹配代码注释 完整表达式拼接Android Studio 搜索匹配【IntelliJ IDEA 也是一样的】 完整表达式拼接 (/*{1,2}[\s\S]?*/)|(//[\x{4e00}-\x{9fa5}].)|(<!-[\s\S]?–>)|(^\s\n)|(System.out.println.*) 表达式拆解&#xff0c;可以根据自己需求自由组合&#x…

【Dubbo3云原生微服务开发实战】「Dubbo前奏导学」 RPC服务的底层原理和实现

RPC服务 RPC服务介绍RPC通信模式RPC架构组成RPC技术要点RPC通信技术选项分析RPC实战开发6大基础组件基础组件之Guava基础组件之Hutools基础组件之ReflectionASM基础组件之FastJSON/FastJSON2基础组件之FST相比FastJSON的优势 基础组件之Commons-Codec RPC框架层面选项分析RPC组…

Cocos Creator:创建棋盘

Cocos Creator&#xff1a;创建棋盘 创建地图三部曲&#xff1a;1. 创建layout组件2. 创建预制体Prefab&#xff0c;做好精灵贴图&#xff1a;3. 创建脚本LayoutSprite.ts收尾工作&#xff1a; 创建地图三部曲&#xff1a; 1. 创建layout组件 使用layout进行布局&#xff0c;…

sensitive word 敏感词(脏词) 如何忽略无意义的字符?达到更好的过滤效果?

忽略字符 说明 我们的敏感词一般都是比较连续的&#xff0c;比如 傻帽 那就有大聪明发现&#xff0c;可以在中间加一些字符&#xff0c;比如【傻!#$帽】跳过检测&#xff0c;但是骂人等攻击力不减。 那么&#xff0c;如何应对这些类似的场景呢&#xff1f; 我们可以指定特…

【论文精读】REACT: SYNERGIZING REASONING AND ACTING IN LANGUAGE MODELS

REACT: SYNERGIZING REASONING AND ACTING IN LANGUAGE MODELS 前言ABSTRACT1 INTRODUCTION2 REACT: SYNERGIZING REASONING ACTING3 KNOWLEDGE-INTENSIVE REASONING TASKS3.1 SETUP3.2 METHODS3.3 RESULTS AND OBSERVATIONS 4 DECISION MAKING TASKS5 RELATED WORK6 CONCLUSI…

Ubuntu20.04使用cephadm部署ceph集群

文章目录 Requirements环境安装Cephadm部署Ceph单机集群引导&#xff08;bootstrap&#xff09;建立新集群 管理OSD列出可用的OSD设备部署OSD删除OSD 管理主机列出主机信息添加主机到集群从集群中删除主机 部署Ceph集群 Cephadm通过在单个主机上创建一个Ceph单机集群&#xff0…

★102. 二叉树的层序遍历

102. 二叉树的层序遍历 很巧妙的&#xff0c;又学习了一种层次遍历的方法&#xff0c;就是说根据当前的队列的长度去遍历&#xff0c;遍历的当前队列的长度就是该层次的节点个数。 /*** Definition for a binary tree node.* public class TreeNode {* int val;* Tr…

Flink 本地单机/Standalone集群/YARN模式集群搭建

准备工作 本文简述Flink在Linux中安装步骤&#xff0c;和示例程序的运行。需要安装JDK1.8及以上版本。 下载地址&#xff1a;下载Flink的二进制包 点进去后&#xff0c;选择如下链接&#xff1a; 解压flink-1.10.1-bin-scala_2.12.tgz&#xff0c;我这里解压到soft目录 [ro…

UniGui禁用缓存

今天有人问到如何禁用缓存&#xff0c;原因是引用了第三方js,css等文件&#xff0c;但是因为缓存的原因&#xff0c;修改后没有及时生效。 首先纠正一点&#xff0c;地址后加?不会禁用缓存 可以看到&#xff0c;后面即使加了&#xff1f;但仍然是from memory cache。对于浏览…

管理类联考——数学——真题篇——按知识分类——数据

文章目录 排列组合2023真题&#xff08;2023-05&#xff09;-数据分析-排列组合-组合-C运算-至少-需反面思考真题&#xff08;2023-08&#xff09;-数据分析-排列组合-相邻不相邻-捆绑法插空法-插空法注意空位比座位多1个&#xff0c;是用A&#xff1b;捆绑法内部排序用A&#…

ubuntu 20.04.6 server 服务器 下载与安装(配置静态IP)

下载地址&#xff1a;https://releases.ubuntu.com/20.04.6/ubuntu-20.04.6-live-server-amd64.iso 第一步&#xff1a; 准备U盘&#xff0c;使用软碟通将下载好的镜像写入到U盘中 软碟通网址&#xff1a;https://www.cn.ultraiso.net/xiazai.html 点击&#xff1a;文件 ->…