Kafka3.0.0版本——生产者分区及分区策略

目录

    • 一、生产者分区优点
    • 二、生产者发送消息的分区策略
      • 2.1、默认的分区器
      • 2.2、指定分区(partition)值
      • 2.3、没有指明分区(partition)值,但有key 的情况
      • 2.4、 既没有分区(partition)值,又没有key 值的情况
    • 三、指定分区(partition)值的代码示例
    • 四、没有指明分区(partition)值,但有key值的情况代码示例
    • 五、 既没有分区(partition)值,又没有key 值的情况代码示例

一、生产者分区优点

  • 便于合理使用存储资源。
    每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
  • 提高并行度。
    生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

二、生产者发送消息的分区策略

2.1、默认的分区器

  • 在 IDEA中 ctrl +n,全局查找 DefaultPartitioner类,DefaultPartitioner类实现了Partitioner接口。
    在这里插入图片描述

2.2、指定分区(partition)值

  • 在IDEA 中全局查找 (ctrl +n )ProducerRecord类 , 在类中可以看到如下4个构造方法:
  • 如下4个构造方法:指明了partition 的情况下,直接将指明的值作为partition值 ;
  • 例如partition=0,所有数据写入分区0
    在这里插入图片描述

2.3、没有指明分区(partition)值,但有key 的情况

  • 在IDEA 中全局查找 (ctrl +n )ProducerRecord类 , 在类中可以看到如下1个构造方法:
  • 没有指明partition 值但有key 的情况下,将key 的hash值topic的partition数进行 取余 得到partition 值;
  • 例如:key1 的hash 值=5,key2 的hash 值=6 ,topic 的partition 数=2,那么 么key1 对应的value1 写入1 号分区,key2 对应的value2 写入0
    在这里插入图片描述

2.4、 既没有分区(partition)值,又没有key 值的情况

  • 在IDEA 中全局查找 (ctrl +n )ProducerRecord类 , 在类中可以看到如下1个构造方法:

  • Kafka 采用Sticky Partition( 黏性分区器 ) , 会随机选择一个分区 , 并尽可能一直使用该分区 , 待该分区的batch已满或者已完成,Kafka 再随机一个分区进行使用( 和上一次的分区不同)

  • 例如:第一次随机选择0 号分区 ,等0 号分区当前批次满了 ( 默认16k)或者linger.ms 设置的时间到,Kafka 再随机一个分区进行使用 ( 如果还是0 会继续随机 )

    在这里插入图片描述

三、指定分区(partition)值的代码示例

  • 指定数据发送到 0 号分区的代码示例

    package com.xz.kafka.producer;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    /**
     * @author: xz
     * @since: 2023/4/2 22:02
     * @description: 生产者将数据发往指定 partition的情况
     */
    public class CustomProducerCallbackConfirmPartitions {
        public static void main(String[] args) throws InterruptedException {
    
            //1、创建 kafka 生产者的配置对象
            Properties properties = new Properties();
    
            //2、给 kafka 配置对象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            //3、指定对应的key和value的序列化类型 key.serializer value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    
            //4、创建 kafka 生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //5、调用 send 方法,发送消息
            for (int i = 0; i < 5; i++) {
                //指定数据发送到 0 号分区,key 为空
                kafkaProducer.send(new ProducerRecord<>("news", 0,"","hello kafka" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null){
                            System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                        }else {
                            exception.printStackTrace();
                        }
                    }
                });
                Thread.sleep(2);
            }
    
            // 3 关闭资源
            kafkaProducer.close();
        }
    }
    
  • 在kafka集群某一台服务器上开启 Kafka 消费者

    [root@localhost kafka-3.0.0]# bin/kafka-console-consumer.sh  --bootstrap-server 192.168.136.27:9092 --topic news
    

    在这里插入图片描述

  • 在 IDEA 中执行代码,观察 开启 Kafka 消费者的服务器中是否接收到消息。如下图所示:
    在这里插入图片描述

  • 在 IDEA 控制台观察回调信息。
    在这里插入图片描述

四、没有指明分区(partition)值,但有key值的情况代码示例

  • 代码示例

    package com.xz.kafka.producer;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    
    /**
     * @author: xz
     * @since: 2023/4/2 22:25
     * @description: 生产者发送数据没有指明 partition 值,但有key值的情况下
     */
    public class CustomProducerCallbackNoPartitionsConfirmKey {
    
        public static void main(String[] args) throws InterruptedException {
    
            //1、创建 kafka 生产者的配置对象
            Properties properties = new Properties();
    
            //2、给 kafka 配置对象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            //3、指定对应的key和value的序列化类型 key.serializer value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    
            //4、创建 kafka 生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //5、调用 send 方法,发送消息
            for (int i = 0; i < 3; i++) {
                //依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往 1、2、0
                kafkaProducer.send(new ProducerRecord<>("news", "a","hello kafka" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null){
                            System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                        }else {
                            exception.printStackTrace();
                        }
                    }
                });
                Thread.sleep(2);
            }
    
            // 3 关闭资源
            kafkaProducer.close();
        }
    }
    
  • 在kafka集群某一台服务器上开启 Kafka 消费者

    [root@localhost kafka-3.0.0]# bin/kafka-console-consumer.sh  --bootstrap-server 192.168.136.27:9092 --topic news
    

    在这里插入图片描述

  • 当key="a"时,在控制台查看结果。如下图:
    在这里插入图片描述

  • 当key="b"时,在控制台查看结果。如下图:
    在这里插入图片描述

  • 当key="f"时,在控制台查看结果。如下图:
    在这里插入图片描述

