我们再来学习如何从跳跃表中查询数据,跳跃表本质上是一个链表,但它允许我们像数组一样定位某个索引区间内的节点,并且与数组不同的是,跳跃表允许我们将头节点L0层的前驱节点(即跳跃表分值最小的节点)zsl->header.level[0].forward当成索引0的节点,尾节点zsl->tail(跳跃表分值最大的节点)当成索引zsl->length-1的节点,索引按分值从小到大递增查询;也允许我们将尾节点当成索引0的节点,头节点L0层的前驱节点当做索引zsl->length-1的节点,索引按分值从大到小递增查询。当我们调用下面的方法按照索引区间来查询时,会把我们的索引转换成跨度,然后查找落在跨度的第一个节点,之后根据reverse(逆向状态)决定是要正向查询还是逆向查询。

假设我们要进行正向查询(即:索引按分值从小到大递增查询),给定的索引区间是[0,2],那么我们要找到跨度为1的节点,然后从跨度为1的节点L0层逐个递进,直到停留在跨度为3的节点,头节点L0层的前驱节点zsl->header.level[0].forward在跳跃表的索引为0,跨度为1,在跨度为1的节点从L0层逐个递进,一直递进到跨度为3的节点,这样便完成了索引区间[0,2]的查询。如果我们要进行逆向查询(即:索引按分值从大到小递增查询),索引区间依旧是[0,2],那么我们要找到跨度为跨度为zsl->length-0=zsl->length的节点,那自然是尾节点,找到跨度区间的第一个节点后,我们通过backward指针逐个后退,一直后退到跨度为zsl->length-2的节点,如此便完成查询。

