Apache Paimon 文件操作

本文旨在澄清不同文件操作对文件的影响。

本页面提供具体示例和实用技巧,以有效地管理这些操作。此外,通过对提交(commit)和压实(compact)等操作的深入探讨,我们旨在提供有关文件创建和更新的见解。

前提

对以下几篇有了解:
1、Apache Paimon 介绍
2、Apache Paimon 基础概念
3、Apache Paimon 文件布局设计
4、知道如何在 Flink 中使用 Paimon

创建 catalog

在 Flink lib 中放入 paimon-flink 依赖包,执行  ./sql-client.sh 启动 Flink SQL Client,然后执行下面的命令去创建 Paimon catlog:

CREATE CATALOG paimon WITH (
'type' = 'paimon',
'warehouse' = 'file:///tmp/paimon'
);

USE CATALOG paimon;

执行完后会在 file:///tmp/paimon 路径下创建目录 default.db

创建 Table

执行下面的命令会创建一个带有 3 个属性的 Paimon 表:

CREATE TABLE T (
  id BIGINT,
  a INT,
  b STRING,
  dt STRING COMMENT 'timestamp string in format yyyyMMdd',
  PRIMARY KEY(id, dt) NOT ENFORCED
) PARTITIONED BY (dt);

执行后,Paimon 表 T 会在 /tmp/paimon/default.db/T 目录下生成目录,它的 schema 会存放在目录 /tmp/paimon/default.db/T/schema/schema-0 下。

c37214dd4edc3b3e73604a12c1d91d38.png

写入数据到 Table

INSERT INTO T VALUES (1, 10001, 'varchar00001', '20230501');
5ee958a74e2c7719d687bf90f4242668.png

用户可以通过执行查询 SELECT * FROM T 来验证这些记录的可见性,该查询将返回一行结果。提交过程会创建一个位于路径 /tmp/paimon/default.db/T/snapshot/snapshot-1 的快照。快照-1 的文件布局如下所述:

一旦任务运行完成变成 finished 时,提交成功后数据就写入到 Paimon 表中。用户可以通过执行查询 SELECT * FROM T 来验证数据的可见性,该查询将返回一行结果。

另外可以发现目录的结构发生如下变化,新增 dt=20230501manifestsnapshot三个目录。三个的目录结构如下:

10beb60936705a7907e03ffa1169a724.png

查询目录下的数据如下所示:

4ac9fbff8bda08899c8c86c09a8ca923.png

这是因为提交过程中会创建一个位于 /tmp/paimon/default.db/T/snapshot/snapshot-1 的快照。snapshot-1 的文件布局如下所述:

ac5165245eb8114a939e11e1b51262b7.png
snapshot-1的内容包含了这个 snapshot 的元数据,比如 manifest list  和 schema id:

{
  "version" : 3,
  "id" : 1,
  "schemaId" : 0,
  "baseManifestList" : "manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-0",
  "deltaManifestList" : "manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-1",
  "changelogManifestList" : null,
  "commitUser" : "5132ef16-41ec-4172-8bf4-01a304507b36",
  "commitIdentifier" : 9223372036854775807,
  "commitKind" : "APPEND",
  "timeMillis" : 1697080282120,
  "logOffsets" : { },
  "totalRecordCount" : 1,
  "deltaRecordCount" : 1,
  "changelogRecordCount" : 0,
  "watermark" : -9223372036854775808
}

需要提醒的是,manifest list 包含了 snapshot 的所有更改,baseManifestList 是应用在 deltaManifestList 中的更改所基于的基本文件。第一次提交将导致生成 1 个清单文件,并创建了 2 个清单列表(文件名可能与你的实验中的不同):

➜  T 
manifest-232271e3-f294-4556-8947-ee87483c3bfd-0
manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-0
manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-1
6636815cd4bff6ef359c6e0e46705656.png
image.png

