前言

大家好,我是老马。

分布式系统中,一致性算法是最重要的基石,也是最难学习的部分。

这里是从零开始实现 raft 系列。

核心能力

Raft 为了算法的可理解性,将算法分成了 4 个部分。

leader 选举

日志复制

成员变更

日志压缩

Leader 选举的实现

请求者

选举,其实就是一个定时器,根据 Raft 论文描述,如果超时了就需要重新选举,我们使用 Java 的定时任务线程池进行实现,实现之前,需要确定几个点:

  • 选举者必须不是 leader。

  • 必须超时了才能选举,具体超时时间根据你的设计而定,注意,每个节点的超时时间不能相同,应当使用随机算法错开(Raft 关键实现),避免无谓的死锁。

  • 选举者优先选举自己,将自己变成 candidate。

  • 选举的第一步就是把自己的 term 加一。

  • 然后像其他节点发送请求投票 RPC,请求参数参照论文,包括自身的 term,自身的 lastIndex,以及日志的 lastTerm。同时,请求投票 RPC 应该是并行请求的。

  • 等待投票结果应该有超时控制,如果超时了,就不等待了。

  • 最后,如果有超过半数的响应为 success,那么就需要立即变成 leader ,并发送心跳阻止其他选举。

  • 如果失败了,就需要重新选举。注意,这个期间,如果有其他节点发送心跳,也需要立刻变成 follower,否则,将死循环。

接受者

上面说的,其实是 Leader 选举中,请求者的实现,那么接收者如何实现呢?

接收者在收到“请求投票” RPC 后,需要做以下事情:

  • 注意,选举操作应该是串行的,因为涉及到状态修改,并发操作将导致数据错乱。也就是说,如果抢锁失败,应当立即返回错误。

  • 首先判断对方的 term 是否小于自己,如果小于自己,直接返回失败。

  • 如果当前节点没有投票给任何人,或者投的正好是对方,那么就可以比较日志的大小,反之,返回失败。

  • 如果对方日志没有自己大,返回失败。反之,投票给对方,并变成 follower。变成 follower 的同时,异步的选举任务在最后从 condidate 变成 leader 之前,会判断是否是

follower,如果是 follower,就放弃成为 leader。这是一个兜底的措施。

到这里,基本就能够实现 Raft Leader 选举的逻辑。

注意,我们上面涉及到的 LastIndex 等参数,还没有实现,但不影响我们编写伪代码,毕竟日志复制比 leader 选举要复杂的多,我们的原则是从易到难。:)

实现逻辑

请求者

