网关作为限流的第一道防线: 全局流量管控
2025/9/7大约 7 分钟
在现代微服务架构中,API网关作为系统的入口,承担着流量管控的重要职责。将分布式限流能力集成到API网关中,可以构建起系统的第一道防线,有效防止恶意流量、突发流量和异常流量对后端服务的冲击。本章将深入探讨如何将限流能力集成到API网关中,实现全局流量管控。
网关限流的重要性
第一道防线的价值
API网关作为所有外部请求的入口,是实施限流策略的理想位置。在网关层实现限流具有以下重要价值:
- 全局控制:可以在请求进入系统之前就进行流量控制,避免无效请求消耗后端资源
- 集中管理:统一管理所有API的限流策略,降低管理复杂度
- 快速响应:在网关层拒绝请求的响应速度更快,用户体验更好
- 资源保护:有效保护后端服务免受流量冲击
限流维度设计
在网关层,我们可以从多个维度对流量进行控制:
// 网关限流维度定义
public class GatewayLimitingDimensions {
// 基于IP的限流
private String clientIP;
// 基于用户的限流
private String userId;
// 基于API路径的限流
private String apiPath;
// 基于请求方法的限流
private String httpMethod;
// 基于API密钥的限流
private String apiKey;
// 基于用户代理的限流
private String userAgent;
// 基于地理位置的限流
private String geoLocation;
// 自定义标签限流
private Map<String, String> customLabels;
// 构造函数、getter和setter方法
// ...
}
Spring Cloud Gateway集成
集成实现
Spring Cloud Gateway是目前主流的微服务网关解决方案之一,我们可以将其与分布式限流平台深度集成。
// Spring Cloud Gateway限流过滤器
@Component
public class RateLimitingGatewayFilter implements GlobalFilter, Ordered {
private final RateLimitingClient rateLimitingClient;
private final RateLimitingConfig rateLimitingConfig;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// 1. 提取限流维度
GatewayLimitingDimensions dimensions = extractDimensions(request);
// 2. 构造资源标识
String resource = buildResourceIdentifier(dimensions);
// 3. 执行限流检查
return rateLimitingClient.tryAcquire(resource, 1)
.flatMap(result -> {
if (result.isAllowed()) {
// 允许通过,继续处理请求
return chain.filter(exchange);
} else {
// 拒绝请求,返回429状态码
return handleRateLimitRejection(exchange, result.getReason());
}
});
}
private GatewayLimitingDimensions extractDimensions(ServerHttpRequest request) {
GatewayLimitingDimensions dimensions = new GatewayLimitingDimensions();
// 提取客户端IP
dimensions.setClientIP(getClientIP(request));
// 提取用户ID(从JWT或其他认证信息中)
dimensions.setUserId(extractUserId(request));
// 提取API路径
dimensions.setApiPath(request.getPath().value());
// 提取HTTP方法
dimensions.setHttpMethod(request.getMethod().name());
// 提取API密钥
dimensions.setApiKey(request.getHeaders().getFirst("X-API-Key"));
// 提取用户代理
dimensions.setUserAgent(request.getHeaders().getFirst("User-Agent"));
return dimensions;
}
private String buildResourceIdentifier(GatewayLimitingDimensions dimensions) {
// 根据配置决定使用哪些维度构建资源标识
StringBuilder sb = new StringBuilder();
if (rateLimitingConfig.isLimitByPath()) {
sb.append(dimensions.getApiPath()).append(":");
}
if (rateLimitingConfig.isLimitByMethod()) {
sb.append(dimensions.getHttpMethod()).append(":");
}
if (rateLimitingConfig.isLimitByIP()) {
sb.append(dimensions.getClientIP()).append(":");
}
if (rateLimitingConfig.isLimitByUser()) {
sb.append(dimensions.getUserId()).append(":");
}
if (rateLimitingConfig.isLimitByApiKey()) {
sb.append(dimensions.getApiKey()).append(":");
}
// 移除末尾的冒号
if (sb.length() > 0 && sb.charAt(sb.length() - 1) == ':') {
sb.deleteCharAt(sb.length() - 1);
}
return sb.toString();
}
private String getClientIP(ServerHttpRequest request) {
// 优先从X-Forwarded-For头获取
String xForwardedFor = request.getHeaders().getFirst("X-Forwarded-For");
if (xForwardedFor != null && !xForwardedFor.isEmpty()) {
return xForwardedFor.split(",")[0].trim();
}
// 从X-Real-IP头获取
String xRealIP = request.getHeaders().getFirst("X-Real-IP");
if (xRealIP != null && !xRealIP.isEmpty()) {
return xRealIP;
}
// 从远程地址获取
InetSocketAddress remoteAddress = request.getRemoteAddress();
return remoteAddress != null ? remoteAddress.getAddress().getHostAddress() : "unknown";
}
private String extractUserId(ServerHttpRequest request) {
// 从JWT Token中提取用户ID
String authorization = request.getHeaders().getFirst("Authorization");
if (authorization != null && authorization.startsWith("Bearer ")) {
String token = authorization.substring(7);
try {
JWTClaimsSet claims = JWTParser.parse(token).getJWTClaimsSet();
return claims.getStringClaim("userId");
} catch (Exception e) {
log.warn("Failed to parse JWT token", e);
}
}
return "anonymous";
}
private Mono<Void> handleRateLimitRejection(ServerWebExchange exchange, String reason) {
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
response.getHeaders().add("X-RateLimit-Reason", reason);
response.getHeaders().add("Retry-After", "60"); // 建议重试时间
// 返回错误信息
String errorMessage = "{\"error\":\"Rate limit exceeded\",\"message\":\"" + reason + "\"}";
DataBuffer buffer = response.bufferFactory().wrap(errorMessage.getBytes(StandardCharsets.UTF_8));
return response.writeWith(Mono.just(buffer));
}
@Override
public int getOrder() {
return -1; // 确保在其他过滤器之前执行
}
}
配置管理
# Spring Cloud Gateway限流配置
spring:
cloud:
gateway:
routes:
- id: rate-limited-api
uri: lb://backend-service
predicates:
- Path=/api/**
filters:
- name: RateLimiting
args:
# 限流配置
rules:
- resource: api:/api/users
threshold: 1000
timeUnit: SECONDS
strategy: TOKEN_BUCKET
- resource: api:/api/orders
threshold: 500
timeUnit: SECONDS
strategy: TOKEN_BUCKET
// 动态规则配置
@Component
public class DynamicRateLimitingConfig {
private final ConfigService configService;
private final Map<String, LimitingRule> ruleCache = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
// 初始化时加载规则
loadRules();
// 监听配置变化
configService.addListener("gateway-rate-limit-rules", this::onRulesChanged);
}
private void loadRules() {
try {
String rulesJson = configService.getProperty("gateway-rate-limit-rules", "[]");
List<LimitingRule> rules = parseRulesFromJson(rulesJson);
for (LimitingRule rule : rules) {
ruleCache.put(rule.getResource(), rule);
}
} catch (Exception e) {
log.error("Failed to load gateway rate limiting rules", e);
}
}
private void onRulesChanged(String key, String newValue) {
if ("gateway-rate-limit-rules".equals(key)) {
try {
List<LimitingRule> rules = parseRulesFromJson(newValue);
ruleCache.clear();
for (LimitingRule rule : rules) {
ruleCache.put(rule.getResource(), rule);
}
log.info("Gateway rate limiting rules updated, total rules: {}", rules.size());
} catch (Exception e) {
log.error("Failed to parse updated gateway rate limiting rules", e);
}
}
}
public LimitingRule getRule(String resource) {
return ruleCache.get(resource);
}
}
Kong网关集成
插件开发
Kong是一个可扩展的API网关,可以通过开发自定义插件来实现限流功能。
-- Kong限流插件实现 (Lua)
local BasePlugin = require "kong.plugins.base_plugin"
local responses = require "kong.tools.responses"
local rate_limiting_client = require "kong.plugins.rate-limiting.client"
local RateLimitingHandler = BasePlugin:extend()
RateLimitingHandler.PRIORITY = 900
RateLimitingHandler.VERSION = "1.0.0"
function RateLimitingHandler:new()
RateLimitingHandler.super.new(self, "rate-limiting")
end
function RateLimitingHandler:access(conf)
RateLimitingHandler.super.access(self)
-- 提取限流维度
local dimensions = self:extract_dimensions(conf)
-- 构造资源标识
local resource = self:build_resource_identifier(dimensions, conf)
-- 执行限流检查
local allowed, err = rate_limiting_client:try_acquire(resource, 1)
if not allowed then
if err then
kong.log.err("Rate limiting error: ", err)
end
-- 拒绝请求
return responses.send(429, {
message = "API rate limit exceeded"
})
end
end
function RateLimitingHandler:extract_dimensions(conf)
local dimensions = {}
-- 提取客户端IP
if conf.limit_by_ip then
dimensions.client_ip = kong.client.ip()
end
-- 提取消费者ID
if conf.limit_by_consumer then
local consumer = kong.client.get_consumer()
if consumer then
dimensions.consumer_id = consumer.id
end
end
-- 提取凭证
if conf.limit_by_credential then
local credential = kong.client.get_credential()
if credential then
dimensions.credential_id = credential.id
end
end
-- 提取服务和路由
dimensions.service_id = kong.router.get_service().id
dimensions.route_id = kong.router.get_route().id
return dimensions
end
function RateLimitingHandler:build_resource_identifier(dimensions, conf)
local parts = {}
if conf.limit_by_ip and dimensions.client_ip then
table.insert(parts, "ip:" .. dimensions.client_ip)
end
if conf.limit_by_consumer and dimensions.consumer_id then
table.insert(parts, "consumer:" .. dimensions.consumer_id)
end
if conf.limit_by_credential and dimensions.credential_id then
table.insert(parts, "credential:" .. dimensions.credential_id)
end
table.insert(parts, "service:" .. dimensions.service_id)
table.insert(parts, "route:" .. dimensions.route_id)
return table.concat(parts, "|")
end
return RateLimitingHandler
Envoy代理集成
配置示例
Envoy作为Service Mesh中的数据平面,可以通过配置实现限流功能。
# Envoy限流配置示例
static_resources:
listeners:
- name: listener_0
address:
socket_address: { address: 0.0.0.0, port_value: 10000 }
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_http
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains: ["*"]
routes:
- match: { prefix: "/" }
route: { cluster: some_service }
# 限流配置
rate_limits:
- stage: 0
actions:
- {source_cluster: {}}
- {destination_cluster: {}}
http_filters:
- name: envoy.filters.http.ratelimit
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.ratelimit.v3.RateLimit
domain: rate_limit_service
rate_limit_service:
grpc_service:
envoy_grpc:
cluster_name: rate_limit_cluster
- name: envoy.filters.http.router
性能优化策略
缓存优化
// 网关限流缓存优化
@Component
public class OptimizedGatewayRateLimiter {
// 使用本地缓存减少网络调用
private final Cache<String, Boolean> allowCache = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(1, TimeUnit.SECONDS)
.build();
// 批量处理减少网络往返
private final RateLimitingBatchProcessor batchProcessor;
public boolean tryAcquire(String resource) {
// 先检查本地缓存
Boolean cachedResult = allowCache.getIfPresent(resource);
if (cachedResult != null) {
return cachedResult;
}
// 执行限流检查
boolean allowed = doTryAcquire(resource);
// 缓存结果(只缓存允许通过的结果,拒绝的结果不缓存)
if (allowed) {
allowCache.put(resource, allowed);
}
return allowed;
}
private boolean doTryAcquire(String resource) {
// 使用批量处理器优化性能
return batchProcessor.process(resource);
}
}
异步处理
// 异步限流处理
@Component
public class AsyncGatewayRateLimiter {
private final RateLimitingClient rateLimitingClient;
private final ExecutorService executorService;
public CompletableFuture<Boolean> tryAcquireAsync(String resource) {
return CompletableFuture.supplyAsync(() -> {
try {
RateLimitingResult result = rateLimitingClient.tryAcquire(resource, 1);
return result.isAllowed();
} catch (Exception e) {
log.error("Rate limiting failed for resource: {}", resource, e);
// 失败时默认允许通过以保证业务连续性
return true;
}
}, executorService);
}
public Mono<Void> filterWithRateLimiting(ServerWebExchange exchange, GatewayFilterChain chain) {
String resource = extractResource(exchange.getRequest());
return Mono.fromFuture(tryAcquireAsync(resource))
.flatMap(allowed -> {
if (allowed) {
return chain.filter(exchange);
} else {
return handleRateLimitRejection(exchange);
}
});
}
}
监控与告警
指标收集
// 网关限流指标收集
@Component
public class GatewayRateLimitingMetrics {
private final MeterRegistry meterRegistry;
private final Counter totalRequestsCounter;
private final Counter rateLimitedRequestsCounter;
private final Timer rateLimitingTimer;
public GatewayRateLimitingMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.totalRequestsCounter = Counter.builder("gateway.requests.total")
.description("Total number of requests")
.register(meterRegistry);
this.rateLimitedRequestsCounter = Counter.builder("gateway.requests.rate_limited")
.description("Number of rate limited requests")
.register(meterRegistry);
this.rateLimitingTimer = Timer.builder("gateway.rate_limiting.duration")
.description("Rate limiting execution time")
.register(meterRegistry);
}
public void recordRequest(String resource, boolean rateLimited) {
totalRequestsCounter.increment(Tag.of("resource", resource));
if (rateLimited) {
rateLimitedRequestsCounter.increment(Tag.of("resource", resource));
}
}
public void recordRateLimitingDuration(String resource, long durationNanos) {
rateLimitingTimer.record(durationNanos, TimeUnit.NANOSECONDS,
Tag.of("resource", resource));
}
}
通过将分布式限流能力集成到API网关中,我们可以构建起系统的第一道防线,实现全局流量管控。这种集成方式不仅能够有效保护后端服务,还能提供统一的限流管理界面,大大简化了限流策略的实施和维护工作。