(二开)Flink 修改源码拓展 SQL 语法

1、Flink 扩展 calcite 中的语法解析
1)定义需要的 SqlNode 节点类-以 SqlShowCatalogs 为例
a)类位置

flink/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java

在这里插入图片描述

核心方法

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("SHOW CATALOGS");
    }
b)类血缘

在这里插入图片描述

2)修改 includes 目录下的 .ftl 文件,在 parserImpls.ftl 文件中添加语法逻辑
a)文件位置

在这里插入图片描述

b)语法示例
/**
* Parse a "Show Catalogs" metadata query command.
*/
SqlShowCatalogs SqlShowCatalogs() :
{
}
{
    <SHOW> <CATALOGS>
    {
        return new SqlShowCatalogs(getPos());
    }
}
3)将 Calcite 源码中的 config.fmpp 文件复制到项目的 src/main/codegen 目录下,修改内容,来声明扩展的部分
a)文件位置

在这里插入图片描述

b)config.fmpp 内容
data: {
	# 解析器文件路径
  parser: tdd(../data/Parser.tdd)
}

# 扩展文件的目录
freemarkerLinks: {
  includes: includes/
}
c)Parser.tdd 部分内容
# 生成的解析器包路径
package: "org.apache.flink.sql.parser.impl",
# 解析器名称
class: "FlinkSqlParserImpl",
# 引入的依赖类
"org.apache.flink.sql.parser.dql.SqlShowCatalogs"
# 新的关键字
keywords: [
    "CATALOGS"
  ]
# 新增的语法解析方法
statementParserMethods: [
    "SqlShowCatalogs()"
  ]
# 包含的扩展语法文件
implementationFiles: [
    "parserImpls.ftl"
  ]
4)编译模板文件和语法文件

在这里插入图片描述

5)配置扩展的解析器类
withParserFactory(FlinkSqlParserImpl.FACTORY)
2、自定义扩展 Flink 的 Parser 语法
1)定义 SqlNode 类
package org.apache.flink.sql.parser.dql;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.Collections;
import java.util.List;

/** XSHOW CATALOGS sql call. */
public class SqlXShowCatalogs extends SqlCall {
    public static final SqlSpecialOperator OPERATOR =
            new SqlSpecialOperator("XSHOW CATALOGS", SqlKind.OTHER);

    public SqlXShowCatalogs(SqlParserPos pos) {
        super(pos);
    }

    @Override
    public SqlOperator getOperator() {
        return OPERATOR;
    }

    @Override
    public List<SqlNode> getOperandList() {
        return Collections.emptyList();
    }

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("XSHOW CATALOGS");
    }
}

2)修改 includes 目录下的 parserImpls.ftl 文件
/**
* Parse a "XShow Catalogs" metadata query command.
*/
SqlXShowCatalogs SqlXShowCatalogs() :
{
}
{
    <XSHOW> <CATALOGS>
    {
       return new SqlXShowCatalogs(getPos());
    }
}
3)修改 Parser.tdd 文件,新增-声明拓展的部分
imports:

"org.apache.flink.sql.parser.dql.SqlXShowCatalogs"

keywords:

"XSHOW"

statementParserMethods:

"SqlXShowCatalogs()"
4)重新编译
 mvn generate-resources
5)执行测试用例

可以看到,自定义 SQL 的报错,由解析失败,变为了校验失败。

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CustomFlinkSql {
    public static void main(String[] args) throws Exception {

        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .build());
				
				// 拓展自定义语法 xshow catalogs 前
        // SQL parse failed. Non-query expression encountered in illegal context
        tEnv.executeSql("xshow catalogs").print();

        // 拓展自定义语法 xshow catalogs 后
        // SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall
    }
}

6)查看生成的扩展解析器类

可以看到,在 FlinkSqlParserImpl 中,自定义的解析语法已经生成了。

在这里插入图片描述

3、validate 概述

在向 Flink 中添加完自定义的解析规则后,报错信息如下:

SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall
修改 validate 部分的代码
1)FlinkPlannerImpl#validate

作用:校验 SqlNode ,如果是 show catalogs 语法时直接返回。

在这里插入图片描述

sqlNode.isInstanceOf[SqlXShowCatalogs]
2)SqlToOperationConverter#convert

作用:将校验过的 SqlNode 转换为 Operator。

在这里插入图片描述

else if (validated instanceof SqlXShowCatalogs) {
            return Optional.of(converter.convertXShowCatalogs((SqlXShowCatalogs) validated));
}
3)SqlToOperationConverter#convertXShowCatalogs
/** Convert SHOW CATALOGS statement. */
private Operation convertXShowCatalogs(SqlXShowCatalogs sqlXShowCatalogs) {
     return new XShowCatalogsOperation();
}
4)XShowCatalogsOperation
package org.apache.flink.table.operations;

public class XShowCatalogsOperation implements ShowOperation {
    @Override
    public String asSummaryString() {
        return "SHOW CATALOGS";
    }
}
4、执行测试用例
package org.apache.flink.table.examples.java.custom;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CustomFlinkSql {
    public static void main(String[] args) throws Exception {

        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .build());

				// FlinkSQL原本支持的语法
        tEnv.executeSql("show catalogs").print();
        
        // 自定义语法
        tEnv.executeSql("xshow catalogs").print();
    }
}

在这里插入图片描述

5、总结-FlinkSQL 的执行流程
1、对 SQL 进行校验

final SqlNode validated = flinkPlanner.validate(sqlNode);

2、预校验重写 Insert 语句

3、调用 SqlNode.validate() 进行校验

	1)如果是:ExtendedSqlNode【SqlCreateHiveTable、SqlCreateTable、SqlTableLike】
	2)如果是:SqlKind.DDL、SqlKind.INSERT 等,无需校验,直接返回 SqlNode
	3)如果是:SqlRichExplain
	4)其它:validator.validate(sqlNode)
		
			1.校验作用域和表达式:validateScopedExpression(topNode, scope)
					a)将 SqlNode 进行规范化重写
          b)如果SQL是【TOP_LEVEL = concat(QUERY, DML, DDL)】,则在父作用域中注册查询
          c)校验 validateQuery 
          	i)validateFeature
          	ii)validateNamespace
          	iii)validateModality
          	iv)validateAccess
          	v)validateSnapshot
          d)如果SQL不是【TOP_LEVEL = concat(QUERY, DML, DDL)】进行类型推导
       
       2.获取校验之后的节点类型

2、将 SQLNode 转换为 Operation

converter.convertSqlQuery(validated)

	1)生成逻辑执行计划 RelNode
	RelRoot relational = planner.rel(validated);
		
		1.对查询进行转换
		sqlToRelConverter.convertQuery(validatedSqlNode)
		
	2)创建 PlannerQueryOperation
	new PlannerQueryOperation(relational.project());
	
3、将 Operation 转换为 List<Transformation<?>>
List<Transformation<?>> transformations = planner.translate(Collections.singletonList(modifyOperation));

	1)对 RelNode 逻辑执行计划进行优化,获取 optimizedRelNodes
	val optimizedRelNodes = optimize(relNodes)
	
	2)将 optimizedRelNodes 转换为 execGraph
	val execGraph = translateToExecNodeGraph(optimizedRelNodes)
	
	3)将 execGraph 转换为 transformations
	
		1.使用代码生成技术生成Function,后续可以反射调用
		val convertFunc = CodeGenUtils.genToInternalConverter(ctx, inputType)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/107347.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

java中的IO流

一.分类 1.IO流按流的方向可以分为&#xff1a;输入流&#xff0c;输出流&#xff1b; 2.按流中数据的最小单位分&#xff1a;字节流&#xff0c;字符流 字节输入流&#xff1a;把磁盘或网络中的数据以一个个字节形式读到内存中 字节输出流&#xff1a;把内存中的数据以一个个…

