Rabbitmq入门与应用(六)-rabbitmq的消息确认机制

rabbitmq的消息确认机制

确认消息是否发送给交换机

配置
server:
  port: 11111
spring:
  rabbitmq:
    port: 5672
    host: 192.168.201.81
    username: admin
    password: 123
    publisher-confirm-type: correlated
编码RabbitTemplate.ConfirmCallback

ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中。

在配置类中编码确认回调函数。tips: 设置 rabbitTemplate.setMandatory(true);

配置类

rabbitTemplate.setConfirmCallback(ConfirmCallback confirmCallback);

CorrelationData:

1、消息ID需要封装到CorrelationData
2、correlationData.getFuture().addCallback(…)是一个回调函数:决定了每个业务处理confirm成功或失败的逻辑。

@Bean
public MessageConverter messageConverter(){
    return new Jackson2JsonMessageConverter();
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){

    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    log.debug("Rabbitmq配置启动成功,RabbitTemplate:{}设置完成",rabbitTemplate);
    rabbitTemplate.setMessageConverter(messageConverter());
    rabbitTemplate.setConfirmCallback(new RabbitConfirmCallbackImpl());
    return rabbitTemplate;
}

/**
 * 确保消息是否发送到交换机
 */
class RabbitConfirmCallbackImpl implements RabbitTemplate.ConfirmCallback{
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
         log.warn("****Exchange callback-检验是否发送成功********");
         log.warn("correlationData->相关数据:{}",correlationData);
         log.warn("ack->Exchange响应:{}",ack);
         log.warn("cause->错误原因:{}",cause);
 }
}
测试发送

测试向交换机发送数据,测试交换机是否成功收到。

假设给一个错误的Exchange
@Service
public class MqServiceImpl implements IMqService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendMessage(String msg) {
         //错误的Exchange名称,实际名称为:ssc_sc_routing_exchange
        final String EXCHANGE = "ssc_sc_routing_exchangex";
        final String ROUTING_KEY = "ssc_sc_routing_key";

        rabbitTemplate.convertAndSend(
                EXCHANGE,
                ROUTING_KEY,
                msg
        );
    }
}

image-20231218164926379

如果Exchange正确
@Override
public void sendMessage(String msg) {
    final String EXCHANGE = "ssc_sc_routing_exchange";
    final String ROUTING_KEY = "ssc_sc_routing_key";
    rabbitTemplate.convertAndSend(
            EXCHANGE,
            ROUTING_KEY,
            msg
    );
}

image-20231218164425398

确认消息是否从交换机发送到队列RabbitTemplate.ReturnsCallback

设置ResturnsCallback

通过实现 ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调。

配置文件
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true     #检查是否绑定到队列中
配置
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitConfirmReturnCallbackImpl());

class RabbitConfirmReturnCallbackImpl implements RabbitTemplate.ReturnsCallback{
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
       log.warn("message:{}",returnedMessage.getMessage());
       log.warn("exchange:{}",returnedMessage.getExchange());
       log.warn("replyCode:{}",returnedMessage.getReplyCode());
       log.warn("replyText:{}",returnedMessage.getReplyText());
       log.warn("routingKey:{}",returnedMessage.getRoutingKey());
    }
}
测试

修改routingkey的值,让交换机不能路由到指定Queue。

package com.wnhz.ssc.cloud.mq.service.impl;

import com.wnhz.ssc.cloud.mq.service.IMqService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MqServiceImpl implements IMqService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendMessage(String msg) {
        final String EXCHANGE = "ssc_sc_routing_exchange";
        //修改routingkey,给一个错误的值,正确值为: ssc_sc_routing_key
        final String ROUTING_KEY = "ssc_sc_routing_keyx";
        rabbitTemplate.convertAndSend(
                EXCHANGE,
                ROUTING_KEY,
                msg
        );
    }
}

image-20231218165907610

返回message:

