高性能本地缓存+低频同步: 降低Redis压力,保证最终一致性
2025/9/7大约 8 分钟
在分布式限流系统中,频繁访问Redis等共享存储会导致性能瓶颈和网络开销。为了提高系统的响应速度和降低Redis压力,本地缓存成为必不可少的优化手段。然而,本地缓存的引入也带来了数据一致性的问题。本章将深入探讨如何通过高性能本地缓存与低频同步机制,在保证最终一致性的同时显著提升系统性能。
本地缓存的设计原则
缓存策略选择
在分布式限流场景中,我们需要根据业务特点选择合适的缓存策略:
- 读多写少:限流场景下,读操作(限流判断)远多于写操作(计数更新)
- 时效性要求:限流数据具有一定的时效性,过期数据可以被清除
- 一致性要求:允许一定程度的不一致,但需要保证最终一致性
缓存数据结构设计
// 限流缓存条目
public class RateLimitCacheEntry {
private final String resource;
private volatile long count;
private volatile long lastUpdateTime;
private final long expireTime;
private final RateLimitRule rule;
public RateLimitCacheEntry(String resource, RateLimitRule rule) {
this.resource = resource;
this.count = 0;
this.lastUpdateTime = System.currentTimeMillis();
this.expireTime = rule.getWindowMillis();
this.rule = rule;
}
public boolean isExpired() {
return System.currentTimeMillis() - lastUpdateTime > expireTime;
}
public boolean tryAcquire(int permits) {
if (isExpired()) {
reset();
}
long newCount = count + permits;
if (newCount <= rule.getLimit()) {
count = newCount;
lastUpdateTime = System.currentTimeMillis();
return true;
}
return false;
}
private void reset() {
count = 0;
lastUpdateTime = System.currentTimeMillis();
}
// getter和setter方法
public String getResource() { return resource; }
public long getCount() { return count; }
public long getLastUpdateTime() { return lastUpdateTime; }
public RateLimitRule getRule() { return rule; }
}
高性能本地缓存实现
基于Caffeine的缓存实现
// 高性能本地缓存实现
@Component
public class HighPerformanceLocalCache {
private final Cache<String, RateLimitCacheEntry> cache;
private final RateLimitRuleManager ruleManager;
private final RedisCounter redisCounter;
public HighPerformanceLocalCache(RateLimitRuleManager ruleManager,
RedisCounter redisCounter) {
this.ruleManager = ruleManager;
this.redisCounter = redisCounter;
// 配置Caffeine缓存
this.cache = Caffeine.newBuilder()
.maximumSize(100000) // 最大缓存条目数
.expireAfterWrite(10, TimeUnit.SECONDS) // 写入后10秒过期
.expireAfterAccess(5, TimeUnit.SECONDS) // 访问后5秒过期
.refreshAfterWrite(5, TimeUnit.SECONDS) // 写入后5秒刷新
.recordStats() // 记录缓存统计信息
.build(this::loadFromRedis);
}
public boolean tryAcquire(String resource, int permits) {
try {
// 从缓存中获取条目
RateLimitCacheEntry entry = cache.get(resource, this::createCacheEntry);
// 在本地尝试获取令牌
if (entry.tryAcquire(permits)) {
return true;
}
// 本地获取失败,需要同步到Redis进行精确判断
return syncToRedisAndRetry(resource, permits);
} catch (Exception e) {
log.error("Local cache acquire failed for resource: {}", resource, e);
// 失败时直接访问Redis
return redisCounter.tryAcquire(resource, permits);
}
}
private RateLimitCacheEntry createCacheEntry(String resource) {
RateLimitRule rule = ruleManager.getRule(resource);
return new RateLimitCacheEntry(resource, rule);
}
private RateLimitCacheEntry loadFromRedis(String resource) {
// 从Redis加载最新的计数信息
long count = redisCounter.getCount(resource);
RateLimitRule rule = ruleManager.getRule(resource);
RateLimitCacheEntry entry = new RateLimitCacheEntry(resource, rule);
// 使用反射或包访问权限更新计数(实际实现中需要考虑线程安全)
updateEntryCount(entry, count);
return entry;
}
private boolean syncToRedisAndRetry(String resource, int permits) {
// 同步本地缓存到Redis
syncToRedis(resource);
// 重新从Redis获取最新状态并尝试
return redisCounter.tryAcquire(resource, permits);
}
private void syncToRedis(String resource) {
RateLimitCacheEntry entry = cache.getIfPresent(resource);
if (entry != null) {
redisCounter.updateCount(resource, entry.getCount());
}
}
// 获取缓存统计信息
public CacheStats getCacheStats() {
return cache.stats();
}
}
多级缓存架构
// 多级缓存架构实现
@Component
public class MultiLevelCache {
// L1缓存:线程本地缓存,最低延迟
private final ThreadLocal<Map<String, RateLimitCacheEntry>> threadLocalCache =
ThreadLocal.withInitial(HashMap::new);
// L2缓存:进程内共享缓存
private final Cache<String, RateLimitCacheEntry> processCache;
// L3缓存:分布式缓存(Redis)
private final RedisCounter redisCounter;
public MultiLevelCache(RedisCounter redisCounter) {
this.redisCounter = redisCounter;
this.processCache = Caffeine.newBuilder()
.maximumSize(50000)
.expireAfterWrite(30, TimeUnit.SECONDS)
.build();
}
public boolean tryAcquire(String resource, int permits) {
// 1. 首先检查线程本地缓存
Map<String, RateLimitCacheEntry> localCache = threadLocalCache.get();
RateLimitCacheEntry localEntry = localCache.get(resource);
if (localEntry != null && !localEntry.isExpired()) {
if (localEntry.tryAcquire(permits)) {
return true;
}
}
// 2. 检查进程内缓存
RateLimitCacheEntry processEntry = processCache.getIfPresent(resource);
if (processEntry != null && !processEntry.isExpired()) {
if (processEntry.tryAcquire(permits)) {
// 更新线程本地缓存
localCache.put(resource, processEntry);
return true;
}
}
// 3. 访问Redis
boolean allowed = redisCounter.tryAcquire(resource, permits);
if (allowed) {
// 更新各级缓存
updateCaches(resource, permits);
}
return allowed;
}
private void updateCaches(String resource, int permits) {
// 更新进程内缓存
processCache.asMap().computeIfPresent(resource, (key, entry) -> {
// 这里简化处理,实际需要更复杂的同步逻辑
return entry;
});
// 更新线程本地缓存
Map<String, RateLimitCacheEntry> localCache = threadLocalCache.get();
RateLimitCacheEntry localEntry = localCache.get(resource);
if (localEntry != null) {
// 更新本地条目
}
}
// 定期清理线程本地缓存
@Scheduled(fixedRate = 1000)
public void cleanupThreadLocalCache() {
threadLocalCache.get().clear();
}
}
低频同步机制
同步策略设计
为了保证数据的最终一致性,我们需要设计合理的同步策略:
// 低频同步机制实现
@Component
public class LowFrequencySyncManager {
private final Cache<String, RateLimitCacheEntry> localCache;
private final RedisCounter redisCounter;
private final ScheduledExecutorService syncScheduler;
private final AtomicInteger syncCounter = new AtomicInteger(0);
// 记录需要同步的资源
private final Set<String> pendingSyncResources = ConcurrentHashMap.newKeySet();
public LowFrequencySyncManager(Cache<String, RateLimitCacheEntry> localCache,
RedisCounter redisCounter) {
this.localCache = localCache;
this.redisCounter = redisCounter;
this.syncScheduler = Executors.newScheduledThreadPool(5);
// 启动定期同步任务
startPeriodicSync();
// 启动批量同步任务
startBatchSync();
}
public void markForSync(String resource) {
pendingSyncResources.add(resource);
}
private void startPeriodicSync() {
// 每5秒执行一次定期同步
syncScheduler.scheduleAtFixedRate(this::periodicSync, 5, 5, TimeUnit.SECONDS);
}
private void startBatchSync() {
// 每100毫秒执行一次批量同步
syncScheduler.scheduleAtFixedRate(this::batchSync, 100, 100, TimeUnit.MILLISECONDS);
}
private void periodicSync() {
try {
// 同步所有缓存条目
CacheStats stats = localCache.stats();
long totalRequests = stats.requestCount();
long hitCount = stats.hitCount();
// 如果缓存命中率较高,则进行同步
if (totalRequests > 0 && (double) hitCount / totalRequests > 0.7) {
syncAllEntries();
}
} catch (Exception e) {
log.error("Periodic sync failed", e);
}
}
private void batchSync() {
try {
if (pendingSyncResources.size() > 10) {
// 批量同步待处理的资源
syncPendingResources();
}
} catch (Exception e) {
log.error("Batch sync failed", e);
}
}
private void syncAllEntries() {
// 获取所有缓存条目并同步到Redis
ConcurrentMap<String, RateLimitCacheEntry> cacheMap = localCache.asMap();
// 分批处理,避免一次性处理过多数据
List<Map.Entry<String, RateLimitCacheEntry>> entries =
new ArrayList<>(cacheMap.entrySet());
for (int i = 0; i < entries.size(); i += 100) {
int end = Math.min(i + 100, entries.size());
List<Map.Entry<String, RateLimitCacheEntry>> batch =
entries.subList(i, end);
syncBatch(batch);
}
}
private void syncPendingResources() {
// 获取待同步的资源列表
List<String> resourcesToSync = new ArrayList<>(pendingSyncResources);
pendingSyncResources.clear();
// 分批同步
for (int i = 0; i < resourcesToSync.size(); i += 50) {
int end = Math.min(i + 50, resourcesToSync.size());
List<String> batch = resourcesToSync.subList(i, end);
syncResourceBatch(batch);
}
}
private void syncBatch(List<Map.Entry<String, RateLimitCacheEntry>> batch) {
// 使用Redis Pipeline进行批量同步
redisCounter.batchUpdate(batch.stream()
.collect(Collectors.toMap(
entry -> entry.getKey(),
entry -> entry.getValue().getCount()
)));
}
private void syncResourceBatch(List<String> resources) {
// 批量获取Redis中的最新计数
Map<String, Long> redisCounts = redisCounter.batchGet(resources);
// 更新本地缓存
for (String resource : resources) {
Long redisCount = redisCounts.get(resource);
if (redisCount != null) {
RateLimitCacheEntry entry = localCache.getIfPresent(resource);
if (entry != null) {
// 更新本地计数(需要考虑线程安全)
updateEntryCount(entry, redisCount);
}
}
}
}
}
增量同步优化
// 增量同步优化
@Component
public class IncrementalSyncManager {
private final Cache<String, RateLimitCacheEntry> localCache;
private final RedisCounter redisCounter;
private final Map<String, Long> lastSyncVersions = new ConcurrentHashMap<>();
public void incrementalSync(String resource) {
RateLimitCacheEntry entry = localCache.getIfPresent(resource);
if (entry == null) return;
Long lastSyncVersion = lastSyncVersions.get(resource);
long currentVersion = entry.getLastUpdateTime();
// 只有在数据发生变化时才进行同步
if (lastSyncVersion == null || currentVersion > lastSyncVersion) {
// 执行增量同步
redisCounter.increment(resource, entry.getCount() -
(lastSyncVersion != null ? lastSyncVersion : 0));
// 更新同步版本
lastSyncVersions.put(resource, currentVersion);
}
}
// 异步增量同步
public CompletableFuture<Void> asyncIncrementalSync(String resource) {
return CompletableFuture.runAsync(() -> incrementalSync(resource));
}
}
一致性保证机制
最终一致性实现
// 最终一致性保证机制
@Component
public class EventualConsistencyManager {
private final Cache<String, RateLimitCacheEntry> localCache;
private final RedisCounter redisCounter;
private final EventBus eventBus;
public EventualConsistencyManager(Cache<String, RateLimitCacheEntry> localCache,
RedisCounter redisCounter,
EventBus eventBus) {
this.localCache = localCache;
this.redisCounter = redisCounter;
this.eventBus = eventBus;
// 注册事件监听器
eventBus.register(this);
}
// 处理Redis数据变更事件
@Subscribe
public void handleRedisDataChangeEvent(RedisDataChangeEvent event) {
String resource = event.getResource();
long newCount = event.getNewCount();
// 更新本地缓存
RateLimitCacheEntry entry = localCache.getIfPresent(resource);
if (entry != null) {
updateEntryCount(entry, newCount);
}
}
// 处理缓存失效事件
@Subscribe
public void handleCacheInvalidateEvent(CacheInvalidateEvent event) {
String resource = event.getResource();
// 从缓存中移除条目,下次访问时重新加载
localCache.invalidate(resource);
}
// 主动发布数据变更事件
public void publishDataChangeEvent(String resource, long newCount) {
eventBus.post(new RedisDataChangeEvent(resource, newCount));
}
}
数据冲突解决
// 数据冲突解决机制
@Component
public class ConflictResolutionManager {
public long resolveConflict(String resource, long localCount, long remoteCount) {
// 策略1:取最大值(适用于计数场景)
return Math.max(localCount, remoteCount);
// 策略2:取平均值(适用于需要平滑过渡的场景)
// return (localCount + remoteCount) / 2;
// 策略3:基于时间戳的冲突解决
// return resolveByTimestamp(resource, localCount, remoteCount);
}
private long resolveByTimestamp(String resource, long localCount, long remoteCount) {
// 获取本地和远程数据的时间戳
long localTimestamp = getLocalTimestamp(resource);
long remoteTimestamp = getRemoteTimestamp(resource);
// 选择时间戳更新的数据
return localTimestamp > remoteTimestamp ? localCount : remoteCount;
}
private long getLocalTimestamp(String resource) {
// 获取本地缓存的时间戳
return System.currentTimeMillis();
}
private long getRemoteTimestamp(String resource) {
// 获取Redis中数据的时间戳
return System.currentTimeMillis();
}
}
性能监控与调优
缓存性能监控
// 缓存性能监控
@Component
public class CachePerformanceMonitor {
private final MeterRegistry meterRegistry;
private final Cache<String, RateLimitCacheEntry> localCache;
private final Timer cacheHitTimer;
private final Timer cacheMissTimer;
private final Counter cacheSyncCounter;
public CachePerformanceMonitor(MeterRegistry meterRegistry,
Cache<String, RateLimitCacheEntry> localCache) {
this.meterRegistry = meterRegistry;
this.localCache = localCache;
this.cacheHitTimer = Timer.builder("cache.hit.duration")
.description("Cache hit duration")
.register(meterRegistry);
this.cacheMissTimer = Timer.builder("cache.miss.duration")
.description("Cache miss duration")
.register(meterRegistry);
this.cacheSyncCounter = Counter.builder("cache.sync.count")
.description("Cache sync count")
.register(meterRegistry);
// 注册缓存统计信息
Gauge.builder("cache.hit.ratio")
.description("Cache hit ratio")
.register(meterRegistry, localCache, cache -> {
CacheStats stats = cache.stats();
long requestCount = stats.requestCount();
return requestCount > 0 ? (double) stats.hitCount() / requestCount : 0.0;
});
}
public <T> T monitorCacheHit(Supplier<T> operation) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
T result = operation.get();
sample.stop(cacheHitTimer);
return result;
} catch (Exception e) {
sample.stop(cacheMissTimer);
throw e;
}
}
public void recordCacheSync() {
cacheSyncCounter.increment();
}
}
自适应缓存调优
// 自适应缓存调优
@Component
public class AdaptiveCacheOptimizer {
private final Cache<String, RateLimitCacheEntry> localCache;
private final CachePerformanceMonitor performanceMonitor;
private volatile CacheConfig currentConfig;
@Scheduled(fixedRate = 30000) // 每30秒检查一次
public void optimize() {
// 收集性能指标
CacheStats stats = localCache.stats();
double hitRatio = stats.requestCount() > 0 ?
(double) stats.hitCount() / stats.requestCount() : 0.0;
// 根据命中率调整缓存配置
if (hitRatio < 0.8) {
// 命中率较低,增加缓存大小
increaseCacheSize();
} else if (hitRatio > 0.95) {
// 命中率很高,可以适当减少缓存大小以节省内存
decreaseCacheSize();
}
// 根据同步频率调整同步策略
adjustSyncFrequency();
}
private void increaseCacheSize() {
currentConfig.setMaxSize(currentConfig.getMaxSize() * 2);
log.info("Increased cache size to {}", currentConfig.getMaxSize());
}
private void decreaseCacheSize() {
currentConfig.setMaxSize(Math.max(1000, currentConfig.getMaxSize() / 2));
log.info("Decreased cache size to {}", currentConfig.getMaxSize());
}
private void adjustSyncFrequency() {
// 根据系统负载调整同步频率
// 实现细节省略
}
}
通过以上实现,我们构建了一个高性能的本地缓存系统,结合低频同步机制,在显著提升系统性能的同时保证了数据的最终一致性。这种设计在分布式限流场景中能够有效降低Redis压力,提高系统的响应速度和可扩展性。