2.12日学习打卡----初学RocketMQ(三)

2.12日学习打卡

目录:

  • 2.12日学习打卡
  • 一. RocketMQ高级特性(续)
    • 消息重试
    • 延迟消息
    • 消息查询
  • 二.RocketMQ应用实战
    • 生产端发送同步消息
    • 发送异步消息
    • 单向发送消息
    • 顺序发送消息
    • 消费顺序消息
    • 全局顺序消息
    • 延迟消息
    • 事务消息
    • 消息查询

一. RocketMQ高级特性(续)

消息重试

生产端重试

例如由于网络原因导致生产者发送消息到MQ失败,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。

// 同步发送消息,如果5秒内没有发送成功,则重试3次
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(3);
producer.send(msg, 5000L);

消费端重试

同样的,由于网络原因,Broker发送消息给消费者后,没有受到消费端的ACK响应,所以Broker又会尝试将消息重新发送给Consumer,在实际开发过程中,我们更应该考虑的是消费端的重试。消费端的消息重试可以分为顺序消息的重试以及无序消息的重试。

  • 顺序消息重试
    对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ
    会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用
    会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必
    保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的
    发生

  • 无序消息重试
    对于无序消息(普通、定时、延时、事务消息),当消费者消费
    消息失败时,可以通过设置返回状态达到消息重试的结果。

    • 最大重试次数
      消息消费失败后,可被消息队列RocketMQ重复投递的最大
      次数。
      TCP协议无序消息重试时间间隔:
      在这里插入图片描述
    • 消费失败后重新配置方式
    • 需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
      • 返回 ConsumeConcurrentlyStatus.RECONSUME_LATER; (推荐)
      • 返回 Null
      • 抛出异常

    延迟消息

    Producer将消息发送到消息队列RocketMQ服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
    消息生产和消费有时间窗口要求,例如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。通过消息触发一些定时任务,例如在某一固定时间点向用户发送提醒消息。
    定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并且会根据
    delayTimeLevel存入特定的queue,queueId = delayTimeLevel –1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。

//
org/apache/rocketmq/store/config/MessageStore
Config.java
private String messageDelayLevel = "1s 5s 10s
30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h
2h";

代码测试
生产者

package com.jjy.produce;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class DelayMessageProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer=new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("192.168.66.100:9876");
        producer.start();
        Message message=null;
        for(int i=0; i<20;i++){
            message=new Message("tp_demo_3",("hello RocketMQ delayMessage -"+i).getBytes());
           //设置延迟时间0-17 0表示2秒 大于18都是2小时
            message.setDelayTimeLevel(i);
            producer.send(message);
        }
       producer.shutdown();

    }
}

消费者

package com.jjy.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.common.message.MessageExt;



public class DelayMessageConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("transactionConsumer-grp");

        consumer.setNamesrvAddr("192.168.66.100:9876");
        System.out.println("==========================================");
        //设置消息重试次数
        consumer.setMaxReconsumeTimes(5);
        //设置可以批量处理
        consumer.setConsumeMessageBatchMaxSize(1);
        //订阅主题
        consumer.subscribe("tp_demo_3","*");
        consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            System.out.println(System.currentTimeMillis()/1000);
            for(MessageExt message:list){
                System.out.println(message.getTopic()+"\t"+message.getQueueId()+"\t"+message.getDelayTimeLevel()+"\t"+new String(message.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
       consumer.start();

    }
}

运行结果
在这里插入图片描述

消息查询

在这里插入图片描述

在实际开发中,经常需要查看MQ中消息的内容来排查问题。RocketMQ提供了三种消息查询的方式,分别是按Message ID、Message Key以及Unique Key查询。

//返回结果
SendResult [
 sendStatus=SEND_OK,
msgId=C0A801030D4B18B4AAC247DE4A0D0000,
offsetMsgId=C0A8010300002A9F000000000007BEE9,
 messageQueue=MessageQueue [topic=TopicA,
brokerName=broker-a, queueId=0],
 queueOffset=0]
  • 按MessageId查询消息
    Message Id 是消息发送后,在Broker端生成的,其包含了
    Broker的地址、偏移信息,并且会把Message Id作为结果的一
    部分返回。Message Id中属于精确匹配,代表唯一一条消息,
    查询效率更高。

  • 按照Message Key查询消息
    消息的key是开发人员在发送消息之前自行指定的,通常把具有
    业务含义,区分度高的字段作为消息的key,如用户id,订单id
    等。

  • 按照Unique Key查询消息
    除了开发人员指定的消息key,生产者在发送发送消息之前,会
    自动生成一个UNIQ_KEY,设置到消息的属性中,从逻辑上唯一
    代表一条消息

消息在消息队列RocketMQ中存储的时间默认为3天(不建议修
改),即只能查询从消息发送时间算起3天内的消息,三种查询方式
的特点和对比如下表所述:
在这里插入图片描述

二.RocketMQ应用实战

在这里插入图片描述

生产端发送同步消息

同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求
在这里插入图片描述

package com.jjy.produce;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;

public class SyncProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {
        //实例化消息生产者
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("192.168.66.100:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 100; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

运行结果

SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2DFEB0000, offsetMsgId=AC12FA2E00002A9F0000000000012E3A, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=1], queueOffset=100]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0030001, offsetMsgId=AC12FA2E00002A9F0000000000012EF8, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=2], queueOffset=101]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0060002, offsetMsgId=AC12FA2E00002A9F0000000000012FB6, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=3], queueOffset=100]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0090003, offsetMsgId=AC12FA2E00002A9F0000000000013074, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=0], queueOffset=99]
... ...

