拓展阅读

ETL-10-apache SeaTunnel Connector v2 source mysql cdc

mysql binlog

说明

按官方的推荐,mysql cdc 的变更数据应该是以 json 格式发送到 kafka。

不过这样也比较麻烦,如果只是简单的 cdc 监听处理,那发送到 kafka,然后再监听 kafka 处理,绕了一个大弯子。

有没有办法,直接监听 CDC 处理,然后写入到 neo4j 库中?

因为有时候 mysql 到 neo4j 可能一对多,我们这里自己实现一个插件,支持基于 CDC 的类别,做一个对应的列表处理。多个 cypher 放在一个事务中。

插件核心代码简介

下面是我们自定义的插件简单介绍。

配置例子

sink 配置例子:

sink {
    Neo4jMulti {
            source_table_name = "etl.user_info"

            uri = "bolt://localhost:7687"
            username = "neo4j"
            password = "12345678"
            database = "neo4j"

            max_transaction_retry_time = 1000
            max_connection_timeout = 1000

            # field positions follow the user_info column order: 0=id, 1=username, 2=create_time, 3=update_time
            queryConfigList = [
                {
                    rowKind = "INSERT"
                    query = "CREATE (a:UserInfoCdc {id: $id, username: $username})"
                    queryParamPosition = {
                        id = 0
                        username = 1
                    }
                },
                {
                    rowKind = "DELETE"
                    query = "MATCH (a:UserInfoCdc {id: $id, username: $username}) DELETE a"
                    queryParamPosition = {
                        id = 0
                        username = 1
                    }
                }
            ]
    }

}

配置解析核心代码

    /**
     * Parses the per-row-kind query definitions from the sink configuration.
     *
     * <p>Every entry of the configured object list must provide the keys {@code rowKind},
     * {@code query} and {@code queryParamPosition}. The values are copied into a
     * {@link Neo4jMultiQueryPosConfig}, preserving the configuration order.
     *
     * @param config sink plugin configuration
     * @return one {@link Neo4jMultiQueryPosConfig} per configured entry
     * @throws IllegalArgumentException if an entry is missing one of the required keys
     */
    // TODO: this should be refined further by operation type (create / delete / update).
    // NOTE(review): the list is looked up under QUERY_PARAM_POSITION.key(), while the sample
    // configuration names the list "queryConfigList" - confirm the option constant maps to it.
    private List<Neo4jMultiQueryPosConfig> buildConfigList(final Config config) {
        List<? extends ConfigObject> list = config.getObjectList(QUERY_PARAM_POSITION.key());

        List<Neo4jMultiQueryPosConfig> resultList = new ArrayList<>(list.size());

        // The concrete config value classes are hidden, so the entries are read through the
        // generic ConfigObject API and unwrapped into plain Java values.
        for (ConfigObject configObject : list) {
            String rowKind = (String) requireUnwrapped(configObject, "rowKind");
            String query = (String) requireUnwrapped(configObject, "query");
            // An object-typed config value unwraps to a String-keyed map.
            @SuppressWarnings("unchecked")
            Map<String, Object> queryParamPosition =
                    (Map<String, Object>) requireUnwrapped(configObject, "queryParamPosition");

            Neo4jMultiQueryPosConfig result = new Neo4jMultiQueryPosConfig();
            result.setRowKind(rowKind);
            result.setQuery(query);
            result.setQueryParamPosition(queryParamPosition);
            resultList.add(result);
        }

        return resultList;
    }

    /** Returns the unwrapped value stored under {@code key}, failing fast when it is absent. */
    private static Object requireUnwrapped(final ConfigObject configObject, final String key) {
        if (configObject.get(key) == null) {
            throw new IllegalArgumentException(
                    "Missing required key '" + key + "' in query config entry: " + configObject);
        }
        return configObject.get(key).unwrapped();
    }

sink 的核心代码

直接根据 cdc 的类别,过滤获取到 cypher 列表,然后放在一个事务中执行即可。

/**
 * Sink writer that maps each incoming CDC row to the Cypher statements configured for its row
 * kind and executes all of them inside one Neo4j write transaction.
 *
 * <p>Rows whose kind matches no configured entry are skipped. The writer owns the driver and
 * session it creates in the constructor and releases both in {@link #close()}.
 */
@Slf4j
public class Neo4jMultiSinkWriter implements SinkWriter<SeaTunnelRow, Void, Void> {

