RabbitMQ的安装与使用

RabbitMQ的安装与使用

  • 介绍
  • 一、RabbitMQ的安装
    • 1 查找镜像
    • 2 拉取镜像
    • 3 查看镜像
    • 4 创建容器
    • 5 查看容器
    • 6 访问测试
  • 二、RabbitMQ的使用
    • 1 创建项目
    • 2 配置文件
    • 3 队列配置文件
    • 4 消费者
    • 5 生产者
    • 6 测试
  • 三、交换器
  • 四、普通队列Demo
  • 五、死信队列Demo
    • 1 介绍
    • 2 示例
      • 2.1 配置
      • 2.2 生产者
      • 2.3 消费者
      • 2.4 死信消费者
      • 2.5 结果
  • 六、延时队列Demo
    • 1 安装延迟插件
      • 1.1 下载插件
      • 1.2 将插件拷贝到RabbitMQ容器的插件目录
      • 1.3 进入到容器
      • 1.4 开启插件
      • 1.5 查看
    • 2 示例
      • 2.1 配置
      • 2.2 生产者
      • 2.3 消费产者
      • 2.4 结果

介绍

RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。开发语言为Erlang。
linux系统中安装RabbitMQ比较繁琐,这里使用的是Docker安装。

一、RabbitMQ的安装

1 查找镜像

docker search rabbitmq:management

在这里插入图片描述

2 拉取镜像

docker pull macintoshplus/rabbitmq-management

在这里插入图片描述

3 查看镜像

docker images

在这里插入图片描述

4 创建容器

docker run -d --hostname mzw-rabbitmq --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 c20

命令解读:

  • 运行一个镜像
  • -d 后台守护运行
  • –hostname mzw-rabbitmq 指定主机名称
  • –name 指定容器名称
  • -e RABBITMQ_DEFAULT_USER=admin 指定用户名
  • -e RABBITMQ_DEFAULT_PASS=admin 指定密码
  • -p 15672:15672 -p 5672:5672 端口映射
  • c20 镜像ID 简写
    在这里插入图片描述

5 查看容器

docker ps

在这里插入图片描述

6 访问测试

访问地址:http://192.168.2.xx:15672/
在这里插入图片描述
输入启动容器时设置的用户密码登录
在这里插入图片描述
这就表示RabbitMQ安装成功了

二、RabbitMQ的使用

1 创建项目

创建SpringBoot项目并引入相关依赖
在这里插入图片描述

2 配置文件

# RabbitMQ 配置
spring.rabbitmq.name=rabbitmq-demo01
spring.rabbitmq.host=192.168.2.22
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

# 自定义一个属性,设置队列的名称
mq.queue.name=hello-queue

3 队列配置文件

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

// 添加@Configuration 注解,表示一个注解类
@Configuration
public class QueueConfig {

    @Value("${mq.queue.name}")
    private String queueName;

    /**
     * 初始化短信队列
     * @return
     */
    @Bean
    public Queue delayedSmsQueueInit() {
        return new Queue(queueName);
    }
}

4 消费者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 创建一个rabbitmq消费者
 */
@Component
public class Receiver {

    // 接受MQ消息 并 处理消息
    @RabbitListener(queues = {"${mq.queue.name}"})
    public void process(String msg){
        // 处理消息
        System.out.println("我是MQ消费者,我接收到的消息是:" + msg );
    }
}

5 生产者

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * 消息提供者
 */
@Component
public class Sender {

    @Autowired
    private AmqpTemplate template;

    @Value("${mq.queue.name}")
    private String queueName;

    // 发送消息
    public void send(String msg){
        // 队列名,消息内容
        template.convertAndSend(queueName,msg);
    }

}

6 测试

  • 发送消息
    @Autowired
    private Sender sender;
    @Test
    void contextLoads() {
        sender.send("你好啊......");
    }
    
  • 接收消息
    在这里插入图片描述

三、交换器

RabbitMQ中有五种主要的交互器分别如下

交换器说明
direct发布与订阅 完全匹配
fanout广播
topic主体,规则匹配
fanout转发
custom自定义

四、普通队列Demo

上边已经演示,这里不重复演示。

五、死信队列Demo

1 介绍

