SpringBoot3集成Kafka

标签:Kafka3.Kafka-eagle3;

一、简介

Kafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流分析、数据集成和关键任务应用,基于Zookeeper协调的处理平台,也是一种消息系统,具有更好的吞吐量、内置分区、复制和容错,这使得它成为大规模消息处理应用程序的一个很好的解决方案;

二、环境搭建

1、Kafka部署

1、下载安装包:kafka_2.13-3.5.0.tgz

2、配置环境变量

open -e ~/.bash_profile

export KAFKA_HOME=/本地路径/kafka3.5
export PATH=$PATH:$KAFKA_HOME/bin

source ~/.bash_profile

3、该目录【kafka3.5/bin】启动zookeeper
zookeeper-server-start.sh ../config/zookeeper.properties

4、该目录【kafka3.5/bin】启动kafka
kafka-server-start.sh ../config/server.properties

2、Kafka测试

1、生产者
kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
>id-1-message
>id-2-message

2、消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
id-1-message
id-2-message

3、查看topic列表
kafka-topics.sh --bootstrap-server localhost:9092 --list
test-topic

4、查看消息列表
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --partition 0
id-1-message
id-2-message

3、可视化工具

配置和部署

1、下载安装包:kafka-eagle-bin-3.0.2.tar.gz

2、配置环境变量

open -e ~/.bash_profile

export KE_HOME=/本地路径/efak-web-3.0.2
export PATH=$PATH:$KE_HOME/bin

source ~/.bash_profile

3、修改配置文件:system-config.properties

efak.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181
efak.url=jdbc:mysql://127.0.0.1:3306/kafka-eagle

4、本地新建数据库:kafka-eagle,注意用户名和密码是否一致

5、启动命令
efak-web-3.0.2/bin/ke.sh start
命令语法: ./ke.sh {start|stop|restart|status|stats|find|gc|jdk|version|sdate|cluster}

6、本地访问【localhost:8048】 username:admin password:123456

KSQL语句测试

select * from `test-topic` where `partition` in (0)  order by `date` desc limit 5

select * from `test-topic` where `partition` in (0) and msg like '%5%' order by `date` desc limit 3

三、工程搭建

1、工程结构

2、依赖管理

这里关于依赖的管理就比较复杂了,首先spring-kafka组件选择与boot框架中spring相同的依赖,即6.0.10版本,在spring-kafka最近的版本中3.0.8符合;

但是该版本使用的是kafka-clients组件的3.3.2版本,在Spring文档的kafka模块中,明确说明spring-boot:3.1要使用kafka-clients:3.4,所以从spring-kafka组件中排除掉,重新依赖kafka-clients组件;

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring-kafka.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka-clients.version}</version>
</dependency>

3、配置文件

配置kafka连接地址,监听器的消息应答机制,消费者的基础模式;

spring:
  # kafka配置
  kafka:
    bootstrap-servers: localhost:9092
    listener:
      missing-topics-fatal: false
      ack-mode: manual_immediate
    consumer:
      group-id: boot-kafka-group
      enable-auto-commit: false
      max-poll-records: 10
      properties:
        max.poll.interval.ms: 3600000

四、基础用法

1、消息生产

模板类KafkaTemplate用于执行高级的操作,封装各种消息发送的方法,在该方法中,通过topickey以及消息主体,实现消息的生产;

@RestController
public class ProducerWeb {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send/msg")
    public String sendMsg (){
        try {
            // 构建消息主体
            JsonMapper jsonMapper = new JsonMapper();
            String msgBody = jsonMapper.writeValueAsString(new MqMsg(7,"boot-kafka-msg"));
            // 发送消息
            kafkaTemplate.send("boot-kafka-topic","boot-kafka-key",msgBody);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return "OK" ;
    }
}

2、消息消费

编写消息监听类,通过KafkaListener注解控制监听的具体信息,在实现消息生产和消费的方法测试后,使用可视化工具kafka-eagle查看topic和消息列表;

@Component
public class ConsumerListener {

    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);

