Flink on Kubernetes (flink-operator) 部署Flink

flink on k8s 官网

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.1/docs/try-flink-kubernetes-operator/quick-start/
我的部署脚本和官网不一样,有些地方官网不够详细

部署k8s集群

注意,按照默认配置至少有两台worker

安装helm

https://helm.sh/zh/docs/intro/install/

安装flink opreator

kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.1.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator -n flink
  • 安装完成后,资源如下
[root@k8s1 flinkinstall]# helm list -n flink
NAME                            NAMESPACE       REVISION        UPDATED                                 STATUS          CHART                           APP VERSION
flink-kubernetes-operator       flink           1               2024-03-07 16:57:48.374299701 +0800 CST deployed        flink-kubernetes-operator-1.7.0 1.7.0 
[root@k8s1 flinkinstall]# kubectl get all -A
NAMESPACE              NAME                                             READY   STATUS    RESTARTS   AGE
cert-manager           pod/cert-manager-66b646d76-gkw55                 1/1     Running   1          2d2h
cert-manager           pod/cert-manager-cainjector-59dc9659c7-pkgrm     1/1     Running   1          2d2h
cert-manager           pod/cert-manager-webhook-7f7787f7fd-wd5vv        1/1     Running   1          2d2h
flink                  pod/flink-kubernetes-operator-857d48ff65-45mg2   2/2     Running   6          5d20h

NAMESPACE              NAME                                     TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)                  AGE
cert-manager           service/cert-manager                     ClusterIP   192.178.138.19    <none>        9402/TCP                 2d2h
cert-manager           service/cert-manager-webhook             ClusterIP   192.178.130.219   <none>        443/TCP                  2d2h
flink                  service/flink-operator-webhook-service   ClusterIP   192.178.139.67    <none>        443/TCP                  5d20h

NAMESPACE              NAME                                        READY   UP-TO-DATE   AVAILABLE   AGE
cert-manager           deployment.apps/cert-manager                1/1     1            1           2d2h
cert-manager           deployment.apps/cert-manager-cainjector     1/1     1            1           2d2h
cert-manager           deployment.apps/cert-manager-webhook        1/1     1            1           2d2h
flink                  deployment.apps/flink-kubernetes-operator   1/1     1            1           5d20h

NAMESPACE              NAME                                                   DESIRED   CURRENT   READY   AGE
cert-manager           replicaset.apps/cert-manager-66b646d76                 1         1         1       2d2h
cert-manager           replicaset.apps/cert-manager-cainjector-59dc9659c7     1         1         1       2d2h
cert-manager           replicaset.apps/cert-manager-webhook-7f7787f7fd        1         1         1       2d2h
flink                  replicaset.apps/flink-kubernetes-operator-857d48ff65   1         1         1       5d20h

  • 此时k8s集群就可以支持我们按照flink-opreator的指定格式提交flink任务了

提交flink任务

session模式与application模式区别在于资源隔离度

  • session模式: jobmanager预先启动,随时准备接收flink jar,启动taskmanager,flink任务结束后jobmanager不退出,所有flink任务共享同一个jobmanager,资源隔离差,某个flink任务导致jobmanager异常,会影响到其他flink任务,小任务,不在乎异常情况可以用
  • application模式:每次提交flink任务才会启动一个jobmanger,flink任务结束后,jobmanager也退出,隔离效果好,生产常用
  • per-job模式:这个模式与application模式类似, 区别在于client的运行位置,但是新版的flink已经删除了这种提交方式

这里是flink on yarn的运行模式
https://blog.csdn.net/java_creatMylief/article/details/126172793

application模式

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: pod-template-example
spec:
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      serviceAccount: flink
      containers:
        # Do not change the main container name
        - name: flink-main-container
          env:
          volumeMounts:
            - mountPath: /flink-logs
              name: flink-logs
      initContainers:
        - name: init-nginx
          image: busybox
          command: [ 'sh','-c','wget http://192.168.33.2/phoenix-client-1.0-SNAPSHOT-jar-with-dependencies.jar -O /flink-logs/StateMachineExample1.jar' ]
          volumeMounts:
            - mountPath: /flink-logs
              name: flink-logs
      volumes:
        - name: flink-logs
          emptyDir: { }
  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    resource:
      memory: "1024m"
      cpu: 1
  job:
    jarURI: local:///flink-logs/StateMachineExample1.jar
    parallelism: 1
    entryClass: org.examplexxx.test
    args: [/path/from/data,/path/to/data]
    initialSavepointPath: hdfs://flink/ckpath/xxxxx
    
kubectl apply -f ${name}.yaml

kubectl port-forward svc/basic-example-rest 8081 --address 192.168.33.81

访问 http://192.168.33.81:8081

jarURI: local:///flink-logs/StateMachineExample1.jar
此处jarURL只得是docker内部路径,且不支持远程路径(http/s3/hdfs),因此需要将jar包放到docker内部。

