Spring RabbitMQ那些事(3-消息可靠传输和订阅)

目录

  • 一、序言
  • 二、生产者确保消息发送成功
    • 1、为什么需要Publisher Confirms
    • 2、哪些消息会被确认处理成功
  • 三、消费者保证消息被处理
  • 四、Spring RabbitMQ支持代码示例
    • 1、 application.yml
    • 2、RabbigtMQ配置
    • 3、可靠生产者配置
    • 4、可靠消费者配置
    • 5、测试用例

一、序言

在有些业务场景中,消息是不能丢的,比如分布式事务资金动账,出账方扣款,那么入账方就一定要收款。以前写了一篇分布式事务的文章,里面的跨地区转账就是一个实际案例。

消息是有可能丢的,比如生产者在发送消息时broker服务挂了,消息没有来得及落盘,这时消息就彻底丢了。

保证MQ消息可靠传输主要有两个方面,一方面是消息生产者确保消息一定发送成功,另一方面是消费者确保消息一定被处理。


二、生产者确保消息发送成功

1、为什么需要Publisher Confirms

在Spring AMQP中AmqpTemplate的实现RabbitTemplate已经支持 Publisher Confirms and Returns,所谓的publisher confirms意思就是消息发布者确认消息是否已经被发送。

在RabbitMQ官方文档描述中,持久化的消息在Broker重启时也是应该存活的,这里的词用的是应该,因为消息有可能在落地磁盘前Broker就挂了,导致消息丢失。

最直接的解决方案是通过事务,但是通过事务有两个问题:

  • 事务阻塞:发布者必须等待Broker处理完每条消息。
  • 事务很重:每次提交都会要求触发fsync(),强制磁盘,这个过程需要花很长的时间。

备注:在RabbitMQ官方测试中,通过事务去保证,发布10000条消息需要花至少4分钟的时间。

在这里插入图片描述

而通过Publisher Confirm机制,一旦Broker处理完就会确认消息,而且这个过程是异步的,生产者可以流式发布消息,不需要等待Broker,并且Broker会批量高效将消息落盘。

2、哪些消息会被确认处理成功

当Broker确认消息时,会通知消息发布者消息是否被成功处理,成功处理的基本规则如下:

  • 无法路由的mandatory(必须有符合条件的队列)和immediate(必须有消费者在线)类型在被basic.return后会被确认。
  • 非持久化消息在入队时会被确认。
  • 持久化消息当持久化到磁盘或者被消费者消费时会被确认。

三、消费者保证消息被处理

消费者端确保消息消费很简单,关闭消息自动确认就好,开启消息手动确认。当然有些场景消息只能被处理一次,可以通过分布式锁来实现。


四、Spring RabbitMQ支持代码示例

1、 application.yml

server:
  port: 8080
spring:
  rabbitmq:
    addresses: localhost:5672
    username: admin
    password: admin
    virtual-host: /
    publisher-returns: true
    publisher-confirm-type: correlated
    listener:
      type: simple
      simple:
        acknowledge-mode: manual
        concurrency: 5
        max-concurrency: 20
        prefetch: 5
    template:
      mandatory: true

备注:

  • 这里一定要设置spring.rabbitmq.publisher-returnstrue,并且设置spring.rabbitmq.publisher-confirm-typecorrelated,同时设置spring.rabbitmq.template.mandatorytrue
  • 上面我们将消费者的确认模式改为了手动确认

2、RabbigtMQ配置

@Configuration
public class RabbitReliableTransportConfig {

	/**
	 * RabbitTemplate消息转换器配置,自动将对象转换为json字符串
	 *
	 * @return
	 */
	@Bean
	public MessageConverter jackson2JsonMessageConverter() {
		Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
		messageConverter.setClassMapper(new DefaultJackson2JavaTypeMapper());
		return messageConverter;
	}

	@Bean
	public Queue reliableQueue() {
		return QueueBuilder.durable("reliable-queue").build();
	}
}

