Flink实时电商数仓(十)

common模块回顾

  1. app
    • BaseApp: 作为其他子模块中使用Flink - StreamAPI的父类,实现了StreamAPI中的通用逻辑,在其他子模块中只需编写关于数据处理的核心逻辑。
    • BaseSQLApp: 作为其他子模块中使用Flink- SQLAPI的父类。在里面设置了使用SQL API的环境、并行度、检查点等固定逻辑。
  2. bean:存放其他子模块中使用到的javaBean对象,因为如果一直使用jsonObject对象调用数据的话,需要使用类似getString("字段名")的方式,没有直接使用javaBean对象那么方便。
  3. constant
    • 存储字符串常量
    • 为了保证一致性,如果某个常量修改时,只需在这里修改即可对整个项目进行修改
  4. function
    • DorisMapFunction:将javaBean对象转换为对应的json字符串对象,并且将驼峰式命名方式修改为蛇形命名方式。便于写入doris。
  5. util
    • DateFormateUtil
    • FlinkSinkUtil
    • FlinkSourceUtil
    • HBaseUtil
    • IkUtil
    • JdbcUtil
    • SQLUtil
      • getUpsertKafakaSQL: 一定要声明主键,支持撤回流
      • getDorisSinkSQL: 用于写入Doris

dim层回顾

  • Flink-cdc监控mysql中的维度配置表
  • 将监控的数据流做成广播流
  • 将广播流和读取数据的主流进行connect
  • 主流数据根据广播流的配置信息进行分流,注意需要先提前缓存一次配置表信息
  • 达到动态拆分数据表的效果

dwd层FlinkSQL回顾

  • 注意join时会将所有数据都存储到内存中,需要考虑设置TTL
  • 大表join小表时,可以考虑使用lookup join
  • 如果数据流有明确的先后关系时,考虑使用Interval join

在支付成功模块,由于订单详情表处理时已经存在撤回流,但支付成功模块也是使用left join方式调用订单详情数据,会导致产生两次撤回流。在后续dws层处理时,要注意对数据进行去重过滤。

dws层回顾

  • 如何判断使用FlinkSQL还是StreamAPI
    • 如果比较标准化, 比如简单的开窗聚合,一般使用FlinkSQL
    • 如果需要使用状态处理数据,比如判断是否为独立用户,使用StreamAPI

交易域sku粒度订单下单各窗口汇总

  • 需求分析:从Kafka订单明细主题读取数据,过滤null数据并按照唯一键对数据去重,按照SKU维度分组,统计原始金额、活动减免金额、优惠券减免金额和订单金额,并关联维度信息,将数据写入Doris交易域SKU粒度下单各窗口汇总表

  • 思路分析:

    • 方案一:按照订单ID进行分组,根据业务逻辑设置定时器取最后一个数据进行发送
    • 方案二:将度量值存放到状态中,每次新数据到达时,将新的度量值减去状态中的度量值
  • 具体实现

    • 因为需要使用状态,故使用BaseApp; 设置端口号10029,并发度4,消费者组为类名,消费者主题名称为dwd订单详情
    • 读取dwd下单主题数据, stream.print()
    • 过滤清洗:
      • 去掉null数据, stream.flatMap(new FlatMapFunction<>())
      • ts: 水位线,不能为空;进行位数的修正,如果是10位的,使用 jsonObj.put("ts", ts*1000)
      • id: keyby的关键字,不能为空
      • sku_id: group by的粒度关键字,也不能为空
    • 添加水位线
      • 网络延迟5L
      • 添加数据的泛型,提取数据中的ts,作为水位线(注意观察ts的位数,需要为13位,毫秒级)
    • 修正度量值,转换数据结构
      • 使用id关键字进行分组
      • 使用process算子中的状态来进行处理stream.process(new KeyedProcessFunction<>),返回值为对应的javabean对象
      • 在状态中存储上一次的度量值大小,只保存30秒
      • processElement()方法中获取状态中的度量值,使用前需要判空,如果为空设置为0,之后才能进行数值计算。
      • 创建对应的bean对象,度量值都减去状态中的度量值和更新状态中当前的度量值
    • 分组开窗聚合
      • 使用skuId进行keyby
      • 分组后使用window算子进行开窗,设置窗口时间,注意Time属于org.apache.flink.streaming.api.windowing.time.Time.seconds()
      • 使用reduce算子进行聚合计算, 聚合时需要累积所有度量值
      • new ProcessWindowFunction()获取窗口信息, startTime, EndTime, curTime, 获取到后写入javaBean对象中
    • 关联维度信息
      • 先分组聚合再关联维度信息的原因:关联维度信息需要join操作,是很耗费性能的大操作。先聚合数据能大幅度减少数据量。
      • 启动HBase,查看对的sku_info表中是否存储着对应的维度信息
      • 获取外部连接,需要使用生命周期方法(open,close在整个算子执行过程中只运行一次);对应的关联维度信息,即RichMapFunction()
      • map方法中使用HBase的API读取表格数据,使用读取到的字段补全原本的信息
    • 创建HBase的API:读取表格数据 get
      • 获取table
      • 创建get对象
      • 调用get方法
      • 获取数据写入jsonObj
    • 写出到Doris