1、可以将flink版本和jar包打到一个镜像中。
2、可以使用pvc挂载进去。
3、使用initContainers和 containers使用相同的挂载路径,然后使用远程文件下载放到挂载路径中,containers就能获取到该jar包

此处使用第三种情况,使用initContainers变相支持远程文件地址,使用起来比较方便。

yarn-application 对比

yarn-applicationk8s-application
-p (并行度)spec.job.parallelism
-yjm (jobmanager内存)spec.jobManager.resource.memory
-ytm (taskmanager内存)spec.taskManager.resource.memory
-ys (taskmanger的slot槽数)spec.flinkConfiguration.taskmanager.numberOfTaskSlots
-c (主类)spec.job.entryClass
jar (jar包)spec.job.jarURI
-s (恢复点启动)spec.job.initialSavepointPath

session模式

部署session cluster

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: session-deployment-only
spec:
  image: flink:1.13.6
  flinkVersion: v1_13
  imagePullPolicy: IfNotPresent # 镜像拉去策略,优先本地,没有,仓库拉去
  ingress:
    template: "flink.k8s.io/{{namespace)}/{{name}}(/|$)(.*)"
    className: "Nginx"
    annotations:
      taskmanager.numberOfTaskSlots: "2"
    serviceAccount: flink
    jobManager:
      replicas: 1
      resource:
        memory: "1024m"
        cpu: 1
    taskManager:
      replicas: 1
      resource:
        memory: "1024m"
        cpu: 1

kubectl apply -f ${name}.yaml


在这里插入图片描述
部署cluster完成,配置svcType 后即可访问,flink web ui,此时jobManager是启动着的 taskmanager随着flink jar进行启动和停止

部署flink jar

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  namespace: flink
  name: session-job-only
  job:
    jarUrl: sasa
    entryClass: aa
    parallelism: 1
    upgradeMode: stateless


kubectl apply -f ${name}.yaml

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/502441.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

概率论经典题目-二维随机变量及分布--求离散型的联合分布律和边缘分布律问题

题目&#xff1a;一整数N等可能地在1,2,3,…,10十个值中取一个值设DD(N)是能整除N的正整数的个数,FF(N)是能整除N的素数的个数(注意1不是素数).试写出D和F的联合分布律,并求边缘分布律&#xff1f; 解答&#xff1a; 1&#xff09;要确定整数 N 能够被整除的正整数个数 D 和素…

Quiet-STaR:让语言模型在“说话”前思考

大型语言模型(llm)已经变得越来越复杂&#xff0c;能够根据各种提示和问题生成人类质量的文本。但是他们的推理能力让仍然是个问题&#xff0c;与人类不同LLM经常在推理中涉及的隐含步骤中挣扎&#xff0c;这回导致输出可能在事实上不正确或缺乏逻辑。 考虑以下场景:正在阅读一…

可重复不限数量结构数列的演化

有一个6*6的平面&#xff0c;这个平面的行和列可以自由的变换&#xff0c;在这个平面上有一个4点结构数列 按照8&#xff0c;13&#xff0c;5&#xff0c;8的顺序排列。让这个数列按照4-5-4的方式演化 这个数列很快收敛,收敛顺序为13&#xff0c;8&#xff0c;8&#xff0c;5 8…

Revit文件版本查看小工具

最近群里和私信的时候&#xff0c;经常有小伙伴询问如何不打开Revit查看Revit文件的版本。 习惯性的&#xff0c;第一思路是打开Dynamo&#xff0c;但是第一反应还需要先开Revit。 另外呢&#xff0c;群里小伙伴说优比的插件也可以。 总之呢&#xff0c;都需要一些工具&#xf…

对接中泰极速行情 | DolphinDB XTP 插件使用教程

XTP 是中泰证券推出的高性能交易平台&#xff0c;专为专业投资者提供高速行情及交易系统&#xff0c;旨在提供优质便捷的市场接入通道。目前支持股票、基金、ETF、债券、期权等多个市场&#xff0c;可满足不同投资者需求。 基于 XTP 官方 C SDK&#xff0c;DolphinDB 开发了 X…

【IDEA】使用debug方式去运行java程序

什么是debug工具&#xff1f; 调试工具&#xff08;debug工具&#xff09;是一种用于帮助程序员识别和修复程序中的错误的工具。它们提供了一系列的功能&#xff0c;帮助程序员在代码执行的过程中跟踪和检测问题&#xff0c;例如查看变量的值、检查函数的调用栈、设置断点来停…

算法学习——LeetCode力扣动态规划篇2

算法学习——LeetCode力扣动态规划篇2 343. 整数拆分 343. 整数拆分 - 力扣&#xff08;LeetCode&#xff09; 描述 给定一个正整数 n &#xff0c;将其拆分为 k 个 正整数 的和&#xff08; k > 2 &#xff09;&#xff0c;并使这些整数的乘积最大化。 返回 你可以获得…

java: 找不到符号 符号: 变量 log

