Flink+Paimon多流拼接性能优化实战

目录

(零)本文简介

(一)背景

(二)探索梳理过程

(三)源码改造

(四)修改效果

1、JOB状态

2、Level5的dataFile总大小

3、数据延迟

(五)未来展望:异步Compact


(零)本文简介

Paimon多流拼接/合并性能优化;

        为解决离线T+1多流拼接数据时效性Flink实时状态太大任务稳定性问题,这里基于数据湖工具Apache Paimon进行近实时的多流拼接。

        使用Flink+Paimon基于ParmaryKey TablePartialUpdate)进行多流拼接的时候,跑一段时间有时会遇到周期性背压、checkpoint时间过长等情况,本文通过剖析源码逻辑、修改源码,在一定程度上解决了这个问题。

Apache Paimon基础 、多流拼接方法 及 与Hudi 的对比 可参考前面文章:

新一代数据湖存储技术Apache Paimon入门Demo_Leonardo_KY的博客-CSDN博客

基于数据湖的多流拼接方案-HUDI概念篇_Leonardo_KY的博客-CSDN博客

(一)背景

       这里使用 Flink 1.14 + Apache Paimon 0.5 snapshot 进行多流拼接(前端埋点流 + 服务端埋点流);

        当前情况是一天一个分区,一个分区100个bucket;就会出现如下情况:分区/bucket中的数据越来越多,到达下午或者傍晚的时候就会出现 paimon 作业周期性背压(因为mergeTree中维护的数据越来越多,tree越来越大),checkpoint时间也会比较长;于是决定将mergeTree中的过期数据删除,即让其不进入tree中,减少计算量;

        这里的“过期”按需自定义,比如调研发现99.9%的数据都可以使用3个小时之内的数据拼接上,那就根据时间戳与当前时间戳(假设没有很严重的消费积压)相比,时间差超过3小时的数据就将其丢弃;

具体细节涉及到(这里先将结论给出):

    1. data文件创建后是否还会修改?(不会)
    2. 根据时间排序的data数据文件是增量还是全量?(几个最新文件加起来就是全量)
    3. 应该根据dataFile的创建/修改时间判断过期 还是 通过具体每个record字段值的时间戳判断过期?(通过record)

(二)探索梳理过程

1、首先观察hdfs文件之后发现,dataFile只保留最近一个小时的文件,超过一小时的文件就会被删除,这里应该对应参数 partition.expiration-check-interval = 1h,由此可知data文件不是增量的【下文compact只有几个文件再次加强验证】(那么就不能通过dataFile的最新修改时间判断文件过期将数据过滤);

2、观察flink log发现,每次compaction都只读几个文件,如下所示:

        每次其实只读取一个level0的file,再加上几个level5的file(level5这里file就是之前的全部数据,包含多个流的),最后将compact之后的文件再命名为新的名字写到level5;

        随着分区数据量的增多,参与compact的file也会越来越多(这也是会导致tree偏大,出现周期性背压的原因);

另外,dataFile命名呈现如下规律:

        level5的第二个文件总是跟第一个中间隔一个(这个跟改源码没有关系,只是适合观察规律);

到晚间的时候参与compact的file更多了:

3、观察每次level5生成的dataFile(理论上level5的dataFile会越来越大/多,当单个文件大小超过128M *(1+rate)时,会生成新文件);

        所有level5的文件大小加起来会越来越大,即永远是呈增长趋势;

        如下每一层的总大小在不断增大,同时当文件到一定程度之后,每层2个文件变成3个文件;

4、【以上3点均为原始实现思路,从这里开始改造】思考:既然已知每个bucket中只要最新的几个dataFile就包含了全部的data数据(dataFile不是增量的),那么就不能通过文件最新修改时间来判断数据是否过期,只能从最新的几个dataFile的每条记录来进行判断了,即原本每次参与合并的record是从这个partition+bucket建立开始的全部数据,那么是否可以通过修改源码判断每条record是否过期,从而不参与mergeTree,在compact完成之后也不会再次写入新的dataFile(如果还是写进来,每次读进tree时都需要判断是否过期,是否进入tree)?【答案当然是可以的!】

