Spark-Streaming核心编程

Kafka命令行的使用

创建topic

kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic test1 --partitions 3 --replication-factor 3

分区数量,副本数量,都是必须的。

数据的形式:

主题名称-分区编号。

在Kafka的数据目录下查看

设定副本数量,不能大于broker的数量。

查看所有的topic(list)

kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181

查看某个topic的详细信息(describe)

kafka-topics.sh --describe --zookeeper node01:2181,node02:2181,node03:2181 --topic test1

删除topic(delete)

kafka-topics.sh --delete --zookeeper node01:2181,node02:2181,node03:2181 --topic test1

生产数据
 

使用 Kafka 生产数据的命令

kafka-console-producer.sh:

指定broker

指定topic

写数据的命令:

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test1

注意:写数据,实际上就是写log, 追加日志。

可在kafka的/root/kafkadata目录下查看分区中log。

每一条数据,只存在于当前主题的一个分区中,所有的副本中,都有数据。

消费数据

使用 Kafka 消费数据的命令

kafka-console-consumer.sh --topic test1 --bootstrap-server node01:9092,node02:9092,node03:9092

注意: 此命令会从日志文件中的最后的位置开始消费。

如果想从头开始消费:

kafka-console-consumer.sh --topic test1 --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning

会从头(earliest)开始读取数据。

读取数据时,分区间的数据是无序的,分区中的数据是有序。

如果想指定groupid,可以通过参数来指定:

kafka-console-consumer.sh --topic test1 --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning --consumer-property group.id=123

一个topic中的数据,只能被一个groupId所属的consumer消费一次。(记录偏移量)

DStream创建

Kafka数据源:

DirectAPI:是由计算的 Executor 来主动消费 Kafka 的数据,速度由自身控制。

Kafka 0-10 Direct 模式

需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,最终打印到控制台。

导入依赖

编写代码

开启Kafka集群

开启Kafka生产者,产生数据

最后运行程序,接收Kafka生产的数据并进行相应处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/139.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Spring AI - Redis缓存对话

先看效果 对话过程被缓存到了Redis 中。 原理 在上一节我们快速入门了SpringAI,具体文章请查看:快速入门Spring AI 创建 ChatClient 的代码如下: this.chatClient ChatClient.builder(chatModel).defaultSystem(DEFAULT_PROMPT).defaultAd…

图像预处理-模板匹配

就是用模板图在目标图像中不断的滑动比较,通过某种比较方法来判断是否匹配成功,找到模板图所在的位置。 - 不会有边缘填充。 - 类似于卷积,滑动比较,挨个比较象素。 - 返回结果res大小是:目标图大小-模板图大小1(H-…

【算法笔记】动态规划基础(一):dp思想、基础线性dp

目录 前言动态规划的精髓什么叫“状态”动态规划的概念动态规划的三要素动态规划的框架无后效性dfs -> 记忆化搜索 -> dp暴力写法记忆化搜索写法记忆化搜索优化了什么?怎么转化成dp?dp写法 dp其实也是图论首先先说结论:状态DAG是怎样的…

网络原理————HTTP

1,HTTP简介 我们上一期谈到了网络编程尤其是TCP和UDP,使用网络套接字来实现网络编程,上一期忘记说了,我们使用TCP的时候,我们用了线程池,这样就可以处理很多客户端而不会阻塞,那么如果客户端一…

rust编程学习(三):8大容器类型

1简介 rust标准库std::collections也提供了像C STL库中的容器&#xff0c;分为4种通用的容器&#xff0c;8种类型&#xff0c;如下表所示。 线性容器类型&#xff1a; 名称简介Vec<T>内存空间连续&#xff0c;可变长度的数组&#xff0c;类似于C中Vector<T>容器…

Javase 基础入门 —— 02 基本数据类型

本系列为笔者学习Javase的课堂笔记&#xff0c;视频资源为B站黑马程序员出品的《黑马程序员JavaAI智能辅助编程全套视频教程&#xff0c;java零基础入门到大牛一套通关》&#xff0c;章节分布参考视频教程&#xff0c;为同样学习Javase系列课程的同学们提供参考。 01 注释 单…

【 React 】重点知识总结 快速上手指南

react 是 facebook 出的一款针对视图层的库。react 使用的是单向数据流的机制 React 官方中文文档 基础 api 和语法 jsx 语法 就是在 js 中插入 html 片段 在 React 中所有的组件都是 function 组件定义 function 定义组件 就是使用 function 定义组件 任何一个 function …

C++:继承

目录 一&#xff1a;继承的概念 1.1 继承的定义 1.2 继承方式 1.3 可见性区别 公有方式 私有方式 保护方式 1.4 一般规则 二、继承中的隐藏规则 三、基类和派生类间的转换 四、派生类的默认成员函数 实现一个不能被继承的类 继承与友元 五、继承与静态成员 六、多…

十二种存储器综合对比——《器件手册--存储器》

存储器 名称 特点 用途 EEPROM 可电擦除可编程只读存储器&#xff0c;支持按字节擦除和写入操作&#xff0c;具有非易失性&#xff0c;断电后数据不丢失。 常用于存储少量需要频繁更新的数据&#xff0c;如设备配置参数、用户设置等。 NOR FLASH 支持按字节随机访问&…

C语言高频面试题——malloc 和 calloc区别

在 C 语言中&#xff0c;malloc 和 calloc 都是用于动态内存分配的函数&#xff0c;但它们在 内存初始化、参数形式 和 使用场景 上有显著区别。以下是详细的对比分析&#xff1a; 1. 函数原型 malloc void* malloc(size_t size);功能&#xff1a;分配 未初始化 的连续内存块…

Qt -对象树

博客主页&#xff1a;【夜泉_ly】 本文专栏&#xff1a;【暂无】 欢迎点赞&#x1f44d;收藏⭐关注❤️ 目录 前言构造QObject::QObjectQObjectPrivate::setParent_helper 析构提醒 #mermaid-svg-FTUpJmKG24FY3dZY {font-family:"trebuchet ms",verdana,arial,sans-s…

JavaScript与TypeScript

TypeScript 和 JavaScript 都是用于构建 Web 应用的编程语言&#xff0c;但它们有着不同的设计目标和特性。 一、JavaScript 1. 定义与特点 动态脚本语言&#xff1a;由 Brendan Eich 在 1995 年创建&#xff0c;最初用于浏览器端的交互逻辑。弱类型/动态类型&#xff1a;变量…