Flink 客户端操作命令及可视化工具

Flink提供了丰富的客户端操作来提交任务和与任务进行交互。下面主要从Flink命令行、Scala ShellSQL ClientRestful APIWeb五个方面进行整理。

Flink安装目录的bin目录下可以看到flinkstart-scala-shell.shsql-client.sh等文件,这些都是客户端操作的入口。
[点击并拖拽以移动] ​

flink 常见操作:可以通过 -help 查看帮助

run 运行任务

-d:以分离模式运行作业
-c:如果没有在jar包中指定入口类,则需要在这里通过这个参数指定;
-m:指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager,可以说是yarn集群名称;
-p:指定程序的并行度。可以覆盖配置文件中的默认值;
-s:保存点savepoint的路径以还原作业来自(例如hdfs:///flink/savepoint-1537);

[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar 
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID dce7b69ad15e8756766967c46122736f

就可以看到我们提交的JobManager,默认是一个并发。
[点击并拖拽以移动] ​

点进去就可以看到详细的信息
[点击并拖拽以移动] ​

点击左侧TaskManager —Stdout能看到具体输出的日志信息。
[点击并拖拽以移动] ​

或者查看TaskManager节点的log目录下的*.out文件,也能看到具体的输出信息。
[点击并拖拽以移动] ​

list 查看任务列表

-mjobmanager<arg>作业管理器(主)的地址连接。

[root@hadoop1 flink-1.10.1]# bin/flink list -m 127.0.0.1:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
09.07.2020 16:44:09 : dce7b69ad15e8756766967c46122736f : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

Stop 停止任务

需要指定jobmanagerip:protjobId。如下报错可知,一个job能够被stop要求所有的source都是可以stoppable的,即实现了 StoppableFunction接口。

[root@hadoop1 flink-1.10.1]# bin/flink stop -m 127.0.0.1:8081 dce7b69ad15e8756766967c46122736f
Suspending job "dce7b69ad15e8756766967c46122736f" with a savepoint.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job "dce7b69ad15e8756766967c46122736f".
    at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:458)

StoppableFunction接口如下,属于优雅停止任务。

 /**
 * @Description 需要 stoppabel 的函数必须实现此接口,例如流式任务 source*
 *               stop() 方法在任务收到 stop信号的时候调用
 *               source 在接收到这个信号后,必须停止发送新的数据优雅的停止。
 * @Date 2020/7/9 17:26
 */
 @PublicEvolving
 public interface StoppableFunction {
     /**
     * 停止 source,与 cancel() 不同的是,这是一个让 source优雅停止的请求。
     * 等待中的数据可以继续发送出去,不需要立即停止
    */
    void stop();
}

Cancel 取消任务

如果在conf/flink-conf.yaml里面配置state.savepoints.dir,会保存savepoint,否则不会保存savepoint。(重启)

state.savepoints.dir: file:///tmp/savepoint

执行 Cancel命令 取消任务

[root@hadoop1 flink-1.10.1]# bin/flink cancel -m 127.0.0.1:8081 -s e8ce0d111262c52bf8228d5722742d47
DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use "stop" instead.
Cancelling job e8ce0d111262c52bf8228d5722742d47 with savepoint to default savepoint directory.
Cancelled job e8ce0d111262c52bf8228d5722742d47. Savepoint stored in file:/tmp/savepoint/savepoint-e8ce0d-f7fa96a085d8.

也可以在停止的时候显示指定savepoint目录

1 [root@hadoop1 flink-1.10.1]# bin/flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint f58bb4c49ee5580ab5f27fdb24083353
DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use "stop" instead.
Cancelling job f58bb4c49ee5580ab5f27fdb24083353 with savepoint to /tmp/savepoint.
Cancelled job f58bb4c49ee5580ab5f27fdb24083353. Savepoint stored in file:/tmp/savepoint/savepoint-f58bb4-127b7e84910e.

取消和停止(流作业)的区别如下:
cancel()调用, 立即调用作业算子的cancel()方法,以尽快取消它们。如果算子在接到cancel()调用后没有停止,Flink将开始定期中断算子线程的执行,直到所有算子停止为止。
stop()调用 ,是更优雅的停止正在运行流作业的方式。stop()仅适用于source实现了StoppableFunction接口的作业。当用户请求停止作业时,作业的所有source都将接收stop()方法调用。直到所有source正常关闭时,作业才会正常结束。这种方式,使 作业正常处理完所有作业。

触发 savepoint

当需要生成savepoint文件时,需要手动触发savepoint。如下,需要指定正在运行的 JobID 和生成文件的存放目录。同时,我们也可以看到它会返回给用户存放的savepoint的文件名称等信息。

 [root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar 
 Executing TopSpeedWindowing example with default input data set.
 Use --input to specify file input.
 Printing result to stdout. Use --output to specify output path.
 Job has been submitted with JobID 216c427d63e3754eb757d2cc268a448d
 [root@hadoop1 flink-1.10.1]# bin/flink savepoint -m 127.0.0.1:8081 216c427d63e3754eb757d2cc268a448d /tmp/savepoint/
 Triggering savepoint for job 216c427d63e3754eb757d2cc268a448d.
 Waiting for response...
 Savepoint completed. Path: file:/tmp/savepoint/savepoint-216c42-154a34cf6bfd
 You can resume your program from this savepoint with the run command.

savepointcheckpoint的区别:
checkpoint是增量做的,每次的时间较短,数据量较小,只要在程序里面启用后会自动触发,用户无须感知;savepoint是全量做的,每次的时间较长,数据量较大,需要用户主动去触发。
checkpoint是作业failover的时候自动使用,不需要用户指定。savepoint一般用于程序的版本更新,bug修复,A/B Test等场景,需要用户指定。

从指定 savepoint 中启动

[root@hadoop1 flink-1.10.1]# bin/flink run -d -s /tmp/savepoint/savepoint-f58bb4-127b7e84910e/ examples/streaming/TopSpeedWindowing.jar 
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 1a5c5ce279e0e4bd8609f541b37652e2

查看JobManager的日志能够看到Reset the checkpoint ID为我们指定的savepoint文件中的ID
[点击并拖拽以移动] ​

modify 修改任务并行度

这里修改masterconf/flink-conf.yamltask slot数修改为4。并通过xsync分发到 两个slave节点上。

taskmanager.numberOfTaskSlots: 4

修改参数后需要重启集群生效:关闭/启动集群

[root@hadoop1 flink-1.10.1]# bin/stop-cluster.sh && bin/start-cluster.sh 
Stopping taskexecutor daemon (pid: 8236) on host hadoop2.
Stopping taskexecutor daemon (pid: 8141) on host hadoop3.
Stopping standalonesession daemon (pid: 22633) on host hadoop1.
Starting cluster.
Starting standalonesession daemon on host hadoop1.
Starting taskexecutor daemon on host hadoop2.
Starting taskexecutor daemon on host hadoop3.

启动任务

[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar 
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 2e833a438da7d8052f14d5433910515a

从页面上能看到Task Slots总计变为了8,运行的Slot1,剩余Slot数量为7
[点击并拖拽以移动] ​

这时候默认的并行度是1
[点击并拖拽以移动] ​

Flink1.0版本命令行flink modify已经没有这个行为了,被移除了。。。Flink1.7上是可以运行的。

[root@hadoop1 flink-1.10.1]# bin/flink modify -p 4 cc22cc3d09f5d65651d637be6fb0a1c3
"modify" is not a valid action.

Info 显示程序的执行计划

[root@hadoop1 flink-1.10.1]# bin/flink info examples/streaming/TopSpeedWindowing.jar 
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":2,"type":"Timestamps/Watermarks","pact":"Operator","contents":"Timestamps/Watermarks","parallelism":1,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","parallelism":1,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":1,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------

拷贝输出的json内容,粘贴到这个网站:http://flink.apache.org/visualizer/可以生成类似如下的执行图。

[点击并拖拽以移动] ​

可以与实际运行的物理执行计划进行对比。
[点击并拖拽以移动] ​

SQL Client Beta

进入 Flink SQL

[root@hadoop1 flink-1.10.1]# bin/sql-client.sh embedded

Select查询,按Q退出如下界面;

Flink SQL> select 'hello word';
                                                                                                        SQL Query Result (Table)
 Table program finished.                                                                                       Page: Last of 1                                                                                         Updated: 16:37:04.649

                    EXPR$0
                hello word




Q Quit                                         + Inc Refresh                                  G Goto Page                                    N Next Page                                    O Open Row
R Refresh                                      - Dec Refresh                                  L Last Page                                    P Prev Page

打开http://hadoop1:8081能看到这条select语句产生的查询任务已经结束了。这个查询采用的是读取固定数据集的Custom Source,输出用的是Stream Collect Sink,且只输出一条结果。
[点击并拖拽以移动] ​

[点击并拖拽以移动] ​

explain 查看 SQL 的执行计划。

Flink SQL> explain SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
== Abstract Syntax Tree ==         //抽象语法树
LogicalAggregate(group=[{0}], cnt=[COUNT()])
+- LogicalValues(type=[RecordType(VARCHAR(5) name)], tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])

== Optimized Logical Plan ==      //优化后的逻辑执行计划
GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
+- Exchange(distribution=[hash[name]])
   +- Values(type=[RecordType(VARCHAR(5) name)], tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])

== Physical Execution Plan ==    //物理执行计划
Stage 13 : Data Source
    content : Source: Values(tuples=[[{ _UTF-16LE'Bob' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg' }, { _UTF-16LE'Bob' }]])

    Stage 15 : Operator
        content : GroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
        ship_strategy : HASH

结果展示

SQL Client支持两种模式来维护并展示查询结果:

table mode

在内存中物化查询结果,并以分页table形式展示。用户可以通过以下命令启用table mode:例如如下案例;

Flink SQL> SET execution.result-mode=table;
[INFO] Session property has been set.

Flink SQL>  SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
                                                                                                          SQL Query Result (Table)
 Table program finished.                                                                                       Page: Last of 1                                                                                         Updated: 16:55:08.589

                      name                       cnt
                     Alice                         1
                      Greg                         1
                       Bob                         2



Q Quit                                         + Inc Refresh                                  G Goto Page                                    N Next Page                                    O Open Row
R Refresh                                      - Dec Refresh                                  L Last Page                                    P Prev Page

​ [点击并拖拽以移动] ​​

​ [点击并拖拽以移动] ​​

changelog mode

不会物化查询结果,而是直接对continuous query产生的添加和撤回retractions结果进行展示:如下案例中的-表示撤回消息

Flink SQL> SET execution.result-mode=changelog;
[INFO] Session property has been set.

Flink SQL>  SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
                                                                                                        SQL Query Result (Changelog)
 Table program finished.                                                                                                                                                                                               Updated: 16:58:05.777

 +/-                      name                       cnt
   +                       Bob                         1
   +                     Alice                         1
   +                      Greg                         1
   -                       Bob                         1
   +                       Bob                         2



Q Quit                                                                        + Inc Refresh                                                                 O Open Row
R Refresh                                                                     - Dec Refresh

​ [点击并拖拽以移动] ​​

​ [点击并拖拽以移动] ​​

Environment Files

CREATE TABLE 创建表DDL语句:

Flink SQL> CREATE TABLE pvuv_sink (
>     dt VARCHAR,
>     pv BIGINT,
>     uv BIGINT
> ) ;
[INFO] Table has been created.

SHOW TABLES 查看所有表名

Flink SQL>  show tables;
pvuv_sink

DESCRIBE 表名 查看表的详细信息;

Flink SQL>  describe pvuv_sink;
root
 |-- dt: STRING
 |-- pv: BIGINT
 |-- uv: BIGINT

插入等操作均与关系型数据库操作语句一样,省略N个操作

Restful API

接下来我们演示如何通过rest api来提交jar包和执行任务。
[点击并拖拽以移动] ​

通过Show Plan可以看到执行图
[点击并拖拽以移动] ​

提交之后的操作,取消的话点击页面的Cancel Job

​ [点击并拖拽以移动] ​​

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/268054.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

面向船舶结构健康监测的数据采集与处理系统(一)系统架构

世界贸易快速发展起始于航海时代&#xff0c;而船舶作为重要的水上交通工具&#xff0c;有 其装载量大&#xff0c;运费低廉等优势。但船舶在运营过程中出现的某些结构处应力值 过大问题往往会给运营部门造成重大的损失&#xff0c;甚至造成大量的人员伤亡和严重 的环境污染…

【网络安全/CTF】unseping 江苏工匠杯

该题考察序列化反序列化及Linux命令执行相关知识。 题目 <?php highlight_file(__FILE__);class ease{private $method;private $args;function __construct($method, $args) {$this->method $method;$this->args $args;}function __destruct(){if (in_array($thi…

Kali Linux—借助 SET+MSF 进行网络钓鱼、生成木马、获主机shell、权限提升、远程监控、钓鱼邮件等完整渗透测试(三)

钓鱼邮件 当攻击者制作了钓鱼网站、木马程序后&#xff0c;便会想法设法将其传给受害者&#xff0c;而常见的传播方式便是钓鱼网站了。安全意识较差的用户在收到钓鱼邮件后点击邮件中的钓鱼链接、下载附件中的木马程序&#xff0c;便可能遭受攻击&#xff01; 工具简介 Swak…

Altium Designer(AD24)新工程复用设计文件图文教程及视频演示

&#x1f3e1;《专栏目录》 目录 1&#xff0c;概述2&#xff0c;复用方法一视频演示2.1&#xff0c;创建工程2.2&#xff0c;复用设计文件 3&#xff0c;复用方法二视频演示4&#xff0c;总结 欢迎点击浏览更多高清视频演示 1&#xff0c;概述 本文简述使用AD软件复用设计文件…

Adobe Photoshop Lightroom各版本安装指南

下载链接​ https://pan.baidu.com/s/1FiqQUcMJu3TrLRWFpaaX3A?pwd0531 #2024版 1.鼠标右击【Lrc2024(64bit)】压缩包&#xff08;win11及以上系统需先点击“显示更多选项”&#xff09;【解压到 Lrc2024(64bit)】。 2.打开解压后的文件夹&#xff0c;鼠标右击【Setup】选择…

2024-AI人工智能学习-安装了pip install pydot但是还是报错

2024-AI人工智能学习-安装了pip install pydot但是还是报错 出现这样子的错误&#xff1a; /usr/local/bin/python3.11 /Users/wangyang/PycharmProjects/studyPython/tf_model.py 2023-12-24 22:59:02.238366: I tensorflow/core/platform/cpu_feature_guard.cc:182] This …

基于YOLOv7算法的高精度实时海洋生物检测识别系统(PyTorch+Pyside6+YOLOv7)

摘要&#xff1a;基于YOLOv7算法的高精度实时海洋生物目标检测系统可用于日常生活中检测与定位海胆、海参、扇贝和海星&#xff0c;此系统可完成对输入图片、视频、文件夹以及摄像头方式的目标检测与识别&#xff0c;同时本系统还支持检测结果可视化与导出。本系统采用YOLOv7目…

嵌入式开发——DMA外设到内存

学习目标 加强理解DMA数据传输过程加强掌握DMA的初始化流程掌握DMA数据表查询理解源和目标的配置理解数据传输特点能够动态配置源数据学习内容 需求 uint8_t data; 串口接收(&data);data有数据了 实现串口的数据接收,要求采用dma的方式。 数据交互流程 CPU配置好DMA外…

jvm对象探究

hostpot虚拟机对象探究 jvm虚拟机创建对象的流程 ava虚拟机&#xff08;JVM&#xff09;创建对象的过程包括以下步骤&#xff1a; 类加载&#xff1a; 首先&#xff0c;JVM会检查对象的类是否已经被加载。如果该类还没有被加载&#xff0c;JVM会通过类加载器加载该类的字节码…

实在没货,简历(软件测试)咋写?

简历咋写&#xff0c;这是很多没有【软件测试实际工作经验】的同学们非常头疼的事情。 简历咋写&#xff1f;首先你要知道简历的作用。 简历的作用是啥呢&#xff1f; 一句话就是&#xff1a;让HR小姐姐约你。 如何让HR看你一眼&#xff0c;便相中你的简历&#xff0c;实现在众…

50 个具有挑战性的概率问题 [01/50]:袜子抽屉

一、说明 我最近对与概率有关的问题产生了兴趣。我偶然读到了弗雷德里克莫斯特勒&#xff08;Frederick Mosteller&#xff09;的《概率论中的五十个具有挑战性的问题与解决方案》&#xff08;Fifty Challenge Problems in Probability with Solutions&#xff09;一书。我认为…

React AntDesign form表单文件上传 nodejs formidable 接受参数并把文件放置后端项目相对目录指定文件夹下面

umijs/max 请求方法 // 上传文件改成form表单 export async function uploadFile(data, options) {return request(CMMS_UI_HOST /api/v1/uploadFile, {method: POST,data,requestType: form,...(options || {}),}); }前端调用方法 注意upload组件上传 onChange的如下方法&am…

51单片机的羽毛球计分器系统【含proteus仿真+程序+报告+原理图】

1、主要功能 该系统由AT89C51单片机LCD1602显示模块按键等模块构成。适用于羽毛球计分、乒乓球计分、篮球计分等相似项目。 可实现基本功能: 1、LCD1602液晶屏实时显示比赛信息 2、按键控制比赛的开始、暂停和结束&#xff0c;以及两位选手分数的加减。 本项目同时包含器件清…

不同文化背景下,如何调整绩效管理策略以适应不同的价值观和工作习惯

在不同文化背景下调整绩效管理策略以适应不同的价值观和工作习惯是一个复杂而关键的过程。以下是一些建议&#xff1a; 了解并尊重文化差异&#xff1a; 首先&#xff0c;需要深入了解不同文化背景下的价值观、工作习惯、沟通方式等。这包括对个人空间、权威、时间观念、团队…

Skywalking 中 Agent 自动同步配置源码解析

文章目录 前言正文实现架构实现模型OAP 同步 ApolloConfigWatcherRegisterConfigChangeWatcher Agent 侧 前言 本文代码 OAP 基于 v9.7&#xff0c;Java Agent 基于 v9.1&#xff0c;配置中心使用 apollo。 看本文需要配合代码“食用”。 正文 Skywalking 中就使用这种模型…

鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之Image图片组件

鸿蒙&#xff08;HarmonyOS&#xff09;项目方舟框架&#xff08;ArkUI&#xff09;之Image图片组件 一、操作环境 操作系统: Windows 10 专业版、IDE:DevEco Studio 3.1、SDK:HarmonyOS 3.1 二、Image组件 Image 用来加载并显示图片的基础组件&#xff0c;它支持从内存、本…

c++缺省参数与函数重载(超详细)

文章目录 前言一、缺省参数1.缺省参数的概念与使用2.缺省参数的分类3.缺省参数注意事项 二、函数重载1.什莫事函数重载2.函数重载的几种形式3.函数重载与缺省值的结合4.为什么c支持函数重载&#xff1f;&#xff1f; 总结 前言 在本文章中&#xff0c;我们将要详细介绍一下Cc缺…

产品设计 之 创建完美产品需求文档的4个核心要点

客户描述他们想要的产品和最终交付的产品之间的误解一般很大&#xff0c;设计者和客户的角度不同&#xff0c;理解的程度也不同&#xff0c;就需要一个统一的交流中介。这里包含PRD。 为了说明理解误差的问题。下面这张有趣的图画可以精准阐述。 第一张图片展示了客户所描述…

vmware安装中标麒麟高级服务器操作系统 V7.0

vmware安装中标麒麟高级服务器操作系统 V7.0 1、下载中标麒麟高级服务器操作系统 V7.0镜像2、安装中标麒麟高级服务器操作系统 V7.02.1、新建虚拟机2.2、安装虚拟机 3、配置中标麒麟高级服务器操作系统 V7.03.1、登录系统3.2、配置静态IP地址 和 dns3.3、查看磁盘分区3.4、查看…