可视化编排与脚本生成的结合
2025/9/6大约 19 分钟
7.4 可视化编排与脚本生成的结合
在现代UI自动化测试平台中,可视化编排与脚本生成的结合是提升测试效率和降低使用门槛的关键技术。通过直观的图形界面,用户可以轻松设计复杂的测试流程,而平台则自动生成相应的测试脚本。本节将详细介绍可视化测试流程设计、脚本自动生成机制以及可视化与脚本的双向同步技术。
可视化测试流程设计
可视化界面架构
可视化编排界面需要提供直观、易用的操作方式,让用户能够通过拖拽和配置来设计测试流程:
class VisualOrchestrator {
constructor(containerId, config = {}) {
this.container = document.getElementById(containerId);
this.config = {
gridSize: 20,
snapToGrid: true,
allowParallelExecution: true,
maxNodes: 100,
...config
};
this.nodes = new Map();
this.connections = new Map();
this.selectedNode = null;
this.draggedNode = null;
this.initCanvas();
this.initEventListeners();
this.initToolbox();
}
initCanvas() {
/** 初始化画布 */
this.canvas = document.createElement('div');
this.canvas.className = 'visual-canvas';
this.canvas.style.position = 'relative';
this.canvas.style.width = '100%';
this.canvas.style.height = '100%';
this.canvas.style.backgroundColor = '#f8f9fa';
this.canvas.style.overflow = 'auto';
// 添加网格背景
if (this.config.snapToGrid) {
this.canvas.style.backgroundImage = `
linear-gradient(to right, #e9ecef 1px, transparent 1px),
linear-gradient(to bottom, #e9ecef 1px, transparent 1px)
`;
this.canvas.style.backgroundSize = `${this.config.gridSize}px ${this.config.gridSize}px`;
}
this.container.appendChild(this.canvas);
}
initToolbox() {
/** 初始化工具箱 */
this.toolbox = document.createElement('div');
this.toolbox.className = 'toolbox';
this.toolbox.style.position = 'absolute';
this.toolbox.style.left = '10px';
this.toolbox.style.top = '10px';
this.toolbox.style.width = '200px';
this.toolbox.style.backgroundColor = '#fff';
this.toolbox.style.border = '1px solid #dee2e6';
this.toolbox.style.borderRadius = '4px';
this.toolbox.style.padding = '10px';
this.toolbox.style.boxShadow = '0 2px 4px rgba(0,0,0,0.1)';
// 添加节点类型
const nodeTypes = [
{ type: 'navigate', label: '导航', icon: '🧭' },
{ type: 'click', label: '点击', icon: '👆' },
{ type: 'input', label: '输入', icon: '⌨️' },
{ type: 'assert', label: '断言', icon: '✅' },
{ type: 'wait', label: '等待', icon: '⏱️' },
{ type: 'screenshot', label: '截图', icon: '📸' },
{ type: 'loop', label: '循环', icon: '🔄' },
{ type: 'condition', label: '条件', icon: '❓' }
];
nodeTypes.forEach(nodeType => {
const nodeElement = document.createElement('div');
nodeElement.className = 'toolbox-node';
nodeElement.style.padding = '8px';
nodeElement.style.margin = '4px 0';
nodeElement.style.backgroundColor = '#f8f9fa';
nodeElement.style.border = '1px solid #dee2e6';
nodeElement.style.borderRadius = '4px';
nodeElement.style.cursor = 'grab';
nodeElement.style.userSelect = 'none';
nodeElement.draggable = true;
nodeElement.innerHTML = `
<span style="margin-right: 8px;">${nodeType.icon}</span>
<span>${nodeType.label}</span>
`;
nodeElement.addEventListener('dragstart', (e) => {
e.dataTransfer.setData('nodeType', nodeType.type);
e.dataTransfer.setData('nodeLabel', nodeType.label);
});
this.toolbox.appendChild(nodeElement);
});
this.canvas.appendChild(this.toolbox);
}
initEventListeners() {
/** 初始化事件监听器 */
this.canvas.addEventListener('dragover', (e) => {
e.preventDefault();
});
this.canvas.addEventListener('drop', (e) => {
e.preventDefault();
const nodeType = e.dataTransfer.getData('nodeType');
const nodeLabel = e.dataTransfer.getData('nodeLabel');
if (nodeType) {
this.addNode(nodeType, nodeLabel, {
x: e.offsetX,
y: e.offsetY
});
}
});
// 画布点击事件
this.canvas.addEventListener('click', (e) => {
if (e.target === this.canvas) {
this.deselectNode();
}
});
}
addNode(type, label, position) {
/** 添加节点 */
const nodeId = this.generateNodeId();
// 计算网格对齐位置
if (this.config.snapToGrid) {
position.x = Math.round(position.x / this.config.gridSize) * this.config.gridSize;
position.y = Math.round(position.y / this.config.gridSize) * this.config.gridSize;
}
const node = new VisualNode(nodeId, type, label, position);
this.nodes.set(nodeId, node);
// 创建节点元素
const nodeElement = this.createNodeElement(node);
this.canvas.appendChild(nodeElement);
return node;
}
createNodeElement(node) {
/** 创建节点元素 */
const nodeElement = document.createElement('div');
nodeElement.className = 'visual-node';
nodeElement.id = `node-${node.id}`;
nodeElement.style.position = 'absolute';
nodeElement.style.left = `${node.position.x}px`;
nodeElement.style.top = `${node.position.y}px`;
nodeElement.style.width = '120px';
nodeElement.style.minHeight = '60px';
nodeElement.style.backgroundColor = '#fff';
nodeElement.style.border = '2px solid #007bff';
nodeElement.style.borderRadius = '6px';
nodeElement.style.padding = '10px';
nodeElement.style.boxShadow = '0 2px 8px rgba(0,0,0,0.15)';
nodeElement.style.cursor = 'move';
nodeElement.style.userSelect = 'none';
// 节点内容
nodeElement.innerHTML = `
<div class="node-header" style="font-weight: bold; margin-bottom: 5px;">
${node.icon} ${node.label}
</div>
<div class="node-content" style="font-size: 12px; color: #6c757d;">
${node.type}
</div>
`;
// 添加拖拽功能
this.makeNodeDraggable(nodeElement, node);
// 添加点击事件
nodeElement.addEventListener('click', (e) => {
e.stopPropagation();
this.selectNode(node.id);
});
// 添加双击事件(编辑节点)
nodeElement.addEventListener('dblclick', (e) => {
e.stopPropagation();
this.editNode(node.id);
});
return nodeElement;
}
makeNodeDraggable(nodeElement, node) {
/** 使节点可拖拽 */
let isDragging = false;
let offsetX, offsetY;
nodeElement.addEventListener('mousedown', (e) => {
isDragging = true;
this.draggedNode = node;
const rect = nodeElement.getBoundingClientRect();
offsetX = e.clientX - rect.left;
offsetY = e.clientY - rect.top;
nodeElement.style.zIndex = '1000';
nodeElement.style.opacity = '0.8';
});
document.addEventListener('mousemove', (e) => {
if (isDragging && this.draggedNode === node) {
const canvasRect = this.canvas.getBoundingClientRect();
let x = e.clientX - canvasRect.left - offsetX;
let y = e.clientY - canvasRect.top - offsetY;
// 网格对齐
if (this.config.snapToGrid) {
x = Math.round(x / this.config.gridSize) * this.config.gridSize;
y = Math.round(y / this.config.gridSize) * this.config.gridSize;
}
// 边界检查
x = Math.max(0, Math.min(x, this.canvas.offsetWidth - 120));
y = Math.max(0, Math.min(y, this.canvas.offsetHeight - 60));
nodeElement.style.left = `${x}px`;
nodeElement.style.top = `${y}px`;
node.position.x = x;
node.position.y = y;
// 更新连接线
this.updateConnectionsForNode(node.id);
}
});
document.addEventListener('mouseup', () => {
if (isDragging) {
isDragging = false;
this.draggedNode = null;
nodeElement.style.zIndex = '1';
nodeElement.style.opacity = '1';
}
});
}
selectNode(nodeId) {
/** 选择节点 */
// 取消之前选择的节点
if (this.selectedNode) {
const prevElement = document.getElementById(`node-${this.selectedNode}`);
if (prevElement) {
prevElement.style.border = '2px solid #007bff';
}
}
// 选择新节点
this.selectedNode = nodeId;
const nodeElement = document.getElementById(`node-${nodeId}`);
if (nodeElement) {
nodeElement.style.border = '2px solid #28a745';
}
// 显示属性面板
this.showPropertyPanel(nodeId);
}
deselectNode() {
/** 取消选择节点 */
if (this.selectedNode) {
const nodeElement = document.getElementById(`node-${this.selectedNode}`);
if (nodeElement) {
nodeElement.style.border = '2px solid #007bff';
}
this.selectedNode = null;
}
// 隐藏属性面板
this.hidePropertyPanel();
}
editNode(nodeId) {
/** 编辑节点 */
const node = this.nodes.get(nodeId);
if (!node) return;
// 显示编辑对话框
this.showNodeEditor(node);
}
showPropertyPanel(nodeId) {
/** 显示属性面板 */
const node = this.nodes.get(nodeId);
if (!node) return;
// 创建或更新属性面板
let propertyPanel = document.getElementById('property-panel');
if (!propertyPanel) {
propertyPanel = document.createElement('div');
propertyPanel.id = 'property-panel';
propertyPanel.style.position = 'absolute';
propertyPanel.style.right = '10px';
propertyPanel.style.top = '10px';
propertyPanel.style.width = '250px';
propertyPanel.style.backgroundColor = '#fff';
propertyPanel.style.border = '1px solid #dee2e6';
propertyPanel.style.borderRadius = '4px';
propertyPanel.style.padding = '15px';
propertyPanel.style.boxShadow = '0 2px 8px rgba(0,0,0,0.15)';
propertyPanel.style.zIndex = '100';
this.canvas.appendChild(propertyPanel);
}
// 更新属性面板内容
propertyPanel.innerHTML = `
<h4 style="margin-top: 0;">节点属性</h4>
<div style="margin-bottom: 10px;">
<label style="display: block; margin-bottom: 5px; font-weight: bold;">类型:</label>
<div>${node.label} (${node.type})</div>
</div>
<div style="margin-bottom: 10px;">
<label style="display: block; margin-bottom: 5px; font-weight: bold;">位置:</label>
<div>X: ${node.position.x}, Y: ${node.position.y}</div>
</div>
<div style="margin-bottom: 10px;">
<label style="display: block; margin-bottom: 5px; font-weight: bold;">配置:</label>
<button onclick="window.orchestrator.showNodeEditor('${nodeId}')"
style="padding: 5px 10px; background: #007bff; color: white; border: none; border-radius: 3px; cursor: pointer;">
编辑配置
</button>
</div>
`;
}
hidePropertyPanel() {
/** 隐藏属性面板 */
const propertyPanel = document.getElementById('property-panel');
if (propertyPanel) {
propertyPanel.remove();
}
}
showNodeEditor(node) {
/** 显示节点编辑器 */
// 这里应该根据节点类型显示不同的编辑界面
const editor = new NodeEditor(node, this);
editor.show();
}
updateConnectionsForNode(nodeId) {
/** 更新节点的连接线 */
// 实现连接线更新逻辑
}
generateNodeId() {
/** 生成节点ID */
return 'node_' + Date.now() + '_' + Math.random().toString(36).substr(2, 9);
}
}
class VisualNode {
constructor(id, type, label, position) {
this.id = id;
this.type = type;
this.label = label;
this.position = position;
this.config = {};
this.inputs = [];
this.outputs = [];
// 设置节点图标
this.icon = this.getIconForType(type);
}
getIconForType(type) {
const icons = {
'navigate': '🧭',
'click': '👆',
'input': '⌨️',
'assert': '✅',
'wait': '⏱️',
'screenshot': '📸',
'loop': '🔄',
'condition': '❓'
};
return icons[type] || '⚙️';
}
setConfig(config) {
this.config = { ...this.config, ...config };
}
getConfig() {
return this.config;
}
}
class NodeEditor {
constructor(node, orchestrator) {
this.node = node;
this.orchestrator = orchestrator;
}
show() {
/** 显示节点编辑器 */
// 创建模态对话框
const modal = document.createElement('div');
modal.style.position = 'fixed';
modal.style.left = '0';
modal.style.top = '0';
modal.style.width = '100%';
modal.style.height = '100%';
modal.style.backgroundColor = 'rgba(0,0,0,0.5)';
modal.style.display = 'flex';
modal.style.justifyContent = 'center';
modal.style.alignItems = 'center';
modal.style.zIndex = '10000';
const dialog = document.createElement('div');
dialog.style.backgroundColor = '#fff';
dialog.style.padding = '20px';
dialog.style.borderRadius = '8px';
dialog.style.minWidth = '400px';
dialog.style.maxWidth = '600px';
dialog.style.maxHeight = '80vh';
dialog.style.overflowY = 'auto';
dialog.innerHTML = `
<h3 style="margin-top: 0;">编辑节点: ${this.node.label}</h3>
<div id="node-config-form"></div>
<div style="margin-top: 20px; text-align: right;">
<button id="cancel-btn" style="margin-right: 10px; padding: 8px 16px;">取消</button>
<button id="save-btn" style="padding: 8px 16px; background: #28a745; color: white; border: none; border-radius: 4px;">保存</button>
</div>
`;
// 根据节点类型生成配置表单
this.generateConfigForm(dialog.querySelector('#node-config-form'));
// 添加事件监听器
dialog.querySelector('#cancel-btn').addEventListener('click', () => {
modal.remove();
});
dialog.querySelector('#save-btn').addEventListener('click', () => {
this.saveConfig();
modal.remove();
});
modal.appendChild(dialog);
document.body.appendChild(modal);
}
generateConfigForm(container) {
/** 生成配置表单 */
let formHtml = '';
switch (this.node.type) {
case 'navigate':
formHtml = `
<div style="margin-bottom: 15px;">
<label style="display: block; margin-bottom: 5px;">URL:</label>
<input type="text" id="url-input" value="${this.node.config.url || ''}"
style="width: 100%; padding: 8px; border: 1px solid #ddd; border-radius: 4px;">
</div>
`;
break;
case 'click':
formHtml = `
<div style="margin-bottom: 15px;">
<label style="display: block; margin-bottom: 5px;">选择器:</label>
<input type="text" id="selector-input" value="${this.node.config.selector || ''}"
style="width: 100%; padding: 8px; border: 1px solid #ddd; border-radius: 4px;">
</div>
<div style="margin-bottom: 15px;">
<label style="display: block; margin-bottom: 5px;">等待时间 (秒):</label>
<input type="number" id="timeout-input" value="${this.node.config.timeout || 10}"
style="width: 100%; padding: 8px; border: 1px solid #ddd; border-radius: 4px;">
</div>
`;
break;
case 'input':
formHtml = `
<div style="margin-bottom: 15px;">
<label style="display: block; margin-bottom: 5px;">选择器:</label>
<input type="text" id="selector-input" value="${this.node.config.selector || ''}"
style="width: 100%; padding: 8px; border: 1px solid #ddd; border-radius: 4px;">
</div>
<div style="margin-bottom: 15px;">
<label style="display: block; margin-bottom: 5px;">输入值:</label>
<input type="text" id="value-input" value="${this.node.config.value || ''}"
style="width: 100%; padding: 8px; border: 1px solid #ddd; border-radius: 4px;">
</div>
`;
break;
case 'assert':
formHtml = `
<div style="margin-bottom: 15px;">
<label style="display: block; margin-bottom: 5px;">断言类型:</label>
<select id="assert-type" style="width: 100%; padding: 8px; border: 1px solid #ddd; border-radius: 4px;">
<option value="text" ${this.node.config.assertType === 'text' ? 'selected' : ''}>文本匹配</option>
<option value="visibility" ${this.node.config.assertType === 'visibility' ? 'selected' : ''}>可见性</option>
<option value="existence" ${this.node.config.assertType === 'existence' ? 'selected' : ''}>存在性</option>
</select>
</div>
<div style="margin-bottom: 15px;">
<label style="display: block; margin-bottom: 5px;">选择器:</label>
<input type="text" id="selector-input" value="${this.node.config.selector || ''}"
style="width: 100%; padding: 8px; border: 1px solid #ddd; border-radius: 4px;">
</div>
<div style="margin-bottom: 15px;" id="expected-value-container">
<label style="display: block; margin-bottom: 5px;">期望值:</label>
<input type="text" id="expected-value" value="${this.node.config.expectedValue || ''}"
style="width: 100%; padding: 8px; border: 1px solid #ddd; border-radius: 4px;">
</div>
`;
break;
default:
formHtml = '<p>暂不支持此节点类型的配置编辑</p>';
}
container.innerHTML = formHtml;
// 添加动态行为
const assertTypeSelect = container.querySelector('#assert-type');
if (assertTypeSelect) {
const expectedValueContainer = container.querySelector('#expected-value-container');
assertTypeSelect.addEventListener('change', () => {
if (assertTypeSelect.value === 'visibility' || assertTypeSelect.value === 'existence') {
expectedValueContainer.style.display = 'none';
} else {
expectedValueContainer.style.display = 'block';
}
});
// 初始化状态
if (assertTypeSelect.value === 'visibility' || assertTypeSelect.value === 'existence') {
expectedValueContainer.style.display = 'none';
}
}
}
saveConfig() {
/** 保存配置 */
const config = {};
switch (this.node.type) {
case 'navigate':
config.url = document.getElementById('url-input').value;
break;
case 'click':
config.selector = document.getElementById('selector-input').value;
config.timeout = parseInt(document.getElementById('timeout-input').value) || 10;
break;
case 'input':
config.selector = document.getElementById('selector-input').value;
config.value = document.getElementById('value-input').value;
break;
case 'assert':
config.assertType = document.getElementById('assert-type').value;
config.selector = document.getElementById('selector-input').value;
if (config.assertType === 'text') {
config.expectedValue = document.getElementById('expected-value').value;
}
break;
}
this.node.setConfig(config);
// 更新节点显示
const nodeElement = document.getElementById(`node-${this.node.id}`);
if (nodeElement) {
// 可以更新节点显示信息
}
}
}流程连接机制
可视化编排需要支持节点间的连接,形成完整的测试流程:
from dataclasses import dataclass
from typing import List, Dict, Optional, Any
from enum import Enum
class ConnectionType(Enum):
"""连接类型枚举"""
SEQUENTIAL = "sequential" # 顺序执行
CONDITIONAL_TRUE = "conditional_true" # 条件为真
CONDITIONAL_FALSE = "conditional_false" # 条件为假
LOOP_ITERATION = "loop_iteration" # 循环迭代
@dataclass
class NodeConnection:
"""节点连接"""
id: str
source_node_id: str
target_node_id: str
connection_type: ConnectionType
condition: Optional[str] = None # 条件表达式
created_at: Optional[str] = None
@dataclass
class TestFlow:
"""测试流程"""
id: str
name: str
description: str
nodes: List[Dict[str, Any]]
connections: List[NodeConnection]
created_at: str
updated_at: str
created_by: str
version: str = "1.0.0"
class FlowManager:
"""流程管理器"""
def __init__(self, storage_backend):
self.storage = storage_backend
self.flows = {}
def create_flow(self, flow: TestFlow) -> str:
"""创建测试流程"""
# 验证流程数据
self._validate_flow(flow)
# 生成唯一ID
if not flow.id:
flow.id = self._generate_unique_id()
# 保存到存储
self.storage.save_flow(flow)
# 更新缓存
self.flows[flow.id] = flow
return flow.id
def get_flow(self, flow_id: str) -> Optional[TestFlow]:
"""获取测试流程"""
# 先从缓存获取
if flow_id in self.flows:
return self.flows[flow_id]
# 从存储获取
flow = self.storage.get_flow(flow_id)
if flow:
self.flows[flow_id] = flow
return flow
def update_flow(self, flow: TestFlow) -> bool:
"""更新测试流程"""
# 验证流程数据
self._validate_flow(flow)
# 检查是否存在
if not self.storage.flow_exists(flow.id):
return False
# 更新时间戳
from datetime import datetime
flow.updated_at = datetime.now().isoformat()
# 保存到存储
self.storage.update_flow(flow)
# 更新缓存
self.flows[flow.id] = flow
return True
def delete_flow(self, flow_id: str) -> bool:
"""删除测试流程"""
# 检查是否存在
if not self.storage.flow_exists(flow_id):
return False
# 从存储删除
self.storage.delete_flow(flow_id)
# 从缓存删除
if flow_id in self.flows:
del self.flows[flow_id]
return True
def execute_flow(self, flow_id: str, execution_context: Dict = None) -> Dict:
"""执行测试流程"""
flow = self.get_flow(flow_id)
if not flow:
raise ValueError(f"Flow {flow_id} not found")
# 创建执行器
executor = FlowExecutor(flow, execution_context or {})
# 执行流程
result = executor.execute()
return result
def _validate_flow(self, flow: TestFlow):
"""验证流程数据"""
if not flow.name:
raise ValueError("Flow name is required")
# 验证节点连接的有效性
node_ids = {node["id"] for node in flow.nodes}
for connection in flow.connections:
if connection.source_node_id not in node_ids:
raise ValueError(f"Source node {connection.source_node_id} not found")
if connection.target_node_id not in node_ids:
raise ValueError(f"Target node {connection.target_node_id} not found")
def _generate_unique_id(self) -> str:
"""生成唯一ID"""
import uuid
return str(uuid.uuid4())
class FlowExecutor:
"""流程执行器"""
def __init__(self, flow: TestFlow, context: Dict):
self.flow = flow
self.context = context
self.node_results = {}
self.execution_log = []
def execute(self) -> Dict:
"""执行流程"""
try:
# 找到起始节点(没有输入连接的节点)
start_node = self._find_start_node()
if not start_node:
raise ValueError("No start node found in flow")
# 执行流程
result = self._execute_node(start_node)
return {
"status": "completed",
"result": result,
"execution_log": self.execution_log,
"node_results": self.node_results
}
except Exception as e:
return {
"status": "failed",
"error": str(e),
"execution_log": self.execution_log,
"node_results": self.node_results
}
def _find_start_node(self) -> Optional[Dict]:
"""找到起始节点"""
# 找到所有有输入连接的节点
connected_nodes = {
conn.target_node_id for conn in self.flow.connections
}
# 起始节点是没有输入连接的节点
for node in self.flow.nodes:
if node["id"] not in connected_nodes:
return node
return None
def _execute_node(self, node: Dict) -> Any:
"""执行单个节点"""
node_id = node["id"]
# 记录执行开始
self._log_execution(node_id, "started")
try:
# 根据节点类型执行不同的操作
result = self._execute_node_by_type(node)
# 记录执行结果
self.node_results[node_id] = {
"status": "success",
"result": result
}
self._log_execution(node_id, "completed", result)
# 执行后续节点
next_nodes = self._get_next_nodes(node_id)
for next_node_info in next_nodes:
next_node = self._find_node_by_id(next_node_info["node_id"])
if next_node:
self._execute_node(next_node)
return result
except Exception as e:
self.node_results[node_id] = {
"status": "failed",
"error": str(e)
}
self._log_execution(node_id, "failed", str(e))
raise e
def _execute_node_by_type(self, node: Dict) -> Any:
"""根据节点类型执行"""
node_type = node["type"]
if node_type == "navigate":
return self._execute_navigate(node)
elif node_type == "click":
return self._execute_click(node)
elif node_type == "input":
return self._execute_input(node)
elif node_type == "assert":
return self._execute_assert(node)
elif node_type == "wait":
return self._execute_wait(node)
elif node_type == "screenshot":
return self._execute_screenshot(node)
else:
raise ValueError(f"Unsupported node type: {node_type}")
def _execute_navigate(self, node: Dict) -> str:
"""执行导航节点"""
url = node.get("config", {}).get("url")
if not url:
raise ValueError("URL is required for navigate node")
# 这里应该调用实际的浏览器导航方法
# 例如: self.context["driver"].get(url)
return f"Navigated to {url}"
def _execute_click(self, node: Dict) -> str:
"""执行点击节点"""
selector = node.get("config", {}).get("selector")
if not selector:
raise ValueError("Selector is required for click node")
timeout = node.get("config", {}).get("timeout", 10)
# 这里应该调用实际的点击方法
# 例如: WebDriverWait(driver, timeout).until(
# EC.element_to_be_clickable((By.CSS_SELECTOR, selector))
# ).click()
return f"Clicked element {selector}"
def _execute_input(self, node: Dict) -> str:
"""执行输入节点"""
selector = node.get("config", {}).get("selector")
value = node.get("config", {}).get("value")
if not selector:
raise ValueError("Selector is required for input node")
if value is None:
raise ValueError("Value is required for input node")
# 这里应该调用实际的输入方法
# 例如: element = driver.find_element(By.CSS_SELECTOR, selector)
# element.clear()
# element.send_keys(value)
return f"Input '{value}' into element {selector}"
def _execute_assert(self, node: Dict) -> bool:
"""执行断言节点"""
assert_type = node.get("config", {}).get("assertType")
selector = node.get("config", {}).get("selector")
if not assert_type:
raise ValueError("Assert type is required for assert node")
if not selector:
raise ValueError("Selector is required for assert node")
# 这里应该调用实际的断言方法
result = True # 模拟断言结果
if not result:
raise AssertionError(f"Assertion failed for element {selector}")
return result
def _execute_wait(self, node: Dict) -> str:
"""执行等待节点"""
timeout = node.get("config", {}).get("timeout", 5)
# 这里应该调用实际的等待方法
# 例如: time.sleep(timeout)
import time
time.sleep(min(timeout, 10)) # 限制最大等待时间
return f"Waited for {timeout} seconds"
def _execute_screenshot(self, node: Dict) -> str:
"""执行截图节点"""
filename = node.get("config", {}).get("filename", f"screenshot_{int(time.time())}.png")
# 这里应该调用实际的截图方法
# 例如: driver.save_screenshot(filename)
return f"Screenshot saved as {filename}"
def _get_next_nodes(self, node_id: str) -> List[Dict]:
"""获取后续节点"""
next_nodes = []
for connection in self.flow.connections:
if connection.source_node_id == node_id:
next_nodes.append({
"node_id": connection.target_node_id,
"connection_type": connection.connection_type,
"condition": connection.condition
})
return next_nodes
def _find_node_by_id(self, node_id: str) -> Optional[Dict]:
"""根据ID查找节点"""
for node in self.flow.nodes:
if node["id"] == node_id:
return node
return None
def _log_execution(self, node_id: str, status: str, details: Any = None):
"""记录执行日志"""
from datetime import datetime
log_entry = {
"timestamp": datetime.now().isoformat(),
"node_id": node_id,
"status": status,
"details": details
}
self.execution_log.append(log_entry)脚本自动生成机制
代码生成器设计
基于可视化编排的流程,自动生成相应的测试脚本:
class ScriptGenerator:
"""脚本生成器"""
def __init__(self, template_engine=None):
self.template_engine = template_engine or TemplateEngine()
self.generators = {
"python_selenium": self._generate_python_selenium,
"javascript_playwright": self._generate_javascript_playwright,
"java_selenium": self._generate_java_selenium
}
def generate_script(self, flow: TestFlow, language: str = "python_selenium") -> str:
"""生成测试脚本"""
generator = self.generators.get(language)
if not generator:
raise ValueError(f"Unsupported language: {language}")
return generator(flow)
def _generate_python_selenium(self, flow: TestFlow) -> str:
"""生成Python Selenium脚本"""
template = '''
import unittest
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
class {{ flow.name|replace(' ', '') }}Test(unittest.TestCase):
def setUp(self):
"""测试初始化"""
self.driver = webdriver.Chrome()
self.driver.maximize_window()
self.wait = WebDriverWait(self.driver, 10)
def test_{{ flow.name|lower|replace(' ', '_') }}(self):
"""{{ flow.description or flow.name }}"""
driver = self.driver
wait = self.wait
try:
{% for node in flow.nodes %}
# {{ node.label or node.type }}
{% if node.type == "navigate" %}
driver.get("{{ node.config.url }}")
{% elif node.type == "click" %}
element = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, "{{ node.config.selector }}")))
element.click()
{% elif node.type == "input" %}
element = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "{{ node.config.selector }}")))
element.clear()
element.send_keys("{{ node.config.value }}")
{% elif node.type == "assert" %}
{% if node.config.assertType == "text" %}
element = wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "{{ node.config.selector }}")))
self.assertEqual(element.text, "{{ node.config.expectedValue }}")
{% elif node.config.assertType == "visibility" %}
wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "{{ node.config.selector }}")))
{% elif node.config.assertType == "existence" %}
wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "{{ node.config.selector }}")))
{% endif %}
{% elif node.type == "wait" %}
time.sleep({{ node.config.timeout or 5 }})
{% elif node.type == "screenshot" %}
driver.save_screenshot("{{ node.config.filename or 'screenshot.png' }}")
{% endif %}
{% endfor %}
except Exception as e:
self.fail(f"Test failed with exception: {{ '{' }}{{ '{' }}e{{ '}' }}{{ '}' }}")
def tearDown(self):
"""测试清理"""
self.driver.quit()
if __name__ == "__main__":
unittest.main()
'''.strip()
# 渲染模板
return self.template_engine.render(template, {"flow": flow})
def _generate_javascript_playwright(self, flow: TestFlow) -> str:
"""生成JavaScript Playwright脚本"""
template = '''
const { chromium } = require('playwright');
async function {{ flow.name|lower|replace(' ', '_') }}() {
const browser = await chromium.launch({ headless: false });
const context = await browser.newContext();
const page = await context.newPage();
try {
{% for node in flow.nodes %}
// {{ node.label or node.type }}
{% if node.type == "navigate" %}
await page.goto('{{ node.config.url }}');
{% elif node.type == "click" %}
await page.click('{{ node.config.selector }}');
{% elif node.type == "input" %}
await page.fill('{{ node.config.selector }}', '{{ node.config.value }}');
{% elif node.type == "assert" %}
{% if node.config.assertType == "text" %}
const elementText = await page.textContent('{{ node.config.selector }}');
if (elementText !== '{{ node.config.expectedValue }}') {
throw new Error(`Assertion failed: expected "{{ node.config.expectedValue }}", got "${elementText}"`);
}
{% elif node.config.assertType == "visibility" %}
await page.waitForSelector('{{ node.config.selector }}', { state: 'visible' });
{% elif node.config.assertType == "existence" %}
await page.waitForSelector('{{ node.config.selector }}');
{% endif %}
{% elif node.type == "wait" %}
await page.waitForTimeout({{ (node.config.timeout or 5) * 1000 }});
{% elif node.type == "screenshot" %}
await page.screenshot({ path: '{{ node.config.filename or "screenshot.png" }}' });
{% endif %}
{% endfor %}
} catch (error) {
console.error('Test failed:', error);
throw error;
} finally {
await browser.close();
}
}
{{ flow.name|lower|replace(' ', '_') }}().catch(console.error);
'''.strip()
# 渲染模板
return self.template_engine.render(template, {"flow": flow})
def _generate_java_selenium(self, flow: TestFlow) -> str:
"""生成Java Selenium脚本"""
template = '''
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.openqa.selenium.By;
import org.openqa.selenium.WebDriver;
import org.openqa.selenium.WebElement;
import org.openqa.selenium.chrome.ChromeDriver;
import org.openqa.selenium.support.ui.WebDriverWait;
import org.openqa.selenium.support.ui.ExpectedConditions;
import static org.junit.Assert.*;
public class {{ flow.name|replace(' ', '') }}Test {
private WebDriver driver;
private WebDriverWait wait;
@Before
public void setUp() {
System.setProperty("webdriver.chrome.driver", "chromedriver.exe");
driver = new ChromeDriver();
driver.manage().window().maximize();
wait = new WebDriverWait(driver, 10);
}
@Test
public void test{{ flow.name|replace(' ', '') }}() {
try {
{% for node in flow.nodes %}
// {{ node.label or node.type }}
{% if node.type == "navigate" %}
driver.get("{{ node.config.url }}");
{% elif node.type == "click" %}
WebElement element = wait.until(ExpectedConditions.elementToBeClickable(By.cssSelector("{{ node.config.selector }}")));
element.click();
{% elif node.type == "input" %}
WebElement element = wait.until(ExpectedConditions.presenceOfElementLocated(By.cssSelector("{{ node.config.selector }}")));
element.clear();
element.sendKeys("{{ node.config.value }}");
{% elif node.type == "assert" %}
{% if node.config.assertType == "text" %}
WebElement element = wait.until(ExpectedConditions.visibilityOfElementLocated(By.cssSelector("{{ node.config.selector }}")));
assertEquals("{{ node.config.expectedValue }}", element.getText());
{% elif node.config.assertType == "visibility" %}
wait.until(ExpectedConditions.visibilityOfElementLocated(By.cssSelector("{{ node.config.selector }}")));
{% elif node.config.assertType == "existence" %}
wait.until(ExpectedConditions.presenceOfElementLocated(By.cssSelector("{{ node.config.selector }}")));
{% endif %}
{% elif node.type == "wait" %}
Thread.sleep({{ (node.config.timeout or 5) * 1000 }}L);
{% elif node.type == "screenshot" %}
// Screenshot functionality would need additional implementation
{% endif %}
{% endfor %}
} catch (Exception e) {
fail("Test failed with exception: " + e.getMessage());
}
}
@After
public void tearDown() {
if (driver != null) {
driver.quit();
}
}
}
'''.strip()
# 渲染模板
return self.template_engine.render(template, {"flow": flow})
class TemplateEngine:
"""模板引擎"""
def render(self, template_str: str, context: Dict) -> str:
"""渲染模板"""
from jinja2 import Template
# 简单的过滤器实现
def replace_filter(value, old, new):
return value.replace(old, new)
def lower_filter(value):
return value.lower()
# 创建模板并添加过滤器
template = Template(template_str)
template.environment.filters['replace'] = replace_filter
template.environment.filters['lower'] = lower_filter
return template.render(**context)模板管理系统
为了支持不同语言和框架的脚本生成,需要实现模板管理系统:
import os
import json
from typing import Dict, List, Optional
class TemplateManager:
"""模板管理器"""
def __init__(self, template_path: str = "templates"):
self.template_path = template_path
self.templates = {}
self.load_templates()
def load_templates(self):
"""加载模板"""
if not os.path.exists(self.template_path):
os.makedirs(self.template_path)
return
for filename in os.listdir(self.template_path):
if filename.endswith('.json'):
template_name = filename[:-5] # 移除.json后缀
with open(os.path.join(self.template_path, filename), 'r', encoding='utf-8') as f:
self.templates[template_name] = json.load(f)
def get_template(self, template_name: str) -> Optional[Dict]:
"""获取模板"""
return self.templates.get(template_name)
def save_template(self, template_name: str, template_data: Dict):
"""保存模板"""
self.templates[template_name] = template_data
# 保存到文件
with open(os.path.join(self.template_path, f"{template_name}.json"), 'w', encoding='utf-8') as f:
json.dump(template_data, f, ensure_ascii=False, indent=2)
def list_templates(self) -> List[str]:
"""列出所有模板"""
return list(self.templates.keys())
def delete_template(self, template_name: str) -> bool:
"""删除模板"""
if template_name in self.templates:
del self.templates[template_name]
# 删除文件
file_path = os.path.join(self.template_path, f"{template_name}.json")
if os.path.exists(file_path):
os.remove(file_path)
return True
return False
def create_template_from_flow(self, flow: TestFlow, language: str) -> str:
"""从流程创建模板"""
template_name = f"{flow.name.lower().replace(' ', '_')}_{language}"
template_data = {
"name": template_name,
"language": language,
"flow_id": flow.id,
"created_at": flow.created_at,
"nodes": flow.nodes,
"connections": [
{
"source": conn.source_node_id,
"target": conn.target_node_id,
"type": conn.connection_type.value,
"condition": conn.condition
}
for conn in flow.connections
]
}
self.save_template(template_name, template_data)
return template_name
class ScriptExporter:
"""脚本导出器"""
def __init__(self, script_generator: ScriptGenerator, template_manager: TemplateManager):
self.generator = script_generator
self.template_manager = template_manager
def export_flow_as_script(self, flow_id: str, flow_manager: FlowManager,
language: str = "python_selenium",
output_path: str = None) -> str:
"""导出流程为脚本"""
# 获取流程
flow = flow_manager.get_flow(flow_id)
if not flow:
raise ValueError(f"Flow {flow_id} not found")
# 生成脚本
script_content = self.generator.generate_script(flow, language)
# 保存到文件
if output_path:
with open(output_path, 'w', encoding='utf-8') as f:
f.write(script_content)
return script_content
def export_flow_as_template(self, flow_id: str, flow_manager: FlowManager) -> str:
"""导出流程为模板"""
# 获取流程
flow = flow_manager.get_flow(flow_id)
if not flow:
raise ValueError(f"Flow {flow_id} not found")
# 创建模板
template_name = self.template_manager.create_template_from_flow(flow, "generic")
return template_name可视化与脚本的双向同步
双向同步机制设计
为了确保可视化编排与脚本代码的一致性,需要实现双向同步机制:
class BidirectionalSync:
"""双向同步器"""
def __init__(self, flow_manager: FlowManager, script_generator: ScriptGenerator):
self.flow_manager = flow_manager
self.script_generator = script_generator
self.sync_handlers = {
"python_selenium": self._sync_python_selenium,
"javascript_playwright": self._sync_javascript_playwright
}
def sync_script_to_flow(self, script_content: str, language: str = "python_selenium") -> str:
"""将脚本同步到流程"""
sync_handler = self.sync_handlers.get(language)
if not sync_handler:
raise ValueError(f"Unsupported language for sync: {language}")
# 解析脚本内容
flow_data = sync_handler(script_content)
# 创建或更新流程
flow = TestFlow(**flow_data)
flow_id = self.flow_manager.create_flow(flow)
return flow_id
def sync_flow_to_script(self, flow_id: str, language: str = "python_selenium") -> str:
"""将流程同步到脚本"""
flow = self.flow_manager.get_flow(flow_id)
if not flow:
raise ValueError(f"Flow {flow_id} not found")
# 生成脚本
script_content = self.script_generator.generate_script(flow, language)
return script_content
def _sync_python_selenium(self, script_content: str) -> Dict:
"""同步Python Selenium脚本"""
import ast
import re
# 解析Python代码
try:
tree = ast.parse(script_content)
except SyntaxError as e:
raise ValueError(f"Invalid Python syntax: {e}")
# 提取测试类和方法
flow_data = {
"name": "Imported Flow",
"description": "Imported from Python Selenium script",
"nodes": [],
"connections": [],
"created_at": "",
"updated_at": "",
"created_by": "importer"
}
node_counter = 0
# 简单的AST遍历来提取操作
for node in ast.walk(tree):
if isinstance(node, ast.Call):
# 检查方法调用
if isinstance(node.func, ast.Attribute):
method_name = node.func.attr
if method_name == "get":
# 导航操作
if len(node.args) > 0 and isinstance(node.args[0], ast.Constant):
url = node.args[0].value
flow_data["nodes"].append({
"id": f"node_{node_counter}",
"type": "navigate",
"label": "导航",
"config": {"url": url}
})
node_counter += 1
elif method_name == "click":
# 点击操作
# 这里需要更复杂的解析来提取选择器
flow_data["nodes"].append({
"id": f"node_{node_counter}",
"type": "click",
"label": "点击",
"config": {"selector": "/* extracted selector */"}
})
node_counter += 1
elif method_name == "send_keys":
# 输入操作
if len(node.args) > 0 and isinstance(node.args[0], ast.Constant):
value = node.args[0].value
flow_data["nodes"].append({
"id": f"node_{node_counter}",
"type": "input",
"label": "输入",
"config": {
"selector": "/* extracted selector */",
"value": value
}
})
node_counter += 1
return flow_data
def _sync_javascript_playwright(self, script_content: str) -> Dict:
"""同步JavaScript Playwright脚本"""
# 使用正则表达式提取操作
flow_data = {
"name": "Imported Flow",
"description": "Imported from JavaScript Playwright script",
"nodes": [],
"connections": [],
"created_at": "",
"updated_at": "",
"created_by": "importer"
}
node_counter = 0
# 提取页面操作
page_operations = re.findall(r'await\s+page\.(\w+)\(([^)]+)\)', script_content)
for operation, args in page_operations:
if operation == "goto":
# 导航操作
url_match = re.search(r'["\']([^"\']+)["\']', args)
if url_match:
url = url_match.group(1)
flow_data["nodes"].append({
"id": f"node_{node_counter}",
"type": "navigate",
"label": "导航",
"config": {"url": url}
})
node_counter += 1
elif operation == "click":
# 点击操作
selector_match = re.search(r'["\']([^"\']+)["\']', args)
if selector_match:
selector = selector_match.group(1)
flow_data["nodes"].append({
"id": f"node_{node_counter}",
"type": "click",
"label": "点击",
"config": {"selector": selector}
})
node_counter += 1
elif operation == "fill":
# 输入操作
matches = re.findall(r'["\']([^"\']+)["\']', args)
if len(matches) >= 2:
selector, value = matches[0], matches[1]
flow_data["nodes"].append({
"id": f"node_{node_counter}",
"type": "input",
"label": "输入",
"config": {
"selector": selector,
"value": value
}
})
node_counter += 1
return flow_data
class SyncManager:
"""同步管理器"""
def __init__(self, bidirectional_sync: BidirectionalSync):
self.sync = bidirectional_sync
self.sync_history = []
def sync_with_version_control(self, flow_id: str, language: str = "python_selenium"):
"""与版本控制系统同步"""
# 生成脚本
script_content = self.sync.sync_flow_to_script(flow_id, language)
# 这里应该与Git等版本控制系统集成
# 例如:保存到Git仓库,提交更改等
# 记录同步历史
sync_record = {
"flow_id": flow_id,
"language": language,
"sync_time": "",
"script_hash": hash(script_content)
}
self.sync_history.append(sync_record)
return script_content
def auto_sync_on_change(self, flow_id: str):
"""变更时自动同步"""
# 监听流程变更事件
# 当流程发生变更时,自动同步到脚本
# 这里应该与事件系统集成
pass
def resolve_sync_conflicts(self, flow_id: str, script_content: str,
language: str = "python_selenium"):
"""解决同步冲突"""
# 比较流程和脚本的差异
# 提供冲突解决界面
# 这里应该实现差异比较和合并逻辑
pass实践案例分析
案例一:电商平台的可视化编排实践
某大型电商平台在UI自动化测试中成功实施了可视化编排与脚本生成的结合:
实施背景:
- 测试场景复杂,涉及多个页面和操作
- 非技术人员难以编写测试脚本
- 测试维护成本高
技术实现:
- 开发基于Web的可视化编排界面
- 实现多语言脚本自动生成
- 集成版本控制和协作功能
实施效果:
- 测试开发效率提升70%
- 非技术人员也能参与测试设计
- 测试维护成本降低50%
案例二:金融科技公司的双向同步实践
某金融科技公司通过双向同步机制显著提高了测试开发的灵活性:
应用场景:
- 既需要可视化设计,也需要代码编辑
- 团队成员技能水平不同
- 需要与现有测试资产集成
解决方案:
- 实现可视化与代码的双向同步
- 支持从现有脚本导入流程
- 提供冲突解决机制
应用效果:
- 开发方式更加灵活
- 现有资产得到有效利用
- 团队协作效率显著提升
最佳实践建议
可视化设计建议
用户体验:
- 提供直观的拖拽操作
- 支持键盘快捷键
- 实现撤销/重做功能
功能完整性:
- 支持复杂流程设计(条件、循环等)
- 提供丰富的节点类型
- 支持流程验证和调试
性能优化:
- 优化大型流程的渲染性能
- 实现增量更新机制
- 支持流程分片管理
脚本生成建议
代码质量:
- 生成符合最佳实践的代码
- 支持代码格式化
- 提供代码注释和文档
多语言支持:
- 支持主流测试框架
- 提供可扩展的模板系统
- 实现语言间的一致性
集成能力:
- 支持CI/CD集成
- 提供API接口
- 支持自定义扩展
本节小结
本节深入介绍了可视化编排与脚本生成结合的关键技术,包括可视化界面设计、流程连接机制、脚本自动生成以及双向同步机制。通过这些技术的实现,可以显著提高UI自动化测试的易用性和效率。
通过本节的学习,读者应该能够:
- 理解可视化测试流程设计的核心原理和实现方法。
- 掌握脚本自动生成机制和技术实现。
- 学会可视化与脚本的双向同步技术。
- 了解实际项目中的应用案例和最佳实践。
至此,我们已经完成了第7章的所有内容。在下一章中,我们将详细介绍移动端专项测试平台建设,帮助读者构建全面的移动测试体系。
