智能测试优化: 预测性测试选择与故障测试用例识别
2025/9/7大约 16 分钟
在现代软件开发中,测试是确保代码质量和系统稳定性的关键环节。然而,随着应用程序复杂性的增加和代码库规模的扩大,传统的测试方法面临着效率低下、资源浪费和维护成本高等挑战。智能测试优化通过应用机器学习和数据分析技术,能够实现预测性测试选择和故障测试用例识别,从而显著提升测试效率和质量。本文将深入探讨智能测试优化的核心技术和实践方法。
预测性测试选择
预测性测试选择是一种基于历史数据和代码变更分析,智能选择最相关测试用例的技术。它能够显著减少测试执行时间,同时保持较高的测试覆盖率。
测试选择算法
实现预测性测试选择需要设计高效的算法来分析代码变更与测试用例之间的关系:
基于代码变更的测试选择
#!/usr/bin/env python3
"""
基于代码变更的预测性测试选择系统
"""
import ast
import hashlib
from typing import List, Dict, Set, Tuple
from dataclasses import dataclass
import networkx as nx
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import logging
@dataclass
class CodeChange:
file_path: str
change_type: str # added, modified, deleted
lines_changed: List[int]
methods_affected: List[str]
@dataclass
class TestCase:
name: str
file_path: str
methods_called: List[str]
last_execution_time: float
last_failure_rate: float
coverage_data: Dict[str, Set[int]] # file_path -> line_numbers
class PredictiveTestSelector:
def __init__(self):
self.test_dependency_graph = nx.DiGraph()
self.code_element_index = {}
self.test_case_index = {}
self.vectorizer = TfidfVectorizer()
self.logger = logging.getLogger(__name__)
def analyze_code_changes(self, changed_files: List[str]) -> List[CodeChange]:
"""分析代码变更"""
code_changes = []
for file_path in changed_files:
try:
change = self._parse_file_changes(file_path)
code_changes.append(change)
except Exception as e:
self.logger.error(f"Error analyzing changes in {file_path}: {e}")
return code_changes
def _parse_file_changes(self, file_path: str) -> CodeChange:
"""解析文件变更"""
# 这里简化实现,实际应用中会与版本控制系统集成
methods_affected = []
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 使用AST解析Python代码
if file_path.endswith('.py'):
tree = ast.parse(content)
methods_affected = self._extract_methods_from_ast(tree)
# 计算文件哈希用于变更检测
file_hash = hashlib.md5(content.encode()).hexdigest()
except Exception as e:
self.logger.warning(f"Could not parse {file_path}: {e}")
file_hash = ""
return CodeChange(
file_path=file_path,
change_type="modified", # 简化处理
lines_changed=[], # 实际应用中会从diff中获取
methods_affected=methods_affected
)
def _extract_methods_from_ast(self, tree: ast.AST) -> List[str]:
"""从AST中提取方法名"""
methods = []
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef):
methods.append(node.name)
elif isinstance(node, ast.ClassDef):
for item in node.body:
if isinstance(item, ast.FunctionDef):
methods.append(f"{node.name}.{item.name}")
return methods
def build_test_dependency_map(self, test_cases: List[TestCase]):
"""构建测试依赖映射"""
self.test_case_index = {tc.name: tc for tc in test_cases}
# 为每个测试用例建立依赖关系
for test_case in test_cases:
# 基于调用的方法建立依赖
for method in test_case.methods_called:
if method not in self.test_dependency_graph:
self.test_dependency_graph.add_node(method)
test_node = f"test:{test_case.name}"
if test_node not in self.test_dependency_graph:
self.test_dependency_graph.add_node(test_node)
self.test_dependency_graph.add_edge(method, test_node)
def select_tests_for_changes(self, code_changes: List[CodeChange],
all_tests: List[TestCase],
max_tests: int = 100) -> List[TestCase]:
"""为代码变更选择测试用例"""
# 1. 基于直接影响选择测试
directly_affected_tests = self._find_directly_affected_tests(code_changes)
# 2. 基于间接影响选择测试
indirectly_affected_tests = self._find_indirectly_affected_tests(
code_changes, directly_affected_tests
)
# 3. 基于历史数据优化选择
optimized_tests = self._optimize_test_selection(
directly_affected_tests + indirectly_affected_tests,
code_changes
)
# 4. 限制测试数量
if len(optimized_tests) > max_tests:
optimized_tests = self._prioritize_tests(optimized_tests, max_tests)
return optimized_tests
def _find_directly_affected_tests(self, code_changes: List[CodeChange]) -> List[TestCase]:
"""查找直接受影响的测试"""
affected_tests = set()
for change in code_changes:
# 基于文件路径匹配
for test_name, test_case in self.test_case_index.items():
if self._files_related(change.file_path, test_case.file_path):
affected_tests.add(test_name)
# 基于方法调用匹配
for method in change.methods_affected:
if method in self.test_dependency_graph:
# 查找依赖该方法的测试
for successor in self.test_dependency_graph.successors(method):
if successor.startswith("test:"):
test_name = successor[5:] # 移除"test:"前缀
affected_tests.add(test_name)
return [self.test_case_index[name] for name in affected_tests if name in self.test_case_index]
def _find_indirectly_affected_tests(self, code_changes: List[CodeChange],
direct_tests: List[TestCase]) -> List[TestCase]:
"""查找间接受影响的测试"""
indirectly_affected = set()
direct_test_names = {test.name for test in direct_tests}
# 基于测试执行历史查找相关测试
for change in code_changes:
related_tests = self._find_tests_by_history(change.file_path)
for test_name in related_tests:
if test_name not in direct_test_names:
indirectly_affected.add(test_name)
return [self.test_case_index[name] for name in indirectly_affected if name in self.test_case_index]
def _find_tests_by_history(self, file_path: str, lookback_days: int = 30) -> Set[str]:
"""基于历史执行记录查找相关测试"""
# 简化实现,实际应用中会查询测试执行数据库
related_tests = set()
# 模拟历史数据分析
# 在实际系统中,这里会查询测试执行记录,找出经常与该文件一起失败的测试
file_hash = hashlib.md5(file_path.encode()).hexdigest()
# 基于简单的哈希算法模拟相关性
for test_name in self.test_case_index.keys():
test_hash = hashlib.md5(test_name.encode()).hexdigest()
# 如果哈希值的前几位相同,认为相关
if file_hash[:4] == test_hash[:4]:
related_tests.add(test_name)
return related_tests
def _optimize_test_selection(self, candidate_tests: List[TestCase],
code_changes: List[CodeChange]) -> List[TestCase]:
"""优化测试选择"""
# 基于多种因素对测试进行评分
scored_tests = []
for test in candidate_tests:
score = self._calculate_test_score(test, code_changes)
scored_tests.append((test, score))
# 按分数排序
scored_tests.sort(key=lambda x: x[1], reverse=True)
return [test for test, score in scored_tests]
def _calculate_test_score(self, test: TestCase, code_changes: List[CodeChange]) -> float:
"""计算测试分数"""
score = 0.0
# 1. 历史失败率权重 (30%)
score += test.last_failure_rate * 0.3
# 2. 执行时间权重 (20%)
# 执行时间越短,优先级越高
if test.last_execution_time > 0:
time_score = min(1.0, 10.0 / test.last_execution_time) # 假设10秒为基准
score += time_score * 0.2
# 3. 覆盖率权重 (25%)
coverage_score = self._calculate_coverage_score(test, code_changes)
score += coverage_score * 0.25
# 4. 变更相关性权重 (25%)
relevance_score = self._calculate_relevance_score(test, code_changes)
score += relevance_score * 0.25
return score
def _calculate_coverage_score(self, test: TestCase, code_changes: List[CodeChange]) -> float:
"""计算覆盖率分数"""
if not test.coverage_data:
return 0.5 # 默认分数
total_lines = 0
covered_lines = 0
for change in code_changes:
if change.file_path in test.coverage_data:
changed_lines = set(change.lines_changed)
covered_lines_in_file = test.coverage_data[change.file_path]
# 计算变更行中被覆盖的比例
intersection = changed_lines.intersection(covered_lines_in_file)
if len(changed_lines) > 0:
return len(intersection) / len(changed_lines)
return 0.5 # 默认分数
def _calculate_relevance_score(self, test: TestCase, code_changes: List[CodeChange]) -> float:
"""计算相关性分数"""
# 基于文件路径和方法名的文本相似度
test_features = [test.file_path] + test.methods_called
change_features = [change.file_path for change in code_changes]
if not test_features or not change_features:
return 0.5
# 使用TF-IDF计算相似度
try:
all_features = test_features + change_features
tfidf_matrix = self.vectorizer.fit_transform(all_features)
# 计算测试特征与变更特征的相似度
test_vectors = tfidf_matrix[:len(test_features)]
change_vectors = tfidf_matrix[len(test_features):]
similarities = cosine_similarity(test_vectors, change_vectors)
max_similarity = similarities.max() if similarities.size > 0 else 0
return max_similarity
except Exception as e:
self.logger.warning(f"Error calculating relevance score: {e}")
return 0.5
def _prioritize_tests(self, tests: List[TestCase], max_count: int) -> List[TestCase]:
"""优先排序测试"""
# 基于失败率和执行时间进行优先排序
prioritized = sorted(tests, key=lambda x: (
-x.last_failure_rate, # 高失败率优先
x.last_execution_time # 短执行时间优先
))
return prioritized[:max_count]
def _files_related(self, file1: str, file2: str) -> bool:
"""判断两个文件是否相关"""
# 简化实现,实际应用中会更复杂
return file1.split('/')[0] == file2.split('/')[0]
# 使用示例
# selector = PredictiveTestSelector()
#
# # 创建测试用例
# test_cases = [
# TestCase(
# name="test_user_authentication",
# file_path="tests/test_auth.py",
# methods_called=["UserService.authenticate", "UserService.validate_token"],
# last_execution_time=2.5,
# last_failure_rate=0.1,
# coverage_data={"src/auth.py": {10, 15, 20, 25}}
# ),
# TestCase(
# name="test_database_connection",
# file_path="tests/test_db.py",
# methods_called=["DatabaseService.connect", "DatabaseService.query"],
# last_execution_time=1.8,
# last_failure_rate=0.05,
# coverage_data={"src/database.py": {5, 10, 15}}
# )
# ]
#
# # 构建依赖映射
# selector.build_test_dependency_map(test_cases)
#
# # 分析代码变更
# code_changes = selector.analyze_code_changes(["src/auth.py"])
#
# # 选择测试
# selected_tests = selector.select_tests_for_changes(code_changes, test_cases, max_tests=5)
# print(f"Selected {len(selected_tests)} tests")机器学习驱动的测试选择
通过机器学习模型可以更准确地预测哪些测试用例最可能受到代码变更的影响:
测试影响预测模型
#!/usr/bin/env python3
"""
机器学习驱动的测试影响预测模型
"""
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.preprocessing import StandardScaler
import joblib
from typing import List, Dict, Tuple
import logging
class MLTestImpactPredictor:
def __init__(self):
self.model = RandomForestClassifier(n_estimators=100, random_state=42)
self.scaler = StandardScaler()
self.is_trained = False
self.feature_columns = []
self.logger = logging.getLogger(__name__)
def prepare_training_data(self, historical_data: List[Dict]) -> pd.DataFrame:
"""准备训练数据"""
df = pd.DataFrame(historical_data)
# 特征工程
df = self._engineer_features(df)
return df
def _engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""特征工程"""
# 代码复杂度特征
df['change_complexity'] = df['lines_added'] + df['lines_deleted']
df['method_change_ratio'] = df['methods_changed'] / (df['methods_total'] + 1)
# 测试历史特征
df['test_failure_rate_7d'] = df['failures_last_7_days'] / (df['executions_last_7_days'] + 1)
df['test_failure_rate_30d'] = df['failures_last_30_days'] / (df['executions_last_30_days'] + 1)
# 文件关系特征
df['same_module'] = (df['test_module'] == df['change_module']).astype(int)
df['file_path_similarity'] = df.apply(
lambda row: self._calculate_path_similarity(row['test_file'], row['change_file']),
axis=1
)
# 时间特征
df['time_since_last_execution'] = df['hours_since_last_test_run']
df['is_business_hours'] = ((df['hour_of_day'] >= 9) & (df['hour_of_day'] <= 17)).astype(int)
return df
def _calculate_path_similarity(self, path1: str, path2: str) -> float:
"""计算路径相似度"""
parts1 = path1.split('/')
parts2 = path2.split('/')
common_parts = 0
for i in range(min(len(parts1), len(parts2))):
if parts1[i] == parts2[i]:
common_parts += 1
else:
break
max_parts = max(len(parts1), len(parts2))
return common_parts / max_parts if max_parts > 0 else 0
def train_model(self, training_data: pd.DataFrame, target_column: str = 'should_run'):
"""训练模型"""
# 分离特征和目标变量
feature_columns = [col for col in training_data.columns if col != target_column]
self.feature_columns = feature_columns
X = training_data[feature_columns]
y = training_data[target_column]
# 数据标准化
X_scaled = self.scaler.fit_transform(X)
# 分割训练和测试数据
X_train, X_test, y_train, y_test = train_test_split(
X_scaled, y, test_size=0.2, random_state=42, stratify=y
)
# 训练模型
self.model.fit(X_train, y_train)
self.is_trained = True
# 评估模型
y_pred = self.model.predict(X_test)
report = classification_report(y_test, y_pred, output_dict=True)
self.logger.info("Model training completed")
self.logger.info(f"Accuracy: {report['accuracy']:.3f}")
self.logger.info(f"Precision: {report['1']['precision']:.3f}")
self.logger.info(f"Recall: {report['1']['recall']:.3f}")
return {
'accuracy': report['accuracy'],
'precision': report['1']['precision'],
'recall': report['1']['recall'],
'classification_report': report
}
def predict_test_impact(self, test_features: pd.DataFrame) -> pd.Series:
"""预测测试影响"""
if not self.is_trained:
raise Exception("Model not trained yet")
# 确保特征列一致
for col in self.feature_columns:
if col not in test_features.columns:
test_features[col] = 0
X = test_features[self.feature_columns]
X_scaled = self.scaler.transform(X)
# 预测概率
probabilities = self.model.predict_proba(X_scaled)
# 返回正类(应该运行测试)的概率
return pd.Series(probabilities[:, 1], index=test_features.index)
def get_feature_importance(self) -> pd.DataFrame:
"""获取特征重要性"""
if not self.is_trained:
raise Exception("Model not trained yet")
importance_df = pd.DataFrame({
'feature': self.feature_columns,
'importance': self.model.feature_importances_
})
return importance_df.sort_values('importance', ascending=False)
def save_model(self, model_path: str):
"""保存模型"""
model_data = {
'model': self.model,
'scaler': self.scaler,
'feature_columns': self.feature_columns,
'is_trained': self.is_trained
}
joblib.dump(model_data, model_path)
self.logger.info(f"Model saved to {model_path}")
def load_model(self, model_path: str):
"""加载模型"""
model_data = joblib.load(model_path)
self.model = model_data['model']
self.scaler = model_data['scaler']
self.feature_columns = model_data['feature_columns']
self.is_trained = model_data['is_trained']
self.logger.info(f"Model loaded from {model_path}")
# 使用示例
# # 创建示例训练数据
# training_data = [
# {
# 'test_name': 'test_auth',
# 'test_file': 'tests/test_auth.py',
# 'test_module': 'auth',
# 'change_file': 'src/auth.py',
# 'change_module': 'auth',
# 'lines_added': 10,
# 'lines_deleted': 5,
# 'methods_changed': 2,
# 'methods_total': 10,
# 'failures_last_7_days': 1,
# 'executions_last_7_days': 20,
# 'failures_last_30_days': 3,
# 'executions_last_30_days': 100,
# 'hours_since_last_test_run': 2,
# 'hour_of_day': 14,
# 'should_run': 1
# },
# {
# 'test_name': 'test_db',
# 'test_file': 'tests/test_db.py',
# 'test_module': 'database',
# 'change_file': 'src/auth.py',
# 'change_module': 'auth',
# 'lines_added': 10,
# 'lines_deleted': 5,
# 'methods_changed': 2,
# 'methods_total': 10,
# 'failures_last_7_days': 0,
# 'executions_last_7_days': 15,
# 'failures_last_30_days': 1,
# 'executions_last_30_days': 80,
# 'hours_since_last_test_run': 5,
# 'hour_of_day': 10,
# 'should_run': 0
# }
# ]
#
# # 训练模型
# predictor = MLTestImpactPredictor()
# df = predictor.prepare_training_data(training_data)
# results = predictor.train_model(df)
#
# # 预测新测试的影响
# new_test_features = pd.DataFrame([{
# 'change_complexity': 15,
# 'method_change_ratio': 0.2,
# 'test_failure_rate_7d': 0.05,
# 'test_failure_rate_30d': 0.01,
# 'same_module': 1,
# 'file_path_similarity': 0.8,
# 'time_since_last_execution': 1,
# 'is_business_hours': 1
# }])
#
# probabilities = predictor.predict_test_impact(new_test_features)
# print(f"Probability of test impact: {probabilities[0]:.3f}")故障测试用例识别
故障测试用例识别是通过分析测试执行历史和代码变更模式,识别出容易失败或不稳定的测试用例,从而帮助团队优先修复和优化这些测试。
测试稳定性分析
通过分析测试的历史执行数据,可以识别出不稳定的测试用例:
测试波动性检测
#!/usr/bin/env python3
"""
测试稳定性分析系统
"""
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple
from dataclasses import dataclass
from scipy import stats
import logging
@dataclass
class TestExecutionRecord:
test_name: str
execution_time: float
status: str # PASS, FAIL, ERROR, SKIP
timestamp: pd.Timestamp
build_id: str
environment: str
@dataclass
class TestStabilityMetrics:
test_name: str
failure_rate: float
flakiness_score: float
execution_time_variance: float
recent_trend: str # improving, deteriorating, stable
recommendations: List[str]
class TestStabilityAnalyzer:
def __init__(self):
self.logger = logging.getLogger(__name__)
def analyze_test_stability(self, execution_history: List[TestExecutionRecord],
analysis_period_days: int = 30) -> List[TestStabilityMetrics]:
"""分析测试稳定性"""
# 转换为DataFrame便于分析
df = pd.DataFrame([{
'test_name': record.test_name,
'execution_time': record.execution_time,
'status': record.status,
'timestamp': record.timestamp,
'build_id': record.build_id,
'environment': record.environment
} for record in execution_history])
# 过滤分析周期内的数据
cutoff_date = pd.Timestamp.now() - pd.Timedelta(days=analysis_period_days)
recent_data = df[df['timestamp'] >= cutoff_date]
# 按测试分组分析
stability_metrics = []
for test_name in recent_data['test_name'].unique():
test_data = recent_data[recent_data['test_name'] == test_name]
metrics = self._calculate_stability_metrics(test_name, test_data)
stability_metrics.append(metrics)
return stability_metrics
def _calculate_stability_metrics(self, test_name: str,
test_data: pd.DataFrame) -> TestStabilityMetrics:
"""计算稳定性指标"""
total_executions = len(test_data)
failed_executions = len(test_data[test_data['status'].isin(['FAIL', 'ERROR'])])
# 1. 失败率
failure_rate = failed_executions / total_executions if total_executions > 0 else 0
# 2. 不稳定性分数(基于执行结果的波动性)
flakiness_score = self._calculate_flakiness_score(test_data)
# 3. 执行时间方差
execution_time_variance = test_data['execution_time'].var() if total_executions > 1 else 0
# 4. 最近趋势
recent_trend = self._analyze_recent_trend(test_data)
# 5. 生成建议
recommendations = self._generate_recommendations(
failure_rate, flakiness_score, execution_time_variance, recent_trend
)
return TestStabilityMetrics(
test_name=test_name,
failure_rate=failure_rate,
flakiness_score=flakiness_score,
execution_time_variance=execution_time_variance,
recent_trend=recent_trend,
recommendations=recommendations
)
def _calculate_flakiness_score(self, test_data: pd.DataFrame) -> float:
"""计算不稳定性分数"""
if len(test_data) < 3:
return 0.0
# 计算状态序列的波动性
status_sequence = test_data['status'].apply(
lambda x: 1 if x in ['FAIL', 'ERROR'] else 0
).tolist()
# 计算连续相同状态的平均长度
runs = []
current_run = 1
for i in range(1, len(status_sequence)):
if status_sequence[i] == status_sequence[i-1]:
current_run += 1
else:
runs.append(current_run)
current_run = 1
runs.append(current_run)
avg_run_length = np.mean(runs) if runs else 1
max_run_length = max(runs) if runs else 1
# 不稳定性分数:平均运行长度越短,越不稳定
# 限制在0-1范围内
flakiness = min(1.0, max(0.0, 1 - (avg_run_length / max_run_length)))
return flakiness
def _analyze_recent_trend(self, test_data: pd.DataFrame) -> str:
"""分析最近趋势"""
if len(test_data) < 4:
return "stable"
# 按时间排序
sorted_data = test_data.sort_values('timestamp')
# 分为前后两半
mid_point = len(sorted_data) // 2
early_period = sorted_data.iloc[:mid_point]
late_period = sorted_data.iloc[mid_point:]
# 计算各期的失败率
early_failure_rate = len(early_period[early_period['status'].isin(['FAIL', 'ERROR'])]) / len(early_period)
late_failure_rate = len(late_period[late_period['status'].isin(['FAIL', 'ERROR'])]) / len(late_period)
# 判断趋势
rate_change = late_failure_rate - early_failure_rate
if rate_change > 0.1: # 失败率显著上升
return "deteriorating"
elif rate_change < -0.1: # 失败率显著下降
return "improving"
else:
return "stable"
def _generate_recommendations(self, failure_rate: float, flakiness_score: float,
execution_time_variance: float, recent_trend: str) -> List[str]:
"""生成优化建议"""
recommendations = []
# 基于失败率的建议
if failure_rate > 0.3:
recommendations.append("测试失败率过高(>30%),建议优先修复")
elif failure_rate > 0.1:
recommendations.append("测试失败率较高(>10%),建议关注")
# 基于不稳定性分数的建议
if flakiness_score > 0.7:
recommendations.append("测试不稳定,建议检查测试依赖和环境配置")
elif flakiness_score > 0.4:
recommendations.append("测试有一定波动性,建议优化测试隔离性")
# 基于执行时间方差的建议
if execution_time_variance > 100: # 假设时间单位为秒
recommendations.append("测试执行时间不稳定,建议优化测试性能")
# 基于趋势的建议
if recent_trend == "deteriorating":
recommendations.append("测试质量在恶化,需要立即关注")
elif recent_trend == "improving":
recommendations.append("测试质量在改善,继续保持")
# 默认建议
if not recommendations:
recommendations.append("测试表现良好,继续保持")
return recommendations
def identify_flaky_tests(self, stability_metrics: List[TestStabilityMetrics],
flakiness_threshold: float = 0.5) -> List[TestStabilityMetrics]:
"""识别不稳定测试"""
flaky_tests = [
metrics for metrics in stability_metrics
if metrics.flakiness_score > flakiness_threshold
]
return sorted(flaky_tests, key=lambda x: x.flakiness_score, reverse=True)
def identify_high_failure_tests(self, stability_metrics: List[TestStabilityMetrics],
failure_rate_threshold: float = 0.2) -> List[TestStabilityMetrics]:
"""识别高失败率测试"""
high_failure_tests = [
metrics for metrics in stability_metrics
if metrics.failure_rate > failure_rate_threshold
]
return sorted(high_failure_tests, key=lambda x: x.failure_rate, reverse=True)
# 使用示例
# analyzer = TestStabilityAnalyzer()
#
# # 创建示例执行记录
# execution_history = [
# TestExecutionRecord(
# test_name="test_user_login",
# execution_time=2.5,
# status="PASS" if i % 3 != 0 else "FAIL", # 模拟不稳定
# timestamp=pd.Timestamp.now() - pd.Timedelta(days=i),
# build_id=f"build-{i}",
# environment="test"
# )
# for i in range(30)
# ]
#
# # 分析测试稳定性
# stability_metrics = analyzer.analyze_test_stability(execution_history)
#
# # 识别不稳定测试
# flaky_tests = analyzer.identify_flaky_tests(stability_metrics)
# print(f"Found {len(flaky_tests)} flaky tests")
#
# # 识别高失败率测试
# high_failure_tests = analyzer.identify_high_failure_tests(stability_metrics)
# print(f"Found {len(high_failure_tests)} high failure rate tests")
#
# # 输出详细分析结果
# for metrics in stability_metrics[:3]:
# print(f"\nTest: {metrics.test_name}")
# print(f" Failure Rate: {metrics.failure_rate:.2%}")
# print(f" Flakiness Score: {metrics.flakiness_score:.2f}")
# print(f" Trend: {metrics.recent_trend}")
# print(f" Recommendations: {', '.join(metrics.recommendations)}")根因分析与修复建议
对于识别出的问题测试用例,需要进一步分析其失败的根本原因并提供修复建议:
测试失败根因分析
#!/usr/bin/env python3
"""
测试失败根因分析系统
"""
import pandas as pd
import numpy as np
from typing import List, Dict, Set, Tuple
from dataclasses import dataclass
import re
import logging
from collections import Counter
@dataclass
class TestFailureAnalysis:
test_name: str
failure_patterns: List[Dict[str, any]]
common_failure_points: List[str]
environmental_factors: List[str]
dependency_issues: List[str]
resource_contention: List[str]
suggested_fixes: List[str]
priority: str # high, medium, low
class TestFailureRootCauseAnalyzer:
def __init__(self):
self.logger = logging.getLogger(__name__)
self.known_failure_patterns = self._initialize_failure_patterns()
def _initialize_failure_patterns(self) -> Dict[str, Dict]:
"""初始化已知失败模式"""
return {
'timeout': {
'patterns': [r'timeout', r'TimeOut', r' timed out', r'exceeded \d+ seconds'],
'category': 'performance',
'suggested_fixes': [
'增加测试超时时间',
'优化测试性能',
'检查外部依赖响应时间'
]
},
'race_condition': {
'patterns': [r'race condition', r'concurrent', r'parallel', r'thread'],
'category': 'concurrency',
'suggested_fixes': [
'添加适当的同步机制',
'确保测试隔离性',
'使用线程安全的数据结构'
]
},
'network_failure': {
'patterns': [r'connection refused', r'network error', r'unreachable', r'SSL'],
'category': 'network',
'suggested_fixes': [
'检查网络连接配置',
'增加重试机制',
'使用mock服务进行测试'
]
},
'database_issue': {
'patterns': [r'database', r'SQL', r'connection pool', r'transaction'],
'category': 'database',
'suggested_fixes': [
'检查数据库连接配置',
'确保测试数据清理',
'使用独立的测试数据库'
]
},
'resource_leak': {
'patterns': [r'memory leak', r'out of memory', r'file handle', r'resource'],
'category': 'resource',
'suggested_fixes': [
'确保资源正确释放',
'添加资源清理代码',
'监控资源使用情况'
]
}
}
def analyze_test_failures(self, failure_logs: List[Dict[str, any]]) -> List[TestFailureAnalysis]:
"""分析测试失败"""
# 按测试名称分组
test_groups = {}
for log in failure_logs:
test_name = log.get('test_name', 'unknown')
if test_name not in test_groups:
test_groups[test_name] = []
test_groups[test_name].append(log)
# 分析每个测试的失败模式
analyses = []
for test_name, logs in test_groups.items():
analysis = self._analyze_single_test(test_name, logs)
analyses.append(analysis)
return analyses
def _analyze_single_test(self, test_name: str, logs: List[Dict]) -> TestFailureAnalysis:
"""分析单个测试的失败"""
# 1. 识别失败模式
failure_patterns = self._identify_failure_patterns(logs)
# 2. 分析常见失败点
common_failure_points = self._analyze_failure_points(logs)
# 3. 识别环境因素
environmental_factors = self._identify_environmental_factors(logs)
# 4. 检查依赖问题
dependency_issues = self._analyze_dependency_issues(logs)
# 5. 检查资源竞争
resource_contention = self._analyze_resource_contention(logs)
# 6. 生成修复建议
suggested_fixes = self._generate_fix_suggestions(
failure_patterns, environmental_factors, dependency_issues, resource_contention
)
# 7. 确定优先级
priority = self._determine_priority(logs, failure_patterns)
return TestFailureAnalysis(
test_name=test_name,
failure_patterns=failure_patterns,
common_failure_points=common_failure_points,
environmental_factors=environmental_factors,
dependency_issues=dependency_issues,
resource_contention=resource_contention,
suggested_fixes=suggested_fixes,
priority=priority
)
def _identify_failure_patterns(self, logs: List[Dict]) -> List[Dict[str, any]]:
"""识别失败模式"""
patterns_found = []
for log in logs:
error_message = log.get('error_message', '')
stack_trace = log.get('stack_trace', '')
# 合并错误信息用于模式匹配
combined_text = f"{error_message} {stack_trace}".lower()
# 检查已知模式
for pattern_name, pattern_info in self.known_failure_patterns.items():
for pattern in pattern_info['patterns']:
if re.search(pattern, combined_text, re.IGNORECASE):
patterns_found.append({
'pattern': pattern_name,
'category': pattern_info['category'],
'matched_text': pattern,
'occurrence': log.get('timestamp', 'unknown')
})
return patterns_found
def _analyze_failure_points(self, logs: List[Dict]) -> List[str]:
"""分析常见失败点"""
failure_points = []
for log in logs:
stack_trace = log.get('stack_trace', '')
if stack_trace:
# 提取堆栈跟踪中的关键方法
lines = stack_trace.split('\n')
for line in lines:
if 'at ' in line and '.java:' in line:
# 提取类名和方法名
match = re.search(r'at ([\w\.]+)\.(\w+)\(', line)
if match:
class_name, method_name = match.groups()
failure_points.append(f"{class_name}.{method_name}")
# 统计最常见的失败点
point_counts = Counter(failure_points)
common_points = [point for point, count in point_counts.most_common(3)]
return common_points
def _identify_environmental_factors(self, logs: List[Dict]) -> List[str]:
"""识别环境因素"""
env_factors = set()
for log in logs:
environment = log.get('environment', '').lower()
if environment:
env_factors.add(environment)
# 检查环境相关的错误信息
error_message = log.get('error_message', '').lower()
if 'environment' in error_message or 'config' in error_message:
env_factors.add('configuration_issue')
if 'permission' in error_message or 'access denied' in error_message:
env_factors.add('permission_issue')
return list(env_factors)
def _analyze_dependency_issues(self, logs: List[Dict]) -> List[str]:
"""分析依赖问题"""
dependency_issues = set()
for log in logs:
error_message = log.get('error_message', '')
stack_trace = log.get('stack_trace', '')
combined_text = f"{error_message} {stack_trace}".lower()
# 检查常见的依赖问题
if 'classnotfound' in combined_text or 'noclassdeffound' in combined_text:
dependency_issues.add('missing_dependency')
if 'unsatisfiedlink' in combined_text:
dependency_issues.add('native_library_issue')
if 'version' in combined_text and ('conflict' in combined_text or 'mismatch' in combined_text):
dependency_issues.add('version_conflict')
return list(dependency_issues)
def _analyze_resource_contention(self, logs: List[Dict]) -> List[str]:
"""分析资源竞争"""
resource_issues = set()
for log in logs:
error_message = log.get('error_message', '')
stack_trace = log.get('stack_trace', '')
combined_text = f"{error_message} {stack_trace}".lower()
# 检查资源相关的错误
if 'deadlock' in combined_text or 'lock' in combined_text:
resource_issues.add('deadlock')
if 'timeout' in combined_text and ('acquire' in combined_text or 'obtain' in combined_text):
resource_issues.add('resource_acquisition_timeout')
if 'too many' in combined_text and ('connection' in combined_text or 'thread' in combined_text):
resource_issues.add('resource_exhaustion')
return list(resource_issues)
def _generate_fix_suggestions(self, failure_patterns: List[Dict],
environmental_factors: List[str],
dependency_issues: List[str],
resource_contention: List[str]) -> List[str]:
"""生成修复建议"""
suggestions = set()
# 基于失败模式的建议
for pattern in failure_patterns:
pattern_name = pattern['pattern']
if pattern_name in self.known_failure_patterns:
suggestions.update(self.known_failure_patterns[pattern_name]['suggested_fixes'])
# 基于环境因素的建议
if 'configuration_issue' in environmental_factors:
suggestions.add('检查测试环境配置')
if 'permission_issue' in environmental_factors:
suggestions.add('检查文件和目录权限')
# 基于依赖问题的建议
if 'missing_dependency' in dependency_issues:
suggestions.add('检查依赖项是否正确引入')
if 'version_conflict' in dependency_issues:
suggestions.add('解决依赖版本冲突')
# 基于资源竞争的建议
if 'deadlock' in resource_contention:
suggestions.add('检查并修复死锁问题')
if 'resource_exhaustion' in resource_contention:
suggestions.add('优化资源使用,增加资源限制')
return list(suggestions)
def _determine_priority(self, logs: List[Dict], failure_patterns: List[Dict]) -> str:
"""确定修复优先级"""
# 计算失败频率
failure_count = len(logs)
# 检查是否有高严重性模式
high_severity_patterns = ['race_condition', 'deadlock', 'resource_exhaustion']
has_high_severity = any(
pattern['pattern'] in high_severity_patterns for pattern in failure_patterns
)
# 检查失败趋势
recent_failures = sum(1 for log in logs if log.get('recent', False))
failure_trend = recent_failures / failure_count if failure_count > 0 else 0
# 确定优先级
if has_high_severity or (failure_count > 10 and failure_trend > 0.5):
return 'high'
elif failure_count > 5 or failure_trend > 0.3:
return 'medium'
else:
return 'low'
# 使用示例
# analyzer = TestFailureRootCauseAnalyzer()
#
# # 创建示例失败日志
# failure_logs = [
# {
# 'test_name': 'test_concurrent_user_access',
# 'error_message': 'Timeout while waiting for resource',
# 'stack_trace': 'at com.example.UserService.getUser(UserService.java:45)',
# 'environment': 'test',
# 'timestamp': '2025-09-07 10:30:00',
# 'recent': True
# },
# {
# 'test_name': 'test_concurrent_user_access',
# 'error_message': 'Deadlock detected',
# 'stack_trace': 'at com.example.DatabaseManager.query(DatabaseManager.java:120)',
# 'environment': 'test',
# 'timestamp': '2025-09-07 11:15:00',
# 'recent': True
# }
# ]
#
# # 分析测试失败
# analyses = analyzer.analyze_test_failures(failure_logs)
#
# # 输出分析结果
# for analysis in analyses:
# print(f"\nTest: {analysis.test_name}")
# print(f"Priority: {analysis.priority}")
# print(f"Failure Patterns: {[p['pattern'] for p in analysis.failure_patterns]}")
# print(f"Suggested Fixes: {analysis.suggested_fixes}")通过实现预测性测试选择和故障测试用例识别,智能测试优化能够显著提升测试效率和质量。这些技术不仅减少了不必要的测试执行时间,还帮助团队快速定位和修复问题测试,从而提高整个CI/CD流程的可靠性和效率。随着机器学习和数据分析技术的不断发展,智能测试优化将在未来的软件开发中发挥越来越重要的作用。
