命令行

最基本的,可以直接 shell 执行

./bin/seatunnel.sh -c config/v2.batch.config.template -m local

本地方法

在写单测的时候,可以不经过 shell,直接通过方法调用来执行。

个人理解,这相当于在代码里完成了与上面 shell 命令相同的事情:设置配置文件(对应 -c)和运行模式(对应 -m local),然后调用 SeaTunnel.run:

/**
 * Example entry point that runs a SeaTunnel job in-process.
 *
 * <p>The config path is taken from the first command-line argument when one is given; otherwise
 * {@code /examples/mysql_to_console.conf} is used. The path is resolved through
 * {@code getTestConfigFile}, and the job is named after the resolved file's name.
 *
 * @param args optional; {@code args[0]} is the config path to run
 * @throws FileNotFoundException declared by the original signature
 * @throws URISyntaxException declared by the original signature
 * @throws CommandException declared by the original signature
 */
public static void main(String[] args)
        throws FileNotFoundException, URISyntaxException, CommandException {
    final String requestedConfig;
    if (args.length > 0) {
        requestedConfig = args[0];
    } else {
        requestedConfig = "/examples/mysql_to_console.conf";
    }
    final String resolvedConfig = getTestConfigFile(requestedConfig);

    final ClientCommandArgs commandArgs = new ClientCommandArgs();
    commandArgs.setConfigFile(resolvedConfig);
    commandArgs.setCheckConfig(false);
    commandArgs.setJobName(Paths.get(resolvedConfig).getFileName().toString());
    // To use client mode, switch the master type to CLUSTER; SeaTunnelEngineServerExample
    // must be started before doing so.
    commandArgs.setMasterType(MasterType.LOCAL);

    SeaTunnel.run(commandArgs.buildCommand());
}

web 是如何执行的

web 配置

简单搭建了一下 web 页面,可以在页面上配置任务并指定执行。那么任务的执行到底是如何实现的?

JobExecutorController

主要看一下这个任务执行类的底层实现。

    /**
     * Runs the job definition {@code jobDefineId} on behalf of {@code userId}.
     *
     * <p>Steps: create the execute resource, write its job config to a conf file via
     * {@code writeJobConfigIntoConfFile}, then hand that file to {@code executeJobBySeaTunnel}.
     *
     * @param userId id of the user triggering the job
     * @param jobDefineId id of the job definition to run
     * @return a success {@code Result} wrapping the value returned by
     *     {@code executeJobBySeaTunnel}
     */
    @Override
    public Result<Long> jobExecute(Integer userId, Long jobDefineId) {
        // Fetch the execution resource; it carries both the job config and the instance id.
        final JobExecutorRes resource =
                jobInstanceService.createExecuteResource(userId, jobDefineId);
        final String confPath = writeJobConfigIntoConfFile(resource.getJobConfig(), jobDefineId);

        // Submit the job.
        return Result.success(
                executeJobBySeaTunnel(userId, confPath, resource.getJobInstanceId()));
    }

最核心的是这一句:

executeJobBySeaTunnel(userId, configFile, executeResource.getJobInstanceId());

如下:

    /**
     * Submits the job described by the config file at {@code filePath} through a
     * {@code SeaTunnelClient} and returns without waiting for the job to finish.
     *
     * <p>After submission the engine job id is stored on the job instance record, and
     * {@code waitJobFinish} is scheduled with {@link CompletableFuture#runAsync(Runnable)}.
     *
     * @param userId id of the user the job runs for; forwarded to {@code waitJobFinish}
     * @param filePath path of the job config file to execute
     * @param jobInstanceId id of the job instance record; also used to build the job name
     *     ({@code <jobInstanceId>_job})
     * @return {@code jobInstanceId}, unchanged
     * @throws RuntimeException wrapping the {@link ExecutionException} or
     *     {@link InterruptedException} raised while submitting the job
     */
    public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {
        Common.setDeployMode(DeployMode.CLIENT);
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName(jobInstanceId + "_job");
        SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
        try {
            JobExecutionEnvironment jobExecutionEnv =
                    seaTunnelClient.createExecutionContext(filePath, jobConfig);
            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

            // Record the engine-side job id on the instance.
            JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
            final String jobEngineId = Long.toString(clientJobProxy.getJobId());
            jobInstance.setJobEngineId(jobEngineId);
            jobInstanceDao.update(jobInstance);

            // The client is handed to waitJobFinish together with the job proxy.
            // NOTE(review): presumably waitJobFinish releases the client when the job ends, and
            // nothing releases it on the failure path below - confirm against waitJobFinish.
            CompletableFuture.runAsync(
                    () ->
                            waitJobFinish(
                                    clientJobProxy,
                                    userId,
                                    jobInstanceId,
                                    jobEngineId,
                                    seaTunnelClient));

        } catch (ExecutionException | InterruptedException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up the stack can still see it.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
        return jobInstanceId;
    }

这里是通过 SeaTunnelClient 实现的,我们看一下 seaTunnelClient.createExecutionContext(filePath, jobConfig)

1) 创建:

    /**
     * Builds a {@code SeaTunnelClient} from the client config located by
     * {@code ConfigProvider.locateAndGetClientConfig()}, with its cluster name overridden by
     * {@code getClusterName("seatunnel")}.
     *
     * @return a newly constructed client
     */
    private SeaTunnelClient createSeaTunnelClient() {
        final ClientConfig config = ConfigProvider.locateAndGetClientConfig();
        final String clusterName = getClusterName("seatunnel");
        config.setClusterName(clusterName);
        return new SeaTunnelClient(config);
    }

