重试策略与断路器实现:构建弹性微服务调用机制
2025/8/31大约 7 分钟
在微服务架构中,服务间的调用可能会因为网络波动、服务过载、临时故障等原因而失败。合理的重试策略和断路器实现是确保系统稳定性和可用性的关键机制。本文将深入探讨重试策略的设计原则和断路器的具体实现方式。
重试策略设计原则
重试机制是处理瞬时故障的有效手段,但不当的重试策略可能会加重系统负担甚至引发雪崩效应。因此,设计合理的重试策略至关重要。
重试触发条件
并非所有的异常都应该触发重试,需要区分可重试异常和不可重试异常:
// 可重试异常定义
public class RetryableException extends Exception {
public RetryableException(String message) {
super(message);
}
public RetryableException(String message, Throwable cause) {
super(message, cause);
}
}
// 不可重试异常定义
public class NonRetryableException extends Exception {
public NonRetryableException(String message) {
super(message);
}
public NonRetryableException(String message, Throwable cause) {
super(message, cause);
}
}重试策略分类
1. 固定间隔重试
适用于故障恢复时间相对固定的场景:
// 固定间隔重试实现
public class FixedIntervalRetry {
private final int maxAttempts;
private final long intervalMillis;
public FixedIntervalRetry(int maxAttempts, long intervalMillis) {
this.maxAttempts = maxAttempts;
this.intervalMillis = intervalMillis;
}
public <T> T execute(Supplier<T> operation) throws Exception {
Exception lastException = null;
for (int attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return operation.get();
} catch (Exception e) {
lastException = e;
// 检查是否为可重试异常
if (!isRetryable(e)) {
throw e;
}
// 最后一次尝试,直接抛出异常
if (attempt == maxAttempts) {
throw e;
}
// 等待固定间隔
Thread.sleep(intervalMillis);
}
}
throw lastException;
}
private boolean isRetryable(Exception e) {
// 定义可重试的异常类型
return e instanceof RetryableException ||
e instanceof SocketTimeoutException ||
e instanceof ConnectException;
}
}2. 指数退避重试
适用于避免对故障服务造成进一步压力的场景:
// 指数退避重试实现
public class ExponentialBackoffRetry {
private final int maxAttempts;
private final long initialIntervalMillis;
private final double multiplier;
private final long maxIntervalMillis;
public ExponentialBackoffRetry(int maxAttempts, long initialIntervalMillis,
double multiplier, long maxIntervalMillis) {
this.maxAttempts = maxAttempts;
this.initialIntervalMillis = initialIntervalMillis;
this.multiplier = multiplier;
this.maxIntervalMillis = maxIntervalMillis;
}
public <T> T execute(Supplier<T> operation) throws Exception {
Exception lastException = null;
long currentInterval = initialIntervalMillis;
for (int attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return operation.get();
} catch (Exception e) {
lastException = e;
// 检查是否为可重试异常
if (!isRetryable(e)) {
throw e;
}
// 最后一次尝试,直接抛出异常
if (attempt == maxAttempts) {
throw e;
}
// 等待指数退避间隔
Thread.sleep(Math.min(currentInterval, maxIntervalMillis));
// 计算下一次的间隔时间
currentInterval = (long) (currentInterval * multiplier);
}
}
throw lastException;
}
private boolean isRetryable(Exception e) {
return e instanceof RetryableException ||
e instanceof SocketTimeoutException ||
e instanceof ConnectException ||
e instanceof SocketException;
}
}3. 随机化退避重试
适用于避免多个客户端同时重试造成冲击的场景:
// 随机化退避重试实现
public class RandomizedBackoffRetry {
private final int maxAttempts;
private final long initialIntervalMillis;
private final double multiplier;
private final double randomness;
private final long maxIntervalMillis;
public RandomizedBackoffRetry(int maxAttempts, long initialIntervalMillis,
double multiplier, double randomness, long maxIntervalMillis) {
this.maxAttempts = maxAttempts;
this.initialIntervalMillis = initialIntervalMillis;
this.multiplier = multiplier;
this.randomness = randomness;
this.maxIntervalMillis = maxIntervalMillis;
}
public <T> T execute(Supplier<T> operation) throws Exception {
Exception lastException = null;
long currentInterval = initialIntervalMillis;
Random random = new Random();
for (int attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return operation.get();
} catch (Exception e) {
lastException = e;
// 检查是否为可重试异常
if (!isRetryable(e)) {
throw e;
}
// 最后一次尝试,直接抛出异常
if (attempt == maxAttempts) {
throw e;
}
// 计算带随机抖动的等待时间
long jitter = (long) (currentInterval * randomness * (random.nextDouble() - 0.5) * 2);
long waitTime = Math.min(currentInterval + jitter, maxIntervalMillis);
waitTime = Math.max(waitTime, 0); // 确保不为负数
Thread.sleep(waitTime);
// 计算下一次的间隔时间
currentInterval = (long) (currentInterval * multiplier);
}
}
throw lastException;
}
private boolean isRetryable(Exception e) {
return e instanceof RetryableException ||
e instanceof SocketTimeoutException ||
e instanceof ConnectException ||
e instanceof SocketException ||
e instanceof UnknownHostException;
}
}Resilience4j断路器实现
Resilience4j是新一代的容错库,专为函数式编程设计,提供了完善的断路器功能。
基础断路器配置
// Resilience4j断路器配置
@Configuration
public class CircuitBreakerConfig {
@Bean
public CircuitBreakerRegistry circuitBreakerRegistry() {
// 创建自定义配置
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(50) // 失败率阈值50%
.slowCallRateThreshold(100) // 慢调用率阈值100%
.slowCallDurationThreshold(Duration.ofSeconds(2)) // 慢调用时间阈值2秒
.waitDurationInOpenState(Duration.ofSeconds(30)) // 打开状态持续时间30秒
.permittedNumberOfCallsInHalfOpenState(10) // 半开状态允许的调用数
.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.TIME_BASED)
.slidingWindowSize(100) // 滑动窗口大小
.minimumNumberOfCalls(10) // 最小调用数
.automaticTransitionFromOpenToHalfOpenEnabled(true) // 自动从打开状态转换到半开状态
.recordException(throwable ->
!(throwable instanceof BusinessException)) // 不记录业务异常
.build();
// 创建注册中心
CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config);
// 添加事件消费者
registry.getAllCircuitBreakers().forEach(circuitBreaker -> {
circuitBreaker.getEventPublisher()
.onStateTransition(event ->
log.info("Circuit breaker {} state changed from {} to {}",
event.getCircuitBreakerName(),
event.getStateTransition().getFromState(),
event.getStateTransition().getToState()));
});
return registry;
}
}断路器使用示例
// 使用Resilience4j断路器
@Service
public class UserService {
private final CircuitBreaker circuitBreaker;
private final UserServiceClient userServiceClient;
public UserService(CircuitBreakerRegistry circuitBreakerRegistry,
UserServiceClient userServiceClient) {
this.circuitBreaker = circuitBreakerRegistry.circuitBreaker("userService");
this.userServiceClient = userServiceClient;
}
public User getUserById(Long userId) {
// 创建装饰后的Supplier
Supplier<User> decoratedSupplier = CircuitBreaker
.decorateSupplier(circuitBreaker, () -> userServiceClient.getUser(userId));
// 使用Try处理可能的异常
return Try.ofSupplier(decoratedSupplier)
.recover(throwable -> {
log.warn("Failed to get user {}, using fallback", userId, throwable);
return new User(userId, "Fallback User", "fallback@example.com");
})
.get();
}
public List<User> getUsers() {
// 创建装饰后的Supplier
Supplier<List<User>> decoratedSupplier = CircuitBreaker
.decorateSupplier(circuitBreaker, userServiceClient::getAllUsers);
// 使用Try处理可能的异常
return Try.ofSupplier(decoratedSupplier)
.recover(throwable -> {
log.warn("Failed to get users, using fallback", throwable);
return Collections.emptyList();
})
.get();
}
}断路器与重试结合使用
// 断路器与重试结合使用
@Service
public class OrderService {
private final CircuitBreaker circuitBreaker;
private final Retry retry;
private final OrderServiceClient orderServiceClient;
public OrderService(CircuitBreakerRegistry circuitBreakerRegistry,
RetryRegistry retryRegistry,
OrderServiceClient orderServiceClient) {
this.circuitBreaker = circuitBreakerRegistry.circuitBreaker("orderService");
this.retry = retryRegistry.retry("orderService");
this.orderServiceClient = orderServiceClient;
}
public Order createOrder(OrderRequest request) {
// 创建重试装饰后的Supplier
Supplier<Order> retryableSupplier = Retry
.decorateSupplier(retry, () -> orderServiceClient.createOrder(request));
// 创建断路器装饰后的Supplier
Supplier<Order> circuitBreakerSupplier = CircuitBreaker
.decorateSupplier(circuitBreaker, retryableSupplier);
// 使用Try处理可能的异常
return Try.ofSupplier(circuitBreakerSupplier)
.recover(throwable -> {
log.warn("Failed to create order, using fallback", throwable);
return new Order("FALLBACK-" + System.currentTimeMillis(),
request.getUserId(), Collections.emptyList());
})
.get();
}
}Hystrix断路器实现
虽然Hystrix已停止维护,但在一些遗留系统中仍在使用。
Hystrix命令实现
// Hystrix命令实现
@Component
public class GetUserCommand extends HystrixCommand<User> {
private final UserServiceClient userServiceClient;
private final Long userId;
public GetUserCommand(UserServiceClient userServiceClient, Long userId) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("UserGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("GetUser"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("UserThreadPool"))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withCircuitBreakerRequestVolumeThreshold(20) // 请求量阈值
.withCircuitBreakerSleepWindowInMilliseconds(5000) // 睡眠窗口
.withCircuitBreakerErrorThresholdPercentage(50) // 错误百分比阈值
.withExecutionTimeoutInMilliseconds(3000)) // 执行超时时间
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withCoreSize(10) // 核心线程数
.withMaximumSize(20) // 最大线程数
.withMaxQueueSize(100))); // 最大队列大小
this.userServiceClient = userServiceClient;
this.userId = userId;
}
@Override
protected User run() throws Exception {
// 正常的服务调用
return userServiceClient.getUser(userId);
}
@Override
protected User getFallback() {
// 降级处理
log.warn("Fallback for user id: {}", userId);
return new User(userId, "Fallback User", "fallback@example.com");
}
}Hystrix配置优化
# Hystrix配置优化
hystrix:
command:
default:
execution:
timeout:
enabled: true
isolation:
thread:
timeoutInMilliseconds: 3000
circuitBreaker:
enabled: true
requestVolumeThreshold: 20
sleepWindowInMilliseconds: 5000
errorThresholdPercentage: 50
forceOpen: false
forceClosed: false
threadpool:
default:
coreSize: 10
maximumSize: 20
maxQueueSize: -1
queueSizeRejectionThreshold: 5
keepAliveTimeMinutes: 1自定义断路器实现
在某些特殊场景下,可能需要自定义断路器实现:
// 自定义断路器实现
public class CustomCircuitBreaker {
public enum State {
CLOSED, OPEN, HALF_OPEN
}
private volatile State state = State.CLOSED;
private final AtomicInteger failureCount = new AtomicInteger(0);
private final AtomicInteger successCount = new AtomicInteger(0);
private volatile long lastFailureTime = 0;
private final CircuitBreakerConfig config;
public CustomCircuitBreaker(CircuitBreakerConfig config) {
this.config = config;
}
public <T> T execute(Supplier<T> operation) throws Exception {
// 检查状态转换
checkStateTransition();
switch (state) {
case CLOSED:
return executeInClosedState(operation);
case OPEN:
throw new CircuitBreakerOpenException("Circuit breaker is open");
case HALF_OPEN:
return executeInHalfOpenState(operation);
default:
throw new IllegalStateException("Unknown state: " + state);
}
}
private void checkStateTransition() {
if (state == State.OPEN &&
System.currentTimeMillis() - lastFailureTime > config.getWaitDurationInOpenState()) {
state = State.HALF_OPEN;
successCount.set(0);
}
}
private <T> T executeInClosedState(Supplier<T> operation) throws Exception {
try {
T result = operation.get();
onSuccess();
return result;
} catch (Exception e) {
onFailure();
throw e;
}
}
private <T> T executeInHalfOpenState(Supplier<T> operation) throws Exception {
try {
T result = operation.get();
onHalfOpenSuccess();
return result;
} catch (Exception e) {
onHalfOpenFailure();
throw e;
}
}
private void onSuccess() {
failureCount.set(0);
}
private void onFailure() {
int failures = failureCount.incrementAndGet();
lastFailureTime = System.currentTimeMillis();
if (failures >= config.getFailureThreshold()) {
state = State.OPEN;
}
}
private void onHalfOpenSuccess() {
int successes = successCount.incrementAndGet();
if (successes >= config.getPermittedNumberOfCallsInHalfOpenState()) {
state = State.CLOSED;
failureCount.set(0);
}
}
private void onHalfOpenFailure() {
state = State.OPEN;
lastFailureTime = System.currentTimeMillis();
}
public State getState() {
return state;
}
public int getFailureCount() {
return failureCount.get();
}
// 配置类
public static class CircuitBreakerConfig {
private int failureThreshold = 5;
private long waitDurationInOpenState = 60000; // 60秒
private int permittedNumberOfCallsInHalfOpenState = 3;
// getters and setters
public int getFailureThreshold() {
return failureThreshold;
}
public long getWaitDurationInOpenState() {
return waitDurationInOpenState;
}
public int getPermittedNumberOfCallsInHalfOpenState() {
return permittedNumberOfCallsInHalfOpenState;
}
}
}总结
重试策略和断路器实现是构建弹性微服务调用机制的核心组件。通过合理设计重试策略,我们可以有效处理瞬时故障;通过断路器机制,我们可以防止故障级联传播,保护系统稳定性。
在实际应用中,我们需要根据业务特点选择合适的重试策略,如固定间隔重试、指数退避重试或随机化退避重试。同时,使用成熟的断路器实现如Resilience4j或Hystrix,可以大大简化容错机制的实现。
需要注意的是,重试和断路器机制不是万能的,它们只是容错体系中的一部分。我们还需要结合监控告警、日志分析、故障演练等手段,构建完整的微服务容错体系,确保系统在各种异常情况下都能稳定运行。
