【Kafka-3.x-教程】-【二】Kafka-生产者-Producer

【Kafka-3.x-教程】专栏:

【Kafka-3.x-教程】-【一】Kafka 概述、Kafka 快速入门
【Kafka-3.x-教程】-【二】Kafka-生产者-Producer
【Kafka-3.x-教程】-【三】Kafka-Broker、Kafka-Kraft
【Kafka-3.x-教程】-【四】Kafka-消费者-Consumer
【Kafka-3.x-教程】-【五】Kafka-监控-Eagle
【Kafka-3.x-教程】-【六】Kafka 外部系统集成 【Flume、Flink、SpringBoot、Spark】
【Kafka-3.x-教程】-【七】Kafka 生产调优、Kafka 压力测试

【Kafka-3.x-教程】-【二】Kafka-生产者-Producer

  • 1)生产者消息发送流程
    • 1.1.🚀发送原理
    • 1.2.生产者重要参数列表
  • 2)异步发送
    • 2.1.普通异步发送
    • 2.2.带回调函数的异步发送
  • 3)同步发送 API
  • 4)生产者分区
    • 4.1.分区好处
    • 4.2.生产者发送消息的分区策略
      • 4.2.1.默认的分区器 DefaultPartitioner
      • 4.2.2.自定义分区器
  • 5)生产经验
    • 5.1.生产者如何提高吞吐量
    • 5.2.数据可靠性(ack)
    • 5.3.数据去重(一致性语义、幂等性、生产者事务)
    • 5.4.数据有序
    • 5.5.数据乱序

1)生产者消息发送流程

1.1.🚀发送原理

在这里插入图片描述

在消息发送的过程中,涉及到了两个线程 — — main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。

1、外部数据通过 Producer 生产者中的 main 线程,调用 send 方法后,会经历三个阶段,分别是 Interceptors(拦截器)、Serializer(序列化器)、Partitioner(分区器)

(1)Interceptors 是做拦截过滤操作的,但是生产中一般不适用 Kafka 做拦截处理,一般都是使用 Flume 在上游做拦截处理.

(2)Serializer 将数据进行序列化操作,但是注意这里的序列化器并不是用的 java 自带的序列化(java 的序列化太重量了),Kafka 有自己的序列化器,相对于 java 更加轻量好用。

(3)Partitioner 将数据进行分区操作,Kafka 有几种不同的分区策略,根据不同的分区策略将数据发送到不同的分区上,点击此处跳转至 Kafka 分区策略详解。

2、Producer 将数据经过以上三个阶段处理后,会将数据发送到一个双端队列 RecordAccumulator 中,这个双端队列中的过程是基于内存完成的(默认 32M),在 RecordAccumulator 中,会根据分区数量创建好对应的 DQuene(队列)数量(一个分区一个 DQuene),然后按照对应的分区策略将数据发送到对应的 DQuene 中,发送到 DQuene 中的数据每一批次(ProducerBatch)大小默认是 16 KB。

扩展:双端队列 RecordAccumulator 中存在一个内存池,Producer 将数据经过以上三个阶段处理后向 RecordAccumulator 中的 DQuene 发送数据时,会直接从内存池中获取内存资源,后续数据完成写入后将资源重新释放回内存池当中。

3、Sender 现成将 RecordAccumulator 缓冲区中的数据读取出来后,将数据发送到 Kafka 集群(Broker)。

(1)batch.size(ProducerBatch) 达到阈值,或 linger.ms 达到设置的时间(默认 0ms,也就是说来一条发送一条),Sender 线程就会去 RecordAccumulator 拉取数据。

  • 可以理解为大巴车,人满了就会发车,如果人没有坐满但是发车时间到了也要发车。
  • 如果 linger.ms 设置为 0,那么 batch.size 的参数无论设置为多少都没有作用了。

(2)Sender 拉取到数据后,按照节点地址(Broker)作为 key,Request(请求)作为 value,这里的 Request(请求)中并不只包含一条或一批数据,可能是多条数据或多批数据,将数据发送给 Broker。

(3)如果 Request1 拉取到 DQuene 中的数据后发送到 Broker1 中,如果 Broker1 没有及时的应答,那么允许发送第二个请求吗?答案是允许的,最多有五个请求都没有收到应答,那么就不再发送请求了。

