#

源码应该怎么阅读?

直接根据例子,找到核心的类。

回归入门回顾

我们看一下 csv 的入门例子:

    public static void main(String[] args) throws Exception {
        // 0.获取csv文件的路径,注意获取到文件所在上层路径就可以了
        String path = "D:\\github\\calcite-learn\\calcite-learn-basic\\src\\main\\resources\\csv\\";

        // 1.构建CsvSchema对象,在Calcite中,不同数据源对应不同Schema,比如CsvSchema、DruidSchema、ElasticsearchSchema等
        CsvSchema csvSchema = new CsvSchema(new File(path), CsvTable.Flavor.SCANNABLE);

        // 2.构建Connection
        // 2.1 设置连接参数
        Properties info = new Properties();
        // 不区分sql大小写
        info.setProperty("caseSensitive", "false");

        // 2.2 获取标准的JDBC Connection
        Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
        // 2.3 获取Calcite封装的Connection
        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);

        // 3.构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下
        // 以实现查询不同数据源的目的
        SchemaPlus rootSchema = calciteConnection.getRootSchema();

        // 4.将不同数据源schema挂载到RootSchema,这里添加CsvSchema
        rootSchema.add("csv", csvSchema);

        // 5.执行SQL查询,通过SQL方式访问csv文件
        String sql = "select * from csv.depts";
        Statement statement = calciteConnection.createStatement();
        ResultSet resultSet = statement.executeQuery(sql);

        // 6.遍历打印查询结果集
        printResultSet(resultSet);
    }

这里面除却属性配置,最主要的就是 CsvSchema/DriverManager

CsvSchema 源码

基本属性

public class CsvSchema extends AbstractSchema {
    private final File directoryFile;
    private final CsvTable.Flavor flavor;
    private Map<String, Table> tableMap;

AbstractSchema 父类

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.calcite.schema.impl;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Table;

public class AbstractSchema implements Schema {
    public AbstractSchema() {
    }

    public boolean isMutable() {
        return true;
    }

    public Schema snapshot(SchemaVersion version) {
        return this;
    }

    public Expression getExpression(SchemaPlus parentSchema, String name) {
        return Schemas.subSchemaExpression(parentSchema, name, this.getClass());
    }

    protected Map<String, Table> getTableMap() {
        return ImmutableMap.of();
    }

    public final Set<String> getTableNames() {
        return this.getTableMap().keySet();
    }

    public final Table getTable(String name) {
        return (Table)this.getTableMap().get(name);
    }

    protected Map<String, RelProtoDataType> getTypeMap() {
        return ImmutableMap.of();
    }

    public RelProtoDataType getType(String name) {
        return (RelProtoDataType)this.getTypeMap().get(name);
    }

    public Set<String> getTypeNames() {
        return this.getTypeMap().keySet();
    }

    protected Multimap<String, Function> getFunctionMultimap() {
        return ImmutableMultimap.of();
    }

    public final Collection<Function> getFunctions(String name) {
        return this.getFunctionMultimap().get(name);
    }

    public final Set<String> getFunctionNames() {
        return this.getFunctionMultimap().keySet();
    }

    protected Map<String, Schema> getSubSchemaMap() {
        return ImmutableMap.of();
    }

    public final Set<String> getSubSchemaNames() {
        return this.getSubSchemaMap().keySet();
    }

    public final Schema getSubSchema(String name) {
        return (Schema)this.getSubSchemaMap().get(name);
    }

    public static class Factory implements SchemaFactory {
        public static final Factory INSTANCE = new Factory();

        private Factory() {
        }

        public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
            return new AbstractSchema();
        }
    }
}

构造器

    public CsvSchema(File directoryFile, CsvTable.Flavor flavor) {
        this.directoryFile = directoryFile;
        this.flavor = flavor;
    }

简单方法

    private static String trim(String s, String suffix) {
        String trimmed = trimOrNull(s, suffix);
        return trimmed != null ? trimmed : s;
    }

    private static String trimOrNull(String s, String suffix) {
        return s.endsWith(suffix) ? s.substring(0, s.length() - suffix.length()) : null;
    }

    protected Map<String, Table> getTableMap() {
        if (this.tableMap == null) {
            this.tableMap = this.createTableMap();
        }

        return this.tableMap;
    }

