Spring Boot整合RabbitMQ之路由模式(Direct)

RabbitMQ中的路由模式(Direct模式)应该是在实际工作中运用的比较多的一种模式了,这个模式和发布与订阅模式的区别在于路由模式需要有一个routingKey,在配置上,交换机类型需要注入DirectExchange类型的交换机bean对象。在交换机和队列的绑定过程中,绑定关系需要在绑定一个路由key。由于在实际的工作中不大可能会用自动确认的模式,所以我们在整合路由模式的过程中,依然采用发送消息双确认机制和消费端手动确认的机制来保证消息的准确送达与消息防丢失。

1. 添加配置

在配置文件中,配置rabbitmq的相关账号信息,开启消息发送回调机制,配置文件其实和发布订阅模式是一样的。配置详情如下:

server:
  port: 10001

spring:
  application:
    name: springboot-rabbitmq-s1
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    # 发送者开启 return 确认机制
    publisher-returns: true
    # 发送者开启 confirm 确认机制
    publisher-confirm-type: correlated

2. 创建配置类

    创建配置类RabbitMQConfig,用于声明交换机、队列,建立队列和交换机的绑定关系,注入RabbitTemplate的bean对象。配置类详情如下:
package com.study.rabbitmq.config;

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author alen
 * @DATE 2022/6/7 23:50
 */
@Slf4j
@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "direct-order-exchange";
    public static final String SMS_QUEUE = "sms-direct-queue";
    public static final String EMAIL_QUEUE = "email-direct-queue";
    public static final String WECHAT_QUEUE = "wechat-direct-queue";

    /**
     * 1.
     * 声明交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        /**
         * directExchange的参数说明:
         * 1. 交换机名称
         * 2. 是否持久化 true:持久化,交换机一直保留 false:不持久化,用完就删除
         * 3. 是否自动删除 false:不自动删除 true:自动删除
         */
        return new DirectExchange(EXCHANGE_NAME, true, false);
    }

    /**
     * 2.
     * 声明队列
     * @return
     */
    @Bean
    public Queue smsQueue() {
        /**
         * Queue构造函数参数说明
         * 1. 队列名
         * 2. 是否持久化 true:持久化 false:不持久化
         */
        return new Queue(SMS_QUEUE, true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue(EMAIL_QUEUE, true);
    }

    @Bean
    public Queue wechatQueue() {
        return new Queue(WECHAT_QUEUE, true);
    }

    /**
     * 3.
     * 队列与交换机绑定
     */
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
    }

    @Bean
    public Binding wechatBinding() {
        return BindingBuilder.bind(wechatQueue()).to(directExchange()).with("wechat");
    }

    /**
     * 将自定义的RabbitTemplate对象注入bean容器
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启消息推送结果回调
        rabbitTemplate.setMandatory(true);
        //设置ConfirmCallback回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("==============ConfirmCallback start ===============");
                log.info("回调数据:{}", correlationData);
                log.info("确认结果:{}", ack);
                log.info("返回原因:{}", cause);
                log.info("==============ConfirmCallback end =================");
            }
        });
        //设置ReturnCallback回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("==============ReturnCallback start ===============");
                log.info("发送消息:{}", JSONUtil.toJsonStr(message));
                log.info("结果状态码:{}", replyCode);
                log.info("结果状态信息:{}", replyText);
                log.info("交换机:{}", exchange);
                log.info("路由key:{}", routingKey);
                log.info("==============ReturnCallback end =================");
            }
        });
        return rabbitTemplate;
    }
}

3. 消费者配置

    在消费者项目的配置文件中开启手动确认,配置详情如下:
server:
  port: 10002

spring:
  application:
    name: springboot-rabbitmq-s2
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    listener:
      simple:
        # 表示消费者消费成功消息以后需要手工的进行签收(ack确认),默认为 auto
        acknowledge-mode: manual

4. 创建消费者

分别创建三个消费者,DirectEmailConsumer、DirectSmsConsumer、DirectWechatConsumer来监听对应的队列,有消息后进行消费,三个消费者大同小异,分别如下

4.1 DirectEmailConsumer

package com.study.rabbitmq.service.direct;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @Author alen
 * @DATE 2022/6/10 22:54
 */
