RabbitMQ的基本使用,进行实例案例的消息队列

目录

一、介绍

1. 概述

2. 作用

3. 工作原理

二、RabbitMQ安装部署

1. 安装

2. 部署

3. 增加用户

三、实现案例

1. 项目创建

2. 项目配置

3. 生产者代码

4. 消费者代码

四、测试

每篇一获


一、介绍

1. 概述

RabbitMQ 是一种开源的消息代理和队列服务器,用于通过简单和可扩展的方式在分布式系统中传递消息。它实现了高级消息队列协议(AMQP)。

  • 服务之间最常见的通信方式是直接调用彼此来通信,消息从一端发出后立即就可以达到另一端,称为即时消息通讯(同步通信)
  • 消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送  给另一端,称为延迟消息通讯(异步通信)
  • 一个提供统一消息服务的应用层标准高级消息队列协议,是一个通用的应用层协议
  • 消息发送与接受的双方遵守这个协议可以实现异步通讯.这个协议约定了消息的格式和工作方式.

以下是关于RabbitMQ的一些详细信息:

  1. 消息代理:RabbitMQ是一个消息代理,它接受并转发消息。你可以把它想象成一个邮局:当你把邮件放在邮筒里时,你可以确定邮差最终会把邮件送到你的收件人。在这个比喻中,RabbitMQ是邮筒、邮局和邮差。

  2. 可靠性:RabbitMQ支持消息持久化、传递确认、发布者确认和高可用性。

  3. 灵活的路由:RabbitMQ提供了多种消息路由模式,包括点对点、发布/订阅和路由模式。

  4. 集群:多个RabbitMQ服务器可以组成一个集群,形成一个高可用、负载均衡的系统。

  5. 多协议支持:RabbitMQ支持多种消息队列协议,包括AMQP、STOMP、MQTT等。

  6. 客户端支持:RabbitMQ为多种编程语言提供了客户端库,包括Java、.NET、Python、Ruby、PHP等。

  7. 管理界面:RabbitMQ提供了一个易于使用的用户界面,用于管理和监控你的RabbitMQ服务器。

  8. 跟踪:如果你需要查看消息传递的详细信息,RabbitMQ提供了消息跟踪功能。

  9. 插件机制:RabbitMQ支持通过插件扩展其核心功能。

2. 作用

RabbitMQ的主要作用和优势如下:

  1. 解耦:在系统设计中,组件之间的高度耦合是非常不利的。RabbitMQ作为消息队列中间件,可以有效地解耦系统,使得系统组件之间不直接通信,只通过消息队列来交换信息。

  2. 异步通信:RabbitMQ提供了异步处理的能力。当一个操作需要大量时间时,可以将该操作作为一个消息发送到队列,然后立即返回。这样,用户不需要等待这个操作完成,提高了系统的响应性能。

  3. 缓冲:RabbitMQ可以在处理高负载的情况下起到缓冲的作用。当消息的产生速度超过处理速度时,RabbitMQ可以暂存这些消息,等待处理程序准备好后再进行处理。

  4. 可靠性:RabbitMQ提供了消息持久化、传递确认、发布者确认等机制,确保消息不会丢失。

  5. 路由能力:RabbitMQ提供了灵活的消息路由能力,如点对点、发布/订阅等模式,满足不同的业务需求。

  6. 扩展性:RabbitMQ支持集群,可以通过增加更多的RabbitMQ节点来提高系统的处理能力。

  7. 跨平台和语言无关:RabbitMQ提供了多种语言的客户端,如Java、.NET、Python等,可以在不同的平台和语言之间进行通信。

  8. 监控:RabbitMQ提供了管理界面,可以方便地监控和管理消息队列的状态。

RabbitMQ作为一个消息队列中间件,可以帮助我们构建高效、可靠、可扩展的分布式系统。

3. 工作原理