人工智能轨道交通行业周刊-第64期(2023.10.16-10.29)

本期关键词&#xff1a;北斗应用、供电智能运维、5G-R、铁路职称、星火大模型 1 整理涉及公众号名单 1.1 行业类 RT轨道交通人民铁道世界轨道交通资讯网铁路信号技术交流北京铁路轨道交通网上榜铁路视点ITS World轨道交通联盟VSTR铁路与城市轨道交通RailMetro轨道世界铁路那…

SparkSQL综合案例-省份维度的销售情况统计分析

一、项目背景 二、项目需求 &#xff08;1&#xff09;需求 ①各省销售指标&#xff0c;每个省份的销售额统计 ②TOP3销售省份中&#xff0c;有多少家店铺日均销售额1000 ③TOP3省份中&#xff0c;各个省份的平均单价 ④TOP3省份中&#xff0c;各个省份的支付类型比例 &#x…

汽车行驶性能的主观评价方法(2)-驾驶员的任务

人&#xff08;驾驶员&#xff09;-车辆-环境闭环控制系统 驾驶过程中&#xff0c;驾驶员承担着操纵车辆和控制车辆的任务。驾驶员在不知不觉中接受了大量光学、声学和动力学信息并予以评价&#xff0c;同时不断地通过理论值和实际值的比较来完成控制作用&#xff08;图 2.1&a…

贪心算法总结(未完结)

贪心的定义&#xff08;摘自百度百科&#xff09; 贪心算法&#xff08;greedy algorithm&#xff0c;又称贪婪算法&#xff09;是指&#xff0c;在对问题求解时&#xff0c;总是做出在当前看来是最好的选择。也就是说&#xff0c;不从整体最优上加以考虑&#xff0c;算法得到的…

Jmeter 接口测试,参数值为列表,如何参数化?

最近在我的教学过程中&#xff0c;我的一个学生问了我一个问题&#xff0c;他们公司的一个接口参数值是列表&#xff0c;列表中值的数量有多有少&#xff0c;问我在 jmeter 中如何让这个参数的值进行参数化&#xff1f; 看到这种问题&#xff0c;你的第一反应是什么&#xff1f…

mysql-面试50题-2

一、查询数据 学生表 Student create table Student(SId varchar(10),Sname varchar(10),Sage datetime,Ssex varchar(10)); insert into Student values(01 , 赵雷 , 1990-01-01 , 男); insert into Student values(02 , 钱电 , 1990-12-21 , 男); insert into Student v…

如何理解my_map.yaml中origin的含义

当然可以。首先,我们先了解一下2D地图的基本构成。2D地图实际上是一个网格系统,其中每个单元格(或像素)代表现实世界中的一个区域。当我们谈论origin时,我们实际上是在描述这个网格如何在真实的3D空间中放置。 让我们通过一个简单的示意图来解释: 假设上面的矩形表示一个…

【可变形注意力(1)】Multi-scale Deformable Attention Transformers 多尺度变形注意力

文章目录 前言论文 《Deformable DETR: Deformable Transformers for End-to-End Object Detection》的多尺度变形注意力的解读DEFORMABLE TRANSFORMERS FOR END-TO-END OBJECT DETECTION **2.** Deformable Attention ModuleDeformable Attention Module 3. Multi-Scale Defor…

【开源】基于SpringBoot的森林火灾预警系统的设计和实现

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 系统基础模块2.3 烟雾传感器模块2.4 温度传感器模块2.5 历史记录模块2.6 园区数据模块 三、系统设计3.1 用例设计3.1.1 森林园区基础系统用例设计3.1.2 森林预警数据用例设计 3.2 数据库设计3.2.1 烟雾…

C# | Chaikin算法 —— 计算折线对应的平滑曲线坐标点

