前言

大家好,我是老马。

分布式系统中,一致性算法是最重要的基石,也是最难学习的部分。

这里是从零开始实现 raft 系列。

核心能力

Raft 为了算法的可理解性,将算法分成了 4 个部分。

leader 选举

日志复制

成员变更

日志压缩

日志复制的实现

日志复制是 Raft 实现一致性的核心。

日志复制有 2 种形式,1种是心跳,一种是真正的日志,心跳的日志内容是空的,其他部分基本相同,也就是说,接收方在收到日志时,如果发现是空的,那么他就是心跳。

心跳

请求者

既然是心跳,肯定就是个定时任务,和选举一样。

在我们的实现中,我们每 5 秒发送一次心跳。

注意点:

  • 首先自己必须是 leader 才能发送心跳。

  • 必须满足 5 秒的时间间隔。

  • 并发的向其他 follower 节点发送心跳。

  • 心跳参数包括自身的 ID,自身的 term,以便让对方检查 term,防止网络分区导致的脑裂。

  • 如果任意 follower 的返回值的 term 大于自身,说明自己分区了,那么需要变成 follower,并更新自己的 term。然后重新发起选举。

接收者

然后是心跳接收者的实现,这个就比较简单了,接收者需要做几件事情:

1) 无论成功失败首先设置返回值,也就是将自己的 term 返回给 leader。

2) 判断对方的 term 是否大于自身,如果大于自身,变成 follower,防止异步的选举任务误操作。同时更新选举时间和心跳时间。

3) 如果对方 term 小于自身,返回失败。不更新选举时间和心跳时间。以便触发选举。

实现逻辑

