「Kafka」入门篇

「Kafka」入门篇

基础架构

image-20231208182616047

image-20231208182131415

Kafka 快速入门

集群规划

image-20231227110547650

集群部署

官方下载地址:http://kafka.apache.org/downloads.html

  1. 解压安装包:

    [atguigu@hadoop102 software]$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/
    
  2. 修改解压后的文件名称:

    [atguigu@hadoop102 module]$ mv kafka_2.12-3.0.0/ kafka
    
  3. 进入到 /opt/module/kafka 目录,修改配置文件

    [atguigu@hadoop102 kafka]$ cd config/
    [atguigu@hadoop102 config]$ vim server.properties
    

    输入以下内容:

    # broker 的全局唯一编号,不能重复,只能是数字。
    broker.id=0
    # 处理网络请求的线程数量
    num.network.threads=3
    # 用来处理磁盘 IO 的线程数量
    num.io.threads=8
    # 发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    # 接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    # 请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    # kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
    log.dirs=/opt/module/kafka/datas
    # topic 在当前 broker 上的分区个数
    num.partitions=1
    # 用来恢复和清理 data 下数据的线程数量
    num.recovery.threads.per.data.dir=1
    # 每个 topic 创建时的副本数,默认时 1 个副本
    offsets.topic.replication.factor=1
    # segment 文件保留的最长时间,超时将被删除
    log.retention.hours=168
    # 每个 segment 文件的大小,默认最大 1G
    log.segment.bytes=1073741824
    # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
    log.retention.check.interval.ms=300000
    # 配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
    zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
    
  4. 分发安装包

    [atguigu@hadoop102 module]$ xsync kafka/
    
  5. 分别在 hadoop103 和 hadoop104 上修改配置文件 /opt/module/kafka/config/server.properties 中的 broker.id=1broker.id=2

    注:broker.id 不得重复,整个集群中唯一。

    image-20231227111302720

  6. 配置环境变量

    • /etc/profile.d/my_env.sh 文件中增加 kafka 环境变量配置

      [atguigu@hadoop102 module]$ sudo vim /etc/profile.d/my_env.sh
      
    • 增加如下内容:

      #KAFKA_HOME
      export KAFKA_HOME=/opt/module/kafka
      export PATH=$PATH:$KAFKA_HOME/bin
      
    • 刷新一下环境变量:

      [atguigu@hadoop102 module]$ source /etc/profile
      
    • 分发环境变量文件到其他节点,并 source:

      [atguigu@hadoop102 module]$ sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh
      [atguigu@hadoop103 module]$ source /etc/profile
      [atguigu@hadoop104 module]$ source /etc/profile
      
  7. 启动集群

    • 先启动 Zookeeper 集群,然后启动 Kafka:

      [atguigu@hadoop102 kafka]$ zk.sh start 
      
    • 依次在 hadoop102、hadoop103、hadoop104 节点上启动 Kafka:

      [atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
      [atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
      [atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
      

      注意:配置文件的路径要能够到server.properties

  8. 关闭集群

    [atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh 
    [atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh 
    [atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh 
    

集群启停脚本

  1. /home/atguigu/bin 目录下创建文件 kf.sh 脚本文件

    [atguigu@hadoop102 bin]$ vim kf.sh
    

    脚本如下:

    #! /bin/bash
    case $1 in
    "start"){
    	for i in hadoop102 hadoop103 hadoop104
    	do
    		echo " --------启动 $i Kafka-------"
    		ssh  $i  "/opt/module/kafka/bin/kafka-server-start.sh  -daemon /opt/module/kafka/config/server.properties"
    	done
    };;
    "stop"){
    	for i in hadoop102 hadoop103 hadoop104
    	do
    		echo " --------停止 $i Kafka-------"
    		ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
    	done
    };;
    esac
    
  2. 添加执行权限

    [atguigu@hadoop102 bin]$ chmod +x kf.sh
    
  3. 启动集群命令

    [atguigu@hadoop102 ~]$ kf.sh start
    
  4. 停止集群命令

    [atguigu@hadoop102 ~]$ kf.sh stop
    

image-20231227113030694

Kafka 命令行操作

image-20231227113215848

主题命令行操作
  1. 查看操作主题命令参数

    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh
    

    image-20231227113437850

  2. 查看当前服务器中的所有 topic

    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
    
  3. 创建 first topic

    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
    
    选项说明:
    --topic:定义 topic 名
    --replication-factor:定义副本数
    --partitions:定义分区数
    
  4. 查看 first 主题的详情

    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
    hadoop102:9092 --describe --topic first
    
  5. 修改分区数(注意:分区数只能增加,不能减少)

    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
    
  6. 再次查看 first 主题的详情

    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
    
  7. 删除 topic

    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
    
生产者命令行操作
  1. 查看操作生产者命令参数

    [atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh
    

    image-20231227113828506

  2. 发送消息

    [atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
    >hello world
    >atguigu atguigu
    
消费者命令行操作
  1. 查看操作消费者命令参数

    [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh
    

    image-20231227113934044

    image-20231227113944307

  2. 消费消息

    • 消费 first 主题中的数据:

      [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
      
    • 把主题中所有的数据都读取出来(包括历史数据):

      [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
      

笔记整理自b站尚硅谷视频教程:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/274690.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

深入浅出:分布式、CAP 和 BASE 理论(荣耀典藏版)

大家好,我是月夜枫,一个漂泊江湖多年的 985 非科班程序员,曾混迹于国企、互联网大厂和创业公司的后台开发攻城狮。 在计算机科学领域,分布式系统是一门极具挑战性的研究方向,也是互联网应用中必不可少的优化实践&…

网站显示不安全警告怎么办?消除网站不安全警告超全指南

网站显示不安全警告怎么办?当用户访问你的网站,而您的网站没有部署SSL证书实现HTTPS加密时,网站就会显示不安全警告,这种警告,不仅有可能阻止用户继续浏览网站,影响网站声誉,还有可能影响网站在…

easycython和cython将py编译为pyd对比

前提了解 为了实验的准确性,在全过程使用的python环境版本都为同一版本 easycython和cython编译为pyd文件的不同在于,easycython编译的原始文件后缀为pyx,cython编译的原始文件为py 1.cython 1.1原始文件 def ZWHCythonTest():print(&qu…

中职网络安全Server2002——Web隐藏信息获取

B-2:Web隐藏信息获取 任务环境说明: 服务器场景名:Server2002(关闭链接)服务器场景用户名:未知 有问题需要环境加q 通过本地PC中渗透测试平台Kali使用Nmap扫描目标靶机HTTP服务子目录,将扫描子…

【15.5K⭐】QuickLook—文件快速预览工具

【15.5K⭐】QuickLook—文件快速预览工具 在我们日常的工作和生活中,经常会遇到需要快速查看不同类型文件内容的情况。这可能包括文档、图片、视频、音频甚至是代码文件。在这种情况下,为了省去了打开文件或应用等繁琐步骤,一款高效的文件预…

JAVA语言—AOP基础

1、AOP概述 AOP:AOP(Aspect Oriented Programming),即面向切面编程,可以说是OOP(Object Oriented Programming,面向对象编程)的补充和完善。 场景:案例部分功能运行较慢&…

Springboot+vue的医疗报销系统(有报告),Javaee项目,springboot vue前后端分离项目

演示视频: Springbootvue的医疗报销系统(有报告),Javaee项目,springboot vue前后端分离项目 项目介绍: 本文设计了一个基于Springbootvue的前后端分离的医疗报销系统,采用M(model&a…

Windows/Linux环境登入mysql、mysqldump命令等多方式解决方案之简易记录

Windows/Linux环境登入mysql、mysqldump命令等多方式解决方案之简易记录 之前发布过Window方式,这次结合以上主题,完善下Linux相关登入方式过程,纯属做个记录,有需要的朋友可以做个学习参考。 一、Windows环境提示“‘mysql’ 不是内部或外部命令,也不是可运行的程序或批…

Flask 账号管理列表

Flask 账号管理列表 web/controllers/account/Account.py /index route_account Blueprint( account_page,__name__ )route_account.route( "/index" ) def index():resp_data {}req request.valuespage int( req[p] ) if ( p in req and req[p] ) else 1qu…

详细讲解Java使用EasyExcel函数来操作Excel表(附实战)

目录 前言1. EasyExcel类2. 原理分析3. demo4. 实战 前言 前阵时间好奇下载Excel,特意学习实战了该功能:详细讲解Java使用HSSFWorkbook函数导出Excel表(附实战) 现在发觉还有个EasyExcel也可专门用来读写Excel表 1. EasyExcel类…

vue3+luckyexcel+php在线编辑excel文件

开发过程中,需要开发一个在线编辑excel文档的功能,找到了这个合适的组件 Luckysheet ,一款纯前端类似excel的在线表格,功能强大、配置简单、完全开源。 可以导入文档,预览、编辑、保存、导出等功能,可以满…

关于“Python”的核心知识点整理大全45

目录 15.4.6 绘制直方图 die_visual.py 注意 15.4.7 同时掷两个骰子 dice_visual.py 15.4.8 同时掷两个面数不同的骰子 different_dice.py 15.5 小结 第 16 章 16.1 CSV 文件格式 16.1.1 分析 CSV 文件头 highs_lows.py 注意 16.1.2 打印文件头及其位置 highs_l…

电影“AI化”已成定局,华为、小米转战入局又将带来什么?

从华为、Pika、小米等联合打造电影工业化实验室、到Pika爆火,再到国内首部AI全流程制作《愚公移山》开机……业内频繁的新动态似乎都在预示着2023年国内电影开始加速进入新的制片阶段,国内AI电影热潮即将来袭。 此时以华为为首的底层技术科技企业加入赛…

OCP NVME SSD规范解读-2.复位与控制器配置要求-part2

Maximum Data Transfer Size (MDTS):设备应支持至少256KB的最大数据传输大小。 CSTS.CFS Reporting: 设备固件应支持报告CSTS.CFS(Controller Status and Capabilities Field in the Status Register)。 Queue Depths: 每个提交队列的SQ最小…

钉钉机器人接入定时器(钉钉API+XXL-JOB)

钉钉机器人接入定时器(钉钉APIXXL-JOB) 首先需要创建钉钉内部群 在群设置中找到机器人选项 选择“自定义”机器人 通过Webhook接入自定义服务 创建完成后会生成一个send URL和一个加签码 下面就是干货 代码部分了 DingDingUtil.sendMessageByText(webho…

Java EasyExcel 导入代码

Java EasyExcel 导入代码 导入方法 /*** 仓库库位导入** param req* param res* param files* throws Exception*/RequestMapping(value {"/import/line_store_locs"}, method {RequestMethod.POST})ResponseBodypublic void importStoreLoc(HttpServletRequest …

工具系列:TimeGPT_(8)使用不规则时间戳进行时间序列预测

文章目录 介绍不规则时间戳的单变量时间预测不规则时间戳的外生变量时间预测 介绍 在处理时间序列数据时,时间戳的频率是一个关键因素,可以对预测结果产生重大影响。像每日、每周或每月这样的常规频率很容易处理。然而,像工作日这样的不规则…

Linux磁盘与文件系统管理

在linux系统中使用硬盘 建立分区 安装文件系统 挂载 磁盘的数据结构 磁盘:扇区固定大小,每个扇区4k。磁盘会进行磨损,损失生命周期。 扇区 磁道 柱面 磁盘接口类型 ide SATA SAS SCSI SCSI 设备类型 块设备:block …

2024年HTML+CSS+JS 网页版烟花代码

💂 个人网站:【 海拥】【神级代码资源网站】【办公神器】🤟 基于Web端打造的:👉轻量化工具创作平台💅 想寻找共同学习交流的小伙伴,请点击【全栈技术交流群】 直接跳到末尾 获取完整源码 在线体验地址&…

分支指令的方向预测

对于分支指令来说,它的方向只有两个:发生跳转(taken)和不发生跳转(nottaken),因此可以用1 和0 来表示。 很多分支指令的方向是有规律可循的。 方式一:last-outcom prediction 其准确度,无法接受; 方式二:基于两位饱和计数器的分…