Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】

欢迎来到我的博客,代码的世界里,每一行都是一个故事


在这里插入图片描述

Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案

    • 引言
    • 前言
    • Redis Streams的基本概念和特性
      • 1. 日志数据结构
      • 2. 消息和字段
      • 3. 消费者组
      • 4. 消息ID
      • 5. 实时和历史数据处理
      • 6. 性能和可靠性
    • 实战
      • maven依赖
      • 配置StreamConfig(监听)
      • 配置生产者
      • 配置消费者(组)
      • 配置初始化方法
      • 实现效果
    • 基于List和专业消息队列对比
      • 相比于Redis List解决的痛点:
      • 相比于专业高级队列的不足:
    • 总结

引言

Redis Stream解密:探秘数据流处理的黑科技【一】

解锁Redis Stream新境界:高级用法大揭秘【二】

Redis List:打造高效消息队列的秘密武器【redis实战 一】

前言

在快节奏的技术世界中,消息队列是连接不同服务和组件的关键。而在这个领域,Redis Streams作为一种新兴的消息队列解决方案,以其高性能和易用性吸引了众多开发者的目光。当这项技术遇到了Spring Boot —— 当今最受欢迎的Java开发框架,它们的结合将如何开启新的可能性?让我们开始这趟探索之旅,深入了解如何将这两种强大的技术融合在一起,打造出优雅而强大的消息队列系统。

Redis Streams的基本概念和特性

Redis Streams是Redis数据库的一个强大类型,于Redis 5.0中引入。它主要用于消息队列和事件流的存储与传递,是一个高性能、持久化的日志数据结构。以下是Redis Streams的一些基本概念和核心特性:

1. 日志数据结构

  • 持久化的消息日志:Redis Streams是一个按时间排序的消息日志。每条消息都存储在它被插入时的顺序位置,并且有一个唯一的ID标识。
  • 可追溯性:由于其日志特性,Redis Streams允许你访问历史消息,这对于消息的追溯、重放和延迟处理非常有用。

2. 消息和字段

  • 消息结构:每条消息都可以包含一个或多个字段(field)和值(value)。这类似于一个小的哈希结构,使得每条消息可以携带多个相关的数据点。
  • 灵活的数据模型:你可以根据应用的需要自由定义每条消息包含的字段和数据格式。

3. 消费者组

  • 支持多消费者:Redis Streams可以被多个消费者或多个消费者组同时读取,每个消费者组都会跟踪其成员的进度。
  • 消息确认:消费者读取并处理消息后,可以发送确认,表示消息已被处理。未确认的消息可以被再次处理,确保消息不会因消费者失败而丢失。
  • 故障处理:支持挂起的消息列表和消费者超时检测,使得在消费者失败时可以由其他消费者接手处理消息。

4. 消息ID

  • 自动生成或指定:消息ID通常由Redis自动生成,保证了全局唯一性和顺序性。你也可以手动指定ID以实现更复杂的场景。
  • 组成结构:ID由一个时间戳部分和一个序列号部分组成,格式为<时间戳>-<序列号>

5. 实时和历史数据处理

  • 实时消息处理:通过XREADXREADGROUP命令,你可以实时监听并处理新添加到流中的消息。
  • 历史消息查询:通过XRANGEXREVRANGE等命令,可以查询流中的历史消息,这对于数据分析、审计和消息重放非常有用。

6. 性能和可靠性

  • 高性能:Redis Streams设计用于处理高吞吐量的消息,能够支持每秒数百万消息的读写。
  • 持久化:与Redis的其他数据类型一样,Streams的数据可以持久化到磁盘,保证了数据的持久性和可靠性。

实战

maven依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
</dependency>

配置StreamConfig(监听)

package fun.bo.config;

import fun.bo.consumer.MessageConsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;

import java.time.Duration;

/**
 * @author xiaobo
 */
@Configuration
public class StreamConfig {

