Exercise2: Aggregates
介绍
一个额外的 SimpleDB 运算符使用 `GROUP BY` 子句实现基本 SQL 聚合。
您应该实现五个 SQL 聚合(`COUNT`、`SUM`、`AVG`、`MIN`、`MAX`)并支持分组。
您只需要支持单个字段的聚合,并按单个字段分组。
为了计算聚合,我们使用“聚合器(聚合器)”接口将新元组合并到聚合的现有计算中。
`Aggregator` 在构造过程中被告知它应该使用什么操作进行聚合。
随后,客户端代码应为子迭代器中的每个元组调用“Aggregator.mergeTupleIntoGroup()”。
合并所有元组后,客户端可以检索聚合结果的 OpIterator。
结果中的每个元组都是“(groupValue,aggregateValue)”形式的一对,除非分组依据字段的值是“Aggregator.NO_GROUPING”,在这种情况下,结果是“(aggregateValue)形式的单个元组`。
请注意,此实现需要与不同组的数量呈线性关系的空间。
就本实验而言,您无需担心组数超过可用内存的情况。
exerciese2要求我们实现各种聚合运算如count、sum、avg、min、max等,并且聚合器需要拥有分组聚合的功能。
如以下SQL语句:
SELECT SUM(fee) AS country_group_total_fee, country FROM member GROUP BY country
Aggregator
接口
其实接口本身的定义应该很简单:
public interface Aggregator extends Serializable {
/**
* Merge a new tuple into the aggregate for a distinct group value;
* creates a new group aggregate result if the group value has not yet
* been encountered.
*
* @param tup the Tuple containing an aggregate field and a group-by field
*/
void mergeTupleIntoGroup(Tuple tup);
/**
* Create a OpIterator over group aggregate results.
* @see com.github.houbb.simpledb.learn.storage.row.TupleIterator for a possible helper
*/
OpIterator iterator();
}
这里演示了常见的把接口当做常量定义的方式(不推荐的方式):
int NO_GROUPING = -1;
/**
* SUM_COUNT and SC_AVG will
* only be used in lab7, you are not required
* to implement them until then.
* */
enum Op implements Serializable {
MIN, MAX, SUM, AVG, COUNT,
/**
* SUM_COUNT: compute sum and count simultaneously, will be
* needed to compute distributed avg in lab7.
* */
SUM_COUNT,
/**
* SC_AVG: compute the avg of a set of SUM_COUNT tuples,
* will be used to compute distributed avg in lab7.
* */
SC_AVG;
/**
* Interface to access operations by a string containing an integer
* index for command-line convenience.
*
* @param s a string containing a valid integer Op index
*/
public static Op getOp(String s) {
return getOp(Integer.parseInt(s));
}
/**
* Interface to access operations by integer value for command-line
* convenience.
*
* @param i a valid integer Op index
*/
public static Op getOp(int i) {
return values()[i];
}
//toString()
}
StringAggregator
如果我们分组的是 string 类型的字段,那么是只支持 count 操作的
select count(*), add_date from user group by add_date;
属性
public class StringAggregator implements Aggregator {
// 分组的 map
private Map<Field, Integer> groupMap;
// Group by field
private int gbField;
private Type gbFieldType;
// Aggregation field
private int agField;
// Aggregation operation
private Op op;
/**
* 默认字段
* ps: 这个名称是否会重复?还说 default 是 keyword?
*/
private Field DEFAULT_FIELD = new StringField("Default", 10);
/**
* 行描述
*/
private TupleDesc tupleDesc;
对应的构造器为:
public StringAggregator(int gbField, Type gbFieldType,
int agField, Op what) {
// 基本校验
if(what != Op.COUNT) {
throw new DbException("string field only support count");
}
this.gbField = gbField;
this.gbFieldType = gbFieldType;
this.agField = agField;
// 确切的说,这里的 op 是固定的。
this.op = what;
this.groupMap = new HashMap<>();
}
mergeTupleIntoGroup
/**
* Merge a new tuple into the aggregate, grouping as indicated in the constructor
* @param tup the Tuple containing an aggregate field and a group-by field
*/
@Override
public void mergeTupleIntoGroup(Tuple tup) {
//构建 desc
if(this.tupleDesc == null) {
//ps: 这里也有一个问题,每次的行内容是否可变,如果不变,则只需要初始化一次。
this.initTupleDesc(tup.getTupleDesc());
}
final Field groupByField = tup.getField(this.gbField);
final Field actualGbField = (this.gbField == NO_GROUPING ? DEFAULT_FIELD : groupByField);
// 分组之后,直接根据符合条件的行累加次数即可。
int times = this.groupMap.getOrDefault(actualGbField, 0) + 1;
this.groupMap.put(actualGbField, times);
}
构造 TupleDesc 的方法如下:
private void initTupleDesc(final TupleDesc tupleDesc) {
// 什么情况下没有分组字段?纯粹是没有调用 group by 吗?
if (this.gbField == NO_GROUPING) {
Type[] types = new Type[]{Type.INT_TYPE};
String[] names = new String[]{tupleDesc.getFieldName(this.gbField)};
this.tupleDesc = new TupleDesc(types, names);
} else {
Type[] types = new Type[]{this.gbFieldType, Type.INT_TYPE};
String[] names = new String[]{tupleDesc.getFieldName(this.gbField), tupleDesc.getFieldName(this.agField)};
this.tupleDesc = new TupleDesc(types, names);
}
}
iterator
@Override
public OpIterator iterator() {
final List<Tuple> tupleList = new ArrayList<>();
// 不做 group by select count(*) from t?
if(gbField == NO_GROUPING) {
final Tuple tuple = new Tuple(this.tupleDesc);
tuple.setField(0, new IntField(this.groupMap.get(DEFAULT_FIELD)));
tupleList.add(tuple);
} else {
// 其实这里有一个问题,对于单字段的 group by,其实 groupMap 中应该也只有一个 key 才对。
// 如果一次调用,group by 还不同,那么原来的 TupleDesc 构建一次就存在问题。
this.groupMap.forEach((key, count)->{
final Tuple tuple = new Tuple(this.tupleDesc);
//key=分组的字段
tuple.setField(0, key);
//对应的个数
tuple.setField(1, new IntField(count));
tupleList.add(tuple);
});
}
return new TupleIterator(this.tupleDesc, tupleList);
}
IntegerAggregator
对于整数类型,我们支持 sum avg min max count
其中 avg = sum / count;
所以我们需要预处理统计 4 个值就可以了:sum count min max
属性
// 分组的 map
private Map<Field, AggInfo> groupMap;
// Group by field
private int gbField;
private Type gbFieldType;
// Aggregation field
private int agField;
// Aggregation operation
private Op op;
/**
* 默认字段
* ps: 这个名称是否会重复?还说 default 是 keyword?
*/
private Field DEFAULT_FIELD = new StringField("Default", 10);
/**
* 行描述
*/
private TupleDesc tupleDesc;
属性其实和字符串的聚合类似,AggInfo 的定义如下:
private static class AggInfo {
private int count = 0;
//ps: sum 其实使用 long 或者 bigint 更加合理,此处暂时忽略
private int sum = 0;
private int min = Integer.MAX_VALUE;
private int max = Integer.MIN_VALUE;
//getter & setter
}
mergeTupleIntoGroup
/**
* Merge a new tuple into the aggregate, grouping as indicated in the constructor
* @param tup the Tuple containing an aggregate field and a group-by field
*/
@Override
public void mergeTupleIntoGroup(Tuple tup) {
//构建 desc
if(this.tupleDesc == null) {
//ps: 这里也有一个问题,每次的行内容是否可变,如果不变,则只需要初始化一次。
this.initTupleDesc(tup.getTupleDesc());
}
final Field groupByField = tup.getField(this.gbField);
final Field actualGbField = (this.gbField == NO_GROUPING ? DEFAULT_FIELD : groupByField);
// 根据 op 处理
final IntField agField = (IntField) tup.getField(this.agField);
final int agFieldVal = agField.getValue();
AggInfo preInfo = this.groupMap.getOrDefault(actualGbField, new AggInfo());
switch (op) {
case MIN:
preInfo.setMin(Math.min(preInfo.getMin(), agFieldVal));
break;
case MAX:
preInfo.setMax(Math.max(preInfo.getMin(), agFieldVal));
break;
case COUNT:
preInfo.setCount(preInfo.getCount()+1);
break;
case SUM:
preInfo.setSum(preInfo.getSum() + agFieldVal);
case AVG:
preInfo.setCount(preInfo.getCount()+1);
preInfo.setSum(preInfo.getSum() + agFieldVal);
break;
}
// 更新
groupMap.put(actualGbField, preInfo);
}
这里就是根据上面几种聚合函数,进行数的基本累加。
initTupleDesc 此处不再赘述。
iterator
迭代和 String 聚合很像,但是需要对结果进行一点处理。
@Override
public OpIterator iterator() {
final List<Tuple> tupleList = new ArrayList<>();
// 不做 group by select count(*) from t?
if(gbField == NO_GROUPING) {
final Tuple tuple = new Tuple(this.tupleDesc);
tuple.setField(0, new IntField(parseIntValue(DEFAULT_FIELD)));
tupleList.add(tuple);
} else {
this.groupMap.forEach((key, count)->{
final Tuple tuple = new Tuple(this.tupleDesc);
tuple.setField(0, key);
tuple.setField(1, new IntField(parseIntValue(key)));
tupleList.add(tuple);
});
}
return new TupleIterator(this.tupleDesc, tupleList);
}
主要就是一个 parseIntValue 方法:
private int parseIntValue(Field aggField) {
// key 是否存在的判断是多余的,本来就是以 key 作为条件循环。
AggInfo aggInfo = groupMap.get(aggField);
switch (op) {
case MIN:
return aggInfo.getMin();
case MAX:
return aggInfo.getMax();
case COUNT:
return aggInfo.getCount();
case SUM:
return aggInfo.getSum();
case AVG:
return aggInfo.getSum() / aggInfo.getCount();
}
return 0;
}
ps: 这里应该进一步的抽象,比如 number 类型,都可以支持对应的操作运算。
Aggregate
说明
上面说到,AVG运算当需要获取聚合结果时,再进行计算返回,那么在哪里会来获取聚合结果呢?
在Aggregate中,因为Aggregate是真正暴露给外部执行SQL语句调用的,Aggregate会根据聚合字段的类型来选择具体的聚合器。
实现
属性
/**
* 聚合字段
*/
private final int agField;
/**
* 分组字段
*/
private final int gbField;
/**
* 分组字段类型
*/
private final Type gbFieldType;
/**
* 聚合操作符
*/
private final Aggregator.Op op;
/**
* 待处理信息
*/
private OpIterator child;
/**
* 行描述
*/
private TupleDesc tupleDesc;
/**
* 行迭代器
*/
private TupleIterator iterator;
对应的构造器如下:
public Aggregate(OpIterator child, int agField, int gbField, Aggregator.Op aop) {
this.agField = agField;
this.gbField = gbField;
this.op = aop;
this.child = child;
TupleDesc originTd = this.child.getTupleDesc();
this.gbFieldType = (this.gbField == Aggregator.NO_GROUPING
? null : originTd.getFieldType(this.gbField));
}
ps: 没什么特别的,就是如果没有分组字段,则类型为 null 而已。
open
public void open() throws NoSuchElementException, DbException, TransactionAbortedException {
super.open();
this.child.open();
TupleDesc originTd = this.child.getTupleDesc();
// Build aggregator
/**
* 聚合实现
*/
Aggregator aggregator;
if (originTd.getFieldType(agField) == Type.INT_TYPE) {
aggregator = new IntegerAggregator(this.gbField, this.gbFieldType, this.agField, this.op);
} else {
aggregator = new StringAggregator(this.gbField, this.gbFieldType, this.agField, this.op);
}
// Merge tuples into group
while (this.child.hasNext()) {
final Tuple tuple = this.child.next();
aggregator.mergeTupleIntoGroup(tuple);
}
this.iterator = (TupleIterator) aggregator.iterator();
this.iterator.open();
}
主要就是根据聚合的字段类型,选定对应的 aggregator 策略。
然后调用 aggregator.mergeTupleIntoGroup(tuple);
方法。
获取对应的 aggregator.iterator();
结果。
getTupleDesc
这个方法也比较简单,和前面类似:
public TupleDesc getTupleDesc() {
if (this.tupleDesc != null) {
return this.tupleDesc;
}
if (this.gbField == Aggregator.NO_GROUPING) {
Type[] types = new Type[]{Type.INT_TYPE};
String[] names = new String[]{this.aggregateFieldName()};
this.tupleDesc = new TupleDesc(types, names);
} else {
Type[] types = new Type[]{this.gbFieldType, Type.INT_TYPE};
String[] names = new String[]{this.groupFieldName(), this.aggregateFieldName()};
this.tupleDesc = new TupleDesc(types, names);
}
return this.tupleDesc;
}
参考资料
https://github.com/CreatorsStack/CreatorDB/blob/master/document/lab2-resolve.md