玖叶教程网

前端编程开发入门

深入学习 Redis 中的五种基础数据类型,附源代码和面试题(一)

以 Redis6.0 版本为例子,结合源代码,深入学习五种基础的数据类型,并且结合常见的面试题进行回答。

redisObject

在 Redis 中,‘robj’结构(或称为 redisObject)是一个非常核心的数据结构,用于表示所有的键值对象,包括字符串、列表、集合等所有类型的数据

typedef struct redisObject {
  
   // 这个字段表示对象的类型,如字符串、列表、集合、有序集合、哈希,类型信息告诉 Redis 该如何解释‘ptr’指针指向的数据
    unsigned type:4;              
  
  // 例如哈希和有序集合都可以用 ziplist 去编码
  // 这个字段表示对象的内部编码方式,例如字符串可以通过‘OBJ_ENCODING_RAW’或‘OBJ_ENCODIOG_EMBSTR’编码存储,集合可以用整数集合或哈希表编码存储
    unsigned encoding:4;
  
  // 这个字段用于缓存淘汰策略,存储了对象最后一次被访问的时间
    unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
                            * LFU data (least significant 8 bits frequency
                            * and most significant 16 bits access time). */
  
  // 这个字段表示对象的引用计数,redis 使用引用计数来管理内存
    int refcount;
   
   // 这是一个指针,指向实际存储的数据
    void *ptr;
} robj;

所以接下来学习五种基础数据类型的时候,实际上就是 redisObject 中 type:4 的五个不同取值。

ziplist

redis中的 ziplist(压缩列表)是一种为了节省内存而设计的紧凑的序列化数据结构,用于高效地存储和访问一系列的小数据项,例如小整数和短字符串。ziplist 广泛用于Redis的内部实现,特别是当存储结构(如列表、哈希、集合和有序集合)中的元素数量较少且元素大小较小时,以优化内存使用。

ziplist的主要特点是它的元素是连续存储在单一的内存块中,以减少内存碎片和管理开销。它的结构大致可以分为以下几个部分:

  • 头部(Header):包含了整个ziplist的元数据,如 ziplist 的总字节长度和元素个数。这些信息用于快速访问和操作 ziplist 。
  • 条目(Entries):每个条目可能是一个整数或者一个字符串,条目之间紧密排列。每个条目都包含一些元数据,比如它的长度和编码方式,以及实际的数据内容。
  • 结束标志(End):一个特殊的字节(通常是0xFF),用于标记 ziplist 的结尾。

压缩列表节点包含三部分内容:

  • prevlen,记录了「前一个节点」的长度,目的是为了实现从后向前遍历;
  • encoding,记录了当前节点实际数据的「类型和长度」,类型主要有两种:字符串和整数。
  • data,记录了当前节点的实际数据,类型和长度都由 encoding 决定;

string(字符串)

应用:

String 类型的底层数据结构使用了 SDS,之所以没有使用 C 语言的字符串表示,因为 SDS 相比 C 原生的字符串,SDS 不仅可以保存文本数据,还可以保存二进制数据。因为 SDS 使用 len 属性的值而不是空字符串来判断字符串是否结束,而且 SDS 的所有 API 都会以处理二进制的方式来处理 SDS 存放在 buf[] 数组里的数据。所以 SDS 不光能存放文本数据,还能保存图片等二进制数据。还有动态内存分配的特性。还做了各种压缩编码的操作。

SDS 对象可以表示数字、字符串等类型。

SDS 数据结构,为了节省内存空间,SDS 采用了不同长度类型的数据结构,下面的其中一种:

struct __attribute__ ((__packed__)) sdshdr8 {
  // 字符串当前的长度
    uint8_t len; 
  
  // 为字符串内容分配的内存大小(不包括头部和空字符)
    uint8_t alloc; 
  
    // 快速识别字符串的实际类型和大小
    unsigned char flags; 
  
  	// 存储字符串内容
    char buf[];
};

数字:

127.0.0.1:6379> set count 1
OK
127.0.0.1:6379> incr count
(integer) 2
127.0.0.1:6379> get count
"2"

