redis stream 作为消息队列的最详细的命令说明文档

简介

stream 作为消息队列,支持多次消费,重复消费,ack机制,消息异常处理机制。
涉及到以下几个概念,消息流,消费者组,消费者。
在这里插入图片描述
在这里插入图片描述

涉及到以下命令

# 添加消息到流中
XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]
# 创建消费则组(加上MKSTREAM,会校验消息流是否存在,不存在会创建)
XGROUP CREATE key group <id | $> [MKSTREAM] [ENTRIESREAD entries-read]
# 消费者读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]   [NOACK] STREAMS key [key ...] id [id ...]

1. 环境

redis server 7.2.2

> info server
# Server
redis_version:7.2.2

redis可视化工具(可以直接使用命令行) redisInsight 2.44.0

2. 生产消费流程测试

消息队列,涉及到如下几个流程

  1. 发送消息到消息流
  2. 创建消费者组,并进行消费
  3. 正常消费,消息确认 ack
  4. 异常消费,转移消息的归属权 claim

2.1 发送消息到消息流

使用如下命令发送消息

XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]

各参数说明

参数名说明
NOMKSTREAM默认情况下,如果消息流不存在,则会创建消息流。
使用该参数,则不会创建,如果不存在则返回 (nil)
[<MAXLEN | MINID> [= | ~] threshold [LIMIT count]]等同于 xtrim 的参数,在添加消息后,会对 stream 裁剪,将先加入的消息剔除。
MAXLEN 表示stream长度不大于 threshold,MINID 表示stream的消息id不小于 threshold;
= 表示精确删除 ~ 表示近似删除;
threshold 表示长度或者id;
limit count 表示最多剔除多少消息
<* | id>* 表示由系统生成消息id,id 表示用用户指定的消息id
field value [field value …]消息采用键值对列表形式存储
# 1.1 执行 lua 脚本,批量添加 10000 个消息
eval "local key = 'test:stream_1';redis.call('del', key); for i=1,ARGV[1],1 do redis.call('xadd', key, i + '-0', 'index' i) end;local res = redis.call('xinfo', 'stream', key); return res[6];" 0 10000
# 1.2 查看 stream 的信息
xinfo stream test:stream_1

# 1.3 添加消息,后执行精确修剪。不输入 = | ~,表示使用 = 。会添加一条消息,然后删除消息,使流长度为10
xadd test:stream_1 maxlen 10  '10001-0' index 10001

# 1.4 近似删除。stream 中的消息是以基数树的结构存储,一个节点可能存储多个数据,所以当某个节点中存在
# 不能删除的数据时,这个节点就不会删除,因此会导致裁剪后的数据多一些。一个节点会存储 100 个数据。
# 取消 1.3 命令,执行如下命令后,流的长度会变成 101
xadd test:stream_1 maxlen ~ 10 '10001-0' index 10001

# 1.5 精确删除,根据 MINID ;添加一个消息,并将流中所有消息id 小于 1714746952323-9 删除
# 取消 1.3, 1.4 的命令,执行如下命令。结果是,保留 id >= '9001-0' 的所有数据, 流的长度会变成 1001
xadd test:stream_1 minid = '9001-0' '10001-0' index 10001
# 1.6 近似删除,根据 minid 和 limit count。
# 取消 1.3、1.4、1.5 的命令,执行如下命令。结果是,保留 id >= '9001-0' 的所有数据,并且最多删除 8950 个, 流的长度会变成 1101
# 因为限制 删除 8950 个,所以最后一个节点,计算到一半发现不能删除了,所以最后计算的节点的数据全部保留,故只删除了 8900个
xadd test:stream_1 minid ~ '9001-0' limit 8950 '10001-0' index 10001

为什么 xadd 需要添加 xtrim 的操作呢?因为有些消息,如果闲置的时间太长是要废弃掉的;所以可以加上这个。

xinfo stream test:stream_0 返回的结果字段中
radix-tree-keys:表示有几个id节点,一个id节点 至多会存储 100 个 id
radix-tree-nodes: radix tree 节点数量

2. 创建消费者组,消费消息

XGROUP CREATE key group <id | $> [MKSTREAM] [ENTRIESREAD entries-read]
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]   [NOACK] STREAMS key [key ...] id [id ...]

2.1 创建消费者组

MKSTREAM: 表示如果 stream 不存在则创建。如果不加 MKSTREAM参数,且stream不存在,执行的 xgroup create 会报错。
ENTRIESREAD:Redis version 7.0.0 可以添加此参数。如果使用了 ENTRIESREAD entries-read 参数, 设置 entries-read 已消费数量;lag 待消费数量, entries-read + lag 等于总数(包含已删除的消息数)