    @KafkaListener(topics = "boot-kafka-topic")
    public void listenUser (ConsumerRecord<?,String> record, Acknowledgment acknowledgment) {
        try {
            String key =  String.valueOf(record.key());
            String body = record.value();
            log.info("\n=====\ntopic:boot-kafka-topic,key{},body:{}\n=====\n",key,body);
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            acknowledgment.acknowledge();
        }
    }
}

五、参考源码

文档仓库:
https://gitee.com/cicadasmile/butte-java-note

源码仓库:
https://gitee.com/cicadasmile/butte-spring-parent

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/79679.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python入门【动态添加属性和方法、正则表达式概述、match函数的使用、常用匹配符、限定符 、限定符使用示例】(二十九)

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱敲代码的小王&#xff0c;CSDN博客博主,Python小白 &#x1f4d5;系列专栏&#xff1a;python入门到实战、Python爬虫开发、Python办公自动化、Python数据分析、Python前后端开发 &#x1f4e7;如果文章知识点有错误…

List Label Standard Reporting Edition Crack

List & Label Standard Reporting Edition Crack List&Label是适用于所有主要开发平台的报告解决方案&#xff0c;提供了强大的报告引擎、灵活的API和功能丰富的报告设计器。只需要几行代码就可以在桌面、web或云应用程序中嵌入List&Label。它允许您的应用程序用户…

Oracle字段长度不足位数补零

Oracle字段长度不足位数补零 有时候从数据库中取出的月份值是1&#xff0c;而不是01&#xff0c;该怎么办呢 SELECTLPAD( CODE_MONTH, 2, 0 ) FROMtb_cube_TY001 WHERECODE_BM_MEATYPE TY20 AND code_measure MYLX01 AND code_month <> ~ AND CODE_ENTITY 01A AND…

Spring框架中JavaBean的生命周期及单例模式与多列模式

Spring框架中JavaBean的生命周期及单例模式与多列模式 1. Spring框架中JavaBean的管理过程1.1 #定义Bean1.2 Bean的实例化1.3 属性注入1.4 初始化方法1.5 Bean的使用和引用1.6 销毁方法 2. 单例模式与原型模式在JavaBean管理中的应用1.在Spring管理JavaBean的过程中&#xff0c…

抖音短视频SEO矩阵系统源码开发

一、概述 抖音短视频SEO矩阵系统源码是一项综合技术&#xff0c;旨在帮助用户在抖音平台上创建并优化短视频内容。本文将详细介绍该系统的技术架构、核心代码、实现过程以及优化建议&#xff0c;以便读者更好地理解并应用这项技术。 二、技术架构 抖音短视频SEO矩阵系统采用前…

回归预测 | MATLAB实现TSO-BP金枪鱼群优化算法优化BP神经网络多输入单输出回归预测(多指标,多图)

回归预测 | MATLAB实现TSO-BP金枪鱼群优化算法优化BP神经网络多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现TSO-BP金枪鱼群优化算法优化BP神经网络多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09;效果…

机器学习|决策树:数学原理及代码解析

机器学习&#xff5c;决策树&#xff1a;数学原理及代码解析 决策树是一种常用的监督学习算法&#xff0c;适用于解决分类和回归问题。在本文中&#xff0c;我们将深入探讨决策树的数学原理&#xff0c;并提供 Python 示例代码帮助读者更好地理解和实现该算法。 决策树数学原…

3.若依前后端分离版开发用户自定义配置表格功能

一、背景 在项目上线测试的时候&#xff0c;关于同一个界面的表格&#xff0c;不同的用户会出现不同的字段排列需求&#xff0c;有些用户希望把A字段排在最前面&#xff0c;有些用户则希望A字段不显示。针对这种情况&#xff0c;开发一个表格自定义配置的功能&#xff0c;每个…

QT的核心——信号与槽

目录 回顾C 语言信号 1、信号与槽 2、关联信号与槽 2.1自动关联信号与槽 2.2手动关联信号与槽 2.3断开信号与槽 3、自定义信号 3.1自定义信号使用条件 3.2自定义槽函数使用条件 4、信号与槽参数传递 4.1自定义一个带参的信号 4.2关联带参的信号与槽 4.3发送一个带…

