2.7日学习打卡----初学RabbitMQ(二)

2.7日学习打卡

在这里插入图片描述
JMS
由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则
——JMS,用于操作消息中间件。JMS即Java消息服务
(JavaMessage Service)应用程序接口,是一个Java平台中关于面
向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多
MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没
有实现JMS规范,但是开源社区有JMS的实现包。

创建项目

# 开启管控台插件
rabbitmq-plugins enable
rabbitmq_management
# 启动rabbitmq
rabbitmq-server -detached

创建普通maven项目,添加RabbitMQ依赖:

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqpclient</artifactId>
        <version>5.14.0</version>
    </dependency>
</dependencies>

一. RabbitMQ 简单模式在这里插入图片描述

P:生产者,也就是要发送消息的程序

C:消费者:消息的接收者,会一直等待消息到来

queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

特点:

  1. 一个生产者对应一个消费者,通过队列进行消息传递。
  2. 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机

生产者代码实现

步骤:

  1. 创建连接工厂ConnectionFactory
  2. 设置工厂的参数
  3. 创建连接 Connection
  4. 创建管道 Channel
  5. 简单模式中没有交换机exchange,所以不用创建(RabbitMQ会使用默认的交换机!)
  6. 创建队列 queue
  7. 设置发送内容,使用channal.basicPublish()发送
  8. 释放资源

代码实现

package com.jjy.mq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//生产者
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //使用自己的服务器ip地址
        connectionFactory.setHost("192.168.66.100");
        //rabbitmq的默认端口5672
        connectionFactory.setPort(5672);
        //用户名
        connectionFactory.setUsername("jjy");
        //密码
        connectionFactory.setPassword("jjy");
        //虚拟机
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建队列,如果队列已存在,则使用该队列
        /**
        //     * 参数1:队列名
        //     * 参数2:是否持久化,true表示MQ重启后队列还在。
        //     * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
        //     * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
        //     * 参数5:其他额外参数
        //     */
        channel.queueDeclare("simple_queue",false,false,false,null);
        //5.发送消息
        String mesg="hello rabbitmq";
        /**
         * 参数1:交换机名,""表示默认交换机
         * 参数2:路由键,简单模式就是队列名
         * 参数3:其他额外参数
         * 参数4:要传递的消息字节数组
         */
        channel.basicPublish("","simple_queue",null,mesg.getBytes());

        //6.关闭资源(信道和连接)
        channel.close();
        connection.close();
        System.out.println("发送成功");
    }
}

消费者代码实现

在这里插入图片描述

步骤:

1.创建连接工厂ConnectionFactory
2.设置工厂参数
3.创建连接
4.创建信道
前四步代码基本是一致的,需要注意的是生产者与消费者的Channel是不同Connection中的!不是同一个对象.
5. 最简单的模型没有交换机exchange,所以此处RabbitMQ会使用默认的交换机
6. 接收消息,有一个回调方法 channel.basicConsume()

代码实现

package com.jjy.mq.simple;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.监听队列
        /**
         * 参数1:监听的队列名
         * 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
         * 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
         */
        channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("接受消息,消息为:"+message);
            }
        });
        //
    }
}

二. RabbitMQ 工作队列模式

在这里插入图片描述
与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该
模式也使用direct交换机,应用于处理消息较多的情况。特点如
下:

  1. 一个队列对应多个消费者。
  2. 一条消息只会被一个消费者消费。
  3. 消息队列默认采用轮询的方式将消息平均发送给消费者

应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

生产者代码实现

代码实现

package com.jjy.mq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");
        // 2.创建连接
        Connection connection = connectionFactory.newConnection();
        // 3.建立信道
        Channel channel = connection.createChannel();
        // 4.创建队列,持久化队列
        channel.queueDeclare("work_queue",true,false,false,null);
        // 5.发送大量消息,参数3表示该消息为持久化消息,即除了保存到内存还会保存到磁盘中
        for(int i=0;i<100;i++){

            channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("你好,这是今天的第"+i+"条消息").getBytes());
        }
        // 6.关闭资源
        channel.close();
        connection.close();
    }
}

消费者代码实现

在这里插入图片描述

消费者1:

package com.jjy.mq.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        // 4.监听队列,处理消息
        channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费者1消费消息,消息为:" + message);
            }
        });

    }
}

消费者2

package com.jjy.mq.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer2 {

        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.66.100");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("jjy");
            connectionFactory.setPassword("jjy");
            connectionFactory.setVirtualHost("/");
            //2.创建连接
            Connection connection = connectionFactory.newConnection();
            //3.建立信道
            Channel channel = connection.createChannel();
            // 4.监听队列,处理消息
            channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("消费者2消费消息,消息为:" + message);
                }
            });

        }


}


消费者3

package com.jjy.mq.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer3 {

        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.66.100");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("jjy");
            connectionFactory.setPassword("jjy");
            connectionFactory.setVirtualHost("/");
            //2.创建连接
            Connection connection = connectionFactory.newConnection();
            //3.建立信道
            Channel channel = connection.createChannel();
            // 4.监听队列,处理消息
            channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("消费者3消费消息,消息为:" + message);
                }
            });

        }
    }


三. RabbitMQ 发布订阅模式

在这里插入图片描述
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机

C:消费者,消息的接收者,会一直等待消息到来

Queue:消息队列,接收消息、缓存消息

在开发过程中,有一些消息需要不同消费者进行不同的处理,如电
商网站的同一条促销信息需要短信发送、邮件发送、站内信发送
等。此时可以使用发布订阅模式(Publish/Subscribe)
特点:

  1. 生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。
  2. 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。

Exchange:交换机(X)一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic(常用):通配符,把消息交给符合routing pattern(路由模式)的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失
在这里插入图片描述

生产者代码实现

与之前的步骤相比,多了创建交换机和绑定交换机与队列的操作

代码实现

package com.jjy.mq.publish;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class produce {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建交换机
        
        /*
        exchangeDeclare(String exchange,                  -- 交换机的名称
                        String type,                      -- 交换机的类型,4种
                                                             枚举(direct,fanout,topic,headers)
                        boolean durable,                  -- 持久化
                        boolean autoDelete,               -- 自动删除
                        boolean internal,                 -- 内部使用,基本是false
                        Map<String, Object> arguments)    -- 参数
         *
        /**
         * 参数1:交换机名
         * 参数2:交换机类型
         * 参数3:交换机持久化
         */
        channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);
        //5.创建队列
        //短信队列
        channel.queueDeclare("SEND_MAIL",true,false,false,null);
        //消息队列
        channel.queueDeclare("SEND_MESSAGE",true,false,false,null);
        //站内信息
        channel.queueDeclare("SEND_STATION",true,false,false,null);
        //6.交换机绑定队列
        /**
         * 参数1:队列名
         * 参数2:交换机名
         * 参数3:路由关键字,发布订阅模式写""即可
         */
        channel.queueBind("SEND_MAIL","exchange_fanout","");
        channel.queueBind("SEND_MESSAGE","exchange_fanout","");
        channel.queueBind("SEND_STATION","exchange_fanout","");
        //7.发送消息
        for (int i = 1; i <= 10 ; i++) {
            channel.basicPublish("exchange_fanout","",null,
                    ("你好,尊敬的用户,秒杀商品开抢了!"+i).getBytes(StandardCharsets.UTF_8));
        }
        //8.关闭资源
        channel.close();
        connection.close();
    }
}

消费者代码实现

接下来编写三个消费者,分别监听各自的队列。
//站内信消费者

package com.jjy.mq.publish;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 站内信消费者
public class CustomerStation {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_STATION", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送站内信:"+message);
            }
        });
    }
}

邮件消费者

 
package com.jjy.mq.publish;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class CustomerMail {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_MAIL", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送邮件:"+message);
            }
        });
    }
}

短信消费者

package com.jjy.mq.publish;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class CustomerMessage {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送短信:"+message);
            }
        });
    }
}


也可以使用工作队列+发布订阅模式同时使用,两个消费者同时监听
一个队列:


// 短信消费者2
public class CustomerMessage2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.0.162");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("itbaizhan");
        connectionFactory.setPassword("itbaizhan");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送短信2:"+message);
            }
        });
    }
}

两个不一样的系统,对同一条消息做不一样的处理

发布订阅模式与工作队列模式的区别
(1)工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机

