Kafka-多线程消费及分区设置

目录

  • 一、Kafka是什么?
    • 消息系统:Publish/subscribe(发布/订阅者)模式
    • 相关术语
  • 二、初步使用
    • 1.yml文件配置
    • 2.生产者类
    • 3.消费者类
    • 4.发送消息
  • 三、减少分区数量
    • 1.停止业务服务进程
    • 2.停止kafka服务进程
    • 3.重新启动kafka服务
    • 4.重新启动业务服务
  • 参考文章

一、Kafka是什么?

Kafka是一种高吞吐量、分布式、基于发布/订阅的消息系统。可满足每秒百万级的消息生产和消费;有一套完善的消息存储机制,确保数据高效安全且持久化;Kafka作为一个集群运行在一个或多个服务器上,可以跨多个机房,当某台故障时,生产者和消费者转而使用其他的Kafka。

消息系统:Publish/subscribe(发布/订阅者)模式

1.消息发布者发布消息到主题中,有多个订阅者消费该消息。
2.当发布者发布消息时,不管是否有订阅者都不会报错。
3.一定要先有消息发布者,后有消息订阅者。

相关术语

1.Broker:Kafka服务器,负责创建topic、消息存储和转发。
2.Topic:消息类别(主题),用于区分消息。
3.Partition:分区,真正的存储数据单元。每个Topic包含一个或多个分区,用于保存消息和维护偏移量。(一般为kafka节点数CPU的总核心数量)
4.offset:分区消息此时被消费的位置。分区中消息的唯一id。
5.Producer:消息生产者。
6.Consumer:消息消费者。
7.Consumer Group:消费者组。由消费不同的分区的多个消费者实例组成,共用同一个Group-id。
8.Message:消息,由offset(分区上的消息id)、MessageSize(消息内容data大小)、data(消息具体内容)组成。

二、初步使用

1.yml文件配置

spring:
	kafka:
	    bootstrap-servers: http://127.0.0.1:9002
	    properties:
	      security:
	        protocol: SASL_PLAINTEXT
	      sasl:
	        mechanism: PLAIN
	        jaas:
	          config: org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="123456";
	    producer:
	      # 发生错误后,消息重发的次数。
	      retries: 0
	      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
	      batch-size: 16384
	      # 设置生产者内存缓冲区的大小。
	      buffer-memory: 33554432
	      # 键的序列化方式
	      key-serializer: org.apache.kafka.common.serialization.StringSerializer
	      # 值的序列化方式
	      value-serializer: org.apache.kafka.common.serialization.StringSerializer
	      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
	      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
	      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
	      acks: 1
	    consumer:
	      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
	      auto-commit-interval: 1S
	      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
	      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
	      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
	      auto-offset-reset: earliest
	      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
	      enable-auto-commit: false
	      # 键的反序列化方式
	      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
	      # 值的反序列化方式
	      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
	      # 消费者超时时间 6properties:
	        max:
	          poll:
	            interval:
	              ms: 6000
	    listener:
	      # 在侦听器容器中运行的线程数。消费者组中的实例数量。 【本次重点】
	      concurrency: 5
	      #listner负责ack,每调用一次,就立即commit
	      ack-mode: manual_immediate
	      missing-topics-fatal: false

2.生产者类

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
@Slf4j
public class KafkaProducer {

    // 消费者组
    public static final String TOPIC_GROUP2 = "topic.group2";


    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(String topic,Object obj) {
        String obj2String = JSONObject.toJSONString(obj);
        log.info("准备发送消息为:{}", obj2String);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, obj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                //发送失败的处理
                log.info(topic + " - 生产者 发送消息失败:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //成功的处理
                log.info(topic + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
            }
        });
    }
}

3.消费者类

使用注解的方式来创建主题和分区。

package com.lezhi.szxy.oa.core.kafka;

