什么是RabbitMQ?有什么用如何使用?一文回答

RabbitMQ

在这里插入图片描述
在这里插入图片描述

RabbitMQ

  • channel:操作MQ的工具
  • exchange:交换机,路由消息到队列中
  • queue:队列,缓存消息
  • virtual host:虚拟主机,对queue,exchange等资源的逻辑分组

MQ模型

  • 基本消息队列
  • 工作消息队列
  • 发布订阅,根据交换机类型不同分为三种:
    • 广播
    • 路由
    • 主题

在这里插入图片描述


HelloWord案例

在这里插入图片描述

基本消息队列的消息发送流程:

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 利用channel向队列发送消息

基本消息队列的消息接收流程:

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 定义consumer的消费行为handleDelivery()
  5. 利用channel将消费者与队列绑定

发送消息:

public void testSendMessage() throws IOException, TimeoutException {
    // 1.建立连接
    ConnectionFactory factory = new ConnectionFactory();
    // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
    factory.setHost("192.168.163.129");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("itcast");
    factory.setPassword("123456");
    // 1.2.建立连接
    Connection connection = factory.newConnection();

    // 2.创建通道Channel
    Channel channel = connection.createChannel();

    // 3.创建队列
    String queueName = "simple.queue";
    channel.queueDeclare(queueName, false, false, false, null);

    // 4.发送消息
    String message = "hello, rabbitmq!222";
    channel.basicPublish("", queueName, null, message.getBytes());
    System.out.println("发送消息成功:【" + message + "】");

    // 5.关闭通道和连接
    channel.close();
    connection.close();
}

接收消息:

public static void main(String[] args) throws IOException, TimeoutException {
    // 1.建立连接
    ConnectionFactory factory = new ConnectionFactory();
    // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
    factory.setHost("192.168.163.129");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("itcast");
    factory.setPassword("123456");
    // 1.2.建立连接
    Connection connection = factory.newConnection();

    // 2.创建通道Channel
    Channel channel = connection.createChannel();

    // 3.创建队列
    String queueName = "simple.queue";
    channel.queueDeclare(queueName, false, false, false, null);

    // 4.订阅消息
    channel.basicConsume(queueName, true, new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            // 5.处理消息
            String message = new String(body);
            System.out.println("接收到消息:【" + message + "】");
        }
    });
    System.out.println("等待接收消息。。。。");
}

SpringAMQP

在这里插入图片描述


HelloWord案例

发送消息

  1. 引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 配置文件
spring:
  rabbitmq:
    host: 192.168.163.129 #RabbitMQ的地址
    username: itcast # 用户名
    password: 123456 # 密码
    port: 5672 # RabbitMQ的端口
    virtual-host: / # 虚拟主机
  1. 发送消息
@Test
public void testAMQP() {
    String queueName = "simple.queue";
    String msg = "hello,this is amqp";
    rabbitTemplate.convertAndSend(queueName,msg);
}

接收消息

  1. 引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 配置文件
spring:
  rabbitmq:
    host: 192.168.163.129 #RabbitMQ的地址
    username: itcast # 用户名
    password: 123456 # 密码
    port: 5672 # RabbitMQ的端口
    virtual-host: / # 虚拟主机
  1. 编写一个类
@Component
public class SpringRabbitListener {
    
    @RabbitListener(queues = "simple.queue") //指定队列名称
    public void listen(String msg){
        System.out.println("接收到了消息:-->" + msg);
    }
    
}

WorkQueue案例

消费预取限制

当有多个消费者向队列中预取消息的时候,每个消费者性能不同导致一些消费者很快处理完消息,一些消费者很慢处理导致消息堆积,可以对性能差的消费者进行消费预取限制,不让他拿太多消息。如果不限制的话,比如有50条消息,两个消费者每个25,快的很快处理完25,慢的堆积处理

spring:
  rabbitmq:
    host: 192.168.163.129 #RabbitMQ的地址
    username: itcast # 用户名
    password: 123456 # 密码
    port: 5672 # RabbitMQ的端口
    virtual-host: / # 虚拟主机
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完才能获取下一条

