概述

简单看一下 Calcite 的 Elasticsearch 适配器是如何实现查询的。

ElasticsearchSchema

基本属性

  [java]
public class ElasticsearchSchema extends AbstractSchema { //es client private final RestClient client; private final ObjectMapper mapper; private final Map<String, Table> tableMap; private final int fetchSize;

构造器

基本的创建

  [java]
/**
 * Creates a schema that exposes Elasticsearch indices as tables, using
 * {@link ElasticsearchTransport#DEFAULT_FETCH_SIZE} as the fetch size.
 *
 * @param client low-level REST client used to talk to Elasticsearch
 * @param mapper Jackson mapper used to read/write JSON
 * @param index  name of the single index to expose as a table, or {@code null} to
 *               expose every index name returned by {@code GET /_alias}
 */
public ElasticsearchSchema(RestClient client, ObjectMapper mapper, String index) {
  // Reuse the transport's constant instead of duplicating the magic number 5196.
  this(client, mapper, index, ElasticsearchTransport.DEFAULT_FETCH_SIZE);
}

/**
 * Creates a schema with an explicit fetch size.
 *
 * <p>If {@code index} is {@code null}, the index names are fetched from the cluster
 * (a network call made from this constructor); otherwise only {@code index} is exposed.
 *
 * @param client    low-level REST client used to talk to Elasticsearch
 * @param mapper    Jackson mapper used to read/write JSON
 * @param index     single index to expose, or {@code null} for all indices
 * @param fetchSize fetch size handed to every {@code ElasticsearchTransport}; must be positive
 * @throws NullPointerException     if {@code client} or {@code mapper} is null
 * @throws IllegalArgumentException if {@code fetchSize <= 0}
 * @throws UncheckedIOException     if the list of indices cannot be read from Elasticsearch
 */
@VisibleForTesting
ElasticsearchSchema(RestClient client, ObjectMapper mapper, String index, int fetchSize) {
  this.client = Objects.requireNonNull(client, "client");
  this.mapper = Objects.requireNonNull(mapper, "mapper");
  Preconditions.checkArgument(fetchSize > 0, "invalid fetch size. Expected %s > 0", fetchSize);
  this.fetchSize = fetchSize;
  if (index == null) {
    try {
      // No index given: expose every index the cluster reports.
      this.tableMap = createTables(indicesFromElastic());
    } catch (IOException e) {
      throw new UncheckedIOException("Couldn't get indices", e);
    }
  } else {
    this.tableMap = createTables(Collections.singleton(index));
  }
}

方法

  [java]
protected Map<String, Table> getTableMap() { return this.tableMap; } private Map<String, Table> createTables(Iterable<String> indices) { ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder(); Iterator var3 = indices.iterator(); while(var3.hasNext()) { String index = (String)var3.next(); ElasticsearchTransport transport = new ElasticsearchTransport(this.client, this.mapper, index, this.fetchSize); // 创建所有的 ES index => table builder.put(index, new ElasticsearchTable(transport)); } return builder.build(); } //向ES服务器,发送GET请求,获得所有的索引库名称 private Set<String> indicesFromElastic() throws IOException { String endpoint = "/_alias"; Response response = this.client.performRequest(new Request("GET", "/_alias")); InputStream is = response.getEntity().getContent(); Throwable var4 = null; HashSet var7; try { JsonNode root = this.mapper.readTree(is); if (!root.isObject() || root.size() <= 0) { String message = String.format(Locale.ROOT, "Invalid response for %s/%s Expected object of at least size 1 got %s (of size %d)", response.getHost(), response.getRequestLine(), root.getNodeType(), root.size()); throw new IllegalStateException(message); } //获得所有的索引库名称 Set<String> indices = Sets.newHashSet(root.fieldNames()); var7 = indices; } catch (Throwable var16) { var4 = var16; throw var16; } finally { if (is != null) { if (var4 != null) { try { is.close(); } catch (Throwable var15) { var4.addSuppressed(var15); } } else { is.close(); } } } return var7; }

代码看起来很长,但实际做的事情非常简单:

如果没有指定 index,就先向 ES 发送 `GET /_alias` 请求获取所有索引名称;然后为每个 index 创建一个 ElasticsearchTransport,并包装成对应的 ElasticsearchTable。

创建表的核心逻辑如下:

  [java]
ElasticsearchTransport transport = new ElasticsearchTransport(this.client, this.mapper, index, this.fetchSize); // 创建所有的 ES index => table builder.put(index, new ElasticsearchTable(transport));

ElasticsearchTransport

基本属性

  [java]
final class ElasticsearchTransport { private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchTable.class); // 默认 fetch size static final int DEFAULT_FETCH_SIZE = 5196; private final ObjectMapper mapper; private final RestClient restClient; final String indexName; final ElasticsearchVersion version; final ElasticsearchMapping mapping; final int fetchSize;

构造器

  [java]
ElasticsearchTransport(RestClient restClient, ObjectMapper mapper, String indexName, int fetchSize) { this.mapper = (ObjectMapper)Objects.requireNonNull(mapper, "mapper"); this.restClient = (RestClient)Objects.requireNonNull(restClient, "restClient"); this.indexName = (String)Objects.requireNonNull(indexName, "indexName"); this.fetchSize = fetchSize; this.version = this.version(); this.mapping = this.fetchAndCreateMapping(); }

search 查询

  [java]
Function<ObjectNode, ElasticsearchJson.Result> search() { return this.search(Collections.emptyMap()); } Function<ObjectNode, ElasticsearchJson.Result> search(Map<String, String> httpParams) { Objects.requireNonNull(httpParams, "httpParams"); return (query) -> { Hook.QUERY_PLAN.run(query); String path = String.format(Locale.ROOT, "/%s/_search", this.indexName); HttpPost post; try { URIBuilder builder = new URIBuilder(path); httpParams.forEach(builder::addParameter); // 这里实际上是封装了 HTTP 请求 post = new HttpPost(builder.build()); String json = this.mapper.writeValueAsString(query); LOGGER.debug("Elasticsearch Query: {}", json); post.setEntity(new StringEntity(json, ContentType.APPLICATION_JSON)); } catch (URISyntaxException var7) { throw new RuntimeException(var7); } catch (JsonProcessingException var8) { throw new UncheckedIOException(var8); } return (ElasticsearchJson.Result)this.rawHttp(ElasticsearchJson.Result.class).apply(post); }; }

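http 封装这一步本身并不负责 SQL 转换:传入的 query(ObjectNode)已经是由 SQL 转换好的 ES 查询,这里只是把它序列化成 JSON 字符串,作为 POST 请求体发送到 `/{index}/_search`。下面是几个 SQL 与最终生成的 ES 查询 JSON 的对照示例: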

  [plaintext]
// 把所有的请求参数,转换成:json字符串
// SELECT _MAP['id'],_MAP['title'],_MAP['price'] FROM es.books WHERE _MAP['price'] > 60 LIMIT 2
// {"query":{"constant_score":{"filter":{"range":{"price":{"gt":60}}}}},"_source":["id","title","price"],"size":2}

// SELECT * FROM es.books WHERE _MAP['price'] > 10 offset 0 fetch next 10 rows only
// {"query":{"constant_score":{"filter":{"range":{"price":{"gt":10}}}}},"from":0,"size":10}

// 统计SQL
// SELECT count(*) FROM es.books WHERE _MAP['price'] > 50
// {"query":{"constant_score":{"filter":{"range":{"price":{"gt":50}}}}},"_source":false,"size":0,"stored_fields":"_none_","track_total_hits":true}

小结

个人理解,整个查询过程只有 2 步:

1) 将 SQL 转换为标准的 ES 查询 DSL(JSON);

2) 通过 HTTP 请求把查询发送给 ES,并获取响应。

参考资料

https://www.lixin.help/2021/04/11/Calcite-SQL-ES-Source.html