import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ServiceException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.poi.ss.formula.functions.T;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.RetryingBatchErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class KafkaConsumer {

    @Resource
    private addService addService;

    @Resource
    private RedisLockUtil redisLockUtil;
    @Resource
    RedissonClient redissonClient;

    @Resource
    RedisTemplate<String,String> redisTemplate;

    private static final String ADD_LOCK_PREFIX = "ADD_LOCK_PREFIX";
  
    ObjectMapper objectMapper = new ObjectMapper();


    /**
     * 初始化主题分区
     * @return
     */
    @Bean
    public NewTopic batchTopic() {
        log.info("初始化主题分区batchTopic : add_topic,分区:5,副本数:1 >>>>>>>>>>>>>>>>>>>>>>>>>>>>> ");
        return new NewTopic("add_topic", 5, (short) 1);
    }

    /**
     * 添加消息
     * @param ack
     */
    @KafkaListener(topics = "add_topic"C,groupId = KafkaProducer.TOPIC_GROUP2)
    public void handleAddMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("add_topic-队列消费端 topic:{}, 收到消息>>>>>>>>>>>>>>>>>", topic);
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            try {
                ParamImport param =  objectMapper.readValue(String.valueOf(msg) , ParamImport .class);
                String fullKey = redisLockUtil.getFullKey(ADD_LOCK_PREFIX , String.valueOf(msg));
                if(redisLockUtil.getLock(fullKey , 10000)){
					// 业务代码...
                    
                    log.info("add_topic 消费了: Topic:" + topic + ",Message:" + String.valueOf(msg));
                }else {
                    log.info("add_topic 已经被消费: Topic:" + topic + ",Message:" + String.valueOf(msg));
                }
                ack.acknowledge();
            } catch (Exception e) {
                e.printStackTrace();
                log.error("解析 <"+OaConstant.SALARY_SEND_MESSAGE_KAFKA_TOPIC+"> 数据异常");
            }
        }
    }
}

配置消费端主题分区启动后,查看kafka,add_topic主题生成五个分区实例
kafka配置
注意:一个消费线程,可以对应若干分区。但是为了保证数据的一致性,同一个分区同时只能备一个消费者实例消费,所以超过分区数量的消费者实例个数是多余的,会被闲置。

将消费者实例(消费线程)比为一个人,分区消息相当于一个办公位。办公位数>人数时,哪个办公位有消息待消费,人就到哪一个工位处理消息。当办公位数<人数时,后面的人数需要排队等待前面的人离开,才可以进入办公位消费。
当人再多时,只有一个办公位,人也得排队办公,属于同步消费;当办公位有多个时,才能实现多人同时操作。

单机kafka分区最好不超过5。默认使用轮询策略。

4.发送消息

public void addTopicMsg(ParamImport param) throws ServiceException {
        String json;
        try {
            json = objectMapper.writeValueAsString(param);
        } catch (JsonProcessingException e) {
            log.error("addTopicMsg-发送消息,kafka消息转换失败:{}", e);
            throw new ServiceException("发送失败");
        }
        log.info("addTopicMsg-发送消息,发送kafka请求>>>>>>>>>>>>>>>>>>>>>>>");
        kafkaTemplate.send("add_topic", json);
    }

三、减少分区数量

上文中,我们使用了new NewTopic()的方式创建分区,分区数量只能动态增加不能减少。所以我们需要根据以下步骤来重新生成分区,达成减少分区的目的。

1.停止业务服务进程

停止业务服务进程,使得不会重复生成分区。修改代码内配置的new NewTopic()配置分区数。

2.停止kafka服务进程

停止kafka服务进程,清空分区、主题等数据。

3.重新启动kafka服务

4.重新启动业务服务

此时就会根据修改后的分区设置重新生成分区。

参考文章

【SpringBoot】在Springboot中怎么设置Kafka自动创建Topic
SpringBoot+Kafka之如何优雅的创建topic
想弄明白Kafka到底是什么吗?看完这篇你就知道了!(概念、数据存储、生产者、消费者)
图解Kafka,看本篇就足够啦!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/339960.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【数据结构与算法】1.数据结构绪论

