B080-RabbitMQ

目录

      • RabbitMQ认识
        • 概念
        • 使用场景
        • 优点
        • AMQP协议
        • JMS
      • RabbitMQ安装
        • 安装elang
        • 安装RabbitMQ
        • 安装管理插件
        • 登录RabbitMQ
        • 消息队列的工作流程
      • RabbitMQ常用模型
        • HelloWorld-基本消息模型
          • 生产者发送消息
            • 导包
            • 获取链接工具类
            • 消息的生产者
          • 消费者消费消息
            • 模拟消费者
            • 手动签收消息
        • Work Queues
          • Sender
          • Consume1
          • Consume2
        • 订阅模型-FANOUT-广播
          • Sender
          • Consume1
          • Consume2
        • 订阅模型-Direct-定向
          • Sender
          • Consume1
          • Consume2
        • 订阅模型-Topic-通配符
          • Sender
          • Consume1
          • Consume2
        • 总结
      • SpringBoot集成RabbitMQ
        • 导包
        • yml
        • config
        • producer
        • consumer

RabbitMQ认识

概念

MQ全称为Message Queue,即消息队列. 它也是一个队列,遵循FIFO原则 。RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue Protocol高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。官方地址:http://www.rabbitmq.com/

使用场景

在这里插入图片描述

优点

任务异步处理:
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。(丢进去由接收方分别异步处理)

消除峰值:
异步化提速(发消息),提高系统稳定性(多系统调用),服务解耦(5-10个服务),排序保证,消除峰值
(放入队列中不用马上都处理完,有中间状态,消息分发后可由多个订阅方分别异步处理)

服务解耦:
应用程序解耦合 MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
(将单体业务拆分为生产者,消息队列和消费者)

AMQP协议

AMQP是一套公开的消息队列协议,最早在2003年被提出,它旨在从协议层定义消息通信数据的标准格式, 为的就是解决MQ市场上协议不统一的问题。RabbitMQ就是遵循AMQP标准协议开发的MQ服务。
(其他Python,C#,PHP也都能用)

JMS

JMS是Java消息服务,是java提供的一套消息服务API标准,其目的是为所有的java应用程序提供统一的消息通信的标准,类似java的 jdbc,只要遵循jms标准的应用程序之间都可以进行消息通信。它和AMQP有什么不同,jms是java语言专属的消息服务标准,它是在api层定义标准,并且只能用于java应用;而AMQP是在协议层定义的标准,是跨语言的。
(只能Java用,基本已经被摒弃)

RabbitMQ安装

安装elang

otp_win64_20.2.exe
配置环境变量

安装RabbitMQ

rabbitmq-server-3.7.4.exe
可通过任务管理器或开始菜单启动或关闭服务

安装管理插件

安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ ,进入到RabbitMQ的sbin目录,使用cmd执行命令: rabbitmq-plugins.bat enable rabbitmq_management , 安装成功后重新启动RabbitMQ
(开启可视化界面)

重启MQ

登录RabbitMQ

进入浏览器,输入:http://localhost:15672,初始账号和密码:guest/guest
在这里插入图片描述

消息队列的工作流程

在这里插入图片描述

RabbitMQ常用模型

HelloWorld-基本消息模型

一个生产者与一个消费者
在这里插入图片描述

生产者发送消息
导包
<dependencies>
    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <!--和springboot2.0.5对应-->
        <version>5.4.1</version>
    </dependency>
</dependencies>
获取链接工具类
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {
    /**
     * 建立与RabbitMQ的连接
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //端口,和管理端端口15672不一样,管理端是另外一台网页版的系统,5672才是MQ本身
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/");//集群的时候才用这个参数
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}
消息的生产者
import cn.itsource.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

//消息的生产者
public class Sender {
    public static  final  String  HELLO_QUEUE="hello_queue";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //3.创建队列(hello这里用默认的交换机)
    /*  String queue :队列的名字,可自定义,   
        boolean durable: 持久化,   
        boolean exclusive:是否独占;大家都能用,传false,
        boolean autoDelete: 用完即删;关了就没了,消费者还要拿,所以传false,
        Map<String, Object> arguments:没有其他要传的属性就传false          */
        channel.queueDeclare(HELLO_QUEUE, true, false, false, null);

        String msg="今天中午吃啥";
        //4.发送消息
        channel.basicPublish("", HELLO_QUEUE, null, msg.getBytes());

        channel.close();
        conn.close();
    }
}

