【数据采集与预处理】数据接入工具Kafka

目录

一、Kafka简介

(一)消息队列

(二)什么是Kafka

二、Kafka架构

三、Kafka工作流程分析

(一)Kafka核心组成

(二)写入流程

(三)Zookeeper 存储结构

(四)Kafka 消费过程

四、Kafka准备工作

(一)Kafka安装配置

(二)启动Kafka

(三)测试Kafka是否正常工作

五、编写Spark Streaming程序使用Kafka数据源


一、Kafka简介

(一)消息队列

消息队列内部实现原理

1、点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
        点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。

2、发布/订阅模式(一对多,数据生产后,推送给所有订阅者)
        发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。

(二)什么是Kafka

        Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

在流式计算中,Kafka 一般用来缓存数据,Storm 通过消费 Kafka 的数据进行计算。
1、Apache Kafka 是一个开源消息系统。是由 Apache 软件基金会开发的一个开源消息系统项目。
2、Kafka 最初是由 LinkedIn 公司开发,并于 2011 年初开源。2012 年 10 月从 Apache Incubator 毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
3、Kafka 是一个分布式消息队列。Kafka 对消息保存时根据 Topic 进行归类,发送消息者称为 Producer,消息接受者称为 Consumer,此外 kafka 集群有多个 kafka 实例组成,每个实例(server)称为 broker。
4、无论是 kafka 集群,还是 consumer 都依赖于 zookeeper 集群保存一些 meta 信息,来保证系统可用性。

二、Kafka架构

1、Producer :消息生产者,就是向 kafka broker 发消息的客户端;
2、Consumer :消息消费者,向 kafka broker 取消息的客户端;
3、Topic :可以理解为一个队列;
4、Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个 consumer)的手段。一个 topic 可以有多个 CG。topic 的消息会复制(不是真的复制,是概念上的)到所有的 CG,但每个 partion 只会把消息发给该 CG 中的一个 consumer。如果需要实现广播,只要每个 consumer 有一个独立的 CG 就可以了。要实现单播只要所有的 consumer 在同一个 CG。用 CG 还可以将 consumer 进行自由的分组而不需要多次发送消息到不同的 topic;
5、Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic;
6、Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)。kafka 只保证按一个 partition 中的顺序将消息发给consumer,不保证一个 topic 的整体(多个 partition 间)的顺序;
7、Offset:kafka 的存储文件都是按照 offset.kafka 来命名,用 offset 做名字的好处是方便查找。例如你想找位于 2049 的位置,只要找到 2048.kafka 的文件即可。当然 the first offset 就是 00000000000.kafka。

三、Kafka工作流程分析

(一)Kafka核心组成

(二)写入流程

Producer写入流程:

1)producer 先从 zookeeper 的 "/brokers/.../state"节点找到该 partition 的 leader
2)producer 将消息发送给该 leader
3)leader 将消息写入本地 log
4)followers 从 leader pull 消息,写入本地 log 后向 leader 发送 ACK
5)leader 收到所有 ISR 中的 replication 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset)并向 producer 发送 ACK

(三)Zookeeper 存储结构

注意:producer 不在 zk 中注册,消费者在 zk 中注册。 

(四)Kafka 消费过程

消费者组:

        消费者是以 consumer group 消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个 topic。每个分区在同一时间只能由 group 中的一个消费者读取,但是多个 group 可以同时消费这个 partition。在图中,有一个由三个消费者组成的 group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。
        在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的 group 成员会自动负载均衡读取之前失败的消费者读取的分区。

四、Kafka准备工作

(一)Kafka安装配置

1、到官网下载jar包,保存至“/usr/local/uploads”目录下。

Apache Kafkaicon-default.png?t=N7T8https://kafka.apache.org/downloads

2、解压安装Kafka,并重命名解压后的文件夹。

[root@bigdata uploads]# tar -zxvf kafka_2.11-0.8.2.2.tgz -C /usr/local
[root@bigdata uploads]# cd ..
[root@bigdata local]# mv kafka_2.11-0.8.2.2/ kafka

