RocketMQ消息轨迹产生的背景以及使用方式

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

背景

最近在维护RocketMQ经常会出现这种问题
消息发送方和接收方出现扯皮,消息发送方说我的消息已经发送成功了,消费方说我没接收到消息。两边各持己见,谁也不会说服谁。这时候就非常希望RocketMQ能有消息的一个消息发送和消费的一个业务log了,类似什么时候发送了消息,什么时候消费了消息,消费成功还是失败了

正常的消息查询页面一般只有消息是否消费,没有消息消费成功还是失败

不管消费成功还是失败,这里显式的都是CONSUMED,非常不方便排查问题,那么RocketMQ是不是有类似的log功能呢?
答案是有的,这里就引出了我们今天的主角,消息轨迹

RocketMQ版本

  • 5.1.0

消息轨迹是什么

RocketMQ消息轨迹主要是用来记录消息的发送消费记录,算是一种消息的log

如何使用

RocketMQ的消息轨迹开启主要是三个地方

  1. broker
  2. producer
  3. consumer

broker

broker启动配置文件添加如下配置

traceTopicEnable=true

producer开启消息轨迹

DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, true);

和一般的消息发送不同,主要是添加一个新的构造函数的参数

之前的构造方式

DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);

consumer开启消息轨迹

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, true);

和一般的消息消费也不同,我们也添加了enableMsgTracetrue

测试

消息发送

public class LocalProducer {

