简介
检查点是一种容错恢复机制。此机制确保当程序运行时,即使突然遇到异常,也能够自我恢复。
检查点存储
检查点存储是用于存储检查点数据的存储机制。
SeaTunnel Engine 支持以下检查点存储类型:
- HDFS(OSS、S3、HDFS、LocalFile)
- LocalFile(本地文件)(已弃用:请改用 Hdfs(LocalFile))
我们使用了微内核设计模式,将检查点存储模块与引擎分离。这使用户能够实现自己的检查点存储模块。
checkpoint-storage-api 是检查点存储模块的 API,它定义了检查点存储模块的接口。
如果您想要实现自己的检查点存储模块,需要实现 CheckpointStorage 并提供相应的 CheckpointStorageFactory 实现。
检查点存储配置
seatunnel-server 模块的配置位于 seatunnel.yaml 文件中。
seatunnel:
engine:
checkpoint:
storage:
type: hdfs #plugin name of checkpoint storage, we support hdfs(S3, local, hdfs), localfile (native local file) is the default, but this plugin is de
# plugin configuration
plugin-config:
namespace: #checkpoint storage parent path, the default value is /seatunnel/checkpoint/
K1: V1 # plugin other configuration
K2: V2 # plugin other configuration
注意:namespace 必须以 “/” 结尾。
OSS
阿里云 OSS 基于 HDFS-File,因此您可以 参考 Hadoop OSS 文档配置 OSS。
除了与 OSS 存储桶交互时,OSS 客户端需要与存储桶交互所需的凭证。该客户端支持多种身份验证机制,并且可以配置使用哪些机制以及它们的使用顺序。还可以使用 org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider 的自定义实现。如果使用 AliyunCredentialsProvider(可以从阿里云 Access Key Management 获取),这些凭证包括访问密钥和密钥。您可以进行如下配置:
seatunnel:
engine:
checkpoint:
interval: 6000
timeout: 7000
storage:
type: hdfs
max-retained: 3
plugin-config:
storage.type: oss
oss.bucket: your-bucket
fs.oss.accessKeyId: your-access-key
fs.oss.accessKeySecret: your-secret-key
fs.oss.endpoint: endpoint address
fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider
关于 Hadoop Credential Provider API 的详细信息请参阅:Credential Provider API。
阿里云 OSS Credential Provider 实现请参阅:Auth Credential Providers。
S3
S3 基于 HDFS-File,因此您可以参考 Hadoop S3 文档配置 S3。
除了与公共 S3 存储桶交互时,S3A 客户端需要与存储桶交互所需的凭证。
该客户端支持多种身份验证机制,并且可以配置使用哪些机制以及它们的使用顺序。还可以使用 com.amazonaws.auth.AWSCredentialsProvider 的自定义实现。
如果使用 SimpleAWSCredentialsProvider(可以从亚马逊安全令牌服务获取),这些凭证包括访问密钥和密钥。您可以进行如下配置:
seatunnel:
engine:
checkpoint:
interval: 6000
timeout: 7000
storage:
type: hdfs
max-retained: 3
plugin-config:
storage.type: s3
s3.bucket: your-bucket
fs.s3a.access.key: your-access-key
fs.s3a.secret.key: your-secret-key
fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
如果使用 InstanceProfileCredentialsProvider,它支持在 EC2 VM 中运行时使用实例配置文件凭证,您可以检查 iam-roles-for-amazon-ec2。您可以进行如下配置:
seatunnel:
engine:
checkpoint:
interval: 6000
timeout: 7000
storage:
type: hdfs
max-retained: 3
plugin-config:
storage.type: s3
s3.bucket: your-bucket
fs.s3a.endpoint: your-endpoint
fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.InstanceProfileCredentialsProvider
关于 Hadoop Credential Provider API 的详细信息请参阅:Credential Provider API。
HDFS
如果您使用 HDFS,可以进行如下配置:
seatunnel:
engine:
checkpoint:
storage:
type: hdfs
max-retained: 3
plugin-config:
storage.type: hdfs
fs.defaultFS: hdfs://localhost:9000
# 如果使用 Kerberos,可以进行如下配置:
kerberosPrincipal: your-kerberos-principal
kerberosKeytab: your-kerberos-keytab
如果 HDFS 处于 HA 模式下,可以进行如下配置:
seatunnel:
engine:
checkpoint:
storage:
type: hdfs
max-retained: 3
plugin-config:
storage.type: hdfs
fs.defaultFS: hdfs://usdp-bing
seatunnel.hadoop.dfs.nameservices: usdp-bing
seatunnel.hadoop.dfs.ha.namenodes.usdp-bing: nn1,nn2
seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn1: usdp-bing-nn1:8020
seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn2: usdp-bing-nn2:8020
seatunnel.hadoop.dfs.client.failover.proxy.provider.usdp-bing: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
如果 HDFS 在 hdfs-site.xml 或 core-site.xml 中有其他配置,请使用 seatunnel.hadoop. 前缀设置 HDFS 配置。
LocalFile
seatunnel:
engine:
checkpoint:
interval: 6000
timeout: 7000
storage:
type: hdfs
max-retained: 3
plugin-config:
storage.type: hdfs
fs.defaultFS: file:/// # Ensure that the directory has written permission
实际测试
上面的写法本地启动不一定成功,我本地实际测试的配置文件如下:
- seatunnel.yaml
seatunnel:
engine:
backup-count: 1
queue-type: blockingqueue
print-execution-info-interval: 60
slot-service:
dynamic-slot: true
checkpoint:
interval: 300000
timeout: 10000
storage:
type: localfile
max-retained: 3
plugin-config:
namespace: C:\ProgramData\seatunnel\checkpoint\
# storage:
# type: hdfs
# max-retained: 3
# plugin-config:
# namespace: /tmp/seatunnel/checkpoint_snapshot/
# storage.type: hdfs
# fs.defaultFS: file:///tmp/
参考资料
https://seatunnel.apache.org/docs/2.3.3/seatunnel-engine/checkpoint-storage