Flink电商实时数仓(六)

交易域支付成功事务事实表

  1. 从topic_db业务数据中筛选支付成功的数据
  2. 从dwd_trade_order_detail主题中读取订单事实数据、LookUp字典表
  3. 关联三张表形成支付成功宽表
  4. 写入 Kafka 支付成功主题
执行步骤
  1. 设置ttl,通过Interval join实现左右流的状态管理
  2. 获取下单明细数据:用户必然要先下单才有可能支付成功,因此支付成功明细数据集必然是订单明细数据集的子集。要注意:Interval Join要求表中均为Append数据,即“只能新增,不能修改”,订单明细表数据生成过程中用到了left join,生成了回撤流,看似不满足Interval Join的条件。但是,回撤数据进入Kafka会以null值形式存在,如果用Kafka Connector将订单明细封装为动态表,null值会被过滤,最终得到的是相同主键存在重复数据的Append流(动态表本质上就是流),满足Interval Join的条件。
    • Interval join只支持事件时间,因此数据必须携带水位线;建表时水位线的相关语法为 water for order_time as order_time - interval '5' second,这里要求数据是timestamp(3)
    • 原有的时间数据类型是bigint类型的ts,使用row_time as TO_TIMESTAMP_LTZ(ts,3)这个函数即可将原有的时间数据转换为水位线所需的数据类型
  3. 筛选支付数据:
    • 支付状态为支付成功
    • 操作类型为update
  4. 构建 LookUp 字典表
  5. 联上述三张表形成支付成功宽表,写入 Kafka 支付成功主题

核心代码如下

 public void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {
        //核心业务逻辑
        //1. 读取TopicDB主题数据
        createTopicDb(groupId,tableEnv);

        //2. 筛选支付成功的数据,从业务数据topic_db中
        filterPaymentTable(tableEnv);

        //3. 读取下单详情表数据, 从kafka读取数据
        createOrderDetailTable(tableEnv, groupId);

        //4. 创建base.dic字典表,从HBase维度数据中读取
        createBaseDic(tableEnv);

        //tableEnv.executeSql("select * from order_detail").print();
        //tableEnv.executeSql("select * from base_dic").print();

        //tableEnv.executeSql("select to_timestamp_ltz(ts,3) from order_detail");

        //5. 使用interval join 完成支付成功流和订单详情数据关联
        intervalJoin(tableEnv);

        //6. 使用lookup join完成维度退化
        Table resultTable = lookupJoin(tableEnv);

        //7. 创建upsert kafka连接器写出
        createKafkaSink(tableEnv);

        resultTable.insertInto(Constant.TOPIC_DWD_TRADE_ORDER_PAYMENT_SUCCESS).execute();

    }

事实表动态分流

在这里插入图片描述

dwd层其他的事实表都是从topic_db中去业务数据库一张表的变更数据,按照某些过滤后写入kafka的对应主题,它们处理逻辑相似且较为简单,可以结合配置表动态分流在同一个程序中处理。有点类似我们前面实现DIM层的动态配置。

  1. 清洗过滤和转换:判断是否满足json格式,如果满足转换为jsonObj对象
  2. 读取配置表数据,使用flink-cdc读取
  3. 转换数据格式,转换到对应bean对象中
  4. 配置信息广播话,然后跟主流数据进行连接
  5. 筛选出需要的字段
  6. 根据表中的sink table字段来动态写出到对应的kafka主题中

核心代码如下

public static void main(String[] args) {
        new DwdBaseDb().start(10019, 4, "dwd_base_db", Constant.TOPIC_DB);
    }


    @Override
    public void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {
        //核心业务逻辑

        //1. 读取topic_db数据
        //stream.print();

        //2. 清洗过滤和转换, jsonObjStream是主流数据
        SingleOutputStreamOperator<JSONObject> jsonObjStream = filterJson(stream);

        //jsonObjStream.print();

        //3. 读取配置表数据,使用flink-cdc读取,读取配置文件时并发度最好为1
        DataStreamSource<String> tableProcessDwd = getTableProcessDwd(env);

        //tableProcessDwd.print();

        4. 转换数据格式 string -> TableProcessDwd -> broadcastStream,广播流数据
        SingleOutputStreamOperator<TableProcessDwd> processDwdStream = getProcessDwdStream(tableProcessDwd);

        MapStateDescriptor<String, TableProcessDwd> mapStateDescriptor = new MapStateDescriptor<>("process_state", String.class, TableProcessDwd.class);
        BroadcastStream<TableProcessDwd> broadcastStream = processDwdStream.broadcast(mapStateDescriptor);

        //5. 连接主流和广播流,对主流数据进行判断是否需要保留
        SingleOutputStreamOperator<Tuple2<JSONObject, TableProcessDwd>> processStream = processBaseDb(jsonObjStream, broadcastStream, mapStateDescriptor);

        //processStream.print();

        //6. 筛选最后需要写出的字段
        SingleOutputStreamOperator<JSONObject> dataStream = filterColumns(processStream);

        //7. 通过sink_table的表名来动态写出到对应kafka主题
        //在setRecordSerializer()设置
        dataStream.sinkTo(FlinkSinkUtil.getKafkaSinkWithTopicName());

    }

