RocketMQ(二):原生API快速入门

RocketMQ系列文章

RocketMQ(一):基本概念和环境搭建

RocketMQ(二):原生API快速入门


目录

  • 一、RocketMQ快速入门
    • 1、生产者发送消息
    • 2、消费者接受消息
    • 3、代理者位点和消费者位点
  • 二、消费模型特点
    • 1、同一个消费组的不同消费者,订阅主题必须相同
    • 2、不同消费者组订阅同一主题,都会收到一份消息
    • 3、消费者组内负载均衡模式,消费者固定队列接收消息
    • 4、消费模式
  • 三、不同类型消息
    • 1、发送同步消息
    • 2、发送异步消息
    • 3、发送单向消息
    • 4、发送延迟消息
    • 5、发送批量消息
    • 6、发送顺序消息
    • 7、发送带标签的消息
    • 8、发送带key的消息
  • 四、消息重试和死信消息
    • 1、生产者重试
    • 2、消费者重试
    • 3、死信消息

一、RocketMQ快速入门

pom.xml

<!-- 原生的api   -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.2</version>
</dependency>

1、生产者发送消息

@Test
public void simpleProducer() throws Exception {
    // 创建一个生产者  (制定一个组名)
    DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");
    // 连接namesrv
    producer.setNamesrvAddr("localhost:9876");
    // 启动
    producer.start();
    // 创建一个消息
    Message message = new Message("testTopic", "我是一个简单的消息".getBytes());
    // 发送消息
    SendResult sendResult = producer.send(message);
    // 这是个同步消息,所以这里可以拿到消息的消费状态
    System.out.println(sendResult.getSendStatus());
    // 关闭生产者
    producer.shutdown();
}
  • dashboard客户端界面,查看主题界面可以看到刚刚创建的testTopic主题

在这里插入图片描述

  • 如下状态按钮进入的页面,默认四个队列,目前队列1有一条消息未消费

在这里插入图片描述

  • 如下CONSUMER(消费者)管理按钮进入的页面,目前还没创建消费者

在这里插入图片描述

2、消费者接受消息

  • 消费监听MessageListenerConcurrently是多线程消费,默认20个线程
  • 返回消费状态RECONSUME_LATER报错null
    • 消息会重新回到队列,之后重试发送给消费者
@Test
public void testConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅一个主题来消费   *表示没有过滤参数 表示这个主题的任何消息
    consumer.subscribe("testTopic", "*");
    // 注册一个消费监听 MessageListenerConcurrently 是多线程消费,默认20个线程,可以参看consumer.setConsumeThreadMax()
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + "----" + msgs);
            // 返回消费的状态 如果是CONSUME_SUCCESS 则成功,若为RECONSUME_LATER则该条消息会被重回队列,重新被投递
            // 重试的时间为messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
            // 也就是第一次1s 第二次5s 第三次10s  ....  如果重试了18次 那么这个消息就会被终止发送给消费者
			// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 这个start一定要写在registerMessageListener下面
    consumer.start();
    System.in.read();
}
  • 状态栏这里记录的是生产者发送的条数,所以这里没有变化

在这里插入图片描述

  • CONSUMER(消费者)管理栏记录消费的情况

在这里插入图片描述

3、代理者位点和消费者位点

  • 生产者发送10条消息,消费者还没接受,则是如下

在这里插入图片描述

  • 以队列2为例
  • 在队列中,生产者发送一个消息,代理者位点,向左移动一位
  • 消费者接收一个消息,消费者位点,向左移动一位,如果消费者异常,则不移动,之后还会给消费者再发此消息
  • 差值则是代理者位点减去消费者位点,也就是等待发送给消费者的消息数量

在这里插入图片描述

  • 开启消费者将消息全部接收后,差值为0

在这里插入图片描述

二、消费模型特点

1、同一个消费组的不同消费者,订阅主题必须相同

  • 如果订阅主题不同,那么两个消费者将接收不到来自不同主题的的消息
  • 只有关闭其中一个消费者,另外一个消费者就能正常运行了

在这里插入图片描述

2、不同消费者组订阅同一主题,都会收到一份消息

  • 订阅的消费者组一定会收到消息,但是具体的消费者不一样可以收到
  • 消费者组的策略
    • 负载均衡模式:同一组内消费者轮训获取到消息
    • 广播模式:同一组内消费者都能获取到消息

