206.Flink(一):flink概述,flink集群搭建,flink中执行任务,单节点、yarn运行模式,三种部署模式的具体实现

一、Flink概述

1.基本描述

Flink官网地址:Apache Flink® — Stateful Computations over Data Streams | Apache Flink

Flink是一个框架分布式处理引擎,用于对无界有界数据流进行有状态计算

 2.有界流和无界流

  • 无界流(流):
    • 有定义流的开始,没有定义结束。会无休止产生数据
    • 无界流数据必须持续处理
  • 有界流(批):
    • 有定义流的开始,也有定义流的结束
    • 可以拿到所有数据后再进行处理,并且做排序
    • 有界流通常被称为批处理

3.有状态

flink中除了流之外还会有额外的数据,用来对这些流做一些状态统计。

比如流是路上的汽车,我们是路边的人,数过去了多少车。过去一辆我们可以记一个,再过去就2个。也可以通过画正字的方式记录,最后通过统计正字来得到过去多少车。这里的数字以及正字,就是车以外的额外数据,用作统计。我们每来一个车统计一下,统计完之后可以对外输出。同时,每过一段时间会持久化一下,以防丢失。 

4.flink的特点

低延迟、高吞吐、结果准确、良好的容错

  • 高吞吐、低延迟:每秒可以处理数百万个事件,毫秒级延迟
  • 结果准确:flink提供事件事件(event_time)和处理时间(processing_time)语义。对于乱序事件流,事件事件语序仍然能提供一致且精确的结果
  • 精确一次(exactly-once)的状态一致性保证
  • 可以连接到常见的存储系统:kafka,hive,jdbc,hdfs,redis等
  • 高可用:本身就是高可用,配合k8s,yarn和mesos的紧密集成,再加上从故障中快速恢复和动态扩展的能力,可以以极少的停机时间实现7*24小时运行

5.flink和spark的区别

  • spark以批处理为根本
    • spark采用rdd模型,所谓rdd就是每3秒看做的一个批次,spark引擎处理这三秒的数据。spark streaming的Dstream实际上就是一组组rdd的集合
    • spark是批计算,将DAG划分为不同的stage,一个完成才计算下一个
  • Flink以流处理为根本
    • flink基本模型是数据流,以及事件序列
    • flink是标准的流执行模式,一个事件在一个节点处理完之后可以直接下发下一个节点处理

spark:

flink:

flink spark
计算模型 流计算 微批计算
时间语序 事件事件、处理时间 处理时间
窗口 多、灵活 少、不灵活
窗口必须是批次的整数倍
状态        没有
流式sql 没有

6.flink应用场景

电商、市场营销

物联网(IOT)

物流配送,服务业

银行,金融

7.flink分层api

  • 有状态流处理:通过底层api (处理函数),对最原始的数据加工处理。与DataStream api集成,可以处理复杂计算
  • DataStream(流处理)/DataSet(批处理) api:封装了底层api,提供转换、连接、聚合、窗口等通用模块。在flink1.12之后,DataSet被合到DataStream里面去了,即DataStream是批流都可以处理的api
  • Table api:以表为中心的声明式编程。可以与DataStream无缝切换
  • sql:以sql查询表达式的形式表现程序,可以在table api的表上执行

简单来说,就是flink的一层层封装。

二、Flink快速上手

1.创建项目

新建一个maven项目:

2.导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>FlinkTutorial-1.17</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>

3.创建文件夹

新建一个input文件夹,里面一个txt,随便输入一些单词

4.批处理形式的word count编写(已过时)

注:此种方式使用的是DataSet API。我们新的版本已经将批和流都统一到DataStream API中了,因此这种方式的代码编写看一看就好,已过时。

package com.atguigu.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * TODO DataSet API 实现 wordcount(不推荐)
 */