核心方法

    private Map<String, Table> createTableMap() {
        // 文件
        Source baseSource = Sources.of(this.directoryFile);
        File[] files = this.directoryFile.listFiles((dir, name) -> {
            String nameSansGz = trim(name, ".gz");
            return nameSansGz.endsWith(".csv") || nameSansGz.endsWith(".json");
        });
        if (files == null) {
            System.out.println("directory " + this.directoryFile + " not found");
            files = new File[0];
        }


        ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
        File[] var4 = files;
        int var5 = files.length;

        for(int var6 = 0; var6 < var5; ++var6) {
            File file = var4[var6];
            Source source = Sources.of(file);
            Source sourceSansGz = source.trim(".gz");
            Source sourceSansJson = sourceSansGz.trimOrNull(".json");
            if (sourceSansJson != null) {
                JsonTable table = new JsonTable(source);
                builder.put(sourceSansJson.relative(baseSource).path(), table);
            } else {
                Source sourceSansCsv = sourceSansGz.trim(".csv");
                Table table = this.createTable(source);
                builder.put(sourceSansCsv.relative(baseSource).path(), table);
            }
        }

        return builder.build();
    }

this.createTable(source) 方法如下:

    private Table createTable(Source source) {
        switch (this.flavor) {
            case TRANSLATABLE:
                return new CsvTranslatableTable(source, (RelProtoDataType)null);
            case SCANNABLE:
                return new CsvScannableTable(source, (RelProtoDataType)null);
            case FILTERABLE:
                return new CsvFilterableTable(source, (RelProtoDataType)null);
            default:
                throw new AssertionError("Unknown flavor " + this.flavor);
        }
    }

这里涉及到如何创建一张表,我们选择 SCANNABLE,对应的实现如下;

public class CsvScannableTable extends CsvTable implements ScannableTable {
    CsvScannableTable(Source source, RelProtoDataType protoRowType) {
        super(source, protoRowType);
    }

    public String toString() {
        return "CsvScannableTable";
    }

    public Enumerable<Object[]> scan(DataContext root) {
        final int[] fields = CsvEnumerator.identityList(this.fieldTypes.size());
        final AtomicBoolean cancelFlag = (AtomicBoolean)Variable.CANCEL_FLAG.get(root);
        return new AbstractEnumerable<Object[]>() {
            public Enumerator<Object[]> enumerator() {
                return new CsvEnumerator(CsvScannableTable.this.source, cancelFlag, false, (String[])null, new CsvEnumerator.ArrayRowConverter(CsvScannableTable.this.fieldTypes, fields));
            }
        };
    }
}

CsvEnumerator

package org.apache.calcite.adapter.csv;

import au.com.bytecode.opencsv.CSVReader;
import java.io.IOException;
import java.io.Reader;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Source;
import org.apache.commons.lang3.time.FastDateFormat;

class CsvEnumerator<E> implements Enumerator<E> {
    // 内部属性
    private final CSVReader reader;
    private final String[] filterValues;
    private final AtomicBoolean cancelFlag;
    private final RowConverter<E> rowConverter;
    private E current;
    private static final FastDateFormat TIME_FORMAT_DATE;
    private static final FastDateFormat TIME_FORMAT_TIME;
    private static final FastDateFormat TIME_FORMAT_TIMESTAMP;

