【Redis实战】有MQ为啥不用?用Redis作消息队列!?Redis作消息队列使用方法及底层原理高级进阶

 🎉🎉欢迎光临🎉🎉

🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀

🌟特别推荐给大家我的最新专栏《Redis实战与进阶》

本专栏纯属为爱发电永久免费!!!

这是苏泽的个人主页可以看到我其他的内容哦👇👇

努力的苏泽icon-default.png?t=N7T8http://suzee.blog.csdn.net/

最近工作室的一个业务跟另一个业务合并 自然要用到MQ(消息队列Message Queue)那么很显然 就要部署个RabbitMQ到服务器上了  

我们用的是云托管的的服务 那自然是部署中间件到云服务上去了 服务是一路开通 结果到了需要调试的时候 怎么也连不上 (说是内网直连,但关键是 同事们都在线下做本地测试的呀)

直接无语了 面对这一场景 怎么办?业务还要继续 等着交货的  于是我想起了之前学过的技术栈 

Redis 也能作为消息队列的(不过用的比较少所以不大容易记起来 或者也没啥人知道) 于是一顿卡卡操作  步骤还比MQ简单  下面就来看是如何实现的

正片

目录

最近工作室的一个业务跟另一个业务合并 自然要用到MQ(消息队列Message Queue)那么很显然 就要部署个RabbitMQ到服务器上了  

Redis 也能作为消息队列的(不过用的比较少所以不大容易记起来 或者也没啥人知道) 于是一顿卡卡操作  步骤还比MQ简单  下面就来看是如何实现的

正片

Redis作为消息队列的优缺点:

使用Redis作为消息队列的选择相对于使用专门的消息队列系统(如RabbitMQ、Kafka等)有以下优点和:

缺点也很明显:

应用场景:

Redis实现消息队列系统 实现步骤:

配置Redis:

设置RedisTemplate的序列化器。在消息队列中,你可以使用默认的序列化器,即StringRedisSerializer,它会将消息以字符串的形式进行存储和传输。可以通过以下代码设置默认的序列化器:

实现消息的发布和订阅功能。

实战与改良

代码解释

我把消息处理的系统中心化处理,也就是说是这个监听系统他可以监听reserved通道的所有业务类型,我这里列举了四种wait,agree,refuse,over四种 但如果是更大的业务体系 同一个通道可能面临着更多可能性分支  那如果按照第一套的方案 需要对每一个具体业务实现一个监听者 工作量就很大(可能这样耦合会低一些吧)

但是我这样把消息集中处理 然后短信发送系统就专门只做短信发送的事情 xx系统就只做对应的工作 就能把工作上的耦合度大大降低

那么大家应该也注意到我的两个负责序列化和反序列化的方法了吧 这是因为业务需求 要把对象封装成一个类 所以这里的方案就是在信息中心处理器上 自定义一个序列化方案(如果再做得好一点其实可以把这个序列器抽理出来封装为一个抽象方法 用泛型来定义返回结果和参数 这样就能序列化所有引用的类型了)  

遇到的问题:

对了 中途遇到了这样一个错误

原因与分析:

实际业务中的测试

发布服务

订阅服务(监听服务)

测试结果


Redis作为消息队列的优缺点:

使用Redis作为消息队列的选择相对于使用专门的消息队列系统(如RabbitMQ、Kafka等)有以下优点和:

  1. 简单轻量:Redis是一个内存中的数据存储系统,具有轻量级和简单的特点。相比较专门的消息队列系统,使用Redis作为消息队列不需要引入额外的组件和依赖,可以减少系统的复杂性。

  2. 速度快:由于Redis存储在内存中,它具有非常高的读写性能。这对于需要低延迟的应用程序非常有优势。

  3. 多种数据结构支持:Redis提供了丰富的数据结构,如列表、发布/订阅、有序集合等。这使得Redis在处理不同类型的消息和任务时更加灵活。

  4. 数据持久化:Redis可以通过将数据持久化到磁盘来提供数据的持久性。这意味着即使Redis重启,之前的消息也不会丢失。

  5. 广泛的应用场景:Redis不仅可以用作消息队列,还可以用作缓存、数据库、分布式锁等多种用途。如果你的应用程序已经使用了Redis,那么使用Redis作为消息队列可以减少技术栈的复杂性。

