拓展阅读
ETL-10-apache SeaTunnel Connector v2 source mysql cdc
说明
从官方的推荐来看,MySQL CDC 的数据似乎应该以 JSON 格式发送到 Kafka。
不过这样比较麻烦:如果只是简单地监听 CDC 并处理,先发送到 Kafka、再监听 Kafka 处理,就绕了一个大弯。
有没有办法,直接监听 CDC 处理,然后写入到 neo4j 库中?
因为有时候 MySQL 到 Neo4j 的映射是一对多的,所以我们自己实现了一个插件:根据 CDC 事件的类型(如 INSERT / UPDATE_AFTER / DELETE)配置对应的 cypher 列表,多个 cypher 放在同一个事务中执行。
准备
这里我们创建一个拥有 binlog 读取权限的账户:
-- Account used by the MySQL-CDC source.
-- CDC needs snapshot reads (SELECT, RELOAD, SHOW DATABASES) plus binlog access
-- (REPLICATION SLAVE, REPLICATION CLIENT). Granting only these, instead of
-- ALL PRIVILEGES ... WITH GRANT OPTION, keeps the account least-privileged.
-- NOTE: '123456' is a demo password; use a strong one outside local testing.
CREATE USER 'admin'@'%' IDENTIFIED BY '123456';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'admin'@'%';
FLUSH PRIVILEGES;
确认开启了 binlog
mysql> SHOW VARIABLES WHERE Variable_name IN ('binlog_format', 'binlog_row_image', 'enforce_gtid_consistency', 'gtid_mode', 'log_bin');
+--------------------------+-------+
| Variable_name | Value |
+--------------------------+-------+
| binlog_format | ROW |
| binlog_row_image | FULL |
| enforce_gtid_consistency | ON |
| gtid_mode | ON |
| log_bin | ON |
+--------------------------+-------+
5 rows in set, 1 warning (0.00 sec)
初始化表
我们模拟从源头库迁移到目标库。
源头库
-- Source database for the migration demo.
-- "if not exists" keeps this script re-runnable, consistent with the
-- "drop table if exists" used for the tables below.
create database if not exists migrate_source;
use migrate_source;

-- user_info: users to be synced. update_time is refreshed by MySQL on every
-- UPDATE (on update CURRENT_TIMESTAMP), so update events carry a new timestamp.
drop table if exists user_info;
create table user_info
(
    id          int unsigned auto_increment comment '主键' primary key,
    username    varchar(128) not null comment '用户名',
    create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间',
    update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间'
) comment '用户表' engine = InnoDB default charset = utf8mb4 auto_increment = 1;

-- role_info: roles to be synced; same timestamp columns as user_info.
drop table if exists role_info;
create table role_info
(
    id          int unsigned auto_increment comment '主键' primary key,
    role_name   varchar(128) not null comment '角色名',
    create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间',
    update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间'
) comment '角色表' engine = InnoDB default charset = utf8mb4 auto_increment = 1;
插入语句
-- Empty both source tables first so the seed rows get ids 1..4.
truncate table user_info;
truncate table role_info;

-- Users u1..u4.
insert into user_info (username) values ('u1');
insert into user_info (username) values ('u2');
insert into user_info (username) values ('u3');
insert into user_info (username) values ('u4');

