Elasticsearch

https://seatunnel.apache.org/docs/2.3.3/connector-v2/source/Elasticsearch

Elasticsearch 源连接器

描述

用于从 Elasticsearch 中读取数据。

支持版本 >= 2.x 和 <= 8.x。

关键特性

√ 批处理(batch)

  • 流处理(stream)
  • 精确一次(exactly-once) √ 列投影(column projection)
  • 并行性(parallelism)
  • 支持用户定义的拆分(user-defined split)

选项

名称 类型 必需 默认值
hosts 数组 -
username 字符串 -
password 字符串 -
index 字符串 -
source 数组 -
query JSON {“match_all”: {}}
scroll_time 字符串 1m
scroll_size 整数 100
schema - -
tls_verify_certificate 布尔值 true
tls_verify_hostnames 布尔值 true
tls_keystore_path 字符串 -
tls_keystore_password 字符串 -
tls_truststore_path 字符串 -
tls_truststore_password 字符串 -
common-options - -
hosts [array] - - -

hosts [数组] Elasticsearch 集群的 HTTP 地址,格式为 host:port,允许指定多个主机。例如 [“host1:9200”, “host2:9200”]。

username [字符串] x-pack 用户名。

password [字符串] x-pack 密码。

index [字符串] Elasticsearch 索引名称,支持 * 模糊匹配。

source [数组] 索引的字段。您可以通过指定字段 _id 获取文档 ID。如果将 _id 传送到其他索引,由于 Elasticsearch 的限制,您需要为 _id 指定别名。如果不配置 source,则必须配置 schema。

query [json] Elasticsearch DSL。您可以通过它来控制所读取数据的范围。

scroll_time [字符串] Elasticsearch 将保持滚动请求的搜索上下文的时间量。

scroll_size [整数] 每个 Elasticsearch 滚动请求返回的最大命中数。

schema 数据的结构,包括字段名称和字段类型。如果不配置 schema,则必须配置 source。

tls_verify_certificate [布尔值] 为 HTTPS 端点启用证书验证。

tls_verify_hostname [布尔值] 为 HTTPS 端点启用主机名验证。

tls_keystore_path [字符串] PEM 或 JKS 密钥库的路径。该文件必须可读取,由运行 SeaTunnel 的操作系统用户拥有。

tls_keystore_password [字符串] 指定密钥库的密钥密码。

tls_truststore_path [字符串] PEM 或 JKS 信任库的路径。该文件必须可读取,由运行 SeaTunnel 的操作系统用户拥有。

tls_truststore_password [字符串] 指定信任库的密钥密码。

Examples

simple

  [plaintext]
1
2
3
4
5
6
Elasticsearch { hosts = ["localhost:9200"] index = "seatunnel-*" source = ["_id","name","age"] query = {"range":{"firstPacket":{"gte":1669225429990,"lte":1669225429990}}} }

complex

  [plaintext]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Elasticsearch { hosts = ["elasticsearch:9200"] index = "st_index" schema = { fields { c_map = "map<string, tinyint>" c_array = "array<tinyint>" c_string = string c_boolean = boolean c_tinyint = tinyint c_smallint = smallint c_int = int c_bigint = bigint c_float = float c_double = double c_decimal = "decimal(2, 1)" c_bytes = bytes c_date = date c_timestamp = timestamp } } query = {"range":{"firstPacket":{"gte":1669225429990,"lte":1669225429990}}} }

SSL (Disable certificates validation)

  [plaintext]
1
2
3
4
5
6
7
8
9
source { Elasticsearch { hosts = ["https://localhost:9200"] username = "elastic" password = "elasticsearch" tls_verify_certificate = false } }

SSL (Disable hostname validation)

  [plaintext]
1
2
3
4
5
6
7
8
9
source { Elasticsearch { hosts = ["https://localhost:9200"] username = "elastic" password = "elasticsearch" tls_verify_hostname = false } }

SSL (Enable certificates validation)

  [plaintext]
1
2
3
4
5
6
7
8
9
10
source { Elasticsearch { hosts = ["https://localhost:9200"] username = "elastic" password = "elasticsearch" tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12" tls_keystore_password = "${your password}" } }

参考资料

https://seatunnel.apache.org/docs/2.3.3/concept/JobEnvConfig