奥威BI数据可视化工具:个性化定制,打造独特大屏

每个人都有自己独特的审美&#xff0c;因此即使是做可视化大屏&#xff0c;也有很多人希望做出不一样的报表&#xff0c;用以缓解审美疲劳的同时提高报表浏览效率。因此这也催生出了数据可视化工具的个性化可视化大屏制作需求。 奥威BI数据可视化工具&#xff1a;个性化定制&a…

[线程/C++]线程同(异)步和原子变量

文章目录 1.线程的使用1.1 函数构造1.2 公共成员函数1.2.1 get_id()1.2.2 join()2.2.3 detach()2.2.5 joinable()2.2.6 operator 1.3 静态函数1.4 call_once 2. this_thread 命名空间2.1 get_id()2.2 sleep_for()2.3 sleep_until()2.4 yield() 3. 线程同步之互斥锁3.1 std:mute…

Open cv C++安装

注意;要退出conda的虚拟环境 依赖 1.更新系统 sudo apt-get update sudo apt-get upgrade 2.安装相关的依赖 sudo apt-get install build-essential cmake git libgtk2.0-dev pkg-config libavcodec-dev libavformat-dev libswscale-dev sudo apt-get install libjpeg-de…

C语言 poll多路复用

NAME poll, ppoll - wait for some event on a file descriptor SYNOPSIS #include <poll.h> 函数原型&#xff1a; int poll(struct pollfd *fds, nfds_t nfds, int timeout); #define _GNU_SOURCE /* See feature_test_macros(7) */ …

网格(mesh)点跟踪及在贴图中的应用

本文介绍网格跟踪的思路及其在贴图中的使用效果。网格跟踪即跟踪所有的网格点&#xff0c;然后根据网格点估算某一点的变形&#xff0c;相较于曲面跟踪可以在保证一定精度条件下大幅提高处理速度。这里介绍一种简单的网格跟踪思路&#xff0c;效果如下图所示&#xff1a; 创建网…

vue3中使用第三方插件mitt实现任意组件通讯

vue3中使用第三方插件mitt实现任意组件通讯 组件通讯是vue3组合式开发的核心之一&#xff0c;现在我在写代码时&#xff0c;一个组件的代码超过了200行&#xff0c;基本都会拆分组件。组件拆分后&#xff0c;组件之间的通讯就很重要&#xff0c;总结了一下&#xff0c;目前有这…

微服务—Eureka注册中心

eureka相当于是一个公司的管理人事HR,各部门之间如果有合作时&#xff0c;由HR进行人员的分配以及调度&#xff0c;具体选哪个人&#xff0c;全凭HR的心情&#xff0c;如果你这个部门存在没有意义&#xff0c;直接把你这个部门撤销&#xff0c;全体人员裁掉&#xff0c;所以不想…

vue根据template结构自动生成css/scss/less样式嵌套

vscode搜索安装插件&#xff1a;AutoScssStruct4Vue

mqtt协议

MQTT&#xff08;Message Queuing Telemetry Transport&#xff0c;消息队列遥测传输协议&#xff09;&#xff0c;是一种基于发布/订阅&#xff08;publish/subscribe&#xff09;模式的"轻量级"通讯协议&#xff0c;该协议构建于TCP/IP协议上&#xff0c;由IBM在19…

特殊数字专题

特殊数字 1.奇数2.偶数3.完数4.素数5.回文数6.水仙花数7.中位数9.随机数11.求年份&#xff1a;闰年12.求数字&#xff1a;两个整数的最大公约数及最小公倍数 1.奇数 代码案例&#xff1a; //输出所有1-1000之间的奇数 #define _CRT_SECURE_NO_WARNINGS 1 #include<stdio.h&…

如何避免爬虫IP被屏蔽

各位爬友们好&#xff0c;作为一名专业的爬虫代理提供者&#xff0c;我要和大家分享一些避免爬虫IP被屏蔽的实用技巧。你知道吗&#xff0c;当我们爬取数据的时候&#xff0c;很容易被目标网站识别出来并封禁我们的IP地址&#xff0c;导致无法继续爬取数据。这个问题困扰了很多…
最新文章