故障降级与恢复: Redis宕机时,自动降级到本地限流或直接放行
2025/9/7大约 9 分钟
在分布式限流系统中,高可用性是至关重要的设计目标。当核心组件如Redis出现故障时,系统必须具备故障降级和自动恢复的能力,以确保业务的连续性。本章将深入探讨如何设计和实现故障降级与恢复机制,确保在Redis宕机等异常情况下,系统能够自动降级到本地限流或直接放行策略,并在服务恢复后自动恢复正常运行。
故障降级策略设计
降级策略选择
在分布式限流系统中,我们需要根据业务特点和风险承受能力选择合适的故障降级策略:
- 本地限流降级:使用本地内存进行限流,保证基本的限流功能
- 直接放行降级:暂时取消限流,确保业务请求能够正常处理
- 静态规则降级:使用预定义的静态规则进行限流
降级决策机制
// 降级决策枚举
public enum DegradationStrategy {
LOCAL_LIMITING, // 本地限流
DIRECT_PASS, // 直接放行
STATIC_RULES // 静态规则
}
// 降级配置
public class DegradationConfig {
private DegradationStrategy strategy;
private long degradationTimeoutMs;
private boolean autoRecovery;
private List<StaticRule> staticRules;
// 构造函数、getter和setter方法
// ...
}
// 静态规则定义
public class StaticRule {
private String resource;
private long limit;
private TimeUnit timeUnit;
// 构造函数、getter和setter方法
// ...
}
故障检测与降级实现
基于熔断器的故障检测
// 基于Resilience4j的熔断器实现
@Component
public class ResilientRateLimiter {
private final DistributedRateLimiter distributedRateLimiter;
private final LocalRateLimiter localRateLimiter;
private final CircuitBreaker circuitBreaker;
private final DegradationConfig degradationConfig;
private volatile boolean degraded = false;
private volatile long degradationStartTime = 0;
public ResilientRateLimiter(DistributedRateLimiter distributedRateLimiter,
LocalRateLimiter localRateLimiter,
DegradationConfig degradationConfig) {
this.distributedRateLimiter = distributedRateLimiter;
this.localRateLimiter = localRateLimiter;
this.degradationConfig = degradationConfig;
// 配置熔断器
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
.failureRateThreshold(50) // 失败率阈值50%
.waitDurationInOpenState(Duration.ofSeconds(30)) // 开启状态持续时间
.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.TIME_BASED)
.slidingWindowSize(10) // 滑动窗口大小
.minimumNumberOfCalls(5) // 最小调用次数
.build();
this.circuitBreaker = CircuitBreaker.of("rate-limiter", circuitBreakerConfig);
// 注册熔断器事件监听器
circuitBreaker.getEventPublisher()
.onStateTransition(this::handleStateTransition);
}
public boolean tryAcquire(String resource, int permits) {
if (degraded) {
// 检查是否应该退出降级状态
if (shouldExitDegradation()) {
degraded = false;
log.info("Exiting degradation mode for resource: {}", resource);
} else {
// 在降级状态下执行降级策略
return executeDegradationStrategy(resource, permits);
}
}
// 使用熔断器包装分布式限流操作
Supplier<Boolean> decoratedSupplier = CircuitBreaker
.decorateSupplier(circuitBreaker,
() -> distributedRateLimiter.tryAcquire(resource, permits));
try {
return decoratedSupplier.get();
} catch (Exception e) {
log.warn("Distributed rate limiting failed for resource: {}, entering degradation mode",
resource, e);
// 进入降级状态
enterDegradationMode();
// 执行降级策略
return executeDegradationStrategy(resource, permits);
}
}
private boolean executeDegradationStrategy(String resource, int permits) {
switch (degradationConfig.getStrategy()) {
case LOCAL_LIMITING:
return localRateLimiter.tryAcquire(resource, permits);
case DIRECT_PASS:
return true; // 直接放行
case STATIC_RULES:
return checkStaticRules(resource, permits);
default:
return true; // 默认直接放行
}
}
private boolean checkStaticRules(String resource, int permits) {
List<StaticRule> staticRules = degradationConfig.getStaticRules();
if (staticRules == null || staticRules.isEmpty()) {
return true; // 没有静态规则,默认放行
}
// 查找匹配的静态规则
for (StaticRule rule : staticRules) {
if (rule.getResource().equals(resource)) {
// 简化的本地计数实现
return localCounter.tryAcquire(resource, rule.getLimit(), permits);
}
}
return true; // 没有匹配规则,默认放行
}
private void enterDegradationMode() {
degraded = true;
degradationStartTime = System.currentTimeMillis();
// 发布降级事件
eventBus.post(new DegradationEvent(DegradationReason.REDIS_FAILURE));
log.warn("Entered degradation mode, strategy: {}", degradationConfig.getStrategy());
}
private boolean shouldExitDegradation() {
if (!degradationConfig.isAutoRecovery()) {
return false;
}
long elapsed = System.currentTimeMillis() - degradationStartTime;
return elapsed > degradationConfig.getDegradationTimeoutMs();
}
private void handleStateTransition(CircuitBreaker.StateTransition transition) {
log.info("Circuit breaker state transition: {} -> {}",
transition.getFromState(), transition.getToState());
// 根据状态转换执行相应操作
switch (transition.getToState()) {
case CLOSED:
log.info("Circuit breaker closed, service recovered");
break;
case OPEN:
log.warn("Circuit breaker opened, service unavailable");
break;
case HALF_OPEN:
log.info("Circuit breaker half-open, testing service recovery");
break;
}
}
}
基于健康检查的故障检测
// 健康检查实现
@Component
public class HealthCheckBasedDegradation {
private final RedisTemplate<String, String> redisTemplate;
private final LocalRateLimiter localRateLimiter;
private final ScheduledExecutorService healthCheckScheduler;
private final AtomicInteger failureCount = new AtomicInteger(0);
private final int maxFailureCount = 3;
private volatile boolean redisHealthy = true;
private volatile boolean degraded = false;
public HealthCheckBasedDegradation(RedisTemplate<String, String> redisTemplate,
LocalRateLimiter localRateLimiter) {
this.redisTemplate = redisTemplate;
this.localRateLimiter = localRateLimiter;
this.healthCheckScheduler = Executors.newScheduledThreadPool(1);
// 启动定期健康检查
startHealthCheck();
}
public boolean tryAcquire(String resource, int permits) {
if (degraded || !redisHealthy) {
// 在降级或Redis不健康状态下使用本地限流
return localRateLimiter.tryAcquire(resource, permits);
}
try {
// 尝试使用分布式限流
boolean result = doDistributedAcquire(resource, permits);
// 重置失败计数
failureCount.set(0);
return result;
} catch (Exception e) {
log.warn("Distributed rate limiting failed, failure count: {}",
failureCount.incrementAndGet(), e);
// 检查是否需要进入降级状态
if (failureCount.get() >= maxFailureCount) {
enterDegradationMode();
}
// 降级到本地限流
return localRateLimiter.tryAcquire(resource, permits);
}
}
private boolean doDistributedAcquire(String resource, int permits) {
// 实际的分布式限流实现
// 这里简化处理,实际实现会调用Redis或其他分布式存储
return true;
}
private void startHealthCheck() {
healthCheckScheduler.scheduleAtFixedRate(this::performHealthCheck,
0, 5, TimeUnit.SECONDS);
}
private void performHealthCheck() {
try {
// 执行简单的Redis健康检查
String pong = redisTemplate.getConnectionFactory()
.getConnection().ping();
if ("PONG".equals(pong)) {
// Redis健康
redisHealthy = true;
// 如果之前不健康,现在恢复了,检查是否可以退出降级状态
if (!degraded && failureCount.get() > 0) {
failureCount.set(0);
} else if (degraded) {
// 检查是否可以恢复
checkRecovery();
}
} else {
// Redis不健康
handleRedisUnhealthy();
}
} catch (Exception e) {
log.warn("Redis health check failed", e);
handleRedisUnhealthy();
}
}
private void handleRedisUnhealthy() {
redisHealthy = false;
if (failureCount.incrementAndGet() >= maxFailureCount && !degraded) {
enterDegradationMode();
}
}
private void enterDegradationMode() {
if (!degraded) {
degraded = true;
log.warn("Entering degradation mode due to Redis failure");
eventBus.post(new DegradationEvent(DegradationReason.REDIS_UNHEALTHY));
}
}
private void checkRecovery() {
try {
// 尝试执行一个简单的操作来验证Redis是否真正恢复
redisTemplate.opsForValue().get("health_check_test");
// 如果成功,可以考虑退出降级状态
if (failureCount.get() == 0) {
exitDegradationMode();
}
} catch (Exception e) {
log.debug("Recovery check failed, Redis may not be fully recovered", e);
}
}
private void exitDegradationMode() {
degraded = false;
redisHealthy = true;
log.info("Exiting degradation mode, Redis service recovered");
eventBus.post(new RecoveryEvent(RecoveryReason.REDIS_RECOVERED));
}
}
本地限流实现
高性能本地限流器
// 高性能本地限流器实现
@Component
public class HighPerformanceLocalRateLimiter {
private final Cache<String, LocalRateLimiter> resourceLimiters;
private final RuleManager ruleManager;
public HighPerformanceLocalRateLimiter(RuleManager ruleManager) {
this.ruleManager = ruleManager;
this.resourceLimiters = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
}
public boolean tryAcquire(String resource, int permits) {
try {
LocalRateLimiter limiter = resourceLimiters.get(resource,
k -> createLimiter(resource));
return limiter.tryAcquire(permits);
} catch (Exception e) {
log.error("Local rate limiting failed for resource: {}", resource, e);
// 失败时默认允许通过
return true;
}
}
private LocalRateLimiter createLimiter(String resource) {
RateLimitRule rule = ruleManager.getRule(resource);
if (rule == null) {
// 没有规则,默认创建一个宽松的限流器
return new LocalRateLimiter(1000, TimeUnit.SECONDS);
}
return new LocalRateLimiter(rule.getLimit(), rule.getTimeUnit());
}
}
// 本地限流器实现
public class LocalRateLimiter {
private final long limit;
private final TimeUnit timeUnit;
private final AtomicLong counter = new AtomicLong(0);
private volatile long windowStart = System.currentTimeMillis();
private final long windowMillis;
public LocalRateLimiter(long limit, TimeUnit timeUnit) {
this.limit = limit;
this.timeUnit = timeUnit;
this.windowMillis = timeUnit.toMillis(1);
}
public boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
// 检查是否需要重置窗口
if (now - windowStart >= windowMillis) {
synchronized (this) {
if (now - windowStart >= windowMillis) {
counter.set(0);
windowStart = now;
}
}
}
// 尝试获取令牌
long current = counter.get();
if (current + permits <= limit) {
return counter.compareAndSet(current, current + permits);
}
return false;
}
}
滑动窗口本地限流
// 滑动窗口本地限流实现
public class SlidingWindowLocalRateLimiter {
private final long limit;
private final long windowMillis;
private final Queue<RequestRecord> requestRecords = new ConcurrentLinkedQueue<>();
private final ReentrantLock lock = new ReentrantLock();
public SlidingWindowLocalRateLimiter(long limit, long windowMillis) {
this.limit = limit;
this.windowMillis = windowMillis;
}
public boolean tryAcquire(int permits) {
long now = System.currentTimeMillis();
long windowStart = now - windowMillis;
lock.lock();
try {
// 清除过期的请求记录
while (!requestRecords.isEmpty() &&
requestRecords.peek().getTimestamp() < windowStart) {
requestRecords.poll();
}
// 检查是否超过限制
if (requestRecords.size() + permits > limit) {
return false;
}
// 记录当前请求
for (int i = 0; i < permits; i++) {
requestRecords.offer(new RequestRecord(now));
}
return true;
} finally {
lock.unlock();
}
}
// 请求记录
private static class RequestRecord {
private final long timestamp;
public RequestRecord(long timestamp) {
this.timestamp = timestamp;
}
public long getTimestamp() {
return timestamp;
}
}
}
自动恢复机制
恢复检测与验证
// 自动恢复机制实现
@Component
public class AutoRecoveryManager {
private final RedisTemplate<String, String> redisTemplate;
private final ScheduledExecutorService recoveryScheduler;
private final AtomicInteger recoveryAttempts = new AtomicInteger(0);
private final int maxRecoveryAttempts = 5;
private volatile boolean recoveryInProgress = false;
public AutoRecoveryManager(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
this.recoveryScheduler = Executors.newScheduledThreadPool(1);
}
public void startRecoveryProcess() {
if (recoveryInProgress) {
log.debug("Recovery process already in progress");
return;
}
recoveryInProgress = true;
recoveryAttempts.set(0);
log.info("Starting automatic recovery process");
eventBus.post(new RecoveryStartEvent());
// 启动恢复检测任务
scheduleRecoveryCheck();
}
private void scheduleRecoveryCheck() {
recoveryScheduler.schedule(this::checkRecovery, 10, TimeUnit.SECONDS);
}
private void checkRecovery() {
if (!recoveryInProgress) {
return;
}
try {
// 执行恢复检测
if (performRecoveryCheck()) {
// 恢复成功
handleRecoverySuccess();
} else {
// 恢复失败,继续尝试
handleRecoveryFailure();
}
} catch (Exception e) {
log.error("Recovery check failed", e);
handleRecoveryFailure();
}
}
private boolean performRecoveryCheck() {
try {
// 1. 检查Redis连接
String pong = redisTemplate.getConnectionFactory().getConnection().ping();
if (!"PONG".equals(pong)) {
return false;
}
// 2. 执行简单的读写操作测试
String testKey = "recovery_test_" + System.currentTimeMillis();
redisTemplate.opsForValue().set(testKey, "test", 10, TimeUnit.SECONDS);
String value = redisTemplate.opsForValue().get(testKey);
if (!"test".equals(value)) {
return false;
}
// 3. 清理测试数据
redisTemplate.delete(testKey);
// 4. 检查关键业务操作
return performBusinessValidation();
} catch (Exception e) {
log.debug("Recovery check failed: {}", e.getMessage());
return false;
}
}
private boolean performBusinessValidation() {
try {
// 执行业务相关的验证操作
// 例如:测试限流相关的关键操作
return true;
} catch (Exception e) {
log.debug("Business validation failed: {}", e.getMessage());
return false;
}
}
private void handleRecoverySuccess() {
log.info("Automatic recovery successful");
recoveryInProgress = false;
recoveryAttempts.set(0);
eventBus.post(new RecoverySuccessEvent());
// 通知相关组件恢复
notifyRecoveryToComponents();
}
private void handleRecoveryFailure() {
int attempts = recoveryAttempts.incrementAndGet();
if (attempts >= maxRecoveryAttempts) {
log.warn("Maximum recovery attempts reached, giving up");
recoveryInProgress = false;
eventBus.post(new RecoveryFailedEvent("Max attempts reached"));
} else {
log.info("Recovery attempt {} failed, retrying...", attempts);
scheduleRecoveryCheck();
}
}
private void notifyRecoveryToComponents() {
// 通知其他组件服务已恢复
eventBus.post(new ServiceRecoveredEvent());
}
}
状态同步与数据恢复
// 状态同步与数据恢复
@Component
public class StateSyncManager {
private final RedisTemplate<String, String> redisTemplate;
private final LocalRateLimiter localRateLimiter;
private final ScheduledExecutorService syncScheduler;
public StateSyncManager(RedisTemplate<String, String> redisTemplate,
LocalRateLimiter localRateLimiter) {
this.redisTemplate = redisTemplate;
this.localRateLimiter = localRateLimiter;
this.syncScheduler = Executors.newScheduledThreadPool(1);
// 在恢复后同步本地状态到Redis
eventBus.register(this);
}
@Subscribe
public void handleServiceRecovered(ServiceRecoveredEvent event) {
log.info("Service recovered, starting state synchronization");
// 延迟执行状态同步,确保服务完全恢复
syncScheduler.schedule(this::syncLocalStateToRedis, 5, TimeUnit.SECONDS);
}
private void syncLocalStateToRedis() {
try {
// 同步本地限流状态到Redis
// 这里简化处理,实际实现需要根据具体的数据结构进行同步
log.info("State synchronization completed");
eventBus.post(new StateSyncCompletedEvent());
} catch (Exception e) {
log.error("State synchronization failed", e);
eventBus.post(new StateSyncFailedEvent(e.getMessage()));
}
}
}
监控与告警
降级状态监控
// 降级状态监控
@Component
public class DegradationMetrics {
private final MeterRegistry meterRegistry;
private final Counter degradationCounter;
private final Counter recoveryCounter;
private final Timer degradationDurationTimer;
private final AtomicInteger currentDegradedResources;
public DegradationMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.degradationCounter = Counter.builder("rate_limiter.degradations")
.description("Number of degradation events")
.register(meterRegistry);
this.recoveryCounter = Counter.builder("rate_limiter.recoveries")
.description("Number of recovery events")
.register(meterRegistry);
this.degradationDurationTimer = Timer.builder("rate_limiter.degradation.duration")
.description("Duration of degradation states")
.register(meterRegistry);
this.currentDegradedResources = meterRegistry.gauge("rate_limiter.degraded.resources",
new AtomicInteger(0));
}
public void recordDegradation(DegradationReason reason) {
degradationCounter.increment(Tag.of("reason", reason.name()));
log.warn("Degradation recorded: {}", reason);
}
public void recordRecovery(RecoveryReason reason) {
recoveryCounter.increment(Tag.of("reason", reason.name()));
log.info("Recovery recorded: {}", reason);
}
public Timer.Sample startDegradationTimer() {
currentDegradedResources.incrementAndGet();
return Timer.start(meterRegistry);
}
public void stopDegradationTimer(Timer.Sample sample) {
currentDegradedResources.decrementAndGet();
sample.stop(degradationDurationTimer);
}
}
告警机制
// 告警机制实现
@Component
public class DegradationAlertManager {
private final AlertService alertService;
private final AtomicInteger consecutiveDegradations = new AtomicInteger(0);
private final int alertThreshold = 3;
public DegradationAlertManager(AlertService alertService) {
this.alertService = alertService;
}
@Subscribe
public void handleDegradationEvent(DegradationEvent event) {
int count = consecutiveDegradations.incrementAndGet();
if (count >= alertThreshold) {
// 发送告警
sendDegradationAlert(event.getReason(), count);
}
// 记录降级日志
logDegradation(event);
}
@Subscribe
public void handleRecoveryEvent(RecoveryEvent event) {
// 重置连续降级计数
consecutiveDegradations.set(0);
// 发送恢复通知
sendRecoveryNotification(event.getReason());
// 记录恢复日志
logRecovery(event);
}
private void sendDegradationAlert(DegradationReason reason, int count) {
Alert alert = Alert.builder()
.level(AlertLevel.CRITICAL)
.title("Rate Limiter Degradation")
.message(String.format("Rate limiter has been degraded %d times consecutively due to %s",
count, reason))
.timestamp(System.currentTimeMillis())
.build();
alertService.sendAlert(alert);
}
private void sendRecoveryNotification(RecoveryReason reason) {
Alert alert = Alert.builder()
.level(AlertLevel.INFO)
.title("Rate Limiter Recovery")
.message(String.format("Rate limiter has recovered from %s", reason))
.timestamp(System.currentTimeMillis())
.build();
alertService.sendAlert(alert);
}
private void logDegradation(DegradationEvent event) {
log.warn("Rate limiter degradation: reason={}, timestamp={}",
event.getReason(), event.getTimestamp());
}
private void logRecovery(RecoveryEvent event) {
log.info("Rate limiter recovery: reason={}, timestamp={}",
event.getReason(), event.getTimestamp());
}
}
通过以上实现,我们构建了一个完整的故障降级与恢复机制,能够在Redis等核心组件出现故障时自动切换到备用方案,并在服务恢复后自动恢复正常运行。这种设计确保了分布式限流系统的高可用性和业务连续性。