RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及批量发送消息、消息过滤实战

文章目录

      • 批量发送消息
      • 消息过滤

批量发送消息

批量发送消息可以减少网络的 IO 开销,让多个消息通过 1 次网络开销就可以发送,提升数据发送的吞吐量

在这里插入图片描述

虽然批量发送消息可以减少网络 IO 开销,但是一次也不能发送太多消息

批量消息直接将多个消息放入集合中发送即可,生产者代码如下:

public class Producer {

    public static void main(String[] args) throws Exception {
        // 1、创建生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");

        // 2、为生产者对象设置 NameServer 地址
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 3、把我们的生产者直接启动起来
        producer.start();

        // 4、创建消息、并发送消息
        List<Message> reqList = new ArrayList<>(12);
        for (int i = 0; i < 12; i++) {
            // public Message(String topic, String tags, String keys, byte[] body) {
            Message message = new Message(
                    "custom-batch-topic",
                    "batchTag",
                    "CUSTOM_BATCH",
                    ("("+i+")Hello Message From BATCH Producer, " +
                            "date="+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())).getBytes()
            );
            reqList.add(message);

        }

        // 利用生产者对象,将消息直接批量发送出去
        producer.send(reqList);

        System.out.println("Send Finished.");
    }
}

消费者代码如下:

public class Consumer {

    public static void main(String[] args) throws Exception {
        // 1、创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_group");

        // 2、为消费者对象设置 NameServer 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 3、订阅主题
        consumer.subscribe("custom-batch-topic", "*");

        // 4、注册监听消息,并打印消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String printMsg = new String(msg.getBody()) + ", recvTime: "
                            + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
                    System.out.println(printMsg);
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 5、把消费者直接启动起来
        consumer.start();
        System.out.println("Consumer Started Finished.");
    }
}

消息过滤

消费者组中还可以有过滤操作,对同一个 Topic 下的消息的 Tag 标签进行过滤

但是使用消息过滤时需要 保证同一个消费组中消费的消息的 Tag 相同 ,如果同一个消费者组中的两个消费者订阅了不同的 Tag,比如消费者 A 订阅了 Tag1,消费者 B 订阅了 Tag2,那么可能 B 收到了 Tag1 的数据,发现不是自己想要的,于是将 Tag1 的数据过滤掉了,那么就导致了 A 也收不到 Tag1 的数据,造成数据消失的现象

消息过滤流程图如下:

在这里插入图片描述

消息过滤生产者如下:

public class FilterProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(
                "producer_group",
                true);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        List<Order> list = new ArrayList<>();
        for (int i = 0; i < 12; i ++) {
            Order order = new Order();
            order.orderId = i;
            order.desc = "desc:" + i;
            order.tag = "tag" + i % 3;
            list.add(order);
        }
        for (Order order : list) {
            Message msg = new Message(
                    "Filter-Test-Topic",
                    order.tag,
                    (order.toString()).getBytes());
            msg.setKeys("Filter_Tag");
            msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));
            // 直接将 msg 发送出去
            producer.send(msg);
        }
        System.out.println("Send Finished.");
    }

    public static class Order {
        int orderId;
        String desc;
        String tag;

        @Override
        public String toString() {
            return "orderId="+orderId+", desc="+desc+", tag="+tag;
        }
    }
}

过滤 tag 的几种用法:

过滤消息的 tag 主要修改一行代码:consumer.subscribe("Filter-Test-Topic", "tag1");,过滤也分几种情况:

  1. 过滤所有 tag

    consumer.subscribe("Filter-Test-Topic", "*");

  2. 过滤单个 tag

    consumer.subscribe("Filter-Test-Topic", "tag1");

  3. 过滤多个 tag

    consumer.subscribe("Filter-Test-Topic", "TG2 || TG3");

  4. 订阅 SQL92 方式(需要修改 custom.conf 文件,添加一行配置:enablePropertyFilter=true)

    consumer.subscribe("Filter-Test-Topic", MessageSelector.bySql("idx > 10"));

    这里的 idx > 10 的 idx 是在生产者中通过下边这行代码放入的:

    msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));
    

消息过滤消费者代码如下(只过滤出 tag = tag1 的消息):

public class Subscribe02_Single_Consumer {

