RocketMQ- 深入理解RocketMQ的消息模型

1、RocketMQ客户端基本流程

​ RocketMQ基于Maven提供了客户端的核心依赖:

<dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.5</version>
</dependency>

​ 使用客户端进行编程时,添加这一个核心依赖就够了。 另外还有一个与权限控制相关的核心依赖也需要了解。尽量保持与服务端版一致。

<dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-acl</artifactId>
        <version>4.9.5</version>
</dependency>

一个最为简单的消息生产者代码如下:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        //初始化一个消息生产者
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 指定nameserver地址
        producer.setNamesrvAddr("192.168.232.128:9876");
        // 启动消息生产者服务
        producer.start();
        for (int i = 0; i < 2; i++) {
            try {
                // 创建消息。消息由Topic,Tag和body三个属性组成,其中Body就是消息内容
                Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                //发送消息,获取发送结果
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        //消息发送完后,停止消息生产者服务。
        producer.shutdown();
    }
}

​ 一个简单的消息消费者代码如下:

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        //构建一个消息消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        //指定nameserver地址
       consumer.setNamesrvAddr("192.168.232.128:9876");
       consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 订阅一个感兴趣的话题,这个话题需要与消息的topic一致
        consumer.subscribe("TopicTest", "*");
        // 注册一个消息回调函数,消费到消息后就会触发回调。
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
    msgs.forEach(messageExt -> {
                    try {
                        System.out.println("收到消息:"+new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    } catch (UnsupportedEncodingException e) {}
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者服务
        consumer.start();
        System.out.print("Consumer Started");
    }
}

​ RocketMQ的客户端编程模型相对比较固定,基本都有一个固定的步骤。掌握这个固定步骤,对于学习其他复杂的消息模型也是很有帮助的。

  • 消息生产者的固定步骤

1.创建消息生产者producer,并指定生产者组名
2.指定Nameserver地址
3.启动producer。 这个步骤比较容易忘记。可以认为这是消息生产者与服务端建立连接的过程。
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer,释放资源。

  • 消息消费者的固定步骤

1.创建消费者Consumer,必须指定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer。消费者会一直挂起,持续处理消息。

​ 其中,最为关键的就是NameServer。从示例中可以看到,RocketMQ的客户端只需要指定NameServer地址,而不需要指定具体的Broker地址。

​ 指定NameServer的方式有两种。可以在客户端直接指定,例如 consumer.setNameSrvAddr("127.0.0.1:9876")。然后,也可以通过读取系统环境变量NAMESRV_ADDR指定。其中第一种方式的优先级更高。

2、消息确认机制

​ RocketMQ要支持互联网金融场景,那么消息安全是必须优先保障的。而消息安全有两方面的要求,一方面是生产者要能确保将消息发送到Broker上。另一方面是消费者要能确保从Broker上争取获取到消息。

1、消息生产端采用消息确认加多次重试的机制保证消息正常发送到RocketMQ

​ 针对消息发送的不确定性,封装了三种发送消息的方式。

第一种称为单向发送

​ 单向发送方式下,消息生产者只管往Broker发送消息,而全然不关心Broker端有没有成功接收到消息。这就好比生产者向Broker发一封电子邮件,Broker有没有处理电子邮件,生产者并不知道。

public class OnewayProducer {
    public static void main(String[] args)throws Exception{
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
        producer.start();
        Message message = new Message("Order","tag","order info : orderId = xxx".getBytes(StandardCharsets.UTF_8));
        producer.sendOneway(message);
        Thread.sleep(50000);
        producer.shutdown();
    }
}

​ sendOneway方法没有返回值,如果发送失败,生产者无法补救。

​ 单向发送有一个好处,就是发送消息的效率更高。适用于一些追求消息发送效率,而允许消息丢失的业务场景。比如日志。

第二种称为同步发送

​ 同步发送方式下,消息生产者在往Broker端发送消息后,会阻塞当前线程,等待Broker端的相应结果。这就好比生产者给Broker打了个电话。通话期间生产者就停下手头的事情,直到Broker明确表示消息处理成功了,生产者才继续做其他的事情。

 SendResult sendResult = producer.send(msg);

​ SendResult来自于Broker的反馈。producer在send发出消息,到Broker返回SendResult的过程中,无法做其他的事情。

​ 在SendResult中有一个SendStatus属性,这个SendStatus是一个枚举类型,其中包含了Broker端的各种情况。

public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}

