Redis 消息队列 Stream

tip:作为程序员一定学习编程之道,一定要对代码的编写有追求,不能实现就完事了。我们应该让自己写的代码更加优雅,即使这会费时费力。

💕💕 推荐:体系化学习Java(Java面试专题)

文章目录

  • 1、什么是 Stream
  • 2、为什么要设计 Stream
  • 3、Stream 命令详解
  • 4、java 写一点 Stream 的 demo
  • 5、Stream 的应用场景

1、什么是 Stream

Stream 是 Redis 5.0 版本中新增的一种数据结构,它是一个高性能、持久化的消息队列,可以用于实现消息的发布和订阅。Stream 可以看作是一个有序的消息队列,每个消息都有一个唯一的 ID,可以根据 ID 进行消息的查找、删除和确认。在 Stream 中,消息以键值对的形式存储,可以存储任意类型的数据。Stream 还支持多个消费者组,每个消费者组可以独立消费消息,避免消息重复消费。Stream 的引入使得 Redis 在消息队列领域更具竞争力,同时也为开发者提供了一种高效、可靠的消息处理方式。

2、为什么要设计 Stream

Redis 设计 Stream 的原因主要是为了满足大规模实时数据处理的需求。在传统的消息队列中,消息的消费者只能消费最新的消息,而无法消费过去的消息。而在实时数据处理中,往往需要对历史数据进行分析和处理,因此需要一种能够存储大量历史数据并支持快速查询和消费的数据结构。Stream 的引入解决了这个问题,它支持持久化存储和快速查询,可以存储大量历史数据,并且支持多个消费者组独立消费消息,从而满足了大规模实时数据处理的需求。此外,Stream 还支持消息的延迟和重试等功能,使得 Redis 在消息队列领域更具竞争力。

3、Stream 命令详解

Stream 是 Redis 5.0 版本新增的一种数据结构,支持高性能、持久化的消息队列。下面是 Stream 命令的详细介绍:

  1. XADD key ID field1 value1 [field2 value2 …]:向指定的 Stream 中添加一条消息,消息的 ID 由用户指定,消息的字段和值由用户指定。
127.0.0.1:6379> XADD mystream 1000 name John age 30
"1000-0"
  1. XRANGE key start end [COUNT count]:返回指定 Stream 中指定范围内的消息,范围由 start 和 end 指定,可以使用 “-” 表示最大或最小 ID,COUNT 参数表示返回消息的数量。
127.0.0.1:6379> XRANGE mystream 1000-0 1000-1
1) 1) "1000-0"
      2) 1) "name"
         2) "John"
         3) "age"
         4) "30"
  1. XREVRANGE key end start [COUNT count]:同 XRANGE 命令,但是返回的消息是逆序的。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XREVRANGE mystream 1003-0 1001-0
1) 1) "1003-0"
      2) 1) "name"
         2) "Jack"
         3) "age"
         4) "30"
   2) 1) "1002-0"
      2) 1) "name"
         2) "Mary"
         3) "age"
         4) "28"
   3) 1) "1001-0"
      2) 1) "name"
         2) "Tom"
         3) "age"
         4) "25"
  1. XLEN key:返回指定 Stream 中消息的数量。
127.0.0.1:6379> XLEN mystream
1
  1. XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [ID]:从指定 Stream 中读取消息,可以指定读取的消息数量和阻塞时间,如果没有新的消息,则等待指定时间后返回空结果。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XREAD COUNT 2 BLOCK 1000 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"
      2) 1) "1002-0"
         2) 1) "name"
            2) "Mary"
            3) "age"
            4) "28"
  1. XACK key group ID [ID …]:确认指定消费者组已经处理了指定 ID 的消息。
127.0.0.1:6379> XACK mystream mygroup 1000-0
(integer) 1
  1. XGROUP CREATE key groupname ID [MKSTREAM]:创建一个新的消费者组,可以指定组名和起始 ID,如果指定 MKSTREAM 参数,则会自动创建 Stream。
127.0.0.1:6379> XGROUP CREATE mystream mygroup 1000-0
OK
  1. XGROUP SETID key groupname ID:设置消费者组的起始 ID。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XGROUP SETID mystream mygroup 1002-0
OK
  1. XGROUP DESTROY key groupname:销毁指定的消费者组。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XGROUP DESTROY mystream mygroup
(integer) 1
  1. XGROUP DELCONSUMER key groupname consumername:从指定消费者组中删除指定的消费者。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"

