思路

直接参考 calcite csv 模块的代码,实现一个最简单版本的 csv database.

csv 官方的使用方法

我们先学习下,原来的官方 csv 如何使用的。

package com.github.houbb.calcite.learn.basic;

import org.apache.calcite.adapter.csv.CsvSchema;
import org.apache.calcite.adapter.csv.CsvTable;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;

import java.io.File;
import java.sql.*;
import java.util.Properties;

public class CsvDemo {

    public static void main(String[] args) throws Exception {
        // 0.获取csv文件的路径,注意获取到文件所在上层路径就可以了
        String path = "D:\\github\\calcite-learn\\calcite-learn-basic\\src\\main\\resources\\csv\\";

        // 1.构建CsvSchema对象,在Calcite中,不同数据源对应不同Schema,比如CsvSchema、DruidSchema、ElasticsearchSchema等
        CsvSchema csvSchema = new CsvSchema(new File(path), CsvTable.Flavor.SCANNABLE);

        // 2.构建Connection
        // 2.1 设置连接参数
        Properties info = new Properties();
        // 不区分sql大小写
        info.setProperty("caseSensitive", "false");
        // 2.2 获取标准的JDBC Connection
        Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
        // 2.3 获取Calcite封装的Connection
        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);

        // 3.构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下
        // 以实现查询不同数据源的目的
        SchemaPlus rootSchema = calciteConnection.getRootSchema();

        // 4.将不同数据源schema挂载到RootSchema,这里添加CsvSchema
        rootSchema.add("csv", csvSchema);

        // 5.执行SQL查询,通过SQL方式访问csv文件
        String sql = "select * from csv.depts";
        Statement statement = calciteConnection.createStatement();
        ResultSet resultSet = statement.executeQuery(sql);

        // 6.遍历打印查询结果集
        printResultSet(resultSet);
    }

    public static void printResultSet(ResultSet resultSet) throws SQLException {
        // 结果输出
    }

}

核心实现

CsvSchema 对应 csv 的数据库实现,主要入口在这里。

我们学习一下如何实现。

maven 依赖

我们先引入核心依赖。

<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <version>1.36.0</version>
</dependency>

CsvSchema-数据库

我们先模仿这个,实现一下 csv 的 schema

当然,这里还有一个 CsvSchemaFactory 属于一个工厂类,我们先忽略。

直接遍历 dir,针对每一个文件构建出对应的表。

package com.github.houbb.calcite.adaptor.csv;

import com.github.houbb.heaven.util.io.FileUtil;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.AbstractTable;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

/**
 * csv 数据库
 *
 * @since 0.1.0
 */
public class CsvSchema extends AbstractSchema {

    /**
     * csv 文件夹目录
     */
    private final File dir;

    private final String charset;
    private final Map<String, Table> innerTableMap = new HashMap<>();

    public CsvSchema(File dir, String charset) {
        this.dir = dir;
        this.charset = charset;
        this.initTableMap();
    }

    private void initTableMap() {
        File[] files = dir.listFiles();
        for(File file : files) {
            String fileName = file.getName();
            if(fileName.endsWith(".csv")) {
                // 构建最简单的 scan table
                AbstractTable abstractTable = new CsvScannableTable(file, charset);

                innerTableMap.put(FileUtil.getFileName(fileName), abstractTable);
            }
        }
    }

    @Override
    protected Map<String, Table> getTableMap() {
        // 这里需要遍历其中de
        return innerTableMap;
    }

}

CsvScannableTable-简单的全表扫描表实现

这里我们为了简单,暂时直接把所有的列都定为字符串类别。

package com.github.houbb.calcite.adaptor.csv;

import com.github.houbb.heaven.util.io.FileUtil;
import com.github.houbb.heaven.util.lang.StringUtil;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Pair;

import java.io.File;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * csv 数据库
 *
 * @since 0.1.0
 */
public class CsvScannableTable extends AbstractTable implements ScannableTable  {

    /**
     * csv 文件
     */
    private final File csvFile;

    private final String charset;

    public CsvScannableTable(File csvFile, String charset) {
        this.csvFile = csvFile;
        this.charset = charset;
    }

    // 行类别
    @Override
    public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
        // 这里需要处理各种类别
        final JavaTypeFactory javaTypeFactory = (JavaTypeFactory) relDataTypeFactory;

