# Calcite 源码阅读:从 CSV 入门例子找到核心类

源码应该怎么阅读?

直接根据例子,找到核心的类。

入门例子回顾

我们看一下 csv 的入门例子:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static void main(String[] args) throws Exception { // 0.获取csv文件的路径,注意获取到文件所在上层路径就可以了 String path = "D:\\github\\calcite-learn\\calcite-learn-basic\\src\\main\\resources\\csv\\"; // 1.构建CsvSchema对象,在Calcite中,不同数据源对应不同Schema,比如CsvSchema、DruidSchema、ElasticsearchSchema等 CsvSchema csvSchema = new CsvSchema(new File(path), CsvTable.Flavor.SCANNABLE); // 2.构建Connection // 2.1 设置连接参数 Properties info = new Properties(); // 不区分sql大小写 info.setProperty("caseSensitive", "false"); // 2.2 获取标准的JDBC Connection Connection connection = DriverManager.getConnection("jdbc:calcite:", info); // 2.3 获取Calcite封装的Connection CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); // 3.构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下 // 以实现查询不同数据源的目的 SchemaPlus rootSchema = calciteConnection.getRootSchema(); // 4.将不同数据源schema挂载到RootSchema,这里添加CsvSchema rootSchema.add("csv", csvSchema); // 5.执行SQL查询,通过SQL方式访问csv文件 String sql = "select * from csv.depts"; Statement statement = calciteConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); // 6.遍历打印查询结果集 printResultSet(resultSet); }

这里面除却属性配置,最核心的就是 CsvSchema 和 DriverManager 这两个类。

CsvSchema 源码

基本属性

  [java]
1
2
3
4
// CsvSchema: Calcite Schema implementation backed by a directory of csv/json files.
// (Fragment — the constructor and methods are shown in the following listings.)
public class CsvSchema extends AbstractSchema {
    // Directory that holds the .csv / .json (optionally .gz-compressed) files.
    private final File directoryFile;
    // Chooses which Table implementation createTable() produces
    // (TRANSLATABLE / SCANNABLE / FILTERABLE).
    private final CsvTable.Flavor flavor;
    // Lazily built cache of table name -> Table; see getTableMap().
    private Map<String, Table> tableMap;

AbstractSchema 父类

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.calcite.schema.impl;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Table;

// Skeleton Schema implementation. Everything is driven by the four protected
// get*Map() hooks, which subclasses (e.g. CsvSchema) override; the public
// accessors below simply delegate to those hooks.
public class AbstractSchema implements Schema {
    public AbstractSchema() {
    }

    // Objects (tables, functions, ...) may be added to this schema at runtime.
    public boolean isMutable() {
        return true;
    }

    // No versioning: the same schema instance serves every snapshot version.
    public Schema snapshot(SchemaVersion version) {
        return this;
    }

    public Expression getExpression(SchemaPlus parentSchema, String name) {
        return Schemas.subSchemaExpression(parentSchema, name, this.getClass());
    }

    // Hook: subclasses return their name -> Table map; default is empty.
    protected Map<String, Table> getTableMap() {
        return ImmutableMap.of();
    }

    public final Set<String> getTableNames() {
        return this.getTableMap().keySet();
    }

    public final Table getTable(String name) {
        return (Table)this.getTableMap().get(name);
    }

    // Hook: subclasses return their name -> type map; default is empty.
    protected Map<String, RelProtoDataType> getTypeMap() {
        return ImmutableMap.of();
    }

    public RelProtoDataType getType(String name) {
        return (RelProtoDataType)this.getTypeMap().get(name);
    }

    public Set<String> getTypeNames() {
        return this.getTypeMap().keySet();
    }

    // Hook: subclasses return their name -> Function multimap; default is empty.
    protected Multimap<String, Function> getFunctionMultimap() {
        return ImmutableMultimap.of();
    }

    public final Collection<Function> getFunctions(String name) {
        return this.getFunctionMultimap().get(name);
    }

    public final Set<String> getFunctionNames() {
        return this.getFunctionMultimap().keySet();
    }

    // Hook: subclasses return their name -> sub-Schema map; default is empty.
    protected Map<String, Schema> getSubSchemaMap() {
        return ImmutableMap.of();
    }

    public final Set<String> getSubSchemaNames() {
        return this.getSubSchemaMap().keySet();
    }

    public final Schema getSubSchema(String name) {
        return (Schema)this.getSubSchemaMap().get(name);
    }

    // SchemaFactory that produces a bare AbstractSchema (no tables/functions).
    public static class Factory implements SchemaFactory {
        public static final Factory INSTANCE = new Factory();

        private Factory() {
        }

        public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
            return new AbstractSchema();
        }
    }
}

构造器

  [java]
1
2
3
4
/**
 * Creates a CsvSchema.
 *
 * @param directoryFile directory that contains the csv/json files
 * @param flavor        which Table implementation createTable() will build
 */
public CsvSchema(File directoryFile, CsvTable.Flavor flavor) {
    this.directoryFile = directoryFile;
    this.flavor = flavor;
}

简单方法

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Removes {@code suffix} from the end of {@code s}; when {@code s} does not
 * end with the suffix, returns {@code s} unchanged.
 */
private static String trim(String s, String suffix) {
    String trimmed = trimOrNull(s, suffix);
    if (trimmed == null) {
        return s;
    }
    return trimmed;
}

/**
 * Removes {@code suffix} from the end of {@code s}, or returns null when
 * {@code s} does not end with the suffix.
 */
private static String trimOrNull(String s, String suffix) {
    if (!s.endsWith(suffix)) {
        return null;
    }
    return s.substring(0, s.length() - suffix.length());
}

/** Lazily builds and caches the name -&gt; Table map for this schema. */
protected Map<String, Table> getTableMap() {
    Map<String, Table> tables = this.tableMap;
    if (tables == null) {
        tables = this.createTableMap();
        this.tableMap = tables;
    }
    return tables;
}

核心方法

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private Map<String, Table> createTableMap() { // 文件 Source baseSource = Sources.of(this.directoryFile); File[] files = this.directoryFile.listFiles((dir, name) -> { String nameSansGz = trim(name, ".gz"); return nameSansGz.endsWith(".csv") || nameSansGz.endsWith(".json"); }); if (files == null) { System.out.println("directory " + this.directoryFile + " not found"); files = new File[0]; } ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder(); File[] var4 = files; int var5 = files.length; for(int var6 = 0; var6 < var5; ++var6) { File file = var4[var6]; Source source = Sources.of(file); Source sourceSansGz = source.trim(".gz"); Source sourceSansJson = sourceSansGz.trimOrNull(".json"); if (sourceSansJson != null) { JsonTable table = new JsonTable(source); builder.put(sourceSansJson.relative(baseSource).path(), table); } else { Source sourceSansCsv = sourceSansGz.trim(".csv"); Table table = this.createTable(source); builder.put(sourceSansCsv.relative(baseSource).path(), table); } } return builder.build(); }

this.createTable(source) 方法如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
// Creates the Table implementation matching this schema's flavor:
// TRANSLATABLE -> CsvTranslatableTable, SCANNABLE -> CsvScannableTable,
// FILTERABLE -> CsvFilterableTable. The null RelProtoDataType means the row
// type is not predefined here.
private Table createTable(Source source) {
    switch (this.flavor) {
        case TRANSLATABLE:
            return new CsvTranslatableTable(source, (RelProtoDataType)null);
        case SCANNABLE:
            return new CsvScannableTable(source, (RelProtoDataType)null);
        case FILTERABLE:
            return new CsvFilterableTable(source, (RelProtoDataType)null);
        default:
            throw new AssertionError("Unknown flavor " + this.flavor);
    }
}

这里涉及到如何创建一张表,我们选择 SCANNABLE,对应的实现如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// ScannableTable flavor of CsvTable: Calcite asks for a full scan and
// receives every row as an Object[].
public class CsvScannableTable extends CsvTable implements ScannableTable {
    CsvScannableTable(Source source, RelProtoDataType protoRowType) {
        super(source, protoRowType);
    }

    public String toString() {
        return "CsvScannableTable";
    }

    public Enumerable<Object[]> scan(DataContext root) {
        // Identity projection: emit all columns in file order.
        final int[] fields = CsvEnumerator.identityList(this.fieldTypes.size());
        // Cancellation flag supplied by the execution context.
        final AtomicBoolean cancelFlag = (AtomicBoolean)Variable.CANCEL_FLAG.get(root);
        return new AbstractEnumerable<Object[]>() {
            public Enumerator<Object[]> enumerator() {
                // stream=false, filterValues=null: a plain full scan of the csv source.
                return new CsvEnumerator(CsvScannableTable.this.source, cancelFlag, false, (String[])null, new CsvEnumerator.ArrayRowConverter(CsvScannableTable.this.fieldTypes, fields));
            }
        };
    }
}

CsvEnumerator

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
package org.apache.calcite.adapter.csv;

import au.com.bytecode.opencsv.CSVReader;
import java.io.IOException;
import java.io.Reader;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Source;
import org.apache.commons.lang3.time.FastDateFormat;

// linq4j Enumerator that reads a csv Source line by line and converts each
// String[] of cells into a row of type E via the configured RowConverter.
class CsvEnumerator<E> implements Enumerator<E> {
    // Internal state
    private final CSVReader reader;             // underlying opencsv reader
    private final String[] filterValues;        // per-column equality filter; null = no filtering
    private final AtomicBoolean cancelFlag;     // set externally to abort the scan
    private final RowConverter<E> rowConverter; // turns String[] cells into a row object
    private E current;                          // row returned by current()
    private static final FastDateFormat TIME_FORMAT_DATE;
    private static final FastDateFormat TIME_FORMAT_TIME;
    private static final FastDateFormat TIME_FORMAT_TIMESTAMP;

    // Constructors

    CsvEnumerator(Source source, AtomicBoolean cancelFlag, List<CsvFieldType> fieldTypes) {
        this(source, cancelFlag, fieldTypes, identityList(fieldTypes.size()));
    }

    CsvEnumerator(Source source, AtomicBoolean cancelFlag, List<CsvFieldType> fieldTypes, int[] fields) {
        this(source, cancelFlag, false, (String[])null, converter(fieldTypes, fields));
    }

    CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream, String[] filterValues, RowConverter<E> rowConverter) {
        this.cancelFlag = cancelFlag;
        this.rowConverter = rowConverter;
        this.filterValues = filterValues;

        try {
            if (stream) {
                this.reader = new CsvStreamReader(source);
            } else {
                this.reader = openCsv(source);
            }

            // Skip the header row (column names / types).
            this.reader.readNext();
        } catch (IOException var7) {
            throw new RuntimeException(var7);
        }
    }

    // Converter selection: a single projected column gets the cheaper
    // single-column converter, otherwise an Object[]-producing array converter.
    private static RowConverter<?> converter(List<CsvFieldType> fieldTypes, int[] fields) {
        if (fields.length == 1) {
            int field = fields[0];
            return new SingleColumnRowConverter((CsvFieldType)fieldTypes.get(field), field);
        } else {
            return new ArrayRowConverter(fieldTypes, fields);
        }
    }

    // Row-type deduction: reads the csv header ("name:type" per column) and
    // builds the Calcite row type.

    static RelDataType deduceRowType(JavaTypeFactory typeFactory, Source source, List<CsvFieldType> fieldTypes) {
        return deduceRowType(typeFactory, source, fieldTypes, false);
    }

    static RelDataType deduceRowType(JavaTypeFactory typeFactory, Source source, List<CsvFieldType> fieldTypes, Boolean stream) {
        List<RelDataType> types = new ArrayList();
        List<String> names = new ArrayList();
        if (stream) {
            // Streaming tables get an implicit leading ROWTIME timestamp column.
            names.add("ROWTIME");
            types.add(typeFactory.createSqlType(SqlTypeName.TIMESTAMP));
        }

        try {
            CSVReader reader = openCsv(source);
            Throwable var7 = null; // decompiled try-with-resources bookkeeping

            try {
                String[] strings = reader.readNext();
                if (strings == null) {
                    strings = new String[]{"EmptyFileHasNoColumns:boolean"};
                }

                String[] var9 = strings;
                int var10 = strings.length;

                for(int var11 = 0; var11 < var10; ++var11) {
                    String string = var9[var11];
                    int colon = string.indexOf(58); // 58 == ':'
                    String name;
                    CsvFieldType fieldType;
                    if (colon >= 0) {
                        // Header cell of the form "name:type".
                        name = string.substring(0, colon);
                        String typeString = string.substring(colon + 1);
                        fieldType = CsvFieldType.of(typeString);
                        if (fieldType == null) {
                            System.out.println("WARNING: Found unknown type: " + typeString + " in file: " + source.path() + " for column: " + name + ". Will assume the type of column is string");
                        }
                    } else {
                        // No type suffix: fall back to string below.
                        name = string;
                        fieldType = null;
                    }

                    RelDataType type;
                    if (fieldType == null) {
                        type = typeFactory.createSqlType(SqlTypeName.VARCHAR);
                    } else {
                        type = fieldType.toType(typeFactory);
                    }

                    names.add(name);
                    types.add(type);
                    if (fieldTypes != null) {
                        // Also report the per-column CsvFieldType to the caller.
                        fieldTypes.add(fieldType);
                    }
                }
            } catch (Throwable var25) {
                var7 = var25;
                throw var25;
            } finally {
                if (reader != null) {
                    if (var7 != null) {
                        try {
                            reader.close();
                        } catch (Throwable var24) {
                            var7.addSuppressed(var24);
                        }
                    } else {
                        reader.close();
                    }
                }
            }
        } catch (IOException var27) {
            // NOTE(review): IO errors are deliberately swallowed; an unreadable
            // file simply yields the fallback single "line" column below.
        }

        if (names.isEmpty()) {
            names.add("line");
            types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR));
        }

        return typeFactory.createStructType(Pair.zip(names, types));
    }

    public static CSVReader openCsv(Source source) throws IOException {
        Reader fileReader = source.reader();
        return new CSVReader(fileReader);
    }

    public E current() {
        return this.current;
    }

    // Advances to the next row; returns false at EOF or when cancelFlag is set.
    public boolean moveNext() {
        try {
            label46:
            while(!this.cancelFlag.get()) {
                String[] strings = this.reader.readNext();
                if (strings == null) {
                    if (!(this.reader instanceof CsvStreamReader)) {
                        // Plain file: EOF ends the scan.
                        this.current = null;
                        this.reader.close();
                        return false;
                    }

                    // Stream reader: wait for more data to be appended.
                    try {
                        Thread.sleep(2000L);
                    } catch (InterruptedException var4) {
                        throw new RuntimeException(var4);
                    }
                } else {
                    if (this.filterValues != null) {
                        // Skip rows whose cells do not match every non-null filter value.
                        for(int i = 0; i < strings.length; ++i) {
                            String filterValue = this.filterValues[i];
                            if (filterValue != null && !filterValue.equals(strings[i])) {
                                continue label46;
                            }
                        }
                    }

                    this.current = this.rowConverter.convertRow(strings);
                    return true;
                }
            }

            return false;
        } catch (IOException var5) {
            throw new RuntimeException(var5);
        }
    }

    public void reset() {
        throw new UnsupportedOperationException();
    }

    public void close() {
        try {
            this.reader.close();
        } catch (IOException var2) {
            throw new RuntimeException("Error closing CSV reader", var2);
        }
    }

    // Returns [0, 1, ..., n-1]: the identity projection over n columns.
    static int[] identityList(int n) {
        int[] integers = new int[n];

        for(int i = 0; i < n; integers[i] = i++) {
        }

        return integers;
    }

    static {
        // All date/time parsing is done in GMT so stored values are timezone-independent.
        TimeZone gmt = TimeZone.getTimeZone("GMT");
        TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt);
        TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt);
        TIME_FORMAT_TIMESTAMP = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt);
    }

    // Converter used when only a single column is projected.
    private static class SingleColumnRowConverter extends RowConverter {
        private final CsvFieldType fieldType;
        private final int fieldIndex;

        private SingleColumnRowConverter(CsvFieldType fieldType, int fieldIndex) {
            this.fieldType = fieldType;
            this.fieldIndex = fieldIndex;
        }

        public Object convertRow(String[] strings) {
            return this.convert(this.fieldType, strings[this.fieldIndex]);
        }
    }

    // Converts a row into Object[]; in stream mode a leading column holding the
    // current wall-clock millis is prepended (the ROWTIME column).
    static class ArrayRowConverter extends RowConverter<Object[]> {
        private final CsvFieldType[] fieldTypes;
        private final int[] fields;
        private final boolean stream;

        ArrayRowConverter(List<CsvFieldType> fieldTypes, int[] fields) {
            this.fieldTypes = (CsvFieldType[])fieldTypes.toArray(new CsvFieldType[0]);
            this.fields = fields;
            this.stream = false;
        }

        ArrayRowConverter(List<CsvFieldType> fieldTypes, int[] fields, boolean stream) {
            this.fieldTypes = (CsvFieldType[])fieldTypes.toArray(new CsvFieldType[0]);
            this.fields = fields;
            this.stream = stream;
        }

        public Object[] convertRow(String[] strings) {
            return this.stream ? this.convertStreamRow(strings) : this.convertNormalRow(strings);
        }

        public Object[] convertNormalRow(String[] strings) {
            Object[] objects = new Object[this.fields.length];

            for(int i = 0; i < this.fields.length; ++i) {
                int field = this.fields[i];
                objects[i] = this.convert(this.fieldTypes[field], strings[field]);
            }

            return objects;
        }

        public Object[] convertStreamRow(String[] strings) {
            Object[] objects = new Object[this.fields.length + 1];
            objects[0] = System.currentTimeMillis();

            for(int i = 0; i < this.fields.length; ++i) {
                int field = this.fields[i];
                objects[i + 1] = this.convert(this.fieldTypes[field], strings[field]);
            }

            return objects;
        }
    }

    // Base converter: turns one csv cell into the Java object for its CsvFieldType.
    abstract static class RowConverter<E> {
        RowConverter() {
        }

        abstract E convertRow(String[] var1);

        // Field-type conversion. Empty cells become null; unparseable date/time
        // values also become null; a null or STRING field type passes the raw
        // string through unchanged.
        protected Object convert(CsvFieldType fieldType, String string) {
            if (fieldType == null) {
                return string;
            } else {
                Date date;
                switch (fieldType) {
                    case BOOLEAN:
                        if (string.length() == 0) {
                            return null;
                        }

                        return Boolean.parseBoolean(string);
                    case BYTE:
                        if (string.length() == 0) {
                            return null;
                        }

                        return Byte.parseByte(string);
                    case SHORT:
                        if (string.length() == 0) {
                            return null;
                        }

                        return Short.parseShort(string);
                    case INT:
                        if (string.length() == 0) {
                            return null;
                        }

                        return Integer.parseInt(string);
                    case LONG:
                        if (string.length() == 0) {
                            return null;
                        }

                        return Long.parseLong(string);
                    case FLOAT:
                        if (string.length() == 0) {
                            return null;
                        }

                        return Float.parseFloat(string);
                    case DOUBLE:
                        if (string.length() == 0) {
                            return null;
                        }

                        return Double.parseDouble(string);
                    case DATE:
                        if (string.length() == 0) {
                            return null;
                        } else {
                            try {
                                date = CsvEnumerator.TIME_FORMAT_DATE.parse(string);
                                // SQL DATE is represented as days since epoch.
                                return (int)(date.getTime() / 86400000L);
                            } catch (ParseException var6) {
                                return null;
                            }
                        }
                    case TIME:
                        if (string.length() == 0) {
                            return null;
                        } else {
                            try {
                                date = CsvEnumerator.TIME_FORMAT_TIME.parse(string);
                                // SQL TIME is represented as millis since midnight (GMT parse).
                                return (int)date.getTime();
                            } catch (ParseException var5) {
                                return null;
                            }
                        }
                    case TIMESTAMP:
                        if (string.length() == 0) {
                            return null;
                        } else {
                            try {
                                date = CsvEnumerator.TIME_FORMAT_TIMESTAMP.parse(string);
                                // SQL TIMESTAMP is represented as millis since epoch.
                                return date.getTime();
                            } catch (ParseException var4) {
                                return null;
                            }
                        }
                    case STRING:
                    default:
                        return string;
                }
            }
        }
    }
}

参考资料

https://www.lixin.help/2021/04/11/Calcite-Driver-Register.html