manifest-232271e3-f294-4556-8947-ee87483c3bfd-0: 如前面文件布局图中的 manifest-1-0,存储了关于 snapshot 中数据文件信息的 manifest。

cf9bbbbb14fdab1b0ffdb68af60e2b09.png

manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-0: 是基础的 baseManifestList,如前面文件布局图中的 manifest-list-1-base,实际上是空的。

00210ba45c64d5d1a01f54ce88389212.png

manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-1:是deltaManifestList,如前面文件布局图中的 manifest-list-1-delta,其中包含一系列对数据文件执行操作的清单条目,而在这种情况下,清单条目为 manifest-1-0

5254d85be145f7145c1a4d0176519df7.png

在不同分区中插入一批记录,在 Flink SQL 中执行以下语句:

INSERT INTO T VALUES 
(2, 10002, 'varchar00002', '20230502'),
(3, 10003, 'varchar00003', '20230503'),
(4, 10004, 'varchar00004', '20230504'),
(5, 10005, 'varchar00005', '20230505'),
(6, 10006, 'varchar00006', '20230506'),
(7, 10007, 'varchar00007', '20230507'),
(8, 10008, 'varchar00008', '20230508'),
(9, 10009, 'varchar00009', '20230509'),
(10, 10010, 'varchar00010', '20230510');
78c1c0510ee7690053425a27d2f2c475.png

等任务执行完成并提交快照后,执行 SELECT * FROM T 将返回 10 行数据。

53935a35ecbf59ebf67b1e59f904d4ac.png

创建了一个新的快照,即 snapshot-2,并给出了以下物理文件布局:

ac71d4fd4c53aa07e60a8f4e0b80d2f8.png

dt=20230501
dt=20230502
dt=20230503
dt=20230504
dt=20230505
dt=20230506
dt=20230507
dt=20230508
dt=20230509
dt=20230510
manifest
schema
snapshot


➜  T ll snapshot
EARLIEST
LATEST
snapshot-1
snapshot-2

➜  T ll manifest
manifest-232271e3-f294-4556-8947-ee87483c3bfd-0 //snapshot-1 manifest file
manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-0 //snapshot-1 baseManifestList
manifest-list-989f0744-4419-48dd-b0fd-955343e6e803-1 //snapshot-1 deltaManifestList

manifest-ed94ba0c-e71f-4a01-863c-479b34af2551-0 //snapshot-2 manifest file
manifest-list-03bad670-05ae-48b2-b88c-de631ad14333-0 //snapshot-2 baseManifestList
manifest-list-03bad670-05ae-48b2-b88c-de631ad14333-1 snapshot-2 baseManifestList

新的快照文件布局图如下:

d83d7ca3ede10fcff688245a1649159a.png

删除数据

接下来删除满足条件 dt>=20230503 的数据。在 Flink SQL Client 中,执行以下语句:

DELETE FROM T WHERE dt >= '20230503';

注意⚠️:

1、需使用 Flink 1.17 及以上版本,否则不支持该语法

Flink SQL> DELETE FROM T WHERE dt >= '20230503';
>
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Unsupported query: DELETE FROM T WHERE dt >= '20230503';
3f6e34d0e4fa8108ee248f35d9e333e0.png

2、使用 batch 模式,否则报错不支持

Flink SQL> DELETE FROM T WHERE dt >= '20230503';
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: DELETE statement is not supported for streaming mode now.
8cba985e4a6077c4c8d03e0fcd7ccae1.png

需要设置 SET 'execution.runtime-mode' = 'batch';

f6ae65f4d88ef90106d7f7a91be18d20.png

cfe389a547190d230108158d2d4bdc7f.png

第三次提交完成,并生成了快照 snapshot-3。现在,表下的目录,会发现没有分区被删除。相反,为分区 20230503 到 20230510 创建了一个新的数据文件:

71d27104f729d5dafd9f49a3bc265894.png

➜  T ll dt=20230510/bucket-0

