微服务——服务异步通讯RabbitMQ

 前置文章

消息队列——RabbitMQ基本概念+容器化部署和简单工作模式程序_北岭山脚鼠鼠的博客-CSDN博客

消息队列——rabbitmq的不同工作模式_北岭山脚鼠鼠的博客-CSDN博客

消息队列——spring和springboot整合rabbitmq_北岭山脚鼠鼠的博客-CSDN博客

目录

Work queues 工作队列模式 

案例:

 在生产者端

在消费者端

结果如下

 消费预取限制

 发布订阅模型

 Fanout Exchange(配置文件实现)

案例

消费者代码

生产者代码

 Direct Exchange (注解实现)

 案例

消费者代码

生产者代码

Topic Exchange

案例 

 消费者代码

生产者代码

 消息转换器

生产者代码

JSON方式序列化

生产者代码 (jackson)

 消费者代码(jackson)

 总结


Work queues 工作队列模式 

这里用的不是上面第三篇文章里面的定义配置类的形式。

案例:

 在生产者端

队列要存在才可以上传。不然代码运行不会报错,但是消息也会不知道发到哪里去。

    @Test
    public void testSendMessage2() throws InterruptedException {
        String queue_Name= "simple.queue";
        String message="hello 鼠鼠";

        for(int i=1;i<=50;i++)
        rabbitTemplate.convertAndSend(queue_Name,message+i);
        Thread.sleep(20);
    }

在消费者端

定义了两个消费者监听上面的队列,本来想三个的,但是不知道默认的交换机名字,所以弄了两个。并且根据注解的不同,第一个是可以直接创建一个队列,第二个需要队列已存在才行。

@Component
public class RabbitMQListener {


    //自动创建队列

    @RabbitListener(queuesToDeclare=@Queue("simple.queue"))
    public void ListenerWorkQueue1(Message message) throws InterruptedException {
        System.out.println("11111"+message.getBody());
        Thread.sleep(20);
    }

    //需要在rabbit_mq上手动创建队列,不然会报错
    @RabbitListener(queues="simple.queue")
    public void ListenerWorkQueue2(Message message) throws InterruptedException {
        System.out.println("22222"+message.getBody());
        Thread.sleep(200);
    }

    //3. 自动创建队列,Exchange 与 Queue绑定
//    @RabbitListener(bindings = @QueueBinding(
//            value = @Queue("simple.queue"),
//            exchange = @Exchange("/")  //绑定默认交换机
//    ))
//    public void ListenerWorkQueue3(Message message) throws InterruptedException {
//        System.out.println("33333"+message.getBody());
//        Thread.sleep(200);
//    }
}

结果如下

两个队列轮流取消息导致反而变慢了。

 消费预取限制

要指定队列才有效果。

 这里就相当于指定了在simple前缀的队列上每次只能获取一条消息。

运行结果如下,大多数都交给了快的队列执行。

 发布订阅模型

 Fanout Exchange(配置文件实现)

消息路由到每个绑定的消息队列。

案例

 

消费者代码

 spring读取到这个Bean之后就会向RabbitMq发请求,创建交换机,绑定队列了。 

@Configuration
public class FanoutConfig {
    //itcast.fanout
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }
    //fanout.queue1
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fannout.queue1");
    }
    //绑定队列1到交换机
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    //fanout.queue1
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fannout.queue2");
    }
    //绑定队列2到交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

定义两个监听用的方法

@Component
public class RabbitMQListener {
//    @RabbitListener(queues="boot_queue")
//    public void ListenerQueue(Message message){
//        System.out.println(message);
//    }


    //自动创建队列

//    @RabbitListener(queuesToDeclare=@Queue("simple.queue"))
//    public void ListenerWorkQueue1(Message message) throws InterruptedException {
//        System.out.println("11111"+message.getBody()+ LocalDateTime.now());
//        Thread.sleep(20);
//    }
//
//    //需要在rabbit_mq上手动创建队列,不然会报错
//    @RabbitListener(queues="simple.queue")
//    public void ListenerWorkQueue2(Message message) throws InterruptedException {
//        System.out.println("22222"+message.getBody()+ LocalDateTime.now());
//        Thread.sleep(200);
//    }

