手写 Redis 系列

java从零手写实现redis(一)如何实现固定大小的缓存?

java从零手写实现redis(三)redis expire 过期原理

java从零手写实现redis(三)内存数据如何重启不丢失?

java从零手写实现redis(四)添加监听器

java从零手写实现redis(五)过期策略的另一种实现思路

java从零手写实现redis(六)AOF 持久化原理详解及实现

java从零手写实现redis(七)LRU 缓存淘汰策略详解

java从零开始手写redis(八)朴素 LRU 淘汰算法性能优化

java从零开始手写redis(九)LRU 缓存淘汰算法如何避免缓存污染

java从零开始手写redis(十)缓存淘汰算法 LFU 最少使用频次

java从零开始手写redis(十一)缓存淘汰算法 COLOK 算法

java从零开始手写redis(十二)过期策略如何实现随机 keys 淘汰

java从零开始手写redis(十三)redis渐进式rehash详解

java从零开始手写redis(十四)JDK HashMap 源码解析

java从零开始手写redis(十四)JDK ConcurrentHashMap 源码解析

java从零开始手写redis(十五)实现自己的 HashMap

java从零开始手写redis(十六)实现渐进式 rehash map

思维导图

实现渐进式rehash

上节内容回顾

我们在 从零手写缓存框架(15)实现自己的 HashMap 中已经实现了自己的简易版本的 HashMap,下面就让我们为这个简易版的 HashMap 加一点特技,让他变成一个渐进式 HashMap。

duang~

渐进式 rehash

好了,接下来,让我们考虑下,如何实现一个渐进式 rehash 的 Map?

实现流程

让我们在回顾一遍 redis 的渐进式 rehash 过程:

哈希表渐进式 rehash 的详细步骤:

(1)为 ht[1] 分配空间, 让字典同时持有 ht[0] 和 ht[1] 两个哈希表。

(2)在字典中维持一个索引计数器变量 rehashidx , 并将它的值设置为 0 , 表示 rehash 工作正式开始。

(3)在 rehash 进行期间, 每次对字典执行添加、删除、查找或者更新操作时, 程序除了执行指定的操作以外, 还会顺带将 ht[0] 哈希表在 rehashidx 索引上的所有键值对 rehash 到 ht[1] , 当 rehash 工作完成之后, 程序将 rehashidx 属性的值增1。

(4)随着字典操作的不断执行, 最终在某个时间点上, ht[0] 的所有键值对都会被 rehash 至 ht[1] , 这时程序将 rehashidx 属性的值设为 -1 , 表示 rehash 操作已完成。

实现方式

我们直接在上面简易版本的基础上进行实现。

实习过程中发现还是有点难度的,代码量也是成倍增长。

本次编写耗费的时间较多,大家一次看不完建议收藏,慢慢学习。

无论面试还是工作,都可以做到知其然,知其所以然。升职加薪,不在话下。

类定义

/**
 * 自己实现的渐进式 rehash map
 *
 * @since 0.0.3
 * @param <K> key 泛型
 * @param <V> value 泛型
 * @see HashMap
 * @author binbin.hou
 */
public class MyProgressiveReHashMap<K,V> extends AbstractMap<K,V> implements Map<K,V> {
}

和简易版本类似。

私有变量

private static final Log log = LogFactory.getLog(MyProgressiveReHashMap.class);

/**
 * rehash 的下标
 *
 * 如果 rehashIndex != -1,说明正在进行 rehash
 * @since 0.0.3
 * @author binbin.hou
 */
private int rehashIndex = -1;

/**
 * 容量
 * 默认为 8
 */
private int capacity;

/**
 * 处于 rehash 状态的容量
 * @since 0.0.3
 */
private int rehashCapacity;

/**
 * 统计大小的信息
 */
private int size = 0;

/**
 * 阈值
 * 阈值=容量*factor
 * 暂时不考虑最大值的问题
 *
 * 当达到这个阈值的时候,直接进行两倍的容量扩充+rehash。
 */
private final double factor = 1.0;