    // 构造器
    CsvEnumerator(Source source, AtomicBoolean cancelFlag, List<CsvFieldType> fieldTypes) {
        this(source, cancelFlag, fieldTypes, identityList(fieldTypes.size()));
    }
    CsvEnumerator(Source source, AtomicBoolean cancelFlag, List<CsvFieldType> fieldTypes, int[] fields) {
        this(source, cancelFlag, false, (String[])null, converter(fieldTypes, fields));
    }
    CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream, String[] filterValues, RowConverter<E> rowConverter) {
        this.cancelFlag = cancelFlag;
        this.rowConverter = rowConverter;
        this.filterValues = filterValues;

        try {
            if (stream) {
                this.reader = new CsvStreamReader(source);
            } else {
                this.reader = openCsv(source);
            }

            this.reader.readNext();
        } catch (IOException var7) {
            throw new RuntimeException(var7);
        }
    }


    // 转换方法
    private static RowConverter<?> converter(List<CsvFieldType> fieldTypes, int[] fields) {
        if (fields.length == 1) {
            int field = fields[0];
            return new SingleColumnRowConverter((CsvFieldType)fieldTypes.get(field), field);
        } else {
            return new ArrayRowConverter(fieldTypes, fields);
        }
    }

    // 行类别推断---------------------------------------------------------------------------------------------------
    static RelDataType deduceRowType(JavaTypeFactory typeFactory, Source source, List<CsvFieldType> fieldTypes) {
        return deduceRowType(typeFactory, source, fieldTypes, false);
    }
    static RelDataType deduceRowType(JavaTypeFactory typeFactory, Source source, List<CsvFieldType> fieldTypes, Boolean stream) {
        List<RelDataType> types = new ArrayList();
        List<String> names = new ArrayList();
        if (stream) {
            names.add("ROWTIME");
            types.add(typeFactory.createSqlType(SqlTypeName.TIMESTAMP));
        }

        try {
            CSVReader reader = openCsv(source);
            Throwable var7 = null;

            try {
                String[] strings = reader.readNext();
                if (strings == null) {
                    strings = new String[]{"EmptyFileHasNoColumns:boolean"};
                }

                String[] var9 = strings;
                int var10 = strings.length;

                for(int var11 = 0; var11 < var10; ++var11) {
                    String string = var9[var11];
                    int colon = string.indexOf(58);
                    String name;
                    CsvFieldType fieldType;
                    if (colon >= 0) {
                        name = string.substring(0, colon);
                        String typeString = string.substring(colon + 1);
                        fieldType = CsvFieldType.of(typeString);
                        if (fieldType == null) {
                            System.out.println("WARNING: Found unknown type: " + typeString + " in file: " + source.path() + " for column: " + name + ". Will assume the type of column is string");
                        }
                    } else {
                        name = string;
                        fieldType = null;
                    }

                    RelDataType type;
                    if (fieldType == null) {
                        type = typeFactory.createSqlType(SqlTypeName.VARCHAR);
                    } else {
                        type = fieldType.toType(typeFactory);
                    }

                    names.add(name);
                    types.add(type);
                    if (fieldTypes != null) {
                        fieldTypes.add(fieldType);
                    }
                }
            } catch (Throwable var25) {
                var7 = var25;
                throw var25;
            } finally {
                if (reader != null) {
                    if (var7 != null) {
                        try {
                            reader.close();
                        } catch (Throwable var24) {
                            var7.addSuppressed(var24);
                        }
                    } else {
                        reader.close();
                    }
                }

            }
        } catch (IOException var27) {
        }

        if (names.isEmpty()) {
            names.add("line");
            types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR));
        }

        return typeFactory.createStructType(Pair.zip(names, types));
    }

    public static CSVReader openCsv(Source source) throws IOException {
        Reader fileReader = source.reader();
        return new CSVReader(fileReader);
    }

    public E current() {
        return this.current;
    }

    public boolean moveNext() {
        try {
            label46:
            while(!this.cancelFlag.get()) {
                String[] strings = this.reader.readNext();
                if (strings == null) {
                    if (!(this.reader instanceof CsvStreamReader)) {
                        this.current = null;
                        this.reader.close();
                        return false;
                    }

                    try {
                        Thread.sleep(2000L);
                    } catch (InterruptedException var4) {
                        throw new RuntimeException(var4);
                    }
                } else {
                    if (this.filterValues != null) {
                        for(int i = 0; i < strings.length; ++i) {
                            String filterValue = this.filterValues[i];
                            if (filterValue != null && !filterValue.equals(strings[i])) {
                                continue label46;
                            }
                        }
                    }

                    this.current = this.rowConverter.convertRow(strings);
                    return true;
                }
            }

            return false;
        } catch (IOException var5) {
            throw new RuntimeException(var5);
        }
    }

    public void reset() {
        throw new UnsupportedOperationException();
    }

    public void close() {
        try {
            this.reader.close();
        } catch (IOException var2) {
            throw new RuntimeException("Error closing CSV reader", var2);
        }
    }

    static int[] identityList(int n) {
        int[] integers = new int[n];

        for(int i = 0; i < n; integers[i] = i++) {
        }

        return integers;
    }

    static {
        TimeZone gmt = TimeZone.getTimeZone("GMT");
        TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt);
        TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt);
        TIME_FORMAT_TIMESTAMP = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt);
    }

    private static class SingleColumnRowConverter extends RowConverter {
        private final CsvFieldType fieldType;
        private final int fieldIndex;

        private SingleColumnRowConverter(CsvFieldType fieldType, int fieldIndex) {
            this.fieldType = fieldType;
            this.fieldIndex = fieldIndex;
        }

        public Object convertRow(String[] strings) {
            return this.convert(this.fieldType, strings[this.fieldIndex]);
        }
    }

    static class ArrayRowConverter extends RowConverter<Object[]> {
        private final CsvFieldType[] fieldTypes;
        private final int[] fields;
        private final boolean stream;

        ArrayRowConverter(List<CsvFieldType> fieldTypes, int[] fields) {
            this.fieldTypes = (CsvFieldType[])fieldTypes.toArray(new CsvFieldType[0]);
            this.fields = fields;
            this.stream = false;
        }

        ArrayRowConverter(List<CsvFieldType> fieldTypes, int[] fields, boolean stream) {
            this.fieldTypes = (CsvFieldType[])fieldTypes.toArray(new CsvFieldType[0]);
            this.fields = fields;
            this.stream = stream;
        }

        public Object[] convertRow(String[] strings) {
            return this.stream ? this.convertStreamRow(strings) : this.convertNormalRow(strings);
        }

        public Object[] convertNormalRow(String[] strings) {
            Object[] objects = new Object[this.fields.length];

            for(int i = 0; i < this.fields.length; ++i) {
                int field = this.fields[i];
                objects[i] = this.convert(this.fieldTypes[field], strings[field]);
            }

            return objects;
        }

        public Object[] convertStreamRow(String[] strings) {
            Object[] objects = new Object[this.fields.length + 1];
            objects[0] = System.currentTimeMillis();

            for(int i = 0; i < this.fields.length; ++i) {
                int field = this.fields[i];
                objects[i + 1] = this.convert(this.fieldTypes[field], strings[field]);
            }

            return objects;
        }
    }

    abstract static class RowConverter<E> {
        RowConverter() {
        }

        abstract E convertRow(String[] var1);

        // 类别转换
        protected Object convert(CsvFieldType fieldType, String string) {
            if (fieldType == null) {
                return string;
            } else {
                Date date;
                switch (fieldType) {
                    case BOOLEAN:
                        if (string.length() == 0) {
                            return null;
                        }

                        return Boolean.parseBoolean(string);
                    case BYTE:
                        if (string.length() == 0) {
                            return null;
                        }

                        return Byte.parseByte(string);
                    case SHORT:
                        if (string.length() == 0) {
                            return null;
                        }

                        return Short.parseShort(string);
                    case INT:
                        if (string.length() == 0) {
                            return null;
                        }

                        return Integer.parseInt(string);
                    case LONG:
                        if (string.length() == 0) {
                            return null;
                        }

                        return Long.parseLong(string);
                    case FLOAT:
                        if (string.length() == 0) {
                            return null;
                        }

                        return Float.parseFloat(string);
                    case DOUBLE:
                        if (string.length() == 0) {
                            return null;
                        }

                        return Double.parseDouble(string);
                    case DATE:
                        if (string.length() == 0) {
                            return null;
                        } else {
                            try {
                                date = CsvEnumerator.TIME_FORMAT_DATE.parse(string);
                                return (int)(date.getTime() / 86400000L);
                            } catch (ParseException var6) {
                                return null;
                            }
                        }
                    case TIME:
                        if (string.length() == 0) {
                            return null;
                        } else {
                            try {
                                date = CsvEnumerator.TIME_FORMAT_TIME.parse(string);
                                return (int)date.getTime();
                            } catch (ParseException var5) {
                                return null;
                            }
                        }
                    case TIMESTAMP:
                        if (string.length() == 0) {
                            return null;
                        } else {
                            try {
                                date = CsvEnumerator.TIME_FORMAT_TIMESTAMP.parse(string);
                                return date.getTime();
                            } catch (ParseException var4) {
                                return null;
                            }
                        }
                    case STRING:
                    default:
                        return string;
                }
            }
        }
    }
}

参考资料

https://www.lixin.help/2021/04/11/Calcite-Driver-Register.html