@Slf4j
@Service
@RabbitListener(queues = {"email-direct-queue"}) //监听队列
public class DirectEmailConsumer {

    //标记消费者逻辑执行方法
    @RabbitHandler
    public void emailMessage(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("Email direct --接收到消息:{}", msg);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重复处理失败,拒绝再次接收...");
                //basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {
                log.error("消息即将再次返回队列处理...");
                // basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

4.2 DirectSmsConsumer

package com.study.rabbitmq.service.direct;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @Author alen
 * @DATE 2022/6/10 22:55
 */
@Slf4j
@Service
@RabbitListener(queues = {"sms-direct-queue"}) //监听队列
public class DirectSmsConsumer {

    @RabbitHandler
    public void smsMessage(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("sms direct --接收到消息:{}", msg);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重复处理失败,拒绝再次接收...");
                //basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {
                log.error("消息即将再次返回队列处理...");
                // basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

4.3 DirectWechatConsumer

package com.study.rabbitmq.service.direct;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @Author chaoxian.wu
 * @DATE 2022/6/10 22:55
 */
@Slf4j
@Service
@RabbitListener(queues = {"wechat-direct-queue"}) //监听队列
public class DirectWechatConsumer {

    @RabbitHandler
    public void wechatlMessage(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("wechat direct --接收到消息:{}", msg);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重复处理失败,拒绝再次接收...");
                //basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {
                log.error("消息即将再次返回队列处理...");
                // basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

以上就是全部的代码部分,接下来我们在进入测试,看看实际效果如何,先发布一个routingKey=sms的消息,查看是不是只有对应的一个队列中接收到消息,消息发送详情:

package com.study.rabbitmq;

import com.study.rabbitmq.entity.Order;
import com.study.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.UUID;

@SpringBootTest
class SpringbootRabbitmqS1ApplicationTests {

    @Autowired
    private OrderService orderService;

    @Test
    void contextLoads() {
        for (long i = 1; i < 2; i++) {
            //交换机名称
            String exchangeName = "direct-order-exchange";
            //路由key
            String routingKey = "sms";
            Order order = buildOrder(i);
            orderService.createOrder(order, routingKey, exchangeName);
        }
    }

    private Order buildOrder(long id) {
        Order order = new Order();
        order.setRequestId(id);
        order.setUserId(id);
        order.setOrderNo(UUID.randomUUID().toString());
        order.setAmount(10L);
        order.setGoodsNum(1);
        order.setTotalAmount(10L);
        return order;
    }
}

我们登录rabbitmq管理后台查看下,只有sms-direct-queue这个队列有一条消息,效果如下:

我们启动消费者,看下是不是只有监听了sms-direct-queue这个队列的消费者有消费日志,效果如下:

再发一条routingKey=email的消息,消费的日志,效果图示如下

到此其实已经springboot整合rabbitmq的路由模式结束了,这种模式在工作中还是比较常见的,我们演示的是单点的效果,实际工作中,不大可能会使用服务单点部署,现在都讲究服务的高可用,就得服务集群部署,又会涉及到消息重复消费的问题需要处理,我个人觉得,遇到重复消费问题,我第一时间想到的就是分布式锁,哈哈~。但是锁什么呢?肯定是消息中的具备唯一性的属性。来达到防止消息的重复消费。

整个过程中,其实还存在一个小问题没有验证,就是ReturnCallback回调机制没有触发,因为这个得发生在交换机将消息发送到队列的时候失败才会触发,那么我们就发送一个不存在的routingKey就可以触发了,我们发送一个routingKey=duanxin的消息,这个肯定不会发送成功,我们通过断点来看看效果,效果如下:

然后我们常见的就全部整合完成了,当然,开启了双确认机制,虽然我们可以检测到消息投送的结果,然后可以针对投送失败的结果进行预警。但是开启了这个操作,就必然会对消息的处理效率产生影响。所以还得根据实际业务场景而定是否需要使用这个确认机制。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/95880.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

avalonia、WPF使用ScottPlot动态显示ECG心电图

文章目录 avalonia、WPF使用ScottPlot动态显示ECG心电图实现效果&#xff0c;动态效果懒得录视频了安装代码部分UpdateData方法就是用来更新心电图表的方法&#xff0c; 根据消息队列数据去更新是视图中的ScottPlot 图表 avalonia、WPF使用ScottPlot动态显示ECG心电图 avalonia…

【随笔】如何使用阿里云的OSS保存基础的服务器环境

使用阿里云OSS创建一个存储仓库&#xff1a;bucket 在Linux上下载并安装阿里云的ossutil工具 // 命令行&#xff0c;是linux环境 3. 安装ossutil。sudo -v ; curl https://gosspublic.alicdn.com/ossutil/install.sh | sudo bash 说明:安装过程中&#xff0c;需要使用解压工具…

Next.js基础语法

Next.js 目录结构 入口App组件&#xff08;_app.tsx&#xff09; _app.tsx是项目的入口组件&#xff0c;主要作用&#xff1a; 可以扩展自定义的布局&#xff08;Layout&#xff09;引入全局的样式文件引入Redux状态管理引入主题组件等等全局监听客户端路由的切换 ts.config…

什么是光流传感器

传感器 文章目录 传感器前言一、光流传感器二、px4FLOW 前言 光流利用的是图像的变化处理&#xff0c;用于检测地面的状态&#xff0c;从而监测飞机的移动&#xff1b;主要用于保持飞机的水平位置&#xff0c;以及在室内实现定高和定点飞行。 其实光流是数字图像处理理论的一部…

【MySQL】MySQL里的用户账户和角色是什么?如何管理?

用户&#xff08;user&#xff09;验证和授权创建用户账户连接服务器查看用户账户设置 角色&#xff08;role&#xff09;创建角色 操作用户帐户和角色重命名删除 感谢 &#x1f496; 用户&#xff08;user&#xff09; 在MySQL中&#xff0c;用户是数据库访问的主要实体。每个…

vscode vue3自定义自动补全

敲代码多了&#xff0c;发现重发动作很多&#xff0c;于是还是定义自动补全代码吧——懒是第一生产力&#xff01; 1&#xff0c;Ctrl Shift P打开快捷命令行&#xff1a;找到下面这个 2&#xff0c;然后找到ts&#xff1a; 里面给了demo照着写就行 // "Print to conso…

可拖动表格

支持行拖动&#xff0c;列拖动 插件&#xff1a;sortablejs UI: elementUI <template><div><hr style"margin: 30px 0;"><div><!-- 数据里面要有主键id&#xff0c; 否则拖拽异常 --><h2 style"margin-bottom: 30px&qu…

uniapp 项目实践总结(二)从零开始搭建一个项目

导语:本篇文章主要是项目方面的技术开发总结,新建一个项目可以选择使用可视化界面,也可以使用命令行搭建。 目录 可视化界面命令行搭建安卓开发环境苹果开发环境可视化界面 安装软件 使用官方推荐的 HbuilderX 软件,开发方式比较简单,内置相关环境以及终端,无需配置 no…

CSS按钮-跑马灯边框

思路很简单&#xff0c;实现方法有很多很多。但是大体思路与实现方法都类似&#xff1a;渐变色 动画&#xff0c;主要区别在动画的具体实现 0、HTML 结构 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><titl…

【uniapp 配置启动页面隐私弹窗】

为什么需要配置 原因 根据工业和信息化部关于开展APP侵害用户权益专项整治要求&#xff0c;App提交到应用市场必须满足以下条件&#xff1a; 1.应用启动运行时需弹出隐私政策协议&#xff0c;说明应用采集用户数据 2.应用不能强制要求用户授予权限&#xff0c;即不能“不给权…

[C/C++]天天酷跑游戏超详细教程-上篇

个人主页&#xff1a;北海 &#x1f390;CSDN新晋作者 &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏✨收录专栏&#xff1a;C/C&#x1f91d;希望作者的文章能对你有所帮助&#xff0c;有不足的地方请在评论区留言指正&#xff0c;大家一起学习交流&#xff01;&#x1f9…

WebGL模型矩阵

前言&#xff1a;依赖矩阵库 WebGL矩阵变换库_山楂树の的博客-CSDN博客 先平移&#xff0c;后旋转的模型变换&#xff1a; 1.将三角形沿着X轴平移一段距离。 2.在此基础上&#xff0c;旋转三角形。 先写下第1条&#xff08;平移操作&#xff09;中的坐标方程式。 等式1&am…

2023年DAMA-CDGA/CDGP数据治理认证线上班到这里

DAMA认证为数据管理专业人士提供职业目标晋升规划&#xff0c;彰显了职业发展里程碑及发展阶梯定义&#xff0c;帮助数据管理从业人士获得企业数字化转型战略下的必备职业能力&#xff0c;促进开展工作实践应用及实际问题解决&#xff0c;形成企业所需的新数字经济下的核心职业…

Fooocus:一个简单且功能强大的Stable Diffusion webUI

Stable Diffusion是一个强大的图像生成AI模型&#xff0c;但它通常需要大量调整和提示工程。Fooocus的目标是改变这种状况。 Fooocus的创始人Lvmin Zhang&#xff08;也是 ControlNet论文的作者&#xff09;将这个项目描述为对“Stable Diffusion”和“ Midjourney”设计的重新…

超简单演示Android地图开发应用实例

概述 手机地图开发应用广泛&#xff0c;本实例演示了在手机上显示各种地图的方法。比如3D矢量地图、卫星地图、交通地图、夜景地图等在手机上的显示。可以根据手势自由做地图缩放&#xff0c;地图旋转等操作。代码简洁、实用&#xff0c;可以帮助你快速上手地图开发。 详细 …

【附安装包】Vm虚拟机安装Linux系统教程

软件下载 软件&#xff1a;Linux版本&#xff1a;18.0.4语言&#xff1a;简体中文大小&#xff1a;1.82G安装环境&#xff1a;VMware硬件要求&#xff1a;CPU2.0GHz 内存4G(或更高&#xff09;下载通道①丨百度网盘&#xff1a;1.Vm虚拟机15.5下载链接&#xff1a;https://pan…

视频智能分析平台EasyCVR安防视频汇聚平台助力森林公园防火安全的应用方案

一、研发背景 随着经济的发展和人们生活水平的提高&#xff0c;越来越多的人喜欢在周末去周边的森林公园旅游&#xff0c;享受大自然的美景&#xff0c;并进行野炊和烧烤等娱乐活动。然而&#xff0c;近年来由于烟蒂和烧烤碳渣等人为因素&#xff0c;森林公园火灾频繁发生。森…

【核磁共振成像】并行采集MRI

目录 一、并行成像二、SENSE重建三、SMASH重建四、灵敏度校准五、AUTO-SMASH和VD-AUTO-SMASH六、GRAPPA重建七、SPACE RIP重建算法八、PILS重建算法九、PRUNO重建算法十、UNFOLD算法 一、并行成像 并行MR成像(pMRI):相位阵列接受线圈不但各有自己专用的接受通道&#xff0c;而且…

Oracle 本地客户端连接远程 Oracle 服务端并使用 c# 连接测试

这里写自定义目录标题 前言Oracle 客户端安装先决条件下载 Oracle 客户端Oracle 客户端环境变量配置 PL/SQLPL/SQL 下载PL/SQL 配置 配置远程连接tnsnames.ora 文件配置 使用 PL/SQL 连接远程数据库使用 C# 远程访问 Oracle 数据库结语 前言 最近有一个需要使用本地的 Oracle …

Linux知识点 -- 网络基础(一)

Linux知识点 – 网络基础&#xff08;一&#xff09; 文章目录 Linux知识点 -- 网络基础&#xff08;一&#xff09;一、网络发展二、协议1.OSI七层模型2.TCP/IP五层&#xff08;或四层&#xff09;模型 三、网络传输基本流程1.局域网中的两台主机通信流程2.跨网段的两台主机间…