生产者:

@Test
public void testAMQPWorkQueue() throws InterruptedException {
    String queueName = "simple.queue";
    String msg = "hello,this is amqp";
    for (int i =1 ;i <=50 ;i++){
        rabbitTemplate.convertAndSend(queueName,msg + i);
        Thread.sleep(20);
    }
}

消费者:

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1 接收到了消息:-->" + msg + "====" + LocalTime.now());
    Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2 接收到了消息:-->" + msg + "====" + LocalTime.now());
    Thread.sleep(200);
}

Publish/Subscribe

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。

exchange负责消息路由,而不负责存储,如果路由失败则消息丢失

在这里插入图片描述


发布订阅-Fanout Exchange

Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue(广播)

  1. 编写配置类
@Configuration
public class FanoutConfig {
    /**
     * 声明一个交换机,名字叫:itcast.fanout
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }

    /**
     * 声明一个队列,名字叫:fanout.queue1
     * @return
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 将交换机跟队列绑定
     * @param fanoutExchange
     * @param fanoutQueue1
     * @return 绑定关系对象binding
     */
    @Bean
    public Binding bindingFanoutExchange(FanoutExchange fanoutExchange,Queue fanoutQueue1){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    @Bean
    public Binding bindingFanoutExchange2(FanoutExchange fanoutExchange,Queue fanoutQueue2){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}
  1. 编写生产者
    @Test
    public void testSendFanoutExchange() {
//        交换机名称
        String exchangeName = "itcast.fanout";
//        消息内容
        String msg = "hello,everyone";
        rabbitTemplate.convertAndSend(exchangeName,"",msg);
    }
  1. 编写消费者
/**
 * 广播模式
 * @param msg
 * @throws InterruptedException
 */
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) throws InterruptedException {
    System.err.println("消费者1监听:fanout.queue1 接收到了消息:-->" + msg + "====" + LocalTime.now());
    Thread.sleep(200);
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2监听:fanout.queue2 接收到了消息:-->" + msg + "====" + LocalTime.now());
    Thread.sleep(200);
}

发布订阅-DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式 (routes)。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

在这里插入图片描述