在这里插入图片描述

3、消费者组内负载均衡模式,消费者固定队列接收消息

  • 如果有c1/c2两个消费者,4个通道,那么c1只接收0/1队列的消息,c2只接收2/3队列的消息
  • 也就是系统会给消费者尽量平均分配可以接收的队列,没有分配的队列消息不会接收
  • 如果消费者多,那么多出来的消费者永远接收不到消息

在这里插入图片描述

4、消费模式

  • MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull
  • Push是服务端【MQ】主动推送消息给客户端
    • 优点是及时性较好
    • 但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃
  • Pull是客户端需要主动到服务端取数据
    • 优点是客户端可以依据自己的消费能力进行消费
    • 但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时
  • Push模式也是基于Pull模式的,只是客户端内部封装了api(长轮训方式)
    • 一般场景下,上游消息生产量或者均速的时候,选择push模式
    • 在特殊场景下,例如电商促,抢优惠券等场景可以选择pull模式

三、不同类型消息

1、发送同步消息

  • 上面的快速入门就是发送同步消息
  • 发送过后会有一个返回值,也就是mq服务器接收到消息后返回的一个确认
  • 这种方式非常安全,但是性能上并没有这么高
  • 而且在mq集群中,也是要等到所有的从机都复制了消息以后才会返回
  • 针对重要的消息可以选择这种方式

在这里插入图片描述

2、发送异步消息

  • 异步消息通常用在对响应时间敏感的业务场景
  • 即发送端不能容忍长时间地等待Broker的响应
  • 发送完以后会有一个异步消息通知
    • 与同步发送相比,send方法没有返回值
    • 通过回调接口SendCallback获取发送成功还是失败
@Test
public void asyncProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();
    Message message = new Message("asyncTopic", "我是一个异步消息".getBytes());
    producer.send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("发送成功");
        }

        @Override
        public void onException(Throwable e) {
            System.err.println("发送失败:" + e.getMessage());
        }
    });
    System.out.println("我先执行");
    System.in.read();
}

执行结果:

我先执行
发送成功
  • 因为是异步,所以发送消息后,不论成功失败,继续往下走,执行“我先发送”
  • 之后消费发送成功,回调函数执行“发送成功”

3、发送单向消息

  • 这种方式主要用在不关心发送结果的场景
  • 这种方式吞吐量很大,但是存在消息丢失的风险
  • 例如日志信息的发送
@Test
public void onewayProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();
    Message message = new Message("onewayTopic", "日志xxx".getBytes());
    producer.sendOneway(message);
    System.out.println("成功");
    producer.shutdown();
}

4、发送延迟消息

  • 消息放入mq后,过一段时间,才会被监听到,然后消费
  • 比如抢票业务
    • 一个人抢到票,无论付款与否,都发送一个延时消息(车票id,此时状态为占用)
    • 15分钟后, 去处理这里车票id
    • 通过车票id判断是否付款,如果付款则什么都不处理
    • 如果没有付款则将此车票id状态修改位未占用
@Test
public void msProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    Message message = new Message("orderMsTopic", "订单号,座位号".getBytes());
    // 给消息设置一个延迟时间
    message.setDelayTimeLevel(3);
    // 发延迟消息
    producer.send(message);
    producer.shutdown();
}
  • 通过message对象设置延迟对象,如下等级对应延迟时间
  • 当然以后springboot项目也可以通过配置文件属性delayTimeLevel自定义时间

在这里插入图片描述

5、发送批量消息

  • 可以一次性发送一组消息
@Test
public void testBatchProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("batch-producer-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    List<Message> msgs = Arrays.asList(
            new Message("batchTopic", "我是一组消息的A消息".getBytes()),
            new Message("batchTopic", "我是一组消息的B消息".getBytes()),
            new Message("batchTopic", "我是一组消息的C消息".getBytes())
    );
    SendResult send = producer.send(msgs);
    System.out.println(send);
    // 关闭实例
    producer.shutdown();
}
  • 一组消息都在同一个队列里,排队消费

在这里插入图片描述

  • 一组3个消息,多线程一次消费了三个消息