void zrangeGenericCommand(client *c, int reverse) {
    robj *key = c->argv[1];
    robj *zobj;
    int withscores = 0;
    long start;
    long end;
    long llen;
    long rangelen;

    //读取起始索引和终止索引,如果存在一个索引读取失败,则退出
    if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
        (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK))
        return;
    //判断是否要返回分值
    if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr, "withscores")) {
        withscores = 1;
    } else if (c->argc >= 5) {
        addReply(c, shared.syntaxerr);
        return;
    }
    //判断key是否存在,如果不存在则退出,如果存在但类型不为ZSET也退出。
    if ((zobj = lookupKeyReadOrReply(c, key, shared.emptyarray)) == NULL
        || checkType(c, zobj, OBJ_ZSET))
        return;

    /*
     * Sanitize indexes.
     * 审查索引,这里主要针对传入索引为负数的情况,大家都知道,如果一个
     * 跳跃表的节点个数为N,我们要从起始节点查询到末尾节点,可以用[0,N-1]
     * 或者[0,-1],当传入的end<0时,这里会重新规正end的索引,llen为zset的长度,
     * 因此查询[0,-1],这里会规正为[0,N-1]。同理,start也会被规正,如果我们查询
     * [-5,-3],即代表查询有序集合倒数第5个节点至倒数第三个节点,前提是N>=5这个
     * 查询才有意义。如果我们的起始索引传入的是一个绝对值>N的负数,那么llen + start的
     * 结果也为负数,如果判断start<0,则start会被规正为0。
     * */
    llen = zsetLength(zobj);
    if (start < 0) start = llen + start;
    if (end < 0) end = llen + end;
    if (start < 0) start = 0;

    /*
     * Invariant: start >= 0, so this test will be true when end < 0.
     * The range is empty when start > end or start >= length.
     * 如果起始索引大于终止索引,或者起始索引大于等于有序集合节点数量,则直接
     * 返回空数组。
     * */
    if (start > end || start >= llen) {
        addReply(c, shared.emptyarray);
        return;
    }
    //如果判断终止索引大于等于节点数,则规整为llen-1
    if (end >= llen) end = llen - 1;
    //计算要返回的节点数
    rangelen = (end - start) + 1;
	//……
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
		//压缩列表逻辑……
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;
        sds ele;

        /*
         * Check if starting point is trivial, before doing log(N) lookup.
         * 这里会根据给定的开始索引,查找该索引对应的节点,并将ln指向该节点。
         * 需要注意的一点是:平常我们都认为header.level[0].forward指向的节点,在跳跃表
         * 中的索引为0,但有时候跳跃表的末尾节点zsl->tail的索引值也有可能为0,这里就要提到
         * 逆向查询。
         * 当我们使用ZRANGE key min max [WITHSCORES]命令查询时,成员的位置是按照其分值
         * 从小到大来排序,这时候header.level[0].forward的索引值为0,
         * header.level[0].forward.level[0].forward的索引值为1。而zls->tail的索引值
         * 为zls->length-1。
         * 当我们使用ZREVRANGE key start stop [WITHSCORES]命令查询时,成员的位置是按照
         * 其分值从大到小来排序,这时候zls->tail的索引值为0,header.level[0].forward的
         * 索引值为zls->length-1,header.level[0].forward.level[0].forward的索引值为
         * zls->length-2。当reverse为1时,本次查询即为逆向查询。
         * 我们注意到不管是if还是else分值,只要start>0,最终都会执行zslGetElementByRank()
         * 将ln定位到起始节点。当start为0时,如果是逆向查询,则索引0的位置是尾节点zsl->tail,
         * 如果是正向查询,索引0的位置则是zsl->header->level[0].forward。
         * 那么(llen - start)和(start + 1)又代表什么含义呢?为什么zslGetElementByRank()
         * 可以根据这两个公式的计算结果,定位到索引对应的节点呢?其实这两个公式计算的是跨度,而
         * zslGetElementByRank()则是根据给定的跳跃表和跨度查找节点而已。
         * 如果是正常查询,假设起始索引为0,则跨度为start(0)+1=1,刚好为头节点L0层到达第一个节点的
         * 跨度为1;如果起始索引为1,则跨度为start(1)+1=2,刚好是头节点到达索引值为1的节点的跨度。
         * 如果是逆向查询,索引值为0代表尾节点,而llen-start(0)=llen为头节点到达尾节点的跨度;同理,
         * 倒数第二个节点的索引值为1,头节点到达倒数第二个节点的跨度为llen-start(1)=llen-1。
         * */
        if (reverse) {
            ln = zsl->tail;
            if (start > 0)
                ln = zslGetElementByRank(zsl, llen - start);
        } else {
            ln = zsl->header->level[0].forward;
            if (start > 0)
                ln = zslGetElementByRank(zsl, start + 1);
        }
        //定位到起始节点后,根据逆向状态,不为0时后退查询(ln-backward),为0时递进查询(ln->level[0].forward)
        while (rangelen--) {
            serverAssertWithInfo(c, zobj, ln != NULL);
            ele = ln->ele;
            if (withscores && c->resp > 2) addReplyArrayLen(c, 2);
            addReplyBulkCBuffer(c, ele, sdslen(ele));
            if (withscores) addReplyDouble(c, ln->score);
            ln = reverse ? ln->backward : ln->level[0].forward;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
}

  

查找到跨度对应的节点,查找到跨度对应的节点,则在<1>处返回,如果我们传入的跨度大于头节点到尾节点的跨度,则返回NULL。

/*
 * Finds an element by its rank. The rank argument needs to be 1-based.
 * */
zskiplistNode *zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
    zskiplistNode *x;
    unsigned long traversed = 0;//累计跨度
    int i;

    x = zsl->header;
    /*
     * 从头节点的最高层出发,如果基于当前层能够递进到前一个节点,
     * 则把当前节点的跨度加到traversed。
     */
    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward && (traversed + x->level[i].span) <= rank) {
            traversed += x->level[i].span;
            x = x->level[i].forward;
        }
        /*
         * 如果累计跨度与调用方传入的跨度相等,则代表x已经前进到调用方
         * 所要求达到的跨度的节点,返回x。
         */
        if (traversed == rank) {
            return x;//<1>
        }
    }
    //如果传入的跨度大于头节点到尾节点的跨度,则返回NULL。
    return NULL;
}

   

