rabbitmq入门学习

写在前面

本文看下rabbit mq的基础概念以及使用。

1:简单介绍

为了不同进程间通信的解耦,出现了消息队列,为了规范消息队列的具体实现,Java制定了jms规范,这是一套基于接口的规范,因此是绑定语言的,即只能通过Java语言来实现和使用,与jms类似还有基于net的nms,这也是一套规范接口,只不过是基于.net开发语言的。不管是jms还是nms,它们都有一个通病,就是无法实现跨语言,这个时候amqp就应用而生了,可以将其理解为一种应用层的协议,构建在tcp之上,因此就可以实现跨语言的消息通信,参考下图:
在这里插入图片描述

amqp协议通信模型如下:
在这里插入图片描述

2:基础环境准备

2.1:服务安装

参考docker安装rabbitmq 。

2.2:创建Virtual host和用户

  • 创建virtual host
    在这里插入图片描述
  • 创建admin用户
    在这里插入图片描述
  • 设置admin权限
    在这里插入图片描述
    添加成功:
    在这里插入图片描述

3:正戏

本文主要看其提供的5种队列,如下图:
在这里插入图片描述

3.1:简单队列

简单队列就是一个生产者一个消费者的队列方式,如下图:
在这里插入图片描述

  • 生产消息:
public class SimpleSend {

    private final static String QUEUE_NAME = "q_test_01";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();
        // 声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 消息内容
        String message = "Hello World come here!!!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

生产后:
在这里插入图片描述
点击q_test_01:
在这里插入图片描述

  • 消费消息
public class SimpleRecv {

    private final static String QUEUE_NAME = "q_test_01";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列 true 这里使用自动确认,及消息消费默认消费成功,这种方式效率高,但是容易丢失消息
        // 如果某些场景允许部分消息丢失,但是要求执行效率,则可以考虑将该值设置为true,否则设置为false,即手动确认
        // 最周通过执行channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);完成手动确认,多一次网络通信
        channel.basicConsume(QUEUE_NAME, true, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}

输出:

[INFO] --- exec-maven-plugin:3.1.0:exec (default-cli) @ rabbitmq-study ---
 [x] Received 'Hello World come here!!!'

在这里插入图片描述

3.2:work队列

一个生产者多个消费者,如下图:
在这里插入图片描述

work队列看起来和简单队列相比只是多起了几个消费者而已。

  • 生产者
public class WorkSend {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 100; i++) {
            // 消息内容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            Thread.sleep(i * 10);
        }

        channel.close();
        connection.close();
    }
}
  • 消费者
public class WorkRecv {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一时刻服务器只会发一条消息给消费者
        // 该值设置为1,配合手动确认,则可以实现一条一条消费,确认一条后消费下一条,且多个消费者时谁的消费能力强,谁消费的消息多
        // 消费者之间消费消息不相互影响
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,false表示手动返回完成状态,true表示自动
        channel.basicConsume(QUEUE_NAME, false, consumer);
//        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [y] Received '" + message + "'");
            //休眠
            Thread.sleep(10);
            // 返回确认状态,注释掉表示使用自动确认模式
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

3.3:订阅模式

这种这种模式可以实现一个消息同时被多个消费者消费(广播),但是具体的实现需要依赖于交换器exchange,生产者端将消息发送到交换器,之后消费者端只需要将某个消息队列绑定到交换器,交换器会将消息发送到绑定的所有队列,消费者端就可以从队列中获取到对应的消息,但需要注意一个队列的消息还是只可以获取一次,如下图们:
在这里插入图片描述

  • 生产者端
public class SubscribeSend {

    private final static String EXCHANGE_NAME = "test_exchange_fanout111";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 消息内容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}

消费者端2个

public class SubscribeRecv {

    private final static String QUEUE_NAME = "test_queue_work1";
    private final static String EXCHANGE_NAME = "test_exchange_fanout111";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv] Received '" + message + "'");
            Thread.sleep(10);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

public class SubscribeRecv2 {

    private final static String QUEUE_NAME = "test_queue_work2";
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv2] Received '" + message + "'");
            Thread.sleep(10);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
  • 启动2个消费者
    在这里插入图片描述

