flink-java使用介绍,flink,java,DataStream API,DataSet API,ETL,设置 jobname

1、环境准备

文档:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/
仓库:https://github.com/apache/flink
下载:https://flink.apache.org/zh/downloads/
下载指定版本:https://archive.apache.org/dist/flink/flink-1.17.1/

ETL:用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。

注意:现在的flink没有bat执行文件,需要自己创建,而网上复制的 bat 文件大都有问题,最好在 Linux 系统跑!!

我下载的是 flink-1.17.1

> java -version
java version "1.8.0_201"
Java(TM) SE Runtime Environment (build 1.8.0_201-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)

java8, jdk-1.8.0_181

start-cluster.bat 文件

::###############################################################################
::  Licensed to the Apache Software Foundation (ASF) under one
::  or more contributor license agreements.  See the NOTICE file
::  distributed with this work for additional information
::  regarding copyright ownership.  The ASF licenses this file
::  to you under the Apache License, Version 2.0 (the
::  "License"); you may not use this file except in compliance
::  with the License.  You may obtain a copy of the License at
::
::      http://www.apache.org/licenses/LICENSE-2.0
::
::  Unless required by applicable law or agreed to in writing, software
::  distributed under the License is distributed on an "AS IS" BASIS,
::  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
::  See the License for the specific language governing permissions and
:: limitations under the License.
::###############################################################################
 
@echo off
setlocal EnableDelayedExpansion
 
SET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins
SET FLINK_CONF_DIR=%FLINK_HOME%\conf
SET FLINK_LOG_DIR=%FLINK_HOME%\log
 
SET JVM_ARGS=-Xms1024m -Xmx1024m
 
SET FLINK_CLASSPATH=%FLINK_LIB_DIR%\*
 
SET logname_jm=flink-%username%-jobmanager.log
SET logname_tm=flink-%username%-taskmanager.log
SET log_jm=%FLINK_LOG_DIR%\%logname_jm%
SET log_tm=%FLINK_LOG_DIR%\%logname_tm%
SET outname_jm=flink-%username%-jobmanager.out
SET outname_tm=flink-%username%-taskmanager.out
SET out_jm=%FLINK_LOG_DIR%\%outname_jm%
SET out_tm=%FLINK_LOG_DIR%\%outname_tm%
 
SET log_setting_jm=-Dlog.file="%log_jm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties"
SET log_setting_tm=-Dlog.file="%log_tm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties"
 
:: Log rotation (quick and dirty)
CD "%FLINK_LOG_DIR%"
for /l %%x in (5, -1, 1) do ( 
SET /A y = %%x+1 
RENAME "%logname_jm%.%%x" "%logname_jm%.!y!" 2> nul
RENAME "%logname_tm%.%%x" "%logname_tm%.!y!" 2> nul
RENAME "%outname_jm%.%%x" "%outname_jm%.!y!"  2> nul
RENAME "%outname_tm%.%%x" "%outname_tm%.!y!"  2> nul
)
RENAME "%logname_jm%" "%logname_jm%.0"  2> nul
RENAME "%logname_tm%" "%logname_tm%.0"  2> nul
RENAME "%outname_jm%" "%outname_jm%.0"  2> nul
RENAME "%outname_tm%" "%outname_tm%.0"  2> nul
DEL "%logname_jm%.6"  2> nul
DEL "%logname_tm%.6"  2> nul
DEL "%outname_jm%.6"  2> nul
DEL "%outname_tm%.6"  2> nul
 
for %%X in (java.exe) do (set FOUND=%%~$PATH:X)
if not defined FOUND (
    echo java.exe was not found in PATH variable
    goto :eof
)
 
echo Starting a local cluster with one JobManager process and one TaskManager process.
 
echo You can terminate the processes via CTRL-C in the spawned shell windows.
 
echo Web interface by default on http://localhost:8081/.
 
start /b java %JVM_ARGS% %log_setting_jm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint --configDir "%FLINK_CONF_DIR%" > "%out_jm%" 2>&1
start /b java %JVM_ARGS% %log_setting_tm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir "%FLINK_CONF_DIR%" > "%out_tm%" 2>&1
 