缺点也很明显:

  1. 缺少一些高级特性:相对于专门的消息队列系统,Redis在消息队列方面的功能可能相对简单。例如,它可能缺乏一些高级消息传递功能,如消息重试、消息路由、持久化消息等。

  2. 可靠性和一致性:Redis的主要设计目标是提供高性能和低延迟,而不是强一致性和高可靠性。在某些情况下,Redis可能会丢失消息,或者在出现故障时可能无法提供持久性保证。

应用场景:

适用于简单的中小型项目 如果功能简单,访问量并不大可以考虑
如果你的应用程序对可靠性和高级功能有严格要求,并且需要处理大量的消息和复杂的消息路由,那么使用专门的消息队列系统可能更合适。

Redis实现消息队列系统 实现步骤:

配置Redis:

  1. 首先,确保你已经正确地配置了Redis和Lettuce依赖项,并创建了LettuceConnectionFactory对象。

    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
      redis:
        host: 
        port: 6379
        password: 
        lettuce:
          pool:
            max-active: 1000
            max-idle: 1000
            min-idle: 0
            time-between-eviction-runs: 10s
            max-wait: 10000

  2. 创建一个RedisTemplate对象,并将LettuceConnectionFactory设置为其连接工厂:

 @Bean
    public RedisTemplate<String, String> redisTemplate(LettuceConnectionFactory connectionFactory) {
        RedisTemplate<String, String> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        template.setDefaultSerializer(new StringRedisSerializer());
        return template;
    }

设置RedisTemplate的序列化器。在消息队列中,你可以使用默认的序列化器,即StringRedisSerializer,它会将消息以字符串的形式进行存储和传输。可以通过以下代码设置默认的序列化器:

redisTemplate.setDefaultSerializer(new StringRedisSerializer());

实现消息的发布和订阅功能。

  • 发布消息:
    redisTemplate.convertAndSend("channel_name", "message_payload");

    在上述代码中,"channel_name"是消息的通道名称,"message_payload"是要发布的消息内容。

  • 订阅消息:
  • 首先,创建一个MessageListener实现类来处理接收到的消息:

public class MessageListenerImpl implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 处理接收到的消息
        String channel = new String(message.getChannel());
        String payload = new String(message.getBody());
        // 执行自定义的逻辑
    }
}

创建一个LettuceMessageListenerAdapter对象,并提供MessageListener实现类:

LettuceMessageListenerAdapter listenerAdapter = new LettuceMessageListenerAdapter(new MessageListenerImpl());
listenerAdapter.afterPropertiesSet();

创建一个RedisMessageListenerContainer对象,并配置它的LettuceConnectionFactory和监听适配器:

RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
listenerContainer.setConnectionFactory(lettuceConnectionFactory);
listenerContainer.addMessageListener(listenerAdapter, new ChannelTopic("通道名称"));
listenerContainer.start();

通过以上步骤,我们创建了一个LettuceConnectionFactory对象来与Redis服务器建立连接。然后,我们创建了一个MessageListener实现类来处理接收到的消息。接下来,我们创建了一个LettuceMessageListenerAdapter对象,并提供MessageListener实现类。最后,我们创建了一个RedisMessageListenerContainer对象,并配置它的LettuceConnectionFactory和监听适配器,然后启动容器以开始监听指定通道上的消息。

以上的方案 好处就是 可以很明显的知道监听者在哪个部分 监听对应通道的信息 然而 业务当中 如果每一个对应模块的业务和通道都建立一个监听者来进行监听(我们假设每一个就业务所要得到消息以后所执行的逻辑都不相同) 那这个工作量就会暴增 

于是就有了第二种写法 :

实战与改良

/***
 * @title MessageManager
 * @author SUZE
 * @Date 2-17
 **/
@Component
public class ReservedMessageManager {
    private String ListenerId;
    private String UserId;
    private String message;
    private final RedisTemplate<String, String> redisTemplate;

    @Autowired
    public ReservedMessageManager(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
        subscribeToChannel("reserved");
    }
    @Resource
    private SmsServer smsServer;

