@KafkaListener指定kafka集群

基于@KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群,这对于需要访问多套kafka集群的程序来说,是有效的解决方案。这里需要注意的是,此时的消费者配置信息需使用原生kafka的配置信息格式(如:ConsumerConfig.MAX_POLL_RECORDS_CONFIG = “max.poll.records”),与自动装载KafkaConsumer时的配置信息格式不同。详情如下:

依赖项(其实spring-kafka包含了kafka-clients)

<!-- spring-kafka --> 
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.0</version>
</dependency>
<!-- kafka-clients --> 
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>

配置文件
配置参数的格式和含义,参见《spring-kafka的配置使用》

生产代码

@Component
@Slf4j
public class KafKaProducer {

	@Autowired
	private KafkaTemplate kafkaTemplate;

	public void sendMessage(String topic, Object object) {

		/*
		 * 这里的 ListenableFuture 类是 spring 对 java 原生 Future 的扩展增强,是一个泛型接口,用于监听异步方法的回调 而对于
		 * kafka send 方法返回值而言,这里的泛型所代表的实际类型就是 SendResult<K, V>,而这里 K,V 的泛型实际上 被用于
		 * ProducerRecord<K, V> producerRecord,即生产者发送消息的 key,value 类型
		 */
		ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);

		future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
			@Override
			public void onFailure(Throwable throwable) {
				log.error("发送消息失败:" + throwable.getMessage());
			}
			@Override
			public void onSuccess(SendResult<String, Object> sendResult){
			    // log.info("发送消息成功:" + sendResult.toString());
			}
		});
	}
}

消费者配置类,其中可配置多个kafka集群,每个kafka集群生成一个KafkaListenerContainerFactory实例

@Data
@Slf4j
@Configuration
public class KafkaConfig {

    @Resource
    Environment environment;

    @Bean
    public KafkaListenerContainerFactory<?> containerFactory() {

        Integer concurrency = environment.getProperty("kafka.concurrency", Integer.class, 1);
        Integer pollTimeout = environment.getProperty("kafka.poll.timeout", Integer.class, 3000);

        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(this.consumerConfigs()));
        containerFactory.setConcurrency(concurrency); // 消费并发数量
        containerFactory.setBatchListener(true);      // 批量监听消息
        containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // 批量提交偏移
        containerFactory.getContainerProperties().setPollTimeout(pollTimeout); // 消息拉取时限
        return containerFactory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {

        String servers          = environment.getProperty("kafka.servers", "127.0.0.1:9092");
        String groupId          = environment.getProperty("kafka.groupId", "consumer-group");
        String sessionTimeout   = environment.getProperty("kafka.session.timeout.ms", "60000");
        String maxPollRecords   = environment.getProperty("kafka.max.poll.records", "100");
        String maxPollInterval  = environment.getProperty("kafka.max.poll.interval", "600000");
        String jaasConfig       = environment.getProperty("kafka.sasl.jaas.config");

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        /// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);

        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", jaasConfig);

        return props;
    }
}

消费代码 @KafkaListener注解的containerFactory参数指定了KafkaListenerContainerFactory实例,也就指定了kafka集群

@Slf4j
@Component
public class KafkaConsumerListen implements BatchMessageListener<String, String> {

    @Autowired
    private Environment environment;

    @Autowired
    private KafkaMsgHandleService msgHandleService;

    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    /************************
     *      接收消息
     ************************/
    @Override
    @KafkaListener( containerFactory = "containerFactory", 
    				groupId = "${kafka.groupId}", 
    				topics = "#{'${kafka.topics}'.split(',')}", 
    				concurrency = "${kafka.concurrency}")
    public void onMessage(List<ConsumerRecord<String, String>> records) {
        try {
            final List<String> msgs = records.stream().map(ConsumerRecord::value).collect(Collectors.toList());
            log.info("收到消息体:size={} content:{}", msgs.size(), JSON.toJSONString(msgs));
            /// 处理消息
            msgs.forEach(this::processRecord);
        } catch (Exception e) {
            log.error("KafkaListener_kafka_consume_error.", e);
        }
    }