Message ID:消息的全局唯一标识(内部机制的ID生成是使用机器IP和消息偏移量的组成,所以有可能重复,如果是幂等性还是最好考虑Key),由消息队列MQ系统自动生成,唯一标识某条消息。

SendStatus:发送的标识。成功,失败等

Queue:相当于是Topic的分区;用于并行发送和接收消息

发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。在这里插入图片描述
消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理

package com.jjy.produce;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeUnit;

public class AsyncProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("192.168.66.100:9876");
        // 启动Producer实例
        producer.start();
        //消息失败重试次数
        producer.setRetryTimesWhenSendAsyncFailed(0);
        int messageCount = 100;
        // 根据消息数量实例化倒计时计算器
        final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
        //创建消息
        for(int i=0;i<messageCount;i++){
            final int index=i;
           Message msg=new Message("topic_demo","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // SendCallback接收异步返回结果的回调
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                }

                @Override
                public void onException(Throwable e) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
           
        }
        // 等待5s
        countDownLatch.await(5, TimeUnit.SECONDS);
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

单向发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送
在这里插入图片描述
单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别

package com.jjy.produce;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("192.168.66.100:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 100; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送单向消息,没有任何返回结果
            producer.sendOneway(msg);


        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}


消息发送时的权衡

发送方式发送TPS发送结果反馈可靠性使用场景
同步发送可靠邮件、短信、推送
异步发送可靠视频转码
单向发送最快可能丢失日志收集

顺序发送消息

一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

package com.jjy.produce;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.List;

public class OrderProducer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");
        producer.setNamesrvAddr("192.168.66.100:9876");
        producer.start();
        // 获取指定主题的MQ列表
        final List<MessageQueue> messageQueues = producer.fetchPublishMessageQueues("tp_demo_11");


        Message message = null;
        MessageQueue messageQueue = null;
        for (int i = 0; i < 100; i++) {
            // 采用轮询的方式指定MQ,发送订单消息,保证同一个订单的消息按顺序
            // 发送到同一个MQ
            messageQueue = messageQueues.get(i % 8);
            //创建message对象
            //发送创建订单消息
            message = new Message("tp_demo_11", ("hello rocketmq order create - " + i).getBytes());
            producer.send(message, messageQueue);
            //发送付款订单消息
            message = new Message("tp_demo_11", ("hello rocketmq order pay - " + i).getBytes());
            producer.send(message, messageQueue);
            //发送订单送货消息
            message = new Message("tp_demo_11", ("hello rocketmq order delivery - " + i).getBytes());
            producer.send(message, messageQueue);
        }


        producer.shutdown();
    }
}

消费顺序消息