死信队列就是在某种情况下,导致消息无法被正常消费(异常,过期,队列已满等),存放这些未被消费的消息的队列即为死信队列

2 示例

2.1 配置

  • 配置文件
# RabbitMQ 配置
spring.rabbitmq.name=rabbitmq-demo01
spring.rabbitmq.host=192.168.2.22
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin


###死信队列
mq.dlx.exchange=mq_dlx_exchange
mq.dlx.queue=mq_dlx_queue
mq.dlx.routingKey=mq_dlx_key
###备胎交换机
mq.exchange=mq_exchange
mq.queue=mq_queue
mq.routingKey=routing_key
  • 配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {
    /**
     * 普通交换机
     */
    @Value("${mq.exchange}")
    private String mqExchange;

    /**
     * 普通队列
     */
    @Value("${mq.queue}")
    private String mqQueue;

    /**
     * 普通路由key
     */
    @Value("${mq.routingKey}")
    private String mqRoutingKey;
    /**
     * 死信交换机
     */
    @Value("${mq.dlx.exchange}")
    private String dlxExchange;

    /**
     * 死信队列
     */
    @Value("${mq.dlx.queue}")
    private String dlxQueue;
    /**
     * 死信路由
     */
    @Value("${mq.dlx.routingKey}")
    private String dlxRoutingKey;

    /**
     * 声明死信交换机
     * @return DirectExchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchange);
    }

    /**
     * 声明死信队列
     * @return Queue
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(dlxQueue);
    }

    /**
     * 声明普通业务交换机
     * @return DirectExchange
     */
    @Bean
    public DirectExchange mqExchange() {
        return new DirectExchange(mqExchange);
    }

    /**
     * 声明普通队列
     * @return Queue
     */
    @Bean
    public Queue mqQueue() {
        // 普通队列绑定我们的死信交换机
        Map<String, Object> arguments = new HashMap<>(2);
        //死信交换机
        arguments.put("x-dead-letter-exchange", dlxExchange);
        //死信队列
        arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
        return new Queue(mqQueue, true, false, false, arguments);
    }

    /**
     * 绑定死信队列到死信交换机
     * @return Binding
     */
    @Bean
    public Binding binding(Queue dlxQueue,DirectExchange dlxExchange) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey);
    }


    /**
     * 绑定普通队列到普通交换机
     * @return Binding
     */
    @Bean
    public Binding mqBinding(Queue mqQueue,DirectExchange mqExchange) {
        return BindingBuilder.bind(mqQueue).to(mqExchange).with(mqRoutingKey);
    }
}

2.2 生产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;

/**
 * 生产者
 */
@RestController
@Slf4j
public class MQProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 普通交换机
     */
    @Value("${mq.exchange}")
    private String mqExchange;
    /**
     * 普通路由key
     */
    @Value("${mq.routingKey}")
    private String mqRoutingKey;

    @RequestMapping("/sendMsg")
    public String sendMsg() {
        String msg = "Hello RabbitMQ ......";
        //发送消息  参数一:交换机 参数二:路由键(用来指定发送到哪个队列)
        rabbitTemplate.convertAndSend(mqExchange, mqRoutingKey, msg, message -> {
            // 设置消息过期时间 10秒过期    如果过期时间内还没有被消费 就会发送给死信队列
            message.getMessageProperties().setExpiration("10000");
            return message;
        });
        log.info("生产者发送消息:{}", msg);
        return "success";
    }
}

2.3 消费者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消费者
 */
@Component
@Slf4j
public class MQConsumer {

    /**
     * 监听队列回调的方法
     *
     * @param msg
     */
    @RabbitListener(queues = {"${mq.queue}"})
    public void mqConsumer(String msg) {
        log.info("正常普通消费者消息MSG:{}", msg);
    }
}

2.4 死信消费者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 死信消费者
 */
@Component
@Slf4j
public class MQDlxConsumer {
    /**
     * 死信队列监听队列回调的方法
     *
     * @param msg
     */
    @RabbitListener(queues = {"${mq.dlx.queue}"})
    public void mqConsumer(String msg) {
        log.info("死信队列消费普通消息:msg{}", msg);
    }

}

2.5 结果

