Exercise 1: Filter and Join

Exercise 1 asks us to implement two operators, Filter and Join. The lab describes them as follows:

  • Filter: This operator only returns tuples that satisfy a Predicate that is specified as part of its constructor. Hence, it filters out any tuples that do not match the predicate.

  • Join: This operator joins tuples from its two children according to a JoinPredicate that is passed in as part of its constructor. We only require a simple nested loops join, but you may explore more interesting join implementations. Describe your implementation in your lab writeup.

Filter

Filter is the basis of the WHERE clause in SQL. For example, in select * from students where id > 2, Filter does the condition filtering: it keeps exactly the tuples that satisfy id > 2.
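To make the idea concrete before looking at the SimpleDB classes, here is a minimal, self-contained sketch of the same pattern using plain Java iterators. FilterSketch and idsGreaterThan are hypothetical names used only for illustration, rows are modelled as bare Integer ids, and null is assumed never to be a valid row because it doubles as the "nothing buffered" marker.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

// Hypothetical stand-in for the Filter operator: wraps a child iterator and
// yields only the rows that satisfy the predicate.
class FilterSketch<T> implements Iterator<T> {
    private final Iterator<T> child;
    private final Predicate<T> predicate;
    // The next matching row, or null if nothing has been fetched yet.
    private T lookahead;

    FilterSketch(Iterator<T> child, Predicate<T> predicate) {
        this.child = child;
        this.predicate = predicate;
    }

    // Pull rows from the child until one matches; null means the child is exhausted.
    private T fetchNext() {
        while (child.hasNext()) {
            T row = child.next();
            if (predicate.test(row)) {
                return row;
            }
        }
        return null;
    }

    @Override
    public boolean hasNext() {
        if (lookahead == null) {
            lookahead = fetchNext();
        }
        return lookahead != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = lookahead;
        lookahead = null;
        return result;
    }

    // Equivalent of "select * from students where id > bound" over a list of ids.
    static List<Integer> idsGreaterThan(List<Integer> ids, int bound) {
        List<Integer> out = new ArrayList<>();
        Iterator<Integer> it = new FilterSketch<>(ids.iterator(), id -> id > bound);
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }
}
```

Calling FilterSketch.idsGreaterThan with the ids 1, 2, 3, 4 and a bound of 2 keeps only 3 and 4. The hasNext()/next()/fetchNext() split mirrors the lookahead design used by the operators in this lab.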

Implementation

First, we implement an abstract class on top of OpIterator:

public abstract class Operator implements OpIterator {

    /**
     * The next tuple, fetched ahead of time (null if none is buffered)
     */
    private Tuple next = null;
    /**
     * Whether the operator has been opened
     */
    private boolean open = false;

    public boolean hasNext() throws DbException, TransactionAbortedException {
        if (!this.open)
            throw new IllegalStateException("Operator not yet open");

        if (next == null)
            next = fetchNext();
        return next != null;
    }

    public Tuple next() throws DbException, TransactionAbortedException, NoSuchElementException {
        if (next == null) {
            next = fetchNext();
            if (next == null)
                throw new NoSuchElementException();
        }

        Tuple result = next;
        next = null;
        return result;
    }

    protected abstract Tuple fetchNext() throws DbException, TransactionAbortedException;

    public void close() {
        // Ensures that a future call to next() will fail
        next = null;
        this.open = false;
    }

    public void open() throws DbException, TransactionAbortedException {
        this.open = true;
    }

    public abstract OpIterator[] getChildren();

    public abstract void setChildren(OpIterator[] children);

}

With this in place, Filter can be built on top of it. Its fetchNext() is implemented as follows:

// Return the next tuple from child that satisfies the predicate
@Override
protected Tuple fetchNext() throws DbException, TransactionAbortedException {
    while (this.child.hasNext()) {
        Tuple tuple = child.next();

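        // The null check is defensive and may be unnecessary: an IS NULL style
        // condition would test a field inside the tuple, not whether the tuple itself is null.
        if(tuple != null
            && this.predicate.filter(tuple)) {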
            return tuple;
        }
    }

    return null;
}

predicate.filter, which performs the comparison, is implemented as follows:

/**
 * Compares the field number of t specified in the constructor to the
 * operand field specified in the constructor using the operator specific in
 * the constructor.
 *
 * The comparison can be made through Field's compare method.
 *
 * @param t The tuple to compare against
 * @return true if the comparison is true, false otherwise.
 */
public boolean filter(Tuple t) {
    final Field field1 = t.getField(this.field);
    final Field field2 = this.operand;

    return field1.compare(this.op, field2);
}

Under the hood, this calls the compare method of Field.

For example, IntField implements it as follows:

public boolean compare(Op op, Field field) {
    IntField intField = (IntField) field;

    switch (op) {
        case EQUALS:
        case LIKE:
            return value == intField.value;
        case NOT_EQUALS:
            return value != intField.value;
        case GREATER_THAN:
            return value > intField.value;
        case GREATER_THAN_OR_EQ:
            return value >= intField.value;
        case LESS_THAN:
            return value < intField.value;
        case LESS_THAN_OR_EQ:
            return value <= intField.value;
    }

    return false;
}

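OrderBy: sorting

How sorting works: collect all tuples into a list, sort the list by the ordering field, then iterate over it and return the tuples one by one.

The basic fields are: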
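The "collect, sort, then iterate" idea can be sketched independently of SimpleDB. In this minimal example, rows are modelled as int[] arrays and OrderBySketch.orderBy is a hypothetical helper, not part of the lab code; it assumes every row has a value at fieldIndex.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class OrderBySketch {
    // Returns a new list sorted by the given field; the input list is left untouched.
    static List<int[]> orderBy(List<int[]> rows, int fieldIndex, boolean asc) {
        // Materialize all rows first, as the operator does in open().
        List<int[]> sorted = new ArrayList<>(rows);
        Comparator<int[]> byField = Comparator.comparingInt(row -> row[fieldIndex]);
        // Flip the comparator for descending order.
        sorted.sort(asc ? byField : byField.reversed());
        return sorted;
    }
}
```

Because every row is buffered before the first one can be returned, this approach needs memory proportional to the input size, which is the trade-off to keep in mind for large tables.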

/**
 * Child operator
 */
private OpIterator child;
/**
 * Tuples read from the child
 */
private List<Tuple> childTupleList;
/**
 * Iterator over the sorted tuples
 */
private Iterator<Tuple> iterator;
/**
 * Tuple descriptor
 */
private final TupleDesc tupleDesc;
/**
 * Index of the field to sort by
 */
private final int orderByFieldIndex;
/**
 * Ascending (true) or descending (false)
 */
private final boolean asc;

The constructor:

public OrderBy(OpIterator child, int orderByFieldIndex, boolean asc) {
    this.child = child;
    this.tupleDesc = child.getTupleDesc();
    this.orderByFieldIndex = orderByFieldIndex;
    this.asc = asc;
}

open() performs the initialization:

@Override
public void open() throws DbException, TransactionAbortedException {
    // Open the child first
    child.open();

    // Read every tuple from the child into the list
    childTupleList = new ArrayList<>();
    while (child.hasNext()) {
        childTupleList.add(child.next());
    }

    // Sort
    childTupleList.sort(new Comparator<Tuple>() {
        @Override
        public int compare(Tuple o1, Tuple o2) {
            Field field1 = o1.getField(orderByFieldIndex);
            Field field2 = o2.getField(orderByFieldIndex);
            // Equal
            if(field1.compare(Op.EQUALS, field2)) {
                return 0;
            }
            // Greater than
            if(field1.compare(Op.GREATER_THAN, field2)) {
                return asc ? 1 : -1;
            }
            // Less than
            return asc ? -1 : 1;
        }
    });

    // Initialize the iterator
    this.iterator = childTupleList.iterator();
    super.open();
}

close() is simple; it just resets the iterator to null:

@Override
public void close() {
    super.close();
    this.iterator = null;
}

fetchNext() is implemented as follows:

@Override
protected Tuple fetchNext() throws DbException, TransactionAbortedException {
    // Filter has to check a condition; here we only need to emit the sorted tuples in order.
    // iterator is set to null after close()
    if(iterator != null
        && iterator.hasNext()) {
        return iterator.next();
    }

    return null;
}

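Join implementation

Description

With the relationship between Filter and Predicate and the approach used for OrderBy in mind, Join and JoinPredicate become a little easier.

Join is the basic operator behind join queries. MySQL distinguishes inner joins from outer joins; here we implement only the inner join.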

select a.*,b.* from a inner join b on a.id=b.id
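As a rough illustration of what this query computes, the following self-contained sketch performs an inner equality join with two nested loops. Rows are modelled as int[] arrays, and InnerJoinSketch.innerJoin is a hypothetical helper rather than the lab's Join class.

```java
import java.util.ArrayList;
import java.util.List;

class InnerJoinSketch {
    // Joins on a[leftField] == b[rightField]. Each output row holds the
    // fields of the left row followed by the fields of the right row.
    static List<int[]> innerJoin(List<int[]> a, int leftField,
                                 List<int[]> b, int rightField) {
        List<int[]> out = new ArrayList<>();
        for (int[] left : a) {
            for (int[] right : b) {
                if (left[leftField] == right[rightField]) {
                    int[] merged = new int[left.length + right.length];
                    System.arraycopy(left, 0, merged, 0, left.length);
                    System.arraycopy(right, 0, merged, left.length, right.length);
                    out.add(merged);
                }
            }
        }
        return out;
    }
}
```

Rows without a partner on the other side simply do not appear in the result, which is what distinguishes an inner join from an outer join.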

Implementation

The basic fields of Join:

public class Join extends Operator {

    /**
     * Iterator for the left(outer) relation to join
     */
    private OpIterator child1;
    /**
     * Iterator for the right(inner) relation to join
     */
    private OpIterator child2;

    /**
     * Iterator over the joined tuples
     */
    private TupleIterator tupleIterator;
    /**
     * Tuple descriptor of the joined result
     */
    private TupleDesc tupleDesc;

    /**
     * Join condition
     */
    private JoinPredicate joinPredicate;
    /**
     * Join strategy
     */
    private JoinStrategy joinStrategy;
}

Constructor

// Build the combined TupleDesc from child1 + child2
public Join(JoinPredicate joinPredicate,
            OpIterator child1, OpIterator child2) {
    this.joinPredicate = joinPredicate;
    this.child1 = child1;
    this.child2 = child2;

    List<TupleDesc.TDItem> itemList1 = child1.getTupleDesc().getDescList();
    List<TupleDesc.TDItem> itemList2 = child2.getTupleDesc().getDescList();
    List<TupleDesc.TDItem> allItemList = new ArrayList<>();
    allItemList.addAll(itemList1);
    allItemList.addAll(itemList2);
    this.tupleDesc = new TupleDesc(allItemList);
}

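The corresponding open/close/rewind:

@Override
public void open() throws DbException, TransactionAbortedException {
    // Note: this layering of iterators makes the flow look rather convoluted.
    this.child1.open();
    this.child2.open();
    super.open();
    // Choose the strategy. The interface design is arguably flawed here: the caller cannot specify one.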
    this.joinStrategy = new NestedLoopJoin(child1, child2, tupleDesc, joinPredicate);
    this.tupleIterator = this.joinStrategy.doJoin();
    this.tupleIterator.open();
}

@Override
public void close() {
    this.tupleIterator.close();
    this.joinStrategy.close();
    this.child2.close();
    this.child1.close();
    super.close();
}

@Override
public void rewind() throws DbException, TransactionAbortedException {
    this.close();
    this.open();
}

fetchNext is implemented as follows:

@Override
protected Tuple fetchNext() throws DbException, TransactionAbortedException {
    // Simply iterate over the precomputed join result
    if (tupleIterator != null
        && tupleIterator.hasNext()) {
        return tupleIterator.next();
    }
    return null;
}

TupleIterator: tuple iterator

A thin wrapper for iterating over a collection of tuples.

public class TupleIterator implements OpIterator {

    /**
     * Underlying iterator
     */
    private Iterator<Tuple> innerIter;

    /**
     * Tuple descriptor
     */
    private final TupleDesc tupleDesc;
    /**
     * The tuples to iterate over
     */
    private final Iterable<Tuple> tupleIterable;

    public TupleIterator(TupleDesc tupleDesc, Iterable<Tuple> tupleIterable) {
        this.tupleDesc = tupleDesc;
        this.tupleIterable = tupleIterable;
    }

    // The OpIterator methods (open, hasNext, next, rewind, getTupleDesc, close) are not shown here.

}

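JoinPredicate

Just a thin wrapper around comparing two tuples.

The comparison uses the fields at the specified indexes.

P.S. As you can see, this many layers of wrapping make the code harder to follow.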

/**
 * Apply the predicate to the two specified tuples. The comparison can be
 * made through Field's compare method.
 *
 * @return true if the tuples satisfy the predicate.
 */
public boolean filter(Tuple t1, Tuple t2) {
    // Compare field number field1 of t1 with field number field2 of t2
    Field field1 = t1.getField(this.field1);
    Field field2 = t2.getField(this.field2);
    return field1.compare(this.op, field2);
}

JoinStrategy

The abstract parent class, which implements the most basic shared methods.

/**
 * Merges two tuples into one.
 * @param tuple1 the first (left) tuple
 * @param tuple2 the second (right) tuple
 * @param td descriptor of the merged tuple
 * @return the merged tuple
 */
protected Tuple mergeTuple(final Tuple tuple1, final Tuple tuple2, final TupleDesc td) {
    final Tuple tuple = new Tuple(td);
    // 1. Copy the fields of tuple1
    int nums1 = tuple1.getTupleDesc().getDescList().size();
    for(int i = 0; i < nums1; i++) {
        tuple.setField(i, tuple1.getField(i));
    }
    // 2. Append the fields of tuple2 (read from tuple2, not tuple1)
    int nums2 = tuple2.getTupleDesc().getDescList().size();
    for(int j = 0; j < nums2; j++) {
        tuple.setField(j+nums1, tuple2.getField(j));
    }
    return tuple;
}

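/**
 * Reads tuples from child into the tuples array, up to the array's capacity.
 * @param child the iterator to read from
 * @param tuples the destination array
 * @return the number of tuples read
 * @throws Exception if reading from child fails
 */
protected int fetchTuples(final OpIterator child, final Tuple[] tuples) throws Exception {
    // A List might be simpler, but the usage here is fixed, so an array works fine too.
    int count = 0;
    // 1. Clear the array
    Arrays.fill(tuples, null);
    // 2. Stop once the array is full, so that each call reads at most one block
    while (count < tuples.length && child.hasNext()) {
        Tuple tuple = child.next();
        tuples[count++] = tuple;
    }
    return count;
}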
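Whether to check the array length matters as soon as the input is larger than one block. The following self-contained sketch shows a bounded block read over a plain Java Iterator; BlockFetchSketch.fetchBlock is a hypothetical helper, written under the assumption that the caller keeps calling it until it returns 0.

```java
import java.util.Arrays;
import java.util.Iterator;

class BlockFetchSketch {
    // Fills block from source and returns how many slots were filled.
    // Stops when the block is full or the source is exhausted.
    static <T> int fetchBlock(Iterator<T> source, T[] block) {
        // Clear leftovers from the previous block.
        Arrays.fill(block, null);
        int count = 0;
        while (count < block.length && source.hasNext()) {
            block[count++] = source.next();
        }
        return count;
    }
}
```

With five elements and a block of size two, successive calls return 2, 2, 1 and then 0, and the unused slot of the last block is null.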

NestedLoopJoin

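This is a doubly nested loop, so it performs poorly: every tuple of child1 is compared against every tuple of child2.

// The slowest approach: O(N * M) comparisons for inputs of N and M tuples
// rewind()/hasNext()/next() throw checked exceptions, so they are declared here
// (this assumes the abstract JoinStrategy.doJoin() declares them as well).
@Override
public TupleIterator doJoin() throws DbException, TransactionAbortedException {
    List<Tuple> tupleList = new ArrayList<>();
    child1.rewind();
    while (child1.hasNext()) {
        Tuple tuple1 = child1.next();
        // Scan child2 from the start and collect the matches
        child2.rewind();
        while (child2.hasNext()) {
           Tuple tuple2 = child2.next();
           if(this.joinPredicate.filter(tuple1, tuple2)) {
                Tuple mergeTuple = mergeTuple(tuple1, tuple2, tupleDesc);
                tupleList.add(mergeTuple);
           }
        }
    }
    // Return an iterator over the result
    return new TupleIterator(this.tupleDesc, tupleList);
}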

How can we do better?

SortMergeJoin

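It consists of the following steps:

  • Allocate two block buffers

  • For the input child1, buffer one block at a time in block1, then iterate over every block of child2 and run sortMergeJoin on the pair:

    • First sort both blocks

    • Then use the two-pointer technique to match tuples and emit the results

P.S. The core is the mergeJoin() method. Sorting both blocks first speeds up matching: in short, it is a sort followed by a two-pointer merge scan (rather than a binary search).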
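Before reading the full class, it may help to see the two-pointer step in isolation. This is a minimal sketch for the equality case only, operating on bare int keys instead of tuples; SortMergeSketch.equiJoin is a hypothetical name. Runs of duplicate keys on both sides are paired with each other, which is the part that is easy to get wrong.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SortMergeSketch {
    // Returns every pair {l, r} with l == r, where l comes from left and r from right.
    static List<int[]> equiJoin(int[] left, int[] right) {
        // Sort copies so the caller's arrays are not modified.
        int[] l = left.clone();
        int[] r = right.clone();
        Arrays.sort(l);
        Arrays.sort(r);

        List<int[]> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < l.length && j < r.length) {
            if (l[i] < r[j]) {
                i++; // left key too small, advance left
            } else if (l[i] > r[j]) {
                j++; // right key too small, advance right
            } else {
                // Find the end of the run of equal keys on each side.
                int iEnd = i, jEnd = j;
                while (iEnd < l.length && l[iEnd] == l[i]) iEnd++;
                while (jEnd < r.length && r[jEnd] == r[j]) jEnd++;
                // Emit the cross product of the two runs.
                for (int a = i; a < iEnd; a++) {
                    for (int b = j; b < jEnd; b++) {
                        out.add(new int[] {l[a], r[b]});
                    }
                }
                i = iEnd;
                j = jEnd;
            }
        }
        return out;
    }
}
```

After sorting, each pointer only ever moves forward, so the scan itself is linear in the two input sizes plus the number of emitted pairs; the sorting dominates the cost.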

/**
 * Sort-merge join
 *
 * @author binbin.hou
 * @since 1.1.0
 */
public class SortMergeJoin extends JoinStrategy {

    private final int     blockCacheSize = 131072 * 5;
    private Tuple[]       block1;
    private Tuple[]       block2;

    private JoinPredicate lt;
    private JoinPredicate eq;

    public SortMergeJoin(final OpIterator child1, final OpIterator child2, final TupleDesc td,
                         final JoinPredicate joinPredicate) {
        super(child1, child2, td, joinPredicate);
        final int tuple1Num = this.blockCacheSize / child1.getTupleDesc().getSize();
        final int tuple2Num = this.blockCacheSize / child2.getTupleDesc().getSize();

        // build cache block
        this.block1 = new Tuple[tuple1Num];
        this.block2 = new Tuple[tuple2Num];

        final int field1 = this.joinPredicate.getField1();
        final int field2 = this.joinPredicate.getField2();
        this.lt = new JoinPredicate(field1, Op.LESS_THAN, field2);
        this.eq = new JoinPredicate(field1, Op.EQUALS, field2);
    }

    @Override
    public TupleIterator doJoin() {
        final List<Tuple> tupleList = new ArrayList<>();

        // fetch child1
        try {
            child1.rewind();
            while (child1.hasNext()) {
                int end1 = fetchTuples(child1, block1);
                // Fetch each block of child2, and do merge join
                child2.rewind();
                while (child2.hasNext()) {
                    int end2 = fetchTuples(child2, block2);
                    mergeJoin(tupleList, end1, end2);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Error during sort-merge join: " + e.getMessage());
        }
        Arrays.fill(this.block1, null);
        Arrays.fill(this.block2, null);
        return new TupleIterator(this.tupleDesc, tupleList);
    }

    private void mergeJoin(final List<Tuple> tupleList, int end1, int end2) {
        // 1.Sort each block
        final int field1 = this.joinPredicate.getField1();
        final int field2 = this.joinPredicate.getField2();
        // sortTuples takes (tuples, field, len), so the field index comes before the length
        sortTuples(block1, field1, end1);
        sortTuples(block2, field2, end2);

        // 2.Join
        int index1 = 0, index2 = 0;
        final Op op = this.joinPredicate.getOp();
        switch (op) {
            case EQUALS: {
                while (index1 < end1 && index2 < end2) {
                    final Tuple lTuple = this.block1[index1];
                    final Tuple rTuple = this.block2[index2];
                    if (eq.filter(lTuple, rTuple)) {
                        // On a match, find the end of the run of tuples equal to lTuple in block1 and to rTuple in block2
                        final JoinPredicate eq1 = new JoinPredicate(field1, Op.EQUALS, field1);
                        final JoinPredicate eq2 = new JoinPredicate(field2, Op.EQUALS, field2);
                        int begin1 = index1 + 1, begin2 = index2 + 1;
                        while (begin1 < end1 && eq1.filter(lTuple, this.block1[begin1]))
                            begin1++;
                        while (begin2 < end2 && eq2.filter(rTuple, this.block2[begin2]))
                            begin2++;
                        for (int i = index1; i < begin1; i++) {
                            for (int j = index2; j < begin2; j++) {
                                tupleList.add(mergeTuple(this.block1[i], this.block2[j], this.tupleDesc));
                            }
                        }
                        index1 = begin1;
                        index2 = begin2;
                    } else if (lt.filter(lTuple, rTuple)) {
                        index1++;
                    } else {
                        index2++;
                    }
                }
                return;
            }
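            case LESS_THAN:
            case LESS_THAN_OR_EQ: {
                while (index1 < end1) {
                    final Tuple lTuple = this.block1[index1++];
                    // Skip right tuples that do not satisfy the predicate. Both blocks are sorted
                    // ascending, so this boundary only moves right as lTuple grows.
                    while (index2 < end2 && !this.joinPredicate.filter(lTuple, this.block2[index2]))
                        index2++;
                    // Every right tuple from the boundary onward matches. Use a separate cursor
                    // so that index2 stays at the boundary for the next left tuple.
                    for (int j = index2; j < end2; j++) {
                        tupleList.add(mergeTuple(lTuple, this.block2[j], this.tupleDesc));
                    }
                }
                return;
            }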
            case GREATER_THAN:
            case GREATER_THAN_OR_EQ: {
                while (index1 < end1) {
                    final Tuple lTuple = this.block1[index1++];
                    while (index2 < end2 && this.joinPredicate.filter(lTuple, this.block2[index2]))
                        index2++;
                    for (int i = 0; i < index2; i++) {
                        final Tuple rTuple = this.block2[i];
                        tupleList.add(mergeTuple(lTuple, rTuple, this.tupleDesc));
                    }
                }
            }
        }
    }

    private void sortTuples(final Tuple[] tuples, int field, int len) {
        final JoinPredicate lt = new JoinPredicate(field, Op.LESS_THAN, field);
        final JoinPredicate gt = new JoinPredicate(field, Op.GREATER_THAN, field);
        Arrays.sort(tuples, 0, len, (o1, o2) -> {
            if (lt.filter(o1, o2)) {
                return -1;
            }
            if (gt.filter(o1, o2)) {
                return 1;
            }
            return 0;
        });
    }

    @Override
    public void close() {
        this.block1 = null;
        this.block2 = null;
    }

}

Hash-join

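When the join condition is an equality, we can build a hash table keyed by the value of the join field, where each entry holds the list of tuples that share that key.

With a Map, finding the matches for one tuple takes O(1) expected time, so the whole join needs roughly O(N + M) work plus the size of the output.

The technique applies only to equality joins, but that covers most practical cases, since joins in everyday development are almost always of the form a.id=b.id.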
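A minimal sketch of this idea, assuming rows are plain int[] arrays and the join is an equality on one field from each side. HashJoinSketch.hashJoin is a hypothetical helper and not part of the lab's JoinStrategy hierarchy; it builds the table on the left input and probes it with the right input.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HashJoinSketch {
    // Joins on a[leftField] == b[rightField]. Each output row holds the
    // fields of the left row followed by the fields of the right row.
    static List<int[]> hashJoin(List<int[]> a, int leftField,
                                List<int[]> b, int rightField) {
        // Build phase: group the left rows by their join key.
        Map<Integer, List<int[]>> buckets = new HashMap<>();
        for (int[] left : a) {
            buckets.computeIfAbsent(left[leftField], k -> new ArrayList<>()).add(left);
        }

        // Probe phase: look up each right row's key in the table.
        List<int[]> out = new ArrayList<>();
        for (int[] right : b) {
            List<int[]> matches = buckets.get(right[rightField]);
            if (matches == null) {
                continue; // no left row has this key
            }
            for (int[] left : matches) {
                int[] merged = new int[left.length + right.length];
                System.arraycopy(left, 0, merged, 0, left.length);
                System.arraycopy(right, 0, merged, left.length, right.length);
                out.add(merged);
            }
        }
        return out;
    }
}
```

Note that the output follows the order of the probe side (b) rather than the order a nested loops join would produce. Building the table on the smaller input is usually the better choice, since the table has to fit in memory.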

References

https://github.com/CreatorsStack/CreatorDB/blob/master/document/lab2-resolve.md