@Test
public void testBatchConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch-producer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅一个主题来消费   表达式,默认是*
    consumer.subscribe("batchTopic", "*");
    // 注册一个消费监听 MessageListenerConcurrently是并发消费
    // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            // 这里执行消费的代码 默认是多线程消费
            System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

执行结果:

ConsumeMessageThread_3----我是一组消息的C消息
ConsumeMessageThread_1----我是一组消息的A消息
ConsumeMessageThread_2----我是一组消息的B消息

6、发送顺序消息

  • 消息有序指的是可以按照消息的发送顺序来消费
  • 虽然队列FIFO先入先出,但RocketMQ的broker有四个queue
    • 默认的情况下消息发送会采取轮询方式把消息发送到不同的queue
    • 而消费消息的时候从多个queue上拉取消息
    • 这种情况发送和消费是不能保证顺序
  • 但是如果控制发送的顺序消息只依次发送到同一个queue中
    • 消费的时候只从这个queue上依次拉取,则就保证了顺序
  • 当发送和消费参与的queue只有一个,则是全局有序
  • 如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的

发送顺序消息

  • 两组订单,110订单和120订单
  • 每组需要保证顺序:下订单->物流->签收

  • 发送消息的send方法需要传一个MessageQueueSelector的实现类
  • 实现select方法,返回MessageQueue对象(当前send消息放入哪个队列
@Test
public void testOrderlyProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    List<Order> orderList = Arrays.asList(
            new Order(1, 110, "下订单"),
            new Order(2, 110, "物流"),
            new Order(3, 110, "签收"),
            
            new Order(4, 120, "下订单"),
            new Order(5, 120, "物流"),
            new Order(6, 120, "拒收")
    );
    // 循环集合开始发送
    orderList.forEach(order -> {
        Message message = new Message("TopicTest", order.toString().getBytes());
        try {
            // 发送的时候 相同的订单号选择同一个队列
            producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // 当前主题有多少个队列
                    int queueNumber = mqs.size();
                    // 这个arg就是后面传入的 order.getOrderNumber()
                    Integer i = (Integer) arg;
                    // 用这个值去%队列的个数得到一个队列
                    int index = i % queueNumber;
                    // 返回选择的这个队列即可 ,那么相同的订单号 就会被放在相同的队列里 实现FIFO了
                    return mqs.get(index);
                }
            }, order.getOrderNumber());
        } catch (Exception e) {
            System.out.println("发送异常");
        }
    });
    // 关闭实例
    producer.shutdown();
}

接收顺序消息

  • 因为要顺序消费,所以不能用默认的MessageListenerConcurrently多线程消费
  • 这里需要用到MessageListenerOrderly单线程消费
@Test
public void testOrderlyConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅一个主题来消费   *表示没有过滤参数 表示这个主题的任何消息
    consumer.subscribe("TopicTest", "*");
    // 注册一个消费监听 MessageListenerOrderly 是顺序消费 单线程消费
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            MessageExt messageExt = msgs.get(0);
            System.out.println(new String(messageExt.getBody()));
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

7、发送带标签的消息

  • RocketMQ提供消息过滤功能,通过tag进行区分
  • 订阅关系一致:同一个消费者组下所有消费者实例所订阅的Topic、Tag必须完全一致
  • 我们往一个主题里面发送消息的时候,根据业务逻辑,可能需要区分
  • 比如带有tagA标签的被A消费,带有tagB标签的被B消费

生产者发送标签消息(同一个主题,不同的标签

@Test
public void tagProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();
    Message message = new Message("tagTopic", "vip1", "我是vip1的文章".getBytes());
    Message message2 = new Message("tagTopic", "vip2", "我是vip2的文章".getBytes());
    producer.send(message);
    producer.send(message2);
    System.out.println("发送成功");
    producer.shutdown();
}
  • 消费者组a只监听主题为tagTopic,标签为vip1
  • subscribe订阅方法第二个参数,默认*监听所有的标签