127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XREADGROUP GROUP mygroup myconsumer COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"

127.0.0.1:6379> XGROUP DELCONSUMER mystream mygroup myconsumer
(integer) 1
  1. XREADGROUP GROUP groupname consumername [COUNT count] [BLOCK milliseconds] STREAMS key [ID]:从指定消费者组中读取消息,可以指定读取的消息数量和阻塞时间,如果没有新的消息,则等待指定时间后返回空结果。
127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1000-0"
         2) 1) "name"
            2) "John"
            3) "age"
            4) "30"
  1. XCLAIM key groupname consumername min-idle-time ID [ID …] [IDLE milliseconds] [TIME milliseconds] [RETRYCOUNT count] [FORCE]:从指定消费者组中获取一条未被确认的消息,并将其标记为正在处理,可以指定最小空闲时间、最大空闲时间、重试次数等参数。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"

127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XREADGROUP GROUP mygroup myconsumer COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"

127.0.0.1:6379> XCLAIM mystream mygroup myconsumer 0 1001-0
1) 1) "mystream"
   2) 1) 1) "1001-0"
         2) 1) "name"
            2) "Tom"
            3) "age"
            4) "25"
  1. XDEL key ID [ID …]:从指定 Stream 中删除指定 ID 的消息。
127.0.0.1:6379> XDEL mystream 1000-0
(integer) 1
  1. XTRIM key MAXLEN [~] count:删除 Stream 中多余的消息,可以指定删除的数量或者删除到指定的 ID。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XADD mystream 1004 name Lucy age 27
"1004-0"
127.0.0.1:6379> XADD mystream 1005 name Bob age 32
"1005-0"

127.0.0.1:6379> XTRIM mystream MAXLEN 3
(integer) 2

4、java 写一点 Stream 的 demo

我这边 redis 的版本是 Redis-x64-5.0.14.1,windows 上玩的,绿色版的,下载地址https://github.com/tporadowski/redis/releases

在这里插入图片描述
一下是 demo 的代码:

package com.pany.camp.redis;

import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamConsumersInfo;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;

import java.util.*;

public class RedisStreamDemo {
    private static final Logger logger = LoggerFactory.getLogger(RedisStreamDemo.class);
    private static final int MESSAGE_READ_COUNT = 1;
    private static final long MESSAGE_READ_TIMEOUT = 120000L;

    public static void main(String[] args) {
        // 创建 Jedis 实例
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
            // 定义 Stream 名称和消费者组名称
            String streamName = "mystream";
            String groupName = "mygroup5";
            // 创建消费者组
            try {
                jedis.xgroupCreate(streamName, groupName, new StreamEntryID(), true);
            } catch (JedisDataException e) {
                // 如果 Stream 已经存在,则忽略异常
                if (!e.getMessage().contains("BUSYGROUP")) {
                    throw e;
                }
            }
            logger.info("消费者组已创建");
            // 添加消息到 Stream 中
            Map<String, String> fields = new HashMap<>();
            fields.put("field1", "value1");
            fields.put("field2", "value2");
            StreamEntryID messageId = jedis.xadd(streamName, StreamEntryID.NEW_ENTRY, fields);
            logger.info("消息已添加到 Stream 中,消息内容为:{}", JSONObject.toJSONString(fields));
            // 读取消息
            Map.Entry<String, StreamEntryID> streams = new AbstractMap.SimpleImmutableEntry<>(streamName,
                    new StreamEntryID().UNRECEIVED_ENTRY);
            List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(groupName, "c1", MESSAGE_READ_COUNT,
                    MESSAGE_READ_TIMEOUT, true, streams);
            logger.info("从 Stream 中读取了 {} 条消息", messages.size());
            for (Map.Entry<String, List<StreamEntry>> entry : messages) {
                String sn = entry.getKey();
                List<StreamEntry> streamMessages = entry.getValue();
                for (StreamEntry message : streamMessages) {
                    logger.info("Stream 名称:{}", sn);
                    logger.info("Message ID:{}", message.getID());
                    logger.info("Message fields:{}", message.getFields());
                }
            }
            // 确认消息已经被消费
            jedis.xack(streamName, groupName, messageId);
            logger.info("消息已确认消费");
            // 删除消息
            jedis.xdel(streamName, messageId);
            logger.info("消息已删除");
        } catch (Exception e) {
            logger.error("执行 Redis 操作出错", e);
        }
    }
}
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.7.0</version>
</dependency>

在这里插入图片描述

5、Stream 的应用场景

