前言
大家好,我是老马。
分布式系统中,一致性算法是最重要的基石,也是最难学习的部分。
这里是从零开始实现 raft 系列。
核心能力
Raft 为了算法的可理解性,将算法分成了 4 个部分。
leader 选举
日志复制
成员变更
日志压缩
简单说明
同 zk 一样,leader 都是必须的,所有的写操作都是由 leader 发起,从而保证数据流向足够简单。
而 leader 的选举则通过比较每个节点的逻辑时间(term)大小,以及日志下标(index)的大小。
刚刚说 leader 选举涉及日志下标,那么就要讲日志复制。
日志复制可以说是 Raft 核心的核心,说简单点,Raft 就是为了保证多节点之间日志的一致。
当日志一致,我们可以认为整个系统的状态是一致的。这个日志你可以理解成 mysql 的 binlog。
Raft 通过各种补丁,保证了日志复制的正确性。
Raft leader 节点会将客户端的请求都封装成日志,发送到各个 follower 中,如果集群中超过一半的 follower 回复成功,那么这个日志就可以被提交(commit),这个 commit 可以理解为 ACID 的 D ,即持久化。当日志被持久化到磁盘,后面的事情就好办了。
而第三点则是为了节点的扩展性。
第四点是为了性能。
相比较 leader 选举和 日志复制,不是那么的重要,可以说,如果没有成员变更和日志压缩,也可以搞出一个可用的 Raft 分布式系统,但没有 leader 选举和日志复制,是万万不能的。
因此,本文和本项目将重点放在 leader 选举和日志复制。
接口定义
概览
Consensus, 一致性模块接口
LogManager 日志管理模块接口
StateMachine, 状态机接口
RpcServer & RpcClient, RPC 接口
Node,同时,为了聚合上面的几个接口,我们需要定义一个 Node 接口,即节点,Raft 抽象的机器节点。
LifeCycle, 最后,我们需要管理以上组件的生命周期,因此需要一个 LifeCycle 接口。
Consensus 一致性模块接口
请求投票 & 附加日志。
也就是我们的 Raft 节点的核心功能,leader 选举和 日志复制。
实现这两个接口是 Raft 的关键所在。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50package com.github.houbb.raft.server.core;
import com.github.houbb.raft.common.entity.req.AppendLogRequest;
import com.github.houbb.raft.common.entity.req.VoteRequest;
import com.github.houbb.raft.common.entity.resp.AppendLogResponse;
import com.github.houbb.raft.common.entity.resp.VoteResponse;
/**
* 一致性模块接口
*
* 1. leader 选举
* 2. 日志复制。
*
* 实现这两个接口是 Raft 的关键所在。
*
* @since 1.0.0
*/
public interface Consensus {
/**
* 请求投票 RPC
*
* 接收者实现:
*
* 1. 如果term < currentTerm返回 false (5.2 节)
*
* 2. 如果 votedFor 为空或者就是 candidateId,并且候选人的日志至少和自己一样新,那么就投票给他(5.2 节,5.4 节)
*
* @param request 请求
* @return 结果
*/
VoteResponse vote(VoteRequest request);
/**
* 附加日志(多个日志,为了提高效率) RPC
*
* 接收者实现:
*
* 如果 term < currentTerm 就返回 false (5.1 节)
* 如果日志在 prevLogIndex 位置处的日志条目的任期号和 prevLogTerm 不匹配,则返回 false (5.3 节)
* 如果已经存在的日志条目和新的产生冲突(索引值相同但是任期号不同),删除这一条和之后所有的 (5.3 节)
* 附加任何在已有的日志中不存在的条目
* 如果 leaderCommit > commitIndex,令 commitIndex 等于 leaderCommit 和 新日志条目索引值中较小的一个
*
* @param request 请求
* @return 结果
*/
AppendLogResponse appendLog(AppendLogRequest request);
}
LogManager 日志管理模块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43package com.github.houbb.raft.server.core;
import com.github.houbb.raft.common.entity.req.dto.LogEntry;
/**
* 日志模块
*
* @since 1.0.0
*/
public interface LogManager {
/**
* 写入
* @param logEntry 日志
*/
void write(LogEntry logEntry);
/**
* 读取
* @param index 下标志
* @return 结果
*/
LogEntry read(Long index);
/**
* 从开始位置删除
* @param startIndex 开始位置
*/
void removeOnStartIndex(Long startIndex);
/**
* 获取最新的日志
* @return 日志
*/
LogEntry getLast();
/**
* 获取最新的下标
* @return 结果
*/
Long getLastIndex();
}
分别是写,读,删,最后是两个关于 Last 的接口,在 Raft 中,Last 是一个非常关键的东西,因此我这里单独定义了 2个方法,虽然看起来不是很好看 :)
StateMachine, 状态机接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52package com.github.houbb.raft.server.core;
import com.github.houbb.raft.common.entity.req.dto.LogEntry;
/**
* 状态机接口
*
* 状态机接口,在 Raft 论文中,将数据保存到状态机,作者称之为应用,那么我们也这么命名,
*
* 说白了,就是将已成功提交的日志应用到状态机中:
*
* @since 1.0.0
*/
public interface StateMachine {
/**
* 将数据应用到状态机.
*
* 原则上,只需这一个方法(apply). 其他的方法是为了更方便的使用状态机.
* @param logEntry 日志中的数据.
*/
void apply(LogEntry logEntry);
/**
* 获取信息
* @param key Key
* @return 结果
*/
LogEntry get(String key);
/**
* 获取信息
* @param key Key
* @return 结果
*/
String getString(String key);
/**
* 设置
* @param key Key
* @param value 值
*/
void setString(String key, String value);
/**
* 删除
* @param key Key
*/
void delString(String... key);
}
LifeCycle 生命周期
这里我们简单定义了 2 个,有需要的话,再另外加上组合接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21/**
*
* 生命周期
*
* @since 1.0.0
*/
public interface LifeCycle {
/**
* 初始化.
* @throws Throwable 异常
*/
void init() throws Throwable;
/**
* 关闭资源.
* @throws Throwable 异常
*/
void destroy() throws Throwable;
}
rpc 客户端+服务端
RpcClient 和 RPCServer 没什么好讲的,其实就是 send 和 receive。
客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17/**
* rpc 客户端
* @author houbb
*/
public interface RpcClient extends LifeCycle {
/**
* 发送请求, 并同步等待返回值.
* @param request 参数
* @param <R> 返回值泛型
* @return 结果
*/
<R> R send(RpcRequest request);
<R> R send(RpcRequest request, int timeout);
}
服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15/**
* rpc 服务端
*
* @author houbb
*/
public interface RpcServer extends LifeCycle {
/**
* 处理请求.
* @param request 请求参数.
* @return 返回值.
*/
RpcResponse<?> handlerRequest(RpcRequest request);
}
Node 接口
然后是 Node 接口,Node 接口也是 Raft 没有定义的,我们依靠自己的理解定义了几个接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66package com.github.houbb.raft.server.core;
import com.github.houbb.raft.common.core.LifeCycle;
import com.github.houbb.raft.common.entity.dto.NodeConfig;
import com.github.houbb.raft.common.entity.req.AppendLogRequest;
import com.github.houbb.raft.common.entity.req.ClientKeyValueRequest;
import com.github.houbb.raft.common.entity.req.VoteRequest;
import com.github.houbb.raft.common.entity.resp.AppendLogResponse;
import com.github.houbb.raft.common.entity.resp.ClientKeyValueResponse;
import com.github.houbb.raft.common.entity.resp.VoteResponse;
/**
* 节点
*
* 首先,一个 Node 肯定需要配置文件,所以有一个 setConfig 接口,
*
* 然后,肯定需要处理“请求投票”和“附加日志”,
*
* 同时,还需要接收用户,也就是客户端的请求(不然数据从哪来?),
*
* 所以有 handlerClientRequest 接口,最后,考虑到灵活性,
*
* 我们让每个节点都可以接收客户端的请求,但 follower 节点并不能处理请求,所以需要重定向到 leader 节点,因此,我们需要一个重定向接口。
*
*/
public interface Node extends LifeCycle {
/**
* 设置配置文件.
*
* @param config 配置
*/
void setConfig(NodeConfig config);
/**
* 处理请求投票 RPC.
*
* @param param 请求
* @return 结果
*/
VoteResponse handlerRequestVote(VoteRequest param);
/**
* 处理附加日志请求.
*
* @param param 请求
* @return v结果
*/
AppendLogResponse handlerAppendEntries(AppendLogRequest param);
/**
* 处理客户端请求.
*
* @param request 请求
* @return 结果
*/
ClientKeyValueResponse handlerClientRequest(ClientKeyValueRequest request);
/**
* 转发给 leader 节点.
* @param request 请求
* @return 结果
*/
ClientKeyValueResponse redirect(ClientKeyValueRequest request);
}