MySQL setup

Create a test account:

CREATE USER 'admin'@'%' IDENTIFIED BY '123456';
GRANT ALL PRIVILEGES ON *.* TO 'admin'@'%' WITH GRANT OPTION;
flush privileges;

Create the database

create database etl;
use etl;

Create the test table and seed it with data:

drop table if exists lc_enum_mapping;
create table lc_enum_mapping
(
    id int unsigned auto_increment comment 'primary key' primary key,
    table_name varchar(128) not null comment 'table name',
    column_name varchar(128) not null comment 'column name',
    `key` varchar(128) not null comment 'field code',
    label varchar(256) not null comment 'field display label',
    create_time timestamp default CURRENT_TIMESTAMP not null comment 'creation time',
    update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment 'update time'
) comment 'enum mapping table' ENGINE=InnoDB default charset=utf8mb4 auto_increment=1;
create unique index ix_lc_enum_mapping on lc_enum_mapping (table_name, column_name, `key`) comment 'identifying index';

insert into lc_enum_mapping(table_name, column_name, `key`, label) values ('user', 'status', 'Y', '启用');
insert into lc_enum_mapping(table_name, column_name, `key`, label) values ('user', 'status', 'N', '禁用');

Verify:

> select * from lc_enum_mapping;
+----+------------+-------------+-----+--------+---------------------+---------------------+
| id | table_name | column_name | key | label  | create_time         | update_time         |
+----+------------+-------------+-----+--------+---------------------+---------------------+
|  1 | user       | status      | Y   | 启用   | 2024-01-15 09:23:50 | 2024-01-15 09:23:50 |
|  2 | user       | status      | N   | 禁用   | 2024-01-15 09:23:51 | 2024-01-15 09:23:51 |
+----+------------+-------------+-----+--------+---------------------+---------------------+
2 rows in set (0.00 sec)

v1: a simple mysql => console configuration

Goal

First, we verify the simplest case: reading from MySQL and printing the rows to the console.

# Defining the runtime environment
env {
  # You can set flink configuration here
  execution.parallelism = 2
  job.mode = "BATCH"
}
source{
    Jdbc {
        url = "jdbc:mysql://localhost:3306/etl?useSSL=false&serverTimezone=Asia/Shanghai"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "admin"
        password = "123456"
        query = "select * from lc_enum_mapping limit 10"
    }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    Console {}
}

Error 1

Caused by: java.lang.RuntimeException: Plugin PluginIdentifier{engineType='seatunnel', pluginType='source', pluginName='Jdbc'} not found.
	at org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery.createPluginInstance(AbstractPluginDiscovery.java:223)
	at org.apache.seatunnel.engine.core.parse.ConnectorInstanceLoader.loadSourceInstance(ConnectorInstanceLoader.java:61)
	at org.apache.seatunnel.engine.core.parse.JobConfigParser.parseSource(JobConfigParser.java:81)
	at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSource(MultipleTableJobConfigParser.java:317)
	at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:179)
	at org.apache.seatunnel.engine.core.job.AbstractJobEnvironment.getLogicalDag(AbstractJobEnvironment.java:109)
	at org.apache.seatunnel.engine.client.job.JobExecutionEnvironment.execute(JobExecutionEnvironment.java:73)
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:143)
	... 2 more

How are plugin jars discovered during a test run?

Debugging shows the plugin directory that is being used:

pluginDir=D:\_my\seatunnel-2.3.3-release-slim\seatunnel-common\connectors\seatunnel
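
To make the "not found" error more concrete, here is a minimal Python sketch of a prefix-based jar lookup. It is only an illustration, not SeaTunnel's code: it assumes that discovery comes down to finding a jar in the plugin directory whose file name starts with the connector's prefix, and the jar names and the `find_connector_jars` helper are hypothetical. The real logic lives in `AbstractPluginDiscovery` and may differ in detail.

```python
import tempfile
from pathlib import Path


def find_connector_jars(plugin_dir, jar_prefix):
    """Return the sorted names of jar files in plugin_dir that start with jar_prefix."""
    directory = Path(plugin_dir)
    if not directory.is_dir():
        return []
    return sorted(
        p.name
        for p in directory.iterdir()
        if p.is_file() and p.name.startswith(jar_prefix) and p.name.endswith(".jar")
    )


def demo():
    """Look up connector-jdbc before and after its (hypothetical) jar is present."""
    with tempfile.TemporaryDirectory() as tmp:
        # Hypothetical jar names for the three connectors the examples module ships with.
        for name in (
            "connector-fake-2.3.3.jar",
            "connector-console-2.3.3.jar",
            "connector-assert-2.3.3.jar",
        ):
            (Path(tmp) / name).touch()
        missing = find_connector_jars(tmp, "connector-jdbc")
        (Path(tmp) / "connector-jdbc-2.3.3.jar").touch()
        found = find_connector_jars(tmp, "connector-jdbc")
    return missing, found


if __name__ == "__main__":
    print(demo())
```

With only the fake, console and assert jars present, the lookup for `connector-jdbc` comes back empty, which corresponds to the situation behind the exception above.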

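Solution

1) Add the missing dependencies

Add the packages we need to the seatunnel-engine-examples module. By default it only includes connector-fake, connector-console and connector-assert, which is why the original tests ran without problems.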

<!-- Add connectors for testing -->
<dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>connector-jdbc</artifactId>
    <version>${project.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>connector-cdc-mysql</artifactId>
    <version>${project.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>

2) Restart IDEA, so that the new dependencies are actually picked up.

3) Rebuild and install

mvn clean install -DskipTests

Test log:

2024-01-15 09:59:27,267 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceReader - Closed the bounded jdbc source
2024-01-15 09:59:27,268 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : 1, user, status, Y, 启用, 2024-01-15T09:23:50, 2024-01-15T09:23:50
2024-01-15 09:59:27,268 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=2:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : 2, user, status, N, 禁用, 2024-01-15T09:23:51, 2024-01-15T09:23:51

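v2: mysql => console with a format transform

Notes

Before the raw query result is written to its destination, it usually needs some transformation.

We can treat the original SQL source as the first-stage result, process it in transform, and then read the processed result in sink.

Configuration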

# Defining the runtime environment
env {
  # You can set flink configuration here
  execution.parallelism = 2
  job.mode = "BATCH"
}
source{
    Jdbc {
        url = "jdbc:mysql://localhost:3306/etl?useSSL=false&serverTimezone=Asia/Shanghai"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "admin"
        password = "123456"
        query = "select * from lc_enum_mapping limit 10"
        result_table_name = "lc_enum_mapping_source"
    }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform-v2/sql
    Sql {
        source_table_name = "lc_enum_mapping_source"
        result_table_name = "lc_enum_mapping_transform"
        # id | table_name | column_name | key | label
        query = "select id, concat(table_name, '_') as table_name, column_name, key, label from lc_enum_mapping_source"
    }
}

sink {
    Console {
        source_table_name = "lc_enum_mapping_transform"
    }
}

Add the transform dependency

This configuration uses the Sql transform.

On inspection, the test module already includes it by default.

<!--   seatunnel-transforms-v2   -->
<dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>seatunnel-transforms-v2</artifactId>
    <version>${project.version}</version>
</dependency>

Test log:

2024-01-15 10:20:16,019 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceReader - Closed the bounded jdbc source
2024-01-15 10:20:16,019 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : 1, user_, status, Y, 启用
2024-01-15 10:20:16,019 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=2:  SeaTunnelRow#tableId= SeaTunnelRow#kind=INSERT : 2, user_, status, N, 禁用

This time the console output shows the rows after they have been processed by the transform.
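
The way the three stages are linked by table name can be sketched in a few lines of Python. This is only a mental model, not how SeaTunnel executes the job: the `tables` dictionary and the `sql_transform` and `run_pipeline` helpers are hypothetical, and the transform is hand-written to mimic the `select id, concat(table_name, '_') as table_name, column_name, key, label` query from the configuration above.

```python
def sql_transform(rows):
    """Mimic the Sql transform: keep five columns and append '_' to table_name."""
    return [
        {
            "id": row["id"],
            "table_name": row["table_name"] + "_",
            "column_name": row["column_name"],
            "key": row["key"],
            "label": row["label"],
        }
        for row in rows
    ]


def run_pipeline():
    """Pass rows from source to transform to sink through named intermediate tables."""
    tables = {}

    # Source stage: publishes its rows under result_table_name.
    tables["lc_enum_mapping_source"] = [
        {"id": 1, "table_name": "user", "column_name": "status", "key": "Y",
         "label": "启用", "create_time": "2024-01-15T09:23:50",
         "update_time": "2024-01-15T09:23:50"},
        {"id": 2, "table_name": "user", "column_name": "status", "key": "N",
         "label": "禁用", "create_time": "2024-01-15T09:23:51",
         "update_time": "2024-01-15T09:23:51"},
    ]

    # Transform stage: reads source_table_name, publishes under result_table_name.
    tables["lc_enum_mapping_transform"] = sql_transform(tables["lc_enum_mapping_source"])

    # Sink stage: reads the table named by its source_table_name.
    return [tuple(row.values()) for row in tables["lc_enum_mapping_transform"]]


if __name__ == "__main__":
    for record in run_pipeline():
        print(record)
```

The create_time and update_time columns are absent from the sink's input because the transform query does not select them, which matches the shorter rows in the log.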

The log also shows the chained processing:

2024-01-15 10:20:16,228 DEBUG org.apache.seatunnel.engine.server.checkpoint.PendingCheckpoint - acknowledgeTask states [[ActionSubtaskState(stateKey=ActionStateKey(name=ActionStateKey - pipeline-1 [Source[0]-Jdbc-lc_enum_mapping_source]), index=0, state=[]), ActionSubtaskState(stateKey=ActionStateKey(name=ActionStateKey - pipeline-1 [TransformChain[Transform[0]-Sql-lc_enum_mapping_transform]]), index=0, state=[])]]

mysql => neo4j test

Notes

How do we write MySQL data into neo4j?

Preparation

First, have a neo4j service up and running. The details are not covered here.

Maven dependencies

<dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>connector-neo4j</artifactId>
    <version>${project.version}</version>
</dependency>
<dependency>
    <groupId>org.neo4j.driver</groupId>
    <artifactId>neo4j-java-driver</artifactId>
    <version>4.4.9</version>
</dependency>

Configuration file

# Defining the runtime environment
env {
  # You can set flink configuration here
  execution.parallelism = 2
  job.mode = "BATCH"
}
source{
    Jdbc {
        url = "jdbc:mysql://localhost:3306/etl?useSSL=false&serverTimezone=Asia/Shanghai"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 1000
        user = "admin"
        password = "123456"
        query = "select id,table_name,column_name,`key`,label,create_time,update_time from lc_enum_mapping limit 10"
        result_table_name = "lc_enum_mapping_source"
    }
}

transform {
}

sink {
    Neo4j {
        source_table_name = "lc_enum_mapping_source"

        uri = "bolt://localhost:7687"
        username = "neo4j"
        password = "12345678"
        database = "neo4j"

        max_transaction_retry_time = 1000
        max_connection_timeout = 1000

        # id,table_name,column_name,key,label,create_time,update_time
        query = "CREATE (a:LcEnumMapping {id: $id, tableName: $table_name, columnName: $column_name, label: $label})"
        queryParamPosition = {
            id = 0
            table_name = 1
            column_name = 2
            label = 4
        }
    }
}

This configuration queries the data from MySQL and inserts it into the neo4j database.
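
The role of queryParamPosition can be illustrated with a small Python sketch. It assumes that each entry maps a Cypher parameter name to the zero-based position of a column in the source row, which is consistent with the configuration above (`label = 4` skips the `key` column at position 3). The `build_query_params` helper is hypothetical and is not part of the connector.

```python
def build_query_params(row, param_positions):
    """Map each Cypher parameter name to the row value at its configured position."""
    return {name: row[index] for name, index in param_positions.items()}


# Column order from the source query:
# id, table_name, column_name, key, label, create_time, update_time
SAMPLE_ROW = (1, "user", "status", "Y", "启用",
              "2024-01-15T09:23:50", "2024-01-15T09:23:50")

# Same positions as the queryParamPosition block in the sink configuration.
PARAM_POSITIONS = {"id": 0, "table_name": 1, "column_name": 2, "label": 4}


if __name__ == "__main__":
    print(build_query_params(SAMPLE_ROW, PARAM_POSITIONS))
```

Columns that have no entry in the position map (key, create_time, update_time) are never bound to a parameter, so they do not appear on the created node.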

Test result

The rows are inserted into neo4j as expected. Query the data in neo4j:

MATCH (n:LcEnumMapping) RETURN n LIMIT 25

Result:

╒══════════════════════════════════════════════════════════════════════╕
│n                                                                     │
╞══════════════════════════════════════════════════════════════════════╡
│(:LcEnumMapping {id: 1,label: "启用",tableName: "user",columnName: "stat│
│us"})                                                                 │
├──────────────────────────────────────────────────────────────────────┤
│(:LcEnumMapping {id: 2,label: "禁用",tableName: "user",columnName: "stat│
│us"})                                                                 │
└──────────────────────────────────────────────────────────────────────┘

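Open questions

If we want to sync all the data of one database table to neo4j, how should that be configured?

Or is a one-off full sync the only option?

In the next post we will test with a larger volume of data and look at the actual results.

References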

https://seatunnel.apache.org/docs/2.3.3/contribution/contribute-transform-v2-guide