维度关联优化

  1. 旁路缓存:独立缓存服务有(redis, memcache).
    在这里插入图片描述
  • 使用旁路缓存时要注意保持数据的一致性,如果数据发生修改和删除,直接删除redis中的数据。

同步+旁路缓存模式

  1. 引入Jedis相关依赖
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>
  1. 创建Redis工具类RedisUtil
  2. 在RichMapFunction中的open和close方法中获取和关闭HBase和Redisd的连接。
  3. 拼接对应的redisRowKey
  4. 读取Redis缓存的数据,jsonObj的字符串
  5. 判断redis读取到的数据是否为空
    • 没有数据:需要读取HBase;jsonObj = HBaseUtil.getCells(), 读取到数据后,使用jedis.setex()存储到redis
    • redis有缓存,直接返回
  6. 进行维度关联

Dim层写入HBase修正

  • 在dim层将数据写入HBase时,需要同时获取Redis的连接。
  • 判断redis中的缓存是否发生变化
    • 判断数据类型是修改或删除时,删除Redis中对应的数据
    • 拼写数据的rowkey
    • 使用jedis.del(rediskey)来删除

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/286688.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

桌面天气预报软件 Weather Widget free mac特点介绍

Weather Widget free for Mac多种吸引人的小部件设计可供选择&#xff0c;可以随时了解天气&#xff01;还可以在Dock和菜单栏中为您提供简短的天气预报或当前状况的概述。 Weather Widget free for Mac软件介绍 始终在桌面上使用时尚的天气小部件来随时了解天气&#xff01;多…

jenkins +jmeter 报告乱码解决

问题&#xff1a;生产报告会乱码的问题&#xff0c;一般是有编码格式引起的。我遇到的问题是&#xff0c;jmeter需要读取csv的数据作为参数。但是我们并不知道csv保存是什么编码格式&#xff0c;有可能不是utf-8的编码格式&#xff0c;所以会导致中文乱码的问题 解决方案&#…

安装Unity详细教程(如何获取免费个人版许可证)

文章目录 下载Unity Hub安装Unity Hub登录获取免费个人版许可证安装Unity编辑器卸载Unity编辑器 下载Unity Hub 首先&#xff0c;我们需要到Unity的官网下载Unity Hub&#xff1a;Unity CN 我们可以在Unity Hub上管理我们的编辑器版本和项目文件。 安装Unity Hub 然后安装Un…

kubeadm创建k8s集群

kubeadm来快速的搭建一个k8s集群&#xff1a; 二进制搭建适合大集群&#xff0c;50台以上。 kubeadm更适合中下企业的业务集群。 部署框架 master192.168.10.10dockerkubelet kubeadm kubectl flannelnode1192.168.10.20dockerkubelet kubeadm kubectl flannelnode2192.168.1…

selenium实现UI自动化

1.selenium简介 selenium是支持web浏览器自动化的一系列工具和库的综合项目。具有支持linux、windows等多个平台&#xff0c;支持Firefox、chrome等多种主流浏览器&#xff1b;支持Java、Python等多种语言。 主要包括的三大工具有&#xff1a; WebDriver&#xff08;rc 1.0)、…

JavaScript:作用域变量回收

JavaScript&#xff1a;作用域&变量回收 局部作用域函数作用域块作用域 全局作用域作用域链变量在浏览器模型中的位置浏览器模型全局变量的产生情况直接赋值全局对象与var全局对象的区别 垃圾回收机制引用计数法标记清除法 闭包变量提升&函数提升 作用域规定了变量能够…

k8s基础架构

k8s基础架构 创建pod流程 &#xff08;1&#xff09;用户通过kubectl向api-server发起创建pod请求&#xff1b; &#xff08;2&#xff09;apiserver通过对应的kubeconfig进行认证&#xff0c;认证通过后将yaml中的po信息存到etcd&#xff1b; &#xff08;3&#xff09;Contr…

图像分割-漫水填充法 floodFill

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 本文的C#版本请访问&#xff1a;图像分割-漫水填充法 floodFill (C#&#xff09;-CSDN博客 FloodFill方法是一种图像处理算法&#…