Redis Stream 的常用的应用场景:

  1. 消息队列:Stream 可以作为一个高性能的消息队列使用,支持多个消费者对同一 Stream 进行消费,且支持消费者组的管理、消息确认和消息持久化等功能。

  2. 日志收集:Stream 可以作为一个分布式的日志收集系统使用,支持多个客户端将日志写入到同一 Stream 中,且支持按照时间戳和 ID 进行查询和过滤。

  3. 实时数据处理:Stream 可以作为一个实时数据处理系统使用,支持多个客户端将实时数据写入到同一 Stream 中,且支持按照时间戳和 ID 进行查询和过滤。

  4. 事件驱动架构:Stream 可以作为一个事件驱动架构的基础设施使用,支持多个事件源将事件写入到同一 Stream 中,且支持按照事件类型和时间戳进行查询和过滤。

💕💕 本文由激流原创,首发于CSDN博客,博客主页 https://blog.csdn.net/qq_37967783?spm=1010.2135.3001.5421
💕💕喜欢的话记得点赞收藏啊

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/27371.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C语言写网络爬虫总体思路

使用C语言编写爬虫可以实现网络数据的快速获取和处理&#xff0c;适用于需要高效处理海量数据的场景。与其他编程语言相比&#xff0c;C语言具有较高的性能和灵活性&#xff0c;可以进行底层操作和内存管理&#xff0c;适合处理较复杂的网络请求和数据处理任务。 但是&#xf…

Redis学习总结(二)

AOF 为什么是在执行完命令之后记录日志&#xff1f; 关系型数据库&#xff08;如 MySQL&#xff09;通常都是执行命令之前记录日志&#xff08;方便故障恢复&#xff09;&#xff0c;而 Redis AOF 持久化机制是在执行完命令之后再记录日志。AOF 记录日志过程为什么是在执行完命…

【LeetCode】HOT 100(7)

题单介绍&#xff1a; 精选 100 道力扣&#xff08;LeetCode&#xff09;上最热门的题目&#xff0c;适合初识算法与数据结构的新手和想要在短时间内高效提升的人&#xff0c;熟练掌握这 100 道题&#xff0c;你就已经具备了在代码世界通行的基本能力。 目录 题单介绍&#…

chatgpt赋能python:Python创建SEO文章的指南

#Python创建SEO文章的指南 在当今数字化世界中&#xff0c;SEO&#xff08;搜索引擎优化&#xff09;对于拥有一个成功的在线业务至关重要。SEO文章不仅可以帮助提高网站的排名&#xff0c;还可以吸引更多的访问者并提高转化率。在本文中&#xff0c;我们将介绍如何使用Python…

数据分析第17课seaborn绘图

关系型绘图 seaborn.relplot() 这个函数功能非常强大,可以用来表示多个变量之间的关联关系。默认情况下是绘制散点图(散点图是看到变量与变量之间相关性最优的一个图形),也可以绘制线性图,具体绘制什么图形是通过kind参数来决定的。实际上以下两个函数就是relplot的特例…

2023网安面试题170道,轻松应对面试

最近有不少小伙伴跑来咨询&#xff1a; 想找网络安全工作&#xff0c;应该要怎么进行技术面试准备&#xff1f; 工作不到 2 年&#xff0c;想跳槽看下机会&#xff0c;有没有相关的面试题呢&#xff1f; 为了更好地帮助大家高薪就业&#xff0c;今天就给大家分享两份网络安全工…

(1Gbit)MT28EW01GABA1LPC-0SIT、MT28EW01GABA1HPC-0SIT FLASH - NOR 存储器

MT28EW01GABA1LPC-0SIT、MT28EW01GABA1HPC-0SIT 1Gbit并行NOR闪存器件具有较高的密度、就地执行 (XiP) 性能和架构灵活性&#xff0c;可满足汽车、消费类和移动产品的设计要求。该器件非常适合用于GPS/导航、汽车后视摄像头、手机、智能手机和电子阅读器。该器件还具有较宽的温…

使用OpenFlow和Ryu控制器实现网络交换机的软件定义网络(SDN)控制

使用OpenFlow和Ryu控制器实现网络交换机的软件定义网络&#xff08;SDN&#xff09;控制 &#xff08;1&#xff09;环境介绍 硬件环境&#xff1a;系统最低要求为2个CPU 、2 GB内存。 拓扑介绍&#xff1a;云平台具体安装拓扑如图5-4所示。 图5-4 云平台安装拓扑 搭建云平…

