Flink 实时加购数据“维表补全”实战:从 Kafka 到 HBase 再到 Redis 的完整链路

一、业务背景

在电商实时运营场景中,加购行为(AddShoppingCart) 是最核心的用户行为之一,每秒钟可能产生数万条加购事件。以某头部电商平台为例,大促期间加购QPS可突破50万。
为了支持实时推荐、实时营销、实时大屏等业务,我们需要在毫秒级完成以下动作:

  1. 消费 Kafka 中的加购事件(事件包含:user_id, sku_id, timestamp等基础字段);
  2. 根据事件中的 sku_idHBase 维表 补全商品维度(品牌、类目、价格带等12个关键维度);
  3. 将补全后的事件写入 Redis(供推荐 / 大屏 / 算法实时调用),同时支持3种存储模式:
    • Hash结构:适合完整事件存储
    • String结构:适合简单KV场景
    • TTL设置:自动过期避免数据堆积

本文用一套可落地的 Flink Java 工程 演示整条链路,代码已在线上跑通,拿来即可用。方案经过618/双11大促验证,P99延迟稳定在80ms以内。


二、整体架构

Kafka Topic:  dwd_add_cart_event(分区数=32,副本数=3)↓ Flink Source(并行度=16)
CartEvent POJO(基础字段:user_id, sku_id, ts)↓ Async I/O 维表补全(并发度=100)
AsyncGoodsDimLookupFunction → HBase(RegionServer=20节点)↓ 补全后 POJO
CartEvent(扩展字段:brandId, cate1, cate3, priceRange等)↓ Sink(批量写入)
Flink2Redis → Redis Cluster(16分片,32G内存/节点)

关键设计点:

  1. 异步化:使用Flink Async I/O避免同步阻塞
  2. 多级缓存:本地Guava Cache + Redis缓存
  3. 弹性扩展:各组件均可水平扩容

从Kafka到Redis的完整数据处理链路:

Mermaid 流程图

JSON事件
CartEvent POJO
HBase查询
Redis缓存
补全维度
完整事件
String/Hash模式
监控指标
Kafka: dwd_add_cart_event
Flink Source
Async I/O 维表补全
HBase: goods_dim表
Redis Cluster
Flink Sink
Prometheus+Grafana

