(四)RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列

Lison <dreamlison@163.com>, v1.0.0, 2023.06.23

RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列

文章目录

  • RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列
    • 消费端限流
    • 利用限流实现不公平分发
    • 消息存活时间
    • 优先级队列

消费端限流

在这里插入图片描述

之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

1、生产者批量发送消息

@Test
public void testSendBatch() {
    // 发送十条消息
    for (int i = 0; i < 10; i++) {
      rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);
   }
}

2、消费端配置限流机制

spring:
 rabbitmq:
   host: 127.0.0.1
   port: 5672
   username: admin
   password: 1233456
   virtual-host: /
   listener:
     simple:
        # 限流机制必须开启手动签收
       acknowledge-mode: manual
        # 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息。
       prefetch: 5

3、消费者监听队列

@Component
public class QosConsumer{
    @RabbitListener(queues = "my_queue")
    public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
        // 1.获取消息
        System.out.println(new String(message.getBody()));
        // 2.模拟业务处理
        Thread.sleep(3000);
        // 3.签收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
   }
}

利用限流实现不公平分发

在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处 理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以 采用不公平分发,即谁处理的快,谁处理的消息多

1、生产者批量发送消息

@Test
public void testSendBatch() {
    // 发送十条消息
    for (int i = 0; i < 10; i++) {
      rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);
   }
}

端配置不公平分发

spring:
 rabbitmq:
   host: 127.0.0.1
   port: 5672
   username: admin
   password: 1233456
   virtual-host: /
   listener:
     simple:
        # 限流机制必须开启手动签收
       acknowledge-mode: manual
        # 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
       prefetch: 1

编写两个消费者

@Component
public class UnfairConsumer {
    // 消费者1
    @RabbitListener(queues = "my_queue")
    public void listenMessage1(Message message, Channel channel) throws Exception
     {
        //1.获取消息
        System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));
        //2. 处理业务逻辑
        Thread.sleep(500); // 消费者1处理快
        //3. 手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
     }
    
    // 消费者2
    @RabbitListener(queues = "my_queue")
    public void listenMessage2(Message message, Channel channel) throws Exception
     {
        //1.获取消息
        System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));
        //2. 处理业务逻辑
        Thread.sleep(3000);// 消费者2处理慢
        //3. 手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
   }
}

消息存活时间

RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL), 当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ 可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。

设置队列所有消息存活时间

1、在创建队列时设置其存活时间:

@Configuration
public class RabbitConfig2 {
    private final String EXCHANGE_NAME="my_topic_exchange2";
    private final String QUEUE_NAME="my_queue2";
    // 1.创建交换机
    @Bean("bootExchange2")
    public Exchange getExchange2(){
        return ExchangeBuilder
               .topicExchange(EXCHANGE_NAME)
               .durable(true).
                build();
   }
    // 2.创建队列
    @Bean("bootQueue2")
    public Queue getMessageQueue2(){
        return QueueBuilder
               .durable(QUEUE_NAME)
               .ttl(10000) //队列的每条消息存活10s
               .build();
   }
    // 3.将队列绑定到交换机
    @Bean
    public Binding bindMessageQueue2(@Qualifier("bootExchange2") Exchange exchange,@Qualifier("bootQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
   }
}

2、生产者批量生产消息,测试存活时间

@Test
public void testSendBatch2() throws InterruptedException {
    // 发送十条消息
    for (int i = 0; i < 10; i++) {
       rabbitTemplate.convertAndSend("my_topic_exchange2", "my_routing", "send message..."+i);
   }
}

设置单条消息存活时间

@Test
public void testSendMessage() {
    //设置消息属性
    MessageProperties messageProperties = new MessageProperties();
    //设置存活时间
    messageProperties.setExpiration("10000");
    // 创建消息对象
    Message message = new Message("send message...".getBytes(StandardCharsets.UTF_8), messageProperties);
    // 发送消息
    rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", message);
}

注意:

1 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。

2 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。

@Test
public void testSendMessage2() {
for (int i = 0; i < 10; i++) {
if (i == 5) {
// 1.创建消息属性
MessageProperties messageProperties = new MessageProperties();
// 2.设置存活时间
messageProperties.setExpiration(“10000”);
// 3.创建消息对象
Message message = new Message((“send message…” +i).getBytes(),messageProperties);
// 4.发送消息
rabbitTemplate.convertAndSend(“my_topic_exchange”, “my_routing”, message);
} else {
rabbitTemplate.convertAndSend(“my_topic_exchange”, “my_routing”, “sendmessage…” + i);
}
}
}
在以上案例中,i=5的消息才有过期时间,10s后消息并没有 马上被移除,但该消息已经不会被消费了,当它到达队列顶 端时会被移除。

优先级队列

假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。

1、创建队列和交换机

@Configuration
public class RabbitConfig3 {
    private final String EXCHANGE_NAME="priority_exchange";
    private final String QUEUE_NAME="priority_queue";
    // 1.创建交换机
    @Bean(EXCHANGE_NAME)
    public Exchange priorityExchange(){
        return ExchangeBuilder
               .topicExchange(EXCHANGE_NAME)
               .durable(true).
                build();
   }
    // 2.创建队列
    @Bean(QUEUE_NAME)
    public Queue priorityQueue(){
        return QueueBuilder
               .durable(QUEUE_NAME)
                //设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源
               .maxPriority(10)
               .build();
   }
    // 3.将队列绑定到交换机
    @Bean
    public Binding bindPriority(@Qualifier(EXCHANGE_NAME) Exchange exchange, @Qualifier(QUEUE_NAME) Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
   }
}

2、编写生产者

@Test
public void testPriority() {
    for (int i = 0; i < 10; i++) {
        if (i == 5) {
            // i为5时消息的优先级较高
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setPriority(9);
            Message message = new Message(("send message..." +i).getBytes(StandardCharsets.UTF_8), messageProperties);
            rabbitTemplate.convertAndSend("priority_exchange", "my_routing", message);
       } else {
           rabbitTemplate.convertAndSend("priority_exchange", "my_routing", "send message..." + i);
       }
   }
}

3、编写消费者

@Component
public class PriorityConsumer {
    @RabbitListener(queues = "priority_queue")
    public void listenMessage(Message message, Channel channel) throws Exception
     {
        //获取消息
        System.out.println(new String(message.getBody()));
        //手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
   }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/45257.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

jenkins

Gitlab添加钩子 测试钩子 添加完成后&#xff0c;下面会出现钩子选择。点击test中的&#xff0c;push event。 出现successful&#xff0c;既添加成功。 如果添加失败&#xff0c;报错&#xff0c;更改Network

智能安全配电装置应用场景有哪些?

安科瑞 华楠 一、应用背景 电力作为一种清洁能源&#xff0c;给人们带来了舒适、便捷的电气化生活。与此同时&#xff0c;由于使用不当&#xff0c;维护不及时等原因引发的漏电触电和电气火灾事故&#xff0c;也给人们的生命和财产带来了巨大的威胁和损失。 为了防止低压配电…

妙记多 Mojidoc PC端(Mac 端+windows端)Beta版本正式上线!

你们呼唤了无数次的妙记多 Mojidoc PC客户端 Beta版本正式上线啦&#xff01; 感谢300位妙友积极参与内测&#xff0c;给予了我们很多非常有效的意见和建议&#xff01;我们会根据用户反馈不断优化和修复相关功能&#xff0c;在此感谢妙友们一直以来的支持&#xff5e; PC端拥…

Spring(10) 生成和替换Banner启动图案

目录 1.背景2.推荐网站3.如何集成到spring项目中4.效果展示 1.背景 我们在启动 Spring 项目的时候经常会看到一个 Spring 字样的启动图案。如下所示&#xff1a; 如果我们也想根据我们的内容生成这样的图案&#xff0c;应该怎么操作呢&#xff1f; 2.推荐网站 可以生成这种图…

Qt Core学习日记——第四天QMetaEnum(下)

类定义&#xff1a; 成员变量就只有QMetaObject *mobj和uint handle&#xff0c;handle同样用于计算在qt_meta_stringdata_XTest的位置 成员函数&#xff1a; 接下以test类进行函数讲解 test.h #pragma once #include <qobject.h> #include <QFlags> class X…

C语言非常道 6.4习题解答

关于 #include “stdarg.h” 相关知识小结&#xff1a; 函数&#xff1a;tppedef va_list char * ; va_list al; va_start(al, fmt) 使 al 指向变参函数中最后一个已知参数&#xff08;从右往左数的第一个已知参数&#xff09; va_arg(两个参数&#xff09;&#xff0c;第一个…

SpringBoot中配置文件的加载

springboot 启动会扫描一下位置的application.properties或者application.yml文件作为springboot的默认配置文件 file:./config/(项目根目录config文件夹下的配置文件) file:./(项目根目录下的配置文件) classpath:/config/(resources目录config文件下的配置文件) classpat…

【Android】Ubuntu20.04编译Android 13并用模拟器运行

前言 一直好奇Android系统是怎么定制的&#xff0c;直到亲自走一遍Android系统编译流程才发现并没想象的复杂。 这就跟app开发一样&#xff0c;Google官方其实都提供了平台、文档、IDE及一些工具&#xff0c;咱们只要按照官方提供的指南来操作就行了。 如果Android没有提供这…

中文输入法开发-关键代码

续上篇介绍了嵌入式Linux下中文输入法&#xff0c; 嵌入式Linux下开发中文输入法_嵌入式输入法_小刚学長的博客-CSDN博客 本篇继续介绍核心关键功能 展现效果图如下&#xff1a; 1、如何跟应用关联起来&#xff0c;比如说&#xff0c;希望当LineEdit 输入状态激活后&#xff0…

一文看懂FIFO!

什么是FIFO&#xff1f; FIFO: First in, First out 代表先进的数据先出 &#xff0c;后进的数据后出。 为什么需要FIFO&#xff1f; FIFO存储器是系统的缓冲环节&#xff0c;如果没有FIFO存储器&#xff0c;整个系统就不可能正常工作。 FIFO的功能可以概括为 &#xff0…

Python Flask构建微信小程序订餐系统 (十二)

🔥 创建切换商品分类状态的JS文件 🔥 ; var food_act_ops={init:function(){this.eventBind();},eventBind:function(){//表示作用域var that = this;$(".wrap_search select[name=status]").change(function(){$(".wrap_search").submit();});$(&qu…

java根据模板导出word

java根据模板导出word 日常开发中&#xff0c;常常会遇到各种各样的表格进行导出&#xff0c;比较好的办法就是提前弄好word模版&#xff0c;再通过遍历的方式进行导出文档 1、制作word模版 模版编写 内容替换 目标下面模版进行多页展示 将word转换成xml 将xml格式化 再将x…

HCIP——OSPF基础

OSPF基础 一、OSPF基础二、OSPF的区域划分三、OSPF的数据包hello包数据库描述包DBD包链路状态请求包LSR包链路状态更新包LSU包链路状态确认包LSAck包 四、OSPF的状态机五、OSPF的工作过程六、链路状态型的路由生成过程七、条件匹配五、OSPF数据包头部八、OSPF的接口网络类型 一…

STC12C5A系列单片机内部 EEPROM 的应用

参考范例程序。 eeprom.c #include "eeprom.h"/*---------------------------- Disable ISP/IAP/EEPROM function Make MCU in a safe state ----------------------------*/ void IapIdle() {IAP_CONTR 0; //Close IAP functionIAP_CMD 0; …

ubuntu docker离线安装docker(.deb包方式)(成功)(附卸载方法)

参考文章&#xff1a;Install Docker Engine on Ubuntu 文章目录 安装步骤下载安装包拷贝到目标主机并执行安装命令 验证拉取运行容器测试build dockerfile测试持久运行容器测试主机重启后&#xff0c;docker各服务是否正常自启 卸载方法附&#xff1a;各安装包作用说明&#x…

静态路由小实验

文章目录 一、实验要求及拓扑图二、实验步骤三、思考题 一、实验要求及拓扑图 二、实验步骤 1、创建VLAN&#xff0c;将端口划入vlan 在交换机S3、S4上创建VLAN10、20 Switch(config)#vl 10 Switch(config-vlan)#vl 20 S3(config)#int f0/3 S3(config-if)#switchport access …

音视频——帧内预测

H264编码(帧内预测) 在帧内预测模式中&#xff0c;预测块P是基于已编码重建块和当前块形成的。对亮度像素而言&#xff0c;P块用于44子块或者1616宏块的相关操作。44亮度子块有9种可选预测模式&#xff0c;独立预测每一个44亮度子块&#xff0c;适用于带有大量细节的图像编码&…

uniapp 之 微信小程序、支付宝小程序 对于自定义导航栏的不同

目录 前言 微信小程序 代码 支付宝小程序 首页配置文件 二级菜单页面 配置 总结 不同 相同 前言 小程序都是 uni-app 写的 不是原生 微信小程序 代码 pages.json文件中配置 重点&#xff1a; "navigationStyle": "custom", // 导航栏样式…

MAC 推送证书不受信任

配置推送证书的时候&#xff0c;一打开就变成不受信任&#xff0c;搜了很多解决版本。 由于苹果修改相关规定&#xff0c;推送证书 打开Apple PKI - Apple 下载AppleWWDRCA文件&#xff0c;选择G4,双击安装之后&#xff0c;证书已经变为受信任。 AppleWWDRCA(Apple Worldwid…

代码随想录day12 | [前、中、后、层]二叉树的遍历迭代法和递归法

文章目录 一、前后中序递归法二、前后序迭代法三、中序遍历迭代法四、层序遍历 递归三部曲&#xff1a; 1️⃣ 第一步确定递归函数的返回值和参数 2️⃣第二步确定递归的终止条件 3️⃣第三步确定单层递归处理的逻辑 一、前后中序递归法 前序遍历二叉树 class Solution { pr…
最新文章