​ 在这几种枚举值中,SEND_OK表示消息已经成功发送到Broker上。至于其他几种枚举值,都是表示消息在Broker端处理失败了。使用同步发送的机制,我们就可以在消息生产者发送完消息后,对发送失败的消息进行补救。例如重新发送。

​ 但是此时要注意,如果Broker端返回的SendStatus不是SEND_OK,也并不表示消息就一定不会推送给下游的消费者。仅仅只是表示Broker端并没有完全正确的处理这些消息。因此,如果要重新发送消息,最好要带上唯一的系统标识,这样在消费者端,才能自行做幂等判断。也就是用具有业务含义的OrderID这样的字段来判断消息有没有被重复处理。

​ 这种同步发送的机制能够很大程度上保证消息发送的安全性。但是,这种同步发送机制的发送效率比较低。毕竟,send方法需要消息在生产者和Broker之间传输一个来回后才能结束。如果网速比较慢,同步发送的耗时就会很长。

第三种称为异步发送

​ 异步发送机制下,生产者在向Broker发送消息时,会同时注册一个回调函数。接下来生产者并不等待Broker的响应。当Broker端有响应数据过来时,自动触发回调函数进行对应的处理。这就好比生产者向Broker发电子邮件通知时,另外找了一个代理人专门等待Broker的响应。而生产者自己则发完消息后就去做其他的事情去了。

	producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }
                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });

​ 在SendCallback接口中有两个方法,onSuccess和onException。当Broker端返回消息处理成功的响应信息SendResult时,就会调用onSuccess方法。当Broker端处理消息超时或者失败时,就会调用onExcetion方法,生产者就可以在onException方法中进行补救措施。

​ 此时同样有几个问题需要注意。一是与同步发送机制类似,触发了SendCallback的onException方法同样并不一定就表示消息不会向消费者推送。如果Broker端返回响应信息太慢,超过了超时时间,也会触发onException方法。超时时间默认是3秒,可以通过producer.setSendMsgTimeout方法定制。而造成超时的原因则有很多,消息太大造成网络拥堵、网速太慢、Broker端处理太慢等都可能造成消息处理超时。

​ 二是在SendCallback的对应方法被触发之前,生产者不能调用shutdown()方法。如果消息处理完之前,生产者线程就关闭了,生产者的SendCallback对应方法就不会触发。这是因为使用异步发送机制后,生产者虽然不用阻塞下来等待Broker端响应,但是SendCallback还是需要附属于生产者的主线程才能执行。如果Broker端还没有返回SendResult,而生产者主线程已经停止了,那么SendCallback的执行线程也就会随主线程一起停止,对应的方法自然也就无法执行了。

​ 这种异步发送的机制能够比较好的兼容消息的安全性以及生产者的高吞吐需求,是很多MQ产品都支持的方式。RabbitMQ和Kafka都支持这种异步发送的机制。但是异步发送机制也并不是万能的,毕竟异步发送机制对消息生产者的主线业务是有侵入的。具体使用时还是需要根据业务场景考虑。

​ RocketMQ提供的这三种发送消息的方式,并不存在绝对的好坏之分。我们更多的是需要根据业务场景进行选择。例如在电商下单这个场景,我们就应该尽量选择同步发送或异步发送,优先保证数据安全。然后,如果下单场景的并发比较高,业务比较繁忙,就应该尽量优先选择异步发送的机制。这时,我们就应该对下单服务的业务进行优化定制,尽量适应异步发送机制的要求。这样就可以尽量保证下单服务能够比较可靠的将用户的订单消息发送到RocketMQ了。

2、消息消费者端采用状态确认机制保证消费者一定能正常处理对应的消息

​ 我们之前分析生产者的可靠性问题,核心的解决思路就是通过确认Broker端的状态来保证生产者发送消息的可靠性。对于RocketMQ的消费者来说,保证消息处理可靠性的思路也是类似的。只不过这次换成了Broker等待消费者返回消息处理状态。

consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

​ 这个返回值是一个枚举值,有两个选项 CONSUME_SUCCESS和RECONSUME_LATER。如果消费者返回CONSUME_SUCCESS,那么消息自然就处理结束了。但是如果消费者没有处理成功,返回的是RECONSUME_LATER,Broker就会过一段时间再发起消息重试。

​ 为了要兼容重试机制的成功率和性能,RocketMQ设计了一套非常完善的消息重试机制,从而尽可能保证消费者能够正常处理用户的订单信息。

