Kafka(二)【文件存储机制 生产者】

目录

一、Kafka 文件存储机制

二、Kafka 生产者

1、生产者消息发送流程

1.1、发送原理

2、异步发送 API

2.1、普通异步发送

案例演示

2.2、带回调函数的异步发送

2.3、同步发送 API

3、生产者分区

3.1、分区的好处

3.2、生产者发送消息的分区策略

(1)默认的分区器 DefaultPartitioner

3.3、自定义分区器 

1)需求

2)实现步骤

4、生产经验

4.1、生产者如何提高吞吐量

4.2、数据可靠性

4.3、数据去重

4.3.1、数据传递语义

4.3.2、幂等性

4.3.3、生产者事务

4.4、数据有序

4.5、数据乱序


一、Kafka 文件存储机制

        Kafka中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。

        topic 只是逻辑上的概念,而 partition 是物理上的概念。每个partition对应于一个log文件,该 log 文件中存储的就是 producer 生产的数据。Producer生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset 。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。

        由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个 partition 分为多个 segment 。每个 segment 对应两个文件 ——“.index”文件和“.log”文件(一个存储当前文件的索引范围,一个存储真正的数据)。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,like 这个 topic 有三个分区,则其对应的文件夹为 like-0,like-1,like-2:

00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

这种存储规则很像我们 Spark Shuffle 阶段产生的文件的存储规则,顺便回忆一下 Spark 的 Shuffle 阶段:

        Shuffle 过程中每个 Map 任务会产生两个文件,即数据文件和索引文件。其中,数据文件是存储当前 Map 任务的输出结果,而索引文件中则存储了数据文件中的数据的分区信息。下一个阶段的 Reduce 任务就是根据索引文件来获取属于自己处理的那个分区的数据。

        其实 Spark 常见的有两种 Shuffle 策略:HashShuffle 和 SortShuffle ,但是这属于 Spark 调优的内容了,现在不必深究。

二、Kafka 生产者

1、生产者消息发送流程

        我们知道,Kafka 的三大组成部分:Producer、Broker、Consumer。接下来我们要学习的部分就是 Kafka 的生产者是如何把数据发送给 Kafka 集群(Broker)的。

1.1、发送原理

        在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。

main 线程:

  • 拦截器是可选的(一般用 flume 的拦截器,配置更简单)
  • 序列化器(用的是Kafka自己的序列化器,而不是Java的Serializable,因为Java的序列化器太繁重)
  • 分区器(决定数据往哪个分区存储)
  • RecordAccumulator 默认 32MB(内存),分区器会把数据发送到这里,一个分区会创建一个队列,方便数据管理(都是在内存中完成的),队列中每份数据的大小(batch.size)默认是16K

sender线程:

  • 只有数据积累到 batch.size 之后,sender 才会发送数据,默认16K
  • 如果队列中的一份数据迟迟满不了(达不到 batch.size),sender 会等待 linger.ms 后发送这份数据,默认是 0ms
  • NetworkClient:我们发送数据的请求会被放到一个队列(InFlightRequests 队列,通过源码可以看到其底层数据结构Map<String, Deque<ClientRequest>> 以 broker 为 key,队列中放的是一个个发送请求)中去,当第一个请求发出后未得到响应仍然会继续发送第二个请求,但是这个队列的大小默认是 5,也就是说,如果五个请求都发出去后且都没有得到响应,那么就不可以再发出请求,因为默认每个 broker 最多缓存5个请求。
  • Sender线程通过Selector把上面的请求和数据一起发送到Kafka集群,当数据发送到Kafka集群后,Kafka集群会进行副本的复制并返回对应的acks信息。acks有三种应答级别,分别是ACK=0、ACK=1和ACK=all或ACK=-1(默认级别)。

    ACK=0:生产者在消息发送后不会等待来自服务器的任何确认。这意味着生产者无法知道消息是否成功存储在Kafka集群中。这个级别提供了最高的吞吐量,但在可靠性方面是最低的,因为可能会丢失消息。

    ACK=1:生产者会等待直到消息的领导者副本(Leader Replica)确认接收到消息。一旦领导者副本存储了消息,生产者会收到一个确认。这个级别在性能和数据可靠性之间提供了一个平衡。但如果领导者副本在确认后发生故障,而消息还未复制到追随者副本(Follower Replicas),则消息可能会丢失。

    ACK=all或ACK=-1(默认级别):生产者会等待消息被所有的同步副本(ISR,In-Sync Replicas)确认。这意味着只有当所有的同步副本都已经接收并存储了消息,生产者才会收到一个确认。这个级别提供了最高的数据可靠性,但可能会牺牲一些性能,因为需要等待所有副本的确认。

  • 如果数据发送成功,就把 InFlighRequests 队列中的请求缓存给清除掉,并且把对应 RecordAccuulator 中的数据清除掉。

  • 如果数据发送失败,会进行重试,重试的次数默认是(Intege.MAX_VALUE),也就是死磕到底,直到发送成功为止。

