Kafka_02_Producer详解

Kafka_02_Producer详解

  • Producer
    • ProducerRecord
    • Send&Close
    • 实现原理
      • ProducerInterceptor
      • Serializer
      • Partitioner
    • 事务

Producer

Producer(生产者): 生产并发送消息到Broker(推送)

  1. Producer是多线程安全的(建议通过池化以提高性能)
  2. Producer实例后可发送多条消息(可对应多个ProducerRecord)

// 0.9之后的版本是基于Java实现(之前是Scala实现)


Producer客户端发送消息大致逻辑:

  1. 配置Producer客户端参数并创建该Producer实例
  2. 构建需发送的消息
  3. 发送构建的消息
  4. 关闭实例

构造Producer必填的3个参数:

参数说明
bootstrap.servers引导程序的服务地址
格式: 地址1:端口1,地址N:端口N
(建议指定两个以上的Broker地址以保证稳定性, 且使用主机名形式)
key.serializer发送时对Key调用的序列化器
Broker仅能接受字节数组形式的消息byte[]
value.serializer发送时对Value调用的序列化器
Broker仅能接受字节数组形式的消息byte[]

// 序列化器必须以全限定名方式指定, Java的ProducerConfig类中包含所有的配置参数


ProducerRecord

ProducerRecord(构建消息): Producer每次发送的消息体

  1. ProducerRecord由多个属性构成(Topic和消息是基础属性)
  2. ProducerRecord有多个构造方法(指定属性的个数)
  3. 可根据不同需求创建特定ProducerRecord

ProducerRecord定义:

public class ProducerRecord<K, V> {
    private final String topic;      // Topic(必填)
    private final Integer partition; // Partition

    // 消息头部(0.11版本引入)
    // 指定与应用相关信息(可忽略)
    private final Headers headers;

    // 键(附加信息)
    // 其会用于计算Partition(二次归类)
    private final K key;

    // 值(消息体, 必填)
    // 为空则代表: 墓碑消息
    private final V value;

    // 消息时间戳
    // 细分为CreateTime(消息创建时间)和LogAppendTime(追加日志时间)
    private final Long timestamp;
    ......
}

Send&Close

Send(发送消息): Producer构建ProducerRecord之后发送给Broker

  1. 发送模式: 发后既忘(fire-and-forget)、同步(sync)、异步(async)
  2. 发送模式默认为异步(可通过获取返回值的方法以阻塞等待实现同步)
  3. 返回值通常为发送消息的元数据(Topic、Partition、偏移量和时间戳等)

Send()方法的定义:

public Future<RecordMetadata> send(ProducerRecord<K, V> record);

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
  1. 可通过Future的get()方法阻塞实现同步(返回RecordMetadata对象)
  2. Send()方法需配合try/catch(发送成功或发生异常)
  3. 发送导致的异常分为: 重试异常、不可重试异常

// 不可重试异常发生时会直接抛出并结束


常见的重试异常为:

可重试异常说明
NewworkException网络异常
LeaderNotAvailableException副本的leader不可用
(可能正在选举leader)
UnknownTopicOrPartitionExceptionTopic或Partition异常
NotEnoughReplicasException副本数量不足
NotCoordinatorException协调器异常

Send()方法中的Callback定义:

public interface Callback {
    void onCompletion(RecordMetadata var1, Exception var2);
}
  1. var1和var2参数互斥(两者必有个为null,后者代表异常)
  2. 若两个消息对相同Partition发送消息, 则按发送顺序调用Callback

Close(结束发送):回收Producer实例

  1. 发送结束后务必回收Producer实例(防止资源泄漏)
  2. Close默认会阻塞等待之前所有的发送请求完成之后再回收
  3. 可指定关闭的超时时间(超出该事件则强行回收, 不建议指定)

Close()方法的定义:

public void close();

public void close(long timeout, TimeUnit timeUnit);

实现原理

Producer的发送消息由两个线程完成:

  1. 主线程: 构建并处理消息后发送至RecordAccumulator
  2. Sender线程: 从RecordAccumulator获取消息, 并发送至Broker

如: Producer发送消息链路图

image

  1. RecordAccumulator: 双端队列缓存待发送ProducerBatch以减少网络影响
  2. ProducerBatch: 包含任意多个待发送的ProducerRecord(消息批次)
  3. Request: Kafka支持的各种请求协议
  4. InFlightRequests: 缓存已发送但未响应的Request

// Interceptor和Partitioner可选择性处理, 但必须经Serializer处理