(三)源码改造

1、首先说明一下,在源码中有这么一段

// IntervalPartition.partition()
public List<List<SortedRun>> partition() {
    List<List<SortedRun>> result = new ArrayList<>();
    List<DataFileMeta> section = new ArrayList<>();
    BinaryRow bound = null;

    for (DataFileMeta meta : files) {
        if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {
            // larger than current right bound, conclude current section and create a new one
            result.add(partition(section));
            section.clear();
            bound = null;
        }
        section.add(meta);
        if (bound == null || keyComparator.compare(meta.maxKey(), bound) > 0) {
            // update right bound
            bound = meta.maxKey();
        }
    }
    if (!section.isEmpty()) {
        // conclude last section
        result.add(partition(section));
    }

    return result;
}

        此处为了将文件排序、再将有overlap的放在一个list里边,一但产生gap(即没有overlap),那么就创建新的list,最终将这些 list 再放到List>中:

示意图如下:

2、后续通过一些处理变成 List> 的格式,这里的KeyValue就包含我们想要去操纵的record!

源码是这样的:

public <T> RecordReader<T> mergeSort(
        List<ReaderSupplier<KeyValue>> lazyReaders,
        Comparator<InternalRow> keyComparator,
        MergeFunctionWrapper<T> mergeFunction)
        throws IOException {
    if (ioManager != null && lazyReaders.size() > spillThreshold) {
        return spillMergeSort(lazyReaders, keyComparator, mergeFunction);
    }

    List<RecordReader<KeyValue>> readers = new ArrayList<>(lazyReaders.size());
    for (ReaderSupplier<KeyValue> supplier : lazyReaders) {
        try {
            readers.add(supplier.get());
        } catch (IOException e) {
            // if one of the readers creating failed, we need to close them all.
            readers.forEach(IOUtils::closeQuietly);
            throw e;
        }
    }

    return SortMergeReader.createSortMergeReader(
            readers, keyComparator, mergeFunction, sortEngine);
}

        这里的return就会创建sortMergeReader了,我们可以在将数据传入这里之前,先进行过滤(通过判断每一条record是否超过过期时间),修改如下:

public <T> RecordReader<T> mergeSort(
        List<ReaderSupplier<KeyValue>> lazyReaders,
        Comparator<InternalRow> keyComparator,
        MergeFunctionWrapper<T> mergeFunction)
        throws IOException {
    if (ioManager != null && lazyReaders.size() > spillThreshold) {
        return spillMergeSort(lazyReaders, keyComparator, mergeFunction);
    }

    List<RecordReader<KeyValue>> readers = new ArrayList<>(lazyReaders.size());
    for (ReaderSupplier<KeyValue> supplier : lazyReaders) {
        try {
            // 过滤掉过期数据
            RecordReader<KeyValue> filterSupplier =
                    supplier.get()
                            .filter(
                                    (KeyValue keyValue) ->
                                            isNotExpiredRecord(
                                                    keyValue.value(), expireTimeMillis));
            readers.add(filterSupplier);
        } catch (IOException e) {
            // if one of the readers creating failed, we need to close them all.
            readers.forEach(IOUtils::closeQuietly);
            throw e;
        }
    }

    return SortMergeReader.createSortMergeReader(
            readers,
            keyComparator,
            mergeFunction,
            sortEngine,
            keyType.getFieldTypes(),
            valueType.getFieldTypes());
}

// 判断这条数据是否过期
public boolean isNotExpiredRecord(InternalRow row, long expireTimeMillis) {
    if (expireTimeMillis <= 0) {
        return true;
    }
    // 只要有一个字段不为空,且大于0,且过期时间大于expireTimeMillis,就判断为过期
    for (Integer pos : expireFieldsPosSet) {
        if ((!row.isNullAt(pos))
                && row.getLong(pos) > 0
                && (System.currentTimeMillis() - row.getLong(pos)) > expireTimeMillis) {
            return false;
        }
    }
    return true;
}

