13、RockerMQ消息类型之广播与集群消息

RocketMq中提供两种消费模式:集群模式和广播模式。

集群模式
集群模式表示同一个消息会被同一个消费组中的消费者消费一次,消息被负载均衡分配到同一个消费者上的多个实例上。

在这里插入图片描述
还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue的形式,如下图:
在这里插入图片描述

需要注意的是,集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue。

通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。

但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。

广播模式
广播消息表示同一个消息会推送到集群里面所有的消费者,保证消息至少被每个消费者消费一次。
在这里插入图片描述

默认情况下是使用集群模式,Broker端会给每一个消费组维护一个统一的offset,这个offset能够保证一个消息在消费组里面只会被消费一次,而广播模式的实现方式,是将本来由Broker保管的offset交个消费者自行保管,而Broker只管往消费者推送消息即可。
注意事项:

01
由于offset由消费者自行保存和记录,Broker只管推送消息,如果消息失败了就不会存在重试的能力。

02
消费者维护的offset是可以在服务重启时,按照上一次消费的进度处理后面没有处理的消息不会影响服务器的性能,但是如果消费者的offset丢失了,消费者的服务可以正常运行但是此时未消费的消息就不能申请了,只能申请后面推送的。

03
offset文件会存放在本地,当然这里面存在很多的坑。

各场景源码分析

广播消息与集群消息作为RocketMQ的两大类型,存在以下几点差异,首先我们通过DefaultMQPushConsumerImpl类的start方法启动消费者。

01
广播模式不支持消息的重试。

private void copySubscription() throws MQClientException {
    try {
        //...
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            // 判断消息模式,如果是广播直接跳出不进行重试
            case BROADCASTING:
                break;
            case CLUSTERING:
                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                break;
            default:
                break;
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}

02
广播模式使用本地offset缓存(start方法)。

switch (this.defaultMQPushConsumer.getMessageModel()) {
    // 广播模式(本地的Offset文件)
    case BROADCASTING:
        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    // 集群模式 (Broker里面的offset)
    case CLUSTERING:
        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    default:
        break;
}

03
没有所谓的负载均衡(执行rebalanceByTopic方法)。

case BROADCASTING: {
    // 根据topic获取MessageQueue集合
    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    if (mqSet != null) {
        boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
        if (changed) {
            this.messageQueueChanged(topic, mqSet, mqSet);
            log.info("messageQueueChanged {} {} {} {}",
                consumerGroup,
                topic,
                mqSet,
                mqSet);
        }
    } else {
        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
    }
    break;
}

04
不支持顺序消息:由于关闭消息是所有的消费者都需要去消费。

// 顺序消息更新偏移量
if (processQueue.isLocked()) {
    if (!pullRequest.isPreviouslyLocked()) {
        long offset = -1L;
        try {
            offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
        } catch (Exception e) {
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
            return;
        }
        boolean brokerBusy = offset < pullRequest.getNextOffset();
        log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
            pullRequest, offset, brokerBusy);
        if (brokerBusy) {
            log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                pullRequest, offset);
        }

        pullRequest.setPreviouslyLocked(true);
        pullRequest.setNextOffset(offset);
    }
} else {
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    log.info("pull message later because not locked in broker, {}", pullRequest);
    return;
}

05
并发消息消费时也没有重试(执行processConsumeResult方法时)。

// 单纯打印信息
case BROADCASTING:
    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
        MessageExt msg = consumeRequest.getMsgs().get(i);
        log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
    }
    break;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/238778.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

windows下docker环境安装

开启硬件虚拟化技术 win10中开启 Hyper-V Win10 下是否开启硬件虚拟化技术&#xff0c;在控制面板&#xff0c;启用 window 功能&#xff0c;找到 Hyper-V 选项&#xff0c;点勾选确认。如图&#xff1a; Windows 11 家庭中文版新增 Hyper-V选项 注意以下的解决方案来自win1…