Producer发送ProducerRecord的流程:

  1. 主线程将ProducerRecord加工处理后发送至RecordAccumulator尾部
  2. RecordAccumulator根据ProducerRecord分区选择对应的ProducerBatch
  3. RecordAccumulator根据内存复用原则和ProducerBatch大小决定是否新建
  4. Sender线程从RecordAccumulator头部获取ProducerBatch
  5. <分区, <Deque<ProducerBatch>>形式变为<Node, List<ProducerBatch>>
  6. 再根据各种协议请求转换为<Node, Request>形式
  7. 发送前以Map<nodeId, Deque<Request>>缓存Request
  8. 返回发送后的响应并清理InFlightRequests和RecordAccumulator

// 形式转换是为完成应用逻辑层到网络I/O层的转换


RecordAccumulator内存复用原则:

  1. RecordAccumulator通过java.io.ByteBufferBufferPool实现内存复用
  2. 若内存申请不超过指定大小, 则申请指定大小并放置于BufferPool
  3. 若内存申请超过指定大小, 则申请该内存并再使用后直接释放

// BufferPool可避免频繁的申请和释放内存


InFlightRequest中包含leastLoadedNode

  1. leastLoadedNode: 负载最小的Broker(未确认请求最少的)
  2. leastLoadedNode常用于元数据请求和Consumer组播协议的交互
  3. leastLoadedNode由Sender线程根据指定过期时间维护(主线程也可访问)

// 元数据: Broker、Topic、Partition、leader和follower副本所在的Broker等


如: Sender线程维护leatLoadedNode信息

  1. Sender线程检查元数据是否过期(默认5m)
  2. 超出则挑出leastLoadedNode, 向该Broker发送MetadataRequest请求
  3. 获取结果后将其结果存入InFlightRequests中, 并更新元数据的过期时间

ProducerInterceptor

ProducerInterceptor(拦截器): 消息发送前/后的进行的操作

  1. 不建议通过ProducerInterceptor修改topic、key和partition
  2. 可指定多个ProducerInterceptor(拦截链按配置时顺序执行)
  3. 可通过interceptor.classes参数指定Producer所使用的ProducerInterceptor

ProducerInterceptor定义:

public interface ProducerInterceptor<K, V> extends Configurable {
    // 发送前进行的操作
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    // 发送后被应答之后或失败进行的操作
    // 优先于Send()方法中定义的Callback前执行
    // 由于该方法运行于Producer的IO线程中, 应简洁
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);

    // 关闭拦截器
    public void close();
}

// 抛出的任何异常都会被记录到日志中, 并不再向上抛


Serializer

Serializer(序列化器): 将特定数据转换成字节数组(byte[])

  1. Broker仅能接受字节数组形式的数据(接收后会对其反序列化)
  2. Producer使用的Serializer需和Consumer使用的反序列化器需对应
  3. Producer指定Serializer时, 需通过全限定名方式指定(类的完整路径)

Serializer定义:

public interface Serializer<T> extends Closeable {
    // 配置序列化器
    // 常用于指定编码类型(默认UTF-8)
    void configure(Map<String, ?> configs, boolean isKey);

    // 执行序列化
    byte[] serialize(String topic, T data);

    // 关闭序列化器
    // 需保证幂等性
    void close();
}

// 不建议使用自定义Serializer或DeSerializer, 会增加耦合度


Partitioner

Partitioner(分区器): ProducerRecord分区的默认规则

  1. ProducerRecord中指定partition字段, 则略过Partitioner
  2. Partitioner的分区计算受Topic数量的影响(已分配的不受)
  3. 可通过partitioner.class参数指定Producer所使用的Partitioner

Partitioner定义:

public interface Partitioner extends Configurable, Closeable {
    // 计算并返回分区号
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    // 关闭分区器
    public void close();
}

public interface Configurable {
    // 获取配置信息并初始化数据
    void configure(Map<String, ?> configs);
}

默认的Partitioner: org.apache.kafka.clients.producer.internals.DefaultPartitioner

  1. close()方法默认为空
  2. 消息为null时, 则以轮询的方式分配可用的分区号
  3. 消息不为null时, 则进行Hash计算(MurmurHash2算法)

// 消息相同的情况下会写入相同的分区(存在消息互相覆盖的情况)


事务

事务(Transaction): Producer操作的最小原子单位(可跨Partition)

  1. 开启事务时, 必须也需开启幂等性(enable.idempotence)
  2. 开启事务时必须指定事务ID(若事务ID重复, 将结束被覆盖的事务并抛出异常)
  3. 只能使事务处于以下两种状态(否则将抛出异常): COMMIT、ABORT
  4. 事务开启后需关闭自动位移提交, 也不能位移消费

Producer中常用的事务方法:

// 初始化事务
void initTransactions();

// 开启事务
void beginTransaction();

// 事务内的位移提交
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)