与此同时,将相关参数暴露出来,可以在建表时进行自定义配置:

public static final ConfigOption<Integer> RECORDS_EXPIRED_HOUR =
        key("record.expired-hour")
                .intType()
                .defaultValue(-1)
                .withDescription(
                        "Records in streams WON'T be offered into MergeTree when they are expired."
                                + " (Inorder to avoid too large MergeTree; -1 means never expired). ");

public static final ConfigOption<String> RECORDS_EXPIRED_FIELDS =
        key("record.expired-fields")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "Records in streams WON'T be offered into MergeTree when they are judged as [expired] according to these fields."
                                + "If you specify multiple fields, delimiter is ','.");

使用方法:

val createPaimonJoinTable = (
  s"CREATE TABLE IF NOT EXISTS ${paimonTable}(\n"
    + " uuid STRING,\n"
    + " metaid STRING,\n"
    + " cid STRING,\n"
    + " area STRING,\n"
    + " ts1 bigint,\n"
    + " ts2 bigint,\n"
    + " d STRING, \n"
    + " PRIMARY KEY (d, uuid) NOT ENFORCED \n"
    + ") PARTITIONED BY (d) \n"
    + " WITH (\n" +
    "    'merge-engine' = 'partial-update',\n" +
    "    'changelog-producer' = 'full-compaction', \n" +
    "    'file.format' = 'orc', \n" +
    s"    'sink.managed.writer-buffer-memory' = '${sinkWriterBuffer}', \n" +
    s"    'full-compaction.delta-commits' = '${fullCompactionCommits}', \n" +
    s"    'scan.mode' = '${scanMode}', \n" +
    s"    'bucket' = '${bucketNum}', \n" +
    s"    'sink.parallelism' = '${sinkTaskNum}', \n" +
    s"    'record.expired-hour' = '3' , \n" +   // user defined para
    "     'record.expired-fileds' = '4,5' , \n" +   // user defined para
    "     'sequence.field' = 'ts1' \n" +
    ")"
  )
tableEnv.executeSql(createPaimonJoinTable)

(四)修改效果

1、JOB状态

运行到晚上20点尚未出现背压:

checkpoint时间也没有过长(如果不剔除过期数据,到这个时间cp时长应该在3分钟左右):

生产到Kafka的消息也没有严重的断流或者锯齿现象:

还是有可能出现exception如下(但对数据量没有任何影响):

2、Level5的dataFile总大小

        上边只是现象,最终还是要数据说话。

        修改源码之后,观察dataFile,理论上每一层的size总大小可能会出现减小的情况 (因为过期数据就不会再写入到 level5 新的data文件中了)

        如下图:levelSize diff(下一次level总size - 上一次level总size),确实出现了“有正有负”的情况,于是验证源码修改生效(即每次进行compact只会读取近 n 个小时的数据进行合并)!

3、数据延迟

有意思的是,当我们修改源码(将过期的数据丢弃)之后,数据延迟也变小了。

数据延迟计算方法:paimon处理完将数据写到kafka队列的时间戳 - 前端埋点被触发被服务器接收到的时间戳;

修改前:

修改后:

(五)未来展望:异步Compact

官方提供的paimon源码,里边的compaction是 sync 模式的,我尝试改成过 async 的,但是时不时会出现很少量的数据丢失(感觉可能是因为同一时刻有多个compact任务在进行),后续有机会可以再继续尝试一下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/98965.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

研华I/O板卡 Win10+Qt+Cmake 开发环境搭建

