Kafka的简介及架构

目录

消息队列

产生背景

消息队列介绍

常见的消息队列产品

应用场景

 消息队列的消息模型

Kafka的基本介绍

简介

Kafka的架构

Kafka的使用

Kafka的shell命令

Kafka的Python API的操作

完成生产者代码

完成消费者代码


消息队列

产生背景

消息队列:指数据在一个容器中,从容器中一端传递到另一端过程

消息:指的数据,只不过这个这个数据存在一定流动状态

队列:指的容器,可以存储数据,这个容器具备FIFO(先进先出)特性

公共容器的特点:

1.公共性:各个程序都可以与之对接

2.FIFO特性:先进先出

3.具备高效的并发能力:能够承载海量数据

4.具备一定的容错能力:比如支持重新读取消息方案

消息队列介绍

常见的消息队列产品

MQ:message queue消息队列

activeMQ: 出现时期比较早的一款消息队列的中间件产品,在早期使用人群是非常多,目前整个社区活跃度严重下降,使用人群基本很少
rabbitMQ: 此款是目前使用人群比较多的一款消息队列的中间件的产品,社区活跃度比较高,主要是应用传统业务领域中
rocketMQ: 是阿里推出的一款消息队列的中间件的产品,目前主要是在阿里系环境中使用,目前支持的客户端比较少,主要是Java中应用较多
Kafka: Apache旗下的顶级开源消息,是一款消息队列的中间件产品,项目来源于领英,是大数据体系中目前为止最为常用的一款消息队列产品

应用场景

消息队列的应用场景:

1.应用解耦合

2.异步处理

3.限流削峰

4.消息驱动系统

 消息队列的消息模型

在Java中, 为了能够集成消息队列的产品, 专门提供了一个消息队列的协议: JMS(Java Message Server)  java消息服务

消息队列中两个角色:生产者(producer)和消费者(consumer)

生产者:生产/发送消息到消息队列中

消费者:从消息队列中获取消息

在JMS规范中,专门规定了两种消息消费类型:

1.点对点消费类型:一条消息最终只能被一个消费所消费,微信聊天的私聊

2.发布订阅消费模型:指一条消息最终被多个消费者所消费,微信聊天的群聊

Kafka的基本介绍

简介

Kafka是一款消息队列中间件产品,来源于领英公司,后期贡献给了Apache,目前是Apache旗下的顶级开源项目,采用语言是Scala

Kafka的特点:

1.可靠性:Kafka集群是分布式的,有多副本机制,数据可以自动复制

2.可扩展性:Kafka集群可以灵活的调整,在线扩容

3.耐用性:Kafka数据保存在磁盘上,数据有多副本机制,数据持久化,一定程度上防止数据丢失

4.高性能:Kafka可以存储海量的数据,虽然是使用磁盘进行存储,但是Kafka有各种优化手段(例如:磁盘的顺序读写,零拷贝等)提高数据的读写速度(吞吐量)

Kafka的架构

1. Kafka中集群节点叫broker,节点与节点之间没有主从之分,地位是完全一样

2.Topic:主题/话题,是业务层面对消息进行分类的

3.一个Topic可以设置多个分区

4.同一个partition分区可以设置多个副本,但是副本数不能超过(>)集群broker节点的个数

5.broker节点间没有主从之分,但是同一个partition分区的不同副本间有主从之分,分为Leader主副本和Follwer从副本

6.生产者将数据首先发送给到Leader主副本,接着是Leader主副本主动往Follower从副本上同步消息

7.Zookeeper用来管理集群,以及管理元数据信息

8.ISR同步列表,该列表中存放的是与Leader主副本消息同步程度最接近的Follower从副本,也就是消息最小的一个列表,该列表的作用是当Leader主副本无法对外提供服务的时候,会从该ISR列表中选择一个Follower从副本变成Leader主副本,对外提供服务

相关名词

Kafka Cluster : kafka集群

Topic : 主题/话题

Broker : Kafka中的节点

