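Business requirement
While testing, I ran a local unit test. But how does the job actually get executed?
It turns out to work in much the same way as a web-triggered invocation.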
source
Test class
import org.apache.seatunnel.core.starter.SeaTunnel;
import org.apache.seatunnel.core.starter.enums.MasterType;
import org.apache.seatunnel.core.starter.exception.CommandException;
import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
import java.io.FileNotFoundException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
public class SeaTunnelEngineExampleHttpToConsoleRawStreaming {
//file:/D:/_my/seatunnel-2.3.3-release-slim/seatunnel-engine/seatunnel-engine-common/target/classes/seatunnel.yaml
public static void main(String[] args)
throws FileNotFoundException, URISyntaxException, CommandException {
String configurePath = args.length > 0 ? args[0] : "/examples/http_to_consoleRaw.conf";
String configFile = getTestConfigFile(configurePath);
ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
clientCommandArgs.setConfigFile(configFile);
clientCommandArgs.setCheckConfig(false);
clientCommandArgs.setJobName(Paths.get(configFile).getFileName().toString());
// Change Execution Mode to CLUSTER to use client mode, before do this, you should start
// SeaTunnelEngineServerExample
clientCommandArgs.setMasterType(MasterType.LOCAL);
SeaTunnel.run(clientCommandArgs.buildCommand());
}
public static String getTestConfigFile(String configFile)
throws FileNotFoundException, URISyntaxException {
URL resource = SeaTunnelEngineExampleHttpToConsoleRawStreaming.class.getResource(configFile);
if (resource == null) {
throw new FileNotFoundException("Can't find config file: " + configFile);
}
return Paths.get(resource.toURI()).toString();
}
}
The following code only builds the arguments:
String configurePath = args.length > 0 ? args[0] : "/examples/http_to_consoleRaw.conf";
String configFile = getTestConfigFile(configurePath);
ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
clientCommandArgs.setConfigFile(configFile);
clientCommandArgs.setCheckConfig(false);
clientCommandArgs.setJobName(Paths.get(configFile).getFileName().toString());
// Change Execution Mode to CLUSTER to use client mode, before do this, you should start
// SeaTunnelEngineServerExample
clientCommandArgs.setMasterType(MasterType.LOCAL);
The core execution call is then:
SeaTunnel.run(clientCommandArgs.buildCommand());
Under the hood, this still ends up calling the ClientExecuteCommand.execute() method.
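The split between "build the arguments" and "run the command" can be pictured with a minimal sketch. All names here (CommandSketch, DemoArgs, DemoCommand) are hypothetical stand-ins, not SeaTunnel classes. The only assumption taken from the code above is that buildCommand() returns an object whose execute() does the real work.

```java
import java.util.ArrayList;
import java.util.List;

final class CommandSketch {
    /** Hypothetical stand-in for a command: one unit of work behind execute(). */
    interface DemoCommand {
        void execute();
    }

    /** Hypothetical stand-in for an args holder that knows which command to build. */
    static final class DemoArgs {
        private String configFile;
        final List<String> log = new ArrayList<>();

        void setConfigFile(String configFile) {
            this.configFile = configFile;
        }

        /** Building the command does no work yet; it only captures the arguments. */
        DemoCommand buildCommand() {
            return () -> log.add("executed with " + configFile);
        }
    }

    /** The launcher is a thin wrapper that delegates to command.execute(). */
    static void run(DemoCommand command) {
        command.execute();
    }

    public static void main(String[] args) {
        DemoArgs demoArgs = new DemoArgs();
        demoArgs.setConfigFile("/examples/http_to_consoleRaw.conf");
        run(demoArgs.buildCommand());
        System.out.println(demoArgs.log);
    }
}
```

The point of this shape is that anything able to fill in the args object (a test, a CLI parser, a web handler) reaches the same execute() path.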
Debugging ClientExecuteCommand execution
There are quite a few branches below, so it is worth debugging locally and stepping through the implementation, focusing on the ClientExecuteCommand#execute method.
On entry:
JobMetricsRunner.JobMetricsSummary jobMetricsSummary = null; // metrics summary, presumably
LocalDateTime startTime = LocalDateTime.now(); // start time
LocalDateTime endTime = LocalDateTime.now(); // end time
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); // engine configuration
The ConfigProvider.locateAndGetSeaTunnelConfig() line loads the engine configuration. We won't go into its internals here.
In essence, it reads the configuration files we set up:
EngineConfig(backupCount=1, printExecutionInfoInterval=60, printJobMetricsInfoInterval=60, jobMetricsBackupInterval=10, taskExecutionThreadShareMode=OFF, slotServiceConfig=SlotServiceConfig(dynamicSlot=true, slotNum=2), checkpointConfig=CheckpointConfig(checkpointInterval=300000, checkpointTimeout=10000, schemaChangeCheckpointTimeout=30000, storage=CheckpointStorageConfig(storage=localfile, maxRetainedCheckpoints=3, storagePluginConfig={namespace=C:\ProgramData\seatunnel\checkpoint\})), queueType=BLOCKINGQUEUE, historyJobExpireMinutes=1440)
Config{configurationUrl=null, configurationFile=null, classLoader=null, properties={hazelcast.operation.generic.thread.count=50, hazelcast.invocation.max.retry.count=20, hazelcast.tcp.join.port.try.count=30, hazelcast.logging.type=log4j2}, instanceName='null', clusterName='seatunnel', networkConfig=NetworkConfig{publicAddress='null', port=5801, portCount=100, portAutoIncrement=true, join=JoinConfig{multicastConfig=MulticastConfig [enabled=false, multicastGroup=224.2.2.3, multicastPort=54327, multicastTimeToLive=32, multicastTimeoutSeconds=2, trustedInterfaces=[], ...
These correspond to our configuration files:
- seatunnel.yaml
seatunnel:
  engine:
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 300000
      timeout: 10000
      storage:
        type: localfile
        max-retained: 3
        plugin-config:
          namespace: C:\ProgramData\seatunnel\checkpoint\
- hazelcast.yaml
hazelcast:
  cluster-name: seatunnel
  network:
    rest-api:
      enabled: true
      endpoint-groups:
        CLUSTER_WRITE:
          enabled: true
        DATA:
          enabled: true
    join:
      tcp-ip:
        enabled: true
        member-list:
          - localhost
    port:
      auto-increment: true
      port-count: 100
      port: 5801
  properties:
    hazelcast.invocation.max.retry.count: 20
    hazelcast.tcp.join.port.try.count: 30
    hazelcast.logging.type: log4j2
    hazelcast.operation.generic.thread.count: 50
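One detail worth noticing: the EngineConfig dump contains fields such as printJobMetricsInfoInterval=60, jobMetricsBackupInterval=10 and historyJobExpireMinutes=1440 that our seatunnel.yaml never sets, so these values presumably come from built-in defaults. A minimal sketch of that "defaults overlaid by file values" behaviour, using plain maps keyed by the field names from the dump. The resolve helper is hypothetical and is not SeaTunnel's actual parsing code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ConfigOverlaySketch {
    /** Returns a copy of the defaults in which every configured key overrides its default. */
    static Map<String, Object> resolve(Map<String, Object> defaults, Map<String, Object> fromFile) {
        Map<String, Object> resolved = new LinkedHashMap<>(defaults);
        resolved.putAll(fromFile);
        return resolved;
    }

    public static void main(String[] args) {
        // Values as seen in the debug dump; treating them as defaults is an assumption.
        Map<String, Object> defaults = new LinkedHashMap<>();
        defaults.put("printJobMetricsInfoInterval", 60);
        defaults.put("jobMetricsBackupInterval", 10);
        defaults.put("historyJobExpireMinutes", 1440);

        // Values that our seatunnel.yaml sets explicitly.
        Map<String, Object> fromFile = new LinkedHashMap<>();
        fromFile.put("backupCount", 1);
        fromFile.put("printExecutionInfoInterval", 60);

        System.out.println(resolve(defaults, fromFile));
    }
}
```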
Local mode
Our local test runs in local mode.
String clusterName = clientCommandArgs.getClusterName(); // null by default; we did not set it
if (clientCommandArgs.getMasterType().equals(MasterType.LOCAL)) {
// Generate a random name, e.g. seatunnel-926147
clusterName =
creatRandomClusterName(
StringUtils.isNotEmpty(clusterName)
? clusterName
: Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);
// Start an in-process server
instance = createServerInLocal(clusterName, seaTunnelConfig);
// Set the cluster name
if (StringUtils.isNotEmpty(clusterName)) {
seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
}
}
Let's take a closer look at the createServerInLocal method:
private HazelcastInstance createServerInLocal(
String clusterName, SeaTunnelConfig seaTunnelConfig) {
seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
seaTunnelConfig.getHazelcastConfig().getNetworkConfig().setPortAutoIncrement(true);
return HazelcastInstanceFactory.newHazelcastInstance(
seaTunnelConfig.getHazelcastConfig(),
Thread.currentThread().getName(),
new SeaTunnelNodeContext(seaTunnelConfig));
}
Given clusterName and seaTunnelConfig, this directly creates a server on top of Hazelcast, the distributed in-memory store.
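The debug session showed a generated name such as seatunnel-926147. A sketch of how such a name could be produced, assuming a prefix plus a random number below one million. This is a guess at the shape of the result, not a copy of creatRandomClusterName, and the "seatunnel" default prefix is inferred from the clusterName in the dump.

```java
import java.util.Random;

final class ClusterNameSketch {
    /** Appends a random number in [0, 1_000_000) to the prefix, e.g. "seatunnel-926147". */
    static String randomClusterName(String prefix, Random random) {
        return prefix + "-" + random.nextInt(1_000_000);
    }

    public static void main(String[] args) {
        String configured = null; // as in our test: no cluster name was set
        String prefix = (configured != null && !configured.isEmpty()) ? configured : "seatunnel";
        System.out.println(randomClusterName(prefix, new Random()));
    }
}
```

A plausible reason for the random suffix is isolation: a client that uses the same generated name will only join the server started in this process, rather than some other "seatunnel" cluster that happens to be listening on the same machine.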
Creating the client
The next snippet creates the SeaTunnelClient.
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
if (StringUtils.isNotEmpty(clusterName)) {
clientConfig.setClusterName(clusterName);
}
engineClient = new SeaTunnelClient(clientConfig);
ConfigProvider.locateAndGetClientConfig() loads the client configuration, which is likewise parsed from a YAML file:
- hazelcast-client.yaml
hazelcast-client:
  cluster-name: seatunnel
  properties:
    hazelcast.logging.type: log4j2
  network:
    cluster-members:
      - localhost:5801
      - localhost:5802
      - localhost:5803
      - localhost:5804
      - localhost:5805
      - localhost:5806
      - localhost:5807
      - localhost:5808
      - localhost:5809
      - localhost:5810
      - localhost:5811
      - localhost:5812
      - localhost:5813
      - localhost:5814
      - localhost:5815
Excerpt of the configuration as seen in the debugger:
ClientConfig{properties={hazelcast.logging.type=log4j2}, clusterName=seatunnel, securityConfig=ClientSecurityConfig{identityConfig=null, realmConfigs={}}, networkConfig=ClientNetworkConfig{addressList=[localhost:5801, localhost:5802, localhost:5803, localhost:5804, localhost:5805, localhost:5806, localhost:5807, localhost:5808, localhost:5809, localhost:5810, localhost:5811, localhost:5812, localhost:5813, localhost:5814, localhost:5815], ....
Job type => job execution
Job status: --list
Each kind of request is handled by its own branch.
if (clientCommandArgs.isListJob()) {
String jobStatus = engineClient.getJobClient().listJobStatus(true);
System.out.println(jobStatus);
}
This corresponds to listing job status from the command line:
@Parameter(
names = {"-l", "--list"},
description = "list job status")
private boolean listJob = false;
Running job metrics: --get_running_job_metrics
else if (clientCommandArgs.isGetRunningJobMetrics()) {
String runningJobMetrics = engineClient.getJobClient().getRunningJobMetrics();
System.out.println(runningJobMetrics);
}
Corresponding option:
@Parameter(
names = {"--get_running_job_metrics"},
description = "Gets metrics for running jobs")
private boolean getRunningJobMetrics = false;
Job status by jobId: -j
else if (null != clientCommandArgs.getJobId()) {
String jobState =
engineClient
.getJobClient()
.getJobDetailStatus(Long.parseLong(clientCommandArgs.getJobId()));
System.out.println(jobState);
}
@Parameter(
names = {"-j", "--job-id"},
description = "Get job status by JobId")
private String jobId;
We did not specify it here.
Cancel job: -can
else if (null != clientCommandArgs.getCancelJobId()) {
engineClient
.getJobClient()
.cancelJob(Long.parseLong(clientCommandArgs.getCancelJobId()));
}
Corresponding option:
@Parameter(
names = {"-can", "--cancel-job"},
description = "Cancel job by JobId")
private String cancelJobId;
Job metrics: --metrics
else if (null != clientCommandArgs.getMetricsJobId()) {
String jobMetrics =
engineClient
.getJobClient()
.getJobMetrics(Long.parseLong(clientCommandArgs.getMetricsJobId()));
System.out.println(jobMetrics);
}
Corresponding option:
@Parameter(
names = {"--metrics"},
description = "Get job metrics by JobId")
private String metricsJobId;
Savepoint: -s
else if (null != clientCommandArgs.getSavePointJobId()) {
engineClient
.getJobClient()
.savePointJob(Long.parseLong(clientCommandArgs.getSavePointJobId()));
}
Corresponding option:
@Parameter(
names = {"-s", "--savepoint"},
description = "savepoint job by jobId")
private String savePointJobId;
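Taken together, the branches so far form a first-match-wins chain, and anything that matches none of them falls through to actually submitting a job. A compact sketch of that ordering. The Options class is a hypothetical, simplified view of the parsed arguments, not ClientCommandArgs itself.

```java
final class DispatchSketch {
    enum Action {
        LIST_JOBS, RUNNING_JOB_METRICS, JOB_STATUS, CANCEL_JOB, JOB_METRICS, SAVEPOINT, SUBMIT_JOB
    }

    /** Hypothetical, simplified view of the parsed options (false or null means "not given"). */
    static final class Options {
        boolean listJob;
        boolean getRunningJobMetrics;
        String jobId;
        String cancelJobId;
        String metricsJobId;
        String savePointJobId;
    }

    /** First match wins, in the same order as the if / else-if chain in execute(). */
    static Action choose(Options o) {
        if (o.listJob) return Action.LIST_JOBS;
        if (o.getRunningJobMetrics) return Action.RUNNING_JOB_METRICS;
        if (o.jobId != null) return Action.JOB_STATUS;
        if (o.cancelJobId != null) return Action.CANCEL_JOB;
        if (o.metricsJobId != null) return Action.JOB_METRICS;
        if (o.savePointJobId != null) return Action.SAVEPOINT;
        return Action.SUBMIT_JOB; // nothing matched: run the job described by the config file
    }

    public static void main(String[] args) {
        Options localTest = new Options(); // our test sets none of the query options
        System.out.println(choose(localTest)); // prints SUBMIT_JOB

        Options both = new Options();
        both.listJob = true;
        both.cancelJobId = "123";
        System.out.println(choose(both)); // prints LIST_JOBS: the earlier branch wins
    }
}
```

One consequence of this structure is that combining two of these options in a single invocation silently runs only the first one in the chain.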
Everything else
If none of the branches above match:
// This resolves to the test config file we specified: ~/examples/http_to_consoleRaw.conf
Path configFile = FileUtils.getConfigPath(clientCommandArgs);
checkConfigExist(configFile);
JobConfig jobConfig = new JobConfig();
JobExecutionEnvironment jobExecutionEnv;
jobConfig.setName(clientCommandArgs.getJobName());
// A restore job id was given
if (null != clientCommandArgs.getRestoreJobId()) {
jobExecutionEnv =
engineClient.restoreExecutionContext(
configFile.toString(),
jobConfig,
Long.parseLong(clientCommandArgs.getRestoreJobId()));
} else {
// Not set in our local test, so we end up here
// Create the execution context from the config file.
jobExecutionEnv =
engineClient.createExecutionContext(configFile.toString(), jobConfig);
}
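// It might be better to record a few more timestamps here: there is a delay between triggering the job and the job actually starting to run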
// get job start time
startTime = LocalDateTime.now();
// create job proxy
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
// Local mode ignores async: it only logs a warning and keeps waiting for the job
if (clientCommandArgs.isAsync()) {
if (clientCommandArgs.getMasterType().equals(MasterType.LOCAL)) {
log.warn("The job is running in local mode, can not use async mode.");
} else {
return;
}
}
// register cancelJob hook
// i.e. a shutdown hook that cancels the job
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
CompletableFuture<Void> future =
CompletableFuture.runAsync(
() -> {
log.info(
"run shutdown hook because get close signal");
shutdownHook(clientJobProxy);
});
try {
future.get(15, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("Cancel job failed.", e);
}
}));
// get job id
long jobId = clientJobProxy.getJobId();
JobMetricsRunner jobMetricsRunner = new JobMetricsRunner(engineClient, jobId);
executorService =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("job-metrics-runner-%d")
.setDaemon(true)
.build());
executorService.scheduleAtFixedRate(
jobMetricsRunner,
0,
seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(),
TimeUnit.SECONDS);
// wait for job complete
jobStatus = clientJobProxy.waitForJobComplete();
// get job end time
endTime = LocalDateTime.now();
// get job statistic information when job finished
jobMetricsSummary = engineClient.getJobMetricsSummary(jobId);
}
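The shutdown hook above runs the cancel call inside a CompletableFuture and waits at most 15 seconds for it, so a hanging cancel cannot block JVM exit indefinitely. The same pattern in isolation, as a minimal sketch. The runBounded helper and the printed messages are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class ShutdownHookSketch {
    /** Runs cleanup on another thread and waits at most the given time; true if it finished. */
    static boolean runBounded(Runnable cleanup, long timeout, TimeUnit unit) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(cleanup);
        try {
            future.get(timeout, unit);
            return true;
        } catch (Exception e) {
            // Timeout, interruption, or a failure inside cleanup: stop waiting so the JVM can exit.
            return false;
        }
    }

    public static void main(String[] args) {
        Runtime.getRuntime()
                .addShutdownHook(
                        new Thread(
                                () -> {
                                    boolean done =
                                            runBounded(
                                                    () -> System.out.println("cancelling job..."),
                                                    15,
                                                    TimeUnit.SECONDS);
                                    System.out.println("cleanup finished in time: " + done);
                                }));
        System.out.println("main exits; the hook runs next");
    }
}
```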
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
This creates the corresponding job execution proxy:
public ClientJobProxy execute() throws ExecutionException, InterruptedException {
JobImmutableInformation jobImmutableInformation =
new JobImmutableInformation(
Long.parseLong(jobConfig.getJobContext().getJobId()),
jobConfig.getName(),
isStartWithSavePoint,
seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
jobConfig,
new ArrayList<>(jarUrls));
// Create the proxy, which simply delegates to the next call.
return jobClient.createJobProxy(jobImmutableInformation);
}
public ClientJobProxy createJobProxy(@NonNull JobImmutableInformation jobImmutableInformation) {
return new ClientJobProxy(hazelcastClient, jobImmutableInformation);
}
public ClientJobProxy(
@NonNull SeaTunnelHazelcastClient seaTunnelHazelcastClient,
@NonNull JobImmutableInformation jobImmutableInformation) {
this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
this.jobId = jobImmutableInformation.getJobId();
submitJob(jobImmutableInformation);
}
submitJob(jobImmutableInformation); submits the job as follows:
private void submitJob(JobImmutableInformation jobImmutableInformation) {
LOGGER.info(
String.format(
"Start submit job, job id: %s, with plugin jar %s",
jobImmutableInformation.getJobId(),
jobImmutableInformation.getPluginJarsUrls()));
// Build the client request message
ClientMessage request =
SeaTunnelSubmitJobCodec.encodeRequest(
jobImmutableInformation.getJobId(),
seaTunnelHazelcastClient
.getSerializationService()
.toData(jobImmutableInformation));
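// Send the request to the master and get a Future back
// At this point it looks just like a web-triggered submission
// It looks like local mode, but it still goes through network communication, which keeps the behaviour consistent with the distributed case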
PassiveCompletableFuture<Void> submitJobFuture =
seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(request);
submitJobFuture.join();
LOGGER.info(
String.format(
"Submit job finished, job id: %s, job name: %s",
jobImmutableInformation.getJobId(), jobImmutableInformation.getJobName()));
}
seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(request)
expands to:
public PassiveCompletableFuture<Void> requestOnMasterAndGetCompletableFuture(
@NonNull ClientMessage request) {
// Get the master member's UUID
UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
// Send the request to that member
return requestAndGetCompletableFuture(masterUuid, request);
}
public PassiveCompletableFuture<Void> requestAndGetCompletableFuture(
@NonNull UUID uuid, @NonNull ClientMessage request) {
ClientInvocation invocation = new ClientInvocation(hazelcastClient, request, null, uuid);
try {
return new PassiveCompletableFuture<>(invocation.invoke().thenApply(r -> null));
} catch (Throwable t) {
throw ExceptionUtil.rethrow(t);
}
}
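The thenApply(r -> null) step above throws away the raw response and leaves a future that only signals "done or failed", which is why submitJob can simply call join() on it. A small sketch of that idea with plain CompletableFuture. The discardResult helper is hypothetical, and PassiveCompletableFuture itself is not reproduced here.

```java
import java.util.concurrent.CompletableFuture;

final class VoidFutureSketch {
    /** Keeps only the completion signal of a response future and drops its payload. */
    static <T> CompletableFuture<Void> discardResult(CompletableFuture<T> response) {
        return response.thenApply(r -> (Void) null);
    }

    public static void main(String[] args) {
        CompletableFuture<String> response = new CompletableFuture<>();
        CompletableFuture<Void> ack = discardResult(response);

        response.complete("raw response payload"); // the server answered
        ack.join(); // returns once the response has arrived; a failure would be rethrown here
        System.out.println("submit acknowledged");
    }
}
```

If the underlying request fails, join() rethrows the failure wrapped in a CompletionException, so a failed submission surfaces to the caller instead of being silently dropped.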
// Object construction
protected ClientInvocation(HazelcastClientInstanceImpl client, ClientMessage clientMessage, Object objectName, int partitionId, UUID uuid, Connection connection) {
super(((ClientInvocationServiceImpl)client.getInvocationService()).invocationLogger);
this.allowRetryOnRandom = true;
this.lifecycleService = client.getLifecycleService();
this.invocationService = (ClientInvocationServiceImpl)client.getInvocationService();
this.executionService = client.getTaskScheduler();
this.objectName = objectName;
this.clientMessage = clientMessage;
this.partitionId = partitionId;
this.uuid = uuid;
this.connection = connection;
this.startTimeMillis = System.currentTimeMillis();
this.retryPauseMillis = this.invocationService.getInvocationRetryPauseMillis();
this.callIdSequence = this.invocationService.getCallIdSequence();
this.clientInvocationFuture = new ClientInvocationFuture(this, clientMessage, this.logger, this.callIdSequence);
this.invocationTimeoutMillis = this.invocationService.getInvocationTimeoutMillis();
this.isSmartRoutingEnabled = this.invocationService.isSmartRoutingEnabled();
}
invocation.invoke()
is as follows:
public ClientInvocationFuture invoke() {
this.clientMessage.setCorrelationId(this.callIdSequence.next());
this.invokeOnSelection();
return this.clientInvocationFuture;
}
private void invokeOnSelection() {
try {
INVOKE_COUNT.incrementAndGet(this);
if (!this.urgent) {
this.invocationService.checkInvocationAllowed();
}
boolean invoked;
if (this.isBindToSingleConnection()) {
invoked = this.invocationService.invokeOnConnection(this, (ClientConnection)this.connection);
if (!invoked) {
this.notifyExceptionWithOwnedPermission(new IOException("Could not invoke on connection " + this.connection));
}
return;
}
if (this.isSmartRoutingEnabled) {
if (this.partitionId != -1) {
invoked = this.invocationService.invokeOnPartitionOwner(this, this.partitionId);
} else if (this.uuid != null) {
invoked = this.invocationService.invokeOnTarget(this, this.uuid);
} else {
invoked = this.invocationService.invoke(this);
}
if (this.allowRetryOnRandom && !invoked) {
invoked = this.invocationService.invoke(this);
}
} else {
invoked = this.invocationService.invoke(this);
}
if (!invoked) {
this.notifyExceptionWithOwnedPermission(new IOException("No connection found to invoke"));
}
} catch (Throwable var2) {
this.notifyExceptionWithOwnedPermission(var2);
}
}
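The routing decision in invokeOnSelection boils down to a priority order: a bound connection first, then (with smart routing) the partition owner, then a specific member, and finally any member. The sketch below restates only the first choice as a pure function and leaves out the retry-on-random fallback. Route and firstChoice are hypothetical names. For the job submission above it assumes that the four-argument ClientInvocation constructor leaves partitionId at -1 and connection unset, which the listing does not show.

```java
import java.util.UUID;

final class RoutingSketch {
    enum Route { BOUND_CONNECTION, PARTITION_OWNER, TARGET_MEMBER, ANY_MEMBER }

    /** Mirrors the order of the checks in invokeOnSelection, without the retry fallback. */
    static Route firstChoice(
            boolean boundToConnection, boolean smartRouting, int partitionId, UUID uuid) {
        if (boundToConnection) return Route.BOUND_CONNECTION;
        if (!smartRouting) return Route.ANY_MEMBER;
        if (partitionId != -1) return Route.PARTITION_OWNER;
        if (uuid != null) return Route.TARGET_MEMBER;
        return Route.ANY_MEMBER;
    }

    public static void main(String[] args) {
        UUID masterUuid = UUID.randomUUID(); // stands in for the master member's UUID
        // Job submission: no bound connection, no partition, but a target UUID.
        System.out.println(firstChoice(false, true, -1, masterUuid)); // prints TARGET_MEMBER
    }
}
```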
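Summary
A local invocation looks simple at first glance.
Once distributed scheduling is taken into account, however, it inevitably involves network requests between client and server.
If we later design a similar framework ourselves, this tool is a good reference:
local and remote invocation can be made to feel identical to the user.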
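As a rough illustration of that design idea: local mode only changes where the server comes from, while the client always goes through the same request path. Everything in this sketch (Server, Client, connect) is hypothetical, and a direct method call stands in for the real network request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class UnifiedSubmitSketch {
    /** Hypothetical server side: receives submissions and acknowledges them via a future. */
    static final class Server {
        final List<String> accepted = new ArrayList<>();

        CompletableFuture<Void> handleSubmit(String jobName) {
            accepted.add(jobName);
            return CompletableFuture.completedFuture(null);
        }
    }

    /** Hypothetical client: it never runs a job itself, it only sends requests to a server. */
    static final class Client {
        final Server server;

        Client(Server server) {
            this.server = server;
        }

        void submit(String jobName) {
            server.handleSubmit(jobName).join(); // block until the server acknowledges
        }
    }

    /** Local mode starts a private in-process server; otherwise an existing cluster is used. */
    static Client connect(boolean local, Server remoteCluster) {
        Server target = local ? new Server() : remoteCluster;
        return new Client(target);
    }

    public static void main(String[] args) {
        Client localClient = connect(true, null);
        localClient.submit("http_to_consoleRaw.conf");
        System.out.println(localClient.server.accepted);
    }
}
```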