业务背景
如果我每一次调用的参数都是动态的。
select count(*) from users where create_time between #{startTime} and #{endTime}
接下来,让我们一步步的实现。
说明
实战记录一下 mysql 如何整合 apache calcite。
mysql 数据准备
建表语句
use test;
drop table if exists users;
CREATE TABLE "users" (
"id" int(11) NOT NULL,
"username" varchar(255) NOT NULL,
"email" varchar(255) NOT NULL,
`create_time` datetime(3) NOT NULL DEFAULT current_timestamp(3) COMMENT '创建时间',
`update_time` datetime(3) NOT NULL DEFAULT current_timestamp(3) ON UPDATE current_timestamp(3) COMMENT '更新时间',
PRIMARY KEY ("id")
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
插入数据
insert into users (id, username, email) values (1, 'u-1', '1@email.com');
insert into users (id, username, email) values (2, 'u-2', '2@email.com');
insert into users (id, username, email) values (3, 'u-3', '3@email.com');
数据确认:
mysql> select * from users;
+----+----------+-------------+-------------------------+-------------------------+
| id | username | email | create_time | update_time |
+----+----------+-------------+-------------------------+-------------------------+
| 1 | u-1 | 1@email.com | 2024-06-25 21:28:33.871 | 2024-06-25 21:28:33.871 |
| 2 | u-2 | 2@email.com | 2024-06-25 21:28:33.881 | 2024-06-25 21:28:33.881 |
| 3 | u-3 | 3@email.com | 2024-06-25 21:28:34.616 | 2024-06-25 21:28:34.616 |
+----+----------+-------------+-------------------------+-------------------------+
3 rows in set (0.00 sec)
mysql 整合基本例子-v1
maven 引入
<dependencies>
<!-- jdbc 应该是 core 的一部分-->
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.36.0</version>
</dependency>
<!-- MySQL JDBC Driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version> <!-- 或者最新版本 -->
</dependency>
</dependencies>
java 代码
package com.github.houbb.calcite.learn.mysql;
import com.mysql.jdbc.jdbc2.optional.MysqlDataSource;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;
/**
* https://blog.csdn.net/a17816876003/article/details/125592222
*/
public class CalciteMySQLExample {
public static void main(String[] args) throws Exception {
// check driver exist
Class.forName("org.apache.calcite.jdbc.Driver");
Class.forName("com.mysql.jdbc.Driver");
// the properties for calcite connection
Properties info = new Properties();
//LEX: 使用传统的 SQL 解析器,也称为 "LEX" 解析器。这是 Calcite 默认的解析器类型,能够处理大多数 SQL 语法。
//JAVA: 使用 Java 解析器,也称为 "JAVA" 解析器。这个解析器是针对 Calcite 提供的 SQL 格式进行解析。
info.setProperty("lex", "JAVA");
// remarks 是指定是否应该包含数据库的备注信息(也称为注释)。当设置为 true 时,表示在元数据中包含数据库对象的备注信息;当设置为 false 时,不包含备注信息。
info.setProperty("remarks","true");
// 所有函数
info.setProperty("fun","all");
// 大小写敏感
info.setProperty("caseSensitive","false");
// SqlParserImpl can analysis sql dialect for sql parse
info.setProperty("parserFactory","org.apache.calcite.sql.parser.impl.SqlParserImpl#FACTORY");
// create calcite connection and schema
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
System.out.println(calciteConnection.getProperties());
// code for mysql datasource
MysqlDataSource dataSource = new MysqlDataSource();
// please change host and port maybe like "jdbc:mysql://127.0.0.1:3306/test"
dataSource.setUrl("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC");
dataSource.setUser("admin");
dataSource.setPassword("123456");
// mysql schema, the sub schema for rootSchema, "test" is a schema in mysql
SchemaPlus rootSchema = calciteConnection.getRootSchema();
Schema schema = JdbcSchema.create(rootSchema, "test", dataSource, null, "test");
rootSchema.add("test", schema);
// Set default schema for unqualified table names,这样可以让 sql 不加前缀?
calciteConnection.setSchema("test");
// run sql query
Statement statement = calciteConnection.createStatement();
ResultSet resultSet = statement.executeQuery("select count(*) from users");
while (resultSet.next()) {
System.out.println(resultSet.getObject(1));
}
statement.close();
connection.close();
}
}
输出日志:
{caseSensitive=false, fun=all, lex=JAVA, parserFactory=org.apache.calcite.sql.parser.impl.SqlParserImpl#FACTORY, remarks=true}
3
输出对应的个数为 3
配置信息
Properties 主要用于指定一些配置文件,可以参见 CalciteConnectionProperty
类。
APPROXIMATE_DISTINCT_COUNT("approximateDistinctCount", Type.BOOLEAN, false,
false),
/** Whether approximate results from "Top N" queries
* ({@code ORDER BY aggFun DESC LIMIT n}) are acceptable. */
APPROXIMATE_TOP_N("approximateTopN", Type.BOOLEAN, false, false),
/** Whether approximate results from aggregate functions on
* DECIMAL types are acceptable. */
APPROXIMATE_DECIMAL("approximateDecimal", Type.BOOLEAN, false, false),
/** Whether to treat empty strings as null for Druid Adapter.
*/
NULL_EQUAL_TO_EMPTY("nullEqualToEmpty", Type.BOOLEAN, true, false),
/** Whether to store query results in temporary tables. */
AUTO_TEMP("autoTemp", Type.BOOLEAN, false, false),
/** Whether Calcite should use materializations. */
MATERIALIZATIONS_ENABLED("materializationsEnabled", Type.BOOLEAN, true,
false),
/** Whether Calcite should create materializations. */
CREATE_MATERIALIZATIONS("createMaterializations", Type.BOOLEAN, true, false),
/** How NULL values should be sorted if neither NULLS FIRST nor NULLS LAST are
* specified. The default, HIGH, sorts NULL values the same as Oracle. */
DEFAULT_NULL_COLLATION("defaultNullCollation", Type.ENUM, NullCollation.HIGH,
true, NullCollation.class),
/** How many rows the Druid adapter should fetch at a time when executing
* "select" queries. */
DRUID_FETCH("druidFetch", Type.NUMBER, 16384, false),
/** URI of the model. */
MODEL("model", Type.STRING, null, false),
/** Lexical policy. */
LEX("lex", Type.ENUM, Lex.ORACLE, false),
/** Collection of built-in functions and operators. Valid values include
* "standard", "mysql", "oracle", "postgresql" and "spatial", and also
* comma-separated lists, for example "oracle,spatial". */
FUN("fun", Type.STRING, "standard", true),
/** How identifiers are quoted.
* If not specified, value from {@link #LEX} is used. */
QUOTING("quoting", Type.ENUM, null, false, Quoting.class),
/** How identifiers are stored if they are quoted.
* If not specified, value from {@link #LEX} is used. */
QUOTED_CASING("quotedCasing", Type.ENUM, null, false, Casing.class),
/** How identifiers are stored if they are not quoted.
* If not specified, value from {@link #LEX} is used. */
UNQUOTED_CASING("unquotedCasing", Type.ENUM, null, false, Casing.class),
/** Whether identifiers are matched case-sensitively.
* If not specified, value from {@link #LEX} is used. */
CASE_SENSITIVE("caseSensitive", Type.BOOLEAN, null, false),
/** Parser factory.
*
* <p>The name of a class that implements
* {@link org.apache.calcite.sql.parser.SqlParserImplFactory}. */
PARSER_FACTORY("parserFactory", Type.PLUGIN, null, false),
/** Name of initial schema. */
SCHEMA("schema", Type.STRING, null, false),
/** Schema factory.
*
* <p>The name of a class that implements
* {@link org.apache.calcite.schema.SchemaFactory}.
*
* <p>Ignored if {@link #MODEL} is specified. */
SCHEMA_FACTORY("schemaFactory", Type.PLUGIN, null, false),
/** Schema type.
*
* <p>Value may be null, "MAP", "JDBC", or "CUSTOM"
* (implicit if {@link #SCHEMA_FACTORY} is specified).
*
* <p>Ignored if {@link #MODEL} is specified. */
SCHEMA_TYPE("schemaType", Type.ENUM, null, false, JsonSchema.Type.class),
/** Specifies whether Spark should be used as the engine for processing that
* cannot be pushed to the source system. If false (the default), Calcite
* generates code that implements the Enumerable interface. */
SPARK("spark", Type.BOOLEAN, false, false),
/** Returns the time zone from the connect string, for example 'gmt-3'.
* If the time zone is not set then the JVM time zone is returned.
* Never null. */
TIME_ZONE("timeZone", Type.STRING, TimeZone.getDefault().getID(), false),
/** If the planner should try de-correlating as much as it is possible.
* If true (the default), Calcite de-correlates the plan. */
FORCE_DECORRELATE("forceDecorrelate", Type.BOOLEAN, true, false),
/** Type system. The name of a class that implements
* {@link org.apache.calcite.rel.type.RelDataTypeSystem} and has a public
* default constructor or an {@code INSTANCE} constant. */
TYPE_SYSTEM("typeSystem", Type.PLUGIN, null, false),
/** SQL conformance level. */
CONFORMANCE("conformance", Type.ENUM, SqlConformanceEnum.DEFAULT, false);
private final String camelName;
private final Type type;
private final Object defaultValue;
private final boolean required;
private final Class valueClass;
mysql 整合-v2
说明
我们想加一个时间限制,来读取数据。
数据准备
为了便于区分,我们再插入一些数据
insert into users (id, username, email) values (4, 'u-4', '4@email.com');
insert into users (id, username, email) values (5, 'u-5', '5@email.com');
insert into users (id, username, email) values (6, 'u-6', '6@email.com');
结果:
mysql> select * from users;
+----+----------+-------------+-------------------------+-------------------------+
| id | username | email | create_time | update_time |
+----+----------+-------------+-------------------------+-------------------------+
| 1 | u-1 | 1@email.com | 2024-06-25 21:28:33.871 | 2024-06-25 21:28:33.871 |
| 2 | u-2 | 2@email.com | 2024-06-25 21:28:33.881 | 2024-06-25 21:28:33.881 |
| 3 | u-3 | 3@email.com | 2024-06-25 21:28:34.616 | 2024-06-25 21:28:34.616 |
| 4 | u-4 | 4@email.com | 2024-06-25 21:33:54.436 | 2024-06-25 21:33:54.436 |
| 5 | u-5 | 5@email.com | 2024-06-25 21:33:54.446 | 2024-06-25 21:33:54.446 |
| 6 | u-6 | 6@email.com | 2024-06-25 21:33:55.137 | 2024-06-25 21:33:55.137 |
+----+----------+-------------+-------------------------+-------------------------+
失败的方式1
select * from users where create_time between '2024-06-25 21:28:33' and '2024-06-25 21:29:33'
直接把时间用字符串表示,mysql 测试正常。
oracle 异常
但是 oracle 会报错。
1) CAST 失败
测试发现 SELECT CAST('2024-06-25 10:30:00' AS TIMESTAMP) FROM DUAL;
oracle 报错无效的月份,为什么?
这个问题发现
SELECT * FROM NLS_SESSION_PARAMETERS WHERE PARAMETER = 'NLS_DATE_FORMAT';
发现这个值是 DD-MON-RR
这个和我们指定的格式字符串不是相同的。
2) 尝试 TO_TIMESTAMP
SELECT TO_TIMESTAMP('2024-06-25 10:30:00', 'YYYY-MM-DD HH24:MI:SS') FROM DUAL;
发现会报错 这个报错内部缓冲区的字符串过长
直接使用字符串,最后会使用 TO_TIMESTAMP,依然会导致失败。
失败的方式2
采用占位符设置。
String sql = "select count(*) from users where create_time between ? and ?";
PreparedStatement statement = calciteConnection.prepareStatement(sql);
statement.setTimestamp(1, Timestamp.valueOf("2024-06-25 21:28:33"));
statement.setTimestamp(2, Timestamp.valueOf("2024-06-25 21:29:33"));
ResultSet resultSet = statement.executeQuery();
while (resultSet.next()) {
System.out.println(resultSet.getObject(1));
}
statement.close();
connection.close();
mysql
mysql 执行时不会错误,但是结果是不对的。
{caseSensitive=false, fun=all, lex=JAVA, parserFactory=org.apache.calcite.sql.parser.impl.SqlParserImpl#FACTORY, remarks=true}
0
但是实际上应该是 3 才对。
oracle
oracle 这种会直接报错,估计是 calcite 的实现问题。
发现 apache calcite 把 TIMESTAMP 转转为了 LONG,导致 oracle 执行类型不匹配。预期需要 timestamp,但是为 number
尝试方式3
最后,尝试首先采用 calcite 内置函数,比如 PARSE_DATETIME 把结果转化为时间戳,这样才能匹配。
可以参考文档:
比如:
SELECT PARSE_DATETIME('%Y-%m-%d %H:%M:%S', '1998-10-18 13:45:55') AS datetime;
/*---------------------*
| datetime |
+---------------------+
| 1998-10-18T13:45:55 |
*---------------------*/
java 实现
注意:info.setProperty("fun","all");
这个比较重要,否则报错方法不存在。
String sql = "select count(*) from users where create_time between PARSE_DATETIME('%Y-%m-%d %H:%M:%S', ?) and PARSE_DATETIME('%Y-%m-%d %H:%M:%S', ?)";
PreparedStatement statement = calciteConnection.prepareStatement(sql);
statement.setTimestamp(1, Timestamp.valueOf("2024-06-25 21:28:33"));
statement.setTimestamp(2, Timestamp.valueOf("2024-06-25 21:29:33"));
ResultSet resultSet = statement.executeQuery();
while (resultSet.next()) {
System.out.println(resultSet.getObject(1));
}
statement.close();
connection.close();
但是这样会报错:
Exception in thread "main" java.sql.SQLException: Error while preparing statement [select count(*) from users where create_time between PARSE_DATETIME('%Y-%m-%d %H:%M:%S', ?) and PARSE_DATETIME('%Y-%m-%d %H:%M:%S', ?)]
at org.apache.calcite.avatica.Helper.createException(Helper.java:56)
at org.apache.calcite.avatica.Helper.createException(Helper.java:41)
at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement_(CalciteConnectionImpl.java:226)
at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement(CalciteConnectionImpl.java:205)
at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement(CalciteConnectionImpl.java:101)
at org.apache.calcite.avatica.AvaticaConnection.prepareStatement(AvaticaConnection.java:178)
at com.github.houbb.calcite.learn.mysql.CalciteMySQLExampleDynamicVarV2.main(CalciteMySQLExampleDynamicVarV2.java:59)
Caused by: org.apache.calcite.runtime.CalciteContextException: At line 1, column 90: Illegal use of dynamic parameter
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:507)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:948)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:933)
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5517)
at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:2039)
at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:2121)
at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:2121)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4584)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause(SqlValidatorImpl.java:4576)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3829)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:61)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:88)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1154)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1125)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:282)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1091)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:797)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:607)
at org.apache.calcite.prepare.Prepare.prepareSql(Prepare.java:257)
at org.apache.calcite.prepare.Prepare.prepareSql(Prepare.java:220)
at org.apache.calcite.prepare.CalcitePrepareImpl.prepare2_(CalcitePrepareImpl.java:666)
at org.apache.calcite.prepare.CalcitePrepareImpl.prepare_(CalcitePrepareImpl.java:519)
at org.apache.calcite.prepare.CalcitePrepareImpl.prepareSql(CalcitePrepareImpl.java:487)
at org.apache.calcite.jdbc.CalciteConnectionImpl.parseQuery(CalciteConnectionImpl.java:236)
at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement_(CalciteConnectionImpl.java:216)
... 4 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of dynamic parameter
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:507)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:601)
... 28 more
尝试方案4
既然必须要转换为 timestamp,还不能占位符。
那留给我们的路就不多了
String sql = "select count(*) from users where create_time between PARSE_DATETIME('%Y-%m-%d %H:%M:%S', '2024-06-25 21:28:33') and PARSE_DATETIME('%Y-%m-%d %H:%M:%S', '2024-06-25 21:29:33')";
PreparedStatement statement = calciteConnection.prepareStatement(sql);
ResultSet resultSet = statement.executeQuery();
while (resultSet.next()) {
System.out.println(resultSet.getObject(1));
}
此时 mysql 是支持的,结果为 3
正确。
小结
总的来说,这样的一个插件引入,导致我们最简单的方式都变得非常复杂。
解决方案也是有的,就是过于曲折。
参考资料
https://blog.csdn.net/a17816876003/article/details/125592222