Redis Stream 消息队列总结

📅 2026/7/5 2:07:07 👁️ 阅读次数 📝 编程学习
Redis Stream 消息队列总结

1. Stream 是什么

Redis Stream 是 Redis 提供的一种消息队列数据结构,用于保存和传递一系列消息。

它的核心特点是:

  • 消息有唯一 ID。

  • 消息会持久化保存在 Redis 中,不会像 Pub/Sub 一样发送后立刻丢失。

  • 支持消费者组。

  • 支持消息确认机制。

  • 支持查看未确认消息并重新处理。

  • 适合订单、异步任务、日志、通知等可靠消息场景。

可以把 Stream 理解为一个“消息流”:

生产者 → Stream 消息队列 → 消费者

例如秒杀业务中:

用户下单请求 ↓ Lua 脚本校验库存和重复下单 ↓ 将订单消息写入 stream.orders ↓ 消费者异步读取订单消息 ↓ 写入 MySQL 数据库

2. Stream 中的消息结构

Stream 中的一条消息由两部分组成:

消息 ID + 多个 field-value 数据

例如:

1720000000000-0 voucherId = 10 userId = 1001 orderId = 20001

其中:

  • 1720000000000-0:消息 ID。

  • voucherId:优惠券 ID。

  • userId:用户 ID。

  • orderId:订单 ID。

消息 ID 的格式通常是:

毫秒时间戳-序号

例如:

1720000000000-0 1720000000000-1

同一毫秒内写入多条消息时,后面的序号会递增。


3. 添加消息:XADD

生产者使用XADD向 Stream 中写入消息。

XADD stream.orders * voucherId 10 userId 1001 orderId 20001

含义:

stream.orders :Stream 名称 * :让 Redis 自动生成消息 ID voucherId 10 :字段和值 userId 1001 orderId 20001

如果stream.orders不存在,Redis 会自动创建它。

执行后会返回一条消息 ID,例如:

1720000000000-0

Java 中通过 Lua 脚本写入消息时,通常类似:

redis.call( 'xadd', 'stream.orders', '*', 'voucherId', voucherId, 'userId', userId, 'orderId', orderId )

注意:XADD后面必须传入合法的消息 ID。

错误示例:

redis.call('xadd', 'stream.orders', 'voucherId', voucherId)

这里 Redis 会把voucherId当成消息 ID,因此报错:

ERR Invalid stream ID specified

正确写法必须加上:

'*'

即:

redis.call('xadd', 'stream.orders', '*', ...)

4. 不使用消费者组读取消息:XREAD

最基础的读取方式是XREAD

XREAD COUNT 1 STREAMS stream.orders 0

含义:

COUNT 1 :最多读取 1 条消息 STREAMS stream.orders:读取哪个 Stream 0 :从最早的消息开始读

也可以读取最新消息:

XREAD BLOCK 2000 COUNT 1 STREAMS stream.orders $

含义:

BLOCK 2000 :没有消息时阻塞等待 2 秒 $ :只读取命令执行之后新产生的消息

但是XREAD有一个问题:

多个消费者可能读取到同一条消息。

因此它不适合多个消费者协作处理订单这类任务。

这时需要使用消费者组。


5. 消费者组是什么

消费者组用于实现“多个消费者共同处理同一批消息”。

结构可以理解为:

Stream ├── 消费者组 g1 │ ├── 消费者 c1 │ ├── 消费者 c2 │ └── 消费者 c3 │ └── 消费者组 g2 ├── 消费者 c4 └── 消费者 c5

关键规则:

不同消费者组之间:广播

同一条消息可以被不同组分别消费。

例如:

stream.orders ↓ g1:订单服务消费 g2:日志服务消费

那么同一条订单消息:

g1 可以收到 g2 也可以收到

这相当于广播。

同一个消费者组内:竞争消费

同一组中的多个消费者会竞争消息。

例如:

g1 ├── c1 ├── c2 └── c3

某一条消息只会分配给其中一个消费者:

消息 A → c1 消息 B → c2 消息 C → c3