/**
 * 用来存放信息的 table 数组。
 * 数组:数组的下标是一个桶,桶对应的元素 hash 值相同。
 * 桶里放置的是一个链表。
 *
 * 可以理解为 table 是一个 ArrayList
 * arrayList 中每一个元素,都是一个 DoubleLinkedList
 */
private List<List<Entry<K, V>>> table;

/**
 * 渐进式 rehash 时,用来存储元素信息使用。
 *
 * @since 0.0.3
 */
private List<List<Entry<K, V>>> rehashTable;

/**
 * 是否开启 debug 模式
 * @since 0.0.3
 */
private boolean debugMode = false;

rehashIndex/rehashCapacity/rehashTable 这三个值都是我们在进行渐进式实现的时候需要使用的值。

构造器

主要是一些值的初始化。

public MyProgressiveReHashMap() {
    this(8);
}

/**
 * 初始化 hash map
 * @param capacity 初始化容量
 */
public MyProgressiveReHashMap(int capacity) {
    this(capacity, false);
}

/**
 * 初始化 hash map
 * @param capacity 初始化容量
 * @param debugMode 是否开启 debug 模式
 * @since 0.0.3
 * @author binbin.hou
 */
public MyProgressiveReHashMap(int capacity, boolean debugMode) {
    this.capacity = capacity;
    // 初始化最大为容量的个数,如果 hash 的非常完美的话。
    this.table = new ArrayList<>(capacity);
    // 初始化为空列表
    for(int i = 0; i < capacity; i++) {
        this.table.add(i, new ArrayList<Entry<K, V>>());
    }
    this.debugMode = debugMode;
    this.rehashIndex = -1;
    this.rehashCapacity = -1;
    this.rehashTable = null;
}

put() 方法

这个方法相对难度比较大:

put() 的过程可以见方法的注释。

需要考虑是否为 rehash 阶段,还需要考虑是否为更新。

/**
 * put 一个值
 *
 * (1)如果不处于 rehash 阶段
 *
 * 1.1 判断是否为 table 更新,如果是,则进行更新
 * 1.2 如果不是更新,则进行插入
 *
 * 插入的时候可能触发 rehash
 *
 * (2)如果处于 rehash 阶段
 *
 * 2.0 执行一次渐进式 rehash 的动作
 *
 * 2.1 判断是否为更新,需要遍历 table 和 rehashTable
 * 如果是,执行更新
 *
 * 2.2 如果不是,则执行插入
 * 插入到 rehashTable 中
 *
 * @param key 键
 * @param value 值
 * @return 值
 * @author binbin.hou
 */
@Override
public V put(K key, V value) {
    boolean isInRehash = isInReHash();
    if(!isInRehash) {
        //1. 是否为更新
        Pair<Boolean, V> pair = updateTableInfo(key, value, this.table, this.capacity);
        if(pair.getValueOne()) {
            V oldVal = pair.getValueTwo();
            if(debugMode) {
                log.debug("不处于渐进式 rehash,此次为更新操作。key: {}, value: {}", key, value);
                printTable(this.table);
            }
            return oldVal;
        } else {
            // 插入
            return this.createNewEntry(key, value);
        }
    } else {
        //2.0 执行一个附加操作,进行渐进式 rehash 处理
        if(debugMode) {
            log.debug("当前处于渐进式 rehash 阶段,额外执行一次渐进式 rehash 的动作");
        }
        rehashToNew();
        //2.1 是否为 table 更新
        Pair<Boolean, V> pair = updateTableInfo(key, value, this.table, this.capacity);
        if(pair.getValueOne()) {
            V oldVal = pair.getValueTwo();
            if(debugMode) {
                log.debug("此次为更新 table 操作。key: {}, value: {}", key, value);
                printTable(this.table);
            }
            return oldVal;
        }
        //2.2 是否为 rehashTable 更新
        Pair<Boolean, V> pair2 = updateTableInfo(key, value, this.rehashTable, this.rehashCapacity);
        if(pair2.getValueOne()) {
            V oldVal = pair2.getValueTwo();
            if(debugMode) {
                log.debug("此次为更新 rehashTable 操作。key: {}, value: {}", key, value);
                printTable(this.table);
            }
            return oldVal;
        }
        //2.3 插入
        return this.createNewEntry(key, value);
    }
}