软件测试/测试开发丨Vuetify框架的使用

介绍 Vuetify 是一个基于 Vue.js 精心打造 UI 组件库&#xff0c;整套 UI 设计为 Material 风格。能够让没有任何设计技能的开发者创造出时尚的 Material 风格界面。 为什么要使用Vuetify框架 所有组件遵从 Material Design 设计规范&#xff0c;UI 体验非常优秀&#xff0c…

计算机毕业设计-----ssm+mysql实现的JavaWeb酒店管理系统

项目介绍 本项目为基于ssmmysql实现的JavaWeb酒店管理系统; 主要功能包括&#xff1a; 管理员登录,收入统计,客房管理,商品管理,客房预订,住宿登记,财务统计,旅客管理,接待对象管理等功能。 环境需要 1.运行环境&#xff1a;最好是java jdk 1.8&#xff0c;我们在这个平台上…

UDS诊断(ISO14229-1) 22服务

文章目录 功能简介应用场景请求和响应1、请求2、子功能3、肯定响应4、否定响应 报文示例常用 DIDUDS中常用 NRC参考 功能简介 22服务&#xff0c;即 ReadDataByIdentifier&#xff08;按标识符读取数据&#xff09;服务&#xff0c;该服务允许客户端请求读取服务器中通过一个或…

数据结构初阶之栈和队列(C语言版)

数据结构初阶之栈和队列&#xff08;C语言版&#xff09; ✍栈♈栈的结构设计♈栈的各个接口的实现&#x1f47a;StackInit(初始化)&#x1f47a;push&#xff08;入栈&#xff09;&#x1f47a;pop&#xff08;出栈&#xff09;&#x1f47a;获取栈顶元素&#x1f47a;获取栈中…

案例081:基于微信小程序的移动平台的远程在线诊疗系统

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

2024年金融变革RWA的7大展望

作者&#xff1a;SanjayRoofstock onChain的副总裁 编译&#xff1a;秦晋 碳链价值 稳定币、代币化国债、去中心化私人信贷、实物支持的NFTs、气候和再生金融DeFi - 这些只是未来一年将重塑资本市场的部分趋势。 在不断变化的金融格局中&#xff0c;过去两年给我们带来了一系列…

玩转贝启科技BQ3588C开源鸿蒙系统开发板 —— 开发板详情与规格

本文主要参考&#xff1a; BQ3588C_开发板详情-开源鸿蒙技术交流-Bearkey-开源社区 BQ3588C_开发板规格-开源鸿蒙技术交流-Bearkey-开源社区 厦门贝启科技有限公司-Bearkey-官网 1. 开发板详情 RK3588 核心板是一款由贝启科技自主研发的基于瑞芯微 RK3588 AI 芯片的智能核心…

专题一:递推与递归

递归 例题 递归实现指数型枚举 从 1∼n这 n个整数中随机选取任意多个&#xff0c;输出所有可能的选择方案。 输入格式 输入一个整数 n。 输出格式 每行输出一种方案。 同一行内的数必须升序排列&#xff0c;相邻两个数用恰好 1 个空格隔开。 对于没有选任何数的方案&#xff0c…

认识SpringBoot项目中的Starter

✅作者简介&#xff1a;大家好&#xff0c;我是Leo&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;Leo的博客 &#x1f49e;当前专栏&#xff1a; 循序渐进学SpringBoot ✨特色专栏&…

webshell检测方式深度剖析 --- Pixy系列二(数据流分析)

开篇 书接上文&#xff0c;这次我们来聊聊数据流分析&#xff0c;数据流分析的内容非常广泛&#xff0c;我们力求深入浅出通俗易懂&#xff0c;在简短的篇幅内将这一概念描述清楚。 简单来说&#xff0c;数据流分析是一种用来获取相关数据沿着程序执行路径流动的信息分析技术…

ROS学习笔记(7)进一步深入了解ROS第一步

0.前提 最近在学习宾夕法尼亚大学工程学院的ROS公开课&#xff0c;在尽力的去融入全英语的环境&#xff08;哪怕我的英语水准并不是很高&#xff09;。既然是在学习&#xff0c;笔记也就是必须的了&#xff0c;当然这些笔记都是课程当中提出的问题&#xff0c;我去寻找后得出的…

EDI 项目推进流程

EDI 需求确认 交易伙伴发来EDI对接邀请&#xff0c;企业应该如何应对&#xff1f; 首先需要确认EDI需求&#xff0c;通常包括传输协议和报文标准以及传输的业务单据类型。可以向交易伙伴发送以下内容&#xff1a; &#xff08;中文版&#xff09; 与贵司建立EDI连接需要使用…
最新文章