RabbitMq使用与整合

MQ基本概念

MQ概述

MQ全称 Message Queue([kjuː])(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。

(队列是一种容器,用于存放数据的都是容器,存放消息的就是消息队列)

分布式系统的调用:

方式一:直接调用

方式二:间接调用

A将数据存放到中间一个系统,通过中间的系统发送到B

中间系统可以成为中间件MQ

MQ是用于存放消息的中间件

被调用者叫生产者 调用者是消费者

MQ的优势和劣势

优势

应用解耦:提高系统容错性和可维护性。

异步提速:提升用户体验和系统吞吐量。

削峰填谷:提高系统稳定性。

应用解耦

系统的耦合性越高,容错性就越低,可维护性就越低。

使用 MQ 使得应用间解耦,提升容错性和可维护性

异步提速

提升用户体验和系统吞吐量(单位时间内处理请求的数目)。

削峰填谷(削峰)

使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。

使用MQ后,可以提高系统稳定性。

劣势

系统可用性降低

系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?

系统复杂度提高

MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?

一致性问题

A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

既然 MQ 有优势也有劣势,那么使用 MQ 需要满足什么条件呢?

消费者-->生产者

1.生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。

2.容许短暂的不一致性。

3.确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

RabbitMQ基本介绍

2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang语言由Ericson设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

RabbitMQ 基础架构

Broker 中间者 服务

procedure 和consumer都是客户端

客户端通过链接和服务端进行通信 所以需要建立起来连接 然后进行通信a

使用channel(管道)节省资源

一个rabbitmq里面有很多的虚拟机 相当于mysql里面有很多数据库,数据库里面有很多表,都是独立的。

每个虚拟机里面有很多的exchange和queue 独立分区的作用

 RabbitMQ 中的相关概念

Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker。

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等。

Connection:publisher/consumer 和 broker 之间的 TCP 连接。

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:

        direct (point-to-point)

        topic (publish-subscribe)

        fanout (multicast)

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

RabbitMQ的6 种工作模式

RabbitMQ 提供了 6 种工作模式:

简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。

官网对应模式介绍:RabbitMQ Tutorials — RabbitMQ

RabbitMQ的安装和配置

安装依赖环境

在线安装依赖环境:

yum -y install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

安装Erlang

rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm

安装RabbitMQ

#安装依赖的包

rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm

#安装rabbitmq

rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm

rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm socat-1.7.3.2-2.el7.x86_64.rpm rabbitmq-server-3.7.18-1.el7.noarch.rpm

启动RabbitMQ

systemctl start rabbitmq-server # 启动服务

systemctl stop rabbitmq-server # 停止服务

systemctl restart rabbitmq-server # 重启服务

systemctl status rabbitmq-server # 查看状态

开启管理界面及配置

# 开启管理界面

rabbitmq-plugins enable rabbitmq_management

# 修改默认配置信息

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/ebin/rabbit.app

# 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest

修改之后重启一下rabbitmq

入门实例

1.添加虚拟主机

2.添加用户

3.重新设置权限

点击虚拟主机设置权限

4.idea连接

项目结构搭建

mq 导入依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>

生产者

public static void main( String[] args ) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
        connectionFactory.setHost("192.168.229.16");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
//        建立连接
        Connection connection = connectionFactory.newConnection();
//        管道
        Channel channel = connection.createChannel();
//        创建队列
        /*
        * String queue 队列的名称
        * boolean durable 持久化
        * boolean exclusive 是否独占
        * boolean autoDelete 是否自动删除
        * Map<String,Object> arguments 参数
        * */
        channel.queueDeclare("test01",false,false,false,null);
//        发布消息
        channel.basicPublish("","test01",null,"第一次发送".getBytes());
    }

消费者

{
    public static void main( String[] args ) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
        connectionFactory.setHost("192.168.229.16");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
//        建立连接
        Connection connection = connectionFactory.newConnection();
//        管道
        Channel channel = connection.createChannel();
//        消费信息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println(s);
            }
        };
        channel.basicConsume("test01",true,consumer);
    }