endlocal

flink.bat文件

::###############################################################################
::  Licensed to the Apache Software Foundation (ASF) under one
::  or more contributor license agreements.  See the NOTICE file
::  distributed with this work for additional information
::  regarding copyright ownership.  The ASF licenses this file
::  to you under the Apache License, Version 2.0 (the
::  "License"); you may not use this file except in compliance
::  with the License.  You may obtain a copy of the License at
::
::      http://www.apache.org/licenses/LICENSE-2.0
::
::  Unless required by applicable law or agreed to in writing, software
::  distributed under the License is distributed on an "AS IS" BASIS,
::  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
::  See the License for the specific language governing permissions and
:: limitations under the License.
::###############################################################################
 
@echo off
setlocal
 
SET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins
 
SET JVM_ARGS=-Xmx512m
 
SET FLINK_JM_CLASSPATH=%FLINK_LIB_DIR%\*
 
java %JVM_ARGS% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.client.cli.CliFrontend %*
 
endlocal

查看信息

> flink.bat -h

./flink <ACTION> [OPTIONS] [ARGUMENTS]

The following actions are available:

Action "run" compiles and runs a program.

......

2、WordCount 示例

安装 IntelliJ 编辑器,IntelliJ IDEA 2023.3.2

并安装 maven

2.1、DatStream API 实现批处理

创建项目 New Project --> Maven Archetype

在这里插入图片描述

Catalog参数是Archetype的存储的地方,可以理解为大致的分类,此处我选择Maven Central,点击后面的Manage catalogs可以知道Maven Central是要从线上下载,因此需要等一会。

Archetype参数是Maven Project Template,可以帮你快速初始化项目结构,等到catalog下载好之后,在这里输入 flink 来检索,然后选择org.apache.flink:flink-quickstart-java

Version为模板的版本号,它同时也是 flink 包的版本号。

然后点击Create创建之。

在这里插入图片描述

我们发现pom.xml文件里面已经添加好了很多依赖,这就是使用 Maven 模板的好处。

除此之外,我们还可以使用 mvn命令来开始创建 flink应用,参考地址,

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.17.1

可以发现,跟我们在 IntelliJ 中创建的参数是一样的。

它默认提供的DEMO是流式的执行环境,即 Streaming。

注意,从Flink 1.12开始,官方推荐直接使用 DataStream API 来处理流和批,然后在提交任务时通过将执行模式设置为 BATCH 来进行批处理。比如bin/flink run -Dexecution.runtime-mode=BATCH WordCount.jar,这样的好处是官方只需要维护一套 API 即可。

所以,我们可以在官方给的DEMO中来实现对 txt 内容的处理。

在项目根目录下创建文件input/wordcount.txt

hello flink
hello java
hello scala

编辑DataStreamJob这个类

package org.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Skeleton for a Flink DataStream Job.
 *
 * <p>For a tutorial how to write a Flink application, check the
 * tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.
 *
 * <p>To package your application into a JAR file for execution, run
 * 'mvn clean package' on the command line.
 *
 * <p>If you change the name of the main class (with the public static void main(String[] args))
 * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
 */
public class DataStreamJob {
	public static void main(String[] args) throws Exception {
		// 使用 DataStream API

         // 创建执行环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		
         // 读取数据
         // 相对路径相对的是工程跟路径
         // D:\dev\java-intellij\word_count_5\input\wordcount.txt
         // /mnt/d/dev/java-intellij/word_count_5/input/wordcount.txt
		DataStreamSource<String> stringDataStreamSource = env.readTextFile("D:\dev\java-intellij\word_count_5\input\wordcount.txt");

         // 按行切分,转换成元组(word, 1)
         // 如果参数是接口,可以直接使用匿名类的对象,即直接实例化此接口 new InterfaceA() {}
         // alt+enter 实现接口方法
		SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = stringDataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
			@Override
			public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
				String[] words = s.split(" ");
				for (String word : words) {
                      //转换为 (word, 1)
					Tuple2<String, Integer> stringIntegerTuple2 = Tuple2.of(word, 1);

					// 使用collector向下游发送数据
					collector.collect(stringIntegerTuple2);
				}
			}
		});

         // 按照单词分组
		wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
			@Override
			public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
				return stringIntegerTuple2.f0;// 按照二元组的第一个位置聚合
			}
		})
            .sum(1) // 按照二元组的第二个位置求和
            .print(); // 输出

		env.execute();
	}
}