2) 获取执行环境

    /**
     * Creates the execution environment for one job.
     *
     * @param filePath path of the job config file; must not be null
     * @param jobConfig job-level configuration; must not be null
     * @return a new {@code JobExecutionEnvironment} built from the job config, the file path and
     *     this client's {@code hazelcastClient}
     */
    public JobExecutionEnvironment createExecutionContext(
            @NonNull String filePath, @NonNull JobConfig jobConfig) {
        final JobExecutionEnvironment environment =
                new JobExecutionEnvironment(jobConfig, filePath, hazelcastClient);
        return environment;
    }

3) 执行:

/**
 * Packages this environment's job into a {@code JobImmutableInformation} and asks
 * {@code jobClient} to create a job proxy for it.
 *
 * <p>The immutable information carries the job id parsed from the job context, the job name,
 * the savepoint flag, the logical DAG serialized by the client's serialization service, the job
 * config, and a copy of the jar URL collection.
 *
 * @return the proxy returned by {@code jobClient.createJobProxy}
 * @throws ExecutionException declared by the original signature
 * @throws InterruptedException declared by the original signature
 */
public ClientJobProxy execute() throws ExecutionException, InterruptedException {
    final long parsedJobId = Long.parseLong(jobConfig.getJobContext().getJobId());
    final String jobName = jobConfig.getName();

    final JobImmutableInformation immutableInfo =
            new JobImmutableInformation(
                    parsedJobId,
                    jobName,
                    isStartWithSavePoint,
                    seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
                    jobConfig,
                    new ArrayList<>(jarUrls));

    return jobClient.createJobProxy(immutableInfo);
}

最后还是基于 ClientJobProxy 类实现的

ClientJobProxy 类

私有属性:

/**
 * Client-side implementation of {@code Job}. Only the field declarations are visible in this
 * excerpt; the rest of the class is not shown.
 */
public class ClientJobProxy implements Job {
    // Logger bound to this class.
    private static final ILogger LOGGER = Logger.getLogger(ClientJobProxy.class);
    // Client handle; set once (final).
    // NOTE(review): presumably the channel used to send job requests - confirm in the full class.
    private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;
    // Id of the job this proxy represents; set once (final).
    private final Long jobId;
    // Mutable result holder; where it is assigned is not visible in this excerpt.
    private JobResult jobResult;

任务执行:

/**
 * Sends the submit-job request for {@code jobImmutableInformation} and waits for the returned
 * future to complete, logging before and after.
 *
 * @param jobImmutableInformation the job description to serialize and submit
 */
private void submitJob(JobImmutableInformation jobImmutableInformation) {
    final String startMessage =
            String.format(
                    "Start submit job, job id: %s, with plugin jar %s",
                    jobImmutableInformation.getJobId(),
                    jobImmutableInformation.getPluginJarsUrls());
    LOGGER.info(startMessage);

    // The request carries the job id plus the serialized job information.
    final ClientMessage submitRequest =
            SeaTunnelSubmitJobCodec.encodeRequest(
                    jobImmutableInformation.getJobId(),
                    seaTunnelHazelcastClient
                            .getSerializationService()
                            .toData(jobImmutableInformation));

    // Send via the master-targeted request path and wait for the future to complete.
    seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(submitRequest).join();

    final String finishedMessage =
            String.format(
                    "Submit job finished, job id: %s, job name: %s",
                    jobImmutableInformation.getJobId(), jobImmutableInformation.getJobName());
    LOGGER.info(finishedMessage);
}

对应的 seaTunnelHazelcastClient