    private final Neo4jMultiSinkQueryInfo neo4jSinkQueryInfo;
    private final transient Driver driver;
    private final transient Session session;
    // Initialised in the constructor but not read by any method of this class.
    private final List<Neo4jMultiSeaTunnelRowValue> writeBuffer;

    /**
     * Builds the Neo4j driver and opens a session on the configured database.
     *
     * @param neo4jSinkQueryInfo connection settings and the per-row-kind query list
     * @param seaTunnelRowType schema of the incoming rows (accepted but not used here)
     */
    public Neo4jMultiSinkWriter(
            Neo4jMultiSinkQueryInfo neo4jSinkQueryInfo, SeaTunnelRowType seaTunnelRowType) {
        this.neo4jSinkQueryInfo = neo4jSinkQueryInfo;
        this.driver = this.neo4jSinkQueryInfo.getDriverBuilder().build();
        this.session =
                driver.session(
                        SessionConfig.forDatabase(
                                neo4jSinkQueryInfo.getDriverBuilder().getDatabase()));
        this.writeBuffer = new ArrayList<>();
    }

    /**
     * Writes a single row immediately; no buffering is performed.
     *
     * @param element the CDC row to apply
     */
    @Override
    public void write(SeaTunnelRow element) throws IOException {
        writeOneByOne(element);
    }

    /**
     * Runs every query configured for the row's kind in a single write transaction.
     *
     * @param element the CDC row to apply
     * @throws Neo4jConnectorException if the Neo4j driver reports an error
     */
    private void writeOneByOne(SeaTunnelRow element) {
        final List<Neo4jMultiQueryPosConfig> filterConfigList = getFilterList(element);
        if (CollectionUtils.isEmpty(filterConfigList)) {
            // Nothing is configured for this row kind, so there is nothing to execute.
            return;
        }

        try {
            // All statements for this row share one transaction.
            session.writeTransaction(
                    tx -> {
                        for (Neo4jMultiQueryPosConfig queryPosConfig : filterConfigList) {
                            tx.run(buildNeo4jQuery(element, queryPosConfig));
                        }
                        return null;
                    });
        } catch (Neo4jException e) {
            // Keep the original exception as the cause so the driver stack trace is not lost.
            throw new Neo4jConnectorException(
                    Neo4jConnectorErrorCode.DATE_BASE_ERROR, e.getMessage(), e);
        }
    }

    /**
     * Selects the configured queries whose {@code rowKind} equals the row's kind, ignoring case.
     *
     * @param element the CDC row being processed
     * @return the matching query configurations, in configuration order
     */
    private List<Neo4jMultiQueryPosConfig> getFilterList(SeaTunnelRow element) {
        final String rowKind = element.getRowKind().name();
        return this.neo4jSinkQueryInfo.getQueryConfigList().stream()
                .filter(queryPosConfig -> rowKind.equalsIgnoreCase(queryPosConfig.getRowKind()))
                .collect(Collectors.toList());
    }

    /**
     * Binds the row's field values to the named parameters of one configured query.
     *
     * @param element the row supplying the values
     * @param queryPosConfig the Cypher text plus the parameter-name to field-position mapping
     * @return the query ready to be run in a transaction
     */
    private Query buildNeo4jQuery(final SeaTunnelRow element,
                                  final Neo4jMultiQueryPosConfig queryPosConfig) {
        // A plain map is filled by hand because Collectors.toMap rejects null values, and a row
        // field may be null. The class is fully qualified so no additional import is required.
        final Map<String, Object> parameters = new java.util.HashMap<>();
        for (Map.Entry<String, Object> entry : queryPosConfig.getQueryParamPosition().entrySet()) {
            // Accept any numeric type for the position instead of insisting on Integer.
            final int position = ((Number) entry.getValue()).intValue();
            parameters.put(entry.getKey(), element.getField(position));
        }
        return new Query(queryPosConfig.getQuery(), parameters);
    }

    /** Returns an empty result; this writer produces no commit information. */
    @Override
    public Optional<Void> prepareCommit() throws IOException {
        return Optional.empty();
    }

    /** Does nothing. */
    @Override
    public void abortPrepare() {}

    /** Closes the session and then the driver; the driver is closed even if the session fails. */
    @Override
    public void close() throws IOException {
        try {
            session.close();
        } finally {
            driver.close();
        }
    }

}

