与微服务体系的结合
2025/8/30大约 13 分钟
随着微服务架构的普及,传统的单体应用逐渐被拆分为多个独立的服务。在这种架构下,任务调度系统需要与微服务生态系统深度集成,以充分发挥其价值。本文将深入探讨调度平台如何与Spring Cloud/Spring Boot集成、如何与配置中心联动、以及如何利用服务发现实现智能任务路由等关键技术。
Spring Cloud/Spring Boot 集成调度框架
在微服务架构中,Spring Boot和Spring Cloud已成为主流的技术栈。将调度框架与这些技术深度集成,可以大大提高开发效率和系统可维护性。
Spring Boot 集成 Quartz
// Quartz 配置类
@Configuration
@EnableScheduling
public class QuartzConfig {
@Bean
public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource) {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setDataSource(dataSource);
factory.setJobFactory(autoWiringSpringBeanJobFactory());
factory.setQuartzProperties(quartzProperties());
return factory;
}
@Bean
public SpringBeanJobFactory autoWiringSpringBeanJobFactory() {
AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}
@Bean
public Properties quartzProperties() {
Properties prop = new Properties();
prop.put("org.quartz.scheduler.instanceName", "SpringBootScheduler");
prop.put("org.quartz.scheduler.instanceId", "AUTO");
prop.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
prop.put("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
prop.put("org.quartz.jobStore.dataSource", "quartzDataSource");
prop.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
prop.put("org.quartz.jobStore.isClustered", "true");
prop.put("org.quartz.jobStore.clusterCheckinInterval", "20000");
prop.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
prop.put("org.quartz.threadPool.threadCount", "10");
return prop;
}
@Autowired
private ApplicationContext applicationContext;
}
// 支持依赖注入的 Job 工厂
public class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory
implements ApplicationContextAware {
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Object job = super.createJobInstance(bundle);
applicationContext.getAutowireCapableBeanFactory().autowireBean(job);
return job;
}
}
// 示例 Job 类
@Component
public class SampleJob implements Job {
@Autowired
private SomeService someService; // 可以直接注入 Spring Bean
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
try {
// 使用注入的服务
someService.performTask();
// 记录执行日志
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
String taskId = dataMap.getString("taskId");
System.out.println("执行任务: " + taskId);
} catch (Exception e) {
throw new JobExecutionException("任务执行失败", e);
}
}
}
// 任务调度服务
@Service
public class JobSchedulingService {
@Autowired
private Scheduler scheduler;
/**
* 创建并调度任务
*/
public void scheduleJob(String jobName, String groupName, String cronExpression,
Map<String, Object> jobData) throws SchedulerException {
// 创建 JobDetail
JobDetail jobDetail = JobBuilder.newJob(SampleJob.class)
.withIdentity(jobName, groupName)
.usingJobData(new JobDataMap(jobData))
.build();
// 创建 Trigger
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(jobName + "Trigger", groupName)
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
.build();
// 调度任务
scheduler.scheduleJob(jobDetail, trigger);
}
/**
* 暂停任务
*/
public void pauseJob(String jobName, String groupName) throws SchedulerException {
scheduler.pauseJob(new JobKey(jobName, groupName));
}
/**
* 恢复任务
*/
public void resumeJob(String jobName, String groupName) throws SchedulerException {
scheduler.resumeJob(new JobKey(jobName, groupName));
}
/**
* 删除任务
*/
public void deleteJob(String jobName, String groupName) throws SchedulerException {
scheduler.deleteJob(new JobKey(jobName, groupName));
}
}
Spring Cloud 集成分布式调度
// 分布式调度配置
@Configuration
@EnableEurekaClient
public class DistributedSchedulingConfig {
@Bean
@ConditionalOnMissingBean
public DistributedLock distributedLock() {
// 基于 Redis 的分布式锁实现
return new RedisDistributedLock();
}
@Bean
@ConditionalOnMissingBean
public JobRegistry jobRegistry() {
// 基于 Eureka 的任务注册中心
return new EurekaJobRegistry();
}
}
// 基于 Redis 的分布式锁
@Component
public class RedisDistributedLock {
private final RedisTemplate<String, String> redisTemplate;
private final StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
public RedisDistributedLock(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
/**
* 尝试获取分布式锁
*/
public boolean tryLock(String lockKey, String lockValue, long expireTime) {
Boolean result = redisTemplate.execute((RedisCallback<Boolean>) connection -> {
byte[] key = stringRedisSerializer.serialize(lockKey);
byte[] value = stringRedisSerializer.serialize(lockValue);
Expiration expiration = Expiration.from(expireTime, TimeUnit.SECONDS);
return connection.set(key, value, expiration, RedisStringCommands.SetOption.SET_IF_ABSENT);
});
return Boolean.TRUE.equals(result);
}
/**
* 释放分布式锁
*/
public void releaseLock(String lockKey, String lockValue) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
"return redis.call('del', KEYS[1]) " +
"else return 0 end";
redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),
Collections.singletonList(lockKey), lockValue);
}
}
// 基于 Eureka 的任务注册中心
@Component
public class EurekaJobRegistry {
private final EurekaClient eurekaClient;
private final ApplicationEventPublisher eventPublisher;
public EurekaJobRegistry(EurekaClient eurekaClient, ApplicationEventPublisher eventPublisher) {
this.eurekaClient = eurekaClient;
this.eventPublisher = eventPublisher;
}
/**
* 注册任务
*/
public void registerJob(String jobId, String jobName, String groupName) {
InstanceInfo instanceInfo = eurekaClient.getApplicationInfoManager().getInfo();
JobRegistration registration = new JobRegistration();
registration.setJobId(jobId);
registration.setJobName(jobName);
registration.setGroupName(groupName);
registration.setInstanceId(instanceInfo.getInstanceId());
registration.setHostName(instanceInfo.getHostName());
registration.setPort(instanceInfo.getPort());
registration.setRegistrationTime(System.currentTimeMillis());
// 发布任务注册事件
eventPublisher.publishEvent(new JobRegisteredEvent(registration));
}
/**
* 获取可用的任务执行节点
*/
public List<String> getAvailableJobNodes(String jobName) {
Application application = eurekaClient.getApplication("JOB-SERVICE");
if (application == null) {
return Collections.emptyList();
}
return application.getInstances().stream()
.filter(instance -> instance.getStatus() == InstanceInfo.InstanceStatus.UP)
.map(instance -> instance.getHostName() + ":" + instance.getPort())
.collect(Collectors.toList());
}
}
// 任务注册信息
public class JobRegistration {
private String jobId;
private String jobName;
private String groupName;
private String instanceId;
private String hostName;
private int port;
private long registrationTime;
// getters and setters
public String getJobId() { return jobId; }
public void setJobId(String jobId) { this.jobId = jobId; }
public String getJobName() { return jobName; }
public void setJobName(String jobName) { this.jobName = jobName; }
public String getGroupName() { return groupName; }
public void setGroupName(String groupName) { this.groupName = groupName; }
public String getInstanceId() { return instanceId; }
public void setInstanceId(String instanceId) { this.instanceId = instanceId; }
public String getHostName() { return hostName; }
public void setHostName(String hostName) { this.hostName = hostName; }
public int getPort() { return port; }
public void setPort(int port) { this.port = port; }
public long getRegistrationTime() { return registrationTime; }
public void setRegistrationTime(long registrationTime) { this.registrationTime = registrationTime; }
}
// 任务注册事件
public class JobRegisteredEvent extends ApplicationEvent {
private final JobRegistration registration;
public JobRegisteredEvent(JobRegistration registration) {
super(registration);
this.registration = registration;
}
public JobRegistration getRegistration() {
return registration;
}
}
配置中心与调度的联动
在微服务架构中,配置中心(如 Spring Cloud Config、Apollo、Nacos)用于统一管理应用配置。调度系统与配置中心的联动可以实现任务配置的动态调整。
基于 Spring Cloud Config 的配置管理
// 调度配置属性
@ConfigurationProperties(prefix = "scheduling")
@Component
public class SchedulingProperties {
private Map<String, JobConfig> jobs = new HashMap<>();
private int defaultThreadPoolSize = 10;
private long defaultTimeoutMs = 300000; // 5分钟
// getters and setters
public Map<String, JobConfig> getJobs() { return jobs; }
public void setJobs(Map<String, JobConfig> jobs) { this.jobs = jobs; }
public int getDefaultThreadPoolSize() { return defaultThreadPoolSize; }
public void setDefaultThreadPoolSize(int defaultThreadPoolSize) { this.defaultThreadPoolSize = defaultThreadPoolSize; }
public long getDefaultTimeoutMs() { return defaultTimeoutMs; }
public void setDefaultTimeoutMs(long defaultTimeoutMs) { this.defaultTimeoutMs = defaultTimeoutMs; }
}
// 任务配置
public class JobConfig {
private String name;
private String className;
private String cronExpression;
private Map<String, Object> parameters = new HashMap<>();
private int threadPoolSize = 5;
private long timeoutMs = 300000; // 5分钟
private boolean enabled = true;
// getters and setters
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getClassName() { return className; }
public void setClassName(String className) { this.className = className; }
public String getCronExpression() { return cronExpression; }
public void setCronExpression(String cronExpression) { this.cronExpression = cronExpression; }
public Map<String, Object> getParameters() { return parameters; }
public void setParameters(Map<String, Object> parameters) { this.parameters = parameters; }
public int getThreadPoolSize() { return threadPoolSize; }
public void setThreadPoolSize(int threadPoolSize) { this.threadPoolSize = threadPoolSize; }
public long getTimeoutMs() { return timeoutMs; }
public void setTimeoutMs(long timeoutMs) { this.timeoutMs = timeoutMs; }
public boolean isEnabled() { return enabled; }
public void setEnabled(boolean enabled) { this.enabled = enabled; }
}
// 动态调度配置管理器
@Component
public class DynamicSchedulingManager {
private final Scheduler scheduler;
private final SchedulingProperties schedulingProperties;
private final Map<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
public DynamicSchedulingManager(Scheduler scheduler, SchedulingProperties schedulingProperties) {
this.scheduler = scheduler;
this.schedulingProperties = schedulingProperties;
}
/**
* 根据配置初始化调度任务
*/
@PostConstruct
public void initializeScheduling() {
refreshScheduling();
}
/**
* 刷新调度配置
*/
@EventListener
public void handleRefreshEvent(EnvironmentChangeEvent event) {
refreshScheduling();
}
/**
* 刷新调度任务
*/
public void refreshScheduling() {
try {
// 获取当前所有任务
Set<JobKey> currentJobs = scheduler.getJobKeys(GroupMatcher.jobGroupEquals("DEFAULT"));
// 取消所有现有任务
for (JobKey jobKey : currentJobs) {
scheduler.deleteJob(jobKey);
}
// 根据新配置创建任务
for (Map.Entry<String, JobConfig> entry : schedulingProperties.getJobs().entrySet()) {
String jobId = entry.getKey();
JobConfig jobConfig = entry.getValue();
if (jobConfig.isEnabled()) {
scheduleJob(jobId, jobConfig);
}
}
} catch (SchedulerException e) {
throw new RuntimeException("刷新调度配置失败", e);
}
}
/**
* 调度单个任务
*/
private void scheduleJob(String jobId, JobConfig jobConfig) throws SchedulerException {
// 创建 JobDetail
Class<? extends Job> jobClass;
try {
jobClass = (Class<? extends Job>) Class.forName(jobConfig.getClassName());
} catch (ClassNotFoundException e) {
throw new RuntimeException("找不到任务类: " + jobConfig.getClassName(), e);
}
JobDetail jobDetail = JobBuilder.newJob(jobClass)
.withIdentity(jobId, "DEFAULT")
.usingJobData(new JobDataMap(jobConfig.getParameters()))
.build();
// 创建 Trigger
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(jobId + "Trigger", "DEFAULT")
.withSchedule(CronScheduleBuilder.cronSchedule(jobConfig.getCronExpression()))
.build();
// 调度任务
scheduler.scheduleJob(jobDetail, trigger);
}
}
// 配置刷新监听器
@Component
public class ConfigRefreshListener {
private final DynamicSchedulingManager schedulingManager;
public ConfigRefreshListener(DynamicSchedulingManager schedulingManager) {
this.schedulingManager = schedulingManager;
}
@EventListener
public void handleConfigRefresh(EnvironmentChangeEvent event) {
System.out.println("检测到配置刷新,重新加载调度配置");
schedulingManager.refreshScheduling();
}
}
基于 Apollo 的配置管理
// Apollo 配置监听器
@Component
public class ApolloSchedulingConfigListener {
private final DynamicSchedulingManager schedulingManager;
public ApolloSchedulingConfigListener(DynamicSchedulingManager schedulingManager) {
this.schedulingManager = schedulingManager;
}
@ApolloConfigChangeListener
public void onChange(ConfigChangeEvent changeEvent) {
for (String changedKey : changeEvent.changedKeys()) {
if (changedKey.startsWith("scheduling.jobs")) {
System.out.println("调度配置发生变化: " + changedKey);
schedulingManager.refreshScheduling();
break;
}
}
}
}
// Apollo 配置示例
/*
scheduling.jobs.orderCloseJob.name=订单关闭任务
scheduling.jobs.orderCloseJob.className=com.example.job.OrderCloseJob
scheduling.jobs.orderCloseJob.cronExpression=0 */5 * * * ?
scheduling.jobs.orderCloseJob.parameters.timeout=300000
scheduling.jobs.orderCloseJob.enabled=true
scheduling.jobs.dataSyncJob.name=数据同步任务
scheduling.jobs.dataSyncJob.className=com.example.job.DataSyncJob
scheduling.jobs.dataSyncJob.cronExpression=0 0 2 * * ?
scheduling.jobs.dataSyncJob.parameters.sourceDb=jdbc:mysql://source:3306/db
scheduling.jobs.dataSyncJob.parameters.targetDb=jdbc:mysql://target:3306/db
scheduling.jobs.dataSyncJob.enabled=true
*/
服务发现与任务路由策略
在微服务架构中,服务发现机制(如 Eureka、Consul、Nacos)可以帮助调度系统动态发现可用的服务实例,并实现智能的任务路由。
基于服务发现的任务路由
// 任务路由策略接口
public interface TaskRoutingStrategy {
String selectServiceInstance(List<ServiceInstance> instances, TaskContext context);
}
// 轮询路由策略
@Component
public class RoundRobinRoutingStrategy implements TaskRoutingStrategy {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public String selectServiceInstance(List<ServiceInstance> instances, TaskContext context) {
if (instances.isEmpty()) {
return null;
}
int index = counter.getAndIncrement() % instances.size();
ServiceInstance instance = instances.get(index);
return instance.getHost() + ":" + instance.getPort();
}
}
// 负载均衡路由策略
@Component
public class LoadBalancedRoutingStrategy implements TaskRoutingStrategy {
private final LoadBalancerClient loadBalancerClient;
public LoadBalancedRoutingStrategy(LoadBalancerClient loadBalancerClient) {
this.loadBalancerClient = loadBalancerClient;
}
@Override
public String selectServiceInstance(List<ServiceInstance> instances, TaskContext context) {
ServiceInstance instance = loadBalancerClient.choose("task-execution-service");
if (instance != null) {
return instance.getHost() + ":" + instance.getPort();
}
return null;
}
}
// 基于标签的路由策略
@Component
public class TagBasedRoutingStrategy implements TaskRoutingStrategy {
@Override
public String selectServiceInstance(List<ServiceInstance> instances, TaskContext context) {
String requiredTag = context.getRequiredTag();
if (requiredTag == null || requiredTag.isEmpty()) {
// 如果没有指定标签,使用轮询策略
return new RoundRobinRoutingStrategy().selectServiceInstance(instances, context);
}
// 查找具有指定标签的实例
List<ServiceInstance> taggedInstances = instances.stream()
.filter(instance -> {
Map<String, String> metadata = instance.getMetadata();
return metadata != null && requiredTag.equals(metadata.get("tag"));
})
.collect(Collectors.toList());
if (!taggedInstances.isEmpty()) {
// 在符合条件的实例中随机选择一个
int index = new Random().nextInt(taggedInstances.size());
ServiceInstance instance = taggedInstances.get(index);
return instance.getHost() + ":" + instance.getPort();
}
return null;
}
}
// 任务上下文
public class TaskContext {
private final String taskId;
private final String taskType;
private final Map<String, Object> parameters;
private String requiredTag;
public TaskContext(String taskId, String taskType, Map<String, Object> parameters) {
this.taskId = taskId;
this.taskType = taskType;
this.parameters = new HashMap<>(parameters);
}
// getters and setters
public String getTaskId() { return taskId; }
public String getTaskType() { return taskType; }
public Map<String, Object> getParameters() { return new HashMap<>(parameters); }
public String getRequiredTag() { return requiredTag; }
public void setRequiredTag(String requiredTag) { this.requiredTag = requiredTag; }
}
// 服务发现任务执行器
@Component
public class ServiceDiscoveryTaskExecutor {
private final DiscoveryClient discoveryClient;
private final TaskRoutingStrategy routingStrategy;
private final RestTemplate restTemplate;
public ServiceDiscoveryTaskExecutor(DiscoveryClient discoveryClient,
TaskRoutingStrategy routingStrategy,
RestTemplate restTemplate) {
this.discoveryClient = discoveryClient;
this.routingStrategy = routingStrategy;
this.restTemplate = restTemplate;
}
/**
* 执行远程任务
*/
public TaskExecutionResult executeRemoteTask(String serviceName, TaskContext context) {
// 发现服务实例
List<ServiceInstance> instances = discoveryClient.getInstances(serviceName);
if (instances.isEmpty()) {
return TaskExecutionResult.failure("未找到可用的服务实例: " + serviceName);
}
// 选择服务实例
String selectedInstance = routingStrategy.selectServiceInstance(instances, context);
if (selectedInstance == null) {
return TaskExecutionResult.failure("无法选择合适的服务实例");
}
// 构造任务执行URL
String taskUrl = "http://" + selectedInstance + "/api/tasks/execute";
try {
// 执行远程任务
ResponseEntity<TaskExecutionResult> response = restTemplate.postForEntity(
taskUrl, context, TaskExecutionResult.class);
return response.getBody();
} catch (Exception e) {
return TaskExecutionResult.failure("任务执行失败: " + e.getMessage());
}
}
}
// 任务执行结果
public class TaskExecutionResult {
private final boolean success;
private final String message;
private final Object resultData;
private final long executionTimeMs;
private TaskExecutionResult(boolean success, String message, Object resultData, long executionTimeMs) {
this.success = success;
this.message = message;
this.resultData = resultData;
this.executionTimeMs = executionTimeMs;
}
public static TaskExecutionResult success(String message, Object resultData) {
return new TaskExecutionResult(true, message, resultData, System.currentTimeMillis());
}
public static TaskExecutionResult failure(String message) {
return new TaskExecutionResult(false, message, null, System.currentTimeMillis());
}
// getters
public boolean isSuccess() { return success; }
public String getMessage() { return message; }
public Object getResultData() { return resultData; }
public long getExecutionTimeMs() { return executionTimeMs; }
}
// 远程任务控制器
@RestController
@RequestMapping("/api/tasks")
public class RemoteTaskController {
@PostMapping("/execute")
public ResponseEntity<TaskExecutionResult> executeTask(@RequestBody TaskContext context) {
try {
// 根据任务类型执行相应的逻辑
Object result = performTask(context);
return ResponseEntity.ok(TaskExecutionResult.success("任务执行成功", result));
} catch (Exception e) {
return ResponseEntity.ok(TaskExecutionResult.failure("任务执行失败: " + e.getMessage()));
}
}
private Object performTask(TaskContext context) {
String taskType = context.getTaskType();
switch (taskType) {
case "DATA_PROCESSING":
return processData(context.getParameters());
case "REPORT_GENERATION":
return generateReport(context.getParameters());
default:
throw new IllegalArgumentException("不支持的任务类型: " + taskType);
}
}
private Object processData(Map<String, Object> parameters) {
// 实现数据处理逻辑
return "数据处理完成";
}
private Object generateReport(Map<String, Object> parameters) {
// 实现报告生成逻辑
return "报告生成完成";
}
}
智能任务路由实现
// 智能任务路由管理器
@Component
public class SmartTaskRoutingManager {
private final DiscoveryClient discoveryClient;
private final Map<String, TaskRoutingStrategy> routingStrategies;
private final LoadBalancerClient loadBalancerClient;
public SmartTaskRoutingManager(DiscoveryClient discoveryClient,
List<TaskRoutingStrategy> strategies,
LoadBalancerClient loadBalancerClient) {
this.discoveryClient = discoveryClient;
this.loadBalancerClient = loadBalancerClient;
// 注册路由策略
this.routingStrategies = new HashMap<>();
for (TaskRoutingStrategy strategy : strategies) {
if (strategy instanceof RoundRobinRoutingStrategy) {
routingStrategies.put("ROUND_ROBIN", strategy);
} else if (strategy instanceof LoadBalancedRoutingStrategy) {
routingStrategies.put("LOAD_BALANCED", strategy);
} else if (strategy instanceof TagBasedRoutingStrategy) {
routingStrategies.put("TAG_BASED", strategy);
}
}
}
/**
* 根据任务配置选择路由策略并执行任务
*/
public TaskExecutionResult routeAndExecuteTask(TaskRoutingConfig config, TaskContext context) {
// 获取服务实例
List<ServiceInstance> instances = discoveryClient.getInstances(config.getServiceName());
if (instances.isEmpty()) {
return TaskExecutionResult.failure("未找到服务实例: " + config.getServiceName());
}
// 选择路由策略
TaskRoutingStrategy strategy = selectRoutingStrategy(config.getRoutingStrategy(), instances);
if (strategy == null) {
return TaskExecutionResult.failure("未找到合适的路由策略: " + config.getRoutingStrategy());
}
// 选择服务实例
String selectedInstance = strategy.selectServiceInstance(instances, context);
if (selectedInstance == null) {
return TaskExecutionResult.failure("无法选择服务实例");
}
// 执行任务
return executeTaskOnInstance(selectedInstance, context);
}
/**
* 选择路由策略
*/
private TaskRoutingStrategy selectRoutingStrategy(String strategyName, List<ServiceInstance> instances) {
if (strategyName == null || strategyName.isEmpty()) {
// 默认使用负载均衡策略
return routingStrategies.get("LOAD_BALANCED");
}
return routingStrategies.get(strategyName.toUpperCase());
}
/**
* 在选定的实例上执行任务
*/
private TaskExecutionResult executeTaskOnInstance(String instanceAddress, TaskContext context) {
String taskUrl = "http://" + instanceAddress + "/api/tasks/execute";
try {
RestTemplate restTemplate = new RestTemplate();
ResponseEntity<TaskExecutionResult> response = restTemplate.postForEntity(
taskUrl, context, TaskExecutionResult.class);
return response.getBody();
} catch (Exception e) {
return TaskExecutionResult.failure("任务执行失败: " + e.getMessage());
}
}
}
// 任务路由配置
public class TaskRoutingConfig {
private String serviceName;
private String routingStrategy;
private Map<String, Object> routingParameters = new HashMap<>();
// constructors, getters and setters
public TaskRoutingConfig(String serviceName, String routingStrategy) {
this.serviceName = serviceName;
this.routingStrategy = routingStrategy;
}
// getters and setters
public String getServiceName() { return serviceName; }
public void setServiceName(String serviceName) { this.serviceName = serviceName; }
public String getRoutingStrategy() { return routingStrategy; }
public void setRoutingStrategy(String routingStrategy) { this.routingStrategy = routingStrategy; }
public Map<String, Object> getRoutingParameters() { return routingParameters; }
public void setRoutingParameters(Map<String, Object> routingParameters) { this.routingParameters = routingParameters; }
}
// 基于健康检查的路由策略
@Component
public class HealthAwareRoutingStrategy implements TaskRoutingStrategy {
private final RestTemplate restTemplate;
public HealthAwareRoutingStrategy(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
}
@Override
public String selectServiceInstance(List<ServiceInstance> instances, TaskContext context) {
// 过滤出健康的实例
List<ServiceInstance> healthyInstances = instances.stream()
.filter(this::isInstanceHealthy)
.collect(Collectors.toList());
if (healthyInstances.isEmpty()) {
return null;
}
// 在健康实例中随机选择
int index = new Random().nextInt(healthyInstances.size());
ServiceInstance instance = healthyInstances.get(index);
return instance.getHost() + ":" + instance.getPort();
}
/**
* 检查实例是否健康
*/
private boolean isInstanceHealthy(ServiceInstance instance) {
String healthUrl = "http://" + instance.getHost() + ":" + instance.getPort() + "/actuator/health";
try {
ResponseEntity<Map> response = restTemplate.getForEntity(healthUrl, Map.class);
if (response.getStatusCode().is2xxSuccessful()) {
Map<String, Object> healthInfo = response.getBody();
String status = (String) healthInfo.get("status");
return "UP".equalsIgnoreCase(status);
}
} catch (Exception e) {
// 忽略异常,认为实例不健康
}
return false;
}
}
微服务调度集成示例
// 微服务调度集成配置
@Configuration
@EnableScheduling
public class MicroserviceSchedulingConfig {
@Bean
public TaskRoutingStrategy roundRobinRoutingStrategy() {
return new RoundRobinRoutingStrategy();
}
@Bean
public TaskRoutingStrategy loadBalancedRoutingStrategy(LoadBalancerClient loadBalancerClient) {
return new LoadBalancedRoutingStrategy(loadBalancerClient);
}
@Bean
public TaskRoutingStrategy tagBasedRoutingStrategy() {
return new TagBasedRoutingStrategy();
}
@Bean
public TaskRoutingStrategy healthAwareRoutingStrategy(RestTemplate restTemplate) {
return new HealthAwareRoutingStrategy(restTemplate);
}
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
}
// 调度任务示例
@Component
public class MicroserviceScheduledTasks {
@Autowired
private SmartTaskRoutingManager routingManager;
@Autowired
private SchedulingProperties schedulingProperties;
// 每5分钟执行一次数据同步任务
@Scheduled(cron = "0 */5 * * * ?")
public void executeDataSyncTask() {
TaskRoutingConfig config = new TaskRoutingConfig("data-processing-service", "LOAD_BALANCED");
Map<String, Object> parameters = new HashMap<>();
parameters.put("source", "database");
parameters.put("target", "data-warehouse");
parameters.put("batchSize", 1000);
TaskContext context = new TaskContext("data-sync-001", "DATA_PROCESSING", parameters);
TaskExecutionResult result = routingManager.routeAndExecuteTask(config, context);
if (result.isSuccess()) {
System.out.println("数据同步任务执行成功: " + result.getMessage());
} else {
System.err.println("数据同步任务执行失败: " + result.getMessage());
}
}
// 每天凌晨2点执行报告生成任务
@Scheduled(cron = "0 0 2 * * ?")
public void executeReportGenerationTask() {
TaskRoutingConfig config = new TaskRoutingConfig("report-service", "TAG_BASED");
Map<String, Object> parameters = new HashMap<>();
parameters.put("reportType", "DAILY");
parameters.put("date", LocalDate.now().minusDays(1).toString());
TaskContext context = new TaskContext("daily-report-001", "REPORT_GENERATION", parameters);
context.setRequiredTag("high-memory"); // 指定需要高内存的实例
TaskExecutionResult result = routingManager.routeAndExecuteTask(config, context);
if (result.isSuccess()) {
System.out.println("报告生成任务执行成功: " + result.getMessage());
} else {
System.err.println("报告生成任务执行失败: " + result.getMessage());
}
}
}
最佳实践
1. 配置管理最佳实践
// 配置验证器
@Component
public class SchedulingConfigValidator {
public void validateSchedulingProperties(SchedulingProperties properties) {
for (Map.Entry<String, JobConfig> entry : properties.getJobs().entrySet()) {
String jobId = entry.getKey();
JobConfig jobConfig = entry.getValue();
// 验证 Cron 表达式
try {
new CronExpression(jobConfig.getCronExpression());
} catch (ParseException e) {
throw new IllegalArgumentException("无效的 Cron 表达式 for job " + jobId, e);
}
// 验证任务类是否存在
try {
Class.forName(jobConfig.getClassName());
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("任务类不存在: " + jobConfig.getClassName());
}
// 验证超时时间
if (jobConfig.getTimeoutMs() <= 0) {
throw new IllegalArgumentException("超时时间必须大于0: " + jobId);
}
}
}
}
// 配置变更监听器
@Component
public class ConfigChangeHandler {
private final DynamicSchedulingManager schedulingManager;
private final SchedulingConfigValidator configValidator;
public ConfigChangeHandler(DynamicSchedulingManager schedulingManager,
SchedulingConfigValidator configValidator) {
this.schedulingManager = schedulingManager;
this.configValidator = configValidator;
}
@EventListener
public void handleConfigChange(EnvironmentChangeEvent event) {
try {
// 验证新配置
SchedulingProperties newProperties = getNewSchedulingProperties();
configValidator.validateSchedulingProperties(newProperties);
// 应用新配置
schedulingManager.refreshScheduling();
System.out.println("调度配置更新成功");
} catch (Exception e) {
System.err.println("调度配置更新失败: " + e.getMessage());
// 可以考虑回滚到之前的配置
}
}
private SchedulingProperties getNewSchedulingProperties() {
// 获取最新的配置属性
return new SchedulingProperties();
}
}
2. 服务发现最佳实践
// 服务实例健康检查器
@Component
public class ServiceInstanceHealthChecker {
private final RestTemplate restTemplate;
private final Map<String, Long> lastCheckTime = new ConcurrentHashMap<>();
private final Map<String, Boolean> healthStatus = new ConcurrentHashMap<>();
private final long checkIntervalMs = 30000; // 30秒检查间隔
public ServiceInstanceHealthChecker(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
}
/**
* 检查服务实例是否健康
*/
public boolean isHealthy(ServiceInstance instance) {
String instanceKey = instance.getHost() + ":" + instance.getPort();
long currentTime = System.currentTimeMillis();
Long lastCheck = lastCheckTime.get(instanceKey);
// 如果距离上次检查时间不足检查间隔,直接返回缓存的结果
if (lastCheck != null && (currentTime - lastCheck) < checkIntervalMs) {
return healthStatus.getOrDefault(instanceKey, false);
}
// 执行健康检查
boolean isHealthy = performHealthCheck(instance);
lastCheckTime.put(instanceKey, currentTime);
healthStatus.put(instanceKey, isHealthy);
return isHealthy;
}
private boolean performHealthCheck(ServiceInstance instance) {
String healthUrl = "http://" + instance.getHost() + ":" + instance.getPort() + "/actuator/health";
try {
ResponseEntity<Map> response = restTemplate.getForEntity(healthUrl, Map.class, 5000);
if (response.getStatusCode().is2xxSuccessful()) {
Map<String, Object> healthInfo = response.getBody();
String status = (String) healthInfo.get("status");
return "UP".equalsIgnoreCase(status);
}
} catch (Exception e) {
// 记录异常日志
System.err.println("健康检查失败 for " + instanceKey + ": " + e.getMessage());
}
return false;
}
}
总结
通过与微服务架构的深度集成,调度平台能够更好地适应现代分布式系统的复杂需求。关键要点包括:
- Spring Boot/Cloud 集成:利用 Spring 生态的优势,简化调度任务的开发和部署
- 配置中心联动:实现任务配置的动态调整,提高系统的灵活性
- 服务发现集成:动态发现服务实例,实现智能的任务路由
- 健康感知路由:基于服务实例的健康状态进行任务分配,提高系统可靠性
在实际应用中,需要根据具体的业务场景和技术栈选择合适的集成方案,并建立完善的监控和告警机制,确保调度系统能够稳定可靠地运行。
在下一章中,我们将探讨调度平台的监控与运维,包括任务执行日志采集、调度指标监控、告警与自动化运维等内容。