message:(
Body:'"hello confirm call back"'
    MessageProperties
    [
        headers={
                __TypeId__=java.lang.String
                 },
        contentType=application/json,
        contentEncoding=UTF-8,
        contentLength=0,
        receivedDeliveryMode=PERSISTENT,
        priority=0,
        deliveryTag=0
   ]
)

消费确认信息

消费监听模式
  • Simple模式

    image-20231218155921090

    Simple模式即SMLC。simple模式每个消费者都有其私有的线程,可以增加消费者,也会自动增加消费线程,不管消费者是不是在处理消息,可能会造成资源线程的浪费。 对每个消费者使用一个内部队列和一个专用线程。如果容器配置为侦听多个队列,则使用同一个消费者线程来处理所有队列。并发控制由concurrentConsumers和其他属性。当消息从 RabbitMQ 客户端到达时,客户端线程通过队列将它们传递给消费者线程。

  • Direct模式

    image-20231218160106458

    压力集中在Connection线程池上,线程可以复用与多个消费者,但是如果采用这种模式,需要设置Connection线程池合适的参数。

Message对象结构

Message对象的结构,

消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)。当 autoAck 参数等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

image-20231218173406699

image-20231218173218649

消息确认方式
  1. AcknowledgeMode.AUTO:自动确认。
  2. AcknowledgeMode.NONE:根据情况确认。
  3. AcknowledgeMode.MANUAL:手动确认。

direct模式:

image-20231218173612719

simple模式:

image-20231218173833493
消费端监听发送
@RabbitListener(queues = "data_confirm_queue")
@Override
public void receiveBookFromMq(Message message, Channel channel, Book book) {

    log.debug("message:{}", message);
    log.debug("message.getMessageProperties().getHeaders()===>{}",
            message.getMessageProperties().getHeaders());
    log.debug("[order消费者:]接收到消息: {}", book);

    try {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("消息队列确认: {},{}",
                message.getMessageProperties().getConsumerQueue(), "接收到回调方法");
    } catch (IOException e) {
        e.printStackTrace();
    }
}
手动确认方式
  1. Basic.Ack 命令:用于确认当前消息。
  2. Basic.Nack 命令:用于否定当前消息(注意:这是AMQP 0-9-1的RabbitMQ扩展) 。
  3. Basic.Reject 命令:用于拒绝当前消息
channel.basicAck(long deliveryTag,boolean multiple)

basicAck 方法用于确认当前消息。

public void basicAck(long deliveryTag, boolean multiple) throws IOException {
    this.delegate.basicAck(deliveryTag, multiple);
}
  • deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。

  • multiple:为了减少网络流量,手动确认可以被批处理。

    • true: 代表批量应答 channel 上未应答的消息,比当前tag小的未应答的也一并应答(如5,6,7未应答)。

    image-20240221084206673

    • false: 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

      image-20240221084249669

basicNack

basicNack 方法用于否定当前消息。 由于 basicReject 方法一次只能拒绝一条消息,如果想批量拒绝消息,则可以使用 basicNack 方法。消费者客户端可以使用 channel.basicNack 方法来实现

public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
    this.delegate.basicNack(deliveryTag, multiple, requeue);
}
basicReject(long deliveryTag, boolean requeue)

basicNack 方法用于否定当前消息。basicReject 方法用于明确拒绝当前的消息而不是确认。

public void basicReject(long deliveryTag, boolean requeue) throws IOException {
    this.delegate.basicReject(deliveryTag, requeue);
}

消息遗弃或入队,一般建议消息丢弃重新发。

  • requeue: true :重回队列,false :丢弃,我们在nack方法中必须设置 false,否则重发没有意义。
出现异常的解决方案
package com.wnhz.mq.order.service.impl;

import com.rabbitmq.client.Channel;
import com.wnhz.domain.Book;
import com.wnhz.mq.order.service.IOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Slf4j
@Service
public class OrderServiceImpl implements IOrderService {

    private void buildException(){
        throw  new RuntimeException("[消费者:] 消费出现异常......");
    }