跳跃表除了可以根据索引区间来查询,还可以根据分值区间来查询,这里我们又见到了结构体zrangespec。当我们需要判断一个节点是否落在我们指定的分值区间内,需要调用zslValueGteMin()和zslValueLteMax(),当传入一个指定的分值和区间,zslValueGteMin()和zslValueLteMax()的结果不为0,则表明节点落在分值区间内。此外,这两个方法还可以判断一个跳跃表是否和区间有交集,比如调用zslValueGteMin()时,传入尾节点(跳跃表分值最大的节点)及一个指定区间,如果尾节点没有落在指定区间,代表此区间都大于尾节点,此时我们不需要遍历跳跃表即可返回一个空数组,告诉客户端在指定区间内找不到任何节点;同理,调用zslValueLteMax()时传入头节点L0层的前驱节点(跳跃表分值最小节点)没有落在区间内,则表明区间小于跳跃表,同样不需要遍历跳跃表即可返回空数组给客户端,告诉客户端在指定区间内找不到任何节点。

/*
 * Struct to hold a inclusive/exclusive range spec by score comparison.
 * 此结构体用于表示一个指定区间,minex为0时表示在进行最小值比较时,要包含最小值本身
 * 同理maxex为0时表示进行最大值比较时,要包含最大值本身。
 * 比如:min=2,max=9
 * 当minex=0,maxex=0时,区间为:[2,9]
 * 当minex=1,maxex=0时,区间为:(2,9]
 * 当minex=0,maxex=1时,区间为:[2,9)
 * 当minex=1,maxex=1时,区间为:(2,9)
 * */
typedef struct {
    double min, max;
    int minex, maxex; /* are min or max exclusive? */
} zrangespec;

/*
 * 如果spec->minex不为0,返回分值是否大于区间最小值的比较结果,
 * 为0则返回分值是否大于等于区间最小值的比较结果。
 * 如果传入一个跳跃表尾节点的分值zsl->tail.score(即:跳跃表最大分值)和区间返回结果为0,
 * 则表示跳跃表和区间没有交集。
 * 这里分两种情况:
 * spec->minex不为0:区间要查询分值大于spec->min的元素,
 * zsl->tail.score<=spec->min代表跳跃表最大分值小于等于min,返回结果为0。
 * spec->minex为0:区间要查询分值大于等于spec->min的元素,
 * zsl->tail.score<spec->min代表跳跃表最大分值小于min,返回结果为0。
 */
int zslValueGteMin(double value, zrangespec *spec) {
    return spec->minex ? (value > spec->min) : (value >= spec->min);
}

/*
 * 如果spec->maxex不为0,返回分值是否小于区间最大值的比较结果,为0则返回分值
 * 是否小于等于区间最大值的比较结果。
 * 如果传入一个跳跃表头节点L0层指向节点的分值
 * zsl->header.level[0].forward.score(即:跳跃表最小分值)和
 * 区间返回结果为0,则表示跳跃表和区间没有交集。
 * 这里分两种情况:
 * spec->maxex不为0:区间要查询分值小于spec->max的元素,
 * zsl->header.level[0].forward.score>=spec->max代表跳跃表最小分值大于等于区间
 * 最大分值,返回结果为0。
 * spec->maxex为0:区间要查询分值小于等于spec->max的元素,
 * zsl->header.level[0].forward.score>spec->max代表跳跃表最小分值大于区间最大分值,
 * 返回结果为0。
 */
int zslValueLteMax(double value, zrangespec *spec) {
    return spec->maxex ? (value < spec->max) : (value <= spec->max);
}

    

在真正根据分值区间查询跳跃表前,会校验区间是否有效,如果我们输入一个区间[a,b],但a>b,那么这个区间肯定是无效区间,无须遍历跳跃表;如果a=b,如果区间的开闭状态出现:(a,b)、(a,b]、[a,b)这三种情况,也是无效区间,只有[a,b]才会去查询节点,表示需要查找分值为a(或者b)的节点。当校验完区间是有效后,还会调用zslValueGteMin()和zslValueLteMax()判断跳跃表和区间是否存在交集,即区间是否整体大于跳跃表或整体小于跳跃表,如果出现这两种情况则表明区间和跳跃表无交集,也就不需要遍历。

/*
 * Returns if there is a part of the zset is in range.
 * 判断跳跃表和区间是否存在交集
 * */