在这里插入图片描述

消费者消费消息
模拟消费者
import com.rabbitmq.client.*;
import java.io.IOException;

//模拟消费者
public class Consume {

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //回调,可新建类实现Consumer接口或继承DefaultConsumer类或用匿名内部类覆写处理方法
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);// 消费者标识
                System.out.println("envelope:"+envelope);// 消息队列里面的一些公共属性
                System.out.println("消息内容:"+new String(body));
                System.out.println("消费完成----------------");
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收
            Consumer callback: 回调
         */
        channel.basicConsume(Sender.HELLO_QUEUE,false,callback);

    }
}

在这里插入图片描述
在这里插入图片描述
只要消费者不关,生产者发一次消息消费者就自动监听消费一次消息

手动签收消息
import cn.itsource.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

//模拟消费者
public class Consume {

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //回调,可新建类实现Consumer接口或继承DefaultConsumer类或用匿名内部类覆写处理方法
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);// 消费者标识
                System.out.println("envelope:"+envelope);// 消息队列里面的一些公共属性
//                System.out.println(1/0);
                System.out.println("消息内容:"+new String(body));
                System.out.println("消费完成----------------");
                //所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);// 第二个参数为是否同时签收多个,传false
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收    签收不等于消费成功,处理逻辑走完没有报错才算签收成功
            Consumer callback: 回调
         */
        channel.basicConsume(Sender.HELLO_QUEUE,false,callback);

    }
}

Work Queues

在这里插入图片描述
一个生产者与多个消费者。
默认轮询,也可以改成能者多劳

Sender
//消息的生产者
/*
    如果有多个消费者监听同一个队列,默认轮询
 */
public class Sender {
    public static  final  String  WORK_QUEUE="work_queue";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //3.创建队列
        /*
        String queue :队列的名字
        boolean durable: 持久化
        boolean exclusive:是否独占
        boolean autoDelete: 用完即删
        Map<String, Object> arguments
         */
        channel.queueDeclare(WORK_QUEUE, true, false, false, null);

        String msg="今天中午吃啥";
        //4.发送消息
        channel.basicPublish("", WORK_QUEUE, null, msg.getBytes());

        channel.close();
        conn.close();
    }
}
Consume1
//模拟消费者
public class Consume1 {

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //同时处理的消息数量
//        channel.basicQos(1);
        //回调
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                 //System.out.println(1/0);
                System.out.println("消息内容:"+new String(body));
//                try {
//                    Thread.sleep(100);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
                System.out.println("------------------------------------");
                //所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(Sender.WORK_QUEUE,false,callback);
    }
}
Consume2
//模拟消费者
public class Consume2 {

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //同时处理的消息数量
//        channel.basicQos(1);
        //回调
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                System.out.println("消息内容:"+new String(body));
//                try {
//                    Thread.sleep(10000);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
                System.out.println("------------------------------------");
                //所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(Sender.WORK_QUEUE,false,callback);
    }
}

订阅模型-FANOUT-广播

在这里插入图片描述
在广播模式下,消息发送流程是这样的:
1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

Sender
//消息的生产者
/*
    变化
        1.不创建 队列
        2.创建交换机
        3.给交换机发送消息
 */
public class Sender {
    public static  final  String  FANOUT_EXCHANGE="fanout_exchange";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //3.创建交换机
        /*
            exchange:交换机的名字
            type:交换机的类型
            durable:是否持久化
         */
        channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);

        String msg="今天晚上吃啥";
        //4.发送消息
        channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes());

        channel.close();
        conn.close();
    }
}
Consume1
//模拟消费者
/*
    1.创建队列
    2.队列绑定到交换机
    3.每个消费者要监听自己的队列
 */
public class Consume1 {
    public  static final  String FANOUT_QUEUE1="fanout_queue1";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //同时处理的消息数量
        channel.basicQos(1);
        //创建队列
        channel.queueDeclare(FANOUT_QUEUE1, true, false, false, null);
        //绑定到交换机
        channel.queueBind(FANOUT_QUEUE1, Sender.FANOUT_EXCHANGE, "");

        //回调
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                 //System.out.println(1/0);
                System.out.println("消息内容:"+new String(body));
                System.out.println("------------------------------------");

                //所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(FANOUT_QUEUE1,false,callback);
    }
}
Consume2
//模拟消费者
/*
    1.创建队列
    2.队列绑定到交换机
    3.每个消费者要监听自己的队列
 */