    public static void main(String[] args) throws Exception {
        // 1、创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Subscribe02_Single_Consumer");

        // 2、为消费者对象设置 NameServer 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 3、订阅主题
        consumer.subscribe("Filter-Test-Topic", "tag1");

        // 4、注册监听消息,并打印消息
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    String printMsg = new String(msg.getBody()) + ", recvTime: "
                            + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
                    System.out.println(printMsg);
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        // 5、把消费者直接启动起来
        consumer.start();
        System.out.println("Consumer Started Finished.");
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/253904.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

docker-harbor仓库

Docker 镜像 容器 仓库 仓库&#xff1a;保存镜像 私有&#xff1a;自定义用户的形式登录仓库&#xff0c;拉取或者上传镜像&#xff08;内部管理的用户&#xff09; Harbor&#xff1a;是VMware公司开发的&#xff0c;开源的企业级的docker register项目 帮助用户快速的搭建…

对DataFrame中每列的数值进行限值 指定最小值和最大值:超过最大值的数据,则改为最大值小于最小值的数据,则改为最小值 DataFrame.clip()

【小白从小学Python、C、Java】 【计算机等级考试500强双证书】 【Python-数据分析】 对DataFrame中每列的数值进行限值 指定最小值和最大值&#xff1a; 超过最大值的数据&#xff0c;则改为最大值 小于最小值的数据&#xff0c;则改为最小值 DataFrame.clip() [太阳]选择题 请…

GoLang 学习 (入门)

go run 1.go 执行命令 go build 1.go 打包为exe 快速 并且无依赖 在开始项目 需要 生成 go.mod go mod init mod 终端执行 go: creating new go.mod: module mod go: to add module requirements and sums:go mod tidy go的基本目录结构 src ------gocode ------------项…

整合SpringSecurity

目录 前言 数据库设计 用户表 角色表 用户角色表 权限表 角色权限表 插入数据 表的实体类 用户表实体类 角色表实体类 权限表实体类 mapper层接口 UserMapper RoleMapper AuthorityMapper 封装登录信息 统一响应结果 上下文相关类 jwt令牌工具类 依赖导入…

Win11极速安装Tensorflow-gpu+CUDA+cudnn

文章目录 0.pip/conda换默认源1.Anacondapython虚拟环境2.安装CUDA以及cudnn测试tensorflow的GPU版本安装成功的办法参考文献 不要使用官网版本&#xff0c;直接使用conda版本&#xff0c;有对应的包&#xff0c;安装很方便 0.pip/conda换默认源 为了高效下载&#xff0c;建议…

nodejs微信小程序+python+PHP邮件过滤系统的设计与实现-计算机毕业设计推荐

邮件过滤系统综合网络空间开发设计要求。该系统主要设计并完成了管理过程中的用户登录、个人信息修改、邮件信息、垃圾箱、意见反馈、论坛等功能。该系统操作简便&#xff0c;界面设计简洁&#xff0c;不但可以基本满足本行业的日常管理工作&#xff0c; 目的是将邮件过滤通过网…

Flink系列之:自定义函数

Flink系列之&#xff1a;自定义函数 一、自定义函数二、概述三、开发指南四、函数类五、求值方法六、类型推导七、自动类型推导八、定制类型推导九、确定性十、内置函数的确定性十一、运行时集成十二、标量函数十三、表值函数十四、聚合函数十五、表值聚合函数 一、自定义函数 …

HttpRunner接口自动化测试框架

简介 HttpRunner是一款面向 HTTP(S) 协议的通用测试框架&#xff0c;只需编写维护一份 YAML/JSON 脚本&#xff0c;即可实现自动化测试、性能测试、线上监控、持续集成等多种测试需求。 项目地址&#xff1a;GitHub - httprunner/httprunner: HttpRunner 是一个开源的 API/UI…

【MySQL表的增删改查】

目录&#xff1a; 前言表的增删改查Create(创建)1.插入插入测试插入否则更新 2.替换 Retrieve(查找)1.SELECT 列全列查找指定列查找查询字段为表达式字段重命名结果去重 2.WHERE条件英语不及格的同学及英语成绩&#xff08;<60&#xff09;语文成绩在 [80, 90] 分的同学及语…

RTOS队列的写入与读出

我们在stm32f103c8t6单片机上验证RTOS队列的写入与读出&#xff0c;利用stm32cube进行RTOS的配置。在选择TIM2当做RTOS的时钟&#xff0c;裸机的时钟源默认是 SysTick&#xff0c;但是开启 FreeRTOS 后&#xff0c;FreeRTOS会占用 SysTick &#xff08;用来生成1ms 定时&#x…

ChatGLM基于LangChain应用开发实践(一)

一、概述 在使用大模型&#xff08;LLM&#xff09;做应用开发时&#xff0c;LangChain是一个主流的开发框架&#xff0c;通过它来构建Agent&#xff0c;根据用户查询访问企业私有数据&#xff0c;调用自定义或者第三方工具库&#xff0c;然后再调用LLM&#xff0c;利用其推理…

qt-C++笔记之std::tostring()、.toStdString()、.toLocal8Bit().constData()的使用场景

qt-C笔记之std::tostring()、.toStdString()、.toLocal8Bit().constData()的使用场景 参考博文&#xff1a;C笔记之system()用于在Qt中执行系统命令的习惯 code review! 注&#xff1a;之所以记录该笔记&#xff0c;是因为在Qt中自己经常使用C语言的int system( const char …

c++11--左值,右值,移动语义,引用折叠,模板类型推断,完美转发

1.移动语义 移动构造和移动赋值均属于移动语义范畴。 移动语义的实现依赖于右值概念&#xff0c;右值引用。 1.1.一个移动构造的实例 #include <iostream> using namespace std; class HasPtrMem{ public:HasPtrMem():d(new int(3)){cout << "Construct: &qu…

信号与线性系统翻转课堂笔记4——连续LTI系统的微分方程模型与求解

信号与线性系统翻转课堂笔记4——连续LTI系统的微分方程模型与求解 The Flipped Classroom4 of Signals and Linear Systems 对应教材&#xff1a;《信号与线性系统分析&#xff08;第五版&#xff09;》高等教育出版社&#xff0c;吴大正著 一、要点 &#xff08;1&#x…

gitee提交代码步骤介绍(含git环境搭建)

1、gitee官网地址 https://gitee.com; 2、Windows中安装git环境 参考博客&#xff1a;《Windows中安装Git软件和TortoiseGit软件》&#xff1b; 3、设置用户名和密码 这里的用户名和密码就是登录gitee网站的用户名和密码如果设置错误&#xff0c;可以在Windows系统的“凭据管理…

Kubernetes (k8s) 快速认知

应用部署方式 传统部署时代 早期的时候&#xff0c;各个组织是在物理服务器上运行应用程序。缺点 资源分配问题&#xff1a; 无法限制在物理服务器中运行的应用程序资源使用 维护成本问题&#xff1a; 部署多个物理机&#xff0c;维护许多物理服务器的成本很高 虚拟化部署时…

论文修改润色算学术不端吗 快码论文

大家好&#xff0c;今天来聊聊论文修改润色算学术不端吗&#xff0c;希望能给大家提供一点参考。 以下是针对论文重复率高的情况&#xff0c;提供一些修改建议和技巧&#xff0c;可以借助此类工具&#xff1a; 标题&#xff1a;论文修改润色是否算学术不端&#xff1f;专业软件…

U-boot启动流程与加载内核过程

目录 一、U-boot启动过程流程图二、U-boot启动过程函数简单注释 本篇文章梳理了一下对正点原子的驱动开发教程中u-boot启动流程的梳理&#xff0c;制作了一份流程图&#xff0c;并简单的记录了一下各函数的作用&#xff0c;方便回头翻阅。 一、U-boot启动过程流程图 二、U-boot…

git-lfs基本知识讲解

目录 1. 基本知识2. 安装 1. 基本知识 git-lfs 是 Git Large File Storage 的缩写&#xff0c;是 Git 的一个扩展&#xff0c;用于处理大文件的版本控制。 它允许你有效地管理和存储大型二进制文件&#xff0c;而不会使 Git 仓库变得过大和不稳定。以下是一些与 git-lfs 相关…

机器学习——自领域适应作业

任务 游戏里面的话有很多跟现实不一样的情况。 想办法让中间的特征更加的接近&#xff0c;让feat A适应feat B&#xff0c;产生相对正常的输出。 在有标签数据和没有数据的上面进行训练&#xff0c;并能预测绘画图像。 数据集 训练5000张总数&#xff0c;每类有500张测试100…
最新文章