思路

直接参考 calcite csv 模块的代码,实现一个最简单版本的 csv database.

官方过滤的例子

官方的例子看了下应该只是支持 2 个

AND 和 EUQALS

  • CsvFilterableTable.java
  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static boolean addFilter(RexNode filter, @Nullable Object[] filterValues) { if (filter.isA(SqlKind.AND)) { // We cannot refine(remove) the operands of AND, // it will cause o.a.c.i.TableScanNode.createFilterable filters check failed. ((RexCall) filter).getOperands().forEach(subFilter -> addFilter(subFilter, filterValues)); } else if (filter.isA(SqlKind.EQUALS)) { final RexCall call = (RexCall) filter; RexNode left = call.getOperands().get(0); if (left.isA(SqlKind.CAST)) { left = ((RexCall) left).operands.get(0); } final RexNode right = call.getOperands().get(1); if (left instanceof RexInputRef && right instanceof RexLiteral) { final int index = ((RexInputRef) left).getIndex(); if (filterValues[index] == null) { filterValues[index] = ((RexLiteral) right).getValue2().toString(); return true; } } } return false; }

和原来的 scan 对比,这里多了后面的 List<RexNode> filters 参数。

这里应该就是 where 后面的条件参数。

自定义 filterable table

和原来的 scannable table 类似。

我们需要在 csv 遍历的时候,加一下我们的过滤器。

scan 原来的样子

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override public Enumerable<Object[]> scan(DataContext root, List<RexNode> list) { JavaTypeFactory typeFactory = root.getTypeFactory(); final List<RelDataType> fieldTypes = getFieldTypes(typeFactory); final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size()); // 这个做什么的? final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root); // 构建一个遍历结果 List<String> dataLines = FileUtil.readAllLines(csvFile, charset, 1, Integer.MAX_VALUE, false); // TODO: 添加过滤 return new CsvEnumerable(dataLines, fieldTypes); }

添加过滤条件

代码调整如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override public Enumerable<Object[]> scan(DataContext root, List<RexNode> list) { JavaTypeFactory typeFactory = root.getTypeFactory(); final List<RelDataType> fieldTypes = getFieldTypes(typeFactory); final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size()); // 这个做什么的? final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root); // 构建一个遍历结果 List<String> dataLines = FileUtil.readAllLines(csvFile, charset, 1, Integer.MAX_VALUE, false); // TODO: 添加过滤 Map<String, List<String>> conditionMap = new HashMap<>(); for(RexNode rexNode : list) { addFilter(rexNode, conditionMap); } return new CsvEnumerable(dataLines, fieldTypes, conditionMap); }

官方例子原始的 RexNode 实现,使用的是数组。

实际上只是做了简单的事情,存储对应的 index 下面要 equals 过滤的词而已。

我们这里直接使用 Map 来存储,效果类似。

addFilter

addFilter 的实现如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//key 字段名称 // value 是一个list,可能这样写 where name='A' and name='B'。虽然比较傻,但是支持。 private static void addFilter(RexNode filter, final Map<String, List<String>> conditionMap) { if (filter.isA(SqlKind.AND)) { // We cannot refine(remove) the operands of AND, // it will cause o.a.c.i.TableScanNode.createFilterable filters check failed. // 递归 ((RexCall) filter).getOperands().forEach(subFilter -> addFilter(subFilter, conditionMap)); } else if (filter.isA(SqlKind.EQUALS)) { final RexCall call = (RexCall) filter; RexNode left = call.getOperands().get(0); if (left.isA(SqlKind.CAST)) { left = ((RexCall) left).operands.get(0); } final RexNode right = call.getOperands().get(1); if (left instanceof RexInputRef && right instanceof RexLiteral) { final String leftName = ((RexInputRef)left).getName(); String rightValue = ((RexLiteral) right).getValue2().toString(); List<String> conditonList = conditionMap.getOrDefault(leftName, new ArrayList<>()); conditonList.add(rightValue); conditionMap.put(leftName, conditonList); } } }

CsvEnumerable 枚举类

添加 conditionMap,我们需要在在遍历的时候,做一下数据的过滤处理。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package com.github.houbb.calcite.adaptor.csv; import com.github.houbb.heaven.util.util.MapUtil; import org.apache.calcite.linq4j.AbstractEnumerable; import org.apache.calcite.linq4j.Enumerator; import org.apache.calcite.rel.type.RelDataType; import java.util.HashMap; import java.util.List; import java.util.Map; /** * csv 数据库 * * @since 0.1.0 */ public class CsvEnumerable extends AbstractEnumerable<Object[]> { /** * csv 文件 */ private final List<String> dataLines; private final List<RelDataType> fieldTypes; private int index = 0; /** * @since v0.2.0 条件 map */ private final Map<Integer, List<String>> conditionMap; /** * 当前行信息 * @since 0.2.0 */ private Object[] currentRow; public CsvEnumerable(List<String> allLines, List<RelDataType> fieldTypes) { this(allLines, fieldTypes, new HashMap<>()); } public CsvEnumerable(List<String> allLines, List<RelDataType> fieldTypes, final Map<Integer, List<String>> conditionMap) { this.dataLines = allLines; this.fieldTypes = fieldTypes; index = 0; this.conditionMap = conditionMap; } private Object[] buildRowDataArray(String rowLine) { Object[] dataArray = new Object[fieldTypes.size()]; // 逗号拆分 String[] datas = rowLine.split(","); int index = 0; for(String data : datas) { // 根据类别转换处理,此处省略。 dataArray[index++] = data; } return dataArray; } @Override public Enumerator<Object[]> enumerator() { return new Enumerator<Object[]>() { @Override public Object[] current() { return currentRow; } @Override public boolean moveNext() { if(index >= dataLines.size()) { return false; } // 根据 match 匹配 boolean match = false; while (index < dataLines.size()) { match = isMatchFilter(); index++; if(match) { // 满足,则跳过循环 break; } } return match; } @Override public void reset() { index = 0; } @Override public void close() { // 这个是什么概念? // 我们一次读取完成的,暂时不需要考虑。 // 如果是文件流,这里需要做流的关闭 } }; } private boolean isMatchFilter() { String currentLine = dataLines.get(index); currentRow = buildRowDataArray(currentLine); if(MapUtil.isEmpty(conditionMap)) { return true; } // 条件判断 for(Map.Entry<Integer, List<String>> entry : conditionMap.entrySet()) { Integer index = entry.getKey(); List<String> expectEqualValues = entry.getValue(); // 判断是否全部满足 String realValue = String.valueOf(currentRow[index]); for(String expectValue : expectEqualValues) { if(!expectValue.equals(realValue)) { return false; } } } return true; } }

主要是加了个 isMatchFilter 的过滤。

测试

  [java]
1
2
3
4
// 5.执行SQL查询,通过SQL方式访问csv文件 String sql = "select * from csv.depts where name='Fred'"; Statement statement = calciteConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql);

文本数据:

  [plaintext]
1
2
3
4
5
6
EMPNO:long,NAME:string,DEPTNO:int,GENDER:string,CITY:string,EMPID:int,AGE:int,SLACKER:boolean,MANAGER:boolean,JOINEDAT:date 100,Fred,10,,,30,25,true,false,1996-08-03 110,Eric,20,M,San Francisco,3,80,,false,2001-01-01 110,John,40,M,Vancouver,2,,false,true,2002-05-03 120,Wilma,20,F,,1,5,,true,2005-09-07 130,Alice,40,F,Vancouver,2,,false,true,2007-01-01

测试结果

  [plaintext]
1
2
3
4
5
6
7
8
9
10
11
Number of columns: 10 empno: 100 name: Fred deptno: 10 gender: city: empid: 30 age: 25 slacker: true manager: false joinedat: 1996-08-03

小结

整体而言,filter 相对而言会麻烦一点。

整体思路就是解析 RexNode,然后存储所有的条件。在遍历数据的时候,进行统一的过滤。

参考资料

10分钟教你写一个数据库