  • 启动生产者
    在这里插入图片描述

[INFO] --- exec-maven-plugin:3.1.0:exec (default-cli) @ rabbitmq-study ---
 [Recv2] Received 'Hello World!'

[INFO] --- exec-maven-plugin:3.1.0:exec (default-cli) @ rabbitmq-study ---
 [Recv] Received 'Hello World!'

接着看下转换器和队列的绑定关系:
在这里插入图片描述

注意以下的问题:

1:因为交换器本身不具备数据存储的能力,所以如果是某个交换器上没有绑定任何的队列,则该消息将会丢失。
2:因为交换器本身不存储数据,所以在具体的消息队列绑定到交换器(即消费者启动前),不要生产消息到交换器,否则消息将会丢失。

3.4:路由模式

这种方式类似于订阅模式,也需要转换器作为中间商,但是并不会直接无脑的发送消息,而是会根据消费者额外指定的路由key,生产者在向转换器发送消息时会带着routeKey,消费者在消费消息时会指定自己期望的routeKey只有二者匹配时,才会从队列中消费对应的消息,l另外注意的时交换机类型设置为direct,如下图:
在这里插入图片描述

  • 生产者
public class RoutingSend {

    private final static String EXCHANGE_NAME = "test_exchange_direct123";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明exchange 设置交换器为direct,即路由模式
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 消息内容
        String message = "Hello World!";
        // 设置路由key为"update",即只有对应队列上设置了update路由key的消费者才会消费到消息
        channel.basicPublish(EXCHANGE_NAME, "update", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}
  • 2个消费者
public class RoutingRecv {

    private final static String QUEUE_NAME = "test_queue_work_route";
    private final static String EXCHANGE_NAME = "test_exchange_direct123";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 绑定队列到交换机 设置路由key为delete,update
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

public class RoutingRecv2 {