五、 既没有分区(partition)值,又没有key 值的情况代码示例

  • 代码示例

    package com.xz.kafka.producer;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    /**
     * @author: xz
     * @since: 2023/4/2 22:49
     * @description: 生产者发送数据 既没有分区(partition)值,又没有key值
     */
    public class CustomProducerCallbackNoPartitionsAndKey {
        public static void main(String[] args) throws InterruptedException {
    
            //1、创建 kafka 生产者的配置对象
            Properties properties = new Properties();
    
            //2、给 kafka 配置对象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            //3、指定对应的key和value的序列化类型 key.serializer value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    
            //4、创建 kafka 生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //5、调用 send 方法,发送消息
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("news", "hello kafka" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null){
                            System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                        }
                    }
                });
                Thread.sleep(2);
            }
    
            // 3 关闭资源
            kafkaProducer.close();
        }
    }
    
  • 在kafka集群某一台服务器上开启 Kafka 消费者

    [root@localhost kafka-3.0.0]# bin/kafka-console-consumer.sh  --bootstrap-server 192.168.136.27:9092 --topic news
    

    在这里插入图片描述

  • 在 IDEA 中执行代码,观察开启 Kafka 消费者的服务器中是否接收到消息。如下图所示:
    在这里插入图片描述

  • 在 IDEA 控制台观察回调信息。
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/5599.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vscode折叠展开快捷键

1.折叠所有代码 (按住ctrl 分别点击k和0) ctrlk,ctrl0 2.展开所有代码 (按住ctrl 分别点击k和j) ctrlk,ctrlj 3. 折叠鼠标竖线所在位置的节点以及当前节点下的子节点&#xff08;递归折叠&#xff09; ctrlk,ctrl[ 4. 展开鼠标竖线所在位置的节点以及当前节点下的子节点&#…

OpenFeign 源码解读:动态代理+负载均衡实现

OpenFeign使用EnableFeignClients开启服务&#xff0c;该注解标有Import(FeignClientsRegistrar.class)&#xff0c;该ImportBeanDefinitionRegistrar会利用扫描路径的方式扫描java文件中带有的FeignClient(...)的接口&#xff0c;关于这种扫描注解的方式&#xff0c;我仿照写了…

软件测试 - 测试用例常见面试题

1.测试用例的要素测试用例是为了实施测试而向被测试的系统提供的一组集合, 这组集合包含 : 测试环境, 操作步骤, 测试数据, 预期结果等要素.例如 : 在 B 站输入框输入一个空格, 检查结果测试用例标题 : 输入框输入空格测试环境 : Windows 系统, 谷歌浏览器-版本 111.0.5563.65&…

固态硬盘需要分区吗 固态硬盘怎么分区

磁盘分区是在磁盘中划分几个逻辑部分&#xff0c;来更充分的利用磁盘空间&#xff0c;对保存的数据进行分类储存&#xff0c;方便使用。今天小编给大家介绍一下&#xff0c;固态硬盘需要分区吗&#xff0c;固态硬盘怎么分区。 一、固态硬盘需要分区吗 固态硬盘是需要分区的&a…

Redis:redis通用命令;redis常见数据结构;redis客户端;redis的序列化

一、redis命令 1.redis通用命令 Redis 通用命令是一些 Redis 下可以作用在常用数据结构上的常用命令和一些基础的命令 常见的命令有&#xff1a; keys 查看符合模板的所有key&#xff0c;不建议在生产环境设备上使用&#xff0c;因为keys会模式匹配所有符合条件的key&#…

js常见的9种报错记录一下

js常见报错语法错误(SyntaxError)类型错误(TypeError)引用错误(ReferenceError)范围错误(RangeError)运行时错误(RuntimeError)网络错误&#xff08;NetworkError&#xff09;内部错误&#xff08;InternalError&#xff09;URI错误&#xff08;URIError&#xff09;eval错误&a…

electron+vue3全家桶+vite项目搭建【五】集成Pinia全局状态管理

文章目录引入1.引入依赖2.集成Pinia3.使用pinia4.测试效果引入 在vue2的体系中&#xff0c;vuex是官方推荐的状态管理工具&#xff0c;而vue3的体系中&#xff0c;官网同样推荐了一款状态管理工具&#xff0c;他就是 Pinia Pinia官网 demo项目地址 1.引入依赖 npm install…

docker 安装运行 nacos2.0.3

目录 1、拉取镜像 2、挂载目录 mkdir -p /opt/nacos/logs/ #新建logs目录mkdir -p /opt/nacos/conf/ #新建配置目录vim /opt/nacos/conf/application.properties #修改配置文件 3、application.properties内容 4、初始化nacos的脚…