(4)Selector 可以理解为一条高速公里,数据可以理解为高速公路上行驶的骑车;左侧的 RecordAccumulator 缓冲区看做输入流,右侧的 Broker 看做输出流。

4、数据发送到 Kafka 集群中(也就是 Broker 中),会有一个副本同步机制进行副本同步。

(1)Kafka 集群收到数据后会有一个应答机制(ack),表明是否收到数据,ack 的级别有一下几种。

  • 0:生产者发送过来的数据,不需要等数据落盘应答。
  • 1:生产者发送过来的数据,Leader 收到数据后应答。
  • -1/all:生产者发送过来的数据,Leader 和 ISR 队列(ISR 队列中包含 Leader 和所有跟得上 Leader 进行数据同步的 Follower 的集合)里面的所有节点收齐数据后应答。-1 和 all 等价。

(2)如果 Selector 接收到 ack 返回的成功信号,那么首先要把 NetWorkClient 中对应的的 Request1 清理掉,同时清理掉 RecordAccumulator 缓冲区内对应的分区数据;如果 Selector 接收到 ack 返回的失败信号,那么可以进行 retries(重试),retries 默认次数为 int 的最大值,一般情况下我们是进行重新配置的。

以上为 Produer 发送数据到 Kafka 集群的流程原理

1.2.生产者重要参数列表

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2)异步发送

2.1.普通异步发送

在这里插入图片描述

在企业中一般采用异步发送策略,异步发送是指,在外部数据经过 main 线程后进入到双端队列 RecordAccumulator 中后,不用等待数据写入 Kafka 集群,直接继续发送数据到 RecordAccumulator 中。

1、导入依赖

<dependencies>
 <dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>3.0.0</version>
 </dependency>
</dependencies>

2、代码编写

public class CustomProducer {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型 key.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","count"+i));
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

2.2.带回调函数的异步发送

在这里插入图片描述

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

代码编写:

public class CustomProducerCallback {

    public static void main(String[] args) throws InterruptedException {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 500; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "count" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (exception == null){
                        System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                    }
                }
            });

            Thread.sleep(2);
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

3)同步发送 API

只需在异步发送的基础上,再调用一下 get()方法即可。

代码编写:

public class CustomProducerSync {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","count"+i)).get();
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

4)生产者分区

4.1.分区好处

(1)便于合理使用存储资源,每个 Partition 在一个 Broker 上存储,可以把海量的数据按照分区切割一块一块数据存储在多台 Broker 上。合理控制分区的任务,可以实现负载均衡的效果。

(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

在这里插入图片描述

4.2.生产者发送消息的分区策略

Kafka 分区策略

4.2.1.默认的分区器 DefaultPartitioner

在 IDEA 中 ctrl +n,全局查找 DefaultPartitioner。

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a 
partition based on a hash of the key
* <li>If no partition or key is present choose the sticky 
partition that changes when the batch is full.
* 
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {
 … …
}

在这里插入图片描述

1、将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中。

public class CustomProducerCallbackPartitions {

    public static void main(String[] args) throws InterruptedException {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", 1,"","hello" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (exception == null){
                        System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                    }
                }
            });

            Thread.sleep(2);
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

2、没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。

public class CustomProducerCallback {

    public static void main(String[] args) throws InterruptedException {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 500; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "hello" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (exception == null){
                        System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                    }
                }
            });

            Thread.sleep(2);
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

4.2.2.自定义分区器

如果研发人员可以根据企业需求,自己重新实现分区器。

1、需求

例如我们实现一个分区器实现,发送过来的数据中如果包含 hello,就发往 0 号分区,不包含 hello,就发往 1 号分区。

2、实现步骤

(1)定义类实现 Partitioner 接口。

(2)重写 partition()方法。

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // 获取数据 atguigu  hello
        String msgValues = value.toString();

        int partition;

        if (msgValues.contains("hello")){
            partition = 0;
        }else {
            partition = 1;
        }

        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

(3)使用分区器的方法,在生产者的配置中添加分区器参数。

public class CustomProducerCallbackPartitions {

