References
The job runs for a long time; the checkpoint configuration is in the file:
Exception
It looks roughly like this:
[Bug] [SeaTunnel Engine] NullPointerException when send data to doris
What happened:
I wanted to send 100 million rows to Doris, but the job always fails with a NullPointerException. I tried three times.
Doris version: 1.2.7.1
Doris table:
CREATE TABLE IF NOT EXISTS olap.seatunnel_fake (
    id BIGINT COMMENT "primary key",
    name VARCHAR(256) COMMENT "name",
    age INT COMMENT "age",
    time DATETIME COMMENT "time"
) DUPLICATE KEY(id, name, age)
DISTRIBUTED BY HASH(id) BUCKETS 8
PROPERTIES ("replication_num" = "1");
SeaTunnel version: SeaTunnel 2.3.3, Connector Doris 2.3.3, Connector Fake 2.3.3
SeaTunnel Config
env {
  execution.parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}
source {
  FakeSource {
    result_table_name = "user"
    row.num = 100000000
    int.min = 1
    int.max = 120
    bigint.min = 1
    bigint.max = 10000000
    schema = {
      fields {
        id = "bigint"
        name = "string"
        age = "int"
        time = "timestamp"
      }
    }
  }
}
sink {
  Doris {
    source_table_name = "user"
    fenodes = "10.58.33.158:8030"
    username = "root"
    password = ""
    table.identifier = "olap.seatunnel_user"
    sink.label-prefix = "test-seatunnel"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}
Error Exception
2023-11-02 17:47:51,825 ERROR org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - report error from task
org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.RuntimeException: java.util.concurrent.CompletionException: java.lang.NullPointerException
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.received(IntermediateBlockingQueue.java:41)
at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.received(IntermediateQueueFlowLifeCycle.java:46)
at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.received(IntermediateQueueFlowLifeCycle.java:28)
at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.sendRecordToNext(SeaTunnelSourceCollector.java:139)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.triggerBarrier(SourceFlowLifeCycle.java:268)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.triggerBarrier(SourceSeaTunnelTask.java:112)
at org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation.lambda$null$0(BarrierFlowOperation.java:90)
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException
at com.hazelcast.spi.impl.AbstractInvocationFuture.returnOrThrowWithJoinConventions(AbstractInvocationFuture.java:819)
at com.hazelcast.spi.impl.AbstractInvocationFuture.resolveAndThrowWithJoinConvention(AbstractInvocationFuture.java:835)
at com.hazelcast.spi.impl.AbstractInvocationFuture.join(AbstractInvocationFuture.java:553)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.ack(SeaTunnelTask.java:348)
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:67)
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.received(IntermediateBlockingQueue.java:39)
... 10 more
Caused by: java.lang.NullPointerException
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.acknowledgeTask(CheckpointCoordinator.java:682)
at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.acknowledgeTask(CheckpointManager.java:270)
at org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation.run(TaskAcknowledgeOperation.java:81)
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:344) [seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:188) [seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.run(CheckpointErrorReportOperation.java:48) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) [seatunnel-starter.jar:2.3.3]
2023-11-02 17:47:51,825 INFO org.apache.http.impl.execchain.RetryExec - I/O exception (java.net.SocketException) caught when processing request to {}->http://10.58.33.158:8040: Socket closed
2023-11-02 17:47:51,825 ERROR org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation - [localhost]:5801 [seatunnel-121748] [5.1] null
java.lang.NullPointerException: null
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.acknowledgeTask(CheckpointCoordinator.java:682) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.acknowledgeTask(CheckpointManager.java:270) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation.run(TaskAcknowledgeOperation.java:81) ~[seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) [seatunnel-starter.jar:2.3.3]
2023-11-02 17:47:51,826 ERROR org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - report error from task
org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.RuntimeException: java.util.concurrent.CompletionException: java.lang.NullPointerException
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.received(IntermediateBlockingQueue.java:41)
at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.received(IntermediateQueueFlowLifeCycle.java:46)
at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.received(IntermediateQueueFlowLifeCycle.java:28)
at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.sendRecordToNext(SeaTunnelSourceCollector.java:139)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.triggerBarrier(SourceFlowLifeCycle.java:268)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.triggerBarrier(SourceSeaTunnelTask.java:112)
at org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation.lambda$null$0(BarrierFlowOperation.java:90)
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException
at com.hazelcast.spi.impl.AbstractInvocationFuture.returnOrThrowWithJoinConventions(AbstractInvocationFuture.java:819)
at com.hazelcast.spi.impl.AbstractInvocationFuture.resolveAndThrowWithJoinConvention(AbstractInvocationFuture.java:835)
at com.hazelcast.spi.impl.AbstractInvocationFuture.join(AbstractInvocationFuture.java:553)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.ack(SeaTunnelTask.java:348)
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:67)
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.received(IntermediateBlockingQueue.java:39)
... 10 more
Caused by: java.lang.NullPointerException
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.acknowledgeTask(CheckpointCoordinator.java:682)
at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.acknowledgeTask(CheckpointManager.java:270)
at org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation.run(TaskAcknowledgeOperation.java:81)
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:344) [seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:188) [seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.run(CheckpointErrorReportOperation.java:48) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) [seatunnel-starter.jar:2.3.3]
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) [seatunnel-starter.jar:2.3.3]
2023-11-02 17:47:51,838 INFO org.apache.seatunnel.engine.server.master.JobMaster - release the pipeline Job SeaTunnel_Job (772395272109031425), Pipeline: [(1/1)] resource
2023-11-02 17:47:51,839 INFO org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 772395272109031425, slot: SlotProfile{worker=[localhost]:5801, slotID=1, ownerJobID=772395272109031425, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='db5f11ec-73f2-4f11-a6e2-ce5e1bf4925c'}
2023-11-02 17:47:51,839 INFO org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 772395272109031425, slot: SlotProfile{worker=[localhost]:5801, slotID=2, ownerJobID=772395272109031425, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='db5f11ec-73f2-4f11-a6e2-ce5e1bf4925c'}
2023-11-02 17:47:53,817 WARN org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (772395272109031425), Pipeline: [(1/1)] cancel error will retry
java.lang.InterruptedException: null
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:347) ~[?:1.8.0_272]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_272]
at org.apache.seatunnel.engine.server.dag.physical.SubPlan.cancelPipelineTasks(SubPlan.java:461) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.dag.physical.SubPlan.cancelPipeline(SubPlan.java:417) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.dag.physical.SubPlan.handleCheckpointError(SubPlan.java:659) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.master.JobMaster.lambda$handleCheckpointError$2(JobMaster.java:341) ~[seatunnel-starter.jar:2.3.3]
at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_272]
at org.apache.seatunnel.engine.server.master.JobMaster.handleCheckpointError(JobMaster.java:338) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.handleCheckpointError(CheckpointManager.java:180) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:266) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$9(CheckpointCoordinator.java:532) ~[seatunnel-starter.jar:2.3.3]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_272]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_272]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_272]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_272]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_272]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_272]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
2023-11-02 17:47:53,817 WARN org.apache.seatunnel.engine.server.dag.physical.SubPlan - start cancel job Job SeaTunnel_Job (772395272109031425), Pipeline: [(1/1)] count = 1
2023-11-02 17:47:53,818 INFO org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (772395272109031425), Pipeline: [(1/1)] turn to end state FAILED.
2023-11-02 17:47:53,818 WARN org.apache.seatunnel.engine.server.dag.physical.SubPlan - start cancel job Job SeaTunnel_Job (772395272109031425), Pipeline: [(1/1)] count = 0
2023-11-02 17:47:53,818 WARN org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (772395272109031425), Pipeline: [(1/1)] is in end state FAILED, can not be cancel
2023-11-02 17:47:53,818 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - cancel job Job SeaTunnel_Job (772395272109031425) because makeJobEndWhenPipelineEnded is true
2023-11-02 17:47:53,818 ERROR org.apache.seatunnel.engine.server.dag.physical.SubPlan - Pipeline is trying to leave terminal state FAILED
2023-11-02 17:47:53,819 WARN org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (772395272109031425), Pipeline: [(1/1)] cancel error
java.lang.IllegalStateException: Pipeline is trying to leave terminal state FAILED
at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:348) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.dag.physical.SubPlan.cancelPipeline(SubPlan.java:414) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.dag.physical.SubPlan.handleCheckpointError(SubPlan.java:659) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.master.JobMaster.lambda$handleCheckpointError$2(JobMaster.java:341) ~[seatunnel-starter.jar:2.3.3]
at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_272]
at org.apache.seatunnel.engine.server.master.JobMaster.handleCheckpointError(JobMaster.java:338) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.handleCheckpointError(CheckpointManager.java:180) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:266) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:251) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$7(CheckpointCoordinator.java:474) ~[seatunnel-starter.jar:2.3.3]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) [?:1.8.0_272]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) [?:1.8.0_272]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) [?:1.8.0_272]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_272]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_272]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
2023-11-02 17:47:53,819 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Job SeaTunnel_Job (772395272109031425) turn from state RUNNING to CANCELLING.
2023-11-02 17:47:53,819 WARN org.apache.seatunnel.engine.server.dag.physical.SubPlan - start cancel job Job SeaTunnel_Job (772395272109031425), Pipeline: [(1/1)] count = 0
2023-11-02 17:47:53,819 WARN org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (772395272109031425), Pipeline: [(1/1)] is in end state FAILED, can not be cancel
2023-11-02 17:47:53,819 ERROR org.apache.seatunnel.engine.server.dag.physical.SubPlan - Pipeline is trying to leave terminal state FAILED
2023-11-02 17:47:53,819 WARN org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (772395272109031425), Pipeline: [(1/1)] cancel error
java.lang.IllegalStateException: Pipeline is trying to leave terminal state FAILED
at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:348) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.dag.physical.SubPlan.cancelPipeline(SubPlan.java:414) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.dag.physical.SubPlan.handleCheckpointError(SubPlan.java:659) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.master.JobMaster.lambda$handleCheckpointError$2(JobMaster.java:341) ~[seatunnel-starter.jar:2.3.3]
at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_272]
at org.apache.seatunnel.engine.server.master.JobMaster.handleCheckpointError(JobMaster.java:338) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.handleCheckpointError(CheckpointManager.java:180) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:266) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:251) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$7(CheckpointCoordinator.java:474) ~[seatunnel-starter.jar:2.3.3]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) [?:1.8.0_272]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) [?:1.8.0_272]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) [?:1.8.0_272]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_272]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_272]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
2023-11-02 17:47:53,819 WARN org.apache.seatunnel.engine.server.dag.physical.SubPlan - start cancel job Job SeaTunnel_Job (772395272109031425), Pipeline: [(1/1)] count = 0
2023-11-02 17:47:53,819 WARN org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (772395272109031425), Pipeline: [(1/1)] is in end state FAILED, can not be cancel
2023-11-02 17:47:53,820 ERROR org.apache.seatunnel.engine.server.dag.physical.SubPlan - Pipeline is trying to leave terminal state FAILED
2023-11-02 17:47:53,820 WARN org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (772395272109031425), Pipeline: [(1/1)] cancel error
java.lang.IllegalStateException: Pipeline is trying to leave terminal state FAILED
at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:348) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.dag.physical.SubPlan.cancelPipeline(SubPlan.java:414) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.dag.physical.SubPlan.handleCheckpointError(SubPlan.java:659) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.master.JobMaster.lambda$handleCheckpointError$2(JobMaster.java:341) ~[seatunnel-starter.jar:2.3.3]
at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_272]
at org.apache.seatunnel.engine.server.master.JobMaster.handleCheckpointError(JobMaster.java:338) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.handleCheckpointError(CheckpointManager.java:180) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:266) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:251) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$7(CheckpointCoordinator.java:474) ~[seatunnel-starter.jar:2.3.3]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) [?:1.8.0_272]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) [?:1.8.0_272]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) [?:1.8.0_272]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_272]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_272]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
2023-11-02 17:47:53,820 WARN org.apache.seatunnel.engine.server.dag.physical.SubPlan - start cancel job Job SeaTunnel_Job (772395272109031425), Pipeline: [(1/1)] count = 0
2023-11-02 17:47:53,820 WARN org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (772395272109031425), Pipeline: [(1/1)] is in end state FAILED, can not be cancel
2023-11-02 17:47:53,820 ERROR org.apache.seatunnel.engine.server.dag.physical.SubPlan - Pipeline is trying to leave terminal state FAILED
2023-11-02 17:47:53,820 WARN org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (772395272109031425), Pipeline: [(1/1)] cancel error
java.lang.IllegalStateException: Pipeline is trying to leave terminal state FAILED
at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:348) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.dag.physical.SubPlan.cancelPipeline(SubPlan.java:414) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.dag.physical.SubPlan.handleCheckpointError(SubPlan.java:659) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.master.JobMaster.lambda$handleCheckpointError$2(JobMaster.java:341) ~[seatunnel-starter.jar:2.3.3]
at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_272]
at org.apache.seatunnel.engine.server.master.JobMaster.handleCheckpointError(JobMaster.java:338) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.handleCheckpointError(CheckpointManager.java:180) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:266) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:251) ~[seatunnel-starter.jar:2.3.3]
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$7(CheckpointCoordinator.java:474) ~[seatunnel-starter.jar:2.3.3]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) [?:1.8.0_272]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) [?:1.8.0_272]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) [?:1.8.0_272]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_272]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_272]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
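How to fix it?
Some answers
A1
I ran into something similar when transferring data from Hive (more than 100 million rows) to Doris. I found that when the data volume is large, the barrier messages lag behind the data, which causes the checkpoint to time out, and the checkpoint timeout in turn causes the job to roll back. I think a Flink-like solution could be adopted: raise the priority of barrier messages.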
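The idea of raising barrier priority in A1 can be pictured with a toy queue. This is only a sketch: the class and method names below are hypothetical and are not part of SeaTunnel or Flink. Data records are appended at the tail, while a barrier is placed at the head so that it is not stuck behind a large backlog.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative only: hypothetical names, not SeaTunnel's IntermediateBlockingQueue.
class BarrierPriorityQueueSketch {

    /** A queue element: either a data record or a checkpoint barrier. */
    static final class Message {
        final boolean barrier;
        final String payload;

        Message(boolean barrier, String payload) {
            this.barrier = barrier;
            this.payload = payload;
        }
    }

    private final Deque<Message> queue = new ArrayDeque<>();

    /** Data records keep FIFO order; a barrier jumps to the head of the queue. */
    void offer(Message message) {
        if (message.barrier) {
            queue.addFirst(message);
        } else {
            queue.addLast(message);
        }
    }

    /** Returns the next message, or null when the queue is empty. */
    Message poll() {
        return queue.pollFirst();
    }

    /** Enqueues two records and then a barrier, and returns the order they come out in. */
    static List<String> drainOrder() {
        BarrierPriorityQueueSketch q = new BarrierPriorityQueueSketch();
        q.offer(new Message(false, "row-1"));
        q.offer(new Message(false, "row-2"));
        q.offer(new Message(true, "barrier-1"));

        List<String> order = new ArrayList<>();
        Message m;
        while ((m = q.poll()) != null) {
            order.add(m.payload);
        }
        return order;
    }
}
```

Note that letting a barrier overtake records is not free: the records it passes must then be saved as part of the snapshot, which is how Flink's unaligned checkpoints handle it. The sketch leaves that part out.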
[Fix][Zeta] Fix CheckpointCoordinator report NPE when ack not existed pending checkpoint
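Going only by the fix title and the stack trace (the NPE is thrown inside CheckpointCoordinator.acknowledgeTask), one plausible reading is that an ack arrives for a checkpoint that is no longer pending, for example because it has already timed out. The sketch below illustrates that kind of guard with a simplified stand-in. It is not the actual SeaTunnel code, and all names in it are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: a simplified stand-in, not SeaTunnel's CheckpointCoordinator.
class PendingCheckpointAckSketch {

    /** Minimal pending-checkpoint record that just counts acks. */
    static final class PendingCheckpoint {
        private int acked;

        void acknowledge() {
            acked++;
        }

        int ackedCount() {
            return acked;
        }
    }

    private final Map<Long, PendingCheckpoint> pending = new ConcurrentHashMap<>();

    /** Registers a checkpoint as pending. */
    void start(long checkpointId) {
        pending.put(checkpointId, new PendingCheckpoint());
    }

    /** Drops a pending checkpoint, e.g. after it timed out. */
    void expire(long checkpointId) {
        pending.remove(checkpointId);
    }

    /** Returns true if the ack was applied, false if the checkpoint is no longer pending. */
    boolean acknowledgeTask(long checkpointId) {
        PendingCheckpoint checkpoint = pending.get(checkpointId);
        if (checkpoint == null) {
            // Without this guard, checkpoint.acknowledge() would throw a NullPointerException.
            return false;
        }
        checkpoint.acknowledge();
        return true;
    }
}
```

A late ack is then ignored instead of failing the whole pipeline. Whether ignoring it is the right policy depends on the engine, so treat this as a picture of the failure mode only.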
Startup script adjustment
Alternatively, if the source is CDC, could the size of each fetch be made smaller?
https://seatunnel.apache.org/docs/2.3.3/connector-v2/source/MySQL-CDC
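snapshot.fetch.size = 1024 (Integer type)
Optimization idea: it is also worth asking why the throughput is so poor, at 3/s.
Later I found that the error still occurred, so this change did not help.
Configuration file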
- seatunnel.yaml
The checkpoint interval here could be increased, perhaps to one hour?
seatunnel:
  engine:
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 100
      timeout: 100
      storage:
        type: localfile
        max-retained: 3
        plugin-config:
          namespace: C:\ProgramData\seatunnel\checkpoint\
Both values (interval and timeout) should be integers, in milliseconds.
So they cannot be set too large.
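To put numbers on this: if the two settings really are 32-bit integers in milliseconds (this is the note's own assumption and has not been checked against the SeaTunnel source), a one-hour interval is still far below the limit. The helper class below is hypothetical and only does the arithmetic.

```java
import java.util.concurrent.TimeUnit;

// Illustrative arithmetic only; assumes the settings are 32-bit ints in milliseconds.
class CheckpointIntervalMath {

    /** One hour expressed in milliseconds (3,600,000). */
    static long oneHourMillis() {
        return TimeUnit.HOURS.toMillis(1);
    }

    /** Whole days that fit into Integer.MAX_VALUE milliseconds. */
    static long maxIntMillisInDays() {
        return TimeUnit.MILLISECONDS.toDays(Integer.MAX_VALUE);
    }

    /** True if the value can be stored in a non-negative 32-bit int. */
    static boolean fitsInInt(long millis) {
        return millis >= 0 && millis <= Integer.MAX_VALUE;
    }
}
```

Under that assumption the ceiling is a little under 25 days (2,147,483,647 ms), so `interval: 3600000` would fit comfortably.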
One option might be to set the interval very large and then use a daily cron job to trigger saving a checkpoint?
References
https://github.com/apache/seatunnel/issues/5555