访问:http://127.0.0.1:9023/sendMsg 会被 消费者 消费掉
消费者 代码注释掉,在访问http://127.0.0.1:9023/sendMsg,等待10秒钟后会被死信队列接收到。
在这里插入图片描述

六、延时队列Demo

  • 两种方式:
    • 第一种:使用死信队列,将消息放入一个没有被监听的队列上,设置TTL(一条消息的最大存活时间)为延迟的时间,时间到了没有被消费,直接成为死信。监听死信队列来进行操作。
    • 第二种:使用rabbitmq官方提供的delayed插件来真正实现延迟队列。本文对第二种进行详解

1 安装延迟插件

官网下载:https://www.rabbitmq.com/community-plugins.html
我的RabbitMQ是3.12 b版本的,下载此插件
在这里插入图片描述

1.1 下载插件

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez

在这里插入图片描述

1.2 将插件拷贝到RabbitMQ容器的插件目录

docker cp ./rabbitmq_delayed_message_exchange-3.12.0.ez de24369edeb4:/plugins

在这里插入图片描述

1.3 进入到容器

docker exec -it de24369edeb4 /bin/bash

1.4 开启插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在这里插入图片描述

1.5 查看

rabbitmq-plugins list

E* 或 e* 代表 插件已启用
在这里插入图片描述
在RabbitMQ控制台可以看到
在这里插入图片描述

2 示例

2.1 配置

  • 配置文件
# RabbitMQ 配置
spring.rabbitmq.name=rabbitmq-demo01
spring.rabbitmq.host=192.168.2.22
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

# 自定义一个属性,设置队列的名称
mq.queue.name=hello-queue
  • 配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 使用x-delayed-message 延时队列插件
 */
@Configuration
public class QueueConfig {

    @Value("${mq.queue.name}")
    private String queueName;

    /**
     * 初始化短信队列
     * @return
     */
    @Bean
    public Queue delayedSmsQueueInit() {
        return new Queue(queueName);
    }

    /**
     * 初始化延迟交换机
     * @return
     */
    @Bean
    public CustomExchange delayedExchangeInit() {
        Map<String, Object> args = new HashMap<>();
        // 设置类型,可以为fanout、direct、topic
        args.put("x-delayed-type", "direct");
        // 第一个参数是延迟交换机名字,第二个是交换机类型,第三个设置持久化,第四个设置自动删除,第五个放参数
        return new CustomExchange("delayed_exchange","x-delayed-message", true,false,args);
    }

    /**
     * 短信队列绑定到交换机
     * @param delayedSmsQueueInit
     * @param customExchange
     * @return
     */
    @Bean
    public Binding delayedBindingSmsQueue(Queue delayedSmsQueueInit, CustomExchange customExchange) {
        // 延迟队列绑定延迟交换机并设置RoutingKey为sms
        return BindingBuilder.bind(delayedSmsQueueInit).to(customExchange).with("sms").noargs();
    }
}

2.2 生产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


/**
 * 生产者
 */
@RestController
@Slf4j
public class Sender {

    @Autowired
    private AmqpTemplate template;

    @Value("${mq.queue.name}")
    private String queueName;

    // 发送消息
    @RequestMapping("/sendMsg")
    public void send(){
        String msg = "Hello RabbitMQ ......";
        // 队列名,消息内容
        template.convertAndSend(queueName,msg);
        log.info("生产者发送消息:{}", msg);
    }

    @RequestMapping("/sendDelayedMsg")
    public void sendDelayedMsg(){
        String msg = "Hello RabbitMQ Delayed ......";
        // 第一个参数是延迟交换机名称,第二个是Routingkey,第三个是消息主题,第四个是X,并设置延迟时间,单位		是毫秒
        template.convertAndSend("delayed_exchange","sms",msg,a -> {
            a.getMessageProperties().setDelay(2000);
            return a;
        });
        log.info("生产者发送延时消息:{}", msg);
    }
}

2.3 消费产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


/**
 * 消费者
 */
@Component
@Slf4j
public class Receiver {

    // 接受MQ消息 并 处理消息
    @RabbitListener(queues = {"${mq.queue.name}"})
    public void process(String msg){
        // 处理消息
        log.info("我是MQ消费者,我接收到的消息是:{}", msg);
    }
}

2.4 结果

访问:http://127.0.0.1:9022/sendMsg
访问:http://127.0.0.1:9022/sendDelayedMsg
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/396599.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

