概述

简单看一下 ES 的查询是如何实现的

ElasticsearchSchema

基本属性

/**
 * Schema that exposes Elasticsearch indices as tables: one {@link Table} entry
 * per index name, built once when the schema is constructed.
 */
public class ElasticsearchSchema extends AbstractSchema {
	// REST client used to send requests to Elasticsearch.
    private final RestClient client;
    // JSON mapper used to parse responses (see readTree in indicesFromElastic).
    private final ObjectMapper mapper;
    // Index name -> table; assigned in the constructor and returned by getTableMap().
    private final Map<String, Table> tableMap;
    // Fetch size handed to every ElasticsearchTransport; the constructor requires it to be > 0.
    private final int fetchSize;

构造器

基本的创建

	/**
	 * Creates a schema that uses the default fetch size.
	 *
	 * @param client REST client used to talk to Elasticsearch; must not be null
	 * @param mapper JSON mapper used to parse responses; must not be null
	 * @param index  name of the single index to expose, or {@code null} to expose
	 *               every index reported by Elasticsearch
	 */
	public ElasticsearchSchema(RestClient client, ObjectMapper mapper, String index) {
        // Reuse the shared constant instead of repeating the literal 5196.
        this(client, mapper, index, ElasticsearchTransport.DEFAULT_FETCH_SIZE);
    }

    @VisibleForTesting
    ElasticsearchSchema(RestClient client, ObjectMapper mapper, String index, int fetchSize) {
        this.client = (RestClient)Objects.requireNonNull(client, "client");
        this.mapper = (ObjectMapper)Objects.requireNonNull(mapper, "mapper");
        Preconditions.checkArgument(fetchSize > 0, "invalid fetch size. Expected %s > 0", new Object[]{fetchSize});
        this.fetchSize = fetchSize;
        if (index == null) {
            try {
                this.tableMap = this.createTables(this.indicesFromElastic());
            } catch (IOException var6) {
                throw new UncheckedIOException("Couldn't get indices", var6);
            }
        } else {
            this.tableMap = this.createTables(Collections.singleton(index));
        }

    }

方法

	/**
	 * Returns the index-name-to-table map that was built in the constructor.
	 */
	protected Map<String, Table> getTableMap() {
        return tableMap;
    }

    /**
     * Builds an immutable map holding one {@link ElasticsearchTable} per index name.
     *
     * @param indices names of the indices to expose
     * @return map from index name to its table
     */
    private Map<String, Table> createTables(Iterable<String> indices) {
        final ImmutableMap.Builder<String, Table> tables = ImmutableMap.builder();
        for (String indexName : indices) {
            // Each Elasticsearch index becomes one table, keyed by the index name.
            tables.put(
                indexName,
                new ElasticsearchTable(
                    new ElasticsearchTransport(this.client, this.mapper, indexName, this.fetchSize)));
        }
        return tables.build();
    }

	/**
	 * Sends {@code GET /_alias} to Elasticsearch and returns the top-level field
	 * names of the JSON response, which are used as the index names.
	 *
	 * @return mutable set of index names
	 * @throws IOException propagated from the request or from reading the response
	 * @throws IllegalStateException if the response is not a JSON object with at
	 *         least one field
	 */
    private Set<String> indicesFromElastic() throws IOException {
        final String endpoint = "/_alias";
        final Response response = client.performRequest(new Request("GET", endpoint));

        // The stream is closed on every exit path, including the exceptional ones.
        try (InputStream body = response.getEntity().getContent()) {
            final JsonNode root = mapper.readTree(body);
            final boolean nonEmptyObject = root.isObject() && root.size() > 0;
            if (!nonEmptyObject) {
                final String message = String.format(Locale.ROOT, "Invalid response for %s/%s Expected object of at least size 1 got %s (of size %d)", response.getHost(), response.getRequestLine(), root.getNodeType(), root.size());
                throw new IllegalStateException(message);
            }

            // Every top-level field name of the response is an index name.
            return Sets.newHashSet(root.fieldNames());
        }
    }

代码写了一堆,实际事情非常简单。

发送请求到 ES,然后根据 index 构建对应的 table 信息。

创建表的核心逻辑是下面:

ElasticsearchTransport transport = new ElasticsearchTransport(this.client, this.mapper, index, this.fetchSize);
// Register one table per Elasticsearch index, keyed by the index name.
builder.put(index, new ElasticsearchTable(transport));

ElasticsearchTransport

基本属性

/**
 * Package-private holder of everything needed to query one Elasticsearch index:
 * the REST client, the JSON mapper, the index name and the fetch size, plus the
 * version and mapping resolved in the constructor.
 */
final class ElasticsearchTransport {
    // NOTE(review): the logger is created for ElasticsearchTable, not
    // ElasticsearchTransport — confirm whether this category is intentional.
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchTable.class);