在表示数字的时候,和字符串有些许区别,例如使用 incr 命令的时候,redis 会去动态解析 sds 里面的 buf[] 数组,如果不是数字就报错,看下面位于 object.c 的代码:

int getLongLongFromObject(robj *o, long long *target) {
    long long value;

    if (o == NULL) {
        value = 0;
    } else {
        serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
      // 如果 redisObject 的编码方式是 OBJ_ENCODING_RAW 或者 OBJ_ENCODIOG_EMBSTR
        if (sdsEncodedObject(o)) {
          // 尝试将字符串转换成数字,如果转换失败则报错
            if (string2ll(o->ptr,sdslen(o->ptr),&value) == 0) return C_ERR;
          
          // 数字小的时候,直接在 redisObject 里面用 OBJ_ENCODING_INT 编码表示数字
        } else if (o->encoding == OBJ_ENCODING_INT) {
            value = (long)o->ptr;
        } else {
            serverPanic("Unknown string encoding");
        }
    }
    if (target) *target = value;
    return C_OK;
}

字符串:

127.0.0.1:6379> set count num
OK
127.0.0.1:6379> get count
"num"
127.0.0.1:6379> incr count
(error) ERR value is not an integer or out of range

SET 方法的源码实现:

在 t_string.c 文件中:

/* SET key value [NX] [XX] [KEEPTTL] [EX <seconds>] [PX <milliseconds>] */
void setCommand(client *c) {
    int j;
    robj *expire = NULL; // 用于存储过期时间的对象
    int unit = UNIT_SECONDS; // 默认时间单位是秒
    int flags = OBJ_SET_NO_FLAGS; // 初始化标志,表示没有任何特殊选项

    ...... // 中间省略其它功能

    // 尝试对 value 进行编码优化
    c->argv[2] = tryObjectEncoding(c->argv[2]);
    // 调用 setGenericCommand 执行设置操作,传入解析的选项和参数
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}

面试官:在 redisObject 中,表示字符串对象有哪些编码方式,分别介绍一下?

答:

OBJ_ENCODING_RAW:

当字符串对象较大时,Redis 使用 OBJ_ENCODING_RAW 编码。OBJ_ENCODING_RAW 编码的字符串对象是由两部分内存分配组成的:一部分用于 redisObject 结构体本身,另一部分独立分配用于 SDS 数据结构。也就是涉及到两次内存分配。

OBJ_ENCODING_EMBSTR:

对于小字符串(通常是 44 字节或更少),Redis 采用 OBJ_ENCODING_EMBSTR 编码。在这种编码方式中,redisObject 结构体和字符串数据(SDS)被紧凑地存储在单个连续的内存块中。也就是只涉及到一次内存分配。

OBJ_ENCODING_INT:

当字符串的内容可以表示为整数时,Redis 会选择这种编码方式。这不仅节省了存储空间,还提高了处理速度。在这种编码下,整数值直接存储在 redisObject 的指针字段中,而不是单独分配内存来存储数字的字符串表示。这种方式极大地优化了对整数的存储和操作。

面试官:项目中的 Redis 是怎么用的(字符串)?

答:

  • 缓存。将一个对象转换成 json 字符串存到 redis 中。
  • 计数器。在做登录功能的时候,要限制登录错误次数,使用了 incr 命令。
  • 验证码。使用方便,不用在数据库去设计表,redis 做验证码用完就丢。还有一个好处就是 redis 能够实现过期键,对应验证码自动过期。
  • session。用户登录功能,cookie 的机制。

Hash (哈希)

应用:

127.0.0.1:6379> HSET user:1001 name "John Doe"
(integer) 1
127.0.0.1:6379> HGET user:1001 name
"John Doe"

内部实现:

在 Redis6.0 中 Hash(哈希)类型的底层存储结构有两种,它们分别是:

  • ziplist(压缩列表),当哈希类型的键值对数量较少且键值对的大小较小时,Redis 会使用 ziplist 作为哈希的底层存储结构,ziplist 是一种紧凑的序列化数据结构,它将所有的元素存储在连续的内存区域中。
  • hashtable(哈希表),当哈希类型的键值对数量增加或者键值对的大小变大时,为了保持操作的效率,Redis 会将底层存储结构从 ziplist 转换为 hashtable,Redis 的 hashtable 使用链地址法来解决哈希冲突,扩容过程使用了 rehash 的操作,下文会结合源代码讲解。

