Flink电商实时数仓(三)

DIM层代码流程图

维度层的重点和难点在于实时电商数仓需要的维度信息一般是动态的变化的,并且由于实时数仓一般需要一直运行,无法使用常规的配置文件重启加载方式来修改需要读取的ODS层数据,因此需要通过Flink-cdc实时监控MySql中的维度数据配置信息表,实时动态的发布广播信息。主流数据根据广播数据及时调整处理逻辑,并自动在HBase中创建相应的维度表和写入相应的维度数据。

  1. 消费Kafka ods业务主题数据
  2. 数据清洗:是否为JSON格式
  3. 使用flink-cdc读取监控配置表数据
  4. 在HBase中创建维度表
  5. 做成广播流
  6. 连接主流和广播流
  7. 筛选出需要写出的字段
  8. 写出到Hbase

在这里插入图片描述

整体架构

  • realtime-common模块
    • base: 所有Flink程序的基类,负责搭建Flink运行环境和设置并行度和检查点等相关参数。其中我们的数据来源也确定为Kafka,故数据源代码也写在这里。每个Flink程序的具体处理逻辑由handle()函数来负责处理。
    • bean:负责存放项目运行过程中需要用到的bean对象,比如当前flink-cdc程序中需要用到的TableProcessDim类,配置信息表对象。
    • constant:负责存放程序中需要使用到常量参数
    • function:负责存放一些通用的函数方法
    • util:一般存放和数据连接相关的工具类
    • test目录: 用来在写正式代码前测试连接是否通畅,数据是否可以正常发送。
  • realtime-dim模块
    • app:DimApp里面写的是dim层的具体实现,具体步骤如上述流程图所示。
    • function:负责存放数据处理的实现类,一般会继承相应的父类,在dim层可以直接调用这里的子类来实现父类接口,让dim层的代码逻辑更加清晰。
  • realtime-dwd模块:如上
  • realtime-dws模块:如上

在这里插入图片描述

数据清洗ETL

数据清洗,简单来说就是对数据进行简单的转换筛选。首先如果在转换过程中出现异常,直接过滤掉。注意这里无需抛出异常,因为如果throw a exception会导致整个程序异常终止,而在数据处理过程中出现部分数据格式错误而无法正常进行格式转换是很常见的,只需将异常信息打印到控制台即可。如果转换正常,再判断是否满足以下三个条件:

  1. 数据库名为gmall
  2. 数据类型不是bootstrap-start或者bootstrap-complete
  3. data字段不是null且长度不为0

Flink-cdc读取配置表的数据

Flink中获取数据主要有两个步骤:

  1. 获取相应的数据源Source
    • 注意:在构建Flink-cdc对应的MySQLSource时,tableList参数必须是库表.表名结构
  2. 调用env.fromSource()方法将数据源的发送过来的数据转换Ds数据流,在该方法中可以设置数据的水位线。
  3. 获取到数据后,建议先打印到控制台查看数据的具体结构。
public static MySqlSource<String> getMySqlSource(String databaseName, String tableName){
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(Constant.MYSQL_HOST)
                .port(Constant.MYSQL_PORT)
                .username(Constant.MYSQL_USER_NAME)
                .password(Constant.MYSQL_PASSWORD)
                .databaseList(databaseName) // set captured database
                .tableList(databaseName+"."+tableName) // set captured table

                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .startupOptions(StartupOptions.initial())
                .build();

        return mySqlSource;
    }

在HBase中创建维度表

数据库中的配置表数据经过Flink-cdc处理后发送到这里是json格式的字符串,这里根据数据的四种类型op在HBase中进行不同的建表删表操作,同时对数json字符数据进行转换映射处理,转换为对应的bean对象数据流。这里一个数据产生一个处理后的对象,故使用Map算子或FlatMap算子都可以。

  • op类型
    • d 代表delete,需要删除before字段中对应的表
    • c 代表create,r 代表 read,需要创建after字段中对应的表
    • u 代表update,需要先删除掉旧表,然后根据新表的字段创建一个新表
  • 创建HBase连接,创建连接是很耗费资源的行为,因此新建连接和关闭连接需要写在open和close方法中
  • HBase中想要对表进行创建和删除等DDL操作,都由Admin对象管理;如果需要对数据进行插入删除等DML操作,需要创建Table对象。详细操作细节请看相应代码即可。