	// Default fetch size.
    static final int DEFAULT_FETCH_SIZE = 5196;
    // JSON mapper used to serialize queries (see search).
    private final ObjectMapper mapper;
    // REST client used to send requests to Elasticsearch.
    private final RestClient restClient;
    // Name of the index this transport targets; used to build the /<index>/_search path.
    final String indexName;
    // Assigned from version() in the constructor.
    final ElasticsearchVersion version;
    // Assigned from fetchAndCreateMapping() in the constructor.
    final ElasticsearchMapping mapping;
    // Fetch size supplied by the caller; not validated in this class.
    final int fetchSize;

构造器

	/**
	 * Creates a transport bound to a single index.
	 *
	 * @param restClient REST client used to send requests; must not be null
	 * @param mapper     JSON mapper; must not be null
	 * @param indexName  name of the target index; must not be null
	 * @param fetchSize  fetch size to store; not validated here
	 */
	ElasticsearchTransport(RestClient restClient, ObjectMapper mapper, String indexName, int fetchSize) {
        this.mapper = Objects.requireNonNull(mapper, "mapper");
        this.restClient = Objects.requireNonNull(restClient, "restClient");
        this.indexName = Objects.requireNonNull(indexName, "indexName");
        this.fetchSize = fetchSize;

        // Resolved only after the plain fields above are set; version comes before mapping.
        this.version = version();
        this.mapping = fetchAndCreateMapping();
    }

search 查询

	/**
	 * Returns a search function that sends no extra HTTP parameters.
	 *
	 * @return function mapping a query to its search result
	 */
	Function<ObjectNode, ElasticsearchJson.Result> search() {
        return search(Collections.emptyMap());
    }

    /**
     * Returns a function that posts a query to {@code /<indexName>/_search} and
     * returns the parsed result.
     *
     * @param httpParams parameters added to the request URI; must not be null
     * @return function mapping a query to its search result
     * @throws NullPointerException if {@code httpParams} is null
     */
    Function<ObjectNode, ElasticsearchJson.Result> search(Map<String, String> httpParams) {
        Objects.requireNonNull(httpParams, "httpParams");
        return (query) -> {
            Hook.QUERY_PLAN.run(query);
            final String searchPath = String.format(Locale.ROOT, "/%s/_search", this.indexName);

            final HttpPost request;
            try {
                final URIBuilder uri = new URIBuilder(searchPath);
                httpParams.forEach(uri::addParameter);
                // Wrap the query in an HTTP POST whose body is the serialized JSON.
                request = new HttpPost(uri.build());

                final String body = this.mapper.writeValueAsString(query);
                LOGGER.debug("Elasticsearch Query: {}", body);
                request.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON));
            } catch (URISyntaxException e) {
                throw new RuntimeException(e);
            } catch (JsonProcessingException e) {
                throw new UncheckedIOException(e);
            }

            return (ElasticsearchJson.Result)this.rawHttp(ElasticsearchJson.Result.class).apply(request);
        };
    }

封装 HTTP 请求那一步,会把已经由 SQL 转换好的 ES 查询(ObjectNode)序列化成 JSON 字符串,作为 POST 请求体发送到 /{index}/_search。SQL 与生成的查询语句的对应关系示例如下:

// 把所有的请求参数,转换成:json字符串
// SELECT _MAP['id'],_MAP['title'],_MAP['price'] FROM es.books WHERE _MAP['price'] > 60 LIMIT 2
// {"query":{"constant_score":{"filter":{"range":{"price":{"gt":60}}}}},"_source":["id","title","price"],"size":2}

// SELECT * FROM es.books WHERE _MAP['price'] > 10 offset 0 fetch next 10 rows only
// {"query":{"constant_score":{"filter":{"range":{"price":{"gt":10}}}}},"from":0,"size":10}

// 统计SQL
// SELECT count(*) FROM es.books WHERE _MAP['price'] > 50
// {"query":{"constant_score":{"filter":{"range":{"price":{"gt":50}}}}},"_source":false,"size":0,"stored_fields":"_none_","track_total_hits":true}

小结

个人理解,这里核心只有 2 步:

1) 把 SQL 转换为标准的 ES 查询语句(JSON)

2) 通过 HTTP 请求把查询发送给 ES,并获得响应

参考资料

https://www.lixin.help/2021/04/11/Calcite-SQL-ES-Source.html