int zslIsInRange(zskiplist *zsl, zrangespec *range) {
    zskiplistNode *x;

    /*
     * Test for ranges that will always be empty.
     * 校验区间范围是否有效,无效则返回0表示查询结果为空:
     * 1.如果最小值大于最大值,则无效。
     * 2.如果最小值等于最大值,且区间为:(min,max)、(min,max]、[min,max)则无效。
     * */
    if (range->min > range->max ||
        (range->min == range->max && (range->minex || range->maxex)))
        return 0;
    /*
     * 如果尾节点不为NULL,则把跳跃表最大分值zsl->tail.score与区间比较,
     * 如果range->minex不为0,则查询分值大于range->min的元素,如果跳跃表
     * 最大分值zsl->tail.score小于等于range->min,则表示跳跃表和区间没有交集,
     * 无须遍历跳跃表查询;同理如果range->minex为0,则查询分值大于等于range->min
     * 的元素,如果zsl->tail.score小于range->min,则表示跳跃表和区间没有交集,
     * 也无须遍历跳跃表查询。
     */
    x = zsl->tail;
    if (x == NULL || !zslValueGteMin(x->score, range))
        return 0;
    /*
     * 如果头节点L0层的前驱节点不为NULL,则把跳跃表最小分值zsl->header->level[0].forward.score
     * 与区间比较,如果range->maxex不为0,则查询分值小于range->maxex的元素,如果跳跃表最小
     * 分值zsl->header->level[0].forward.score大于等于range->max,则表示跳跃表和区间没有交集,
     * 无须遍历跳跃表查询;同理如果range->maxex为0,则查询分值小于等于range->maxex的元素,如果跳跃表
     * 最小分值zsl->header->level[0].forward.score大于range->maxex,则表示跳跃表和区间没有交集,
     * 也无须遍历跳跃表查询。
     */
    x = zsl->header->level[0].forward;
    if (x == NULL || !zslValueLteMax(x->score, range))
        return 0;
    //跳跃表和区间存在交集,需要遍历跳跃表查询。
    return 1;
}

  

跳跃表允许我们根据分值区间进行正向查询(分值从小到大)或逆向查询(分值从大到小),如果是正向查询,则调用zslFirstInRange()方法,先判断跳跃表和指定区间是否存在交集,如果存在则查找指定区间内的分值最小的节点并返回。

/*
 * Find the first node that is contained in the specified range.
 * Returns NULL when no element is contained in the range.
 * 查找落在指定区间的第一个节点,如果没有元素落在这个区间则返回NULL。
 * */
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
    zskiplistNode *x;
    int i;

    /*
     * If everything is out of range, return early.
     * 如果跳跃表和区间没有交集则无须遍历,直接返回NULL。
     * */
    if (!zslIsInRange(zsl, range)) return NULL;
    //从头节点的最高层开始遍历
    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        /*
         * Go forward while *OUT* of range.
         * 如果x->level[i].forward不为NULL,根据其分值x->level[i].forward->score和
         * range->minex判断前驱节点是否能前进。
         * 这里分两种情况:
         * range->minex不为0:判断前驱节点的分值是否大于range->min,如果小于等于的话
         * expression=zslValueGteMin(x->level[i].forward->score, range)为0,
         * 代表需要前进,找到大于min的节点,而!expression为1,while条件成立,x会
         * 前进到它的前驱节点。当x的前驱节点的分值大于min,就会停止循环,x会停留在区间内
         * 第一个节点的后继节点。
         * range->minex为0:判断前驱节点的分值是否大于等于range->min,如果小于的话,
         * expression=zslValueGteMin(x->level[i].forward->score, range),expression为0,
         * 需要前进,找到大于等于min的节点,而!expression为1,while条件成立,x会
         * 前进到它的前驱节点。当x的前驱节点的分值大于等于min,就会停止循环,x会停留在区间内
         * 第一个节点的后继节点。
         * */
        while (x->level[i].forward &&
               !zslValueGteMin(x->level[i].forward->score, range))
            x = x->level[i].forward;
    }

    /*
     * This is an inner range, so the next node cannot be NULL.
     * 上面的循环会让x停留在区间内第一个节点的后继节点,为了达到区间内的
     * 第一个节点,x要在L0层前进到它的前驱节点。
     * */
    x = x->level[0].forward;
    serverAssert(x != NULL);

    /*
     * Check if score <= max.
     * range->maxex不为0:如果区间内第一个节点的分值大于等于spec->max,
     * expression=zslValueLteMax(x->score, range),expression结果为0
     * !expression为1,表示查询异常,返回NULL。
     * range->maxex为0:如果区间内第一个节点的分值大于spec->max,
     * 则expression=zslValueLteMax(x->score, range),expression结果为0
     * !expression为1,表示查询异常,返回NULL。
     * */
    if (!zslValueLteMax(x->score, range)) return NULL;
    return x;
}

   