    private final static String QUEUE_NAME = "test_queue_work_route2";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

在这里插入图片描述
在这里插入图片描述
先启动消费者再启动生产者测试可。
在这里插入图片描述

写在后面

参考文章列表

RabbitMQ使用教程(超详细)

docker安装rabbitmq

RabbitMq的一些概念,JMS、AMQP、MQ 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/121956.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

人脸识别中的人工智能

随着人工智能技术的快速发展&#xff0c;人脸识别作为其中的一个重要应用领域&#xff0c;已经在各个行业和场景中展现出了巨大的潜力和价值。人脸识别技术通过对人脸图像进行采集、处理和分析&#xff0c;基于人工智能算法对人脸进行识别和验证&#xff0c;并在安防监控、金融…

安防监控EasyCVR视频汇聚平台无法接入Ehome5.0是什么原因?该如何解决?

视频云存储/安防监控EasyCVR视频汇聚平台基于云边端智能协同&#xff0c;支持海量视频的轻量化接入与汇聚、转码与处理、全网智能分发、视频集中存储等。安防平台EasyCVR拓展性强&#xff0c;视频能力丰富&#xff0c;具体可实现视频监控直播、视频轮播、视频录像、云存储、回放…

彻底删除Ubuntu双系统(联想小新2022)

彻底卸载Ubuntu双系统 以里联想小新pro16 i9-12900h为例子 把开机启动项设为默认Windows启动 以联想电脑为例子&#xff0c;关机后一直点击Fn F2进入Bios把windows启动项移到最上面&#xff0c;这样可以开机默认启动windows了删除ubuntu系统分区 使用磁盘管理软件 DiskGeniu…

多无人机在线路径规划的新算法

南京航空航天大学自动化学院使用NOKOV度量动作捕捉系统获取多架无人机的精确位置信息&#xff0c;实现多架无人机协同实时路径规划。 研究背景 近年来&#xff0c;无人机越来越多地应用于执行战场侦察、目标识别、跟踪打击等任务。 由多架无人机协同执行任务&#xff0c;通过…

Luatos Air700 改变BL0942串口波特率

LuatOs 改变模块串口波特率思路参照 luatos 改变AIR530串口波特率 BL0942默认串口波特率可以通过SCLK_BPS引脚接3.3V电源设置到9600bps 但如果调整到38400bps需要修改0x19寄存器 bl0942 v1.06版的特殊寄存器说明&#xff0c;注意早期版本特殊寄存器说明存在错误 完整代码 mai…

Kubernetes 问题排查全景图

伴随着混沌的微服务架构&#xff0c;多语言和多网络协议混杂。以及下沉的基础设施能力屏蔽实现细节&#xff0c;问题定界越发困难。 企业急需一种支持多语言&#xff0c;多通信协议的技术&#xff0c;并在产品层面尽可能覆盖软件栈端到端的可观测性需求。 「Kubernetes 问题排查…

java版直播商城免费搭建平台规划及常见的营销模式+电商源码+小程序+三级分销+二次开发

1. 涉及平台 平台管理、商家端&#xff08;PC端、手机端&#xff09;、买家平台&#xff08;H5/公众号、小程序、APP端&#xff08;IOS/Android&#xff09;、微服务平台&#xff08;业务服务&#xff09; 2. 核心架构 Spring Cloud、Spring Boot、Mybatis、Redis 3. 前端框架…

idea使用lombok编译问题

idea编译报错问题如下&#xff1a; java: You arent using a compiler supported by lombok, so lombok will not work and has been disabled.Your processor is: com.sun.proxy.$Proxy26Lombok supports: OpenJDK javac, ECJ解决方案 1.先将jdk替换为openjdk,随后将项目配置…

如何搭建一个自定义UI框架的Playground(一)

文章目录 初衷需求技术选型详细设计&#xff08;一&#xff09;1.业务设计2.交互设计3.程序设计3.1 游戏生命周期设计3.2 UI界面管理设计 初衷 想要比较系统、深入地了解游戏UI框架的设计与开发&#xff0c;就需要自己实践去开发一个可以预览的UI项目&#xff0c;但是目前没有…

Docker快速搭建Drupal内容管理系统并远程访问

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525;个人专栏:《Linux深造日志》《C干货基地》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 文章目录 前言1. Docker安装Drupal2. 本地局域网访问3 . Linux 安装cpolar4. 配置Drupal公网访问地址5. 公网远程访问Drupal…

在线CRM系统的安全性高吗?企业该如何选择?

在线CRM系统具备门槛低、功能不打折扣、部署周期短等优点&#xff0c;相比本地化部署更加适合中小企业。但很多企业在选型软件时会顾虑在线CRM系统的安全性高吗&#xff1f; 通常情况下厂商会比中小企业更有实力保证数据安全&#xff0c;从技术手段保护企业隐私不被盗用。 数…

数据结构与算法之美学习笔记:16 | 二分查找(下):如何快速定位IP对应的省份地址?

目录 前言二分查找的变形问题变体一&#xff1a;查找第一个值等于给定值的元素变体二&#xff1a;查找最后一个值等于给定值的元素变体三&#xff1a;查找第一个大于等于给定值的元素变体四&#xff1a;查找最后一个小于等于给定值的元素 解答开篇内容小结 前言 本节课程思维导…

第三章:人工智能深度学习教程-人工智能与机器学习与深度学习之间的区别

人工智能基本上是通过一组规则&#xff08;算法&#xff09;将人类智能融入机器的机制。人工智能是两个词的组合&#xff1a;“人工”是指由人类或非自然物体制造的东西&#xff0c;“智能”是指相应地理解或思考的能力。另一个定义可能是“人工智能基本上是训练机器&#xff0…

KeyShot for 3dMax插件教程

KeyShot for 3dMax插件教程 KeyShot是一款先进的3D渲染和动画软件&#xff0c;通过直观、精简的用户界面和革命性的动画工作流简化了整个媒体创建过程&#xff0c;可以实时创建完全渲染的动画。 快速 立即查看结果。 这就是KeyShot渲染引擎的功能&#xff1a;您所做的每一个更…

鸿蒙原生应用开发-DevEco Studio远程真机的使用

一、先看看远程真机支持的机型情况相比本地和模拟器多了很多机型 二、远程真机使用的相关说明 该特性在DevEco Studio V2.2 Beta1及更高版本中支持。 如果开发者没有真机设备资源&#xff0c;则不能很方便的调试和验证HarmonyOS应用&#xff0c;为方便开发者&#xff0c;De…

U-Mail邮件系统安全登录解决方案

企业邮箱是企业对内对外商务往来的主要通信工具&#xff0c;并且企业邮箱里面还包含了大量企业内部隐私信息、商业机密等&#xff0c;很容易成为黑客的攻击目标。其中邮件盗号是企业邮箱遭受攻击的主要形式&#xff0c;一旦企业邮箱密码被黑客盗取&#xff0c;黑客不仅可以利用…

中国人民大学与加拿大女王大学金融硕士——在金融领域里持续探索、成长

在金融领域里持续探索、成长&#xff0c;这是一个永无止境的旅程。在这个领域里&#xff0c;机遇与挑战并存&#xff0c;未知与已知交织&#xff0c;需要我们时刻保持敏锐的洞察力和扎实的基本功。金融市场的变化日新月异&#xff0c;我们需要时刻关注市场动态&#xff0c;了解…

Tcl语言:SDC约束命令create_generated_clock详解(下)

相关阅读 Tcl语言https://blog.csdn.net/weixin_45791458/category_12488978.html?spm1001.2014.3001.5482 设定生成时钟特性 前文的末尾提到&#xff0c;当使用-divide by或-multiply_by选项创建生成时钟时&#xff0c;会根据master clock的时钟周期派生出生成时钟的周期&am…

C++ explicit关键字的作用

explicit关键字只针带一个参数的构造函数有效 #include <iostream> using namespace std;class A { public:A(int temp) //普通构造函数{a temp;cout << "普通构造函数: a " << a << endl;}A(const A &temp) //拷贝构造函数{a temp.a…
最新文章