// 提交事务
void commitTransaction();

// 终止事务(回滚)
void abortTransaction();

事务协调器(TransactionCoordinator): 负责事务中的各类操作

  1. 每个Producer都对应个事务协调器, 由其负责Producer中各类请求
  2. 事务协调器会将事务的信息都存储至内部Toipc的__transaction_state

如: 事务的执行流程

image

  1. 查找事务协调器: 找到事务协调器所在的Broker并建立连接(同时查找Partition)
  2. 获取PID: 通过InitProducerIdRequest请求获取该事务ID
  3. 执行事务: 通过各类请求处理Record并将数据存储至内部Topic
  4. 结束事务: 发送各类请求结束事务, 同时将事务信息存储至内部Topic和日志文件

Consumer的事务受以下限制:

  1. 采用日志压缩策略的Topic, 其Record可能被覆盖
  2. Consumer在消费时可能没有分配到事务内的所有Partition
  3. Record可能分布在Partition的多个LogSegment, 存在部分被清除的可能
  4. Consumer可通过位移提交/位移消费访问Record, 可能导致遗漏事务中的Record

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/299909.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

组合数据(Python实现)

一、主要目的&#xff1a; 1&#xff0e;熟悉组合数据的类型。 2&#xff0e;掌握列表、元组、字典、集合等组合数据的创建、访问方法。 3&#xff0e;掌握组合数据推导式的使用方法 4&#xff0e;熟悉组合数据的常见应用。 二、主要内容和结果展示&#xff1a; 1. 使用两…

OpenCV中实现图像旋转的方法

OpenCV中实现图像旋转的方法 函数&#xff1a;cv2.flip() 功能&#xff1a;水平或者垂直翻转 格式&#xff1a;dst cv2.flip(src,flipCode[,dst]) 参数说明&#xff1a; src&#xff1a;输入图像 dst&#xff1a;和原图像具有相同大小、类型的目标图像。 flipCode&#…

Python小细节之Gui图形化界面库的对比和选择(一分钟版)

引言 我想要把打包的python程序变得好看 交互起来变得简单 遂 图形化界面 然 相关的库有很多 所以 对比&#xff01; 开整 8个图形化界面库 在Python中&#xff0c;有多种图形用户界面&#xff08;GUI&#xff09;库可以用来创建丰富的图形化应用程序。以下是一些主要的图…

竞赛练一练 第23期:NOC大赛每日一练,python题目刷题第8天,包含答案解析

题目来自:NOC 大赛创客智慧编程赛项Python 复赛模拟题(二) NOC大赛创客智慧编程赛项Python 复赛模拟题(二) 第一题: 编写一个成绩评价系统,当输入语文、数学和英语三门课程成绩时,输出三门课程总成绩及其等级。 (1)程序提示用户输入三个数字,数字分别表示语文、数学、…

3.1 数据链路层概述

目录 3.1 数据链路层概述3.1.1 关于数据链路层什么是数据链路从协议栈看数据链路层数据链路层信道类型 3.1.2 三个基本问题封装成帧透明传输差错控制循环冗余检验CRC&#xff08;Cyclic Redundancy Check&#xff09;原理 3.1 数据链路层概述 3.1.1 关于数据链路层 什么是数据…

odoo17 | 模型视图继承

前言 Odoo的强大之处在于它的模块化。模块专门用于满足业务需求&#xff0c;但模块也可以彼此交互。这对于扩展现有模块的功能非常有用。例如&#xff0c;在我们的房地产场景中&#xff0c;我们希望在常规用户视图中直接显示销售人员的属性列表。 但是在讨论特定的Odoo模块继…

HackTheBox - Medium - Linux - UpDown

UpDown UpDown 是一台中等难度的 Linux 机器&#xff0c;暴露了 SSH 和 Apache 服务器。在Apache服务器上&#xff0c;有一个Web应用程序&#xff0c;允许用户检查网页是否已启动。服务器上标识了一个名为“.git”的目录&#xff0c;可以下载以显示目标上运行的“dev”子域的源…

GA算法简介

GA算法简介 前言一、GA是什么二、GA简介1.思想2.流程3.过程 前言 今天学习一下优化中非常出名的遗传(GA)算法 &#xff0c;它的起源可是来自达尔文的生物进化论。 一、GA是什么 百科定义&#xff1a;遗传算法&#xff08;Genetic Algorithm&#xff0c;GA&#xff09;最早是…

Java多线程技术11——ThreadPoolExecutor类的使用1-备份

1 概述 ThreadPoolExecutor类可以非常方便的创建线程池对象&#xff0c;而不需要程序员设计大量的new实例化Thread相关的代码。 2 队列LinkedBlockingQueue的使用 public class Test1 {public static void main(String[] args) {LinkedBlockingQueue queue new LinkedBlocki…

