Scenario

SeaTunnel version: v2.3.3

We are syncing data in JDBC batch mode, and the data volume can be fairly large.

By default the read runs single-threaded. How do we make it run in parallel?

JDBC options

The JDBC source options:

| Name | Type | Default | Description |
|---|---|---|---|
| url | String | - | The URL of the JDBC connection. Example: jdbc:mysql://localhost:3306/test |
| driver | String | - | The JDBC class name used to connect to the remote data source; for MySQL the value is com.mysql.cj.jdbc.Driver. |
| user | String | - | The username for the instance. |
| password | String | - | The password for the instance. |
| query | String | - | The query statement. |
| connection_check_timeout_sec | Int | 30 | Seconds to wait for the database operation that validates the connection to complete. |
| partition_column | String | - | Column name for parallel partitioning; only a numeric-type primary key is supported, and only one column can be configured. |
| partition_lower_bound | BigDecimal | - | Minimum value of partition_column to scan; if not set, SeaTunnel queries the database for the minimum. |
| partition_upper_bound | BigDecimal | - | Maximum value of partition_column to scan; if not set, SeaTunnel queries the database for the maximum. |
| partition_num | Int | job parallelism | Number of partitions; only positive integers are supported. Defaults to the job parallelism. |
| fetch_size | Int | 0 | For queries that return many rows, the row fetch size used by the query, to reduce the number of database round trips; zero means use the JDBC default. |
| common-options | - | - | Common source plugin parameters; see Source Common Options for details. |

The key options are partition_column, the name of a numeric primary-key column, and partition_num, the number of partitions (positive integers only, defaulting to the job parallelism).

Let's test this hands-on.

Hands-on test

A simple table definition, with ID as a numeric primary key.

CREATE TABLE `person` (
  `ID` int(11) NOT NULL,
  `NAME` varchar(100) NOT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Initialize the data

We seed 100,000 rows. The INSERT below cross-joins five ten-row digit tables (ones through tenThousands) to generate IDs 1 through 100000.

INSERT INTO person (ID, NAME)
SELECT 
    ones.n + 10 * tens.n + 100 * hundreds.n + 1000 * thousands.n + 10000 * tenThousands.n + 1 as ID,
    CONCAT('Person', ones.n + 10 * tens.n + 100 * hundreds.n + 1000 * thousands.n + 10000 * tenThousands.n + 1) as NAME
FROM 
    (SELECT 0 as n UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) ones,
    (SELECT 0 as n UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) tens,
    (SELECT 0 as n UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) hundreds,
    (SELECT 0 as n UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) thousands,
    (SELECT 0 as n UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4 UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) tenThousands
LIMIT 100000;

The output:

Query OK, 100000 rows affected (0.83 sec)
Records: 100000  Duplicates: 0  Warnings: 0

Verify the data:

mysql> select count(*) from person;
+----------+
| count(*) |
+----------+
|   100000 |
+----------+

Sync job

We read the data from MySQL and sync it into Neo4j.

Baseline version

# Defining the runtime environment
env {
  # You can set flink configuration here
  execution.parallelism = 2
  job.mode = "BATCH"
  job.name = "merge-test-person-BATCH"
  checkpoint.interval = 60000
}
source{
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&serverTimezone=Asia/Shanghai"
        driver = "com.mysql.jdbc.Driver"
        user = "admin"
        password = "123456"
        query = "SELECT ID,NAME FROM person"
        result_table_name = "merge.test.person"
        fetch_size = 1000
        connection_check_timeout_sec = 20000
    }
}

transform {
}

sink {
    MysqlToNeo4j {
            source_table_name = "merge.test.person"

            uri = "bolt://localhost:7687"
            username = "neo4j"
            password = "12345678"
            database = "neo4j"

            max_transaction_retry_time = 30000
            max_connection_timeout = 30000

            format = "default"

            queryConfigList = [
                {
                    tableName = "merge.test.person"
                    rowKind = "INSERT"
                    query = "create(p:merge_person {ID: $ID, NAME: $NAME})"
                    queryParamPosition = {
                        ID = 0
                        NAME = 1

                    }
                }
            ]
    }
}

Add the parallelism options

Based on the config above, modify the JDBC source as follows:

source{
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&serverTimezone=Asia/Shanghai"
        driver = "com.mysql.jdbc.Driver"
        user = "admin"
        password = "123456"
        query = "SELECT ID,NAME FROM person"
        result_table_name = "merge.test.person"
        fetch_size = 1000
        connection_check_timeout_sec = 20000
        # Name of a numeric primary-key column
        partition_column = "ID"
        # Number of partitions; positive integers only, defaults to the job parallelism
        partition_num = 10
    }
}

Run the test

Running locally on a single machine, directly via the examples module.

Selected log lines from the execution:

2024-01-26 10:15:09,785 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator - Starting to calculate splits.
2024-01-26 10:15:09,787 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator - Assigning JdbcSourceSplit(parameterValues=[70001, 80000], splitId=7) to 0 reader.
2024-01-26 10:15:09,787 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator - Assigning JdbcSourceSplit(parameterValues=[20001, 30000], splitId=2) to 0 reader.
2024-01-26 10:15:09,787 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator - Assigning JdbcSourceSplit(parameterValues=[40001, 50000], splitId=4) to 0 reader.
2024-01-26 10:15:09,787 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator - Assigning JdbcSourceSplit(parameterValues=[30001, 40000], splitId=3) to 0 reader.
2024-01-26 10:15:09,787 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator - Assigning JdbcSourceSplit(parameterValues=[90001, 100000], splitId=9) to 0 reader.
2024-01-26 10:15:09,787 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator - Assigning JdbcSourceSplit(parameterValues=[10001, 20000], splitId=1) to 0 reader.
2024-01-26 10:15:09,787 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator - Assigning JdbcSourceSplit(parameterValues=[50001, 60000], splitId=5) to 0 reader.
2024-01-26 10:15:09,787 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator - Assigning JdbcSourceSplit(parameterValues=[60001, 70000], splitId=6) to 0 reader.
2024-01-26 10:15:09,787 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator - Assigning JdbcSourceSplit(parameterValues=[80001, 90000], splitId=8) to 0 reader.
2024-01-26 10:15:09,787 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator - Assigning JdbcSourceSplit(parameterValues=[1, 10000], splitId=0) to 0 reader.
2024-01-26 10:15:09,787 DEBUG org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator - Assign pendingSplits to readers [0]
2024-01-26 10:15:09,787 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator - Assign splits [JdbcSourceSplit(parameterValues=[70001, 80000], splitId=7), JdbcSourceSplit(parameterValues=[20001, 30000], splitId=2), JdbcSourceSplit(parameterValues=[40001, 50000], splitId=4), JdbcSourceSplit(parameterValues=[30001, 40000], splitId=3), JdbcSourceSplit(parameterValues=[90001, 100000], splitId=9), JdbcSourceSplit(parameterValues=[10001, 20000], splitId=1), JdbcSourceSplit(parameterValues=[50001, 60000], splitId=5), JdbcSourceSplit(parameterValues=[60001, 70000], splitId=6), JdbcSourceSplit(parameterValues=[80001, 90000], splitId=8), JdbcSourceSplit(parameterValues=[1, 10000], splitId=0)] to reader 0
2024-01-26 10:15:09,794 DEBUG org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator - No more splits to assign. Sending NoMoreSplitsEvent to reader [0].

I haven't read the source closely, but from the log it appears the enumerator works out the full ID range first and then splits it into per-range chunks.
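
To illustrate, here is a minimal sketch of how such ID-range splits could be computed. This is an assumption based on the log output, not SeaTunnel's actual enumerator code; all names are illustrative.

import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical sketch of ID-range splitting; not SeaTunnel's actual implementation.
public class RangeSplitSketch {
    public static void main(String[] args) {
        BigDecimal min = BigDecimal.ONE;              // partition_lower_bound, or SELECT MIN(ID)
        BigDecimal max = BigDecimal.valueOf(100_000); // partition_upper_bound, or SELECT MAX(ID)
        int partitionNum = 10;

        // Slice [min, max] into partitionNum contiguous ranges.
        BigDecimal size = max.subtract(min).add(BigDecimal.ONE)
                .divide(BigDecimal.valueOf(partitionNum), RoundingMode.CEILING);
        BigDecimal lower = min;
        for (int splitId = 0; splitId < partitionNum; splitId++) {
            BigDecimal upper = lower.add(size).subtract(BigDecimal.ONE).min(max);
            // Each split would back a query like "... WHERE ID BETWEEN ? AND ?".
            System.out.printf("splitId=%d range=[%s, %s]%n", splitId, lower, upper);
            lower = upper.add(BigDecimal.ONE);
        }
    }
}

With min=1, max=100000 and partition_num=10, this prints the same [1, 10000] ... [90001, 100000] ranges seen in the log above.

The periodic progress report: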

***********************************************
           Job Progress Information
***********************************************
Job Id                    :  803085518936997889
Read Count So Far         :               35727
Write Count So Far        :               33678
Average Read Count        :               145/s
Average Write Count       :               145/s
Last Statistic Time       : 2024-01-26 10:18:09
Current Statistic Time    : 2024-01-26 10:19:09
***********************************************

Final log:

***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-01-26 10:15:08
End Time                  : 2024-01-26 10:26:59
Total Time(s)             :                 710
Total Read Count          :              100000
Total Write Count         :              100000
Total Failed Count        :                   0
***********************************************

Control group 1 - default MySQL to Neo4j

What if we keep the default values and skip the parallel read?

Config file

# Defining the runtime environment
env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
  job.name = "merge-test-person-BATCH"
  checkpoint.interval = 60000
}
source{
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&serverTimezone=Asia/Shanghai"
        driver = "com.mysql.jdbc.Driver"
        user = "admin"
        password = "123456"
        query = "SELECT ID,NAME FROM person"
        result_table_name = "merge.test.person"
        fetch_size = 1000
        connection_check_timeout_sec = 20000
        # Name of a numeric primary-key column
        # partition_column = "ID"
        # Number of partitions; positive integers only, defaults to the job parallelism
        # partition_num = 10
    }
}

transform {
}

sink {
    MysqlToNeo4j {
            source_table_name = "merge.test.person"

            uri = "bolt://localhost:7687"
            username = "neo4j"
            password = "12345678"
            database = "neo4j"

            max_transaction_retry_time = 30000
            max_connection_timeout = 30000

            format = "default"

            queryConfigList = [
                {
                    tableName = "merge.test.person"
                    rowKind = "INSERT"
                    query = "create(p:merge_person {ID: $ID, NAME: $NAME})"
                    queryParamPosition = {
                        ID = 0
                        NAME = 1

                    }
                }
            ]
    }
}

Run the test

Logs

2024-01-26 10:30:10,355 DEBUG org.apache.seatunnel.engine.server.checkpoint.CheckpointManager - reported task(40000) status READY_START
2024-01-26 10:30:10,355 DEBUG org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator - Register reader 0 to JdbcSourceSplitEnumerator.
2024-01-26 10:30:10,355 DEBUG org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask - reader register complete, current task size 1
2024-01-26 10:30:10,355 DEBUG org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5802 [seatunnel-893526] [5.1] remove async execute function from TaskGroupLocation{jobId=803089297803575297, pipelineId=1, taskGroupId=1} with id 4e3993ed-3d16-4405-81fe-f7fd6b9e7341

Statistics log:

***********************************************
           Job Progress Information
***********************************************
Job Id                    :  803089297803575297
Read Count So Far         :               10190
Write Count So Far        :                8141
Average Read Count        :               169/s
Average Write Count       :               135/s
Last Statistic Time       : 2024-01-26 10:30:10
Current Statistic Time    : 2024-01-26 10:31:10
***********************************************

***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-01-26 10:30:09
End Time                  : 2024-01-26 10:42:04
Total Time(s)             :                 715
Total Read Count          :              100000
Total Write Count         :              100000
Total Failed Count        :                   0
***********************************************

The performance is essentially the same: 715 s here versus 710 s with 10 partitions.

Suspected cause: most likely the Neo4j writes are the bottleneck, or something is misconfigured somewhere.

Let's introduce control group 2: read from MySQL, write straight to the console.

Control group 2 - default MySQL to console

Config

# Defining the runtime environment
env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
  job.name = "merge-test-person-BATCH"
  checkpoint.interval = 60000
}
source{
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&serverTimezone=Asia/Shanghai"
        driver = "com.mysql.jdbc.Driver"
        user = "admin"
        password = "123456"
        query = "SELECT ID,NAME FROM person"
        result_table_name = "merge.test.person"
        fetch_size = 1000
        connection_check_timeout_sec = 20000
        # Name of a numeric primary-key column
        # partition_column = "ID"
        # Number of partitions; positive integers only, defaults to the job parallelism
        # partition_num = 10
    }
}

transform {
}

sink {
    ConsoleBinlog {
    }
}

ConsoleBinlog sleeps 1 ms by default for every record it prints to the console. With a plain console sink, the 100,000 rows finished almost instantly, leaving no way to tell the runs apart; the 1 ms sleep puts a floor of roughly 100 s on 100,000 rows, so differences become visible.

Test

Log:

***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-01-26 10:53:05
End Time                  : 2024-01-26 10:55:51
Total Time(s)             :                 165
Total Read Count          :              100000
Total Write Count         :              100000
Total Failed Count        :                   0
***********************************************

Control group 3 - MySQL to console with 10-way parallelism

Config

# Defining the runtime environment
env {
  # You can set flink configuration here
  execution.parallelism = 10
  job.mode = "BATCH"
  job.name = "merge-test-person-BATCH"
  checkpoint.interval = 60000
}
source{
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&serverTimezone=Asia/Shanghai"
        driver = "com.mysql.jdbc.Driver"
        user = "admin"
        password = "123456"
        query = "SELECT ID,NAME FROM person"
        result_table_name = "merge.test.person"
        fetch_size = 1000
        connection_check_timeout_sec = 20000
        # Name of a numeric primary-key column
        partition_column = "ID"
        # Number of partitions; positive integers only, defaults to the job parallelism
        partition_num = 10
    }
}

transform {
}

sink {
    ConsoleBinlog {
    }
}

Test

***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-01-26 10:57:16
End Time                  : 2024-01-26 11:00:51
Total Time(s)             :                 214
Total Read Count          :              100000
Total Write Count         :              100000
Total Failed Count        :                   0
***********************************************

Performance did not improve; it actually got worse (214 s vs. 165 s).

Did we use the option incorrectly?

Question 1 - is Apache SeaTunnel's partition_column ineffective in practice?

Comparing control group 2 with control group 3, the option appears to have no effect at all?

This is worth checking against the source code.

Or is the bottleneck dominated by the sink side regardless?

Or does the local example-run mode ignore this option?

Linux standalone mode

Let's verify against a running standalone server.

TODO….

How to fix the Neo4j write bottleneck?

Trade-offs

Switch from one-by-one writes to batch writes.

First, why is one-by-one the default? Its drawback is poor performance, but its advantage is that there is no buffering problem and the blast radius of a single row is minimal: if one row already exists, only that one row is affected. A batch write can fail as a whole, although for a from-scratch full sync that is acceptable.

The problem with batch: looking at Neo4j's default batch write mode, if I understand it correctly, it keeps a local buffer and only performs the real write once a threshold is reached. That works for unbounded streams, at the cost of some latency. But in batch mode, say there are 998 rows and batch_size=100: will the final 98 rows never be written? How do we solve that?

How does the batch writer avoid losing data left in the bufferList?

In streaming mode there is always new data arriving, so the buffer will eventually fill, and periodic flushes also work. In batch mode, however, the job simply ends, and the tail of the data may never reach the flush threshold.

Idea 1: add a user-supplied threshold. For example, with 10,000 rows and 10-way parallelism, once id >= 9900 dynamically lower the batch threshold to 1 so no writes are lost. A minimal sketch of the failure mode itself follows below.
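
Here is a minimal sketch of a threshold-only buffered writer that exhibits the tail-loss problem. The class and method names are illustrative, not the actual connector code:

import java.util.ArrayList;
import java.util.List;

// Illustrative only: a buffered writer that flushes solely on a size threshold.
public class BufferedWriterSketch {
    private final int maxBatchSize;
    private final List<Object[]> buffer = new ArrayList<>();

    public BufferedWriterSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    public void write(Object[] row) {
        buffer.add(row);
        if (buffer.size() >= maxBatchSize) {
            flush(); // the only flush trigger is the size threshold
        }
    }

    private void flush() {
        // In a real Neo4j sink this would run one batched Cypher statement,
        // e.g. "UNWIND $batch AS row CREATE (...)", inside a single transaction.
        System.out.println("flushing " + buffer.size() + " rows");
        buffer.clear();
    }

    public void close() {
        // Bug for bounded jobs: rows still sitting in the buffer are dropped here.
        // With 99,999 rows and maxBatchSize = 1000, the final 999 never flush.
    }
}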

Implementation based directly on batchSize

Config

Following Neo4j's batch-ingest approach:

# Defining the runtime environment
env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
  job.name = "merge-test-person-BATCH"
  checkpoint.interval = 60000
}
source{
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&serverTimezone=Asia/Shanghai"
        driver = "com.mysql.jdbc.Driver"
        user = "admin"
        password = "123456"
        query = "SELECT ID,NAME FROM person"
        result_table_name = "merge.test.person"
        fetch_size = 1000
        connection_check_timeout_sec = 20000
    }
}

transform {
}

sink {
    MysqlToNeo4j {
            source_table_name = "merge.test.person"

            uri = "bolt://localhost:7687"
            username = "neo4j"
            password = "12345678"
            database = "neo4j"

            max_transaction_retry_time = 30000
            max_connection_timeout = 30000

            format = "default"
            max_batch_size = 1000
            write_mode = "BATCH"

            queryConfigList = [
                {
                    tableName = "merge.test.person"
                    rowKind = "INSERT"

                    query = "unwind $batch as row create(p:merge_person {ID: row.ID, NAME: row.NAME})"
                    queryParamPosition = {
                        ID = 0
                        NAME = 1
                    }
                }
            ]
    }
}
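
A note on the query: the UNWIND $batch AS row form passes the buffered rows to Neo4j as a single list parameter, and UNWIND expands that list so all the CREATEs run in one statement and one transaction, replacing up to max_batch_size network round trips with a single one.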

Result

The improvement is dramatic (5 s versus 710 s); the write side was indeed the bottleneck.

***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-01-26 13:04:55
End Time                  : 2024-01-26 13:05:01
Total Time(s)             :                   5
Total Read Count          :              100000
Total Write Count         :              100000
Total Failed Count        :                   0
***********************************************

When the row count is not an exact multiple of max_batch_size

Setup

Clear the target Neo4j database:

MATCH (n:merge_person) delete n;

Delete one row from the source MySQL database:

delete from person where id=1;
select count(*) from person;

Result:

+----------+
| count(*) |
+----------+
|    99999 |
+----------+

Test again

With everything else unchanged, run again.

***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-01-26 13:10:34
End Time                  : 2024-01-26 13:10:39
Total Time(s)             :                   5
Total Read Count          :               99999
Total Write Count         :               99999
Total Failed Count        :                   0
***********************************************

Actual data in the Neo4j database:

╒════════╕
│count(*)│
╞════════╡
│99000   │
└────────┘

The final 999 rows (99 full batches of 1,000 cover only 99,000) never hit the flush threshold and were lost. How do we fix this?

Solution

Flush whatever is left in close(), before releasing the resources.

@Override
public void close() throws IOException {
    // Flush any rows still sitting in the write buffer before releasing resources;
    // otherwise the tail smaller than max_batch_size is silently dropped.
    flushAllWriteBuffer();

    session.close();
    driver.close();
}
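
The flushAllWriteBuffer() call above does the real work. Here is a minimal sketch of what such a helper might look like, assuming a writeBuffer map keyed by the Cypher query; the field and its shape are assumptions, not the connector's actual code:

// Hypothetical sketch of the flush helper; the real connector method may differ.
private void flushAllWriteBuffer() {
    for (Map.Entry<String, List<Map<String, Object>>> entry : writeBuffer.entrySet()) {
        List<Map<String, Object>> rows = entry.getValue();
        if (rows.isEmpty()) {
            continue; // nothing pending for this query
        }
        // Run the batched Cypher once with the remaining rows, e.g.
        // "UNWIND $batch AS row CREATE (p:merge_person {ID: row.ID, NAME: row.NAME})"
        session.writeTransaction(
                tx -> tx.run(entry.getKey(), Collections.singletonMap("batch", rows)).consume());
        rows.clear();
    }
}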

This shows how much the connector lifecycle hooks matter.

Re-test

Statistics log:

***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-01-26 13:18:53
End Time                  : 2024-01-26 13:18:59
Total Time(s)             :                   6
Total Read Count          :               99999
Total Write Count         :               99999
Total Failed Count        :                   0
***********************************************

Data in the target database:

MATCH (n:merge_person) RETURN count(*)

╒════════╕
│count(*)│
╞════════╡
│99999   │
└────────┘

OK, problem solved!

TODO

Generate the corresponding job scripts.

Execution errors

Checkpoint error

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.example.engine.SeaTunnelEngineMysqlDefaultToNeo4jDefineConcurrencyExample.main(SeaTunnelEngineMysqlDefaultToNeo4jDefineConcurrencyExample.java:46)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: Checkpoint expired before completing. Please increase checkpoint timeout in the seatunnel.yaml
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:255)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$9(CheckpointCoordinator.java:532)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

	at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:184)
	... 2 more

Cause: I have run into and solved this before.

Solution: modify checkpoint.timeout and checkpoint.interval in seatunnel.yaml:

D:\_my\seatunnel-2.3.3-release-slim\seatunnel-engine\seatunnel-engine-common\src\main\resources\seatunnel.yaml

Config

seatunnel:
    engine:
        backup-count: 1
        queue-type: blockingqueue
        print-execution-info-interval: 60
        slot-service:
            dynamic-slot: true
        checkpoint:
            interval: 300000
            timeout: 10000
            storage:
                type: localfile
                max-retained: 3
                plugin-config:
                    namespace: C:\ProgramData\seatunnel\checkpoint\

Increase the checkpoint values:

seatunnel:
    engine:
        backup-count: 1
        queue-type: blockingqueue
        print-execution-info-interval: 60
        slot-service:
            dynamic-slot: true
        checkpoint:
            interval: 6000000
            timeout: 6000000
            storage:
                type: localfile
                max-retained: 3
                plugin-config:
                    namespace: C:\ProgramData\seatunnel\checkpoint\

References

[Bug] [Zeta Engine] the checkpoint lock cause checkpoint-flow blocking with long time
https://github.com/apache/seatunnel/issues/5555