Redis 在选择使用 ziplist 或 hashtable 作为哈希的底层存储结构时,会根据一些配置参数来决定,这些参数包括:

  • hash-max-ziplist-entries:这个配置项指定了哈希在使用 ziplist 作为底层结构时可以存储的最大元素数量,如果一个哈希的元素数量超过这个值,Redis 会自动将底层结构从 ziplist 转换为 hashtable。
  • hash-max-ziplist-value:这个配置项指定了哈希在使用 ziplist 作为底层结构时,最大的元素大小(字节数),如果哈希中的任何一个元素的大小超过这个值,Redis 也会将底层结构从 ziplist 转换为 hashtable。

ziplist:

在源代码中,它不是通过传统的 C 结构体来定义,它是一种特定的格式直接存储在连续的内存块中。

/* Create a new empty ziplist. */
unsigned char *ziplistNew(void) {
    unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE;
    unsigned char *zl = zmalloc(bytes);
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
    ZIPLIST_LENGTH(zl) = 0;
    zl[bytes-1] = ZIP_END;
    return zl;
}

面试官:当哈希底层使用 ziplist 作为数据结构时,是怎么添加元素的?

答:

int hashTypeSet(robj *o, sds field, sds value, int flags) {
    int update = 0;

  // 当底层是 ziplist 时
    if (o->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl, *fptr, *vptr;

        zl = o->ptr;
      // 获取 ziplist 的头部指针  
        fptr = ziplistIndex(zl, ZIPLIST_HEAD);
        if (fptr != NULL) {
            // 这里很关键,这里的搜索不是用 hash 的方式,而是遍历搜索  
          // 直接通过字节级别的比较判断两个键是否相等
            fptr = ziplistFind(fptr, (unsigned char*)field, sdslen(field), 1);
            if (fptr != NULL) {
                // 获取字段的位置,fptr 指向字段,vptr 指向值
                vptr = ziplistNext(zl, fptr);
                serverAssert(vptr != NULL);
                update = 1;

                /* Delete value */
                zl = ziplistDelete(zl, &vptr);

                /* Insert new value */
                zl = ziplistInsert(zl, vptr, (unsigned char*)value,
                        sdslen(value));
            }
        }

      // 如果字段不存在,则讲新的字段和值添加到 ziplist 的末尾
        if (!update) {
            /* Push new field/value pair onto the tail of the ziplist */
            zl = ziplistPush(zl, (unsigned char*)field, sdslen(field),
                    ZIPLIST_TAIL);
            zl = ziplistPush(zl, (unsigned char*)value, sdslen(value),
                    ZIPLIST_TAIL);
        }
        o->ptr = zl;

        /* Check if the ziplist needs to be converted to a hash table */
        if (hashTypeLength(o) > server.hash_max_ziplist_entries)
            hashTypeConvert(o, OBJ_ENCODING_HT);
    } else if (o->encoding == OBJ_ENCODING_HT) {
        ...... // 省略底层是 hashTable 的情况
    } else {
        serverPanic("Unknown hash encoding");
    }

    /* Free SDS strings we did not referenced elsewhere if the flags
     * want this function to be responsible. */
    if (flags & HASH_SET_TAKE_FIELD && field) sdsfree(field);
    if (flags & HASH_SET_TAKE_VALUE && value) sdsfree(value);
    return update;
}
 

可以看到,ziplist 在存储哈希时,底层是这样的:【键1,值1,键2,值2,键3,值3......】,是紧凑存储的,因为元素少,查找元素的时候也是直接遍历对比,所以这种方式是时间和空间都非常友好的

使用 ziplist 作为哈希底层数据结构的时候,一定要注意!每次添加元素都会去扩容!因为 ziplist 被设置成一种紧凑型的数据结构,它不会预分配内存,每次添加新元素都会去扩容,因为扩容后的大小刚好能够把新元素添加进来!

