kafka

Kafka source connector

Support Those Engines

Spark Flink Seatunnel Zeta

Key Features

batch stream exactly-once × column projection parallelism × support user-defined split

驱动

直接从 connector-kafka 下载。

Source Options

名称 类型 必需 默认值 描述
topic 字符串 - 当表用作源时,从中读取数据的主题名称。它还支持通过逗号分隔的主题列表,例如 ‘topic-1,topic-2’。
bootstrap.servers 字符串 - 逗号分隔的 Kafka 代理列表。
pattern 布尔 false 如果将 pattern 设置为 true,则表示要从中读取的主题名称的正则表达式模式。将订阅匹配指定正则表达式的所有具有匹配名称的客户端的主题。
consumer.group 字符串 SeaTunnel-Consumer-Group Kafka 消费者组 ID,用于区分不同的消费者组。
commit_on_checkpoint 布尔 true 如果为 true,则消费者的偏移量将在后台定期提交。
kafka.config Map - 除了上述 Kafka 消费者客户端必须指定的必要参数之外,用户还可以指定多个消费者客户端非强制性参数,这些参数覆盖了官方 Kafka 文档中指定的所有消费者参数。
schema Config - 数据的结构,包括字段名称和字段类型。
format 字符串 json 数据格式。默认格式为 json。可选文本格式,canal-json 和 debezium-json。如果使用 json 或 text 格式,则默认字段分隔符为 “, “。如果自定义分隔符,请添加 “field_delimiter” 选项。如果使用 canal 格式,请参阅 canal-json 了解详细信息。如果使用 debezium 格式,请参阅 debezium-json 了解详细信息。
format_error_handle_way 字符串 fail 数据格式错误的处理方法。默认值为 fail,可选值为 (fail, skip)。选择 fail 时,数据格式错误将阻塞,并且将引发异常。选择 skip 时,数据格式错误将跳过此行数据。
field_delimiter 字符串 , 用于数据格式的字段定界符的自定义字段定界符。
start_mode StartMode 枚举 group_offsets 消费者的初始消费模式。
start_mode.offsets Config - 用于消费模式为 specific_offsets 的偏移量。
start_mode.timestamp 长整数 - 用于消费模式为 “timestamp” 的时间。
partition-discovery.interval-millis 长整数 -1 动态发现主题和分区的间隔。
common-options - 源插件的通用参数,请参阅 Source Common Options 了解详细信息。

这是关于源选项的配置列表。根据您的需求,您可以适当配置这些选项。

任务示例

简单示例:

此示例读取 Kafka 的 topic_1、topic_2、topic_3 的数据,并将其打印到客户端。

如果您尚未安装和部署 SeaTunnel,则需要按照《安装 SeaTunnel》中的说明安装和部署 SeaTunnel。然后按照《使用 SeaTunnel 引擎快速入门》中的说明运行此作业。

  [yml]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Defining the runtime environment env { # You can set flink configuration here execution.parallelism = 2 job.mode = "BATCH" } source { Kafka { schema = { fields { name = "string" age = "int" } } format = text field_delimiter = "#" topic = "topic_1,topic_2,topic_3" bootstrap.servers = "localhost:9092" kafka.config = { client.id = client_1 max.poll.records = 500 auto.offset.reset = "earliest" enable.auto.commit = "false" } } } sink { Console {} }

Regex Topic

  [plaintext]
1
2
3
4
5
6
7
8
source { Kafka { topic = ".*seatunnel*." pattern = "true" bootstrap.servers = "localhost:9092" consumer.group = "seatunnel_group" } }

AWS MSK SASL/SCRAM

替换以下 ${username}${password} 为 AWS MSK 中的配置值。

  [plaintext]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
source { Kafka { topic = "seatunnel" bootstrap.servers = "xx.amazonaws.com.cn:9096,xxx.amazonaws.com.cn:9096,xxxx.amazonaws.com.cn:9096" consumer.group = "seatunnel_group" kafka.config = { security.protocol=SASL_SSL sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";" #security.protocol=SASL_SSL #sasl.mechanism=AWS_MSK_IAM #sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;" #sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler" } } }

AWS MSK IAM

请从 https://github.com/aws/aws-msk-iam-auth/releases 下载 aws-msk-iam-auth-1.1.5.jar 并将其放置在 $SEATUNNEL_HOME/plugin/kafka/lib 目录中。

请确保 IAM 策略包含 “kafka-cluster:Connect”,类似于以下内容:

  [plaintext]
1
2
3
4
5
6
"Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:AlterCluster", "kafka-cluster:DescribeCluster" ],

source config:

  [plaintext]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
source { Kafka { topic = "seatunnel" bootstrap.servers = "xx.amazonaws.com.cn:9098,xxx.amazonaws.com.cn:9098,xxxx.amazonaws.com.cn:9098" consumer.group = "seatunnel_group" kafka.config = { #security.protocol=SASL_SSL #sasl.mechanism=SCRAM-SHA-512 #sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";" security.protocol=SASL_SSL sasl.mechanism=AWS_MSK_IAM sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;" sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler" } } }

参考资料

https://seatunnel.apache.org/docs/2.3.3/connector-v2/source/kafka