带你手把手 解读 firejail 沙盒源码(0.9.72版本)目录和组件 (一)

文章目录 关于firejail 的介绍src 目录每个文件夹&#xff08;组件&#xff09;的意义文件目录树 关于firejail 的介绍 Firejail 是一个用于 Linux 系统的安全工具&#xff0c;它通过创建轻量级的沙箱环境来运行应用程序。这种沙箱环境将应用程序与系统其余部分隔离&#xff0…

openEuler 20.03 (LTS-SP2) aarch64 cephadm 部署ceph18.2.0【5】 添加osd存储节点

接上篇 openEuler 20.03 (LTS-SP2) aarch64 cephadm 部署ceph18.2.0【1】离线部署 准备基础环境-CSDN博客 openEuler 20.03 (LTS-SP2) aarch64 cephadm 部署ceph18.2.0【2】离线部署 podman配置registries 部署registry私服 准备离线镜像-CSDN博客 openEuler 20.03 (LTS-SP2…

Python手撕kmeans源码

参考了两篇文章 K-Means及K-Means算法Python源码实现-CSDN博客 使用K-means算法进行聚类分析_kmeans聚类分析结果怎么看-CSDN博客 # 定义kmeans类 from copy import deepcopy from sklearn.datasets import make_blobs import numpy as np import matplotlib.pyplot as pltc…

如何充分准备面试,迅速融入团队并在工作中取得卓越成就

首先&#xff0c;关于如何筹备面试&#xff0c;首先需要对所申请公司与职位进行深入的调查了解&#xff0c;并依据可能提出的面试问题预先准备相应的答案&#xff0c;并提前调试面试所需的仪器设备。同时&#xff0c;也要注重自身形象的塑造。更为关键的是 1. 在计算机领域的面…

redis-学习笔记(Jedis)

自定义的 Redis 客户端 咱们可以实现编写出一个自定义的 Redis 客户端 因为 Redis 公开了自己使用的自定义协议 ---- RESP 协议清楚了, 那么通信数据格式就清除了, 就能完成各层次之间的数据传输, 就能开发服务器和客户端 RESP — Redis 的 序列化 协议 特点: 简单好实现快读进…

ETLCloud的应用策略——实时数据处理是关键

一、ETLCloud是什么&#xff1f; ETLCloud又称数据集成&#xff08;DataOps&#xff09;&#xff0c;是RestCloud旗下的一款数据仓库管理工具&#xff0c;通过自动化数据转换和集成来实现企业内部和外部数据的无缝对接&#xff0c;从而帮助企业快速获取准确的数据信息&#xff…

活动预告 | 微盟技术沙龙 - Elasticsearch 在微盟的实践 12/21/2023

微盟技术沙龙 「微盟技术沙龙」是由微盟研发中心发起并联合各方小伙伴为开发者举办的系列技术沙龙&#xff0c;从用户&#xff0c;产品&#xff0c;技术等方面与开发者进行交流。 微盟技术沙龙关注开发者在实际应用中遇到的问题。提供最真实的干货&#xff0c;以技术会友&…

【SpringBoot】从入门到精通的快速开发指南

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是Java方文山&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的专栏《SpringBoot》。&#x1f3af;&#x1f3af; &…

[MySQL]SQL优化之sql语句优化

&#x1f308;键盘敲烂&#xff0c;年薪30万&#x1f308; 目录 一、索引优化 回顾&#xff1a; &#x1f4d5;索引分类&#xff1a; &#x1f4d5;索引失效&#xff1a; &#x1f4d5;设计原则&#xff1a; &#x1f4d5;SQL性能分析 二、SQL优化 语句优化 &#x1f4d…

Gorm 的关联查询

背景介绍 gorm 与 mybatis-plus 、hibernate 等 ORM 框架一样&#xff0c;为了应对查询场景居多的现象&#xff0c;支持原生 sql 和 api 两种方式读数据库。 gorm 原生 sql 参见&#xff1a;https://gorm.io/docs/sql_builder.html。 gorm 提供的 api 支持关联插入、关联查询…

Jmeter入门

一、下载jmeter 官网下载 下载之后解压&#xff0c;在目录/bin下面找到jmeter.bat双击之后即可启动Jmeter。 二、使用 如下左图&#xff0c;选择语言为中文&#xff0c;可以修改测试计划的名称。如下右图&#xff0c;添加线程组 添加线程组 添加http请求 路径传参方式 …

Linux——MySQL数据库系统

一、 MySQL的编译安装 1、准备工作 &#xff08;1&#xff09;为了避免发生端口冲突&#xff0c;程序冲突等现象&#xff0c;建议先查询MySQL软件的安装情况&#xff0c;确认没有使用以Rpm方式安装的mysql-server、mysql软件包&#xff0c;否则建议将其卸载 [rootlocalhost ~]…

mars3d加载arcgis发布的服务,⽀持4523坐标

问题 1.从这个服务地址加载&#xff0c;具体在哪⾥去转坐标呢&#xff1f; 加个 usePreCachedTilesIfAvailable&#xff1a;false 参数即可 坐标系为4490的arcgis影像服务图层&#xff0c;配置后瓦片加载不出来&#xff0c;没报错 甚至可以跳转 没有看出问题&#xff0c;或者测…

RK3568驱动指南|第八篇 设备树插件-第75章ConfigFS的核心数据结构

瑞芯微RK3568芯片是一款定位中高端的通用型SOC&#xff0c;采用22nm制程工艺&#xff0c;搭载一颗四核Cortex-A55处理器和Mali G52 2EE 图形处理器。RK3568 支持4K 解码和 1080P 编码&#xff0c;支持SATA/PCIE/USB3.0 外围接口。RK3568内置独立NPU&#xff0c;可用于轻量级人工…

神由之星加入元宇宙产业委员会共谋数字发展新篇章

近年来,元宇宙产业呈现出飞速发展的趋势,成为全球范围内备受瞩目的新兴行业。在这个充满机遇与挑战的时代,常州神由之星数字信息产业发展有限公司凭借敏锐的洞察力和卓越的数字产品,迅速抓住元宇宙的发展势头,大力发展元宇宙业务,成为该领域内一颗冉冉升起的新星。 神由之星荣膺…

数据分享 I 全国市级商品房屋销售数据,shp/excel格式,2005-2020年数据

基本信息. 数据名称: 全国市级商品房屋销售数据 数据格式: Shp、excel 数据时间: 2005-2020年 数据几何类型: 面 数据坐标系: WGS84坐标系 数据来源&#xff1a;网络公开数据 数据字段&#xff1a; 序号字段名称字段说明1spxse商品房销售额&#xff08;亿元&#xf…

优思学院|如何建立公司运营指标体系?如何推行六西格玛改进运营指标?

关键绩效指标 (KPI) 是测量您团队或组织朝重要商业目标进展表现如何的量化指标&#xff0c;组织会在多个层面使用 KPI&#xff0c;这视乎您想要追踪何指标而定&#xff0c;您可以设定全组织的、特定团队的、或甚至是个人 KPI。 良好的KPI能让公司管理者掌握组织的营运是否进度…

在设计和考虑建造室外雨水收集池时需要注意的因素

在设计和建造室外雨水收集池时&#xff0c;需要考虑以下因素&#xff1a; 地质条件&#xff1a;建造雨水收集池需要考虑到地质条件&#xff0c;例如土壤类型、地基承载能力等。这些因素可能对水池的建造和结构产生影响。 气候条件&#xff1a;不同地区的降雨量、湿度、气温等…

Spring基于xml半注解开发

目录 Component的使用 依赖注解的使用 非自定义Bean的注解开发 Component的使用 基本Bean注解&#xff0c;主要是使用注解的方式替代原有的xml的<bean>标签及其标签属性的配置&#xff0c;使用Component注解替代<bean>标签中的id以及class属性&#xff0c;而对…