gitee地址 :https://gitee.com/langpaian/gmall2023-realtime

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/270764.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

支付宝、学习强国小程序input、textarea数据双向绑定

前言 和 vue 的绑定有些区别&#xff0c;需要注意。直接 value"{{inputValue}}" 是无法双向绑定的。 正确思路 文档说的比较详细&#xff0c;不过没有组合使用的案例&#xff0c;需要自行理解。这里正确的方法是先用 value 绑定数据&#xff0c;再使用 onInput 事件…

(GCC) 库的操作

文章目录 预备静态库生成链接环境区别 动态库生成链接环境区别 END参考ar指令 预备 准备两个文件&#xff0c;以最简单的形式进行展示。 add.c int add(int x, int y) {return x y; }main.c 为了方便直接在头文件中声明函数 #include <stdio.h>extern int add(int,…

【C++初阶】九、STL容器中的string类(上)

相关代码gitee自取&#xff1a; C语言学习日记: 加油努力 (gitee.com) 接上期&#xff1a; 【C初阶】八、初识模板&#xff08;泛型编程、函数模板、类模板&#xff09;-CSDN博客 目录 一 . STL简介 什么是STL STL的版本 HP 原始版本&#xff1a; P.J. 版本&#xff1a; R…

HBase基础知识(四):HBase API

HBase还提供了API&#xff0c;我们可以通过编程的方式来进行对HBase的操作。 1. 环境准备 新建项目后在 pom.xml 中添加依赖&#xff1a; <dependencies><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifa…

2023年教程汇总 | 《小杜的生信笔记》

2023年总结 2023年即将结束&#xff0c;我们即将迎来2024年。2023年&#xff0c;我们做了什么呢&#xff1f;&#xff1f;这个是个值得深思的问题…? 12月份是个快乐且痛苦时间节点。前一段时间&#xff0c;单位需要提交2023年工作总结&#xff0c;真的是憋了好久才可以下笔…

重塑资产管理:三叠云助力企业高效运营

资产管理是企业管理中不可或缺的一环&#xff0c;它对企业的经营管理和决策起着至关重要的作用。资产管理涉及到企业的固定资产、无形资产、流动资产等各类资产&#xff0c;它们的管理情况影响着企业的经济效益和运营效率。因此&#xff0c;企业需要建立一套完善的资产管理体系…

Apache Commons Math: 面向Java的数学和统计库

第1章&#xff1a;引言 大家好&#xff0c;我是小黑&#xff0c;咱们今天要聊的是Apache Commons Math这个宝藏级的Java库。为啥说它是宝藏呢&#xff1f;因为它简直就是处理数学问题的瑞士军刀&#xff0c;无论你是要搞统计分析、数值计算&#xff0c;还是解决优化问题&#…

geyser互通服基岩版进不去

Java版需要在服务器安全组开通TCP端口&#xff08;如果有宝塔&#xff0c;也需要开通&#xff09; geyser下载好的安装运行也需要开通端口&#xff0c;但是它是UDP的&#xff08;但是我同时也开启了TCP&#xff0c;可能不需要&#xff1f; Java 版玩家隧道 Java 版玩家使用 T…

VMware安装linux系统二

1、设置光驱 1.1、编辑虚拟机设置 1.2、设置虚拟机镜像 1.3、设置好后开机 2、安装Linux系统 2.1、等待安装 2.2、开始安装 2.3、选择语言&#xff0c;我选择中文 2.4、本地化不用改 2.5、软件选择一定要选&#xff0c;否则就会是默认最小安装 2.6、我这里选择的是带GUI的&am…

Windows下安装Oracle19C