初创品牌如何写好宣传文案?媒介盒子分享

初创品牌在起步阶段想要做好宣传&#xff0c;文案是重中之重&#xff0c;然后许多初创品牌在宣传阶段不知道如何写好文案&#xff0c;写文案的时候容易出现模糊不清、定位不明确等问题&#xff0c;今天媒介盒子就来和大家聊聊&#xff1a;初创品牌如何写好宣传文案。 一、 文案…

php实现讯飞星火大模型3.5

前期准备 vscode下载安装好 composer下载安装好 php环境安装好 &#xff08;以上可以自行网上查阅资料&#xff09; 开始实现 1.注册讯飞星火用户&#xff0c;获取token使用 讯飞星火认知大模型-AI大语言模型-星火大模型-科大讯飞 2.修改对应php文件中的key等 可以参考…

Gitlab操作流程

阶段1-构建账户信息 1.1 管理员分配账户 方式1-推荐 企业正常使用gitlab时&#xff0c;一般由项目经理(超级管理员)手动创建开发者账户信息&#xff0c;然后将账户发送给开发者&#xff0c;以便登录使用&#xff1b; 流程如下&#xff1a; 点击创建用户按钮&#xff1b; 创…

从零开始学习Netty - 学习笔记 - NIO基础 - ByteBuffer: 简介和基本操作

NIO基础 1.三大组件 1.1. Channel & Buffer Channel 在Java NIO&#xff08;New I/O&#xff09;中&#xff0c;“Channel”&#xff08;通道&#xff09;是一个重要的概念&#xff0c;用于在非阻塞I/O操作中进行数据的传输。Java NIO提供了一种更为灵活和高效的I/O处理方…

vue 使用Html2Canvas对元素截图 下载

介绍 官网&#xff1a;https://html2canvas.hertzen.com/ 一款轻量化的网页截图工具&#xff0c;可以对元素截图下载&#xff0c;只有几十KB&#xff0c;很强大&#xff0c;例如程序发送错误&#xff0c;就可以调用方法继续截图&#xff0c;或者用户在干什么都可以继续记录。 …

戴尔Dell R740服务器开机冒烟亮黄灯故障维修

今天分享的是一台过保修期的DELL PowerEdge R740服务器开机冒烟的维修案例。先上图&#xff1a; 接到用户报修后工程师立即响应&#xff0c;由于用户也是刚开工第一天服务器开机就出现了这种祥龙吐雾的祥兆&#xff0c;导致工厂业务流程无法正常使用&#xff0c;这台机器在东莞…

TSINGSEE智能分析网关V4的AI算法在消防场景中有哪些应用?

随着科技的不断创新和发展&#xff0c;人工智能已经成为现代社会的重要组成部分。除了在交通、医疗、电力等领域得到了广泛应用外&#xff0c;人工智能在消防领域也有着广泛的应用。AI烟火识别算法作为TSINGSEE青犀视频AI智能分析网关V4的重要组成部分&#xff0c;在城市消防领…

Vuex学习记录

目录 一、Vuex概述 1.1Vuex是什么 1.2使用Vuex统一管理的好处 1.3什么样的数据适合存储在Vuex中 二、Vuex的基本使用 2.1创建Vuex项目 视图式&#xff08;版本&#xff1a;vue3vuex4&#xff09; 命令式&#xff08; 版本&#xff1a;vue2vuex3&#xff09; 可自定义选…

安全架构设计理论与实践

一、考点分布 安全架构概述&#xff08;※※&#xff09;安全模型&#xff08;※※※&#xff09;信息安全整体架构设计网络安全体系架构设计区块链技术&#xff08;※※&#xff09; 二、安全架构概述 被动攻击&#xff1a;收集信息为主&#xff0c;破坏保密性 主动攻击&#…

【数据结构】每天五分钟,快速入门数据结构(一)——数组

目录 一.初始化语法 二.特点 三.数组中的元素默认值 四.时间复杂度 五.Java中的ArrayList类 可变长度数组 1 使用 2 注意事项 3 实现原理 4 ArrayList源码 5 ArrayList方法 一.初始化语法 // 数组动态初始化&#xff08;先定义数组&#xff0c;指定数组长度&#xf…