在以下位置加上该配置"-Djps.track.ap.dependenciesfalse"

文件操作(随机读写篇)

1. 铺垫 建议先看&#xff1a; 文件操作&#xff08;基础知识篇&#xff09;-CSDN博客 文件操作&#xff08;顺序读写篇&#xff09;-CSDN博客 首先要指出的是&#xff0c;本篇文章中的“文件指针”并不是指FILE*类型的指针&#xff0c;而是类似于打字时的光标的东西。 打…

C++:类的6大默认成员函数:赋值运算符重载

文章目录 赋值运算符重载1.1 运算符重载的引用1.2 运算符重载的概念1.3 赋值运算符重载总结一下(赋值运算符) 赋值运算符重载 1.1 运算符重载的引用 有一个日期类Date: class Date { public:Date(int year 1900, int month 1, int day 1){_year year;_month month;_da…

Rust使用feature特性和条件编译,以及常用feature使用说明

Cargo Feature 是非常强大的机制&#xff0c;可以为大家提供条件编译和可选依赖的高级特性&#xff0c;可以为你省下不少的代码量来判断操作系统和条件编译等功能。rust官方条件编译文档&#xff1a;Conditional compilation - The Rust Reference features特性 Featuure 可以…

GeometryInstance点击改变颜色

目录 项目地址实现效果核心代码 项目地址 https://github.com/zhengjie9510/webgis-demo 实现效果 核心代码 // Draw different instances each with a unique color const rectangleInstance new Cesium.GeometryInstance({geometry: new Cesium.RectangleGeometry({recta…

EFCore的空迁移(EFCore操作已存在的数据库表,不影响其中的数据)

背景&#xff1a;EFCore默认的会自动创建数据表&#xff0c;但是有时又是DBFirst&#xff0c;数据库写好了要用现成的表。这个时候就需要进行一些特殊的操作了 1、写出跟要对接数据库的实体类 比如我的表是这样创建的 create table mail_test (user_id bigint auto_increment …

【Entity Framework】EF中DbSet类详解

【Entity Framework】EF中DbSet类详解 文章目录 【Entity Framework】EF中DbSet类详解一、概述二、定义DbSet2.1 具有DbSet属性的DbContext2.2 具有 IDbSet 属性的 DbContext 2.3 具有 IDbSet 属性的 DbContext三、DbSet属性四、DbSet方法五、DbContext动态生成DbSet 一、概述 …

医院消防巡检系统革新:凡尔码平台二维码技术引领安全升级

医院消防巡检&#xff0c;传统依赖手工记录&#xff0c;效率和准确性受限。凡尔码平台的二维码消防巡检系统&#xff0c;以创新技术颠覆传统&#xff0c;实现即时、精准的安全管理&#xff0c;确保医院消防安全无虞。 凡尔码平台的消防巡检系统不仅提升了医院安全管理的效率&a…

基于SpringBoot + Vue实现的校园失物招领系统设计与实现+毕业论文

介绍 系统包含用户和管理员两个角色 用户&#xff1a;登录、注册、留言板、公告信息、失物招领、失物认领、寻物启事、个人中心、我发布的失物信息、我的失物认领、我发布的寻物启事、寻物启事留言等功能。 管理员&#xff1a;登录、基础数据管理、系统管理、留言板管理、失物信…

Linux项目自动化构建工具-make/ makefile及其应用:多文件编写第一个linux程序:进度条(懒人学习必备博文!!!)

目录 1.前言--make/makefile的引入 2.快速上手make/makefile---自动化构建 3.关于依赖关系和依赖方法 4.自动化清理 为什么我们执行编译的时候&#xff0c;make一下就好&#xff0c;清理却要使用make clean? 5. make/makefile是如何知道当前目录下可执行文件是否为最新 6.文件…

SQLServer sys.default_constraints介绍

sys.default_constraints 是 SQL Server 的系统视图&#xff0c;它包含了数据库中所有默认约束的信息。默认约束是数据库对象&#xff08;如表中的列&#xff09;的约束&#xff0c;它为列定义了一个默认值&#xff0c;当在插入新行时没有为该列提供值时&#xff0c;将使用这个…

集合嵌套,Collections,斗地主案例,日志框架

文章目录 集合嵌套List嵌套ListList嵌套MapMap嵌套Map Collections类方法排序 sort 乱序 shuffle 斗地主案例需求思路代码 日志框架介绍优势体系结构Logback概述快速入门配置详解 集合嵌套 List嵌套List public static void main(String[] args){//一个年级有许多班级&#xf…

spring boot 整合j2cache 基础操作

spring boot 整合缓存的内容呢 已经学了好久了 那么 今天 我们开始学习 j2cache 这个技术 并不是一个缓存 而是一个框架 我们可以将其他缓存配到这套框架上来 那么 我们就还是弄最熟悉的 ehcache redis进行整合 首先 我们启动 redis 然后 我们打开项目 pom.xml 注入依赖 …
最新文章