拓展阅读

calcite简介和使用 quick-sql 查询

背景介绍

背景说明

项目相关组件现状:

  • 多个引擎需要投入多倍的人力,在人员有限的情况下,对引擎的掌控力会减弱

  • 语法兼容问题(Hive/Impala/Spark)

  • 语义支持问题(Hive/Impala/Oracle)

  • 扩展时重复工作量多

  • 每一个新引擎的学习成本(Hive/Impala/GreenPlum/Presto/…)

  • 每个新功能的维护成本(…)

不能赋能中台:

  • 不利于专家知识库的建设(重复问题)

  • 多项目会造成成本飙升(…)

项目目标

整体目标

公司内部统一的SQL分析中间件,作为一个简单,安全,快速的跨数据源统一SQL 查询引擎

  • 减少在使用不同数据引擎时需要的学习成本和切换成本;

  • 忽略不同数据引擎底层存储和数据查询方式的差异

  • 使用户仅需要关注查询的业务逻辑和数据本身。

应用场景

数据分析

  • 数据分析/挖掘

  • 生成报表

  • ETL

即时查询

  • 数据采样

  • 小数据交互查询

支持多数据源查询

  • MySQL join ElasticSearch union Hive

运维监控

研发策略

自研不等于自主可控,以“集成式创新”为出发点,拥抱开源和构建开源生态。

拥抱开源

积极参与社区贡献,加深和社区的合作,和社区融为一体 - 插件化

将特异性的需求独立出来,形成插件,降低与主干的耦合性,轻量化的迭代

生态化

  • 通用易用的数据访问方式

  • 高性能的数据查询能力

  • 完备的企业级特性支持

  • 丰富的生态支持与构建

产品起步

产品调研

目前市面上已有的产品:

  • presto

  • quick-sql

  • linkis

  • XQL/IQL

https://github.com/luons/query-engine

已有的开源产品的问题

presto:

优点 性能优越 跨源查询 SQL支持 缺点: 容错性差,当某个worker的查询失败后,整个query失效,没有重试机制 容易OOM,运行过程中对于内存极为敏感,连表查,可能产生大量的临时数据,因此速度会变慢 不支持实时 学习成本高

linkis

缺点: 仅作转发 学习成本较高

XQL/IQL

缺点: 不开源

quick-sql:

优点: 支持实时 多引擎支持 基本满足当前需求

缺点: 修改了SQL解析开源实现Calcite,无法跟社区同步

选择自研

调研最终选择的QuickSql,其自身有很多的问题:

  • 它将Calcite以源码形式导入到了工程中并做了十几处改动

  • 由于导入的Calcite版本较低,很多新功能都无法使用

  • QuickSQL去掉导入的Calcite部分,本身只有1万多行

  • 我们列出的很多扩展性的功能,QuickSQL也不支持

基于上述原因,我们选择自研,而不是基于QuickSQL做二次开发。

在 第一阶段、第二阶段,整体架构逻辑都参考自QuickSql。

自研时,我们对Calcite不做源码修改,只依赖。 Calcite选择最新的1.26版本 除SQL解析赖于 Calcite,其他并无特别依赖。

其他依赖:

  • calcite和avatica

  • 日志、commons组件、guava

  • jetty

  • spark(可选),flink(可选)

产品优势

  • 支持跨数据源查询(mysql、oracle、hive、es等),消除数据孤岛,针对数据价值挖掘有着更强大的功能

  • 多引擎支持(目前计划支持spark、flink,目前需手动指定)

  • 所有查询采用统一的sql语法(减少开发人员使用不同的组件的学习成本和开发成本)

  • 易扩展(扩展更多的数据源,目前理论支持所有可JDBC连接的数据源)

  • 拥抱开源,核心calcite紧跟开源社区(社区活跃且强大)

架构概述

整体架构

my-project 整体分为四层:

客户端 接入层 解析层 引擎层

struct

如上图,my-project的核心包括了接入层、解析层、引擎层 接入层提供一个TCP服务,供客户端调用 解析层是最重要的一层,在这一层里会将接入层获取到的查询信息进行分析,之后交给引擎层 引擎层根据解析层的指示,选择spark、flink或者JDBC直连的方式进行查询 引擎层最终调用的就是一个个具体的存储服务

下面是更细节的架构图

细节结构图

my-project需要将元数据信息注入到服务中,这里的元数据指的是客户端查询的库、表等信息。比如:

SELECT t.id,t.name,t.info FROM my_db.my_table AS t WHERE t.id > 10;