public static SingleOutputStreamOperator<TableProcessDim> createHbaseTable(DataStreamSource<String> mysqlSource) {
        SingleOutputStreamOperator<TableProcessDim> createHBaseTable = mysqlSource.flatMap(
                new RichFlatMapFunction<String, TableProcessDim>() {


                    public Connection connection ;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        //获取连接
                        connection = HBaseUtil.getHBaseConnection();
                    }

                    @Override
                    public void close() throws Exception {
                        //关闭连接
                        HBaseUtil.closeHBaseConn(connection);
                    }

                    @Override
                    public void flatMap(String s, Collector<TableProcessDim> out){
                        //使用读取的配置表数据,到HBase中创建与之对应的表格

                        try {
                            JSONObject jsonObject = JSONObject.parseObject(s);
                            String op = jsonObject.getString("op");
                            TableProcessDim dim;//维度表
                            if ("d".equals(op)) {
                                dim = jsonObject.getObject("before", TableProcessDim.class);
                                dim.setOp(op);
                                //当配置表发送一个D类型的数据,对应的HBase需要删除一张维度表
                                deleteTable(dim);
                            } else if ("c".equals(op) || "r".equals(op)) {
                                dim = jsonObject.getObject("after", TableProcessDim.class);
                                createTable(dim);
                                dim.setOp(op);
                            } else {//op = 'u', 即修改
                                dim = jsonObject.getObject("after", TableProcessDim.class);
                                deleteTable(dim);
                                createTable(dim);
                            }
                            dim.setOp(op);
                            out.collect(dim);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }

                    private void createTable(TableProcessDim dim) {
                        String sinkFamily = dim.getSinkFamily();
                        String[] split = sinkFamily.split(",");
                        try {
                            HBaseUtil.createHBaseTable(connection,Constant.HBASE_NAMESPACE,dim.getSinkTable(),split);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }

                    }

                    private void deleteTable(TableProcessDim dim) {
                        try {
                            HBaseUtil.dropHBaseTable(connection, Constant.HBASE_NAMESPACE, dim.getSinkTable());
                        } catch (IOException e) {
                            e.printStackTrace();
                        }

                    }
                }
        );
        return createHBaseTable;
    }

主流连接广播流

从Flink-cdc获取的数据(gmall2023_config)是作为一个参数来控制我们对于主流即ODS层数据(gmall数据库的业务数据)的处理逻辑。gmall2023)_config库中的Table_process_dim表决定了后续程序筛选哪个表作为维度信息,并且定义了表中有哪些字段。

  1. 转换为广播流只需要调用上述得到的TableProcessDimStream的broadcast方法
  2. 使用的主流(gmall业务数据)的connect方法,得到一个连接流,然后对连接流进行process处理。
  3. 创建BroadcastProcessFunction,在里面分别有两个函数
    • processBroadcastElement():处理广播流数据
    • processElement():处理主流数据
  4. 广播流处理逻辑:
    • 读取广播状态
    • 将配置表信息写到广播状态中
    • 根据广播状态数据的op对状态做相应的修改
  5. 主流处理逻辑:
    • 查询广播状态,判断当前数据对应的表是否存在于状态中
    • 如果数据比状态来的更早,造成状态为空,需要对状态做预处理(提前从mysql中读取维表配置表信息)
    • 如果根据当前表的表名查询的状态不为空,说明该表为维度数据,使用收集器收集起来。

筛选出需要的字段

在这里插入图片描述
在维度配置信息表中的sink_column字段里定义了维度表需要的字段,使用filter算子对JsonObj里面的data字段进行过滤即可获取到想要的字段数据。

写出到Hbase

过滤后的数据流调用它的addSink方法,方法中需要传入一个SinkFunction接口类。该接口需要实现三个方法分别是:

  • open方法:获取HBase连接
  • close方法:关闭HBase连接
  • invoke方法:写入数据时调用的方法,根据jsonObj中的type做不同处理,如果是delete,需要删除对应的维度表数据;否则都是直接覆盖写入。