(2)发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用 默认交换机)

(3)发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机

四. RabbitMQ 路由模式

在这里插入图片描述
使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多
时候,不是所有消息都无差别的发布到所有队列中。比如电商网站
的促销活动,双十一大促可能会发布到所有队列;而一些小的促销
活动为了节约成本,只发布到站内信队列。此时需要使用路由模式
(Routing)完成这一需求。
特点:

  1. 每个队列绑定路由关键字RoutingKey
  2. 生产者将带有RoutingKey的消息发送给交换机,交换机根据RoutingKey转发到指定队列。路由模
    式使用direct交换机。

队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)

消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey

Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息
在这里插入图片描述

生产者代码实现

package com.jjy.mq.routing;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class produce {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建交换机
        /**
         * 参数1:交换机名
         * 参数2:交换机类型
         * 参数3:交换机持久化
         */
        channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true);
        // 5.创建队列
        channel.queueDeclare("SEND_MAIL2",true,false,false,null);
        channel.queueDeclare("SEND_MESSAGE2",true,false,false,null);
        channel.queueDeclare("SEND_STATION2",true,false,false,null);
        //6.交换机绑定队列
        /**
         * 参数1:队列名
         * 参数2:交换机名
         * 参数3:路由关键字,发布订阅模式写""即可
         */
        channel.queueBind("SEND_MAIL2","exchange_routing","import");
        channel.queueBind("SEND_MESSAGE2","exchange_routing","import");
        channel.queueBind("SEND_STATION2","exchange_routing","import");
        channel.queueBind("SEND_STATION2","exchange_routing","normal");
        //7.发送消息
        channel.basicPublish("exchange_routing","import",null,
                "双十一大促活动".getBytes());
        channel.basicPublish("exchange_routing","normal",null,
                "小型促销活动".getBytes());
        //8.关闭资源
        channel.close();
        connection.close();
    }
}

消费者代码实现

站内信消费者

package com.jjy.mq.routing;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 站内信消费者
public class CustomerStation {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_STATION2", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送站内信:"+message);
            }
        });
    }
}

短信消费者

package com.jjy.mq.routing;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class CustomerMessage {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_MESSAGE2", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送短信:"+message);
            }
        });
    }
}

邮件消费者

package com.jjy.mq.routing;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class CustomerMail {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_MAIL2", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送邮件:"+message);
            }
        });
    }
}

总的来说就一句话:

Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

五. RabbitMQ 通配符模式

在这里插入图片描述
通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的
路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消
息转发到该队列。通配符模式比路由模式更灵活,使用topic交换
机.
通配符规则

  1. 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以 . 分割。
  2. 队列设置RoutingKey时, # 可以匹配任意多个单词, * 可以匹配任意一个单词。

生产者代码实现

在这里插入图片描述
代码实现

package com.jjy.mq.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class produce {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建交换机
        /**
         * 参数1:交换机名
         * 参数2:交换机类型
         * 参数3:交换机持久化
         */
        channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true);
        // 5.创建队列
        channel.queueDeclare("SEND_MAIL3",true,false,false,null);
        channel.queueDeclare("SEND_MESSAGE3",true,false,false,null);
        channel.queueDeclare("SEND_STATION3",true,false,false,null);
        //6.交换机绑定队列

        channel.queueBind("SEND_MAIL3","exchange_topic","#.mail.#");
        channel.queueBind("SEND_MESSAGE3","exchange_topic","#.message.#");
        channel.queueBind("SEND_STATION3","exchange_topic","#.station.#");
        //7.发送消息
        channel.basicPublish("exchange_topic","mail.message.station",null,
                "双十一大促活动".getBytes());
        channel.basicPublish("exchange_topic","station",null,
                "小型促销活动".getBytes());
        //8.关闭资源
        channel.close();
        connection.close();
    }
}

消费者代码实现

站内信消费者

package com.jjy.mq.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 站内信消费者
public class CustomerStation {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_STATION3", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送站内信:"+message);
            }
        });
    }
}

短信消费者

package com.jjy.mq.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class CustomerMessage {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_MESSAGE3", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送短信:"+message);
            }
        });
    }
}

邮件消费者

