checkpoint
这个功能能力比较重要,重点学习一下。
此处以 v2.3.3 为例。
savepoint 与 checkpoint
savepoint 是以 checkpoint 为基础实现的。
savepoint 可以让我们保存+恢复一个任务。
我们这里重点看一下 checkpoint
server 配置
- seatunnel.yaml
seatunnel:
engine:
backup-count: 1
queue-type: blockingqueue
print-execution-info-interval: 60
slot-service:
dynamic-slot: true
checkpoint:
interval: 300000
timeout: 10000
storage:
type: localfile
max-retained: 3
plugin-config:
namespace: C:\ProgramData\seatunnel\checkpoint\
这里的 checkpoint 部分,对应着 checkpoint 的配置。
checkpoint 属性
每一个配置的描述,参见 ServerConfigOptions 源码
public static final Option<Integer> CHECKPOINT_INTERVAL =
Options.key("interval")
.intType()
.defaultValue(300000)
.withDescription(
"The interval (in milliseconds) between two consecutive checkpoints.");
public static final Option<Integer> CHECKPOINT_TIMEOUT =
Options.key("timeout")
.intType()
.defaultValue(30000)
.withDescription("The timeout (in milliseconds) for a checkpoint.");
这里一个是间隔,一个是超时时间。
间隔太短,影响性能;太长,持久化可能导致数据时间间隔太长,所以需要均衡。
timeout 这个意味着什么?
持久化
checkpoint 需要持久化,所以 storage 这部分是关于持久化的配置。
比如基于本地 file,或者 HDFS。我们文件部分暂时不做深入。
因为他可以是任何一种持久化的实现。
public static final Option<CheckpointStorageConfig> CHECKPOINT_STORAGE =
Options.key("storage")
.type(new TypeReference<CheckpointStorageConfig>() {})
.defaultValue(new CheckpointStorageConfig())
.withDescription("The checkpoint storage configuration.");
public static final Option<String> CHECKPOINT_STORAGE_TYPE =
Options.key("type")
.stringType()
.defaultValue("localfile")
.withDescription("The checkpoint storage type.");
public static final Option<Integer> CHECKPOINT_STORAGE_MAX_RETAINED =
Options.key("max-retained")
.intType()
.defaultValue(20)
.withDescription("The maximum number of retained checkpoints.");
public static final Option<Map<String, String>> CHECKPOINT_STORAGE_PLUGIN_CONFIG =
Options.key("plugin-config")
.type(new TypeReference<Map<String, String>>() {})
.noDefaultValue()
.withDescription("The checkpoint storage instance configuration.");