savepoint 和使用 savepoint 进行恢复
savepoint 是使用检查点创建的,是作业执行状态的全局镜像,可用于作业或 SeaTunnel 的停止和恢复、升级等操作。
使用 savepoint
要使用 savepoint,您需要确保作业使用的连接器支持检查点,否则可能会导致数据丢失或重复。
确保作业正在运行
使用以下命令触发 savepoint:
./bin/seatunnel.sh -s {jobId}
执行成功后,将保存检查点数据并结束任务。
使用 savepoint 进行恢复
使用 jobId 从 savepoint 恢复
./bin/seatunnel.sh -c {jobConfig} -r {jobId}
实战测试
本地安装
参考: ETL-03-简化版 SeaTunnel install windows10 单机 WSL 安装笔记
本地 cdc 测试
参考
准备工作
数据库启动 binlog.
建表语句
drop table if exists user_info;
create table user_info
(
id int unsigned auto_increment comment '主键' primary key,
username varchar(128) not null comment '用户名',
create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间',
update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间'
) comment '枚举映射表' ENGINE=Innodb default charset=utf8mb4 auto_increment=1;
create unique index user_info on user_info (username) comment '标识索引';
数据初始化
创建一些测试数据。
insert into user_info (username) values ('binlog-add-01');
insert into user_info (username) values ('binlog-add-02');
insert into user_info (username) values ('binlog-add-03');
insert into user_info (username) values ('binlog-add-04');
insert into user_info (username) values ('binlog-add-05');
数据确认:
mysql> select * from user_info;
+----+---------------+---------------------+---------------------+
| id | username | create_time | update_time |
+----+---------------+---------------------+---------------------+
| 14 | binlog-add-01 | 2024-02-21 14:17:54 | 2024-02-21 14:17:54 |
| 15 | binlog-add-02 | 2024-02-21 14:17:54 | 2024-02-21 14:17:54 |
| 16 | binlog-add-03 | 2024-02-21 14:17:54 | 2024-02-21 14:17:54 |
| 17 | binlog-add-04 | 2024-02-21 14:17:54 | 2024-02-21 14:17:54 |
| 18 | binlog-add-05 | 2024-02-21 14:17:55 | 2024-02-21 14:17:55 |
+----+---------------+---------------------+---------------------+
5 rows in set (0.00 sec)
配置查看
这里简化一下,因为主要需要看一下主要对于任务是否可以暂停+恢复。
mysql_cdc_to_console.conf 放在 /home/dh/bigdata/seatunnel-2.3.3/config
目录下。
- mysql_cdc_to_console.conf
# Defining the runtime environment
env {
# You can set flink configuration here
parallelism = 1
job.mode = "STREAMING"
job.name = "etl.user_info-streaming"
checkpoint.interval = 10000
}
source{
MySQL-CDC {
base-url = "jdbc:mysql://127.0.0.1:13306/etl?useSSL=false&serverTimezone=Asia/Shanghai"
driver = "com.mysql.cj.jdbc.Driver"
username = "admin"
password = "123456"
table-names = ["etl.user_info"]
startup.mode = "initial"
}
}
transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
ConsoleBinlog {}
}
ConsoleBinlog 是一个自定义的日志输出,避免 binglog 直接输出报错的,简单的 console 输出实现。
只是为了演示,实际改成需要的 sink 即可。
seatunnel 服务启动
#进入安装目录
cd /home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3
# 关闭
bash bin/stop-seatunnel-cluster.sh
# 启动服务
nohup bash bin/seatunnel-cluster.sh 2>&1 &
确认:
$ jps
1050 Jps
892 SeaTunnelServer
启动日志在 nohup.out
seatunnel 指定配置文件,添加 job
/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/bin/seatunnel.sh --config /home/dh/bigdata/seatunnel-2.3.3/config/mysql_cdc_to_console.conf -elocal
日志查看在文件:/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/logs/seatunnel-engine-server.log
启动时测试日志为:
2024-02-21 14:29:34,678 INFO org.apache.seatunnel.connectors.seatunnel.consolebinlog.sink.ConsoleBinlogSinkWriter - output rowType: create_time<TIMESTAMP>, id<BIGINT>, update_time<TIMESTAMP>, username<STRING>
2024-02-21 14:29:36,539 INFO org.apache.seatunnel.connectors.seatunnel.consolebinlog.sink.ConsoleBinlogSinkWriter - ConsoleBinlogSinkWriter ================= 2024-02-21 14:29:36.537 >>>>>>>>>>> SeaTunnelRow{tableId=etl.user_info, kind=+I, fields=[2024-02-21T14:17:54, 14, 2024-02-21T14:17:54, binlog-add-01]}
2024-02-21 14:29:36,540 INFO org.apache.seatunnel.connectors.seatunnel.consolebinlog.sink.ConsoleBinlogSinkWriter - ConsoleBinlogSinkWriter ================= 2024-02-21 14:29:36.539 >>>>>>>>>>> SeaTunnelRow{tableId=etl.user_info, kind=+I, fields=[2024-02-21T14:17:54, 15, 2024-02-21T14:17:54, binlog-add-02]}
2024-02-21 14:29:36,541 INFO org.apache.seatunnel.connectors.seatunnel.consolebinlog.sink.ConsoleBinlogSinkWriter - ConsoleBinlogSinkWriter ================= 2024-02-21 14:29:36.540 >>>>>>>>>>> SeaTunnelRow{tableId=etl.user_info, kind=+I, fields=[2024-02-21T14:17:54, 16, 2024-02-21T14:17:54, binlog-add-03]}
2024-02-21 14:29:36,543 INFO org.apache.seatunnel.connectors.seatunnel.consolebinlog.sink.ConsoleBinlogSinkWriter - ConsoleBinlogSinkWriter ================= 2024-02-21 14:29:36.542 >>>>>>>>>>> SeaTunnelRow{tableId=etl.user_info, kind=+I, fields=[2024-02-21T14:17:54, 17, 2024-02-21T14:17:54, binlog-add-04]}
2024-02-21 14:29:36,544 INFO org.apache.seatunnel.connectors.seatunnel.consolebinlog.sink.ConsoleBinlogSinkWriter - ConsoleBinlogSinkWriter ================= 2024-02-21 14:29:36.543 >>>>>>>>>>> SeaTunnelRow{tableId=etl.user_info, kind=+I, fields=[2024-02-21T14:17:55, 18, 2024-02-21T14:17:55, binlog-add-05]}
插入测试
mysql 插入一条:
insert into user_info (username) values ('binlog-add-06');
会有一条新的输出:
2024-02-21 14:35:28,587 INFO org.apache.seatunnel.connectors.seatunnel.consolebinlog.sink.ConsoleBinlogSinkWriter - ConsoleBinlogSinkWriter ================= 2024-02-21 14:35:28.585 >>>>>>>>>>> SeaTunnelRow{tableId=etl.user_info, kind=+I, fields=[2024-02-21T14:35:28, 19, 2024-02-21T14:35:28, binlog-add-06]}
查看任务列表
/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/bin/seatunnel.sh --list
可以获取到对应的任务列表
2024-02-21 14:33:33,073 INFO com.hazelcast.client.impl.statistics.ClientStatisticsService - Client statistics is enabled with period 5 seconds.
Job ID Job Name Job Status Submit Time Finished Time
------------------ --------- ---------- ----------------------- -----------------------
812571631246376961 SeaTunnel RUNNING 2024-02-21 14:29:34.08
812571631246376961 就是我们刚才的任务标识。
任务暂停+savepoint
/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/bin/seatunnel.sh -s 812571631246376961
这个命令可以触发指定 jobid 的 savepoint,然后发现任务已经被暂停了。
执行成功后,将保存检查点数据并结束任务。
2024-02-21 14:36:44,715 INFO com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
2024-02-21 14:36:44,722 INFO com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [seatunnel] [5.1] Removed connection to endpoint: [localhost]:5801:13affa6c-bd6c-4e53-82b7-03585fcfc599, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:45369->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2024-02-21 14:36:44.713, lastWriteTime=2024-02-21 14:36:44.712, closedTime=2024-02-21 14:36:44.719, connected server version=5.1}
2024-02-21 14:36:44,722 INFO com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2024-02-21 14:36:44,730 INFO com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2024-02-21 14:36:44,730 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed SeaTunnel client......
2024-02-21 14:36:44,730 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed metrics executor service ......
2024-02-21 14:36:44,732 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal
查看任务确认
/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/bin/seatunnel.sh --list
发现任务的状态变成了完成:
2024-02-21 14:38:08,132 INFO com.hazelcast.client.impl.statistics.ClientStatisticsService - Client statistics is enabled with period 5 seconds.
Job ID Job Name Job Status Submit Time Finished Time
------------------ --------- ---------- ----------------------- -----------------------
812571631246376961 SeaTunnel FINISHED 2024-02-21 14:29:34.08 2024-02-21 14:36:44.696
使用 savepoint 进行恢复
使用 jobId 从 savepoint 恢复
/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/bin/seatunnel.sh --config /home/dh/bigdata/seatunnel-2.3.3/config/mysql_cdc_to_console.conf -elocal -r 812571631246376961
和原来比,多了一个 -r 812571631246376961
确认任务
/home/dh/bigdata/seatunnel-2.3.3/backend/apache-seatunnel-2.3.3/bin/seatunnel.sh --list
如下:
2024-02-21 14:40:01,903 INFO com.hazelcast.client.impl.statistics.ClientStatisticsService - Client statistics is enabled with period 5 seconds.
Job ID Job Name Job Status Submit Time Finished Time
------------------ --------- ---------- ----------------------- -----------------------
812571631246376961 SeaTunnel RUNNING 2024-02-21 14:39:31.309
console
可以启动时发现,此时的 console 并没有额外的日志输出。
我们在 mysql 再插入一条数据,可以发现出现的增量日志:
insert into user_info (username) values ('binlog-add-07');
此时控台多出一条实际日志:
2024-02-21 14:42:11,144 INFO org.apache.seatunnel.connectors.seatunnel.consolebinlog.sink.ConsoleBinlogSinkWriter - ConsoleBinlogSinkWriter ================= 2024-02-21 14:42:11.142 >>>>>>>>>>> SeaTunnelRow{tableId=etl.user_info, kind=+I, fields=[2024-02-21T14:42:10, 20, 2024-02-21T14:42:10, binlog-add-07]}
参考资料
https://seatunnel.apache.org/docs/2.3.3/seatunnel-engine/savepoint