ETL-10-Apache SeaTunnel Connector v2 source MySQL CDC
MySQL preparation
Create a test account:
CREATE USER 'admin'@'%' IDENTIFIED BY '123456';
GRANT ALL PRIVILEGES ON *.* TO 'admin'@'%' WITH GRANT OPTION;
FLUSH PRIVILEGES;
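You can confirm the grants with a standard MySQL statement:
SHOW GRANTS FOR 'admin'@'%';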
Enable binlog
Add the following to the MySQL server configuration (e.g. my.cnf) and restart the server:
# Enable binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary, expiration can be short for integration tests but would
# be longer on a production system. Row-level info is required for ingest to work.
# Server ID is required, but this will vary on production systems
server-id = 223344
log_bin = mysql-bin
expire_logs_days = 10
binlog_format = row
binlog_row_image = FULL
# enable gtid mode
gtid_mode = on
enforce_gtid_consistency = on
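To confirm the settings took effect after the restart, check the server variables (standard MySQL statements; expected values in the comments):
-- binlog enabled and in row format
SHOW VARIABLES LIKE 'log_bin';          -- expect ON
SHOW VARIABLES LIKE 'binlog_format';    -- expect ROW
SHOW VARIABLES LIKE 'binlog_row_image'; -- expect FULL
-- GTID mode on
SHOW VARIABLES LIKE 'gtid_mode';        -- expect ON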
Create tables
create database etl;
use etl;
Create the test tables:
drop table if exists user_info;
create table user_info
(
    id int unsigned auto_increment comment 'primary key' primary key,
    username varchar(128) not null comment 'username',
    create_time timestamp default CURRENT_TIMESTAMP not null comment 'creation time',
    update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment 'update time'
) comment 'user info table' ENGINE=InnoDB default charset=utf8mb4 auto_increment=1;
create unique index user_info on user_info (username) comment 'unique username index';
drop table if exists user_info_bak;
create table user_info_bak
(
    id int unsigned auto_increment comment 'primary key' primary key,
    username varchar(128) not null comment 'username',
    create_time timestamp default CURRENT_TIMESTAMP not null comment 'creation time',
    update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment 'update time'
) comment 'user info backup table' ENGINE=InnoDB default charset=utf8mb4 auto_increment=1;
create unique index user_info_bak on user_info_bak (username) comment 'unique username index';
v1: MySQL-CDC => Console
Goal
Let's first verify the most basic MySQL-CDC functionality, printing the captured change stream to the console.
Add the dependency
Add the corresponding CDC dependency:
<dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>connector-cdc-mysql</artifactId>
    <version>${project.version}</version>
</dependency>
Configuration
# Defining the runtime environment
env {
  # You can set flink configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}
source {
  MySQL-CDC {
    base-url = "jdbc:mysql://localhost:3306/etl?useSSL=false&serverTimezone=Asia/Shanghai"
    driver = "com.mysql.cj.jdbc.Driver"
    username = "admin"
    password = "123456"
    table-names = ["etl.user_info"]
    startup.mode = "initial"
  }
}
transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
  Console {
  }
}
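Save this as a .conf file and submit it with the SeaTunnel starter script, the same command shape used later in this article (the config filename here is hypothetical):
bin/seatunnel.sh --config config/mysql_cdc_console.conf -e local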
One gripe: the configuration property names here are not consistent with the regular JDBC connector's.
Shortcomings
This still feels less convenient than the Neo4j connector: judging from the docs, parameters can only be passed positionally via ?, with no way to map them to named fields?
Testing inserts
insert into user_info (username) values ('binlog-add-01');
insert into user_info (username) values ('binlog-add-02');
insert into user_info (username) values ('binlog-add-03');
insert into user_info (username) values ('binlog-add-04');
insert into user_info (username) values ('binlog-add-05');
The corresponding log output:
2024-01-15 16:00:42,286 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=etl.user_info SeaTunnelRow#kind=INSERT : 1, binlog-add-01, 2024-01-15T16:00:42, 2024-01-15T16:00:42
2024-01-15 16:00:42,286 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=etl.user_info SeaTunnelRow#kind=INSERT : 2, binlog-add-02, 2024-01-15T16:00:42, 2024-01-15T16:00:42
2024-01-15 16:00:42,286 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=etl.user_info SeaTunnelRow#kind=INSERT : 3, binlog-add-03, 2024-01-15T16:00:42, 2024-01-15T16:00:42
2024-01-15 16:00:42,286 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=etl.user_info SeaTunnelRow#kind=INSERT : 4, binlog-add-04, 2024-01-15T16:00:42, 2024-01-15T16:00:42
2024-01-15 16:00:42,286 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=etl.user_info SeaTunnelRow#kind=INSERT : 5, binlog-add-05, 2024-01-15T16:00:42, 2024-01-15T16:00:42
Note
startup.mode = "initial" makes every startup re-read all existing data from the beginning; setting it to the latest offset seems more reasonable.
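For example, using the latest value documented for the MySQL-CDC startup.mode option:
source {
  MySQL-CDC {
    # ... connection options as above ...
    # only capture changes produced after the job starts
    startup.mode = "latest"
  }
}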
Testing updates
update user_info set username = 'binlog-edit-01' where username = 'binlog-add-01';
Log:
2024-01-15 16:07:32,265 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=etl.user_info SeaTunnelRow#kind=UPDATE_BEFORE : 1, binlog-add-01, 2024-01-15T16:00:42, 2024-01-15T16:00:42
2024-01-15 16:08:29,426 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=etl.user_info SeaTunnelRow#kind=UPDATE_AFTER : 1, binlog-edit-01, 2024-01-15T16:00:42, 2024-01-15T16:07:22
Testing deletes
delete from user_info where username = 'binlog-add-02';
Log:
2024-01-15 16:10:36,060 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=etl.user_info SeaTunnelRow#kind=DELETE : 2, binlog-add-02, 2024-01-15T16:00:42, 2024-01-15T16:00:42
Problems
In this form the data has been stripped of its schema information, so the receiving side cannot associate the values with their columns?
The main addition is just the extra row kind?
Binlog JSON format
Let's verify JSON serialization and write the output to a custom console sink.
This is also a chance to learn how plugins are written.
JSON serialization
The log output above has one drawback: it is not easy to read.
Suppose we want to convert it to standard JSON:
# Defining the runtime environment
env {
  # You can set flink configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}
source {
  MySQL-CDC {
    base-url = "jdbc:mysql://localhost:3306/etl?useSSL=false&serverTimezone=Asia/Shanghai"
    driver = "com.mysql.cj.jdbc.Driver"
    username = "admin"
    password = "123456"
    table-names = ["etl.user_info"]
    startup.mode = "initial"
    format = compatible_debezium_json
    debezium = {
      # include schema into kafka message
      key.converter.schemas.enable = false
      value.converter.schemas.enable = false
      # include ddl
      include.schema.changes = true
      # topic.prefix
      database.server.name = ""
    }
    # compatible_debezium_json requires a fixed schema
    schema = {
      fields = {
        id = int
        username = string
        create_time = date
        update_time = date
      }
    }
  }
}
transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
  # Use a custom console sink to avoid errors in the stock Console sink.
  ConsoleBinlog {
  }
}
The official docs link for this serialization is missing; you can refer to the class DebeziumJsonSerializationSchema instead.
There are also related issues filed by others: https://github.com/apache/seatunnel/issues/5010
ConsoleBinlog custom plugin
The stock Console plugin post-processes the data, so feeding it rows that are already serialized causes errors.
We can copy the console implementation into a new consoleBinlog module.
Only one line of logic changes; everything else is just renaming the plugin:
@Slf4j
public class ConsoleBinlogSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {

    // ... other members unchanged from the Console implementation

    @Override
    @SuppressWarnings("checkstyle:RegexpSingleline")
    public void write(SeaTunnelRow element) {
        // Simplified log output; avoids the errors caused by the Console sink's row processing
        log.info("ConsoleBinlogSinkWriter ================= " + element.toString());
    }
}
Then reference this plugin from the examples module.
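For instance, as a Maven dependency, same shape as the CDC dependency above (the artifactId below is a hypothetical name for the copied module):
<dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>connector-console-binlog</artifactId>
    <version>${project.version}</version>
</dependency>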
Output on startup:
SeaTunnelRow{tableId=, kind=+I, fields=[.etl.user_info, {"id":1}, {"before":null,"after":{"id":1,"username":"binlog-edit-01","create_time":"2024-01-15T08:00:42Z","update_time":"2024-01-15T08:07:22Z"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"","ts_ms":0,"snapshot":"false","db":"etl","sequence":null,"table":"user_info","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705310101381,"transaction":null}]}
SeaTunnelRow{tableId=, kind=+I, fields=[.etl.user_info, {"id":3}, {"before":null,"after":{"id":3,"username":"binlog-add-03","create_time":"2024-01-15T08:00:42Z","update_time":"2024-01-15T08:00:42Z"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"","ts_ms":0,"snapshot":"false","db":"etl","sequence":null,"table":"user_info","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705310101381,"transaction":null}]}
SeaTunnelRow{tableId=, kind=+I, fields=[.etl.user_info, {"id":4}, {"before":null,"after":{"id":4,"username":"binlog-add-04","create_time":"2024-01-15T08:00:42Z","update_time":"2024-01-15T08:00:42Z"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"","ts_ms":0,"snapshot":"false","db":"etl","sequence":null,"table":"user_info","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705310101382,"transaction":null}]}
SeaTunnelRow{tableId=, kind=+I, fields=[.etl.user_info, {"id":5}, {"before":null,"after":{"id":5,"username":"binlog-add-05","create_time":"2024-01-15T08:00:42Z","update_time":"2024-01-15T08:00:42Z"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"","ts_ms":0,"snapshot":"false","db":"etl","sequence":null,"table":"user_info","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705310101382,"transaction":null}]}
Update
update user_info set username = 'binlog-edit-03' where username = 'binlog-add-03';
Log:
SeaTunnelRow{tableId=, kind=+I, fields=[.etl.user_info, {"id":3}, {"before":{"id":3,"username":"binlog-add-03","create_time":"2024-01-15T08:00:42Z","update_time":"2024-01-15T08:00:42Z"},"after":{"id":3,"username":"binlog-edit-03","create_time":"2024-01-15T08:00:42Z","update_time":"2024-01-15T09:17:02Z"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"","ts_ms":1705310222000,"snapshot":"false","db":"etl","sequence":null,"table":"user_info","server_id":223344,"gtid":"f00fb9a1-1a19-11ed-911a-00ff5f785ccc:15","file":"mysql-bin.000001","pos":4113,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1705310222423,"transaction":null}]}
Delete
delete from user_info where username = 'binlog-edit-01';
Log:
SeaTunnelRow{tableId=, kind=+I, fields=[.etl.user_info, {"id":1}, {"before":{"id":1,"username":"binlog-edit-01","create_time":"2024-01-15T08:00:42Z","update_time":"2024-01-15T08:07:22Z"},"after":null,"source":{"version":"1.6.4.Final","connector":"mysql","name":"","ts_ms":1705310352000,"snapshot":"false","db":"etl","sequence":null,"table":"user_info","server_id":223344,"gtid":"f00fb9a1-1a19-11ed-911a-00ff5f785ccc:16","file":"mysql-bin.000001","pos":4431,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1705310352117,"transaction":null}]}
Insert
insert into user_info (username) values ('binlog-add-06');
Log:
SeaTunnelRow{tableId=, kind=+I, fields=[.etl.user_info, {"id":8}, {"before":null,"after":{"id":8,"username":"binlog-add-06","create_time":"2024-01-15T09:22:09Z","update_time":"2024-01-15T09:22:09Z"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"","ts_ms":1705310529000,"snapshot":"false","db":"etl","sequence":null,"table":"user_info","server_id":223344,"gtid":"f00fb9a1-1a19-11ed-911a-00ff5f785ccc:19","file":"mysql-bin.000001","pos":5296,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1705310529765,"transaction":null}]}
Overall impressions
The kind=+I here seems to be fixed; I have not dug into the corresponding format implementation.
However, the operation can be told apart from the op field:
c: CREATE, d: DELETE, u: UPDATE (plus r for snapshot reads, as seen in the startup output above).
Thoughts
This formatting step is itself a plugin.
The key point: since the raw row can be serialized to JSON, the row must carry complete information.
Is there a way to parse this raw row directly and implement, say, database writes?
But it would not be a straight insert, since three operations are involved (see the sketch after this list):
- CREATE: insert
- DELETE: delete
- UPDATE: update
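A minimal sketch of dispatching on the Debezium op field when consuming this JSON (the class and handler names are hypothetical; Jackson is used for parsing):
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DebeziumOpDispatcher {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public void dispatch(String debeziumJson) throws Exception {
        JsonNode root = MAPPER.readTree(debeziumJson);
        String op = root.path("op").asText();
        switch (op) {
            case "c": // create: new row image in "after"
            case "r": // read: snapshot row, also in "after"
                handleInsert(root.path("after"));
                break;
            case "u": // update: both "before" and "after" images present
                handleUpdate(root.path("before"), root.path("after"));
                break;
            case "d": // delete: only the "before" image is present
                handleDelete(root.path("before"));
                break;
            default:
                throw new IllegalStateException("unknown op: " + op);
        }
    }

    // Hypothetical handlers: map each row image to the matching SQL statement
    private void handleInsert(JsonNode after) { /* INSERT ... */ }
    private void handleUpdate(JsonNode before, JsonNode after) { /* UPDATE ... keyed on before */ }
    private void handleDelete(JsonNode before) { /* DELETE ... keyed on before */ }
}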
Some requirements
Could it work like the Neo4j connector, with explicit field mappings that can be specified, for more flexibility?
If a single operation touches multiple nodes and edges, could the changes be wrapped in one transaction?
Problems with the v2.3.3 MySQL CDC release package
Symptom
Testing with local code works fine, but running from the command line on WSL fails.
Command
/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/bin/seatunnel.sh --config /home/dh/bigdata/seatunnel-2.3.3/config/mysql_cdc_to_neo4j_multi.conf -elocal
Configuration file
env {
  # You can set flink configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}
source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:13306/etl?useSSL=false&serverTimezone=Asia/Shanghai"
    driver = "com.mysql.cj.jdbc.Driver"
    username = "admin"
    password = "123456"
    table-names = ["etl.user_info"]
    startup.mode = "initial"
  }
}
transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
  Console {
  }
}
Running from the release package fails:
2024-01-19 13:20:50,734 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Reason:SeaTunnel job executed failed
2024-01-19 13:20:50,735 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.lang.ClassCastException: cannot assign instance of io.debezium.relational.TableId to field org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit.tableId of type io.debezium.relational.TableId in instance of org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.requestSplit(SourceFlowLifeCycle.java:223)
at org.apache.seatunnel.engine.server.task.context.SourceReaderContext.sendSplitRequest(SourceReaderContext.java:64)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:94)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:95)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:100)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:613)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassCastException: cannot assign instance of io.debezium.relational.TableId to field org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit.tableId of type io.debezium.relational.TableId in instance of org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2437)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at org.apache.seatunnel.common.utils.SerializationUtils.deserialize(SerializationUtils.java:74)
at org.apache.seatunnel.api.serialization.DefaultSerializer.deserialize(DefaultSerializer.java:41)
at org.apache.seatunnel.api.serialization.DefaultSerializer.deserialize(DefaultSerializer.java:25)
at org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation.lambda$run$0(AssignSplitOperation.java:67)
at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
at org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation.run(AssignSplitOperation.java:54)
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:184)
... 2 more
2024-01-19 12:47:38,125 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal
Guessed cause v1: a connector-cdc-mysql problem
Similar bugs:
https://github.com/apache/seatunnel/issues/4403
https://github.com/apache/seatunnel/issues/5010
They are said to be resolved, but in practice the problem persists.
Attempted fix
~/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/lib
~/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/connectors/seatunnel
Back up the connector-cdc-mysql-2.3.3.jar found in these directories, then remove it.
Build the code locally and drop the resulting jar in its place.
Locally built jar: connector-cdc-mysql-2.3.4-SNAPSHOT-2.11.12.jar
Corresponding WSL locations:
\\wsl.localhost\Ubuntu\home\dh\bigdata\seatunnel-2.3.3\backend\apache-seatunnel-2.3.3\lib
\\wsl.localhost\Ubuntu\home\dh\bigdata\seatunnel-2.3.3\backend\apache-seatunnel-2.3.3\connectors\seatunnel
Restart the service
cd ~/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3
bash bin/stop-seatunnel-cluster.sh
nohup bash bin/seatunnel-cluster.sh 2>&1 &
Verify again
/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/bin/seatunnel.sh --config /home/dh/bigdata/seatunnel-2.3.3/config/mysql_cdc_to_neo4j_multi.conf -elocal
It still fails with the same error.
Guessed cause v2: a connector-cdc-base problem
Source code
protected SnapshotSplit createSnapshotSplit(
        JdbcConnection jdbc,
        TableId tableId,
        int chunkId,
        SeaTunnelRowType splitKeyType,
        Object chunkStart,
        Object chunkEnd) {
    // currently, we only support single split column
    Object[] splitStart = chunkStart == null ? null : new Object[] {chunkStart};
    Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
    return new SnapshotSplit(
            splitId(tableId, chunkId), tableId, splitKeyType, splitStart, splitEnd);
}
The tableId parameter here is io.debezium.relational.TableId; the two should indeed be the same class.
The error:
java.lang.ClassCastException: cannot assign instance of io.debezium.relational.TableId to field org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit.tableId of type io.debezium.relational.TableId in instance of org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit
Why does this throw a cast error? Aren't both the same class, io.debezium.relational.TableId?
Why they may not be the same
In Java, a ClassCastException means an object was cast at runtime to a type incompatible with its actual type.
Although the two classes look identical, different class loader instances can cause two classes with the same name to be treated as distinct classes.
Here are some possible causes (a small demo follows the list):
- Class loader issue: class loaders are responsible for loading classes into the JVM. If io.debezium.relational.TableId is loaded by different class loaders at runtime, the two classes are considered distinct even though they share a name. In this error message, the problem is likely with the type of the tableId field in org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit; make sure both classes come from the same class loader.
- Version mismatch: even for the same class, make sure it comes from the same version of the library or JAR. A version mismatch can change fields and methods and trigger cast exceptions.
- ClassLoader and parent delegation: Java class loading follows the parent-delegation model. If classes with the same name are loaded in different branches of the class loader hierarchy, they are considered distinct classes, which can cause cast exceptions at runtime.
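A small self-contained demo of this point, assuming you point the placeholder path at any jar containing the class: two sibling loaders with no shared parent produce two distinct runtime classes with the same name.
import java.net.URL;
import java.net.URLClassLoader;

public class SameNameDifferentLoader {
    public static void main(String[] args) throws Exception {
        // Placeholder path: point this at a jar containing the class below
        URL[] cp = {new URL("file:/path/to/debezium-core.jar")};
        // Parent = null, so neither loader delegates this class to the other
        try (URLClassLoader a = new URLClassLoader(cp, null);
             URLClassLoader b = new URLClassLoader(cp, null)) {
            Class<?> c1 = a.loadClass("io.debezium.relational.TableId");
            Class<?> c2 = b.loadClass("io.debezium.relational.TableId");
            System.out.println(c1.getName().equals(c2.getName())); // true: same name
            System.out.println(c1 == c2); // false: distinct classes to the JVM
        }
    }
}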
To address this, consider the following steps:
- Check the class loaders: make sure the io.debezium.relational.TableId class is loaded by the same class loader at runtime.
- Check versions: make sure all libraries and dependencies used by the application have consistent versions.
- Class loader isolation: if possible, isolate class loaders so that a given class is only ever loaded by one loader.
- Log output: add logging around the failing code and print getClass().getClassLoader() for the objects involved, to see whether their loaders match (a sketch follows).
If the problem persists, more detailed code snippets and context would help further analysis.
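A sketch of that last diagnostic (the probe class is hypothetical; it would be called near the deserialization site, e.g. where SnapshotSplit is rebuilt):
// Hypothetical probe: compare the loader of the expected TableId type
// with the loader of the instance that was actually deserialized.
public final class LoaderProbe {
    public static void probe(Object deserializedTableId) {
        ClassLoader expected = io.debezium.relational.TableId.class.getClassLoader();
        ClassLoader actual = deserializedTableId.getClass().getClassLoader();
        System.out.println("expected loader: " + expected);
        System.out.println("actual loader  : " + actual);
        // If this prints false, the field assignment in SnapshotSplit must fail
        System.out.println("same loader    : " + (expected == actual));
    }
}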
Verifying the guess
We back up the jars under connectors and lib, then remove all the connectors we don't need.
cd ~/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/
cp -r lib lib_bak
cp -r connectors/seatunnel connectors/seatunnel_bak
Remove the other CDC jars under lib and connectors:
rm connector-cdc-mongodb-2.3.3.jar
Suspicion: does having multiple CDC connectors cause their shared base classes to conflict?
Re-verify
cd ~/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3
bash bin/stop-seatunnel-cluster.sh
nohup bash bin/seatunnel-cluster.sh 2>&1 &
/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/bin/seatunnel.sh --config /home/dh/bigdata/seatunnel-2.3.3/config/mysql_cdc_to_neo4j_multi.conf
It still fails...
Did I miss something?
Remove all datasource-*.jar under lib.
That was removing too much... PS: always make a backup first.
The working setup
It looks like only the extra jars under lib need to be removed; keeping the ones under connectors is enough.
The remaining jars:
$ pwd
/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/lib
$ ls
mysql-connector-java-8.0.28.jar  seatunnel-hadoop3-3.1.4-uber-2.3.3-optional.jar  seatunnel-transforms-v2.jar
$ pwd
/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/connectors/seatunnel
$ ls
connector-cdc-mysql-2.3.3.jar  connector-console-2.3.3.jar  connector-fake-2.3.3.jar
Log
2024-01-19 15:05:56,442 INFO org.apache.seatunnel.core.starter.utils.ConfigShadeUtils - Load config shade spi: [base64]
2024-01-19 15:05:56,471 INFO org.apache.seatunnel.core.starter.utils.ConfigBuilder - Parsed config file: {
"env" : {
"parallelism" : 1,
"job.mode" : "STREAMING",
"checkpoint.interval" : 10000
},
"source" : [
{
"base-url" : "jdbc:mysql://127.0.0.1:13306/etl?useSSL=false&serverTimezone=Asia/Shanghai",
"password" : "123456",
"startup.mode" : "initial",
"driver" : "com.mysql.cj.jdbc.Driver",
"table-names" : [
"etl.user_info"
],
"plugin_name" : "MySQL-CDC",
"username" : "admin"
}
],
"transform" : [],
"sink" : [
{
"plugin_name" : "Console"
}
]
}
2024-01-19 15:05:56,489 INFO org.apache.seatunnel.api.configuration.ReadonlyConfig - Config uses fallback configuration key 'plugin_name' instead of key 'factory'
2024-01-19 15:05:56,489 INFO org.apache.seatunnel.api.configuration.ReadonlyConfig - Config uses fallback configuration key 'plugin_name' instead of key 'factory'
2024-01-19 15:05:56,493 INFO org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Load SeaTunnelSink Plugin from /home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/connectors/seatunnel
2024-01-19 15:05:56,498 INFO org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Discovery plugin jar: MySQL-CDC at: file:/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/connectors/seatunnel/connector-cdc-mysql-2.3.3.jar
2024-01-19 15:05:56,499 INFO org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Discovery plugin jar: Console at: file:/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/connectors/seatunnel/connector-console-2.3.3.jar
2024-01-19 15:05:56,502 INFO org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser - start generating all sources.
2024-01-19 15:05:56,502 INFO org.apache.seatunnel.api.configuration.ReadonlyConfig - Config uses fallback configuration key 'plugin_name' instead of key 'factory'
2024-01-19 15:05:56,519 INFO org.apache.seatunnel.api.configuration.ReadonlyConfig - Config uses fallback configuration key 'plugin_name' instead of key 'factory'
2024-01-19 15:05:56,521 INFO org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Load SeaTunnelSource Plugin from /home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/connectors/seatunnel
2024-01-19 15:05:56,525 INFO org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Discovery plugin jar: MySQL-CDC at: file:/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/connectors/seatunnel/connector-cdc-mysql-2.3.3.jar
2024-01-19 15:05:56,527 INFO org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery - Load plugin: PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='MySQL-CDC'} from classpath
2024-01-19 15:05:56,686 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog - Catalog mysql established connection to jdbc:mysql://127.0.0.1:13306/etl?useSSL=false&serverTimezone=Asia/Shanghai
2024-01-19 15:05:56,738 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog - Catalog mysql closing
2024-01-19 15:05:56,786 INFO org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser - start generating all transforms.
2024-01-19 15:05:56,786 INFO org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser - start generating all sinks.
2024-01-19 15:05:56,787 INFO org.apache.seatunnel.api.configuration.ReadonlyConfig - Config uses fallback configuration key 'plugin_name' instead of key 'factory'
2024-01-19 15:05:56,790 INFO org.apache.seatunnel.api.configuration.ReadonlyConfig - Config uses fallback configuration key 'plugin_name' instead of key 'factory'
2024-01-19 15:05:56,829 INFO org.apache.seatunnel.engine.client.job.ClientJobProxy - Start submit job, job id: 800621986458894337, with plugin jar [file:/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/connectors/seatunnel/connector-cdc-mysql-2.3.3.jar, file:/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/connectors/seatunnel/connector-console-2.3.3.jar]
2024-01-19 15:05:56,867 INFO org.apache.seatunnel.engine.client.job.ClientJobProxy - Submit job finished, job id: 800621986458894337, job name: SeaTunnel
2024-01-19 15:05:56,874 WARN org.apache.seatunnel.engine.client.job.JobMetricsRunner - Failed to get job metrics summary, it maybe first-run
2024-01-19 15:06:56,916 INFO org.apache.seatunnel.engine.client.job.JobMetricsRunner -
***********************************************
Job Progress Information
***********************************************
Job Id : 800621986458894337
Read Count So Far : 6
Write Count So Far : 6
Average Read Count : 0/s
Average Write Count : 0/s
Last Statistic Time : 2024-01-19 15:05:56
Current Statistic Time : 2024-01-19 15:06:56
***********************************************
2024-01-19 15:07:56,885 INFO org.apache.seatunnel.engine.client.job.JobMetricsRunner -
***********************************************
Job Progress Information
***********************************************
Job Id : 800621986458894337
Read Count So Far : 7
Write Count So Far : 7
Average Read Count : 0/s
Average Write Count : 0/s
Last Statistic Time : 2024-01-19 15:06:56
Current Statistic Time : 2024-01-19 15:07:56
***********************************************
2024-01-19 15:08:56,883 INFO org.apache.seatunnel.engine.client.job.JobMetricsRunner -
***********************************************
Job Progress Information
***********************************************
Job Id : 800621986458894337
Read Count So Far : 7
Write Count So Far : 7
Average Read Count : 0/s
Average Write Count : 0/s
Last Statistic Time : 2024-01-19 15:07:56
Current Statistic Time : 2024-01-19 15:08:56
***********************************************
2024-01-19 15:09:56,882 INFO org.apache.seatunnel.engine.client.job.JobMetricsRunner -
***********************************************
Job Progress Information
***********************************************
Job Id : 800621986458894337
Read Count So Far : 7
Write Count So Far : 7
Average Read Count : 0/s
Average Write Count : 0/s
Last Statistic Time : 2024-01-19 15:08:56
Current Statistic Time : 2024-01-19 15:09:56
***********************************************
References
https://seatunnel.apache.org/docs/2.3.3/contribution/contribute-transform-v2-guide
[Bug] [Mysql-cdc] send Mysql data to kafka
[Bug] [mysql-cdc] mysql-cdc Failed to get driver instance for jdbcUrl