mysql 准备

创建一个测试账户:

  [sql]
1
2
3
CREATE USER 'admin'@'%' IDENTIFIED BY '123456'; GRANT ALL PRIVILEGES ON *.* TO 'admin'@'%' WITH GRANT OPTION; flush privileges;

建表

  [sql]
1
2
create database etl; use etl;

创建测试表+初始化数据:

  [sql]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
drop table if exists lc_enum_mapping; create table lc_enum_mapping ( id int unsigned auto_increment comment '主键' primary key, table_name varchar(128) not null comment '表名称', column_name varchar(128) not null comment '字段名称', `key` varchar(128) not null comment '字段编码', label varchar(256) not null comment '字段显示', create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间', update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间' ) comment '枚举映射表' ENGINE=Innodb default charset=utf8mb4 auto_increment=1; create unique index ix_lc_enum_mapping on lc_enum_mapping (table_name, column_name, `key`) comment '标识索引'; insert into lc_enum_mapping(table_name, column_name, `key`, label) values ('user', 'status', 'Y', '启用'); insert into lc_enum_mapping(table_name, column_name, `key`, label) values ('user', 'status', 'N', '禁用');

测试:

  [plaintext]
1
2
3
4
5
6
7
8
> select * from lc_enum_mapping; +----+------------+-------------+-----+--------+---------------------+---------------------+ | id | table_name | column_name | key | label | create_time | update_time | +----+------------+-------------+-----+--------+---------------------+---------------------+ | 1 | user | status | Y | 启用 | 2024-01-15 09:23:50 | 2024-01-15 09:23:50 | | 2 | user | status | N | 禁用 | 2024-01-15 09:23:51 | 2024-01-15 09:23:51 | +----+------------+-------------+-----+--------+---------------------+---------------------+ 2 rows in set (0.00 sec)

v1-简单的 mysql => console 配置

需求

我们首先验证一下最简单的 mysql 到控台的功能。

  [yaml]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# Defining the runtime environment env { # You can set flink configuration here execution.parallelism = 2 job.mode = "BATCH" } source{ Jdbc { url = "jdbc:mysql://localhost:3306/etl?useSSL=false&serverTimezone=Asia/Shanghai" driver = "com.mysql.cj.jdbc.Driver" connection_check_timeout_sec = 100 user = "admin" password = "123456" query = "select * from lc_enum_mapping limit 10" } } transform { # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, # please go to https://seatunnel.apache.org/docs/transform-v2/sql } sink { Console {} }

报错 1

  [plaintext]
1
2
3
4
5
6
7
8
9
10
Caused by: java.lang.RuntimeException: Plugin PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='Jdbc'} not found. at org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery.createPluginInstance(AbstractPluginDiscovery.java:223) at org.apache.seatunnel.engine.core.parse.ConnectorInstanceLoader.loadSourceInstance(ConnectorInstanceLoader.java:61) at org.apache.seatunnel.engine.core.parse.JobConfigParser.parseSource(JobConfigParser.java:81) at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSource(MultipleTableJobConfigParser.java:317) at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:179) at org.apache.seatunnel.engine.core.job.AbstractJobEnvironment.getLogicalDag(AbstractJobEnvironment.java:109) at org.apache.seatunnel.engine.client.job.JobExecutionEnvironment.execute(JobExecutionEnvironment.java:73) at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:143) ... 2 more

测试时 jar plugin 是如何发现的?

debug 可以看到对应的插件目录:

  [plaintext]
1
pluginDir=D:\_my\seatunnel-2.3.3-release-slim\seatunnel-common\connectors\seatunnel

解决方式

1) 引入缺失的包

在 seatunnel-engine-examples 模块引入我们需要的包。默认原来其实只有 connector-fake/connector-console/connector-assert,所以原来测试没问题。

  [xml]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<!-- 添加测试 connector--> <dependency> <groupId>org.apache.seatunnel</groupId> <artifactId>connector-jdbc</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.seatunnel</groupId> <artifactId>connector-cdc-mysql</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.27</version> </dependency>

2)重启 idea,避免不生效等问题。

3)重新编译安装

  [plaintext]
1
mvn clean install -DskipTests

测试日志:

  [plaintext]
1
2
3
2024-01-15 09:59:27,267 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceReader - Closed the bounded jdbc source 2024-01-15 09:59:27,268 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : 1, user, status, Y, 启用, 2024-01-15T09:23:50, 2024-01-15T09:23:50 2024-01-15 09:59:27,268 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : 2, user, status, N, 禁用, 2024-01-15T09:23:51, 2024-01-15T09:23:51

v2-mysql=>console 添加格式处理转换

说明

原始的查询在落库的时候,一般是需要做一些转换处理的。

我们可以把原来的 sql source 作为第一层的结果,然后在 transform 中处理,最后在 sink 中获取结果。