# id = 0 表示 从头开始消费
# id = 具体id,表示从指定 id 之后开始消费,不包含当前id
# id = $ 消费新消息
# 每一个消费者组都有一个 last_delivered_id 记录发送的最后一个消息id, 相互之间不会影响,比如来了一个新消息加入到队列中,通过 xreadgroup 可以让每一个消费者组都消费
# last_delivered_id = 0-0, 1714581497948-0, stream 中的最大的id
xgroup create test:stream_1 test:stream_1:group_0 0
# last_delivered_id = 1714581497948-0
xgroup create test:stream_1 test:stream_1:group_1 1714581497948-0
# last_delivered_id = stream 中的最大的id
xgroup create test:stream_1 test:stream_1:group_2 $

# 执行一下命令之后 消费者组的 entries-read = 1, lag = stream.entries-added - entries-read
xgroup create test:stream_1 test:stream_1:group_3 0 ENTRIESREAD 1
xgroup create test:stream_1 test:stream_1:group_4 1714581525681-0 ENTRIESREAD 1
xgroup create test:stream_1 test:stream_1:group_5 $ ENTRIESREAD 1

2.2 拉取消息消费

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]   [NOACK] STREAMS key [key ...] id [id ...]
  • GROUP group consumer: 与stream 绑定的 消费者组、消费者
  • COUNT count: 查找最大消息数量
  • BLOCK milliseconds: 如果一条消息都没有,阻塞多少时间
  • NOACK: 无需消息确认。相当于在读取的时候就已经确认消息了。
  • STREAMS key [key …] id [id …]
    • id 为 “>” 表示取 stream 中 message_id > consumer_group.last_delivered_id 的消息
    • id 为特定数字,表示 从 padding_list 中取 message_id > id 消息。 使用了具体id, BLOCK 和 NOACK 无效。
# 获取全部未消费的消息
XREADGROUP GROUP test:stream_1:group_0 test:stream_1:group_0:consumer_0 STREAMS test:stream_1 >
# 获取至多 10 条消息;若一条消息都没有,等待 20 秒。超时返回 nil
XREADGROUP GROUP test:stream_1:group_0 test:stream_1:group_0:consumer_0 count 10 BLOCK 20000 STREAMS test:stream_1 >
# 获取正在消费中的消息
XREADGROUP GROUP test:stream_1:group_0 test:stream_1:group_0:consumer_0 count 100 STREAMS test:stream_1 0

3. 正常消费,消息确认 ack

XACK key group id [id ...]

key 流名称
group 组名称
id 消息id

# 读取pel列表(pedding entries list: 消费中的列表)的消息的id,并确认
eval "local key = 'test:stream_1';local list = redis.call('xreadgroup','group','test:stream_1:group_0','test:stream_1:group_0:consumer_0','STREAMS',key,0); local entries = list[1][2];local sum = 0; for i=1,#entries, 1 do sum = sum + redis.call('xack', key, 'test:stream_1:group_0', entries[i][1]); end; return sum;" 0

4. 异常消费,转移消息的归属权 claim

XCLAIM key group consumer min-idle-time id [id ...] [IDLE ms] [TIME unix-time-milliseconds] [RETRYCOUNT count] [FORCE] [JUSTID] [LASTID lastid]
  • min-idle-time:最小闲置时间,如果闲置时间小于min-idle-time,则不处理
  • IDLE :设置消息的空闲时间(上次发送时间)。如果未指定 IDLE,则假定 IDLE 为 0,即重置时间计数,因为该消息现在有一个新所有者正在尝试处理它。
  • TIME :这与 IDLE 相同,但不是相对的毫秒数,而是将空闲时间设置为特定的 Unix 时间(以毫秒为单位)。像当于设置下发时间
  • RETRYCOUNT :将重试计数器设置为指定值。每次再次传送消息时,该计数器都会递增。通常XCLAIM不会更改此计数器,该计数器仅在调用 XPENDING 命令时提供给客户端:这样客户端可以检测异常情况,例如在大量传递尝试后由于某种原因从未处理的消息。
  • FORCE:即使某些指定的 ID 尚未在分配给其他客户端的 PEL 中,也会在 PEL 中创建待处理消息条目。但是该消息必须存在于流中,否则不存在的消息的 ID 将被忽略。
  • JUSTID:仅返回成功领取的消息ID数组,不返回实际消息。使用此选项意味着重试计数器不会增加。