NodeJS MongoDB⑦

文章目录 ✨文章有误请指正&#xff0c;如果觉得对你有用&#xff0c;请点三连一波&#xff0c;蟹蟹支持&#x1f618;前言Node&MongoDB 第一步 连接数据库 第二步 创建User Mongodb模型 第三步 简单使用 Mongodb命令 第四步 规范使用 Mongodb命令 &#xff08…

解数独--难的一批

1题目 编写一个程序&#xff0c;通过填充空格来解决数独问题。 数独的解法需 遵循如下规则&#xff1a; 数字 1-9 在每一行只能出现一次。数字 1-9 在每一列只能出现一次。数字 1-9 在每一个以粗实线分隔的 3x3 宫内只能出现一次。&#xff08;请参考示例图&#xff09; 数…

RHEL7同步ntp时间

RHEL7同步ntp时间 RHEL7同步ntp时间测试ntp服务器是否可用抓包分析ntp 查看NTP同步情况ntp服务器配置文件将ntp配置迁移到chronytimedatectl设置时区和时间设置UTC或RTC时间查看所有可用时区查看当前时区设置系统时区启用夏令时timedatectl时间同步timedatectl修改当前日期时间…

基于ADME的分子过滤和 lead-likeness标准

T002 基于ADME的分子过滤和 lead-likeness标准 项目来源于TeachOpenCADD 本文目标 在药物设计的背景下&#xff0c;重要的是通过例如它们的物理化学性质来过滤候选分子。 在这个教程中&#xff0c;从 ChEMBL ( Talktorial T001 )获得的化合物将按照 Lipinsik 的五法则进行…

卷积编码和维特比译码

文章目录 卷积编码维特比译码 卷积编码 卷积码是一种非分组码&#xff0c;通常适用于前向纠错。在分组码中&#xff0c;编码器产生的 n 个码元的一个码组&#xff0c;完全决定于这段时间中 k 比特输入信息。这个码组中的监督位仅监督本码组中 k 个信息位。卷积码在编码时虽然也…

【马蹄集】第十四周作业

第十四周作业 目录 MT2134 泡泡MT2135 调整队伍MT2141 快排变形MT2142 逆序MT2143 线段树 MT2134 泡泡 难度&#xff1a;黄金    时间限制&#xff1a;1秒    占用内存&#xff1a;128M 题目描述 小码哥小时候喜欢吹泡泡&#xff0c;有一次他吹出了 n n n 个一样小的泡泡&…

SSM-Spring+SpringMVC+MyBatis框架的水果商城网站

项目介绍 主要功能&#xff1a; 前端用户购物端&#xff1a; ①角色信息&#xff1a;用户注册、用户登录、个人中心 ②个人中心&#xff1a;基本信息、我的订单、商品收藏、修改密码 ③首页管理&#xff1a;公告、留言、折扣大促销、热门商品 ④商品详情&#xff1a;收藏、加入…

使用阿里云OSS实现图片文件上传

说明&#xff1a;注册用户时&#xff0c;经常会用到上传头像。文件的上传/接收与一般文本数据不同。 一、创建Demo页面 先准备一个Demo页面 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>图片上传…

影响电磁铁磁力大小的因素有哪些

影响电磁铁磁力大小的因素主要有四个&#xff0c;一是缠绕在铁芯上线圈的圈数&#xff0c;二是线圈中电流的强度&#xff0c;三是缠绕的线圈与铁芯的距离&#xff0c;四是铁芯的大小形状。 首先要了解电磁铁的磁性是如何产生的&#xff0c;通电螺线管的磁场&#xff0c;由毕奥&…

总结895

学习目标&#xff1a; 月目标&#xff1a;6月&#xff08;线性代数强化9讲&#xff0c;背诵15篇短文&#xff0c;考研核心词过三遍&#xff09; 周目标&#xff1a;线性代数强化3讲&#xff0c;英语背3篇文章并回诵&#xff0c;检测 每日必复习&#xff08;5分钟&#xff09;…

JMM如何实现volatile写/读的内存语义

内存屏障类型表 StoreLoad Barriers是一个“全能型”的屏障&#xff0c;它同时具有其他3个屏障的效果。现代的多处理器大多支持该屏障&#xff08;其他类型的屏障不一定被所有处理器支持&#xff09;。执行该屏障开销会很昂贵&#xff0c;因为当前处理器通常要把写缓冲区中的数…

基于html+css的图展示112

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…
最新文章