  1. 消费者不选择用声明bean的形式了,采用注解形式
/**
 * 路由模式
 * @param msg
 * @throws InterruptedException
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "itcast.direct"),
        key = {"blue","red"}
))
public void listenDirectQueue1(String msg) throws InterruptedException {
    System.err.println("消费者1监听:direct.queue1 接收到了消息:-->" + msg + "====" + LocalTime.now());
    Thread.sleep(200);
}
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue2"),
        exchange = @Exchange(name = "itcast.direct"),
        key = {"red","yellow"}
))
public void listenDirectQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2监听:direct.queue2 接收到了消息:-->" + msg + "====" + LocalTime.now());
    Thread.sleep(20);
}
  1. 生成者
    @Test
    public void testSendDirectExchangeRed() {
//        交换机名称
        String exchangeName = "itcast.direct";
//        消息内容
        String redMsg = "hello,everyone,this is redMsg";
        rabbitTemplate.convertAndSend(exchangeName,"red",redMsg);

        String yellowMsg = "hello,everyone,this is yellowMsg";
        rabbitTemplate.convertAndSend(exchangeName,"yellow",yellowMsg);

        String blueMsg = "hello,everyone,this is blueMsg";
        rabbitTemplate.convertAndSend(exchangeName,"blue",blueMsg);
    }
消费者1监听:direct.queue1 接收到了消息:-->hello,everyone,this is redMsg====20:55:30.302807
消费者2监听:direct.queue2 接收到了消息:-->hello,everyone,this is redMsg====20:55:30.302807
消费者2监听:direct.queue2 接收到了消息:-->hello,everyone,this is yellowMsg====20:55:30.336717400
消费者1监听:direct.queue1 接收到了消息:-->hello,everyone,this is blueMsg====20:55:30.505301300

路由模式跟广播模式有什么区别?

  1. 广播模式会将消息推送给于交换机绑定的所有队列
  2. 路由模式只会将消息推送给与交换机绑定的,且带有指定key的队列
  3. 如果与交换机绑定的所有队列都带有同一个key,那么指定这个key推送的时候就跟广播模式一样了
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue2"), //消费者所在的队列
        exchange = @Exchange(name = "itcast.direct"),  //消费者所在队列绑定的交换机
        key = {"red","yellow"} //消费者所在队列与交换机绑定的key,后续生产者发送消息时,交换机根据key推送消息
))

发布订阅-TopicExchange

在这里插入图片描述

  1. 消费者
/**
 * 主题模式
 * @param msg
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue("topic.queue1"),
        exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
        key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.err.println("消费者1监听:topic.queue1 接收到了消息:-->" + msg);
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue("topic.queue2"),
        exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
        key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.err.println("消费者2监听:topic.queue2 接收到了消息:-->" + msg);
}
  1. 生产者
    @Test
    public void testSendTopicExchangeRed() {
//        交换机名称
        String exchangeName = "itcast.topic";
//        消息内容
        String redMsg = "这个应该输出两边";
        rabbitTemplate.convertAndSend(exchangeName,"china.news",redMsg);

        String yellowMsg = "这个输出在消费者1";
        rabbitTemplate.convertAndSend(exchangeName,"china.foods",yellowMsg);

        String blueMsg = "这个输出在消费者2";
        rabbitTemplate.convertAndSend(exchangeName,"usa.news",blueMsg);
    }
消费者2监听:topic.queue2 接收到了消息:-->这个应该输出两边
消费者1监听:topic.queue1 接收到了消息:-->这个应该输出两边
消费者2监听:topic.queue2 接收到了消息:-->这个输出在消费者2
消费者1监听:topic.queue1 接收到了消息:-->这个输出在消费者1

路由模式跟主题模式有什么区别?

  1. 路由模式是一个单词的,例如:blue
  2. 主题模式是由多个单词拼接起来的,例如:china.gaungzhou.news
  3. 主题模式在消费者队列绑定交换机的时候是支持通配符*#的,

消息转化器

MessageConverter,因为在发布和订阅的时候,接收内容的参数一直都是object,说明是可以发对象的,而底层都是通过字节码发送的,所以需要将java对象转化成字节码(序列化)发布出去,且订阅的时候需要将字节码转换成对象接收(反序列化),而spring是通过jdk自带的字节码转化器ObjectOutputStream,不好用,可以使用jack提供的MessageConverter

生产者跟消费者要用一样的消息转换器

  1. 生产者
<!--        jack依赖,用来覆盖spring自带的jdk的序列化-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
@Bean //配置类,配置序列化方式,类似的在redis中用到过,同样是对象转字节
public MessageConverter messageConverter(){
    return new Jackson2JsonMessageConverter();
}
@Test
public void testSendObjectMsg() {
    List<String> msg = new ArrayList<>();
    msg.add("1");
    msg.add("2");
    msg.add("3");
    rabbitTemplate.convertAndSend("ObjectQueue",msg);
}
  1. 消费者
<!--        jack依赖,用来接收消息,反序列化-->
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
        </dependency>
@Bean
public MessageConverter messageConverter(){
    return new Jackson2JsonMessageConverter();
}
@RabbitListener(queues = "ObjectQueue")
public void listener(List<String> msg){ //消费者接收方式要跟生产者发送方式一样,比如都是List
    System.err.println("消费者 接收到了消息:-->" + msg);
}

服务异步通讯

MQ的常见问题:

  • 消息的可靠性问题:如何确保发送的消息至少被消费一次
  • 延迟消息问题:如何实现消息的延迟投递
  • 消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题
  • 高可用问题:如何避免单点的MQ故障而导致的不可用问题

消息可靠性

在这里插入图片描述

什么时候有可能会出现消息丢失的问题呢?

  1. 发送消息时丢失:
    • 生产者发送的消息未到达交换机
    • 消息到交换机后未到达队列
  2. MQ宕机,消息在队列中丢失
  3. 消费者宕机,消费者收到消息后未消费就宕机

生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

publisher-confirm,发送者确认

  • 消息成功投递到交换机,返回ack
  • 消息未投递到交换机,返回nack

publisher-return,发送者回执

  • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

确认消息发送的时候,需要给每个消息都设置一个全局唯一id,用于区分不同的消息,避免ack冲突

在这里插入图片描述

实现

  1. 引入依赖
spring:
  rabbitmq:
    #生产者确认消息
    publisher-confirm-type: correlated #publisher-confirm-type:开启publisher-confirm,
    #simple:同步等待confirm结果,直到超时。correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
    publisher-returns: true #开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback。只有开启了这个才有返回结果
    template:
      mandatory: true #定义消息路由失败时的策略。true,则调用ReturnCallback;false: 则直接丢弃消息
      #到底返不返回结果,由这个决定,如果为false,返回失败就之久丢弃了
  1. 编写ReturnCallBack,这个是全局唯一的类,只有一种情况会调用这个ReturnCallBack,就是经过交换机但是没有到达队列
/**
 * ApplicationContextAware spring的通知接口,spring的bean工厂准备好了之后就会调用这个接口
 */
@Configuration
@Slf4j
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//        从bean工厂里获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
//        给RabbitTemplate对象配置ReturnCallBack,生产者发送消息的返回结果,生产者消息回执
//        什么时候会调用这个方法?消息经过交换机后,没有路由到队列
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",
                        replyCode,replyText,exchange,routingKey, message);
                //可以选择重发消息
            }
        });
    }
}
  1. 编写CallBack,这个每次发消息都可以是不同的,针对不同的消息,检测ACK消息到达队列,和NACK消息没到交换机
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
    String message = "hello, spring amqp!";

    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    correlationData.getFuture().addCallback(new SuccessCallback<CorrelationData.Confirm>() {
        @Override
        public void onSuccess(CorrelationData.Confirm confirm) {
            if (confirm.isAck()){
                //ACK
                log.debug("消息成功投递到交换机,消息id是:" + correlationData.getId());
            }else {
                //NACK
                log.error("消息没到交换机,消息id是:" + correlationData.getId());
            }
        }
    }, new FailureCallback() {
        @Override
        public void onFailure(Throwable throwable) {
            log.error("消息发送失败-->" + throwable);
        }
    });
    rabbitTemplate.convertAndSend("amq.topic", "simple.queue", message,correlationData);
}