        // fieldName 哪里来?直接解析第一行
        List<String> allLines = FileUtil.readAllLines(csvFile, charset, 0, 2, true);
        String headLine = allLines.get(0);

        // 所有的列名称 csv 第一行获取
        final List<String> fieldNameList = StringUtil.splitToList(headLine, ",");

        // 可以根据不同的内容处理,简单起见,全部设置为 varchar? 后续可以细化为不同的类别
//        String firstDataLine = allLines.get(1);
        final List<RelDataType> fieldTypeList = fieldNameList.stream()
                .map(new Function<String, RelDataType>() {
                    @Override
                    public RelDataType apply(String s) {
                        return javaTypeFactory.createSqlType(SqlTypeName.VARCHAR);
                    }
                })
                .collect(Collectors.toList());

        // 结果
        return javaTypeFactory.createStructType(Pair.zip(fieldNameList, fieldTypeList));
    }

    // 可以和 row 优化到一起,暂时不动
    private List<RelDataType> getFieldTypes(final JavaTypeFactory javaTypeFactory) {
        List<String> allLines = FileUtil.readAllLines(csvFile, charset, 0, 2, true);
        String headLine = allLines.get(0);

        // 所有的列名称 csv 第一行获取
        final List<String> fieldNameList = StringUtil.splitToList(headLine, ",");

        // 可以根据不同的内容处理,简单起见,全部设置为 varchar? 后续可以细化为不同的类别
        final List<RelDataType> fieldTypeList = fieldNameList.stream()
                .map(new Function<String, RelDataType>() {
                    @Override
                    public RelDataType apply(String s) {
                        return javaTypeFactory.createSqlType(SqlTypeName.VARCHAR);
                    }
                })
                .collect(Collectors.toList());

        return fieldTypeList;
    }

    // 如何 scan 一个表的数据?
    @Override
    public Enumerable<Object[]> scan(DataContext root) {
        JavaTypeFactory typeFactory = root.getTypeFactory();
        final List<RelDataType> fieldTypes = getFieldTypes(typeFactory);
        final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size());

        // 这个做什么的?
        final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);

        // 构建一个遍历结果
        List<String> dataLines = FileUtil.readAllLines(csvFile, charset, 1, Integer.MAX_VALUE, false);

        return new CsvEnumerable(dataLines, fieldTypes);
    }

}

CsvEnumerable-行信息的遍历实现

处理好了行的列字段,那么如何遍历每一行数据呢?

package com.github.houbb.calcite.adaptor.csv;

import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelDataType;

import java.util.List;

/**
 * csv 数据库
 *
 * @since 0.1.0
 */
public class CsvEnumerable extends AbstractEnumerable<Object[]> {

    /**
     * csv 文件
     */
    private final List<String> dataLines;

    private final List<RelDataType> fieldTypes;

    private int index = 0;

    public CsvEnumerable(List<String> allLines, List<RelDataType> fieldTypes) {
        this.dataLines = allLines;
        this.fieldTypes = fieldTypes;
        index = 0;
    }

    private Object[] buildRowDataArray(String rowLine) {
        Object[] dataArray = new Object[fieldTypes.size()];

        // 逗号拆分
        String[] datas = rowLine.split(",");
        int index = 0;
        for(String data : datas) {
            // 根据类别转换处理,此处省略。
            dataArray[index++] = data;
        }
        return dataArray;
    }

    @Override
    public Enumerator<Object[]> enumerator() {
        return new Enumerator<Object[]>() {
            @Override
            public Object[] current() {
                // 当前
                String line = dataLines.get(0);
                
                return buildRowDataArray(line);
            }

            @Override
            public boolean moveNext() {
                index++;
                return index < dataLines.size();
            }

            @Override
            public void reset() {
                index = 0;
            }

            @Override
            public void close() {
                // 这个是什么概念?
                // 我们一次读取完成的,暂时不需要考虑。
                // 如果是文件流,这里需要做流的关闭
            }
        };
    }

}

到这里,完整的 csv 的数据库查询就已经完成了。

实际测试

maven 依赖

<dependency>
    <groupId>com.github.houbb</groupId>
    <artifactId>calcite-adaptor-csv</artifactId>
    <version>0.1.0</version>
</dependency>

测试 csv 文件

  • depts.csv