@Test
public void tagConsumer1() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.subscribe("tagTopic", "vip1");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println("我是vip1的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}
  • 消费者组b监听主题为tagTopic,标签为vip1或vip2
@Test
public void tagConsumer2() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-b");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.subscribe("tagTopic", "vip1 || vip2");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println("我是vip2的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

什么时候该用 Topic,什么时候该用 Tag?

总结:不同的业务应该使用不同的Topic如果是相同的业务里面有不同表的表现形式,那么我们要使用tag进行区分

可以从以下几个方面进行判断:

  1. 消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分
  2. 业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分
  3. 消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。
  4. 消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic

通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息

8、发送带key的消息

  • 在rocketmq中的消息,默认会有一个messageId当做消息的全局唯一标识

在这里插入图片描述

  • 我们也可以给消息携带一个key,用作业务唯一标识
  • 如果发送两次相同内容消息
    • 业务唯一标识key肯定一样,可以阻止重复消费
    • 但是上面默认的messageId则不一样,这样则无法区分重复数据

带key消息生产者

@Test
public void testKeyProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    Message msg = new Message("TopicTest","我是一个带key的消息".getBytes());
    // 通过Message对象设置key
    String key = UUID.randomUUID().toString();
    msg.setKeys(key);
    SendResult send = producer.send(msg);
    System.out.println(send);
    // 关闭实例
    producer.shutdown();
}

带key消息消费者(从MessageExt对象中获取key

@Test
public void testKeyConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅一个主题来消费 
    consumer.subscribe("TopicTest","*");
    // 注册一个消费监听 MessageListenerConcurrently是并发消费
    // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            MessageExt messageExt = msgs.get(0);
            // 从MessageExt对象获取key
            System.out.println("key值: " + messageExt.getKeys());
            System.out.println("消息体: " + new String(messageExt.getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

根据主题和key查询消息

在这里插入图片描述

四、消息重试和死信消息

1、生产者重试

  • 可以分别设置同步消息和异步消息发送的重试次数
    • 同步发送失败,则轮转到下一个Broker
    • 异步发送失败,则只会在当前Broker进行重试
  • 一般不用设置,默认都是2次
  • 发送消息超时时间默认3000毫秒,如果因为超时,那么便不再尝试重试
@Test
public void retryProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("retry-producer-group");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();

    // 生产者发送消息 重试次数
    producer.setRetryTimesWhenSendFailed(2); // 同步消息重试次数
    producer.setRetryTimesWhenSendAsyncFailed(2); // 异步消息重试次数

    Message message = new Message("retryTopic", "我是普通消息".getBytes());
    producer.send(message);
    System.out.println("发送成功");
    producer.shutdown();
}

2、消费者重试

  • 默认的重试间隔:10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  • 即默认-1代表重试16
  • 广播模式下,保证消息至少被消费一次,但不提供重发的选项
  • 在单线程的顺序模式下,默认-1代表重试Integer.MAX_VALUE次,间隔1秒
@Test
public void retryConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.subscribe("retryTopic", "*");
    
    // 设定重试次数
    consumer.setMaxReconsumeTimes(2);
    
    // 并发模式
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            MessageExt messageExt = msgs.get(0);
            System.out.println("当前时间: " + new Date());
            System.out.println("重试次数: " + messageExt.getReconsumeTimes());
            System.out.println("接收内容: " + new String(messageExt.getBody()));
            System.out.println("----------------------------------------------");
            // 业务报错了 返回null 返回 RECONSUME_LATER 都会重试
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    });
	
	// 单线程模式
 // consumer.registerMessageListener(new MessageListenerOrderly() {
 //     @Override
 //     public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
 //         // 参数取值范围10~30000ms,默认值-1,表示1000ms,即1秒重试一次
 //         context.setSuspendCurrentQueueTimeMillis(1000);
 //         return ConsumeOrderlyStatus.SUCCESS;
 //     }
 // });
    consumer.start();
    System.in.read();
}

设置重试二次的执行结果:

在这里插入图片描述

3、死信消息

  • 当消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列
  • 死信队列是死信Topic下分区数唯一的单独队列
  • 死信Topic名称为%DLQ%原消费者组名,死信队列的消息将不会再被消费

上一节的消费者重试两次后,就会将消息放入死信队列

在这里插入图片描述

处理死信消息方式一:

  • 监听死信队列处理消息