参数名称

描述

bootstrap.servers

生产者连接集群所需的broker地址清单。例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置1个或者多个,中间用逗号隔开。注意这里并非需要所有的broker地址,因为生产者从给定的broker里查找到其他broker信息。

key.serializer和value.serializer

指定发送消息的key和value的序列化类型。一定要写全类名。

buffer.memory

RecordAccumulator缓冲区总大小,默认32m。

batch.size

缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。

linger.ms

如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。

acks

0:生产者发送过来的数据,不需要等数据落盘应答。

1:生产者发送过来的数据,Leader收到数据后应答。

-1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。

max.in.flight.requests.per.connection

允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。

retries

当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647

如果设置了重试,还想保证消息的有序性,需要设置

MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。

retry.backoff.ms

两次重试之间的时间间隔,默认是100ms。

enable.idempotence

是否开启幂等性,默认true,开启幂等性。

compression.type

生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。

支持压缩类型:none、gzip、snappylz4zstd。

2、异步发送 API

send 方法有两种传参方式,一种有回调函数,一种不带回调函数。 

2.1、普通异步发送

        所谓的异步发送指的是外部的数据使用异步的方式吧数据发送到 RecordAccumulator 的内存队列当中去,异步的体现就是外部数据发送到队列中后,并不会等待 RecordAccumulator 把数据传给 Broker 节点并返回成功的消息才继续发送;而是直接把数据扔到 RecordAccumulator 的内存队列后就撒手不管了。

案例演示

导入依赖:

<dependencies>
     <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>3.0.0</version>
     </dependency>
</dependencies>

注意:使用 kafka 前必须启动 zookeeper,不然报错无法使用。

public class CustomProducer {

    public static void main(String[] args) {

        Properties properties = new Properties();

        // 连接集群 bootstrap.servers 多写几个主机地址 防止一个客户端挂掉
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的 key 和 value 的序列化类型 key.serialize
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serializer");
        // 这两个是等价的
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1. 创建 Kafka 生产者对象
        // 需要指定键值的类型
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2. 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("like","test"+i));
        }

        // 3. 关闭资源
        kafkaProducer.close();
    }
}

我们可以看到,一条消息就是一个 ProducerRecord 对象,不管你的消息多么短,哪怕是一个标点,它也会被包装进一个对象里面去。 

运行结果: 

        我们发现,我们的编程步骤和我们上面Kafka的发送原理中生产者的发送步骤是一致的,只不过我们这里没有设置拦截器和分区器,这是一个最简单的 Kafka 生产者程序了。

2.2、带回调函数的异步发送

现在我们来使用带回调函数的异步发送:

只需要修改send方法即可:

    for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("like", "test" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){ // 如果异常为空 说明正常执行
                        System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
                    }
                }
            });
        }

运行结果: 

这种带回调函数的方法可以让我们知道数据更多的元数据信息(比如主题、分区...)。

2.3、同步发送 API

