一致性与高可用
2025/9/1大约 5 分钟
在分布式系统中,一致性和高可用性是两个核心挑战。服务注册与配置中心作为微服务架构的关键组件,必须在这两个方面做出合理的权衡和设计。本章将深入探讨Raft/Paxos算法在注册中心中的应用、Leader选举实现以及多副本数据同步等关键技术。
Raft / Paxos 在注册中心中的应用
Raft和Paxos是两种著名的分布式一致性算法,它们在注册中心中发挥着重要作用。
Raft算法
Raft算法通过将复杂的分布式一致性问题分解为三个相对独立的子问题来简化理解和实现:
- Leader选举:选出集群中的领导者节点
- 日志复制:领导者将日志条目复制到其他节点
- 安全性:确保状态机的安全性
Raft在注册中心中的应用
// Raft节点状态
public enum RaftState {
FOLLOWER,
CANDIDATE,
LEADER
}
// Raft节点核心实现
public class RaftNode {
private RaftState state = RaftState.FOLLOWER;
private long currentTerm = 0;
private String votedFor = null;
private List<LogEntry> log = new ArrayList<>();
private long commitIndex = 0;
private long lastApplied = 0;
// 选举超时时间(毫秒)
private long electionTimeout = 150 + new Random().nextInt(150);
private long lastHeartbeatTime = System.currentTimeMillis();
// 处理选举超时
public void handleElectionTimeout() {
if (state == RaftState.FOLLOWER || state == RaftState.CANDIDATE) {
if (System.currentTimeMillis() - lastHeartbeatTime > electionTimeout) {
startElection();
}
}
}
// 开始选举
private void startElection() {
state = RaftState.CANDIDATE;
currentTerm++;
votedFor = nodeId;
votesReceived = 1; // 给自己投票
// 向其他节点发送投票请求
for (String peer : peers) {
sendRequestVote(peer, currentTerm, nodeId, getLastLogIndex(), getLastLogTerm());
}
}
// 处理投票请求
public RequestVoteResponse handleRequestVote(RequestVoteRequest request) {
if (request.getTerm() < currentTerm) {
return new RequestVoteResponse(currentTerm, false);
}
if (request.getTerm() > currentTerm) {
currentTerm = request.getTerm();
state = RaftState.FOLLOWER;
votedFor = null;
}
// 检查是否应该投票给该候选人
if (votedFor == null || votedFor.equals(request.getCandidateId())) {
if (isLogUpToDate(request.getLastLogIndex(), request.getLastLogTerm())) {
votedFor = request.getCandidateId();
lastHeartbeatTime = System.currentTimeMillis();
return new RequestVoteResponse(currentTerm, true);
}
}
return new RequestVoteResponse(currentTerm, false);
}
}
日志复制
// Leader处理客户端请求
public boolean handleClientRequest(String command) {
if (state != RaftState.LEADER) {
return false;
}
// 将命令添加到本地日志
LogEntry entry = new LogEntry(currentTerm, command);
log.add(entry);
// 向其他节点复制日志
replicateLogToFollowers(entry);
return true;
}
// 日志复制到Follower
private void replicateLogToFollowers(LogEntry entry) {
for (String peer : peers) {
// 获取该节点的下一个日志索引
long nextIndex = getNextIndex(peer);
// 发送追加条目请求
AppendEntriesRequest request = new AppendEntriesRequest(
currentTerm,
nodeId,
nextIndex - 1,
getLogTerm(nextIndex - 1),
Collections.singletonList(entry),
commitIndex
);
sendAppendEntries(peer, request);
}
}
Paxos算法
Paxos算法通过多轮提案和接受过程来达成一致性:
// Paxos角色
public enum PaxosRole {
PROPOSER,
ACCEPTOR,
LEARNER
}
// Paxos提案
public class Proposal {
private long proposalNumber;
private String value;
// getters and setters
}
// Paxos Acceptor实现
public class PaxosAcceptor {
private long promisedProposalNumber = -1;
private long acceptedProposalNumber = -1;
private String acceptedValue = null;
// 处理Prepare请求
public Promise handlePrepare(long proposalNumber) {
if (proposalNumber > promisedProposalNumber) {
promisedProposalNumber = proposalNumber;
return new Promise(true, acceptedProposalNumber, acceptedValue);
}
return new Promise(false, -1, null);
}
// 处理Accept请求
public boolean handleAccept(long proposalNumber, String value) {
if (proposalNumber >= promisedProposalNumber) {
acceptedProposalNumber = proposalNumber;
acceptedValue = value;
return true;
}
return false;
}
}
Leader 选举实现
Leader选举是保证系统一致性和可用性的关键机制。
选举触发条件
// 选举触发条件
public class ElectionTrigger {
private static final long ELECTION_TIMEOUT_MIN = 150;
private static final long ELECTION_TIMEOUT_MAX = 300;
private long lastHeartbeatTime;
private long electionTimeout;
private ScheduledExecutorService scheduler;
public ElectionTrigger() {
// 随机化选举超时时间,避免选举冲突
this.electionTimeout = ELECTION_TIMEOUT_MIN +
new Random().nextInt((int)(ELECTION_TIMEOUT_MAX - ELECTION_TIMEOUT_MIN));
this.lastHeartbeatTime = System.currentTimeMillis();
// 启动选举超时检查
startElectionTimeoutChecker();
}
private void startElectionTimeoutChecker() {
scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
if (System.currentTimeMillis() - lastHeartbeatTime > electionTimeout) {
triggerElection();
}
}, 0, 10, TimeUnit.MILLISECONDS);
}
private void triggerElection() {
// 发送选举请求
broadcastVoteRequest();
}
}
选举算法实现
// 基于任期和日志的选举算法
public class LeaderElection {
private String nodeId;
private List<String> clusterNodes;
private long currentTerm;
private Map<String, VoteResponse> votes = new ConcurrentHashMap<>();
// 发起选举
public void startElection() {
currentTerm++;
votes.clear();
votes.put(nodeId, new VoteResponse(true, currentTerm));
// 向其他节点发送投票请求
for (String peer : clusterNodes) {
if (!peer.equals(nodeId)) {
VoteRequest request = new VoteRequest(currentTerm, nodeId,
getLastLogIndex(), getLastLogTerm());
sendVoteRequest(peer, request);
}
}
// 启动选举超时检查
checkElectionResult();
}
// 处理投票响应
public void handleVoteResponse(String fromNode, VoteResponse response) {
if (response.getTerm() > currentTerm) {
// 发现更高任期,转换为Follower
currentTerm = response.getTerm();
becomeFollower();
return;
}
if (response.getTerm() == currentTerm && response.isVoteGranted()) {
votes.put(fromNode, response);
// 检查是否获得多数票
if (votes.size() > clusterNodes.size() / 2) {
becomeLeader();
}
}
}
private void becomeLeader() {
System.out.println(nodeId + " becomes leader in term " + currentTerm);
// 开始发送心跳
startHeartbeat();
}
private void becomeFollower() {
System.out.println(nodeId + " becomes follower in term " + currentTerm);
// 停止心跳发送
stopHeartbeat();
}
}
多副本数据同步
多副本数据同步是保证系统高可用性和数据一致性的基础。
数据同步策略
// 数据同步管理器
public class DataReplicationManager {
private List<String> replicas;
private String leaderId;
private ReplicationStrategy strategy;
// 同步策略枚举
public enum ReplicationStrategy {
SYNC, // 同步复制
ASYNC, // 异步复制
SEMI_SYNC // 半同步复制
}
// 同步复制
public boolean syncReplicate(DataEntry entry) {
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
// 向所有副本发送同步请求
for (String replica : replicas) {
if (!replica.equals(leaderId)) {
CompletableFuture<Boolean> future = replicateToReplica(replica, entry);
futures.add(future);
}
}
// 等待所有副本确认
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]));
try {
allFutures.get(5, TimeUnit.SECONDS); // 5秒超时
// 检查所有副本是否都成功
for (CompletableFuture<Boolean> future : futures) {
if (!future.get()) {
return false;
}
}
return true;
} catch (Exception e) {
return false;
}
}
// 异步复制
public void asyncReplicate(DataEntry entry) {
for (String replica : replicas) {
if (!replica.equals(leaderId)) {
// 异步发送复制请求
replicateToReplicaAsync(replica, entry);
}
}
}
// 半同步复制
public boolean semiSyncReplicate(DataEntry entry) {
int quorum = replicas.size() / 2 + 1;
AtomicInteger ackCount = new AtomicInteger(1); // 包括自己
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
for (String replica : replicas) {
if (!replica.equals(leaderId)) {
CompletableFuture<Boolean> future = replicateToReplica(replica, entry)
.thenApply(success -> {
if (success) {
ackCount.incrementAndGet();
}
return success;
});
futures.add(future);
}
}
// 等待法定数量的确认
CompletableFuture<Void> quorumFuture = CompletableFuture.anyOf(
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])),
CompletableFuture.runAsync(() -> {
while (ackCount.get() < quorum) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
})
);
try {
quorumFuture.get(3, TimeUnit.SECONDS);
return ackCount.get() >= quorum;
} catch (Exception e) {
return false;
}
}
}
数据一致性保证
// 数据一致性检查器
public class ConsistencyChecker {
// 检查数据一致性
public ConsistencyReport checkConsistency() {
ConsistencyReport report = new ConsistencyReport();
// 获取所有副本的数据摘要
Map<String, DataDigest> digests = new HashMap<>();
for (String replica : replicas) {
DataDigest digest = getDataDigest(replica);
digests.put(replica, digest);
}
// 比较数据摘要
DataDigest referenceDigest = digests.get(leaderId);
for (Map.Entry<String, DataDigest> entry : digests.entrySet()) {
String replica = entry.getKey();
DataDigest digest = entry.getValue();
if (!referenceDigest.equals(digest)) {
report.addInconsistentReplica(replica);
}
}
return report;
}
// 数据恢复
public void recoverInconsistentReplicas(ConsistencyReport report) {
DataSnapshot snapshot = getLatestSnapshot(leaderId);
for (String inconsistentReplica : report.getInconsistentReplicas()) {
// 向不一致的副本发送数据快照
sendSnapshot(inconsistentReplica, snapshot);
}
}
}
总结
一致性和高可用性是注册中心设计的核心挑战:
- Raft/Paxos算法:提供了强一致性的保证,但可能影响性能
- Leader选举:通过合理的选举机制保证系统的可用性
- 多副本同步:通过同步、异步、半同步等策略平衡一致性和性能
- 数据一致性保证:通过数据摘要和快照机制确保数据一致性
在实际应用中,需要根据业务需求在一致性、可用性和性能之间做出合理权衡。对于大多数微服务场景,Raft算法因其易于理解和实现而成为首选方案。