四则运算 C语言xdoj20

问题描述&#xff1a; 输入两个整数和一个四则运算符&#xff0c;根据运算符计算并输出其运算结果&#xff08;和、差、积、商、余之一&#xff09;。注意做整除及求余运算时&#xff0c;除数不能为零。 输入说明&#xff1a; 使用scanf()函数输入两个整数和一个运算符&#xf…

【好书推荐】深入理解现代JavaScript

目录 推荐理由内容简介本书阅读对象为什么推荐这本书&#xff0c;看大佬们怎么说总结 T. J. Crowder是一位拥有30年经验的软件工程师。在他的整个职业生涯中&#xff0c;他至少有一半时间是在使用JavaScript从事开发工作。他经营着软件承包和产品公司Farsight Software。他经常…

工业协议转换网关:打破通信壁垒,实现设备互联

在工业自动化领域&#xff0c;各种设备和系统间的通信协议不尽相同&#xff0c;这给不同设备间的集成和数据交互带来了挑战。工业协议转换网关作为一种解决这一问题的关键设备&#xff0c;能够实现不同协议间的转换和数据传输&#xff0c;打破通信壁垒&#xff0c;提高设备的协…

2.8 EXERCISES

如果我们想使用每个线程来计算向量加法的一个输出元素&#xff0c;那么将线程/块索引映射到数据索引的表达式是什么&#xff1f; 答&#xff1a;C 假设我们想用每个线程来计算向量加法的两个&#xff08;相邻&#xff09;元素。将线程/块索引映射到i&#xff08;由线程处理的…

SpringSecurity集成JWT实现后端认证授权保姆级教程-数据准备篇

&#x1f341; 作者&#xff1a;知识浅谈&#xff0c;CSDN签约讲师&#xff0c;CSDN博客专家&#xff0c;华为云云享专家&#xff0c;阿里云专家博主 &#x1f4cc; 擅长领域&#xff1a;全栈工程师、爬虫、ACM算法 &#x1f492; 公众号&#xff1a;知识浅谈 &#x1f525;网站…

进阶学习——Linux系统安全及应用

目录 一、系统安全加固 1.账号安全基本措施 1.1系统账号清理 1.1.1延伸 1.2密码安全控制 1.3命令历史限制 1.4终端自动注销 二、使用su命令切换用户 1.用途及用法 2.密码验证 3.限制使用su命令的用户 4.查看su操作记录 5.sudo&#xff08;superuse do&#xff09;…

Linux下QT生成的(.o)、(.a)、(.so)、(.so.1)、(.so.1.0)、(.so.1.0.0)之间的区别

记录一下遇到的问题&#xff1a;Linux系统下Qt编译第三方动态库会生成多个.so文件&#xff0c;不了解的小伙伴可能很疑惑&#xff1a; &#xff08;1&#xff09;Linux 下 QT 生成的&#xff08;.o&#xff09;、&#xff08;.a&#xff09;和&#xff08;.so&#xff09;三个文…

如何向嵌入式设备中添加tcpdump工具

说明&#xff1a;tcpdump是一个在网络设备调试中一个非常重要的工具&#xff0c;它并不像hexdump等工具集成在busybox里面&#xff0c;也不像其他的软件一样只需要依赖linux标准的库就可以实现&#xff0c;它需要pcap相关的库和加密的相关库。 本文主要是基于realtek 83系列的…

APPnium 自动化实践 :第一步adb 连接手机

1. 下载安装 adb ,添加到环境变量。 ADB Download - Get the latest version of ADB and fastboot 2. 手机开启开发者模式 https://developer.huawei.com/consumer/cn/doc/quickApp-Guides/quickapp-open-developer-option-0000001137005543 3. adb 连接设备 【And…

网络安全与IP地址:构建数字世界的前沿堡垒

网络安全是当今数字社会中不可忽视的挑战之一。而IP地址&#xff0c;作为互联网通信的基础协议&#xff0c;既是数字化时代的桥梁&#xff0c;也是网络安全的关键节点。本文将剖析IP地址在网络安全领域的作用&#xff0c;以及如何利用其特性建立有效的网络安全策略。 IP地址&a…

【图神经网络导论】之第9章模型变体(刘知远)

第9章不同图类型的模型变体 文章目录 第9章不同图类型的模型变体9.1 有向图9.2 异构图9.3 带有边信息的图9.4 动态图9.5 多维图 第4章介绍的基础GNN模型"被用于处理无向图&#xff0c;这些图包含具有标签的节点&#xff0c;是最简单的图。然而&#xff0c;在现实世界中还有…
最新文章