SpringBoot整合Kafka (一)

📑前言

本文主要讲了SpringBoot整合Kafka文章,如果有什么需要改进的地方还请大佬指出⛺️

🎬作者简介:大家好,我是青衿🥇
☁️博客首页:CSDN主页放风讲故事
🌄每日一句:努力一点,优秀一点

在这里插入图片描述

目录

文章目录

  • 📑前言
  • **目录**
    • 一、介绍
    • 二、主要功能
    • 三、Kafka基本概念
    • 四、Spring Boot整合Kafka的demo
      • 1、构建项目
        • 1.1、引入依赖
        • 1.2、YML配置
        • 1.3、生产者简单生产
        • 1.4、消费者简单消费
      • 2、生产者
        • 2.1、Kafka生产者消息监听
        • 2.2、生产者写入分区策略
          • 指定写入分区
          • 根据key写入分区
          • 随机选择分区
        • 2.3、带回调的生产者
  • 📑文章末尾


Kafka

一、介绍

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景,比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

二、主要功能

1.消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
2.存储系统: Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
3.日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种
consumer,例如hadoop、Hbase、Solr等。

三、Kafka基本概念

kafka是一个分布式的,分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能,但是确有着独特的设计。
首先,让我们来看一下基础的消息(Message)相关术语:

Broker
消息中间件处理节点,一个Kafka节点就是一个broker,一 个或者多个Broker可以组成一个Kafka集群
Topic
Kafka根据topic对消息进行归类,发布到Kafka集群的每条 消息都需要指定一个topic
Producer
消息生产者,向Broker发送消息的客户端 Consumer 消息消费者,从Broker读取消息的客户端ConsumerGroup
每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息
Partition
物理上的概念,一个topic可以分为多个partition,每个 partition内部消息是有序的

在这里插入图片描述

四、Spring Boot整合Kafka的demo

1、构建项目

1.1、引入依赖
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
1.2、YML配置
spring:
  kafka:
    bootstrap-servers: 192.168.147.200:9092 # 设置 Kafka Broker 地址。如果多个,使用逗号分隔。
    producer: # 消息提供者key和value序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3 # 生产者发送失败时,重试发送的次数
    consumer: # 消费端反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: demo # 用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group,默认:""```
1.3、生产者简单生产
    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Test
    void contextLoads() {
        ListenableFuture listenableFuture = kafkaTemplate.send("test01-topic", "Hello Wolrd test");
        System.out.println("发送完成");
    }

1.4、消费者简单消费
@Component
public class TopicConsumer {

    @KafkaListener(topics = "test01-topic")
    public void readMsg(String msg){
        System.out.println("msg = " + msg);
    }
}

2、生产者

2.1、Kafka生产者消息监听

@Component
public class KafkaSendResultHandler implements ProducerListener {

    private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);

    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
        log.info("Message send success : " + producerRecord.toString());

    }
    @Override
    public void onError(ProducerRecord producerRecord, Exception exception) {
        log.info("Message send error : " + producerRecord.toString());
    }
}

2.2、生产者写入分区策略

我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka
为我们提供了默认的分区策略,同时它也支持自定义分区策略。

指定写入分区

100条数据全部写入到2号分区

for (int i = 0; i < 100; i++) {
    ListenableFuture listenableFuture = kafkaTemplate.send("demo03-topic",2,null, "toString");
    listenableFuture.addCallback(new ListenableFutureCallback() {
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("发送失败");
        }
        @Override
        public void onSuccess(Object result) {
            SendResult sendResult = (SendResult) result;
            int partition = sendResult.getRecordMetadata().partition();
            String topic = sendResult.getRecordMetadata().topic();
            System.out.println("topic:"+topic+",分区:" + partition);
        }
    });
    Thread.sleep(1000);
}

根据key写入分区
String key = "kafka-key";
System.out.println("根据key计算分区:" + (key.hashCode() % 3));
for (int i = 0; i < 10; i++) {
    ListenableFuture listenableFuture = kafkaTemplate.send("demo03-topic", key, "toString");
    listenableFuture.addCallback(new ListenableFutureCallback() {
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("发送失败");
        }

        @Override
        public void onSuccess(Object result) {
            SendResult sendResult = (SendResult) result;
            int partition = sendResult.getRecordMetadata().partition();
            String topic = sendResult.getRecordMetadata().topic();
            System.out.println("topic:" + topic + ",分区:" + partition);
        }
    });
    Thread.sleep(1000);
}
随机选择分区
 for (int i = 0; i < 10; i++) {
     ListenableFuture listenableFuture = kafkaTemplate.send("demo03-topic", "toString");
     listenableFuture.addCallback(new ListenableFutureCallback() {
         @Override
         public void onFailure(Throwable ex) {
             System.out.println("发送失败");
         }

         @Override
         public void onSuccess(Object result) {
             SendResult sendResult = (SendResult) result;
             int partition = sendResult.getRecordMetadata().partition();
             String topic = sendResult.getRecordMetadata().topic();
             System.out.println("topic:" + topic + ",分区:" + partition);
         }
     });
     Thread.sleep(1000);
 }