&#x1f4da;博客主页&#xff1a;爱敲代码的小杨. ✨专栏&#xff1a;《Java SE语法》 ❤️感谢大家点赞&#x1f44d;&#x1f3fb;收藏⭐评论✍&#x1f3fb;&#xff0c;您的三连就是我持续更新的动力❤️ &#x1f64f;小杨水平有限&#xff0c;欢迎各位大佬指点&…

用户头像上传

将用户上传的头像存储在腾讯云存储桶里 注册腾讯云 https://cloud.tencent.com/login 创建存储桶 配置跨域 来源 * (任何都可以访问) put get post 请求都可以 点击概览&#xff0c;查看存储桶基本信息 记录保存存储桶名称和地域 找到api密钥管理&#xff0c;新建密钥 ht…

1028 人口普查分数 (测试点3 20分)

某城镇进行人口普查&#xff0c;得到了全体居民的生日。现请你写个程序&#xff0c;找出镇上最年长和最年轻的人。 这里确保每个输入的日期都是合法的&#xff0c;但不一定是合理的——假设已知镇上没有超过 200 岁的老人&#xff0c;而今天是 2014 年 9 月 6 日&#xff0c;所…

基于深度学习的细胞感染性识别与判定

基于深度学习的细胞感染性识别与判定 基于深度学习的细胞感染性识别与判定引言项目背景项目意义项目实施数据采集与预处理模型选择与训练模型评估与优化 结果与展望结论 基于深度学习的细胞感染性识别与判定 引言 随着深度学习技术的不断发展&#xff0c;其在医学图像处理领域…

【遥感数字图像处理(朱文泉)】各章博文链接汇总及思维导图

遥感数字图像处理课程汇总 第0章 绪论第一章 数字图像基础第二章 数字图像存储与处理第三章 空间域处理方法第四章 变换域处理方法第五章 辐射校正第六章 几何校正第七章 图像去噪声第八章 图像增强第九章 感兴趣目标及对象提取第十章 特征提取与选择第十一章 遥感数字图像分类…

2.【SpringBoot3】用户模块接口开发

文章目录 开发模式和环境搭建开发模式环境搭建 1. 用户注册1.1 注册接口基本代码编写1.2 注册接口参数校验 2. 用户登录2.1 登录接口基本代码编写2.2 登录认证2.2.1 登录认证引入2.2.2 JWT 简介2.2.3 登录功能集成 JWT2.2.4 拦截器 3. 获取用户详细信息3.1 获取用户详细信息基本…

数字孪生技术助力澳大利亚绿色能源行业

OpenUtilities可实现变电站智能数字化设计&#xff0c;减少对环境的影响并节省 50% 的成本 将智能数字化设计扩展到小型基建工程 Essential Energy 的电网跨越 73.7 万公里&#xff0c;覆盖了澳大利亚新南威尔士州约 95&#xff05;的地区&#xff0c;为 1,500 个地区、农村和…

C++并发编程:线程启动

启动线程 C中构造 std::thread 对象启动线程 void do_some_work(); std::thread my_thread(do_some_work); 最简单的情况下是无参数无返回的函数。启动一个新的线程执行hello()函数。这种函数在其所属线程上运行&#xff0c;函数执行完毕&#xff0c; 线程结束。为了让编译器…

【深度学习:数据增强 】提高标记数据质量的 5 种方法

【深度学习&#xff1a;数据增强 】提高标记数据质量的 5 种方法 计算机视觉中常见的数据错误和质量问题&#xff1f;为什么需要提高数据集的质量&#xff1f;提高标记数据质量的五种方法使用复杂的本体结构作为标签人工智能辅助标签识别标签错误的数据改进注释者管理 计算机视…

Gitee Reward让开源作者不再为爱发电

一、什么是Gitee Reward&#xff1f; Gitee Reward是Gitee为改善开源开发生命周期提出的新策略。开源项目的支持者们可以更轻松地为其喜爱的项目提供资金&#xff0c;贡献者们也可以因为其不懈的开源贡献得到奖励。 二、Gitee Reward上允许哪些类型的项目&#xff1f; 允许任…