投票任务的核心逻辑:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package com.github.houbb.raft.server.support.vote; import com.github.houbb.log.integration.core.Log; import com.github.houbb.log.integration.core.LogFactory; import com.github.houbb.raft.common.constant.RpcRequestCmdConst; import com.github.houbb.raft.common.constant.enums.NodeStatusEnum; import com.github.houbb.raft.common.entity.req.VoteRequest; import com.github.houbb.raft.common.entity.req.dto.LogEntry; import com.github.houbb.raft.common.entity.resp.VoteResponse; import com.github.houbb.raft.common.rpc.RpcClient; import com.github.houbb.raft.common.rpc.RpcRequest; import com.github.houbb.raft.server.core.LogManager; import com.github.houbb.raft.server.dto.PeerInfoDto; import com.github.houbb.raft.server.dto.node.NodeInfoContext; import com.github.houbb.raft.server.support.peer.PeerManager; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ThreadLocalRandom; /** * 投定时调度 * * 1. 在转变成候选人后就立即开始选举过程 * 自增当前的任期号(currentTerm) * 给自己投票 * 重置选举超时计时器 * 发送请求投票的 RPC 给其他所有服务器 * 2. 如果接收到大多数服务器的选票,那么就变成领导人 * 3. 如果接收到来自新的领导人的附加日志 RPC,转变成跟随者 * 4. 如果选举过程超时,再次发起一轮选举 * * @since 1.0.0 */ public class VoteTask implements Runnable { private final Log log = LogFactory.getLog(VoteTask.class); private final NodeInfoContext nodeInfoContext; public VoteTask(NodeInfoContext nodeInfoContext) { this.nodeInfoContext = nodeInfoContext; } @Override public void run() { //1. leader 不参与选举 if(NodeStatusEnum.LEADER.equals(nodeInfoContext.getStatus())) { log.info("[Raft] current status is leader, ignore vote."); return; } //2. 判断两次的时间间隔 boolean isFitElectionTime = isFitElectionTime(); if(!isFitElectionTime) { return; } //3. 开始准备选举 //3.1 状态候选 nodeInfoContext.setStatus(NodeStatusEnum.CANDIDATE); log.info("Node will become CANDIDATE and start election leader, info={}", nodeInfoContext); //3.2 上一次的选票时间 nodeInfoContext.setPreElectionTime(getPreElectionTime()); //3.3 term 自增 nodeInfoContext.setCurrentTerm(nodeInfoContext.getCurrentTerm()+1); //3.4 给自己投票 final PeerManager peerManager = nodeInfoContext.getPeerManager(); final String selfAddress = peerManager.getSelf().getAddress(); nodeInfoContext.setVotedFor(selfAddress); //通知其他除了自己的节点(暂时使用同步,后续应该优化为异步线程池,这里为了简化流程) // TODO: 需要考虑超时的情况 final List<PeerInfoDto> allPeerList = peerManager.getList(); List<VoteResponse> voteResponseList = new ArrayList<>(); for(PeerInfoDto remotePeer : allPeerList) { // 跳过自己 if(remotePeer.getAddress().equals(selfAddress)) { continue; } // 远程投票 try { VoteResponse response = voteSelfToRemote(remotePeer, selfAddress, nodeInfoContext); voteResponseList.add(response); } catch (Exception e) { log.error("voteSelfToRemote meet ex, remotePeer={}", remotePeer, e); } } //3.5 判断选举结果 int voteSuccessTotal = calcVoteSuccessVote(voteResponseList, nodeInfoContext); // 如果投票期间,有其他服务器发送 appendEntry , 就可能变成 follower ,这时,应该停止. if (NodeStatusEnum.FOLLOWER.equals(nodeInfoContext.getStatus())) { log.info("[Raft] 如果投票期间,有其他服务器发送 appendEntry, 就可能变成 follower, 这时,应该停止."); return; } // 是否超过一半?加上自己,等于也行。自己此时没算 if(voteSuccessTotal >= peerManager.getList().size() / 2) { log.warn("[Raft] leader node vote success become leader {}", selfAddress); nodeInfoContext.setStatus(NodeStatusEnum.LEADER); peerManager.setLeader(peerManager.getSelf()); // 投票人信息清空 nodeInfoContext.setVotedFor(""); // 成主之后做的一些事情 afterBeingLeader(nodeInfoContext); } else { // 投票人信息清空 重新选举 nodeInfoContext.setVotedFor(""); log.warn("vote failed, wait next vote"); } // 再次更新选举时间 为什么???? nodeInfoContext.setPreElectionTime(getPreElectionTime()); } /** * 随机 获取上一次的选举时间 * @return 时间 */ private long getPreElectionTime() { return System.currentTimeMillis() + ThreadLocalRandom.current().nextInt(200) + 150; } /** * 初始化所有的 nextIndex 值为自己的最后一条日志的 index + 1. 如果下次 RPC 时, 跟随者和leader 不一致,就会失败. * 那么 leader 尝试递减 nextIndex 并进行重试.最终将达成一致. * @param nodeInfoContext 上下文 */ private void afterBeingLeader(NodeInfoContext nodeInfoContext) { //todo... 这个后续再日志复制部分实现 } /** * 计算投票给自己的数量 * 1. 同时需要更新自己的任期 * @param voteResponseList 结果列表 * @param nodeInfoContext 基本信息 * @return 结果 */ private int calcVoteSuccessVote(List<VoteResponse> voteResponseList, final NodeInfoContext nodeInfoContext) { int sum = 0; for(VoteResponse response : voteResponseList) { if(response == null) { log.error("response is null"); continue; } // 投票给自己 boolean isVoteGranted = response.isVoteGranted(); if (isVoteGranted) { sum++; } else { // 更新自己的任期. long resTerm = response.getTerm(); if (resTerm >= nodeInfoContext.getCurrentTerm()) { nodeInfoContext.setCurrentTerm(resTerm); log.info("[Raft] update current term from vote res={}", response); } } } log.info("calcVoteSuccessVote sum={}", sum); return sum; } private VoteResponse voteSelfToRemote(PeerInfoDto remotePeer, final String selfAddress, final NodeInfoContext nodeInfoContext) { final LogManager logManager = nodeInfoContext.getLogManager(); // 当前最后的 term long lastTerm = 0L; LogEntry last = logManager.getLast(); if (last != null) { lastTerm = last.getTerm(); } VoteRequest param = new VoteRequest(); param.setTerm(nodeInfoContext.getCurrentTerm()); param.setCandidateId(nodeInfoContext.getPeerManager().getSelf().getAddress()); long logIndex = logManager.getLastIndex() == null ? 0 : logManager.getLastIndex(); param.setLastLogIndex(logIndex); param.setLastLogTerm(lastTerm); RpcRequest request = new RpcRequest(); request.setCmd(RpcRequestCmdConst.R_VOTE); request.setObj(param); request.setUrl(remotePeer.getAddress()); // 发送 final RpcClient rpcClient = nodeInfoContext.getRpcClient(); // 请求超时时间,后续可以考虑配置化 VoteResponse voteResponse = rpcClient.send(request, 30); return voteResponse; } /** * 是否满足选举的时间 * * @return 结果 */ private boolean isFitElectionTime() { long electionTime = nodeInfoContext.getElectionTime(); long preElectionTime = nodeInfoContext.getPreElectionTime(); //基于 RAFT 的随机时间,解决冲突. // 这里不会导致这个值越来越大吗??? long randomElectionTime = electionTime + ThreadLocalRandom.current().nextInt(50); nodeInfoContext.setElectionTime(randomElectionTime); long current = System.currentTimeMillis(); if (current - preElectionTime < randomElectionTime) { log.warn("[Raft] current electionTime is not fit, ignore handle"); return false; } return true; } }

接受者

接受者的核心逻辑如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package com.github.houbb.raft.server.core.impl; import com.github.houbb.heaven.util.io.StreamUtil; import com.github.houbb.heaven.util.lang.StringUtil; import com.github.houbb.log.integration.core.Log; import com.github.houbb.log.integration.core.LogFactory; import com.github.houbb.raft.common.constant.enums.NodeStatusEnum; import com.github.houbb.raft.common.entity.req.AppendLogRequest; import com.github.houbb.raft.common.entity.req.VoteRequest; import com.github.houbb.raft.common.entity.resp.AppendLogResponse; import com.github.houbb.raft.common.entity.resp.VoteResponse; import com.github.houbb.raft.server.core.Consensus; import com.github.houbb.raft.server.core.LogManager; import com.github.houbb.raft.server.dto.PeerInfoDto; import com.github.houbb.raft.server.dto.node.NodeInfoContext; import com.github.houbb.raft.server.support.peer.PeerManager; import java.util.concurrent.locks.ReentrantLock; /** * 默认一致性实现 * @since 1.0.0 */ public class DefaultConsensus implements Consensus { private static final Log log = LogFactory.getLog(DefaultConsensus.class); /** * 选举锁 */ private final ReentrantLock voteLock = new ReentrantLock(); /** * node 信息上下文 */ private final NodeInfoContext nodeInfoContext; public DefaultConsensus(NodeInfoContext nodeInfoContext) { this.nodeInfoContext = nodeInfoContext; } /** * 接收者实现: * 主要时先做一个抢占锁的动作,失败,则直接返回。 * * 如果term < currentTerm返回 false (5.2 节) * 如果 votedFor 为空或者就是 candidateId,并且候选人的日志至少和自己一样新,那么就投票给他(5.2 节,5.4 节) * * @param request 请求 * @return 结果 */ @Override public VoteResponse vote(VoteRequest request) { final long currentTerm = nodeInfoContext.getCurrentTerm(); final String currentVoteFor = nodeInfoContext.getVotedFor(); final PeerManager peerManager = nodeInfoContext.getPeerManager(); final VoteResponse voteResponse = new VoteResponse(); voteResponse.setTerm(currentTerm); voteResponse.setVoteGranted(false); final long reqTerm = request.getTerm(); try { //1. 抢占锁 boolean tryLogFlag = voteLock.tryLock(); if(!tryLogFlag) { log.info("vote for request={} tryLock false", request); return voteResponse; } //2.1 如果term < currentTerm返回 false (5.2 节) if(reqTerm < currentTerm) { log.info("vote for reqTerm={} < currentTerm={}", reqTerm, currentTerm); return voteResponse; } log.info("node {} currentTerm={}. current vote for [{}], paramCandidateId : {}, paramTerm={}", peerManager.getSelf(), currentTerm, currentVoteFor, request.getCandidateId(), request.getTerm() ); //2.2 (当前节点并没有投票 或者 已经投票过了且是对方节点) && 对方日志和自己一样新 boolean isMatchVoteCondition = isMatchVoteCondition(request); if(!isMatchVoteCondition) { return voteResponse; } //2.3 如果 votedFor 为空或者就是 candidateId,并且候选人的日志至少和自己一样新,那么就投票给他(5.2 节,5.4 节) final LogManager logManager = nodeInfoContext.getLogManager(); // 对方没有自己新 if (logManager.getLast().getTerm() > request.getLastLogTerm()) { log.info("request lastTerm is too old."); return voteResponse; } // 对方没有自己新 if (logManager.getLastIndex() > request.getLastLogIndex()) { log.info("request lastIndex is too old."); return voteResponse; } //3. 满足 nodeInfoContext.setStatus(NodeStatusEnum.FOLLOWER); nodeInfoContext.setCurrentTerm(reqTerm); nodeInfoContext.setVotedFor(request.getServerId()); //serverId 和 candidateId 是一样的,为什么要两个? peerManager.setLeader(new PeerInfoDto(request.getCandidateId())); //4. 返回成功 voteResponse.setTerm(reqTerm); voteResponse.setVoteGranted(true); return voteResponse; } catch (Exception e) { log.error("Vote meet ex, req={}", request, e); return voteResponse; } finally { voteLock.unlock(); } } /** * 满足投票条件 * @param request 请求 * @return 结果 */ private boolean isMatchVoteCondition(VoteRequest request) { final String currentVoteFor = nodeInfoContext.getVotedFor(); if(StringUtil.isEmpty(currentVoteFor) || currentVoteFor.equals(request.getCandidateId())) { return true; } return false; } @Override public AppendLogResponse appendLog(AppendLogRequest request) { // todo... return null; } }

小结

当然,这里只是简单的实现 vote 的初步逻辑,还缺少对于日志复制的逻辑实现。

我们下一节开始考虑日志的处理逻辑。

参考资料