Producer : 生产者,负责生产/发送消息到Kafka中

Consumer : 消费者,负责从Kafka中获取消息

Partition : 分区,一个Topic可以设置多个分区,没有数量限制

Kafka的使用

Kafka的shell命令

      Kafka本质上是一个消息队列中间件产品,主要负责消息数据的传递,也就说学习Kafka 也就是学习如何使用Kafka生产数据,以及如何使用Kafka来消费数据

创建Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --create --topic test02 --partitions 4 --replication-factor 2

参数说明:

        --bootstrap-server:kafka集群中broker连接信息

        --create:指定操作类型,这里是新建Topic

        --topic:指定要新建的Topic名称

        --partitions:设置Topic的分区数

        --replicattion-factor:设置Topic分区的的副本数

注意:如果副本数超过了集群broker节点个数,会报错

查看Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --list

参数说明:
    --bootstrap-server: Kafka集群中broker连接信息
    --list: 指定操作类型。这里是查看Kafka集群上所有可用的Topic列表

查看具体Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --describe --topic test04
参数说明:
    --bootstrap-server: Kafka集群中broker连接信息
    --describe: 指定操作类型。这里是查看具体Topic信息

模拟生产者Producer

./kafka-console-producer.sh --broker-list node1.itcast.cn:9092,node2.itcast.cn:9092 --topic test04
参数说明:
    --broker-list: Kafka集群中broker连接信息
    --topic: 指定要将消息发送到哪个具体的Topic

 模拟消费者Consumer

./kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --topic test04

参数说明:
    --bootstrap-server: Kafka集群中broker连接信息
    --topic: 指定要从哪个Topic中消费消息
    --from-beginning: 指定该参数以后,会从最旧的地方开始消费
    latest: 消费者(默认)从最新的地方开始消费
    --max-messages: 最多消费的条数。满足条数后,就会自动结束
    --group: 指定消费组名称。一个消费者只能属于一个消费组;一个消费组里面可以有多个消费者。同一个Topic中的同一条数据,只能被同一个消费组中的一个消费者所消费
    
在工作中的参数一般如何使用?
答: 推荐latest、--max-messages、--group一同使用。因为实际企业中Topic的数据量是特别大的,消费、打印都需要消耗服务器的资源,如果不限定消费的最大条数,可能造成服务器宕机。

修改Topic

./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --alter --topic test01 --partitions 10

分区: 只能增大,不能减小。而且没有数量限制
副本: 既不能增大,也不能减小

查看消费组中有多少个消费者

./kafka-consumer-groups.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --group g_01 --members --describe

Kafka的Python API的操作

准备工作:在服务器的节点上安装一个python用于操作Kafka的库

安装命令:
python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

API使用的参考文档:
https://kafka-python.readthedocs.io/en/master/usage.html#kafkaproducer

完成生产者代码

import time

from kafka import KafkaProducer

# 同步发送
def sync_send():
    global topic, partition, offset
    # 2.1- 同步发送数据/消息
    metadata = producer.send("test01", value=f"hello_java_{i}".encode("UTF-8")).get()
    # metadata = producer.send("test03",value=f"hello_spark_{i}".encode("UTF-8")).get()
    # 2.2- 获取元信息中的内容
    topic = metadata.topic
    partition = metadata.partition
    """
        offset消息偏移量,从0开始编号。也就是一条消息在分区中的序号/索引
        在不同分区间,消息偏移量是无序
        在同一个分区里面,消息偏移量是有序
    """
    offset = metadata.offset
    print(f"{topic},{partition},{offset},{metadata}")


if __name__ == '__main__':

    # 1- 创建生产者
    producer = KafkaProducer(
        bootstrap_servers=["node1.itcast.cn:9092","node2.itcast.cn:9092"]
    )

    # 2- 发送消息
    for i in range(10):
        # 同步发送
        # sync_send()

        # 2.3- 异步发送
        """
            异步发送,需要等待一下,或者明确关闭Producer生产者
        """
        producer.send("test01", value=f"hello_hive_{i}".encode("UTF-8"))

    time.sleep(1)

    # 3- 释放资源/关闭生产者
    # producer.close()