代码的Gitee仓库地址:https://gitee.com/langpaian/gmall2023-realtime.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/262105.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

开发医疗陪诊系统源码:搭建安全高效的医患互动平台

本文将深入探讨开发医疗陪诊系统的源码&#xff0c;以及如何搭建一个安全高效的医患互动平台。 一、引言 医疗陪诊系统旨在通过技术手段&#xff0c;缩短患者与医生之间的距离&#xff0c;提供更快速、便捷的医疗服务。 二、技术选型 2.1前端技术 在搭建医疗陪诊系统的前…

Redis-Day3实战篇-商户查询缓存(缓存的添加和更新, 缓存穿透/雪崩/击穿, 缓存工具封装)

Redis-Day3实战篇-商户查询缓存 什么是缓存添加Redis缓存业务流程项目实现练习 - 给店铺类型查询业务添加缓存 缓存更新策略最佳实践方案案例 - 给查询商铺的缓存添加超时剔除和主动更新 缓存穿透/雪崩/击穿缓存穿透概述项目实现 - 商铺查询缓存 缓存雪崩缓存击穿概述互斥锁逻辑…

HBase基础知识(二):HBase集群部署、HBaseShell操作

1. HBase安装部署 1.1 Zookeeper正常部署 首先保证Zookeeper集群的正常部署&#xff0c;并启动之&#xff1a; 创建集群启动脚本&#xff1a; #!/bin/bash case $1 in "start"){ for i in hadoop100 hadoop101 hadoop102 do echo----------zookeeper $i 启动----…

vue2 之 实现pdf电子签章

一、前情提要 1. 需求 仿照e签宝&#xff0c;实现pdf电子签章 > 拿到pdf链接&#xff0c;移动章的位置&#xff0c;获取章的坐标 技术 : 使用fabric pdfjs-dist vuedraggable 2. 借鉴 一位大佬的代码仓亏 : 地址 一位大佬写的文章 &#xff1a;地址 3. 优化 在大佬的代码…

【算法与数据结构】135、LeetCode分发糖果

文章目录 一、题目二、解法三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、解法 思路分析&#xff1a;本题的思路是要相比较一边&#xff0c;然后在比较另外一边&#xff0c;左右两边一起比较的代码非常难写…

​ SK Ecoplant借助亚马逊云科技,海外服务器为环保事业注入新活力

在当今全球面临着资源紧缺和环境挑战的大背景下&#xff0c;数字技术所依赖的海外服务器正成为加速循环经济转型的关键利器。然而&#xff0c;很多企业在整合数字技术到运营中仍然面临着一系列挑战&#xff0c;依然存在低效流程导致的不必要浪费。针对这一问题&#xff0c;SK E…

使用HTTP协议有哪些风险?HTTP与HTTPS的区别是什么

作为两种常见的网络协议&#xff0c;HTTP和HTTPS都是用于在浏览器和服务器之间传输数据的。然而在保障数据安全性方面&#xff0c;HTTPS远远优于HTTP。在网络安全愈发重要的当下&#xff0c;HTTP协议的不安全性使得其逐渐被淘汰弃用。那么使用HTTP协议有哪些风险呢&#xff1f;…

开发知识点-HTML/JavaScript

HTML/JavaScript xlinksvgviewBoxuse基础预热与语法基础知识js 如何运行页面适用js 及输出 面向对象抽奖功能 json 支持 字符串转数组数组转字符串数组元素删除长度0位添加一个元素// 表示在下标为1处添加一项tttarray.splice(1,0,ttt)//[123,ttt,456]// 数组是否包含某个元素a…

KoPA: Making Large Language Models Perform Better in Knowledge Graph Completion

本来这个论文用来组会讲的&#xff0c;但是冲突了&#xff0c;没怎么讲&#xff0c;记录一下供以后学习。 创新点 按照我的理解简单概述一下这篇论文的创新点 提出使用大模型补全知识图谱&#xff0c;并且融合知识图谱的结构信息提出一个新的模型KoPA模型&#xff0c;采用少…

Django 简单图书管理系统

一、图书需求 1. 书籍book_index.html中有超链接&#xff1a;查看所有的书籍列表book_list.html页面 2. 书籍book_list.html中显示所有的书名&#xff0c;有超链接&#xff1a;查看本书籍详情book_detail.html(通过书籍ID)页面 3. 书籍book_detail.html中书的作者和出版社&…