面试官:当哈希底层使用 ziplist 作为数据结构时,是怎么解决哈希冲突的?

答:这种情况不会存在哈希冲突,因为每次添加新元素都是往 ziplist 最后去插入,而更新就元素就直接替换掉旧的。

面试官:当哈希底层使用 hashTable 作为数据结构时,是怎么添加元素的?

答:哈希的数据结构定义:

typedef struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;
} dictEntry;

typedef struct dictType {
    uint64_t (*hashFunction)(const void *key);
    void *(*keyDup)(void *privdata, const void *key);
    void *(*valDup)(void *privdata, const void *obj);
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    void (*keyDestructor)(void *privdata, void *key);
    void (*valDestructor)(void *privdata, void *obj);
} dictType;

/* This is our hash table structure. Every dictionary has two of this as we
 * implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
    dictEntry **table;
    unsigned long size;
    unsigned long sizemask;
    unsigned long used;
} dictht;

typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
  // 记录渐进式 rehash 的进度,如果不等于 -1 就是正在渐进式 rehash
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    unsigned long iterators; /* number of iterators currently running */
} dict;
else if (o->encoding == OBJ_ENCODING_HT) {
        dictEntry *de = dictFind(o->ptr,field);
  			// 如果已经有相同的键
        if (de) {
          // 首先释放旧值占用的内存
            sdsfree(dictGetVal(de));
          
          // 根据 flags 决定是直接使用提供的 value(通过设置 HASH_SET_TAKE_VALUE 标志),还是复制一份新的值(使用 sdsdup)。如果直接使用提供的 value,将 value 指针赋给 dictEntry,然后将原 value 指针设为 NULL 防止外部再次使用。设置 update = 1 表示这是一个更新操作。
            if (flags & HASH_SET_TAKE_VALUE) {
                dictGetVal(de) = value;
                value = NULL;
            } else {
                dictGetVal(de) = sdsdup(value);
            }
            update = 1;
          // 如果字段不存在(de 为 NULL),则需要添加新的字段和值
        } else {
            sds f,v;
            if (flags & HASH_SET_TAKE_FIELD) {
                f = field;
                field = NULL;
            } else {
                f = sdsdup(field);
            }
            if (flags & HASH_SET_TAKE_VALUE) {
                v = value;
                value = NULL;
            } else {
                v = sdsdup(value);
            }
            dictAdd(o->ptr,f,v);
        }
    }

面试官:当哈希底层使用 hashTable 作为数据结构的时候,是怎么解决哈希冲突的?

答:使用了链地址法,

static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
    unsigned long idx, table;
    dictEntry *he;
    if (existing) *existing = NULL;

    /* Expand the hash table if needed */
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;
    for (table = 0; table <= 1; table++) {
        idx = hash & d->ht[table].sizemask;
        /* Search if this slot does not already contain the given key */
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key)) {
                if (existing) *existing = he;
                return -1;
            }
            he = he->next;
        }
      // 如果哈希表当前不在 rehash 过程中,就没有必要检查第二个表格,因此跳出循环。
        if (!dictIsRehashing(d)) break;
    }
    return idx;
}

面试官:讲一下 Redis 中,当哈希底层使用 hashTable 作为数据结构的扩容流程

答:在添加新元素的时候,首先判断一下是否需要扩容。

  1. 首先判断是否在 rehash,如果是就返回。
  2. 如果不是就判断 hashTable 是否为空,如果是就初始化一个容量。
  3. 如果 hashTable 不为空,如果扩容启用且已经负载因子达到1,或者扩容未被明确禁止且负载因子超过5,就要触发扩容。
  4. 每次扩容后的新容量都是原来的两倍。
  5. 为了避免 rehash 在数据迁移的过程中因拷贝数据的耗时影响 Redis 的性能,Redis 采用了渐进式 rehash,也就是将数据的迁移工作不再是一次性完成,而是分多次迁移。

具体讲一下渐进式 rehash:

当需要 rehash 时,Redis 并不立即释放旧的哈希表,而是分配一个新的哈希表(ht[1]),旧的哈希表是(ht[0]),此时 Redis 会同时维护这两个哈希表。