package com.jjy.mq.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class CustomerMail {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("jjy");
        connectionFactory.setPassword("jjy");
        connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = conn.createChannel();
        // 4.监听队列
        channel.basicConsume("SEND_MAIL3", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "utf-8");
                System.out.println("发送邮件:"+message);
            }
        });
    }
}

总述:topics模式比routing模式要更加灵活,笼统的说就是routing模式加上通配符

如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/380801.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据结构-->线性表-->单链表

链表的定义 链表&#xff1a;链表是一种物理存储结构上非连续、非顺序的存储结构&#xff0c;数据元素的逻辑顺序是通过链表中的指针链接次序实现的。 与顺序表不同的是&#xff0c;链表里的每节都是独立申请下来的空间&#xff0c;我们称之为“节点、结点”。 节点的组成主要由…

横扫Spark之 - 22个常见的转换算子

水善利万物而不争&#xff0c;处众人之所恶&#xff0c;故几于道&#x1f4a6; 文章目录 1. map()2. flatMap()3. filter()4. mapPartitions()5. mapPartitionsWithIndex()6. groupBy()7. distinct()8. coalesce()9. repartition()10. sortBy()11. intersection()12.union()13.…

5G技术对物联网的影响

随着数字化转型的加速&#xff0c;5G技术作为通信领域的一次重大革新&#xff0c;正在对物联网&#xff08;IoT&#xff09;产生深远的影响。对于刚入行的朋友们来说&#xff0c;理解5G技术及其对物联网应用的意义&#xff0c;是把握行业发展趋势的关键。 让我们简单了解什么是…

使用python-numpy实现一个简单神经网络

目录 前言 导入numpy并初始化数据和激活函数 初始化学习率和模型参数 迭代更新模型参数&#xff08;权重&#xff09; 小彩蛋 前言 这篇文章&#xff0c;小编带大家使用python-numpy实现一个简单的三层神经网络&#xff0c;不使用pytorch等深度学习框架&#xff0c;来理解…

探索设计模式的魅力:代理模式揭秘-软件世界的“幕后黑手”

设计模式专栏&#xff1a;http://t.csdnimg.cn/U54zu 目录 引言 一、魔法世界 1.1 定义与核心思想 1.2 静态代理 1.3 动态代理 1.4 虚拟代理 1.5 代理模式结构图 1.6 实例展示如何工作&#xff08;场景案例&#xff09; 不使用模式实现 有何问题 使用模式重构示例 二、…

【Rust日报】2024-02-08 Loungy:使用 Rust 和 GPUI 开发的 MacOS 启动器

Mira Screenshare&#xff1a;基于 Rust 和 WebRTC 的高性能屏幕分享工具 一群大学生宣布推出了他们的期末项目&#xff1a;Mira Screenshare&#xff0c;一个开源、高性能的屏幕共享工具&#xff0c;由 Rust 和 WebRTC 构建。此项目支持 4k 60 FPS 和 110ms 端到端延迟的屏幕…

CS50x 2024 - Lecture 2 - Arrays

00:00:00 - Introduction 00:01:01 - Story Time 00:06:03 - Compiling make本身并不是编译器&#xff0c;实际上是一个自动运行编译器的程序&#xff0c;如c语言的clang clang -o hello hello.csrc/ $ clang -o hello hello_world.c /usr/bin/ld: /tmp/hello_world-67f51…

Oracle 面试题 | 19.精选Oracle高频面试题

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

你的立身之本是什么?

去年发生的一切&#xff0c;大到疫情、政治经济形势、行业的萎靡和震荡&#xff0c;小到身边的跳槽、裁员、公司倒闭……似乎都在告诉我们&#xff1a; 当冲击到来的时候&#xff0c;它是不会提前跟你打招呼的。 接下来的10年&#xff0c;我们所面临的不确定性&#xff0c;比起…

Linux 软件管理(YUM RPM)

1 YUM yum&#xff08;全称为 Yellow dog Updater, Modified&#xff09;是一个在Fedora和RedHat以及CentOS中的Shell前端软件包管理器。基于RPM包管理&#xff0c;能够从指定的服务器自动处理依赖性关系&#xff0c;并且一次安装所有依赖的软件包&#xff0c;无须繁琐地一次次…