    @Bean
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamMessageListenerContainer(
            RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) {

        // 用于配置消息监听容器的选项。在这个方法中,通过设置不同的选项,如轮询超时时间和消息的目标类型,可以对消息监听容器进行个性化的配置。
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // 设置了轮询超时的时间为100毫秒。这意味着当没有新的消息时,容器将每隔100毫秒进行一次轮询。
                        .pollTimeout(Duration.ofMillis(100))
                        // 指定了消息的目标类型为 String。这意味着容器会将接收到的消息转换为 String 类型,以便在后续的处理中使用。
                        .targetType(String.class)
                        .build();

        // 创建一个可用于监听Redis流的消息监听容器。
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer =
                StreamMessageListenerContainer.create(connectionFactory, options);

        // 方法配置了容器来接收来自特定消费者组和消费者名称的消息。它还指定了要读取消息的起始偏移量,以确定从哪里开始读取消息。
        listenerContainer.receive(
                Consumer.from("your-consumer-group", "your-consumer-name"),
                StreamOffset.create("your-stream-name", ReadOffset.lastConsumed()), messageConsumer);

        // 方法启动了消息监听容器,使其开始监听消息。一旦容器被启动,它将开始接收并处理来自Redis流的消息。
        listenerContainer.start();

        return listenerContainer;
    }
}

配置生产者

package fun.bo.produce;

import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

/**
 * @author xiaobo
 */
@Service
@RequiredArgsConstructor
public class MessageProducer {

    private final RedisTemplate<String, String> redisTemplate;


    public void sendMessage(String streamKey, String messageKey, String message) {
        Map<String, String> messageMap = new HashMap<>();
        messageMap.put(messageKey, message);

        RecordId recordId = redisTemplate.opsForStream().add(streamKey, messageMap);
        if (recordId != null) {
            System.out.println("Message sent to Stream '" + streamKey + "' with RecordId: " + recordId);
        }
    }
}

配置消费者(组)

package fun.bo.consumer;

import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Service;

/**
 * @author xiaobo
 */
@Service
public class MessageConsumer implements StreamListener<String, ObjectRecord<String, String>> {

    @Override
    public void onMessage(ObjectRecord<String, String> message) {
        String stream = message.getStream();
        String messageId = message.getId().toString();
        String messageBody = message.getValue();

        System.out.println("Received message from Stream '" + stream + "' with messageId: " + messageId);
        System.out.println("Message body: " + messageBody);
    }
}

配置初始化方法

如果是已经存在stream,则可以不配置,这个主要是为了防止启动报错,org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key ‘your-stream-name’ or consumer group ‘your-consumer-group’ in XREADGROUP with GROUP option

public void initializeStream() {

  StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();

  // 创建一个流
  try {
    streamOperations.createGroup("your-stream-name", ReadOffset.from("0"), "your-consumer-group");
  } catch (Exception e) {
    // 流可能已存在,忽略异常
  }
}

实现效果

在这里插入图片描述

基于List和专业消息队列对比

Redis Streams作为消息队列相比于使用传统的Redis List类型,引入了一系列改进和新功能,同时也与专业的高级消息队列系统(如RabbitMQ、Kafka等)相比存在一些差距。以下是详细的分析:

相比于Redis List解决的痛点:

  1. 更好的消息顺序保证

    • List:虽然List可以保持插入顺序,但在高并发情况下,确保生产者和消费者的顺序一致性较为复杂。
    • Streams:提供了全局唯一的、基于时间的ID来标识消息,确保了消息的全局顺序。
  2. 消费者组支持

    • List:原生List类型不支持消费者组的概念,实现多消费者协调处理同一任务队列较为复杂。
    • Streams:原生支持消费者组,允许多个消费者共享负载,并跟踪各自的进度。
  3. 消息持久化和读取

    • List:读取或消费消息后,需要显式删除,否则会一直保留在List中,处理大量消息时可能会导致内存问题。
    • Streams:消息即使被消费,仍然保留在Stream中,可以随时查询历史消息,且不会因消费而被移除。
  4. 复杂的读取操作

    • List:List提供的操作相对简单,复杂的读取逻辑(如按时间范围查询)需要额外的逻辑来实现。
    • Streams:提供了复杂的查询命令,如XRANGEXREVRANGE,可以按ID范围(时间范围)查询消息。
  5. 消息确认和重试

    • List:需要手动实现消息确认和重试机制,管理起来较为复杂。
    • Streams:提供了消息确认(XACK)和挂起消息查询(XPENDING)的功能,使得消息的重试和故障处理更加容易。

相比于专业高级队列的不足:

  1. 事务和消息持久性保证

    • Redis Streams:虽然提供持久化,但在处理复杂事务和确保消息持久性方面不如一些专业的消息队列系统(如Kafka的WAL日志)。
  2. 集群和分区

    • Redis Streams:在集群环境下使用稍显复杂,且对于数据分区和扩展性的支持不如专业的消息队列系统(如Kafka的分区机制)。
  3. 管理和监控工具

    • Redis Streams:虽然有基本的监控命令,但没有一些高级消息队列系统提供的丰富的管理界面和监控工具。
  4. 高级消息路由和过滤

    • Redis Streams:缺乏一些高级消息队列提供的消息路由和过滤功能(如RabbitMQ的Exchange和Binding)。
  5. 消息传递语义

    • Redis Streams:提供了基础的至少一次处理语义,但可能不像某些系统那样支持严格的只处理一次语义。