不会出现同一条消息同时被c1c2c3正常消费的情况。

所以:

想实现广播:创建多个消费者组。 想实现负载均衡:在同一个消费者组中创建多个消费者。

6. 创建消费者组:XGROUP CREATE

创建消费者组命令:

XGROUP CREATE stream.orders g1 0 MKSTREAM

含义:

stream.orders :Stream 名称 g1 :消费者组名称 0 :从最早的消息开始读取 MKSTREAM :如果 Stream 不存在,则自动创建

其中:

0

表示消费者组从 Stream 的第一条消息开始消费。

如果写成:

$

表示消费者组只消费创建之后的新消息,不处理历史消息。

例如:

XGROUP CREATE stream.orders g1 $ MKSTREAM

含义是:

忽略当前已有消息, 只消费以后新增的消息。

如果消费者组已经存在,再执行创建命令会报错:

BUSYGROUP Consumer Group name already exists

这不是 Stream 出错,而是表示:

g1 这个消费者组已经创建过了。

通常不需要重复创建。


7. 使用消费者组读取消息:XREADGROUP

消费者组读取消息使用:

XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.orders >

含义:

GROUP g1 c1 :g1 是消费者组,c1 是消费者名称 COUNT 1 :最多读取 1 条消息 STREAMS stream.orders > :读取从未分配给消费者的新消息

其中最重要的是:

>

它表示:

读取消费者组中还没有被任何消费者领取过的新消息。

例如:

XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.orders >

消费者c1获取一条新消息后,这条消息会被记录为:

已投递给 c1,但尚未确认。

此时它会进入 Pending List,也就是待确认消息列表。


8. Pending List:未确认消息列表

消费者读取消息后,消息不会马上彻底完成。

Redis 会先将消息记录到消费者组的 Pending List 中。

流程如下:

消费者读取消息 ↓ 消息进入 Pending List ↓ 消费者执行业务逻辑 ↓ 业务成功后发送 XACK ↓ 消息从 Pending List 移除

例如订单业务:

读取订单消息 ↓ 创建订单 ↓ 扣减数据库库存 ↓ 订单写入成功 ↓ XACK 确认消息

如果消费者读取后宕机:

消息没有 XACK

那么消息仍然保留在 Pending List 中,不会丢失。

这就是 Stream 比 Pub/Sub 更可靠的原因之一。


9. 确认消息:XACK

消费者处理成功后,需要手动确认消息。

命令:

XACK stream.orders g1 1720000000000-0

含义:

stream.orders :Stream 名称 g1 :消费者组 1720000000000-0 :消息 ID

确认后:

消息会从 g1 的 Pending List 中移除。

注意:

XACK 不会删除 Stream 中的原始消息。

它只是表示:

这个消费者组已经成功处理过这条消息。

所以:

消息仍然可以被其他消费者组消费。

10. 为什么必须手动确认

因为 Redis 不知道你的业务是否真正执行成功。

例如消费者读取到订单消息后,可能发生:

1. 数据库写入失败 2. 数据库事务回滚 3. 服务宕机 4. 网络异常 5. 消费者代码报错

如果 Redis 自动确认消息,就可能出现:

Redis 认为消息已处理, 但订单实际上没有写入数据库。

这会导致消息丢失。

因此正确流程应该是:

先完成业务逻辑 ↓ 确认数据库事务成功 ↓ 再执行 XACK

即:

业务成功后确认消息, 业务失败不要确认消息。

11. 读取 Pending List 中的消息

当消费者宕机或处理失败后,消息会留在 Pending List 中。

可以使用:

XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.orders 0

注意这里最后一个参数不是>,而是:

0

含义:

读取当前消费者 c1 自己尚未确认的消息。

区别如下:

> :读取从未被分配过的新消息 0 :读取当前消费者未确认的历史消息

在实际项目中,常见流程是:

启动消费者 ↓ 先读取 Pending List 中未确认消息 ↓ 处理完成后 XACK ↓ 再循环读取新消息