​ 1、Broker不可能无限制的向消费失败的消费者推送消息。如果消费者一直没有恢复,Broker显然不可能一直无限制的推送,这会浪费集群很多的性能。所以,Broker会记录每一个消息的重试次数。如果一个消息经过很多次重试后,消费者依然无法正常处理,那么Broker会将这个消息推入到消费者组对应的死信Topic中。死信Topic相当于windows当中的垃圾桶。你可以人工介入对死信Topic中的消息进行补救,也可以直接彻底删除这些消息。RocketMQ默认的最大重试次数是16次。

​ 2、为了让这些重试的消息不会影响Topic下其他正常的消息,Broker会给每个消费者组设计对应的重试Topic。MessageQueue是一个具有严格FIFO特性的数据结构。如果需要重试的这些消息还是放在原来的MessageQueue中,就会对当前MessageQueue产生阻塞,让其他正常的消息无法处理。RocketMQ的做法是给每个消费者组自动生成一个对应的重试Topic。在消息需要重试时,会先移动到对应的重试Topic中。后续Broker只要从这些重试Topic中不断拿出消息,往消费者组重新推送即可。这样,这些重试的消息有了自己单独的队列,就不会影响到Topic下的其他消息了。

​ 3、RocketMQ中设定的消费者组都是订阅主题和消费逻辑相同的服务备份,所以当消息重试时,Broker只要往消费者组中随意一个实例推送即可。这是消息重试机制能够正常运行的基础。但是,在客户端的具体实现时,MQDefaultMQConsumer并没有强制规定消费者组不能重复。也就是说,你完全可以实现出一些订阅主题和消费逻辑完全不同的消费者服务,共同组成一个消费组。在这种情况下,RocketMQ不会报错,但是消息的处理逻辑就无法保持一致了。这会给业务带来很大的麻烦。这是在实际应用时需要注意的地方。

​ 4、Broker端最终只通过消费者组返回的状态来确定消息有没有处理成功。至于消费者组自己的业务执行是否正常,Broker端是没有办法知道的。因此,在实现消费者的业务逻辑时,应该要尽量使用同步实现方式,保证在自己业务处理完成之后再向Broker端返回状态。而应该尽量避免异步的方式处理业务逻辑。

3、消费者也可以自行指定起始消费位点

​ Broker端通过Consumer返回的状态来推进所属消费者组对应的Offset。但是,这里还是会造成一种分裂,消息最终是由Consumer来处理,但是消息却是由Broker推送过来的,也就是说,Consumer无法确定自己将要处理的是哪些消息。这就好比你上班做一天事情,公司负责给你发一笔工资。如果一切正常,那么没什么问题。 但是如果出问题了呢?公司拖欠了你的工资,这时,你就还是需要能到公司查账,至少查你自己的工资记录。从上一次发工资的时候计算你该拿的钱。

​ 对消息队列也一样。虽然Offset完全由Broker进行维护,但是,RocketMQ也允许Consumer自己去查账,自己指定消费位点。核心代码是在Consumer中设定了一个属性ConsumeFromWhere,表示在Consumer启动时,从哪一条消息开始进行消费。Consumer当然不可能精确的知道Offset的具体参数,所以这个ConsumerFromWhere并不是直接传入Offset位点,而是可以传入一个ConsumerFromWhere对象,这是一个枚举值。名字一目了然。

public enum ConsumeFromWhere {
    CONSUME_FROM_LAST_OFFSET, //从队列的第一条消息开始重新消费
    CONSUME_FROM_FIRST_OFFSET, //从上次消费到的地方开始继续消费
    CONSUME_FROM_TIMESTAMP; //从某一个时间点开始重新消费
}

​ 另外,如果指定了ConsumerFromWhere.CONSUME_FROM_TIMESTAMP,这就表示要从一个具体的时间开始。具体时间点,需要通过Consumer的另一个属性ConsumerTimestamp。这个属性可以传入一个表示时间的字符串。

consumer.setConsumerTimestamp("20131223171201");

​ 到这里,我们就从客户端的角度分析清楚了要如何保证消息的安全性。但是消息安全问题其实是一个非常体系化的问题,涉及到的不光是客户端,还需要服务端配合。关于这个问题,我们会在后面的分享过程当中继续带你一起思考。

3、广播消息

应用场景:

​ 广播模式和集群模式是RocketMQ的消费者端处理消息最基本的两种模式。集群模式下,一个消息,只会被一个消费者组中的多个消费者实例 共同 处理一次。广播模式下,一个消息,则会推送给所有消费者实例处理,不再关心消费者组。

示例代码:

​ 消费者核心代码

consumer.setMessageModel(MessageModel.BROADCASTING);