    /************************
     *      处理消息
     ************************/
    private void processRecord(String msg) {
        taskExecutor.submit(() -> {
            if (!environment.getProperty("kafka1.switch", Boolean.class,true)) {
                log.warn("KafkaListener_turn_off_drop_message.");
                return;
            }
            msgHandleService.handle(msg);
        });
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/315814.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

个人网站制作 Part 3 用JS添加高级交互(表单验证、动态内容更新) | Web开发项目

文章目录 &#x1f469;‍&#x1f4bb; 基础Web开发练手项目系列&#xff1a;个人网站制作&#x1f680; 使用JavaScript进行交互&#x1f528;表单验证&#x1f527;步骤 1: 添加JavaScript文件&#x1f527;步骤 2: 更新表单HTML &#x1f528;动态内容更新&#x1f527;步骤…

API设计:从基础到最佳实践

1*vWvkkgG6uvgmJT8GkId98A.png 在这次深入探讨中&#xff0c;我们将深入了解API设计&#xff0c;从基础知识开始&#xff0c;逐步进阶到定义出色API的最佳实践。 作为开发者&#xff0c;你可能对许多这些概念很熟悉&#xff0c;但我将提供详细的解释&#xff0c;以加深你的理解…

冷风机系统的拟定和制冷循环的热力计算

一&#xff0c;冷风机系统的概述 单元式冷风机是一种不带加湿器的非热泵型单元式空气调节机&#xff0c;只能用于夏季降温&#xff0c;按标准 GB/T 17758-1999《单元式空气调节机组》的规定1&#xff0c;单元式冷风机应能使室内温度保持在(18℃-25℃)1℃&#xff0c;相对温度 (…

数据洞察力,驱动企业财务变革

我们不得不面对一个现实&#xff0c;就是数据量的剧增。加上大部分企业并不愿意删除历史数据&#xff0c;以防未来预测分析时需要&#xff0c;这造成数据就像一个雪球&#xff0c;越滚越大。然而&#xff0c;过多的数据和数据不足一样会成为企业发展和理解分析的障碍。从海量数…

Android Studio安卓读取EM4100 TK4100卡卡号源码

本示例使用的读卡器&#xff1a;https://item.taobao.com/item.htm?spma1z10.5-c.w4002-21818769070.35.44005b43nb1q2h&id562957272162 <?xml version"1.0" encoding"utf-8"?> <androidx.constraintlayout.widget.ConstraintLayout xmln…

Python学习从0到1 day3 python变量和debug

没关系&#xff0c;这破败的生活压不住我 ——24.1.13 一、变量的定义 1.什么是量&#xff1f; 量是程序运行中的最小单元 2.什么是变量呢&#xff1f; ①变量是存储数据的容器 ②变量存储的数据时临时的&#xff0c;变量只有在程序运行过程中是有效的&#xff0c;当程序执行结…

TreesVariety

树木品种 - 树木和植物捆绑包。与“植被引擎”兼容的包装 通用和HDRP的树木包在这里 树木品种: ● 支持Unity Wind; ● 11种树木,7种植物; ● Unity树创建器树(可编辑); ● 与内置管道配合使用; ● 支持地形广告牌。 树木: ● 8棵桦树; ● 4块枫木; ● 8块橡木; ●…

第八讲 单片机驱动彩色液晶屏 控制RA8889软件:显示图片

单片机驱动TFT彩色液晶屏系列讲座 目录 第一讲 单片机最小系统STM32F103C6T6通过RA8889驱动彩色液晶屏播放视频 第二讲 单片机最小系统STM32F103C6T6控制RA8889驱动彩色液晶屏硬件框架 第三讲 单片机驱动彩色液晶屏 控制RA8889软件:如何初始化 第四讲 单片机驱动彩色液晶屏 控…

【排序篇1】插入排序、希尔排序

目录 一、插入排序二、希尔排序 一、插入排序 思路&#xff1a; 插入排序就像玩扑克牌&#xff0c;抽出一张牌作为比较的元素&#xff0c;与前面的牌依次进行比较&#xff0c;小于继续往前比较&#xff0c;大于等于停下插入到当前位置。 图示&#xff1a; void InsertSort(…

DNS域名解析以及操作流程

dns:将域名转化为IP地址的过程&#xff0c;域名方便人们记忆&#xff0c;ip地址过长&#xff0c;且都是数字&#xff0c;不方便记忆&#xff0c;所以才出现了域名。 怎么实现访问域名等于访问ip地址 1.老方法&#xff1a;写入文件里 /etc/hosts 左边 IP地址 右边域名 格式例…

S1-05二进制信号量和计数器信号量

二进制信号量 二进制信号量&#xff0c;又叫二值信号量&#xff0c;要么是0&#xff0c;要么是1&#xff0c;也是通过Take和Give方式获取和释放&#xff0c;用于控制对共享资源的访问。在每次访问共享资源之前需要获取二进制信号量&#xff0c;若已被获取则任务会被阻塞直到二…

技术专栏——你所不知道的 RocketMQ 的集群管理:副本机制

这些精彩的技术类型的体系化文章&#xff0c;后面我会放到公众号上&#xff0c;并集中在合集“分布式消息中间件专栏”中&#xff0c;欢迎大家去订阅我的公众号和视频号“架构随笔录”&#xff0c;大家可以订阅合集&#xff0c;这样更加方便喔&#xff0c;后面会出电子版本&…

电脑上不安装Oracle,但是虚拟机装了Oracle,怎么连接到虚拟机里的Oracle数据库呢?

1、准备工作 1.1、确定数据库版本信息 注&#xff1a;如果知道数据库的版本信息&#xff0c;这个步骤可以跳过。 比较简单的方法&#xff0c;直接看数据库的安装位置&#xff0c;也就是数字&#xff08;但是这个方法确定就是&#xff0c;不好确定是多少位的数据库&#xff09;…

AI智能分析网关V4烟火检测算法解决方案

一、背景需求 根据国家消防救援局公布的数据显示&#xff0c;2023年共接报处置各类警情213.8万起&#xff0c;督促整改风险隐患397万处。火灾危害巨大&#xff0c;必须引起重视。传统靠人工报警的方法存在人员管理难、场地数量多且分散等问题&#xff0c;无法有效发现险情降低…

微信小程序(二)事件绑定

注释很详细&#xff0c;直接上代码 新增内容&#xff1a; 点击事件绑定注册页面设置页面初始化数据事件处理函数的实现更新数据并更新视图 源码&#xff1a; index.wxml <!-- 页面的数据绑定 --> <view>{{msg}}</view> <!-- 绑定点击事件 --> <but…

openssl3.2 - 官方demo学习 - cipher - aesgcm.c

文章目录 openssl3.2 - 官方demo学习 - cipher - aesgcm.c概述笔记END openssl3.2 - 官方demo学习 - cipher - aesgcm.c 概述 AES-256-GCM 在这个实验中验证了EVP_CIPHER_fetch()中算法名称字符串的来源定位. 在工程中配置环境变量PATH, 且合并环境. 这样就不用将openSSL的D…

【Python】编程练习的解密与实战(四)

​&#x1f308;个人主页&#xff1a;Sarapines Programmer&#x1f525; 系列专栏&#xff1a;《Python | 编程解码》⏰诗赋清音&#xff1a;云生高巅梦远游&#xff0c; 星光点缀碧海愁。 山川深邃情难晤&#xff0c; 剑气凌云志自修。 目录 &#x1fa90;1. 初识Python &a…

Spring Boot - Application Events 的发布顺序_ApplicationStartedEvent

文章目录 Pre概述Code源码分析 Pre Spring Boot - Application Events 的发布顺序_ApplicationEnvironmentPreparedEvent 概述 Spring Boot 的广播机制是基于观察者模式实现的&#xff0c;它允许在 Spring 应用程序中发布和监听事件。这种机制的主要目的是为了实现解耦&#…

linux 服务器上安装 ftp(亲测有效)

目录 1 需求2 安装 1 需求 服务器上需要安装ftp 2 安装 1 下载地址 https://developer.aliyun.com/packageSearch?wordvsftpd或者 https://rpmfind.net/linux/rpm2html/search.php?queryvsftpd2 如何判断 服务器是否安装了ftp rpm -qa | grep vsftpd出现提示则表示已安装…

什么是国密算法

国密算法是指由中国国家密码管理局发布的密码算法标准&#xff0c;旨在保障国家信息安全。目前&#xff0c;国家密码管理局已发布了一系列国产商用密码标准算法&#xff0c;包括SM1&#xff08;SCB2&#xff09;、SM2、SM3、SM4、SM7、SM9以及祖冲之密码算法&#xff08;ZUC)等…
最新文章