PiflowX组件-ReadFromKafka

ReadFromKafka组件

组件说明

从kafka中读取数据。

计算引擎

flink

有界性

Unbounded

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”读取数据的topic名。亦支持用分号间隔的topic列表,如 ‘topic-1;topic-2’。" "注意,‘topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。topic-1
topic_patternTOPIC_PATTERN“”匹配读取topic名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的topic都将被Kafka consumer订阅。注意,‘topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。topic1_*
startup_modeSTARTUP_MODE“”Set(“earliest-offset”, “latest-offset”, “group-offsets”, “timestamp”, “specific-offsets”)Kafka consumer 的启动模式。earliest-offset
schemaSCHEMA“”Kafka消息的schema信息。id:int,name:string,age:int
formatFORMAT“”Set(“json”, “csv”, “avro”, “parquet”, “orc”, “raw”, “protobuf”,“debezium-json”, “canal-json”, “maxwell-json”, “ogg-json”)用来反序列化Kafka消息的格式。注意:该配置项和 ‘value.format’ 二者必需其一。json
groupGROUP“”Kafka source的消费组id。如果未指定消费组ID,则会使用自动生成的"KafkaSource-{tableIdentifier}"作为消费组ID。group_1
propertiesPROPERTIES“”Kafka source连接器其他配置

ReadFromKafka示例配置

{
  "flow": {
    "name": "DataGenTest",
    "uuid": "1234",
    "stops": [
      {
        "uuid": "0000",
        "name": "DataGen1",
        "bundle": "cn.piflow.bundle.flink.common.DataGen",
        "properties": {
          "schema": "[{\"filedName\":\"id\",\"filedType\":\"INT\",\"kind\":\"sequence\",\"start\":1,\"end\":10000},{\"filedName\":\"name\",\"filedType\":\"STRING\",\"kind\":\"random\",\"length\":15},{\"filedName\":\"age\",\"filedType\":\"INT\",\"kind\":\"random\",\"max\":100,\"min\":1}]",
          "count": "100",
          "ratio": "5"
        }
      },
      {
        "uuid": "1111",
        "name": "WriteToKafka1",
        "bundle": "cn.piflow.bundle.flink.kafka.WriteToKafka",
        "properties": {
          "kafka_host": "hadoop01:9092",
          "topic": "test",
          "schema": "",
          "format": "json",
          "properties": "{}"
        }
      },
      {
        "uuid": "2222",
        "name": "ReadFromKafka1",
        "bundle": "cn.piflow.bundle.flink.kafka.ReadFromKafka",
        "properties": {
          "kafka_host": "hadoop01:9092",
          "topic": "test",
          "group": "test",
          "startup_mode": "earliest-offset",
          "schema": "id:int,name:string,age:int",
          "format": "json",
          "properties": "{}"
        }
      },
      {
        "uuid": "3333",
        "name": "ShowData1",
        "bundle": "cn.piflow.bundle.flink.common.ShowData",
        "properties": {
          "showNumber": "5000"
        }
      }
    ],
    "paths": [
      {
        "from": "DataGen1",
        "outport": "",
        "inport": "",
        "to": "WriteToKafka1"
      },
      {
        "from": "WriteToKafka1",
        "outport": "",
        "inport": "",
        "to": "ReadFromKafka1"
      },
      {
        "from": "ReadFromKafka1",
        "outport": "",
        "inport": "",
        "to": "ShowData1"
      }
    ]
  }
}
示例说明

本示例演示了通过DataGen组件生成id,name,age3个字段100条数据,每秒生成5条数据,通过WriteToKafka组件将数据写入到kafka的test topic中,然后通过ReadFromKafka组件从test topic中读取数据,最后使用ShowData组件将数据打印在控制台。

字段描述
[
    {       
        "filedName": "id",
        "filedType": "INT",
        "kind": "sequence",
        "start": 1,
        "end": 10000
    },
        {       
        "filedName": "name",
        "filedType": "STRING",
        "kind": "random",
        "length": 15
    },
        {       
        "filedName": "age",
        "filedType": "INT",
        "kind": "random",
        "max": 100,
        "min": 1
    } 
]

1.id字段

id字段类型为INT,使用sequence生成器,序列生成器的起始值为1,结束值为10000.

2.name字段

name字段类型为STRING,使用random生成器,生成字符长度为15。

3.age字段

age字段类型为INT,使用random生成器,随机生成器的最小值为1,最大值为100。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/278886.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Zookeeper之手写一个分布式锁

前言 我之前写了一篇快速上手ZK的文章:https://blog.csdn.net/qq_38974073/article/details/135293106 本篇最要是进一步加深学习ZK,算是一次简单的实践,巩固学习成果。 设计一个分布式锁 对锁的基本要求 可重入:允许同一个应…

QT上位机开发(掌握一点c++基础)

【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing 163.com】 c是c语言的补充和扩展,本身的语法构成也是在一直迭代中。相信很多同学上大学读书的时候,或多或少对c语言有所了解&#xff…

python+django网上银行业务综合管理系统vue_bvj8b

本课题主要研究如何用信息化技术改善传统网上银行综合管理行业的经营和管理模式,简化网上银行综合管理的难度,根据管理实际业务需求,调研、分析和编写系统需求文档,设计编写符合银行需要的系统说明书,绘制数据库结构模…

尽量避免删改List

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO 联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬 学习必须往深处挖&…

2024收入最高的编程语言