package com.jjy.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class OrderConsumer {
    public static void main(String[] args) throws MQClientException {
        //创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_02");
        consumer.setNamesrvAddr("192.168.66.100:9876");
        //订阅主题
        consumer.subscribe("tp_demo_11", "*");

       //最小消费线程数
        consumer.setConsumeThreadMin(1);
        //最大消费线程数
        consumer.setConsumeThreadMax(1);
        //一次拉取的消息数量
        consumer.setPullBatchSize(1);
        //一次消费的消息数量
        consumer.setConsumeMessageBatchMaxSize(1);


        // 使用有序消息监听器
        consumer.setMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,  ConsumeOrderlyContext context) {


                for (MessageExt msg : msgs) {
                    System.out.println(
                            msg.getTopic() + "\t" +
                                    msg.getQueueId() + "\t" +
                                    new String(msg.getBody())
                    );
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}

全局顺序消息

生产者

package com.jjy.produce;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.List;

public class GlobalOrderProducer {
   public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {

       DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");
       producer.setNamesrvAddr("192.168.66.100:9876");
       producer.start();
       Message message = null;
       for(int i=0;i<100;i++){
           message=new Message("tp_demo_11",("全局有序消息...."+i).getBytes());
           producer.send(message, new MessageQueueSelector() {
               @Override
               public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                   return list.get((Integer)0);
               }
           },1);

       }
       producer.shutdown();
   }
}

消费者

package com.jjy.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class GlobalConsumer {
    public static void main(String[] args) throws MQClientException {
        //创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_02");
        consumer.setNamesrvAddr("192.168.66.100:9876");
        //订阅主题
        consumer.subscribe("tp_demo_11", "*");

        //最小消费线程数
        consumer.setConsumeThreadMin(1);
        //最大消费线程数
        consumer.setConsumeThreadMax(1);
        //一次拉取的消息数量
        consumer.setPullBatchSize(1);
        //一次消费的消息数量
        consumer.setConsumeMessageBatchMaxSize(1);
        consumer.setMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt msg : list) {
                    System.out.println("消费线程=" + Thread.currentThread().getName() +
                            ", queueId=" + msg.getQueueId() + ", 消息内容:" + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}

延迟消息

生产者

package com.jjy.produce;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class DelayMessageProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer=new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("192.168.66.100:9876");
        producer.start();
        Message message=null;
        for(int i=0; i<20;i++){
            message=new Message("tp_demo_3",("hello RocketMQ delayMessage -"+i).getBytes());
           // 设置延迟时间级别0,18,0表示不延迟,18表示延迟2h,大于18的都是2h
            message.setDelayTimeLevel(i);
            producer.send(message);
        }
       producer.shutdown();

    }
}

消费者

package com.jjy.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.common.message.MessageExt;



public class DelayMessageConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("transactionConsumer-grp");

        consumer.setNamesrvAddr("192.168.66.100:9876");
        System.out.println("==========================================");
        //设置消息重试次数
        consumer.setMaxReconsumeTimes(5);
        //设置可以批量处理
        consumer.setConsumeMessageBatchMaxSize(1);
        //订阅主题
        consumer.subscribe("tp_demo_3","*");
        consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            System.out.println(System.currentTimeMillis()/1000);
            for(MessageExt message:list){
                System.out.println(message.getTopic()+"\t"+message.getQueueId()+"\t"+message.getDelayTimeLevel()+"\t"+new String(message.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
       consumer.start();

    }
}

事务消息

生产者

package com.jjy.produce;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class  TransactionProducer {
    public static void main(String[] args) throws MQClientException {
        TransactionListener listener=new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                // 当发送事务消息prepare(half)成功后,调用该方法执行本地事务
                System.out.println("执行本地事务......");
               // return LocalTransactionState.COMMIT_MESSAGE;
                //使用下面事务回滚 消费端无法接收到消息了
                try {
                    Thread.sleep(100000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                // 该方法用于获取本地事务执行的状态。
                System.out.println("检查本地事务的状态:" + messageExt);
                 return LocalTransactionState.COMMIT_MESSAGE;
            }
        };
        //创建事务消息生产者
        TransactionMQProducer producer=new TransactionMQProducer("producer_grp_01");
       //设置事务监听器
        producer.setTransactionListener(listener);
        producer.setNamesrvAddr("192.168.66.100:9876");
        producer.start();
        Message message=null;
        message=new Message("tp_demo_11","hello translation message".getBytes());
        producer.sendMessageInTransaction(message,"{\" name\":\"zhansan\"}");


    }
}

消费者

package com.jjy.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class TransactionMsgConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("txconsumer_grp_12_01");
        consumer.setNamesrvAddr("192.168.66.100:9876");
        consumer.subscribe("tp_demo_11", "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }

}


消息查询

package com.itbaizhan.consumer;


public class QueryingMessageDemo {
  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
    //创建消费者对象
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_01");
    //设置nameserver地址
    consumer.setNamesrvAddr("192.168.66.100:9876");
    //设置消息监听器
    consumer.setMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       }
     });
    consumer.start();
    //根据messageId查询消息
    MessageExt message = consumer.viewMessage("topic_springboot_demo_02", "C0A88B8000002A9F000000000000C8E8");
    System.out.println(message);
    System.out.println(message.getMsgId());
    consumer.shutdown();
   }
}