    //3. 自动创建队列,Exchange 与 Queue绑定
//    @RabbitListener(bindings = @QueueBinding(
//            value = @Queue("simple.queue"),
//            exchange = @Exchange("/")  //绑定默认交换机
//    ))
//    public void ListenerWorkQueue3(Message message) throws InterruptedException {
//        System.out.println("33333"+message.getBody());
//        Thread.sleep(200);
//    }



    @RabbitListener(queuesToDeclare=@Queue("fanout.queue1"))
    public void ListenerFanoutQueue1(Message message) throws InterruptedException {
        System.out.println("11111"+message.getBody());
    }

    @RabbitListener(queuesToDeclare=@Queue("fanout.queue2"))
    public void ListenerFanoutQueue2(Message message) throws InterruptedException {
        System.out.println("22222"+message.getBody());
    }
}

生产者代码

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
    //1.注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

//    @Test
//    public void testSend(){
//        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"boot.haha","hello 鼠鼠");
//    }

//    @Test
//    public void testSendMessage2() throws InterruptedException {
//        String queue_Name= "simple.queue";
//        String message="hello 鼠鼠";
//
//        for(int i=1;i<=50;i++)
//        rabbitTemplate.convertAndSend(queue_Name,message+i);
//        Thread.sleep(20);
//    }

    @Test
    public void testSendFanoutExchange(){
        //交换机名称
        String exchangeName="itcast.fanout";
        //消息
        String message="hello 鼠鼠";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }
}

 Direct Exchange (注解实现)

 案例

消费者代码


@Component
public class RabbitMQListener {   
     @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name="itcast.direct" , type= ExchangeTypes.DIRECT),
            key={"red","blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者接收到:"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name="itcast.direct" , type= ExchangeTypes.DIRECT),
            key={"red","yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者接收到:"+msg);
    }
}

生产者代码

    @Test
    public void testSendDirectExchange(){
        //交换机名称
        String exchangeName="itcast.direct";
        //消息
        String message="hello 鼠鼠";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"blue",message);
    }

此条代码只有绑定了blue这个key的队列才可以收到。

换成red就是两个队列都可以收到了。

Topic Exchange

案例 

 

 消费者代码

    @RabbitListener(bindings = @QueueBinding(
            value=@Queue(name="topic.queue1"),
            exchange=@Exchange(name="itcast.topic",type = ExchangeTypes.TOPIC),
            key="japan.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到:"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value=@Queue(name="topic.queue2"),
            exchange=@Exchange(name="itcast.topic",type = ExchangeTypes.TOPIC),
            key="#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到:"+msg);
    }

生产者代码

    @Test
    public void testSendTopicExchange(){
        //交换机名称
        String exchangeName="itcast.topic";
        //消息
        String message="北岭山脚鼠鼠横死街头,究竟是人性的沦丧还是道德的....";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"japan.news",message);
    }

两个都符合,所以都能收到。

 消息转换器

定义一个队列

    @Bean
    public Queue objectQueue(){
        return new Queue("object.queue");
    }

生产者代码

    @Test
    public void testSendObjectQueue(){
        //消息
        Map<String,Object> msg=new HashMap<>();
        msg.put("name","北岭山脚鼠鼠");
        msg.put("age","22");
        //发送消息
        rabbitTemplate.convertAndSend("object.queue",msg);
    }

可以看见消息被转换成了一长串字符,content_type写着java的序列化。

效率差,安全性也差。 

JSON方式序列化

声明好MessageConveter之后就可以自动覆盖默认序列化方式了。

导入一个核心依赖

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

生产者代码 (jackson)

 修改生产者的启动类代码,加上一个Bean

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class);
    }

    @Bean
    public Jackson2JsonMessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

启动测试类之后可以看见新的消息出现了。

 消费者代码(jackson)

 然后可以正常接受到消息

如果消费者不使用对应jackson解析的话,代码会报错

 总结