RabbitMQ的工作原理主要基于生产者-消费者模型和消息队列。以下是其基本的工作流程:

  1. 生产者:生产者是创建消息的应用程序。它创建消息并发送到RabbitMQ。

  2. 队列:队列是RabbitMQ的内部结构,用于存储消息。多个生产者可以发送消息到一个队列,多个消费者可以从一个队列中获取消息。

  3. 交换器:生产者发送消息到交换器(Exchange),然后交换器根据一定的规则(路由键)将消息路由到一个或多个队列。RabbitMQ提供了几种类型的交换器,如直接交换器、主题交换器、头交换器和扇出交换器。

  4. 消费者:消费者是接收消息的应用程序。消费者连接到RabbitMQ并订阅一个队列,当新的消息到达队列时,RabbitMQ会将消息推送给消费者,或者消费者可以主动从队列中拉取消息。

  5. 消息确认:当消费者处理完一个消息后,它需要向RabbitMQ发送一个确认,告诉RabbitMQ这个消息已经被处理,可以从队列中删除。如果消费者处理消息时发生错误,它可以发送一个拒绝,告诉RabbitMQ这个消息没有被正确处理。

  6. 持久化:为了防止消息丢失,RabbitMQ提供了消息持久化的功能。生产者在发送消息时可以设置消息为持久化,RabbitMQ会将这些消息存储到磁盘,即使RabbitMQ服务器重启,这些消息也不会丢失。

通过这种方式,RabbitMQ可以在分布式系统中实现消息的可靠传递。 

RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言.

  • Server(Broker):接收客户端连接,实现AMQP协议的消息队列和路由功能的进程.
  • Virtual Host:虚拟主机的概念,类似权限控制组,一个Virtual Host里可以有多个Exchange和Queue.
  • Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue.
  • ExchangeType:交换机类型决定了路由消息行为,
  • RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic.
  • Message Queue:消息队列,用于存储还未被消费者消费的消息. Message:由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等.body是真正需要发送的数据内 容.
  • BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来.

二、RabbitMQ安装部署

1. 安装

在虚拟机中下载RabbitMQ的镜像

命令:

docker pull rabbitmq:management 

2. 部署

查看防火墙列表的端口是否开启

5672(RabbitMQ的用户端口)和15672(RabbitMQ的管理员端口)

命令:

firewall-cmd --zone=public --list-ports

开放端口5672:

firewall-cmd --zone=public --add-port=5672/tcp --permanent 

开放端口15672:

firewall-cmd --zone=public --add-port=15672/tcp --permanent

更新防火墙端口:

firewall-cmd --reload

创建并运行RabbitMQ的容器:

docker run -d \
--name my-rabbitmq \
-p 5672:5672 -p 15672:15672 \
--hostname my-rabbitmq-host \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--restart=always \
rabbitmq:management 

--hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)

-e:指定环境变量:

RABBITMQ_DEFAULT_VHOST:默认虚拟机名

RABBITMQ_DEFAULT_USER:默认的用户名

RABBITMQ_DEFAULT_PASS:默认用户名的密码

最后是运用RabbitMQ镜像;

3. 增加用户

使用虚拟机IP和RabbitMQ的管理员端口登入后台管理:

如图所示增加用户:

点击创建的用户,在点击设置应用

之后退出,登入创建的用户

三、实现案例

在实现案例的时候,虚拟机的RabbitMQ容器不用停止运行,虚拟机不用关闭。

1. 项目创建

打开我们的开放工具,创建项目,来实现生产者-消费者的消息队列:

根据如图创建项目:

父项目

生产者(publisher)在父项目中

创建消费者(consumer)在父项目中:

2. 项目配置

在生产者(publisher)项目中配置yml文件:

server:
    port: 9949
spring:
    rabbitmq:
        host: 192.***.***.***
        username: Jun
        password: 123456
        port: 5672
        virtual-host: my_vhost

server.port:配置生产者的端口

host:配置虚拟机的IP(这里需要根据自己的虚拟机IP进行填写)

username:配置我们在RabbitMQ中创建的用户名称

password:配置我们在RabbitMQ中创建的用户密码

port:配置RabbitMQ的用户端口

virtual-host: 配置默认虚拟机名(my_vhost)

消费者(consumer)项目中配置yml文件:

server:
    port: 8848
spring:
    rabbitmq:
        host: 192.168.211.129
        username: Jun
        password: 123456
        port: 5672
        virtual-host: my_vhost

server.port:配置消费者的端口

host:配置虚拟机的IP(这里需要根据自己的虚拟机IP进行填写)

username:配置我们在RabbitMQ中创建的用户名称

password:配置我们在RabbitMQ中创建的用户密码

port:配置RabbitMQ的用户端口

virtual-host: 配置默认虚拟机名(my_vhost)

3. 生产者代码

在生产者中创建一个配置类,使用@Configuration注解的类表示这个类包含了一个或多个@Bean注解的方法,这些方法将会被Spring容器调用,其返回值将被添加到Spring的应用上下文中,作为一个bean供其他部分使用。