# 强制处理id为 11-0 的,闲置时间大于 1 小时的消息;设置闲置时间为 0
xclaim test:stream_1 test:stream_1:group_0 test:stream_1:group_0:consumer_2 3600000 '11-0' IDLE 0 TIME 15 RETRYCOUNT 1 FORCE JUSTID
# 设置下发时间,并返回待处理消息列表
eval "local key = 'test:stream_1';local group = 'test:stream_1:group_0';local consumer = 'test:stream_1:group_0:consumer_2';local id = '11-0';local t = redis.call('time');local time = t[1] * 1000;redis.call('xclaim', key, group, consumer, 3600, id, 'TIME', time, 'RETRYCOUNT', 1, 'FORCE', 'JUSTID');return redis.call('xpending', key, group, '-', '+', 10, consumer);" 0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/596664.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Al加码,引爆“躺平式”旅游 | 最新快讯

旅游业正迎来新的技术浪潮。 文&#xff5c;锌刻度&#xff0c;作者&#xff5c;孟会缘&#xff0c;编辑&#xff5c;李季 今年的五一&#xff0c;“微度假”“微旅行”纷纷出圈。 相较于三亚、云南等老牌旅游大热门&#xff0c;人们开始寻找一些不用“人挤人”的小众旅行目的…

谁能取代迈巴赫,征服互联网安全大佬周鸿祎?

‍作者 |老缅 编辑 |德新 4月18日&#xff0c;「周鸿祎卖车」登上了微博热搜。这位360创始人、董事长发微博称&#xff1a;自己做了一个艰难的决定&#xff0c;将把陪伴9年的迈巴赫600给卖掉。 随后&#xff0c;他解释道&#xff1a;「这是因为我需要体验新一代车的感觉。古人…

SQL注入——绕过information

衔接上文&#xff0c;进一步对SQL注入less-1进行禁止information的操作&#xff0c;上文连接如下&#xff1a; SQL注入less-1-CSDN博客 一、对less-1进行编辑 增加一段代码&#xff0c;作用是禁止information字段 二、进行检查 可以看到代码已经生效&#xff0c;禁止用infor…

TypeError报错处理

哈喽&#xff0c;大家好&#xff0c;我是木头左&#xff01; 一、Python中的TypeError简介 这个错误通常表示在方法调用时&#xff0c;参数类型不正确&#xff0c;或者在对字符串进行格式化操作时&#xff0c;提供的变量与预期不符。 二、错误的源头&#xff1a;字符串格式化…

调用第三方接口——支付宝付款

沙箱环境是支付宝开放平台为开发者提供的用于接口开发及主要功能联调的模拟环境。 参考 登录 - 支付宝 在沙箱环境下&#xff0c;已经分配好了用于模拟测试的应用信息、商家信息、买家信息等 小程序文档 - 支付宝文档中心 内网穿透&#xff08;支付宝付款需要在公网进行检查…

MybatisPlus也能轻松生成三层架构代码?

&#x1f469;&#x1f3fd;‍&#x1f4bb;个人主页&#xff1a;阿木木AEcru &#x1f525; 系列专栏&#xff1a;《Docker容器化部署系列》 《Java每日面筋》 &#x1f4b9;每一次技术突破&#xff0c;都是对自我能力的挑战和超越。 目录 一、前言三层架构的流程图为什么使用…

为什么需要自动化测试?自动化有哪些优势?

前言 自动化测试&#xff0c;最近些年可谓是大火。招聘上的要求也好&#xff0c;培训班的广告也罢&#xff0c;比比皆是&#xff0c;足以说明它在业内的火爆程度。 虽然说会写自动化测试并不能说明你就很牛批&#xff0c;但是你不会的话&#xff0c;那么很抱歉&#xff0c;你…

保持 Hiti 证卡打印机清洁的重要性和推荐的清洁用品

在证卡印刷业务中&#xff0c;保持印刷设备的清洁至关重要。特别是对于 Hiti 证卡打印机来说&#xff0c;它们是生产高质量证卡的关键工具。保持设备清洁不仅可以保证打印质量和效率&#xff0c;还可以延长其使用寿命。本文将探讨保持 Hiti 证卡打印机清洁卡的重要性&#xff0…

数码管的显示

静态数码管显示 数码管有两种一种的负电压促发,一种是正电压促发,上图是单数码管的引脚 上图是数码管模组的引脚,采用了引脚复用技术 咱们这个单片机由8个单数码管,所以要用上38译码器,如下图 74138使能端,单片机上电直接就默认接通了 74HC245的作用是稳定输入输出,数据缓冲作…

Rust Course学习(编写测试)