所谓的同步发送,就是外部数据发送到 RecordAccumulator 的内存队列后,必须等待数据被 Selector 发送到 Broker 并返回响应信息才能继续发送。代码的实现很简单,只需要在send方法之后加一个 get() 即可:

for (int i = 0; i < 5; i++) {
      kafkaProducer.send(new ProducerRecord<>("like","test"+i)).get();
}

3、生产者分区

3.1、分区的好处

分区的好处我们太清楚了,之前的 Hadoop、Spark、Flink 都有分区的概念,在 Shuffle 的时候、在 keyBy 的时候,分区的好处显然可以增加并行度,提高我们数据的处理效率;可以负载均衡,不会出现服务器涝的涝死,旱的旱死。

所以,从大数据的存储和计算的角度来看,分区有这么两种好处:

  1. 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上,合理控制分区的任务,可以实现负载均衡。
  2. 提高并行度,生产者可以以分区为单位发送数据,消费者可以以分区为单位消费数据。

3.2、生产者发送消息的分区策略

(1)默认的分区器 DefaultPartitioner

我们可以看到,在初始化 ProducerRecord 时,有 6 种初始化的传参方式:

接下来我们就利用上面的回调函数可以返回数据的元数据信息,来测试一下是不是像它说的这样:

1、指定分区: 

for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("like", 0,"","test" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){ // 如果异常为空 说明正常执行
                        System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
                    }
                }
            });
        }

运行结果:

topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 0

面试题:如何把一张 MySQL 表的数据都放到一个 Kafka 的分区当中去?

答:生产者在发送数据时指定数据的分区为 表名 。

2、不指定分区,只指定 key:

kafkaProducer.send(new ProducerRecord<>("like", ""+i,"test" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){ // 如果异常为空 说明正常执行
                        System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
                    }
                }
            });

 我们指定 key 为 i (i 的值为0,1,2,3,4),运行结果:

topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 1

3,不指定 partition 也不指定 key。这种方式其实我们上面在学异步发送的时候已经演示过了,它默认会等这一批都满了(16K)或者达到 linger.ms(默认0ms)才会发送。

我们这里在循环的时候,让线程睡眠 2ms,这样就不会因为默认的 linger.ms 太短立即发送,导致数据量太小,发送太快看不出来粘性分区的特点:

for (int i = 0; i < 25; i++) {
            kafkaProducer.send(new ProducerRecord<>("like", ""+i,"test" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){ // 如果异常为空 说明正常执行
                        System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
                    }
                }
            });
            Thread.sleep(2);
        }

运行结果: 

topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 0
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 0
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 1

我们可以看到,粘性分区是这样的:它每次的分区和上一次的分区是不一样的,并且每次的数据尽可能会放到一个分区去。 

3.3、自定义分区器 

在工作当中,一些特殊场景我们现有的分区策略是无法实现的,这就需要我们自定义来实现分区器了。

1)需求

        实现一个分区器,发送过来的数据如果包含 "大傻春" 就发往 0 号分区,否则发往 1 号分区。

2)实现步骤

1. 实现 Partitioner 接口

2. 重写 partition() 方法

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String val = value.toString();
        if (val.contains("大傻春"))
            return 0;
        return 1;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

使用自定义分区器:

// 关联自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

// 发送数据
for (int i = 0; i < 5; i++) {
    ProducerRecord<String, String> record;
    if (i==3)
        record = new ProducerRecord<>("like", "" + i, "大傻春");
    else
        record = new ProducerRecord<>("like", ""+i,"test" + i);

kafkaProducer.send(record, (RecordMetadata recordMetadata, Exception e)-> {
    if (e == null){ // 如果异常为空 说明正常执行
           System.out.println("topic:"+recordMetadata.topic()+",partition:"+recordMetadata.partition());
       }
    });
}

4、生产经验