    public void publishMessage(String channel, reserveMessage message) {
        String  Message=serialize(message);
        redisTemplate.convertAndSend("channel_name", "message_payload");
        redisTemplate.convertAndSend(channel, Message);
    }
    // 接收到消息时触发的事件
    private void handleReserveMessage(String channel, reserveMessage reserveMessage) {
        if (reserveMessage != null) {
            String userId = reserveMessage.getUserId();
            String ListenerId=reserveMessage.getListenerId();
            String message = reserveMessage.getMessage();
            //TODO 处理接收到的消息逻辑 这里后续要对Message进行一个检测他有wait agree和refused和over四种状态 思种状态就是不一样的发送内容
            switch (message){
                //TODO 消息要给两边都发 所以要发两份 发信息的文案
                case "wait":

                    smsServer.sendSms(userId,ListenerId,message);
                    break;
                case "agree":

                    smsServer.sendSms(userId,ListenerId,message);
                    break;
                case "refuse":

                    smsServer.sendSms(userId,ListenerId,message);
                    break;
                case "over":
                    //这里要操作文档系统了

                    //拒绝的话 那就要监听一下
                    smsServer.sendSms(userId,ListenerId,message);
                    break;

            }
            //smsServer.sendSms(userId,ListenerId,message);
            // 其他处理逻辑...
        }
    }

    public void subscribeToChannel(String channel) {
        redisTemplate.execute((RedisCallback<Object>) (connection) -> {
            connection.subscribe((message, pattern) -> {
                String channelName = new String(message.getChannel());
                byte[] body = message.getBody();
                // 解析接收到的消息
                switch (channelName){
                    case "reserved":
                        reserveMessage reserveMessage = deserializeMessage(new String(body));
                        handleReserveMessage(channelName, reserveMessage);
                        break;
                    //还有其他的通道 例如refuse就是一个 拒绝通道 专门监听拒绝的理由
                }
            }, channel.getBytes());
            return null;
        });
    }
    // 反序列化
    private reserveMessage deserializeMessage(String body) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.readValue(body, reserveMessage.class);
        } catch (IOException e) {
            // 处理反序列化异常
            e.printStackTrace();
            return null;
        }
    }

    // 序列化
    public String serialize(reserveMessage reserveMessage) throws SerializationException {
        if (reserveMessage == null) {
            return null;
        }
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            return objectMapper.writeValueAsString(reserveMessage);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error serializing object", e);
        }
    }

}

代码解释

  1. subscribeToChannel方法接受一个channel参数,用于指定要订阅的通道名称。
  2. redisTemplate.execute方法用于执行Redis操作,并传入一个RedisCallback回调函数。
  3. 回调函数使用lambda表达式的形式实现,接受一个connection参数,表示与Redis的连接。
  4. 在回调函数中,调用connection.subscribe方法来订阅通道。该方法接受一个回调函数作为参数,用于处理接收到的消息。
  5. 在消息回调函数中,首先从message对象中获取通道名称和消息体。
  6. 使用new String(message.getChannel())将通道名称转换为字符串类型,并存储在channelName变量中。
  7. 使用message.getBody()获取消息体的字节数组表示,并存储在body变量中。
  8. switch语句中,根据通道名称进行不同的处理。在这个例子中,仅处理"reserved"通道。
  9. 对于"reserved"通道的处理,调用deserializeMessage方法将消息体反序列化为reserveMessage对象,并将其存储在名为reserveMessage的局部变量中。
  10. 调用handleReserveMessage方法,将通道名称和反序列化后的reserveMessage对象作为参数进行处理。
  11. handleReserveMessage方法用于处理接收到的保留消息的逻辑。它检查消息类型,并根据类型执行不同的操作。根据消息类型,它调用smsServer.sendSms方法向指定的userIdlistenerId发送短信。

我把消息处理的系统中心化处理,也就是说是这个监听系统他可以监听reserved通道的所有业务类型,我这里列举了四种wait,agree,refuse,over四种 但如果是更大的业务体系 同一个通道可能面临着更多可能性分支  那如果按照第一套的方案 需要对每一个具体业务实现一个监听者 工作量就很大(可能这样耦合会低一些吧)

但是我这样把消息集中处理 然后短信发送系统就专门只做短信发送的事情 xx系统就只做对应的工作 就能把工作上的耦合度大大降低

那么大家应该也注意到我的两个负责序列化和反序列化的方法了吧 这是因为业务需求 要把对象封装成一个类 所以这里的方案就是在信息中心处理器上 自定义一个序列化方案(如果再做得好一点其实可以把这个序列器抽理出来封装为一个抽象方法 用泛型来定义返回结果和参数 这样就能序列化所有引用的类型了)  

遇到的问题:


对了 中途遇到了这样一个错误

错误信息:com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `TopOne.MessageSystem.entity.reserveMessage` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)

原因与分析:

reserveMessage类缺少默认构造函数,这导致Jackson库无法构造该类的实例。错误消息中提到了以下内容:"Cannot construct instance of TopOne.MessageSystem.entity.reserveMessage (no Creators, like default constructor, exist)"。
为了使Jackson能够正确地反序列化对象,需要在reserveMessage类中添加一个默认构造函数。默认构造函数是一个无参数的构造函数,它不需要任何参数来创建对象。
在你的reserveMessage类中

这个是改好的封装类:
 

@Data
public class reserveMessage {
    private String UserId;
    private String ListenerId;
    private String message;


    public reserveMessage() {
        // 默认构造函数
    }
    public reserveMessage(String userId, String ListenerId,String message) {
        this.UserId = userId;
        this.ListenerId = ListenerId;
        this.message=message;
    }


}

实际业务中的测试

发布服务

订阅服务(监听服务)

测试结果

成功

这里面的打印是代替了原本业务中的短信发送 也算是成了

这一期就到这 感谢观看

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/391517.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

IDEA配置Lombok不起作用

IDEA配置Lombok不起作用 我们通常会只用lombok来简化代码。但是使用IDEA的lombok插件时&#xff0c;Lombok并不起作用。 可以按照如下操作。 FIle -> settings ->build,excecution,deployment–>compiler–>annotation processors勾选上 enable annotation proc…

ubuntu22.04安装jenkins并配置

准备 更新系统 sudo apt update sudo apt upgrade环境准备 jdk 安装 sudo apt install openjdk-11-jdk验证 java -versiongit ubuntu配置git maven ubuntu配置maven 部署 添加 Jenkins 存储库 导入Jenkins存储库的GPG密钥 wget -q -O - https://pkg.jenkins.io/de…

什么是PAGA系统

PAGA系统是一种公共广播和通用报警系统&#xff0c;它在船舶、海上钻井平台、石油化工、天然气开采等行业的应用非常广泛。当遇到紧急情况或其他特殊情况时&#xff0c;PAGA系统能够在大范围内进行喊话广播或报警。这种系统通过自动电话系统&#xff08;如PABX&#xff0c;即自…

Unity 2D Spine 外发光实现思路

Unity 2D Spine 外发光实现思路 前言 对于3D骨骼&#xff0c;要做外发光可以之间通过向法线方向延申来实现。 但是对于2D骨骼&#xff0c;各顶点的法线没有向3D骨骼那样拥有垂直于面的特性&#xff0c;那我们如何做2D骨骼的外发光效果呢&#xff1f; 理论基础 我们要知道&a…

前端小案例——购买电影票(HTML+CSS+JS, 附源码)

一、前言 实现功能&#xff1a; 这段代码实现了一个简单的电影票选座购买的功能界面。 在页面上展示了一个电影院的座位布局&#xff0c;以及右侧显示了电影信息、选座情况、票价、总计等内容。 用户可以通过点击座位来选择购买电影票&#xff0c;每个座位的状态会在点击时改…

Arrays工具类的常见方法总结

一、Arrays.asList( ) 1、作用 Arrays.asList( )可以将一个数组以集合的形式传入一个集合对象。通常用来将一组元素全部添加到集合中。 2、参数及返回值 参数&#xff1a;一组动态参数 返回值&#xff1a;List<T>集合 3、应用举例 List<String> boyListArra…

2023年程序员观察报告

春节假期已过&#xff0c;2023年悄然过去&#xff0c;2024年已经到来&#xff0c;无论2023年是快乐的、成长的、积极的&#xff0c;亦或是痛苦的、寂寥的、迷茫的&#xff0c;都要恭喜在座的各位程序员又熬过了一年&#xff01; ①加班篇 2023年&#xff0c;你完成了 132个需求…

you-get,一个超强的 Python 库

你好&#xff0c;我是坚持分享干货的 EarlGrey&#xff0c;翻译出版过《Python编程无师自通》、《Python并行计算手册》等技术书籍。 如果我的分享对你有帮助&#xff0c;请关注我&#xff0c;一起向上进击。 现在在线视频超火爆&#xff0c;可是我还是更倾向于将视频下载至本地…

C++-手把手教你模拟实现string

