MySQL

https://seatunnel.apache.org/docs/2.3.3/connector-v2/source/Mysql

JDBC MySQL 数据源连接器

支持的引擎

Spark Flink SeaTunnel Zeta

主要特点

√ 批处理 × 流处理 √ 精确一次 √ 列投影 √ 并行性 √ 支持用户定义的拆分

支持查询 SQL 并能够实现投影效果。

数据库依赖

请下载与 ‘Maven’ 对应的支持列表,并将其复制到 ‘$SEATNUNNEL_HOME/plugins/jdbc/lib/’ 工作目录中。

例如,MySQL 数据源:cp mysql-connector-java-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/

源选项

名称 类型 必需 默认值 描述
url 字符串 - JDBC 连接的 URL。示例:jdbc:mysql://localhost:3306:3306/test
driver 字符串 - 用于连接到远程数据源的 JDBC 类名,如果使用 MySQL,则值为 com.mysql.cj.jdbc.Driver。
user 字符串 - 连接实例的用户名。
password 字符串 - 连接实例的密码。
query 字符串 - 查询语句。
connection_check_timeout_sec 整数 30 用于等待验证连接的数据库操作完成的时间,单位为秒。
partition_column 字符串 - 并行性分区的列名,仅支持数值类型,仅支持数值类型的主键,并且只能配置一个列。
partition_lower_bound BigDecimal - 扫描的 partition_column 的最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。
partition_upper_bound BigDecimal - 扫描的 partition_column 的最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。
partition_num 整数 job parallelism 分区计数的数量,仅支持正整数,默认值为作业并行性。
fetch_size 整数 0 对于返回大量对象的查询,可以配置查询中使用的行获取大小,以减少满足选择条件所需的数据库访问次数,零表示使用 JDBC 的默认值。
common-options - - 源插件的通用参数,请参阅 Source Common Options 了解详细信息。

提示

如果未设置 partition_column,则将以单一并发运行。

如果设置了 partition_column,将根据任务的并发度并行执行。

当您的分片读取字段为大数类型,如 bigint(30) 及以上,并且数据不均匀分布时,建议将并行级别设置为1,以确保解决数据倾斜问题。

任务示例

简单示例:

此示例在单一并行度中查询在您的测试 “database” 中的 “table” 表中的 16 条数据,并查询其所有字段。

您还可以指定要在最终输出到控制台的查询哪些字段。

# Defining the runtime environment
env {
  # You can set flink configuration here
  execution.parallelism = 2
  job.mode = "BATCH"
}
source{
    Jdbc {
        url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "root"
        password = "123456"
        query = "select * from type_bin limit 16"
    }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    Console {}
}

并行示例:

使用您配置的分片字段和分片数据并行读取您的查询表。

如果您想要读取整个表,可以使用此方法。

source {
    Jdbc {
        url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "root"
        password = "123456"
        # Define query logic as required
        query = "select * from type_bin"
        # Parallel sharding reads fields
        partition_column = "id"
        # Number of fragments
        partition_num = 10
    }
}

并行边界示例:

根据您配置的上限和下限边界指定查询范围,按照您配置的上下边界读取数据源更为高效。

数据源配置示例:

source {
    Jdbc {
        url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "root"
        password = "123456"
        # 根据需要定义查询逻辑
        query = "select * from type_bin"
        partition_column = "id"
        # 读取起始边界
        partition_lower_bound = 1
        # 读取结束边界
        partition_upper_bound = 500
        partition_num = 10
    }
}

这个配置示例使用了Jdbc数据源,连接到MySQL数据库。它执行了一个查询,从表type_bin中选择所有字段。

并且通过指定分片字段id和设置分片的上下边界,以及分片数量,实现了并行读取数据的配置。

读取的数据将在id从1到500之间进行分片,并将分成10个分片进行并行读取。

参考资料

https://seatunnel.apache.org/docs/2.3.3/concept/JobEnvConfig