Spring Boot集成RabbitMQ-之6大模式总结

A.集成
一:添加依赖
在pom.xml文件中添加spring-boot-starter-amqp依赖,以便使用Spring Boot提供的RabbitMQ支持:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

二:配置RabbitMQ连接信息

  rabbitmq:
    host: 13X.9.1XX.7X
    port: 5672 #通过控制台可以查看    记得开启这个端口的防护
    username: admin
    password: admin

三:创建队列

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue queue() {
        //name,名字;durable,是否开启持久化
        return new Queue("logs",false);
    }
}

启动就可以得到下队列
在这里插入图片描述

四:创建控制类来生产数据


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RabbitMQController {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQController.class);
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("aaa")
    public void simpleTest() {
        logger.info("RabbitMQController开始!");
        rabbitTemplate.convertAndSend("logs","hello world!");
        logger.info("RabbitMQController结束!");
    }
}


因为只创建了生产,消费者没有创建,所以在RabbitMQ客户端可以查看,然后点击,消费可得数据
在这里插入图片描述

五:创建消费者,获取数据

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumeBean {
    private static final Logger logger = LoggerFactory.getLogger(ConsumeBean.class);
    @RabbitListener(queues={"logs"})
    public void getMsg(String message){
        logger.info("消费者:{}",message);
    }
}

这样就可以看出,消息自动就被接收,消费掉了
在这里插入图片描述

B.消息传递的开放标准协议(AMQP)

AMQP(Advanced Message Queuing Protocol)它定义了一种抽象的消息传递模型,包括以下几个主要组件:

消息(Message):AMQP中的基本单位,是要在消息队列系统中传递的数据。消息通常包括消息体和消息头,消息体是实际要传递的数据,而消息头包含元数据信息,如消息的路由键、优先级等。

生产者(Producer):负责创建并发送消息到消息队列中的实体。生产者将消息发布到交换机(Exchange),交换机根据路由规则将消息路由到一个或多个队列中。

消费者(Consumer):从消息队列中接收并处理消息的实体。消费者订阅一个或多个队列,并在有消息到达时接收并处理它们。

交换机(Exchange):用于接收生产者发送的消息,并根据路由规则将消息路由到一个或多个队列中。AMQP定义了不同类型的交换机,如直连交换机(Direct Exchange)、主题交换机(Topic Exchange)、扇出交换机(Fanout Exchange)等。

队列(Queue):存储消息的容器,消费者从队列中获取消息进行处理。消息可以被一个或多个消费者订阅,但每条消息只会被一个消费者接收。

绑定(Binding):用于将交换机和队列之间建立关联关系的规则。绑定定义了消息如何从交换机路由到队列,通常包括交换机名称、路由键等信息。

连接(Connection):生产者和消费者与消息代理(如RabbitMQ)之间建立的网络连接。连接是长期的、持久的,用于传输消息和管理通信。

通过这些抽象组件,AMQP定义了一个灵活且可扩展的消息传递模型,使得不同的消息队列系统可以遵循相同的协议进行通信和交互。这种抽象模型使得开发者可以更容易地实现消息传递系统,并实现消息的可靠传递和处理

六大模式
1.简单队列 一个生产者一个队列一个消费者
2.工作队列 一个生产者一个队列多个消费者
3.订阅模式 一个生产者一个交换机 多个队列多个消费者(对与消一对一)
4.路由模式 一个生产者一个交换机 分类进入队列 多个队列多个消费者(对与消一对一)
5.主题模式(通配符模式) 一个生产者一个交换机 通配符分类进入队列 多个队列多个消费者(对与消一对一)
6.RPC 是一种实现远程过程调用的方式,允许客户端应用程序调用远程服务器上的服务,并等待服务端返回结果。

1.简单队列

创建生产者(Producer):
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend("queueName", message);
    }
}
//创建消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer {

    @RabbitListener(queues = "queueName")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

//队列配置
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public Queue queue1() {
        return new Queue("queueName");
    }
}

2.工作队列