返回结果:

  1. 没有错误,消息发送到了队列

    • 消息成功投递到交换机,消息id是:9db3541c-7c35-4db6-8c68-39522e48d3dc
      
  2. 消息没有到达交换机

    • 消息没到交换机,消息id是:884d0de4-31c3-49be-834a-4e17aaf73a04
      
  3. 消息到达了交换机,但是没到消息队列

    • 消息发送到队列失败,响应码:312,失败原因:NO_ROUTE,交换机:amq.topic,路由key:4simple.queue,消息:(Body:'hello, spring amqp!' MessageProperties [headers={spring_returned_message_correlation=dedbc4da-2a6f-4bd3-b073-1a6a5e0ac900}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]
      

消息持久化

默认情况下,通过Spring创建的交换机,队列,消息都是默认持久化的。new创建的时候会有两个参数传进去。

第二个是持久化开启,第三个是独占队列,第四个是是否自动删除,即没有使用这个队列的时候自动删除。

public Queue(String name) {
   this(name, true, false, false);
}

交换机同理,消息是经过了convert将消息处理了(持久化)


消费者消息确认

RabbitMQ支持消费者确认机制,即: 消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。

SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack; 抛出异常则返回nack
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto #none关闭,auto自动ack,manual手动ack

失败重试机制

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue无限循环,导致mq的消息处理飙升,带来不必要的压力。

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true #开启消费者失败重试机制
          initial-interval: 1000 #初次的失败等待时长为1000ms
          multiplier: 3 # 下次失败的等待时长倍数,下次等待时长=multiplier * last-interval,比如0 1 3
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态,false有状态。如果业务中包含事务,这里改为false

消费者失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

在这里插入图片描述

@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }
    @Bean
    public Binding errorBinding(){
        return BindingBuilder.bind(errorQueue()).to(directExchange()).with("error");
    }
    /**
     * 用来接收错误的交换机消息,并且通过交换机名称,路由key来重新发送消息。
     * @param rabbitTemplate
     * @return
     */
    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
    }
}

