Flink读取Kafka写入Paimon

Flink SQL

-- 1) Register the Paimon catalog (backed by a Hive metastore)
CREATE CATALOG paimon_hive WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs://xxxxx/paimon',
    'metastore' = 'hive',
    'hive-conf-dir' = '/xxxxx/conf',
    'uri' = 'thrift://域名1:9083,thrift://域名2:9083'
);

-- 2) Declare the Kafka source table (all payload fields arrive as JSON strings)
CREATE TABLE kafkaSource (
    `_timestamp` STRING,  -- epoch millis encoded as a string
    `name`       STRING,
    `age`        STRING,
    `id`         STRING
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'topic' = 'topic1234',
    'properties.bootstrap.servers' = '你的Kafka Brokers',
    'properties.group.id' = 'kafka-to-paimon',
    'scan.startup.mode' = 'latest-offset'
);

-- 3) Read from Kafka and write into the Paimon table.
-- BUGFIX: the original SELECT was invalid SQL — "SELECTname" had no space after
-- the keyword, and the comma between `id` and the FROM_UNIXTIME(...) column was
-- missing, so the statement could not parse.
INSERT INTO paimon_hive.paimon.odm_kafka_log
SELECT
    `name`,
    `age`,
    `id`,
    -- millis -> seconds, then format as the yyyyMMdd partition day
    FROM_UNIXTIME(CAST(CAST(`_timestamp` AS BIGINT) / 1000 AS BIGINT), 'yyyyMMdd') AS `day`
FROM kafkaSource;

Flink Table (Java)

Maven依赖

<!-- Flink core dependencies (streaming runtime + Java API + Kafka connector), all pinned to 1.15.0 -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.15.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.15.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.15.0</version>
</dependency>
<!-- Flink Table API classes (DataStream <-> Table bridge and common table types) -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>1.15.0</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>1.15.0</version>
</dependency>
<!-- Paimon connector built against Flink 1.15; artifact version must match the Flink minor version above -->
<dependency><groupId>org.apache.paimon</groupId><artifactId>paimon-flink-1.15</artifactId><version>0.5.0-incubating</version>
</dependency>

Job类

package job;import com.google.protobuf.ByteString;
import function.GalaxyToPaimonFlatMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Properties;/*** @Author zhangjinke* @Create 2023/12/25 17:02* @Description 将银河PB格式日志写入到Paimon* @Wiki -* @Modifier -* @ModificationTime -* @Node -*/public class GalaxyToPaimonJob {private static final Logger LOG = LoggerFactory.getLogger(GalaxyToPaimonJob.class);private static final String GROUP_ID = "job.GalaxyToPaimonJob";public static void main(String[] args) {try {ParameterTool tool = ParameterTool.fromArgs(args);int source = tool.getInt("source");int flatmap = tool.getInt("flatmap");// Kafka consumerProperties galaxyPro = new Properties();properties.setProperty("bootstrap.servers", bootstrap_servers);properties.setProperty("group.id", groupId);// 自动检测topic分区变化时间间隔properties.put("flink.partition-discovery.interval-millis", "60000");properties.put("refresh.leader.backoff.ms", 6000);KafkaSource<ByteString> galaxyKafkaSource = KafkaSource.<ByteString>builder().setTopics(PropertyUtil.get("user_event_etl_topic")).setValueOnlyDeserializer(new ByteStringSchema()).setProperties(galaxyPro).setStartingOffsets(OffsetsInitializer.latest()).build();/** 1、 创建flink流式执行环境 */final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(120000L, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(180000L);env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.getConfig().setAutoWatermarkInterval(0);env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(200, 60000 * 2L));env.setParallelism(32);/** 2、 添加 用户+事件 Source 源 */SingleOutputStreamOperator<Row> rsoso = env.fromSource(galaxyKafkaSource, WatermarkStrategy.noWatermarks(), "GalaxyToPaimonSource").uid("GalaxyToPaimonSource_Uid").name("GalaxyToPaimonSource_Name").setParallelism(source)/** 3、 简单取出字段,下发GalaxyEntity对象 */.flatMap(new 
GalaxyToPaimonFlatMap()).uid("GalaxyToPaimonFlatMapFunction_Uid").name("GalaxyToPaimonFlatMapFunction_Name").setParallelism(flatmap).returns(Types.ROW_NAMED(new String[]{"realtime", "ip", "session_id", "app_id", "device_uuid""day", "hour"},Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING));/** 4、创建flink table执行环境 */StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);Schema schema = Schema.newBuilder().column("realtime", DataTypes.STRING()).column("ip", DataTypes.STRING()).column("session_id", DataTypes.STRING()).column("app_id", DataTypes.STRING()).column("device_uuid", DataTypes.STRING()).column("day", DataTypes.STRING()).column("hour", DataTypes.STRING()).build();/** 5、创建 Paimon catalog */tableEnv.executeSql("CREATE CATALOG paimon_hive WITH ('type' = 'paimon', 'warehouse'='hdfs://xxxxx/paimon')");tableEnv.executeSql("USE CATALOG paimon_hive");/** 6、将流表注册为一个临时视图 */tableEnv.createTemporaryView("odm_event_realtime_view", rsoso, schema);/** 7、将数据插入到 Paimon 表中 */tableEnv.executeSql("INSERT INTO paimon.odm_event_realtime SELECT * FROM odm_event_realtime_view");env.execute("job.GalaxyToPaimonJob");} catch (Exception e) {LOG.error("GalaxyToPaimonJob启动失败!", e);}}
}

Function类