EMPNO:long,NAME:string,DEPTNO:int,GENDER:string,CITY:string,EMPID:int,AGE:int,SLACKER:boolean,MANAGER:boolean,JOINEDAT:date
100,"Fred",10,,,30,25,true,false,"1996-08-03"
110,"Eric",20,"M","San Francisco",3,80,,false,"2001-01-01"
110,"John",40,"M","Vancouver",2,,false,true,"2002-05-03"
120,"Wilma",20,"F",,1,5,,true,"2005-09-07"
130,"Alice",40,"F","Vancouver",2,,false,true,"2007-01-01"

测试代码

基本和原来一行,唯一的区别是变成了我们自己定义的实现。

package com.github.houbb.calcite.adaptor.csv;

import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;

import java.io.File;
import java.sql.*;
import java.util.Properties;

public class MyCsvDemo {

    public static void main(String[] args) throws Exception {
        // 0.获取csv文件的路径,注意获取到文件所在上层路径就可以了
        String path = "D:\\code\\github\\calcite-adaptor\\calcite-adaptor-test\\src\\main\\resources\\csv\\";

        // 1.构建CsvSchema对象,在Calcite中,不同数据源对应不同Schema,比如CsvSchema、DruidSchema、ElasticsearchSchema等
        CsvSchema csvSchema = new CsvSchema(new File(path), "UTF-8");

        // 2.构建Connection
        // 2.1 设置连接参数
        Properties info = new Properties();
        // 不区分sql大小写
        info.setProperty("caseSensitive", "false");
        // 2.2 获取标准的JDBC Connection
        Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
        // 2.3 获取Calcite封装的Connection
        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);

        // 3.构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下
        // 以实现查询不同数据源的目的
        SchemaPlus rootSchema = calciteConnection.getRootSchema();

        // 4.将不同数据源schema挂载到RootSchema,这里添加CsvSchema
        rootSchema.add("csv", csvSchema);

        // 5.执行SQL查询,通过SQL方式访问csv文件
        String sql = "select * from csv.depts";
        Statement statement = calciteConnection.createStatement();
        ResultSet resultSet = statement.executeQuery(sql);

        // 6.遍历打印查询结果集
        printResultSet(resultSet);
    }

    public static void printResultSet(ResultSet resultSet) throws SQLException {
        // 获取 ResultSet 元数据
        ResultSetMetaData metaData = resultSet.getMetaData();

        // 获取列数
        int columnCount = metaData.getColumnCount();
        System.out.println("Number of columns: " + columnCount);

        // 遍历 ResultSet 并打印结果
        while (resultSet.next()) {
            // 遍历每一列并打印
            for (int i = 1; i <= columnCount; i++) {
                String columnName = metaData.getColumnName(i);
                String columnValue = resultSet.getString(i);
                System.out.println(columnName + ": " + columnValue);
            }
            System.out.println(); // 换行
        }
    }

}

测试效果:

Number of columns: 10
EMPNO:long: 100
NAME:string: "Fred"
DEPTNO:int: 10
GENDER:string: 
CITY:string: 
EMPID:int: 30
AGE:int: 25
SLACKER:boolean: true
MANAGER:boolean: false
JOINEDAT:date: "1996-08-03"

EMPNO:long: 100
NAME:string: "Fred"
DEPTNO:int: 10
GENDER:string: 
CITY:string: 
EMPID:int: 30
AGE:int: 25
SLACKER:boolean: true
MANAGER:boolean: false
JOINEDAT:date: "1996-08-03"

EMPNO:long: 100
NAME:string: "Fred"
DEPTNO:int: 10
GENDER:string: 
CITY:string: 
EMPID:int: 30
AGE:int: 25
SLACKER:boolean: true
MANAGER:boolean: false
JOINEDAT:date: "1996-08-03"

EMPNO:long: 100
NAME:string: "Fred"
DEPTNO:int: 10
GENDER:string: 
CITY:string: 
EMPID:int: 30
AGE:int: 25
SLACKER:boolean: true
MANAGER:boolean: false
JOINEDAT:date: "1996-08-03"

小结

整体而言,calcite 封装的非常强大,我们自己实现也并不困难。

这种的意义是非常大的,现在有各种数据库/文件之类的,学习成本比较高。

比如 redis/kafka/mongodb/es,每一种语言过一段时间不用就会忘记,还是 SQL 用的最多。

下一节,我们可以一起学习一下如何实现一个支持条件过滤的 csv 表。

参考资料

10分钟教你写一个数据库