如何确保RabbitMQ消息的可靠性

从三个方面回答:生产者方面,MQ方面,消费者方面

  • 生产者方面:
    • 开启生产者确认机制,确保生产者的消息能够到达队列
  • MQ方面:
    • 开启持久化机制,确保消息在未消费前在队列中不会丢失
  • 消费者方面:
    • 开启消费者消息确认机制为auto,由Spring确认消息处理成功后返回ack,失败后返回nack
    • 开启消费者失败重试机制,并且设置MessageRecoverer,多次失败后将消息投递到异常交换机由人工处理

延迟消息问题

死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信 (dead letter) :

  • 消费者使用basicreject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机 (Dead Letter Exchange,简称DLX)。

在这里插入图片描述

什么样的消息会称为死信消息?

  • nack了或者reject,且requeue=false
  • 消息超时未消费
  • 队列满了,早期的消息就会成为死信消息

如何给队列绑定死信交换机

  • 给队列设置dead-letter-exchange属性,指定一个交换机
  • 给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey

死信消息默认是被丢弃的

其实死信交换机跟RepublishMessageRecoverer有点类似

  • 死信交换机是由队列将消息投递到死信交换机
  • RepublishMessageRecoverer是由消费者将消息投递到错误异常交换机

TTL

TTL,也就是Time-To-Live。如果一个队列中的消息TTL结束仍未消费,则会变为死信,ttl超时分为两种情况:

  • 消息所在的队列设置了存活时间
  • 消息本身设置了存活时间

在这里插入图片描述

意味着如果一个消息设置了ttl=5000ms,那这个消息从进入队列的那一刻就开始计时5s,一旦到了5s,就成为死信消息进入死信交换机,由死信交换机通过死信队列投递到监听的消费者。实现了延迟队列的效果


实现

有两种实现办法:

  1. 消息队列本来就有TTL,超时这个消息就自动变成死信消息了
  2. 消息本身就有TTL,消息超时也变成了死信消息

队列TTL

//给ttl队列绑定ttl交换机和dl死信交换机
@Configuration
public class TTLMessageConfig {
    @Bean
    public Queue ttlQueue(){
        return QueueBuilder.durable("ttl.queue") //指定队列名称,且持久化
                .ttl(10000) //队列的超时时间
                .deadLetterExchange("dl.direct") //绑定死信交换机
                .deadLetterRoutingKey("dl") //死信交换机的RoutingKey
                .build();
    }
    @Bean
    public DirectExchange TTLDirectExchange(){
        return new DirectExchange("ttl.direct");
    }
    @Bean
    public Binding DLBinding(DirectExchange TTLDirectExchange, Queue ttlQueue){
        return BindingBuilder.bind(ttlQueue).to(TTLDirectExchange).with("ttl");
    }
    @Bean
    public Queue DLQueue(){
        return new Queue("dl.queue");
    }
    @Bean
    public DirectExchange DLDirectExchange(){
        return new DirectExchange("dl.direct");
    }
    @Bean
    public Binding binding(Queue DLQueue,DirectExchange DLDirectExchange){
        return BindingBuilder.bind(DLQueue).to(DLDirectExchange).with("dl");
    }
}