计算机网络概论和数据通信基础

文章目录 计算机网络概论从物理构成上看&#xff0c;计算机网络包括硬件、软件和协议三大部分计算机网络的功能组成计算机网络的分类网络体系结构分层与体系结构接口、协议和服务数据传送单位OSI模型TCP/IP模型 数据通信基础数字信号调制为模拟信号正交振幅调制QAM 模拟数据编码…

vue框架-vue-cli

vue-cli Vue CLI是一个官方的脚手架工具,用于快速搭建基于Vue.js的项目。Vue CLI提供了一整套可配置的脚手架,可以帮助开发人员快速构建现代化的Web应用程序。 Vue CLI通过提供预先配置好的Webpack模板和插件,使得开发人员可以在不需要手动编写Webpack配置的情况下快速创建…

专145+总420+哈尔滨工业大学803信号与系统和数字逻辑电路考研经验哈工大电子信息与通信,真题,大纲,参考书。

自从高考失利没有考入哈工大&#xff0c;一直带着遗憾&#xff0c;今年初试专业课803信号与系统和数字逻辑电路145&#xff0c;总分420顺利圆满哈工大&#xff0c;了却了一块心病&#xff0c;回看这一年的复习起起落落&#xff0c;心中的那块初心&#xff0c;让我坚持到了上岸&…

Linux中信号机制

信号机制 信号的概念 概念&#xff1a;信号是在软件层次上对中断机制的一种模拟&#xff0c;是一种异步通信方式 所有信号的产生及处理全部都是由内核完成的信号的产生&#xff1a; 1 按键产生 2 系统调用函数产生&#xff08;比如raise&#xff0c; kill&#xff09; 3 硬件…

内存计算研究进展-通用的近数据计算架构

通用的近数据计算架构方面代表性工作有&#xff1a; AMD Research的 TOP-PIM&#xff0c;Carnegie Mellon Univeristy 的 TOM&#xff0c; University of Wisconsin-Madison 的 DRAMA 和 NDA&#xff0c;Seoul National University 的 PEI &#xff0c;IBM Research 的 AMC (ac…

算法-矩阵置零

1、题目来源 73. 矩阵置零 - 力扣&#xff08;LeetCode&#xff09; 2、题目描述 给定一个 m x n 的矩阵&#xff0c;如果一个元素为 0 &#xff0c;则将其所在行和列的所有元素都设为 0 。请使用 原地 算法。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,1,1],[1,0,1…

MySQL错误-this is incompatible with sql_mode=only_full_group_by完美解决方案

项目场景 有时候&#xff0c;遇到数据库重复数据&#xff0c;需要将数据进行分组&#xff0c;并取出其中一条来展示&#xff0c;这时就需要用到group by语句。 但是&#xff0c;如果mysql是高版本&#xff0c;当执行group by时&#xff0c;select的字段不属于group by的字段的…

阿里云幻兽帕鲁Windows 服务器怎么下载存档?

阿里云幻兽帕鲁Windows 服务器怎么下载存档&#xff1f;通过远程连接window服务器桌面的方式。 远程连接到阿里云的 Windows 服务器后&#xff0c;可以将压缩后的存档文件&#xff0c;拖动到 workbench\Download 目录后&#xff0c;就会触发浏览器的文件下载&#xff0c;然后将…

数据结构排序:插入排序、希尔排序、选择排序、冒泡排序、堆排序、快速排序

文章目录 插入排序希尔排序选择排序冒泡排序堆排序快速排序 插入排序 基本思想&#xff1a; 直接插入排序是一种简单的插入排序法&#xff0c;其基本思想是&#xff1a; 把待排序的值按其关键码值的大小逐个插入到一个已经排好序的有序序列中&#xff0c;直到所有的记录插入完…

基于STL的演讲比赛流程管理系统(个人学习笔记黑马学习)

1、演讲比赛程序需求 1.1比赛规则 学校举行一场演讲比赛&#xff0c;共有12个人参加。比赛共两轮&#xff0c;第一轮为淘汰赛&#xff0c;第二轮为决赛。每名选手都有对应的编号&#xff0c;如 10001~10012比赛方式:分组比赛&#xff0c;每组6个人;第一轮分为两个小组&#xff…