Redis跳表skiplist

817次围观   6个点赞   1人评论

作者头像

zeal

1年前 发表于 技术专栏

Redis跳表skiplist

817次围观   6个点赞   1人评论

作者头像

zeal

1年前 发表于 技术专栏

Redis 跳表 skiplist

跳表(skiplist)是一种有序的数据结构,是在有序链表的基础上发展起来的,可以看作是分层链表。在 Redis 中跳表是有序集合(sort set)的底层实现之一。

为什么会出现跳表?跳表解决了什么样的问题?

跳表可以说是平衡树的一种替代品。它也是为了解决元素随机插入后快速定位的的问题。到这里,你可能会说 hash 表解决的不是很好吗?插入和查找都是 O(1) 的时间复杂度。是的,hash 表是很好的解决了查找的问题,但若想要有序呢?这个时候 hash 表就不行了,二叉查找树可以解决这个问题。

但是由于二叉查找树在按大小顺序进行插入的时候,就会退化为链表。所以又出现了平衡二叉树,而根据算法不同,又分为 AVL 树、B-Tree、B+Tree、红黑树等。看到这里,是不是头都大了(天呐,这么多算法,这么复杂的实现)。

而跳表的出现就是为了解决平衡二叉树复杂的问题。所以它可以说是平衡树的一种替代品。它以一种较为简单的方式实现了平衡二叉树的功能。

跳表的核心思想

跳表的核心思想是 “剪枝”,具体是如下方式实现 如果是一个简单的链表,那么我们知道在链表中查找一个元素 I 的话,需要将整个链表遍历一次。

如果是说链表是排序的,并且节点中还存储了“跳跃”的指向后续节点的指针的话,那么在查找一个节点时,仅仅需要遍历 N/2 个节点即可。

从上图中已经可以看到"层"的出现使得时间复杂度降为原来的一半。

一次典型的跳表查询过程

skiplist 上的查找路径展示

skiplist 正是受这种多层链表的想法的启发而设计出来的。实际上,按照上面生成链表的方式,上面每一层链表的节点个数,是下面一层的节点个数的 1/P(redis 中 P 为 0.25),这样查找过程就非常类似于一个二分查找,使得查找的时间复杂度可以降低到 O(log n)。

源码分析

核心数据结构

#define ZSKIPLIST_MAXLEVEL 32 //最大层数
#define ZSKIPLIST_P 0.25 //P

typedef struct zskiplistNode {
    robj *obj;
    double score;
    struct zskiplistNode *backward; //后向指针
    struct zskiplistLevel {
        struct zskiplistNode *forward;//每一层中的前向指针
        unsigned int span;//x.level[i].span 表示节点x在第i层到其下一个节点需跳过的节点数。注:两个相邻节点span为1
    } level[];
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;//节点总数
    int level;//总层数
} zskiplis

随机算法

int zslRandomLevel(void) {
    int level = 1;
    // TODO 了解这个公式背后的数学原理
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

使用随机算法,在概率上可以保证上一层的节点数为下一层的1/P。那么 SkipList 可以看成是一棵平衡的 P 叉树,从最顶层开始查找某个节点需要的时间是 O(logpN)

初始化

zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;
    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;//这点很重要,跳表的层数是动态变化的。
    // 初始化头节点, O(1)
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    // 初始化层指针,O(1)
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}

插入实现

zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {

    // 记录寻找元素过程中,每层能到达的最右节点
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;

    // 记录寻找元素过程中,每层所跨越的节点数
    unsigned int rank[ZSKIPLIST_MAXLEVEL];

    int i, level;

    redisAssert(!isnan(score));
    x = zsl->header;
    // 记录沿途访问的节点,并计数 span 等属性
    // 平均 O(log N) ,最坏 O(N)
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];

        // 右节点不为空
        while (x->level[i].forward &&
            // 右节点的 score 比给定 score 小
            (x->level[i].forward->score < score ||
                // 右节点的 score 相同,但节点的 member 比输入 member 要小
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
            // 记录跨越了多少个元素
            rank[i] += x->level[i].span;
            // 继续向右前进
            x = x->level[i].forward;
        }
        // 保存访问节点
        update[i] = x;
    }

    /* we assume the key is not already inside, since we allow duplicated
     * scores, and the re-insertion of score and redis object should never
     * happpen since the caller of zslInsert() should test in the hash table
     * if the element is already inside or not. */

    // 因为这个函数不可能处理两个元素的 member 和 score 都相同的情况,
    // 所以直接创建新节点,不用检查存在性

    // 计算新的随机层数
    level = zslRandomLevel();
    // 如果 level 比当前 skiplist 的最大层数还要大
    // 那么更新 zsl->level 参数
    // 并且初始化 update 和 rank 参数在相应的层的数据
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }

    // 创建新节点
    x = zslCreateNode(level,score,obj);
    // 根据 update 和 rank 两个数组的资料,初始化新节点
    // 并设置相应的指针
    // O(N)
    for (i = 0; i < level; i++) {
        // 设置指针
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        // 设置 span
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    // 更新沿途访问节点的 span 值
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    // 设置后退指针
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    // 设置 x 的前进指针
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        // 这个是新的表尾节点
        zsl->tail = x;

    // 更新跳跃表节点数量
    zsl->length++;

    return x;
}

图示

节点19插入为例,其中 黑色箭头的表示的跨度为update[i]->level[i].span 蓝色箭头表示的跨度为rank[0] - rank[i]即节点 19 在level_0update[0]为 11,在level_1update[1]为 7,rank[0] - rank[i]为节点 7 与节点 11 之间的跨度 绿色箭头表示的跨度为节点 19 到节点 37 的 span

节点删除

int zslDelete(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    // 遍历所有层,记录删除节点后需要被修改的节点到 update 数组
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                compareStringObjects(x->level[i].forward->obj,obj) < 0)))
            x = x->level[i].forward;
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */

    // 因为多个不同的 member 可能有相同的 score
    // 所以要确保 x 的 member 和 score 都匹配时,才进行删除
    x = x->level[0].forward;
    if (x && score == x->score && equalStringObjects(x->obj,obj)) {
        zslDeleteNode(zsl, x, update);
        zslFreeNode(x);
        return 1;
    } else {
        return 0/* not found */
    }
    return 0/* not found */
}
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;

    // 修改相应的指针和 span , O(N)
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].span += x->level[i].span - 1;
            update[i]->level[i].forward = x->level[i].forward;
        } else {
            update[i]->level[i].span -= 1;
        }
    }

    // 处理表头和表尾节点
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }

    // 收缩 level 的值, O(N)
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;

    zsl->length--;
}

Q&A

跳表层数上限为啥是 32?

根据前面的随机算法当 level[0]有 2 的 64 次方个节点时,才能达到 32 层,因此层数上限是 32 完全够用了

redis 中为啥不用红黑树二用跳表?

1 内存占用方面跳表比红黑树多,但是多的内存很有限 2 实现比红黑树简单 3 跟红黑树更方便的支持范围查询

评论 (1)
在这里说点什么吧... (取消回复)
留下一个好听的昵称吧!
好听的昵称!
请输入正确的邮箱格式!
不错的邮箱!
评论内容不能为空!
理性发言,和谐讨论!