如果要进行逆向查询,则调用zslLastInRange(),这里同样先判断跳跃表是否和区间存在交集,只有存在交集才会进行下一步的判断,查找指定区间内分值最大的节点并返回。

/*
 * Find the last node that is contained in the specified range.
 * Returns NULL when no element is contained in the range.
 * 查找落在指定区间的最后一个节点,如果没有元素落在这个区间则返回NULL。
 * */
zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) {
    zskiplistNode *x;
    int i;

    /*If everything is out of range, return early.*/
    if (!zslIsInRange(zsl, range)) return NULL;
//从头节点的最高层开始遍历 x = zsl->header; for (i = zsl->level - 1; i >= 0; i--) { /* * Go forward while *IN* range. * 根据区间range和前驱节点的分值判断是否前进,如果x->level[i].forward * 不为NULL,根据range->maxex判断前驱节点是否能前进。 * 这里分两种情况: * 如果range->maxex不为0,且前驱节点的分值小于range->max,则可以前进。 * 如果range->maxex为0,且前驱节点的分值小于等于range->max,则可以前进。 * */ while (x->level[i].forward && zslValueLteMax(x->level[i].forward->score, range)) x = x->level[i].forward; } /* This is an inner range, so this node cannot be NULL. */ serverAssert(x != NULL); /* * Check if score >= min. * 如果range->minex不为0,x的分值小于或等于range->min,代表查询出现异常,则返回NULL。 * 如果range->minex为0,x的分值小于range->min,代表查询出现异常,则返回NULL。 * */ if (!zslValueGteMin(x->score, range)) return NULL; return x; }

   

在了解完上面的内容后,下面我们要步入正题:如何根据分值区间进行正向或逆向查找节点。在下面代码<1>处,会根据逆向状态选择ln是指向区间分值最大的节点,或是分值最小的节点。在定位到起始节点后,会在<2>处的while循环对节点进行偏移,如果到达偏移位置后的ln不为NULL,则会进入<3>处的while循环,查找分值落在区间内的节点,这里会根据逆向状态是否不为0,决定是用backward指针后退,还是向L0层的前驱节点递进,一直到分值不落在区间内跳出while循环,或者ln为NULL,又或者limit为0结束while循环。如果我们没有指定偏移(offset)和返回数量(limit),则不会进行偏移,limit默认值为-1,limit--永远不为0,这里会返回落在区间内的所有节点,能结束while循环只有遇到分值不落在区间内的节点,或者是ln为NULL。

/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
void genericZrangebyscoreCommand(client *c, int reverse) {
    zrangespec range;//指定区间
    robj *key = c->argv[1];
    robj *zobj;
    long offset = 0, limit = -1;//偏移和结果返回数量
    int withscores = 0;
    unsigned long rangelen = 0;
    void *replylen = NULL;
    int minidx, maxidx;

    /*
     * Parse the range arguments.
     * 解析范围参数
     * ZRANGEBYSCORE key min max和ZREVRANGEBYSCORE key max min两个命令
     * 都是此函数实现的,如果客户端输入的命令为ZRANGEBYSCORE,则reverse为0,按
     * 从小到大查找分值及元素,分值小的在前,分值大的在后。如果客户端输入的命令为
     * ZREVRANGEBYSCORE,则reverse不为0,按从大到小查找分值及元素,分值大的在前,
     * 分值小的在后。
     *
     * */
    if (reverse) {
        /* Range is given as [max,min] */
        maxidx = 2;
        minidx = 3;
    } else {
        /* Range is given as [min,max] */
        minidx = 2;
        maxidx = 3;
    }

    if (zslParseRange(c->argv[minidx], c->argv[maxidx], &range) != C_OK) {
        addReplyError(c, "min or max is not a float");
        return;
    }

    /*
     * Parse optional extra arguments. Note that ZCOUNT will exactly have
     * 4 arguments, so we'll never enter the following code path.
     * 遍历可选参数,这里会判断是否要返回分值(withscores),是否要对查询结果进行偏移(offset)和数量(limit)的限制
     * */
    if (c->argc > 4) {
        int remaining = c->argc - 4;
        int pos = 4;

        while (remaining) {
            if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr, "withscores")) {
                pos++;
                remaining--;
                withscores = 1;
            } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr, "limit")) {
                if ((getLongFromObjectOrReply(c, c->argv[pos + 1], &offset, NULL)
                     != C_OK) ||
                    (getLongFromObjectOrReply(c, c->argv[pos + 2], &limit, NULL)
                     != C_OK)) {
                    return;
                }
                pos += 3;
                remaining -= 3;
            } else {
                addReply(c, shared.syntaxerr);
                return;
            }
        }
    }

    /*
     * Ok, lookup the key and get the range
     * 如果key所对应的zobj不存在,或者zobj的类型不为zset,则退出。
     * */
    if ((zobj = lookupKeyReadOrReply(c, key, shared.emptyarray)) == NULL ||
        checkType(c, zobj, OBJ_ZSET))
        return;

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        //压缩列表流程...
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {//zobj类型为跳跃表
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;

        /*
         * If reversed, get the last node in range as starting point.
         * 如果是逆向查询,ln会指向区间分值最大的节点,如果是正向查询,ln则指向区间分值最小的节点。
         * */
        if (reverse) {
            ln = zslLastInRange(zsl, &range);
        } else {
            ln = zslFirstInRange(zsl, &range);
        }

        /*
         * No "first" element in the specified interval.
         * 如果没有落在区间的开始节点则退出
         * */
        if (ln == NULL) {
            addReply(c, shared.emptyarray);
            return;
        }

        /* We don't know in advance how many matching elements there are in the
         * list, so we push this object that will represent the multi-bulk
         * length in the output buffer, and will "fix" it later */
        replylen = addReplyDeferredLen(c);

        /*
         * If there is an offset, just traverse the number of elements without
         * checking the score because that is done in the next loop.
         * <2>如果有偏移量,则根据reverse状态选择是后退还是递进,以达到偏移量。
         * 如果客户端有传入偏移,则offset不为0,这里会循环到offset为0或ln为NULL时跳出循环。
         * 否则offset默认值为0,不会进入此循环。
         * */
        while (ln && offset--) {
            if (reverse) {
                ln = ln->backward;
            } else {
                ln = ln->level[0].forward;
            }
        }
        /*
         * <3>如果客户端有传入偏移和数量,则limit不为0,此时会根据reverse状态后退或者前进至
         * ln为NULL或者limit为0,否则limit默认值为-1,limit--永远为true(注:只要limit
         * 不为0,则永远为true,即便是负数),这里就会循环到ln为NULL时,获取所有分值符合区间
         * 节点的。
         * 除了limit为0,或者ln为NULL会跳出while循环,在<4>处还会根据reverse状态判断分值是否在区间,
         * 如果不在则跳出循环,如果分值符合区间,还会在<5>处根据reverse状态选择是后退到后一个节点(ln->backward),
         * 还是前进到前一个节点(ln->level[0].forward)。
         */
        while (ln && limit--) {
            /*Abort when the node is no longer in range.*/
            if (reverse) {//<4>
                if (!zslValueGteMin(ln->score, &range)) break;
            } else {
                if (!zslValueLteMax(ln->score, &range)) break;
            }

            rangelen++;
            if (withscores && c->resp > 2) addReplyArrayLen(c, 2);
            addReplyBulkCBuffer(c, ln->ele, sdslen(ln->ele));
            if (withscores) addReplyDouble(c, ln->score);

            /* Move to next node */
            if (reverse) {//<5>
                ln = ln->backward;
            } else {
                ln = ln->level[0].forward;
            }
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }

    if (withscores && c->resp == 2) rangelen *= 2;
    setDeferredArrayLen(c, replylen, rangelen);
}

  