    public static void main(String[] args) throws InterruptedException {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 关联自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", 1,"","hello" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (exception == null){
                        System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                    }
                }
            });

            Thread.sleep(2);
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

5)生产经验

5.1.生产者如何提高吞吐量

在这里插入图片描述

  • batch.size:批次大小,默认 16k

  • linger.ms:等待时间,修改为 5-100ms

  • compression.type:压缩 snappy

  • RecordAccumulator:缓冲区大小,修改为 64m

代码编写:

public class CustomProducerParameters {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 连接kafka集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

        // 批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);

        // linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // 压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");


        // 1 创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","count"+i));
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

5.2.数据可靠性(ack)

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

数据可靠性分析:

如果分区副本设置为 1 个,或者 ISR 里应答的最小副本数量( min.insync.replicas 默认为 1)设置为 1,和 ack = 1 的效果是一样的,仍然有丢数的风险(leader:0,isr:0)。

所以数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

代码编写:

public class CustomProducerAcks {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // acks
        properties.put(ProducerConfig.ACKS_CONFIG,"all");

        // 重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG,3);

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","hello"+i));
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

5.3.数据去重(一致性语义、幂等性、生产者事务)

1、数据传递语义

在这里插入图片描述

2、幂等性

PID:ProducerId,Kafka 每次重启都会生成一个新的 PID。所以幂等性不能保证多会话时数据不重复,只能保证单会话时数据不重复,一旦 Kafka 重启产生新的会话那么就会造成数据重复。通过幂等性判断出来的重复数据不会落盘,直接在内存中将数据清理掉。

Partition:分区号。证明多个分区也可以有想同的数据。

SeqNumber:序列化 number,单调递增。
在这里插入图片描述

如何使用幂等性:开启参数 enable.idempotence 默认为 true,false 关闭。

3、Kafka 事务

(1)Kafka 事务原理

开启事务,必须开启幂等性。

刚才上面说的幂等性不能保证 Kafka 重启时的数据重复问题,那么为了解决这一问题,衍生出了 Kafka 事务。

Transaction Coordinator(事务协调器):Kafka 事务的主要负责者。Kafka 事务开启后会存在一个存储事务信息的特殊主题(也就是说可以将事务信息持久化到磁盘中),这个主题默认有 50 个分区,每个分区负责一部分事务,事务的划分是根据 transactionnal.id(可以再代码中进行配置,唯一值)的 hashcode 值 % 50 计算出该事务属于哪个分区,该分区 Leader 副本所在的 Broker 节点即为这个 transactional.id 对应的 Transaction Coordinator 节点。

Kafka 事务流程:

① Producer 向协调器请求 PID(幂等性需要)。

② 返回 PID 给 Producer。

③ Producer 接收到 PID 后向 topic 发送消息。

④ Producer 同时向协调器发出 commit 请求

⑤ commit 请求持久化到存储事务信息的特殊主题。

⑥ 持久化成功后通知 Producer 已经持久化成功。

⑦ 协调器在后台向 topic 分区所在的节点发送一个 commit 请求,用来验证数据是否已经写入到对应的节点中。

⑧ topic 分区所在的节点返回数据成功写入的信息。

⑨ 将事务成功信息持久化到存储事务信息的特殊主题。

在这里插入图片描述

(2)Kafka 的事务一共有如下 5 个 API

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
 String consumerGroupId) throws 
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

(3)单个 Producer,使用事务保证消息的仅一次发送

public class CustomProducerTranactions {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 设置事务 id(必须),事务 id 任意起名
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01");

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
		// 初始化事务
        kafkaProducer.initTransactions();
		// 开启事务
        kafkaProducer.beginTransaction();

        try {
            // 2 发送数据
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i));
            }

            int i = 1 / 0;
			// 提交事务
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
        	// 终止事务
            kafkaProducer.abortTransaction();
        } finally {
            // 3 关闭资源
            kafkaProducer.close();
        }
    }
}

5.4.数据有序

单分区内数据有序

多分区、分区与分区间数据无需。

在这里插入图片描述

5.5.数据乱序

1、Kafka 在 1.x 版本之前保证数据单分区有序,条件如下(Request 每次只发送一个):

max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)。