如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/389114.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C++】友元、内部类和匿名对象

&#x1f497;个人主页&#x1f497; ⭐个人专栏——C学习⭐ &#x1f4ab;点击关注&#x1f929;一起学习C语言&#x1f4af;&#x1f4ab; 目录 1. 友元 1.1 友元函数 1.2 友元类 2. 内部类 2.1 成员内部类 2.2 局部内部类 3. 匿名对象 1. 友元 友元提供了一种突破封装…

yarl,一个强大的 Python 库!

更多Python学习内容&#xff1a;ipengtao.com 大家好&#xff0c;今天为大家分享一个强大的 Python 库 - yarl。 Github地址&#xff1a;https://github.com/aio-libs/yarl Python 中的 yarl 库是一个强大的工具&#xff0c;用于处理 URL&#xff08;统一资源定位符&#xff09…

行测考试怎么搜题找答案?用这5款神器就够了!!! #职场发展#媒体

以下软件拥有强大的搜索功能&#xff0c;能够快速找到与题目相关的资料和答案&#xff0c;让大学生们更容易理解和掌握知识点。 1.GT4T 可在14万个语言对间进行翻译。GT4T不仅能在任何窗口获得所选内容的翻译建议&#xff0c;还也可批量翻译Office、PDF、CAT、Markdown、Asci…

事务及在SpringBoot项目中使用的两种方式

1.事务简介 事务&#xff08;transaction&#xff09;是访问并可能操作各种数据项的一个数据库操作序列&#xff0c;这些操作要么全部执行&#xff0c;要么全部不执行&#xff0c;是一个不可分割的工作单位。 事物的四大特性: 原子性&#xff08;Atomicity&#xff09;&#xf…

【智能家居入门4】(FreeRTOS、MQTT服务器、MQTT协议、微信小程序)

前面已经发了智能家居入门的1、2、3了&#xff0c;在实际开发中一般都会使用到实时操作系统&#xff0c;这里就以FreeRTOS为例子&#xff0c;使用标准库。记录由裸机转到实时操作系统所遇到的问题以及总体流程。相较于裸机&#xff0c;系统实时性强了很多&#xff0c;小程序下发…

相机图像质量研究(17)常见问题总结:CMOS期间对成像的影响--靶面尺寸

系列文章目录 相机图像质量研究(1)Camera成像流程介绍 相机图像质量研究(2)ISP专用平台调优介绍 相机图像质量研究(3)图像质量测试介绍 相机图像质量研究(4)常见问题总结&#xff1a;光学结构对成像的影响--焦距 相机图像质量研究(5)常见问题总结&#xff1a;光学结构对成…

用EL操作JAVABEAN属性

用EL操作JAVABEAN属性 问题陈述 Smart SoftWare Inc.想要开发一款维护雇员数据(例如姓、名字、职位)的Web应用程序。该组织决定将雇员数据存储在一个JavaBean中。另外,它还希望该Web应用程序能让用户从此JavaBean中检索数据并用JSP页面和EL显示。 解决方案 要解决上述问题…

.net和jar包windows服务部署

一.NetCore 1.创建启动脚本run_instal.bat,例如程序文件为ApiDoc.exe set serviceName"Apidoc Web 01" set serviceFilePath%~dp0ApiDoc.exe set serviceDescription"ApiDoc 动态接口服务 web 01"sc create %serviceName% BinPath%serviceFilePath% sc c…

【JavaEE】_文件与IO

目录 1.文件概述 1.1 文件的概念 1.2 文件的存储 1.3 文件的分类 1.4 目录结构 1.5 文件操作 1.5.1 文件系统操作 1.5.2 文件内容操作 2. Java文件系统操作 2.1 File类所处的包 2.2 构造方法 2.3 方法 2.3.1 与文件路径、文件名有关的方法 2.3.2 文件是否存在与普…

