前言
大家好,我是老马。
分布式系统中,一致性算法是最重要的基石,也是最难学习的部分。
这里是从零开始实现 raft 系列。
核心能力
Raft 为了算法的可理解性,将算法分成了 4 个部分。
leader 选举
日志复制
成员变更
日志压缩
Leader 选举的实现
请求者
选举,其实就是一个定时器,根据 Raft 论文描述,如果超时了就需要重新选举,我们使用 Java 的定时任务线程池进行实现,实现之前,需要确定几个点:
-
选举者必须不是 leader。
-
必须超时了才能选举,具体超时时间根据你的设计而定,注意,每个节点的超时时间不能相同,应当使用随机算法错开(Raft 关键实现),避免无谓的死锁。
-
选举者优先选举自己,将自己变成 candidate。
-
选举的第一步就是把自己的 term 加一。
-
然后像其他节点发送请求投票 RPC,请求参数参照论文,包括自身的 term,自身的 lastIndex,以及日志的 lastTerm。同时,请求投票 RPC 应该是并行请求的。
-
等待投票结果应该有超时控制,如果超时了,就不等待了。
-
最后,如果有超过半数的响应为 success,那么就需要立即变成 leader ,并发送心跳阻止其他选举。
-
如果失败了,就需要重新选举。注意,这个期间,如果有其他节点发送心跳,也需要立刻变成 follower,否则,将死循环。
接受者
上面说的,其实是 Leader 选举中,请求者的实现,那么接收者如何实现呢?
接收者在收到“请求投票” RPC 后,需要做以下事情:
-
注意,选举操作应该是串行的,因为涉及到状态修改,并发操作将导致数据错乱。也就是说,如果抢锁失败,应当立即返回错误。
-
首先判断对方的 term 是否小于自己,如果小于自己,直接返回失败。
-
如果当前节点没有投票给任何人,或者投的正好是对方,那么就可以比较日志的大小,反之,返回失败。
-
如果对方日志没有自己大,返回失败。反之,投票给对方,并变成 follower。变成 follower 的同时,异步的选举任务在最后从 condidate 变成 leader 之前,会判断是否是
follower,如果是 follower,就放弃成为 leader。这是一个兜底的措施。
到这里,基本就能够实现 Raft Leader 选举的逻辑。
注意,我们上面涉及到的 LastIndex 等参数,还没有实现,但不影响我们编写伪代码,毕竟日志复制比 leader 选举要复杂的多,我们的原则是从易到难。:)
实现逻辑
请求者
投票任务的核心逻辑:
package com.github.houbb.raft.server.support.vote;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.raft.common.constant.RpcRequestCmdConst;
import com.github.houbb.raft.common.constant.enums.NodeStatusEnum;
import com.github.houbb.raft.common.entity.req.VoteRequest;
import com.github.houbb.raft.common.entity.req.dto.LogEntry;
import com.github.houbb.raft.common.entity.resp.VoteResponse;
import com.github.houbb.raft.common.rpc.RpcClient;
import com.github.houbb.raft.common.rpc.RpcRequest;
import com.github.houbb.raft.server.core.LogManager;
import com.github.houbb.raft.server.dto.PeerInfoDto;
import com.github.houbb.raft.server.dto.node.NodeInfoContext;
import com.github.houbb.raft.server.support.peer.PeerManager;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* 投定时调度
*
* 1. 在转变成候选人后就立即开始选举过程
* 自增当前的任期号(currentTerm)
* 给自己投票
* 重置选举超时计时器
* 发送请求投票的 RPC 给其他所有服务器
* 2. 如果接收到大多数服务器的选票,那么就变成领导人
* 3. 如果接收到来自新的领导人的附加日志 RPC,转变成跟随者
* 4. 如果选举过程超时,再次发起一轮选举
*
* @since 1.0.0
*/
public class VoteTask implements Runnable {
private final Log log = LogFactory.getLog(VoteTask.class);
private final NodeInfoContext nodeInfoContext;
public VoteTask(NodeInfoContext nodeInfoContext) {
this.nodeInfoContext = nodeInfoContext;
}
@Override
public void run() {
//1. leader 不参与选举
if(NodeStatusEnum.LEADER.equals(nodeInfoContext.getStatus())) {
log.info("[Raft] current status is leader, ignore vote.");
return;
}
//2. 判断两次的时间间隔
boolean isFitElectionTime = isFitElectionTime();
if(!isFitElectionTime) {
return;
}
//3. 开始准备选举
//3.1 状态候选
nodeInfoContext.setStatus(NodeStatusEnum.CANDIDATE);
log.info("Node will become CANDIDATE and start election leader, info={}", nodeInfoContext);
//3.2 上一次的选票时间
nodeInfoContext.setPreElectionTime(getPreElectionTime());
//3.3 term 自增
nodeInfoContext.setCurrentTerm(nodeInfoContext.getCurrentTerm()+1);
//3.4 给自己投票
final PeerManager peerManager = nodeInfoContext.getPeerManager();
final String selfAddress = peerManager.getSelf().getAddress();
nodeInfoContext.setVotedFor(selfAddress);
//通知其他除了自己的节点(暂时使用同步,后续应该优化为异步线程池,这里为了简化流程)
// TODO: 需要考虑超时的情况
final List<PeerInfoDto> allPeerList = peerManager.getList();
List<VoteResponse> voteResponseList = new ArrayList<>();
for(PeerInfoDto remotePeer : allPeerList) {
// 跳过自己
if(remotePeer.getAddress().equals(selfAddress)) {
continue;
}
// 远程投票
try {
VoteResponse response = voteSelfToRemote(remotePeer, selfAddress, nodeInfoContext);
voteResponseList.add(response);
} catch (Exception e) {
log.error("voteSelfToRemote meet ex, remotePeer={}", remotePeer, e);
}
}
//3.5 判断选举结果
int voteSuccessTotal = calcVoteSuccessVote(voteResponseList, nodeInfoContext);
// 如果投票期间,有其他服务器发送 appendEntry , 就可能变成 follower ,这时,应该停止.
if (NodeStatusEnum.FOLLOWER.equals(nodeInfoContext.getStatus())) {
log.info("[Raft] 如果投票期间,有其他服务器发送 appendEntry, 就可能变成 follower, 这时,应该停止.");
return;
}
// 是否超过一半?加上自己,等于也行。自己此时没算
if(voteSuccessTotal >= peerManager.getList().size() / 2) {
log.warn("[Raft] leader node vote success become leader {}", selfAddress);
nodeInfoContext.setStatus(NodeStatusEnum.LEADER);
peerManager.setLeader(peerManager.getSelf());
// 投票人信息清空
nodeInfoContext.setVotedFor("");
// 成主之后做的一些事情
afterBeingLeader(nodeInfoContext);
} else {
// 投票人信息清空 重新选举
nodeInfoContext.setVotedFor("");
log.warn("vote failed, wait next vote");
}
// 再次更新选举时间 为什么????
nodeInfoContext.setPreElectionTime(getPreElectionTime());
}
/**
* 随机 获取上一次的选举时间
* @return 时间
*/
private long getPreElectionTime() {
return System.currentTimeMillis() + ThreadLocalRandom.current().nextInt(200) + 150;
}
/**
* 初始化所有的 nextIndex 值为自己的最后一条日志的 index + 1. 如果下次 RPC 时, 跟随者和leader 不一致,就会失败.
* 那么 leader 尝试递减 nextIndex 并进行重试.最终将达成一致.
* @param nodeInfoContext 上下文
*/
private void afterBeingLeader(NodeInfoContext nodeInfoContext) {
//todo... 这个后续再日志复制部分实现
}
/**
* 计算投票给自己的数量
* 1. 同时需要更新自己的任期
* @param voteResponseList 结果列表
* @param nodeInfoContext 基本信息
* @return 结果
*/
private int calcVoteSuccessVote(List<VoteResponse> voteResponseList,
final NodeInfoContext nodeInfoContext) {
int sum = 0;
for(VoteResponse response : voteResponseList) {
if(response == null) {
log.error("response is null");
continue;
}
// 投票给自己
boolean isVoteGranted = response.isVoteGranted();
if (isVoteGranted) {
sum++;
} else {
// 更新自己的任期.
long resTerm = response.getTerm();
if (resTerm >= nodeInfoContext.getCurrentTerm()) {
nodeInfoContext.setCurrentTerm(resTerm);
log.info("[Raft] update current term from vote res={}", response);
}
}
}
log.info("calcVoteSuccessVote sum={}", sum);
return sum;
}
private VoteResponse voteSelfToRemote(PeerInfoDto remotePeer,
final String selfAddress,
final NodeInfoContext nodeInfoContext) {
final LogManager logManager = nodeInfoContext.getLogManager();
// 当前最后的 term
long lastTerm = 0L;
LogEntry last = logManager.getLast();
if (last != null) {
lastTerm = last.getTerm();
}
VoteRequest param = new VoteRequest();
param.setTerm(nodeInfoContext.getCurrentTerm());
param.setCandidateId(nodeInfoContext.getPeerManager().getSelf().getAddress());
long logIndex = logManager.getLastIndex() == null ? 0 : logManager.getLastIndex();
param.setLastLogIndex(logIndex);
param.setLastLogTerm(lastTerm);
RpcRequest request = new RpcRequest();
request.setCmd(RpcRequestCmdConst.R_VOTE);
request.setObj(param);
request.setUrl(remotePeer.getAddress());
// 发送
final RpcClient rpcClient = nodeInfoContext.getRpcClient();
// 请求超时时间,后续可以考虑配置化
VoteResponse voteResponse = rpcClient.send(request, 30);
return voteResponse;
}
/**
* 是否满足选举的时间
*
* @return 结果
*/
private boolean isFitElectionTime() {
long electionTime = nodeInfoContext.getElectionTime();
long preElectionTime = nodeInfoContext.getPreElectionTime();
//基于 RAFT 的随机时间,解决冲突.
// 这里不会导致这个值越来越大吗???
long randomElectionTime = electionTime + ThreadLocalRandom.current().nextInt(50);
nodeInfoContext.setElectionTime(randomElectionTime);
long current = System.currentTimeMillis();
if (current - preElectionTime < randomElectionTime) {
log.warn("[Raft] current electionTime is not fit, ignore handle");
return false;
}
return true;
}
}
接受者
接受者的核心逻辑如下:
package com.github.houbb.raft.server.core.impl;
import com.github.houbb.heaven.util.io.StreamUtil;
import com.github.houbb.heaven.util.lang.StringUtil;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.raft.common.constant.enums.NodeStatusEnum;
import com.github.houbb.raft.common.entity.req.AppendLogRequest;
import com.github.houbb.raft.common.entity.req.VoteRequest;
import com.github.houbb.raft.common.entity.resp.AppendLogResponse;
import com.github.houbb.raft.common.entity.resp.VoteResponse;
import com.github.houbb.raft.server.core.Consensus;
import com.github.houbb.raft.server.core.LogManager;
import com.github.houbb.raft.server.dto.PeerInfoDto;
import com.github.houbb.raft.server.dto.node.NodeInfoContext;
import com.github.houbb.raft.server.support.peer.PeerManager;
import java.util.concurrent.locks.ReentrantLock;
/**
* 默认一致性实现
* @since 1.0.0
*/
public class DefaultConsensus implements Consensus {
private static final Log log = LogFactory.getLog(DefaultConsensus.class);
/**
* 选举锁
*/
private final ReentrantLock voteLock = new ReentrantLock();
/**
* node 信息上下文
*/
private final NodeInfoContext nodeInfoContext;
public DefaultConsensus(NodeInfoContext nodeInfoContext) {
this.nodeInfoContext = nodeInfoContext;
}
/**
* 接收者实现:
* 主要时先做一个抢占锁的动作,失败,则直接返回。
*
* 如果term < currentTerm返回 false (5.2 节)
* 如果 votedFor 为空或者就是 candidateId,并且候选人的日志至少和自己一样新,那么就投票给他(5.2 节,5.4 节)
*
* @param request 请求
* @return 结果
*/
@Override
public VoteResponse vote(VoteRequest request) {
final long currentTerm = nodeInfoContext.getCurrentTerm();
final String currentVoteFor = nodeInfoContext.getVotedFor();
final PeerManager peerManager = nodeInfoContext.getPeerManager();
final VoteResponse voteResponse = new VoteResponse();
voteResponse.setTerm(currentTerm);
voteResponse.setVoteGranted(false);
final long reqTerm = request.getTerm();
try {
//1. 抢占锁
boolean tryLogFlag = voteLock.tryLock();
if(!tryLogFlag) {
log.info("vote for request={} tryLock false", request);
return voteResponse;
}
//2.1 如果term < currentTerm返回 false (5.2 节)
if(reqTerm < currentTerm) {
log.info("vote for reqTerm={} < currentTerm={}", reqTerm, currentTerm);
return voteResponse;
}
log.info("node {} currentTerm={}. current vote for [{}], paramCandidateId : {}, paramTerm={}",
peerManager.getSelf(),
currentTerm,
currentVoteFor,
request.getCandidateId(),
request.getTerm()
);
//2.2 (当前节点并没有投票 或者 已经投票过了且是对方节点) && 对方日志和自己一样新
boolean isMatchVoteCondition = isMatchVoteCondition(request);
if(!isMatchVoteCondition) {
return voteResponse;
}
//2.3 如果 votedFor 为空或者就是 candidateId,并且候选人的日志至少和自己一样新,那么就投票给他(5.2 节,5.4 节)
final LogManager logManager = nodeInfoContext.getLogManager();
// 对方没有自己新
if (logManager.getLast().getTerm() > request.getLastLogTerm()) {
log.info("request lastTerm is too old.");
return voteResponse;
}
// 对方没有自己新
if (logManager.getLastIndex() > request.getLastLogIndex()) {
log.info("request lastIndex is too old.");
return voteResponse;
}
//3. 满足
nodeInfoContext.setStatus(NodeStatusEnum.FOLLOWER);
nodeInfoContext.setCurrentTerm(reqTerm);
nodeInfoContext.setVotedFor(request.getServerId()); //serverId 和 candidateId 是一样的,为什么要两个?
peerManager.setLeader(new PeerInfoDto(request.getCandidateId()));
//4. 返回成功
voteResponse.setTerm(reqTerm);
voteResponse.setVoteGranted(true);
return voteResponse;
} catch (Exception e) {
log.error("Vote meet ex, req={}", request, e);
return voteResponse;
} finally {
voteLock.unlock();
}
}
/**
* 满足投票条件
* @param request 请求
* @return 结果
*/
private boolean isMatchVoteCondition(VoteRequest request) {
final String currentVoteFor = nodeInfoContext.getVotedFor();
if(StringUtil.isEmpty(currentVoteFor)
|| currentVoteFor.equals(request.getCandidateId())) {
return true;
}
return false;
}
@Override
public AppendLogResponse appendLog(AppendLogRequest request) {
// todo...
return null;
}
}
小结
当然,这里只是简单的实现 vote 的初步逻辑,还缺少对于日志复制的逻辑实现。
我们下一节开始考虑日志的处理逻辑。