2、kafka在1.x及以后版本保证数据单分区有序,条件如下:

(1)未开启幂等性

max.in.flight.requests.per.connection需要设置为 1。

(2)开启幂等性

max.in.flight.requests.per.connection 需要设置小于等于 5。如果配置为大于 5 就不一定能保证了,因为 Sender 中最多只保证 5 个 Request 的排序(最多缓存 5 个请求)。

原因说明:因为在 Kafka1.x 以后,启用幂等后,Kafka 服务端会缓存 Producer 发来的最近 5 个 Request 的元数据,故无论如何,都可以保证最近 5 个 Request 的数据都是有序的。

在缓存中根据幂等性中的序列号进行排序后进行落盘。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/317066.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ROS无人机开发常见错误

飞控部分 一、解锁时飞控不闪红灯&#xff0c;无任何反应&#xff0c;地面站也无报错 解决办法&#xff1a; 打开地面站的遥控器一栏 首先检查右下角Channel Monitor是否有识别出遥控各通道的值&#xff0c;如果没有&#xff0c;检查遥控器是否打开&#xff0c;遥控器和接收…

【Python】Python语言 3小时速通(有C语言基础版)

python从入门到实践 变量 message"hello world"并不需要指出变量类型 方法 tittle()#以首字母大写的形式输出单词upper()#全部大写输出lower()#全部小写输出存储数据时经常使用lower&#xff0c;因为无法确保数据是大写还是小写 rstrip()#输出删除字符串尾部多余…

Linux系统中使用ln命令创建软连接

大家应该和我一样&#xff0c;第一次听到软连接这个词时感觉好高级啊&#xff0c;但其实也就那么回事&#xff0c;你完全可以将他类比为Windows系统中的快捷方式。 链接只是一个指向&#xff0c;并不是物理移动&#xff0c;类似Windows系统的快捷方式 1.功能和语法 功能&…

C#进阶学习

目录 简单数据结构类ArrayList声明增删查改遍历装箱拆箱 Stack声明增取查改遍历装箱拆箱 Queue声明增取查改遍历 Hashtable声明增删查改遍历装箱拆箱 泛型泛型分类泛型的作用泛型约束 常用泛型数据结构类List声明增删查改遍历 Dictionary声明增删查改遍历 LinkedList声明增删查…

Seata TM管理分支事务

TM相当于一个中间商&#xff0c;是没有涉及到任何数据库底层操作的。 TransactionalTemplate 1、TM向TC端发起一次开启全局事务的请求 io.seata.tm.api.TransactionalTemplate#beginTransaction --> io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.Strin…

APM链路监控: Linux 部署 pinpoint

目录 一、实验 1.环境 2. 准备 3.HBase单机部署 4.pinpoint部署 二、问题 1.pinpoint有哪些功能 2.pinpoint架构是如何组成的 3.Linux中自带的jdk 如何设置JAVA_HOME 4. hbase启动报错 5.hbase的master启动失败 6.JPS命令如何安装和使用 一、实验 1.环境 &#x…

Centos7.9服务器编译安装Nginx1.24.0和php8.3

Centos7.9服务器编译安装Nginx1.24.0和php8.3 服务器nginx原版本有安全漏洞,需要升级,由于原始是yum源安装,通过yum直接升级,无法正常升级完成,故而需要卸载yum源,重新编译安装。 1、查看原来nginx版本,ps查看原来nginx进程,运行状态: ps aux | grep nginx ​ root …

JAVAEE——request对象(三)

1. request对象 1.1 知识点 &#xff08;1&#xff09;乱码问题的两种解决方式 &#xff08;2&#xff09;post和get提交的区别 &#xff08;3&#xff09;request接收同名参数的问题 1.2 具体内容 使用request接收参数 <%page contentType"text/html; charsetut…

一个命令查看linux系统是Centos还是Ubuntu

目 录 一、 背景介绍 二、一个命令查看linux系统的简单方法 1、 uname -a 2、cat /etc/issue 3、lsb_release -a 4、 dmesg | grep Ubuntu 一、 背景介绍 Linux 系统基本上分为两大类&#xff1a; 1. Red Hat 系列&#xff1a;包括 Red Ha…

ipv6(centos布置-亲自操刀)