​ 启动多个消费者,广播模式下,这些消费者都会消费一次消息。

实现思路:

​ 默认模式(也就是集群模式)下,Broker端会给每个ConsumerGroup维护一个统一的Offset,这个Offset可以保证一个消息,在同一个ConsumerGroup内只会被消费一次。而广播模式的实现方式,是将Offset转移到消费者端自行保管,这样Broker端只管向所有消费者推送消息,而不用负责维护消费进度。

注意点:

1、Broker端不维护消费进度,意味着,如果消费者处理消息失败了,将无法进行消息重试。

2、消费者端维护Offset的作用是可以在服务重启时,按照上一次消费的进度,处理后面没有消费过的消息。丢了也不影响服务稳定性。

​ 比如生产者发送了1~10号消息。消费者当消费到第6个时宕机了。当他重启时,Broker端已经把第10个消息都推送完成了。如果消费者端维护好了自己的Offset,那么他就可以在服务重启时,重新向Broker申请6号到10号的消息。但是,如果消费者端的Offset丢失了,消费者服务依然可以正常运行,但是6到10号消息就无法再申请了。后续这个消费者就只能获取10号以后的消息。

​ 实际上,Offset的维护数据是放在 ${user.home}/.rocketmq_offset/${clientIp}${instanceName}/${group}/offsets.json 文件下的。

​ 消费者端存储广播消费的本地offsets文件的默认缓存目录是 System.getProperty(“user.home”) + File.separator + “.rocketmq_offsets” ,可以通过定制 rocketmq.client.localOffsetStoreDir 系统属性进行修改。

​ 本地offsets文件在缓存目录中的具体位置与消费者的clientIp 和 instanceName有关。其中instanceName默认是DEFAULT,可以通过定制系统属性 rocketmq.client.name 进行修改。另外,每个消费者对象也可以单独设定instanceName。

​ RocketMQ会通过定时任务不断尝试本地Offsets文件的写入,但是,如果本地Offsets文件写入失败,RocketMQ不会进行任何的补救。

如果想要了解更多细节,可以看下我的博文: https://blog.csdn.net/roykingw/article/details/126351010。这里也揭示了一个小问题,如果在Windows本地使用SpringBoot集成RocketMQ的话,广播消息的offset.json文件将无法保存。

4、顺序消息机制

应用场景:

​ 每一个订单有从下单、锁库存、支付、下物流等几个业务步骤。每个业务步骤都由一个消息生产者通知给下游服务。如何保证对每个订单的业务处理顺序不乱?

示例代码:

​ 生产者核心代码:

for (int i = 0; i < 10; i++) {
                int orderId = i;
                for(int j = 0 ; j <= 5 ; j ++){
                    Message msg =
                            new Message("OrderTopicTest", "order_"+orderId, "KEY" + orderId,
                                    ("order_"+orderId+" step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, orderId);
                    System.out.printf("%s%n", sendResult);
                }
            }

​ 通过MessageSelector,将orderId相同的消息,都转发到同一个MessageQueue中。

​ 消费者核心代码:

consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for(MessageExt msg:msgs){
                    System.out.println("收到消息内容 "+new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

​ 注入一个MessageListenerOrderly实现。

实现思路:

​ 基础思路:只有放到一起的一批消息,才有可能保持消息的顺序。

image.png

​ 1、生产者只有将一批有顺序要求的消息,放到同一个MesasgeQueue上,Broker才有可能保持这一批消息的顺序。

​ 2、消费者只有一次锁定一个MessageQueue,拿到MessageQueue上所有的消息,

注意点:

​ 1、理解局部有序与全局有序。大部分业务场景下,我们需要的其实是局部有序。如果要保持全局有序,那就只保留一个MessageQueue。性能显然非常低。

​ 2、生产者端尽可能将有序消息打散到不同的MessageQueue上,避免过于几种导致数据热点竞争。

​ 2、消费者端只能用同步的方式处理消息,不要使用异步处理。更不能自行使用批量处理。

​ 3、消费者端只进行有限次数的重试。如果一条消息处理失败,RocketMQ会将后续消息阻塞住,让消费者进行重试。但是,如果消费者一直处理失败,超过最大重试次数,那么RocketMQ就会跳过这一条消息,处理后面的消息,这会造成消息乱序。

​ 4、消费者端如果确实处理逻辑中出现问题,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代。

5、延迟消息

应用场景:

​ 延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。

虽然不太起眼,但是这是RocketMQ非常有特色的一个功能。对比下RabbitMQ和Kafka。RabbitMQ中只能通过使用死信队列变相实现延迟消息,或者加装一个插件来支持延迟消息。 Kafka则不太好实现延迟消息。

示例代码:

​ 生产者端核心代码:

msg.setDelayTimeLevel(3);

​ 只要给消息设定一个延迟级别就行了,无比简单。

​ RocketMQ给消息定制了18个默认的延迟级别,分别对应18个不同的预设好的延迟时间。

image.png

实现思路:

​ 延迟消息的难点其实是性能,需要不断进行定时轮训。全部扫描所有消息是不可能的,RocketMQ的实现方式是预设一个系统Topic,名字叫做SCHEDULE_TOPIC_XXXX。在这个Topic下,预设18个延迟队列。然后每次只针对这18个队列里的消息进行延迟操作,这样就不用一直扫描所有的消息了。

image.png

注意点:

​ 这样预设延迟时间其实是不太灵活的。5.x版本已经支持预设一个具体的时间戳,按秒的精度进行定时发送。

​ 但是可以看到,这18个延迟级别虽然无法调整,但是每个延迟级别对应的延迟时间其实是可以调整的。只需要修改截图中的参数就行。不过通常不建议这么做。

6、批量消息

应用场景:

​ 生产者要发送的消息比较多时,可以将多条消息合并成一个批量消息,一次性发送出去。这样可以减少网络IO,提升消息发送的吞吐量。

示例代码:

​ 生产者核心代码:

        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));

        producer.send(messages);

注意点:

​ 批量消息的使用非常简单,但是要注意RocketMQ做了限制。同一批消息的Topic必须相同,另外,不支持延迟消息。

​ 还有批量消息的大小不要超过1M,如果太大就需要自行分割。

7、过滤消息

应用场景:

​ 同一个Topic下有多种不同的消息,消费者只希望关注某一类消息。

​ 例如,某系统中给仓储系统分配一个Topic,在Topic下,会传递过来入库、出库等不同的消息,仓储系统的不同业务消费者就需要过滤出自己感兴趣的消息,进行不同的业务操作。

image.png

示例代码1:简单过滤

​ 生产者端需要在发送消息时,增加Tag属性。比如我们上面举例当中的入库、出库。核心代码:

        String[] tags = new String[] {"TagA", "TagB", "TagC"};

        for (int i = 0; i < 15; i++) {
            Message msg = new Message("TagFilterTest",
                tags[i % tags.length],
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

​ 消费者端就可以通过这个Tag属性订阅自己感兴趣的内容。核心代码:

        consumer.subscribe("TagFilterTest", "TagA");

​ 这样,后续Consumer就只会出处理TagA的消息。

示例代码2:SQL过滤

​ 通过Tag属性,只能进行简单的消息匹配。如果要进行更复杂的消息过滤,比如数字比较,模糊匹配等,就需要使用SQL过滤方式。SQL过滤方式可以通过Tag属性以及用户自定义的属性一起,以标准SQL的方式进行消息过滤。

​ 生产者端在发送消息时,出了Tag属性外,还可以增加自定义属性。核心代码:

 String[] tags = new String[] {"TagA", "TagB", "TagC"};

        for (int i = 0; i < 15; i++) {
            Message msg = new Message("SqlFilterTest",
                tags[i % tags.length],
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            msg.putUserProperty("a", String.valueOf(i));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

​ 消费者端在进行过滤时,可以指定一个标准的SQL语句,定制复杂的过滤规则。核心代码:

 consumer.subscribe("SqlFilterTest",
            MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
                "and (a is not null and a between 0 and 3)"));

实现思路:

​ 实际上,Tags和用户自定义的属性,都是随着消息一起传递的,所以,消费者端是可以拿到消息的Tags和自定义属性的。比如:

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg.getTags());
                    System.out.println(msg.getProperties());
                }
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

​ 这样,剩下的就是在Consumer中对消息进行过滤了。Broker会在往Consumer推送消息时,在Broker端进行消息过滤。是Consumer感兴趣的消息,就往Consumer推送。

​ Tag属性的处理比较简单,就是直接匹配。而SQL语句的处理会比较麻烦一点。RocketMQ也是通过ANLTR引擎来解析SQL语句,然后再进行消息过滤的。

ANLTR是一个开源的SQL语句解析框架。很多开源产品都在使用ANLTR来解析SQL语句。比如ShardingSphere,Flink等。

注意点:

​ 1、使用Tag过滤时,如果希望匹配多个Tag,可以使用两个竖线(||)连接多个Tag值。另外,也可以使用星号(*)匹配所有。

​ 2、使用SQL顾虑时,SQL语句是按照SQL92标准来执行的。SQL语句中支持一些常见的基本操作:

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

​ 2、消息过滤,其实在Broker端和在Consumer端都可以做。Consumer端也可以自行获取用户属性,不感兴趣的消息,直接返回不成功的状态,跳过该消息就行了。但是RocketMQ会在Broker端完成过滤条件的判断,只将Consumer感兴趣的消息推送给Consumer。这样的好处是减少了不必要的网络IO,但是缺点是加大了服务端的压力。不过在RocketMQ的良好设计下,更建议使用消息过滤机制。

​ 3、Consumer不感兴趣的消息并不表示直接丢弃。通常是需要在同一个消费者组,定制另外的消费者实例,消费那些剩下的消息。但是,如果一直没有另外的Consumer,那么,Broker端还是会推进Offset。

8、事务消息

应用场景:

​ 事务消息是RocketMQ非常有特色的一个高级功能。他的基础诉求是通过RocketMQ的事务机制,来保证上下游的数据一致性。

​ 以电商为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。这种场景,非常适合使用RocketMQ的解耦功能来进行串联。

image.png

​ 考虑到事务的安全性,即要保证相关联的这几个业务一定是同时成功或者同时失败的。如果要将四个服务一起作为一个分布式事务来控制,可以做到,但是会非常麻烦。而使用RocketMQ在中间串联了之后,事情可以得到一定程度的简化。由于RocketMQ与消费者端有失败重试机制,所以,只要消息成功发送到RocketMQ了,那么可以认为Branch2.1,Branch2.2,Branch2.3这几个分支步骤,是可以保证最终的数据一致性的。这样,一个复杂的分布式事务问题,就变成了MinBranch1和Branch2两个步骤的分布式事务问题。

​ 然后,在此基础上,RocketMQ提出了事务消息机制,采用两阶段提交的思路,保证Main Branch1和Branch2之间的事务一致性。

image.png

​ 具体的实现思路是这样的:

image.png

  1. 生产者将消息发送至Apache RocketMQ服务端。
  2. Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

示例代码:

参见 org.apache.rocketmq.example.transaction.TransactionProducer

​ 实现时的重点是使用RocketMQ提供的TransactionMQProducer事务生产者,在TransactionMQProducer中注入一个TransactionListener事务监听器来执行本地事务,以及后续对本地事务的检查。

注意点:

​ 1、半消息是对消费者不可见的一种消息。实际上,RocketMQ的做法是将消息转到了一个系统Topic,RMQ_SYS_TRANS_HALF_TOPIC。

​ 2、事务消息中,本地事务回查次数通过参数transactionCheckMax设定,默认15次。本地事务回查的间隔通过参数transactionCheckInterval设定,默认60秒。超过回查次数后,消息将会被丢弃。

​ 3、其实,了解了事务消息的机制后,在具体执行时,可以对事务流程进行适当的调整。

image.png

​ 4、如果你还是感觉不到RocketMQ事务消息机制的作用,那么可以看看下面这个面试题:

image.png

9、ACL权限控制机制

应用场景:

​ RocketMQ提供了针对队列、用户等不同维度的非常全面的权限管理机制。通常来说,RocketMQ作为一个内部服务,是不需要进行权限控制的,但是,如果要通过RocketMQ进行跨部门甚至跨公司的合作,权限控制的重要性就显现出来了。

权限控制体系:

​ 1、RocketMQ针对每个Topic,就有完整的权限控制。比如,在控制平台中,就可以很方便的给每个Topic配置权限。

image.png

​ perm字段表示Topic的权限。有三个可选项。 2:禁写禁订阅,4:可订阅,不能写,6:可写可订阅

​ 2、在Broker端还提供了更详细的权限控制机制。主要是在broker.conf中打开acl的标志:aclEnable=true。然后就可以用他提供的plain_acl.yml来进行权限配置了。并且这个配置文件是热加载的,也就是说要修改配置时,只要修改配置文件就可以了,不用重启Broker服务。文件的配置方式,也非常简单,一目了然。

#全局白名单,不受ACL控制
#通常需要将主从架构中的所有节点加进来
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*

accounts:
#第一个账户
- accessKey: RocketMQ
  secretKey: 12345678
  whiteRemoteAddress:
  admin: false 
  defaultTopicPerm: DENY #默认Topic访问策略是拒绝
  defaultGroupPerm: SUB #默认Group访问策略是只允许订阅
  topicPerms:
  - topicA=DENY #topicA拒绝
  - topicB=PUB|SUB #topicB允许发布和订阅消息
  - topicC=SUB #topicC只允许订阅
  groupPerms:
  # the group should convert to retry topic
  - groupA=DENY
  - groupB=PUB|SUB
  - groupC=SUB
#第二个账户,只要是来自192.168.1.*的IP,就可以访问所有资源
- accessKey: rocketmq2
  secretKey: 12345678
  whiteRemoteAddress: 192.168.1.*
  # if it is admin, it could access all resources
  admin: true

​ 接下来,在客户端就可以通过accessKey和secretKey提交身份信息了。客户端在使用时,需要先引入一个Maven依赖包。

<dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-acl</artifactId>
 <version>4.9.1</version>
</dependency>

​ 然后在声明客户端时,传入一个RPCHook。

 //声明时传入RPCHook
 DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());


    private static final String ACL_ACCESS_KEY = "RocketMQ";
    private static final String ACL_SECRET_KEY = "1234567";
    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/211842.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

RxJava

Single 使用 Flowable 比较重一般使用Single onSubscribe 产生订阅时调用 线程切换1 2 发送顺序事件.just just 源码 钩子方法,进行验证再处理 Single 对象 订阅,RxJavaPlugins.onSubscribe 钩子方法,产生订阅和过滤 Single 核心方法,抽象的,实现为SingleJust 订阅和执行成功回…

同旺科技 USB TO SPI / I2C --- 调试W5500_Ping测试

所需设备&#xff1a; 内附链接 1、USB转SPI_I2C适配器(专业版); 首先&#xff0c;连接W5500模块与同旺科技USB TO SPI / I2C适配器&#xff0c;如下图&#xff1a; 设置寄存器&#xff1a; SHAR&#xff08;源MAC地址寄存器&#xff09;&#xff0c;该寄存器用来设置源MAC…

网络入门---网络编程初步认识和实践

目录标题 前言准备工作udpserver.hpp成员变量构造函数初始化函数(socket,bind)start函数(recvfrom) udpServer.ccudpClient.hpp构造函数初始化函数run函数(sendto) udpClient.cc测试 前言 在上一篇文章中我们初步的认识了端口号的作用&#xff0c;ip地址和MAC地址在网络通信时…

HuggingFace学习笔记--Prompt-Tuning高效微调

1--Prompt-Tuning介绍 Prompt-Tuning 高效微调只会训练新增的Prompt的表示层&#xff0c;模型的其余参数全部固定&#xff1b; 新增的 Prompt 内容可以分为 Hard Prompt 和 Soft Prompt 两类&#xff1b; Soft prompt 通常指的是一种较为宽泛或模糊的提示&#xff0c;允许模型在…

规则引擎专题---3、Drools组成和入门

Drools概述 drools是一款由JBoss组织提供的基于Java语言开发的开源规则引擎&#xff0c;可以将复杂且多变的业务规则从硬编码中解放出来&#xff0c;以规则脚本的形式存放在文件或特定的存储介质中(例如存放在数据库中)&#xff0c;使得业务规则的变更不需要修改项目代码、重启…

初识RocketMQ

1、简介 RocketMQ 是阿里巴巴在 2012 年开源的消息队列产品&#xff0c;用 Java 语言实现&#xff0c;在设计时参考了 Kafka&#xff0c;并做出了自己的一些改进&#xff0c;后来捐赠给 Apache 软件基金会&#xff0c;2017 正式毕业&#xff0c;成为 Apache 的顶级项目。Rocket…

avue-crud中时间范围选择默认应该是0点却变成了12点

文章目录 一、问题二、解决三、最后 一、问题 在avue-crud中时间范围选择&#xff0c;正常默认应该是0点&#xff0c;但是不知道怎么的了&#xff0c;选完之后就是一直是12点。具体问题如下动图所示&#xff1a; <template><avue-crud :option"option" /&g…

YOLOv8改进 | 2023 | SCConv空间和通道重构卷积(精细化检测,又轻量又提点)

一、本文介绍 本文给大家带来的改进内容是SCConv&#xff0c;即空间和通道重构卷积&#xff0c;是一种发布于2023.9月份的一个新的改进机制。它的核心创新在于能够同时处理图像的空间&#xff08;形状、结构&#xff09;和通道&#xff08;色彩、深度&#xff09;信息&#xf…

计算机组成原理笔记——存储器(静态RAM和动态RAM的区别,动态RAM的刷新, ROM……)

■ 随机存取存储器 ■ 1.随机存取存储器&#xff1a;按存储信息的原理不同分为&#xff1a;静态RAM和动态RAM 2.静态RAM&#xff08;SRAM&#xff09;&#xff1a;用触发器工作原理存储信息&#xff0c;但电源掉电时&#xff0c;存储信息会丢失具有易失性。 3.存储器的基本单元…

代码随想录算法训练营第三十四天|62.不同路径,63. 不同路径 II

62. 不同路径 - 力扣&#xff08;LeetCode&#xff09; 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Finish” &#…

Kubernetes技术与架构-策略

Kubernetes集群提供系统支持的策略&#xff0c;也提供开放接口给第三方定义的策略&#xff0c;这些策略用于可定义的配置文件或者Kubernetes集群的运行时环境&#xff0c;其中包括进程ID数量的申请与限制策略&#xff0c;服务器节点Node内的进程ID的数量限制策略&#xff0c;Po…

PyQt6 QCheckBox复选框按钮控件

​锋哥原创的PyQt6视频教程&#xff1a; 2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~共计33条视频&#xff0c;包括&#xff1a;2024版 PyQt6 Python桌面开发 视频教程(无废话…

深入了解c语言中的结构体

介绍&#xff1a; 在C语言中&#xff0c;结构体是一种用户自定义的数据类型&#xff0c;它允许我们将不同类型的数据组合在一起&#xff0c;形成一个更为复杂的数据结构。结构体可以用来表示现实世界中的实体&#xff0c;如人员、学生、图书等。本篇博客将介绍结构体的基本概念…

iceoryx(冰羚)-进程间消息同步

iceoryx进程间消息同步 iceoryx进程间消息同步&#xff0c;是用socket或管道实现的,定义在iceoryx\iceoryx_posh\include\iceoryx_posh\internal\runtime\ipc_interface_base.hpp namespace platform { #if defined(_WIN32) using IoxIpcChannelType iox::posix::NamedPipe; …

Linux下为可执行文件添加图标

Ubuntu 18.04上使用Qt5.14.2创建一个简单的Qt Widgets项目test&#xff0c;添加2个Push Button按钮&#xff0c;点击分别获取github和csdn地址&#xff0c;在mainwindow.cpp中添加的代码如下: #include "mainwindow.h" #include "ui_mainwindow.h" #inclu…

Linux 互斥锁 读写锁 条件变量 信号量 (备查)

线程同步 1&#xff09;所谓的同步并不是多个线程同时对内存进行访问&#xff0c;而是按照先后顺序依次进行的。 2&#xff09;如没有对线程进行同步处理&#xff0c;会导致多个线程访问共享资源出现数据混乱的问题。 3&#xff09;所谓共享资源就是多个线程共同访问的变量&…

javaweb校车校园车辆管理系统springboot+jsp

结构设计&#xff1a;总体采用B/S结构设计模式 (1)用户登录模块&#xff1a;用户通过手动登录&#xff0c;检测是否是校内人员的车辆。 (2)用户车辆信息编辑、上传、模块&#xff1a;通过上传车辆入场信息的操作权限&#xff0c;以用户的名义发布资料上传至校园停车场系统中。…

可视化数据库管理客户端:Adminer

简介&#xff1a;Adminer&#xff08;前身为phpMinAdmin&#xff09;是一个用PHP编写的功能齐全的数据库管理工具。与phpMyAdmin相反&#xff0c;它由一个可以部署到目标服务器的文件组成。Adminer可用于MySQL、PostgreSQL、SQLite、MS SQL、Oracle、Firebird、SimpleDB、Elast…

Java+SSM+MySQL基于微信的在线协同办公小程序(附源码 调试 文档)

基于微信的在线协同办公小程序 一、引言二、系统设计三、技术架构四、管理员功能设计五、员工功能设计六、系统实现七、界面展示八、源码获取 一、引言 随着科技的飞速发展&#xff0c;移动互联网已经深入到我们生活的各个角落。在这个信息时代&#xff0c;微信作为全球最大的…

头歌JUnit单元测试相关实验入门

一、入门实验 1.1第一个Junit测试程序 任务描述 请学员写一个名为testSub()的测试函数&#xff0c;来测试给定的减法函数是否正确。 相关知识 Junit编写原则 1、简化测试的编写&#xff0c;这种简化包括测试框架的学习和实际测试单元的编写。 2、测试单元保持持久性。 3、利用…
最新文章