在后续的操作中,(插入、查找、删除),会先进行渐进式 rehash,也就是将 ht[0] 中的元素逐渐迁移到 ht[1] 中。每次迁移 n 个桶。同时判断如果每次访问空桶最大次数到达 n 的 10 倍时,就结束本次 rehash,防止单次操作时间太长。(正常情况下每次的 n 是 1)

int dictRehash(dict *d, int n) {
  // 设置访问空桶的最大次数为 n 的 10 倍,避免在空桶太多时造成的长时间循环
    int empty_visits = n*10; /* Max number of empty buckets to visit. */
  // 旧哈希表的大小
    unsigned long s0 = d->ht[0].size;
  // 新哈希表的大小
    unsigned long s1 = d->ht[1].size;
    if (dict_can_resize == DICT_RESIZE_FORBID || !dictIsRehashing(d)) return 0;
    if (dict_can_resize == DICT_RESIZE_AVOID && 
        ((s1 > s0 && s1 / s0 < dict_force_resize_ratio) ||
         (s1 < s0 && s0 / s1 < dict_force_resize_ratio)))
    {
        return 0;
    }

  // 尝试对 n 个桶进行 rehash
    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        de = d->ht[0].table[d->rehashidx];
        // 将当前桶中的所有键迁移到新哈希表
        while(de) {
            uint64_t h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
      // 清空已迁移的桶
        d->ht[0].table[d->rehashidx] = NULL;
      // 移动到下一个桶
        d->rehashidx++;
    }

    // 检查旧表是否已完全迁移
    if (d->ht[0].used == 0) {
      // 释放旧表内存
        zfree(d->ht[0].table);
      // 将新表设置为主表
        d->ht[0] = d->ht[1];
      // 重置新表
        _dictReset(&d->ht[1]);
      // 重置 rehash 索引
        d->rehashidx = -1;
       // 返回 0,表示 rehash 完成
        return 0;
    }

   // 返回 1,表示还有更多元素需要 rehash
    return 1;
}

在进行渐进式 rehash 的过程中,如果是查找,则先找 ht[0],如果通过标志位确认当前处于 rehash 过程,则再去 ht[1] 找;如果是添加,就直接把新的键值对添加到 ht[1] 中;如果是删除,就先查找 ht[0] 并删除,如果通过标志位确认当前处于 rehash 过程,则再去 ht[1] 查找并删除。

n 不等于 1 的情况通常发生在 Redis 认为有必要加快 rehash 进程的特定场景下,就是一些后台任务,这里不细说。下面是指定 rehash 时间的源码:

int dictRehashMilliseconds(dict *d, int ms) {
    long long start = timeInMilliseconds();
    int rehashes = 0;

  // 每次 100 个桶
    while(dictRehash(d,100)) {
        rehashes += 100;
        if (timeInMilliseconds()-start > ms) break;
    }
    return rehashes;
}

面试官:讲一下 Redis 中,哈希数据结构的扩容流程

答:首先在元素较少的时候,使用的是 ziplist,后续继续添加新的元素后,如果达到配置hash-max-ziplist-entrieshash-max-ziplist-value就变成 hashTable,后续继续添加新元素,就采用渐进式 rehash 进行扩容,每次最多迁移一个桶,每次新的桶数量都是原来的 2 倍。

list(列表)

应用:

可以用作消息队列等

127.0.0.1:6379> lpush key value
(integer) 1
127.0.0.1:6379> lrange key 0 -1
1) "value"

内部实现:

在 Redis6.0 中,底层使用的是 quicklist,它结合了双向链表和 ziplist 的优点,提供空间效率和操作性能之间的最佳平衡

typedef struct quicklist {
    quicklistNode *head;                  /* 指向第一个快速列表节点的指针 */
    quicklistNode *tail;                  /* 指向最后一个快速列表节点的指针 */
    unsigned long count;                  /* 所有 ziplist 中所有条目的总计数 */
    unsigned long len;                    /* quicklistNodes 的数量 */
    int fill : QL_FILL_BITS;              /* 个别节点的填充因子 */
    unsigned int compress : QL_COMP_BITS; /* 不压缩的末端节点的深度;0=关闭 */
  
   // 这是在 Redis 6.2 中引入的功能,允许用户在 quicklist 中设置标记,以便快速跳转到列表中的特定位置。每个书签都指向一个 quicklist 节点,使得某些操作更加高效。
    unsigned int bookmark_count: QL_BM_BITS; 
    quicklistBookmark bookmarks[];    
} quicklist;