这一篇本来不太想写&#xff0c;但想想&#xff0c;不写出来后面又忘记了&#xff0c;就写写看了&#xff0c; 切记&#xff0c;大家看完别去用来做别的事情哈 正文来了&#xff1a; 开始就去注册一个Ipv6隧道服务吧 在隧道信息的下方有Example IPv6 Tunnel Configurations …

openEuler安装Docker艰辛路程

文章目录 安装docker测试docker关于windows docker拉取镜像查看所有镜像删除镜像删除不在运行的进程强制删除正在运行的进程 启动docker容器服务-d测试 停止docker容器服务查看docker启动进程更新容器(没有自启动功能&#xff0c;更新为自启动)docker端口映射进入容器修改内容退…

python + selenium 初步实现数据驱动

如果在进行自动化测试的时候将测试数据写在代码中&#xff0c;若测试数据有变&#xff0c;不利于数据的修改和维护。但可以尝试通过将测试数据放到excel文档中来实现测试数据的管理。 示例&#xff1a;本次涉及的项目使用的12306 selenium 重构------三层架构 excel文件数据如…

单机物理机部署Datax

一、概述 DataX 是阿里巴巴开源的一个异构数据源离线同步工具&#xff0c;致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 源码地址&#xff1a;https://github.com/alibaba/DataX 为了解决异构数据…

Portalgraph VR空间投影仪:可以将VR空间投射到任意平面上的新型VR投影技术

通过一项创新的科技突破&#xff0c;Portalgraph VR空间投影仪成功地在现实与虚拟空间之间搭建起了一座神奇的“时空传送门”。这投影一技术不仅打破了传统虚拟现实设备的局限&#xff0c;更让人们无需佩戴任何头戴显示器&#xff0c;仅凭裸眼就能在任何平面上看到虚拟现实空间…

RibbonGroup添加QAction

实际项目中&#xff0c;group中需要添加按钮与点击事件&#xff1a; 添加实例如下&#xff1a; if (Qtitan::RibbonGroup* groupClipboard pageHome->addGroup(tr("Clipboard"))) { //右下角按钮显示 groupClipboard->setO…

高效办公:在文件夹名称左边插入关键字,提高文件管理效率

在繁忙的工作环境中&#xff0c;经常要处理大量的文件和文件夹。有效的文件管理是一个挑战&#xff0c;大量的文件和文件夹难以找到所需的资料。下面一起来看云炫文件管理器如何在文件夹名称左边批量插入关键字。 文件夹名称左边添加关键字前后对比图。 文件夹名称左边批量插…

电脑扩容升级硬盘选1T还是2T

SSD固态有必要升级2TB吗&#xff1f;----------吴中函 某大二学生用的一台笔记本电脑&#xff0c;512GB的硬盘空间已经严重不够用了&#xff0c;想给笔记本扩容升级一下硬盘&#xff1b; 这位学生是学设计专业的、平时也喜欢摄影、电脑里面也装了一些游戏&#xff0c;经常整理、…

OFBiz RCE漏洞复现(CVE-2023-51467)

漏洞名称 Apache OFBiz 鉴权绕过导致命令执行 漏洞描述 Apache OFBiz是一个非常著名的电子商务平台&#xff0c;是一个非常著名的开源项目&#xff0c;提供了创建基于最新J2EE/XML规范和技术标准&#xff0c;构建大中型企业级、跨平台、跨数据库、跨应用服务器的多层、分布式…

【java八股文】之MYSQL基础篇

1、数据库三大范式是什么 第一范式&#xff1a;每个列都不可以再拆分。 第二范式&#xff1a;在第一范式的基础上&#xff0c;非主键列完全依赖于主键&#xff0c;而不能是依赖于主键的一部分。 第三范式&#xff1a;在第二范式的基础上&#xff0c;非主键列只依赖于主键&#…

数模学习day13-典型相关分析

典型相关分析&#xff08;Canonical Correlation analysis&#xff09; 研究两组变量&#xff08;每组变量中都可能有多个指标&#xff09;之间相关关系的一种多元统计方法。它能够揭示出两组变量之间的内在联系。 注&#xff1a;本文源于数学建模学习交流相关公众号观…
最新文章