    @RabbitListener(queues = "data_confirm_queue")
    @Override
    public void receiveBookFromMq(Message message, Channel channel, Book book) {
        try {
            //制造异常测试
            buildException();
            log.debug("message:{}", message);
            log.debug("message.getMessageProperties().getHeaders()===>{}",
                    message.getMessageProperties().getHeaders());
            log.debug("[order消费者:]接收到消息: {}", book);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);

            log.debug("消息队列确认: {},{}",
                    message.getMessageProperties().getConsumerQueue(), "接收到回调方法");
        } catch (Exception e) {
          log.debug("消费异常: {}",e.getMessage());
            try {
                log.debug("尝试丢弃:{}消息.....................",book);
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/398821.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

突出最强算法模型——回归算法 !!

文章目录 1、特征工程的重要性 2、缺失值和异常值的处理 (1)处理缺失值 (2)处理异常值 3、回归模型的诊断 (1)残差分析 (2)检查回归假设 (3)Cooks 距离 4、学…

汽车电子论文学习---电动汽车用高功率密度碳化硅电机控制器研究

关键重点: sic的特点:耐压高、开关速度快、开关损耗小;采用sic的控制器,损耗降低70%,续航里程提高5%。sic的模块并联设计难度高于IGBT模块;多芯片并联导致热耦合问题、温升不均,导致部分芯片率…

合纵连横 – 以 Flink 和 Amazon MSK 构建 Amazon DocumentDB 之间的实时数据同步

在大数据时代,实时数据同步已经有很多地方应用,包括从在线数据库构建实时数据仓库,跨区域数据复制。行业落地场景众多,例如,电商 GMV 数据实时统计,用户行为分析,广告投放效果实时追踪&#xff…

【git】提交信息写错了,使用 amend 或者 reset 修改最近一次的提交信息 ,修改上上次/以前的提交信息

如果你的提交信息写错了,比如下面,你想修改【初始化项目】这5个字 修改最近一次的提交新的两个办法 (1)使用 reset 把这个提交重置,然后重新提交,reset 的使用方法请参考这篇文章。但是 reset 这种方法只能…

计算机设计大赛 深度学习人体跌倒检测 -yolo 机器视觉 opencv python

0 前言 🔥 优质竞赛项目系列,今天要分享的是 🚩 **基于深度学习的人体跌倒检测算法研究与实现 ** 该项目较为新颖,适合作为竞赛课题方向,学长非常推荐! 🥇学长这里给一个题目综合评分(每项满…

消息队列-RabbitMQ:发布确认—发布确认逻辑和发布确认的策略

九、发布确认 1、发布确认逻辑 生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给…

设计模式二:代理模式

1、什么是动态代理 可能很多小伙伴首次接触动态代理这个名词的时候,或者是在面试过程中被问到动态代理的时候,不能很好的描述出来,动态代理到底是个什么高大上的技术。不方,其实动态代理的使用非常广泛,例如我们平常使…

华为配置直连三层组网直接转发示例

华为配置直连三层组网直接转发示例 组网图形 图1 配置直连三层组网直接转发示例组网图 业务需求组网需求数据规划配置思路配置注意事项操作步骤配置文件扩展阅读 业务需求 企业用户接入WLAN网络,以满足移动办公的最基本需求。且在覆盖区域内移动发生漫游时&#xff…

无人机的视频图传技术有哪些?

在操控无人机时,视频图传技术显得尤为关键。通过这项技术,无人机的摄像头所捕捉的画面能实时回传至遥控器,使操作者全面掌握无人机的拍摄情况。同时,无人机图传技术也是衡量无人机性能的重要标准,它关乎飞行距离与时间…

SG-8201CJA(汽车可编程晶体振荡器)

爱普生的SG-8021CJA是一款符合AEC-Q100标准的晶体振荡器,专为要求苛刻的汽车/ADAS应用(如激光雷达和相机ECU)而设计。它采用爱普生的内部低噪声小数NPLL,输出 频率高达170MHz,相位抖动小于1/25,稳定性比之前…

基于多种机器学习模型的西北地区蒸散发模拟与趋势分析_季鹏_2023

基于多种机器学习模型的西北地区蒸散发模拟与趋势分析_季鹏_2023 摘要关键词 1 资料和方法1. 1 研究区域与观测数据1. 2 机器学习模型构建与验证方法1. 3 SHAP 可解释性方法 2 主要结果2. 1 不同模型的模拟性能和泛化能力2. 2 不同模型的可解释性分析2. 3 5 km 分辨率格点蒸散发…

Qt _day1

1.思维导图 2.设计一个简单登录界面 #include "mywidget.h"MyWidget::MyWidget(QWidget *parent): QWidget(parent) {this->setWindowTitle("原神启动"); // this->setStyleSheet("background-color:rgb(255,184,64)");this->setStyl…

游戏行业洞察:分布式开源爬虫项目在数据采集与分析中的应用案例介绍

前言 我在领导一个为游戏行业巨头提供数据采集服务的项目中,我们面临着实时数据需求和大规模数据处理的挑战。我们构建了一个基于开源分布式爬虫技术的自动化平台,实现了高效、准确的数据采集。通过自然语言处理技术,我们确保了数据的质量和…

剪辑视频调色软件有哪些 剪辑视频软件哪个最好 剪辑视频怎么学 剪辑视频的方法和步骤 会声会影2024 会声会影视频制作教程

看了很多调色教程,背了一堆调色参数,可最终还是调不出理想的效果。别再怀疑自己了,不是你的剪辑技术不行,而是剪辑软件没选对。只要掌握了最基本的调色原理,一款适合自己的视频剪辑软件是很容易出片的。 有关剪辑视频…

ABAQUS应用04——集中质量的添加方法

文章目录 0. 背景1. 集中质量的编辑2. 约束的设置3. 总结 0. 背景 混塔ABAQUS模型中,机头、法兰等集中质量的设置是模型建立过程中的一部分,需要研究集中质量的添加。 1. 集中质量的编辑 集中质量本身的编辑没什么难度,我已经用Python代码…

Bert-VITS-2 效果挺好的声音克隆工具

持中日英三语训练和推理。内置干声分离,切割和标注工具,开箱即用。请点下载量右边的符号查看镜像所对应的具体版本号。 教程地址: sjj​​​​​​​CodeWithGPU | 能复现才是好算法CodeWithGPU | GitHub AI算法复现社区,能复现…

Python classmethod函数

在Python编程中,classmethod()函数是一个内置函数,用于定义类方法。类方法是绑定到类而不是实例的方法,可以通过类名直接调用,并且可以访问类的属性和方法。本文将深入探讨Python中的classmethod()函数,包括基本用法、…

【Linux】自主WEB服务器实现

自主web服务器实现 1️⃣构建TcpServer2️⃣构建HttpServer3️⃣构建HttpRequest和HttpResponseHttp请求报文格式Http相应报文读取、处理请求&构建响应读取请求中的一行读取请求中需要注意的点 4️⃣CGI模式判断是否需要用CGI处理请求构建任务&线程池管理 5️⃣实验结果…

使用静态CRLSP配置MPLS TE隧道

正文共:1591 字 13 图,预估阅读时间:4 分钟 静态CRLSP(Constraint-based Routed Label Switched Paths,基于约束路由的LSP)是指在报文经过的每一跳设备上(包括Ingress、Transit和Egress&#xf…

数据结构:跳表讲解

跳表 1.什么是跳表-skiplist1.1简介1.2设计思路 2.跳表的效率分析3.跳表实现3.1类成员设计3.2查找3.3插入3.4删除3.5完整代码 4.skiplist跟平衡搜索树和哈希表的对比 1.什么是跳表-skiplist 1.1简介 skiplist本质上也是一种查找结构,用于解决算法中的查找问题&…
最新文章