Elasticsearch

https://seatunnel.apache.org/docs/2.3.3/connector-v2/source/Elasticsearch

Elasticsearch 源连接器

描述

用于从 Elasticsearch 中读取数据。

支持版本 >= 2.x 和 <= 8.x。

关键特性

√ 批处理(batch)

  • 流处理(stream)
  • 精确一次(exactly-once) √ 列投影(column projection)
  • 并行性(parallelism)
  • 支持用户定义的拆分(user-defined split)

选项

名称 类型 必需 默认值
hosts 数组 -
username 字符串 -
password 字符串 -
index 字符串 -
source 数组 -
query JSON {“match_all”: {}}
scroll_time 字符串 1m
scroll_size 整数 100
schema - -
tls_verify_certificate 布尔值 true
tls_verify_hostnames 布尔值 true
tls_keystore_path 字符串 -
tls_keystore_password 字符串 -
tls_truststore_path 字符串 -
tls_truststore_password 字符串 -
common-options - -
hosts [array] - - -

hosts [数组] Elasticsearch 集群的 HTTP 地址,格式为 host:port,允许指定多个主机。例如 [“host1:9200”, “host2:9200”]。

username [字符串] x-pack 用户名。

password [字符串] x-pack 密码。

index [字符串] Elasticsearch 索引名称,支持 * 模糊匹配。

source [数组] 索引的字段。您可以通过指定字段 _id 获取文档 ID。如果将 _id 传送到其他索引,由于 Elasticsearch 的限制,您需要为 _id 指定别名。如果不配置 source,则必须配置 schema。

query [json] Elasticsearch DSL。您可以通过它来控制所读取数据的范围。

scroll_time [字符串] Elasticsearch 将保持滚动请求的搜索上下文的时间量。

scroll_size [整数] 每个 Elasticsearch 滚动请求返回的最大命中数。

schema 数据的结构,包括字段名称和字段类型。如果不配置 schema,则必须配置 source。

tls_verify_certificate [布尔值] 为 HTTPS 端点启用证书验证。

tls_verify_hostname [布尔值] 为 HTTPS 端点启用主机名验证。

tls_keystore_path [字符串] PEM 或 JKS 密钥库的路径。该文件必须可读取,由运行 SeaTunnel 的操作系统用户拥有。

tls_keystore_password [字符串] 指定密钥库的密钥密码。

tls_truststore_path [字符串] PEM 或 JKS 信任库的路径。该文件必须可读取,由运行 SeaTunnel 的操作系统用户拥有。

tls_truststore_password [字符串] 指定信任库的密钥密码。

Examples

simple

Elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel-*"
    source = ["_id","name","age"]
    query = {"range":{"firstPacket":{"gte":1669225429990,"lte":1669225429990}}}
}

complex

Elasticsearch {
    hosts = ["elasticsearch:9200"]
    index = "st_index"
    schema = {
        fields {
            c_map = "map<string, tinyint>"
            c_array = "array<tinyint>"
            c_string = string
            c_boolean = boolean
            c_tinyint = tinyint
            c_smallint = smallint
            c_int = int
            c_bigint = bigint
            c_float = float
            c_double = double
            c_decimal = "decimal(2, 1)"
            c_bytes = bytes
            c_date = date
            c_timestamp = timestamp
        }
    }
    query = {"range":{"firstPacket":{"gte":1669225429990,"lte":1669225429990}}}
}

SSL (Disable certificates validation)

source {
    Elasticsearch {
        hosts = ["https://localhost:9200"]
        username = "elastic"
        password = "elasticsearch"
        
        tls_verify_certificate = false
    }
}

SSL (Disable hostname validation)

source {
    Elasticsearch {
        hosts = ["https://localhost:9200"]
        username = "elastic"
        password = "elasticsearch"
        
        tls_verify_hostname = false
    }
}

SSL (Enable certificates validation)

source {
    Elasticsearch {
        hosts = ["https://localhost:9200"]
        username = "elastic"
        password = "elasticsearch"
        
        tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
        tls_keystore_password = "${your password}"
    }
}

参考资料

https://seatunnel.apache.org/docs/2.3.3/concept/JobEnvConfig