在这个SQL中,客户需要查询my_db这个库,但他并没有将my_db库的配置信息(url、用户名、密码等)告诉my-project,所以要完成上述的SQL查询,需要先将my_db库的配置信息注入到my-project中。 而meta模块就是用来完成元数据注入的,它下面指向的mysql是内部库,仅供my-project使用。

Core模块中,由runner子模块接收服务端解析的请求内容,也就是一个具体的SQL语句,以及相关的配置信息(可选)。

在Core模块中,会将接收到的SQL做法语解析,生成语法树,并根据语法树决定是单数据源查询、还是多数据源查询,而具体的查询动作是交给pipeline子模块完成的,由这个子模块去调用 spark 或者 JDBC完成具体的查询操作。

Core模块中还有一个Optimze子模块,负责对语法树进行优化,将一个查询效率比较差的SQL语句,优化成一个查询效果更高的SQL语句,Optimze这个子模块是可选的。

上图中的my-project是一个JVM进程,my-project本身是无状态的,可以方便的扩容/缩量。

接入层架构

JDBC 方式的服务端架构如下:

JDBC 方式

客户端需要先引入my-project驱动。

只需要对传统JDBC方式方式稍作配置即可,传统JDBC查询代码如下:

Class.forName("com.mysql.jdbc.Driver");
String url = "jdbc:mysql://localhost:3306/my_db";
Connection  conn = DriverManager.getConnection(url, properties);
ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM my_db.my_table");
// some logic .....

使用my-project驱动后,将url和driver替换掉即可使用:

Class.forName("com.my-project.client.Driver");
String url = "jdbc:my-project:http//localhost:15888/my_db";
Connection  conn = DriverManager.getConnection(url, properties);
ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM my_db.my_table");
// some logic .....

my-project驱动底层是HTTP方式的通讯,服务端是内嵌的Jetty。

客户端发送的JDBC请求实际是一个HTTP请求,而JDBC的请求内容被封装到HTTP的body中。

HTTP body有两种编码方式:

JSON protobuf

服务端解析到请求后,会交给自定义的my-projectHandler来处理。

my-projectHandler首先会解析请求,根据指定的 JSON 方式或者 protobuf 得到具体的内容,也就是一个具体的SQL。

之后就是执行这个SQL,通过调用my-project-Core模块完成具体的查询操作。

解析层

解析层的执行流程如下:

解析层

对于一个跨数据源的SQL:

SELECT * FROM Oracle_db.a Join MySQL_db.b ON a.id = b.id WHERE b.id > 10

解析层首先将这个SQL解析,得到一个语法树。

再遍历这棵树,就能确定需要查询的数据源,通过数据源的数量,也就确定了是否为跨数据源查询。

在具体执行之前,有一步可选的优化:

RBO:基于规则的优化,包括谓词下推、列裁剪、常量折叠等 CBO:基于代价的优化

如果是单数据源查询,对应的就是一个普通的JDBC查询。

如果是跨数据源查询,则交给 Spark 或者 Flink 去执行。

引擎层

可插拔的引擎层架构如下:

引擎层

根据前面解析得到语法树,交给pipeline去调用一个具体的引擎来执行。 这里的引擎可以是JDBC、可以是Flink、也可以是Spark。

每种类型的引擎都是以独立的ClassLoader方式引入的,这样可以保证引擎执行不会出现jar冲突

对于上述的架构,可以引入Spark 2.x作为引擎层;也可以同时引入Spark 3.x作为引擎层;或者可以引入其他任意类型的执行引擎。 my-project并不依赖于某一种具体的引擎,只是把具体的引擎当作黑盒使用。

对于跨数据源查询时(比如选用Spark),会动态的生成一些代码,然后将这些代码提交到 Spark的集群执行:

生成import 语句 生成查询 Oracle 的代码,并将结果写入到 tempView A 中 生成查询 MySQL 的代码,并将结果写入到 tempView B 中 最后执行对 A 和 B 执行一个联合查询

JDBC服务端解析

架构

服务端整体架构如下:

架构

服务端依托于Jetty运行的,通过内嵌的方式启动一个jetty,将AvaticaJsonHandler注册到jeety中。

客户端发送的是JSON或者Protobuf格式的协议,服务端接收到请求后会调用AvaticaJsonHandler来处理这个请求。

AvaticaJsonHandler首先解析请求,然后执行请求内容,在执行的时候根据是否是直连会选择两种执行方式:

  • 原始的JDBC方式执行

  • 调用my-project来执行,这里就是调用SqlRunner、Pipeline那套流程

