Further reading

ETL-10-apache SeaTunnel Connector v2 source mysql cdc

mysql binlog

BUGS

Apache SeaTunnel version: v2.3.3

MySQL version: 5.7.31-log

BUG: when table-names in the MySQL CDC source lists a single table, everything works fine. When it lists multiple tables, the local examples run fails immediately with org.apache.kafka.connect.errors.DataException: xxx is not a valid field name.

Run mode: local, running the config file directly from seatunnel-examples.

Question: how can this be fixed? I think mysql-cdc should support multiple tables in one config.

config file

For simplicity, the sink is Console.

  • mysql_cdc_to_console.conf
# Defining the runtime environment
env {
  # You can set flink configuration here
  parallelism = 1
  job.mode = "STREAMING"
  job.name = "merge_cdc.user_info-STREAMING"
  checkpoint.interval = 10000
}
source{
    MySQL-CDC {
        base-url = "jdbc:mysql://127.0.0.1:13306/cdc?useSSL=false&serverTimezone=Asia/Shanghai"
        driver = "com.mysql.jdbc.Driver"
        username = "admin"
        password = "123456"
        table-names = ["cdc.user_info", "cdc.role_info"]

        startup.mode = "initial"
        result_table_name="merge_cdc.user_info"
    }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    Console {
    }
}

exception logs

2024-01-29 18:00:26,226 ERROR org.apache.seatunnel.core.starter.SeaTunnel - 
===============================================================================



Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.example.engine.cdc.MysqlCdcDefaultToLocalFileMultiTablesExample.main(MysqlCdcDefaultToLocalFileMultiTablesExample.java:44)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.connect.errors.DataException: username is not a valid field name
	at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
	at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
	at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:84)
	at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractAfterRow(SeaTunnelRowDebeziumDeserializeSchema.java:209)
	at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:178)
	at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:110)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:155)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:130)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:89)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:55)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:108)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:98)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:95)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:100)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:613)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

	at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:184)
	... 2 more

local debug

Debugging locally, I found that the database metadata does not match the data being deserialized.
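To make the mismatch concrete, here is a toy model in Python. It is not SeaTunnel's or Kafka Connect's actual code: `Struct`, `DataException` and `convert` are simplified stand-ins I wrote for illustration, and the idea that a role_info record gets read with the user_info column list is my reading of the stack trace, not something confirmed by the project.

```python
class DataException(Exception):
    """Stand-in for org.apache.kafka.connect.errors.DataException."""


class Struct:
    """Toy record that rejects unknown field names, similar in spirit to Kafka Connect's Struct."""

    def __init__(self, fields):
        self._fields = dict(fields)

    def get(self, name):
        if name not in self._fields:
            raise DataException(f"{name} is not a valid field name")
        return self._fields[name]


# Column lists of the two tables used in this post.
USER_INFO_COLUMNS = ["id", "username", "create_time", "update_time"]
ROLE_INFO_COLUMNS = ["id", "role_name", "create_time", "update_time"]


def convert(record, columns):
    """Read the record column by column, as a per-table row converter would."""
    return [record.get(column) for column in columns]


role_record = Struct({
    "id": 1,
    "role_name": "r1",
    "create_time": "2024-01-29 17:33:42",
    "update_time": "2024-01-29 17:33:42",
})

print(convert(role_record, ROLE_INFO_COLUMNS))   # matching schema: works
try:
    convert(role_record, USER_INFO_COLUMNS)      # wrong schema: lookup fails
except DataException as error:
    print(error)
```

With a single table there is only one column list, so this kind of mix-up cannot happen, which would be consistent with the single-table config working.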

test

prepare info

admin user

CREATE USER 'admin'@'%' IDENTIFIED BY '123456';
GRANT ALL PRIVILEGES ON *.* TO 'admin'@'%' WITH GRANT OPTION;
flush privileges;

enable binlog

mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| binlog_format            | ROW   |
| binlog_row_image         | FULL  |
| enforce_gtid_consistency | ON    |
| gtid_mode                | ON    |
| log_bin                  | ON    |
+--------------------------+-------+
5 rows in set, 1 warning (0.00 sec)
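The same check can be scripted. The sketch below is a small Python helper of my own (the name `check_binlog_settings` is hypothetical) that takes the variable/value pairs from the output above and reports anything that deviates from what row-based CDC needs. It only checks log_bin, binlog_format and binlog_row_image; treating the GTID settings as optional is an assumption on my part.

```python
# Values that row-based CDC relies on; GTID settings are deliberately not checked here.
REQUIRED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def check_binlog_settings(variables):
    """Return a list of human-readable problems; an empty list means the settings look fine."""
    problems = []
    for name, expected in REQUIRED.items():
        actual = variables.get(name)
        if actual is None or actual.upper() != expected:
            problems.append(f"{name} should be {expected}, found {actual}")
    return problems


# The values printed by `show variables` above.
current = {
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
    "enforce_gtid_consistency": "ON",
    "gtid_mode": "ON",
    "log_bin": "ON",
}
print(check_binlog_settings(current))
```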

init tables

tables

create database cdc;
use cdc;

drop table if exists user_info;
create table user_info
(
    id int unsigned auto_increment comment '主键' primary key,
    username varchar(128) not null comment '用户名',
    create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间',
    update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间'
) comment '用户表' ENGINE=Innodb default charset=utf8mb4 auto_increment=1;

drop table if exists role_info;
create table role_info
(
    id int unsigned auto_increment comment '主键' primary key,
    role_name varchar(128) not null comment '角色名',
    create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间',
    update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间'
) comment '角色表' ENGINE=Innodb default charset=utf8mb4 auto_increment=1;

data

truncate table user_info;

insert into user_info (username) values ('u1');
insert into user_info (username) values ('u2');
insert into user_info (username) values ('u3');
insert into user_info (username) values ('u4');


truncate table role_info;
insert into role_info (role_name) values ('r1');
insert into role_info (role_name) values ('r2');
insert into role_info (role_name) values ('r3');
insert into role_info (role_name) values ('r4');

test:

mysql> select * from user_info;
+----+----------+---------------------+---------------------+
| id | username | create_time         | update_time         |
+----+----------+---------------------+---------------------+
|  1 | u1       | 2024-01-29 17:33:42 | 2024-01-29 17:33:42 |
|  2 | u2       | 2024-01-29 17:33:42 | 2024-01-29 17:33:42 |
|  3 | u3       | 2024-01-29 17:33:42 | 2024-01-29 17:33:42 |
|  4 | u4       | 2024-01-29 17:33:42 | 2024-01-29 17:33:42 |
+----+----------+---------------------+---------------------+

mysql> select * from role_info;
+----+-----------+---------------------+---------------------+
| id | role_name | create_time         | update_time         |
+----+-----------+---------------------+---------------------+
|  1 | r1        | 2024-01-29 17:33:42 | 2024-01-29 17:33:42 |
|  2 | r2        | 2024-01-29 17:33:42 | 2024-01-29 17:33:42 |
|  3 | r3        | 2024-01-29 17:33:42 | 2024-01-29 17:33:42 |
|  4 | r4        | 2024-01-29 17:33:42 | 2024-01-29 17:33:42 |
+----+-----------+---------------------+---------------------+

I filed a corresponding issue: https://github.com/apache/seatunnel/issues/6302

Local startup

/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/bin/seatunnel.sh --config mysql_cdc_to_console.conf -e local

Solution

It turns out a catalog must be specified

Specifically, the following needs to be added:

catalog = {
	factory = MySQL
}

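However, this important property is not mentioned in the official documentation.

I only found it in another blog post:

A Comprehensive Look at the SeaTunnel MySQL CDC Connector: A Powerful Tool for Efficient Data Synchronization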

# Defining the runtime environment
env {
  # You can set flink configuration here
  parallelism = 1
  job.mode = "STREAMING"
  job.name = "merge_cdc.user_info-STREAMING"
  checkpoint.interval = 10000
}
source{
    MySQL-CDC {
        catalog = {
            factory = MySQL
        }
        base-url = "jdbc:mysql://127.0.0.1:3306/cdc?useSSL=false&serverTimezone=Asia/Shanghai"
        driver = "com.mysql.jdbc.Driver"
        username = "admin"
        password = "123456"
        table-names = ["cdc.user_info", "cdc.role_info"]

        startup.mode = "initial"
    }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    ConsoleBinlog {
    }
}

The corresponding file contents:

2024-01-30 10:33:50.632 ~~~~~~~~~~~~ SeaTunnelRow{tableId=cdc.role_info, kind=+I, fields=[1, r1, 2024-01-29T17:33:42, 2024-01-29T17:33:42]}
2024-01-30 10:33:50.659 ~~~~~~~~~~~~ SeaTunnelRow{tableId=cdc.role_info, kind=+I, fields=[2, r2, 2024-01-29T17:33:42, 2024-01-29T17:33:42]}
2024-01-30 10:33:57.795 ~~~~~~~~~~~~ SeaTunnelRow{tableId=cdc.role_info, kind=+I, fields=[3, r3, 2024-01-29T17:33:42, 2024-01-29T17:33:42]}
2024-01-30 10:33:57.811 ~~~~~~~~~~~~ SeaTunnelRow{tableId=cdc.role_info, kind=+I, fields=[4, r4, 2024-01-29T17:33:42, 2024-01-29T17:33:42]}
2024-01-30 10:33:57.827 ~~~~~~~~~~~~ SeaTunnelRow{tableId=cdc.user_info, kind=+I, fields=[1, u1, 2024-01-29T17:33:42, 2024-01-29T17:33:42]}
2024-01-30 10:33:57.843 ~~~~~~~~~~~~ SeaTunnelRow{tableId=cdc.user_info, kind=+I, fields=[2, u2, 2024-01-29T17:33:42, 2024-01-29T17:33:42]}
2024-01-30 10:33:57.864 ~~~~~~~~~~~~ SeaTunnelRow{tableId=cdc.user_info, kind=+I, fields=[3, u3, 2024-01-29T17:33:42, 2024-01-29T17:33:42]}
2024-01-30 10:33:57.879 ~~~~~~~~~~~~ SeaTunnelRow{tableId=cdc.user_info, kind=+I, fields=[4, u4, 2024-01-29T17:33:42, 2024-01-29T17:33:42]}

Note that each row carries the corresponding tableId.

So how can each row be written to its corresponding JDBC table?

Verifying multi-table synchronization

Based on the source

I have not actually found a complete official example of multi-table CDC.

However, the console output above already shows that every row carries its tableId (cdc.role_info or cdc.user_info).

And yet the JDBC sink examples for MySQL all appear to target a single table?
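Conceptually, what a multi-table sink has to do is route each row by its tableId and build one insert statement per target table. The Python sketch below only illustrates that idea. It is not how the SeaTunnel JDBC sink is implemented; `route_by_table` and `insert_sql` are hypothetical helpers, and the target database name cdc_target is just an example value.

```python
from collections import defaultdict


def route_by_table(rows):
    """Group (table_id, fields) pairs by table_id, keeping arrival order per table."""
    routed = defaultdict(list)
    for table_id, fields in rows:
        routed[table_id].append(fields)
    return dict(routed)


def insert_sql(table_id, columns, target_database):
    """Build a parameterized INSERT for the table named in table_id ("db.table")."""
    table_name = table_id.split(".", 1)[1]
    column_list = ", ".join(f"`{column}`" for column in columns)
    placeholders = ", ".join(["?"] * len(columns))
    return (
        f"INSERT INTO `{target_database}`.`{table_name}` "
        f"({column_list}) VALUES ({placeholders})"
    )


# Rows shaped like the console output: a tableId plus the field values.
rows = [
    ("cdc.role_info", [1, "r1"]),
    ("cdc.user_info", [1, "u1"]),
    ("cdc.role_info", [2, "r2"]),
]

for table_id, table_rows in route_by_table(rows).items():
    print(table_id, len(table_rows))
print(insert_sql("cdc.user_info", ["id", "username"], "cdc_target"))
```

The point is only that the tableId on each row is enough information to pick the target table, so a sink does not need one config block per table.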

v1: verification without specifying the database + table

conf

Let's see whether the tables are matched automatically.

# Defining the runtime environment
env {
  # You can set flink configuration here
  parallelism = 1
  job.mode = "STREAMING"
  job.name = "merge_cdc.user_info-STREAMING"
  checkpoint.interval = 10000
}
source{
    MySQL-CDC {
        catalog = {
            factory = MySQL
        }
        base-url = "jdbc:mysql://127.0.0.1:3306/cdc?useSSL=false&serverTimezone=Asia/Shanghai"
        driver = "com.mysql.jdbc.Driver"
        username = "admin"
        password = "123456"
        table-names = ["cdc.user_info", "cdc.role_info"]

        startup.mode = "initial"
    }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    jdbc {
        database = "cdc_target"
        url = "jdbc:mysql://localhost:3306/cdc_target?useSSL=false&serverTimezone=Asia/Shanghai"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "admin"
        password = "123456"
        generate_sink_sql = true
    }
}

step1: neither the database nor the tables are created

Running the job directly fails because the database does not exist:

Caused by: java.sql.SQLSyntaxErrorException: Unknown database 'cdc_target'
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:828)
	at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:448)
	at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:241)
	at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198)
	at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:111)
	at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.open(JdbcOutputFormat.java:70)
	... 18 more

So create the corresponding database:

create database cdc_target;

step2: database created, tables not created

Run it again:

Caused by: java.sql.SQLSyntaxErrorException: Table 'cdc_target.user_info' doesn't exist
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1098)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:832)
	... 23 more

	at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:184)
	... 2 more

But doesn't this mean Schema Evolution (synchronizing table structure changes) is not supported?

Or is something wrong with my configuration?

step3: database created, tables created

use cdc_target;

drop table if exists user_info;
create table user_info
(
    id int unsigned auto_increment comment '主键' primary key,
    username varchar(128) not null comment '用户名',
    create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间',
    update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间'
) comment '用户表' ENGINE=Innodb default charset=utf8mb4 auto_increment=1;

drop table if exists role_info;
create table role_info
(
    id int unsigned auto_increment comment '主键' primary key,
    role_name varchar(128) not null comment '角色名',
    create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间',
    update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间'
) comment '角色表' ENGINE=Innodb default charset=utf8mb4 auto_increment=1;

Run it again.

mysql> select * from  role_info;
+----+-----------+---------------------+---------------------+
| id | role_name | create_time         | update_time         |
+----+-----------+---------------------+---------------------+
|  1 | r1        | 2024-01-29 17:33:42 | 2024-01-29 17:33:42 |
|  2 | r2        | 2024-01-29 17:33:42 | 2024-01-29 17:33:42 |
|  3 | r3        | 2024-01-29 17:33:42 | 2024-01-29 17:33:42 |
|  4 | r4        | 2024-01-29 17:33:42 | 2024-01-29 17:33:42 |
+----+-----------+---------------------+---------------------+
4 rows in set (0.00 sec)

mysql> select * from user_info;
+----+----------+---------------------+---------------------+
| id | username | create_time         | update_time         |
+----+----------+---------------------+---------------------+
|  1 | u1       | 2024-01-29 17:33:42 | 2024-01-29 17:33:42 |
|  2 | u2       | 2024-01-29 17:33:42 | 2024-01-29 17:33:42 |
|  3 | u3       | 2024-01-29 17:33:42 | 2024-01-29 17:33:42 |
|  4 | u4       | 2024-01-29 17:33:42 | 2024-01-29 17:33:42 |
+----+----------+---------------------+---------------------+
4 rows in set (0.00 sec)

What about changing the table structure?

Change the structure of the CDC source table

ALTER table user_info add remark varchar(128) null comment '备注';

In practice this does not take effect: the change is not applied to the target table.
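Until that works, one manual workaround is to compare the source and target column lists and apply the missing columns to the target by hand. The sketch below is a hypothetical Python helper (`missing_column_ddl` is my own name, not a SeaTunnel feature) that produces the ALTER TABLE statements. It assumes the column definitions have already been collected as plain strings, for example from show create table.

```python
def missing_column_ddl(target_table, source_columns, target_columns):
    """Return ALTER TABLE statements for columns present in the source but not in the target.

    source_columns / target_columns map a column name to its definition string.
    """
    statements = []
    for name, definition in source_columns.items():
        if name not in target_columns:
            statements.append(
                f"ALTER TABLE {target_table} ADD COLUMN {name} {definition};"
            )
    return statements


# Source table after the ALTER above; target table as created in step3.
source_columns = {
    "id": "int unsigned auto_increment",
    "username": "varchar(128) not null",
    "create_time": "timestamp default CURRENT_TIMESTAMP not null",
    "update_time": "timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP",
    "remark": "varchar(128) null",
}
target_columns = {name: definition for name, definition in source_columns.items() if name != "remark"}

for statement in missing_column_ddl("cdc_target.user_info", source_columns, target_columns):
    print(statement)
```

This only covers added columns. Dropped or modified columns would need their own handling.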

References

https://blog.csdn.net/DolphinScheduler/article/details/134855410