3、可靠生产者配置

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqReliableProducer {

	private final RabbitTemplate rabbitTemplate;

	public void sendReliableMsg(String body) {
		// 发送可靠消息
		ReliableMsgDTO reliableMsgDTO = ReliableMsgDTO.builder().body(body).build();
		CorrelationData correlationData = new CorrelationData();
		rabbitTemplate.convertAndSend("reliable-queue", reliableMsgDTO, correlationData);

		// 发送确认逻辑
		CompletableFuture<Confirm> future = correlationData.getFuture().completable();
		future.whenComplete((confirm, throwable) -> {
			if (confirm.isAck()) {
				log.info("消息已经被成功发送, 消息内容:{}", JSON.toJSONString(reliableMsgDTO));
				return;
			}

			log.warn("消息发送未成功发送, 原因:{}, 消息内容:{}", confirm.getReason(), JSON.toJSONString(reliableMsgDTO), throwable);
			// 5秒后再发送
			LockSupport.parkNanos(5 * 1000 * 1000 * 1000L);
			rabbitTemplate.convertSendAndReceive(reliableMsgDTO, correlationData);
		});
	}
}

4、可靠消费者配置

@Slf4j
@Component
public class RabbitMQReliableConsumer {

	@RabbitListener(queues = "reliable-queue")
	public void handleMsgFromQueue(ReliableMsgDTO reliableMsgDTO, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
		channel.basicAck(tag, false);
		// channel.basicNack(tag, false, false);
		log.info("Message received from queue, message body: {}", JSON.toJSONString(reliableMsgDTO));
	}
}

备注:这里我们开启了消息的手动确认,如果消息处理失败没有确认,那么消息将会在下次消费者参加连接时再次被投递。

5、测试用例

测试结果如下,每当消息发送至Broker成功后会触发回调,如果消息发送失败将会触发重新发送。