package function;import com.google.protobuf.ByteString;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;public class GalaxyToPaimonFlatMap extends RichFlatMapFunction<ByteString, Row> {private static final Logger log = LoggerFactory.getLogger(GalaxyToPaimonFlatMap.class);private static final DateTimeFormatter inputDateFormat = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");private static final DateTimeFormatter outputDateFormat = DateTimeFormatter.ofPattern("yyyyMMdd");private static final DateTimeFormatter outputHourFormat = DateTimeFormatter.ofPattern("yyyyMMddHH");@Overridepublic void flatMap(ByteString bytes, Collector<Row> out) {try {// 创建结果RowRow row = new Row(86);// 使用myProtoBufObj对象依次赋值myProtoBufObjDataToProtoBuf.myProtoBufObj myProtoBufObj = myProtoBufObjDataToProtoBuf.myProtoBufObj.parseFrom(bytes);String realtime = myProtoBufObj.getRealtime();row.setField(0, realtime);row.setField(1, myProtoBufObj.getIp());row.setField(2, myProtoBufObj.getSessionId());row.setField(3, myProtoBufObj.getAppId());row.setField(4, myProtoBufObj.getDeviceUuid());row.setField(5, LocalDateTime.parse(realtime, inputDateFormat).format(outputDateFormat));row.setField(6, LocalDateTime.parse(realtime, inputDateFormat).format(outputHourFormat));// 将 Row 对象输出out.collect(row);} catch (Exception e) {log.error("function.GalaxyToPaimonFlatMap error is:  ", e);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/437.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【开源工具】:基于PyQt5的智能网络驱动器映射工具开发全流程(附源码)

🔗 【开源工具】：基于PyQt5的智能网络驱动器映射工具开发全流程 🌈 个人主页：创客白泽 - CSDN博客 🔥 系列专栏：🐍《Python开源项目实战》 💡 热爱不止于代码，热情源自每…

MySQL 索引学习笔记

1.二叉树，红黑树，B 树，B树 二叉树：就是每个节点最多只能有两个子节点的树； 红黑树：就是自平衡二叉搜索树，红黑树通过以下五个规则构建： 1.节点只能是红色或黑色； 2.根…

嵌入式通信模块实战新范式:基于虚拟仿真平台的NB-IoT核心技能训练——零硬件损耗的全栈式实验方案,重构物联网通信教学逻辑

在万物智联时代，NB-IoT通信模块已成为低功耗广域网的基石。BC260Y作为行业主流模组，其AT指令控制与网络诊断能力是嵌入式开发者的必备技能。传统教学受限于硬件采购成本、设备管理难度及实验风险，难以开展规模化训练。嵌入式仿真实验教学平台…

docker compose的变量使用说明

澄清一下 x-shared-env 和 &shared-api-worker-env 的作用范围： 核心概念：Docker Compose 配置 vs 容器环境 x-shared-env: &shared-api-worker-env 是 Docker Compose 配置的一部分 这些定义仅在 Docker Compose 解析 YAML 文件时 有效它们定义…

美团完整面经

面试岗位 面试的岗位 - 2025春季校招 【转正实习】软件服务工程师-后端方向（成都 - 软硬件服务-SaaS事业部） 一面（业务初试 - 30min） 问题 自我介绍 Java基础 HashMap底层用的数据结构是什么？是线程安全的吗…

JAVA毕业设计227—基于SpringBoot+hadoop+spark+Vue的大数据房屋维修系统(源代码+数据库)

毕设所有选题： https://blog.csdn.net/2303_76227485/article/details/131104075 基于SpringBoot+hadoop+spark+Vue的大数据房屋维修系统(源代码+数据库)227 一、系统介绍 本项目前后端分离，分为业主、维修人员、管理员三种角色 1、业主： 登…

uniapp 页面栈一定深度后,回首页导航到新页面的解决方案

uniapp 页面栈一定深度后，回首页导航到新页面的解决方案 uniapp 页面导航解决方案 在 uniapp 中，要实现先弹出页面栈回到首页，然后再跳转到指定页面。 /*** description 后台选择链接专用跳转*/ interface Link {path: string;name?: stri…

java实现Google邮箱SMTP协议

一、开通Google的SMTP协议 在谷歌邮箱中开启IMAP访问 到google的设置中开启两步验证功能 在到 创建和管理应用专用密码 二、java中实现 引入maven <!--邮件--><dependency><groupId>com.sun.mail</groupId><artifactId>javax.mail</artif…

【2025最新】Adobe Illustrator下载保姆级安装教程(附官方下载链接)

文章目录 Adobe Illustrator 2024新功能介绍如何提高Adobe Illustrator的运行效率 Adobe Illustrator 这款神器相信不用我多介绍了吧，设计师们的得力助手！最新的2025版据说功能和体验都提升了不少。这篇呢，算是我个人整理的一个超详细adobe i…

2025.06.11【Ribo-seq】|根据注释文件获取外显子及ORF序列

文章目录 一、准备材料二、提取外显子区间为BED文件1. 提取GTF中exon为BED 三、用bedtools提取外显子fasta四、后续拼接外显子为ORF序列五、流程总结 一、准备材料 基因组fasta（如：genome.fa）RiboCode生成的GTF文件（如…

python第48天打卡

知识点回顾： 随机张量的生成：torch.randn函数卷积和池化的计算公式（可以不掌握，会自动计算的）pytorch的广播机制：加法和乘法的广播机制 ps：numpy运算也有类似的广播机制，基本一致 作…

Day50 Python打卡训练营

知识点回顾： 1. resnet结构解析 2. CBAM放置位置的思考 3. 针对预训练模型的训练策略 a. 差异化学习率 b. 三阶段微调 现在我们思考下，是否可以对于预训练模型增加模块来优化其效果，这里我们会遇到一个问题 预训练模型的结构和权重是固定…