是否为 rehash 阶段

这个实现比较简单,就是判断 rehashIndex 是否为 -1:

/**
 * 是否处于 rehash 阶段
 * @return 是否
 * @since 0.0.3
 * @author binbin.hou
 */
private boolean isInReHash() {
    return rehashIndex != -1;
}

更新列表信息

这里为了复用,对方法进行了抽象。可以同时使用到 table 和 rehashTable 中。

/**
 * 是否为更新信息
 * @param key key
 * @param value value
 * @param table table 信息
 * @param tableCapacity table 的容量(使用 size 也可以,因为都默认初始化了。)
 * @return 更新结果
 * @since 0.0.3
 * @author binbin.hou
 */
private Pair<Boolean, V> updateTableInfo(K key, V value, final List<List<Entry<K,V>>> table,
                             final int tableCapacity) {
    // 计算 index 值
    int hash = HashUtil.hash(key);
    int index = HashUtil.indexFor(hash, tableCapacity);
    // 判断是否为替换
    List<Entry<K,V>> entryList = new ArrayList<>();
    if(index < table.size()) {
        entryList = table.get(index);
    }
    // 遍历
    for(Entry<K,V> entry : entryList) {
        // 二者的 key 都为 null,或者二者的 key equals()
        final K entryKey = entry.getKey();
        if(ObjectUtil.isNull(key, entryKey)
                || key.equals(entryKey)) {
            final V oldValue = entry.getValue();
            // 更新新的 value
            entry.setValue(value);
            return Pair.of(true, oldValue);
        }
    }
    return Pair.of(false, null);
}

这个和以前基本是类似的。

返回结果时,为了同时保存是否为更新,以及更新的 value 值。所以使用了 Pair 工具类。

插入新的元素

插入方法也比较麻烦,需要区分是否处于渐进式 rehash 阶段。还要考虑是否需要扩容。

/**
 * 创建一个新的明细
 *
 * (1)如果处于渐进式 rehash 中,则设置到 rehashTable 中
 * (2)如果不是,则判断是否需要扩容
 *
 * 2.1 如果扩容,则直接放到 rehashTable 中。
 * 因为我们每次扩容内存翻倍,一次只处理一个 index 的信息,所以不会直接 rehash 结束,直接放到新的 rehashTable 中即可
 * 2.2 如果不扩容,则放入 table 中
 *
 * @param key key
 * @param value value
 * @since 0.0.3
 * @author binbin.hou
 */
private V createNewEntry(final K key,
                            final V value) {
    Entry<K,V> entry = new DefaultMapEntry<>(key, value);
    // 重新计算 tableIndex
    int hash = HashUtil.hash(key);
    //是否处于 rehash 中?
    if(isInReHash()) {
        int index = HashUtil.indexFor(hash, this.rehashCapacity);
        List<Entry<K,V>> list = this.rehashTable.get(index);
        list.add(entry);
        if(debugMode) {
            log.debug("目前处于 rehash 中,元素直接插入到 rehashTable 中。");
            printTable(this.rehashTable);
        }
    }
    // 是否需要扩容 && 不处于渐进式 rehash
    // rehash 一定是扩容 rehashTable
    // 如果发生了 rehash,元素是直接放到 rehashTable 中的
    if(isNeedExpand()) {
        rehash();
        // 放入到 rehashTable 中
        int index = HashUtil.indexFor(hash, this.rehashCapacity);
        List<Entry<K,V>> list = this.rehashTable.get(index);
        list.add(entry);
        if(debugMode) {
            log.debug("目前处于 rehash 中,元素直接插入到 rehashTable 中。");
            printTable(this.rehashTable);
        }
    } else {
        int index = HashUtil.indexFor(hash, this.capacity);
        List<Entry<K,V>> list = this.table.get(index);
        list.add(entry);
        if(debugMode) {
            log.debug("目前不处于 rehash 中,元素直接插入到 table 中。");
            printTable(this.table);
        }
    }
    this.size++;
    return value;
}