消息TTL

    @Test
    public void testDDLMessage() {
        String msg = "通过死信交换机实现延迟队列,消息延迟5s";
        Message message = MessageBuilder
                .withBody(msg.getBytes(StandardCharsets.UTF_8))
                .setExpiration("5000")
                .build();
        String key = "ttl";
// 生产者消息确认机制
//        消息的唯一id
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//        lambada表达式
        correlationData.getFuture().addCallback(confirm -> {
            if (confirm.isAck()){
                log.debug("消息投递到了交换机,消息id: " + correlationData.getId());
            }else {
                log.debug("消息投递失败,返回了nack,消息id: " + correlationData.getId());
            }
        }, throwable -> {
            log.debug("消息投递异常,消息id: " + correlationData.getId());
        });
        rabbitTemplate.convertAndSend("ttl.direct", key, message, correlationData);
        log.debug("消息成功发送");
    }

延迟队列

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(DelayQueue)模式。

延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15 分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

下载插件DelayExchange

相比于之前的声明,不同就是需要多一个属性delayed,其他一样

@Bean
public Queue DelayedQueue(){
    return new Queue("delayed.queue");
}

/**
 * 声明具有延迟功能的交换机
 * @return
 */
@Bean
public DirectExchange DelayedDirectExchange(){
    return ExchangeBuilder
            .directExchange("delayed.direct")
            .delayed()
            .durable(true) //持久化
            .build();
}
@Bean
public Binding DelayedBinding(Queue DelayedQueue,DirectExchange DelayedDirectExchange){
    return BindingBuilder.bind(DelayedQueue).to(DelayedDirectExchange).with("delayed");
}

与之前的发送消息相比,不同的是多了一个头信息setHeader(“x-delay”,3000)

    @Test
    public void testDelayedMessage() {
        String msg = "通过插件实现延迟队列,消息延迟3s";
        Message message = MessageBuilder
                .withBody(msg.getBytes(StandardCharsets.UTF_8))
                .setHeader("x-delay",3000) //延迟3000ms
                .build();
        String key = "delayed";
        rabbitTemplate.convertAndSend("delayed.direct", key, message, correlationData);
        log.debug("消息成功发送");
    }

消息堆积

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。

解决消息堆积有三种种思路:

  • 增加更多消费者,提高消费速度
  • 在消费者内开启线程池加快消息处理速度
  • 扩大队列容积,提高堆积上限

惰性队列

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储
@Bean
public Queue LazyQueue(){
    return QueueBuilder.durable("lazy.queue")
            .lazy()
            .build();
}

非持久化的数据也会写入磁盘,只是有条件的:等内存不足的情况下才会被写入到磁盘中。

持久化消息在到达队列时写入磁盘,同时会在内存中保存一份备份,当内存吃紧时,消息从内存中清除。这会提高一定的性能。非持久化消息一般只存于内存中,当内存压力大时,数据刷盘处理,以节省内存空间。


MQ集群

RabbitMO的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式。RabbitMO的集群有两种模式:

  • 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
    • 如果其中一个节点宕机了,那那个节点上的消息和队列就丢失了
  • 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性
  • 仲裁集群

普通集群

在一个节点的创建的消息可以同步给集群中其他节点,其他节点可以看到并且接收这个消息。

因为其他节点都有这个消息的原信息,知道这个消息是在哪个节点上,如果有消费者在其他节点上消费这个消息,那这些节点会到消息的原节点上取消息给消费者。


镜像集群

在这里插入图片描述


仲裁集群

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/5549.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java 8 - Lambda 表达式

1. 函数式接口 当一个接口中只有一个非 default 修饰的方法&#xff0c;这个接口就是一个函数式接口用 FunctionalInterface 标注 1&#xff09;只有一个抽象方法 FunctionalInterface public interface MyInterface {void print(int x); } 2&#xff09;只有一个抽象方法和…

射频接收机概述

接收机架构 射频接收机架构是指电子设备中用于接收无线电信号的部分。它通常由前置放大器、中频放大器、混频器、局部振荡器和带通滤波器等组成。以下是一个基本的射频接收机架构&#xff1a; 前置放大器&#xff1a;前置放大器的作用是放大接收天线接收到的微弱无线电信号&am…