最后,我们要了解如何获取一个元素在跳跃表中的索引,其实这里面的逻辑也是非常的简单,我们先从字典上获取节点的分值,然后根据分值及元素获取其在跳跃表中的索引,这里依旧支持正向查询或逆向查询,如果是正向查询,分值越小,索引越小,如果分值相等,则元素越小,索引越小;如果是逆向查询,则分值越大,索引越小,如果分值相等,则元素越大,索引越小。

/* Given a sorted set object returns the 0-based rank of the object or
 * -1 if the object does not exist.
 * 返回元素在有序集合中的索引,如果返回-1则代表元素不在有序集合内。
 *
 * For rank we mean the position of the element in the sorted collection
 * of elements. So the first element has rank 0, the second rank 1, and so
 * forth up to length-1 elements.
 * 在跳跃表中第一个元素的索引为0,第二个元素索引为1,以此类推,最后一个元素索引为length-1。
 *
 * If 'reverse' is false, the rank is returned considering as first element
 * the one with the lowest score. Otherwise if 'reverse' is non-zero
 * the rank is computed considering as element with rank 0 the one with
 * the highest score.
 * 如果reverse为0,跳跃表索引从分值最小的节点开始,即zsl->header.level[0].forward索引为0、
 * zsl->header.level[0].forward.level[0].forward索引为1,zsl->tail索引为zsl-length-1;
 * 如果reverse不为0,跳跃表索引从分值最大的节点开始,即zsl->tail索引为0,zsl->tail.backward索引
 * 为1,zsl->header.level[0].forward索引为zsl->length-1
 * */