是否需要扩容的方法也比较简单:

/**
 * 是否需要扩容
 *
 * 比例满足,且不处于渐进式 rehash 中
 * @return 是否
 * @since 0.0.3
 * @author binbin.hou
 */
private boolean isNeedExpand() {
    // 验证比例
    double rate = size*1.0 / capacity*1.0;
    return rate >= factor && !isInReHash();
}

不过我们这次添加了一个不要处于渐进式 rehash 过程中。

其中 rehash 的实现也发生了很大的变化,具体实现如下:

/**
 * 直接 rehash 的流程
 *
 * (1)如果处于 rehash 中,直接返回
 * (2)初始化 rehashTable,并且更新 rehashIndex=0;
 * (3)获取 table[0],rehash 到 rehashTable 中
 * (4)设置 table[0] = new ArrayList();
 *
 * @since 0.0.3
 * @author binbin.hou
 */
private void rehash() {
    if(isInReHash()) {
        if(debugMode) {
            log.debug("当前处于渐进式 rehash 阶段,不重复进行 rehash!");
        }
        return;
    }
    // 初始化 rehashTable
    this.rehashIndex = -1;
    this.rehashCapacity = 2*capacity;
    this.rehashTable = new ArrayList<>(this.rehashCapacity);
    for(int i = 0; i < rehashCapacity; i++) {
        rehashTable.add(i, new ArrayList<Entry<K, V>>());
    }

    // 遍历元素第一个元素,其他的进行渐进式更新。
    rehashToNew();
}

渐进式更新的方法,可以在 get/put/remove 等操作时,执行附加操作时使用。

所以单独抽成了一个方法,实现如下:

/**
 * 将信息从旧的 table 迁移到新的 table 中
 *
 * (1)table[rehashIndex] 重新 rehash 到 rehashTable 中
 * (2)设置 table[rehashIndex] = new ArrayList();
 * (3)判断是否完成渐进式 rehash
 */
private void rehashToNew() {
    rehashIndex++;

    List<Entry<K, V>> list = table.get(rehashIndex);
    for(Entry<K, V> entry : list) {
        int hash = HashUtil.hash(entry);
        int index = HashUtil.indexFor(hash, rehashCapacity);
        //  添加元素
        // 获取列表,避免数组越界
        List<Entry<K,V>> newList = rehashTable.get(index);
        // 添加元素到列表
        // 元素不存在重复,所以不需要考虑更新
        newList.add(entry);
        rehashTable.set(index, newList);
    }

    // 清空 index 处的信息
    table.set(rehashIndex, new ArrayList<Entry<K, V>>());

    // 判断大小是否完成 rehash
    // 验证是否已经完成
    if(rehashIndex == (table.size()-1)) {
        this.capacity = this.rehashCapacity;
        this.table = this.rehashTable;
        this.rehashIndex = -1;
        this.rehashCapacity = -1;
        this.rehashTable = null;
        if(debugMode) {
            log.debug("渐进式 rehash 已经完成。");
            printTable(this.table);
        }
    } else {
        if(debugMode) {
            log.debug("渐进式 rehash 处理中, 目前 index:{} 已完成", rehashIndex);
            printAllTable();
        }
    }
}

get() 操作

渐进式 rehash 将动作分散到每一个操作中,我们对 get 方法进行重写,当做一个例子。其他的方法如果实现也是类似的。

/**
 * 查询方法
 * (1)如果处于渐进式 rehash 状态,额外执行一次 rehashToNew()
 * (2)判断 table 中是否存在元素
 * (3)判断 rehashTable 中是否存在元素
 * @param key key
 * @return 结果
 */