4.1、生产者如何提高吞吐量

        我们外部的数据是放到 RecordAccumulator 的内存队列当中等待发送的,队列中每份数据的大小(batch.size)默认是16K,但我们知道,如果数据迟迟不能达到 batch.size 的话,会根据默认的配置 linger.ms (默认是 0ms)来发送。但事实上,0ms就意味着不需要数据达到 batch.size 也就是 batch.size 这一配置形同虚设不起作用,也就是只要它察觉到有数据就立即发送,这样的效率其实并不高。

        为了提高我们的吞吐量,我们当然需要调整这两个配置的参数大小:

  • batch.size :内存队列中每个批次的大小,默认 16K
  • linger.ms:等待时间,修改为 5-100ms

        当然这个参数的值我们需要慎重考虑,就像我们 Flink 当中水位线允许迟到的时间一样,不能说为了保证数据的迟到率最低,就把等待时间设置为几秒,那样 Flink 辛辛苦苦实现的毫秒级延迟有啥用呢。

  • compression.type:压缩 snappy
  • RecordAccumulator:缓冲区大小,修改为 64 MB
public class CustomProducerParameters {
    public static void main(String[] args) {

        // 0. 配置信息
        Properties properties = new Properties();
        // 连接 kafka
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
        // key 和 value 的序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // batch.size 单位: KB
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        // linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
        // 压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy"); //默认none,可配置值gzip、snappy、lz4和zstd
        // 缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432*2); // 修改为64MB

        // 1. 创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 2. 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("like","value"+i));
        }

        // 3. 关闭资源
        kafkaProducer.close();
    }
}

4.2、数据可靠性

        数据的可靠性,指的其实就是当 Selector 把数据发送到 Broker 之后,是否等待响应(ack)之后再发送数据。

我们知道,ack 的值有三种:

  1. ack = 0:发送完直接走,不用等响应,可靠性差,但是效率高。(一般不用)
  2. ack = 1:发送完需要等待 leader 节点响应才能继续发送下一个数据,可靠性中等,效率中等。
  3. ack = -1:发送完需要等待所有节点(leader 和 其他 fllower)响应才能继续发送下一个数据。可靠性高,效率低。

 

        在生产环境中,acks=0 很少用;acks=1:一般用于传输普通日志,允许丢个别数据;acks=-1:一般用于传输和钱有关的数据,用于对可靠性的要求比较高的场景。

完全可靠性在这种情况下尽管概率特别低,但是仍然不能排除,我们会在下面的数据去重去学习。

代码中配置 ACK 级别: 

// acks
properties.put(ProducerConfig.ACKS_CONFIG,"1");
// 重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,3);

4.3、数据去重

4.3.1、数据传递语义

就像我们 Flink 中的时间语义,Kafka 生产者也有它的数据传递语义:

  • 至少一次(At least once)= ACK级别设置为1 + 分区副本大于等于2 + ISR里应答的最小副本数大于等于2
  • 最多一次(At most once)= ACK级别设置为0
  • 总结:
    • Al least once 可以保证数据不丢失,但是不能保证数据不重复
    • At most once 可以保证数据不重复,但是不能保证数据不丢失
  • 精确一次(Exactly once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不能丢失

Kafka 0.11 版本后,引入了一个重要的特性:幂等性和事务

4.3.2、幂等性
  • 幂等性就是指 Producer 不论向 Broker 发送多少次重复数据,Broker 端都只会持久化一条,保证了不重复。
  • 精确一次(Exactly once)= 幂等性 + 至少一次(ack = -1 + 分区副本数 >= 2 + ISR 最小副本数 >= 2)

        重复数据的判断标准就是数据的三个属性必须唯一(PID、Partition、SeqNumber),其中 PID 是 Kafka 每次启动自动生成的,Partition是分区号,SeqNumber 是一个单调递增的数。

        幂等性只能保证数据在单分区内不会重复

配置幂等性

enable.idempotence 默认就是 true,flase 关闭。

4.3.3、生产者事务

我们知道,幂等性只能保证数据在单分区内不会重复,但是还是不能保证绝对的唯一,比如 Kafka 挂掉了需要重启,那么重启之后之前数据的 PID 就失效了,所以当有重复的数据时,并不能识别出来。这就需要事务的方式来解决了。

说明:开启事务必须开启幂等性

其实,Kafka 实现精确一次的保证机制和我们 Flink 在保证端到端一致性时输出端的保证方式是很相似的(幂等写入和事务写入)

Kafka 的事务一共如下 5 个 API:

// 1初始化事务
void initTransactions();

// 2开启事务
void beginTransaction() throws ProducerFencedException;

// 3在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;

// 4提交事务
void commitTransaction() throws ProducerFencedException;

// 5放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

测试:

public class CustomProducerTransaction {

    public static void main(String[] args) {

        Properties properties = new Properties();

        // 连接集群 bootstrap.servers 多写几个主机地址 防止一个客户端挂掉
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的 key 和 value 的序列化类型 key.serialize
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serializer");
        // 这两个是等价的
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 指定事务id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_01");

        // 1. 创建 Kafka 生产者对象
        // 需要指定键值的类型
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        kafkaProducer.initTransactions();

        kafkaProducer.beginTransaction();

        try {
            // 2. 发送数据
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("like","test"+i));
            }
            kafkaProducer.commitTransaction();
        }catch (Exception e){
            kafkaProducer.abortTransaction();   // 终止事务
        }finally {
            // 3. 关闭资源
            kafkaProducer.close();
        }
    }
}

