Impala4.x源码阅读笔记(二)——Impala如何高效读取Iceberg表

前言

本文为笔者个人阅读Apache Impala源码时的笔记,仅代表我个人对代码的理解,个人水平有限,文章可能存在理解错误、遗漏或者过时之处。如果有任何错误或者有更好的见解,欢迎指正。

Iceberg表是一种用于存储大规模结构化数据的开源表格式,旨在提供高效的数据存储和丰富的查询能力。不同于Parquet,Orc等文件格式定义了数据如何在文件中存储和索引,Iceberg作为一种表格式定义的是数据文件如何组织,换句话说就是一系列的数据文件如何构成一张表以及我们如何从大量数据文件中找到我们需要的。Iceberg表支持事务性写入和多版本并发控制,这使得它适合于需要大规模数据存储和高效查询的数据湖或数据仓库环境。Iceberg表还提供了对数据架构变化的良好支持,使得数据架构的演进变得更加容易。

Iceberg表的基本结构包括数据文件和元数据文件两部分,这些文件被组织在目录结构中,其中数据文件是实际存储数据的地方,支持Parquet、Orc等业界主流数据文件格式。元数据文件除了记录表的结构信息和演进记录外,还有一种Manifest文件用于索引数据文件,支持Iceberg使用快照(Snapshot)的概念来维护表的版本信息,可以快速定位某个版本的表包括了哪些数据文件,这使得我们可以方便地在Iceberg表上进行时间旅行查询和回滚操作。另外在Iceberg V2格式的表中,还在之前V1格式表的基础之上增加了数据的行级更新与删除能力,通过写入专门的Delete File和MOR(Merge On Read)技术,可以在不重写现有数据文件的前提下实现行级别的删除。

在impala步入4.x的大版本后,对Iceberg表的支持一直是社区关注的重点。在社区的重点投入下,截止Apache Impala 4.3.0版本,Impala对Iceberg表的支持度已经相当高了,除了建表删表修改字段等常规表支持的操作外,对Iceberg特有的时间旅行、版本回滚、清理快照也进行了支持,另外还提供了从Hive表迁移到Iceberg的功能。社区目前正在大力推进对于Iceberg V2表的支持,目前已经完整支持了对Iceberg表的行级Delete和MOR读取。Impala在实现MOR时没有使用Iceberg API提供的读取方法,而是使用了自身由C++实现的执行引擎进行读取。得益于Impala本身对于HDFS+Parquet表的长期优化,使得Impala在Iceberg的MOR读取性能方面也具有优势。

本文主要根据源码就Impala如何高效读取地Iceberg表进行分析,这里的读取包括了同时包括了对Iceberg表的时间旅行和MOR的支持。分析的过程中着重于扫描Iceberg表的执行计划如何制定,以及期间做了哪些优化。

准备工作

为了分析Impala如何读取Iceberg表,我们可以从一个简单但是功能覆盖充分的例子入手。首先我们使用Impala创建一张Iceberg V2表,并进行一些数据写入和删除操作:

-- 创建一张V2格式的Iceberg表
CREATE TABLE ice_v2 (id int, name string) STORED BY ICEBERG TBLPROPERTIES('format-version'='2');
-- 插入最初的两条数据
INSERT INTO ice_v2 VALUES (1, 'a'), (2, 'b');
-- 删除id为2的一条数据
DELETE FROM ice_v2 WHERE id = 2;
-- 再插入一条数据
INSERT INTO ice_v2 VALUES (3, 'c');

现在我们就准备好了一张包含Delete数据的Iceberg表了,因为我们总共进行了三次DML操作,现在这张Iceberg表就有3个快照了。在Impala中可以通过执行DESCRIBE HISTORY语句查看Iceberg表历史版本:

在这里插入图片描述

也可以看见数据文件中有两个Insert文件和一个Delete文件,都使用Parquet格式储存:

在这里插入图片描述

为了后续一些概念的理解,我们再看下这三个文件中的数据:

在这里插入图片描述

可以发现Insert文件中的数据和我们执行的Insert语句是对应的,而Delete文件中的数据这是另外的Schema,记录的是删除的行所在的文件和位置。在有了这些信息之后,我们就可以开始分析Impala是如何读取一张Iceberg表的了。

执行计划

首先我们先看一下扫描Iceberg表的执行计划长什么样,执行如下SQL:

-- Result: (3, 'c'), (1, 'a')
SELECT * FROM ice_v2 FOR SYSTEM_VERSION AS OF 5109113003992490801

