RocketMQ5.0消息过滤

前言

消费者订阅了某个主题后,RocketMQ 会将该主题中的所有消息投递给消费者。若消费者只需要关注部分消息,可通过设置过滤条件在 Broker 端进行过滤,只获取到需要关注的消息子集,避免接收到大量无效的消息。

以电商交易场景为例,用户从下单到拿到商品,中间会产生很多消息,被不同的下游系统订阅消费。下游系统往往只关心自己需要处理的消息,比如支付系统只关心支付消息,这时候生产者就可以在发送消息的时候给消息打上标签,下游系统按需订阅即可。
image.png

过滤方式

RocketMQ 支持两种消息过滤方式。

Tag标签过滤

生产者在发送消息前,可以先给消息打上标签,每条消息最多设置一个 Tag 标签:

Message message = provider.newMessageBuilder()
        .setTopic("Trade_Topic")
        .setTag("pay")
        .setBody("xxx".getBytes())
        .build();
producer.send(message);

消费者配置 Tag 标签过滤规则:

consumer.subscribe("Trade_Topic", 
                   new FilterExpression("pay", FilterExpressionType.TAG));

Tag 标签过滤规则:

  • 单 Tag 匹配:过滤表达式为目标 Tag,相同 Tag 的消息才会投递给消费者
  • 多 Tag 匹配:过滤表达式为多个目标 Tag 用||分割,消息符合任一 Tag 就会被投递
  • 全部匹配:过滤表达式为*,所有消息都会投递

SQL属性过滤

SQL 属性过滤是 RocketMQ 提供的高级消息过滤方式,每个消息都可以额外设置用户属性和系统属性,消费者订阅时可设置 SQL 语法的过滤表达式过滤多个属性。

SQL 过滤也可以实现 Tag 标签过滤的效果,Tag 属于系统属性,属性名称是 TAGS

首先,生产者发送消息前给消息设置自定义属性:

Message message = provider.newMessageBuilder()
        .setTopic("Trade_Topic")
        .setBody("xxx".getBytes())
        .addProperty("price", "99800")
        .addProperty("region", "杭州")
        .build();
producer.send(message);

消费者配置 SQL 过滤规则,这里以 杭州区域价格大于 100 的订单 为例:

consumer.subscribe("Trade_Topic", 
                   new FilterExpression("region='杭州' AND price>10000", FilterExpressionType.SQL92));

SQL 属性过滤使用 SQL92 语法作为过滤规则表达式,语法规范如下:
image.png

如何选择

尽量用 Tag 标签过滤,实现更加轻量级,效率更高,在扫描 ConsumeQueue 时就可以先通过 TagHash 过滤一遍。而消息属性是存储在 CommitLog 文件里的,意味着 SQL 属性过滤必须读到完整的消息才能判断是否要过滤,性能较差。

设计实现

org.apache.rocketmq.store.MessageFilter是 RocketMQ 抽象出来的消息过滤接口,两个方法:

  • isMatchedByConsumeQueue:通过 ConsumeQueue 里的 tagsCode 先匹配一次,也就是 Tag 标签的哈希码,tagsCode 不同 Tag 肯定不同
  • isMatchedByCommitLog:根据 CommitLog 里的完整消息属性匹配
public interface MessageFilter {
    
    boolean isMatchedByConsumeQueue(final Long tagsCode,
        final ConsumeQueueExt.CqExtUnit cqExtUnit);

    boolean isMatchedByCommitLog(final ByteBuffer msgBuffer,
        final Map<String, String> properties);
}

RocketMQ 的处理逻辑是:先根据 ConsumeQueue 里的 tagsCode 过滤,通过了再读取 CommitLog 里的完整消息走 SQL 属性过滤,实现类会根据配置的过滤规则在不关心的过滤方法里直接返回 true。

public GetMessageResult getMessage(){
    ......
	// 先通过consumequeue里的tagsCode过滤
    if (messageFilter != null
        && !messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.NO_MATCHED_MESSAGE;
        }
        continue;
    }
	// 再从CommitLog读取完整消息
	SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
	// 再执行SQL属性过滤
    if (messageFilter != null
        && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.NO_MATCHED_MESSAGE;
        }
        selectResult.release();
        continue;
    }
	......
}

Tag 标签过滤的实现
Broker 把消息写入 CommitLog 后,ReputMessageService 线程会每隔 1ms 把新消息写入到 consumequeue 文件,以加速消费者的消费效率。ConsumeQueue 文件由若干个 CqUnit 组成,每个 CqUnit 占用固定的 20 个字节:

CqUnit{
    long offset; // 消息在 CommitLog 偏移量
    int size; // 消息长度
    long tagsCode; // Tag哈希码
}

image.png
消费者在消费 ConsumeQueue 时就可以直接通过 tagsCode 进行标签过滤:

public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
    // by tags code.
    if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
        if (tagsCode == null) {
            return true;
        }
        // '*' 订阅所有
        if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
            return true;
        }
        // 消息的tagsCode是否包含在消费者订阅的Tags里面
        return subscriptionData.getCodeSet().contains(tagsCode.intValue());
    }
}

