API 网关的限流、熔断与降级功能:构建高可用微服务系统的防护机制
2025/8/30大约 8 分钟
在现代分布式系统中,高并发和系统稳定性是两个核心挑战。API 网关作为系统的统一入口,承担着保护后端服务免受过载和故障影响的重要职责。通过限流、熔断和降级机制,API 网关能够有效控制系统流量,防止级联故障,确保系统的高可用性。本文将深入探讨这些关键功能的实现机制和最佳实践。
限流机制(Rate Limiting)
什么是限流
限流是一种流量控制策略,通过限制单位时间内处理的请求数量来保护系统资源,防止系统因突发流量而过载。
限流算法
固定窗口算法
固定窗口算法是最简单的限流算法,将时间划分为固定长度的窗口,在每个窗口内限制请求数量。
// 示例:固定窗口限流实现
@Component
public class FixedWindowRateLimiter {
private final Map<String, WindowCounter> counters = new ConcurrentHashMap<>();
private final long windowSizeInMillis;
private final int maxRequests;
public FixedWindowRateLimiter(long windowSizeInMillis, int maxRequests) {
this.windowSizeInMillis = windowSizeInMillis;
this.maxRequests = maxRequests;
}
public boolean isAllowed(String key) {
long currentTime = System.currentTimeMillis();
long windowStart = currentTime - (currentTime % windowSizeInMillis);
WindowCounter counter = counters.computeIfAbsent(key, k ->
new WindowCounter(windowStart, 0));
if (counter.windowStart == windowStart) {
if (counter.count < maxRequests) {
counter.count++;
return true;
}
return false;
} else {
counter.windowStart = windowStart;
counter.count = 1;
return true;
}
}
private static class WindowCounter {
volatile long windowStart;
volatile int count;
WindowCounter(long windowStart, int count) {
this.windowStart = windowStart;
this.count = count;
}
}
}滑动窗口算法
滑动窗口算法通过维护一个时间窗口内的请求计数,提供更平滑的限流效果。
// 示例:滑动窗口限流实现
@Component
public class SlidingWindowRateLimiter {
private final Map<String, Deque<Long>> windows = new ConcurrentHashMap<>();
private final long windowSizeInMillis;
private final int maxRequests;
public SlidingWindowRateLimiter(long windowSizeInMillis, int maxRequests) {
this.windowSizeInMillis = windowSizeInMillis;
this.maxRequests = maxRequests;
}
public synchronized boolean isAllowed(String key) {
long currentTime = System.currentTimeMillis();
Deque<Long> window = windows.computeIfAbsent(key, k -> new LinkedList<>());
// 移除窗口外的请求记录
while (!window.isEmpty() && window.peekFirst() < currentTime - windowSizeInMillis) {
window.pollFirst();
}
// 检查是否超过限流阈值
if (window.size() < maxRequests) {
window.addLast(currentTime);
return true;
}
return false;
}
}令牌桶算法
令牌桶算法通过以固定速率生成令牌,请求需要消耗令牌才能被处理。
// 示例:令牌桶限流实现
@Component
public class TokenBucketRateLimiter {
private final long capacity;
private final long refillTokensPerOneSecond;
private final AtomicLong availableTokens;
private final AtomicLong lastRefillTimestamp;
public TokenBucketRateLimiter(long capacity, long refillTokensPerOneSecond) {
this.capacity = capacity;
this.refillTokensPerOneSecond = refillTokensPerOneSecond;
this.availableTokens = new AtomicLong(capacity);
this.lastRefillTimestamp = new AtomicLong(System.nanoTime());
}
public boolean isAllowed() {
refill();
return availableTokens.getAndDecrement() > 0;
}
private void refill() {
long now = System.nanoTime();
long tokensToAdd = (now - lastRefillTimestamp.get()) * refillTokensPerOneSecond / 1_000_000_000;
if (tokensToAdd > 0) {
lastRefillTimestamp.set(now);
availableTokens.accumulateAndGet(tokensToAdd, (current, toAdd) ->
Math.min(current + toAdd, capacity));
}
}
}漏桶算法
漏桶算法通过固定速率处理请求,多余的请求会被丢弃或排队。
// 示例:漏桶限流实现
@Component
public class LeakyBucketRateLimiter {
private final Queue<Long> queue;
private final long capacity;
private final long leakRatePerSecond;
private final AtomicLong lastLeakTimestamp;
public LeakyBucketRateLimiter(long capacity, long leakRatePerSecond) {
this.queue = new ConcurrentLinkedQueue<>();
this.capacity = capacity;
this.leakRatePerSecond = leakRatePerSecond;
this.lastLeakTimestamp = new AtomicLong(System.currentTimeMillis());
}
public synchronized boolean isAllowed() {
leak();
if (queue.size() < capacity) {
queue.offer(System.currentTimeMillis());
return true;
}
return false;
}
private void leak() {
long now = System.currentTimeMillis();
long timeSinceLastLeak = now - lastLeakTimestamp.get();
long tokensToLeak = timeSinceLastLeak * leakRatePerSecond / 1000;
if (tokensToLeak > 0) {
lastLeakTimestamp.set(now);
for (int i = 0; i < tokensToLeak && !queue.isEmpty(); i++) {
queue.poll();
}
}
}
}限流维度
全局限流
对整个 API 网关的请求进行限流:
# 示例:全局限流配置
spring:
cloud:
gateway:
routes:
- id: global-rate-limit
uri: lb://service
predicates:
- Path=/**
filters:
- name: RequestRateLimiter
args:
redis-rate-limiter.replenishRate: 100
redis-rate-limiter.burstCapacity: 200用户级限流
基于用户身份进行限流:
// 示例:用户级限流实现
@Component
public class UserRateLimiter {
private final Map<String, RateLimiter> userLimiters = new ConcurrentHashMap<>();
private final int defaultMaxRequests;
private final long defaultWindowSize;
public boolean isAllowed(String userId) {
RateLimiter limiter = userLimiters.computeIfAbsent(userId,
id -> new TokenBucketRateLimiter(defaultMaxRequests, defaultWindowSize));
return limiter.isAllowed();
}
}API 级限流
基于 API 路径进行限流:
// 示例:API 级限流实现
@Component
public class ApiRateLimiter {
private final Map<String, RateLimiter> apiLimiters = new ConcurrentHashMap<>();
public boolean isAllowed(String apiPath) {
RateLimiter limiter = apiLimiters.computeIfAbsent(apiPath,
path -> new SlidingWindowRateLimiter(60000, 1000)); // 每分钟1000次
return limiter.isAllowed();
}
}熔断机制(Circuit Breaker)
什么是熔断
熔断机制是一种保护性措施,当某个服务出现故障或响应时间过长时,暂时停止向该服务发送请求,避免故障扩散。
熔断器状态
熔断器通常有三种状态:
- 关闭状态(Closed):正常转发请求到后端服务
- 打开状态(Open):拒绝所有请求,直接返回错误
- 半开状态(Half-Open):允许少量请求通过,测试服务是否恢复
Hystrix 熔断器实现
// 示例:Hystrix 熔断器实现
@Component
public class ServiceCircuitBreaker {
private final CircuitBreaker circuitBreaker;
public ServiceCircuitBreaker() {
this.circuitBreaker = CircuitBreaker.ofDefaults("service-circuit-breaker");
}
public <T> T executeWithCircuitBreaker(Supplier<T> serviceCall, Supplier<T> fallback) {
return circuitBreaker.executeSupplier(() -> {
try {
return serviceCall.get();
} catch (Exception ex) {
throw new CallNotPermittedException(circuitBreaker, ex);
}
}).recover(throwable -> fallback.get());
}
}Resilience4j 熔断器实现
// 示例:Resilience4j 熔断器实现
@Component
public class Resilience4jCircuitBreaker {
private final CircuitBreaker circuitBreaker;
private final Retry retry;
public Resilience4jCircuitBreaker() {
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.waitDurationInOpenState(Duration.ofSeconds(10))
.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.TIME_BASED)
.slidingWindowSize(10)
.build();
this.circuitBreaker = CircuitBreaker.of("service-circuit-breaker", config);
RetryConfig retryConfig = RetryConfig.custom()
.maxAttempts(3)
.waitDuration(Duration.ofSeconds(1))
.build();
this.retry = Retry.of("service-retry", retryConfig);
}
public <T> T executeWithCircuitBreaker(Supplier<T> serviceCall, Supplier<T> fallback) {
Supplier<T> decoratedSupplier = CircuitBreaker
.decorateSupplier(circuitBreaker, serviceCall);
decoratedSupplier = Retry
.decorateSupplier(retry, decoratedSupplier);
return Try.ofSupplier(decoratedSupplier)
.recover(throwable -> fallback.get());
}
}熔断策略配置
# 示例:熔断策略配置
resilience4j:
circuitbreaker:
instances:
user-service:
failureRateThreshold: 50
waitDurationInOpenState: 10000
permittedNumberOfCallsInHalfOpenState: 3
slidingWindowSize: 10
slidingWindowType: TIME_BASED
retry:
instances:
user-service:
maxAttempts: 3
waitDuration: 1000降级机制(Degradation)
什么是降级
降级是一种在系统压力过大或部分功能不可用时,暂时关闭非核心功能或提供简化服务的策略,以保证核心功能的正常运行。
降级策略
功能降级
关闭非核心功能:
// 示例:功能降级实现
@Component
public class FeatureDegradationManager {
private final AtomicBoolean featureEnabled = new AtomicBoolean(true);
private final Map<String, Boolean> featureFlags = new ConcurrentHashMap<>();
public void enableFeature(String featureName) {
featureFlags.put(featureName, true);
}
public void disableFeature(String featureName) {
featureFlags.put(featureName, false);
}
public boolean isFeatureEnabled(String featureName) {
return featureFlags.getOrDefault(featureName, true);
}
public <T> T executeWithFallback(Supplier<T> primary, Supplier<T> fallback) {
if (featureEnabled.get()) {
try {
return primary.get();
} catch (Exception ex) {
// 记录异常日志
log.warn("Primary execution failed, falling back", ex);
}
}
return fallback.get();
}
}数据降级
返回缓存数据或简化数据:
// 示例:数据降级实现
@Component
public class DataDegradationService {
private final Cache<String, Object> cache = Caffeine.newBuilder()
.maximumSize(1000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
public <T> T getDataWithDegradation(String key,
Supplier<T> primarySource,
Supplier<T> fallbackSource) {
try {
T data = primarySource.get();
// 缓存数据
cache.put(key, data);
return data;
} catch (Exception ex) {
// 从缓存获取数据
T cachedData = (T) cache.getIfPresent(key);
if (cachedData != null) {
return cachedData;
}
// 使用降级数据源
return fallbackSource.get();
}
}
}服务降级
临时关闭某些服务:
// 示例:服务降级实现
@Component
public class ServiceDegradationManager {
private final Map<String, Boolean> serviceStatus = new ConcurrentHashMap<>();
public void enableService(String serviceName) {
serviceStatus.put(serviceName, true);
}
public void disableService(String serviceName) {
serviceStatus.put(serviceName, false);
}
public boolean isServiceAvailable(String serviceName) {
return serviceStatus.getOrDefault(serviceName, true);
}
public <T> Mono<T> callServiceWithDegradation(String serviceName,
Supplier<Mono<T>> serviceCall,
Supplier<Mono<T>> fallback) {
if (isServiceAvailable(serviceName)) {
return serviceCall.get()
.onErrorResume(ex -> {
log.warn("Service call failed, falling back", ex);
return fallback.get();
});
} else {
return fallback.get();
}
}
}综合防护策略
流量控制过滤器
// 示例:综合流量控制过滤器
@Component
public class TrafficControlFilter implements GlobalFilter {
private final RateLimiter globalRateLimiter;
private final CircuitBreaker circuitBreaker;
private final FeatureDegradationManager degradationManager;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
String path = exchange.getRequest().getPath().value();
String userId = getUserId(exchange);
// 全局限流检查
if (!globalRateLimiter.isAllowed()) {
return handleRateLimitExceeded(exchange);
}
// 用户级限流检查
if (!isUserAllowed(userId)) {
return handleRateLimitExceeded(exchange);
}
// API 级限流检查
if (!isApiAllowed(path)) {
return handleRateLimitExceeded(exchange);
}
// 熔断器检查
if (circuitBreaker.getState() == CircuitBreaker.State.OPEN) {
return handleServiceUnavailable(exchange);
}
// 功能降级检查
if (!degradationManager.isFeatureEnabled("advanced-feature")) {
exchange.getAttributes().put("degraded", true);
}
return chain.filter(exchange);
}
private Mono<Void> handleRateLimitExceeded(ServerWebExchange exchange) {
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
return response.writeWith(Mono.just(response.bufferFactory()
.wrap("Rate limit exceeded".getBytes())));
}
private Mono<Void> handleServiceUnavailable(ServerWebExchange exchange) {
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.SERVICE_UNAVAILABLE);
return response.writeWith(Mono.just(response.bufferFactory()
.wrap("Service temporarily unavailable".getBytes())));
}
}配置管理
# 示例:流量控制配置
traffic-control:
rate-limiting:
global:
enabled: true
max-requests: 1000
window-size: 60000
per-user:
enabled: true
max-requests: 100
window-size: 60000
per-api:
enabled: true
rules:
- path: /api/users/**
max-requests: 200
window-size: 60000
- path: /api/orders/**
max-requests: 500
window-size: 60000
circuit-breaker:
enabled: true
failure-rate-threshold: 50
wait-duration-in-open-state: 10000
degradation:
enabled: true
features:
advanced-feature: true
premium-feature: false监控与告警
指标收集
// 示例:流量控制指标收集
@Component
public class TrafficControlMetrics {
private final MeterRegistry meterRegistry;
private final Counter rateLimitCounter;
private final Counter circuitBreakerCounter;
public TrafficControlMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.rateLimitCounter = Counter.builder("api.gateway.rate.limit.exceeded")
.description("Number of rate limit exceeded events")
.register(meterRegistry);
this.circuitBreakerCounter = Counter.builder("api.gateway.circuit.breaker.opened")
.description("Number of circuit breaker opened events")
.register(meterRegistry);
}
public void recordRateLimitExceeded() {
rateLimitCounter.increment();
}
public void recordCircuitBreakerOpened() {
circuitBreakerCounter.increment();
}
}告警配置
# 示例:告警配置
management:
endpoint:
metrics:
enabled: true
endpoints:
web:
exposure:
include: metrics,health
metrics:
export:
prometheus:
enabled: true最佳实践
策略设计原则
- 分层防护:实现多层限流、熔断和降级策略
- 动态调整:根据系统负载动态调整限流阈值
- 快速失败:在系统过载时快速失败,避免资源耗尽
- 优雅降级:提供合理的降级方案,保证核心功能可用
性能优化
- 异步处理:使用异步方式处理限流和熔断逻辑
- 缓存利用:合理利用缓存减少重复计算
- 批量操作:对批量请求进行优化处理
- 资源隔离:对不同服务进行资源隔离
故障恢复
- 自动恢复:实现自动故障检测和恢复机制
- 手动干预:提供手动干预和配置调整能力
- 状态监控:实时监控系统状态和防护机制运行情况
- 日志记录:详细记录防护机制的触发和执行情况
总结
限流、熔断和降级机制是构建高可用微服务系统的重要防护措施。通过合理设计和实现这些机制,API 网关能够有效控制系统流量,防止级联故障,确保系统在高并发和异常情况下的稳定运行。
在实际应用中,需要根据具体的业务需求和技术架构选择合适的防护策略,并持续优化配置参数以达到最佳的防护效果。同时,完善的监控和告警机制也是确保防护机制有效运行的重要保障。
在后续章节中,我们将继续探讨 API 网关的其他核心功能,帮助读者全面掌握这一关键技术组件。
