Redis 跳表 skiplist
跳表(skiplist)是一种有序的数据结构,是在有序链表的基础上发展起来的,可以看作是分层链表。在 Redis 中跳表是有序集合(sort set)的底层实现之一。
为什么会出现跳表?跳表解决了什么样的问题?
跳表可以说是平衡树的一种替代品。它也是为了解决元素随机插入后快速定位的的问题。到这里,你可能会说 hash 表解决的不是很好吗?插入和查找都是 O(1) 的时间复杂度。是的,hash 表是很好的解决了查找的问题,但若想要有序呢?这个时候 hash 表就不行了,二叉查找树可以解决这个问题。
但是由于二叉查找树在按大小顺序进行插入的时候,就会退化为链表。所以又出现了平衡二叉树,而根据算法不同,又分为 AVL 树、B-Tree、B+Tree、红黑树等。看到这里,是不是头都大了(天呐,这么多算法,这么复杂的实现)。
而跳表的出现就是为了解决平衡二叉树复杂的问题。所以它可以说是平衡树的一种替代品。它以一种较为简单的方式实现了平衡二叉树的功能。
跳表的核心思想
跳表的核心思想是 “剪枝”,具体是如下方式实现 如果是一个简单的链表,那么我们知道在链表中查找一个元素 I 的话,需要将整个链表遍历一次。
如果是说链表是排序的,并且节点中还存储了“跳跃”的指向后续节点的指针的话,那么在查找一个节点时,仅仅需要遍历 N/2 个节点即可。
从上图中已经可以看到"层"的出现使得时间复杂度降为原来的一半。
一次典型的跳表查询过程
skiplist 上的查找路径展示
skiplist 正是受这种多层链表的想法的启发而设计出来的。实际上,按照上面生成链表的方式,上面每一层链表的节点个数,是下面一层的节点个数的 1/P(redis 中 P 为 0.25),这样查找过程就非常类似于一个二分查找,使得查找的时间复杂度可以降低到 O(log n)。
源码分析
核心数据结构
#define ZSKIPLIST_MAXLEVEL 32 //最大层数
#define ZSKIPLIST_P 0.25 //P
typedef struct zskiplistNode {
robj *obj;
double score;
struct zskiplistNode *backward; //后向指针
struct zskiplistLevel {
struct zskiplistNode *forward;//每一层中的前向指针
unsigned int span;//x.level[i].span 表示节点x在第i层到其下一个节点需跳过的节点数。注:两个相邻节点span为1
} level[];
} zskiplistNode;
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;//节点总数
int level;//总层数
} zskiplis
随机算法
int zslRandomLevel(void) {
int level = 1;
// TODO 了解这个公式背后的数学原理
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
使用随机算法,在概率上可以保证上一层的节点数为下一层的1/P。那么 SkipList 可以看成是一棵平衡的 P 叉树,从最顶层开始查找某个节点需要的时间是 O(logpN)
初始化
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;//这点很重要,跳表的层数是动态变化的。
// 初始化头节点, O(1)
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
// 初始化层指针,O(1)
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
插入实现
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
// 记录寻找元素过程中,每层能到达的最右节点
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
// 记录寻找元素过程中,每层所跨越的节点数
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
redisAssert(!isnan(score));
x = zsl->header;
// 记录沿途访问的节点,并计数 span 等属性
// 平均 O(log N) ,最坏 O(N)
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
// 右节点不为空
while (x->level[i].forward &&
// 右节点的 score 比给定 score 小
(x->level[i].forward->score < score ||
// 右节点的 score 相同,但节点的 member 比输入 member 要小
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
// 记录跨越了多少个元素
rank[i] += x->level[i].span;
// 继续向右前进
x = x->level[i].forward;
}
// 保存访问节点
update[i] = x;
}
/* we assume the key is not already inside, since we allow duplicated
* scores, and the re-insertion of score and redis object should never
* happpen since the caller of zslInsert() should test in the hash table
* if the element is already inside or not. */
// 因为这个函数不可能处理两个元素的 member 和 score 都相同的情况,
// 所以直接创建新节点,不用检查存在性
// 计算新的随机层数
level = zslRandomLevel();
// 如果 level 比当前 skiplist 的最大层数还要大
// 那么更新 zsl->level 参数
// 并且初始化 update 和 rank 参数在相应的层的数据
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
// 创建新节点
x = zslCreateNode(level,score,obj);
// 根据 update 和 rank 两个数组的资料,初始化新节点
// 并设置相应的指针
// O(N)
for (i = 0; i < level; i++) {
// 设置指针
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
// 设置 span
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
// 更新沿途访问节点的 span 值
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
// 设置后退指针
x->backward = (update[0] == zsl->header) ? NULL : update[0];
// 设置 x 的前进指针
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
// 这个是新的表尾节点
zsl->tail = x;
// 更新跳跃表节点数量
zsl->length++;
return x;
}
图示
以节点19
插入为例,其中
黑色箭头的表示的跨度为update[i]->level[i].span
蓝色箭头表示的跨度为rank[0] - rank[i]
即节点 19 在level_0
的update[0]
为 11,在level_1
的update[1]
为 7,rank[0] - rank[i]
为节点 7 与节点 11 之间的跨度
绿色箭头表示的跨度为节点 19 到节点 37 的 span
节点删除
int zslDelete(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
// 遍历所有层,记录删除节点后需要被修改的节点到 update 数组
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0)))
x = x->level[i].forward;
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
// 因为多个不同的 member 可能有相同的 score
// 所以要确保 x 的 member 和 score 都匹配时,才进行删除
x = x->level[0].forward;
if (x && score == x->score && equalStringObjects(x->obj,obj)) {
zslDeleteNode(zsl, x, update);
zslFreeNode(x);
return 1;
} else {
return 0; /* not found */
}
return 0; /* not found */
}
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
// 修改相应的指针和 span , O(N)
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}
// 处理表头和表尾节点
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
// 收缩 level 的值, O(N)
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}
Q&A
跳表层数上限为啥是 32?
根据前面的随机算法当 level[0]有 2 的 64 次方个节点时,才能达到 32 层,因此层数上限是 32 完全够用了
redis 中为啥不用红黑树二用跳表?
1 内存占用方面跳表比红黑树多,但是多的内存很有限 2 实现比红黑树简单 3 跟红黑树更方便的支持范围查询
评论 (1)