typedef struct quicklistNode {
    struct quicklistNode *prev;             /* 指向前一个节点的指针 */
    struct quicklistNode *next;             /* 指向下一个节点的指针 */
    unsigned char *zl;                      /* 指向压缩列表的指针 */
    unsigned int sz;                        /* 压缩列表的字节大小 */
    unsigned int count : 16;                /* 压缩列表中的项数 */
    unsigned int encoding : 2;              /* 编码类型:RAW==1 或 LZF==2 */
    unsigned int container : 2;             /* 容器类型:NONE==1 或 ZIPLIST==2 */
    unsigned int recompress : 1;            /* 此节点是否之前被压缩过 */
    unsigned int attempted_compress : 1;    /* 节点是否尝试过压缩但因太小而失败 */
    unsigned int extra : 10;                /* 为将来的使用预留的额外位 */
} quicklistNode;
  • 双向链表:quicklist 的主体是一个双向链表,每个节点都可以通过 head 和 tail 指针快速访问,支持从两端进行高效操作。
  • 压缩列表(ziplist):链表的每个节点都包含一个压缩列表,这是一个紧凑的数组,用于存储实际的列表元素。这种结构使得 quicklist 在存储小列表时非常高效,同时通过分割大列表来避免单个压缩列表变得过大。

在向 quicklist 添加一个元素的时候,不会像普通的链表那样,直接新建一个链表节点。而是会检查插入位置的压缩列表是否能容纳该元素,如果能容纳就直接保存到 quicklistNode 结构里的压缩列表,如果不能容纳,才会新建一个新的 quicklistNode 结构。

下面是向 quicklist 头部插入元素的流程:

int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
    // 保存原始头节点的指针,以便之后判断头节点是否发生变化
    quicklistNode *orig_head = quicklist->head;
    // 确保插入值的大小小于 UINT32_MAX,这是因为 quicklist 设计上的限制
    assert(sz < UINT32_MAX); 
    // 检查当前的头节点是否有足够的空间插入新的值
    if (likely(
            _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
        // 如果头节点有空间,直接在头节点的 ziplist 中插入新值
        quicklist->head->zl =
            ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
        // 更新头节点的 ziplist 大小
        quicklistNodeUpdateSz(quicklist->head);
    } else {
        // 如果头节点没有足够空间,创建一个新的 quicklist 节点
        quicklistNode *node = quicklistCreateNode();
        // 在新节点的 ziplist 中插入新值,并设置为头节点
        node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);

        // 更新新节点的 ziplist 大小
        quicklistNodeUpdateSz(node);
        // 将新节点插入到 quicklist 的头部
        _quicklistInsertNodeBefore(quicklist, quicklist->head, node);
    }
    // 增加 quicklist 的总元素计数
    quicklist->count++;
    // 增加头节点的元素计数
    quicklist->head->count++;
    // 返回一个布尔值,指示头节点是否因为插入操作而改变
    return (orig_head != quicklist->head);
}

set(集合)

应用:

127.0.0.1:6379> sadd test a
(integer) 1
127.0.0.1:6379> sadd test b
(integer) 1
127.0.0.1:6379> smembers test
1) "a"
2) "b"

内部实现:

底层就是直接使用哈希,没什么好说的。

zset(有序集合)

redis的有序集合(zset)主要是根据分数(score)来进行排序的,这是它的核心特性之一。每个元素在有序集合中都有一个与之关联的分数,这个分数用于确定元素在集合中的排序位置。当有序集合进行操作如插入、删除或查找元素时,redis会首先根据分数进行排序。如果多个元素有相同的分数,那么这些元素会根据它们的元素值(通常是字符串)在字典序中的排列进行二级排序。