这个配置类的名字是RabbitConfig(可以自己修改),它的主要作用是配置RabbitMQ的队列

RabbitConfig:

package com.cloudjun.publisher;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@SuppressWarnings("all")
public class RabbitConfig {

    // 创建队列
    @Bean
    public Queue messageQueue() {
        return new Queue("messageQueue");
    }
    @Bean
    public Queue messageUser() {
        return new Queue("messageUser");
    }

}

创建实体对象来作为传输信息内容:

User:
 

package com.cloudjun.publisher;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.io.Serializable;

@SuppressWarnings("all")
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {

        private String username;
        private String userpwd;

}

创建一个控制器类,使用@RestController注解的类表示这个类是一个控制器,它可以处理HTTP请求。

这个控制器类的名字是TestController,它的主要作用是处理HTTP请求,并通过RabbitMQ发送消息。

TestController

package com.cloudjun.publisher;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author CloudJun
 */
@RestController
public class TestController {

    @Autowired
    private AmqpTemplate template;
    @Autowired
    private ObjectMapper objectMapper;

    @RequestMapping("test01")
    public String test01(){
        // 发送消息到名为messageQueue的队列
        // 这里的messageQueue是RabbitMQ中定义的队列名称
        // 这里的"Hello World!"是发送的消息内容
        template.convertAndSend("messageQueue", "HelloWorld!");
        return "💖";
    }

    @RequestMapping("test02")
    public String test02() throws Exception {
        // 发送消息到名为messageQueue的队列
        // 这里的messageQueue是RabbitMQ中定义的队列名称
        User user = new User("Jun", "123456");
        // 序列化对象转换为JSON字符串
        String json = objectMapper.writeValueAsString(user);
        template.convertAndSend("messageUser", json);
        return "💖";
    }

}

类及代码说明:

在这个类中,使用了@Autowired注解来自动注入AmqpTemplateObjectMapper对象。AmqpTemplate是Spring提供的一个操作RabbitMQ的工具,可以用来发送和接收消息。ObjectMapper是Jackson库提供的一个工具,可以用来将对象转换为JSON字符串,或者将JSON字符串转换为对象。

  1. test01方法发送了一个字符串"Hello World!"到名为messageQueue的队列。
  2. test02方法创建了一个User对象,然后使用ObjectMapper将这个对象转换为JSON字符串,然后发送这个JSON字符串到名为messageUser的队列。

4. 消费者代码

创建实体对象来作为接收生产者的信息内容:

User:

package com.cloudjun.consumer;

import lombok.*;

import java.io.Serializable;

@SuppressWarnings("all")
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User implements Serializable {

        private String username;
        private String userpwd;

}

创建消息消费者类,使用@Component注解的类表示这个类是一个组件,它会被Spring管理。

这个消息消费者类的名字是Receiver(名称可以直接修改),它的主要作用是接收并处理RabbitMQ的消息

Receiver:

package com.cloudjun.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "messageQueue")
public class Receiver {

        @RabbitHandler
        public void messageMsg(String msg) {
            log.warn("接收到:" + msg);
        }

}

类及代码说明:

在这个类中,使用了@Slf4j注解来启用日志,使用了@RabbitListener注解来监听名为messageQueue的队列,这个队列是在前面的RabbitConfig配置类中定义的。

这个类定义了一个名为process的方法,这个方法使用了@RabbitHandler注解,表示这个方法是处理消息的方法。当messageQueue队列中有新的消息时,这个方法会被调用,方法的参数msg就是接收到的消息。

process方法中,使用了log.warn来打印接收到的消息,这样我们就可以在日志中看到接收到的消息。

总的来说,这个类的作用是接收并处理RabbitMQ的消息,并将接收到的消息打印在日志中。

在创建一个消息消费者类,这个消息消费者类的名字是PojoReceiver,它的主要作用是接收并处理RabbitMQ的消息

PojoReceiver:

package com.cloudjun.consumer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "messageUser")
public class PojoReceiver {

        @Autowired
        private ObjectMapper objectMapper;

        @RabbitHandler
        public void messageUser(String json) throws Exception {
            User user = objectMapper.readValue(json, User.class);
            // 处理user对象
            log.warn("接收到:" + user);
        }

}

类及代码说明:

这个类定义了一个名为process的方法,这个方法使用了@RabbitHandler注解,表示这个方法是处理消息的方法。当messageUser队列中有新的消息时,这个方法会被调用,方法的参数json就是接收到的消息。

不同于前面的Receiver类,这个类在接收到消息后,会使用ObjectMapper将消息从JSON字符串转换为User对象。这样,我们就可以在处理消息时,直接操作User对象,而不需要手动解析JSON字符串。

process方法中,使用了log.warn来打印接收到的User对象,这样我们就可以在日志中看到接收到的消息。

总的来说,这个类的作用是接收并处理RabbitMQ的消息,并将接收到的消息从JSON字符串转换为User对象,然后将User对象打印在日志中。 

四、测试

启动两个项目,在浏览器中访问生产者的配置路径,并且在消费者中看看是否可以查看到,生产者转递过来的信息。

方法一:

方法二:

每篇一获

学习RabbitMQ的基本使用后,你可以从以下几个方面受益:

  1. 可靠性:通过使用RabbitMQ消息队列技术,可以确保消息的可靠性,即使在消息处理过程中出现故障,也可以确保消息不会丢失。

  2. 异步处理:使用RabbitMQ可以实现异步处理,将消息发送到队列中,然后再异步处理它们。这样可以加速应用程序的响应时间,提高系统的吞吐量。

  3. 解耦合:使用RabbitMQ可以实现应用程序之间的解耦合,例如一个应用程序可以发送消息到一个队列中,而另一个应用程序可以从该队列中接收并处理消息。这样可以降低应用程序之间的依赖性,提高系统的可维护性和可扩展性。

  4. 伸缩性:使用RabbitMQ可以轻松地水平扩展消息处理能力,通过添加更多的消费者来实现更高的吞吐量。

  5. 可视化管理:RabbitMQ提供了一个易于使用的Web管理界面,可以监控和管理RabbitMQ服务器,包括队列、交换机、绑定等等。

总之,学习RabbitMQ可以帮助你更好地理解消息队列的概念和实现方式,并且可以应用到实际项目中,提高应用程序的可靠性、响应时间和可维护性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/334314.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【RocketMQ每日一问】RocketMQ nameserver的作用是什么?

Name Server 在 Apache RocketMQ 集群中扮演着以下几个重要作用: 服务注册与发现: Name Server 负责管理和协调整个集群,维护集群中所有 Broker 的信息,包括 Broker 的 IP 地址、端口号、存储容量等。当 Producer 和 Consumer 需…

内存分析CE寻找天龙八部人物状态及基址