注意:一定要记得手动指定事务id(保证唯一)。我们要把发送数据的代码写进 try-catch 中,如果有异常那么久终止事务。

4.4、数据有序

        数据有序的保证一直是流处理领域的一个问题,就像我们的 Flink 通过 水位线和Barrier 对齐算法保证数据容错和有序性。我们 Kafka 中也是一样的,单分区的话我们数据当然是有序的,但因为是多分区,所以分区和分区间的数据不能保证哪个数据先被读取,所以说分区间数据的顺序是无序的。

        至于多分区要做到有序,可以把每个分区的数据在消费者这里进行排序,但是这样的效率不是很高,所以我们 Kafka 一般都是只保证单分区有序,个人认为,Kafka只要做到数据的尽量有序就可以了,反正Kafka的数据到下游传递时,一般也都是并行读取,比如 Flink 读取 Kafka 的数据就支持多个 Sink 算子,所以数据到了 Flink 的多个算子链里会出现乱序。

        单分区内数据有序,但是不一定发送过去仍然是有序的,这就需要给它增加一些条件了,也就是接下来要学习的用幂等性解决数据乱序。

4.5、数据乱序

Kafka 1.x 版本之后保证数据单分区有序的条件:

  • 未开启幂等性
    • max.in.flight.requests.per.connection 设置为 1(相当于 NetworkClient 中只能存在一个请求,那数据肯定是有序的,都不要使用幂等性)
  • 开启幂等性
    • max.in.flight.requests.per.connection 需要设置为 <= 5(启用幂等性之后,Kafka 服务端会缓存最近的  5 个request元数据,所以无论如何,都可以保证最近的5个request的数据都是有序的。而且这个配置的值不能>5,因为Kafka服务端最多缓存5个请求)

解释

        为什么一定可以保证单分区内数据有序呢,因为幂等性就是(ProducerID,Partition,SeqNumber),其中 SeqNumber 要保证单调递增,对应我们的request,如果生产者的 RecoedAccumulator 中某个broker对应的请求队列中 request1和 request2 发送成功,但是request3 却失败了,那么当然会去重试,但是此时request4和request5也发送过去了怎么办。因为我们开启了幂等性,所以当 request1和request2 发送到Kafka的服务端(broker)之后,因为它们是不重复且有序的,所以会立即落盘,但是我们的 request3 由于发送失败,此时 request4和request5又发送过来了,但是由于 request4 和 request5 并不满足幂等性要求,所以不会落盘,而是留在内存当中,所以只有当 request3 再次到来之后,满足幂等性并落盘之后,request4和request5才能落盘。        

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/334390.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