long zsetRank(robj *zobj, sds ele, int reverse) {
    unsigned long llen;
    unsigned long rank;

    llen = zsetLength(zobj);

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        //压缩列表逻辑……
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        dictEntry *de;
        double score;

        de = dictFind(zs->dict, ele);
        if (de != NULL) {
            /*
             * 如果元素存在在跳跃表上,则获取元素的分支,并根据
             * 分支判断其在跳跃表中的跨度,根据跨度计算节点在跳跃表
             * 中的索引。
             * 如果是正向查询(reverse为0),则索引为跨度(rank)-1。
             * 如果是逆向查询(reverse不为0),则索引为跳跃表长度(zsl->length)-跨度(rank)。
             */
            score = *(double *) dictGetVal(de);
            rank = zslGetRank(zsl, score, ele);
            /* Existing elements always have a rank. */
            serverAssert(rank != 0);
            if (reverse)
                return llen - rank;
            else
                return rank - 1;
        } else {//如果元素不在跳跃表上,则返回-1
            return -1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
}

  

获取完分值后,需要定位节点在跳跃表中的跨度,然后根据逆向状态及跨度,计算节点在跳跃表中的索引。

/* Find the rank for an element by both score and key.
 * Returns 0 when the element cannot be found, rank otherwise.
 * Note that the rank is 1-based due to the span of zsl->header to the
 * first element.
 * 根据给定的分支和元素查找其节点在跳跃表中的跨度,返回0代表节点不存在。
 * */
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;
    //从头节点最高层遍历,如果能前进到前一个节点,则把当前节点的跨度加到rank上
    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) <= 0))) {
            rank += x->level[i].span;
            x = x->level[i].forward;
        }

        /*
         * x might be equal to zsl->header, so test if obj is non-NULL
         * x可能停留在头节点,此处判断是保证节点的元素不为NULL。
         * */
        if (x->ele && sdscmp(x->ele, ele) == 0) {
            return rank;
        }
    }
    return 0;
}

  

至此,笔者和大家一起学习了Redis跳跃表的是如何插入节点、删除节点、更新节点以及如何对跳跃表中的节点进行不同维度(索引、分值)的查询。跳跃表是一种应用相当广泛的数据结构,很多场景下人们都用跳跃表代替B-Tree,因为跳跃表和B-Tree有着一样的查询时间复杂度O(logN),但跳跃表的实现却比B-Tree简单很多。而Redis正是借助了跳跃表的思路实现了有序集合,使得很多需要存储、排序海量数据的业务得以实现,如:微博热搜或者头条新闻,都可以使用Redis有序集合来解决。

当然,由于笔者的时间精力有限,这里并没有完全介绍所有跳跃表命令的相关实现,但笔者相信能看到这里的人,基本已经掌握了跳跃表的整体脉络。如果对跳跃表其余命令有兴趣的朋友,可以自行翻阅Redis源码,或者评论私信笔者你们的疑问,如果问题多的话笔者还会针对大家共同的问题进行讲解。

  

06-19 10:33