扫描类型为未知的数值首次扫描 通过改变角色状态 扫描类型变动的数值和未变动的数值扫描地址 选择3FCBD25C为人物状态地址 0站立 2走路 6打坐 7打怪 找基址 鼠标右键找出是什么访问了这个地址 查看第一个的详细信息 与02 和 00 进行判断(走路和站立&#…

Architecture Lab:part A 【实现sum_list/rsum_list/copy_block/熟悉Y86-64指令】

Architecture Lab 对应CS:APP的Chap 4——处理器体系结构。Part A要实现三个函数,分别为sum_list/rsum_list/copy_block。建议先得到x86-64指令,然后再转换为Y86-64指令。 准备工作 在misc目录下,键入以下命令用来生成汇编代码。命令执行完…

Linux快速部署文件服务器

参考文档: Linux命令之nohup详解 - 掘金 【Linux】ps -ef|grep详解-CSDN博客 有个简单想法,我的一些文件放在机器某个目录下面,可以简单提供团队内部人员浏览和下载功能,节约时间,用最简单方法实现。 注:…

MyBatisPlus学习笔记五-插件功能

0、插件功能 MyBatisPlus提供的内置拦截器有下面这些 1、分页插件 2、通用分页实体 3、通用分页实体-强化 需求: 在PageQuery中定义方法,将PageQuery对象转为MyBatisPlus中的Page对象在PageDTO中定义方法,将MyBatisPlus中的Page结果转为Page…

mysql原理--事务的隔离级别与 MVCC

1.事前准备 为了故事的顺利发展,我们需要创建一个表: CREATE TABLE hero (number INT,name VARCHAR(100),country varchar(100),PRIMARY KEY (number) ) EngineInnoDB CHARSETutf8;然后向这个表里插入一条数据:INSERT INTO hero VALUES(1, 刘…

想做一名严肃的伦敦金投资者?那请做好以下这两个准备

在伦敦金市场中,如果投资者想成为一名脚踏实地的投资者,首先要在心态上、思想上对自己进行改造,起码接受自己是严肃投资者的身份,然后再完成下面我们提出的这两种准备。 选择一种自己喜欢的交易策略。既然要成为一名严肃的投资者&…

栈、队列专题

文章目录 栈栈的概述栈的实现栈在函数调用中的应用栈在表达式求值中的应用逆波兰表达式求值 栈在括号匹配中的应用有效的括号最长的有效括号删除字符串中的所有相邻重复项 如何获取栈内最小元素呢如何实现浏览器的前进和后退 队列队列的定义队列的实现循环队列队列的应用队列在…

Pytorch实战——3、数据加载与处理

🍅 写在前面 👨‍🎓 博主介绍:大家好,这里是hyk写算法了吗,一枚致力于学习算法和人工智能领域的小菜鸟。 🔎个人主页:主页链接(欢迎各位大佬光临指导) ⭐️近…

【音视频原理】图像相关概念 ③ ( RGB 色彩简介 | RGB 排列 | YUV 色彩简介 | YUV 编码好处 )

文章目录 一、RGB 色彩1、RGB 色彩简介2、RGB 排列 二、YUV 色彩1、YUV 色彩简介2、YUV 编码好处 一、RGB 色彩 1、RGB 色彩简介 RGB 是 计算机 中的 颜色编码方法 , 红 ( R ) / 绿 ( G ) / 蓝 ( B ) 三个颜色通道 可以设置不同的值 , 每个 通道 的 颜色值都可以取值 0 ~ 255 ,…

Python智能挖掘数据新秘器

大家好,本次分享一款在数据探索中表现出色的工具—Python Lux ,通过自动化可视化和数据分析过程,使得数据探索变得更加快捷方便。 Lux的使用方法非常简单,只需在Jupyter notebook中输入dataframe,Lux就会智能推荐一组基…

Java项目:10 Springboot的电商书城管理系统

作者主页:舒克日记 简介:Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 该系统分为前台展示和后台管理两大模块,前台主要是为消费者服务。该子系统实现了注册,登录,以及从浏览、下单到支付…

第三讲_ArkTS的初识

ArkTS的初识 1. ArkTS的基本组成2. ArkTS自定义组件 1. ArkTS的基本组成 装饰器: 用于装饰类、结构、方法以及变量,并赋予其特殊的含义。自定义组件:可复用的UI单元,可组合其他组件,图示中Component装饰的struct Hello…

Halcon 一维测量

文章目录 算子矩形算子弧形算子移动到新的参考点 Halcon 案例测量保险丝的宽度(边缘对测量)使用助手进行测量 halcon 案例获取芯片引脚的个数平均宽度距离,连续两个边缘的距离(measure_pos )halcon 定位测量Halcon 测量…

C#,水仙花数(Narcissistic number)的计算方法及源代码

一、水仙花数(Narcissistic number) 水仙花数(Narcissistic number)也被称为:超完全数字不变数(pluperfect digital invariant, PPDI)、自恋数、自幂数、阿姆斯壮数或阿姆斯特朗数(A…

B站提示:“当前浏览器版本较低……”可行的解决方案(edge浏览器)

文章目录 问题研究和分析使用User-Agent Switcher for Chrome插件的解决方法使用userAgent switcher的解决方法 问题研究和分析 问题:使用最新版浏览器访问B站,首页总是有一条横幅提示:当前浏览器版本较低,为保证您的使用体验&am…

vscode设置terminal的最大行数

今天跑代码出现一个问题,就是整个程序跑完,整个程序的输出信息过多,最开始输出的信息已经被vscode的缓存冲掉了,只能看到最后的一部分,具体的原因是vscode的terminal默认只能保存1000行的信息,所以如果想保…

智慧仓储物流远程监控方案分析

智慧仓储物流远程监控方案分析 随着物联网、大数据、云计算等技术的快速发展,智慧仓储物流逐渐成为现代物流发展的重要方向。远程监控作为智慧仓储物流的重要组成部分,可以有效提高仓储物流的效率、准确性和安全性。 一、智慧仓储物流远程监控方案概述 …

llvm pass

pass们组合在一起,处理IR 而最后的目标代码生成阶段,会生成另一种MIR(Machine IR) PassManager管理这些pass pass处理IR之后会改变分析的情况,这些关于IR的信息由 AnalysisManager处理 1、pass (1&…