2.3、带回调的生产者

kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理

for (int i = 0; i < 10; i++) {
    ListenableFuture listenableFuture = kafkaTemplate.send("demo03-topic",2,null, "toString");
    listenableFuture.addCallback(new ListenableFutureCallback() {
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("发送失败");
        }
        @Override
        public void onSuccess(Object result) {
            SendResult sendResult = (SendResult) result;
            int partition = sendResult.getRecordMetadata().partition();
            String topic = sendResult.getRecordMetadata().topic();
            System.out.println("topic:"+topic+",分区:" + partition);
        }
    });
    Thread.sleep(1000);
}

以上是简单的Spring Boot整合kafka的示例,可以根据自己的实际需求进行调整。

📑文章末尾

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/117934.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

极致性能优化:前端SSR渲染利器Qwik.js | 京东云技术团队

引言 前端性能已成为网站和应用成功的关键要素之一。用户期望快速加载的页面和流畅的交互&#xff0c;而前端框架的选择对于实现这些目标至关重要。然而&#xff0c;传统的前端框架在某些情况下可能面临性能挑战且存在技术壁垒。 在这个充满挑战的背景下&#xff0c;我们引入…

安全好用的远程协同运维软件重点推荐-行云管家

对于运维小伙伴而言&#xff0c;一个安全好用的远程协同运维软件至关重要&#xff0c;不仅可以提高工作率&#xff0c;降低工作风险&#xff0c;还能快速解决运维难题。目前市面上远程协同运维软件品牌比较多&#xff0c;这里我们小编给推荐行云管家IT远程协同运维平台。 安全…

LeetCode 面试题 16.17. 连续数列

文章目录 一、题目二、C# 题解 一、题目 给定一个整数数组&#xff0c;找出总和最大的连续数列&#xff0c;并返回总和。 示例&#xff1a; 输入&#xff1a; [-2,1,-3,4,-1,2,1,-5,4] 输出&#xff1a; 6 解释&#xff1a; 连续子数组 [4,-1,2,1] 的和最大&#xff0c;为 6。…

IDEA中配置Maven

一、Maven下载 首先我们进入maven官方网站,进入网页后,点击Download去下载 下载免安装版,解压即可,解压至磁盘任意目录,尽量不要取中文名如下图: 二、配置Maven环境变量 复制Maven所在的路径 D:\maven\apache-maven-3.6.3,此电脑右键选择属性->高级系统设置->环境…

UE5——源码阅读——100——渲染——高清截图

创建事件&#xff0c;用于代码的调试 获取当前客户端所属的World 标记是否在进行重入绘制 是否开始缓存区可视化转存帧&#xff0c;主要针对请求屏幕截图或电影转存 判断是否需要高清截图 这下面这个函数执行高清截图 是否需要缓存区的可视化转存 判断是否开始渲染 如果…

黑盒测试用例设计方法之等价类划分法

等价类划分法是一种典型的黑盒测试用例设计方法。采用等价类划分法时&#xff0c;完全不用考虑程序内部结构&#xff0c;设计测试用例的唯一依据是软件需求规格说明书。 等价类 所谓等价类&#xff0c;是输入条件的一个子集合&#xff0c;该输入集合中的数据对于揭示程序中的…

MySQL 排序,分组,Limit的优化策略

目录 1. MySQL 中的两种排序方式 2. 排序优化策略 2.1 对排序字段添加索引 2.2 可以和WHERT字段创建联合索引 2.3 优化 FilerSort 排序方式 3. 分组优化策略 3.1 能WHERE不HAVING 3.2 减少ORDER BY&#xff0c;GROUP BY&#xff0c;DISTINCT 3.3 遵照最左前缀法则 4.…

更新版PHP神算网八字算命星座解梦周易占卜程序源码/PC+H5移动端整站适配/PHP源码带手机版

源码简介&#xff1a; 这个是更新版PHP神算网八字算命星座解梦周易占卜程序源码&#xff0c;能够在PCH5移动端整站适配。作为H5付费算命PHP源码&#xff0c;八字算命网站源码&#xff0c;功能很多强大实用。 2023.3 更新记录&#xff1a; 1、更新了23年属相信息&#xff1b;…

pytorch实现 --- 手写数字识别

本篇文章是博主在人工智能等领域学习时&#xff0c;用于个人学习、研究或者欣赏使用&#xff0c;并基于博主对人工智能等领域的一些理解而记录的学习摘录和笔记&#xff0c;若有不当和侵权之处&#xff0c;指出后将会立即改正&#xff0c;还望谅解。文章分类在Pytorch&#xff…