新时代异步 IO 框架:IO_URING 的原理、用法、业界示例分析

文章目录 IO_URING基本介绍常见 I/O 模型IO_URING 原理核心结构工作模式高级特性 用法APIliburing基本流程Demo 业界示例SeaStar / ScyllaDBCEPHRocksDBClickHouse IO_URING 基本介绍 常见 I/O 模型 当前 Linux 的几种 I/O 模型&#xff1a; I/O 模型 同步 I/O 是目前应用最…

2024.2.10 HCIA - Big Data笔记

1. 大数据发展趋势与鲲鹏大数据大数据时代大数据的应用领域企业所面临的挑战和机遇华为鲲鹏解决方案2. HDFS分布式文件系统和ZooKeeperHDFS分布式文件系统HDFS概述HDFS相关概念HDFS体系架构HDFS关键特性HDFS数据读写流程ZooKeeper分布式协调服务ZooKeeper概述ZooKeeper体系结构…

【C++】C++11上

C11上 1.C11简介2.统一的列表初始化2.1 {} 初始化2.2 initializer_list 3.变量类型推导3.1auto3.2decltype3.3nullptr 4.范围for循环5.final与override6.智能指针7. STL中一些变化8.右值引用和移动语义8.1左值引用和右值引用8.2左值引用与右值引用比较8.3右值引用使用场景和意义…

GPU独显下ubuntu屏幕亮度不能调节解决方法

GPU独显下屏幕亮度不能调节&#xff08;假设你已经安装了合适的nvidia显卡驱动&#xff09;&#xff0c;我试过修改 /etc/default/grub 的 GRUB_CMDLINE_LINUX_DEFAULT"quiet splash acpi_backlightvendor" &#xff0c;没用。修改和xorg.conf相关的文件&#xff0c;…

【python】类创建、实例化和调用类方法、子类、继承、私有属性、静态方法

一、类属性&#xff1a;定义在类中函数外的属性;self代表类的实例。 其中 number属于类属性&#xff0c;name、age属于实例属性&#xff1b;实例属性一般在初始函数中定义。 class people:number300 #类属性def __init__(self,name,age):#初始化方法 左右两个下划线self.nam…

就是民族的气节

我们拥有一个名字叫中国 - 张明敏 一把黄土塑成千万个你我 静脉是长城 动脉是黄河五千年的文化是生生不息的脉搏&#xff08;齐楚燕韩赵魏秦&#xff09;提醒你 提醒我我们拥有个名字叫中国&#xff08;中原地区为主体&#xff0c;河南&#xff0c;山东&#xff0c;安徽&…

语言与真实世界的关系(超级语言生成能力将促进世界深刻变化)

语言与真实世界之间存在着紧密且复杂的关系。在人类社会中&#xff0c;语言是认知、表达和交流现实世界的主要工具&#xff0c;它帮助我们构建并理解周围环境&#xff0c;并将我们的思维和经验概念化。 1. 符号与指代&#xff1a; 语言是一种符号系统&#xff0c;通过词汇、句…

【精选】Java面向对象进阶——接口

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【Java】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收藏 …

(11)Hive调优——explain执行计划

一、explain查询计划概述 explain将Hive SQL 语句的实现步骤、依赖关系进行解析&#xff0c;帮助用户理解一条HQL 语句在底层是如何实现数据的查询及处理&#xff0c;通过分析执行计划来达到Hive 调优&#xff0c;数据倾斜排查等目的。 官网指路&#xff1a; https://cwiki.ap…

第14讲投票帖子详情实现

投票帖子详情实现 后端,根据id查询投票帖子信息&#xff1a; /*** 根据id查询* param id* return*/ GetMapping("/{id}") public R findById(PathVariable(value "id")Integer id){Vote vote voteService.getById(id);WxUserInfo wxUserInfo wxUserInf…

【Go语言】第一个Go程序

第一个 Go 程序 1 安装 Go Go语言官网&#xff1a;Download and install - The Go Programming Language&#xff0c;提供了安装包以及引导流程。 以 Windows 为例&#xff0c;进入windows安装包下载地址&#xff1a;All releases - The Go Programming Language&#xff0c…
最新文章