    /**
     * Sends {@code request} to the cluster's master member.
     *
     * @param request the client message to send; must not be null
     * @return the future produced by {@code requestAndGetCompletableFuture} for the master
     *     member's UUID
     */
    public PassiveCompletableFuture<Void> requestOnMasterAndGetCompletableFuture(
            @NonNull ClientMessage request) {
        // Look up the master member's UUID and address the request to it.
        return requestAndGetCompletableFuture(
                hazelcastClient.getClientClusterService().getMasterMember().getUuid(), request);
    }


    /**
     * Invokes {@code request} against the member identified by {@code uuid}.
     *
     * <p>The response payload is discarded: the returned future completes with {@code null}.
     *
     * @param uuid UUID passed to the invocation as its target; must not be null
     * @param request the client message to send; must not be null
     * @return a {@code PassiveCompletableFuture} wrapping the invocation's future
     */
    public PassiveCompletableFuture<Void> requestAndGetCompletableFuture(
            @NonNull UUID uuid, @NonNull ClientMessage request) {
        // Built outside the try so constructor failures propagate unchanged.
        final ClientInvocation targetedCall =
                new ClientInvocation(hazelcastClient, request, null, uuid);
        try {
            return new PassiveCompletableFuture<>(
                    targetedCall.invoke().thenApply(response -> null));
        } catch (Throwable failure) {
            throw ExceptionUtil.rethrow(failure);
        }
    }

对应的 invoke 方法实现(位于 ClientInvocation 类中):

    /**
     * Stamps the client message with the next value from the call-id sequence as its correlation
     * id, dispatches the invocation, and returns this invocation's future.
     *
     * @return the {@code clientInvocationFuture} held by this invocation
     */
    public ClientInvocationFuture invoke() {
        clientMessage.setCorrelationId(callIdSequence.next());
        invokeOnSelection();
        return clientInvocationFuture;
    }

    /**
     * Dispatches this invocation through {@code invocationService}, picking the route from this
     * invocation's state: a bound connection, or (with smart routing) a partition owner, a
     * target member UUID, or an untargeted invoke.
     *
     * <p>Nothing is thrown to the caller: a failed dispatch, and any {@code Throwable} raised
     * while dispatching, is reported through {@code notifyExceptionWithOwnedPermission}.
     */
    private void invokeOnSelection() {
        try {
            // Count this dispatch attempt.
            INVOKE_COUNT.incrementAndGet(this);
            // Only non-urgent invocations go through the "invocation allowed" check.
            // NOTE(review): presumably the check throws when invocations are not allowed - confirm.
            if (!this.urgent) {
                this.invocationService.checkInvocationAllowed();
            }

            boolean invoked;
            // Bound to a single connection: try only that connection, with no fallback route.
            if (this.isBindToSingleConnection()) {
                invoked = this.invocationService.invokeOnConnection(this, (ClientConnection)this.connection);
                if (!invoked) {
                    this.notifyExceptionWithOwnedPermission(new IOException("Could not invoke on connection " + this.connection));
                }

                return;
            }

            if (this.isSmartRoutingEnabled) {
                // Routing preference: partition owner, then target member UUID, then untargeted.
                // NOTE(review): -1 appears to mean "no partition assigned" - confirm.
                if (this.partitionId != -1) {
                    invoked = this.invocationService.invokeOnPartitionOwner(this, this.partitionId);
                } else if (this.uuid != null) {
                    invoked = this.invocationService.invokeOnTarget(this, this.uuid);
                } else {
                    invoked = this.invocationService.invoke(this);
                }

                // If the preferred route failed and the fallback flag is set, try an untargeted invoke.
                if (this.allowRetryOnRandom && !invoked) {
                    invoked = this.invocationService.invoke(this);
                }
            } else {
                // Smart routing disabled: always use the untargeted invoke.
                invoked = this.invocationService.invoke(this);
            }

            // No route accepted the invocation: report it as an I/O failure.
            if (!invoked) {
                this.notifyExceptionWithOwnedPermission(new IOException("No connection found to invoke"));
            }
        } catch (Throwable var2) {
            // Any failure raised above is reported the same way instead of propagating.
            this.notifyExceptionWithOwnedPermission(var2);
        }

    }