高光谱分类论文解读分享之HybridSN:基于 3-D–2-D CNN 的高光谱分类(经典回顾)

IEEE GRSL 2019&#xff1a;HybridSN&#xff1a;基于 3-D–2-D CNN 的高光谱分类 题目 HybridSN: Exploring 3-D–2-D CNN Feature Hierarchy for Hyperspectral Image Classification 作者 Swalpa Kumar Roy, Student Member, IEEE, Gopal Krishna, Shiv Ram Dubey , Mem…

逸学Docker【java工程师基础】3.4Docker安装redis

1.拉取redis docker pull redis 2.选择一个合适的redis 版本的配置文件 Redis configuration | Redis 或者这个 链接&#xff1a;https://pan.baidu.com/s/1RRdtgec4xBAgQghlhm0x1Q 提取码&#xff1a;ycyc 在1044行修改密码 3.提前在服务器建立 /data/redis 文件夹&…

ConcurrentHashMap 原理

ConcurrentHashMap ConcurrentHashMap的整体架构ConcurrentHashMap的基本功能ConcurrentHashMap在性能方面的优化 concurrentHashMap&#xff1a; ConcurrentHashMap的整体架构 concurrentHashMap是由数组链表红黑树组成 当我们初始化一个ConcurrentHashMap实例时&#xff0c…

一个简单的Web程序(详解创建一个Flask项目后自带的一个简单的Web程序)

程序代码截图如下&#xff1a; 1.应用初始化 在创建 Flask 程序时&#xff0c;通常需要先创建一个应用实例进行应用初始化。 from flask import Flask # 应用的初始化 app Flask(__name__) 上述代码中&#xff0c;使用 Flask 类创建了一个应用实例 app。 __name__ 参数用…

【C++】std::string 转换成非const类型 char* 的三种方法记录

std::string 有两个方法&#xff1a;data() 和 c_str()&#xff0c;都是返回该字符串的const char类型&#xff0c;那如何转换成非const的char呢&#xff1f; 下面展示三种方法&#xff1a; 强转&#xff1a;char* char_test (char*)test.c_str();使用string的地址&#xff…

利用浏览器开发者工具进行网页性能优化

目录 学习目标&#xff1a; 学习内容&#xff1a; 学习时间&#xff1a; 学习产出&#xff1a; 网页性能优化的基本概念和指标&#xff1a; 浏览器开发者工具的基本功能和使用方法&#xff1a; 使用网络面板进行网页加载性能分析&#xff1a; 使用性能面板进行网页渲染性能分析…

目标检测难题 | 小目标检测策略汇总

大家好&#xff0c;在计算机视觉中&#xff0c;检测小目标是最有挑战的问题之一&#xff0c;本文给出了一些有效的策略。 从无人机上看到的小目标 为了提高模型在小目标上的性能&#xff0c;本文推荐以下技术&#xff1a; 提高图像采集的分辨率 增加模型的输入分辨率 tile你…

基于SpringBoot Vue自习室管理系统

大家好✌&#xff01;我是Dwzun。很高兴你能来阅读我&#xff0c;我会陆续更新Java后端、前端、数据库、项目案例等相关知识点总结&#xff0c;还为大家分享优质的实战项目&#xff0c;本人在Java项目开发领域有多年的经验&#xff0c;陆续会更新更多优质的Java实战项目&#x…

【Linux系统编程】程序地址空间

进程地址空间 进程地址空间是指每个进程在计算机内存中所占用的地址空间。地址空间是指能被访问的内存地址范围&#xff0c;它由若干个连续的内存块组成。每个进程都有自己的地址空间&#xff0c;这意味着每个进程都有自己的内存地址范围&#xff0c;不会与其他进程冲突。进程地…

Nightingale 夜莺监控系统 - 自愈篇(4)

Author&#xff1a;rab 官方文档&#xff1a;https://flashcat.cloud/docs/content/flashcat-monitor/nightingale/install/ibex/ 目录 前言一、部署1.1 MySQL1.2 Ibex Server1.3 Ibex Agent 二、验证2.1 n9e 上配置告警自愈2.2 创建自愈脚本2.3 告警规则配置回调地址2.4 模拟…