程序员万万不能去的3种公司,越做越倒退,过来人的经验

俗话说“条条大路通罗马”&#xff0c;但是对于程序员来说&#xff0c;有些路千万别走&#xff0c;走得越久越难以抽身&#xff0c;甚至说毁掉你的职业生涯。 今天来跟大家讲一下&#xff0c;作为程序员&#xff0c;有些公司千万不要进去&#xff0c;你以为稀松平常&#xff0…

用Python发送电子邮件?这也太丝滑了吧(21)

小朋友们好&#xff0c;大朋友们好&#xff01; 我是猫妹&#xff0c;一名爱上Python编程的小学生。 欢迎和猫妹一起&#xff0c;趣味学Python。 今日主题 猫爸赚钱养家&#xff0c;细想起来真的不容易啊&#xff01; 起早贪黑&#xff0c;都是6点早起做早饭&#xff0c;送…

Autodesk AutoCAD 2023(CAD设计软件)自动化工具介绍以及图文安装教程

Autodesk AutoCAD是一款功能强大的计算机辅助设计软件&#xff0c;主要用于2D和3D设计、制图和草图。它适用于多种行业&#xff0c;包括建筑、土木工程、机械工程、电气工程等等。 Autodesk AutoCAD具有2D和3D设计、多种工具和功能、可扩展性、与其他Autodesk软件集成和多平台…

记录一次解决Maven问题的坑

记录一次解决Maven问题的坑目录概述需求&#xff1a;设计思路实现思路分析1.一步步的解决问题比较方法2.后来感觉和这个没关系3.最后查询资料拓展实现性能参数测试&#xff1a;参考资料和推荐阅读Survive by day and develop by night. talk for import biz , show your perfec…

python get方法及常用的代码

1.首先&#xff0c;我们需要下载一个 Python的 pygame库。 2.接着&#xff0c;我们需要在 Pygame中去注册一个自己的账户。 3.登录成功后&#xff0c;我们就可以去下载 pygame中的文件了。 4.我们现在只需要将下载文件放入到 Pygame库中即可&#xff0c;这就完成了下载&#xf…

算法学习day43

算法学习day431. 力扣1049. 最后一块石头的重量 II1.1 分析1.2 代码2. 力扣494. 目标和2.1 分析2.2 代码3. 力扣474.一和零3.1 分析3.2 代码4.参考资料1. 力扣1049. 最后一块石头的重量 II 1.1 分析 动规五部曲&#xff1a; 1.确定dp数组以及下标的含义 dp[j]表示容量为j的背…

第⑦讲:Ceph集群RGW对象存储核心概念及部署使用

文章目录1.RadosGW对象存储核心概念1.1.什么是RadosGW对象存储1.2.RGW对象存储架构1.3.RGW对象存储的特点1.4.对象存储中Bucket的特性1.4.不同接口类型的对象存储访问对比2.在集群中部署RadosGW对象存储组件2.1.部署RGW组件2.2.集群中部署完RGW组件后观察集群的信息状态2.3.修改…

剑指offer JZ27 二叉树的镜像

Java JZ27 二叉树的镜像 文章目录Java JZ27 二叉树的镜像一、题目描述二、辅助栈三、递归法使用辅助栈和递归法解决剑指offer JZ27 二叉树的镜像的问题。 一、题目描述 操作给定的二叉树&#xff0c;将其变换为源二叉树的镜像。   数据范围&#xff1a;二叉树的节点数 0≤n≤…

--编写一个存储过程,输入一个日期,返回该日期与当下日期的时间差,如果该差是负的,则提示该日期已经过去XX天,不然提示距离该日期还有xx天

--创建存储过程&#xff0c;一个输入参数&#xff0c;一个输出参数 create or replace procedure sp_minus(i_date varchar2,o_minus out varchar2) is --声明一个变量&#xff0c;用来存放异常 v_errm varchar2(200); begin --判断输入格式 if length(i_date)<>8 th…

