最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

环球快资讯:实战Flink sql语法改造

来源:博客园

Flink是流计算领域的事实标准,而Flink sql的广泛应用极大地推动了这一标准的确立。

在mysql里我们可以很方便地通过show tables from db1 like "%a"直接从上百个库找出db1库a开头的表,但之前flink只支持show tables查出当前库的表,该如何改造呢?


(资料图)

背景知识

Flink sql运行流程

标准的一条 flink sql 运行起来的流程如下:

sql 解析阶段:

  1. calcite parser 解析(sql文本 -> SqlNode抽象语法树)
  2. SqlNode 验证转换阶段:基于Catalog校验(语法、表达式、表信息),校验通过后转换成Operation(SqlNode -> Operation)
  3. 语义分析阶段:Operation 转换为 RelNode,RelNode 即 Logical Plan(Operation -> RelNode)
  4. 优化阶段:calcite optimizer 优化(RelNode -> RelNode,剪枝、谓词下推等)
  5. 物理计划生成阶段:Logical Plan 转换为 Physical Plan(等同于 RelNode 转换成 Transformation)
  6. 执行阶段:通过 Transformation 集合生成 StreamGraph,提交到运行环境执行。

初看起来确实比较复杂,但对于这种分层的架构而言,我们只要关注与改造语法相关的第一、第二层即可。

Calcite解析规则

Calcite是一个开源的动态数据管理框架,可用来解析优化执行sql,提供了标准的SQL语言、多种查询优化和连接各种数据源的能力。

Calcite 使用 javacc 作为语法解析器,并且使用FMPP作为模板生成器。

三者的关系如下,这样calcite就可以通过javacc对用户自定义的语法进行解析。

fmpp(Parser.jj 模板生成) -> javacc(Parser 生成) -> calcite

以1.12旧版本的 show tables 语法规则为例,开头是一个声明,包括返回值类型、规则名和一个冒号,紧接着的一对花括号中写一些变量声明。下一对花括号中写该规则的具体内容,常用规则如下:

[]: 可忽略

+:前面的内容出现一次或多次

*: 前面的内容出现0次或多次

?: 前面的内容出现0次或一次

|: 前面或后面,与()合用

SqlShowTables SqlShowTables() :{}{         {        return new SqlShowTables(getPos());    }}

定义新的sql语法

首先通过调研spark、mysql等流行引擎,如下语法是最合适的,可以看出语法还是比较复杂的,有很多可选的组合,比如

  • show table
  • show tables from db
  • show table in db
  • show tables from catalog.db like "%a"
  • show tables in catalog.db not like "%a"
SHOW TABLES [ ( FROM | IN ) [catalog.]db ] [ [NOT] LIKE regex_pattern ]

接着就是定义语法文件,Flink使用的fmpp模板生成器将模板文件放在flink-sql-parser模块下,文件名是parserImpls.ftl。根据前面讲到的Calcite解析规则,修改文件如下:

SqlShowTables SqlShowTables() :{    SqlIdentifier databaseName = null;    SqlCharStringLiteral likeLiteral = null;    String prep = null;    boolean notLike = false;    SqlParserPos pos;}{         { pos = getPos(); }    [        (  { prep = "FROM"; } |  { prep = "IN"; } )        { pos = getPos(); }        databaseName = CompoundIdentifier()    ]    [        [                        {                notLike = true;            }        ]                  {            String likeCondition = SqlParserUtil.parseString(token.image);            likeLiteral = SqlLiteral.createCharString(likeCondition, getPos());        }    ]    {        return new SqlShowTables(pos, prep, databaseName, notLike, likeLiteral);    }}

SqlNode转换为Operation

Flink sql框架在处理完校验抽象语法树后,会将SqlNode转换为Operation,由于已经修改了语法树,转换规则、转换后的operation也发生了改变。相关转换在SqlToOperationConverter#convertValidatedSqlNode()。

改造前,由于show tables语法本身非常简单,所以相应的转换也很简单。

private Operation convertShowTables(SqlShowTables sqlShowTables) {        return new ShowTablesOperation();    }

改造后,show tables语法后面要跟很多可选的参数,需要对这些参数进行判断和转换,然后生成新的operation。

SqlShowTables这个SqlNode包含数组形式的库名,这个数组可以是[db],也可以是[catalog, db],如果是前者应该将新operation的catalog置为当前catalog,如果是后者应该将catalog置为给定的catalog。

private Operation convertShowTables(SqlShowTables sqlShowTables) {        if (sqlShowTables.getPreposition() == null) {            return new ShowTablesOperation(                    sqlShowTables.getLikeSqlPattern(),                    sqlShowTables.isWithLike(),                    sqlShowTables.isNotLike());        }        String[] fullDatabaseName = sqlShowTables.fullDatabaseName();        if (fullDatabaseName.length > 2) {            throw new ValidationException(                    String.format(                            "show tables from/in identifier [ %s ] format error",                            String.join(".", fullDatabaseName)));        }        String catalogName =                (fullDatabaseName.length == 1)                        ? catalogManager.getCurrentCatalog()                        : fullDatabaseName[0];        String databaseName =                (fullDatabaseName.length == 1) ? fullDatabaseName[0] : fullDatabaseName[1];        return new ShowTablesOperation(                catalogName,                databaseName,                sqlShowTables.getLikeSqlPattern(),                sqlShowTables.isWithLike(),                sqlShowTables.isNotLike(),                sqlShowTables.getPreposition());    }

验证

Flink SQL> create database d1;[INFO] Execute statement succeed.Flink SQL> create table d1.b1(id int) with ("connector"="print");[INFO] Execute statement succeed.Flink SQL> create table t1(id int) with ("connector"="print");[INFO] Execute statement succeed.Flink SQL> create table m1(id int) with ("connector"="print");[INFO] Execute statement succeed.Flink SQL> show tables like "m%";+------------+| table name |+------------+|         m1 |+------------+1 row in setFlink SQL> show tables from d1 like "b%";+------------+| table name |+------------+|         b1 |+------------+1 row in set

关键词: 运行环境 查询优化 事实标准