思路

直接参考 calcite csv 模块的代码,实现一个最简单版本的 csv database.

csv 官方的使用方法

我们先学习下,原来的官方 csv 如何使用的。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.github.houbb.calcite.learn.basic; import org.apache.calcite.adapter.csv.CsvSchema; import org.apache.calcite.adapter.csv.CsvTable; import org.apache.calcite.jdbc.CalciteConnection; import org.apache.calcite.schema.SchemaPlus; import java.io.File; import java.sql.*; import java.util.Properties; public class CsvDemo { public static void main(String[] args) throws Exception { // 0.获取csv文件的路径,注意获取到文件所在上层路径就可以了 String path = "D:\\github\\calcite-learn\\calcite-learn-basic\\src\\main\\resources\\csv\\"; // 1.构建CsvSchema对象,在Calcite中,不同数据源对应不同Schema,比如CsvSchema、DruidSchema、ElasticsearchSchema等 CsvSchema csvSchema = new CsvSchema(new File(path), CsvTable.Flavor.SCANNABLE); // 2.构建Connection // 2.1 设置连接参数 Properties info = new Properties(); // 不区分sql大小写 info.setProperty("caseSensitive", "false"); // 2.2 获取标准的JDBC Connection Connection connection = DriverManager.getConnection("jdbc:calcite:", info); // 2.3 获取Calcite封装的Connection CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); // 3.构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下 // 以实现查询不同数据源的目的 SchemaPlus rootSchema = calciteConnection.getRootSchema(); // 4.将不同数据源schema挂载到RootSchema,这里添加CsvSchema rootSchema.add("csv", csvSchema); // 5.执行SQL查询,通过SQL方式访问csv文件 String sql = "select * from csv.depts"; Statement statement = calciteConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); // 6.遍历打印查询结果集 printResultSet(resultSet); } public static void printResultSet(ResultSet resultSet) throws SQLException { // 结果输出 } }

核心实现

CsvSchema 对应 csv 的数据库实现,主要入口在这里。

我们学习一下如何实现。

maven 依赖

我们先引入核心依赖。

  [xml]
1
2
3
4
5
<dependency> <groupId>org.apache.calcite</groupId> <artifactId>calcite-core</artifactId> <version>1.36.0</version> </dependency>

CsvSchema-数据库

我们先模仿这个,实现一下 csv 的 schema

当然,这里还有一个 CsvSchemaFactory 属于一个工厂类,我们先忽略。

直接遍历 dir,针对每一个文件构建出对应的表。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.github.houbb.calcite.adaptor.csv; import com.github.houbb.heaven.util.io.FileUtil; import org.apache.calcite.schema.Table; import org.apache.calcite.schema.impl.AbstractSchema; import org.apache.calcite.schema.impl.AbstractTable; import java.io.File; import java.util.HashMap; import java.util.Map; /** * csv 数据库 * * @since 0.1.0 */ public class CsvSchema extends AbstractSchema { /** * csv 文件夹目录 */ private final File dir; private final String charset; private final Map<String, Table> innerTableMap = new HashMap<>(); public CsvSchema(File dir, String charset) { this.dir = dir; this.charset = charset; this.initTableMap(); } private void initTableMap() { File[] files = dir.listFiles(); for(File file : files) { String fileName = file.getName(); if(fileName.endsWith(".csv")) { // 构建最简单的 scan table AbstractTable abstractTable = new CsvScannableTable(file, charset); innerTableMap.put(FileUtil.getFileName(fileName), abstractTable); } } } @Override protected Map<String, Table> getTableMap() { // 这里需要遍历其中de return innerTableMap; } }

CsvScannableTable-简单的全表扫描表实现

这里我们为了简单,暂时直接把所有的列都定为字符串类别。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package com.github.houbb.calcite.adaptor.csv; import com.github.houbb.heaven.util.io.FileUtil; import com.github.houbb.heaven.util.lang.StringUtil; import org.apache.calcite.DataContext; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.schema.ScannableTable; import org.apache.calcite.schema.impl.AbstractTable; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.ImmutableIntList; import org.apache.calcite.util.Pair; import java.io.File; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; /** * csv 数据库 * * @since 0.1.0 */ public class CsvScannableTable extends AbstractTable implements ScannableTable { /** * csv 文件 */ private final File csvFile; private final String charset; public CsvScannableTable(File csvFile, String charset) { this.csvFile = csvFile; this.charset = charset; } // 行类别 @Override public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) { // 这里需要处理各种类别 final JavaTypeFactory javaTypeFactory = (JavaTypeFactory) relDataTypeFactory; // fieldName 哪里来?直接解析第一行 List<String> allLines = FileUtil.readAllLines(csvFile, charset, 0, 2, true); String headLine = allLines.get(0); // 所有的列名称 csv 第一行获取 final List<String> fieldNameList = StringUtil.splitToList(headLine, ","); // 可以根据不同的内容处理,简单起见,全部设置为 varchar? 后续可以细化为不同的类别 // String firstDataLine = allLines.get(1); final List<RelDataType> fieldTypeList = fieldNameList.stream() .map(new Function<String, RelDataType>() { @Override public RelDataType apply(String s) { return javaTypeFactory.createSqlType(SqlTypeName.VARCHAR); } }) .collect(Collectors.toList()); // 结果 return javaTypeFactory.createStructType(Pair.zip(fieldNameList, fieldTypeList)); } // 可以和 row 优化到一起,暂时不动 private List<RelDataType> getFieldTypes(final JavaTypeFactory javaTypeFactory) { List<String> allLines = FileUtil.readAllLines(csvFile, charset, 0, 2, true); String headLine = allLines.get(0); // 所有的列名称 csv 第一行获取 final List<String> fieldNameList = StringUtil.splitToList(headLine, ","); // 可以根据不同的内容处理,简单起见,全部设置为 varchar? 后续可以细化为不同的类别 final List<RelDataType> fieldTypeList = fieldNameList.stream() .map(new Function<String, RelDataType>() { @Override public RelDataType apply(String s) { return javaTypeFactory.createSqlType(SqlTypeName.VARCHAR); } }) .collect(Collectors.toList()); return fieldTypeList; } // 如何 scan 一个表的数据? @Override public Enumerable<Object[]> scan(DataContext root) { JavaTypeFactory typeFactory = root.getTypeFactory(); final List<RelDataType> fieldTypes = getFieldTypes(typeFactory); final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size()); // 这个做什么的? final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root); // 构建一个遍历结果 List<String> dataLines = FileUtil.readAllLines(csvFile, charset, 1, Integer.MAX_VALUE, false); return new CsvEnumerable(dataLines, fieldTypes); } }

CsvEnumerable-行信息的遍历实现

处理好了行的列字段,那么如何遍历每一行数据呢?

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.github.houbb.calcite.adaptor.csv; import org.apache.calcite.linq4j.AbstractEnumerable; import org.apache.calcite.linq4j.Enumerator; import org.apache.calcite.rel.type.RelDataType; import java.util.List; /** * csv 数据库 * * @since 0.1.0 */ public class CsvEnumerable extends AbstractEnumerable<Object[]> { /** * csv 文件 */ private final List<String> dataLines; private final List<RelDataType> fieldTypes; private int index = 0; public CsvEnumerable(List<String> allLines, List<RelDataType> fieldTypes) { this.dataLines = allLines; this.fieldTypes = fieldTypes; index = 0; } private Object[] buildRowDataArray(String rowLine) { Object[] dataArray = new Object[fieldTypes.size()]; // 逗号拆分 String[] datas = rowLine.split(","); int index = 0; for(String data : datas) { // 根据类别转换处理,此处省略。 dataArray[index++] = data; } return dataArray; } @Override public Enumerator<Object[]> enumerator() { return new Enumerator<Object[]>() { @Override public Object[] current() { // 当前 String line = dataLines.get(0); return buildRowDataArray(line); } @Override public boolean moveNext() { index++; return index < dataLines.size(); } @Override public void reset() { index = 0; } @Override public void close() { // 这个是什么概念? // 我们一次读取完成的,暂时不需要考虑。 // 如果是文件流,这里需要做流的关闭 } }; } }

到这里,完整的 csv 的数据库查询就已经完成了。

实际测试

maven 依赖

  [xml]
1
2
3
4
5
<dependency> <groupId>com.github.houbb</groupId> <artifactId>calcite-adaptor-csv</artifactId> <version>0.1.0</version> </dependency>

测试 csv 文件

  • depts.csv
EMPNO:long,NAME:string,DEPTNO:int,GENDER:string,CITY:string,EMPID:int,AGE:int,SLACKER:boolean,MANAGER:boolean,JOINEDAT:date
100,"Fred",10,,,30,25,true,false,"1996-08-03"
110,"Eric",20,"M","San Francisco",3,80,,false,"2001-01-01"
110,"John",40,"M","Vancouver",2,,false,true,"2002-05-03"
120,"Wilma",20,"F",,1,5,,true,"2005-09-07"
130,"Alice",40,"F","Vancouver",2,,false,true,"2007-01-01"

测试代码

基本和原来一行,唯一的区别是变成了我们自己定义的实现。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.github.houbb.calcite.adaptor.csv; import org.apache.calcite.jdbc.CalciteConnection; import org.apache.calcite.schema.SchemaPlus; import java.io.File; import java.sql.*; import java.util.Properties; public class MyCsvDemo { public static void main(String[] args) throws Exception { // 0.获取csv文件的路径,注意获取到文件所在上层路径就可以了 String path = "D:\\code\\github\\calcite-adaptor\\calcite-adaptor-test\\src\\main\\resources\\csv\\"; // 1.构建CsvSchema对象,在Calcite中,不同数据源对应不同Schema,比如CsvSchema、DruidSchema、ElasticsearchSchema等 CsvSchema csvSchema = new CsvSchema(new File(path), "UTF-8"); // 2.构建Connection // 2.1 设置连接参数 Properties info = new Properties(); // 不区分sql大小写 info.setProperty("caseSensitive", "false"); // 2.2 获取标准的JDBC Connection Connection connection = DriverManager.getConnection("jdbc:calcite:", info); // 2.3 获取Calcite封装的Connection CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); // 3.构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下 // 以实现查询不同数据源的目的 SchemaPlus rootSchema = calciteConnection.getRootSchema(); // 4.将不同数据源schema挂载到RootSchema,这里添加CsvSchema rootSchema.add("csv", csvSchema); // 5.执行SQL查询,通过SQL方式访问csv文件 String sql = "select * from csv.depts"; Statement statement = calciteConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); // 6.遍历打印查询结果集 printResultSet(resultSet); } public static void printResultSet(ResultSet resultSet) throws SQLException { // 获取 ResultSet 元数据 ResultSetMetaData metaData = resultSet.getMetaData(); // 获取列数 int columnCount = metaData.getColumnCount(); System.out.println("Number of columns: " + columnCount); // 遍历 ResultSet 并打印结果 while (resultSet.next()) { // 遍历每一列并打印 for (int i = 1; i <= columnCount; i++) { String columnName = metaData.getColumnName(i); String columnValue = resultSet.getString(i); System.out.println(columnName + ": " + columnValue); } System.out.println(); // 换行 } } }

测试效果:

  [plaintext]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
Number of columns: 10 EMPNO:long: 100 NAME:string: "Fred" DEPTNO:int: 10 GENDER:string: CITY:string: EMPID:int: 30 AGE:int: 25 SLACKER:boolean: true MANAGER:boolean: false JOINEDAT:date: "1996-08-03" EMPNO:long: 100 NAME:string: "Fred" DEPTNO:int: 10 GENDER:string: CITY:string: EMPID:int: 30 AGE:int: 25 SLACKER:boolean: true MANAGER:boolean: false JOINEDAT:date: "1996-08-03" EMPNO:long: 100 NAME:string: "Fred" DEPTNO:int: 10 GENDER:string: CITY:string: EMPID:int: 30 AGE:int: 25 SLACKER:boolean: true MANAGER:boolean: false JOINEDAT:date: "1996-08-03" EMPNO:long: 100 NAME:string: "Fred" DEPTNO:int: 10 GENDER:string: CITY:string: EMPID:int: 30 AGE:int: 25 SLACKER:boolean: true MANAGER:boolean: false JOINEDAT:date: "1996-08-03"

小结

整体而言,calcite 封装的非常强大,我们自己实现也并不困难。

这种的意义是非常大的,现在有各种数据库/文件之类的,学习成本比较高。

比如 redis/kafka/mongodb/es,每一种语言过一段时间不用就会忘记,还是 SQL 用的最多。

下一节,我们可以一起学习一下如何实现一个支持条件过滤的 csv 表。

参考资料

10分钟教你写一个数据库