客户端和服务端进行交互的时候,是根据不同的操作,调用对应的对象,再将这些对象 编码/解码

比如,要执行创建连接,那么会触发一个openConnection的操作,之后生成OpenConnectionRequest的对象。客户端会将这个对象编码为 JSON 或者 Protobuf。

类似的,服务端会接收到这个 JSON,然后将其解码成OpenConnectionRequest对象,再触发对应的操作。

客户端封装的请求类型如下(下面的都是一系列操作对象,发送前会被编码为JSON格式):

客户端 -> 服务端的交互概览如下:

交互概览

客户端执行 JDBC 查询,比如openConnect、createStatement等操作,这会委托给 AvaticaConnection 这个类去做。

AvaticaConnection 又会调用到Meta,Meta只是一个接口,所以需要一个具体的实现类。

这里的实现类是QuicksqlRemoteMeta,但看起来RemoteMeta也能完成,不清楚 quicksql 的实现有何用处

似乎Meta的实现类只是作为一个桥接用的,用来连接 AvaticaConnection 和 具体发送者之间的桥梁。

RemoteMeta 最后会交给 JsonService 来完成。在 JsonService 内部完成对象的编码 和 解码,HTTP发送动作是由 RemoteService来做的。 以上就是客户端的工作了,再看服务端: 服务端是依托于 Jetty 的,jetty 接收到请求会交给自定义的 AvaticaJsonHandler, 再交给 JsonHandler 来完成 decode 和 encode

所谓的 decode 就是将请求的 json 解码(用jackson将json解析成对象类型),之后交给 my-projectServiceMeta,这个类也类似于桥梁的作用,真正执行的是交给后面的 my-project-code去做的。

查询过程

创建连接

执行一个创建连接的动作:

Connection connection = DriverManager.getConnection(url, properties);

客户端会发送一个UUID,服务端根据这个UUID会将连接java.sql.Connection、java.sql.Statement给缓存(Guava)起来。

下次再有请求过来会首先从缓存中查找。

这里客户端需要服务端执行一个openConnection操作,也就是下面 json 中request中表示的 客户端发送json:

{
	"request": "openConnection",
	"connectionId": "cebe4551-9788-439e-8d1a-792064cd7a00",
	"info": {
		"schemaPath": {
			"version": "1.0",
			"defaultSchema": "my_test",
			"schemas": [{
				"name": "my_test",
				"type": "custom",
				"factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
				"operand": {
					"jdbcDriver": "com.mysql.jdbc.Driver",
					"jdbcUrl": "jdbc:mysql://10.200.64.11:3306/my_test?useSSL=false",
					"jdbcUser": "name",
					"jdbcPassword": "password",
					"dbType": "mysql"
				}
			}]
		}

	}
}

服务端返回的json:

{
	"response": "openConnection",
	"rpcMetadata": {
		"response": "rpcMetadata",
		"serverAddress": "KJBJ-01-DN-004889:15888"
	}
}

创建statement

再执行一个创建statement的动作:

Statement statement1 = connection.createStatement();

这里会继续触发一个 HTTP 请求,客户端会继续使用之前的 UUID,服务端根据 请求的 UUID 从缓存中取出连接。

这次客户端会要求执行connectionSync这个操作: 客户端发送的json:

{
	"request": "connectionSync",
	"connectionId": "cebe4551-9788-439e-8d1a-792064cd7a00",
	"connProps": {
		"connProps": "connPropsImpl",
		"autoCommit": null,
		"readOnly": null,
		"transactionIsolation": null,
		"catalog": null,
		"schema": null,
		"dirty": true
	}
}

服务端返回的json:

{
	"response": "connectionSync",
	"connProps": {
		"connProps": "connPropsImpl",
		"autoCommit": null,
		"readOnly": null,
		"transactionIsolation": null,
		"catalog": null,
		"schema": null,
		"dirty": false
	},
	"rpcMetadata": {
		"response": "rpcMetadata",
		"serverAddress": "KJBJ-01-DN-004889:15888"
	}
}

之后客户端会要求执行createStatement这个操作 客户端再次发送:

{
	"request": "createStatement",
	"connectionId": "cebe4551-9788-439e-8d1a-792064cd7a00"
}

服务端响应:

{
	"response": "createStatement",
	"connectionId": "cebe4551-9788-439e-8d1a-792064cd7a00",
	"statementId": 0,
	"rpcMetadata": {
		"response": "rpcMetadata",
		"serverAddress": "KJBJ-01-DN-004889:15888"
	}
}

执行查询

执行的时序图如下:

时序图

首先客户端发送请求给jetty,jetty接受到请求后,会调用到自定义的handler,也就是AvaticaJsonHandler。 AvaticaJsonHandler是真正执行具体逻辑的地方,这里主要干两件事:

根据请求对象解析出传递的内容,也就是解析json;根据json格式生成对应的对象 执行这个对象,并生成json结果,最后返回给客户端 这里解析完json后,得到的是PrepareAndExecuteRequest这么一个对象,之后会触发到QuicksqlServerMeta来执行prepareAndExecute函数。 这个函数就是用于执行具体sql的,它会根据客户端传递的参数,来决定执行方式:

如果客户端传递的直连查询,就用原始的JDBC方式查询,比如创建mysql驱动再查询mysql,或者创建oracle驱动再查询oracle 非直连查询,这里走的就是正常的calcite逻辑,也就是调用 SqlRunner解析sql并得到一个pipeline,再执行pipeline,最后将结果封装成ExecuteResutl并返回给客户端

客户端发送查询,要求执行prepareAndExecute操作

{
	"request": "prepareAndExecute",
	"connectionId": "0f276a88-335f-4f01-8916-36d11075e223",
	"statementId": 1,
	"sql": "select * from my_test",
	"maxRowsInFirstFrame": -1,
	"maxRowCount": -1
}

服务端返回结果:

{
	"response": "executeResults",
	"missingStatement": false,
	"rpcMetadata": {
		"response": "rpcMetadata",
		"serverAddress": "KJBJ-01-DN-004889:5888"
	},
	"results": [{
		"response": "resultSet",
		"connectionId": "0f276a88-335f-4f01-8916-36d11075e223",
		"statementId": 1,
		"ownStatement": true,
		"signature": {
			"columns": [{
				"ordinal": 0,
				"autoIncrement": true,
				"caseSensitive": false,
				"searchable": true,
				"currency": false,
				"nullable": 0,
				"signed": true,
				"displaySize": 11,
				"label": "id",
				"columnName": "id",
				"schemaName": "",
				"precision": 11,
				"scale": 0,
				"tableName": "my_test",
				"catalogName": "linkis_test",
				"type": {
					"type": "scalar",
					"id": 4,
					"name": "INT",
					"rep": "PRIMITIVE_INT"
				},
				"readOnly": false,
				"writable": true,
				"definitelyWritable": true,
				"columnClassName": "java.lang.Integer"
			}, {
				"ordinal": 1,
				"autoIncrement": false,
				"caseSensitive": false,
				"searchable": true,
				"currency": false,
				"nullable": 1,
				"signed": false,
				"displaySize": 200,
				"label": "name",
				"columnName": "name",
				"schemaName": "",
				"precision": 200,
				"scale": 0,
				"tableName": "my_test",
				"catalogName": "linkis_test",
				"type": {
					"type": "scalar",
					"id": 12,
					"name": "VARCHAR",
					"rep": "STRING"
				},
				"readOnly": false,
				"writable": true,
				"definitelyWritable": true,
				"columnClassName": "java.lang.String"
			}],
			"sql": null,
			"parameters": [],
			"cursorFactory": {
				"style": "LIST",
				"clazz": null,
				"fieldNames": null
			},
			"statementType": null
		},
		"firstFrame": {
			"offset": 0,
			"done": true,
			"rows": [
				[1, "aaaaa"],
				[2, "bbbb"],
				[3, "ccccc"],
				[4, "dddd"],
				[5, "eeee"],
				[8, "xxxxxxxx!!!"],
				[9, "kkkkkkk"],
				[10, "wokao"],
				[11, null],
				[12, "xxxxx!!!"]
			]
		},
		"updateCount": -1,
		"rpcMetadata": {
			"response": "rpcMetadata",
			"serverAddress": "KJBJ-01-DN-004889:5888"
		}
	}]
}

参考资料

https://code0xff.org/post/2021/12/%E7%BB%9F%E4%B8%80%E6%9F%A5%E8%AF%A2%E9%A1%B9%E7%9B%AE%E4%BB%8B%E7%BB%8D/

https://code0xff.org/post/2021/12/%E7%BB%9F%E4%B8%80%E6%9F%A5%E8%AF%A2%E6%95%B4%E5%90%88calcite/