我的新书《Android App开发入门与实战》已于2020年8月由人民邮电出版社出版,欢迎购买。点击进入详情 1.Python Python 是最流行、用途最广泛的语言之一。它通常用于网络开发、数据科学、机器学习等。 以下是 Python 编程语言的一些主要用途: Web 开发&…

UE4运用C++和框架开发坦克大战教程笔记(十二)(第37~39集)

UE4运用C和框架开发坦克大战教程笔记(十二)(第37~39集) 37. 延时事件系统38. 协程逻辑优化更新39. 普通按键绑定 37. 延时事件系统 由于梁迪老师是写 Unity 游戏出身的,所以即便 UE4 有自带的 TimeManager 这样的延时…

基于JAVA的考研专业课程管理系统 开源项目

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 考研高校模块2.3 高校教师管理模块2.4 考研专业模块2.5 考研政策模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 考研高校表3.2.2 高校教师表3.2.3 考研专业表3.2.4 考研政策表 四、系统展示五、核…

命令行创建Vue项目

Vue项目创建 1. 打开UI界面 在命令行中,执行如下指令: vue ui 2. 打开项目管理器 3. 创建项目 创建项目的过程,需要联网进行,这可能会耗时比较长的时间,请耐心等待。 windows的命令行,容易卡顿&#xff0c…

WPF 漂亮长方体、正文体简单实现方法 Path实现长方体 正方体方案 WPF快速实现长方体、正方体的方法源代码

这段XAML代码在WPF中实现了一个类似长方体视觉效果的图形 声明式绘制:通过Path、PathGeometry和PathFigure等元素组合,能够以声明方式精确描述长方体每个面的位置和形状,无需编写复杂的绘图逻辑,清晰直观。 层次结构与ZIndex控制…

RabbitMQ之快速入门、上手

前言 学习一样新技术、新框架,最重要的是学习其思想、原理。即原理性思维。 如果是因为工作原因,需要快速上手RabbitMQ,本篇或许适合你。 核心概念 Connection:publisher/consumer 和 broker 之间的 TCP 连接Channel…

Hadoop之Yarn 详细教程

1、yarn 的基本介绍和产生背景 YARN 是 Hadoop2 引入的通用的资源管理和任务调度的平台,可以在 YARN 上运行 MapReduce、Tez、Spark 等多种计算框架,只要计算框架实现了 YARN 所定义的 接口,都可以运行在这套通用的 Hadoop 资源管理和任务调…

nodejs+vue+微信小程序+python+PHP的冷链物流配送系统-计算机毕业设计推荐

对于冷链物流信息调度系统所牵扯的管理及数据保存都是非常多的,例如管理员;首页、用户管理(管理员、客户、业务员、配送员)客户管理(货物信息、客户运输单、车辆信息、调度安排)这给管理者的工作带来了巨大…

【机组期末速成】指令系统|机器指令概述|操作数类型与操作类型|寻址方式|指令格式

🎥 个人主页:深鱼~🔥收录专栏:计算机组成原理🌄欢迎 👍点赞✍评论⭐收藏 目录 前言: 一、本章考点总览 二、考点分析 1、以下有关指令系统的说法中错误的是( )。 2…

【电商项目实战】MD5登录加密及JSR303自定义注解

🎉🎉欢迎来到我的CSDN主页!🎉🎉 🏅我是Java方文山,一个在CSDN分享笔记的博主。📚📚 🌟推荐给大家我的专栏《电商项目实战》。🎯🎯 &am…

mac安装k8s环境

安装kubectl brew install kubectl 确认一下安装的版本 kubectl version --client 如果想在本地运行kubernetes 需要安装minikube brew install minikube 需要注意安装minikube需要本地的docker服务是启动的 启动 默认连接的是google的仓库 minikube start 指定阿…

下载和安装AD14 - Altium Designer 14.3.20.54863

这个版本应该还支持XP 系统[doge],总之就是想安装一下,没什么特别的意义。 下载 资源来自毛子网站:https://rutracker.net/forum/viewtopic.php?t5140739,带上个网页翻译插件就行。要用磁力链接下载,推荐用qbittorr…

远程网络唤醒家庭主机(openwrt设置)

远程网络唤醒家庭主机(openwrt设置) 前提: 1.配置好主板bios的网络唤醒功能(网络教程自己百度一下找) 2.电脑开启网络唤醒功能(网络教程自己百度一下找) 3.路由器通过ddns实现域名和动态IP绑定内网穿透方法汇总_不修改光猫进行内网穿透-C…

最新AI系统ChatGPT网站H5系统源码,支持AI绘画,GPT语音对话+ChatFile文档对话总结+DALL-E3文生图

一、前言 SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统,支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美,可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作Ch…

代码质量评价及设计原则

1.评价代码质量的标准 1.1 可维护性 可维护性强的代码指的是: 在不去破坏原有的代码设计以及不引入新的BUG的前提下,能够快速的修改或者新增代码. 不易维护的代码指的是: 在添加或者修改一些功能逻辑的时候,存在极大的引入新的BUG的风险,并且需要花费的时间也很长. 代码可…

如何让python在手机上运行,python程序在手机上运行

大家好,给大家分享一下python怎么在手机上运行爱心代码,很多人还不知道这一点。下面详细解释一下。现在让我们来看看! 1. 写在前面的话 天天都在PC端运行Python代码的我,今天突然灵光一现,想着是不是能够在移动端运行P…
最新文章