RabbitMQ-交换机

文章目录

    • 交换机
      • fanout
      • Direct
      • topic
      • Headers
      • RPC

交换机

**交换机 **是消息队列中的一个组件,其作用类似于网络路由器。它负责将我们发送的消息转发到相应的目标,就像快递站将快递发送到对应的站点,或者网络路由器将网络请求转发到相应的服务器或客户端一样。交换机的主要功能是提供转发消息的能力,根据消息的路由规则将消息投递到合适的队列或绑定的消费者。
我们可以理解为,如果说一个快递站已经承受不了那么多的快递了,就建多个快递站。

fanout

扇出,广播
特点:消息会被转发到所有绑定到该交换机的队列
场景:很适用于发布订阅的场景,比如写日志,可以多个系统间共享

示例场景:
image.png
生产者代码:

package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

public class FanoutProducer {
  // 交换机名字
  private static final String EXCHANGE_NAME = "fanout-exchange";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // 创建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.nextLine();
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
  }
}

消费者代码:

package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class FanoutConsumer {
  //交换机名字
  private static final String EXCHANGE_NAME = "fanout-exchange";

  public static void main(String[] argv) throws Exception {
    //建立连接
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    //创建频道
    Channel channel1 = connection.createChannel();
    // 声明交换机
    channel1.exchangeDeclare(EXCHANGE_NAME, "fanout");
    // 创建队列1,连接到交换机上
    String queueName = "xiaowang_queue";
    channel1.queueDeclare(queueName, true, false, false, null);
    channel1.queueBind(queueName, EXCHANGE_NAME, "");
    // 创建队列2,连接到交换机上
    String queueName2 = "xiaoli_queue";
    channel1.queueDeclare(queueName2, true, false, false, null);
    channel1.queueBind(queueName2, EXCHANGE_NAME, "");
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    // 创建交付回调函数1
    DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [小王] Received '" + message + "'");
    };
    // 创建交付回调函数2
    DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
      String message = new String(delivery.getBody(), "UTF-8");
      System.out.println(" [小李] Received '" + message + "'");
    };
    // 开始消费消息队列1
    channel1.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });
    // 开始消费消息队列2
    channel1.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
  }
}

Direct

官方教程:https://www.rabbitmq.com/tutorials/tutorial-four-java.html
特点:消息会根据路由键转发到指定的队列
场景:特定的消息只交给特定的系统(程序)来处理

注意:不同队列可以绑定相同的路由键

示例场景:
老板在发送消息同时会带上路由键,根据路由键找对应的队列来发送
image.png
生产者代码:

package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

public class DirectProducer {

  private static final String EXCHANGE_NAME = "direct-exchange";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        //声明交换机是direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //输入消息 和 路由键
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String userInput = scanner.nextLine();
            String[] strings = userInput.split(" ");
            if (strings.length < 1) {
                continue;
            }
            String message = strings[0];
            String routingKey = strings[1];
            //发布消息的时候注意指定路由键
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + " with routing:" + routingKey + "'");
        }

    }
  }
}

消费者代码:

package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class DirectConsumer {

    private static final String EXCHANGE_NAME = "direct-exchange";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明交换机,不过生成者已经声明过了,消费者声不声明都可以
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // 创建队列
        String queueName = "xiaoyu_queue";
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "xiaoyu"); //指定2交换机和路由键

        // 创建队列,随机分配一个队列名称
        String queueName2 = "xiaopi_queue";
        channel.queueDeclare(queueName2, true, false, false, null);
        channel.queueBind(queueName2, EXCHANGE_NAME, "xiaopi");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback xiaoyuDeliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [xiaoyu] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };

        DeliverCallback xiaopiDeliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [xiaopi] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };

        channel.basicConsume(queueName, true, xiaoyuDeliverCallback, consumerTag -> {
        });
        channel.basicConsume(queueName2, true, xiaopiDeliverCallback, consumerTag -> {
        });
    }
}

topic

官方教程:https://www.rabbitmq.com/tutorials/tutorial-five-java.html
特点:消息会根据一个模糊的路由键转发到指定的队列
场景:特定的一类消息可以交给特定的一类系统(程序)来处理
规则:

  1. :匹配一个单词,比如.orange,那么 abc.orange、ikun.orange 都能匹配
  2. #:匹配0个或多个单词,比如orange.#,那么orange,orange.abc.ikun都能匹配


应用场景:
老板要下发一个任务,让多个组来处理
image.png
生产者代码:

package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

public class TopicProducer {

  private static final String EXCHANGE_NAME = "topic-exchange";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String userInput = scanner.nextLine();
            String[] strings = userInput.split(" ");
            if (strings.length < 1) {
                continue;
            }
            String message = strings[0];
            String routingKey = strings[1];

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + " with routing:" + routingKey + "'");
        }
    }
  }
}

消费者代码:

package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class TopicConsumer {

  private static final String EXCHANGE_NAME = "topic-exchange";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic");

      // 创建队列
      String queueName = "frontend_queue";
      channel.queueDeclare(queueName, true, false, false, null);
      channel.queueBind(queueName, EXCHANGE_NAME, "#.前端.#");

      // 创建队列
      String queueName2 = "backend_queue";
      channel.queueDeclare(queueName2, true, false, false, null);
      channel.queueBind(queueName2, EXCHANGE_NAME, "#.后端.#");

      // 创建队列
      String queueName3 = "product_queue";
      channel.queueDeclare(queueName3, true, false, false, null);
      channel.queueBind(queueName3, EXCHANGE_NAME, "#.产品.#");

      System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

      DeliverCallback xiaoaDeliverCallback = (consumerTag, delivery) -> {
          String message = new String(delivery.getBody(), "UTF-8");
          System.out.println(" [xiaoa] Received '" +
                  delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
      };

      DeliverCallback xiaobDeliverCallback = (consumerTag, delivery) -> {
          String message = new String(delivery.getBody(), "UTF-8");
          System.out.println(" [xiaob] Received '" +
                  delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
      };

      DeliverCallback xiaocDeliverCallback = (consumerTag, delivery) -> {
          String message = new String(delivery.getBody(), "UTF-8");
          System.out.println(" [xiaoc] Received '" +
                  delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
      };

      channel.basicConsume(queueName, true, xiaoaDeliverCallback, consumerTag -> {
      });
      channel.basicConsume(queueName2, true, xiaobDeliverCallback, consumerTag -> {
      });
      channel.basicConsume(queueName3, true, xiaocDeliverCallback, consumerTag -> {
      });
  }
}

这样生产者发消息:前端.后端
image.png
就可以匹配到前端和后端两个队列
image.png

Headers

可以根据headers中的内容来指定发送到哪个队列,由于性能差,比较复杂,一般不推荐使用

RPC

支持用消息队列来模拟RPC的调用,但是一般没必要,直接用 Dubbo、GRPC 等 RPC 框架就好了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/553365.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

使用prompt_toolkit构建交互式命令行工具

prompt_toolkit是一个python库&#xff0c;用于构建命令行工具和终端应用。其官网介绍如下&#xff0c; prompt_toolkit is a library for building powerful interactive command line and terminal applications in Python. 安装命令如下&#xff0c; pip install prompt_to…

CCIA信息系统业务安全服务资质证书介绍条件要求

CCIA&#xff08;中国通信工业协会&#xff09;证书是由中国通信工业协会颁发的一种专业资质证书&#xff0c;旨在评估和认证信息化建设企业在信息化项目建设中提供的服务能力。该证书不涉及技术和产品标准&#xff0c;而是重点强调企业在行业服务方向、安全服务意识和专业服务…

基于Springboot的心灵治愈交流平台

基于SpringbootVue的心灵治愈交流平台的设计与实现 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringbootMybatis工具&#xff1a;IDEA、Maven、Navicat 系统展示 用户登录 首页展示 系统公告 心理咨询师 心灵专栏 压力测试 小纸条 后台登录界面 后…

CST电磁仿真物体表面的Sheet结构和生成3D Model【基础教程】

由Sheet结构生成3D Model 使用Shell Solid and Thicken Sheet&#xff01; Modeling > Tools > Shape Tools > Shell Solid or Thicken Sheet Shell Solidor ThickenSheet会根据不同类型的模型提供两种完全不同的功能。 如033.由3D Model生成Cavity 所述&#xff…

量子密钥分发系统设计与实现(一):系统基本架构讨论

经过一段时间讨论&#xff0c;我们了解到量子密钥分发设备是当前量子保密通信系统的基础。从本文开始&#xff0c;我将开启量子密钥分发系统设计与实现系列&#xff0c;详细讨论量子密钥分发设备如何从0到1的搭建。 1.QKD系统总体讨论 QKD系统的核心功能就是为通信双方提供理论…

后端插入数据库问题

IDEA报错&#xff1a;Error updating database. Cause: java.sql.SQLException: Column count doesn’t match value count at row 1 1、看报错消息&#xff0c;SQLException&#xff0c;定位到SQL语句问题 并且看best guess最好猜测&#xff0c;再去找路径下的ShoppingCartMa…

第十二章 屏幕后处理效果

屏幕后处理效果是实现屏幕特效的常见方法。 建立一个基本的屏幕后处理的脚本 屏幕后处理指的是在渲染完整个场景得到屏幕图像后,再对这个图像进行一系列操作,实现各种屏幕特效。 想要实现屏幕后处理的基础在于抓取屏幕。Unity为我们提供了一个接口-OnRenderImage函数。 声…

【C语言回顾】函数

前言1. 函数的概念和分类2.库函数3. 自定义函数3.1 自定义函数的简单介绍3.2 自定义函数举例 4. 形参和实参4.1 形参4.2 实参4.3 形参和实参的关系4.3.1 理解4.3.2 举例代码和调试 5. 嵌套函数和链式访问5.1 嵌套函数5.2 链式访问 6. 函数的声明和定义6.1 单个文件6.2 多个文件…

Java PDF文件流传输过程中速度很慢,如何解决?

专栏集锦&#xff0c;大佬们可以收藏以备不时之需&#xff1a; Spring Cloud 专栏&#xff1a;http://t.csdnimg.cn/WDmJ9 Python 专栏&#xff1a;http://t.csdnimg.cn/hMwPR Redis 专栏&#xff1a;http://t.csdnimg.cn/Qq0Xc TensorFlow 专栏&#xff1a;http://t.csdni…

Https网站接口被黑被恶意调取

背景&#xff1a; 维护的一个网站最近短信接口被黑&#xff0c;发送大量短信。起初以为是在网站内部操作&#xff0c;优化了发送短信前的操作&#xff0c;如添加图形验证码&#xff0c;屏蔽国外IP等。但后续还存在被调取情况&#xff0c;定位排查到是该接口在外部被恶意调取。 …

牛客Linux高并发服务器开发学习第二天

Gcc编译 利用gcc 生成应用时如果不加-o 和应用名&#xff0c;默认生成a.out 可以用./ a.out打开 Gcc工作流程 可执行程序Windows系统中为.exe Linux系统中为.out g也可以编辑c程序 gcc也可以编译cpp代码&#xff0c;只是在编译阶段gcc不能自动共和C程序使用的库进行联接&…

论文笔记:Are Human-generated Demonstrations Necessary for In-context Learning?

iclr 2024 reviewer 评分 6668 1 intro 大型语言模型&#xff08;LLMs&#xff09;已显示出在上下文中学习的能力 给定几个带注释的示例作为演示&#xff0c;LLMs 能够为新的测试输入生成输出然而&#xff0c;现行的上下文学习&#xff08;ICL&#xff09;范式仍存在以下明显…

海信发布《黑神话:悟空》定制电视E8N新品,重塑大屏游戏体验

4月17日&#xff0c;在“AI美好生活”2024海信电视E8系列新品发布会上&#xff0c;海信电视官宣成为《黑神话&#xff1a;悟空》全球官方合作伙伴。同时&#xff0c;海信电视还为广大游戏玩家带来了《黑神话&#xff1a;悟空》的显示CP&#xff0c;推出了官方定制电视——旗舰新…

夸克AI PPT初体验:一键生成大纲,一键生成PPT,一键更换模板!

大家好&#xff0c;我是木易&#xff0c;一个持续关注AI领域的互联网技术产品经理&#xff0c;国内Top2本科&#xff0c;美国Top10 CS研究生&#xff0c;MBA。我坚信AI是普通人变强的“外挂”&#xff0c;所以创建了“AI信息Gap”这个公众号&#xff0c;专注于分享AI全维度知识…

FineBI 6.0 Linux 部署、ClickHouse 源配置

文章目录 FineBI 概述FineBI 部署安装环境说明1.下载安装包2.安装3.初始化设置4.登录5.快速入门 启动与关闭启动关闭 ClickHouse 源配置开启驱动上传功能驱动上传数据库连接配置基础表属性设置数据导入 FineBI 概述 FineBI 是一款国产的商业智能&#xff08;BI&#xff09;软件…

基于Ultrascale+系列GTY收发器64b/66b编码方式的数据传输(一)——Async Gearbox使用及上板测试

于20世纪80年代左右由IBM提出的传统8B/10B编码方式在编码效率上较低&#xff08;仅为80%&#xff09;&#xff0c;为了提升编码效率&#xff0c;Dgilent Techologies公司于2000年左右提出了64b/66b编码并应用于10G以太网中。Xilinx GT手册中没有过多64b/66b编码介绍&#xff0c…

生活中的洪特规则

不知道你还记不记得高中物理所学的一个奇特的物理规则&#xff1a;洪特规则。 洪特规则是德国人弗里德里希洪特&#xff08;F.Hund&#xff09;根据大量光谱实验数据总结出的一个规律&#xff0c;它指出电子分布到能量简并的原子轨道时&#xff0c;优先以自旋相同的方式分别占…

【算法一则】矩阵置零 【矩阵】【空间复用】

题目 给定一个 m x n 的矩阵&#xff0c;如果一个元素为 0 &#xff0c;则将其所在行和列的所有元素都设为 0 。请使用 原地 算法。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,1,1],[1,0,1],[1,1,1]] 输出&#xff1a;[[1,0,1],[0,0,0],[1,0,1]]示例 2&#xff1a; …

深度学习 Lecture 9 信息增益、One-hot、回归树、集成树、随机森林、XGBoost模型

一、信息增益&#xff08;Information Gain) 决定使用什么特征来划分一个节点取决于什么样的特征选择最能减少熵&#xff08;也就是使纯度最大化&#xff09; 在决策树中&#xff0c;熵的减少被称为信息增益。 所以如何选择呢&#xff1f; 假设现在有三个特征可以选择&#…

CUDA 以及MPI并行矩阵乘连接服务器运算vscode配置

一、CUDA Vscode配置 &#xff08;一&#xff09;扩展安装 本地安装 服务器端安装 &#xff08;二&#xff09; CUDA 配置 .vscode c_cpp_properties.json {"configurations": [{"name": "Linux","includePath": ["${workspa…
最新文章