因为是哈希码,所以 tagsCode 存在哈希冲突的可能性,不过概率极小。万一冲突了,Broker 还是会继续投递消息,RocketMQ 5.0 版本会由 Proxy 再进行一次 Tag 的精准匹配,如果不匹配不会投递给消费者;RocketMQ 4.x 版本由消费者收到消息后自行判断,Tag 不匹配的消息会直接丢弃。

SQL 属性过滤的实现
为了执行 SQL 语法实现属性过滤,SQL 语法会先被编译成 Expression 对象,再由Expression#evaluate方法得出执行结果。

Expression expression = FilterFactory.INSTANCE
						.get(ExpressionType.SQL92)
        				.compile("a>10 AND b<10 OR c=10");
expression.evaluate(context);

要对消息属性过滤,首先要把消息属性提取出来,消息属性由若干个 String 类型的键值对组成,然后执行 SQL。

public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
    // tag过滤 直接返回true
    if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
        return true;
    }
    ConsumerFilterData realFilterData = this.consumerFilterData;
    // 消息属性
    Map<String, String> tempProperties = properties;
    // 没有SQL表达式
    if (realFilterData == null || realFilterData.getExpression() == null
        || realFilterData.getCompiledExpression() == null) {
        return true;
    }
    if (tempProperties == null && msgBuffer != null) {
        // 从CommitLog解码出消息属性
        tempProperties = MessageDecoder.decodeProperties(msgBuffer);
    }
    Object ret = null;
    try {
        MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
        // 执行SQL92表达式过滤
        ret = realFilterData.getCompiledExpression().evaluate(context);
    } catch (Throwable e) {
        log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
    }
    if (ret == null || !(ret instanceof Boolean)) {
        return false;
    }
    return (Boolean) ret;
}

尾巴

消息过滤是 RocketMQ 防止 Broker 端因为投递大量消费者不感兴趣的消息而导致资源浪费的一种手段,消费者可以根据自己感兴趣的消息类型配置过滤规则,分为 Tag 标签过滤 和 SQL 属性过滤两种方式。Tag 标签过滤效率高,因为 Broker 在构建 consumequeue 文件时会写入消息 Tag 的哈希码,直接比较哈希码可以避免通过 CommitLog 读取完整消息。SQL 针对消息属性过滤,此时必须读取到完整的消息才能过滤,效率较低。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/288843.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Vue3 结合typescript 组合式函数(2)

安装axios&#xff1a;npm install axios 1、hooks文件夹下新建useURLLoader 在APP.VUE中使用useURLLoader 使用Dog API 2、使用对象中的属性&#xff0c;必须使用toRefs&#xff0c;否则Reactive响应失效 3、使用泛型 结果&#xff1a;

爬虫如何获取免费代理IP(二)

89ip代理爬取代码实现 一、代码实现 import requests import time import random from fake_useragent import UserAgent from lxml import etree import os import csv""" 89ip代理爬取 """class IPSipder(object):def __init__(self):self.u…

【损失函数】Quantile Loss 分位数损失

1、介绍 Quantile Loss&#xff08;分位数损失&#xff09;是用于回归问题的一种损失函数&#xff0c;它允许我们对不同分位数的预测误差赋予不同的权重。这对于处理不同置信水平的预测非常有用&#xff0c;例如在风险管理等领域。 当我们需要对区间预测而不单是点预测时 分位…

ArkTS语言应用开发入门指南与简单案例解析

文章目录 前言创建项目及其介绍简单案例学习本文总结问答回顾-学习前言 在前几节课中,我们已经了解了ArkTS语言的特点以及其基本语法。现在,我们将正式利用ArkTS来进行应用开发。本节课将通过一个快速入门案例,让大家熟悉开发工具的用法,并介绍UI的基础概念。 创建项目及…

5分钟理解什么是多模态

大家好&#xff0c;我是董董灿。 大模型越来越多了&#xff0c;大模型下沉的行业也越来越多。前几周一个在电厂工作的老哥发消息问我&#xff1a;大模型中所谓的多模态是什么意思&#xff1f; 我当时大概跟他解释了一下。 其实在人工智能领域&#xff0c;我们经常会听到&quo…

力扣hot100 对称二叉树 递归 队列

&#x1f468;‍&#x1f3eb; 题目地址 &#x1f468;‍&#x1f3eb; 参考思路 递归的难点在于&#xff1a;找到可以递归的点 为什么很多人觉得递归一看就会&#xff0c;一写就废。 或者说是自己写无法写出来&#xff0c;关键就是你对递归理解的深不深。 对于此题&#xf…

Java后端开发——Spring实验

文章目录 Java后端开发——Spring实验一、Spring入门1.创建项目&#xff0c;Spring依赖包。2.创建JavaBean&#xff1a;HelloSpring3.编写applicationContext.xml配置文件4.测试&#xff1a;启动Spring&#xff0c;获取Hello示例。 二、Spring基于XML装配实验1.创建JavaBean类&…