3、配置Spark环境

[root@bigdata local]# cd ./spark/conf
[root@bigdata conf]# vi spark-env.sh

在文件的第一行接着添加如下内容: 

:/usr/local/spark/examples/jars/*:/usr/local/spark/jars/kafka/*:/usr/local/kafka/libs/*

接着,在“/usr/local/spark/jars”目录下新建文件夹kafka,并将“/usr/local/kafka/libs/”目录下的所有jar包都拷贝到“/usr/local/spark/jars/kafka”目录下。

[root@bigdata spark]# cd /usr/local/spark/jars
[root@bigdata jars]# mkdir kafka
[root@bigdata jars]# cd kafka
[root@bigdata kafka]# cp /usr/local/kafka/libs/* .

然后,将“/usr/local/uploads/”下的spark-streaming-kafka-0-8_2.11-2.4.0.jar包也拷贝到“/usr/local/spark/jars/kafka”目录下。

[root@bigdata kafka]# cp /usr/local/uploads/spark-streaming-kafka-0-8_2.11-2.4.0.jar .

spark-streaming-kafka-0-8_2.11-2.4.0.jar的下载地址:

http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.4.0

下图是拷贝完成后的“/usr/local/spark/jars/kafka”目录下的所有jar包。

这样,Spark环境就配好了。

(二)启动Kafka

1、启动Zookeeper服务

打开一个终端,输入下面命令启动Zookeeper服务:

[root@bigdata kafka]# cd /usr/local/kafka
[root@bigdata kafka]# ./bin/zookeeper-server-start.sh config/zookeeper.properties

千万不要关闭这个终端窗口,一旦关闭,Zookeeper服务就停止了。

2、启动Kafka服务

打开第二个终端,然后输入下面命令启动Kafka服务:

[root@bigdata zhc]# cd /usr/local/kafka
[root@bigdata kafka]# bin/kafka-server-start.sh config/server.properties

千万不要关闭这个终端窗口,一旦关闭,Kafka服务就停止了

(三)测试Kafka是否正常工作

再打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsendertest”的Topic:

[root@bigdata zhc]# cd /usr/local/kafka
[root@bigdata kafka]# ./bin/kafka-topics.sh  --create  --zookeeper  localhost:2181 --replication-factor  1  --partitions  1  --topic  wordsendertest
#可以用list列出所有创建的Topic,验证是否创建成功
[root@bigdata kafka]# ./bin/kafka-topics.sh  --list  --zookeeper  localhost:2181

replication-factor:每个partition的副本个数 

下面用生产者(Producer)来产生一些数据,请在当前终端(记作“数据源终端”)内继续输入下面命令:

[root@bigdata kafka]# ./bin/kafka-console-producer.sh  --broker-list  localhost:9092  --topic  wordsendertest

上面命令执行后,就可以在当前终端内用键盘输入一些英文单词,比如可以输入:

hello hadoop

hello spark

现在可以启动一个消费者,来查看刚才生产者产生的数据。请另外打开第四个终端,输入下面命令:

[root@bigdata zhc]# cd /usr/local/kafka
[root@bigdata kafka]# ./bin/kafka-console-consumer.sh  --zookeeper  localhost:2181  --topic  wordsendertest  --from-beginning

可以看到,屏幕上会显示出如下结果,也就是刚才在另外一个终端里面输入的内容:

五、编写Spark Streaming程序使用Kafka数据源

在“/home/zhc/mycode/”路径下新建文件夹sparkstreaming,再在该文件夹下新建py文件KafkaWordCount.py。

#/home/zhc/mycode/sparkstreaming/KafkaWordCount.py
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: KafkaWordCount.py <zk> <topic>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)
    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

新建一个终端(记作“流计算终端”),执行KafkaWordCount.py,命令如下:

[root@bigdata zhc]# cd /home/zhc/mycode
[root@bigdata mycode]# mkdir sparkstreaming
[root@bigdata mycode]# cd sparkstreaming
[root@bigdata sparkstreaming]# vi KafkaWordCount.py
[root@bigdata sparkstreaming]# spark-submit KafkaWordCount.py localhost:2181 wordsendertest

这时再切换到之前已经打开的“数据源终端”,用键盘手动敲入一些英文单词,在流计算终端内就可以看到类似如下的词频统计动态结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/276373.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

检测如下MHA运行条件【踩坑记录】

【masterha_check_ssh --conf/etc/mha/app1.cnf&#xff1a;SSH免密登录】 【错误信息1】 [error][/usr/share/perl5/vendor_perl/MHA/SSHCheck.pm, ln111] SSH connection from root10.0.0.53(10.0.0.53:22) to root10.0.0.51(10.0.0.51:22) failed! 【错误反馈】就是服务器…

2023年山东省高职组区块链技术竞赛任务书

2023年山东省高职组区块链技术任务书 目录 模块一&#xff1a;区块链产品方案设计及系统运维 任务1-1&#xff1a;区块链产品需求分析与方案设计 任务1-2&#xff1a;区块链系统部署与运维 任务1-3&#xff1a;区块链系统测试 模块二&#xff1a;智能合约开发与测试 任务2-1&am…

DRF从入门到精通六(排序组件、过滤组件、分页组件、异常处理)

文章目录 一、排序组件继承GenericAPIView使用DRF内置排序组件继承APIView编写排序 二、过滤组件继承GenericAPIView使用DRF内置过滤器实现过滤使用第三方模块django-filter实现and关系的过滤自定制过滤类排序搭配过滤使用 三、分页组件分页器一&#xff1a;Pagination&#xf…

Unity 创建/删除/启用/禁用组件的惯用方法

1、创建组件&#xff1a; Unity 创建组件可以通过编辑器中的"Add Component"创建&#xff0c;或者代码动态创建&#xff1a;GameObject.AddComponent<T>()&#xff0c;如&#xff1a; ameObject.AddComponent<Rigidbody>(); 2、删除组件&#xff1a; …

中介者模式-Mediator Pattern-1

如果在一个系统中对象之间的联系呈现为网状结构&#xff0c; 对象之间存在大量的多对多联系&#xff0c;将导致系统非常复杂。 这些对象既会影响别的对象&#xff0c;也会被别的对象所影响。 这些对象称为同事对象&#xff0c;它们之间通过彼此的相互作用实现系统的行为。 在网…

同义词替换降低论文相似度的注意事项 papergpt

大家好&#xff0c;今天来聊聊同义词替换降低论文相似度的注意事项&#xff0c;希望能给大家提供一点参考。 以下是针对论文重复率高的情况&#xff0c;提供一些修改建议和技巧&#xff0c;可以借助此类工具&#xff1a; 标题&#xff1a;同义词替换降低论文相似度的注意事项 …

【Java系列】多线程案例学习——基于阻塞队列实现生产者消费者模型

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【Java系列专栏】【JaveEE学习专栏】 本专栏旨在分享学习JavaEE的一点学习心得&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; 目录…

案例260:基于微信小程序的签到系统的设计与实现

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder …

git 常用命令总结

git 工作原理图&#xff1a; git 常用命令及解释: 命令解释例子git init在当前目录初始化一个新的 Git 仓库。git initgit clone <repository>克隆一个远程仓库到本地。git clone https://github.com/example/repository.gitgit add <file>将文件的变化添加到暂存…

分享44个PyQt5源码总有一个是你想要的

分享44个PyQt5源码总有一个是你想要的 学习知识费力气&#xff0c;收集整理更不易。 知识付费甚欢喜&#xff0c;为咱码农谋福利。 链接&#xff1a;https://pan.baidu.com/s/1_5H_0Ydg0XUa1fz5Jok51Q?pwd6666 提取码&#xff1a;6666 项目名称 B站直播弹幕姬&#xff…

哪种猫粮比较好?怎样囤性价比高的主食冻干品牌 ?

在过去的100多年里&#xff0c;猫咪主食市场一直被膨化猫粮主导。然而&#xff0c;随着猫咪频频出现猝死、失明、发育不良以及营养不良等问题&#xff0c;猫主人们开始质疑膨化粮是否最适合猫咪。于是&#xff0c;从上世纪90年代开始&#xff0c;出现了生骨肉喂养。生骨肉确实是…

XV7081BB陀螺仪传感器

XV7081BB是一款用于自动化机器的数字输出型陀螺仪传感器&#xff0c;具有卓越的性能&#xff0c;尤其是偏置输出稳定性和低噪声。特点如下&#xff1a; 优良的偏置温度系数为0.0024(/s)/C。 ●低角度随机游走0.065/√h ●SPI/IC串行接口 集成的用户可选择的数字滤波器 ●角…

E : DS查找—二叉树平衡因子

Description 二叉树用数组存储&#xff0c;将二叉树的结点数据依次自上而下,自左至右存储到数组中&#xff0c;一般二叉树与完全二叉树对比&#xff0c;比完全二叉树缺少的结点在数组中用0来表示。 计算二叉树每个结点的平衡因子&#xff0c;并按后序遍历的顺序输出结点的平衡…

mybatisX自动生成sql语句,尝试测试方法报错

今天我使用mybatisx自定义mapper方法生成sql语句后&#xff0c;在测试时报错 错误是MyBatis 无法找到映射的语句&#xff08;Statement&#xff09;引起的 我是这样操作的&#xff0c;在mapper接口自定义了一个方法 然后alt加enter&#xff0c;自动生成sql 结果 mapper.xml文件…

31.Java程序设计-基于Springboot的鲜花商城系统的设计与实现

引言 背景介绍&#xff1a;鲜花商城系统的兴起和发展。研究目的&#xff1a;设计并实现一个基于Spring Boot的鲜花商城系统。论文结构概述。 文献综述 回顾相关鲜花商城系统的设计与实现。分析不同系统的优缺点。强调Spring Boot在系统设计中的优越性。 系统设计 需求分析 用户…

css中sprite(css精灵)是什么,有什么优缺点

概念 将多个小图片拼接到一个图片中 。通过 background-position 和元素尺寸调节需要显示的背景图案。 优点 减少 HTTP 请求数&#xff0c;极大地提高页面加载速度 增加图片信息重复度&#xff0c;提高压缩比&#xff0c;减少图片大小 更换⻛格方便&#xff0c; 只需在一张或…

mysql保姆安装教程

一.下载install文件 1.进入Mysql官网&#xff0c;点击下载 2.选择MySQL Installer for Windows 3.推荐选择第二个安装包 4.不登陆&#xff0c;开始下载 5.等待下载完成 二.安装前的配置 通过电脑“设置”&#xff0c;检查电脑是否包含中文名&#xff0c;如果包含请重命名 …

生活常识-如何开社保证明(四川)

下载并打开天府市民云APP 注册后登陆 点击社保服务 点击社保证明 点击【四川省社会保险个人社保证明名(近24个月)】 点击下载 下载后点击【QQ发送给好友&#xff0c;然后发送给自己的电脑设备(我的电脑)】

通过C++程序实现光驱的自动化刻录和读取

文章目录 ISO文件格式光盘的基本概念光盘种类特点DVDR光盘使用windows调用Linux调用Linux平台下用到的C库:读取设备驱动列表向光驱中写文件 数字存储媒体快速发展的今天&#xff0c;光驱的使用已经不像以前那样普及了。但是在数据备份、安装软件和操作系统、旧设备兼容等领域还…

LTPI协议的理解——LTPI协议的定义和结构

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 LTPI协议的理解——LTPI协议的定义和结构 定义DC-SCM 2.0 LTPI 结构GPIO通道I2C/SMBus通道Uart通道OEM通道数据通道 总结 定义 LTPI (LVDS Tunneling Protocol & Interf…
最新文章