注意,wordcount.txt的路径要正确,第一个是在编辑器中运行此程序的时候要能找到这个文件;第二个是在打成 jar 包的时候,此txt文件是不会包含在内的,那么发送到flink服务器去运行的时候怎么去找到这个文件呢,我的flink也是在windows本地启动的,所以我这里填绝对路径就没问题。

此时点击main方法运行会报错,提示类找不到,我们来到 pom.xml 中,就会发现

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

其中<scope>provided</scope>的意思是,在编译和运行的时候并不会将此依赖编译进去,自然在运行的时候是找不到此依赖的。那么为什么要这么做呢,这是因为在某些情况下,此项目B会被打包成 jar 然后被程序A加载进去使用的,如果程序A中已经包含了这些依赖,那么在项目B打包的时候就没必要再把这些依赖编译进去了,这样的 jar 包会小很多,而flink就是这样的使用场景。

那么问题来了,本地该如何运行呢?

Run -> Edit Configurations,在 Build and run右边点击Modify options,勾选中Add dependencies with 'provided' scope to classpath,点击 Apply

在这里插入图片描述

注意,这里 Application 下面的类必须是执行了一次之后才有的。

再来运行 main方法,可以找到打印信息

3> (hello,1)
1> (scala,1)
3> (hello,2)
3> (hello,3)
7> (flink,1)
2> (java,1)

注意看,每一行输出前面都有个编号,可以理解为这是线程编号。并且这里输出了6行,顺序是乱的,是并行处理的,而且统计的结果是逐渐在变化,可见,虽然每个单词都由不同的线程在处理,但是聚合的结果却是正确的,这就是有状态的意思(stateFul),它内部已经维护好了这个结果。

说明程序运行正常,但是乱七八糟的打印太多,于是修改pom.xml删除以下依赖

<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>${log4j.version}</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>${log4j.version}</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>${log4j.version}</version>
    <scope>runtime</scope>
</dependency>

在修改了pom.xml文件,或者修改了代码,或者有些class提示找不到了,我们都需要刷新一下maven。可以右键pom.xml --> Maven --> Reload Project;或者点开编辑器右边的 maven 按钮,点击刷新按钮。

最终运行结果如下

在这里插入图片描述

其实,这是以流的方式在处理 txt 文件内容,因为我们并没有设置-Dexecution.runtime-mode=BATCH参数。

2.2、DataSet API 实现批处理

为了对比批处理和流处理的效果,再写一个 DataSet API 的例子。

package org.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class DataSetBatchJob {
    public static void main(String[] args) throws Exception {
        // 使用 DataSet API 方式实现的批处理

        // 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 读取数据
        // 相对路径相对的是工程跟路径
        // /mnt/d/dev/java-intellij/word_count_5/input/wordcount.txt
        DataSource<String> dataSource = env.readTextFile("D:\\dev\\java-intellij\\word_count_5\\input\\wordcount.txt");

        // 按行切分,转换成元组(word, 1)
        // 如果参数是接口,可以直接使用匿名类的对象,即直接实例化此接口 new InterfaceA() {}
        // alt+enter 实现接口方法
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = dataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = s.split(" ");
                for (String word : words) {
                    //转换为 (word, 1)
                    Tuple2<String, Integer> stringIntegerTuple2 = Tuple2.of(word, 1);

                    // 使用collector向下游发送数据
                    collector.collect(stringIntegerTuple2);
                }
            }
        });

        // 按照单词分组
        wordAndOne
                .groupBy(0)// 按照二元组的第一个位置聚合
                .sum(1)// 按照二元组的第二个位置求和
                .print();// 输出
    }
}