Chaikin算法——计算折线对应的平滑曲线坐标点 本文将介绍一种计算折线对应的平滑曲线坐标点的算法。该算法使用Chaikin曲线平滑处理的方法&#xff0c;通过控制张力因子和迭代次数来调整曲线的平滑程度和精度。通过对原始点集合进行切割和插值操作&#xff0c;得到平滑的曲线坐…

【原创】解决Kotlin无法使用@Slf4j注解的问题

前言 主要还是辟谣之前的网上的用法&#xff0c;当然也会给出最终的使用方法。这可是Kotlin&#xff0c;关Slf4j何事&#xff01;&#xff1f; 辟谣内容&#xff1a;创建注解来解决这个问题 例如&#xff1a; Target(AnnotationTarget.CLASS) Retention(AnnotationRetentio…

开源Linux社区Armbian开发指南

1. 什么是armbian Armbian是一个基于Debian或Ubuntu的开源操作系统&#xff0c;专门针对嵌入式ARM平台进行优化和定制。Armbian可以运行在多种不同的嵌入式设备上&#xff0c;例如树莓派、ArmSoM、香蕉派等等。Armbian针对不同的嵌入式平台&#xff0c;提供了相应的硬件支持&a…

利用HTTP2,新型DDoS攻击峰值破纪录

亚马逊、Cloudflare 和谷歌周二联合发布消息称&#xff0c;一种依赖于 HTTP/2 快速重置技术的攻击行为对它们造成了破纪录的分布式拒绝服务 (DDoS) 攻击。 根据披露的信息&#xff0c;该攻击自8月下旬以来便一直存在&#xff0c;所利用的漏洞被跟踪为CVE-2023-44487&#xff0c…

基于SpringBoot+Vue的服装销售系统

基于SpringBootVue的服装销售平台的设计与实现~ 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringBootMyBatisVue工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 主页 我的订单 登录界面 管理员界面 摘要 基于SpringBoot和Vue的服装销售系统…

管理类联考——数学——汇总篇——知识点突破——数据分析——记忆

文章目录 考点记忆/考点汇总——按大纲 整体目录大纲法记忆宫殿法绘图记忆法 局部数字编码法对号不对号 归类记忆法重点记忆法歌决记忆法口诀&#xff1a;加法分类&#xff0c;类类相加&#xff1b;乘法分步&#xff0c;步步相乘。 谐音记忆法涂色 理解记忆法比较记忆法转图像记…

Qwt开发环境搭建(保姆级教程)

1.简介 QWT&#xff0c;即Qt Widgets for Technical Applications&#xff0c;其目标是以基于2D方式的窗体部件来显示数据&#xff0c; 数据源以数值&#xff0c;数组或一组浮点数等方式提供&#xff0c; 输出方式可以是Curves&#xff08;曲线&#xff09;&#xff0c;Slider…

计算机毕设 opencv 图像识别 指纹识别 - python

文章目录 0 前言1 课题背景2 效果展示3 具体实现3.1 图像对比过滤3.2 图像二值化3.3 图像侵蚀细化3.4 图像增强3.5 特征点检测 4 OpenCV5 最后 0 前言 &#x1f525; 这两年开始毕业设计和毕业答辩的要求和难度不断提升&#xff0c;传统的毕设题目缺少创新和亮点&#xff0c;往…

liunx Centos-7.5上 rabbitmq安装

在安装rabbitmq中需要注意&#xff1a; 1、rabbitmq依赖于erlang&#xff0c;需要先安装erlang 2、erlang和rabbitmq版本有对应关系 可参考网页&#xff1a;https://www.rabbitmq.com/which-erlang.html 第一步&#xff0c;安装编译工具及库文件,如果服务器上已经有了&…

如何在vscode中添加less插件

Less &#xff08;Leaner Style Sheets 的缩写&#xff09; 是一门向后兼容的 CSS 扩展语言。它对CSS 语言增加了少许方便的扩展&#xff0c;通过less可以编写更少的代码实现更强大的样式。但less不是css&#xff0c;浏览器不能直接识别&#xff0c;即浏览器无法执行less代码&a…
最新文章