说说 style gan 中的感知路径长度(Perceptual Path Length)

我在之前的博库中介绍了 style gan 的基本原理&#xff0c;原文中有提出感知路径长度&#xff08;Perceptual Path Length&#xff09;的概念。这是一种评价生成器质量的方式。 PPL基本思想&#xff1a;给出两个随机噪声 z 1 , z 2 ​ &#xff0c;为求得两点的感知路径长度PPL…

竞赛保研 基于Django与深度学习的股票预测系统

文章目录 0 前言1 课题背景2 实现效果3 Django框架4 数据整理5 模型准备和训练6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **基于Django与深度学习的股票预测系统 ** 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff…

WMS仓储管理系统的基本架构和功能模块

在数字化时代&#xff0c;WMS仓储管理系统解决方案已经成为了企业物流管理的重要组成部分。WMS仓储管理系统通过先进的信息化技术&#xff0c;实现了对仓库的全面管理&#xff0c;提高了仓库的运营效率&#xff0c;降低了成本。本文将详细解析WMS仓储管理系统的基本架构和功能模…

C#合并多个Word文档(微软官方免费openxml接口)

g /// <summary>/// 合并多个word文档&#xff08;合并到第一文件&#xff09;/// </summary>/// <param name"as_word_paths">word文档完整路径</param>/// <param name"breakNewPage">true(默认值)&#xff0c;合并下一个…

深信服技术认证“SCSA-S”划重点:命令执行漏洞

为帮助大家更加系统化地学习网络安全知识&#xff0c;以及更高效地通过深信服安全服务认证工程师考核&#xff0c;深信服特别推出“SCSA-S认证备考秘笈”共十期内容&#xff0c;“考试重点”内容框架&#xff0c;帮助大家快速get重点知识~ 划重点来啦 *点击图片放大展示 深信服…

深入理解 Spring Boot:核心知识与约定大于配置原则

深入理解 Spring Boot&#xff1a;核心知识与约定大于配置原则 简单说一下为什么要有 Spring Boot&#xff1f; 因为 Spring 的缺点。 虽然 Spring 的组件代码是轻量级的&#xff0c;但它的配置却是重量级的(需要大量 XML 配置) 为了减少配置文件&#xff0c;简化开发 Spri…

【Pytorch】学习记录分享6——PyTorch经典网络 ResNet与手写体识别

【Pytorch】学习记录分享5——PyTorch经典网络 ResNet 1. ResNet &#xff08;残差网络&#xff09;基础知识2. 感受野3. 手写体数字识别3. 0 数据集&#xff08;训练与测试集&#xff09;3. 1 数据加载3. 2 函数实现&#xff1a;3. 3 训练及其测试&#xff1a; 1. ResNet &…

JFreeChart 生成图表,并为图表标注特殊点、添加文本标识框

一、项目场景&#xff1a; Java使用JFreeChart库生成图片&#xff0c;主要场景为将具体的数据 可视化 生成曲线图等的图表。 本篇文章主要针对为数据集生成的图表添加特殊点及其标识框。具体包括两种场景&#xff1a;x轴为 时间戳 类型和普通 数值 类型。&#xff08;y轴都为…

阿里云林立翔:基于阿里云 GPU 的 AIGC 小规模训练优化方案

云布道师 本篇文章围绕生成式 AI 技术栈、生成式 AI 微调训练和性能分析、ECS GPU 实例为生成式 AI 提供算力保障、应用场景案例等相关话题展开。 生成式 AI 技术栈介绍 1、生成式 AI 爆发的历程 在 2022 年的下半年&#xff0c;业界迎来了生成式 AI 的全面爆发&#xff0c…

RAG实战案例:如何基于 LangChain 实现智能检索生成系统

在人工智能领域&#xff0c;如何有效结合大型语言模型&#xff08;LLM&#xff09;的常识性知识与特定的专有数据&#xff0c;一直是业界探索的热点。微调&#xff08;Fine-tuning&#xff09;与检索增强生成&#xff08;Retrieval-Augmented Generation&#xff0c;简称RAG&am…