成功执行后的打印结果

在这里插入图片描述

批处理是所有的记录执行完之后打印最终结果的。

2.3、处理无界数据流

使用 socket 连接来模拟无界的数据流。

写法跟DataStreamJob一模一样,就是数据源改一下。

package org.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class DataStreamSocketJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> stringDataStreamSource = env.socketTextStream("127.0.0.1", 7777);

        SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2SingleOutputStreamOperator = stringDataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = s.split(" ");
                for (String word : words) {
                    Tuple2<String, Integer> stringIntegerTuple2 = Tuple2.of(word, 1);
                    collector.collect(stringIntegerTuple2);
                }
            }
        });

        tuple2SingleOutputStreamOperator.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        }).sum(1).print();

        env.execute();
    }
}

在 WSL 启动一个TCP监听服务 nc -l 7777

执行 main 方法。

然后在 nc 这边输入

hello boy
hello girl
hello flink

我们能看到编辑器有输出

在这里插入图片描述

2.4、有界数据和无界数据

结合 2.1,2.2,2.3 的内容,可以发现,

对于有界数据(一般指日志文件),既可以使用 DataSet API批处理,也可以使用DataStream API配合参数-Dexecution.runtime-mode=BATCH来批处理,还可以使用DataStream API不带参数来流处理。

对于无界数据,我们使用DataStream API来流处理。

3、使用Maven打包成 jar

打开 maven, lifecycle ,先 clean ,再 package
打包结果在 target 目录,其中带 origin 的包是不包含任何依赖的,因此不够通用,包也很小;另外一个包是按照pom.xml来打包的。

在这里插入图片描述

为什么两个都是 7KB,那是因为在 pom.xml 中定义了provided

4、提交任务

启动 flink

> start-cluster.bat

Starting a local cluster with one JobManager process and one TaskManager process.
You can terminate the processes via CTRL-C in the spawned shell windows.
Web interface by default on http://localhost:8081/.

访问:http://localhost:8081/

关闭cmd窗口就可以停止flink

为什么Available Task Slots都是 0 呢?Task Managers 为空?

在这里插入图片描述

使用自带的example测试

> flink.bat run D:\dev\php\magook\trunk\server\flink-1.17.1\examples\batch\WordCount.jar

Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 241dfa34420ee6e8beb68a86997cd9f1

也可以来到 web 上面手动提交。

任务都会超时报错:NoResourceAvailableException: Could not acquire the minimum required resources

事实证明 TaskManager 启动失败了,也可能是我复制过来的 bat 文件有问题。为什么 Flink 官方不再提供 bat 文件呢?

5、在 WSL 安装 java-1.8.0

看来只能换到 Linux 系统啦。

下载 java-1.8.0_202
https://www.oracle.com/java/technologies/javase/javase8-archive-downloads.html#license-lightbox

开始安装

> mkdir /usr/lib/jdk

> tar -zxf jdk-8u202-linux-x64.tar.gz -C /usr/lib/jdk

> vi /etc/profile

export JAVA_HOME=/usr/lib/jdk/jdk1.8.0_202
export JRE_HOME=${JAVA_HOME}/jre    
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib    
export PATH=${JAVA_HOME}/bin:$PATH


> source /etc/profile 

> java -version
java version "1.8.0_202"
Java(TM) SE Runtime Environment (build 1.8.0_202-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.202-b08, mixed mode)

运行 Flink

> cd /mnt/d/dev/php/magook/trunk/server/flink-1.17.1

> bin/start-cluster.sh

Starting cluster.
Starting standalonesession daemon on host windows10-jack.
Starting taskexecutor daemon on host windows10-jack.

访问:http://localhost:8081/

在这里插入图片描述

JobManager 将任务分配到 TaskManager 去执行。