mysql 准备

创建一个测试账户:

CREATE USER 'admin'@'%' IDENTIFIED BY '123456';
GRANT ALL PRIVILEGES ON *.* TO 'admin'@'%' WITH GRANT OPTION;
flush privileges;

启用 binlog

# Enable binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary, expiration can be short for integration tests but would
# be longer on a production system. Row-level info is required for ingest to work.
# Server ID is required, but this will vary on production systems
server-id         = 223344
log_bin           = mysql-bin
expire_logs_days  = 10
binlog_format     = row
binlog_row_image  = FULL

# enable gtid mode
gtid_mode = on
enforce_gtid_consistency = on

建表

create database etl;
use etl;

创建测试表:

drop table if exists user_info;
create table user_info
(
    id int unsigned auto_increment comment '主键' primary key,
    username varchar(128) not null comment '用户名',
    create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间',
    update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间'
) comment '枚举映射表' ENGINE=Innodb default charset=utf8mb4 auto_increment=1;
create unique index user_info on user_info (username) comment '标识索引';

drop table if exists user_info_bak;
create table user_info_bak
(
    id int unsigned auto_increment comment '主键' primary key,
    username varchar(128) not null comment '用户名',
    create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间',
    update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间'
) comment '枚举映射表' ENGINE=Innodb default charset=utf8mb4 auto_increment=1;
create unique index user_info_bak on user_info_bak (username) comment '标识索引';

v1-mysql cdc => neo4j multi CRUD 测试

添加依赖

添加对应的 cdc 依赖:

<dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>connector-cdc-mysql</artifactId>
    <version>${project.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>connector-neo4j-multi</artifactId>
    <version>${project.version}</version>
</dependency>

配置

source 基于 mysql CDC

sink 中指定了增删改的对应操作。

# Defining the runtime environment
env {
  # You can set flink configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}
source{
    MySQL-CDC {
        base-url = "jdbc:mysql://localhost:3306/etl?useSSL=false&serverTimezone=Asia/Shanghai"
        driver = "com.mysql.cj.jdbc.Driver"
        username = "admin"
        password = "123456"
        table-names = ["etl.user_info"]

        startup.mode = "initial"
    }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    Neo4jMulti {
            source_table_name = "etl.user_info"

            uri = "bolt://localhost:7687"
            username = "neo4j"
            password = "12345678"
            database = "neo4j"

            max_transaction_retry_time = 1000
            max_connection_timeout = 1000

            # field positions follow the user_info column order: 0=id, 1=username, 2=create_time, 3=update_time
            queryConfigList = [
                {
                    rowKind = "INSERT"
                    query = "CREATE (a:UserInfoCdc {id: $id, username: $username})"
                    queryParamPosition = {
                        id = 0
                        username = 1
                    }
                },
                {
                    rowKind = "DELETE"
                    query = "MATCH (a:UserInfoCdc {id: $id, username: $username}) DELETE a"
                    queryParamPosition = {
                        id = 0
                        username = 1
                    }
                },
                {
                    rowKind = "UPDATE_AFTER"
                    query = "MATCH (a:UserInfoCdc {id: $id}) SET a.username=$username"
                    queryParamPosition = {
                        id = 0
                        username = 1
                    }
                }
            ]
    }

}

启动效果

启动时,会有对应的一些 binlog 加载处理。

neo4j 会有对应的数据。

╒═════════════════════════════════════════════════╕
│n                                                │
╞═════════════════════════════════════════════════╡
│(:UserInfoCdc {id: 4,username: "binlog-add-04"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 5,username: "binlog-add-05"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 6,username: "binlog-add-01"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 7,username: "binlog-add-08"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 3,username: "binlog-edit-03"})│
└─────────────────────────────────────────────────┘

测试删除

mysql 执行

delete from user_info where id=7 and username='binlog-add-08';

因为我们在 neo4j sink 中配置了删除处理,neo4j 的数据变成:

╒═════════════════════════════════════════════════╕
│n                                                │
╞═════════════════════════════════════════════════╡
│(:UserInfoCdc {id: 4,username: "binlog-add-04"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 5,username: "binlog-add-05"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 6,username: "binlog-add-01"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 3,username: "binlog-edit-03"})│
└─────────────────────────────────────────────────┘

添加

mysql 中执行

insert into user_info (id, username) values (7, 'binlog-add-07');

neo4j 中数据对应变化为:

╒═════════════════════════════════════════════════╕
│n                                                │
╞═════════════════════════════════════════════════╡
│(:UserInfoCdc {id: 4,username: "binlog-add-04"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 5,username: "binlog-add-05"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 6,username: "binlog-add-01"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 7,username: "binlog-add-07"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 3,username: "binlog-edit-03"})│
└─────────────────────────────────────────────────┘

修改

mysql 中执行

update user_info set username='binlog-add-06' where id=6;

neo4j 中的数据也会根据 sink 对应变化:

╒═════════════════════════════════════════════════╕
│n                                                │
╞═════════════════════════════════════════════════╡
│(:UserInfoCdc {id: 4,username: "binlog-add-04"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 5,username: "binlog-add-05"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 6,username: "binlog-add-06"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 7,username: "binlog-add-07"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 3,username: "binlog-edit-03"})│
└─────────────────────────────────────────────────┘

v2-mysql cdc => neo4j multi 多个语句测试

说明

上面演示了对应 cdc 的增删改查。

我们下面展示一下 neo4j-multi 的另一个能力,多个 cypher 语句。

比如我们在执行 mysql cdc 增删改查同步时,同时记录每一次的操作日志。

编写 conf 文件

# Defining the runtime environment
env {
  # You can set flink configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}
source{
    MySQL-CDC {
        base-url = "jdbc:mysql://localhost:3306/etl?useSSL=false&serverTimezone=Asia/Shanghai"
        driver = "com.mysql.cj.jdbc.Driver"
        username = "admin"
        password = "123456"
        table-names = ["etl.user_info"]

        startup.mode = "initial"
    }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    Neo4jMulti {
            source_table_name = "etl.user_info"

            uri = "bolt://localhost:7687"
            username = "neo4j"
            password = "12345678"
            database = "neo4j"

            max_transaction_retry_time = 1000
            max_connection_timeout = 1000

            # field positions follow the user_info column order: 0=id, 1=username, 2=create_time, 3=update_time
            queryConfigList = [
                {
                    rowKind = "INSERT"
                    query = "CREATE (a:UserInfoCdc {id: $id, username: $username})"
                    queryParamPosition = {
                        id = 0
                        username = 1
                    }
                },
                {
                    rowKind = "INSERT"
                    query = "CREATE (a:UserInfoCdcLog {id: $id, username: $username, rowKind: 'INSERT'})"
                    queryParamPosition = {
                        id = 0
                        username = 1
                    }
                },
                {
                    rowKind = "DELETE"
                    query = "MATCH (a:UserInfoCdc {id: $id, username: $username}) DELETE a"
                    queryParamPosition = {
                        id = 0
                        username = 1
                    }
                },
                {
                    rowKind = "DELETE"
                    query = "CREATE (a:UserInfoCdcLog {id: $id, username: $username, rowKind: 'DELETE'})"
                    queryParamPosition = {
                        id = 0
                        username = 1
                    }
                },
                {
                    rowKind = "UPDATE_AFTER"
                    query = "MATCH (a:UserInfoCdc {id: $id}) SET a.username=$username"
                    queryParamPosition = {
                        id = 0
                        username = 1
                    }
                },
                {
                    rowKind = "UPDATE_AFTER"
                    query = "CREATE (a:UserInfoCdcLog {id: $id, username: $username, rowKind: 'UPDATE'})"
                    queryParamPosition = {
                        id = 0
                        username = 1
                    }
                }
            ]
    }
}

我们在原来的 CRUD 之外,添加了对应的 UserInfoCdcLog 操作日志。

测试

启动效果

发现对应的 UserInfoCdc 和 UserInfoCdcLog 都有数据。

╒═════════════════════════════════════════════════╕
│n                                                │
╞═════════════════════════════════════════════════╡
│(:UserInfoCdc {id: 4,username: "binlog-add-04"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 5,username: "binlog-add-05"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 3,username: "binlog-edit-03"})│
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 6,username: "binlog-add-06"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 7,username: "binlog-add-07"}) │
└─────────────────────────────────────────────────┘

╒══════════════════════════════════════════════════════════════════════╕
│n                                                                     │
╞══════════════════════════════════════════════════════════════════════╡
│(:UserInfoCdcLog {id: 3,rowKind: "INSERT",username: "binlog-edit-03"})│
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 4,rowKind: "INSERT",username: "binlog-add-04"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 5,rowKind: "INSERT",username: "binlog-add-05"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 6,rowKind: "INSERT",username: "binlog-add-06"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 7,rowKind: "INSERT",username: "binlog-add-07"}) │
└──────────────────────────────────────────────────────────────────────┘