总结

Redis Streams提供了一个轻量级、高性能且功能丰富的消息队列实现,解决了使用List作为队列时的许多痛点,特别适合需要快速部署、低延迟和简单可靠的场景。然而,对于需要复杂事务处理、高级路由和过滤、或更丰富管理工具的复杂应用场景,专业的消息队列系统可能更加适合。选择哪种方案,应根据你的具体需求、资源和技术栈来决定。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/273107.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

DVWA靶场中的xss-反射型xss、存储型xss的low、medium、high的详细通关方法

目录 1.DVWA反射型xss &#xff08;1&#xff09;Low&#xff1a; &#xff08;2&#xff09;Medium&#xff1a; &#xff08;3&#xff09;Heigh 2.xss存储型 &#xff08;1&#xff09;Low&#xff1a; &#xff08;2&#xff09;Medium &#xff08;3&#xff09;He…

词法语法语义分析程序设计及实现,包含出错提示和错误恢复

词法说明 (1)关键字 main, int, char, if, else, for, while, void (2)运算符 - * / < < > > ! (3)界符 ; ( ) { } (4)标识符 ID letter(letter|digit)* (5)整型常数 NUM digit digit* (6)空格 ‘ ‘ ‘\n’ ‘\r’ ‘\t’ 空格用来分隔ID,NUM,运算符,界…

idea自动注释

前言 保存一下自己的自动注释代码 idea自动注释 前言1 创建类时&#xff0c;自动生成注释2 在方法上使用快捷键生成注释3 使用方法4 效果图 1 创建类时&#xff0c;自动生成注释 如下&#xff1a; #if (${PACKAGE_NAME} && ${PACKAGE_NAME} ! "")package …

亚马逊美国站ASTM F2613儿童折叠椅和凳子强制性安全标准

ASTM F2613折叠椅和凳子安全标准 美国消费品安全委员会&#xff08;CPSC&#xff09;发布的ASTM F2613儿童折叠椅和凳子的强制性安全标准&#xff0c;已于2020年7月6日生效&#xff0c;并被纳入联邦法规《16 CFR 1232儿童折叠椅和凳子安全标准》。 亚马逊要求在美国站上架的儿…

数据库基础面试第三弹

1. mysql数据库四种常见数据库引擎 1. MyISAM&#xff1a; MyISAM是MySQL最早的数据库引擎之一。它被设计成处理大量的插入和查询操作。MyISAM表格的数据存储在三个文件上&#xff1a;.frm文件存储表结构&#xff0c;.MYD文件存储数据&#xff0c;.MYI文件存储索引。MyISAM表…

【2023年12月18日-12月25日】一周AI咨询更新

上周&#xff0c;关于Google的Bard和Midjourney v6的讨论异常火热。 接下来&#xff0c;让我们回顾一下上周那些引人注目的AI新闻。 ① 已近乎真实拍摄&#xff1a;Midjourney v6的画质令人惊叹 由Midjourney v6制作的图片&#xff0c;质量之高&#xff0c;媲美电影级别&…

Spring高手之路-SpringBean的生命周期

目录 SpringBean的生命周期 整体介绍 详细介绍 1.实例化Bean 2.设置属性值 3.检查Aware 4.调用BeanPostProcessor的前置处理方法 5.调用InitializingBean的afterPropertiesSet方法 6.调用自定义init-method方法 7.调用BeanPostProcessor的后置处理方法 8.注册Destru…

【小黑嵌入式系统第十三课】PSoC 5LP第二个实验——中断控制实验

上一课&#xff1a; 【小黑嵌入式系统第十二课】μC/OS-III程序设计基础&#xff08;二&#xff09;——系统函数使用场合、时间管理、临界区管理、使用规则、互斥信号量 文章目录 1 实验目的2 实验要求3 实验设备4 实验原理4.1 中断(1) 中断机制概述(2) 中断源(3) 中断系统的功…

JMeter(十五)-JMeter监听器

十五、JMeter监听器 1.简介 监听器用来监听及显示JMeter取样器测试结果&#xff0c;能够以树、表及图形形式显示测试结果&#xff0c;也可以以文件方式保存测试结果&#xff0c;JMeter测试结果文件格式多样&#xff0c;比如XML格式、CSV格式。默认情况下&#xff0c;测试结果将…