public class BatchWordCount {
	public static void main(String[] args) throws Exception {
		// TODO 1. 创建执行环境
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// TODO 2.读取数据:从文件中读取
		DataSource<String> lineDS = env.readTextFile("input/word.txt");

		// TODO 3.切分、转换 (word,1)
		FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
			@Override
			public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
				// TODO 3.1 按照 空格 切分单词
				String[] wo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/98363.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【webpack】HMR热更新原理

本文&#xff1a;参考文章 一、HMR是什么&#xff0c;为什么出现 1、出现的原因 之前&#xff0c;应用的加载、更新都是一个页面级别的操作&#xff0c;即使单个代码文件更新&#xff0c;整个页面都要刷新&#xff0c;才能拿到最新的代码同步到浏览器&#xff0c;导致会丢失…

【C语言】循环语句详解

✨个人主页&#xff1a; Anmia.&#x1f389;所属专栏&#xff1a; C Language &#x1f383;操作环境&#xff1a; Visual Studio 2019 版本 目录 1.什么是循环结构&#xff1f; 2.while循环 while流程图 while语句中的break和continue break continue 3.for循环 for流…

滑动窗口系列4-Leetcode322题零钱兑换-限制张数-暴力递归到动态规划再到滑动窗口

这个题目是Leecode322的变种&#xff0c;322原题如下&#xff1a; 我们这里的变化是把硬币变成可以重复的&#xff0c;并且只有coins数组中给出的这么多的金币&#xff0c;也就是说有数量限制&#xff1a; package dataStructure.leecode.practice;import java.util.Arrays; i…

金融风控数据分析-信用评分卡建模(附数据集下载地址)

本文引用自&#xff1a; 金融风控&#xff1a;信用评分卡建模流程 - 知乎 (zhihu.com) 在原文的基础上加上了一部分自己的理解&#xff0c;转载在CSDN上作为保留记录。 本文涉及到的数据集可直接从天池上面下载&#xff1a; Give Me Some Credit给我一些荣誉_数据集-阿里云…

Docker搭建私有仓库并迁移

目录 方案 A、B机器安装docker 设置阿里云镜像源 安装 Docker-CE并设置为开机自动启动 A机器准备数据 拷贝数据 B机器运行redis、mysql镜像 重启docker服务 方案 准备两台机器&#xff1a;A机器&#xff08;可以连接外网&#xff09;&#xff0c;B机器&#xff08;内网机器…

python web GUI框架-NiceGUI 教程(一)

python web GUI框架-NiceGUI 教程&#xff08;一&#xff09; streamlit可以在一些简单的场景下仍然推荐使用&#xff0c;但是streamlit实在不灵活&#xff0c;受限于它的核心机制&#xff0c;NiceGUI是一个灵活的web框架&#xff0c;可以做web网站也可以打包成独立的exe。 基…

Docker 将容器打包成镜像推送镜像到仓库

Docker 将容器打包成镜像&推送镜像到仓库 一、将容器打包成镜像 $ docker commit <容器ID> <镜像名称:标签>示例&#xff1a; $ sudo docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS …

字节跳动推出AI对话工具“豆包”:免费用

我是卢松松&#xff0c;点点上面的头像&#xff0c;欢迎关注我哦&#xff01; 听说松松客服的小马爆料了一个消息&#xff1a;字节跳动推出了一个新的AI大模型对话工具&#xff0c;叫做“豆包”。竟然一查发现&#xff0c;早在8月18号就已经上线了呢。原来这个“豆包”其实是之…

Kind创建本地环境安装Ingress

目录 1.K8s什么要使用Ingress 2.在本地K8s集群安装Nginx Ingress controller 2.1.使用Kind创建本地集群 2.1.1.创建kind配置文件 2.1.2.执行创建命令 2.2.找到和当前k8s版本匹配的Ingress版本 2.2.1.查看当前的K8s版本 2.2.2.在官网中找到对应的合适版本 2.3.按照版本安…

day30 日期转换

一&#xff1a;Date Date类&#xff1a; 这个类是java.util.Date getTime() : 获取内部维护的long值 Date date new Date(); long time date.getTime(); setTime()&#xff1a;按照指定的long值&#xff08;表示的时间&#xff09;设置Date表示的时间 time 60*60*24*1000;…

Android学习之路(11) ActionBar与ToolBar的使用

自android5.0开始&#xff0c;AppCompatActivity代替ActionBarActivity&#xff0c;而且ToolBar也代替了ActionBar&#xff0c;下面就是ActionBar和ToolBar的使用 ActionBar 1、截图 2、使用 2.1、AppCompatActivity和其对应的Theme AppCompatActivity使用的是v7的ActionBa…

快乐开源活动全面升级!提PR,赢PS5、Switch等缤纷好礼

快乐开源 活动升级 礼品升级 PS5、Switch、Apple、雷蛇、富士…… 开发者们想要的 我们安排&#xff01; 飞桨快乐开源活动旨在鼓励更多的开发者参与到飞桨社区的开源建设中&#xff0c;帮助修复 bug 或贡献 feature。活动起初只是一个「提 PR 领取新年礼物 &#x1f381;」…

python简介

Python 是一门优雅而健壮的编程语言&#xff0c;它继承了传统编译语言的强大性和通用性&#xff0c;同时也借鉴了脚本语言和解释语言的易用性。 Python 的历史 Python是由创始人贵铎范罗萨姆&#xff08;Guido van Rossum&#xff09;在阿姆斯特丹于1989年圣诞节期间&#xf…

多张图片转为pdf怎么弄?

多张图片转为pdf怎么弄&#xff1f;在网络传输过程中&#xff0c;为了避免图片格式文件出现差错&#xff0c;并确保图片的清晰度和色彩不因不同设备而有所改变&#xff0c;常见的做法是将图片转换为PDF格式。然而&#xff0c;当涉及到多张图片时&#xff0c;逐一转换将会变得相…

PyCharm切换虚拟环境

PyCharm切换虚拟环境 为了满足不同任务需要不同版本的包&#xff0c;可以在Anaconda或者Miniconda创建多个虚拟环境文件夹&#xff0c;并在PyCharm下切换虚拟环境。 解决方案 1、打开Ananconda Prompt 2、创建自己的虚拟环境 格式&#xff1a;conda create -n 虚拟环境名字…

虚拟机的使用

首先需要安装VMware软件&#xff0c;这是虚拟机&#xff0c;在里面可以实现在windows的笔记本上运行包括&#xff0c;windows11和linux系统的开发和研究。 VMware是一种虚拟化技术&#xff0c;可以让你在一台物理计算机上运行多个操作系统和应用程序&#xff0c;而不需要重启或…

如何在,Linux中安装Luajit2.*

1.文件下载The LuaJIT Project 2.将下载文件上传到对应的服务器&#xff1a;例如/opt 3.进入对应的文件夹 4.make PREFIX/usr/local&#xff0c;设置安装路径 5.make install&#xff0c;编译安装 6.进入安装目录&#xff0c;cd /usr/local/include/luajit-2.0 7.luajit -v…

目标检测笔记(十二):如何通过界面化操作YOLOv5完成数据集的自动标注

文章目录 一、意义二、修改源码获取三、自动标注前期准备四、开始自动标注五、可视化标注效果六、XML转换TXT 一、意义 通过界面化操作YOLOv5完成数据集的自动标注的意义在于简化数据标注的流程&#xff0c;提高标注的效率和准确性。 传统的数据集标注通常需要手动绘制边界框…

第三届计算机、物联网与控制工程国际学术会议(CITCE 2023)

第三届计算机、物联网与控制工程国际学术会议&#xff08;CITCE 2023) The 3rd International Conference on Computer, Internet of Things and Control Engineering&#xff08;CITCE 2023) 第三届计算机、物联网与控制工程国际学术会议&#xff08;CITCE 2023&#xff09;…

ChatGPT在医疗领域可应用于改善与患者的沟通

注意&#xff1a;本信息仅供参考&#xff0c;发布该内容旨在传递更多信息的目的&#xff0c;并不意味着赞同其观点或证实其说法。 自从ChatGPT在2022年末对公众开放以来&#xff0c;OpenAI的这款生成式AI聊天机器人在医疗领域展示出了巨大潜力。它已经通过了美国医学执照考试&a…