Redis主从复制

文章目录定义用途怎么使用案例演示三大命令&#xff1a;修改配置文件细节常见方式一主二仆薪火相传反客为主复制原理和工作流程主从复制的缺点定义 主从复制&#xff0c;master以写为主&#xff0c;slave以读为主&#xff0c;当master数据变化的时候&#xff0c;自动将新的数据…

十分钟搞懂Java限流及常见方案

目录限流基本概念QPS和连接数控制传输速率黑白名单分布式环境限流方案常用算法令牌桶算法漏桶算法滑动窗口常用的限流方案Nginx限流中间件限流限流组件合法性验证限流Guawa限流网关层限流从架构维度考虑限流设计限流基本概念 QPS和连接数控制 传输速率 黑白名单 分布式环境…

HTML5 <abbr> 标签 和 HTML5 <applet> 标签

标签定义及使用说明 <abbr> 标签用来表示一个缩写词或者首字母缩略词&#xff0c;如"WWW"或者"NATO"。 通过对缩写词语进行标记&#xff0c;您就能够为浏览器、拼写检查程序、翻译系统以及搜索引擎分度器提供有用的信息。 实例 被标记的缩写词如…

《程序员面试金典(第6版)》面试题 08.04. 幂集(回溯算法,位运算,C++)不断更新

题目描述 幂集。编写一种方法&#xff0c;返回某集合的所有子集。集合中不包含重复的元素。 说明&#xff1a;解集不能包含重复的子集。 示例: 输入&#xff1a; nums [1,2,3] 输出&#xff1a; [ [3], [1], [2], [1,2,3], [1,3], [2,3], [1,2], [] ] 解题思路与代码 其实…

博客让谷歌或是百度收录

参考以下大佬的博客教程 Hexo框架(六)&#xff1a;SEO优化及站点被搜索引擎收录设置 | 你真是一个美好的人类 第一步 安装百度和 Google 的站点地图生成插件&#xff1a; npm install hexo-generator-baidu-sitemap --save npm install hexo-generator-sitemap --save 然后来…

文件或目录损坏且无法读取错误的恢复方法

我们在日常的生活当中经常都会遇到各种各样的问题。比如有些时候将磁盘插入电脑之后突然跳出来一个“磁盘结构损坏且无法读取”的提示框&#xff0c;那么像这个情况该怎么解决呢?别着急&#xff0c;小编现在就将磁盘结构损坏且无法读取这个问题的解决方法来分享给你们 文件或目…

数据结构和算法学习记录——栈和队列习题-用队列实现栈、用栈实现队列(核心思路、解题过程、完整题解)

目录 用队列实现栈 题目描述 题目示例 核心思路 解题过程 定义结构体 创建栈结构体函数 入栈函数 出栈函数 取栈顶数据函数 判断栈是否为空函数 销毁栈函数 完整题解&#xff08;C语言&#xff09; 用栈实现队列 题目描述 题目示例 核心思路 完整题解…

计算机网络管理 ARP 地址解析协议 ARP的基础原理 Wireshark ARP 报文分析 ARP的通信过程

⬜⬜⬜ ---&#x1f7e7;&#x1f7e8;&#x1f7e9;&#x1f7e6;&#x1f7ea; (*^▽^*)欢迎光临 &#x1f7e7;&#x1f7e8;&#x1f7e9;&#x1f7e6;&#x1f7ea;---⬜⬜⬜ ✏️write in front✏️ &#x1f4dd;个人主页&#xff1a;陈丹宇jmu &#x1f381;欢迎各位→…

GPT4和ChatGPT的区别,太让人震撼

文 | Serendipity知乎 前言 GPT4上午朋友圈已经刷屏啦&#xff0c;不过我还在忙&#xff0c;刚刚才登上 GPT-4 &#xff0c;现在来体验一下~ 附 GPT-4 能力测试站&#xff08;无需魔法&#xff0c;仅供国内研究测试&#xff09;&#xff1a; https://gpt4test.com 附 Cha…
最新文章