2024最新版Python 3.12.1安装使用指南

2024最新版Python 3.12.1安装使用指南 Installation and Configuration Guide to the latest version Python 3.12.1 in 2024 By Jackson Python编程语言&#xff0c;已经成为全球最受欢迎的编程语言之一&#xff1b;它简单易学易用&#xff0c;以标准库和功能强大且广泛外挂…

瑞_数据结构与算法_二叉树

文章目录 1 什么是二叉树2 二叉树的存储2.1 使用树节点类TreeNode存储&#xff08;代码&#xff09;2.2 使用数组存储 3 二叉树的遍历3.1 广度优先遍历3.2 深度优先遍历3.2.1 深度优先——前序遍历3.2.2 深度优先——中序遍历3.2.3 深度优先——后序遍历 3.3 代码实现3.3.1 递归…

03--数据库连接池

1、数据库连接池 1.1 JDBC数据库连接池的必要性 在使用开发基于数据库的web程序时&#xff0c;传统的模式基本是按以下步骤&#xff1a; 在主程序&#xff08;如servlet、beans&#xff09;中建立数据库连接进行sql操作断开数据库连接 这种模式开发&#xff0c;存在的问题:…

小程序样例2:简单图片分类查看

基本功能&#xff1a; 1、根据分类展示图片&#xff0c;点击类目切换图片&#xff1a; 2、点击分类编辑&#xff0c;编辑分类显示&#xff1a; 3、点击某个分类&#xff0c;控制主页该分类显示和不显示&#xff1a; 类目2置灰后&#xff0c;主页不再显示 4、点击分类跳转到具…

【C++语言1】基本语法

前言 &#x1f493;作者简介&#xff1a; 加油&#xff0c;旭杏&#xff0c;目前大二&#xff0c;正在学习C&#xff0c;数据结构等&#x1f440; &#x1f493;作者主页&#xff1a;加油&#xff0c;旭杏的主页&#x1f440; ⏩本文收录在&#xff1a;再识C进阶的专栏&#x1…

Python语法进阶——类

Python中的数据类型都属于类。int、str、list都是Python定义好的数据类型类。 print(type(list))#<class type> print(type(list()))#<class list> 一、自定义数据类型 一、语法 class 类名():pass #类名 要求首字母大写 #()可写可省略。 #pass在这里只是用来保证…

推荐IDEA一个小插件,实用性很高!!

插件&#xff1a; Convert YAML and Properties File 由于每个人的开发习惯不同&#xff0c;在开发过程中会遇到各种小细节的问题。今天给大家介绍一个小插件&#xff0c;作用不大&#xff0c;细节很足。 就是properties类型文件和yml文件互相自由转换 解决&#xff1a;…

2023年DevOps国际峰会暨 BizDevOps 企业峰会(DOIS北京站):核心内容与学习收获(附大会核心PPT下载)

随着科技的飞速发展&#xff0c;软件开发的模式和流程也在不断地演变。在众多软件开发方法中&#xff0c;DevOps已成为当下热门的软件开发运维一体化模式。特别是在中国&#xff0c;随着越来越多的企业开始认识到DevOps的价值&#xff0c;这一领域的研究与实践活动日益活跃。本…

计算机网络——运输层(2)暨小程送书

计算机网络——运输层&#xff08;2&#xff09;暨小程送书 小程一言专栏链接: [link](http://t.csdnimg.cn/ZUTXU) 运输层&#xff08;2&#xff09;TCP/IP对比TCP&#xff08;传输控制协议&#xff09;&#xff1a;IP&#xff08;互联网协议&#xff09;&#xff1a;总结 拥塞…

【设计模式】适配器和桥接器模式有什么区别?

今天我探讨一下适配器模式和桥接模式&#xff0c;这两种模式往往容易被混淆&#xff0c;我们希望通过比较他们的区别和联系&#xff0c;能够让大家有更清晰的认识。 适配器模式&#xff1a;连接不兼容接口 当你有一个类的接口不兼容你的系统&#xff0c;而你又不希望修改这个…
最新文章