23 执行引擎:如何把握 ShardingSphere 中的 Executor 执行模型?(下) 在上一课时,我们已经对 ShardingSphere 执行引擎中关于底层的 SQLExecuteTemplate,以及上层的 StatementExecutor 和 PreparedStatementExecutor 对象进行了全面介绍。

今天,我们在此基础上更上一层,重点关注 ShardingStatement 和 ShardingPreparedStatement 对象,这两个对象分别是 StatementExecutor 和 PreparedStatementExecutor 的使用者。

ShardingStatement

我们先来看 ShardingStatement 类,该类中的变量在前面的内容中都已经有过介绍: private final ShardingConnection connection; private final StatementExecutor statementExecutor; private boolean returnGeneratedKeys; private SQLRouteResult sqlRouteResult; private ResultSet currentResultSet;

ShardingStatement 类的构造函数同样不是很复杂,我们发现 StatementExecutor 就是在这个构造函数中完成了其创建过程:

public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) { super(Statement.class); this.connection = connection; //创建 StatementExecutor statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, connection); }

在继续介绍 ShardingStatement 之前,我们先梳理一下与它相关的类层结构。我们在 **“06 规范兼容:JDBC 规范与 ShardingSphere 是什么关系?”** 中的 ShardingConnection 提到,ShardingSphere 通过适配器模式包装了自己的实现类,除了已经介绍的 ShardingConnection 类之外,还包含今天要介绍的 ShardingStatement 和 ShardingPreparedStament。

根据这一点,我们可以想象 ShardingStatement 应该具备与 ShardingConnection 类似的类层结构:

Drawing 0.png

然后我们来到上图中 AbstractStatementAdapter 类,这里的很多方法的风格都与 ShardingConnection 的父类 AbstractConnectionAdapter 一致,例如如下所示的 setPoolable 方法: public final void setPoolable(final boolean poolable) throws SQLException { this.poolable = poolable; recordMethodInvocation(targetClass, “setPoolable”, new Class[] {boolean.class}, new Object[] {poolable}); forceExecuteTemplate.execute((Collection) getRoutedStatements(), new ForceExecuteCallback() { @Override public void execute(final Statement statement) throws SQLException { statement.setPoolable(poolable); } });

这里涉及的 recordMethodInvocation 方法、ForceExecuteTemplate,以及 ForceExecuteCallback 我们都已经在“03 规范兼容:JDBC 规范与 ShardingSphere 是什么关系?”中进行了介绍,这里不再展开。

同样,AbstractStatementAdapter 的父类 AbstractUnsupportedOperationStatement 的作用也与 AbstractUnsupportedOperationConnection 的作用完全一致。

了解了 ShardingStatement 的类层结构之后,我们来看它的核心方法,首当其冲的还是它的 executeQuery 方法: @Override public ResultSet executeQuery(final String sql) throws SQLException { if (Strings.isNullOrEmpty(sql)) { throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY); } ResultSet result; try { //清除 StatementExecutor 中的相关变量 clearPrevious(); //执行路由引擎,获取路由结果 shard(sql); //初始化 StatementExecutor initStatementExecutor(); //调用归并引擎 MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getRuntimeContext().getDatabaseType(), connection.getRuntimeContext().getRule(), sqlRouteResult, connection.getRuntimeContext().getMetaData().getRelationMetas(), statementExecutor.executeQuery()); //获取归并结果 result = getResultSet(mergeEngine); } finally { currentResultSet = null; } currentResultSet = result; return result; }

这个方法中有几个子方法值得具体展开一下,首先是 shard 方法:

private void shard(final String sql) { //从 Connection 中获取 ShardingRuntimeContext 上下文 ShardingRuntimeContext runtimeContext = connection.getRuntimeContext(); //创建 SimpleQueryShardingEngine SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine(runtimeContext.getRule(), runtimeContext.getProps(), runtimeContext.getMetaData(), runtimeContext.getParseEngine()); //执行分片路由并获取路由结果 sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList()); }

这段代码就是路由引擎的入口,我们创建了 SimpleQueryShardingEngine,并调用它的 shard 方法获取路由结果对象 SQLRouteResult。

然后我们来看 initStatementExecutor 方法,如下所示: private void initStatementExecutor() throws SQLException { statementExecutor.init(sqlRouteResult); replayMethodForStatements(); }

这里通过路由结果对象 SQLRouteResult 对 statementExecutor 进行了初始化,然后执行了一个 replayMethodForStatements 方法:

private void replayMethodForStatements() { for (Statement each : statementExecutor.getStatements()) { replayMethodsInvocation(each); } }

该方法实际上就是调用了基于反射的 replayMethodsInvocation 方法,然后这个replayMethodsInvocation 方法会针对 statementExecutor 中所有 Statement的 SQL 操作执行目标方法。

最后,我们通过执行 statementExecutor.executeQuery() 方法获取 SQL 执行的结果,并用这个结果来创建归并引擎 MergeEngine,并通过归并引擎 MergeEngine 获取最终的执行结果。 归并引擎是 ShardingSphere 中与 SQL 解析引擎、路由引擎以及执行引擎并列的一个引擎,我们在下一课时中就会开始介绍这块内容,这里先不做具体展开。

以 ShardingStatement 中的其中一个 executeUpdate 方法为例,可以看到它的执行流程也与前面的 executeQuery 方法非常类似:

@Override public int executeUpdate(final String sql) throws SQLException { try { //清除 StatementExecutor 中的相关变量 clearPrevious(); //执行路由引擎,获取路由结果 shard(sql); //初始化 StatementExecutor initStatementExecutor(); return statementExecutor.executeUpdate(); } finally { currentResultSet = null; } }

当然,对于 Update 操作而言,不需要通过归并引擎做结果的归并。

ShardingPreparedStatement

我们接着来看 ShardingPreparedStatement 类,这个类的变量也基本都是前面介绍过的对象: private final ShardingConnection connection; private final String sql; private final PreparedQueryShardingEngine shardingEngine; private final PreparedStatementExecutor preparedStatementExecutor; private final BatchPreparedStatementExecutor batchPreparedStatementExecutor; private SQLRouteResult sqlRouteResult; private ResultSet currentResultSet;

这里的 ShardingEngine、PreparedStatementExecutor 和 BatchPreparedStatementExecutor 对象的创建过程都发生在 ShardingPreparedStatement 的构造函数中。

然后我们来看它的代表性方法 ExecuteQuery,如下所示: @Override public ResultSet executeQuery() throws SQLException { ResultSet result; try { clearPrevious(); shard(); initPreparedStatementExecutor(); MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getRuntimeContext().getDatabaseType(), connection.getRuntimeContext().getRule(), sqlRouteResult, connection.getRuntimeContext().getMetaData().getRelationMetas(), preparedStatementExecutor.executeQuery()); result = getResultSet(mergeEngine); } finally { clearBatch(); } currentResultSet = result; return result; }

这里我们没加注释,但也应该理解这一方法的执行流程,因为该方法的风格与 ShardingStatement 中的同名方法非常一致。

关于 ShardingPreparedStatement 就没有太多可以介绍的内容了,我们接着来看它的父类AbstractShardingPreparedStatementAdapter 类,看到该类持有一个 SetParameterMethodInvocation 的列表,以及一个参数列表: private final List setParameterMethodInvocations = new LinkedList<>(); private final List parameters = new ArrayList<>();

这里的 SetParameterMethodInvocation 类直接集成了介绍 ShardingConnection 时提到的 JdbcMethodInvocation 类:

public final class SetParameterMethodInvocation extends JdbcMethodInvocation { @Getter private final int index; @Getter private final Object value; public SetParameterMethodInvocation(final Method method, final Object[] arguments, final Object value) { super(method, arguments); this.index = (int) arguments[0]; this.value = value; } public void changeValueArgument(final Object value) { getArguments()[1] = value; } }

对于 ShardingPreparedStatement 而言,这个类的作用是在 JdbcMethodInvocation 中所保存的方法和参数的基础上,添加了 SQL 执行过程中所需要的参数信息。

所以它的 replaySetParameter 方法就变成了如下的风格: protected final void replaySetParameter(final PreparedStatement preparedStatement, final List parameters) { setParameterMethodInvocations.clear(); //添加参数信息 addParameters(parameters); for (SetParameterMethodInvocation each : setParameterMethodInvocations) { each.invoke(preparedStatement); } }

关于 AbstractShardingPreparedStatementAdapter 还需要注意的是它的类层结构,如下图所示,可以看到 AbstractShardingPreparedStatementAdapter 继承了 AbstractUnsupportedOperationPreparedStatement 类;而 AbstractUnsupportedOperationPreparedStatement 却又继承了 AbstractStatementAdapter 类并实现了 PreparedStatement:

Drawing 2.png

形成这种类层结构的原因在于,PreparedStatement 本来就是在 Statement 的基础上添加了各种参数设置功能,换句话说,Statement 的功能 PreparedStatement 都应该有。

所以一方面 AbstractStatementAdapter 提供了所有 Statement 的功能;另一方面,AbstractShardingPreparedStatementAdapter 首先把 AbstractStatementAdapter 所有的功能继承过来,但它自身可能有一些无法实现的关于 PreparedStatement 的功能,所以同样提供了 AbstractUnsupportedOperationPreparedStatement 类,并被最终的 AbstractShardingPreparedStatementAdapter 适配器类所继承。

这样就形成了如上图所示的复杂类层结构。

ShardingConnection

介绍完 ShardingStatement 和 ShardingPreparedStatement 之后,我们来关注使用它们的具体应用场景,这也是 ShardingSphere 执行引擎的最后一部分内容。

通过查看调用关系,我们发现创建这两个类的入口都在 ShardingConnection 类中,该类包含了用于创建 ShardingStatement 的 createStatement 方法和用于创建 ShardingPreparedStatement 的 prepareStatement 方法,以及它们的各种重载方法: @Override public Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) { return new ShardingStatement(this, resultSetType, resultSetConcurrency, resultSetHoldability); } @Override public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { return new ShardingPreparedStatement(this, sql, resultSetType, resultSetConcurrency, resultSetHoldability); }

同时,ShardingConnection 中包含了用于管理分布式事务的 ShardingTransactionManager。关于分布式事务的讨论不是今天的重点,我们后面会有专题来做详细展开。

但我们可以先看一下 commit 和 rollback 方法: @Override public void commit() throws SQLException { if (TransactionType.LOCAL == transactionType) { super.commit(); } else { shardingTransactionManager.commit(); } } @Override public void rollback() throws SQLException { if (TransactionType.LOCAL == transactionType) { super.rollback(); } else { shardingTransactionManager.rollback(); } }

可以看到这两个方法的逻辑还是比较清晰的,即当事务类型为本地事务时直接调用 ShardingConnection 父类 AbstractConnectionAdapter 中的 commit 和 rollback 方法,这两个方法会调用真正的 connection 的相关方法。

以 commit 方法为例,我们可以看到 AbstractConnectionAdapter 中基于这一设计思想的实现过程: @Override public void commit() throws SQLException { forceExecuteTemplate.execute(cachedConnections.values(), new ForceExecuteCallback() { @Override public void execute(final Connection connection) throws SQLException { connection.commit(); } }); }

ShardingDataSource

我们知道在 JDBC 规范中,可以通过 DataSource 获取 Connection 对象。ShardingSphere 完全兼容 JDBC 规范,所以 ShardingConnection 的创建过程应该也是在对应的 DataSource 中,这个 DataSource 就是ShardingDataSource

ShardingDataSource 类比较简单,其构造函数如下所示: public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props) throws SQLException { super(dataSourceMap); checkDataSourceType(dataSourceMap); runtimeContext = new ShardingRuntimeContext(dataSourceMap, shardingRule, props, getDatabaseType()); }

可以看到,ShardingRuntimeContext 这个上下文对象是在 ShardingDataSource 的构造函数中被创建的,而创建 ShardingConnection 的过程也很直接:

@Override public final ShardingConnection getConnection() { return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get()); }

在 ShardingDataSource 的实现上,也同样采用的是装饰器模式,所以它的类层结构也与 ShardingConnection 的类似。在 ShardingDataSource 的父类 AbstractDataSourceAdapter 中,主要的工作是完成 DatabaseType 的创建,核心方法 createDatabaseType 如下所示:

private DatabaseType createDatabaseType(final DataSource dataSource) throws SQLException { if (dataSource instanceof AbstractDataSourceAdapter) { return ((AbstractDataSourceAdapter) dataSource).databaseType; } try (Connection connection = dataSource.getConnection()) { return DatabaseTypes.getDatabaseTypeByURL(connection.getMetaData().getURL()); } }

可以看到这里使用到了 DatabaseTypes 类,该类负责 DatabaseType 实例的动态管理。而在 ShardingSphere 中,DatabaseType 接口代表数据库类型:

public interface DatabaseType { //获取数据库名称 String getName(); //获取 JDBC URL 的前缀 Collection getJdbcUrlPrefixAlias(); //获取数据源元数据 DataSourceMetaData getDataSourceMetaData(String url, String username); }

可以想象 ShardingSphere 中针对各种数据库提供了 DatabaseType 接口的实现类,其中以 MySQLDatabaseType 为例:

public final class MySQLDatabaseType implements DatabaseType { @Override public String getName() { return “MySQL”; } @Override public Collection getJdbcUrlPrefixAlias() { return Collections.singletonList("jdbc:mysqlx:"); } @Override public MySQLDataSourceMetaData getDataSourceMetaData(final String url, final String username) { return new MySQLDataSourceMetaData(url); } }

上述代码中的 MySQLDataSourceMetaData 实现了 DataSourceMetaData 接口,并提供如下所示的对输入 url 的解析过程:

public MySQLDataSourceMetaData(final String url) { Matcher matcher = pattern.matcher(url); if (!matcher.find()) { throw new UnrecognizedDatabaseURLException(url, pattern.pattern()); } hostName = matcher.group(4); port = Strings.isNullOrEmpty(matcher.group(5)) ? DEFAULT_PORT : Integer.valueOf(matcher.group(5)); catalog = matcher.group(6); schema = null; }

显然,DatabaseType 用于保存与特定数据库元数据相关的信息,ShardingSphere 还基于 SPI 机制实现对各种 DatabaseType 实例的动态管理。

最后,我们来到 ShardingDataSourceFactory 工厂类,该类负责 ShardingDataSource 的创建: public final class ShardingDataSourceFactory { public static DataSource createDataSource( final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig, final Properties props) throws SQLException { return new ShardingDataSource(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props); } }

我们在这里创建了 ShardingDataSource,同时发现 ShardingRule 的创建过程实际上也是在这里,通过传入的 ShardingRuleConfiguration 来构建一个新的 ShardingRule 对象。

一旦创建了 DataSource,我们就可以使用与 JDBC 规范完全兼容的 API,通过该 DataSource 完成各种 SQL 的执行。我们可以回顾 ShardingDataSourceFactory 的使用过程来加深对他的理解: public DataSource dataSource() throws SQLException { //创建分片规则配置类 ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration(); //创建分表规则配置类 TableRuleConfiguration tableRuleConfig = new TableRuleConfiguration(“user”, “ds${0..1}.user${0..1}”); //创建分布式主键生成配置类 Properties properties = new Properties(); result.setProperty(“worker.id”, “33”); KeyGeneratorConfiguration keyGeneratorConfig = new KeyGeneratorConfiguration(“SNOWFLAKE”, “id”, properties); result.setKeyGeneratorConfig(keyGeneratorConfig); shardingRuleConfig.getTableRuleConfigs().add(tableRuleConfig); //根据年龄分库,一共分为 2 个库 shardingRuleConfig.setDefaultDatabaseShardingStrategyConfig(new InlineShardingStrategyConfiguration(“sex”, “ds${sex % 2}”)); //根据用户 id 分表,一共分为 2 张表 shardingRuleConfig.setDefaultTableShardingStrategyConfig(new StandardShardingStrategyConfiguration(“id”, “user${id % 2}”)); //通过工厂类创建具体的 DataSource return ShardingDataSourceFactory.createDataSource(createDataSourceMap(), shardingRuleConfig, new Properties()); }

一旦获取了目标 DataSource 之后,我们就可以使用 JDBC 中的核心接口来执行传入的 SQL 语句:

List getUsers(final String sql) throws SQLException { List result = new LinkedList<>(); try (Connection connection = dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(sql); ResultSet resultSet = preparedStatement.executeQuery()) { while (resultSet.next()) { User user= new User(); //省略设置 User 对象的赋值语句 result.add(user); } } return result; }

ShardingSphere 通过在准备阶段获取的连接模式,在执行阶段生成内存归并结果集流式归并结果集,并将其传递至结果归并引擎,以进行下一步工作。

从源码解析到日常开发

基于适配器模式完成对 JDBC 规范的重写,是我们学习 ShardingSphere 框架非常重要的一个切入点,同样也是我们将这种模式应用到日常开发工作中的一个切入点。

适配器模式是作为两个不兼容的接口之间的桥梁。在业务系统中,我们经常会碰到需要与外部系统进行对接和集成的场景,这个时候为了保证内部系统的功能演进,能够独立于外部系统进行发展,一般都需要采用适配器模式完成两者之间的隔离。

当我们设计这种系统时,可以参考 JDBC 规范中的接口定义方式,以及 ShardingSphere 中基于这种接口定义方式,而完成适配的具体做法。

小结与预告

这是 ShardingSphere 执行引擎的最后一个课时,我们围绕执行引擎的上层组件,给出了以“ Sharding”作为前缀的各种 JDBC 规范中的核心接口实现类。

其中 ShardingStatement 和 ShardingPreparedStatement 直接依赖于上一课时介绍的 StatementExecutor 和 PreparedStatementExecutor;而 ShardingConnection 和 ShardingDataSource 则为我们使用执行引擎提供了入口。

这里给你留一道思考题:ShardingSphere 中,AbstractShardingPreparedStatementAdapter 的类层结构为什么会比 AbstractStatementAdapter 复杂很多?欢迎你在留言区与大家讨论,我将逐一点评解答。

现在,我们已经通过执行引擎获取了来自不同数据源的结果数据,对于查询语句而言,我们通常都需要对这些结果数据进行归并才能返回给客户端。在接下来的内容中,就让我们来分析一下 ShardingSphere 的归并引擎。

参考资料

https://learn.lianglianglee.com/%e4%b8%93%e6%a0%8f/ShardingSphere%20%e6%a0%b8%e5%bf%83%e5%8e%9f%e7%90%86%e7%b2%be%e8%ae%b2-%e5%ae%8c/23%20%20%e6%89%a7%e8%a1%8c%e5%bc%95%e6%93%8e%ef%bc%9a%e5%a6%82%e4%bd%95%e6%8a%8a%e6%8f%a1%20ShardingSphere%20%e4%b8%ad%e7%9a%84%20Executor%20%e6%89%a7%e8%a1%8c%e6%a8%a1%e5%9e%8b%ef%bc%9f%ef%bc%88%e4%b8%8b%ef%bc%89.md