requests库中Session对象超时解决过程

引言 在使用Python进行网络请求时&#xff0c;requests库是一个非常常用的工具。它提供了Session对象来管理和持久化参数&#xff0c;例如cookies、headers等。但是&#xff0c;对于一些需要长时间运行的请求&#xff0c;我们需要设置超时时间来避免长时间等待或者无限期阻塞的…

互联网加竞赛 Yolov安全帽佩戴检测 危险区域进入检测 - 深度学习 opencv

1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; Yolov安全帽佩戴检测 危险区域进入检测 &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度系数&#xff1a;3分工作量&#xff1a;3分创新点&#xff1a;4分 该项目较为新颖&am…

Java学习——设计模式——结构型模式2

结构型模式 结构型模式主要涉及如何组合各种对象以便获得更好、更灵活的结构。虽然面向对象的继承机制提供了最基本的子类扩展父类的功能&#xff0c;但结构型模式不仅仅简单地使用继承&#xff0c;而更多地通过组合与运行期的动态组合来实现更灵活的功能。 包括&#xff1a; 1…

jmeter的安装与目录介绍

1、启动 apache-jmeter-5.0\bin 2、永久修改中文配置 zh-CN就行了

海外静态IP和动态IP有什么区别?推荐哪种?

什么是静态ip、动态ip&#xff0c;二者有什么区别&#xff1f;哪种好&#xff1f;关于这个问题&#xff0c;不难发现&#xff0c;在知道、知乎上面的解释有很多&#xff0c;但据小编的发现&#xff0c;这些回答都是关于静态ip和动态ip的专业术语解释&#xff0c;普通非专业人事…

IDEA设置新建类注释、手动注释详解

文章目录 一、背景二、模板三、设置方法1、新建类注释设置2、手动注释设置 一、背景 每次在一台新电脑安装idea&#xff0c;都需要重新设置idea注释配置&#xff0c;说常用吧&#xff0c;也就新安装时才用&#xff0c;时间久步骤容易忘记&#xff0c;所以用此文章记录一下。 二…

学习Java中的数据结构及API这一篇就够了

Java中的数据结构及API 1. 线性表1-1. 顺序表Array数组ArrayList集合 1-2. 链表自定义链表LinkedList 2. 队列2-1. ArrayDeque2-2. LinkedList2-3. 区别 3. 栈3-1. ArrayDeque3-2. LinkedList 4. 树4-1. 二叉树定义 5. 图5-1. 图定义 1. 线性表 1-1. 顺序表 顺序表是指用一组…

用js让用户输入一个数累加和

需求&#xff1a;用户输入一个数&#xff0c; 计算 1 到这个数的和。 比如 用户输入的是 5&#xff0c; 则计算 1~5 之间的累加和 并且输出到控制台 <body><script>let numprompt(请输入一个数)let sum0for(let i1;i<num;i){sumi}console.log(sum)</script…

java servlet软件缺陷库管理系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 java servlet软件缺陷库管理系统是一套完善的java web信息管理系统 系统采用serlvetdaobean&#xff08;mvc模式)&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOM…

Axure鲜花商城网站原型图,网上花店订花O2O本地生活电商平台

作品概况 页面数量&#xff1a;共 30 页 兼容软件&#xff1a;仅支持Axure RP 9/10&#xff0c;非程序软件无源代码 应用领域&#xff1a;鲜花网、花店网站、本地生活电商 作品特色 本作品为「鲜花购物商城」网站模板&#xff0c;高保真高交互&#xff0c;属于O2O本地生活电…

翻转课堂是什么意思

在教育方面&#xff0c;老师们常听到各种新颖的教学理念和模式&#xff0c;但翻转课堂无疑是最具颠覆性和创新性的一个。那么&#xff0c;翻转课堂究竟怎么翻转呢&#xff1f; 让我们先了解一下“翻转”二字。在传统的课堂上&#xff0c;教师是知识的传授者&#xff0c;学生则是…

阿里云服务器系统盘高效云盘、ESSD Entry云盘、SSD云盘、ESSD云盘测评

阿里云服务器系统盘或数据盘支持多种云盘类型&#xff0c;如高效云盘、ESSD Entry云盘、SSD云盘、ESSD云盘、ESSD PL-X云盘及ESSD AutoPL云盘等&#xff0c;阿里云百科aliyunbaike.com详细介绍不同云盘说明及单盘容量、最大/最小IOPS、最大/最小吞吐量、单路随机写平均时延等性…

Python电能质量扰动信号分类(四)基于CNN-BiLSTM的一维信号分类模型

往期精彩内容&#xff1a; 引言 1 数据集制作与加载 1.1 导入数据 1.2 制作数据集 2 CNN-BiLSTM分类模型和超参数选取 2.1定义CNN-BiLSTM分类模型 2.2 设置参数&#xff0c;训练模型 3 模型评估 3.1 准确率、精确率、召回率、F1 Score 3.2 十分类混淆矩阵&#xff1a…
最新文章