高效批量剪辑技巧:一键按指定时长精准分割视频的方法,轻松制作视频

随着社交媒体和数字内容的快速发展&#xff0c;视频制作的需求也日益增长。在制作视频时&#xff0c;我们经常需要将长视频分割成多个片段&#xff0c;或者将多个片段连接在一起。为了提高效率&#xff0c;我们可以使用一些高效的批量剪辑技巧&#xff0c;一键按指定时长精准分…

旅游项目day07

目的地攻略展示 根据目的地和主题查询攻略 攻略条件查询 攻略排行分析 推荐排行榜&#xff1a;点赞数收藏数 取前十名 热门排行榜&#xff1a;评论数浏览数 取前十名 浏览数跟评论数差距过大&#xff0c;可设置不同权重&#xff0c;例如&#xff1a;将浏览数权重设置为0.3…

市场监管总局发布区块链和分布式记账技术6项标准,中创积极推动区块链产业发展!

近日&#xff0c;市场监管总局&#xff08;国家标准委&#xff09;批准发布一批重要国家标准&#xff0c;涉及生产生活、绿色可持续等多个领域&#xff0c;这些标准将在引领产业发展、促进绿色转型、助力对外贸易、推动城乡建设、提升生活品质等方面发挥重要作用。 其中一项标…

【小笔记】算法训练基础超参数调优思路

【学而不思则罔&#xff0c;思维不学则怠】 本文总结一下常见的一些算法训练超参数调优思路&#xff08;陆续总结更新&#xff09;&#xff0c;包括&#xff1a; batchsize学习率epochsdropout&#xff08;待添加&#xff09; Batch_size 2023.9.29 简单来说&#xff0c;较…

Qt/C++自定义界面大全/20套精美皮肤/26套精美UI界面/一键换肤/自定义颜色/各种导航界面

一、前言 这个系列对应自定义控件大全&#xff0c;一个专注于控件的编写&#xff0c;一个专注于UI界面的编写&#xff0c;程序员有两大软肋&#xff0c;一个是忌讳别人说自己的程序很烂很多bug&#xff0c;一个就是不擅长UI&#xff0c;基本上配色就直接rgb&#xff0c;对于第…

J4 - ResNet与DenseNet结合

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制 目录 环境模型设计模型效果展示总结与心得体会 环境 系统: Linux语言: Python3.8.10深度学习框架: Pytorch2.0.0cu118显卡&#xff1a;GT…

基于springboot+vue的高校心理教育辅导系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目背景…

SpringCloud源码系列之(Ribbon、Hystrix超时正确配置)

这块的代码debug了一个礼拜&#xff0c;一开始看Fegin创建源码的时候没注意到&#xff0c;是基于RxJava的响应式编程。分析栈桢的时候&#xff0c;有很多异步栈桢。由于只是想搞清楚如下配置的生效时机、以及失效情况&#xff0c;期间还看了一堆与此无关的源码&#xff0c;真的…

新能源智慧充电桩方案:AI视频分析技术如何助力充电桩智能监管?

随着AI人工智能、大数据、云计算等技术快速发展与落地&#xff0c;视频智能分析技术在智慧充电桩场景中的应用也越来越广泛。这种技术能够为充电桩站点提供全方位的监控和管理&#xff0c;提高运营效率&#xff0c;保障充电桩设备的安全和稳定运行。 通过TSINGSEE青犀&触角…

Spring重要知识点

一、Spring中相关概念 1.IOC 控制反转 IoC&#xff08;Inverse of Control:控制反转&#xff09;是⼀种设计思想&#xff0c;就是将原本在程序中⼿动创建对象的控制权&#xff0c;交由Spring框架来管理。IoC 在其他语⾔中也有应⽤&#xff0c;并⾮ Spring 所独有。 IoC 容器…
最新文章