完成消费者代码

from kafka import KafkaConsumer

if __name__ == '__main__':

    # 1- 创建消费者
    consumer = KafkaConsumer(
        "test01",
        bootstrap_servers=["node1.itcast.cn:9092", "node2.itcast.cn:9092"]
    )

    # 2- 消费消息
    for msg in consumer:
        topic = msg.topic
        partition = msg.partition
        offset = msg.offset
        # key和value消费出来都是bytes数据类型,需要进行解码
        key = msg.key
        value = msg.value

        print(f"{topic},{partition},{offset},{key},{value.decode('UTF-8')},{msg}")

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/311955.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C# 微信小程序获取群id

前提 有个需求,需要限制小程序的抽奖只能在某个群内,需要知道谁在群里面,但是微信并没有提供谁在群里面的方法,不过提供了获取群id的方法,这样加上限制分享就能保证群里的参加,即时分享出去了,…

Vue3 中使用 Vuex 和 Pinia 对比之 Vuex的用法

本文基于 Vue3 的 composition API 来展开 Vuex 和 Pinia 的用法比较 Pinia传送门 Vuex传送门 Vuex 状态管理的核心概念 状态- 驱动应用的数据源;视图 - 以声明方式将状态映射到视图;操作 - 响应在视图上的用户输入导致的状态变化 下面是源自Vuex 官…

每日一题——LeetCode1154.一年中的第几天

