源码应该怎么阅读?
直接根据例子,找到核心的类。
回归入门回顾
我们看一下 csv 的入门例子:
public static void main(String[] args) throws Exception {
// 0.获取csv文件的路径,注意获取到文件所在上层路径就可以了
String path = "D:\\github\\calcite-learn\\calcite-learn-basic\\src\\main\\resources\\csv\\";
// 1.构建CsvSchema对象,在Calcite中,不同数据源对应不同Schema,比如CsvSchema、DruidSchema、ElasticsearchSchema等
CsvSchema csvSchema = new CsvSchema(new File(path), CsvTable.Flavor.SCANNABLE);
// 2.构建Connection
// 2.1 设置连接参数
Properties info = new Properties();
// 不区分sql大小写
info.setProperty("caseSensitive", "false");
// 2.2 获取标准的JDBC Connection
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
// 2.3 获取Calcite封装的Connection
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
// 3.构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下
// 以实现查询不同数据源的目的
SchemaPlus rootSchema = calciteConnection.getRootSchema();
// 4.将不同数据源schema挂载到RootSchema,这里添加CsvSchema
rootSchema.add("csv", csvSchema);
// 5.执行SQL查询,通过SQL方式访问csv文件
String sql = "select * from csv.depts";
Statement statement = calciteConnection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
// 6.遍历打印查询结果集
printResultSet(resultSet);
}
这里面除却属性配置,最主要的就是 CsvSchema/DriverManager
DriverManager 源码
内部属性
// List of registered JDBC drivers
private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>();
private static volatile int loginTimeout = 0;
private static volatile java.io.PrintWriter logWriter = null;
private static volatile java.io.PrintStream logStream = null;
// Used in println() to synchronize logWriter
private final static Object logSync = new Object();
/**
* The <code>SQLPermission</code> constant that allows the
* setting of the logging stream.
* @since 1.3
*/
final static SQLPermission SET_LOG_PERMISSION =
new SQLPermission("setLog");
/**
* The {@code SQLPermission} constant that allows the
* un-register a registered JDBC driver.
* @since 1.8
*/
final static SQLPermission DEREGISTER_DRIVER_PERMISSION =
new SQLPermission("deregisterDriver");
驱动的静态初始化加载
/**
* Load the initial JDBC drivers by checking the System property
* jdbc.properties and then use the {@code ServiceLoader} mechanism
*/
static {
loadInitialDrivers();
println("JDBC DriverManager initialized");
}
这里主要是通过 SPI 加载对应的驱动,然后遍历一遍。
看起来什么都没做,实际上已经把对应的类信息等加载到了 jvm 中,个人理解。
private static void loadInitialDrivers() {
String drivers;
try {
drivers = AccessController.doPrivileged(new PrivilegedAction<String>() {
public String run() {
return System.getProperty("jdbc.drivers");
}
});
} catch (Exception ex) {
drivers = null;
}
// If the driver is packaged as a Service Provider, load it.
// Get all the drivers through the classloader
// exposed as a java.sql.Driver.class service.
// ServiceLoader.load() replaces the sun.misc.Providers()
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
// *****************************************
// 3. 通过SPI加载:java.sql.Driver的所有实现,但是,在代码上,好像什么都没做,就只是纯粹的遍历了一次.
// 核心逻辑就在:Driver的实现类上了,以:org.apache.calcite.jdbc.Driver为例
// org.apache.calcite.jdbc.Driver的静态方法会创建一个实例,调用: DriverManager.registerDriver(this);
// 所以,只要通过SPI加载,就会初始化这个静态代码块,驱动着java.sql.Driver的实现类,调用:DriverManager.registerDriver进行Drvier注册.
// *****************************************
ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class);
Iterator<Driver> driversIterator = loadedDrivers.iterator();
/* Load these drivers, so that they can be instantiated.
* It may be the case that the driver class may not be there
* i.e. there may be a packaged driver with the service class
* as implementation of java.sql.Driver but the actual class
* may be missing. In that case a java.util.ServiceConfigurationError
* will be thrown at runtime by the VM trying to locate
* and load the service.
*
* Adding a try catch block to catch those runtime errors
* if driver not available in classpath but it's
* packaged as service and that service is there in classpath.
*/
try{
while(driversIterator.hasNext()) {
driversIterator.next();
}
} catch(Throwable t) {
// Do nothing
}
return null;
}
});
println("DriverManager.initialize: jdbc.drivers = " + drivers);
if (drivers == null || drivers.equals("")) {
return;
}
String[] driversList = drivers.split(":");
println("number of Drivers:" + driversList.length);
for (String aDriver : driversList) {
try {
println("DriverManager.Initialize: loading " + aDriver);
Class.forName(aDriver, true,
ClassLoader.getSystemClassLoader());
} catch (Exception ex) {
println("DriverManager.Initialize: load failed: " + ex);
}
}
}
驱动的注册
/**
* Registers the given driver with the {@code DriverManager}.
* A newly-loaded driver class should call
* the method {@code registerDriver} to make itself
* known to the {@code DriverManager}. If the driver is currently
* registered, no action is taken.
*
* @param driver the new JDBC Driver that is to be registered with the
* {@code DriverManager}
* @exception SQLException if a database access error occurs
* @exception NullPointerException if {@code driver} is null
*/
public static synchronized void registerDriver(java.sql.Driver driver)
throws SQLException {
registerDriver(driver, null);
}
实现其实也非常简单:
/**
* Registers the given driver with the {@code DriverManager}.
* A newly-loaded driver class should call
* the method {@code registerDriver} to make itself
* known to the {@code DriverManager}. If the driver is currently
* registered, no action is taken.
*
* @param driver the new JDBC Driver that is to be registered with the
* {@code DriverManager}
* @param da the {@code DriverAction} implementation to be used when
* {@code DriverManager#deregisterDriver} is called
* @exception SQLException if a database access error occurs
* @exception NullPointerException if {@code driver} is null
* @since 1.8
*/
public static synchronized void registerDriver(java.sql.Driver driver,
DriverAction da)
throws SQLException {
/* Register the driver if it has not already been added to our list */
if(driver != null) {
// 这里是一个 COW 的列表
registeredDrivers.addIfAbsent(new DriverInfo(driver, da));
} else {
// This is for compatibility with the original DriverManager
throw new NullPointerException();
}
println("registerDriver: " + driver);
}
Connection 的获取
主要看一下核心方法,其他的都是对于各种属性的封装。
/**
* Attempts to establish a connection to the given database URL.
* The <code>DriverManager</code> attempts to select an appropriate driver from
* the set of registered JDBC drivers.
*<p>
* <B>Note:</B> If a property is specified as part of the {@code url} and
* is also specified in the {@code Properties} object, it is
* implementation-defined as to which value will take precedence.
* For maximum portability, an application should only specify a
* property once.
*
* @param url a database url of the form
* <code> jdbc:<em>subprotocol</em>:<em>subname</em></code>
* @param info a list of arbitrary string tag/value pairs as
* connection arguments; normally at least a "user" and
* "password" property should be included
* @return a Connection to the URL
* @exception SQLException if a database access error occurs or the url is
* {@code null}
* @throws SQLTimeoutException when the driver has determined that the
* timeout value specified by the {@code setLoginTimeout} method
* has been exceeded and has at least tried to cancel the
* current database connection attempt
*/
@CallerSensitive
public static Connection getConnection(String url,
java.util.Properties info) throws SQLException {
return (getConnection(url, info, Reflection.getCallerClass()));
}
调用实现:
// Worker method called by the public getConnection() methods.
private static Connection getConnection(
String url, java.util.Properties info, Class<?> caller) throws SQLException {
/*
* When callerCl is null, we should check the application's
* (which is invoking this class indirectly)
* classloader, so that the JDBC driver class outside rt.jar
* can be loaded from here.
*/
// 获取类加载器
ClassLoader callerCL = caller != null ? caller.getClassLoader() : null;
synchronized(DriverManager.class) {
// synchronize loading of the correct classloader.
if (callerCL == null) {
callerCL = Thread.currentThread().getContextClassLoader();
}
}
if(url == null) {
throw new SQLException("The url cannot be null", "08001");
}
println("DriverManager.getConnection(\"" + url + "\")");
// Walk through the loaded registeredDrivers attempting to make a connection.
// Remember the first exception that gets raised so we can reraise it.
SQLException reason = null;
// 这里虽然是遍历,但是会优先返回第一个匹配的
for(DriverInfo aDriver : registeredDrivers) {
// If the caller does not have permission to load the driver then
// skip it.
if(isDriverAllowed(aDriver.driver, callerCL)) {
try {
println(" trying " + aDriver.driver.getClass().getName());
// 调用的依然是驱动本身的链接。
Connection con = aDriver.driver.connect(url, info);
if (con != null) {
// Success!
println("getConnection returning " + aDriver.driver.getClass().getName());
return (con);
}
} catch (SQLException ex) {
if (reason == null) {
reason = ex;
}
}
} else {
println(" skipping: " + aDriver.getClass().getName());
}
}
// if we got here nobody could connect.
// 一些异常处理
if (reason != null) {
println("getConnection failed: " + reason);
throw reason;
}
println("getConnection: no suitable driver found for "+ url);
throw new SQLException("No suitable driver found for "+ url, "08001");
}
小结
整体而说,DriverManager 这个管理类实现并不复杂。
核心的思想是通过 SPI 来加载驱动类,通过 COW 列表保存驱动类。
获取 connection 的方式就是遍历列表,返回第一个匹配的驱动链接。
我们下一节重点看一下 calcite 对应的 driver 实现。
参考资料
https://www.lixin.help/2021/04/11/Calcite-Driver-Register.html