Vue的简单介绍

一、简介 Vue (发音为 /vjuː/&#xff0c;类似 view) 是一款用于构建用户界面的 JavaScript 框架。它基于标准 HTML、CSS 和 JavaScript 构建&#xff0c;并提供了一套声明式的、组件化的编程模型&#xff0c;帮助你高效地开发用户界面。无论是简单还是复杂的界面&#xff0c;…

生成式 AI 背后的共同框架:Stable Diffusion、DALL-E、Imagen

前言 如果你对这篇文章感兴趣&#xff0c;可以点击「【访客必读 - 指引页】一文囊括主页内所有高质量博客」&#xff0c;查看完整博客分类与对应链接。 框架 这些生成式 AI 的整体功能为&#xff1a;输入「文字」&#xff0c;返回「图像」&#xff0c;即 Text-to-image Gener…

DBeaver安装教程及基础使用手册

目录 一、简介 基本特性 二、DBeaver安装 三、连接SQL方法 一、简介 DBeaver是免费和开源&#xff08;GPL&#xff09;为开发人员和数据库管理员通用数据库工具。 它支持任何具有一个JDBC驱动程序数据库&#xff0c;也可以处理任何的外部数据源。 DBeaver 通过 JD…

自动化运维软件ansible

一、ansible 基于python语言。简单快捷&#xff0c;被管理端不需要启服务。直接走ssh协议,需要验证&#xff0c;所以机器多的话速度会较慢。 1、ansible环境搭建 5.确认和配置yum源(需要epel源) 免密登录复制的时候可以直接 写ip 不加参数-i 2、服务器分组&#xff08;主机清单…

java的Lambda表达式与方法引用详解

1. 定义 Lambda 表达式&#xff0c;也可称为闭包&#xff0c;它是推动 Java 8 发布的最重要新特性。 Lambda 允许把函数作为一个方法的参数&#xff08;函数作为参数传递进方法中&#xff09;。 使用 Lambda 表达式可以使代码变的更加简洁紧凑。 1.1 通用定义 lambda 表达…

知识图谱实战应用4-知识图谱中寻找相似用户(协同过滤算法)

大家好&#xff0c;我是微学AI&#xff0c;今天给大家讲一下知识图谱中利用协同过滤算法寻找相似用户。大家会看到一个新的名词&#xff1a;“协同过滤”&#xff0c;下面来介绍一下协同过滤算法。 一、协同过滤算法 协同过滤算法是一种基于用户行为分析的推荐算法。它的基本…

php微信小程序java+Vue高校课程课后辅导在线教育系统nodejs+python

目 录 1绪论 1 1.1项目研究的背景 1 1.2开发意义 1 1.3项目研究现状及内容 5 1.4论文结构 5 2开发技术介绍 7 2.1 B/S架构 7 2.2 MySQL 介绍 7 2.3 MySQL环境配置 7 2.5微信小程序技术 8 3系统分析 9 3.1可行性分析 9 3.1.1技术可行性 9 3.1.2经济可行性 9 3.1.3操作可行性 10 …

MySQL的查询完结,vju树状题组完结,cf补题

目录 MySQL 查询 比较条件 判空 逻辑条件 模糊条件 where in 聚合查询 排序查询 vju 线段树OR树状数组 - Virtual Judge cf Problem - A - Codeforces Problem - A - Codeforces Problem - B - Codeforces 周总结 MySQL 查询 比较条件 SELECT *FROM student WH…

细思极恐,第三方跟踪器正在获取你的数据,如何防范?

细思极恐&#xff0c;第三方跟踪器正在获取你的数据&#xff0c;如何防范&#xff1f; 当下&#xff0c;许多网站都存在一些Web表单&#xff0c;比如登录、注册、评论等操作需要表单。我们都知道&#xff0c;我们在冲浪时在网站上键入的数据会被第三方跟踪器收集。但是&#x…

[C++]C++基础知识概述

目录 C基础知识概述&#xff1a;&#xff1a; 1.什么是C 2.C发展史 3.C关键字 4.命名空间 5.C的输入输出 6.缺省参数 7.函数重载 8.引用 9.内联函数 10.auto关键字(C11) 11.基于范围的for循环(C11) 12.指针空值—nullptr(C11) C基础知识概述&#xff1…

React中使用lodash防抖失效解决

React中使用lodash防抖失效解决 import {Input} from antd; import lodash from lodash; // lodash下的防抖函数 const debounce lodash.debounce; // 防抖打印&#xff0c;希望输入的时候&#xff0c;延迟0.5s后打印值 const getSuggestion debounce((val:string) > {co…

SpringCloud微服务技术栈.黑马跟学(九)

SpringCloud微服务技术栈.黑马跟学 九今日目标1.分布式事务问题1.1.本地事务1.2.分布式事务1.3.演示分布式事务问题2.理论基础2.1.CAP定理2.1.1.一致性2.1.2.可用性2.1.3.分区容错2.1.4.矛盾2.2.BASE理论2.3.解决分布式事务的思路3.初识Seata3.1.Seata的架构3.2.部署TC服务一、…