方法一 列举法: 用一个数组把每个月份的天数都列举出来 判断闰年,是闰年2月份有29天 循环对当前月份之前的月份天数求和 加上当天月份的天数 var dayOfYear function(date) {let year date.slice(0, 4);let month date.slice(5, 7);let day dat…

第十二章Session

第十二章Session 1.什么是Session2.Session的创建与获取3.session域中数据的存取4.Session超时的控制5.浏览器和session之间关联的技术内幕 1.什么是Session 注意:前面的Cookie是保存在客户端,而session是在服务端的 2.Session的创建与获取 这里Session…

2024几个测试接口的好工具,效率加倍~

作为一名后端程序员,一定要对自己写的接口负责,保证接口的正确和稳定性。因此,接口测试也是后端开发中的关键环节。 但我相信,很多朋友是懒得测试接口的,觉得这很麻烦。一般自己写的接口自己都不调用,而是…

RT-Thread入门笔记4-跑马灯线程实例

RT-Thread操作系统是基于线程调度的多任务系统。 线程状态切换 调度过程是一种完全抢占式的基于优先级的调度算法。 支持8/32/256优先级,其中0表示最高,7/31/255表示最低。最低优先级7/31/255优先级用于空闲线程。 支持以相同优先级运行的线程。 共享时…

如何理解线程池中的参数设计

如何理解线程池中的参数设计 你的线程池的参数怎么配置?线程数量设置多少合理?如何确定一个线程池中的人物已经完成了为什么不建议使用java自带的Executors创建线程池线程池里面的阻塞队列设置多少合理? 考察:了解你对技术的掌握…

springboot摄影跟拍预定管理系统源码和论文

首先,论文一开始便是清楚的论述了系统的研究内容。其次,剖析系统需求分析,弄明白“做什么”,分析包括业务分析和业务流程的分析以及用例分析,更进一步明确系统的需求。然后在明白了系统的需求基础上需要进一步地设计系统,主要包罗软件架构模式、整体功能模块、数据库设计。本项…

如何通过 3 个步骤,管理项目可交付成果?

没有可交付成果,就没有项目。无论是构建软件、公寓、汽车还是其他东西,项目工作都可以定义为实现项目可交付成果。 项目管理中的可交付成果 项目可交付成果是项目要实现的最终结果。“可交付成果 "的内容没有限制,可以是实体产品&…

石大版跳一跳(UPC)

题目描述 还记得微信上那个风靡全国的跳一跳小程序吧,估计曾经也受到不少我校同学的喜爱吧。话说唐克也在玩这款游戏,不过,与一般玩家的境界不一样,唐克并不沉迷于游戏,唐克玩游戏是为了开发游戏,作为中国石…

tiktok_浅谈hook ios之发包x-ss-stub

frida-trace ios手机一部,需要越狱的电脑一台idacrackerXI 目标app: ipa 包,点击前往 密码:8urs 协议分析起始从抓包开始,个人习惯 一般安卓逆向可以直接搜关键词,但是ios 都在 Mach-O binary (reverse…

[JAVA数据结构] 认识 Iterable、Collection、List 的常见方法签名以及含义

目录 (一)Iterable 1. 介绍 2. 常见方法 (二)Collection 1. 介绍 2. 常见方法 (三) List 1. 介绍 2. 常见方法 总结 (一) Iterable 1. 介绍 Iterable接口是Java中的一个接口,它是集合框架中的根接口之一。Iterable接口表示实现了迭代功能,即可以通过迭…

JetCache源码解析——缓存处理

在Java技术体系中,如果想要在不改变已有代码逻辑的情况下,对已有的函数进行功能增强,一般可以使用两种方式,如AOP(Aspect Oriented Programming),即面向切面编程,以及代理模式&#…

最详细手把手教你安装 Vivado2019.2

Vivado 是 FPGA 厂商赛灵思公司(Xilinx)于 2012 年起发布的集成设计环境。 Vivado 2019.2 是 2019 年 Xilinx 推出的 Vivado 最后一个版本,相对稳定,并推出了新式的嵌入式开发平台 Vitis。 软件下载 官网可下载各个版本百度网盘…

Android 通知简介

Android 通知简介 1. 基本通知 图1: 基本通知详情 小图标 : 必须提供,通过 setSmallIcon( ) 进行设置.应用名称 : 由系统提供.时间戳 : 由系统提供,也可隐藏时间.大图标(可选) : 可选内容(通常仅用于联系人照片,请勿将其用于应用图标),通过setLargeIcon( ) 进行设置.标题 : 可选…

一日难再晨及时当勉励 date

文章目录 Linux shell 获取更改系统时间默认输入显示时区世界协调时格式化日期更多信息 Linux shell 获取更改系统时间 … note:: 时光只解催人老,不信多情,长恨离亭,泪滴春衫酒易醒。 - 晏殊《采桑子时光只解催人老》date命令可以用来打印…

12.扩展字典(ExtensionDictionary)

愿你出走半生,归来仍是少年! 环境:.NET FrameWork4.5、ObjectArx 2016 64bit、Entity Framework 6. 在10.扩展数据(XData)中我们讲到每个DbObject有一个XData对象可以存储数据,除此之外每个DbObject对象还有一个ExtensionDictionary(扩展字典)可以进行数据存储。…

P1328 [NOIP2014 提高组] 生活大爆炸版石头剪刀布————C++

目录 [NOIP2014 提高组] 生活大爆炸版石头剪刀布题目背景题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1 样例 #2样例输入 #2样例输出 #2 提示 解题思路Code调用函数的Code(看起来简洁一点)运行结果 [NOIP2014 提高组] 生活大爆炸版石头剪刀布 …

Tensorflow2.0笔记 - Tensor的数据索引和切片

主要涉及的了基础下标索引"[]",逗号",",冒号":",省略号"..."操作,以及gather,gather_nd和boolean_mask的相关使用方法。 import tensorflow as tf import numpy as nptf.__version__tensor tf.random.uniform([1,5,5,3],…

if单分支,二分支,多分支,语句嵌套,while语句,for语句(Python实现)

一、主要目的: 1.熟悉程序设计结构的三种方式 2.掌握if单分支语句、if二分支语句、if多分支语句及if语句嵌套的使用方法 3.掌握while语句的使用方法 4.掌握for语句的使用方法 5.掌握循环嵌套的使用方法 二、主要内容和结果展现: 1&…