流程说明

  1. 数据源层
    Kafka原始事件通过user_id+sku_id+timestamp组成基础事件体,分区数需根据QPS设置(建议分区数=预期峰值QPS/5000)

  2. 维表补全层
    Async I/O采用有序模式确保事件顺序性,通过capacity参数控制并发请求量(公式:capacity = 并行度 × 每个并行任务最大并发

  3. 缓存策略
    本地Guava Cache采用最大条目+过期时间双重控制:

    CacheBuilder.newBuilder().maximumSize(100_000).expireAfterWrite(10, TimeUnit.MINUTES).build();
    
  4. 写入优化
    Redis Sink使用Pipeline批量写入,批量大小建议值:

    • 常规场景:50-100条/批次
    • 大促场景:100-200条/批次
      需根据redis.cluster-timeout配置调整(默认2秒)
  5. 容错机制

    • HBase查询失败时自动重试3次
    • Redis写入失败进入侧输出流后续补偿
    • Checkpoint间隔设置为30秒(状态后端建议RocksDB)

三、核心代码解读

3.1 事件实体 CartEvent

// 使用lombok简化代码
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CartEvent {// 基础字段(来自Kafka原始事件)private String userId;private String skuId;private long timestamp;// 维表补全字段(来自HBase)private String brandId;private String brandName;private String cate1; // 一级类目private String cate2;private String cate3;private String priceRange; // 价格带(0-100,100-300等)// 业务标记字段private boolean isNewUser;private int userLevel;
}

3.2 维表查询 AsyncGoodsDimLookupFunction

核心优化点:

  1. 三级缓存设计

    • 一级:本地Guava Cache(10分钟过期)
    • 二级:Redis集群缓存(1小时过期)
    • 三级:HBase源数据
  2. 异步查询逻辑

@Override
public void asyncInvoke(String skuId, ResultFuture<CartEvent> resultFuture) {// 1. 先查本地缓存CartEvent cached = localCache.getIfPresent(skuId);if (cached != null) {resultFuture.complete(Collections.singleton(cached));return;}// 2. 异步查RedisredisClient.getAsync(skuId).thenAccept(redisValue -> {if (redisValue != null) {// 命中Redis缓存CartEvent event = JSON.parseObject(redisValue, CartEvent.class);localCache.put(skuId, event);resultFuture.complete(Collections.singleton(event));} else {// 3. 查HBaseCompletableFuture.supplyAsync(() -> hbaseQuery(skuId)).thenAccept(hbaseResult -> {// 双写缓存redisClient.setex(skuKey, 3600, JSON.toJSONString(hbaseResult));localCache.put(skuId, hbaseResult);resultFuture.complete(Collections.singleton(hbaseResult));});}});
}

3.3 Redis Sink Flink2Redis

支持多种写入策略:

// String模式
jedis.set(key, value);
// Hash模式
jedis.hset(hashKey, field, value);
// 批量模式
Pipeline pipeline = jedis.pipelined();
for (int i = 0; i < batchSize; i++) {pipeline.set(key[i], value[i]);
}
pipeline.sync();
// TTL设置
if (expiration > 0) {jedis.expire(key, expiration);
}

四、Flink Job 主类 FlinkDemo

完整作业流程:

public class FlinkCartJob {public static void main(String[] args) {// 1. 初始化配置Configuration config = loadConfig("config.properties");// 2. 构建SourceKafkaSource<CartEvent> source = KafkaSource.<CartEvent>builder().setBootstrapServers(config.get("bootstrapServers")).setTopics(config.get("inputTopic")).setGroupId(config.get("groupId")).setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new KafkaToCartEvent()).build();// 3. 构建处理流水线DataStream<CartEvent> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 4. 异步维表补全DataStream<CartEvent> enrichedStream = AsyncDataStream.orderedWait(stream,new AsyncGoodsDimLookupFunction(config),5000, // 超时5秒TimeUnit.MILLISECONDS,100); // 并发100// 5. Sink处理enrichedStream.addSink(new RedisSinkFunction(config));// 6. 监控指标输出enrichedStream.print();env.execute("RealTime Cart Event Processing");}
}

五、配置说明

完整配置项示例:

# Kafka配置
bootstrapServers=kafka1:9092,kafka2:9092,kafka3:9092
inputTopic=dwd_add_cart_event
groupId=flink-add-cart-dim-01
auto.offset.reset=latest# HBase配置
hbase.zookeeper.quorum=zk1,zk2,zk3
hbase.table=goods_dim
hbase.cache.size=100000
hbase.cache.expire=600# Redis配置
redis.cluster=true
redis.nodes=redis1:6379,redis2:6379,redis3:6379
redis.password=xxxxxx
redis.database=0
redis.mode=hash  # string/hash
redis.expiration=1800
redis.pipeline.size=50  # 批量写入大小# 性能参数
async.timeout=5000
async.capacity=100

六、性能 & 稳定性要点

维度优化方案实现细节
并发控制Async I/O + 动态并发调节根据Kafka lag自动调整并发度
缓存策略多级缓存 + 预加载冷启动时批量加载热点商品维度
容错机制超时降级 + 熔断HBase超时后返回部分数据
资源隔离独立Slot资源池避免与其他作业资源竞争
监控告警Prometheus + Grafana实时监控P99延迟和缓存命中率

七、线上效果

某电商平台上线后关键指标:

指标日常值大促峰值
处理QPS8万/秒52万/秒
P99延迟65ms78ms
HBase查询量5千/秒2万/秒
Redis命中率97.3%95.8%
系统可用性99.99%99.97%

八、结语

本文方案已在多个电商平台落地,主要优势:

  1. 全链路优化:从Kafka消费到Redis写入的完整闭环
  2. 弹性扩展:各组件均可独立扩容,支持千万级QPS
  3. 生产就绪:包含监控、告警、容错等企业级特性
  4. 灵活配置:支持业务字段动态扩展和多存储模式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/2832.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【数据结构】二叉树的顺序存储、堆的实现及其应用:堆排序与Top-K问题

二叉树的顺序存储、堆的实现及其应用&#xff1a;堆排序与Top-K问题 ✨前言&#xff1a;在上一节【树与二叉树】中&#xff0c;我们已经了解了二叉树的基本结构与存储方式。 本篇文章将更进一步&#xff0c;重点介绍 二叉树的顺序结构&#xff0c;并在此基础上引出一个重要的数…

SpringBoot 快速上手:从环境搭建到 HelloWorld 实战

在 Java 开发领域&#xff0c;Spring 框架占据着举足轻重的地位&#xff0c;但它复杂的配置曾让不少开发者望而却步。SpringBoot 的出现&#xff0c;如同为 Spring 框架装上了 “加速器”&#xff0c;以 “约定大于配置” 的理念简化了开发流程。本文将从环境准备、Maven 配置入…

一键部署开源 Coze Studio

文章目录一、简介1、什么是 Coze Studio2、参考地址二、安装部署1、安装docker2、安装git3、下载core4、配置公网可用5、登录成功一、简介 1、什么是 Coze Studio Coze Studio 是一站式 AI Agent 开发工具。提供各类最新大模型和工具、多种开发模式和框架&#xff0c;从开发到…

墨刀原型设计工具操作使用指南及实践操作

壹、墨刀原型设计工具操作使用指南 一、基础入门 1. 软件版本与环境要求 版本区别&#xff1a; 免费版&#xff1a;支持 3 个项目&#xff0c;单项目最多 20 页&#xff0c;基础组件与交互&#xff0c;团队成员≤5 人&#xff1b;专业版&#xff08;付费&#xff09;&#x…

博士招生 | 美国圣地亚哥州立大学 Yifan Zhang 课题组博士招生,AI 安全领域顶尖平台等你加入!

内容源自“图灵学术博研社”gongzhonghao学校简介圣地亚哥州立大学&#xff08;San Diego State University, SDSU&#xff09;是美国加州南部久负盛名的公立研究型大学。学校坐落于科技产业高度活跃的南加州地区&#xff0c;与本地软件、电信、生物科技、国防及清洁能源等领域…

用vscode使用git工具

基础用法步骤一&#xff1a;打开vscode的git可视化工具步骤二&#xff1a;点击初始化仓库步骤三&#xff1a;选择要加入缓存区的文件注意&#xff1a;这里你可以选择自己想要的文件进行添加。如果想取消缓存区的文件&#xff0c;这里也可以进行取消提交。步骤四&#xff1a;提交…

portswigger labs XXE漏洞利用实战

lab1 利用外部实体注入获取文件解决此 lab 需要读取到/etc/passwd<!DOCTYPE test [ <!ENTITY cmd SYSTEM "file:///etc/passwd"> ]> <productId>&cmd;</productId>lab2 利用 XXE 执行 SSRF 攻击通过构造 xxe 请求特定的 url 获取目录拼接…

MySQL表的操作

1.创建表创建表的语法操作&#xff1a;CREATE TABLE table_name (field1 datatype,field2 datatype,field3 datatype) character set 字符集 collate 校验规则 engine 存储引擎;说明&#xff1a;field 表示列名datatype 表示列的数据类型character set 指定字符集&#xff0c;若…

第2章 cmd命令基础:证书操作(certutil)

Hi~ 我是李小咖&#xff0c;主要从事网络安全技术开发和研究。 本文取自《李小咖网安技术库》&#xff0c;欢迎一起交流学习&#x1fae1;&#xff1a;https://imbyter.com Certutil是一个Windows操作系统自带的命令行工具&#xff0c;主要用于执行各种与数字证书相关的任务&am…

LeetCode100-53最大子数组和

本文基于各个大佬的文章 上点关注下点赞&#xff0c;明天一定更灿烂&#xff01; 前言 Python基础好像会了又好像没会&#xff0c;所有我直接开始刷leetcode一边抄样例代码一边学习吧。本系列文章用来记录学习中的思考&#xff0c;写给自己看的&#xff0c;也欢迎大家在评论区指…

当我们想用GPU(nlp模型篇)

在个人设备上“把 GPU 真正用起来”做 NLP&#xff0c;分五步&#xff1a;准备 → 安装 → 验证 → 训练/推理 → 踩坑排查。下面每一步都给出可复制命令和常见错误。 ────────────────── 1. 硬件准备 • 一张 NVIDIA GPU&#xff0c;算力 ≥ 6.1&#xff08…

celery

celery是什么celery是Python开发的简单的、灵活可靠的、处理大量消息的分布式任务调度模块专注于实时处理的异步任务队列同时支持任务调度celery本身不含消息服务&#xff0c;它使用第三方消息服务来传递任务&#xff0c;支持的消息服务有RabbitMQ、Redis、Amazon SQS,celery本…