测试新增

mysql 执行

insert into user_info (id, username) values (8, 'binlog-add-08');

neo4j 数据:

╒═════════════════════════════════════════════════╕
│n                                                │
╞═════════════════════════════════════════════════╡
│(:UserInfoCdc {id: 4,username: "binlog-add-04"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 5,username: "binlog-add-05"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 3,username: "binlog-edit-03"})│
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 6,username: "binlog-add-06"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 7,username: "binlog-add-07"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 8,username: "binlog-add-08"}) │
└─────────────────────────────────────────────────┘

╒══════════════════════════════════════════════════════════════════════╕
│n                                                                     │
╞══════════════════════════════════════════════════════════════════════╡
│(:UserInfoCdcLog {id: 3,rowKind: "INSERT",username: "binlog-edit-03"})│
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 4,rowKind: "INSERT",username: "binlog-add-04"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 5,rowKind: "INSERT",username: "binlog-add-05"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 6,rowKind: "INSERT",username: "binlog-add-06"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 7,rowKind: "INSERT",username: "binlog-add-07"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 8,rowKind: "INSERT",username: "binlog-add-08"}) │
└──────────────────────────────────────────────────────────────────────┘

测试修改

mysql 执行:

update user_info set username = 'binlog-edit-08' where id=8;

neo4j 数据:

╒═════════════════════════════════════════════════╕
│n                                                │
╞═════════════════════════════════════════════════╡
│(:UserInfoCdc {id: 4,username: "binlog-add-04"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 5,username: "binlog-add-05"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 3,username: "binlog-edit-03"})│
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 6,username: "binlog-add-06"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 7,username: "binlog-add-07"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 8,username: "binlog-edit-08"})│
└─────────────────────────────────────────────────┘

╒══════════════════════════════════════════════════════════════════════╕
│n                                                                     │
╞══════════════════════════════════════════════════════════════════════╡
│(:UserInfoCdcLog {id: 3,rowKind: "INSERT",username: "binlog-edit-03"})│
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 4,rowKind: "INSERT",username: "binlog-add-04"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 5,rowKind: "INSERT",username: "binlog-add-05"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 6,rowKind: "INSERT",username: "binlog-add-06"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 7,rowKind: "INSERT",username: "binlog-add-07"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 8,rowKind: "INSERT",username: "binlog-add-08"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 8,rowKind: "UPDATE",username: "binlog-edit-08"})│
└──────────────────────────────────────────────────────────────────────┘

测试删除

mysql 中执行:

delete from user_info where id=8;

neo4j 中数据:

╒═════════════════════════════════════════════════╕
│n                                                │
╞═════════════════════════════════════════════════╡
│(:UserInfoCdc {id: 4,username: "binlog-add-04"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 5,username: "binlog-add-05"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 3,username: "binlog-edit-03"})│
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 6,username: "binlog-add-06"}) │
├─────────────────────────────────────────────────┤
│(:UserInfoCdc {id: 7,username: "binlog-add-07"}) │
└─────────────────────────────────────────────────┘

╒══════════════════════════════════════════════════════════════════════╕
│n                                                                     │
╞══════════════════════════════════════════════════════════════════════╡
│(:UserInfoCdcLog {id: 3,rowKind: "INSERT",username: "binlog-edit-03"})│
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 4,rowKind: "INSERT",username: "binlog-add-04"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 5,rowKind: "INSERT",username: "binlog-add-05"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 6,rowKind: "INSERT",username: "binlog-add-06"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 7,rowKind: "INSERT",username: "binlog-add-07"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 8,rowKind: "INSERT",username: "binlog-add-08"}) │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 8,rowKind: "UPDATE",username: "binlog-edit-08"})│
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: 8,rowKind: "DELETE",username: "binlog-edit-08"})│
└──────────────────────────────────────────────────────────────────────┘

windows wsl linux 服务端模式运行

包冲突

v2 的 connector,需要把 lib 下面的 connector 包删除。

系统默认优先从 connectors 中读取。

说明

