与监控系统(Prometheus)集成: 部署后自动验证
2025/9/7大约 5 分钟
部署后的自动验证是确保软件交付质量的关键环节。通过将CI/CD平台与Prometheus等监控系统集成,可以在部署完成后自动验证系统状态,确保新版本的部署没有引入性能问题或功能异常。这种自动化的验证机制不仅提高了部署的可靠性,还能够在问题发生时快速回滚,保障系统的稳定性。
部署后验证机制
部署后验证通过查询监控系统的关键指标,自动判断部署是否成功以及系统是否处于健康状态。
集成实现方案
1. 指标查询与验证
实现Prometheus指标的查询和验证功能:
#!/usr/bin/env python3
"""
Prometheus集成管理器
"""
import json
import requests
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import logging
import time
class PrometheusIntegrationManager:
def __init__(self, prometheus_url: str, auth_token: str = None):
self.prometheus_url = prometheus_url.rstrip('/')
self.auth_token = auth_token
self.logger = logging.getLogger(__name__)
self.session = requests.Session()
if auth_token:
self.session.headers.update({
'Authorization': f"Bearer {auth_token}"
})
def query_metric(self, query: str, time: str = None) -> Dict[str, Any]:
"""查询Prometheus指标"""
try:
params = {'query': query}
if time:
params['time'] = time
response = self.session.get(
f"{self.prometheus_url}/api/v1/query",
params=params
)
if response.status_code == 200:
result = response.json()
if result.get('status') == 'success':
return {
'success': True,
'data': result.get('data', {}),
'message': 'Query executed successfully'
}
else:
return {
'success': False,
'error': f"Query failed: {result.get('error', 'Unknown error')}"
}
else:
return {
'success': False,
'error': f"HTTP error {response.status_code}: {response.text}"
}
except Exception as e:
return {
'success': False,
'error': f"Exception during query: {str(e)}"
}
def query_range(self, query: str, start: str, end: str,
step: str = '15s') -> Dict[str, Any]:
"""查询时间范围内的指标"""
try:
params = {
'query': query,
'start': start,
'end': end,
'step': step
}
response = self.session.get(
f"{self.prometheus_url}/api/v1/query_range",
params=params
)
if response.status_code == 200:
result = response.json()
if result.get('status') == 'success':
return {
'success': True,
'data': result.get('data', {}),
'message': 'Range query executed successfully'
}
else:
return {
'success': False,
'error': f"Range query failed: {result.get('error', 'Unknown error')}"
}
else:
return {
'success': False,
'error': f"HTTP error {response.status_code}: {response.text}"
}
except Exception as e:
return {
'success': False,
'error': f"Exception during range query: {str(e)}"
}
def validate_deployment_metrics(self, environment: str,
deployment_time: str,
validation_window: int = 300) -> Dict[str, Any]:
"""验证部署后的关键指标"""
try:
# 计算验证时间窗口
end_time = datetime.fromisoformat(deployment_time)
start_time = end_time - timedelta(seconds=validation_window)
# 定义关键指标验证规则
validation_rules = [
{
'name': 'error_rate',
'query': f'rate(http_requests_total{{job="api", status=~"5..", env="{environment}"}}[5m])',
'threshold': 0.01, # 1%错误率阈值
'operator': '<',
'description': 'HTTP 5xx error rate'
},
{
'name': 'latency',
'query': f'histogram_quantile(0.95, rate(http_request_duration_seconds_bucket{{job="api", env="{environment}"}}[5m]))',
'threshold': 2.0, # 2秒P95延迟阈值
'operator': '<',
'description': 'HTTP request P95 latency'
},
{
'name': 'availability',
'query': f'up{{job="api", env="{environment}"}}',
'threshold': 1.0, # 100%可用性
'operator': '==',
'description': 'Service availability'
},
{
'name': 'cpu_usage',
'query': f'rate(container_cpu_usage_seconds_total{{container="app", env="{environment}"}}[5m])',
'threshold': 0.8, # 80% CPU使用率阈值
'operator': '<',
'description': 'Container CPU usage'
}
]
validation_results = []
all_passed = True
for rule in validation_rules:
result = self.query_metric(rule['query'], deployment_time)
if result['success'] and result['data'].get('result'):
# 提取指标值
metric_value = float(result['data']['result'][0]['value'][1])
# 验证指标
passed = self._validate_metric_value(
metric_value,
rule['threshold'],
rule['operator']
)
validation_results.append({
'metric': rule['name'],
'description': rule['description'],
'value': metric_value,
'threshold': rule['threshold'],
'operator': rule['operator'],
'passed': passed
})
if not passed:
all_passed = False
else:
validation_results.append({
'metric': rule['name'],
'description': rule['description'],
'error': result.get('error', 'Query failed'),
'passed': False
})
all_passed = False
return {
'success': True,
'all_passed': all_passed,
'results': validation_results,
'validation_window': validation_window,
'environment': environment
}
except Exception as e:
return {
'success': False,
'error': f"Exception during deployment validation: {str(e)}"
}
def _validate_metric_value(self, value: float, threshold: float,
operator: str) -> bool:
"""验证指标值是否符合预期"""
if operator == '==':
return value == threshold
elif operator == '!=':
return value != threshold
elif operator == '>':
return value > threshold
elif operator == '>=':
return value >= threshold
elif operator == '<':
return value < threshold
elif operator == '<=':
return value <= threshold
else:
return False
def wait_for_metric_stabilization(self, query: str,
stabilization_period: int = 60,
check_interval: int = 10) -> Dict[str, Any]:
"""等待指标稳定"""
try:
end_time = datetime.now()
start_time = end_time - timedelta(seconds=stabilization_period)
values = []
current_time = start_time
while current_time <= end_time:
result = self.query_metric(query, current_time.isoformat())
if result['success'] and result['data'].get('result'):
value = float(result['data']['result'][0]['value'][1])
values.append(value)
time.sleep(check_interval)
current_time = datetime.now()
if len(values) < 2:
return {
'success': False,
'error': 'Insufficient data points for stabilization check'
}
# 检查最后几个值是否稳定(变化小于5%)
recent_values = values[-5:] if len(values) >= 5 else values
avg_value = sum(recent_values) / len(recent_values)
max_deviation = max(abs(v - avg_value) for v in recent_values)
stability_ratio = max_deviation / avg_value if avg_value != 0 else 0
is_stable = stability_ratio < 0.05 # 5%以内的变化认为是稳定的
return {
'success': True,
'is_stable': is_stable,
'average_value': avg_value,
'stability_ratio': stability_ratio,
'data_points': len(values)
}
except Exception as e:
return {
'success': False,
'error': f"Exception during stabilization check: {str(e)}"
}
def create_alert_annotation(self, environment: str,
deployment_info: Dict[str, Any]) -> Dict[str, Any]:
"""创建告警注解"""
try:
annotation_data = {
'metric': 'deployment_annotation',
'start': deployment_info.get('start_time'),
'end': deployment_info.get('end_time', datetime.now().isoformat()),
'tags': [
f"environment:{environment}",
f"version:{deployment_info.get('version', 'unknown')}",
f"service:{deployment_info.get('service', 'unknown')}"
],
'text': f"Deployment of {deployment_info.get('service', 'unknown')} "
f"version {deployment_info.get('version', 'unknown')} "
f"to {environment} environment"
}
# 这里应该调用Prometheus的API来创建注解
# 实际实现取决于使用的告警系统(如Grafana)
return {
'success': True,
'message': 'Alert annotation created successfully',
'annotation': annotation_data
}
except Exception as e:
return {
'success': False,
'error': f"Exception during annotation creation: {str(e)}"
}
def get_service_health_score(self, service_name: str,
environment: str,
time_window: int = 300) -> Dict[str, Any]:
"""获取服务健康评分"""
try:
end_time = datetime.now().isoformat()
# 查询多个健康相关指标
metrics = {
'availability': f'avg_over_time(up{{job="{service_name}", env="{environment}"}}[{time_window}s])',
'error_rate': f'rate(http_requests_total{{job="{service_name}", status=~"5..", env="{environment}"}}[{time_window}s])',
'latency_p95': f'histogram_quantile(0.95, rate(http_request_duration_seconds_bucket{{job="{service_name}", env="{environment}"}}[{time_window}s]))',
'cpu_usage': f'rate(container_cpu_usage_seconds_total{{container="{service_name}", env="{environment}"}}[{time_window}s])'
}
health_scores = {}
for metric_name, query in metrics.items():
result = self.query_metric(query, end_time)
if result['success'] and result['data'].get('result'):
value = float(result['data']['result'][0]['value'][1])
health_scores[metric_name] = value
# 计算综合健康评分(简化算法)
score = 100.0
if 'availability' in health_scores:
# 可用性每降低1%扣10分
score -= (1 - health_scores['availability']) * 1000
if 'error_rate' in health_scores:
# 错误率每增加0.1%扣5分
score -= health_scores['error_rate'] * 5000
if 'latency_p95' in health_scores:
# 延迟每增加100ms扣5分
score -= health_scores['latency_p95'] * 50
if 'cpu_usage' in health_scores:
# CPU使用率每增加10%扣5分
score -= health_scores['cpu_usage'] * 50
# 确保分数在0-100之间
score = max(0, min(100, score))
return {
'success': True,
'health_score': round(score, 2),
'metrics': health_scores,
'environment': environment,
'service': service_name
}
except Exception as e:
return {
'success': False,
'error': f"Exception during health score calculation: {str(e)}"
}
# 使用示例
# prometheus_manager = PrometheusIntegrationManager(
# prometheus_url="http://prometheus-server:9090",
# auth_token="your-prometheus-token" # 如果需要认证
# )
#
# # 查询指标
# result = prometheus_manager.query_metric('up{job="api"}')
# print(result)
#
# # 验证部署指标
# validation_result = prometheus_manager.validate_deployment_metrics(
# environment="production",
# deployment_time="2025-09-07T10:00:00Z",
# validation_window=300 # 5分钟验证窗口
# )
# print(json.dumps(validation_result, indent=2))
#
# # 等待指标稳定
# stability_result = prometheus_manager.wait_for_metric_stabilization(
# query='rate(http_requests_total[5m])',
# stabilization_period=60,
# check_interval=10
# )
# print(stability_result)
#
# # 获取服务健康评分
# health_result = prometheus_manager.get_service_health_score(
# service_name="user-service",
# environment="production",
# time_window=300
# )
# print(json.dumps(health_result, indent=2))