主节点

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package com.github.houbb.raft.server.support.hearbeat; import com.github.houbb.log.integration.core.Log; import com.github.houbb.log.integration.core.LogFactory; import com.github.houbb.raft.common.constant.RpcRequestCmdConst; import com.github.houbb.raft.common.constant.enums.NodeStatusEnum; import com.github.houbb.raft.common.entity.req.AppendLogRequest; import com.github.houbb.raft.common.entity.resp.AppendLogResponse; import com.github.houbb.raft.common.rpc.RpcClient; import com.github.houbb.raft.common.rpc.RpcRequest; import com.github.houbb.raft.server.dto.PeerInfoDto; import com.github.houbb.raft.server.dto.node.NodeInfoContext; import com.github.houbb.raft.server.support.peer.PeerManager; import java.util.List; /** * 心跳调度任务 * @since 1.0.0 */ public class HeartbeatTask implements Runnable { private final Log log = LogFactory.getLog(HeartbeatTask.class); private final NodeInfoContext nodeInfoContext; public HeartbeatTask(NodeInfoContext nodeInfoContext) { this.nodeInfoContext = nodeInfoContext; } /** * - 必须满足 5 秒的时间间隔。 * (其实这个应该调度间隔控制,方法判断感觉比较奇怪,如何二次刚好没达到,会导致下一次时间间隔过长) * * - 并发的向其他 follower 节点发送心跳。 * - 心跳参数包括自身的 ID,自身的 term,以便让对方检查 term,防止网络分区导致的脑裂。 * - 如果任意 follower 的返回值的 term 大于自身,说明自己分区了,那么需要变成 follower,并更新自己的 term。然后重新发起选举。 */ @Override public void run() { log.info(">>>>>>>>>>>>>>> [Heartbeat] task start"); final NodeStatusEnum nodeStatus = nodeInfoContext.getStatus(); if(!NodeStatusEnum.LEADER.equals(nodeStatus)) { log.warn("Only leader node need heartbeat, currentStatus={}", nodeStatus); return; } // 时间间隔控制,个人觉得没必要 // 通知 follower final PeerManager peerManager = nodeInfoContext.getPeerManager(); List<PeerInfoDto> peerInfoList = peerManager.getList(); final PeerInfoDto selfInfo = peerManager.getSelf(); final RpcClient rpcClient = nodeInfoContext.getRpcClient(); final long currentTerm = nodeInfoContext.getCurrentTerm(); for(PeerInfoDto remotePeer : peerInfoList) { // 跳过自己 if(remotePeer.getAddress().equals(selfInfo.getAddress())) { continue; } AppendLogRequest appendLogRequest = new AppendLogRequest(); appendLogRequest.setLeaderId(selfInfo.getAddress()); // 这有什么用? 通知到对方,为什么要设置对方的标识? appendLogRequest.setServerId(remotePeer.getAddress()); appendLogRequest.setTerm(nodeInfoContext.getCurrentTerm()); appendLogRequest.setLeaderCommit(currentTerm); appendLogRequest.setLeaderCommit(nodeInfoContext.getCommitIndex()); RpcRequest request = new RpcRequest(); request.setCmd(RpcRequestCmdConst.R_VOTE); request.setObj(appendLogRequest); request.setUrl(remotePeer.getAddress()); AppendLogResponse appendLogResponse = rpcClient.send(request); // 结果的处理 final long term = appendLogResponse.getTerm(); if (term > currentTerm) { log.error("self will become follower, he's term : {}, my term : {}", term, currentTerm); nodeInfoContext.setCurrentTerm(term); nodeInfoContext.setVotedFor(""); nodeInfoContext.setStatus(NodeStatusEnum.FOLLOWER); } } log.info(">>>>>>>>>>>>>>> [Heartbeat] task end"); } }

接收者

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/** * 添加日志 * * 接收者实现: * 如果 term < currentTerm 就返回 false (5.1 节) * 如果日志在 prevLogIndex 位置处的日志条目的任期号和 prevLogTerm 不匹配,则返回 false (5.3 节) * 如果已经存在的日志条目和新的产生冲突(索引值相同但是任期号不同),删除这一条和之后所有的 (5.3 节) * 附加任何在已有的日志中不存在的条目 * 如果 leaderCommit > commitIndex,令 commitIndex 等于 leaderCommit 和 新日志条目索引值中较小的一个 * * @param request 请求 * @return 结果 */ @Override public AppendLogResponse appendLog(AppendLogRequest request) { AppendLogResponse appendLogResponse = new AppendLogResponse(); final long currentTerm = nodeInfoContext.getCurrentTerm(); appendLogResponse.setTerm(currentTerm); appendLogResponse.setSuccess(false); final long reqTerm = request.getTerm(); try { //1.1 抢占锁 boolean tryLockFlag = appendLogLock.tryLock(); if(!tryLockFlag) { log.warn("[AppendLog] tryLog false"); return appendLogResponse; } //1.2 是否够格? if(currentTerm > request.getTerm()) { log.warn("[AppendLog] currentTerm={} > reqTerm={}", currentTerm, reqTerm); return appendLogResponse; } //2.1 基本信息更新 为什么这样设置? final PeerManager peerManager = nodeInfoContext.getPeerManager(); final long nowMills = System.currentTimeMillis(); nodeInfoContext.setElectionTime(nowMills); nodeInfoContext.setPreElectionTime(nowMills); nodeInfoContext.setStatus(NodeStatusEnum.FOLLOWER); nodeInfoContext.setCurrentTerm(reqTerm); peerManager.setLeader(new PeerInfoDto(request.getLeaderId())); log.info("[AppendLog] update electionTime={}, status=Follower, term={}, leader={}", nowMills, reqTerm, request.getLeaderId()); //3.1 处理心跳 if(ArrayUtil.isEmpty(request.getEntries())) { handleHeartbeat(request); //3.2 返回响应 appendLogResponse.setTerm(reqTerm); appendLogResponse.setSuccess(true); return appendLogResponse; } return appendLogResponse; } catch (Exception e) { throw new RuntimeException(e); } finally { appendLogLock.unlock(); } } private void handleHeartbeat(AppendLogRequest request) { final long startTime = System.currentTimeMillis(); log.info("handleHeartbeat start req={}", request); final LogManager logManager = nodeInfoContext.getLogManager(); // 处理 leader 已提交但未应用到状态机的日志 // 下一个需要提交的日志的索引(如有) long nextCommit = nodeInfoContext.getCommitIndex() + 1; //如果 leaderCommit > commitIndex,令 commitIndex 等于 leaderCommit 和 新日志条目索引值中较小的一个 if (request.getLeaderCommit() > nodeInfoContext.getCommitIndex()) { int commitIndex = (int) Math.min(request.getLeaderCommit(), logManager.getLastIndex()); nodeInfoContext.setCommitIndex(commitIndex); nodeInfoContext.setLastApplied(commitIndex); } final StateMachine stateMachine = nodeInfoContext.getStateMachine(); while (nextCommit <= nodeInfoContext.getCommitIndex()){ // 提交之前的日志 // todo: 状态机需要基于 kv 实现 stateMachine.apply(logManager.read(nextCommit)); nextCommit++; } long costTime = System.currentTimeMillis() - startTime; log.info("handleHeartbeat start end, costTime={}", costTime); }

小结

当然,这里只是简单的实现 appendLog 的心跳 的初步逻辑,其他条目的处理逻辑还没有实现。

我们下一节开始考虑日志复制的具体处理逻辑。

参考资料