配置

  [yaml]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# Defining the runtime environment env { # You can set flink configuration here execution.parallelism = 2 job.mode = "BATCH" } source{ Jdbc { url = "jdbc:mysql://localhost:3306/etl?useSSL=false&serverTimezone=Asia/Shanghai" driver = "com.mysql.cj.jdbc.Driver" connection_check_timeout_sec = 100 user = "admin" password = "123456" query = "select * from lc_enum_mapping limit 10" result_table_name = "lc_enum_mapping_source" } } transform { # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, # please go to https://seatunnel.apache.org/docs/transform-v2/sql Sql { source_table_name = "lc_enum_mapping_source" result_table_name = "lc_enum_mapping_transform" # id | table_name | column_name | key | label query = "select id, concat(table_name, '_') as table_name, column_name, key, label from lc_enum_mapping_source" } } sink { Console { source_table_name = "lc_enum_mapping_transform" } }

引入转换的依赖包

这里用到了 sql 的 transform

看了下,测试模块默认包含了。

  [xml]
1
2
3
4
5
6
<!-- seatunnel-transforms-v2 --> <dependency> <groupId>org.apache.seatunnel</groupId> <artifactId>seatunnel-transforms-v2</artifactId> <version>${project.version}</version> </dependency>

测试日志:

  [plaintext]
1
2
3
2024-01-15 10:20:16,019 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceReader - Closed the bounded jdbc source 2024-01-15 10:20:16,019 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : 1, user_, status, Y, 启用 2024-01-15 10:20:16,019 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : 2, user_, status, N, 禁用

可以看到这次 console 输出的日志,已经是我们在 transform 转换处理过的了。

会有链式处理的日志:

  [plaintext]
1
2024-01-15 10:20:16,228 DEBUG org.apache.seatunnel.engine.server.checkpoint.PendingCheckpoint - acknowledgeTask states [[ActionSubtaskState(stateKey=ActionStateKey(name=ActionStateKey - pipeline-1 [Source[0]-Jdbc-lc_enum_mapping_source]), index=0, state=[]), ActionSubtaskState(stateKey=ActionStateKey(name=ActionStateKey - pipeline-1 [TransformChain[Transform[0]-Sql-lc_enum_mapping_transform]]), index=0, state=[])]]

mysql=>neo4j 测试

说明

我们如何把 mysql 的数据写入到 neo4j 呢?

准备工作

首先准备好对应的 neo4j 服务。这里不做展开。

依赖 xml

  [xml]
1
2
3
4
5
6
7
8
9
10
<dependency> <groupId>org.apache.seatunnel</groupId> <artifactId>connector-neo4j</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.neo4j.driver</groupId> <artifactId>neo4j-java-driver</artifactId> <version>4.4.9</version> </dependency>

配置文件

  [conf]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# Defining the runtime environment env { # You can set flink configuration here execution.parallelism = 2 job.mode = "BATCH" } source{ Jdbc { url = "jdbc:mysql://localhost:3306/etl?useSSL=false&serverTimezone=Asia/Shanghai" driver = "com.mysql.cj.jdbc.Driver" connection_check_timeout_sec = 1000 user = "admin" password = "123456" query = "select id,table_name,column_name,`key`,label,create_time,update_time from lc_enum_mapping limit 10" result_table_name = "lc_enum_mapping_source" } } transform { } sink { Neo4j { source_table_name = "lc_enum_mapping_source" uri = "bolt://localhost:7687" username = "neo4j" password = "12345678" database = "neo4j" max_transaction_retry_time = 1000 max_connection_timeout = 1000 # id,table_name,column_name,key,label,create_time,update_time query = "CREATE (a:LcEnumMapping {id: $id, tableName: $table_name, columnName: $column_name, label: $label})" queryParamPosition = { id = 0 table_name = 1 column_name = 2 label = 4 } } }

这个配置,从 mysql 中查询数据,然后插入到对应的数据库 neo4j。

测试效果

正常插入到 neo4j,neo4j 对应的数据:

  [plaintext]
1
MATCH (n:LcEnumMapping) RETURN n LIMIT 25

结果:

  [plaintext]
1
2
3
4
5
6
7
8
9
╒══════════════════════════════════════════════════════════════════════╕ │n │ ╞══════════════════════════════════════════════════════════════════════╡ │(:LcEnumMapping {id: 1,label: "启用",tableName: "user",columnName: "stat│ │us"}) │ ├──────────────────────────────────────────────────────────────────────┤ │(:LcEnumMapping {id: 2,label: "禁用",tableName: "user",columnName: "stat│ │us"}) │ └──────────────────────────────────────────────────────────────────────┘

一些疑问

如果想把数据库中一张表的数据,全部同步到 neo4j。要如何配置实现?

还是说只能是一次全量的同步?

我们下一篇测试下大量的数据处理,然后看一下具体效果。

参考资料

https://seatunnel.apache.org/docs/2.3.3/contribution/contribute-transform-v2-guide