data-0531fa1e-6ff1-47ed-aeea-3a01896c9698-0.orc # newer data file created by the delete statement 
data-0c8a6a16-13c5-4049-b4ed-e968d34e20ae-0.orc # older data file created by the insert statement

这是有道理的,因为我们在第二次提交中插入了一条数据(为 +I[10, 10010, 'varchar00010', '20230510'] ),然后在第三次提交中删除了数据。现在再执行一下 SELECT * FROM T只能查询到两条数据。

+I[1, 10001, 'varchar00001', '20230501']
+I[2, 10002, 'varchar00002', '20230502']
9e2bfece666a555f0c787c08db76be83.png

f54388d8ebd37539035300e9f3b61db4.png

snapshot-3后新的文件布局如下图:

cf60933c659066f1353336aa7d6d9654.png

9625cbe157d4995a7c90f8214bbc9a69.png

4dfa9936a847f42d9e550daadae4c426.png

请注意,manifest-3-0 包含了 8 个 ADD 操作类型的清单条目,对应着 8 个新写入的数据文件。

ae60db9dd79c4b573cedd100e82d1320.png

Compact Table

你可能已经注意到的,随着连续 snapshot 的增加,小文件的数量会增加,这可能会导致读取性能下降。因此,需要进行全量压缩以减少小文件的数量。

通过 flink run 运行一个专用的合并小文件任务:

<FLINK_HOME>/bin/flink run \
    -D execution.runtime-mode=batch \
    ./paimon-flink-action-0.6-SNAPSHOT.jar \
    compact \
    --warehouse <warehouse-path> \
    --database <database-name> \ 
    --table <table-name> \
    [--partition <partition-name>] \
    [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
    [--table-conf <paimon-table-dynamic-conf> [--table-conf <paimon-table-dynamic-conf>] ...]

例如(提前下载好压缩任务 jar 在 Flink 客户端下):

./bin/flink run \
   -D execution.runtime-mode=batch \
    ./paimon-flink-action-0.6-20231012.001913-36.jar \
    compact \
    --path file:///tmp/paimon/default.db/T
e5563a9678ae637da9037c652ed4d9fc.png

d3e949f396eadd31eb1e8bc113d5b85d.png

压缩任务结束后,再来看下 snapshot-4 文件结构:

{
  "version" : 3,
  "id" : 4,
  "schemaId" : 0,
  "baseManifestList" : "manifest-list-00af18ef-486d-4046-be60-25533e078333-0",
  "deltaManifestList" : "manifest-list-00af18ef-486d-4046-be60-25533e078333-1",
  "changelogManifestList" : null,
  "commitUser" : "ab094a14-17e0-4215-8e79-cd0651436dee",
  "commitIdentifier" : 9223372036854775807,
  "commitKind" : "COMPACT",
  "timeMillis" : 1697102418177,
  "logOffsets" : { },
  "totalRecordCount" : 2,
  "deltaRecordCount" : -16,
  "changelogRecordCount" : 0,
  "watermark" : -9223372036854775808
}
dae13a84c0deafcf77d7b3459948e9b7.png

16d2ce5a4f4b9c41ea7bbb10b07c9f4e.png

manifest-4-0 包含 20 个清单条目(18 个 DELETE 操作和 2 个 ADD 操作):

  • 对于分区 20230503 到 20230510,有两个删除操作,对应两个数据文件

  • 对于分区 20230501 到 20230502,有一个删除操作和一个添加操作,对应同一个数据文件。

Alter Table

执行以下语句来配置全量压缩:

ALTER TABLE T SET ('full-compaction.delta-commits' = '1');

这将为 Paimon 表创建一个新的 schema,即 schema-1,但在下一次提交之前,不会有任何 snapshot 使用此模式。

3e105c1f595d8d263e76ea29b28e59ff.png

过期 snapshot

请注意,标记为删除的数据文件直到 snapshot 过期且没有任何消费者依赖于该 snapshot 时才会真正被删除。参考 Manage Snapshots 可以查阅更多信息。

在 snapshot 过期过程中,首先确定 snapshot 的范围,然后标记这些 snapshot 内的数据文件以进行删除。只有当存在引用特定数据文件的 DELETE 类型的清单条目时,才会标记该数据文件进行删除。这种标记确保文件不会被后续的 snapshot 使用,并且可以安全地删除。

假设上图中的所有 4 个 snapshot 即将过期。过期过程如下:

1、首先删除所有标记为删除的数据文件,并记录任何更改的 bucket。
2、然后删除所有的 changelog 文件和关联的 manifests。
3、最后,删除快照本身并写入最早的提示文件。

如果删除过程后留下的空目录,也将被删除。

假设创建了另一个快照 snapshot-5,并触发了快照过期。将删除 snapshot-1 到 snapshot-4。为简单起见,我们只关注以前快照的文件,快照过期后的最终布局如下:

20b502dfeebe97fd3c1763b3935a4233.png
因此,分区 20230503 到 20230510 的数据被物理删除了。

Flink 流式写入

我们通过利用 CDC 数据的示例来测试 Flink 流式写入,将介绍将变更数据捕获并写入 Paimon 的过程,以及异步压缩、快照提交和过期的机制背后的原理。帮我们更详细地了解 CDC 数据摄取的工作流程以及每个参与组件所扮演的独特角色。

d0a88a3857ccb937c6ce88ebb9d27e83.png

1、MySQL CDC Source 统一读取快照数据和增量数据,其中 SnapshotReader 读取快照数据,而 BinlogReader 读取增量数据。
2、Paimon Sink 将数据按 Bucket 级别写入 Paimon 表中。其中的 CompactManager 将异步触发压缩操作。
3、Committer Operator 是一个单例,负责提交和过期快照。

接下来,我们将逐步介绍端到端的数据流程:

0c81aca22d6de1f13cd71dd5e312c7de.png

MySQL CDC Source 首先读取快照数据和增量数据,然后对它们进行规范化处理,并将其发送到下游。

f1c5797ae530bdb31755ef8f5d4d07e6.png

Paimon Sink 首先将新记录缓存在基于堆的 LSM 树中,并在内存缓冲区满时将其 flush 到磁盘上。请注意,每个写入的数据文件都是一个 sorted run。在这个阶段,还没有创建 manifest 文件和 snapshot。在 Flink 执行 Checkpoint 之前,Paimon Sink 将 flush 所有缓冲的记录并发送可提交的消息到下游,下游会在 Checkpoint 期间由 Committer Operator 读取并提交。

b439f35f502c32b49c12c9d4e318a091.png

在 Checkpoint 期间,Committer Operator 将创建一个新的 snapshot,并将其与 manifest lists 关联,以便snapshot 包含表中所有数据文件的信息。

a91a006ef76561aab533def80691a5c8.png

稍等一会后,可能会进行异步压缩,CompactManager 生成的可提交消息包含有关先前文件和合并文件的信息,以便 Committer Operator 可以构建相应的 manifest entries。在这种情况下,Committer Operator 在Flink Checkpoint 期间可能会生成两个 snapshot,一个用于写入的数据(类型为 Append 的快照),另一个用于压缩(类型为 Compact 的快照)。如果在 Checkpoint 间隔期间没有写入数据文件,则只会创建类型为Compact 的快照。Committer Operator 将检查 snapshot 的过期情况,并对标记为删除的数据文件执行物理删除操作。

b2863a1c2486c6419bf27704d9ec5ff4.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/379915.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Qt未来市场洞察

跨平台开发&#xff1a;Qt作为一种跨平台的开发框架&#xff0c;具有良好的适应性和灵活性&#xff0c;未来将继续受到广泛应用。随着多设备和多平台应用的增加&#xff0c;Qt的前景在跨平台开发领域将更加广阔。 物联网应用&#xff1a;由于Qt对嵌入式系统和物联网应用的良好支…

【Spring】Spring 对 Ioc 的实现

一、Ioc 控制反转 控制反转是一种思想 控制反转是为了降低程序耦合度&#xff0c;提高程序扩展力&#xff0c;达到 OCP 原则&#xff0c;达到 DIP 原则 控制反转&#xff0c;反转的是什么&#xff1f; 将对象的创建权利交出去&#xff0c;交给第三方容器负责 将对象和对象之…

Redis Centos7 安装到启动

文章目录 安装Redis启动redis查看redis状况连接redis服务端 安装Redis 1.下载scl源 yum install centos-release-scl-rh2.下载redis yum install rh-redis5-redis 3. 创建软连接 1.cd /usr/bin 2. In -s /opt/rh/rh-redis5/root/usr/bin/redis-server ./redis-server 3. …

Django(十)

1. Ajax请求 浏览器向网站发送请求时&#xff1a;URL 和 表单的形式提交。 GETPOST 特点&#xff1a;页面刷新。 除此之外&#xff0c;也可以基于Ajax向后台发送请求&#xff08;偷偷的发送请求&#xff09;。 依赖jQuery编写ajax代码 $.ajax({url:"发送的地址"…

二维差分算法小笔记

文章目录 一.二维差分构造差分二维数组二维差分算法状态dp求b[i][j]数组的二维前缀和图解 二.三维前缀和与差分三维前缀和图解:三维差分核心公式图解:模板题 一.二维差分 给定一个原二维数组a[i][j],若要给a[i][j]中以(x1,y1)和(x2,y2)为对角线的子矩阵中每个数都加上一个常数…

v-if 和v-for的联合规则及示例

第073个 查看专栏目录: VUE ------ element UI 专栏目标 在vue和element UI联合技术栈的操控下&#xff0c;本专栏提供行之有效的源代码示例和信息点介绍&#xff0c;做到灵活运用。 提供vue2的一些基本操作&#xff1a;安装、引用&#xff0c;模板使用&#xff0c;computed&a…

机器学习10-特征缩放

特征缩放的目的是确保不同特征的数值范围相近&#xff0c;使得模型在训练过程中更加稳定&#xff0c;加速模型收敛&#xff0c;提高模型性能。具体而言&#xff0c;零均值和单位方差的目标有以下几点好处&#xff1a; 1. 均值为零&#xff08;Zero Mean&#xff09;&#xff1a…

【LeetCode】37. 解数独(困难)——代码随想录算法训练营Day30

题目链接&#xff1a;37. 解数独 题目描述 编写一个程序&#xff0c;通过填充空格来解决数独问题。 数独的解法需 遵循如下规则&#xff1a; 数字 1-9 在每一行只能出现一次。数字 1-9 在每一列只能出现一次。数字 1-9 在每一个以粗实线分隔的 3x3 宫内只能出现一次。&…

Spring Authorization Server Spring Security密码加密

文章目录 一、修改密码编码器二、效果三、注意点1. RegisteredClient2. UserDetailsService 一、修改密码编码器 以BCryptPasswordEncoder举例。 直接将其注册成PasswordEncoder 的Bean即可。 Beanpublic PasswordEncoder passwordEncoder() {// 密码为明文方式 // ret…

数据结构(C语言)代码实现(八)——顺序栈实现数值转换行编辑程序括号分配汉诺塔

目录 参考资料 顺序栈的实现 头文件SqStack.h&#xff08;顺序栈函数声明&#xff09; 源文件SqStack.cpp&#xff08;顺序栈函数实现&#xff09; 顺序栈的三个应用 数值转换 行编辑程序 顺序栈的实现测试 栈与递归的实现&#xff08;以汉诺塔为例&#xff09; 参考资…

预测模型:MATLAB线性回归

1. 线性回归模型的基本原理 线性回归是统计学中用来预测连续变量之间关系的一种方法。它假设变量之间存在线性关系&#xff0c;可以通过一个或多个自变量&#xff08;预测变量&#xff09;来预测因变量&#xff08;响应变量&#xff09;的值。基本的线性回归模型可以表示为&…

【03】C++ 类和对象 2:默认成员函数

文章目录 &#x1f308; 前言&#x1f308; Ⅰ 构造函数1. 构造函数概念2. 构造函数特性3. 初始化列表 &#x1f308; Ⅱ 析构函数1. 析构函数概念2. 析构函数特性 &#x1f308; Ⅲ 拷贝构造1. 拷贝构造概念2. 拷贝构造特性3. 深度拷贝构造 &#x1f308; Ⅳ 赋值重载1. 运算符…

Selenium自动化测试框架的搭建

说起自动化测试&#xff0c;我想大家都会有个疑问&#xff0c;要不要做自动化测试&#xff1f; 自动化测试给我们带来的收益是否会超出在建设时所投入的成本&#xff0c;这个嘛别说是我&#xff0c;即便是高手也很难回答&#xff0c;自动化测试的初衷是美好的&#xff0c;而测试…

【Linux】vim的基本操作与配置(下)

Hello everybody!今天我们继续讲解vim的操作与配置&#xff0c;希望大家在看过这篇文章与上篇文章后都能够轻松上手vim! 1.补充 在上一篇文章中我们说过了&#xff0c;在底行模式下set nu可以显示行号。今天补充一条&#xff1a;set nonu可以取消行号。这两条命令大家看看就可…

SpringCloud-Ribbon:负载均衡(基于客户端)

6. Ribbon&#xff1a;负载均衡(基于客户端) 6.1 负载均衡以及Ribbon Ribbon是什么&#xff1f; Spring Cloud Ribbon 是基于Netflix Ribbon 实现的一套客户端负载均衡的工具。简单的说&#xff0c;Ribbon 是 Netflix 发布的开源项目&#xff0c;主要功能是提供客户端的软件负…

JRebel激活-nginx版本

nginx转发流量&#xff08;代替其他网上说的那个工具&#xff09; proxy_pass http://idea.lanyus.com; 工具激活 填写内容说明&#xff1a; 第一行的激活网址是&#xff1a;http://127.0.0.1:8888/ 正确的GUID。GUID 可以通过专门的网站来生成&#xff08;点击打开&#…

Redis事务和Redis管道

文章目录 1.Redis事务1.1 Redis事务是什么&#xff0c;能干嘛&#xff1f;1.2 Redis事务和数据库事务的差异1.3 Redis事务的相关命令 2.Redis管道2.1 Redis管道是什么2.2 管道与原生批量命令对比2.3 管道与事务对比2.4 使用管道注意事项 1.Redis事务 1.1 Redis事务是什么&…

机器学习:分类决策树(Python)

一、各种熵的计算 entropy_utils.py import numpy as np # 数值计算 import math # 标量数据的计算class EntropyUtils:"""决策树中各种熵的计算&#xff0c;包括信息熵、信息增益、信息增益率、基尼指数。统一要求&#xff1a;按照信息增益最大、信息增益率…

c++随机数生成进阶random与随之种子生成器的使用

随机数的作用我就不说了&#xff0c;但凡要用随机数的童鞋一定是有这个需求。下面我们就分三个层次来介绍随机数生成。 文章目录 一、利用rand函数生成随机数1、rand裸奔2、随机数种子srand-随机数生成器3、如何得到不同的种子值&#xff08;1&#xff09;、利用系统时间戳tim…

文件上传-Webshell

Webshell简介 webshell就是以aspphpjsp或者cgi等网页文件形式存在的一种命令执行环境&#xff0c;也可以将其称做为一种网页木马后门。 攻击者可通过这种网页后门获得网站服务器操作权限&#xff0c;控制网站服务器以进行上传下载文件、查看数据库、执行命令等… 什么是木马 …
最新文章