public class Consume2 {
    public  static final  String FANOUT_QUEUE2="fanout_queue2";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //同时处理的消息数量
        channel.basicQos(1);
        //创建队列
        channel.queueDeclare(FANOUT_QUEUE2, true, false, false, null);
        //绑定到交换机
        channel.queueBind(FANOUT_QUEUE2, Sender.FANOUT_EXCHANGE, "");

        //回调
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                 //System.out.println(1/0);
                System.out.println("消息内容:"+new String(body));
                System.out.println("------------------------------------");

                //所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(FANOUT_QUEUE2,false,callback);
    }
}

订阅模型-Direct-定向

在这里插入图片描述
把消息交给符合指定routing key 的队列 一堆或一个

Sender
//消息的生产者
/*
    变化
        1.交换机类型
        2.给交换机发送消息,指定 routing key
 */
public class Sender {
    public static  final  String  DIRECT_EXCHANGE="direct_exchange";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //3.创建交换机
        /*
            exchange:交换机的名字
            type:交换机的类型
            durable:是否持久化
         */
        channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);

        String msg="今天晚上吃啥";
        //4.发送消息
        channel.basicPublish(DIRECT_EXCHANGE, "dept", null, msg.getBytes());

        channel.close();
        conn.close();
    }
}
Consume1
//模拟消费者
/*
    1.指定routing key
 */
public class Consume1 {
    public  static final  String DIRECT_QUEUE1="direct_queue1";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //同时处理的消息数量
        channel.basicQos(1);
        //创建队列
        channel.queueDeclare(DIRECT_QUEUE1, true, false, false, null);
        //绑定到交换机
        channel.queueBind(DIRECT_QUEUE1, Sender.DIRECT_EXCHANGE, "emp.delete");

        //回调
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                 //System.out.println(1/0);
                System.out.println("消息内容:"+new String(body));
                System.out.println("------------------------------------");

                //所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(DIRECT_QUEUE1,false,callback);
    }
}
Consume2
//模拟消费者
/*
    1.指定routing key
 */
public class Consume2 {
    public  static final  String DIRECT_QUEUE2="direct_queue2";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //同时处理的消息数量
        channel.basicQos(1);
        //创建队列
        channel.queueDeclare(DIRECT_QUEUE2, true, false, false, null);
        //绑定到交换机
        channel.queueBind(DIRECT_QUEUE2, Sender.DIRECT_EXCHANGE, "dept");

        //回调
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                 //System.out.println(1/0);
                System.out.println("消息内容:"+new String(body));
                System.out.println("------------------------------------");

                //所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(DIRECT_QUEUE2,false,callback);
    }
}

订阅模型-Topic-通配符

在这里插入图片描述
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: goods.insert

通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词

Sender
//消息的生产者
/*
    变化
        1.交换机类型
        2.给交换机发送消息,指定 routing key
 */
public class Sender {
    public static  final  String  TOPIC_EXCHANGE="topic_exchange";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //3.创建交换机
        /*
            exchange:交换机的名字
            type:交换机的类型
            durable:是否持久化
         */
        channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);

        String msg="今天晚上吃啥";
        //4.发送消息
        channel.basicPublish(TOPIC_EXCHANGE, "user.insert.add.pubilsh", null, msg.getBytes());

        channel.close();
        conn.close();
    }
}
Consume1
//模拟消费者
/*
    1.指定routing key
 */
public class Consume1 {
    public  static final  String TOPIC_QUEUE1="topic_queue1";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //同时处理的消息数量
        channel.basicQos(1);
        //创建队列
        channel.queueDeclare(TOPIC_QUEUE1, true, false, false, null);
        //绑定到交换机
        /*
            #.1到多个单词
            *. 一个单词
         */
        channel.queueBind(TOPIC_QUEUE1,Sender.TOPIC_EXCHANGE, "user.#");

        //回调
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                 //System.out.println(1/0);
                System.out.println("消息内容:"+new String(body));
                System.out.println("------------------------------------");

                //所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(TOPIC_QUEUE1,false,callback);
    }
}
Consume2
//模拟消费者
/*
    1.指定routing key
 */
public class Consume2 {
    public  static final  String TOPIC_QUEUE2="topic_queue2";

    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection conn = ConnectionUtil.getConnection();
        //2.获取通道
        Channel channel = conn.createChannel();
        //同时处理的消息数量
        channel.basicQos(1);
        //创建队列
        channel.queueDeclare(TOPIC_QUEUE2, true, false, false, null);
        //绑定到交换机
        /*
            #.1到多个单词
            *. 一个单词
         */
        channel.queueBind(TOPIC_QUEUE2,Sender.TOPIC_EXCHANGE, "email.*");