这样可以避免服务重启后遗留消息一直无法处理。


12. XPENDING:查看未确认消息

查看消费者组中是否有未确认消息:

XPENDING stream.orders g1

可以看到:

未确认消息数量 最早未确认消息 ID 最新未确认消息 ID 每个消费者持有的未确认消息数量

查看具体 Pending 消息:

XPENDING stream.orders g1 - + 10

含义:

- + :查询全部范围 10 :最多查看 10 条

13. Stream 消费流程总结

完整流程如下:

生产者执行 XADD ↓ 消息写入 Stream ↓ 消费者组读取消息 XREADGROUP ↓ 消息进入 Pending List ↓ 消费者执行业务逻辑 ↓ 业务成功:XACK 业务失败:不 XACK ↓ 未确认消息后续重新处理

可以写成:

XADD → XREADGROUP → 业务处理 → XACK

14. Stream 与 List、Pub/Sub 的区别

List 消息队列

List 常用命令:

LPUSH queue message BRPOP queue 0

特点:

优点: - 简单 - 支持阻塞读取 - 可以实现基础任务队列 缺点: - 没有消费者组 - 没有消息确认机制 - 消费失败后不方便恢复 - 多消费者场景功能较弱

Pub/Sub

Pub/Sub 常用命令:

SUBSCRIBE channel PUBLISH channel message

特点:

优点: - 实时广播 - 多个订阅者都可以收到消息 - 使用简单 缺点: - 不保存历史消息 - 订阅者离线期间会丢消息 - 没有确认机制 - 不适合订单等可靠业务

Stream

特点:

优点: - 消息持久化 - 支持消费者组 - 组内竞争消费 - 组间广播消费 - 支持 ACK 确认 - 支持 Pending List - 支持故障恢复

适合:

秒杀订单 异步下单 支付通知 日志收集 任务队列 事件驱动业务

15. 秒杀订单中的 Stream 设计

秒杀业务通常分为两部分。

第一部分:Redis + Lua 脚本

Lua 脚本负责快速校验:

库存是否充足 用户是否重复下单 扣减 Redis 库存 记录用户已购买 写入 Stream 消息

Lua 脚本成功后,将订单消息写入:

stream.orders

例如:

redis.call( 'xadd', 'stream.orders', '*', 'voucherId', voucherId, 'id', userId, 'orderId', orderId )

这里的字段名id虽然可以使用,但更推荐写成:

'userId'

因为它表达更清晰:

redis.call( 'xadd', 'stream.orders', '*', 'voucherId', voucherId, 'userId', userId, 'orderId', orderId )

第二部分:消费者异步创建订单

消费者不断读取消息:

stream.orders

然后执行:

创建 VoucherOrder 写入数据库 扣减数据库库存 检查是否重复下单

成功后执行:

XACK

如果数据库事务失败,例如:

userId 为 null 数据库字段不允许为空 唯一索引冲突 save() 失败 事务回滚

那么:

不能执行 XACK。

否则 Redis 会认为消息已成功消费,但数据库订单并没有创建成功。


16. 最重要的几个命令

# 添加消息 XADD stream.orders * voucherId 10 userId 1001 orderId 20001 # 创建消费者组 XGROUP CREATE stream.orders g1 0 MKSTREAM # 读取新消息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders > # 确认消息 XACK stream.orders g1 1720000000000-0 # 读取当前消费者未确认的消息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.orders 0 # 查看 Pending List 概况 XPENDING stream.orders g1 # 查看具体 Pending 消息 XPENDING stream.orders g1 - + 10

17. 核心结论

Redis Stream 的重点可以记成下面几句话:

Stream 是可靠消息队列。 XADD 用于写入消息。 消费者组用于管理多个消费者。 不同消费者组之间是广播关系。 同一消费者组内部是竞争消费关系。 消费者读取消息后,消息会进入 Pending List。 业务处理成功后必须 XACK。 没有 XACK 的消息可以在后续重新处理。 > 用于读取新消息。 0 用于读取当前消费者未确认的消息。