@Override
public V get(Object key) {
    if(isInReHash()) {
        if(debugMode) {
            log.debug("当前处于渐进式 rehash 状态,额外执行一次操作");
            rehashToNew();
        }
    }

    //1. 判断 table 中是否存在
    V result = getValue(key, this.table);
    if(result != null) {
        return result;
    }

    //2. 是否处于渐进式 rehash
    if(isInReHash()) {
        return getValue(key, this.rehashTable);
    }
    return null;
}

测试

我们历经千辛万苦,终于实现了一个简单版本的渐进式 hash map。

下面来测试一下功能是否符合我们的预期。

put 操作

Map<String, String> map = new MyProgressiveReHashMap<>(2, true);
map.put("1", "1");
map.put("1", "2");

日志:

[DEBUG] [2020-10-11 21:30:15.072] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.createNewEntry] - 目前不处于 rehash 中,元素直接插入到 table 中。
{1: 1} 
[DEBUG] [2020-10-11 21:30:15.076] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.put] - 不处于渐进式 rehash,此次为更新操作。key: 1, value: 2
{1: 2} 

第一次是插入,第二次是更新。

这里都没有触发扩容,下面我们看一下触发扩容的情况。

扩容测试

Map<String, String> map = new MyProgressiveReHashMap<>(2, true);
map.put("1", "1");
map.put("2", "2");
map.put("3", "3");

Assert.assertEquals("1", map.get("1"));
Assert.assertEquals("2", map.get("2"));
Assert.assertEquals("3", map.get("3"));

日志如下:

[DEBUG] [2020-10-11 21:31:12.559] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.createNewEntry] - 目前不处于 rehash 中,元素直接插入到 table 中。
{1: 1} 
[DEBUG] [2020-10-11 21:31:12.560] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.createNewEntry] - 目前不处于 rehash 中,元素直接插入到 table 中。
{2: 2} 
{1: 1} 
[DEBUG] [2020-10-11 21:31:12.563] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.rehashToNew] - 渐进式 rehash 处理中, 目前 index:0 已完成
原始 table 信息: 
{1: 1} 
新的 table 信息: 
{2: 2} 
[DEBUG] [2020-10-11 21:31:12.563] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.createNewEntry] - 目前处于 rehash 中,元素直接插入到 rehashTable 中。
{2: 2} 
{3: 3} 
[DEBUG] [2020-10-11 21:31:12.564] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.get] - 当前处于渐进式 rehash 状态,额外执行一次操作
[DEBUG] [2020-10-11 21:31:12.564] [main] [c.g.h.d.s.c.u.m.MyProgressiveReHashMap.rehashToNew] - 渐进式 rehash 已经完成。
{2: 2} 
{1: 1} 
{3: 3} 

当放入元素【3】的时候,已经触发了 rehash。

(1)第一次渐进式 rehash 将 table[0] 的元素 rehash 到了新的节点。

(2)插入的元素直接插入到 rehashTable 中

(3)get 操作时,额外触发一次 rehash,然后所有的 rehash 已经完成。

ps: 写完的那一刻,感觉自己又变强了!

输入图片说明

小结

本节我们自己动手实现了一个简易版本的渐进式 rehash HashMap。

为了实现这个功能,我们做了很多准备工作,HashMap 的源码学习,redis 渐进式 rehash 原理,HashMap 的手写实现。

跨过这几个难关之后,终于实现了一个自己的 rehash HashMap。

本实现是我个人完全独立创造,只参考了 redis 的实现流程,如有雷同,肯定是抄【老马啸西风】的。

本节的内容较长,书写也花费了大量的时间,一切都是值得的。希望你喜欢。

可以先收藏转发一波,然后细细品味。后续将带给大家更多高性能相关的原理和手写框架,感兴趣的伙伴可以关注一波,即使获取最新动态~

开源地址:https://github.com/houbb/cache

觉得本文对你有帮助的话,欢迎点赞评论收藏关注一波。你的鼓励,是我最大的动力~

不知道你有哪些收获呢?或者有其他更多的想法,欢迎留言区和我一起讨论,期待与你的思考相遇。