推荐使用jackson的方式 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/51513.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JS——输入输出语法数组的操作

JavaScript输入输出语法 目标&#xff1a;能写出常见的JavaScript输入输出语法 输出语法 语法1&#xff1a; document.write(要输出的内容)作用&#xff1a; 向body内输出内容 注意&#xff1a; 如果输出的内容写的是标签&#xff0c;也会被解析成网页元素 语法2&#xff1a…

Verilog语法学习——LV9_使用子模块实现三输入数的大小比较

LV9_使用子模块实现三输入数的大小比较 题目来源于牛客网 [牛客网在线编程_Verilog篇_Verilog快速入门 (nowcoder.com)](https://www.nowcoder.com/exam/oj?page1&tabVerilog篇&topicId301) 题目 描述 在数字芯片设计中&#xff0c;通常把完成特定功能且相对独立的…

特殊矩阵的压缩存储

1 数组的存储结构 1.1 一维数组 各数组元素大小相同&#xff0c;且物理上连续存放。第i个元素的地址位置是&#xff1a;a[i] LOC i*sizeof(ElemType) (LOC为起始地址) 1.2 二维数组 对于多维数组有行优先、列优先的存储方法 行优先&#xff1a;先行后列&#xff0c;先存储…

无涯教程-jQuery - Select menu组件函数

小部件选择菜单功能可与JqueryUI中的小部件一起使用&#xff0c;它提供了可替换样式的选择元素。一个简单的选择菜单如下所示。 Select menu - 语法 $( "#menu" ).selectmenu(); Select menu - 示例 以下是显示选择菜单用法的简单示例- <!doctype html> &…

自动驾驶感知系统-全球卫星定位系统

卫星定位系统 车辆定位是让无人驾驶汽车获取自身确切位置的技术&#xff0c;在自动驾驶技术中定位担负着相当重要的职责。车辆自身定位信息获取的方式多样&#xff0c;涉及多种传感器类型与相关技术。自动驾驶汽车能够持续安全可靠运行的一个关键前提是车辆的定位系统必须实时…

Go语言进阶 + 依赖管理

依赖配置 - version开始&#xff0c;就开始很难听懂了&#xff0c;需要结合很多课后配套资料查阅很多文档和网站....然而好像没有那么多时间&#xff0c;一天给3小时学Go真的顶天了.....还有算法和Linux的Mysql... 这几天学Go已经把算法给挤掉了.....下步要权衡一下&#xff0c…

Centos7中实现脚本使用mysqldump实现分库分表备份

脚本 #!/bash/bin userroot #用户名 password123456 #密码 back_path/backup/db databases_file/backup/databases.list [ -f $databases_file ] || touch /backup/databases.list if [[ ! -s ${databases_file} ]] then while read line do[ -d ${back_path}/$line ] …

ERROR in unable to locate ‘***/public/**/*‘ glob

前提 自己搭了一个react项目的脚手架&#xff0c;npm包下载一切都很正常&#xff0c;启动的时候突然就报ERROR in unable to locate ***/public/**/* glob这个错误&#xff0c;根据百度分析了一下产生的原因&#xff1a;webpack配置文件中的CopyWebpackPlugin导致的 网上给出的…

C语言指针应该这么学?

数组名的意义&#xff1a; 1. sizeof(数组名)&#xff0c;这里的数组名表示整个数组&#xff0c;计算的是整个数组的大小。 2. &数组名&#xff0c;这里的数组名表示整个数组&#xff0c;取出的是整个数组的地址。 3. 除此之外所有的数组名都表示首元素的地址。 根据以上数…

用asp.net开发h5网页版视频播放网站,类似优酷,jellyfin,emby

之前用jellyfin开源软件搞了一个视频播放服务器,用来共享给家里人看电影和电视剧,jellyfin虽然各方面功能都很强大,但是界面和使用习惯都很不适合,于是就想着利用下班休息时间做一套自己喜欢的视频网站出来. 本来是打算直接用jellyfin的源码进行修改,源码是用C# netcore 写的服…

【C++进阶:哈希--unordered系列的容器及封装】

本课涉及到的所有代码都见以下链接&#xff0c;欢迎参考指正&#xff01; practice: 课程代码练习 - Gitee.comhttps://gitee.com/ace-zhe/practice/tree/master/Hash unordered系列关联式容器 在C98中&#xff0c;STL提供了底层为红黑树结构的一系列关联式容器&#xff0c;在…

使用Pytest生成HTML测试报告

背景 最近开发有关业务场景的功能时&#xff0c;涉及的API接口比较多&#xff0c;需要自己模拟多个业务场景的自动化测试&#xff08;暂时不涉及性能测试&#xff09;&#xff0c;并且在每次测试完后能够生成一份测试报告。 考虑到日常使用Python自带的UnitTest&#xff0c;所…

观察者模式与观察者模式实例EventBus

什么是观察者模式 顾名思义&#xff0c;观察者模式就是在多个对象之间&#xff0c;定义一个一对多的依赖&#xff0c;当一个对象状态改变时&#xff0c;所有依赖这个对象的对象都会自动收到通知。 观察者模式也称为发布订阅模式(Publish-Subscribe Design Pattern)&#xff0…

ViT-vision transformer

ViT-vision transformer 介绍 Transformer最早是在NLP领域提出的&#xff0c;受此启发&#xff0c;Google将其用于图像&#xff0c;并对分类流程作尽量少的修改。 起源&#xff1a;从机器翻译的角度来看&#xff0c;一个句子想要翻译好&#xff0c;必须考虑上下文的信息&…

【Linux后端服务器开发】MAC地址与其他重要协议

目录 一、以太网 二、MAC地址 三、MTU 四、ARP协议 五、DNS系统 六、ICMP协议 七、NAT技术 八、代理服务器 一、以太网 “以太网”不是一种具体的网路&#xff0c;而是一种技术标准&#xff1a;既包含了数据链路层的内容&#xff0c;也包含了一些物理层的内容&#xf…

k8s容器日志收集方案

背景 由于以下容器本身特性和现有日志采集工具的缺陷&#xff0c;开发者在收集Kubernetes分布式集群日志时常常遇到困扰&#xff1a; 容器本身特性&#xff1a; 采集目标多&#xff1a;容器本身的特性导致采集目标多&#xff0c;需要采集容器内日志、容器stdout。对于容器…

实验六 调度器-实验部分

目录 一、知识点 1.进程调度器设计的目标 1.1.进程的生命周期 1.2.用户进程创建与内核进程创建 1.3.进程调度器的设计目标 2.ucore 调度器框架 2.1.调度初始化 2.2.调度过程 2.2.1.调度整体流程 2.2.2.设计考虑要点 2.2.3.数据结构 2.2.4.调度框架应与调度算法无关…

Zabbix分布式监控快速入门

目录 1 Zabbix简介1.1 软件架构1.2 版本选择1.3 功能特性 2 安装与部署2.1 时间同步需求2.2 下载仓库官方源2.3 Zabbix-Server服务端的安装2.3.1 安装MySQL2.3.1.1 创建Zabbix数据库2.3.1.2 导入Zabbix库的数据文件 2.3.2 配置zabbix_server.conf2.3.3 开启Zabbix-Server服务2.…

ElementUI Select选择器如何根据value值显示对应的label

修改前效果如图所示&#xff0c;数据值状态应显示为可用&#xff0c;但实际上仅显示了状态码1&#xff0c;并没有显示其对应的状态信息。在排查了数据类型对应关系问题后&#xff0c;并没有产生实质性影响&#xff0c;只好对代码进行了如下修改。 修改前代码&#xff1a; <…

微服务划分的原则

微服务的划分 微服务的划分要保证的原则 单一职责原则 1、耦合性也称块间联系。指软件系统结构中各模块间相互联系紧密程度的一种度量。模块之间联系越紧密&#xff0c;其耦合性就越强&#xff0c;模块的独立性则越差。模块间耦合高低取决于模块间接口的复杂性、调用的方式及…
最新文章