        //回调
        Consumer callback=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("consumerTag:"+consumerTag);
                System.out.println("envelope:"+envelope);
                 //System.out.println(1/0);
                System.out.println("消息内容:"+new String(body));
                System.out.println("------------------------------------");

                //所有业务逻辑结束以后 手动签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //3.监听队列
        /*
            queue :队列名字
            autoAck:自动签收    签收 不等于  消费成功
            Consumer callback: 回调
         */
        channel.basicConsume(TOPIC_QUEUE2,false,callback);
    }
}

总结

01_hello

生产者	1.获取连接	2.获取通道	3.创建队列	4.发送消息

消费者	1.获取连接	2.获取通道	3.监听队列 (并回调)

02_workqueue	默认轮询		可修改(能者多劳)

生产者	1.获取连接	2.获取通道	3.创建队列	4.发送消息

消费者	1.获取连接	2.获取通道	3.监听队列 (并回调)

03_fanout	广播	将消息交给所有绑定到交换机的队列(多个消费者都能收到)

生产者	1.获取连接	2.获取通道	3.创建交换机	4.发送消息到交换机

消费者	1.获取连接	2.获取通道	创建队列	绑定到交换机	3.监听队列 (并回调)

04_direct	定向	把消息交给符合指定 routing key 的队列 一堆或一个

生产者	1.获取连接	2.获取通道	3.创建交换机	4.发送消息到交换机

消费者	1.获取连接	2.获取通道	创建队列	绑定到交换机	3.监听队列 (并回调)

05_topic		通配符	把消息交给符合routing pattern (路由模式) 的队列 一堆或一个

生产者	1.获取连接	2.获取通道	3.创建交换机	4.发送消息到交换机

消费者	1.获取连接	2.获取通道	创建队列	绑定到交换机	3.监听队列 (并回调)

SpringBoot集成RabbitMQ

导包

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

        <!--spirngboot集成rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

yml

server:
  port: 44000
spring:
  application:
    name: test‐rabbitmq‐producer
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtualHost: /
    listener:
      simple:
        acknowledge-mode: manual #手动签收
        prefetch: 1 #消费者的消息并发处理数量
    #publisher-confirms: true #消息发送到交换机失败回调
    #publisher-returns: true  #消息发送到队列失败回调
    template:
      mandatory: true # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃

config

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_SPRINGBOOT="exchange_springboot";
    public static final String QUEUE1_SPRINGBOOT="queue1_springboot";
    public static final String QUEUE2_SPRINGBOOT="queue2_springboot";

    //创建一个交换机
    @Bean
    public Exchange createExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_SPRINGBOOT).durable(true).build();
    }

    //创建两个队列
    @Bean
    public Queue createQueue1(){
        return  new Queue(QUEUE1_SPRINGBOOT,true);
    }
    @Bean
    public Queue createQueue2(){
        return  new Queue(QUEUE2_SPRINGBOOT,true);
    }

    //把交换机和队列绑定到一起
    @Bean
    public Binding bind1(){
        return BindingBuilder.bind(createQueue1()).to(createExchange()).with("user.*").noargs();
    }
    @Bean
    public Binding bind2(){
        return BindingBuilder.bind(createQueue2()).to(createExchange()).with("email.*").noargs();
    }


    //消费者 还原对象方式(从MQ里取出json转为对象)
    @Bean("rabbitListenerContainerFactory")
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setPrefetchCount(1);
        return factory;
    }

    //放到消息队列里面的转换(转为json存进MQ)
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
}

producer

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = App.class)
@RunWith(SpringRunner.class)
public class Sender {

    @Autowired
    RabbitTemplate rabbitTemplate;
    
    @Test
    public void test(){
        /*
            问题:多系统之间 信息交互  传递对象
               解决方案:转换为json存储
               实现:
                    1.fastjson    对象 - josn  (作业)
                    2.重写转换器模式
        */

        for (int i = 0; i < 5; i++) {
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_SPRINGBOOT
                    , "email.save", new User(1L,"文达"));
        }
        System.out.println("消息发送完毕");
    }
}

consumer

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;

//消费者
@Component
public class Consu {

