与RPA集成: 处理无API的遗留系统操作
2025/9/7大约 19 分钟
与RPA集成:处理无API的遗留系统操作
机器人流程自动化(RPA)作为一种重要的自动化技术,能够模拟人类用户操作界面来执行重复性任务。在企业级BPM平台建设中,将BPM与RPA集成可以有效解决那些缺乏API接口或难以通过传统方式集成的遗留系统的自动化问题。本章将深入探讨BPM与RPA集成的架构设计、实现方法和最佳实践。
BPM与RPA集成的核心价值
扩展自动化边界
RPA能够处理传统BPM难以触及的遗留系统和桌面应用,显著扩展了业务流程自动化的覆盖范围。
降低集成成本
相比开发和维护复杂的系统接口,RPA提供了一种更快速、低成本的集成方式。
提高业务灵活性
通过可视化的方式配置RPA机器人,业务用户能够更灵活地调整自动化流程。
实现端到端自动化
BPM负责流程编排和管理,RPA负责具体的界面操作,两者结合实现真正的端到端自动化。
集成架构设计
BPM与RPA的集成需要设计合理的架构,确保两者能够协同工作并统一管理。
// BPM-RPA集成架构服务
@Service
public class BpmRpaIntegrationService {
@Autowired
private RpaRobotRepository robotRepository;
@Autowired
private ProcessInstanceRepository processInstanceRepository;
@Autowired
private TaskRepository taskRepository;
@Autowired
private RpaOrchestratorService orchestratorService;
/**
* 执行BPM与RPA集成任务
* @param taskId BPM任务ID
* @param rpaProcessName RPA流程名称
* @param inputParameters 输入参数
* @return 集成任务结果
*/
public BpmRpaIntegrationResult executeIntegrationTask(String taskId, String rpaProcessName,
Map<String, Object> inputParameters) {
BpmRpaIntegrationResult result = new BpmRpaIntegrationResult();
result.setTaskId(taskId);
result.setStartTime(new Date());
try {
// 1. 获取BPM任务信息
Task bpmTask = taskRepository.findById(taskId);
if (bpmTask == null) {
throw new IntegrationException("BPM任务不存在: " + taskId);
}
// 2. 选择合适的RPA机器人
RpaRobot selectedRobot = selectOptimalRobot(rpaProcessName, inputParameters);
result.setRobotId(selectedRobot.getId());
// 3. 准备RPA执行参数
Map<String, Object> rpaParameters = prepareRpaParameters(bpmTask, inputParameters);
// 4. 触发RPA流程执行
RpaExecutionResult rpaResult = triggerRpaExecution(selectedRobot, rpaProcessName, rpaParameters);
result.setRpaExecutionId(rpaResult.getExecutionId());
// 5. 监控RPA执行状态
RpaExecutionStatus finalStatus = monitorRpaExecution(rpaResult.getExecutionId());
result.setFinalStatus(finalStatus);
// 6. 处理执行结果
handleRpaExecutionResult(bpmTask, rpaResult, finalStatus);
result.setEndTime(new Date());
result.setSuccess(true);
result.setMessage("BPM-RPA集成任务执行完成");
log.info("BPM-RPA集成任务执行完成 - 任务ID: {}, RPA执行ID: {}", taskId, rpaResult.getExecutionId());
} catch (Exception e) {
log.error("BPM-RPA集成任务执行失败 - 任务ID: {}", taskId, e);
result.setSuccess(false);
result.setErrorMessage("集成任务执行失败: " + e.getMessage());
// 更新BPM任务状态
updateBpmTaskOnFailure(taskId, e);
}
return result;
}
/**
* 选择最优RPA机器人
*/
private RpaRobot selectOptimalRobot(String processName, Map<String, Object> parameters) {
try {
// 获取可用的RPA机器人
List<RpaRobot> availableRobots = robotRepository.findAvailableRobots(processName);
if (availableRobots.isEmpty()) {
throw new IntegrationException("没有可用的RPA机器人执行流程: " + processName);
}
// 根据负载和能力选择最优机器人
RpaRobot optimalRobot = availableRobots.stream()
.min(Comparator.comparingDouble(RpaRobot::getCurrentLoad)
.thenComparing(RpaRobot::getPerformanceScore).reversed())
.orElse(availableRobots.get(0));
// 更新机器人状态
optimalRobot.setCurrentLoad(optimalRobot.getCurrentLoad() + 1);
robotRepository.save(optimalRobot);
return optimalRobot;
} catch (Exception e) {
log.error("选择RPA机器人失败 - 流程名称: {}", processName, e);
throw new IntegrationException("选择RPA机器人失败", e);
}
}
/**
* 准备RPA执行参数
*/
private Map<String, Object> prepareRpaParameters(Task bpmTask, Map<String, Object> inputParameters) {
Map<String, Object> rpaParameters = new HashMap<>();
try {
// 添加BPM任务相关信息
rpaParameters.put("bpmTaskId", bpmTask.getId());
rpaParameters.put("processInstanceId", bpmTask.getProcessInstanceId());
rpaParameters.put("assignee", bpmTask.getAssignee());
rpaParameters.put("taskName", bpmTask.getName());
rpaParameters.put("taskDescription", bpmTask.getDescription());
// 添加输入参数
rpaParameters.putAll(inputParameters);
// 添加上下文信息
ProcessInstance processInstance = processInstanceRepository.findById(bpmTask.getProcessInstanceId());
if (processInstance != null) {
rpaParameters.put("businessKey", processInstance.getBusinessKey());
rpaParameters.put("processVariables", processInstance.getVariables());
}
// 数据转换和验证
rpaParameters = transformAndValidateParameters(rpaParameters);
} catch (Exception e) {
log.warn("准备RPA参数失败", e);
}
return rpaParameters;
}
/**
* 数据转换和验证
*/
private Map<String, Object> transformAndValidateParameters(Map<String, Object> parameters) {
Map<String, Object> transformed = new HashMap<>();
for (Map.Entry<String, Object> entry : parameters.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
// 数据类型转换
if (value instanceof String) {
String strValue = (String) value;
// 尝试转换为数字
if (strValue.matches("-?\\d+(\\.\\d+)?")) {
try {
if (strValue.contains(".")) {
transformed.put(key, Double.parseDouble(strValue));
} else {
transformed.put(key, Long.parseLong(strValue));
}
} catch (NumberFormatException e) {
transformed.put(key, value);
}
} else {
transformed.put(key, value);
}
} else {
transformed.put(key, value);
}
}
return transformed;
}
/**
* 触发RPA执行
*/
private RpaExecutionResult triggerRpaExecution(RpaRobot robot, String processName,
Map<String, Object> parameters) {
try {
// 通过RPA编排器触发执行
RpaExecutionRequest request = new RpaExecutionRequest();
request.setRobotId(robot.getId());
request.setProcessName(processName);
request.setParameters(parameters);
request.setPriority(RpaPriority.NORMAL);
request.setTimeout(3600000); // 1小时超时
return orchestratorService.triggerExecution(request);
} catch (Exception e) {
log.error("触发RPA执行失败 - 机器人ID: {}, 流程名称: {}", robot.getId(), processName, e);
throw new IntegrationException("触发RPA执行失败", e);
}
}
/**
* 监控RPA执行状态
*/
private RpaExecutionStatus monitorRpaExecution(String executionId) {
try {
long startTime = System.currentTimeMillis();
long timeout = 3600000; // 1小时超时
while (System.currentTimeMillis() - startTime < timeout) {
RpaExecutionStatus status = orchestratorService.getExecutionStatus(executionId);
switch (status.getStatus()) {
case COMPLETED:
log.info("RPA执行完成 - 执行ID: {}", executionId);
return status;
case FAILED:
log.warn("RPA执行失败 - 执行ID: {}, 错误信息: {}", executionId, status.getErrorMessage());
return status;
case CANCELLED:
log.warn("RPA执行被取消 - 执行ID: {}", executionId);
return status;
case RUNNING:
// 继续监控
Thread.sleep(5000); // 5秒后再次检查
break;
default:
log.warn("RPA执行状态未知 - 执行ID: {}, 状态: {}", executionId, status.getStatus());
Thread.sleep(5000);
break;
}
}
// 超时处理
log.warn("RPA执行超时 - 执行ID: {}", executionId);
RpaExecutionStatus timeoutStatus = new RpaExecutionStatus();
timeoutStatus.setExecutionId(executionId);
timeoutStatus.setStatus(RpaExecutionState.TIMEOUT);
timeoutStatus.setErrorMessage("执行超时");
return timeoutStatus;
} catch (Exception e) {
log.error("监控RPA执行状态失败 - 执行ID: {}", executionId, e);
RpaExecutionStatus errorStatus = new RpaExecutionStatus();
errorStatus.setExecutionId(executionId);
errorStatus.setStatus(RpaExecutionState.FAILED);
errorStatus.setErrorMessage("监控失败: " + e.getMessage());
return errorStatus;
}
}
/**
* 处理RPA执行结果
*/
private void handleRpaExecutionResult(Task bpmTask, RpaExecutionResult rpaResult,
RpaExecutionStatus finalStatus) {
try {
switch (finalStatus.getStatus()) {
case COMPLETED:
// 处理成功结果
handleSuccessfulExecution(bpmTask, rpaResult, finalStatus);
break;
case FAILED:
case TIMEOUT:
case CANCELLED:
// 处理失败结果
handleFailedExecution(bpmTask, rpaResult, finalStatus);
break;
default:
log.warn("未处理的RPA执行状态 - 任务ID: {}, 状态: {}",
bpmTask.getId(), finalStatus.getStatus());
}
} catch (Exception e) {
log.error("处理RPA执行结果失败 - 任务ID: {}", bpmTask.getId(), e);
}
}
/**
* 处理成功执行结果
*/
private void handleSuccessfulExecution(Task bpmTask, RpaExecutionResult rpaResult,
RpaExecutionStatus finalStatus) {
try {
// 提取RPA执行结果
Map<String, Object> rpaOutputs = rpaResult.getOutputs();
// 更新BPM流程变量
ProcessInstance processInstance = processInstanceRepository.findById(bpmTask.getProcessInstanceId());
if (processInstance != null && rpaOutputs != null) {
Map<String, Object> processVariables = processInstance.getVariables();
processVariables.putAll(rpaOutputs);
processInstance.setVariables(processVariables);
processInstanceRepository.save(processInstance);
}
// 完成BPM任务
completeBpmTask(bpmTask, rpaOutputs);
// 释放RPA机器人
releaseRobot(rpaResult.getRobotId());
log.info("成功处理RPA执行结果 - 任务ID: {}", bpmTask.getId());
} catch (Exception e) {
log.error("处理成功执行结果失败 - 任务ID: {}", bpmTask.getId(), e);
throw new IntegrationException("处理成功执行结果失败", e);
}
}
/**
* 处理失败执行结果
*/
private void handleFailedExecution(Task bpmTask, RpaExecutionResult rpaResult,
RpaExecutionStatus finalStatus) {
try {
// 记录错误信息
String errorMessage = finalStatus.getErrorMessage();
log.error("RPA执行失败 - 任务ID: {}, 错误信息: {}", bpmTask.getId(), errorMessage);
// 更新BPM任务状态为错误
bpmTask.setStatus(TaskStatus.ERROR);
bpmTask.setErrorMessage(errorMessage);
bpmTask.setEndTime(new Date());
taskRepository.save(bpmTask);
// 通知相关人员
notifyOnError(bpmTask, errorMessage);
// 释放RPA机器人
releaseRobot(rpaResult.getRobotId());
} catch (Exception e) {
log.error("处理失败执行结果失败 - 任务ID: {}", bpmTask.getId(), e);
}
}
/**
* 完成BPM任务
*/
private void completeBpmTask(Task task, Map<String, Object> outputs) {
try {
task.setStatus(TaskStatus.COMPLETED);
task.setEndTime(new Date());
// 设置输出变量
if (outputs != null) {
task.setOutputs(outputs);
}
taskRepository.save(task);
log.info("BPM任务已完成 - 任务ID: {}", task.getId());
} catch (Exception e) {
log.error("完成BPM任务失败 - 任务ID: {}", task.getId(), e);
}
}
/**
* 释放RPA机器人
*/
private void releaseRobot(String robotId) {
try {
RpaRobot robot = robotRepository.findById(robotId);
if (robot != null) {
robot.setCurrentLoad(Math.max(0, robot.getCurrentLoad() - 1));
robotRepository.save(robot);
log.info("RPA机器人已释放 - 机器人ID: {}", robotId);
}
} catch (Exception e) {
log.warn("释放RPA机器人失败 - 机器人ID: {}", robotId, e);
}
}
/**
* 失败时更新BPM任务
*/
private void updateBpmTaskOnFailure(String taskId, Exception exception) {
try {
Task task = taskRepository.findById(taskId);
if (task != null) {
task.setStatus(TaskStatus.ERROR);
task.setErrorMessage(exception.getMessage());
task.setEndTime(new Date());
taskRepository.save(task);
log.info("BPM任务状态已更新为错误 - 任务ID: {}", taskId);
}
} catch (Exception e) {
log.error("更新BPM任务状态失败 - 任务ID: {}", taskId, e);
}
}
/**
* 错误通知
*/
private void notifyOnError(Task task, String errorMessage) {
try {
// 发送错误通知给任务负责人
String subject = "BPM-RPA集成任务执行失败";
String content = String.format(
"任务执行失败通知\n\n" +
"任务ID: %s\n" +
"任务名称: %s\n" +
"流程实例ID: %s\n" +
"错误信息: %s\n" +
"发生时间: %s",
task.getId(),
task.getName(),
task.getProcessInstanceId(),
errorMessage,
new Date()
);
// notificationService.sendToAssignee(task.getAssignee(), subject, content);
log.info("已发送错误通知 - 任务ID: {}", task.getId());
} catch (Exception e) {
log.error("发送错误通知失败 - 任务ID: {}", task.getId(), e);
}
}
}统一流程编排
BPM作为流程编排中心,需要能够统一管理和协调BPM流程和RPA机器人。
// 统一流程编排服务
@Service
public class UnifiedProcessOrchestrationService {
@Autowired
private ProcessEngine processEngine;
@Autowired
private RpaOrchestratorService rpaOrchestrator;
@Autowired
private MonitoringService monitoringService;
/**
* 创建包含RPA任务的BPM流程
* @param processDefinition 流程定义
* @return 流程定义ID
*/
public String createProcessWithRpaTasks(ProcessDefinition processDefinition) {
try {
// 1. 解析流程定义中的RPA任务
List<RpaTaskDefinition> rpaTasks = extractRpaTasks(processDefinition);
// 2. 验证RPA流程可用性
validateRpaProcesses(rpaTasks);
// 3. 部署BPM流程定义
String processDefinitionId = deployBpmProcess(processDefinition);
// 4. 注册RPA任务映射
registerRpaTaskMappings(processDefinitionId, rpaTasks);
// 5. 初始化监控配置
initializeMonitoring(processDefinitionId, rpaTasks);
log.info("包含RPA任务的BPM流程创建完成 - 流程定义ID: {}", processDefinitionId);
return processDefinitionId;
} catch (Exception e) {
log.error("创建包含RPA任务的BPM流程失败", e);
throw new OrchestrationException("创建流程失败", e);
}
}
/**
* 提取RPA任务定义
*/
private List<RpaTaskDefinition> extractRpaTasks(ProcessDefinition processDefinition) {
List<RpaTaskDefinition> rpaTasks = new ArrayList<>();
try {
// 遍历流程中的所有任务节点
for (FlowElement element : processDefinition.getFlowElements()) {
if (element instanceof ServiceTask) {
ServiceTask serviceTask = (ServiceTask) element;
// 检查是否为RPA任务
String taskType = serviceTask.getType();
if ("rpa".equalsIgnoreCase(taskType)) {
RpaTaskDefinition rpaTask = new RpaTaskDefinition();
rpaTask.setTaskId(serviceTask.getId());
rpaTask.setTaskName(serviceTask.getName());
rpaTask.setRpaProcessName(serviceTask.getImplementation());
// 提取RPA参数映射
Map<String, String> parameterMappings = extractParameterMappings(serviceTask);
rpaTask.setParameterMappings(parameterMappings);
// 提取输出变量映射
Map<String, String> outputMappings = extractOutputMappings(serviceTask);
rpaTask.setOutputMappings(outputMappings);
rpaTasks.add(rpaTask);
}
}
}
} catch (Exception e) {
log.warn("提取RPA任务定义失败", e);
}
return rpaTasks;
}
/**
* 提取参数映射
*/
private Map<String, String> extractParameterMappings(ServiceTask serviceTask) {
Map<String, String> mappings = new HashMap<>();
try {
// 从任务扩展属性中提取参数映射
for (Map.Entry<String, String> entry : serviceTask.getExtensionElements().entrySet()) {
String key = entry.getKey();
if (key.startsWith("rpa.param.")) {
String paramName = key.substring(10); // 移除"rpa.param."前缀
mappings.put(paramName, entry.getValue());
}
}
} catch (Exception e) {
log.warn("提取参数映射失败 - 任务ID: {}", serviceTask.getId(), e);
}
return mappings;
}
/**
* 提取输出映射
*/
private Map<String, String> extractOutputMappings(ServiceTask serviceTask) {
Map<String, String> mappings = new HashMap<>();
try {
// 从任务扩展属性中提取输出映射
for (Map.Entry<String, String> entry : serviceTask.getExtensionElements().entrySet()) {
String key = entry.getKey();
if (key.startsWith("rpa.output.")) {
String outputName = key.substring(11); // 移除"rpa.output."前缀
mappings.put(outputName, entry.getValue());
}
}
} catch (Exception e) {
log.warn("提取输出映射失败 - 任务ID: {}", serviceTask.getId(), e);
}
return mappings;
}
/**
* 验证RPA流程可用性
*/
private void validateRpaProcesses(List<RpaTaskDefinition> rpaTasks) {
try {
for (RpaTaskDefinition rpaTask : rpaTasks) {
String rpaProcessName = rpaTask.getRpaProcessName();
// 检查RPA流程是否存在
if (!rpaOrchestrator.isProcessAvailable(rpaProcessName)) {
throw new OrchestrationException("RPA流程不可用: " + rpaProcessName);
}
// 验证参数映射
validateParameterMappings(rpaProcessName, rpaTask.getParameterMappings());
}
} catch (Exception e) {
log.error("验证RPA流程可用性失败", e);
throw new OrchestrationException("RPA流程验证失败", e);
}
}
/**
* 验证参数映射
*/
private void validateParameterMappings(String rpaProcessName, Map<String, String> parameterMappings) {
try {
// 获取RPA流程的参数定义
List<RpaParameter> rpaParameters = rpaOrchestrator.getProcessParameters(rpaProcessName);
// 验证必需参数是否都已映射
for (RpaParameter param : rpaParameters) {
if (param.isRequired()) {
boolean mapped = parameterMappings.containsKey(param.getName()) ||
parameterMappings.containsValue("${" + param.getName() + "}");
if (!mapped) {
throw new OrchestrationException(
"RPA流程 " + rpaProcessName + " 的必需参数 " + param.getName() + " 未映射");
}
}
}
} catch (Exception e) {
log.warn("验证参数映射失败 - 流程名称: {}", rpaProcessName, e);
}
}
/**
* 部署BPM流程
*/
private String deployBpmProcess(ProcessDefinition processDefinition) {
try {
// 使用BPM引擎部署流程定义
return processEngine.deployProcess(processDefinition);
} catch (Exception e) {
log.error("部署BPM流程失败", e);
throw new OrchestrationException("BPM流程部署失败", e);
}
}
/**
* 注册RPA任务映射
*/
private void registerRpaTaskMappings(String processDefinitionId, List<RpaTaskDefinition> rpaTasks) {
try {
// 将RPA任务映射注册到映射管理器
for (RpaTaskDefinition rpaTask : rpaTasks) {
RpaTaskMapping mapping = new RpaTaskMapping();
mapping.setProcessDefinitionId(processDefinitionId);
mapping.setTaskId(rpaTask.getTaskId());
mapping.setRpaProcessName(rpaTask.getRpaProcessName());
mapping.setParameterMappings(rpaTask.getParameterMappings());
mapping.setOutputMappings(rpaTask.getOutputMappings());
// rpaTaskMappingRepository.save(mapping);
}
log.info("RPA任务映射注册完成 - 流程定义ID: {}, RPA任务数: {}",
processDefinitionId, rpaTasks.size());
} catch (Exception e) {
log.error("注册RPA任务映射失败 - 流程定义ID: {}", processDefinitionId, e);
}
}
/**
* 初始化监控配置
*/
private void initializeMonitoring(String processDefinitionId, List<RpaTaskDefinition> rpaTasks) {
try {
// 为包含RPA任务的流程初始化监控配置
MonitoringConfig config = new MonitoringConfig();
config.setProcessDefinitionId(processDefinitionId);
config.setMonitoringEnabled(true);
config.setAlertThresholds(createAlertThresholds());
config.setRpaTaskIds(rpaTasks.stream()
.map(RpaTaskDefinition::getTaskId)
.collect(Collectors.toList()));
monitoringService.initializeMonitoring(config);
} catch (Exception e) {
log.warn("初始化监控配置失败 - 流程定义ID: {}", processDefinitionId, e);
}
}
/**
* 创建告警阈值
*/
private Map<String, Object> createAlertThresholds() {
Map<String, Object> thresholds = new HashMap<>();
thresholds.put("rpaExecutionTimeout", 3600000); // 1小时
thresholds.put("rpaFailureRate", 0.05); // 5%失败率
thresholds.put("rpaQueueLength", 100); // 队列长度100
return thresholds;
}
/**
* 启动流程实例
* @param processDefinitionId 流程定义ID
* @param variables 流程变量
* @return 流程实例ID
*/
public String startProcessInstance(String processDefinitionId, Map<String, Object> variables) {
try {
// 1. 启动BPM流程实例
String processInstanceId = processEngine.startProcessInstance(processDefinitionId, variables);
// 2. 初始化RPA任务跟踪
initializeRpaTaskTracking(processInstanceId, processDefinitionId);
// 3. 启动监控
startMonitoring(processInstanceId);
log.info("流程实例启动完成 - 流程实例ID: {}", processInstanceId);
return processInstanceId;
} catch (Exception e) {
log.error("启动流程实例失败 - 流程定义ID: {}", processDefinitionId, e);
throw new OrchestrationException("启动流程实例失败", e);
}
}
/**
* 初始化RPA任务跟踪
*/
private void initializeRpaTaskTracking(String processInstanceId, String processDefinitionId) {
try {
// 创建RPA任务跟踪记录
RpaTaskTracking tracking = new RpaTaskTracking();
tracking.setProcessInstanceId(processInstanceId);
tracking.setProcessDefinitionId(processDefinitionId);
tracking.setTrackingStatus(TrackingStatus.ACTIVE);
tracking.setStartTime(new Date());
// rpaTaskTrackingRepository.save(tracking);
} catch (Exception e) {
log.warn("初始化RPA任务跟踪失败 - 流程实例ID: {}", processInstanceId, e);
}
}
/**
* 启动监控
*/
private void startMonitoring(String processInstanceId) {
try {
monitoringService.startProcessMonitoring(processInstanceId);
} catch (Exception e) {
log.warn("启动流程监控失败 - 流程实例ID: {}", processInstanceId, e);
}
}
/**
* 处理RPA任务完成事件
* @param processInstanceId 流程实例ID
* @param taskId 任务ID
* @param rpaOutputs RPA输出
*/
public void handleRpaTaskCompletion(String processInstanceId, String taskId,
Map<String, Object> rpaOutputs) {
try {
// 1. 更新RPA任务跟踪状态
updateRpaTaskTracking(processInstanceId, taskId, rpaOutputs);
// 2. 将RPA输出映射到BPM流程变量
Map<String, Object> mappedOutputs = mapRpaOutputsToProcessVariables(taskId, rpaOutputs);
// 3. 完成BPM服务任务
processEngine.completeServiceTask(processInstanceId, taskId, mappedOutputs);
// 4. 记录完成日志
logRpaTaskCompletion(processInstanceId, taskId, rpaOutputs);
log.info("RPA任务完成处理完成 - 流程实例ID: {}, 任务ID: {}", processInstanceId, taskId);
} catch (Exception e) {
log.error("处理RPA任务完成事件失败 - 流程实例ID: {}, 任务ID: {}", processInstanceId, taskId, e);
// 处理失败情况
handleRpaTaskFailure(processInstanceId, taskId, e);
}
}
/**
* 更新RPA任务跟踪
*/
private void updateRpaTaskTracking(String processInstanceId, String taskId,
Map<String, Object> rpaOutputs) {
try {
// RpaTaskTracking tracking = rpaTaskTrackingRepository
// .findByProcessInstanceIdAndTaskId(processInstanceId, taskId);
//
// if (tracking != null) {
// tracking.setCompletionTime(new Date());
// tracking.setStatus(TaskStatus.COMPLETED);
// tracking.setOutputs(rpaOutputs);
// rpaTaskTrackingRepository.save(tracking);
// }
} catch (Exception e) {
log.warn("更新RPA任务跟踪失败 - 流程实例ID: {}, 任务ID: {}", processInstanceId, taskId, e);
}
}
/**
* 映射RPA输出到流程变量
*/
private Map<String, Object> mapRpaOutputsToProcessVariables(String taskId,
Map<String, Object> rpaOutputs) {
Map<String, Object> mappedOutputs = new HashMap<>();
try {
// 获取任务的输出映射配置
// RpaTaskMapping mapping = rpaTaskMappingRepository.findByTaskId(taskId);
//
// if (mapping != null && mapping.getOutputMappings() != null) {
// Map<String, String> outputMappings = mapping.getOutputMappings();
//
// for (Map.Entry<String, String> entry : outputMappings.entrySet()) {
// String rpaOutputName = entry.getKey();
// String processVariableName = entry.getValue();
//
// if (rpaOutputs.containsKey(rpaOutputName)) {
// mappedOutputs.put(processVariableName, rpaOutputs.get(rpaOutputName));
// }
// }
// }
} catch (Exception e) {
log.warn("映射RPA输出到流程变量失败 - 任务ID: {}", taskId, e);
}
return mappedOutputs;
}
/**
* 记录RPA任务完成日志
*/
private void logRpaTaskCompletion(String processInstanceId, String taskId,
Map<String, Object> rpaOutputs) {
try {
RpaTaskLog logEntry = new RpaTaskLog();
logEntry.setProcessInstanceId(processInstanceId);
logEntry.setTaskId(taskId);
logEntry.setExecutionTime(new Date());
logEntry.setStatus(LogStatus.SUCCESS);
logEntry.setOutputs(rpaOutputs);
// rpaTaskLogRepository.save(logEntry);
} catch (Exception e) {
log.warn("记录RPA任务完成日志失败 - 流程实例ID: {}, 任务ID: {}", processInstanceId, taskId, e);
}
}
/**
* 处理RPA任务失败
*/
private void handleRpaTaskFailure(String processInstanceId, String taskId, Exception exception) {
try {
// 记录失败日志
RpaTaskLog logEntry = new RpaTaskLog();
logEntry.setProcessInstanceId(processInstanceId);
logEntry.setTaskId(taskId);
logEntry.setExecutionTime(new Date());
logEntry.setStatus(LogStatus.FAILED);
logEntry.setErrorMessage(exception.getMessage());
// rpaTaskLogRepository.save(logEntry);
// 更新BPM任务状态为错误
processEngine.failServiceTask(processInstanceId, taskId, exception.getMessage());
} catch (Exception e) {
log.error("处理RPA任务失败失败 - 流程实例ID: {}, 任务ID: {}", processInstanceId, taskId, e);
}
}
}异常处理与错误恢复
在BPM与RPA集成环境中,需要建立完善的异常处理和错误恢复机制。
// 异常处理与错误恢复服务
@Service
public class ExceptionHandlingAndRecoveryService {
@Autowired
private RpaOrchestratorService rpaOrchestrator;
@Autowired
private ProcessEngine processEngine;
@Autowired
private RetryPolicyRepository retryPolicyRepository;
@Autowired
private ErrorHandlingService errorHandlingService;
/**
* 处理RPA执行异常
* @param executionId RPA执行ID
* @param exception 异常信息
* @return 处理结果
*/
public ExceptionHandlingResult handleRpaException(String executionId, Exception exception) {
ExceptionHandlingResult result = new ExceptionHandlingResult();
result.setExecutionId(executionId);
result.setHandlingTime(new Date());
try {
// 1. 获取执行信息
RpaExecutionInfo executionInfo = rpaOrchestrator.getExecutionInfo(executionId);
if (executionInfo == null) {
throw new ExceptionHandlingException("无法获取RPA执行信息: " + executionId);
}
// 2. 分析异常类型
ExceptionType exceptionType = analyzeExceptionType(exception, executionInfo);
result.setExceptionType(exceptionType);
// 3. 获取重试策略
RetryPolicy retryPolicy = getRetryPolicy(executionInfo.getProcessName(), exceptionType);
// 4. 执行异常处理
ExceptionHandlingAction action = determineHandlingAction(
executionInfo, exceptionType, retryPolicy);
result.setRecommendedAction(action);
// 5. 执行处理动作
executeHandlingAction(executionInfo, action, retryPolicy);
result.setSuccess(true);
result.setMessage("异常处理完成");
log.info("RPA异常处理完成 - 执行ID: {}, 异常类型: {}", executionId, exceptionType);
} catch (Exception e) {
log.error("处理RPA异常失败 - 执行ID: {}", executionId, e);
result.setSuccess(false);
result.setErrorMessage("异常处理失败: " + e.getMessage());
}
return result;
}
/**
* 分析异常类型
*/
private ExceptionType analyzeExceptionType(Exception exception, RpaExecutionInfo executionInfo) {
try {
String errorMessage = exception.getMessage();
if (errorMessage == null) {
errorMessage = exception.getClass().getSimpleName();
}
// 应用程序错误
if (errorMessage.contains("application error") ||
errorMessage.contains("应用程序错误")) {
return ExceptionType.APPLICATION_ERROR;
}
// 系统错误
if (errorMessage.contains("system error") ||
errorMessage.contains("系统错误") ||
exception instanceof SystemException) {
return ExceptionType.SYSTEM_ERROR;
}
// 网络错误
if (errorMessage.contains("network") ||
errorMessage.contains("connection") ||
errorMessage.contains("网络") ||
errorMessage.contains("连接")) {
return ExceptionType.NETWORK_ERROR;
}
// 超时错误
if (errorMessage.contains("timeout") ||
errorMessage.contains("超时")) {
return ExceptionType.TIMEOUT_ERROR;
}
// 认证错误
if (errorMessage.contains("authentication") ||
errorMessage.contains("authorization") ||
errorMessage.contains("认证") ||
errorMessage.contains("授权")) {
return ExceptionType.AUTHENTICATION_ERROR;
}
// 数据错误
if (errorMessage.contains("data") ||
errorMessage.contains("数据")) {
return ExceptionType.DATA_ERROR;
}
// 元素未找到错误
if (errorMessage.contains("element not found") ||
errorMessage.contains("元素未找到")) {
return ExceptionType.ELEMENT_NOT_FOUND;
}
return ExceptionType.UNKNOWN_ERROR;
} catch (Exception e) {
log.warn("分析异常类型失败", e);
return ExceptionType.UNKNOWN_ERROR;
}
}
/**
* 获取重试策略
*/
private RetryPolicy getRetryPolicy(String processName, ExceptionType exceptionType) {
try {
// 首先尝试获取特定流程和异常类型的重试策略
RetryPolicy policy = retryPolicyRepository.findByProcessNameAndExceptionType(
processName, exceptionType);
if (policy != null) {
return policy;
}
// 如果没有找到,尝试获取特定异常类型的默认策略
policy = retryPolicyRepository.findByProcessNameAndExceptionType(
"DEFAULT", exceptionType);
if (policy != null) {
return policy;
}
// 如果仍然没有找到,使用系统默认策略
return RetryPolicy.getDefaultPolicy();
} catch (Exception e) {
log.warn("获取重试策略失败 - 流程名称: {}, 异常类型: {}", processName, exceptionType, e);
return RetryPolicy.getDefaultPolicy();
}
}
/**
* 确定处理动作
*/
private ExceptionHandlingAction determineHandlingAction(RpaExecutionInfo executionInfo,
ExceptionType exceptionType, RetryPolicy retryPolicy) {
try {
// 检查是否已达到最大重试次数
if (executionInfo.getRetryCount() >= retryPolicy.getMaxRetries()) {
return ExceptionHandlingAction.FAIL_AND_NOTIFY;
}
// 根据异常类型确定处理动作
switch (exceptionType) {
case NETWORK_ERROR:
case TIMEOUT_ERROR:
// 网络和超时错误通常可以重试
return ExceptionHandlingAction.RETRY_WITH_DELAY;
case AUTHENTICATION_ERROR:
// 认证错误可能需要重新登录
return ExceptionHandlingAction.REAUTHENTICATE_AND_RETRY;
case ELEMENT_NOT_FOUND:
// 元素未找到可能需要等待或调整流程
if (executionInfo.getRetryCount() < 2) {
return ExceptionHandlingAction.WAIT_AND_RETRY;
} else {
return ExceptionHandlingAction.MANUAL_INTERVENTION;
}
case APPLICATION_ERROR:
case SYSTEM_ERROR:
// 应用程序和系统错误可能需要更长的等待时间
return ExceptionHandlingAction.RETRY_WITH_LONG_DELAY;
case DATA_ERROR:
// 数据错误通常需要人工干预
return ExceptionHandlingAction.MANUAL_INTERVENTION;
default:
// 未知错误根据重试策略决定
if (retryPolicy.isRetryOnUnknownError()) {
return ExceptionHandlingAction.RETRY_WITH_DELAY;
} else {
return ExceptionHandlingAction.FAIL_AND_NOTIFY;
}
}
} catch (Exception e) {
log.warn("确定处理动作失败", e);
return ExceptionHandlingAction.FAIL_AND_NOTIFY;
}
}
/**
* 执行处理动作
*/
private void executeHandlingAction(RpaExecutionInfo executionInfo,
ExceptionHandlingAction action, RetryPolicy retryPolicy) {
try {
switch (action) {
case RETRY_WITH_DELAY:
scheduleRetry(executionInfo, retryPolicy.getRetryDelay());
break;
case RETRY_WITH_LONG_DELAY:
scheduleRetry(executionInfo, retryPolicy.getLongRetryDelay());
break;
case WAIT_AND_RETRY:
scheduleRetry(executionInfo, 30000); // 等待30秒后重试
break;
case REAUTHENTICATE_AND_RETRY:
reauthenticateAndRetry(executionInfo);
break;
case MANUAL_INTERVENTION:
requestManualIntervention(executionInfo);
break;
case FAIL_AND_NOTIFY:
failAndNotify(executionInfo);
break;
default:
log.warn("未知的处理动作: {}", action);
}
} catch (Exception e) {
log.error("执行处理动作失败 - 执行ID: {}", executionInfo.getExecutionId(), e);
}
}
/**
* 安排重试
*/
private void scheduleRetry(RpaExecutionInfo executionInfo, long delay) {
try {
// 更新执行信息
executionInfo.setRetryCount(executionInfo.getRetryCount() + 1);
executionInfo.setNextRetryTime(new Date(System.currentTimeMillis() + delay));
executionInfo.setStatus(RpaExecutionState.PENDING_RETRY);
// 保存更新后的执行信息
rpaOrchestrator.updateExecutionInfo(executionInfo);
// 安排延迟执行
scheduleDelayedExecution(executionInfo, delay);
log.info("已安排重试 - 执行ID: {}, 延迟: {}ms, 重试次数: {}",
executionInfo.getExecutionId(), delay, executionInfo.getRetryCount());
} catch (Exception e) {
log.error("安排重试失败 - 执行ID: {}", executionInfo.getExecutionId(), e);
}
}
/**
* 安排延迟执行
*/
private void scheduleDelayedExecution(RpaExecutionInfo executionInfo, long delay) {
// 使用调度器安排延迟执行
// 这里简化实现,实际应该使用Quartz或其他调度框架
new Thread(() -> {
try {
Thread.sleep(delay);
retryExecution(executionInfo);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("延迟执行失败", e);
}
}).start();
}
/**
* 重试执行
*/
private void retryExecution(RpaExecutionInfo executionInfo) {
try {
log.info("开始重试执行 - 执行ID: {}", executionInfo.getExecutionId());
// 重新触发RPA执行
RpaExecutionRequest request = new RpaExecutionRequest();
request.setRobotId(executionInfo.getRobotId());
request.setProcessName(executionInfo.getProcessName());
request.setParameters(executionInfo.getParameters());
request.setPriority(executionInfo.getPriority());
rpaOrchestrator.triggerExecution(request);
} catch (Exception e) {
log.error("重试执行失败 - 执行ID: {}", executionInfo.getExecutionId(), e);
}
}
/**
* 重新认证并重试
*/
private void reauthenticateAndRetry(RpaExecutionInfo executionInfo) {
try {
// 触发重新认证流程
rpaOrchestrator.reauthenticateRobot(executionInfo.getRobotId());
// 等待认证完成
Thread.sleep(5000);
// 重新安排执行
scheduleRetry(executionInfo, 10000); // 10秒后重试
} catch (Exception e) {
log.error("重新认证并重试失败 - 执行ID: {}", executionInfo.getExecutionId(), e);
}
}
/**
* 请求人工干预
*/
private void requestManualIntervention(RpaExecutionInfo executionInfo) {
try {
// 更新执行状态
executionInfo.setStatus(RpaExecutionState.WAITING_FOR_MANUAL_INTERVENTION);
rpaOrchestrator.updateExecutionInfo(executionInfo);
// 通知相关人员
notifyForManualIntervention(executionInfo);
log.info("已请求人工干预 - 执行ID: {}", executionInfo.getExecutionId());
} catch (Exception e) {
log.error("请求人工干预失败 - 执行ID: {}", executionInfo.getExecutionId(), e);
}
}
/**
* 通知人工干预
*/
private void notifyForManualIntervention(RpaExecutionInfo executionInfo) {
try {
String subject = "需要人工干预的RPA执行任务";
String content = buildManualInterventionNotification(executionInfo);
// 发送给RPA管理员和流程负责人
// notificationService.sendToRpaAdmins(subject, content);
// notificationService.sendToProcessOwner(executionInfo.getProcessInstanceId(), subject, content);
} catch (Exception e) {
log.error("发送人工干预通知失败", e);
}
}
/**
* 构建人工干预通知
*/
private String buildManualInterventionNotification(RpaExecutionInfo executionInfo) {
return String.format(
"RPA执行需要人工干预\n\n" +
"执行ID: %s\n" +
"流程名称: %s\n" +
"机器人ID: %s\n" +
"执行参数: %s\n" +
"错误信息: %s\n" +
"发生时间: %s\n\n" +
"请登录RPA管理平台查看详细信息并进行处理。",
executionInfo.getExecutionId(),
executionInfo.getProcessName(),
executionInfo.getRobotId(),
executionInfo.getParameters(),
executionInfo.getErrorMessage(),
executionInfo.getStartTime()
);
}
/**
* 失败并通知
*/
private void failAndNotify(RpaExecutionInfo executionInfo) {
try {
// 更新执行状态为失败
executionInfo.setStatus(RpaExecutionState.FAILED);
executionInfo.setEndTime(new Date());
rpaOrchestrator.updateExecutionInfo(executionInfo);
// 通知失败
notifyExecutionFailure(executionInfo);
// 如果这是BPM流程中的任务,更新BPM任务状态
if (executionInfo.getBpmTaskId() != null) {
processEngine.failServiceTask(
executionInfo.getBpmProcessInstanceId(),
executionInfo.getBpmTaskId(),
executionInfo.getErrorMessage()
);
}
log.info("执行标记为失败并已通知 - 执行ID: {}", executionInfo.getExecutionId());
} catch (Exception e) {
log.error("失败并通知处理失败 - 执行ID: {}", executionInfo.getExecutionId(), e);
}
}
/**
* 通知执行失败
*/
private void notifyExecutionFailure(RpaExecutionInfo executionInfo) {
try {
String subject = "RPA执行失败通知";
String content = buildFailureNotification(executionInfo);
// 发送给相关人员
// notificationService.sendToRpaAdmins(subject, content);
// notificationService.sendToProcessOwner(executionInfo.getProcessInstanceId(), subject, content);
} catch (Exception e) {
log.error("发送执行失败通知失败", e);
}
}
/**
* 构建失败通知
*/
private String buildFailureNotification(RpaExecutionInfo executionInfo) {
return String.format(
"RPA执行失败通知\n\n" +
"执行ID: %s\n" +
"流程名称: %s\n" +
"机器人ID: %s\n" +
"错误信息: %s\n" +
"重试次数: %d\n" +
"发生时间: %s\n\n" +
"已达到最大重试次数,请检查并处理问题。",
executionInfo.getExecutionId(),
executionInfo.getProcessName(),
executionInfo.getRobotId(),
executionInfo.getErrorMessage(),
executionInfo.getRetryCount(),
executionInfo.getStartTime()
);
}
/**
* 处理BPM流程中的RPA任务异常
* @param processInstanceId BPM流程实例ID
* @param taskId BPM任务ID
* @param rpaExecutionId RPA执行ID
* @param exception 异常信息
*/
public void handleBpmRpaTaskException(String processInstanceId, String taskId,
String rpaExecutionId, Exception exception) {
try {
log.info("处理BPM-RPA任务异常 - 流程实例ID: {}, 任务ID: {}, RPA执行ID: {}",
processInstanceId, taskId, rpaExecutionId);
// 1. 处理RPA异常
ExceptionHandlingResult result = handleRpaException(rpaExecutionId, exception);
// 2. 根据处理结果更新BPM任务状态
if (result.getRecommendedAction() == ExceptionHandlingAction.FAIL_AND_NOTIFY) {
// 如果处理结果是失败,更新BPM任务状态
processEngine.failServiceTask(processInstanceId, taskId, exception.getMessage());
} else if (result.getRecommendedAction() == ExceptionHandlingAction.MANUAL_INTERVENTION) {
// 如果需要人工干预,暂停BPM任务
processEngine.suspendTask(processInstanceId, taskId);
}
// 对于重试情况,不需要立即更新BPM任务状态,等待重试结果
} catch (Exception e) {
log.error("处理BPM-RPA任务异常失败 - 流程实例ID: {}, 任务ID: {}", processInstanceId, taskId, e);
}
}
}最佳实践与注意事项
在实施BPM与RPA集成时,需要注意以下最佳实践:
1. 架构设计原则
- 采用松耦合的设计,确保BPM和RPA可以独立发展
- 建立统一的监控和管理界面
- 设计清晰的接口和数据交换格式
2. 异常处理策略
- 建立完善的异常分类和处理机制
- 实施合理的重试策略
- 提供人工干预的应急处理通道
3. 安全与合规
- 确保RPA操作符合安全规范
- 建立操作审计和日志记录机制
- 遵守数据保护和隐私法规
4. 性能优化
- 合理分配RPA机器人资源
- 优化RPA流程执行效率
- 建立性能监控和调优机制
通过合理的架构设计和实施策略,BPM与RPA的集成能够为企业提供更强大的自动化能力,解决传统系统集成难以处理的复杂场景,实现端到端的业务流程自动化。