官网下载oracle19c 以及客户端 官网地址&#xff1a;Software Download | Oracle 这个是要登录账号的,没有的可以注册,登录上 这个时候在点开这个官网:Database Software Downloads | Oracle 往下面滑 点了之后有个界面注意事项勾上,点下载,你就会下载: 安装oracle19c 解压安…

WebRTC概念

定义 一个实时通信标准 通话原理 媒体协商 在WebRTC中&#xff0c;参与视频通讯的双方必须先交换SDP信息&#xff0c;获得一个都支持的编码格式 网络协商 目的&#xff1a;找到一条相互通讯的链路 做法&#xff1a;获取外网IP地址映射&#xff0c;通过信令服务器交换“网…

RK3588平台开发系列讲解(AI 篇)RKNN-Toolkit2 模型的加载转换

文章目录 一、Caffe 模型加载接口二、TensorFlow 模型加载接口三、TensorFlowLite 模型加载接口四、ONNX 模型加载五、DarkNet 模型加载接口六、PyTorch 模型加载接口沉淀、分享、成长,让自己和他人都能有所收获!😄 📢 RKNN-Toolkit2 目前支持 Caffe、TensorFlow、Tensor…

【Java JVM】Java 实例对象的访问定位

Java 程序会通过栈上的 reference 数据来操作堆上的具体对象。 但是 reference 类型在《Java虚拟机规范》里面只规定了它是一个指向对象的引用, 并没有定义这个引用应该通过什么方式去定位, 访问到堆中对象的具体位置, 所以对象访问方式也是由虚拟机实现而定的&#xff0c;主流…

【MYSQL】MYSQL 的学习教程(七)之 慢 SQL 优化思

1. 慢 SQL 优化思路 慢查询日志记录慢 SQLexplain 分析 SQL 的执行计划profile 分析执行耗时Optimizer Trace 分析详情确定问题并采用相应的措施 1. 慢查询日志记录慢 SQL 如何定位慢SQL呢&#xff1f; 我们可以通过 慢查询日志 来查看慢 SQL。 ①&#xff1a;开启慢查询日志…

Django 访问前端页面一直在转异常:ReferenceError:axios is not defined

访问&#xff1a;http://127.0.0.1:8080/ my.html 一、异常&#xff1a; 二、原因 提示&#xff1a;axios找不到&#xff01;&#xff01; 查看代码<script src"https://unpkg.com/axios/dist/axios.min.js"></script>无法访问到官网 三、解决 Using j…

Opencv学习笔记(二)图像基本操作

图像基本操作 一、边界填充 二、图像融合 三、图像阈值 四、图像平滑 五、形态学预算 1、腐蚀操作 2、膨胀操作 3、开闭运算操作 4、梯度运算 5、顶帽运算 6、黑帽运算 一、边界填充 cv2.copyMakeBorder(img, top_size, bottom_size, left_size, right_size, borde…

鸿蒙基础-常用组件与布局(ArkTS)

实现“登录”页面 本节主要介绍“登录”页面的实现&#xff0c;页面使用Column容器组件布局&#xff0c;由Image、Text、TextInput、Button、LoadingProgress等基础组件构成。 // LoginPage.ets Entry Component struct LoginPage {...build() {Column() {Image($r(app.media…

信号优先级与安全性

问题 对于同一个进程&#xff0c;如果存在两个不同的未决实时信号&#xff0c;那么先处理谁&#xff1f; 信号优先级的概念 信号的本质是一种软中断 (中断有优先级&#xff0c;信号也有优先级) 对于同一个未决实时信号&#xff0c;按照发送先后次序递送给进程 对于不同的未…

K8S三台服务器一键部署总结

随着互联网、云计算技术的深入发展&#xff0c;为降低企业大规模云应用建设的难度和成本&#xff0c;支持云应用开发、运行与运维一体化的云应用平台软件应运而生。在数通家族中对企业集成套件的云平台开发、部署、管理、运维进行统一管理&#xff0c;实现数据集成和共享的平台…

使用Mecury人型机器人搭建VR遥操作控制平台!

概述 VR遥操作机械臂是一种将虚拟现实技术与机械臂控制相结合的系统&#xff0c;使用户可以通过虚拟现实设备操控和交互实际的机械臂。这种技术可以应用于多个领域&#xff0c;包括远程操作、培训、危险环境中的工作等。 双臂人形机器人是一种模拟人体上半身结构&#xff0c;包…
最新文章