如果友友你的计算机上没有安装Rust&#xff0c;可以直接安装&#xff1a;Rust 程序设计语言 (rust-lang.org)https://www.rust-lang.org/zh-CN/ Introduce 介绍 Testing in Rust involves writing code specifically designed to verify that other code works as expected. It…

线上线下包搭建小程序/公众号/H5 支持二开!

网上交友有以下三个积极影响&#xff1a; 1. 扩展社交圈和增加社交机会&#xff1a;网上交友可以让人们接触到不同地区、不同背景、不同文化的人&#xff0c;拓展人们的社交圈并且增加交友机会。这些新的社交联系对于个人的成长和发展有积极的影响&#xff0c;可以让人们学习新…

奶爸预备 |《P.E.T.父母效能训练:让亲子沟通如此高效而简单:21世纪版》 / 托马斯·戈登——读书笔记

目录 引出致中国读者译序前言第1章 父母总是被指责&#xff0c;而非受训练第2章 父母是人&#xff0c;不是神第3章 如何听&#xff0c;孩子才会说&#xff1a;接纳性语言第4章 让积极倾听发挥作用第5章 如何倾听不会说话的婴幼儿第6章 如何听&#xff0c;孩子才肯听第8章 通过改…

[每日AI·0506]巴菲特谈 AI,李飞飞创业,苹果或将推出 AI 功能,ChatGPT 版搜索引擎

AI 资讯 苹果或将推出 AI 功能&#xff0c;随 iPhone 发布2024 年巴菲特股东大会&#xff0c;巴菲特将 AI 类比为核技术 巴菲特股东大会 5 万字实录消息称 OpenAI 将于 5 月 9 日发布 ChatGPT 版搜索引擎路透社消息&#xff0c;斯坦福大学 AI 领军人物李飞飞打造“空间智能”创…

leetCode75. 颜色分类

leetCode75. 颜色分类 题目思路 代码 class Solution { public:void sortColors(vector<int>& nums) {for(int i 0, j 0, k nums.size() - 1; i < k;){if(nums[i] 0) swap(nums[i],nums[j]);else if(nums[i] 2) swap(nums[i],nums[k--]);else if(nums[i] …

基于Springboot+Vue+Java的学生就业管理系统

&#x1f49e; 文末获取源码联系 &#x1f649; &#x1f447;&#x1f3fb; 精选专栏推荐收藏订阅 &#x1f447;&#x1f3fb; &#x1f380;《Java 精选实战项目-计算机毕业设计题目推荐-期末大作业》&#x1f618; 更多实战项目~ https://www.yuque.com/liuyixin-rotwn/ei3…

Docker容器:Docker-Consul 的容器服务更新与发现

目录 前言 一、什么是服务注册与发现 二、 Docker-Consul 概述 1、Consul 概念 2、Consul 提供的一些关键特性 3、Consul 的优缺点 4、传统模式与自动发现注册模式的区别 4.1 传统模式 4.2 自动发现注册模式 5、Consul 核心组件 5.1 Consul-Template组件 5.2 Consu…

自动驾驶融合定位:IMU内参模型及标定

自动驾驶融合定位&#xff1a;IMU内参模型及标定 一、 概述 标定的本质是参数辨识。首先明确哪些参数可辨识&#xff0c;其次弄清怎样辨识。 参数包括陀螺仪和加速度计各自的零偏、标度因数、安装误差。 辨识就比较丰富了&#xff0c;如果让各位先不局限于标定任务&#xf…

HCIP-Datacom-ARST必选题库_BGP【道题】

1.关于summary automatic命令和BGP聚合的描述,错误的是? 该命令用于实现自动聚合,其优先级高于手动聚合 配置该命令后,BGP将按自然网段聚合路由 该命令用来使能对本地引入的路由进行自动聚合 配置该命令后,BGP只向对等体发送聚合后的路由 1.关于summary automatic命令和BGP聚…

C++初阶学习第五弹——类与对象(下)——类与对象的收官战

类与对象&#xff08;上&#xff09;&#xff1a;C初阶学习第三弹——类与对象&#xff08;上&#xff09;——初始类与对象-CSDN博客 类与对象&#xff08;中&#xff09;&#xff1a;C初阶学习第四弹——类与对象&#xff08;中&#xff09;——刨析类与对象的核心点-CSDN博…

Linux环境下的事件驱动力量:探索Libevent的高性能I/O架构

hello &#xff01;大家好呀&#xff01; 欢迎大家来到我的Linux高性能服务器编程系列之《Linux环境下的事件驱动力量&#xff1a;探索Libevent的高性能I/O架构》&#xff0c;在这篇文章中&#xff0c;你将会学习到Libevent的高性能I/O原理以及应用&#xff0c;并且我会给出源码…