07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

目录

  • Kafka --- 消息生产者
    • ★ 消息
    • ★ 消息的分发机制
    • ★ 分发到哪个分区
    • ★ 轮询策略(round-robin)
    • ★ 使用命令行工具发送消息
      • 演示添加消息
  • Kafka --- 消息消费者
    • ★ 消息消费者命令
    • ▲ 监听 【指定主题】 的所有消息:
    • ▲ 监听 【指定主题、指定分区】的所有消息:
    • ▲ 监听【指定主题、指定分区】的【新】消息:
    • ▲ 监听【指定主题、指定分区的、指定下标及之后】的所有消息:
    • kafka灵活之处:

Kafka — 消息生产者


★ 消息

简单来说,就是一个数据项。

▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。

从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。

▲ 下面是一个示例事件:
key: “fkjava”
value: “publish a new Book”
timestamp: “Feb. 15, 2021 at 2:06 p.m.”


★ 消息的分发机制

【注意:】当程序向主题发送消息时,该消息会被立即分给该主题下某一个领导者分区。

消息生产者向消息主题发送消息,这些消息将会立即分发给该主题下某一个分区(此处说的都是领导者分区)来保存

主题下的每条消息只会保存在一个领导者分区中,而不会在多个领导者分区中保存多份

消息实际上是存在分区中的,往主题发送消息只是一种逻辑说法。
当生产者发送一条消息到一个主题的时候,实际上这个消息马上就会被直接分发到对应的某一个领导者分区当中。

非领导者分区只是领导者分区的后备,也就是备份而已,当领导者分区挂掉的时候,非领导者分区就有可能成为领导者分区。

但是真正能够直接与客户进行交互的,就是直接接收用户的数据,或者让用户来消费数据的只能是领导者分区。

在这里插入图片描述


★ 分发到哪个分区

当生产者发送一条消息时,它会按如下规则来决定该消息被分发到哪个分区:

优先级:

最优先:(1)如果在发送消息时指定了分区,则消息分发到指定的分区。

(2)如果发送消息时没有指定分区,但消息的key不为空,则基于key的hashCode来选择一个分区。

此处暗示了:在一段时间内:同一个key的多条消息,通常会被分发到同一个分区

最次:(3)如果既没有指定分区,且消息的key也是空,则用轮询策略(round-robin)来选择一个分区。


★ 轮询策略(round-robin)

轮询策略(round-robin)就是按顺序来分发消息,

比如下面一个主题有P0、P1、P2三个分区,

那么第一条消息被分发到P0分区,
第二条消息被分发到P1分区,
第三条消息被分发到P2分区,

以此类推,第四条消息又被分发到P0分区。
在这里插入图片描述


★ 使用命令行工具发送消息

Kafka提供了kafka-console-producer.bat(.sh)工具来发送消息,例如执行如下命令:

下面命令发送带key的消息:

kafka-console-producer.bat ^
--bootstrap-server localhost:9092 ^
--topic test2 ^
--property parse.key=true

上面命令指定向test2这个主题发送消息,并通过“parse.key=true”指定发送消息时会解析消息的key,

默认解析规则为:制表符(Tab键)之前的是key,制表符(Tab键)之后的是value。

如果不指定“parse.key=true”属性,则默认不解析消息的key,也就是发送不带key的消息。

下面命令发送不带key的消息:

kafka-console-producer.bat ^
--bootstrap-server localhost:9092 ^
--topic test2


演示添加消息

因为演示过程中,我弄的3个节点老是只有一个能存活,所以把 kafka和zookeeper的存储数据的文件夹都删除了,重新启动,就好了。方便演示。

删除Kaka和zookeeper的存储数据的文件夹,重新启动

现在3个节点就能正常存活了。
在这里插入图片描述

接下来继续演示:
添加 不带key 和带key的消息,演示生产者发送消息:

解释:
1、演示生产者发送消息,消息发送到主题 test2 那里
2、发送没有带 key 的消息,kafka 采用的是轮询的策略,把消息存放到不同的分区里面;如图,test2 主题有4个分区(属于领导者分区),那么这8条消息就会轮询的发送到这4个分区里面
3、发送带key的消息,如图,key 是 ljh,kafka就会计算 ljh 这个key的hashcode 值,然后存放都某一个分区里面,因为key都是一样的,所以这几个key为ljh的消息都会被发到同一个分区里面。
但是具体发送到哪个分区,是无法指定的。
4、发送带key的消息,如图,key 和 value 之间要间隔一个 Tab 键,不要弄成空格键。

命令在上面

在这里插入图片描述

我自己再添加不同key的消息,可以看出新添加的两条消息,是存放在0分区的。