2024-01-20 18:13:11.399  INFO 12316 --- [78.107.127:5672] c.u.r.i.p.RabbitMqReliableProducer       : 消息已经被成功发送, 消息内容:{"body":"hello"}
2024-01-20 18:13:11.399  INFO 12316 --- [ntContainer#0-5] c.u.r.i.c.RabbitMQReliableConsumer       : Message received from queue, message body: {"body":"hello"}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/337286.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

VisualSVN Server实战

文章目录 一、实战概述二、实战步骤&#xff08;一&#xff09;下载VisualSVN Server&#xff08;二&#xff09;安装VisualSVN Server&#xff08;三&#xff09;使用VisualSVN Server1、新建仓库&#xff08;1&#xff09;新建Repository&#xff08;2&#xff09;选择仓库类…

Python 折线图的绘制(Seaborn篇-04)

Python 折线图的绘制(Seaborn篇-04)         🍹博主 侯小啾 感谢您的支持与信赖。☀️ 🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹…

基于python socket实现TCP/UDP通信

两个应用程序如果需要进行通讯最基本的一个前提就是能够唯一的标示一个进程&#xff0c;我们知道IP层的ip地址可以唯一标示主机&#xff0c;而TCP层协议和端口号可以唯一标示主机的一个进程&#xff0c;这样我们可以利用ip地址&#xff0b;协议&#xff0b;端口号唯一标示网络中…

论文阅读_CogTree_推理的认知树

英文名称: From Complex to Simple: Unraveling the Cognitive Tree for Reasoning with Small Language Models中文名称: 从复杂到简单&#xff1a;揭示小型语言模型推理的认知树链接: http://arxiv.org/abs/2311.06754v1代码: https://github.com/alibaba/EasyNLP作者: Junbi…

深入解析 JavaScript 中的 setTimeout() 和 setInterval()

&#x1f9d1;‍&#x1f393; 个人主页&#xff1a;《爱蹦跶的大A阿》 &#x1f525;当前正在更新专栏&#xff1a;《VUE》 、《JavaScript保姆级教程》、《krpano》、《krpano中文文档》 ​ ​ ✨ 前言 setTimeout() 和 setInterval() 是 JavaScript 中非常常用的定时函数…

Django从入门到精通(一)

目录 一、Django环境搭建与命令 1.1、安装 1.2、命令行 创建项目 编写代码 运行 app概念 1.3、Pycharm创建项目 1.4、虚拟环境 创建虚拟环境 - 命令行 介绍 操作 基本问题 Pycharm 项目虚拟环境 django虚拟环境【安装django最新版本】 django虚拟环境【安装指…

frida https抓包

web端导入证书、https代理即可解决大部分需求&#xff0c;但是&#xff0c;有些app需要处理ssl pinning验证。 废话不多说。frida处理ssl pin的步骤大体如下。 安装python3.x,并在python环境中安装frida&#xff1a; pip install frida pip install frida-tools下载frida-se…

【设计模式】你知道游戏SL大法是什么设计模式吗?

什么是备忘录模式&#xff1f; 老规矩&#xff0c;我们先来看看备忘录模式 (Memento) 的定义&#xff1a;在不破坏封装性的前提下&#xff0c;捕获一个对象的内部状态&#xff0c;并在该对象之外保存这个状态。这样以后就可将该对象恢复到原先保存的状态。 它的UML类图如下&a…

Django(八)

1. 管理员操作 1.1 添加 from django.shortcuts import render, redirectfrom app01 import models from app01.utils.pagination import Paginationfrom django import forms from django.core.exceptions import ValidationError from app01.utils.bootstrap import BootStr…

【MySQL】where和having的区别

&#x1f34e;个人博客&#xff1a;个人主页 &#x1f3c6;个人专栏&#xff1a;数据库 ⛳️ 功不唐捐&#xff0c;玉汝于成 目录 前言 正文 用途: 使用位置: 操作对象: 聚合函数: 示例&#xff1a; 结语 我的其他博客 前言 数据库中的 WHERE 和 HAVING 子句在 SQL 查…

排序算法整理

快速排序 C实现 void fastStore(int *a, int start, int end){if(start>end)return ;int leftstart;int rightend;int tempa[left];//设置基准值tempwhile(left < right) //左指针的位置一定小于右指针的位置{while(a[right]>temp && left < right) //左…

RabbitMQ-消息延迟

一、死信交换机 1、描述 一个队列接收到的消息有过期时间&#xff0c;消息过期之后&#xff0c;如果配置有死信队列&#xff0c;消息就会进去死信队列。 2、图解 3、过程 当生产者将消息发送到exchange1&#xff0c;然后交换机将消息路由到队列queue1&#xff0c;但是队列que…

快速入门:使用 Gemini Embeddings 和 Elasticsearch 进行向量搜索

Gemini 是 Google DeepMind 开发的多模态大语言模型家族&#xff0c;作为 LaMDA 和 PaLM 2 的后继者。由 Gemini Ultra、Gemini Pro 和 Gemini Nano 组成&#xff0c;于 2023 年 12 月 6 日发布&#xff0c;定位为 OpenAI 的竞争者 GPT-4。 本教程演示如何使用 Gemini API 创建…

浅析Java中volatile关键字

认识volatile关键字 Java中的volatile关键字用于修饰一个变量&#xff0c;当这个变量被多个线程共享时&#xff0c;这个变量的值如果发生更新&#xff0c;每个线程都能获取到最新的值。volatile关键字在多线程环境下还会禁止指令重排序&#xff0c;确保变量的赋值操作按照代码的…

书生·浦语大模型实战营-学习笔记4

XTuner 大模型单卡低成本微调实战 Finetune简介 常见的两种微调策略&#xff1a;增量预训练、指令跟随 指令跟随微调 数据是一问一答的形式 对话模板构建 每个开源模型使用的对话模板都不相同 指令微调原理&#xff1a; 由于只有答案部分是我们期望模型来进行回答的内容…

敏捷测试和DevOpes自动化测试的区别

敏捷测试和DevOps自动化测试在以下方面存在区别&#x1f447; 1️⃣目标 &#x1f388;敏捷测试的主要目标是提供快速的反馈和持续的改进&#xff0c;以便在开发过程中尽早发现和解决问题&#xff0c;从而提高软件的质量和可靠性。 &#x1f308;DevOps自动化测试的目标是提高软…

C语言算法赛——蓝桥杯(省赛试题)

一、十四届C/C程序设计C组试题 十四届程序C组试题A#include <stdio.h> int main() {long long sum 0;int n 20230408;int i 0;// 累加从1到n的所有整数for (i 1; i < n; i){sum i;}// 输出结果printf("%lld\n", sum);return 0; }//十四届程序C组试题B…

统计学-R语言-7.1

文章目录 前言假设检验的原理假设检验的原理提出假设做出决策表述结果效应量 总体均值的检验总体均值的检验(一个总体均值的检验) 练习 前言 本章主题是假设检验(hypothesis testing)。与参数估计一样&#xff0c;假设检验也是对总体参数感兴趣&#xff0c;如比例、比例间的差…

负载均衡流程

1、负载均衡流程图 2、触发负载均衡函数trigger_load_balance void trigger_load_balance(struct rq *rq) { /* Dont need to rebalance while attached to NULL domain */ if (unlikely(on_null_domain(rq)))//当前调度队列中的调度域是空的则返回 return; i…

SpringMVC数据校验

导包 配置springmvc.xml <bean id"validator" class" org.springframework.validation.beanvalidation.LocalValidatorFactoryBean"><property name"providerClass" value"org.hibernate.validator.HibernateValidator ">…
最新文章