//队列配置
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig2{

    @Bean
    public Queue taskQueue() {
        return new Queue("taskQueue");
    }
}
//创建生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TaskProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendTask(String task) {
        rabbitTemplate.convertAndSend("taskQueue", task);
    }
}


//创建消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TaskConsumer {

    @RabbitListener(queues = "taskQueue")
    public void processTask(String task) {
        System.out.println("Processing task: " + task);
        // Simulate task processing
        try {
            Thread.sleep(1000); // Simulate task processing time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("Task processed: " + task);
    }
}

3.订阅模式

//创建生产者(Producer)
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer3 {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend("fanoutExchange", "", message);
    }
}

//创建消费者(Consumer)
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumerA {

    @RabbitListener(queues = "queueFanout1")
    public void receiveMessage(String message) {
        System.out.println("Consumer 1 received message: " + message);
    }
}
@Component
public class MessageConsumerB {

    @RabbitListener(queues = "queueFanout2")
    public void receiveMessage(String message) {
        System.out.println("Consumer 2 received message: " + message);
    }
}


//配置交换机和队列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig3 {

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    public Queue queueFanout1() {
        return new Queue("queueFanout1");
    }

    @Bean
    public Queue queueFanout2() {
        return new Queue("queueFanout2");
    }

    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(queueFanout1()).to(fanoutExchange());
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(queueFanout2()).to(fanoutExchange());
    }
}

4.路由模式

//创建生产者(Producer)
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer4 {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message, String routingKey) {
        rabbitTemplate.convertAndSend("directExchange", routingKey, message);
    }
}


//创建消费者(Consumer)
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumerly1 {

    @RabbitListener(queues = "queueDirect1")
    public void receiveMessage(String message) {
        System.out.println("Consumer 1 received message: " + message);
    }
}
@Component
public class MessageConsumerly2 {

    @RabbitListener(queues = "queueDirect2")
    public void receiveMessage(String message) {
        System.out.println("Consumer 2 received message: " + message);
    }
}


//配置交换机和队列

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig4 {

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }

    @Bean
    public Queue queueDirect1() {
        return new Queue("queueDirect1");
    }

    @Bean
    public Queue queueDirect2() {
        return new Queue("queueDirect2");
    }

    @Bean
    public Binding bindingDirect1() {
        return BindingBuilder.bind(queueDirect1()).to(directExchange()).with("routingDirectKey1");
    }

    @Bean
    public Binding bindingDirect2() {
        return BindingBuilder.bind(queueDirect2()).to(directExchange()).with("routingDirectKey2");
    }
}

5.主题模式

//创建生产者(Producer)
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer5 {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message, String routingKey) {
        rabbitTemplate.convertAndSend("topicExchange", routingKey, message);
    }
}

//创建消费者(Consumer)
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer5 {

    @RabbitListener(queues = "queueTopic5")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

//配置交换机和队列

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig5 {

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }

    @Bean
    public Queue queueTopic5() {
        return new Queue("queueTopic5");
    }

    @Bean
    public Binding bindingTopic5() {
        return BindingBuilder.bind(queueTopic5()).to(topicExchange()).with("topic.*");
    }
}

6.RPC模式

//创建RPC客户端(Client)
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RpcClient {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public String sendMessageAndReceiveResponse(String message) {
        return (String) rabbitTemplate.convertSendAndReceive("rpcExchange", "rpcQueue", message);
    }
}

//创建RPC服务端(Server)
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RpcServer {

    @RabbitListener(queues = "rpcQueue")
    public String processMessage(String message) {
        // Perform some processing based on the message
        return "Processed: " + message;
    }
}

