kafka

Kafka source connector

Support Those Engines

Spark Flink Seatunnel Zeta

Key Features

batch stream exactly-once × column projection parallelism × support user-defined split

驱动

直接从 connector-kafka 下载。

Source Options

名称 类型 必需 默认值 描述
topic 字符串 - 当表用作源时,从中读取数据的主题名称。它还支持通过逗号分隔的主题列表,例如 ‘topic-1,topic-2’。
bootstrap.servers 字符串 - 逗号分隔的 Kafka 代理列表。
pattern 布尔 false 如果将 pattern 设置为 true,则表示要从中读取的主题名称的正则表达式模式。将订阅匹配指定正则表达式的所有具有匹配名称的客户端的主题。
consumer.group 字符串 SeaTunnel-Consumer-Group Kafka 消费者组 ID,用于区分不同的消费者组。
commit_on_checkpoint 布尔 true 如果为 true,则消费者的偏移量将在后台定期提交。
kafka.config Map - 除了上述 Kafka 消费者客户端必须指定的必要参数之外,用户还可以指定多个消费者客户端非强制性参数,这些参数覆盖了官方 Kafka 文档中指定的所有消费者参数。
schema Config - 数据的结构,包括字段名称和字段类型。
format 字符串 json 数据格式。默认格式为 json。可选文本格式,canal-json 和 debezium-json。如果使用 json 或 text 格式,则默认字段分隔符为 “, “。如果自定义分隔符,请添加 “field_delimiter” 选项。如果使用 canal 格式,请参阅 canal-json 了解详细信息。如果使用 debezium 格式,请参阅 debezium-json 了解详细信息。
format_error_handle_way 字符串 fail 数据格式错误的处理方法。默认值为 fail,可选值为 (fail, skip)。选择 fail 时,数据格式错误将阻塞,并且将引发异常。选择 skip 时,数据格式错误将跳过此行数据。
field_delimiter 字符串 , 用于数据格式的字段定界符的自定义字段定界符。
start_mode StartMode 枚举 group_offsets 消费者的初始消费模式。
start_mode.offsets Config - 用于消费模式为 specific_offsets 的偏移量。
start_mode.timestamp 长整数 - 用于消费模式为 “timestamp” 的时间。
partition-discovery.interval-millis 长整数 -1 动态发现主题和分区的间隔。
common-options - 源插件的通用参数,请参阅 Source Common Options 了解详细信息。

这是关于源选项的配置列表。根据您的需求,您可以适当配置这些选项。

任务示例

简单示例:

此示例读取 Kafka 的 topic_1、topic_2、topic_3 的数据,并将其打印到客户端。

如果您尚未安装和部署 SeaTunnel,则需要按照《安装 SeaTunnel》中的说明安装和部署 SeaTunnel。然后按照《使用 SeaTunnel 引擎快速入门》中的说明运行此作业。

# Defining the runtime environment
env {
  # You can set flink configuration here
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
  Kafka {
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
    format = text
    field_delimiter = "#"
    topic = "topic_1,topic_2,topic_3"
    bootstrap.servers = "localhost:9092"
    kafka.config = {
      client.id = client_1
      max.poll.records = 500
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
    }
  }  
}
sink {
  Console {}
}

Regex Topic

source {
    Kafka {
          topic = ".*seatunnel*."
          pattern = "true" 
          bootstrap.servers = "localhost:9092"
          consumer.group = "seatunnel_group"
    }
}

AWS MSK SASL/SCRAM

替换以下 ${username}${password} 为 AWS MSK 中的配置值。

source {
    Kafka {
        topic = "seatunnel"
        bootstrap.servers = "xx.amazonaws.com.cn:9096,xxx.amazonaws.com.cn:9096,xxxx.amazonaws.com.cn:9096"
        consumer.group = "seatunnel_group"
        kafka.config = {
            security.protocol=SASL_SSL
            sasl.mechanism=SCRAM-SHA-512
            sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";"
            #security.protocol=SASL_SSL
            #sasl.mechanism=AWS_MSK_IAM
            #sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;"
            #sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler"
        }
    }
}

AWS MSK IAM

请从 https://github.com/aws/aws-msk-iam-auth/releases 下载 aws-msk-iam-auth-1.1.5.jar 并将其放置在 $SEATUNNEL_HOME/plugin/kafka/lib 目录中。

请确保 IAM 策略包含 “kafka-cluster:Connect”,类似于以下内容:

"Effect": "Allow",
"Action": [
    "kafka-cluster:Connect",
    "kafka-cluster:AlterCluster",
    "kafka-cluster:DescribeCluster"
],

source config:

source {
    Kafka {
        topic = "seatunnel"
        bootstrap.servers = "xx.amazonaws.com.cn:9098,xxx.amazonaws.com.cn:9098,xxxx.amazonaws.com.cn:9098"
        consumer.group = "seatunnel_group"
        kafka.config = {
            #security.protocol=SASL_SSL
            #sasl.mechanism=SCRAM-SHA-512
            #sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";"
            security.protocol=SASL_SSL
            sasl.mechanism=AWS_MSK_IAM
            sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;"
            sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler"
        }
    }
}

参考资料

https://seatunnel.apache.org/docs/2.3.3/connector-v2/source/kafka