    /**
     * The number of produced messages.
     */
    public static final int MESSAGE_COUNT = 1;
    public static final String PRODUCER_GROUP = "xiao-zou-topic-producer";
    
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9000";
    public static final String TOPIC = "xiao-zou-topic";
    public static final String TAG = "TagA";

    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, true);

        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        producer.addRetryResponseCode(RemotingSysResponseCode.SYSTEM_BUSY);
        producer.start();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            try {
                Message msg = new Message(TOPIC /* Topic */,
                    TAG /* Tag */,
                    ("Hello xiaozou " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
//                msg.setDelayTimeLevel(2);
                SendResult sendResult = producer.send(msg, 5000);
                DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                System.out.printf("%s %s%n", sendResult, dtf2.format(LocalDateTime.now()));
                TimeUnit.SECONDS.sleep(2);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

消息消费

public class LocalConsumer {

    public static final String CONSUMER_GROUP = "gid-xiao-zou-topic";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9001";
    public static final String TOPIC = "xiao-zou-topic";

    public static void main(String[] args) throws MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, true);
        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe(TOPIC, "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

这里消费故意返回失败,便于观察消费轨迹

然后我们发送消息后用msgId去查看消息轨迹看看

消息轨迹查看

我们一般用的是消息的查询,现在我们直接去消息轨迹那里查看

我们查看消息轨迹可以看到非常详细的消费记录
包括消息的

  • 发送时间
  • 消费是否成功还是失败
  • 重试测试等
  • 消费者的ip
  • broker的ip

消息轨迹的存储

消息轨迹默认存储的TopicRMQ_SYS_TRACE_TOPIC,也可以自己设置。
存储方式有两种

普通模式

RocketMQ集群中每一个Broker节点均用于存储Client端收集并发送过来的消息轨迹数据。因此,对于RocketMQ集群中的Broker节点数量并无要求和限制。

物理IO隔离模式

对于消息轨迹数据量较大的场景,可以在RocketMQ集群中选择其中一个Broker节点专用于存储消息轨迹(只在该broker创建轨迹Topic),使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离,互不影响。在该模式下,RocketMQ集群中至少有两个Broker节点,其中一个Broker节点定义为存储消息轨迹数据的服务端。

总结

总的来说消息轨迹开启后会非常方便我们定位问题,但是会增加额外的存储开支,如果消息量很大,推荐使用物理隔离的方式,单独使用一个broker存储消息轨迹

参考

  • 官方文档: https://rocketmq.apache.org/zh/docs/4.x/bestPractice/03messagetra/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/74203.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

AI黑马挑战赛,探索研发新趋势丨IDCF

随着AI的出现&#xff0c;获取知识的成本大幅降低&#xff0c;当DevOps与AI相结合时&#xff0c;必将产生全新的化学反应。不断涌现的AI新工具提醒我们&#xff0c;一个全新的研发工作范式正在逐渐形成。而DevOps的核心理念是敏捷协同&#xff0c;作为工程师&#xff0c;如何通…

【从零学习python 】19. 循环遍历列表和列表嵌套的应用

文章目录 列表的循环遍历1. 使用while循环2. 使用for循环3. 交换2个变量的值1. 列表嵌套2. 应用 进阶案例 列表的循环遍历 1. 使用while循环 为了更有效率的输出列表的每个数据&#xff0c;可以使用循环来完成 namesList [xiaoWang,xiaoZhang,xiaoHua] length len(namesLi…

基于C#的无边框窗体阴影绘制方案 - 开源研究系列文章

今天介绍无边框窗体阴影绘制的内容。 上次有介绍使用双窗体的方法来显示阴影&#xff0c;这次介绍使用API函数来进行绘制。这里使用的是Windows API函数&#xff0c;操作系统的窗体也是用的这个来进行的绘制。 1、 项目目录&#xff1b; 下面是项目目录&#xff1b; 2、 函数介…

浅谈早期基于模板匹配的OCR的原理

基于模板匹配的概念是一种早期的字符识别方法&#xff0c;它基于事先准备好的字符模板库来与待识别字符进行比较和匹配。其原理如下&#xff1a; 1. 字符模板库准备&#xff1a;首先&#xff0c;针对每个可能出现的字符&#xff0c;制作一个对应的字符模板。这些模板可以手工创…

同步_异步请求和Ajax并利用axios框架简化

目录 同步和异步 原生的Ajax 创建XMLHttpRequest对象 常用方法 常用属性 axios框架 同步和异步 同步请求&#xff1a;发送请求后&#xff0c;会做出回应&#xff0c;回应的内容会覆盖浏览器中的内容&#xff0c;这样会打断其他正常的操作&#xff0c;显得不太友好&#…

uni-app中使用pinia

目录 Pinia 是什么&#xff1f; uni-app 使用Pinia main.js 中引用pinia 创建和注册模块 定义pinia方式 选项options方式 定义pinia 页面中使用 pinia选项options方式 函数方式 定义pinia 页面中使用 函数方式 定义的pinia Pinia 是什么&#xff1f; Pinia&#xff0…

两个pdf合并成一个pdf怎么合并?这几个方法值得推荐

两个pdf合并成一个pdf怎么合并&#xff1f;pdf文件的合并是一个很常见的需求&#xff0c;特别是在处理工作文件或学习资料时。为了更好的帮助你了解如何将两个pdf文件合并成一个&#xff0c;下面就给大家详细介绍几种合并方法。 方法一&#xff1a;使用迅捷PDF转换器 这是一款…

Linux服务器上配置HTTP和HTTPS代理

本文将向你分享如何在Linux服务器上配置HTTP和HTTPS代理的方法&#xff0c;解决可能遇到的问题&#xff0c;让你的爬虫项目顺利运行&#xff0c;畅爬互联网&#xff01; 配置HTTP代理的步骤 1. 了解HTTP代理的类型&#xff1a;常见的有正向代理和反向代理两种类型。根据实际需求…

Kotlin Executors线程池newSingleThreadExecutor单线程

Kotlin Executors线程池newSingleThreadExecutor单线程 import java.util.concurrent.Executorsfun main() {val mExecutorService Executors.newSingleThreadExecutor()for (i in 1..5) {mExecutorService.execute {println("seq-$i tid:${Thread.currentThread().threa…

【Android Framework系列】第10章 PMS之Hook实现广播的调用

1 前言 前面章节我们学习了【Android Framework系列】第4章 PMS原理我们了解了PMS原理&#xff0c;【Android Framework系列】第9章 AMS之Hook实现登录页跳转我们知道AMS可以Hook拦截下来实现未注册Activity页面的跳转&#xff0c;本章节我们来尝试一下HookPMS实现广播的发送。…

【Java】2021 RoboCom 机器人开发者大赛-高职组(初赛)题解

7-1 机器人打招呼 机器人小白要来 RoboCom 参赛了&#xff0c;在赛场中遇到人要打个招呼。请你帮它设置好打招呼的这句话&#xff1a;“ni ye lai can jia RoboCom a?”。 输入格式&#xff1a; 本题没有输入。 输出格式&#xff1a; 在一行中输出 ni ye lai can jia Robo…

煜邦转债,华设转债,兴瑞转债,神通转债上市价格预测

煜邦转债 基本信息 转债名称&#xff1a;煜邦转债&#xff0c;评级&#xff1a;A&#xff0c;发行规模&#xff1a;4.10806亿元。 正股名称&#xff1a;煜邦电力&#xff0c;今日收盘价&#xff1a;8.82元&#xff0c;转股价格&#xff1a;10.12元。 当前转股价值 转债面值 / …

UI自动化测试

前言 随着智能化信息基础设施的推进&#xff0c;软件开发的进程也不断加快。软件测试工作也逐渐由传统的手工测试向软件自动化测试跨越。 对于很多企业来说&#xff0c;做好软件自动化测试工作已经不仅仅是通过测试工具进行“点点点”&#xff0c;要想找出软件测试过程中的缺…

ESLint是什么?

ESLint 介绍 ESLint 是一款插件&#xff0c;主要用来检测编写的&#xff08; JavaScript &#xff09;代码是否符合规范。当然在一个团队中也会自定义一些规范条件。另外正常情况下我们不需要单独安装 ESLint 去使用&#xff0c;这里只是为了做演示。例如 vue-cli 脚手架搭建的…

【C快学-C语言程序设计(基础篇)】从VSCode中使用C编写我的第一个Hello world

简介&#xff1a;本专栏是一个C语言基础入门知识学习的一个专栏 面向&#xff1a;广大C友 工具&#xff1a;VSCODE 博主&#xff1a;一个友好且宠粉的博主&#xff0c;送书活动小专栏&#xff0c;不定期抽奖送图书给粉丝 社区&#xff1a;&#x1f988;山鱼社区 1.如何配置C语言…

matlab使用教程(15)—图论基础

1.有向图和无向图 1.1什么是图&#xff1f; 图是表示各种关系的节点和边的集合&#xff1a; • 节点 是与对象对应的顶点。 • 边 是对象之间的连接。 • 图的边有时会有权重 &#xff0c;表示节点之间的每个连接的强度&#xff08;或一些其他属性&#xff09;。 这些定…

Tree相关

1.树相关题目 1.1 二叉树的中序遍历&#xff08;简单&#xff09;&#xff1a;递归 题目&#xff1a;使用中序遍历二叉树 思想&#xff1a;按照访问左子树——根节点——右子树的方式遍历这棵树&#xff0c;而在访问左子树或者右子树的时候我们按照同样的方式遍历&#xff0…

​LeetCode解法汇总1572. 矩阵对角线元素的和

目录链接&#xff1a; 力扣编程题-解法汇总_分享记录-CSDN博客 GitHub同步刷题项目&#xff1a; https://github.com/September26/java-algorithms 原题链接&#xff1a;力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 描述&#xff1a; 给你一个正…

Camx--概述

该部分代码主要位于 vendor/qcom/proprietary/ 目录下&#xff1a; 其中 camx 代表了通用功能性接口的代码实现集合&#xff08;CamX&#xff09;&#xff0c;chi-cdk代表了可定制化需求的代码实现集合&#xff08;CHI&#xff09;&#xff0c;从图中可以看出Camx部分对上作为H…

Redis_缓存2_缓存删除和淘汰策略

14.5 缓存数据的删除和替换 14.5.1 过期数据 可以使用ttl查看key的状态。已过期的数据&#xff0c;redis并未马上删除。优先去执行读写数据操作&#xff0c;删除操作延后执行。 14.5.2 删除策略 redis中每一个value对应一个内存地址&#xff0c;在expires&#xff0c;一个内…
最新文章