在这里插入图片描述



Kafka — 消息消费者

★ 消息消费者命令

消费者用于从消息主题读取消息,Kafka提供了kafka-console-consumer.bat工具命令从指定主题、甚至指定分区读取消息。
该工具支持如下常用选项:

 --bootstrap-server:指定要连接的Kafka主机和端口。
 --from-beginning:指定从开始读取消息。
 --group:指定组ID。
 
 --offset <String: consume offset>:指定从特定下标开始读取消息,
 比如将该选项设为1,表明从第2条消息开始读取;
 该选项还支持earliest和latest两个字符串值,
 其中earliest表示从最开始处读取(类似于--from-beginning选项的作用),
 latest表示从最新处开始读取、即不读取之前的消息,latest是默认值。
 
 --partition <Integer: partition>:指定哪个分区。
 
 --property:用于指定一些额外属性,
   比如 print.timestamp=true 指定要输出时间戳,
       print.key=true表示输出消息key,
       print.offset=true表示打印消息的下标,
       print.partition=true表示打印分区信息。
       
 --topic:指定哪个主题。 


▲ 监听 【指定主题】 的所有消息:

这个监听命令,运行后是一直存在的,会一直监听,有新消息会马上监听出来的。

 kafka-console-consumer --bootstrap-server localhost:9092 ^
 --topic test2 ^
 --from-beginning ^
 --property print.timestamp=true ^
 --property print.key=true ^
 --property print.offset=true ^
 --property print.partition=true

可以看到消息都成功存放在分区里面。
可以看到指定主题 test2 下的所有消息
但是目前还演示出轮询存放,先不理。

在这里插入图片描述



▲ 监听 【指定主题、指定分区】的所有消息:

看看分区2下的所有消息

 kafka-console-consumer --bootstrap-server localhost:9092 ^
 --topic test2 ^
 --from-beginning ^
 --partition 2^
 --property print.timestamp=true ^
 --property print.key=true ^
 --property print.offset=true ^
 --property print.partition=true

在这里插入图片描述

看看分区3下的所有消息

 kafka-console-consumer --bootstrap-server localhost:9092 ^
 --topic test2 ^
 --from-beginning ^
 --partition 3^
 --property print.timestamp=true ^
 --property print.key=true ^
 --property print.offset=true ^
 --property print.partition=true

分区3没有消息,所以没有任何显示:正确
在这里插入图片描述




查看分区0的消息:
有两条,正确
在这里插入图片描述


注意点:
–from-beginning:指定从开始读取消息。
添加这个命令,就是一直都会读取到从一开始就添加的消息,这样演示不够真实,因为我们消息消费之后,正常情况下我们不需要再查出来,所以可以用offset这个命令:
–offset <String: consume offset>:指定从特定下标开始读取消息。
下面就来演示这个offset命令:

 --offset <String: consume offset>:指定从特定下标开始读取消息,
 比如将该选项设为1,表明从第2条消息开始读取;
 该选项还支持earliest和latest两个字符串值,
 其中
 earliest表示从最开始处读取(类似于--from-beginning选项的作用),
 latest表示从最新处开始读取、即不读取之前的消息,latest是默认值。

▲ 监听【指定主题、指定分区】的【新】消息:

(模拟传统的ActiveMQ、RabbitMQ的消息模型):

 kafka-console-consumer --bootstrap-server localhost:9092 ^
 --topic test2 ^
 --offset latest ^
 --partition 0^
 --property print.timestamp=true ^
 --property print.key=true ^
 --property print.offset=true ^
 --property print.partition=true

演示 latest 从最新处开始读取、即不读取之前的消息,latest是默认值。

如图:现在从最新开始出监听消息,为了演示能监听最新消息,我们再打开一个命令行小黑窗,来往这个 0 分区发送消息。

因为上面key 为 l 的消息是发送到 0 分区,所以接下来发送的消息key也设置为l

在这里插入图片描述

如图:生产者刚发送消息,消费者这边马上就监听到主题为test2,分区为0 的最新的消息。

在这里插入图片描述



▲ 监听【指定主题、指定分区的、指定下标及之后】的所有消息:

这个 --offset 2 ^ 指定下标,查出来的消息是包括索引下标2 这条消息的。


先查出主题test2,分区0的所有消息,用来对比:

 kafka-console-consumer --bootstrap-server localhost:9092 ^
 --topic test2 ^
 --from-beginning ^
 --partition 0^
 --property print.timestamp=true ^
 --property print.key=true ^
 --property print.offset=true ^
 --property print.partition=true

在这里插入图片描述