stm32学习笔记:TIM-定时中断和外部时钟

定时器四部分讲解内容&#xff0c;本文是第一部分 ​​​​​TIM简介 基本定时器 时基单元&#xff1a;预分频器、计数器、自动重装载寄存器 预分频器之前&#xff0c;连接的就是基准计数时钟的输入&#xff0c;由于基本定时器只能选择内部时钟&#xff0c;所以可以认为这根…

最小覆盖子串(LeetCode 76)

文章目录 1.问题描述2.难度等级3.热门指数4.解题思路参考文献 1.问题描述 给你一个字符串 s 、一个字符串 t 。返回 s 中涵盖 t 所有字符的最小子串。如果 s 中不存在涵盖 t 所有字符的子串&#xff0c;则返回空字符串 “” 。 注意&#xff1a; 对于 t 中重复字符&#xff…

【DeepLearning】Deep Residual Learning for Image Recognition恺神大作学习

[TOC] Deep Residual Learning for Image Recognition 论文 1. 文章主要想解决什么问题&#xff0c;用了什么方法 深度神经网络在训练过程中的3个关键问题&#xff1a; 梯度消失/爆炸问题&#xff1a;随着网络层数的增加&#xff0c;梯度在反向传播过程中可能会变得非常小&a…

1.SQL - 概述

1. SQL语句分类 • 数据定义语言&#xff1a;简称DDL(Data Definition Language)&#xff0c;用来定义数据库对象&#xff1a;数据库&#xff0c;表&#xff0c;列等。关键字&#xff1a;create&#xff0c;alter&#xff0c;drop等 • 数据操作语言&#xff1a;简称DML(Data …

Spring 依赖注入概述、使用以及原理解析

前言 源码在我github的guide-spring仓库中&#xff0c;可以克隆下来 直接执行。 我们本文主要来介绍依赖注入的使用示例及其原理 依赖注入 什么是依赖注入 依赖注入&#xff08;Dependency Injection&#xff0c;简称DI&#xff09;是一种设计模式&#xff0c;它用于实现对…

【操作系统】探究文件系统奥秘:创建proc文件系统的解密与实战

​&#x1f308;个人主页&#xff1a;Sarapines Programmer&#x1f525; 系列专栏&#xff1a;Linux专栏&#xff1a;《探秘Linux | 操作系统解密》⏰诗赋清音&#xff1a;月悬苍穹泛清辉&#xff0c;梦随星河徜徉辉。情牵天际云千层&#xff0c;志立乘风意自飞。 ​ 目录 &a…

Vue3使用的Compostion Api和Vue2使用的Options Api有什么不同?

我们介绍Compostion Api和Options Api的区别之前&#xff0c;先来说一下为什么会推出来Composition Api&#xff0c;解决了什么问题&#xff1f; Vue2开发项目使用Options Api存在的问题 代码的可读性和维护性随着组件的变大业务的增多而变得差代码的共享和重用性存在缺点不支…

【centos】【golang】安装golang

下载安装包 方法1&#xff1a; 打开 https://go.dev/dl/ &#xff1b;点击下载golang的安装包&#xff1b;再使用ssh传到centos上&#xff08;略&#xff09; 方法2&#xff1a;能使用Google就可以这样 wget https://dl.google.com/go/go1.21.5.linux-amd64.tar.gz解压安装包…

渗透测试——1.4主动扫描

主动扫描是别人可以发觉的情报收集 一、nmap的使用 1.nmap<目标主机>:最常用的扫描方式 有nmap版本、扫描时间 “host is up”表示目标主机处于开机状态、“not shown”未开放端口 有四个端口是开的&#xff08;135.139.445.912&#xff09; 2.nmap -p<端口范围…

3.docker 安装失败

1、错误描述 2、报错前操作 ① 安装yum工具 yum install -y yum-utils \device-mapper-persistent-data \lvm2 --skip-broken ② 更新本地镜像源 # 设置docker镜像源 yum-config-manager \--add-repo \https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo…

【贪心算法】专题练习一

欢迎来到Cefler的博客&#x1f601; &#x1f54c;博客主页&#xff1a;那个传说中的man的主页 &#x1f3e0;个人专栏&#xff1a;题目解析 &#x1f30e;推荐文章&#xff1a;题目大解析&#xff08;3&#xff09; 前言 1.什么是贪心算法&#xff1f;——贪婪鼠目寸光 贪心策…
最新文章