@Test
public void retryDeadConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.subscribe("%DLQ%retry-consumer-group", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            // 处理消息 签收了
            System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

处理死信消息方式二:

  • 控制重试次数,重试几次后,直接记录到数据库等等
@Test
public void retryConsumer2() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("retryTopic", "*");
    // 设定重试次数
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            MessageExt messageExt = msgs.get(0);
            System.out.println(new Date());
            // 业务处理
            try {
                int i = 1 / 0;
            } catch (Exception e) {
                // 重试
                int reconsumeTimes = messageExt.getReconsumeTimes();
                if (reconsumeTimes >= 3) {
                    // 不要重试了
                    System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            // 业务报错了 返回null 返回 RECONSUME_LATER 都会重试
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/160586.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python每日一练@前言

Python每日一练前言 导读 人生苦短&#xff0c;我用Python 大家好&#xff0c;我是鹅不糊涂 欢迎大家来到Python每日一练 好处 加强编程能力: 每日一练可以帮助提升编程技能&#xff0c;通过解决各种编程问题和挑战&#xff0c;你能够不断锻炼自己的逻辑思维和解决问题的能力…

不允许你还没有了解哈希表、哈希桶、哈希冲突的解决,如何避免冲突

✏️✏️✏️今天给各位带来的是哈希桶、哈希冲突方面的知识。 清风的CSDN博客 &#x1f61b;&#x1f61b;&#x1f61b;希望我的文章能对你有所帮助&#xff0c;有不足的地方还请各位看官多多指教&#xff0c;大家一起学习交流&#xff01; 动动你们发财的小手&#xff0c;点…

实用篇-ES-DSL查询文档

数据的存储不是目的&#xff0c;我们希望从海量的酒店数据中检索出需要的信息&#xff0c;这就是ES的搜索功能 官方文档: https://elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html#query-dsl。DSL是用来查询文档的 Elasticsearch提供了基于JSON的DSL来定…

vite vue3配置eslint和prettier以及sass

准备 教程 安装eslint 官网 vue-eslint ts-eslint 安装eslint yarn add eslint -D生成配置文件 npx eslint --init安装其他插件 yarn add -D eslint-plugin-import eslint-plugin-vue eslint-plugin-node eslint-plugin-prettier eslint-config-prettier eslint-plugin…

【数据结构】图的存储结构及实现(邻接表和十字链表)

一.邻接矩阵的空间复杂度 假设图G有n个顶点e条边&#xff0c;则存储该图需要O&#xff08;n^2) 不适用稀疏图的存储 二.邻接表 1.邻接表的存储思想&#xff1a; 对于图的每个顶点vi&#xff0c;将所有邻接于vi的顶点链成一个单链表&#xff0c;称为顶点vi的边表&#xff08…

C/C++ 运用VMI接口查询系统信息

Windows Management Instrumentation&#xff08;WMI&#xff09;是一种用于管理和监视Windows操作系统的框架。它为开发人员、系统管理员和自动化工具提供了一种标准的接口&#xff0c;通过这个接口&#xff0c;可以获取有关计算机系统硬件、操作系统和应用程序的信息&#xf…

PS学习笔记——新建文档/修改文档

文章目录 新建文档文档属性像素/分辨率颜色模式背景内容高级选项存储预设 修改文档 新建文档 方法一&#xff1a;ctrlN快捷键可直接打开新建文档界面 方法二&#xff1a;点击菜单栏中 文件->新建&#xff0c;即可打开新建文档界面 文档参数可按需调节(标题可以提前设定或者…

face_recognition:高准确率、简单易用的人脸识别库 | 开源日报 No.79

ageitgey/face_recognition Stars: 49.8k License: MIT 这个项目是一个使用 Python 编写的人脸识别库&#xff0c;可以从图片中识别和操作人脸。它基于 dlib 开发&#xff0c;并采用深度学习技术构建了最先进的人脸识别模型&#xff0c;在 Labeled Faces in the Wild 数据集上…

Redis(消息队列Stream)

Stream是一个轻量级的消息队列。 Redis中Stream的作用是提供一种高效的消息传递机制&#xff0c;允许多个消费者并行地消费消息&#xff0c;并且不会重复消费已经处理过的消息。它可以用于实现分布式任务队列、日志收集、实时数据处理等场景。Redis中的Stream支持多个消费者组…

Python数据分析实战① Python实现数据可视化

文章目录 一、数据可视化介绍二、matplotlib和pandas画图1.matplotlib简介和简单使用2.matplotlib常见作图类型3.使用pandas画图4.pandas中绘图与matplotlib结合使用 三、订单数据分析展示四、Titanic灾难数据分析显示 一、数据可视化介绍 数据可视化是指将数据放在可视环境中…

6.2 List和Set接口

1. List接口 List接口继承自Collection接口&#xff0c;List接口实例中允许存储重复的元素&#xff0c;所有的元素以线性方式进行存储。在程序中可以通过索引访问List接口实例中存储的元素。另外&#xff0c;List接口实例中存储的元素是有序的&#xff0c;即元素的存入顺序和取…

【Linux网络编程】高级I/O

目录 五种I/O模型 阻塞和非阻塞 非阻塞I/O I/O多路复用之Select、Poll、与Epoll 本文目的是深入浅出理解高级I/O相关的知识&#xff0c;结尾附上代码加深理解相关知识。 五种I/O模型 1.阻塞I/O&#xff1a;在内核将数据准备好之前&#xff0c;系统调用会一直等待。所有的套…

【踩坑笔记】国科GK7202V300芯片开发常见问题解决办法

国科Linux芯片开发常见问题&解决办法 0.读前须知 不管什么时候&#xff0c;下载程序还是啥&#xff0c;一定要检查路径&#xff01;&#xff01;&#xff01;别问我为什么&#xff0c;呜呜呜~ tips&#xff1a;该芯片是仿造海思的产品&#xff0c;所以&#xff0c;有些不…

cp: can‘t stat ‘/usr/share/zoneinfo/Asia/Shanghai‘: No such file or directory

目录 问题描述问题分析解决方案容器时区验证 问题描述 使用下面的 Dockerfile 为 youlai-boot 项目制作镜像设置容器时区报错。 # 基础镜像 FROM openjdk:17-jdk-alpine # 时区修改 RUN /bin/cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \&& echo Asia/Sha…

【每周一测】Java阶段三阶段考试

目录 1、SpringBoot在整合RabbitMQ时需要导入的包是 2、下列关于RabbitMQ的confirm消息确认机制解释说明正确的是 3、关于SpringBoot的配置文件&#xff0c;以下说法正确的是&#xff08;&#xff09; 4、变量命名规范说法正确的是? 5、哪个关键字可以对对象加互斥锁&…

计算机视觉的应用18-一键抠图人像与更换背景的项目应用,可扩展批量抠图与背景替换

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下计算机视觉的应用18-一键抠图人像与更换背景的项目应用&#xff0c;可扩展批量抠图与背景替换。该项目能够让你轻松地处理和编辑图片。这个项目的核心功能是一键抠图和更换背景。这个项目能够自动识别图片中的主体&…

医院绩效考核系统源码 医院绩效考核系统方案

医院绩效考核系统源码 医院绩效考核系统是现代医院管理的重要方法和科学的管理工具。良好的绩效管理&#xff0c;有助于带动全院职工的工作积极性&#xff0c;有助于提高工作效率、提高医疗质量、改善服务水平、降低运营成本&#xff0c;全面提升医院的精细化管理水平。 医院绩…

Flask学习一:概述

搭建项目 安装框架 pip install Flask第一个程序 from flask import Flaskapp Flask(__name__)app.route(/) def hello_world():return "Hello World"if __name__ __main__:app.run()怎么说呢&#xff0c;感觉还不错的样子。 调试模式 if __name__ __main__:a…

后端老项目迁移方法

老项目迁移方法 需求&#xff1a; 因某个模块MySQL表结构、表关系 错乱复杂&#xff0c;而且其他模块的代码也在操作这个模块的数据库 耦合严重 导致Web工程代码紊乱、不易理解、性能低下&#xff0c; 故在 系统由A JavaWeb工程迁移至B工程 时&#xff0c;重构MySQL表结构、表…

VS中修改解决方案名称和项目名称

如何修改visual studio2019中的项目名 - 知乎 (zhihu.com) 查了很多&#xff0c;还是这个可行。虽然文中说不是最简单的&#xff0c;但在所查找资料中是可行且最简单的。 要点主要是&#xff1a; 1、比如我们复制一个解决方案&#xff0c;最好是带代码哈&#xff0c;也就是添…