其中FOR SYSTEM_VERSION AS OF子句就是对Iceberg表进行时间旅行查询使用的,5109113003992490801DESCRIBE HISTORY中显示的Iceberg表最新的快照ID。这样我们可以指定一个Iceberg快照版本进行查询,当然不指定时Impala会默认查询最新的快照,因此实际上这个查询子句加与不加结果是一样的,只是为了后面触发时间旅行查询相关的逻辑。我们也可以用FOR SYSTEM_TIME AS OF子句来指定一个时间点来查询这个时间点时这张Iceberg表的数据,就像进行了时间旅行穿越回过去直接查询这张表一样,也就是所谓的时间旅行查询了。虽然这只是一个简单的SELECT *查询,但是它在Impala中的执行计划却不简单:

在这里插入图片描述

从SQL角度来说,一条SQL在Impala中的处理过程逻辑上大体可以分为四个阶段,解析、分析、计划与执行。其中解析就是解析SQL,通过SQL Parser将SQL字符串转换为语句类StatementBase的子类对象,比如说上文中的查询会被转换为SelectStmt,其中又包含SelectListFromClause等子句部分,FromClause又包括了一个TableRef的列表表示FROM子句后面的各个表或者类似于表的对象,在TableRef中又包括一个TimeTravelSpec对象对应了我们的时间旅行子句。这些语句对象都实现了自己的analyze方法,会在后面的分析过程中调用。比如说SelectStmtanalyze会调用FromClauseanalyze,同时自己还会进行星号展开、注册Slot等工作。FromClauseanalyze则会分析路径、建立别名并调用各个TableRefanalyze方法。在TableRefanalyze方法中除了分析Join、Hints之外也会调用TimeTravelSpecanalyze方法。TimeTravelSpecanalyze方法则会计算AS OF后的表达式得到目标时间旅行的版本或者时间,为后续索引数据文件做准备。

经过了复杂的分析之后就可以进入计划阶段了,计划阶段的任务就是根据语句类对象和先前的分析结果给查询制定一个执行计划。执行计划是一个由各种PlanNode的子类对象构成的一个树状结构PlanTree,它会指导查询如何进行执行,具体包括执行需要哪些结点参与,结点间的数据流向等等。执行计划会被转为Thrift结构体传递给Impala的C++执行引擎进行执行,执行时PlanNode会转变为对应ExecNode的子类对象,每个结点都负责各自对应的工作,比如说ScanNode负责一张表的扫描任务,是树中的叶子结点。而JoinNodeUnionNode分别负责连接任务与合并任务,都有多个输入和一个输出。数据在结点间以行批的形式传递,最终汇聚到根结点并返回给客户端完成查询执行。

计划阶段又可以大体分为两部分,首先是制定单点执行计划,给出一颗完整的执行计划树。但是单点执行计划还不足以指导查询执行,Impala作为一个MPP架构的执行引擎可以在分布式集群的多个执行者上并发执行查询,因此还需要将单点执行计划转变为分布式执行计划。分布式执行计划实际上就是将单点执行计划切分为多个片段Fragment,并在其间插入一些交换节点ExchangeNode用于在Fragment之间传递数据。在上面的执行计划树图中我们可以看到结点间有虚线和实线两种连接方式,实线连接的结点就是同属于一个Fragment的,而虚线连接的是不同的FragmentFragment是查询执行阶段可以调度的最小执行单元,调度器可以根据Fragment的性质将其调度到一个或多个执行者上创建执行实例Instance并开始实际执行。比如说对于只有一个文件的扫描结点的Fragment,我们只需要将其调度到一个执行者上执行就够了,而那些扫描文件很多或者需要高并发执行的Fragment,我们则需要将其调度到多个执行者并发加速执行。

在Impala的执行计划中一般都是一个ScanNode对应负责一张表的扫描任务,而从图中可以发现,我们为了扫描这张Iceberg表却出动了三个ScanNode,甚至还有一个JoinNode和一个UnionNode,这主要是因为Iceberg表中有Delete文件出现了。我们可以对比看一下没有Delete文件时的Iceberg表扫描查询计划是怎样的,执行SQL:

-- Result: (1, 'a'), (2, 'b')
SELECT * FROM ice_v2 FOR SYSTEM_TIME AS OF '2023-12-07 16:00:05.5';

这次我们使用时间旅行回到表进行了第一次插入但是还没有进行Delete的时间点进行查询,执行计划如下:

在这里插入图片描述

这回就是简简单单一个ScanNode了。Impala究竟是如何为Iceberg表制定扫描计划的?我们可以从代码中找到答案。

代码分析

本文要分析的Iceberg表的扫描计划的制定是单点执行计划的制定的一部分,完整的执行计划制定代码是十分庞大和复杂的,幸运的是我们只需要关注其中的一小部分就能得到需要的答案。首先我们先快速深入的目标代码的位置,调用路径如下:

getPlannedExecRequest() -> createExecRequest() -> createPlans() -> createPlanFragments()
-> createSingleNodePlan() -> createQueryPlan() -> createSelectPlan() -> createTableRefsPlan()
-> createTableRefNode() -> createScanNode() {
  // 如果TableRef是实际的表就会调用createScanNode()方法为其创建扫描结点
  ...
  FeTable table = tblRef.getTable();
  // 通过表的类型选择对应的方法创建ScanNode
  // Iceberg表是FeIcebergTable同时也实现了FeFsTable接口,同时也是一种文件系统表(Hdfs表)
  if (table instanceof FeFsTable) {
    if (table instanceof FeIcebergTable) {
      // 对于Iceberg表有专门的IcebergScanPlanner创建扫描结点
      // IcebergScanPlanner构造时需要传入分析信息、查询上下文、表引用、连词和聚合信息
      // 同时还会在构造方法中完成谓词抽取,即将conjuncts中可以下推到Iceberg API中的谓词抽取出来并转换为Iceberg的谓词对象
      IcebergScanPlanner icebergPlanner = new IcebergScanPlanner(analyzer, ctx_, tblRef,
          conjuncts, aggInfo);
      return icebergPlanner.createIcebergScanPlan();
    }
    return createHdfsScanPlan(tblRef, aggInfo, conjuncts, analyzer);
  } else if (table instanceof FeDataSourceTable) {
  ...
}

接下来就进入正题了,我们看下IcebergScanPlanner::createIcebergScanPlan()是如何给Iceberg表创建扫描计划的:

public PlanNode createIcebergScanPlan() throws ImpalaException {
  // 首先是一个重要的判断,决定是否需要调用Iceberg的planFiles() API来列出待扫描文件
  // needIcebergForPlanning()等价于 !impalaIcebergPredicateMapping_.isEmpty() || tblRef_.getTimeTravelSpec() != null;
  // 即如果有谓词下推或者有时间旅行子句的话,我们是需要调用planFiles()来列出待扫描文件的
  // 因为这种情况下要扫描的文件往往不是最新快照的全部数据文件,而是其的一个子集或者是其他快照版本包含的数据文件
  if (!needIcebergForPlanning()) {
    // 为连词中引用到的字段进行槽位物化,槽位物化可以理解为在数据行的内存定义中保留一块内存存放对应字段的数据
    // 因为谓词评估需要使用这些字段进行计算,所以即使select list中没有它们,我们必须在数据行定义中给它们预留位置
    analyzer_.materializeSlots(conjuncts_);
    // 对于不需要planFiles()的扫描,我们可以直接调用setFileDescriptorsBasedOnFileStore()方法
    // 使用缓存的最新快照的文件集合FileStore来设置文件描述符FileDescriptors,用于之后的扫描计划创建
    setFileDescriptorsBasedOnFileStore();
    // 然后调用createIcebergScanPlanImpl()进行具体的扫描计划创建
    return createIcebergScanPlanImpl();
  }

  // 对于进行了谓词下推或包含时间旅行的查询,我们无法根据缓存的最新快照的文件集合来得到需要扫描的FileDescriptors
  // 需要调用filterFileDescriptors()来根据谓词或时间旅行子句来过滤得到需要FileDescriptors
  filterFileDescriptors();
  // 调用filterConjuncts()来过滤谓词,因为在filterFileDescriptors()中我们可能已经将部分谓词下推到Iceberg中了
  // 这部分谓词不需要在执行时再次计算,因为Iceberg API已经在文件层面将不符合谓词的数据文件都过滤了
  // 剩余需要扫描的文件中的数据肯定符合这些已经下推的谓词,因此我们需要将其过滤掉,避免扫描时多余的谓词计算
  // 只将那些没有被下推成功的谓词保留,并传递给扫描结点,由Impala的执行引擎负责计算和过滤数据
  filterConjuncts();
  analyzer_.materializeSlots(conjuncts_);
  return createIcebergScanPlanImpl();
}

对于上文中有时间旅行子句查询,没法进入createIcebergScanPlan()中的if分支,需要调用filterFileDescriptors()来得到应该扫描的文件,我们继续看下其是如何实现的:

private void filterFileDescriptors() throws ImpalaException {
  TimeTravelSpec timeTravelSpec = tblRef_.getTimeTravelSpec();
  // 调用IcebergUtil::planFiles()并传递Iceberg表、之前抽取并转换为Iceberg谓词的谓词列表和时间旅行描述对象
  // 其封装了Iceberg API的TableScan::planFiles()方法,可以将谓词和时间旅行版本传递给Iceberg API
  // 其返回的fileScanTasks就包含了所有我们需要扫描的数据文件,包括Delete文件
  try (CloseableIterable<FileScanTask> fileScanTasks =
      IcebergUtil.planFiles(getIceTable(),
          new ArrayList<>(impalaIcebergPredicateMapping_.keySet()), timeTravelSpec)) {
    long dataFilesCacheMisses = 0;
    for (FileScanTask fileScanTask : fileScanTasks) {
      // 遍历每个FileScanTask,首先处理残余表达式,也就是没有被下推到Iceberg的谓词
      // 这些谓词无法在文件级别应用,需要Impala的扫描结点在行级别应用,将其先全部加入到集合residualExpressions_中
      Expression residualExpr = fileScanTask.residual();
      if (residualExpr != null && !(residualExpr instanceof True)) {
        residualExpressions_.add(residualExpr);
      }
      // 调用getFileDescriptor()来获取文件描述符FileDescriptor和是否命中缓存的标志
      // getFileDescriptor()中封装了一层缓存文件描述符的逻辑,即使未命中缓存也会创建新的并添加到缓存
      Pair<FileDescriptor, Boolean> fileDesc = getFileDescriptor(fileScanTask.file());
      if (!fileDesc.second) ++dataFilesCacheMisses;
      // 如果这个文件没有对应的Delete文件,我们将其加入到无Delete文件的数据文件列表dataFilesWithoutDeletes_中
      if (fileScanTask.deletes().isEmpty()) {
        dataFilesWithoutDeletes_.add(fileDesc.first);
      } else {
        // 否则将其加入到有Delete文件的数据文件列表dataFilesWithDeletes_中
        dataFilesWithDeletes_.add(fileDesc.first);
        // 同时还需要遍历其所有的Delete文件,将其加入到Delete文件列表deleteFiles_中
        for (DeleteFile delFile : fileScanTask.deletes()) {
          // 截止本文撰写时,Impala还未支持等值删除EQUALITY_DELETES,只支持位置删除POSITION_DELETES
          // 如果发现目标表有等值删除文件,我们无法扫描,需要引发一个异常
          if (delFile.content() == FileContent.EQUALITY_DELETES) {
            throw new ImpalaRuntimeException(String.format(
                "Iceberg table %s has EQUALITY delete file which is currently " +
                "not supported by Impala, for example: %s", getIceTable().getFullName(),
                delFile.path()));
          }
          Pair<FileDescriptor, Boolean> delFileDesc = getFileDescriptor(delFile);
          if (!delFileDesc.second) ++dataFilesCacheMisses;
          deleteFiles_.add(delFileDesc.first);
        }
      }
    }
    if (dataFilesCacheMisses > 0) {
      Preconditions.checkState(timeTravelSpec != null);
      LOG.info("File descriptors had to be loaded on demand during time travel: " +
          String.valueOf(dataFilesCacheMisses));
    }
  } catch (IOException | TableLoadingException e) {
    throw new ImpalaRuntimeException(String.format(
        "Failed to load data files for Iceberg table: %s", getIceTable().getFullName()),
        e);
  }
  // 最后更新下Delete文件的统计信息,包括数据行数和行数据量,用于后续预估内存消耗等
  updateDeleteStatistics();
}

对谓词的处理并非本文的关注的重点,所以我们跳过filterConjuncts()方法,继续看关键的createIcebergScanPlanImpl()方法最后如何为Iceberg扫描创建扫描结点的:

private PlanNode createIcebergScanPlanImpl() throws ImpalaException {
  // 对于没有Delete文件的情况,我们只需要一个扫描结点来扫描数据文件就行了,也就是上文中第二个查询的情况
  if (deleteFiles_.isEmpty()) {
    Preconditions.checkState(dataFilesWithDeletes_.isEmpty());
    // 创建一个新的Iceberg扫描结点对象,并传递结点ID、表引用、连词列表、聚合信息、文件列表
    // 非IDENTITY列的连词列表和需要跳过连词列表,然后调用init()方法进行初始化
    // IcebergScanNode继承了HdfsScanNode,大部分逻辑也是复用的HdfsScanNode的
    // 所以很多Hdfs Scan的优化和功能也能在Iceberg扫描中使用
    PlanNode ret = new IcebergScanNode(ctx_.getNextNodeId(), tblRef_, conjuncts_,
        aggInfo_, dataFilesWithoutDeletes_, nonIdentityConjuncts_,
        getSkippedConjuncts());
    ret.init(analyzer_);
    return ret;
  }
  // 如果有Delete文件,那么一个扫描结点就不够用了,我们需要另外一个扫描结点专门负责扫描Delete文件
  // 然后还需要一个反连接结点将两者的数据进行ANTI JOIN,实现删除行的效果
  // 这部分逻辑由createPositionJoinNode()方法实现,我们后面分析,总之它会返回一个JoinNode
  PlanNode joinNode = createPositionJoinNode();

  // 如果先前的分析阶段发现这是可以进行优化的针对Iceberg V2表的count(*)查询
  // 则剩余的无Delete文件的数据文件的不需要实际扫描了,可从元数据中得到行数
  // 因此这里可以直接返回所有数据文件和所有删除文件之间的ANTI JOIN
  // 不需要处理后面的无Delete文件对应的数据文件,它们的行数会被一个ArithmeticExpr直接加到结果中
  if (ctx_.getQueryCtx().isOptimize_count_star_for_iceberg_v2()) return joinNode;

  // 当然,当所有数据文件都有相应的删除文件,即dataFilesWithoutDeletes_为空时
  // 我们也只需要返回所有数据文件和所有删除文件之间的ANTI JOIN
  if (dataFilesWithoutDeletes_.isEmpty()) return joinNode;

  // 如果还有无Delete文件对应的数据文件的话,我们则还需要这些文件创建一个扫描结点
  IcebergScanNode dataScanNode = new IcebergScanNode(
      ctx_.getNextNodeId(), tblRef_, conjuncts_, aggInfo_, dataFilesWithoutDeletes_,
      nonIdentityConjuncts_, getSkippedConjuncts());
  dataScanNode.init(analyzer_);
  // 然后根据表引用的槽位描述符创建输出表达式,并依此创建一个合并结点UnionNode来合并数据
  List<Expr> outputExprs = tblRef_.getDesc().getSlots().stream().map(
      entry -> new SlotRef(entry)).collect(Collectors.toList());
  UnionNode unionNode = new UnionNode(ctx_.getNextNodeId(), tblRef_.getId(),
      outputExprs, false);
  // 将之前创建的负责无Delete文件的数据扫描结点和负责有Delete文件的连接结点作为UnionNode的输入
  // 并完成初始化后返回,这样这张Iceberg表的扫描计划就创建完成了
  // 根结点为一个UnionNode,也就对应了上文中第一个查询的情况
  unionNode.addChild(dataScanNode, outputExprs);
  unionNode.addChild(joinNode, outputExprs);
  unionNode.init(analyzer_);
  Preconditions.checkState(unionNode.getChildCount() == 2);
  Preconditions.checkState(unionNode.getFirstNonPassthroughChildIndex() == 2);
  return unionNode;
}

看完关键的createIcebergScanPlanImpl()方法后,Iceberg表的扫描计划是如何制定的就已经很清晰了,但是还有最后一块也是最核心的逻辑还需要再进一步分析,也就是期间我们调用的createPositionJoinNode()方法,它被用来创建实现Delete文件和数据文件之间数据行删除逻辑的ANTI JOIN结点,代码分析如下:

  private PlanNode createPositionJoinNode() throws ImpalaException {
    Preconditions.checkState(deletesRecordCount_ != 0);
    Preconditions.checkState(dataFilesWithDeletesSumPaths_ != 0);
    Preconditions.checkState(dataFilesWithDeletesMaxPath_ != 0);
	// 我们需要为数据文件和Delete文件分别创建一个扫描结点,不过在此之前还有些准备工作
    PlanNodeId dataScanNodeId = ctx_.getNextNodeId();
    PlanNodeId deleteScanNodeId = ctx_.getNextNodeId();
    // 首先需要为Delete文件先创建一张虚拟表,因为扫描结点都是以表为基础的,而Delete文件并不是实际存在的表
    IcebergPositionDeleteTable deleteTable = new IcebergPositionDeleteTable(getIceTable(),
        getIceTable().getName() + "-POSITION-DELETE-" + deleteScanNodeId.toString(),
        deleteFiles_, deletesRecordCount_, getFilePathStats());
    analyzer_.addVirtualTable(deleteTable);
    // 有了对应Delete文件的虚拟表IcebergPositionDeleteTable后,再为其创建表引用对象TableRef
    TableRef deleteDeltaRef = TableRef.newTableRef(analyzer_,
        Arrays.asList(deleteTable.getDb().getName(), deleteTable.getName()),
        tblRef_.getUniqueAlias() + "-position-delete");
    // 为了数据文件和Delete文件之间能够进行ANTI JOIN,我们还需要为数据文件的表添加相关的虚拟列
    // 这些虚拟列并不是实际存在于表中的列,而是为了作为Join Key存在的
    // addDataVirtualPositionSlots()方法会为数据表添加两个虚拟列,分别为INPUT__FILE__NAME和FILE__POSITION
    // Impala已经为Parquet和Orc文件实现了这两个虚拟列,其数据可以在扫描时填充,含义分别是文件名和行号
    addDataVirtualPositionSlots(tblRef_);
    // addDeletePositionSlots()方法会为Delete文件的虚拟表添加两个实际列,分别为file_path和pos
    // 这两个列是在Delete文件中实际存在的,可以看上文中解析Delete文件得到的结果
    addDeletePositionSlots(deleteDeltaRef);
    // 然后就可以为数据文件和Delete文件分别创建一个扫描结点并初始化了
    IcebergScanNode dataScanNode = new IcebergScanNode(
        dataScanNodeId, tblRef_, conjuncts_, aggInfo_, dataFilesWithDeletes_,
        nonIdentityConjuncts_, getSkippedConjuncts(), deleteScanNodeId);
    dataScanNode.init(analyzer_);
    IcebergScanNode deleteScanNode = new IcebergScanNode(
        deleteScanNodeId,
        deleteDeltaRef,
        Collections.emptyList(), /*conjuncts*/
        aggInfo_,
        Lists.newArrayList(deleteFiles_),
        Collections.emptyList(), /*nonIdentityConjuncts*/
        Collections.emptyList()); /*skippedConjuncts*/
    deleteScanNode.init(analyzer_);

    // 然后是ANTI JOIN结点的创建,首先需要调用createPositionJoinConjuncts()创建连接谓词
    // 它会生成两个EQ谓词,相当于INPUT__FILE__NAME = file_path AND FILE__POSITION = pos
    // 这样数据文件中的行和Delete文件中的行匹配上时就可以在ANTI JOIN中被剔除了,实现了删除的效果
    List<BinaryPredicate> positionJoinConjuncts = createPositionJoinConjuncts(
            analyzer_, tblRef_.getDesc(), deleteDeltaRef.getDesc());
    // Impala还实现了一种专门针对Iceberg Delete文件DELETE JOIN结点IcebergDeleteNode
    // 如果没设置query option disable_optimized_iceberg_v2_read的话
    // 就会使用IcebergDeleteNode代替LEFT_ANTI_JOIN的HashJoinNode来进行ANTI JOIN
    // IcebergDeleteNode继承了HashJoinNode,同时专门针对Iceberg Delete进行了优化
    TQueryOptions queryOpts = analyzer_.getQueryCtx().client_request.query_options;
    JoinNode joinNode = null;
    if (queryOpts.disable_optimized_iceberg_v2_read) {
      joinNode = new HashJoinNode(dataScanNode, deleteScanNode,
          /*straight_join=*/true, DistributionMode.NONE, JoinOperator.LEFT_ANTI_JOIN,
          positionJoinConjuncts, /*otherJoinConjuncts=*/Collections.emptyList());
    } else {
      joinNode =
          new IcebergDeleteNode(dataScanNode, deleteScanNode, positionJoinConjuncts);
    }
    // ANTI JOIN结点的创建好之后再进行一些初始化工作就可以返回了
    joinNode.setId(ctx_.getNextNodeId());
    joinNode.init(analyzer_);
    joinNode.setIsDeleteRowsJoin();
    return joinNode;
  }

至此,Iceberg表的扫描计划的制定就结束了,后续再完成一些别的必须的工作就可以下发到执行引擎中进行执行了。

总结

这篇文章主要是在执行计划的制定方面分析了Iceberg表在Impala中是如何扫描的,总的来说为了实现高性能的Iceberg的MOR功能,对于一张包含Delete数据的Iceberg表Impala会可能会使用多个扫描结点以及ANTI JOIN结点和UNION结点协同工作,配合完成任务,这样的设计和实现既能复用很多现有的Impala代码,也能利用起现有的很多针对HDFS表的扫描优化。但是文章篇幅有限,实际上代码中还有许多内容无法详细展开分析,对于扫描的实际执行也还未能涉猎,有机会的话会在后续的文章中继续介绍。有兴趣的读者也可以自行阅读,Impala的代码注释还是比较丰富的,代码风格规范也比较好,阅读难度不大。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/241356.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Vue指令之v-on

v-on指令用于注册事件&#xff0c;作用是添加监听与提供事件触发后对应的处理函数。 v-on有两种语法&#xff0c;在提供处理函数的时候既可以直接使用内联语句&#xff0c;也可以提供函数的名字。 第一种语法是直接提供内联语句&#xff0c;如下 v-on:事件名 "内联语句…

外贸SOHO建站教程?海洋建站推广如何做?

外贸SOHO建站推广的步骤&#xff1f;国际贸易网站建设方法&#xff1f; 随着互联网的普及和发展&#xff0c;越来越多的外贸SOHO从业者选择通过建立自己的网站来拓展业务。那么&#xff0c;如何搭建一个专业、高效的外贸网站呢&#xff1f;海洋建站将为您提供一份详细的外贸SO…

Java - Bean的生命周期

Bean的生命周期之5步 Bean生命周期的管理&#xff0c;可以参考Spring的源码&#xff1a;AbstractAutowireCapableBeanFactory类的doCreateBean()方法。 Bean生命周期可以粗略的划分为五大步&#xff1a; 第一步&#xff1a;实例化Bean 第二步&#xff1a;Bean属性赋值 第三…

扫描电镜(SEM)样品在进行扫描电镜观察前需要进行哪些处理

对于扫描电镜&#xff08;Scanning Electron Microscope&#xff0c;SEM&#xff09;样品的制备&#xff0c;需要经过一系列处理步骤以确保样品表面的干净、导电性好&#xff0c;并且能够提供高质量的显微图像。以下是一些常见的处理步骤&#xff1a; 1. 固定样品&#xff08;…

Vue 学习随笔系列七 -- 表单动态生成

表单动态生成 文章目录 表单动态生成1、动态表单组件封装2、组件引用3、实现效果 1、动态表单组件封装 <!-- 动态生成下拉框&#xff0c;可同理生成input框等 --> <template><el-dialogcustom-class"custom-dialog":title"dialogTitle":vi…

Linux 使用定时任务

在Linux中&#xff0c;你可以使用cron&#xff08;定时任务管理器&#xff09;来设置和管理定时任务。以下是使用cron的基本步骤 编辑定时任务列表 打开终端&#xff0c;输入以下命令来编辑当前用户的定时任务列表 crontab -e如果是要编辑系统范围的定时任务&#xff0c;可以…

如何在忘记密码的情况下恢复解锁 iPhone

您忘记了 iPhone 密码吗&#xff1f;Apple 官方通常建议将 iPhone 恢复至出厂设置以将其删除。这种修复很不方便&#xff0c;甚至可能比问题本身更麻烦。 如果您也经历过同样的情况&#xff0c;并且想知道忘记了 iPhone 密码并且不想恢复它该怎么办&#xff0c;我们的终极指南…

docker基本管理和docker相关概念

docker是开源的的应用容器引擎&#xff0c;基于go语言开发的&#xff0c;运行在linux系统当中的开源的轻量级的"虚拟机。 docker的容器技术可以在一台主机上轻松的为任何应用创建一个轻量级的&#xff0c;可以移植的&#xff0c;自给自足的容器 docker的宿主机是linux系…

ElementPlus table 中嵌套 input 输入框

文章目录 需求分析 需求 vue3 项目中 使用UI组件库 ElementPlus 时&#xff0c;table 中嵌入 input输入框 分析 <template><div class"p-10"><el-table :data"tableData" border><el-table-column prop"date" label&qu…

jemeter,http cookie管理器

Http Cookie管理器自动实现Cookie关联的原理&#xff1a; (默认:作用域在同级别的组件) 一:当Jmeter第1次请求服务器的时候,如果说服务器有通过响应头的Set-Cookie有返回Cookie,那么Http Cookie管理器就会自动的保存这些Cookie的值。 二&#xff1a;当Jmeter第2-N次请求服务器的…

【同步FIFO_2023.12.13】

同步fifo&#xff0c;写时钟和读时钟为同一个时钟&#xff0c;用于交互数据缓冲 fifo的深度&#xff1a;同一块数据内存的大小 reg [2:0] Mem [8];//宽度3&#xff0c;深度8典型同步fifo的三部分 fifo写控制逻辑&#xff1a;写地址、写有效信号&#xff0c;fifo写满、写错等状…

ArkUI组件

目录 一、概述 声明式UI 应用模型 二、常用组件 1、Image&#xff1a;图片展示组件 示例 配置控制授权申请 2、Text&#xff1a;文本显示组件 示例 3、TextInput&#xff1a;文本输入组件 示例 4、Button&#xff1a;按钮组件 5、Slider&#xff1a;滑动条组件 …

Navicat 技术指引 | 适用于 GaussDB 分布式的数据查看器

Navicat Premium&#xff08;16.3.3 Windows 版或以上&#xff09;正式支持 GaussDB 分布式数据库。GaussDB 分布式模式更适合对系统可用性和数据处理能力要求较高的场景。Navicat 工具不仅提供可视化数据查看和编辑功能&#xff0c;还提供强大的高阶功能&#xff08;如模型、结…

【Git 小妙招】一文快速上手 Git 基本操作(两万字图文讲解)

文章目录 前言1. 创建 Git 本地仓库2. 配置 Git3. 认识工作区, 暂存区, 版本库3.1 添加文件(场景一)3.2 查看 .git 文件3.3 添加文件(场景二) 4. 修改文件5. 版本回退6. 撤销修改6.1 对于工作区的代码&#xff0c;还没有 add(场景一)6.2 已经 add &#xff0c;但没有 commit(场…

苹果电脑Python编辑开发软件pycharm pro 2023功能介绍

PyCharm Pro 2023是由JetBrains开发的一款专为Python开发者设计的跨平台集成开发环境&#xff08;IDE&#xff09;。它提供了丰富的功能和直观的用户界面&#xff0c;旨在提高在Mac平台上进行Python编程的效率。 PyCharm Pro 2023是PyCharm系列中的专业版&#xff0c;具有更多高…

D3846关键参数计算及设置方法D3846在电焊机产品中的作用是什么?

D3846是一块电流模式的PWM控制电路。 主要特点&#xff1a; ● 自动前馈补偿 ● 可编程控制的逐个脉冲限流功能 ● 推挽输出结构^ 下自动对称校正 ● 负载响应特性好 ● 可并联运行&#xff0c;适用于模块系统 ● 内置差动电流检测放大器&#xff0c; 共模输入范围宽 ● 双脉…

Linux高级管理--安装MySQL数据库系统

MySQL服务基础 MySQL.是一个真正的多线程、多用户的SQL数据库服务&#xff0c;凭借其高性能、高可靠和易于使 用的特性&#xff0c;成为服务器领域中最受欢迎的开源数据库系统。在2008年以前&#xff0c;MySOL项目由MySQL AB公司进行开发&#xff0c;发布和支持&#xff0c;之后…

查看mysql服务器的版本

在cmd下&#xff0c;可以使用命令mysql -V或者mysql --version查看mysql服务器的版本。例如&#xff1a; 如果已经登录mysql&#xff0c;可以使用命令show variables like version;来查询mysql服务器的版本。例如&#xff1a;

OpenAI Q* (Q Star)简单介绍

一、Q Star 名称由来 Q* 的两个可能来源如下&#xff1a; 1&#xff09;Q 可能是指 "Q-learning"&#xff0c;这是一种用于强化学习的机器学习算法。 Q 名称的由来*&#xff1a;把 "Q*"想象成超级智能机器人的昵称。 Q 的意思是这个机器人非常善于做决定…

串行计时芯片D1380/D1381,2.0V~5.5V 工作电流: 2V时 与TTL 兼容,采用DIP8、SOP8封装

D1380/D1381是一个带秒、分、时、日、日期、月、年的串行时钟保持芯片,每个月多少天以及闰年能自动调节, D1380/D1381低功耗工作方式, D1380/D1381用若干寄存器存储对应信息&#xff0c;一个32.768kHz 的晶振校准时钟&#xff0c;为了使用最小弓|脚&#xff0c;D1380/D1381使用…
最新文章