海康威视解码器维修DS-6900系列DS-6916UD

海康威视解码器常见维修型号&#xff1a;DS-6916UD/DS-6901/DS-6904/DS-6908/DS-6910/DS-6912UD/6A16 DS-6A16UD 产品类型&#xff1a;视音频解码器纠错 I/O接口&#xff1a;输入 DVI-I纠错&#xff1b;输出 VGA&#xff0c;BNC纠错&#xff1b;音频输入 HDMI纠错 产品特性 …

安科瑞关于新能源电动汽车有序充电的对策-安科瑞黄安南

摘要 随着我国能源战略发展以及低碳行动的实施&#xff0c;电动汽车已逐步广泛应用&#xff0c;而电动汽车的应用非常符合当今社会对环保意识的要求&#xff0c;以及有效节省化石燃料的消耗。由于其没有污染排放的优点以及政府部门的关注&#xff0c;电动汽车将成为以后出行的…

【网络知识必知必会】聊聊数据链路层以太网

文章目录 前言1. 认识以太网2. 以太网帧格式已经有了ip地址, 为什么还要有 mac 地址呢?认识MTUMTU对IP协议的影响MTU对UDP协议的影响MTU对于TCP协议的影响 总结 前言 本文继续来聊聊网络传输中数据链路层中的一个代表协议, 以太网. 以太这个词其实最早出现在物理学当中, 在早…

基于SpringAOP实现自定义接口权限控制

文章目录 一、接口鉴权方案分析1、接口鉴权方案2、角色分配权限树 二、编码实战1、定义权限树与常用方法2、自定义AOP注解3、AOP切面类&#xff08;也可以用拦截器实现&#xff09;4、测试一下 一、接口鉴权方案分析 1、接口鉴权方案 目前大部分接口鉴权方案&#xff0c;一般…

HTML5的语义元素

HTML5语义元素&#xff1a; HTML5提供新的语义元素来明确一个web页面的不同部分&#xff1a;<head>、<nav>、<section>、<article>、<aside>、<figcation>、<figure>、<footer>。 1&#xff09;、<section>元素&#x…

dockerfile避坑笔记(VMWare下使用Ubuntu在Ubuntu20.04基础镜像下docker打包多个go项目)

一、docker简介 docker是一种方便跨平台迁移应用的程序&#xff0c;通过docker可以实现在同一类操作系统中&#xff0c;如Ubuntu和RedHat两个linux操作系统中&#xff0c;实现程序的跨平台部署。比如我在Ubuntu中打包了一个go项目的docker镜像&#xff08;镜像为二进制文件&am…

2023年11月5日网规考试备忘

早上题目回忆&#xff1a; pki体系 ipsec&#xff0c;交换安全&#xff08;流量抑制&#xff09; aohdlc bob metclaf —ethernet pon tcp三次握手 OSPF lsa&#xff1f;交换机组ospf配置问题&#xff0c;ping网关可通&#xff0c;AB不通 raid6 300G*8 网络利用率 停等协议10…

【C++初阶】一、入门知识讲解(C++关键字、命名空间、C++输入输出、缺省参数、函数重载)

相关代码gitee自取&#xff1a; C语言学习日记: 加油努力 (gitee.com) 接上期&#xff1a; 【数据结构初阶】十一、归并排序(比较排序)的讲解和实现 &#xff08;递归版本 非递归版本 -- C语言实现&#xff09;-CSDN博客 引入&#xff1a;什么是C C语言是结构化和模块化的…

剑指JUC原理-9.Java无锁模型

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱吃芝士的土豆倪&#xff0c;24届校招生Java选手&#xff0c;很高兴认识大家&#x1f4d5;系列专栏&#xff1a;Spring源码、JUC源码&#x1f525;如果感觉博主的文章还不错的话&#xff0c;请&#x1f44d;三连支持&…

Flink SQL 窗口聚合详解

1.滚动窗⼝&#xff08;TUMBLE&#xff09; **滚动窗⼝定义&#xff1a;**滚动窗⼝将每个元素指定给指定窗⼝⼤⼩的窗⼝&#xff0c;滚动窗⼝具有固定⼤⼩&#xff0c;且不重叠。 例如&#xff0c;指定⼀个⼤⼩为 5 分钟的滚动窗⼝&#xff0c;Flink 将每隔 5 分钟开启⼀个新…

如何在知识付费系统小程序开发中实现社区互动和用户参与

在知识付费系统小程序的开发中&#xff0c;实现社区互动和用户参与可以通过以下步骤实现&#xff1a; 1. 建立用户身份验证和管理系统 // 后端示例代码&#xff08;Node.js&#xff09; // 用户注册 app.post(/register, (req, res) > {const { username, email, passwor…