同时消费后消息组内为 0

RabbitMQ工作模式

Work queues工作队列模式

Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

应用场景对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

代码

Work Queues与入门程序的简单模式的代码是几乎一样的;可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试。

1.复制一个消费者

2.先运行起两个消费者

3.在生产者中多发布几条消息

for (int i = 0; i < 10; i++) {
    channel.basicPublish("","test01",null,("第"+i+"次发送").getBytes());
}

4.两个消费者会采用轮询的方式拿到消息

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系

订阅模式类型

而在订阅模型中,多了一个exchange角色,而且过程略有变化:

生产者发消息给交换机,交换机将消息路由分发给队列,消费者监听队列接收信息

Exchange:交换机。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

Fanout:广播,将消息交给所有绑定到交换机的队列

Direct:定向,把消息交给符合指定routing key 的队列

Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

代码实现

生产者更改代码逻辑为:

//        创建交换机
        channel.exchangeDeclare("Exchange", BuiltinExchangeType.FANOUT,false);
//        创建队列
        channel.queueDeclare("test01Ex",false,false,false,null);
        channel.queueDeclare("test02Ex",false,false,false,null);
//        队列绑定交换机
        channel.queueBind("test01Ex","Exchange","");
        channel.queueBind("test02Ex","Exchange","");
//        发布消息
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("Exchange","",null,("第"+i+"次发送").getBytes());
        }

消费者分别从两个队列获取消息

Routing路由模式(Direct:定向)

队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

消息的发送方在 向Exchange发送消息时,也必须指定消息的RoutingKey

Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的 RoutingKey完全一致,才会接收到消息

 channel.queueBind(queuename,exchangename,"error"); // error
        channel.queueBind(queuename2,exchangename,"error");// error info 
        channel.queueBind(queuename2,exchangename,"info");
        // 发送消息
        //String exchange, String routingKey, BasicProperties props, byte[] body
        channel.basicPublish(exchangename,"info",null,"hello".getBytes());

Topics通配符模式

Topic与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定RoutingKey的时候使用通配符

RoutingKey一般都是有一个或多个单词组成,多个单词之间以”.”分割

#:匹配一个或多个词 

*:匹配不多不少恰好1个词   test.* test.insert

channel.queueBind(queuename,exchangename,"order.*");
        channel.queueBind(queuename,exchangename,"*.error");
        channel.queueBind(queuename2,exchangename,"*.*");
        //发送消息
        
        //String exchange, String routingKey, BasicProperties props, byte[] body
        channel.basicPublish(exchangename,"goods.info",null,"hello".getBytes());

模式总结

RabbitMQ工作模式:

简单模式 HelloWorld

一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)

工作队列模式 Work Queue

一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)

发布订阅模式 Publish/subscribe

需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列

路由模式 Routing

需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

通配符模式 Topic

需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

SpringBoot整合Mq

在Spring项目中,可以使用Spring-Rabbit去操作RabbitMQ

尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。

创建工程结构

添加依赖(sys-mq

<parent>
        <groupId>com.example</groupId>
        <artifactId>mqdemo02</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <artifactId>sys-mq</artifactId>
    <packaging>pom</packaging>

    <name>sys-mq</name>
    <url>http://maven.apache.org</url>
    <modules>
        <module>mq-product</module>
        <module>mq-consumer</module>
    </modules>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
<!--        rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

sys-mq里面两个子模块的 application.yml

server

     port: 记得修改

spring:
  rabbitmq:
    username: root
    password: root
    host: 192.168.229.16
    virtual-host: /test
    port: 5672
server:
  port: 8086

生产者

@Configuration
public class RabbitMqConfig {
//    设置交换机的名称和队列的名字
    public static final String EXCHANGE_NAME="exchange_topic-test";
    public static final String QUEUE_NAME="queue_topic-test";
    public static final String QUEUE_NAME2="queue_topic-test2";
//    创建交换机 将交换机作为bean注入到spring中
    @Bean("topicExchange")
    public Exchange topicExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(false).build();
    }
//    队列
    @Bean("topicQueue")
    public Queue topicQueue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }
    @Bean("topicQueue2")
    public Queue topicQueue2(){
        return QueueBuilder.durable(QUEUE_NAME2).build();
    }