文章目录 一.研华I/O板卡 Win10QtCmake 开发环境搭建 一.研华I/O板卡 Win10QtCmake 开发环境搭建 参考这个链接安装研华I/O板卡驱动程序系统环境变量添加研华板卡dll Qt新建一个c项目 cmakeList.txt中添加研华库文件 cmake_minimum_required(VERSION 3.5)project(advantechDA…

科技资讯|苹果发布新专利:可在车内定位苹果的智能设备

根据美国商标和专利局近期公示的清单&#xff0c;苹果公司获得了一项名为《车内定位移动设备的系统和方式》专利&#xff0c;概述了在车内狭窄空间内如何定位 iPhone 等移动设备。 Find My 服务现阶段没有使用 UWB 来追踪 iPhone 或者 iPad&#xff0c;而是依赖 GPS 等相关辅…

【Java并发】聊聊对象内存布局和syn锁升级过程

对象存储解析&#xff1a;一个空Object对象到底占据多少内存&#xff1f; 对象内存布局 Mark Word占用8字节&#xff0c;类型指针占用8个字节&#xff0c;对象头占用16个字节。 好了&#xff0c;我们来看一下一个Object对占用多少空间&#xff0c; 因为java默认是开启压缩…

Spring框架知识点汇总

01.Spring框架的基本理解 关键字&#xff1a;核心思想IOC/AOP&#xff0c;作用&#xff08;解耦&#xff0c;简化&#xff09;&#xff0c;简单描述框架组成&#xff1b; Spring框架是一款轻量级的开发框架&#xff0c;核心思想是IOC&#xff08;反转控制&#xff09;和AOP&a…

【C++】输入输出及格式控制

在各类算法竞赛和机试中&#xff0c;对测试数据和输出格式往往会有明确的规定&#xff0c;笔者结合个人刷题经历&#xff0c;得到了以下C语言输入输出控制的方法。 cin&#xff1a;从缓冲区中读取数据 cin>>从缓冲区中读取数据时&#xff0c;若缓冲区中第一个字符是空格…

DBeaver 23.1.5 发布

导读DBeaver 是一个免费开源的通用数据库工具&#xff0c;适用于开发人员和数据库管理员。DBeaver 23.1.5 现已发布&#xff0c;更新内容如下. Data editor 重新设计了词典查看器面板 UI 空间数据类型&#xff1a;曲线几何线性化已修复 数据保存时结果选项卡关闭的问题已解决…

2第一个Java程序

目录 1第一个Java代码 2类class 3运行Java文件 1第一个Java代码 public class Hello {public static void main(String[] args) {System.out.println("Hello, world!");} } 2类class public class Hello {public static void main(String[] args) {System.ou…

Win11搭建 Elasticsearch 7 集群(一)

一&#xff1a; ES与JDK版本匹配一览表 elasticsearch从7.0开始默认安装了java运行环境&#xff0c;以便在没有安装java运行环境的机器上运行。如果配置了环境变量JAVA_HOME&#xff0c;则elasticsearh启动时会使用JAVA_HOME作为java路径&#xff0c;否则使用elasticsearch根目…

以udp协议创建通信服务器

概念图 创建服务器让A,B主机完成通信。 认识接口 socket 返回值&#xff1a;套接字&#xff0c;你可以认为类似fd 参数&#xff1a; domain->:哪种套接字&#xff0c;常用AF_INET(网络套接字)、AF_LOCAL(本地套接字)type->&#xff1a;发送数据类型&#xff0c;常用 …

【校招VIP】校招考点之前端安全和注入

考点介绍&#xff1a; 随着前端的快速发展&#xff0c;各种技术不断更新&#xff0c;前端的安全问题也越来越值得我们重视。千万不要等到项目上线之后才去重视安全问题&#xff0c;到时候被黑客攻击一切都太晚了。今天的专题将讲述前端几大常见安全问题&#xff0c;在校招面试中…

【mysql】MySQL服务无法启动 NET HELPMSG 3534

MySQL服务无法启动 NET HELPMSG 3534 错误描述寻找原因解决方法 错误描述 mysql版本&#xff1a;8.1.0 mysql安装成功之后&#xff0c;使用net start mysql来启动mysql&#xff0c;然后出现了报错 MySQL服务无法启动 NET HELPMSG 3534 寻找原因 1、在cmd中&#xff0c;进入…

OpenCV: cv2.findContours - ValueError: too many values to unpack

OpenCV找轮廓findContours报错 ValueError: not enough values to unpack (expected 3,got 2) 问题指向这行代码&#x1f447; binary, cnts, hierarchy cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE ) 报错的意思是需要3个返回值但只给了两…

RabbitMQ工作模式-路由模式

官方文档参考&#xff1a;https://www.rabbitmq.com/tutorials/tutorial-four-python.html 使用direct类型的Exchange,发N条消息并使用不同的routingKey,消费者定义队列并将队列routingKey、Exchange绑定。此时使用direct模式Exchange必须要routingKey完成匹配的情况下消息才…

CSP的理解与绕过

文章目录 前言CSP简介CSP如何工作CSP指令CSP指令值 例题[AFCTF 2021]BABY_CSP 前言 刚学习完xss&#xff0c;把xsss-labs靶场都通了打算试试水&#xff0c;遇到此题[AFCTF 2021]BABY_CSP&#xff0c;借此机会学习下CSP CSP简介 Content Security Policy (CSP)内容安全策略&am…

【广州华锐互动】AR昆虫认知学习系统实现对昆虫形态的捕捉和还原

随着科技的不断发展&#xff0c;人们对自然界的认识也在不断加深。在这个过程中&#xff0c;AR&#xff08;增强现实&#xff09;技术的出现为人们带来了全新的体验方式。为此&#xff0c;广州华锐互动开发了AR昆虫认知学习系统&#xff0c;本文将为大家详细介绍这款系统的特点…

MinIO分布式存储k8s集群部署

一、MinIO是什么 MinIO是go开发的&#xff0c;高性能分布式存储&#xff1b;基于GNU AGPL v3开源&#xff0c;可免费使用&#xff1b; 官网&#xff1a;https://min.io/ github: https://github.com/minio/minio 官网宣传MinIO是世界上速度最快的分布式对象存储&#xff1b; …

【数据结构】顺序表详解

当我们写完通讯录后&#xff0c;顺序表肯定难不倒你&#xff0c;跟着小张一起来学习顺序表吧&#xff01; 线性表 线性表&#xff08;linear list&#xff09;是n个具有相同特性的数据元素的有限序列。 线性表是一种在实际中广泛使用的数据结构&#xff0c;常见的线性表&#x…

【SpringCloud】SpringCloud整合openFeign

文章目录 前言1. 问题分析2. 了解Feign3. 项目整合Feign3.1 引入依赖3.2 添加注解3.3 编写Feign客户端3.4 测试3.5 总结 4. 自定义配置4.1 配置文件方式4.2 Java代码方式 5. Feign使用优化5.1 引入依赖5.2 配置连接池 6. Feign最佳实践6.1 继承方式6.2 抽取方式 前言 微服务远…

OpenCV

文章目录 OpenCV学习报告读取图片和网络摄像头1.1 图片读取1.2 视频读取1.1.1 读取视频文件1.1.2读取网络摄像头 OpenCV基础功能调整、裁剪图像3.1 调整图像大小3.2 裁剪图像 图像上绘制形状和文本4.1 图像上绘制形状4.2图像上写文字 透视变换图像拼接颜色检测轮廓检测人脸检测…

Spring Boot集成MyBatis Plus

文章目录 一、前言二、步骤2.1、步骤 1&#xff1a;创建 Spring Boot 项目2.2、添加依赖2.2.1、基本的Spring和Spring MVC功能2.2.2、MySQL驱动依赖2.2.3、 MyBatis Plus 的依赖 2.3、配置数据库连接2.4、创建实体类2.5、创建 Mapper 接口2.6、编写 Service 层2.7、编写 Contro…
最新文章