再查出 【主题test2,分区0,指定索引下标为2及之后】 的所有消息,用来对比:

 kafka-console-consumer --bootstrap-server localhost:9092 ^
 --topic test2 ^
 --offset 2 ^
 --partition 0^
 --property print.timestamp=true ^
 --property print.key=true ^
 --property print.offset=true ^
 --property print.partition=true

上面有4条消息,所以2及之后的消息,应该有2条,为索引2 nnnnnnnnnn,索引3 mmmmmmmm

在这里插入图片描述



kafka灵活之处:

灵活之处, --offset latest ^ 这个设置,默认就是latest ,就是从最新处开始读取、不读取之前的消息,始终读取最新的消息,以前用过的消息就不会管了。
因为 kafka 内部有一个偏移主题来存储每一个分区里面的消息及消息曾经被读取到哪一条(哪个位置)。

更灵活之处:
我们可以通过 --offset 加上指定的索引下标,非常灵活的读取我们想要读取的哪个位置的消息。
从上面的消息监听可以看出,消息是一直保存在分区当中的,意味着消息被消费之后,并没有立即从分区中被删除,还可以被重复的使用,这就是kafka非常灵活的地方。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/303743.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

使用Trie数据结构实现搜索自动完成功能

本文旨在讨论使用 Java 的搜索自动完成的低级实现&#xff0c;将Trie在用例中使用数据结构。 这是一个示例TrieNode类&#xff1a; class TrieNode{ Map<Character,TrieNode> children; boolean isEndOfWord; TrieNode(){ children new HashMap<>(); isEndOfWor…

强化学习的数学原理学习笔记 - 值函数近似(Value Function Approximation)

文章目录 概览&#xff1a;RL方法分类值函数近似&#xff08;Value function approximation&#xff09;Basic idea目标函数&#xff08;objective function&#xff09;优化算法&#xff08;optimization algorithm&#xff09; Sarsa / Q-learning with function approximati…

IntelliJ IDEA开发工具常规设置、插件、快捷键、Debug和集成工具一篇快速入门

文章目录 常规设置同步设置快捷键&#xff08;Windows&#xff09;搜索层级关系查看光标选择代码定位代码操作Git操作编辑器操作 Debug操作集成本地Git集成本地Maven集成本地Tomcat实用插件 版本说明&#xff1a; 注意&#xff1a;若和上面的IDEA版本差异较大&#xff0c;可能存…

室外投光灯及室内无频闪方案:SM2258E 共模雷击3KV

室外投光灯在建筑物照明中起着非常重要的作用&#xff0c;而室内照明中频闪问题一直是困扰人们的一个难题。而现在&#xff0c;LED驱动芯片SM2258E的出现为这两个问题提供了解决方案。 SM2258E SM2258E是一款先进的LED照明控制芯片&#xff0c;专为高功率LED照明应用而设计。这…

架构训练营,2024年怎么突围进大厂

2024年其实也是内耗和内卷比较严重的一年&#xff0c;可以说从互联网开始内卷的那天开始就不会停止&#xff0c;但是作为技术人&#xff0c;我们如何去和内卷做斗争了&#xff0c;其实最好的武器就是先和自己内卷&#xff0c;这个如何理解了&#xff0c;那就是要要和以前的自己…

第四站:指针的进阶-(二级指针,函数指针)

目录 二级指针 二级指针的用途 多级指针的定义和使用 指针和数组之间的关系 存储指针的数组(指针数组:保存地址值) 指向数组的指针(数组指针) 传参的形式(指针) 数组传参时会退化为指针 void类型的指针 函数指针 定义: 调用:两种方式:(*指针名)(参数地址) 或者 指针…

echarts柱状图加单位,底部文本溢出展示

刚开始设置了半天都不展示单位&#xff0c;后来发现是被挡住了&#xff0c;需要调高top值 // 基于准备好的dom&#xff0c;初始化echarts实例var myChart echarts.init(document.getElementById("echartD"));rankOption {// backgroundColor: #00265f,tooltip: {…

借助 ControlNet 生成艺术二维码 – 基于 Stable Diffusion 的 AI 绘画方案

&#xfeff;背景介绍 在过去的数月中&#xff0c;亚马逊云科技已经推出了多篇 Blog&#xff0c;来介绍如何在亚马逊云科技上部署 Stable Diffusion&#xff0c;或是如何结合 Amazon SageMaker 与 Stable Diffusion 进行模型训练和推理任务。 为了帮助客户快速、安全地在亚马…

解锁前端新潜能:如何使用 Rust 锈化前端工具链