1.string的成员变量 模拟实现string只需要三个成员变量&#xff0c;capacity&#xff0c;size&#xff0c;_str&#xff0c;也就是容量&#xff0c;数据大小&#xff0c;指向字符串的指针。 2.string的构造函数 2.1 使用字符串构造 使用字符串来构造一个string类的对象&…

463. Island Perimeter(岛屿的周长)

问题描述 给定一个 row x col 的二维网格地图 grid &#xff0c;其中&#xff1a;grid[i][j] 1 表示陆地&#xff0c; grid[i][j] 0 表示水域。 网格中的格子 水平和垂直 方向相连&#xff08;对角线方向不相连&#xff09;。整个网格被水完全包围&#xff0c;但其中恰好有…

LTP/pyltp安装和使用教程

文章目录 LTP介绍分句分词加载外部词典个性化分词 词性标注命名实体识别NER依存句法分析语义角色标注 LTP介绍 官网&#xff1a;https://ltp.ai/ 下载可以到官网的下载专区&#xff1a;https://ltp.ai/download.html 语言技术平台&#xff08;Language Technology Platform&am…

Codeforces Round 925 (Div. 3)(A,B,C,D,E,F,G)

比赛链接 这场打的很顺&#xff0c;感觉难度和 div 4 差不多&#xff0c;不是很难。D题稍微考了考同余的性质&#xff0c;E题直接模拟过程即可&#xff0c;F题也可以暴力模拟或者拓扑排序&#xff0c;G题是个数学题&#xff0c;是个简单隔板法。A到F题都可以直接模拟就有点离谱…

嵌入式 day23

链接命令 建立链接文件&#xff1a;ln 命令 命令名称&#xff1a;ln 命令所在路径&#xff1a;/bin/ln 执行权限&#xff1a;所有用户 语法&#xff1a;ln -s [原文件] [目标文件] -s 创建软链接 功能描述&#xff1a;生成链接文件 范例&#xff1…

[嵌入式系统-24]:RT-Thread -11- 内核组件编程接口 - 网络组件 - TCP/UDP Socket编程

目录 一、RT-Thread网络组件 1.1 概述 1.2 RT-Thread支持的网络协议栈 1.3 RT-Thread如何选择不同的网络协议栈 二、Socket编程 2.1 概述 2.2 UDP socket编程 2.3 TCP socket编程 2.4 TCP socket收发数据 一、RT-Thread网络组件 1.1 概述 RT-Thread 是一个开源的嵌入…

不错的PMO 2024建设规划长图

公众号"PMO前沿"是国内最大的PMO组织&#xff0c;经常各个城市举办线下线上活动&#xff0c;很多专家&#xff0c;相当赞&#xff0c;而且每天还分享不少文章&#xff08;春节都不停更&#xff0c;相当感动&#xff09;&#xff0c;建议关注。看到一个不错的PMO 组织…

win32汇编获取系统信息

.data fmt db "页尺寸&#xff1a;%d",0 db "" lpsystem SYSTEM_INFO <?> szbuf db 200 dup(0) .const szCaption db 系统信息,0 .code start: invoke GetSystemInfo,addr lpsystem …

四川古力未来科技公司抖音小店:靠谱的新电商之旅

随着互联网的飞速发展&#xff0c;电商行业日新月异&#xff0c;新兴平台如抖音小店正成为消费者新的购物天堂。在众多抖音小店中&#xff0c;四川古力未来科技公司的店铺以其独特的魅力吸引了众多消费者的目光。那么&#xff0c;四川古力未来科技公司抖音小店到底靠不靠谱呢&a…

【数据结构】17 二叉树的建立

二叉树的建立 由于树是非线性结构&#xff0c;创建一颗二叉树必须首先确定树中结点的输入顺序&#xff0c;常用方法是先序创建和层序创建。 层序创建所用的节点输入序列是按数的从上至下从左到右的顺序形成的各层的空结点输入数值0。在构造二叉树过程中需要一个队列暂时存储各…

鸿蒙开发系列教程(二十三)--List 列表操作(2)

列表样式 1、设置内容间距 在列表项之间添加间距&#xff0c;可以使用space参数&#xff0c;主轴方向 List({ space: 10 }) { … } 2、添加分隔线 分隔线用来将界面元素隔开&#xff0c;使单个元素更加容易识别。 startMargin和endMargin属性分别用于设置分隔线距离列表侧…
最新文章