    @RabbitListener(queues = {RabbitMQConfig.QUEUE1_SPRINGBOOT},containerFactory = "rabbitListenerContainerFactory")//用这个转换器接
    public void user(@Payload User user, Channel channel, Message message) throws IOException {
        System.out.println(message);

        System.out.println("user队列:"+user);
        //手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = {RabbitMQConfig.QUEUE2_SPRINGBOOT})
    public void email(@Payload User user,Channel channel,Message message ) throws IOException {
        System.out.println(message);
        System.out.println("email队列:"+user);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

队列内容可传string,entity序列化对象,json对象,

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/100402.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

控制goroutine 的并发执行数量

goroutine的数量上限是1048575吗&#xff1f; 正常项目&#xff0c;协程数量超过十万就需要引起重视。如果有上百万goroutine&#xff0c;一般是有问题的。 但并不是说协程数量的上限是100多w 1048575的来自类似如下的demo代码: package mainimport ( "fmt" "ma…

机械臂+2d相机实现复合机器人定位抓取

硬件参数 机械臂&#xff1a;艾利特 相机&#xff1a;海康相机 2d识别库&#xff1a;lindmod&#xff0c;github可以搜到 光源&#xff1a;磐鑫光源 软件参数 系统&#xff1a;windows / Linux 开发平台&#xff1a;Qt 开发语言&#xff1a;C 开发视觉库&#xff1a;OpenCV …

【算法系列篇】位运算

文章目录 前言什么是位运算算法1.判断字符是否唯一1.1 题目要求1.2 做题思路1.3 Java代码实现 2. 丢失的数字2.1 题目要求2.2 做题思路2.3 Java代码实现 3. 两数之和3.1 题目要求3.2 做题思路3.3 Java代码实现 4. 只出现一次的数字4.1 题目要求4.2 做题思路4.3 Java代码实现 5.…

【若依框架RuoYi-Vue-Plus 图片回显不显示问题,OSS文件上传或者本地上传】

一、问题 1.设计表 product&#xff08;商品表&#xff09; 有 id &#xff08;id&#xff09; name&#xff08;商品名&#xff09;icon&#xff08;图标&#xff09; 2.使用若依代码生成功能&#xff0c;导入product表&#xff0c;代码生成。 3.将生成的代码导入到项目中得到…

3D点云处理:提取指定圆环内的点(附源码)

文章目录 0. 测试效果1. 基本内容2. 代码实现文章目录:3D视觉个人学习目录微信:dhlddxB站: Non-Stop_目标:提取指定范围的点云0. 测试效果 红色为根据指定条件提取的点 1. 基本内容 要提取指定圆环内和指定高度范围内的点云,可以按照以下步骤进行操作: 定义圆环和高度参数…

ArcGIS地块面积分割调整工具插件

地块分割调整工具可以实现将选定的图斑按照面积比例或者指定的面积&#xff0c;分割成多个图斑。 各个图斑的面积用逗号分隔&#xff0c;比例分割设置时&#xff0c;用整数表示。 面积分割时&#xff0c;最后一个图斑的面积可以不写&#xff0c;插件可以自动计算图斑的面积&a…

基于Springboot实现的Echarts图表

概述 ECharts是百度开源的一个前端组件。它是一个使用 JavaScript 实现的开源可视化库&#xff0c;可以流畅的运行在 PC 和移动设备上&#xff0c;兼容当前绝大部分浏览器&#xff08;IE8/9/10/11&#xff0c;Chrome&#xff0c;Firefox&#xff0c;Safari等&#xff09;&…

yolov8机器视觉-工业质检

使用训练好的模型进行预测 yolo predict taskdetect model训练好的模型路径 source测试图片文件夹路径 showTrue效果展示 切换模型进行训练&#xff08;yolov8s&#xff09; 修改main.py训练参数文件 使用云gpu进行训练&#xff0c;很方便&#xff1a;点击链接转至在线云gpu…

Javase | IO流

目录&#xff1a; 1.输入 (Intput/Read)2.输出 (Output/Write)3.IO4.IO流5.IO流的分类&#xff1a;5.1 分类总述5.2 按照 “流的方向” 进行分类5.3 按照 “读取数据的方式” 进行分类 6.IO包下要重点掌握的流&#xff1a;6.1 文件专属 (流)6.2 转换流 ( 将字节流转换为字符流 …

IntelliJ IDEA 2023.2.1 Android开发变化

IntelliJ IDEA 2023.2.1之前的版本&#xff0c;Empty Activity是指Empty View Activity&#xff0c;而现在Empty Activity是指Empty Compose Activity&#xff0c;另外多了一个Empty View Activity的选项 这表明官方推荐使用Compose这种声明式的编程方式来描述UI&#xff0c;命…

Idea安装免注册版ChatGPT

文章目录 一、前期准备二、开始使用 一、前期准备 1.准备Idea开发软件并打开&#xff08;VS Code同理&#xff09;! 2.【CtrlAltS】快捷键调出Settings窗口&#xff0c;如图 3.找到NexChatGPT 此插件不需要注册&#xff0c;可以直接使用&#xff08;高级一些的需要会员收费限…

Linux网络编程 网络基础知识

目录 1.网络的历史和协议的分成 2.网络互联促成了TCP/IP协议的产生 3.网络的体系结构 4.TCP/IP协议族体系 5.网络各层的协议解释 6.网络的封包和拆包 7.网络预备知识 1.网络的历史和协议的分成 Internet-"冷战"的产物 1957年十月和十一月&#xff0c;前苏…

操作系统备考学习 day1 (1.1.1-1.3.1)

操作系统备考学习 day1 计算机系统概述操作系统的基本概念操作系统的概念、功能和目标操作系统的四个特征并发共享虚拟异步 操作系统的发展和分类操作系统的运行环境操作系统的运行机制 年初做了一个c的webserver 的项目&#xff0c;在学习过程中已经解除部分操作系统的知识&am…

【Linux】fork函数的基础知识

文章目录 前言一、fork的返回值二、常见问题 1.为什么fork要给子进程返回0&#xff0c;给父进程返回子进程pid&#xff1f;2.一个函数返回两次值怎么理解&#xff1f; 3.一个变量怎么会有不同的内容&#xff1f; 4.fork函数干了什么&#xff1f; 前言 fork初识&#xff1a; …

MySQL 数据库常用命令大全(完整版)

文章目录 1. MySQL命令2. MySQL基础命令3. MySQL命令简介4. MySQL常用命令4.1 MySQL准备篇4.1.1 启动和停止MySQL服务4.1.2 修改MySQL账户密码4.1.3 MySQL的登陆和退出4.1.4 查看MySQL版本 4.2 DDL篇&#xff08;数据定义&#xff09;4.2.1 查询数据库4.2.2 创建数据库4.2.3 使…

【Ant Design】Form.Item创建自定义表单

一、概述 Antd是一个非常强大的UI组件库&#xff0c;里面的Form表单组件也基本能满足我们大多数场景。但是也有需要自定义表单的场景。 Vue2里我们使用v-model&#xff0c;结合子组件的model属性&#xff0c;来实现自定义组件的双向绑定。 Vue3里我们使用v-model&#xff0c;…

[Unity]UI和美术出图效果不一致

问题描述&#xff1a;美术使用PS在Gamma空间下设计的UI图&#xff0c;导入到Unity&#xff0c;因为Unity使用的是线性空间&#xff0c;导致半透明的UI效果和美术设计的不一致。 解决方案&#xff1a; &#xff08;一&#xff09;让美术在线性空间下工作 &#xff08;二&…

【LeetCode】《LeetCode 101》第十二章:字符串

文章目录 12.1 字符串比较242 . 有效的字母异位词&#xff08;简单&#xff09;205. 同构字符串&#xff08;简单&#xff09;647. 回文子串&#xff08;中等&#xff09;696 . 计数二进制子串&#xff08;简单&#xff09; 12.2 字符串理解224. 基本计算器&#xff08;困难&am…

Python Opencv实践 - 霍夫圆检测(Hough Circles)

import cv2 as cv import numpy as np import matplotlib.pyplot as pltimg cv.imread("../SampleImages/steelpipes.jpg") print(img.shape) plt.imshow(img[:,:,::-1])#转为二值图 gray cv.cvtColor(img, cv.COLOR_BGR2GRAY) plt.imshow(gray, cmap plt.cm.gray…

大数据HBase学习圣经:一本书实现HBase学习自由

学习目标&#xff1a;三栖合一架构师 本文是《大数据HBase学习圣经》 V1版本&#xff0c;是 《尼恩 大数据 面试宝典》姊妹篇。 这里特别说明一下&#xff1a;《尼恩 大数据 面试宝典》5个专题 PDF 自首次发布以来&#xff0c; 已经汇集了 好几百题&#xff0c;大量的大厂面试…