前言 近年来&#xff0c;Rust的受欢迎程度不断上升。首先&#xff0c;在操作系统领域&#xff0c;Rust 已成为 Linux 内核官方认可的开发语言之一&#xff0c;Windows 也宣布将使用 Rust 来重写内核&#xff0c;并重写部分驱动程序。此外&#xff0c;国内手机厂商 Vivo 也宣布…

汉泰克1025G信号发生器二次开发(python和C)

信号发生器&#xff1a;汉泰克1025G SDK开发资料&#xff1a;http://www.hantek.com.cn/products/detail/48 1.python接口 网上已经有大神制作了python的封装接口&#xff1a;https://github.com/AIMAtlanta/Hantek_1025G 这里为了方便查找就再张贴一遍&#xff1a; # -*- c…

升级 Vite 5 出现警告 The CJS build of Vite‘s Node API is deprecated.

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall &#x1f343; vue3-element-admin &#x1f343; youlai-boot &#x1f33a; 仓库主页&#xff1a; Gitee &#x1f4ab; Github &#x1f4ab; GitCode &#x1f496; 欢迎点赞…

DOM高级

1.1 自定义属性操作 1.1.1 获取属性值 element.属性 element.getAttribute(属性) 区别&#xff1a; element.属性&#xff1a;获取元素内置属性 element.getAttribute(属性)&#xff1a;获取自定义的属性 1.1.2 设置属性值 element.属性 值 element.setAttribute(属性&a…

多特征变量序列预测(一)——CNN-LSTM风速预测模型

目录 往期精彩内容&#xff1a; 前言 1 多特征变量数据集制作与预处理 1.1 导入数据 1.2 数据集制作与预处理 2 基于Pytorch的CNN-LSTM 预测模型 2.1 定义CNN-LSTM预测模型 2.2 设置参数&#xff0c;训练模型 3 模型评估与可视化 3.1 结果可视化 3.2 模型评估 代码…

11.文件和异常

文件和异常 实际开发中常常会遇到对数据进行持久化操作的场景&#xff0c;而实现数据持久化最直接简单的方式就是将数据保存到文件中。说到“文件”这个词&#xff0c;可能需要先科普一下关于文件系统的知识&#xff0c;但是这里我们并不浪费笔墨介绍这个概念&#xff0c;请大…

柯桥小语种学习,留学韩语 生活日常口语 语法

① N이다/A/V/았ㄹ/을지도 모르다 说不定 이미 도착했을 지도 모르니까 전화해 봐요 说不定已经到了&#xff0c;打电话试试 주말에 세일이 있을지도 모르니까 주말에 가 보자 周末说不定会搞活动&#xff0c;我们周末去吧 ② ㄴ/은/는/았었는/ㄹ/을지 모르다 不知道 처음이…

第四站:C/C++基础-指针

目录 为什么使用指针 函数的值传递&#xff0c;无法通过调用函数&#xff0c;来修改函数的实参 被调用函数需要提供更多的“返回值”给调用函数 减少值传递时带来的额外开销&#xff0c;提高代码执行效率 使用指针前: 使用指针后: 指针的定义: 指针的含义(进阶): 空指针…

4.6 BOUNDARY CHECKS

我们现在扩展了tile矩阵乘法内核&#xff0c;以处理具有任意宽度的矩阵。扩展必须允许内核正确处理宽度不是tile宽度倍数的矩阵。通过更改图4.14中的示例至33 M、N和P矩阵&#xff0c;图4.18创建了矩阵的宽度为3&#xff0c;不是tile宽度&#xff08;2&#xff09;的倍数。图4.…

怎么将营业执照图片转为excel表格?(批量合并识别技巧)

一、为何要将营业执照转为excel表格&#xff1f; 1、方便管理&#xff1a;将营业执照转为excel格式&#xff0c;可以方便地进行管理和整理&#xff0c;快速查找需要的信息。 2、数据处理&#xff1a;Excel可以提供丰富的计算和数据分析功能&#xff0c;转化为excel后方便数据…

【算法设计与分析】网络流

目录 max-flow 和 min-cut流网络 Flow network最小割 Min-cut最大流 Max-flow Greedy algorithmFord–Fulkerson algorithm剩余网络 Residual networkFord–Fulkerson algorithm算法流程 最大流最小割理论 max-flow min-cut theorem容量扩展算法 capacity-scaling algorithm时间…

Rustdesk本地配置文件存在什么地方?

环境&#xff1a; rustdesk1.1.9 Win10 专业版 问题描述&#xff1a; Rustdesk本地配置文件存在什么地方&#xff1f; 解决方案&#xff1a; RustDesk 是一款功能齐全的远程桌面应用。 支持 Windows、macOS、Linux、iOS、Android、Web 等多个平台。 支持 VP8 / VP9 / AV1 …
最新文章