服务端运行时,会有一些不同。

本地 mvn clean install 打包,把对应的包命名为 connector-neo4j-multi-2.3.3.jar 放入到 connectors 下。

但是发现依然会报错,说插件不存在。

为什么内置的可以,我们自己定义不行?

~/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/connectors$ ls
plugin-mapping.properties  seatunnel

看了下需要改一下 plugin-mapping.properties

加一下我们自定义的插件:

# SeaTunnel Connector-V2
seatunnel.sink.Neo4jMulti = connector-neo4j-multi

seatunnel.source.FakeSource = connector-fake
seatunnel.sink.Console = connector-console

jar 包文件名中插件名后面的版本号(这里是 2.3.3)一定要和 SeaTunnel 的版本保持一致。

服务启动

cd /home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3
bash bin/stop-seatunnel-cluster.sh
nohup bash bin/seatunnel-cluster.sh 2>&1 &

运行命令:

/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/bin/seatunnel.sh --config /home/dh/bigdata/seatunnel-2.3.3/config/mysql_cdc_to_neo4j_multi.conf

日志查看在

tail -f /home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/logs/seatunnel-engine-server.log

neo4j 启动效果

MATCH (n:UserInfoCdc) RETURN n LIMIT 25

数据:

╒═══════════════════════════════════════════════════════╕
│n                                                      │
╞═══════════════════════════════════════════════════════╡
│(:UserInfoCdc {id: "2024-01-19T13:27:05",username: 1}) │
├───────────────────────────────────────────────────────┤
│(:UserInfoCdc {id: "2024-01-19T13:27:12",username: 2}) │
├───────────────────────────────────────────────────────┤
│(:UserInfoCdc {id: "2024-01-19T13:27:31",username: 3}) │
├───────────────────────────────────────────────────────┤
│(:UserInfoCdc {id: "2024-01-19T13:28:20",username: 4}) │
├───────────────────────────────────────────────────────┤
│(:UserInfoCdc {id: "2024-01-19T13:30:43",username: 5}) │
├───────────────────────────────────────────────────────┤
│(:UserInfoCdc {id: "2024-01-19T15:06:25",username: 9}) │
├───────────────────────────────────────────────────────┤
│(:UserInfoCdc {id: "2024-01-19T15:07:12",username: 10})│
├───────────────────────────────────────────────────────┤
│(:UserInfoCdc {id: "2024-01-19T15:41:08",username: 11})│
└───────────────────────────────────────────────────────┘

neo4j 增量效果

mysql 测试插入效果:

insert into user_info (id, username) values (8, 'etl-cdc-08');

再次查看 neo4j 中的数据:

╒══════════════════════════════════════════════════════════════════════╕
│n                                                                     │
╞══════════════════════════════════════════════════════════════════════╡
│(:UserInfoCdcLog {id: "2024-01-19T13:30:43",rowKind: "INSERT",username│
│: 5})                                                                 │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: "2024-01-19T13:27:05",rowKind: "INSERT",username│
│: 1})                                                                 │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: "2024-01-19T15:06:25",rowKind: "INSERT",username│
│: 9})                                                                 │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: "2024-01-19T13:27:12",rowKind: "INSERT",username│
│: 2})                                                                 │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: "2024-01-19T15:07:12",rowKind: "INSERT",username│
│: 10})                                                                │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: "2024-01-19T13:27:31",rowKind: "INSERT",username│
│: 3})                                                                 │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: "2024-01-19T15:41:08",rowKind: "INSERT",username│
│: 11})                                                                │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: "2024-01-19T13:28:20",rowKind: "INSERT",username│
│: 4})                                                                 │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: "2024-01-23T10:09:39",rowKind: "INSERT",username│
│: 12})                                                                │
├──────────────────────────────────────────────────────────────────────┤
│(:UserInfoCdcLog {id: "2024-01-23T10:12:38",rowKind: "INSERT",username│
│: 13})                                                                │
└──────────────────────────────────────────────────────────────────────┘

PS: 这里的属性值明显对应错了(id 存成了时间,username 存成了数字),应该是 queryParamPosition 中的下标和实际的列顺序不一致,回头调整一下。

小结

到这里,可以发现 SeaTunnel 的设计给后续的拓展提供了强大的灵活性基础。

只不过 v2 的 transform 相对比较弱,但是都可以自定义拓展。所以问题不大。