应用:

127.0.0.1:6379> ZADD test 1 a
(integer) 1
127.0.0.1:6379> ZADD test 2 b
(integer) 1
127.0.0.1:6379> ZRANGE test 0 10 WITHSCORES
1) "a"
2) "1"
3) "b"
4) "2"

内部实现:

zset 类型(有序集合类型)相比于 set 类型多了一个排序属性 score(分值),对于有序集合 zSet 来说,每个存储元素相当于有两个值组成的,一个是有序集合的元素值,一个是排序值。

有序集合保留了集合不能有重复成员的特性(分值可以重复),但不同的是,有序集合中的元素可以排序。

Zset 类型的底层数据结构是由 ziplist 或 skiplist 实现的:

  • 如果有序集合的元素个数小于 xxx 个,并且每个元素的值小于 xxx 字节时,redis 会使用 ziplist 作为 zset 类型的底层数据结构,这个 xxx 可以配置。
  • 如果有序集合的元素不满足上面的条件,redis 会使用 skiplist + 哈希作为 zset 类型的底层数据结构。在哈希中,key 是元素,值的对应的分数。在 skiplist 中,将元素和分数放到 zskiplistNode 里,元素在 skiplist 里按照分数进行排序。(哈希的目的是根据元素查找分数,skiplist 的目的是根据分数查找元素,两个是相反的操作)

底层使用 skiplist + 哈希 作为 zset 底层:

typedef struct zskiplistNode {
    sds ele;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span;
    } level[];
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;

面试官:当 zset 底层使用 ziplist 时,是怎么插入和查询的?

答:

插入:

/* 当编码方式是 ziplist 时。 */
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
    unsigned char *eptr;

    /* 查找元素是否已存在于有序集合中,如果存在,获取其当前分数。 */
    if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
      
       ...  // 省略其它

        /* 如果新分数与当前分数不同,则需要从压缩列表中删除原元素,并以新分数重新插入。 */
        if (score != curscore) {
            zobj->ptr = zzlDelete(zobj->ptr,eptr); // 删除原元素。
            zobj->ptr = zzlInsert(zobj->ptr,ele,score); // 以新分数重新插入元素。
            *flags |= ZADD_UPDATED; // 设置标志表示元素分数已更新。
        }
        return 1;
    } else if (!xx) {
        /* 如果设置了XX标志且元素不存在,则不进行任何操作。XX选项表示仅在元素已存在时更新。 */
        /* 在执行zzlInsert之前,检查元素大小或压缩列表长度是否超出限制。 */
        if (zzlLength(zobj->ptr)+1 > server.zset_max_ziplist_entries ||
            sdslen(ele) > server.zset_max_ziplist_value ||
            !ziplistSafeToAdd(zobj->ptr, sdslen(ele)))
        {
            /* 如果超出限制,则将 ziplist 转换为 skiplist。 */
            zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
        } else {
            /* 否则,将新元素插入压缩列表,并设置相应的标志。 */
            zobj->ptr = zzlInsert(zobj->ptr,ele,score); // 插入新元素。
            if (newscore) *newscore = score; // 如果提供了newscore指针,更新其值为新分数。
            *flags |= ZADD_ADDED; // 设置标志表示有新元素被添加。
            return 1;
        }
    } else {
        *flags |= ZADD_NOP; // 如果元素不存在且设置了XX标志,设置标志表示本次操作没有改变有序集合。
        return 1;
    }
}

查询:

可以看到,当 zset 底层用 ziplist 作为数据结构时,是这样存储是:[元素1,分数1,元素2,分数2......]

unsigned char *zzlFind(unsigned char *zl, sds ele, double *score) {
    // 从ziplist的头部开始遍历。
    unsigned char *eptr = ziplistIndex(zl,0), *sptr;

    while (eptr != NULL) { // 当指针非空,即未遍历到ziplist的末尾时,继续遍历。
        sptr = ziplistNext(zl,eptr); // 获取当前元素的分数指针。
        serverAssert(sptr != NULL); // 断言分数指针非空,确保数据完整性。

        // 比较当前遍历到的元素是否与查找的元素相匹配。
        if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) {
            // 如果找到匹配的元素,并且调用者请求返回分数,通过zzlGetScore函数获取分数。
            if (score != NULL) *score = zzlGetScore(sptr);
            return eptr; // 返回找到的元素的指针。
        }

        // 移动到下一个元素,即跳过当前的分数指针,指向下一个元素的指针。
        eptr = ziplistNext(zl,sptr);
    }
    // 如果遍历完整个ziplist都没有找到元素,则返回NULL。
    return NULL;
}

