Exercise1: Filter and Join

exercise1要求我们完成Filter和Join两种操作符,下面是相关描述:

  • Filter: This operator only returns tuples that satisfy a Predicate that is specified as part of its constructor. Hence, it filters out any tuples that do not match the predicate.

  • Join: This operator joins tuples from its two children according to a JoinPredicate that is passed in as part of its constructor. We only require a simple nested loops join, but you may explore more interesting join implementations. Describe your implementation in your lab writeup.

Filter

Filter是SQL语句中where的基础,如 select * from students where id > 2, Filter 起到条件过滤的作用, 也即过滤出来所有满足 id > 2 的 tuple。

实现

首先我们基于 OpIterator 实现抽象类:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public abstract class Operator implements OpIterator { /** * 下一行 */ private Tuple next = null; /** * 是否已打开 */ private boolean open = false; public boolean hasNext() throws DbException, TransactionAbortedException { if (!this.open) throw new IllegalStateException("Operator not yet open"); if (next == null) next = fetchNext(); return next != null; } public Tuple next() throws DbException, TransactionAbortedException, NoSuchElementException { if (next == null) { next = fetchNext(); if (next == null) throw new NoSuchElementException(); } Tuple result = next; next = null; return result; } protected abstract Tuple fetchNext() throws DbException, TransactionAbortedException; public void close() { // Ensures that a future call to next() will fail next = null; this.open = false; } public void open() throws DbException, TransactionAbortedException { this.open = true; } public abstract OpIterator[] getChildren(); public abstract void setChildren(OpIterator[] children); }

这样 Filter 就可以在这个基础上实现,fetchNext() 的实现如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 找到 child 节点中,满足 predicate 条件的元素返回 @Override protected Tuple fetchNext() throws DbException, TransactionAbortedException { while (this.child.hasNext()) { Tuple tuple = child.next(); //? 其实这里不一定非要判断为非 null // 比如我要查找的就是 IsNull 的元素呢 if(tuple != null && this.predicate.filter(tuple)) { return tuple; } } return null; }

predicate.filter 比较方法实现如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/** * Compares the field number of t specified in the constructor to the * operand field specified in the constructor using the operator specific in * the constructor. * * The comparison can be made through Field's compare method. * * @param t The tuple to compare against * @return true if the comparison is true, false otherwise. */ public boolean filter(Tuple t) { final Field field1 = t.getField(this.field); final Field field2 = this.operand; return field1.compare(this.op, field2); }

实际上调用的是 Field 的 compare 方法。

比如 IntField 的信息如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean compare(Op op, Field field) { IntField intField = (IntField) field; switch (op) { case EQUALS: case LIKE: return value == intField.value; case NOT_EQUALS: return value != intField.value; case GREATER_THAN: return value > intField.value; case GREATER_THAN_OR_EQ: return value >= intField.value; case LESS_THAN: return value < intField.value; case LESS_THAN_OR_EQ: return value <= intField.value; } return false; }

OrderBy 排序实现

排序的原理:把所有的行放在列表中,根据排序的字段,进行排序。然后遍历返回即可。

基础属性如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/** * 子节点 */ private OpIterator child; /** * 子节点-行列表 */ private List<Tuple> childTupleList; /** * 迭代器 */ private Iterator<Tuple> iterator; /** * 行描述 */ private final TupleDesc tupleDesc; /** * 排序的字段下标 */ private final int orderByFieldIndex; /** * 正序还是倒叙 */ private final boolean asc;

构造器如下:

  [java]
1
2
3
4
5
6
public OrderBy(OpIterator child, int orderByFieldIndex, boolean asc) { this.child = child; this.tupleDesc = child.getTupleDesc(); this.orderByFieldIndex = orderByFieldIndex; this.asc = asc; }

open() 的方法,会进行初始化:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override public void open() throws DbException, TransactionAbortedException { // 打开执行的逻辑 child.open(); // 遍历初始化所有的元素 childTupleList = new ArrayList<>(); while (child.hasNext()) { childTupleList.add(child.next()); } // 排序 childTupleList.sort(new Comparator<Tuple>() { @Override public int compare(Tuple o1, Tuple o2) { Field field1 = o1.getField(orderByFieldIndex); Field field2 = o2.getField(orderByFieldIndex); // 二者相等 if(field1.compare(Op.EQUALS, field2)) { return 0; } // 大于 if(field1.compare(Op.GREATER_THAN, field2)) { return asc ? 1 : -1; } // 小于 return asc ? -1 : 1; } }); // 初始化迭代器 this.iterator = childTupleList.iterator(); super.open(); }

close() 比较简单,就是将迭代器置为 null:

  [java]
1
2
3
4
5
@Override public void close() { super.close(); this.iterator = null; }

fetchNext() 实现如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
@Override protected Tuple fetchNext() throws DbException, TransactionAbortedException { // filter 需要满足条件,这里只需要把排序后的元素输出即可。 // iterator 会在 close 之后被设置为 null if(iterator != null && iterator.hasNext()) { return iterator.next(); } return null; }

Join 实现

描述

理解了上面Filter与Predicate的关系以及OrderBy的实现思路,来做Join和JoinPredicate就会容易一点点了。

Join是连接查询实现的基本操作符,我们在MySQL中会区分内连接和外连接,我们这里只实现内连接。

  [sql]
1
select a.*,b.* from a inner join b on a.id=b.id

Join 实现

JOIN 基本属性:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Join extends Operator { /** * Iterator for the left(outer) relation to join */ private OpIterator child1; /** * Iterator for the right(inner) relation to join */ private OpIterator child2; /** * 行迭代器 */ private TupleIterator tupleIterator; /** * 行描述 */ private TupleDesc tupleDesc; /** * join 条件 */ private JoinPredicate joinPredicate; /** * join 策略 */ private JoinStrategy joinStrategy; }

构造器

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 根据 child1+child2 构造完整的 td public Join(JoinPredicate joinPredicate, OpIterator child1, OpIterator child2) { this.joinPredicate = joinPredicate; this.child1 = child1; this.child2 = child2; List<TupleDesc.TDItem> itemList1 = child1.getTupleDesc().getDescList(); List<TupleDesc.TDItem> itemList2 = child2.getTupleDesc().getDescList(); List<TupleDesc.TDItem> allItemList = new ArrayList<>(); allItemList.addAll(itemList1); allItemList.addAll(itemList2); this.tupleDesc = new TupleDesc(allItemList); }

对应的 open/close/rewind

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override public void open() throws DbException, TransactionAbortedException { //ps: 这种迭代,导致看起来很绕。。 this.child1.open(); this.child2.open(); super.open(); // 指定策略,这个接口设计不合理,导致无法主动指定? this.joinStrategy = new NestedLoopJoin(child1, child2, tupleDesc, joinPredicate); this.tupleIterator = this.joinStrategy.doJoin(); this.tupleIterator.open(); } @Override public void close() { this.tupleIterator.close(); this.joinStrategy.close(); this.child2.close(); this.child1.close(); super.close(); } @Override public void rewind() throws DbException, TransactionAbortedException { this.close(); this.open(); }

fetchNext 实现如下:

  [java]
1
2
3
4
5
6
7
8
9
@Override protected Tuple fetchNext() throws DbException, TransactionAbortedException { // 直接迭代元素即可 if (tupleIterator != null && tupleIterator.hasNext()) { return tupleIterator.next(); } return null; }

TupleIterator 行内容迭代器

对 Tuple 行内容迭代的一层封装。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class TupleIterator implements OpIterator { /** * 内部迭代器 */ private Iterator<Tuple> innerIter; /** * 行描述 */ private final TupleDesc tupleDesc; /** * 行迭代器 */ private final Iterable<Tuple> tupleIterable; public TupleIterator(TupleDesc tupleDesc, Iterable<Tuple> tupleIterable) { this.tupleDesc = tupleDesc; this.tupleIterable = tupleIterable; } }

JoinPredicate 条件

只是对 Tuple 之间比较的一层封装。

我们通过制定的 index 字段进行比较。

ps: 这里可以发现,封装了一堆东西,导致理解变得复杂。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
/** * Apply the predicate to the two specified tuples. The comparison can be * made through Field's compare method. * * @return true if the tuples satisfy the predicate. */ public boolean filter(Tuple t1, Tuple t2) { // some code goes here Field field1 = t1.getField(this.field1); Field field2 = t2.getField(this.field2); return field1.compare(this.op, field2); }

JoinStrategy 策略

抽象父类,实现了最基础的方法。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/** * 将两个 tuple 合并为一个 * @param tuple1 第一个 * @param tuple2 第二个 * @param td 描述 * @return 结果 */ protected Tuple mergeTuple(final Tuple tuple1, final Tuple tuple2, final TupleDesc td) { final Tuple tuple = new Tuple(td); //1. 加入 t1 int nums1 = tuple1.getTupleDesc().getDescList().size(); for(int i = 0; i < nums1; i++) { tuple.setField(i, tuple1.getField(i)); } //2. 加入 t2 int nums2 = tuple2.getTupleDesc().getDescList().size(); for(int j = 0; j < nums2; j++) { tuple.setField(j+nums1, tuple1.getField(j)); } return tuple; } /** * 将 child 中的元素列表,放入到 tuples 数组中。 * @param child 待获取的迭代器 * @param tuples 数组 * @return 数量 * @throws Exception ex */ protected int fetchTuples(final OpIterator child, final Tuple[] tuples) throws Exception { // 使用 List 可能更加简单。使用的场景比较固定,使用数组也不错。 int count = 0; //1. 清空原始数组 Arrays.fill(tuples, null); // 是否需要判断 tuples.length ? while (child.hasNext()) { Tuple tuple = child.next(); tuples[count++] = tuple; } return count; }

NestedLoopJoin

这是一个双层的嵌套循环,性能比较差。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 性能最差的 O(N^2) 的迭代 @Override public TupleIterator doJoin() { List<Tuple> tupleList = new ArrayList<>(); child1.rewind(); while (child1.hasNext()) { Tuple tuple1 = child1.next(); // 遍历 child2,匹配 child2.rewind(); while (child2.hasNext()) { Tuple tuple2 = child2.next(); if(this.joinPredicate.filter(tuple1, tuple2)) { Tuple mergeTuple = mergeTuple(tuple1, tuple2, tupleDesc); tupleList.add(mergeTuple); } } } // 返回迭代结果 return new TupleIterator(this.tupleDesc, tupleList); }

如何优化呢?

SortMergeJoin

主要分为几个步骤:

  • 构建两个 block 缓冲块

  • 对于 输入源 child1, 利用 block1 缓冲其每一个 block, 然会遍历 child2 的每一个 block, 进行 sortMergeJoin:

    • 先对两个 block 进行排序

    • 然后利用双指针算法, 进行匹配输出即可

ps: 最核心的就是 mergeJoin() 方法。排序后有助于提升匹配的性能。说白了就是 sort+binarySearch?

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/** * 排序合并 * * @author binbin.hou * @since 1.1.0 */ public class SortMergeJoin extends JoinStrategy { private final int blockCacheSize = 131072 * 5; private Tuple[] block1; private Tuple[] block2; private JoinPredicate lt; private JoinPredicate eq; public SortMergeJoin(final OpIterator child1, final OpIterator child2, final TupleDesc td, final JoinPredicate joinPredicate) { super(child1, child2, td, joinPredicate); final int tuple1Num = this.blockCacheSize / child1.getTupleDesc().getSize(); final int tuple2Num = this.blockCacheSize / child2.getTupleDesc().getSize(); // build cache block this.block1 = new Tuple[tuple1Num]; this.block2 = new Tuple[tuple2Num]; final int field1 = this.joinPredicate.getField1(); final int field2 = this.joinPredicate.getField2(); this.lt = new JoinPredicate(field1, Op.LESS_THAN, field2); this.eq = new JoinPredicate(field1, Op.EQUALS, field2); } @Override public TupleIterator doJoin() { final List<Tuple> tupleList = new ArrayList<>(); // fetch child1 try { child1.rewind(); while (child1.hasNext()) { int end1 = fetchTuples(child1, block1); // Fetch each block of child2, and do merge join child2.rewind(); while (child2.hasNext()) { int end2 = fetchTuples(child2, block2); mergeJoin(tupleList, end1, end2); } } } catch (Exception e) { e.printStackTrace(); System.out.println("Error happen when sort merge join:" + e.getMessage()); } Arrays.fill(this.block1, null); Arrays.fill(this.block2, null); return new TupleIterator(this.tupleDesc, tupleList); } private void mergeJoin(final List<Tuple> tupleList, int end1, int end2) { // 1.Sort each block final int field1 = this.joinPredicate.getField1(); final int field2 = this.joinPredicate.getField2(); sortTuples(block1, end1, field1); sortTuples(block2, end2, field2); // 2.Join int index1 = 0, index2 = 0; final Op op = this.joinPredicate.getOp(); switch (op) { case EQUALS: { while (index1 < end1 && index2 < end2) { final Tuple lTuple = this.block1[index1]; final Tuple rTuple = this.block2[index2]; if (eq.filter(lTuple, rTuple)) { // If equal , we should find the right boundary that equal to lTuple in block1 and rTuple in block2 final JoinPredicate eq1 = new JoinPredicate(field1, Op.EQUALS, field1); final JoinPredicate eq2 = new JoinPredicate(field2, Op.EQUALS, field2); int begin1 = index1 + 1, begin2 = index2 + 1; while (begin1 < end1 && eq1.filter(lTuple, this.block1[begin1])) begin1++; while (begin2 < end2 && eq2.filter(rTuple, this.block2[begin2])) begin2++; for (int i = index1; i < begin1; i++) { for (int j = index2; j < begin2; j++) { tupleList.add(mergeTuple(this.block1[i], this.block2[j], this.tupleDesc)); } } index1 = begin1; index2 = begin2; } else if (lt.filter(lTuple, rTuple)) { index1++; } else { index2++; } } return; } case LESS_THAN: case LESS_THAN_OR_EQ: { while (index1 < end1) { final Tuple lTuple = this.block1[index1++]; while (index2 < end2 && !this.joinPredicate.filter(lTuple, this.block2[index2])) index2++; while (index2 < end2) { final Tuple rTuple = this.block2[index2++]; tupleList.add(mergeTuple(lTuple, rTuple, this.tupleDesc)); } } return; } case GREATER_THAN: case GREATER_THAN_OR_EQ: { while (index1 < end1) { final Tuple lTuple = this.block1[index1++]; while (index2 < end2 && this.joinPredicate.filter(lTuple, this.block2[index2])) index2++; for (int i = 0; i < index2; i++) { final Tuple rTuple = this.block2[i]; tupleList.add(mergeTuple(lTuple, rTuple, this.tupleDesc)); } } } } } private void sortTuples(final Tuple[] tuples, int field, int len) { final JoinPredicate lt = new JoinPredicate(field, Op.LESS_THAN, field); final JoinPredicate gt = new JoinPredicate(field, Op.GREATER_THAN, field); Arrays.sort(tuples, 0, len, (o1, o2) -> { if (lt.filter(o1, o2)) { return -1; } if (gt.filter(o1, o2)) { return 1; } return 0; }); } @Override public void close() { this.block1 = null; this.block2 = null; } }

Hash-join

如果是相等的情况,可以采用 Hash 保留对应的 field.key。values 就是一个列表。

使用 Map 将时间复杂度降低为 O(1)。

不过应用场景比较低,其实也满足,因为日常开发中使用 join 基本也都是 a.id=b.id 的场景。

参考资料

https://github.com/CreatorsStack/CreatorDB/blob/master/document/lab2-resolve.md