//配置交换机和队列
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig6 {

    @Bean
    public DirectExchange rpcExchange() {
        return new DirectExchange("rpcExchange");
    }

    @Bean
    public Queue rpcQueue() {
        return new Queue("rpcQueue");
    }

    @Bean
    public Binding rpcBinding() {
        return BindingBuilder.bind(rpcQueue()).to(rpcExchange()).with("rpcQueue");
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/599213.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【第三版 系统集成项目管理工程师】第2章 信息技术发展(知识总结)

持续更新。。。。。。。。。。。。。。。 【第2章】 信息技术发展 考情分析2. 1信息技术及其发展2.1.1 计算机软硬件-P501.计算机硬件2.计算机软件-P51 2.1.2计算机网络1.通信基础-P522.网络基础-P534.网络标准协议-P543.网络设备-P535.软件定义网络-P576.第五代移动通信技术-P…

2024抖音小店最新注册流程来了,快快收藏!

大家好&#xff0c;我是电商糖果 2024年想开一家抖音小店&#xff0c;但是不知道具体的开店流程。 不要着急&#xff0c;这篇文章就给大家详细的讲解一下。 首先&#xff0c;准备开店材料&#xff1a;5000左右的类目保证金&#xff0c;电脑&#xff0c;手机号&#xff0c;法…

Dos命令Tree

查看tree的用法 tree /?tree > 文件名&#xff0c;输出文件路径到指定的位置

苹果手机突然黑屏打不开怎么办?多种方法合集

苹果手机突然黑屏打不开怎么办&#xff1f;苹果手机突然开不开机了怎么办&#xff1f;手机打不开机了按键长按没有反应怎么办&#xff1f; 如果您的苹果手机出现这些情况无法正常开机&#xff0c;可以尝试以下办法恢复&#xff1a; 方法1&#xff1a;给手机充电 首先&#xf…

面试集中营—Redis面试题

一、Redis的线程模型 Redis是基于非阻塞的IO复用模型&#xff0c;内部使用文件事件处理器&#xff08;file event handler&#xff09;&#xff0c;这个文件事件处理器是单线程的&#xff0c;所以Redis才叫做单线程的模型&#xff0c;它采用IO多路复用机制同时监听多个socket&a…

Richard 林旅强:说说社区的故事和对 RTE 社区的畅想

各位 RTE 开发者社区的小伙伴们&#xff0c;大家好&#xff1a; 我是 Richard 林旅强&#xff0c;今年起开始担任我们 RTE 社区联合主理人&#xff0c;很荣幸能在这里跟杜金房老师和陈靖老师一起做点事情&#xff0c;为社区的大家服务 &#x1f603; 今天想跟各位分享&#x…

数据结构---动态数组

一、数据结构基本理论 数据结构是相互之间存在一种或多种特定关系的数据元素的集合。强调数据元素之间的关系 算法五个特性&#xff1a; 输入、输出、有穷、确定、可行 数据结构分类&#xff1a; 逻辑结构&#xff1a;集合、线性结构、树形结构、图形结构 物理…

Baidu Comate:你的智能编程伙伴,编程界的AI革命者

文章目录 Baidu Comate 介绍Baidu Comate下载安装Baidu Comate 实操体验代码解释函数注释行间注释调优建议生成单测注释生成实时续写常用快捷方式智能对话问答 Baidu Comate 建议改进Baidu Comate 体验总结 Baidu Comate 介绍 Baidu Comate 智能编码助手 是基于文心大模型&…

【nginx 开发】nginx安装,Nginx介绍

Nginx基础介绍 Nginx反向代理负载均衡动静分离 Nginx的安装NginxNginx常用命令Nginx配置文件 Nginx Nginx是一个高性能的Http和反向代理服务器&#xff0c;特点是占有内存少&#xff0c;并发能力强&#xff0c;Nginx可以作为静态页面的web服务器&#xff0c;Nginx专为性能优化…

4G工业路由器快递柜应用案例(覆盖所有场景)

快递柜展示图 随着电商的蓬勃发展,快递行业迎来高速增长。为提高快递效率、保障快件安全,智能快递柜应运而生。但由于快递柜部署环境复杂多样,网络接入成为一大难题。传统有线宽带难以覆盖所有场景,而公用WiFi不稳定且存在安全隐患。 星创易联科技有限公司针对这一痛点,推出了…

我独自升级崛起在哪下载 我独自升级崛起客户端下载教程

定于5月8日全球盛放的《我独自升级&#xff1a;崛起》——这一激动人心的动作角色扮演游戏巨作&#xff0c;汲取了同名动漫及网络漫画的精髓&#xff0c;誓将以其无与伦比的魅力&#xff0c;引领玩家迈入一个探索深远、规模宏大的奇幻之旅。游戏构筑在一个独一无二的网络武侠世…

JavaScript:正则表达式属于字符串吗-不属于/字符串转正则表达式的两种方法

一、需求描述 js 字符串转正则表达式 二、理解正则表达式属于字符串吗? 正则表达式不属于字符串&#xff0c;它是一种用于匹配、查找和操作文本的模式。正则表达式是一种特殊的语法&#xff0c;用于描述字符串的特征。通过使用正则表达式&#xff0c;可以检查一个字符串是否…

项目计划书(Word原件)

项目开发计划包括项目描述、项目组织、成本预算、人力资源估算、设备资源计划、沟通计划、采购计划、风险计划、项目过程定义及项目的进度安排和里程碑、质量计划、数据管理计划、度量和分析计划、监控计划和培训计划等。 软件资料清单列表部分文档&#xff1a; 工作安排任务书…

如何有效识别限界上下文?

在实施DDD的过程中&#xff0c;识别限界上下文是一大难点&#xff0c;但也并非无章可循。在本文内容中&#xff0c;我们将分别从业务维度、工作维度以及技术维度进行展开&#xff0c;讨论如何有效识别限界上下文的方法和技巧。 从业务维度识别限界上下文 从业务维度识别限界上…

羊大师解析,鲜为人知的羊奶冷知识

羊大师解析&#xff0c;鲜为人知的羊奶冷知识 羊奶的脂肪球更小&#xff1a;相较于牛奶&#xff0c;羊奶中的脂肪球直径更小&#xff0c;这有助于其更快地被人体消化和吸收。 羊奶含有更多的中链脂肪酸&#xff1a;羊奶中含有较多的中链脂肪酸&#xff08;MCT&#xff09;&am…

安装nginx-1.25.5与ngx_http_headers_more_filter_module模块

#下载nginx的代码 curl -O http://nginx.org/download/nginx-1.25.5.tar.gz #下载headers-more-nginx-module代码 git clone https://github.com/openresty/headers-more-nginx-module#解压 tar -xzf nginx-1.25.5.tar.gzcd nginx-1.25.5#--add-dynamic-module 下载下来的目录 …

Al Agent:开启智能化未来的关键角色,让机器更智能的为我们服务

文章目录 &#x1f680;Al Agent是什么&#x1f4d5;Al Agent的工作原理与技术&#x1f4aa;Al Agent应用领域&#x1f680;智能家居应用&#x1f308;医疗健康领域⭐金融服务行业&#x1f302;交通运输管理&#x1f3ac;教育培训应用 &#x1f512;Al Agent优势与挑战✊Al Age…

移动端自适应

基本实现核心思想 基本原则上是&#xff0c;布局更多地使用flex&#xff0c;然后尺寸使用rem&#xff0c;vw&#xff0c;vh为单位如果是根据不同的屏幕需要有不同的布局了&#xff0c;一般通过检测屏幕尺寸换不同的站点或者媒体查询使用css rem 以html字体太小为1rem的大小&…

LM4562NA 直插DIP8双运放 音频hifi运算放大器

LM4562NA是一款高性能音频运算放大器&#xff0c;其应用领域主要集中在音频和声音处理方面&#xff0c;包括但不限于&#xff1a; 1. 专业录音设备&#xff1a;在录音棚、广播电台和电视台等专业环境中&#xff0c;用于信号放大和处理&#xff0c;确保高质量的声音录制和传输…

揭秘数据可视化:五款利器助力决策

在当今这个数据驱动的时代&#xff0c;数据可视化已成为企业决策、数据分析不可或缺的一部分。通过直观、生动的图形、图像&#xff0c;数据可视化能够更快速、更准确地传达信息&#xff0c;帮助企业洞察数据背后的价值。本文将为您介绍几款优秀的数据可视化工具。 一、山海鲸…
最新文章