-- Roles r1..r4.
insert into role_info (role_name) values ('r1');
insert into role_info (role_name) values ('r2');
insert into role_info (role_name) values ('r3');
insert into role_info (role_name) values ('r4');
确认:
mysql> select id, username, create_time, update_time from user_info order by id;
+----+----------+---------------------+---------------------+
| id | username | create_time | update_time |
+----+----------+---------------------+---------------------+
| 1 | u1 | 2024-01-29 14:27:06 | 2024-01-29 14:27:06 |
| 2 | u2 | 2024-01-29 14:27:07 | 2024-01-29 14:27:07 |
| 3 | u3 | 2024-01-29 14:27:07 | 2024-01-29 14:27:07 |
| 4 | u4 | 2024-01-29 14:27:07 | 2024-01-29 14:27:07 |
+----+----------+---------------------+---------------------+
mysql> select id, role_name, create_time, update_time from role_info order by id;
+----+-----------+---------------------+---------------------+
| id | role_name | create_time | update_time |
+----+-----------+---------------------+---------------------+
| 1 | r1 | 2024-01-29 15:46:14 | 2024-01-29 15:46:14 |
| 2 | r2 | 2024-01-29 15:46:14 | 2024-01-29 15:46:14 |
| 3 | r3 | 2024-01-29 15:46:14 | 2024-01-29 15:46:14 |
| 4 | r4 | 2024-01-29 15:46:14 | 2024-01-29 15:46:14 |
+----+-----------+---------------------+---------------------+
v1-基本功能的测试
说明:基本的功能测试条件。
配置文件
# Defining the runtime environment
env {
  # You can set flink configuration here
  parallelism = 1
  job.mode = "STREAMING"
  job.name = "merge_migrate_source.user_info-STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/migrate_source?useSSL=false&serverTimezone=Asia/Shanghai"
    # NOTE(review): com.mysql.jdbc.Driver is the legacy Connector/J class name;
    # Connector/J 8.x uses com.mysql.cj.jdbc.Driver - confirm against the bundled driver jar.
    driver = "com.mysql.jdbc.Driver"
    username = "admin"
    password = "123456"
    table-names = ["migrate_source.user_info"]
    # "initial": load the existing rows first, then follow binlog changes
    startup.mode = "initial"
    result_table_name = "merge_migrate_source.user_info"
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
  MysqlToNeo4j {
    # Reads the table registered by the source's result_table_name
    source_table_name = "merge_migrate_source.user_info"
    uri = "bolt://localhost:7687"
    username = "neo4j"
    password = "12345678"
    database = "neo4j"
    max_transaction_retry_time = 30000
    max_connection_timeout = 30000
    format = "default"
    # One entry per (tableName, rowKind). queryParamPosition presumably maps each
    # $param name to a field index of the CDC row; the indexes follow the
    # user_info column order (id, username, create_time, update_time).
    queryConfigList = [
      {
        tableName = "merge.migrate_source.user_info"
        rowKind = "INSERT"
        # CREATE is not idempotent: a re-delivered INSERT creates a duplicate node (MERGE avoids this)
        query = "create(p:merge_user_info {id: $id, username: $username, create_time: $create_time, update_time: $update_time})"
        queryParamPosition = {
          id = 0
          username = 1
          create_time = 2
          update_time = 3
        }
      },
      {
        tableName = "merge.migrate_source.user_info"
        rowKind = "DELETE"
        # DETACH DELETE also removes the node's relationships; a plain DELETE
        # fails on a node that still has relationships. Only $id is used here.
        query = "MATCH (p:merge_user_info) WHERE p.id = $id DETACH DELETE p"
        queryParamPosition = {
          id = 0
          username = 1
          create_time = 2
          update_time = 3
        }
      },
      {
        tableName = "merge.migrate_source.user_info"
        # Only the after-image of an update is mapped; there is no UPDATE_BEFORE entry
        rowKind = "UPDATE_AFTER"
        query = "MATCH (p:merge_user_info) WHERE p.id = $id SET p.id = $id, p.username = $username, p.create_time = $create_time, p.update_time = $update_time"
        queryParamPosition = {
          id = 0
          username = 1
          create_time = 2
          update_time = 3
        }
      }
    ]
  }
}
启动初始化
╒══════════════════════════════════════════════════════════════════════╕
│n │
╞══════════════════════════════════════════════════════════════════════╡
│(:merge_user_info {update_time: "2024-01-29T14:27:06",create_time: "20│
│24-01-29T14:27:06",id: 1,username: "u1"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 2,username: "u2"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 3,username: "u3"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 4,username: "u4"}) │
└──────────────────────────────────────────────────────────────────────┘
修改操作
add
source:
-- Captured as an INSERT event; the sink's INSERT query writes it as node id 5.
insert into user_info(username) values ('u5');
target:
╒══════════════════════════════════════════════════════════════════════╕
│n │
╞══════════════════════════════════════════════════════════════════════╡
│(:merge_user_info {update_time: "2024-01-29T14:27:06",create_time: "20│
│24-01-29T14:27:06",id: 1,username: "u1"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 2,username: "u2"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 3,username: "u3"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 4,username: "u4"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:29:16",create_time: "20│
│24-01-29T14:29:16",id: 5,username: "u5"}) │
└──────────────────────────────────────────────────────────────────────┘
edit
source:
-- Captured as an update; the sink's UPDATE_AFTER query applies the new values
-- (update_time is refreshed by MySQL via on update CURRENT_TIMESTAMP).
update user_info set username='u5-edit' where id=5;
target:
╒══════════════════════════════════════════════════════════════════════╕
│n │
╞══════════════════════════════════════════════════════════════════════╡
│(:merge_user_info {update_time: "2024-01-29T14:27:06",create_time: "20│
│24-01-29T14:27:06",id: 1,username: "u1"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 2,username: "u2"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 3,username: "u3"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 4,username: "u4"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:29:42",create_time: "20│
│24-01-29T14:29:16",id: 5,username: "u5-edit"}) │
└──────────────────────────────────────────────────────────────────────┘
remove
source:
-- Captured as a DELETE event; the sink's DELETE query removes node id 5.
delete from user_info where id=5;
target:
╒══════════════════════════════════════════════════════════════════════╕
│n │
╞══════════════════════════════════════════════════════════════════════╡
│(:merge_user_info {update_time: "2024-01-29T14:27:06",create_time: "20│
│24-01-29T14:27:06",id: 1,username: "u1"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 2,username: "u2"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 3,username: "u3"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 4,username: "u4"}) │
└──────────────────────────────────────────────────────────────────────┘
v2-insertOrUpdate
说明:同时支持 insert/update 的语句模式。
实现思路
利用 neo4j 的 merge 语句:
// Upsert: match the Person by id or create it, then (re)set its name.
MERGE (p:Person {id: $id})
SET p.name = $name
RETURN p
这个查询首先尝试根据给定的id查找匹配的Person节点。如果节点存在,则更新节点的name属性;如果节点不存在,则创建一个新节点,并设置id和name属性。
比如:
第一次执行时插入:
// First run: no Person with id 123 exists yet, so MERGE creates it.
MERGE (p:Person {id: 123})
SET p.name = 'John Doe'
RETURN p
再次执行时更新:
// Second run: the node with id 123 now exists, so MERGE matches it and SET updates name.
MERGE (p:Person {id: 123})
SET p.name = 'John Doe-edit'
RETURN p
配置文件写法
通过 merge 的写法,让 insert/update 变成同一个逻辑。
这样在数据接收顺序无法保证的场景下(例如 UPDATE_AFTER 先于 INSERT 到达),也能正确处理。
# Defining the runtime environment
env {
  # You can set flink configuration here
  parallelism = 1
  job.mode = "STREAMING"
  job.name = "merge_migrate_source.user_info-STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/migrate_source?useSSL=false&serverTimezone=Asia/Shanghai"
    # NOTE(review): com.mysql.jdbc.Driver is the legacy Connector/J class name;
    # Connector/J 8.x uses com.mysql.cj.jdbc.Driver - confirm against the bundled driver jar.
    driver = "com.mysql.jdbc.Driver"
    username = "admin"
    password = "123456"
    table-names = ["migrate_source.user_info"]
    # "initial": load the existing rows first, then follow binlog changes
    startup.mode = "initial"
    result_table_name = "merge_migrate_source.user_info"
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
  MysqlToNeo4j {
    # Reads the table registered by the source's result_table_name
    source_table_name = "merge_migrate_source.user_info"
    uri = "bolt://localhost:7687"
    username = "neo4j"
    password = "12345678"
    database = "neo4j"
    max_transaction_retry_time = 30000
    max_connection_timeout = 30000
    format = "default"
    # INSERT and UPDATE_AFTER share the same MERGE (upsert) query, so replayed or
    # out-of-order events converge on one node per id.
    # NOTE(review): a uniqueness constraint lets MERGE use an index and prevents
    # duplicates under concurrent writes, e.g.
    #   CREATE CONSTRAINT merge_user_info_id IF NOT EXISTS FOR (p:merge_user_info) REQUIRE p.id IS UNIQUE
    # queryParamPosition presumably maps each $param name to a field index of the
    # CDC row (user_info column order: id, username, create_time, update_time).
    queryConfigList = [
      {
        tableName = "merge.migrate_source.user_info"
        rowKind = "INSERT"
        query = "MERGE (p:merge_user_info { id: $id }) SET p.id = $id, p.username = $username, p.create_time = $create_time, p.update_time = $update_time"
        queryParamPosition = {
          id = 0
          username = 1
          create_time = 2
          update_time = 3
        }
      },
      {
        tableName = "merge.migrate_source.user_info"
        rowKind = "DELETE"
        # DETACH DELETE also removes the node's relationships; a plain DELETE
        # fails on a node that still has relationships. Only $id is used here.
        query = "MATCH (p:merge_user_info) WHERE p.id = $id DETACH DELETE p"
        queryParamPosition = {
          id = 0
          username = 1
          create_time = 2
          update_time = 3
        }
      },
      {
        tableName = "merge.migrate_source.user_info"
        rowKind = "UPDATE_AFTER"
        query = "MERGE (p:merge_user_info { id: $id }) SET p.id = $id, p.username = $username, p.create_time = $create_time, p.update_time = $update_time"
        queryParamPosition = {
          id = 0
          username = 1
          create_time = 2
          update_time = 3
        }
      }
    ]
  }
}
准备工作
清空一下原来 target 的数据。
初始化启动
╒══════════════════════════════════════════════════════════════════════╕
│n │
╞══════════════════════════════════════════════════════════════════════╡
│(:merge_user_info {update_time: "2024-01-29T14:27:06",create_time: "20│
│24-01-29T14:27:06",id: 1,username: "u1"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 2,username: "u2"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 3,username: "u3"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 4,username: "u4"}) │
└──────────────────────────────────────────────────────────────────────┘
修改操作
add
source:
-- id is set explicitly so the new row reuses id 5 (freed by the earlier delete test);
-- otherwise auto_increment would presumably assign a higher id.
insert into user_info(id, username) values (5, 'u5');
target:
╒══════════════════════════════════════════════════════════════════════╕
│n │
╞══════════════════════════════════════════════════════════════════════╡
│(:merge_user_info {update_time: "2024-01-29T14:27:06",create_time: "20│
│24-01-29T14:27:06",id: 1,username: "u1"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 2,username: "u2"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 3,username: "u3"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 4,username: "u4"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:56:37",create_time: "20│
│24-01-29T14:56:37",id: 5,username: "u5"}) │
└──────────────────────────────────────────────────────────────────────┘
edit
source:
-- Captured as an update; the sink's UPDATE_AFTER query (a MERGE) updates node id 5.
update user_info set username='u5-edit' where id=5;
target:
╒══════════════════════════════════════════════════════════════════════╕
│n │
╞══════════════════════════════════════════════════════════════════════╡
│(:merge_user_info {update_time: "2024-01-29T14:27:06",create_time: "20│
│24-01-29T14:27:06",id: 1,username: "u1"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 2,username: "u2"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 3,username: "u3"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 4,username: "u4"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:57:01",create_time: "20│
│24-01-29T14:56:37",id: 5,username: "u5-edit"}) │
└──────────────────────────────────────────────────────────────────────┘
remove
source:
-- Captured as a DELETE event; the sink's DELETE query removes node id 5.
delete from user_info where id=5;
target:
╒══════════════════════════════════════════════════════════════════════╕
│n │
╞══════════════════════════════════════════════════════════════════════╡
│(:merge_user_info {update_time: "2024-01-29T14:27:06",create_time: "20│
│24-01-29T14:27:06",id: 1,username: "u1"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 2,username: "u2"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 3,username: "u3"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T14:27:07",create_time: "20│
│24-01-29T14:27:07",id: 4,username: "u4"}) │
└──────────────────────────────────────────────────────────────────────┘
v3-allInOne 模式
说明:所有表的语句都生成到同一个配置文件中。
思路
生成配置文件时,把所有的表放在一起,而不是为每张表循环生成一个文件。
当然,每张表单独一个 streaming 任务也可以,但当表特别多时,会占用大量线程。
配置文件
# Defining the runtime environment
env {
  # You can set flink configuration here
  parallelism = 1
  job.mode = "STREAMING"
  job.name = "allInOne-CDC-STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/migrate_source?useSSL=false&serverTimezone=Asia/Shanghai"
    # NOTE(review): com.mysql.jdbc.Driver is the legacy Connector/J class name;
    # Connector/J 8.x uses com.mysql.cj.jdbc.Driver - confirm against the bundled driver jar.
    driver = "com.mysql.jdbc.Driver"
    username = "admin"
    password = "123456"
    # NOTE(review): in testing, starting the job with these two tables failed and
    # role_info rows ended up as merge_user_info nodes; one table per job avoids this
    # until the multi-table path is fixed.
    table-names = ["migrate_source.role_info", "migrate_source.user_info"]
    # "initial": load the existing rows first, then follow binlog changes
    startup.mode = "initial"
    result_table_name = "allInOne-CDC-result"
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
  MysqlToNeo4j {
    # Reads the combined table registered by the source's result_table_name;
    # entries below are selected per tableName + rowKind.
    source_table_name = "allInOne-CDC-result"
    uri = "bolt://localhost:7687"
    username = "neo4j"
    password = "12345678"
    database = "neo4j"
    max_transaction_retry_time = 30000
    max_connection_timeout = 30000
    format = "default"
    queryConfigList = [
      # ---- role_info -> merge_role_info ----
      # $rolename is bound by position (index 1 = the role_name column);
      # the node property is stored as "rolename".
      {
        tableName = "merge.migrate_source.role_info"
        rowKind = "INSERT"
        query = "MERGE (p:merge_role_info { id: $id }) SET p.id = $id, p.rolename = $rolename, p.create_time = $create_time, p.update_time = $update_time"
        queryParamPosition = {
          id = 0
          rolename = 1
          create_time = 2
          update_time = 3
        }
      },
      {
        tableName = "merge.migrate_source.role_info"
        rowKind = "DELETE"
        # DETACH DELETE also removes the node's relationships; a plain DELETE
        # fails on a node that still has relationships. Only $id is used here.
        query = "MATCH (p:merge_role_info) WHERE p.id = $id DETACH DELETE p"
        queryParamPosition = {
          id = 0
          rolename = 1
          create_time = 2
          update_time = 3
        }
      },
      {
        tableName = "merge.migrate_source.role_info"
        rowKind = "UPDATE_AFTER"
        query = "MERGE (p:merge_role_info { id: $id }) SET p.id = $id, p.rolename = $rolename, p.create_time = $create_time, p.update_time = $update_time"
        queryParamPosition = {
          id = 0
          rolename = 1
          create_time = 2
          update_time = 3
        }
      },
      # ---- user_info -> merge_user_info ----
      {
        tableName = "merge.migrate_source.user_info"
        rowKind = "INSERT"
        query = "MERGE (p:merge_user_info { id: $id }) SET p.id = $id, p.username = $username, p.create_time = $create_time, p.update_time = $update_time"
        queryParamPosition = {
          id = 0
          username = 1
          create_time = 2
          update_time = 3
        }
      },
      {
        tableName = "merge.migrate_source.user_info"
        rowKind = "DELETE"
        # DETACH DELETE: see the role_info DELETE entry above
        query = "MATCH (p:merge_user_info) WHERE p.id = $id DETACH DELETE p"
        queryParamPosition = {
          id = 0
          username = 1
          create_time = 2
          update_time = 3
        }
      },
      {
        tableName = "merge.migrate_source.user_info"
        rowKind = "UPDATE_AFTER"
        query = "MERGE (p:merge_user_info { id: $id }) SET p.id = $id, p.username = $username, p.create_time = $create_time, p.update_time = $update_time"
        queryParamPosition = {
          id = 0
          username = 1
          create_time = 2
          update_time = 3
        }
      }
    ]
  }
}
实际测试
清空操作
// Clear the target nodes before the test.
// DETACH DELETE also removes any relationships attached to these nodes;
// a plain DELETE fails on a node that still has relationships.
MATCH (n:merge_user_info) DETACH DELETE n;
MATCH (n:merge_role_info) DETACH DELETE n;
启动初始化
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T15:10:47",create_time: "20│
│24-01-29T15:10:47",id: 1,username: "r1"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_role_info {update_time: "2024-01-29T15:10:47",create_time: "20│
│24-01-29T15:10:47",rolename: "r2",id: 2}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T15:10:47",create_time: "20│
│24-01-29T15:10:47",id: 2,username: "r2"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_role_info {update_time: "2024-01-29T15:10:47",create_time: "20│
│24-01-29T15:10:47",rolename: "r3",id: 3}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T15:10:47",create_time: "20│
│24-01-29T15:10:47",id: 3,username: "r3"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_role_info {update_time: "2024-01-29T15:10:47",create_time: "20│
│24-01-29T15:10:47",rolename: "r4",id: 4}) │
├──────────────────────────────────────────────────────────────────────┤
│(:merge_user_info {update_time: "2024-01-29T15:10:47",create_time: "20│
│24-01-29T15:10:47",id: 4,username: "r4"}) │
└──────────────────────────────────────────────────────────────────────┘
启动失败
实际测试发现,启动时如果指定了多个表,任务会失败。从上面的输出也能看到,role_info 的数据(r1~r4)被错误地写成了 merge_user_info 节点。
推测是不同表结构的数据在同步查询时出现了问题,拉取数据构建 NetRecord 时直接抛出了异常。
小结
保证灵活性与隔离性。