TaskManager:执行数据流的task,一个task通过设置并行度,可能会有多个subtask。 每个TaskManager都是作为一个独立的JVM进程运行的。他主要负责在独立的线程执行的operator。其中能执行多少个operator取决于每个taskManager指定的slots数量(默认一个 TaskManager 设置了一个 slot)。Task slot是Flink中最小的资源单位。假如一个taskManager有3个slot,他就会给每个slot分配1/3的内存资源,目前slot不会对cpu进行隔离。同一个taskManager中的slot会共享网络资源和心跳信息。

5.1、命令行提交任务

使用自带的example测试

> bin/flink run /mnt/d/dev/php/magook/trunk/server/flink-1.17.1/examples/batch/WordCount.jar

Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 1999ddc8ad4d3ba97eb0e07e76692705
Program execution finished
Job with JobID 1999ddc8ad4d3ba97eb0e07e76692705 has finished.
Job Runtime: 1463 ms
Accumulator Results:
- 6687ca7bfce1aae232b5c6988b84ee8e (java.util.ArrayList) [170 elements]


(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
(awry,1)
(ay,1)
(bare,1)
(be,4)
.
.
.

可见 flink 是启动成功的。

我们现在有了三个类

DataSetBatchJob
DataStreamJob
DataStreamSocketJob

接下来我们要修改一下,将txt文件地址改成/mnt/d/dev/java-intellij/word_count_5/input/wordcount.txt,重新打包。

如果在 pom.xml 中没有指定 mainClass ,或者设置的 mainClass 并不是你要执行的,那么在提交任务的时候就要指定 entryCLass,比如 -c org.example.DataSetBatchJob

在这里插入图片描述

> bin/flink run -c org.example.DataSetBatchJob /mnt/d/dev/java-intellij/word_count_5/target/word_count_5-1.0-SNAPSHOT.jar

Job has been submitted with JobID f6b25200fdba6be70dbd595adf57372e
Program execution finished
Job with JobID f6b25200fdba6be70dbd595adf57372e has finished.
Job Runtime: 1380 ms
Accumulator Results:
- 79e0e4dadaeee895df041c1ff01385f8 (java.util.ArrayList) [4 elements]


(flink,1)
(hello,3)
(java,1)
(scala,1)

命令是阻塞状态,直到任务被执行完毕,可以加上参数 -d 或 --detached,命令立即返回,但是打印信息要去 webUI 查看。

在这里插入图片描述

在这里插入图片描述

5.2、webUI 上提交任务

还是选择这个 jar 包

在这里插入图片描述

可见 entryClass 默认就是 pom.xml 中的设置,当然你还以修改这个参数,此处我们改为DataSetBatchJob,然后点击submit。报错

在这里插入图片描述

重点是下面这句

Caused by: org.apache.flink.api.common.InvalidProgramException: Job was submitted in detached mode. 
Results of job execution, such as accumulators, runtime, etc. are not available. Please make sure 
your program doesn't call an eager execution function [collect, print, printToErr, count]. 

detached 模式:分离的,指的是通过客户端、Java API 或 Restful 等方式提交的任务,是不会等待作业运行结束的。如果代码中带有collect, print, printToErr, count 操作,对于DataSet API,会直接报错,如上;对于DataStream API,是可以运行的,需要去 webUI 中查看打印信息。

blocking 模式:同步阻塞的,指的是提交作业的时候,会等待作业被执行完,返回结果,打印结果,我们可以通过关闭终端或 Ctrl + C 的方式直接关闭正在运行的 flink 作业,比如我们在命令行执行 bin/flink run ...。当然,也可以在命令行下通过指定 --detached 来使用 detached 模式提交,这样命令行是看不到打印结果的。

bin/flink -h

我们来提交DataStreamJob这个类试试。还是这个包,因此不用重新上传,只需要修改一下 entryClass 然后点 submit 即可。

在这里插入图片描述

这次居然没有报错,那么它打印的信息在哪里呢?

任务是已经FINISHED,我们点开任务详情。
在这里插入图片描述
实际上,这一个 Job 包含了三个计算任务,而每个计算任务又可能分配到不同的 TaskManager 上运行(显然此处我们只有一个 TaskManager),所以你并不知道 Print 操作是在哪个 TaskManager 执行的。

显然第三个任务包含了Stdout,点它。

在这里插入图片描述

Stdout中就能看到打印的信息,

在这里插入图片描述
另外,在 Log List中也能找到打印的信息。找到.out结尾的日志文件,比如我的flink-Ubuntu-taskexecutor-1-windows10-jack.out

在这里插入图片描述

很明显这是流式处理。

接下来我们提交一个无界数据流的任务,也就是DataStreamSocketJob这个类,注意 nc 服务要启动。在 nc 上依次输入

hi girl
hi boy
hi lady

查看日志文件

在这里插入图片描述

这种任务会一直处于RUNNING状态,可以点击Cancel Job将其结束,但是 nc 也会被结束。

所以,使用 webUI 来提交任务还是挺局限的,首先它是detached,其次还不能设置命令参数。

重启 flink ,清除任务记录

> bin/stop-cluster.sh 
> bin/start-cluster.sh

依次执行以下命令

> bin/flink run -c org.example.DataSetBatchJob /mnt/d/dev/java-intellij/word_count_5/target/word_count_5-1.0-SNAPSHOT.jar

> bin/flink run -c org.example.DataStreamJob /mnt/d/dev/java-intellij/word_count_5/target/word_count_5-1.0-SNAPSHOT.jar

> bin/flink run -c org.example.DataStreamJob -Dexecution.runtime-mode=BATCH /mnt/d/dev/java-intellij/word_count_5/target/word_count_5-1.0-SNAPSHOT.jar 

在这里插入图片描述

从 Job Name 这一栏看,-Dexecution.runtime-mode=BATCH是生效的。但是这里还有一个问题,如何指定 Job Name 呢?我们来查看 Flink 的开发配置:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/

使用参数 -Dpipeline.name='test_DataStream_api_jar' 来设置 Job Name。

bin/flink run -c org.example.DataStreamJob -Dpipeline.name='test_DataStream_api_jar' /mnt/d/dev/java-intellij/word_count_5/target/word_count_5-1.0-SNAPSHOT.jar

在这里插入图片描述

flink run 命令其实也是投递到webui那个接口,因此可以指定IP和端口,比如-m hadoop002:8081

6、运行与部署

部署模式:会话模式(session mode),应用模式(application mode),单作业模式(per-job mode)。

运行模式,standalone模式,k8s模式,yarn模式。

我们在上面启动的就是 standalone 模式,这种模式不会动态的伸缩计算节点,也就是 TaskManager 在集群启动的时候就要指定好,不能自适应增减节点。因此官方将flink和Yarn做了集成,使用 yarn-session.sh 命令就能以Yarn的方式来运行flink,这样Yarn就会根据任务的数量来动态增减TaskManager的数量。

Yarn是Hadoop的组件,因此需要先部署Hadoop环境和HDFS并运行之。

7、Flink API 简介

Flink将数据处理接口抽象成四层:

  • 1、SQL API:SQL语言的学习成本低,能够让数据分析人员和开发人员快速上手,帮助其更加专注业务本身而不受限于复杂的编程接口,可以通过SQL API完成对批计算和流计算的处理;
  • 2、Table API:将内存中 DataStream 和 DataSet 在原有的基础上增加Schema信息,将数据类型统一抽象成表结构,然后通过Table API提供的接口处理对应的数据集;
  • 3、DataStream/DataSet API:主要面向具有开发经验的用户,用户可以根据API处理无界流数据和批量数据;
  • 4、Stateful Stream Processing:是Flink中最底层的开发接口,可以使用接口中操作状态、时间等底层数据,可以实现非常复杂的流式计算逻辑。

我们上面的例子就是第三层的API,显然第一层的 SQL API 是抽象程度最高的,也是兼容性最好的,使用最简单的。

越往下越接近底层,使用的时候需要注意的东西就越多,越麻烦。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/344200.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

洛谷C++简单练习day4

day4---进制转化---1.22 习题概述 题目描述 今天小明学会了进制转换&#xff0c;比如&#xff08;10101&#xff09;2 &#xff0c;那么它的十进制表示的式子就是 : 1*2^40*2^31*2^20*2^11*2^0&#xff0c; 那么请你编程实现&#xff0c;将一个M进制的数N转换成十进制表示…

VSCode Debug 参数设置说明

如果想在vscode中debug一个项目&#xff0c;比如python3 run.py --args 这个时候你需要着重关注几个参数&#xff0c;参数用两个双引号分开&#xff0c;不能有空格。 cwd :运行代码的基础目录env: 设置环境变量 PYTHONPATH&#xff1a; 设置项目用到的模块搜索路径&#xff…

开源模型应用落地-KnowLM模型小试-入门篇(一)

一、前言 你是否了解知识图谱&#xff1f;如果了解&#xff0c;你们的业务场景是否应用了知识图谱&#xff1f;实际上&#xff0c;知识图谱在各行各业都被广泛使用。例如&#xff0c;在搜索引擎优化方面&#xff0c;通过利用知识图谱&#xff0c;搜索引擎可以更好地理解网页内容…

【英文干货】【Word_Search】找单词游戏(第3天)

本期主题&#xff1a;Doors&#xff08;各式各样的门&#xff09; 本期单词&#xff1a; Automatic (Door) 自动门 Back (Door) 后门 Barn (Door) 谷仓的门 Battened (Door) 用木条加固的门 Fire (Door) 防火门 Front (Door) 前门 Garage (Door) 车库的门 Glazed (Door…

大数据学习之Flink算子、了解(Transformation)转换算子(基础篇三)

Transformation转换算子&#xff08;基础篇三&#xff09; 目录 Transformation转换算子&#xff08;基础篇三&#xff09; 三、转换算子&#xff08;Transformation&#xff09; 1.基本转换算子 1.1 映射&#xff08;Map&#xff09; 1.2 过滤&#xff08;filter&#xf…

【优先级队列 之 堆的实现】

文章目录 前言优先级队列 PriorityQueue优先队列的模拟实现 堆堆的储存方式堆的创建建堆的时间复杂度堆的插入与删除 总结 前言 优先级队列 PriorityQueue 概念&#xff1a;对列是先进先出的的数据结构&#xff0c;但有些情况&#xff0c;数据可能带有优先级&#xff0c;一般出…

C++:使用tinyXML生成矢量图svg

先说一下tinyXML库的配置&#xff1a; 很简单&#xff0c;去下面官网下载 TinyXML download | SourceForge.net 解压后是这样 直接将红框中的几个文件放到项目中即可使用 关于svg文件&#xff0c;SVG是基于XML的可扩展矢量图形&#xff0c;svg是xml文件&#xff0c;但是xml…

TCP三握四挥(面试需要)

TCP建立连接需要三次握手过程&#xff0c;关闭连接需要四次挥手过程 三次握手 从图中可以看出&#xff0c;客户端在发起connect时&#xff0c;会发起第一次和第三次握手。服务端在接收客户端连接时&#xff0c;会发起第二次握手。 这三次握手&#xff0c;都会通过SYNACK的方式…

阿里云4核8G云服务器价格、带宽及系统盘费用

阿里云服务器4核8g配置云服务器u1价格是955.58元一年&#xff0c;4核8G配置还可以选择ECS计算型c7实例、计算型c8i实例、计算平衡增强型c6e、ECS经济型e实例、AMD计算型c8a等机型等ECS实例规格&#xff0c;规格不同性能不同&#xff0c;价格也不同&#xff0c;阿里云服务器网al…

EasyExcel入门使用

EasyExcel是一个基于Java的、快速、简洁、解决大文件内存溢出的Excel处理工具。他能让你在不用考虑性能、内存的等因素的情况下&#xff0c;快速完成Excel的读、写等功能。 EasyExcel 的主要特点如下&#xff1a; 1、高性能&#xff1a;EasyExcel 采用了异步导入导出的方式&…

数据结构:搜索二叉树 | 红黑树 | 验证是否为红黑树

文章目录 1.红黑树的概述2.红黑树的性质3.红黑树的代码实现3.1.红黑树的节点定义3.2.红黑树的插入操作3.3.红黑树是否平衡 黑红树是一颗特殊的搜索二叉树&#xff0c;本文在前文的基础上&#xff0c;图解红黑树插入&#xff1a;前文 链接&#xff0c;完整对部分关键代码展示&a…

macOS跨进程通信: Unix Domain Socket 创建实例

macOS跨进程通信: Unix Domain Socket 创建实例 一&#xff1a; 简介 Socket 是 网络传输的抽象概念。 一般我们常用的有Tcp Socket和 UDP Scoket&#xff0c; 和类Unix 系统&#xff08;包括Mac&#xff09;独有的 Unix Domain Socket&#xff08;UDX&#xff09;。 Tcp So…

避雷!仅1天撤回55篇中国学者的研究论文!这本毕业神刊需要注意!

【SciencePub学术】 这本国际期刊&#xff0c;仅仅去年12月一个月的时间&#xff0c;就撤回了近60篇国人文章&#xff01;这本期刊就是来自Hindawi出版社的APPLIED BIONICS AND BIOMECHANICS。 01 期刊信息简介 APPLIED BIONICS AND BIOMECHANICS IF(2022)&#xff1a;2.2&…

三维城市模型提升日本的智慧城市管理

MicroStation 将工作效率提高 50%&#xff0c;实现了前所未有的逼真模拟 构建三维城市模型生态系统 PLATEAU 项目由日本国土交通省牵头&#xff0c;是一项三维城市模型和数字孪生计划&#xff0c;旨在到 2027 年为日本 500 个城市构建开放的城市模型数字生态系统。作为日本最…

java获取linux和window序列号

前言 获取系统序列号在Java中并不是一个直接支持的功能&#xff0c;因为Java语言本身并不提供直接访问硬件级别的信息&#xff0c;如CPU序列号。但是&#xff0c;我们可以使用一些平台特定的工具或命令来实现这一功能。下面我将展示如何使用Java获取Windows和Linux系统上的CPU…

通过代理服务器的方式解决跨域问题

学习源码可以看我的个人前端学习笔记 (github.com):qdxzw/frontlearningNotes 觉得有帮助的同学&#xff0c;可以点心心支持一下哈 这里以本地访问https://heimahr.itheima.net/api/sys/permission接口为列子 Node.js 代理服务器 (server.js) 本次考虑使用JSONP或CORS代理来…

PHP“引用”漏洞

今日例题&#xff1a; <?php highlight_file(__FILE__); error_reporting(0); include("flag.php"); class just4fun { var $enter; var $secret; } if (isset($_GET[pass])) { $pass $_GET[pass]; $passstr_replace(*,\*,$pass); } $o unser…

【操作系统】实验三 编译 Linux 内核

&#x1f57a;作者&#xff1a; 主页 我的专栏C语言从0到1探秘C数据结构从0到1探秘Linux &#x1f618;欢迎关注&#xff1a;&#x1f44d;点赞&#x1f64c;收藏✍️留言 &#x1f3c7;码字不易&#xff0c;你的&#x1f44d;点赞&#x1f64c;收藏❤️关注对我真的很重要&…

[Linux]HTTP状态响应码和示例

1xx&#xff1a;信息响应类&#xff0c;表示接收到请求并且继续处理 2xx&#xff1a;处理成功响应类&#xff0c;表示动作被成功接收、理解和接受 3xx&#xff1a;重定向响应类&#xff0c;为了完成指定的动作&#xff0c;必须接受进一步处理 4xx&#xff1a;客户端错误&#x…

如何使用Docker本地部署Jupyter Notebook并结合内网穿透实现远程访问

&#x1f4d1;前言 本文主要是Linux下通过使用Docker本地部署Jupyter Notebook并结合内网穿透实现远程访问的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是青衿&#x1f947; ☁️博客首页&#xff1a;…
最新文章