WorkQueue模型

        WorkQueues,也被称为任务队列模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时的处理。此时就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因为任务是不会被重复执行的。

P:生产者

C1:消费者-1,领取任务并且完成任务,假设完成速度较慢。

C2:消费者-2,领取任务并且完成任务,假设完成速度快。

1.生产者

public class Provider {
    //生产消息
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        for(int i = 1;i<=200;i++){
            channel.basicPublish("","work",null,(i+"hello rabbitmq").getBytes());
        }
        RabbitMqUtil.closeConnectionAndChannel(channel,connection);
    }
}

2.消费者-1

 

public class Consumer1 {
    //消费消息,这里需要用main函数,因为消费端要一直监听队列,而test测试会直接结束
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接对象
        Connection connection = RabbitMqUtil.getConnection();
        //获取连接通道
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                    System.out.println("consumer1得到:"+new String(body));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        //注意这里不能关闭通道和连接,因为要一直监听
    }
}

 3.消费者-2

public class Consumer2 {
    //消费消息,这里需要用main函数,因为消费端要一直监听队列,而test测试会直接结束
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接对象
        Connection connection = RabbitMqUtil.getConnection();
        //获取连接通道
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer2得到:"+new String(body));
            }
        });
        //注意这里不能关闭通道和连接,因为要一直监听
    }
}

4.结果分析

通过运行结果我们发现消费者1和消费2是平均处理消息的,就比如1000个消息一人处理一半。而且现在的机制是,队列中的消息会一次性全部分配给两个消费者,囤积到两个消费者处然后让两个消费者去各自慢慢消化。这样就会产生一些问题:

1.由于我们设置了消息的自动确认机制,两个消费者刚得到大量消息都还没开始消费其实就已经告诉队列我们确认完了,这样显然是不合理的。

2.消费者那边一次性囤积了大量未处理的消息,如果处理中宕机了,囤积的消息会丢失。

而且假如消费者2执行的很快,而另一个消费者1执行的很慢,这样消费者2很快执行完就空闲了,而消费者1一直迟迟执行不完。能不能改进为能者多劳的机制呢?

  • 消费者一次只接收一条未确认的消息。

  • 关闭自动确认消息。

  • 消费者处理完一条,要手动确认消息。

5.能者多劳

消费者改进:

public class Consumer1Improve {
    //消费消息,这里需要用main函数,因为消费端要一直监听队列,而test测试会直接结束
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接对象
        Connection connection = RabbitMqUtil.getConnection();
        //获取连接通道
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        channel.basicQos(1);//一次只接收一条
        //参数2:关闭自动确认
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                    System.out.println("consumer1得到:"+new String(body));
                    channel.basicAck(envelope.getDeliveryTag(),false);//手动确认消息
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        //注意这里不能关闭通道和连接,因为要一直监听
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/284062.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

动力学约束下的运动规划算法——Kinodynamic RRT*算法

一、RRT * 算法回顾 为了更好的理解Kinodynamic RRT*算法&#xff0c;我们先来回顾一下RRT * 算法 RRT * 先通过Sample函数随机选取一个点Xrand&#xff0c;然后通过Near函数找到当前树上距离Xrand最近的一个点Xnear&#xff0c;再通过Steer函数&#xff0c;沿着从Xnear到Xra…

04 HAL库下使用定时器产生一个中断

目录 一、定时器的相关知识点 1.定时器的定义 2. 查看时钟配置 3. 定时器的分类 二、实验开始 1. 配置一个定时器 2.打开定时器的中断配置 引言 在本文的开头我想给大家分享一下单片机工作的两种工作模式轮询和中断&#xff08;异步&#xff09;&#xff0c; 中断也叫做…

一文搞懂什么是缓存穿透、缓存雪崩、缓存击穿三个概念,以及解决方案

先理解概念&#xff1a;【注&#xff1a;我们这里说的是分布式、高并发环境】 一、缓存穿透是什么&#xff1f; 缓存穿透是指&#xff1a;请求【可以有很多】的数据在缓存、关系型数据库中都不存在&#xff0c;每次来查询都会查询到关系型数据库中。 解决方案&#xff1a; 1、将…

打破数据孤岛:ChatGPT如何打通金融大数据的任督二脉?

文章目录 一、引言二、ChatGPT与金融大数据分析的融合三、实践应用&#xff1a;ChatGPT在金融大数据分析中的优势与挑战四、案例分析&#xff1a;ChatGPT在金融大数据分析中的应用案例五、前景展望&#xff1a;ChatGPT在金融大数据分析领域的未来发展《AI时代Python金融大数据分…

Qt5 安装教程 - 跳过登录界面

Qt5 安装教程 - 跳过登录界面 引言一、下载二、安装三、使用四、修改、维护、卸载 引言 Qt5.14.2及以前的版本有离线安装包&#xff0c;无需登录 (老版本连登录界面也无)。之后的版本需登录进行在线安装。 本文以Qt5.12.2版本为例&#xff0c;说明如何跳过登录界面&#xff0c…

如何解决企业内部FTP文件传输速度过慢和安全问题

在数据化时代里&#xff0c;企业内部的文件传输永远是刚需&#xff0c;而因为 FTP协议的简单、易用、广泛支持等优点&#xff0c;让很多企业早期都普遍使用&#xff0c;随着数量量的增多&#xff0c;和对安全的要求越来越高&#xff0c;FTP也暴露出了一些列问题&#xff0c;小编…

算法逆袭之路(1)

11.29 开始跟进算法题进度! 每天刷4题左右 ,一周之内一定要是统一类型 而且一定稍作总结, 了解他们的内在思路究竟是怎样的!! 12.24 一定要每天早中晚都要复习一下 早中午每段一两道, 而且一定要是同一个类型, 不然刷起来都没有意义 12.26/27&#xff1a; 斐波那契数 爬…

使用 Tkinter 制作一个进制转换工具,好用!

在平时工作学习当中&#xff0c;我们经常会编写一些简单的 Python GUI 工具&#xff0c;以此来完成各种各样的自动化任务&#xff0c;比如批量处理文件&#xff0c;批量处理图片等等。当我们进行这些工具的编写之时&#xff0c;往往只关注了功能的实现&#xff0c;而忽略了页面…

Android 跨进程之间通信(IPC)方式之ContentProvider

Android 跨进程之间通信 Android 跨进程之间通信(IPC)方式之BroadcastReceiverAndroid 跨进程之间通信(IPC)方式之ContentProvider 文章目录 Android 跨进程之间通信前言一、ContentProvider 是什么&#xff1f;二、如何利用ContentProvider跨进程通信1.创建自定义ContentProv…

伺服电机:原点复位

一、原点复位概念 原点复位指的是&#xff0c;在驱动器使能时&#xff0c;触发原点复位功能后&#xff0c;电机将主动查找零点&#xff0c;完成定位功能。 那么问题来了&#xff0c;什么是原点&#xff0c;什么是零点&#xff1f; 原点&#xff1a;即机械原点&#xff0c;可…

面向对象知识点

类和对象知识点梳理 1. 类和对象的概念 类是对一类事物的描述&#xff0c;是抽象的、概念上的定义。Java 中定义类的关键字是&#xff1a;class。 具有相同特征和行为的对象抽象成类&#xff0c;类描述了这一类对象的属性和方法&#xff1a; 属性&#xff08;成员变量&#x…

分布式技术之数据复制技术

文章目录 什么是数据复制技术&#xff1f;数据复制技术原理及应用同步复制技术原理及应用异步复制技术原理及应用半同步复制技术原理及应用三种数据复制技术对比 什么是数据复制技术&#xff1f; 数据复制是一种实现数据备份的技术。数据复制技术&#xff0c;可以保证存储在不…

3D目标检测(教程+代码)

随着计算机视觉技术的不断发展&#xff0c;3D目标检测成为了一个备受关注的研究领域。与传统的2D目标检测相比&#xff0c;3D目标检测可以在三维空间中对物体进行定位和识别&#xff0c;具有更高的准确性和适用性。本文将介绍3D目标检测的相关概念、方法和代码实现。 一、3D目…

回溯法寻找元素之和等于目标值的子集

这是一个回溯法的算法,可以用来寻找所有元素之和等于目标值的子集. 整个算法中最重要的是:在递归之后"恢复现场" 也就是: t[cnt]0; cnt--; 完整代码(注释部分打印信息可以用来辅助理解递归过程)&#xff1a; #include<iostream> #include<cstring> …

RFC7636-PKCE

前言 PKCE &#xff08;RFC 7636&#xff09; 是授权代码流的扩展&#xff0c;用于防止 CSRF 和授权代码注入攻击。 PKCE 不是客户端身份验证的一种形式&#xff0c;PKCE 不能替代客户端密码或其他客户端身份验证。即使客户端使用客户端密码或其他形式的客户端身份验证&#…

oracle物化视图

物化视图定义 视图是一个虚拟表&#xff08;也可以认为是一条语句&#xff09;&#xff0c;基于它创建时指定的查询语句返回的结果集&#xff0c;每次访问它都会导致这个查询语句被执行一次&#xff0c;为了避免每次访问都执行这个查询&#xff0c;可以将这个查询结果集存储到…

【STM32】STM32学习笔记-输入捕获测频率和占空比(18)

00. 目录 文章目录 00. 目录01. 预留02. 输入捕获测频率接线图03. 输入捕获测频率示例04. 输入捕获测频率和占空比接线图05. 输入捕获测频率和占空比示例06. 示例程序下载07. 附录 01. 预留 02. 输入捕获测频率接线图 03. 输入捕获测频率示例 pwm.h #ifndef __PWM_H #define…

从入门到精通UNet: 让你快速掌握图像分割算法

文章目录 一、UNet 算法简介1.1 什么是 UNet 算法1.2 UNet 的优缺点1.3 UNet 在图像分割领域的应用 二、准备工作2.1 Python 环境配置2.2 相关库的安装 三、数据处理3.1 数据的获取与预处理3.2 数据的可视化与分析 四、网络结构五、训练模型5.1 模型训练流程5.2 模型评估指标5.…

【JS逆向】逆向案例之 ----- 安某客滑块

every blog every motto: You can do more than you think. https://blog.csdn.net/weixin_39190382?typeblog 0. 前言 安某客滑块小结 1. 初步分析 总共分为两步&#xff0c; 获取滑块图片信息检查滑块移动是否正确 整体框架如下&#xff1a; 1.1 getinfoTp 获取图片信息…

Plantuml之JSON数据语法介绍(二十五)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…