首页 > 编程学习 > Kafka消费者的线程安全问题和多线程实现

一、消费者的线程安全问题

与线程安全的 KafkaProducer 不同,KafkaConsumer 是非线程安全的

KafkaConsumer的每个公用方法在执行操作前都会调用 acquire() 方法,该方法用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出 ConcurrentModifcationException 异常

acquire() 方法的实现如下:

private void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        refcount.incrementAndGet();
}

可以看到实质就是通过CAS的方式来获取当前 KafkaConsumer 的使用权,如果获取不到则抛异常

当执行的线程执行完毕后,就会调用 release() 方法来释放 KafkaConsumer 的使用权:

    private void release() {
        if (refcount.decrementAndGet() == 0)
            currentThread.set(NO_CURRENT_THREAD);
    }

KafkaConsumer 非线程安全不意味着在消费消息的时候只能以单线程的方式执行,如果生产者发送消息的速度大于消费者处理消息的速度,那么就会有越来越多的消息得不到及时的消费造成延迟。因此,可以采用多线程的方式来提高消费者的整体消费能力。

二、线程封闭

实现消费者多线程最常见的方式:线程封闭——即为每个线程实例化一个 KafkaConsumer对象

使用该方式实现,一般所有的消费线程都属于同一个消费者组,一个消费线程可以消费一个或多个分区中的消息,因此并发数也受限于分区的实际个数。(如果消费线程的个数大于分区数,就有部分消费线程一直处于空闲状态)

这种实现方式的好处是每个线程可以按顺序消费各个分区中的消息;缺点是每个消费线程都要维护一个独立的TCP连接,造成额外的系统开销

三、多线程处理消息

消费者吞吐量的瓶颈实际是在处理消息的效率上, 因此为了解决上面的问题,可以使用这样的线程模型:消费者线程专门用来接收消息,接收到消息后采用多线程的方式(线程池)来处理消息。(实际就是 Netty 中的 Reactor 模型)

这种方式解决了上述的系统开销问题,缺点是对于消息的顺序处理比较困难,需要做额外的开发来保障

此外,如果需要手动提交,该种方式的实现也更加困难,有可能会有数据丢失的风险

Copyright © 2010-2022 mfbz.cn 版权所有 |关于我们| 联系方式|豫ICP备15888888号