//    将队列与交换机进行绑定
    @Bean
    public Binding exchangeWithQueue(@Qualifier("topicExchange") Exchange exchange,@Qualifier("topicQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("*.test").noargs();
    }
    @Bean
    public Binding exchangeWithQueue2(@Qualifier("topicExchange") Exchange exchange,@Qualifier("topicQueue2") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();
    }
}
@SpringBootTest
public class MyTest {
    public static final String EXCHANGE_NAME="exchange_topic-test";
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Test
    void sendMsg() {
        rabbitTemplate.convertAndSend(EXCHANGE_NAME,"success.test","测试整合");
    }
}

消费者

@Component
public class MyListener {
//    监听队列的消息
    @RabbitListener(queues = "queue_topic-test")
    public void listenQueue(Message message) {
        byte[] body = message.getBody();
        System.out.println(new String(body));
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/188052.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Leetcode—45.跳跃游戏II【中等】

2023每日刷题&#xff08;四十&#xff09; Leetcode—45.跳跃游戏II 贪心法思想 实现代码 #define MAX(a, b) (a > b ? (a) : (b))int jump(int* nums, int numsSize) {int start 0;int end 1;int ans 0;int maxStride 0;while(end < numsSize) {maxStride 0;fo…

2016年12月13日 Go生态洞察:2016年Go用户调查与企业问卷

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

以“防方视角”观文件上传功能

为方便您的阅读&#xff0c;可点击下方蓝色字体&#xff0c;进行跳转↓↓↓ 01 案例概述02 攻击路径03 防方思路 01 案例概述 这篇文章来自微信公众号“NearSec”&#xff0c;记录的某师傅接到一个hw项目&#xff0c;在充分授权的情况下&#xff0c;针对客户的系统进行渗透测试…

Java 简单解决 返回日期格式带 ‘T‘ 问题

问题描述 接口返回数据给前端时&#xff0c;返回的日期带 ‘T’ 解决方案&#xff1a; 在返回的实体类字段中&#xff0c;使用JsonFormat(pattern "yyyy-MM-dd",timezone"GMT8")格式化日期

springboot2.0 集成swagger3+Knife4j导出离线API 配置

springboot 版本2.3.1 一、集成swagger3 引入swagger依赖包 <!--swagger3集成--><dependency><groupId>org.springframework.plugin</groupId><artifactId>spring-plugin-core</artifactId><version>2.0.0.RELEASE</version>…

BGP联邦及路由反射器配置

需求 1 AS1存在两个环回&#xff0c;一个地址为192.168.1.0/24&#xff0c;该地址不能再任何协议中宣告 AS3存在两个环回&#xff0c;一个地址为192.168.2.0/24&#xff0c;该地址不能再任何协议中宣告 AS1还有一个环回地址为10.1.1.0/24&#xff0c;AS3另一个环回地址是11.1.1…

微服务 BFF 架构设计

文章目录 一、什么是 BFF二、典型的进程间微服务架构薄 BFF厚 BFF微服务模式对比 三、BFF 的问题四、BFF 的治理方向五、总结 在现代软件开发中&#xff0c;由于程序、团队、数据规模太大&#xff0c;需要把企业的业务能力进行复用&#xff0c;将领域服务剥离&#xff0c;提供通…

[PyTorch][chapter 66][强化学习-值函数近似]

前言 现实强化学习任务面临的状态空间往往是连续的,无穷多个。 这里主要针对这种连续的状态空间处理。后面DQN 也是这种处理思路。 目录&#xff1a; 1&#xff1a; 原理 2&#xff1a; 梯度更新 3&#xff1a; target 和 预测值 4 流程 一 原理 强化学习最重要的是得到 …

大中小协作 共筑科学梦——华中科技大学附属花城中学举办首届科技节

为普及科学知识&#xff0c;张扬科学精神&#xff0c;创设浓郁的科学氛围&#xff0c;11月24日&#xff0c;华中科技大学附属花城中学举办了以“走近科学&#xff0c;触碰未来”为主题的首届科技节暨科创文化展示周活动。学生们在学习中感受科技的魅力&#xff0c;在“玩”中感…

ChatGPT文章批量改写伪原创软件说明文档

大家好&#xff0c;我是淘小白~ 最近有很多朋友咨询&#xff0c;chatGPT文章改写插件和改写软件&#xff0c;这个软件之前已经做出来了&#xff0c;用的朋友不是很多&#xff0c;这几天有不少咨询的&#xff0c;现在把说明文档补一下&#xff0c;(#^.^#) 1、软件语言 Pytho…

初出茅庐的小李之C语言必备知识预处理

编译预处理 编译预处理就是在编译源代码之前进行的一系列处理&#xff0c;将源程序中的一些特殊命令进行展开或处理&#xff0c;生成扩展的源代码。这些特殊命令通常以“#”开头&#xff0c;占单独的行&#xff0c;语句尾部不需要加分号。 宏定义 (#define)是一种常见的编译…

Kotlin学习——流程控制,when,循环,range工具 kt里的equals if实现类似三元表达式的效果

Kotlin 是一门现代但已成熟的编程语言&#xff0c;旨在让开发人员更幸福快乐。 它简洁、安全、可与 Java 及其他语言互操作&#xff0c;并提供了多种方式在多个平台间复用代码&#xff0c;以实现高效编程。 https://play.kotlinlang.org/byExample/01_introduction/02_Functio…

张弛声音变现课,惊悚电影配音篇

在提供惊悚片的声音配音服务时&#xff0c;配音员旨在制造一种让观众的心率加快、情绪紧张的气氛。惊悚片侧重于心理层面的紧张和预期的恐怖&#xff0c;声音在塑造这种心理效应中起到了至关重要的作用。演员需通过对声音的精细雕琢和调整来强化电影的悬念和紧迫感。以下是为惊…

C语言SO EASY(ZZULIOJ1220: SO EASY)

题目描述 Superbin最近在研究初等数论&#xff0c;初等数论 是研究数的规律&#xff0c;特别是整数性质的数学分支。它是数论的一个最古老的分支。它以算术方法为主要研究方法&#xff0c;主要内容有整数的整除理论、同余理论、连分数理论和某些特殊不定方程。 是定义在正整数…

2017年2月16日 Go生态洞察:Go 1.8版本的革新

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

rocky8.9配置K8S集群kubernetes,centos同理

rocky8.9配置K8S集群 节点主机名IP地址mastertang1192.168.211.101node1tang2192.168.211.102node2tang3192.168.211.103 1&#xff09;准备工作 全部主机都配置静态ip vi /etc/sysconfig/network-scriptsTYPEEthernet PROXY_METHODnone BROWSER_ONLYno BOOTPROTOstatic DE…

华为ensp:单臂路由

通过单臂路由实现vlan之间的互通 将vlan和trunk配置好&#xff0c;我直接就在r1上演示单臂路由 我们要在r1的e0/0/0上面随便配置个ip&#xff0c;如果你不在接口上配置ip那就无法开启协议 R1 interface e0/0/0 进入真实接口随便配置个ip ip add 192.168.10.1 24 再进入子接…

YOLOV7主干改进,使用fasternet轻量化改进主干(完整教程)

1&#xff0c;Pconv&#xff08;来自Fasternet&#xff09;&#xff08;可作为模型中的基础卷积模块使用&#xff09; 论文链接&#xff1a;https://arxiv.org/abs/2303.03667 2&#xff0c;为了大家方便的使用&#xff0c;这里我对原本的PConv的代码做了部分的改动&#xff0…

Executors(线程池操作类)

一&#xff0c;常用方法 二&#xff0c;案例 package XianChengChildren;import java.util.concurrent.*;public class ThewadPoolTest1 {public static void main(String[] args) throws Exception { // ExecutorService pool new ThreadPoolExecutor(3,5,8, // …
最新文章