高可用设计: 控制面无状态、数据面本地降级、存储多活
2025/9/7大约 8 分钟
在构建企业级分布式限流平台时,高可用性是确保系统稳定运行的关键要求。面对复杂的分布式环境和各种潜在的故障场景,我们需要从控制面、数据面和存储层三个维度进行全面的高可用设计。本章将深入探讨如何通过控制面无状态、数据面本地降级和存储多活等策略来构建高可用的分布式限流平台。
控制面无状态设计
无状态架构优势
控制面作为分布式限流平台的"大脑",负责规则管理、策略分发等核心功能。采用无状态设计可以带来以下优势:
- 弹性伸缩:可以根据负载情况动态增减节点
- 故障恢复:节点故障不会导致数据丢失
- 负载均衡:请求可以均匀分布到各个节点
- 部署简化:无需考虑节点间的数据同步
实现方案
// 无状态控制面实现
@RestController
public class StatelessControlPlane {
// 注入外部服务,避免在类中维护状态
private final RuleService ruleService;
private final NodeDiscoveryService nodeDiscoveryService;
private final ConfigService configService;
public StatelessControlPlane(RuleService ruleService,
NodeDiscoveryService nodeDiscoveryService,
ConfigService configService) {
this.ruleService = ruleService;
this.nodeDiscoveryService = nodeDiscoveryService;
this.configService = configService;
}
@PostMapping("/rules")
public ResponseEntity<Rule> createRule(@RequestBody Rule rule) {
// 规则存储到外部存储(如数据库、配置中心)
Rule createdRule = ruleService.createRule(rule);
// 通过外部通知机制分发规则
notifyRuleChange(createdRule, RuleChangeType.CREATE);
return ResponseEntity.ok(createdRule);
}
@GetMapping("/rules/{id}")
public ResponseEntity<Rule> getRule(@PathVariable String id) {
return ruleService.getRule(id)
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build());
}
@GetMapping("/nodes")
public ResponseEntity<List<NodeInfo>> getActiveNodes() {
// 通过服务发现机制获取活跃节点
List<NodeInfo> nodes = nodeDiscoveryService.getActiveNodes();
return ResponseEntity.ok(nodes);
}
private void notifyRuleChange(Rule rule, RuleChangeType changeType) {
// 使用消息队列或事件总线通知所有数据面节点
RuleChangeEvent event = new RuleChangeEvent(rule, changeType);
eventBus.post(event);
}
}负载均衡与故障转移
// 控制面负载均衡实现
@Component
public class ControlPlaneLoadBalancer {
private final ServiceDiscovery serviceDiscovery;
private final LoadBalancingStrategy strategy;
public ControlPlaneNode selectNode() {
// 获取所有可用的控制面节点
List<ControlPlaneNode> nodes = serviceDiscovery.getAvailableNodes();
if (nodes.isEmpty()) {
throw new NoAvailableNodeException("No available control plane nodes");
}
// 根据负载均衡策略选择节点
return strategy.select(nodes);
}
public <T> T executeWithFailover(ControlPlaneOperation<T> operation) {
List<ControlPlaneNode> nodes = serviceDiscovery.getAvailableNodes();
Exception lastException = null;
// 尝试所有可用节点
for (ControlPlaneNode node : nodes) {
try {
return operation.execute(node);
} catch (Exception e) {
lastException = e;
log.warn("Operation failed on node {}, trying next node", node.getId(), e);
}
}
// 所有节点都失败了
throw new ControlPlaneUnavailableException("All control plane nodes unavailable", lastException);
}
}数据面本地降级
降级策略设计
数据面作为限流策略的执行者,在面对控制面或存储层故障时,需要具备本地降级能力,确保业务连续性。
// 数据面本地降级实现
public class LocalDegradationDataPlane {
private final DistributedRateLimiter distributedRateLimiter;
private final LocalRateLimiter localRateLimiter;
private final HealthChecker healthChecker;
private volatile boolean degraded = false;
private volatile long lastDegradationTime = 0;
public RateLimitingResult tryAcquire(String resource, int permits) {
if (degraded) {
// 降级模式下使用本地限流器
return localRateLimiter.tryAcquire(resource, permits);
}
try {
// 正常模式下使用分布式限流器
return distributedRateLimiter.tryAcquire(resource, permits);
} catch (Exception e) {
// 检测到异常,切换到降级模式
handleDegradation(e);
return localRateLimiter.tryAcquire(resource, permits);
}
}
private void handleDegradation(Exception e) {
// 检查是否需要切换到降级模式
if (shouldDegrade(e)) {
log.warn("Switching to local degradation mode due to: {}", e.getMessage());
degraded = true;
lastDegradationTime = System.currentTimeMillis();
// 记录降级事件
eventBus.post(new DegradationEvent(DegradationReason.DISTRIBUTED_FAILURE, e));
// 启动恢复检测任务
startRecoveryCheck();
}
}
private boolean shouldDegrade(Exception e) {
// 判断是否为需要降级的异常类型
return e instanceof NetworkException ||
e instanceof TimeoutException ||
e instanceof StorageUnavailableException;
}
private void startRecoveryCheck() {
// 定时检查分布式服务是否恢复
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
try {
// 尝试执行一个简单的分布式操作来检测服务状态
if (distributedRateLimiter.healthCheck()) {
log.info("Distributed service recovered, switching back to normal mode");
degraded = false;
// 发布恢复事件
eventBus.post(new RecoveryEvent(RecoveryReason.DISTRIBUTED_RECOVERED));
}
} catch (Exception e) {
// 服务仍未恢复,继续降级
log.debug("Distributed service still unavailable: {}", e.getMessage());
}
}, 30, 30, TimeUnit.SECONDS); // 每30秒检查一次
}
}本地限流器实现
// 本地限流器实现
public class LocalRateLimiter {
private final Cache<String, RateLimiter> resourceLimiters;
private final LocalRuleStore localRuleStore;
public LocalRateLimiter(LocalRuleStore localRuleStore) {
this.localRuleStore = localRuleStore;
this.resourceLimiters = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
}
public RateLimitingResult tryAcquire(String resource, int permits) {
try {
// 获取资源对应的规则
LimitingRule rule = localRuleStore.getRule(resource);
if (rule == null || !rule.isEnabled()) {
return RateLimitingResult.allowed();
}
// 获取或创建限流器
RateLimiter limiter = resourceLimiters.get(resource,
k -> createRateLimiter(rule));
// 执行限流判断
if (limiter.tryAcquire(permits)) {
return RateLimitingResult.allowed();
} else {
return RateLimitingResult.rejected("Local rate limit exceeded");
}
} catch (Exception e) {
log.error("Local rate limiting failed for resource: {}", resource, e);
// 本地限流失败时,默认允许通过以保证业务连续性
return RateLimitingResult.allowed();
}
}
private RateLimiter createRateLimiter(LimitingRule rule) {
switch (rule.getStrategy()) {
case TOKEN_BUCKET:
return RateLimiter.create(rule.getThreshold());
case FIXED_WINDOW:
return new FixedWindowRateLimiter(rule.getThreshold(),
rule.getTimeUnit().toMillis(1));
default:
return RateLimiter.create(rule.getThreshold());
}
}
}存储多活设计
多活架构模式
存储层作为分布式限流平台的基础支撑,需要采用多活架构来确保数据的高可用性和一致性。
// 多活存储实现
public class MultiLiveStorage {
private final List<StorageNode> storageNodes;
private final ConsistencyProtocol consistencyProtocol;
private final FailureDetector failureDetector;
public MultiLiveStorage(List<StorageNode> storageNodes,
ConsistencyProtocol consistencyProtocol,
FailureDetector failureDetector) {
this.storageNodes = new CopyOnWriteArrayList<>(storageNodes);
this.consistencyProtocol = consistencyProtocol;
this.failureDetector = failureDetector;
// 启动故障检测
startFailureDetection();
}
public void write(String key, String value) {
// 根据一致性协议写入数据
consistencyProtocol.write(storageNodes, key, value);
}
public String read(String key) {
// 根据一致性协议读取数据
return consistencyProtocol.read(storageNodes, key);
}
private void startFailureDetection() {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
List<StorageNode> failedNodes = new ArrayList<>();
// 检查所有节点的健康状态
for (StorageNode node : storageNodes) {
if (!failureDetector.isAlive(node)) {
failedNodes.add(node);
}
}
// 处理故障节点
if (!failedNodes.isEmpty()) {
handleNodeFailures(failedNodes);
}
}, 0, 5, TimeUnit.SECONDS); // 每5秒检查一次
}
private void handleNodeFailures(List<StorageNode> failedNodes) {
// 从活跃节点列表中移除故障节点
storageNodes.removeAll(failedNodes);
// 触发数据重新分片
rebalanceData();
// 发布节点故障事件
eventBus.post(new StorageNodeFailureEvent(failedNodes));
}
}一致性协议实现
// 一致性协议实现
public class ConsistencyProtocol {
// 强一致性写入
public void strongWrite(List<StorageNode> nodes, String key, String value) {
// 所有节点都必须写入成功
List<CompletableFuture<Boolean>> futures = nodes.stream()
.map(node -> CompletableFuture.supplyAsync(() -> {
try {
node.write(key, value);
return true;
} catch (Exception e) {
log.error("Failed to write to node: {}", node.getId(), e);
return false;
}
}))
.collect(Collectors.toList());
// 等待所有写入完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// 检查写入结果
boolean allSuccess = futures.stream()
.map(CompletableFuture::join)
.allMatch(success -> success);
if (!allSuccess) {
throw new WriteFailureException("Failed to write to all nodes");
}
}
// 最终一致性写入
public void eventualWrite(List<StorageNode> nodes, String key, String value) {
// 至少写入多数节点即可返回
int quorum = nodes.size() / 2 + 1;
AtomicInteger successCount = new AtomicInteger(0);
List<CompletableFuture<Void>> futures = nodes.stream()
.map(node -> CompletableFuture.runAsync(() -> {
try {
node.write(key, value);
successCount.incrementAndGet();
} catch (Exception e) {
log.error("Failed to write to node: {}", node.getId(), e);
}
}))
.collect(Collectors.toList());
// 等待足够多的节点写入成功
CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]))
.thenRun(() -> {
if (successCount.get() >= quorum) {
log.debug("Write succeeded on {} nodes, quorum achieved", successCount.get());
} else {
log.warn("Write only succeeded on {} nodes, quorum not achieved", successCount.get());
}
});
}
// 读取数据
public String read(List<StorageNode> nodes, String key) {
// 优先从健康节点读取
for (StorageNode node : nodes) {
try {
return node.read(key);
} catch (Exception e) {
log.warn("Failed to read from node: {}", node.getId(), e);
}
}
throw new ReadFailureException("Failed to read from all nodes");
}
}故障检测与恢复
健康检查机制
// 健康检查实现
@Component
public class HealthChecker {
private final List<HealthIndicator> healthIndicators;
private final EventBus eventBus;
public HealthStatus checkHealth() {
List<HealthIndicatorResult> results = new ArrayList<>();
// 检查所有健康指标
for (HealthIndicator indicator : healthIndicators) {
try {
HealthIndicatorResult result = indicator.check();
results.add(result);
} catch (Exception e) {
log.error("Health check failed for indicator: {}",
indicator.getClass().getSimpleName(), e);
results.add(new HealthIndicatorResult(
indicator.getClass().getSimpleName(),
HealthStatus.DOWN,
e.getMessage()));
}
}
// 根据检查结果判断整体健康状态
boolean allUp = results.stream()
.allMatch(result -> result.getStatus() == HealthStatus.UP);
boolean anyDown = results.stream()
.anyMatch(result -> result.getStatus() == HealthStatus.DOWN);
if (allUp) {
return HealthStatus.UP;
} else if (anyDown) {
return HealthStatus.DOWN;
} else {
return HealthStatus.DEGRADED;
}
}
@Scheduled(fixedRate = 10000) // 每10秒检查一次
public void performHealthCheck() {
HealthStatus status = checkHealth();
// 发布健康状态变更事件
if (status != previousStatus) {
eventBus.post(new HealthStatusChangedEvent(status, previousStatus));
previousStatus = status;
}
}
}自动恢复机制
// 自动恢复实现
@Component
public class AutoRecoveryManager {
private final RecoveryStrategy recoveryStrategy;
private final EventBus eventBus;
@EventListener
public void handleFailureEvent(FailureEvent event) {
// 根据故障类型选择恢复策略
switch (event.getFailureType()) {
case CONTROL_PLANE_FAILURE:
recoverControlPlane(event);
break;
case DATA_PLANE_FAILURE:
recoverDataPlane(event);
break;
case STORAGE_FAILURE:
recoverStorage(event);
break;
default:
log.warn("Unknown failure type: {}", event.getFailureType());
}
}
private void recoverControlPlane(FailureEvent event) {
log.info("Starting control plane recovery...");
try {
// 执行控制面恢复策略
recoveryStrategy.recoverControlPlane(event);
// 发布恢复完成事件
eventBus.post(new RecoveryCompletedEvent(
RecoveryTarget.CONTROL_PLANE,
System.currentTimeMillis()));
log.info("Control plane recovery completed successfully");
} catch (Exception e) {
log.error("Control plane recovery failed", e);
eventBus.post(new RecoveryFailedEvent(
RecoveryTarget.CONTROL_PLANE,
e.getMessage()));
}
}
private void recoverDataPlane(FailureEvent event) {
log.info("Starting data plane recovery...");
try {
// 执行数据面恢复策略
recoveryStrategy.recoverDataPlane(event);
log.info("Data plane recovery completed successfully");
} catch (Exception e) {
log.error("Data plane recovery failed", e);
}
}
private void recoverStorage(FailureEvent event) {
log.info("Starting storage recovery...");
try {
// 执行存储层恢复策略
recoveryStrategy.recoverStorage(event);
log.info("Storage recovery completed successfully");
} catch (Exception e) {
log.error("Storage recovery failed", e);
}
}
}监控与告警
高可用指标监控
// 高可用指标监控实现
@Component
public class HighAvailabilityMetrics {
private final MeterRegistry meterRegistry;
private final Counter controlPlaneFailureCounter;
private final Counter dataPlaneFailureCounter;
private final Counter storageFailureCounter;
private final Timer recoveryTimeTimer;
public HighAvailabilityMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.controlPlaneFailureCounter = Counter.builder("ha.controlplane.failures")
.description("Control plane failure count")
.register(meterRegistry);
this.dataPlaneFailureCounter = Counter.builder("ha.dataplane.failures")
.description("Data plane failure count")
.register(meterRegistry);
this.storageFailureCounter = Counter.builder("ha.storage.failures")
.description("Storage failure count")
.register(meterRegistry);
this.recoveryTimeTimer = Timer.builder("ha.recovery.time")
.description("Recovery time")
.register(meterRegistry);
}
public void recordControlPlaneFailure() {
controlPlaneFailureCounter.increment();
}
public void recordDataPlaneFailure() {
dataPlaneFailureCounter.increment();
}
public void recordStorageFailure() {
storageFailureCounter.increment();
}
public void recordRecoveryTime(long durationMillis) {
recoveryTimeTimer.record(durationMillis, TimeUnit.MILLISECONDS);
}
}通过以上高可用设计,分布式限流平台能够在面对各种故障场景时保持稳定运行。控制面的无状态设计确保了其弹性伸缩和故障恢复能力,数据面的本地降级机制保证了业务连续性,存储层的多活架构提供了数据的高可用性。这些设计相互配合,共同构建了一个健壮、可靠的分布式限流平台。