面试官:当 zset 底层使用 skiplist 时,是怎么插入和查询的?

答:

插入:

// 检查对象的编码是否为跳跃表
if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
    // 获取有序集合的指针
    zset *zs = zobj->ptr;
    zskiplistNode *znode;
    dictEntry *de;

    // 在字典中查找元素
    de = dictFind(zs->dict,ele);
    if (de != NULL) {
        ...... // 省略其它代码
  
        // 获取当前元素的分数
        curscore = *(double*)dictGetVal(de);
        // 如果分数有变化,需要在跳跃表中更新元素的分数
        if (score != curscore) {
            znode = zslUpdateScore(zs->zsl,curscore,ele,score); // 更新跳跃表中元素的分数
            // 更新字典中分数的引用
            dictGetVal(de) = &znode->score; 
            *flags |= ZADD_UPDATED; // 设置标志以指示分数被更新
        }
        return 1;
    } else if (!xx) {
        // 如果元素不存在且没有设置XX选项,则添加新元素
        ele = sdsdup(ele); // 复制元素的SDS字符串
        znode = zslInsert(zs->zsl,score,ele); // 在跳跃表中插入新元素
        // 确保在字典中也添加了元素,并关联到跳跃表节点的分数
        serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
        *flags |= ZADD_ADDED; // 设置标志以指示添加了新元素
        if (newscore) *newscore = score; // 如果需要,更新外部提供的新分数变量
        return 1;
    } else {
        // 如果元素不存在且设置了XX选项,则不进行任何操作
        *flags |= ZADD_NOP;
        return 1;
    }
}

// 定义插入操作的函数,将一个新元素及其分数插入到跳跃表中
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    // 定义更新数组和排名数组,以及其他变量
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    // 断言分数不是NaN,确保分数有效
    serverAssert(!isnan(score));
    // 从头节点开始遍历跳跃表
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        // 存储到达插入位置时经过的节点数
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        // 遍历跳跃表,找到插入点
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    // 随机生成新节点的层数
    level = zslRandomLevel();
    // 如果新节点的层数高于当前跳跃表的最高层数,则进行调整
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    // 创建新节点
    x = zslCreateNode(level,score,ele);
    // 插入新节点到跳跃表,并更新相关指针和跨度
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        // 更新跨度
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    // 对未触及的层增加跨度
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    // 设置新节点的后退指针
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    // 更新新节点前向节点的后退指针或者跳跃表的尾指针
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    // 增加跳跃表的长度
    zsl->length++;
    // 返回新插入的节点
    return x;
}

节点层数的确定:

int zslRandomLevel(void) {
    int level = 1; // 起始层为1,因为每个节点至少要出现在第一层
    // 使用随机数决定节点的层数,以ZSKIPLIST_P的概率增加层
    // ZSKIPLIST_P是一个概率值,用于控制节点平均占据的层数
    // 通常设置为1/4或者类似的值,意味着每增加一层的概率是25%
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1; // 如果随机条件满足,则层数加1
    // 确保层数不超过ZSKIPLIST_MAXLEVEL,这是跳跃表允许的最大层数
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

平均来说,每个节点的层数是 1.33 层,相对二叉平衡搜索树,每个节点都要包含两个其它节点的引用,在这方面 skiplist 做到了内存优化。

查询:

一层一层去查找,时间复杂度是 log(N),N 是元素个数。

总结

redis 为了节省内存,做了很多优化,例如 zset、hash,在元素少和小的情况,先使用 ziplist,后期元素增多就会升级成其它数据结构,深入源代码细节,还有其它优化,例如 sds 还可以用不同的编码来节省内存。

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言