fps cf游戏,一键断网辅助工具

一键断网瞬移 工具特色&#xff1a;一改常规断网操作&#xff08;断网开启&#xff0c;所有人都卡住&#xff0c;使得还原后找不到人的问题 &#xff09;&#xff0c;不影响任何人移动&#xff0c;开启断网跟着别人一起走&#xff0c;其他人无任何异常卡顿。 工具功能&…

Linux应用程序几种参数传递方式

大家好&#xff0c;今天给大家介绍Linux应用程序几种参数传递方式&#xff0c;文章末尾附有分享大家一个资料包&#xff0c;差不多150多G。里面学习内容、面经、项目都比较新也比较全&#xff01;可进群免费领取。 在Linux中&#xff0c;应用程序可以通过多种方式接收参数。以下…

文心一言 VS 讯飞星火 VS chatgpt (198)-- 算法导论14.3 6题

六、用go语言&#xff0c;说明如何来维护一个支持操作MIN-GAP的一些数的动态集Q&#xff0c;使得该操作能给出Q中两个最接近的数之间的差值。例如&#xff0c;Q(1&#xff0c;5&#xff0c;9&#xff0c;15&#xff0c;18&#xff0c;22)&#xff0c;则MIN-GAP返回18-153&#…

【EAI 011】SayCan: Grounding Language in Robotic Affordances

论文标题&#xff1a;Do As I Can, Not As I Say: Grounding Language in Robotic Affordances 论文作者&#xff1a;Michael Ahn, Anthony Brohan, Noah Brown, Yevgen Chebotar, Omar Cortes, Byron David, Chelsea Finn, Chuyuan Fu, Keerthana Gopalakrishnan, Karol Hausm…

【正式】今年第一篇CSDN(纯技术教学)

一、文件上传简介 文件上传漏洞是指用户上传了一个可执行的脚本文件&#xff08;木马、病毒、恶意脚本、webshell等&#xff09;&#xff0c;并通过此脚本文件获得了执行服务器端命令的能力。上传点一般出现在头像、导入数据、上传压缩包等地方&#xff0c;由于程序对用户上传…

C语言笔试题之求出二叉树的最大深度(递归解决)

实例要求&#xff1a; 1、给定一个二叉树 root &#xff0c;返回其最大深度&#xff1b;2、二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数&#xff1b; 案例展示&#xff1a; 实例分析&#xff1a; 1、判断根节点是否为空&#xff1b;2、分别递归处理左…

物联网数据隐私保护技术

在物联网&#xff08;IoT&#xff09;的世界中&#xff0c;无数的设备通过互联网连接在一起&#xff0c;不断地收集、传输和处理数据。这些数据有助于提高生产效率、优化用户体验并创造新的服务模式。然而&#xff0c;随着数据量的剧增&#xff0c;数据隐私保护成为了一个不能忽…

【java苍穹外卖项目实战二】苍穹外卖环境搭建

文章目录 1、前端环境搭建2、后端环境搭建1、项目结构搭建2、Git版本控制3、数据库创建 开发环境搭建主要包含前端环境和后端环境两部分。 前端的页面我们只需要导入资料中的nginx&#xff0c; 前端页面的代码我们只需要能看懂即可。 1、前端环境搭建 前端运行环境的nginx&am…

《MySQL 简易速速上手小册》第7章:MySQL监控和日志分析(2024 最新版)

文章目录 7.1 配置和使用 MySQL 监控工具7.1.1 基础知识7.1.2 重点案例&#xff1a;使用 Python 和 Prometheus 监控 MySQL 性能7.1.3 拓展案例 1&#xff1a;自动化 MySQL 慢查询日志分析7.1.4 拓展案例 2&#xff1a;实时警报系统 7.2 解读 MySQL 日志文件7.2.1 基础知识7.2.…

【Spring】Bean 的实例化方式

Spring 为 Bean 提供了多种实例化方式&#xff0c;通常包括4种方式 也就是说在 Spring 中为 Bean 对象的创建准备了多种方案&#xff0c;目的是&#xff1a;更加灵活 第一种&#xff1a;通过构造方法实例化 第二种&#xff1a;通过简单工厂模式实例化 第三种&#xff1a;通过…
最新文章