以 Redis6.0 版本为例子,结合源代码,深入学习五种基础的数据类型,并且结合常见的面试题进行回答。
redisObject
在 Redis 中,‘robj’结构(或称为 redisObject)是一个非常核心的数据结构,用于表示所有的键值对象,包括字符串、列表、集合等所有类型的数据
typedef struct redisObject {
// 这个字段表示对象的类型,如字符串、列表、集合、有序集合、哈希,类型信息告诉 Redis 该如何解释‘ptr’指针指向的数据
unsigned type:4;
// 例如哈希和有序集合都可以用 ziplist 去编码
// 这个字段表示对象的内部编码方式,例如字符串可以通过‘OBJ_ENCODING_RAW’或‘OBJ_ENCODIOG_EMBSTR’编码存储,集合可以用整数集合或哈希表编码存储
unsigned encoding:4;
// 这个字段用于缓存淘汰策略,存储了对象最后一次被访问的时间
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
// 这个字段表示对象的引用计数,redis 使用引用计数来管理内存
int refcount;
// 这是一个指针,指向实际存储的数据
void *ptr;
} robj;
所以接下来学习五种基础数据类型的时候,实际上就是 redisObject 中 type:4 的五个不同取值。
ziplist
redis中的 ziplist(压缩列表)是一种为了节省内存而设计的紧凑的序列化数据结构,用于高效地存储和访问一系列的小数据项,例如小整数和短字符串。ziplist 广泛用于Redis的内部实现,特别是当存储结构(如列表、哈希、集合和有序集合)中的元素数量较少且元素大小较小时,以优化内存使用。
ziplist的主要特点是它的元素是连续存储在单一的内存块中,以减少内存碎片和管理开销。它的结构大致可以分为以下几个部分:
- 头部(Header):包含了整个ziplist的元数据,如 ziplist 的总字节长度和元素个数。这些信息用于快速访问和操作 ziplist 。
- 条目(Entries):每个条目可能是一个整数或者一个字符串,条目之间紧密排列。每个条目都包含一些元数据,比如它的长度和编码方式,以及实际的数据内容。
- 结束标志(End):一个特殊的字节(通常是0xFF),用于标记 ziplist 的结尾。
压缩列表节点包含三部分内容:
- prevlen,记录了「前一个节点」的长度,目的是为了实现从后向前遍历;
- encoding,记录了当前节点实际数据的「类型和长度」,类型主要有两种:字符串和整数。
- data,记录了当前节点的实际数据,类型和长度都由 encoding 决定;
string(字符串)
应用:
String 类型的底层数据结构使用了 SDS,之所以没有使用 C 语言的字符串表示,因为 SDS 相比 C 原生的字符串,SDS 不仅可以保存文本数据,还可以保存二进制数据。因为 SDS 使用 len 属性的值而不是空字符串来判断字符串是否结束,而且 SDS 的所有 API 都会以处理二进制的方式来处理 SDS 存放在 buf[] 数组里的数据。所以 SDS 不光能存放文本数据,还能保存图片等二进制数据。还有动态内存分配的特性。还做了各种压缩编码的操作。
SDS 对象可以表示数字、字符串等类型。
SDS 数据结构,为了节省内存空间,SDS 采用了不同长度类型的数据结构,下面的其中一种:
struct __attribute__ ((__packed__)) sdshdr8 {
// 字符串当前的长度
uint8_t len;
// 为字符串内容分配的内存大小(不包括头部和空字符)
uint8_t alloc;
// 快速识别字符串的实际类型和大小
unsigned char flags;
// 存储字符串内容
char buf[];
};
数字:
127.0.0.1:6379> set count 1
OK
127.0.0.1:6379> incr count
(integer) 2
127.0.0.1:6379> get count
"2"
在表示数字的时候,和字符串有些许区别,例如使用 incr 命令的时候,redis 会去动态解析 sds 里面的 buf[] 数组,如果不是数字就报错,看下面位于 object.c 的代码:
int getLongLongFromObject(robj *o, long long *target) {
long long value;
if (o == NULL) {
value = 0;
} else {
serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
// 如果 redisObject 的编码方式是 OBJ_ENCODING_RAW 或者 OBJ_ENCODIOG_EMBSTR
if (sdsEncodedObject(o)) {
// 尝试将字符串转换成数字,如果转换失败则报错
if (string2ll(o->ptr,sdslen(o->ptr),&value) == 0) return C_ERR;
// 数字小的时候,直接在 redisObject 里面用 OBJ_ENCODING_INT 编码表示数字
} else if (o->encoding == OBJ_ENCODING_INT) {
value = (long)o->ptr;
} else {
serverPanic("Unknown string encoding");
}
}
if (target) *target = value;
return C_OK;
}
字符串:
127.0.0.1:6379> set count num
OK
127.0.0.1:6379> get count
"num"
127.0.0.1:6379> incr count
(error) ERR value is not an integer or out of range
SET 方法的源码实现:
在 t_string.c 文件中:
/* SET key value [NX] [XX] [KEEPTTL] [EX <seconds>] [PX <milliseconds>] */
void setCommand(client *c) {
int j;
robj *expire = NULL; // 用于存储过期时间的对象
int unit = UNIT_SECONDS; // 默认时间单位是秒
int flags = OBJ_SET_NO_FLAGS; // 初始化标志,表示没有任何特殊选项
...... // 中间省略其它功能
// 尝试对 value 进行编码优化
c->argv[2] = tryObjectEncoding(c->argv[2]);
// 调用 setGenericCommand 执行设置操作,传入解析的选项和参数
setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
面试官:在 redisObject 中,表示字符串对象有哪些编码方式,分别介绍一下?
答:
OBJ_ENCODING_RAW:
当字符串对象较大时,Redis 使用 OBJ_ENCODING_RAW 编码。OBJ_ENCODING_RAW 编码的字符串对象是由两部分内存分配组成的:一部分用于 redisObject 结构体本身,另一部分独立分配用于 SDS 数据结构。也就是涉及到两次内存分配。
OBJ_ENCODING_EMBSTR:
对于小字符串(通常是 44 字节或更少),Redis 采用 OBJ_ENCODING_EMBSTR 编码。在这种编码方式中,redisObject 结构体和字符串数据(SDS)被紧凑地存储在单个连续的内存块中。也就是只涉及到一次内存分配。
OBJ_ENCODING_INT:
当字符串的内容可以表示为整数时,Redis 会选择这种编码方式。这不仅节省了存储空间,还提高了处理速度。在这种编码下,整数值直接存储在 redisObject 的指针字段中,而不是单独分配内存来存储数字的字符串表示。这种方式极大地优化了对整数的存储和操作。
面试官:项目中的 Redis 是怎么用的(字符串)?
答:
- 缓存。将一个对象转换成 json 字符串存到 redis 中。
- 计数器。在做登录功能的时候,要限制登录错误次数,使用了 incr 命令。
- 验证码。使用方便,不用在数据库去设计表,redis 做验证码用完就丢。还有一个好处就是 redis 能够实现过期键,对应验证码自动过期。
- session。用户登录功能,cookie 的机制。
Hash (哈希)
应用:
127.0.0.1:6379> HSET user:1001 name "John Doe"
(integer) 1
127.0.0.1:6379> HGET user:1001 name
"John Doe"
内部实现:
在 Redis6.0 中 Hash(哈希)类型的底层存储结构有两种,它们分别是:
- ziplist(压缩列表),当哈希类型的键值对数量较少且键值对的大小较小时,Redis 会使用 ziplist 作为哈希的底层存储结构,ziplist 是一种紧凑的序列化数据结构,它将所有的元素存储在连续的内存区域中。
- hashtable(哈希表),当哈希类型的键值对数量增加或者键值对的大小变大时,为了保持操作的效率,Redis 会将底层存储结构从 ziplist 转换为 hashtable,Redis 的 hashtable 使用链地址法来解决哈希冲突,扩容过程使用了 rehash 的操作,下文会结合源代码讲解。
Redis 在选择使用 ziplist 或 hashtable 作为哈希的底层存储结构时,会根据一些配置参数来决定,这些参数包括:
- hash-max-ziplist-entries:这个配置项指定了哈希在使用 ziplist 作为底层结构时可以存储的最大元素数量,如果一个哈希的元素数量超过这个值,Redis 会自动将底层结构从 ziplist 转换为 hashtable。
- hash-max-ziplist-value:这个配置项指定了哈希在使用 ziplist 作为底层结构时,最大的元素大小(字节数),如果哈希中的任何一个元素的大小超过这个值,Redis 也会将底层结构从 ziplist 转换为 hashtable。
ziplist:
在源代码中,它不是通过传统的 C 结构体来定义,它是一种特定的格式直接存储在连续的内存块中。
/* Create a new empty ziplist. */
unsigned char *ziplistNew(void) {
unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE;
unsigned char *zl = zmalloc(bytes);
ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
ZIPLIST_LENGTH(zl) = 0;
zl[bytes-1] = ZIP_END;
return zl;
}
面试官:当哈希底层使用 ziplist 作为数据结构时,是怎么添加元素的?
答:
int hashTypeSet(robj *o, sds field, sds value, int flags) {
int update = 0;
// 当底层是 ziplist 时
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl, *fptr, *vptr;
zl = o->ptr;
// 获取 ziplist 的头部指针
fptr = ziplistIndex(zl, ZIPLIST_HEAD);
if (fptr != NULL) {
// 这里很关键,这里的搜索不是用 hash 的方式,而是遍历搜索
// 直接通过字节级别的比较判断两个键是否相等
fptr = ziplistFind(fptr, (unsigned char*)field, sdslen(field), 1);
if (fptr != NULL) {
// 获取字段的位置,fptr 指向字段,vptr 指向值
vptr = ziplistNext(zl, fptr);
serverAssert(vptr != NULL);
update = 1;
/* Delete value */
zl = ziplistDelete(zl, &vptr);
/* Insert new value */
zl = ziplistInsert(zl, vptr, (unsigned char*)value,
sdslen(value));
}
}
// 如果字段不存在,则讲新的字段和值添加到 ziplist 的末尾
if (!update) {
/* Push new field/value pair onto the tail of the ziplist */
zl = ziplistPush(zl, (unsigned char*)field, sdslen(field),
ZIPLIST_TAIL);
zl = ziplistPush(zl, (unsigned char*)value, sdslen(value),
ZIPLIST_TAIL);
}
o->ptr = zl;
/* Check if the ziplist needs to be converted to a hash table */
if (hashTypeLength(o) > server.hash_max_ziplist_entries)
hashTypeConvert(o, OBJ_ENCODING_HT);
} else if (o->encoding == OBJ_ENCODING_HT) {
...... // 省略底层是 hashTable 的情况
} else {
serverPanic("Unknown hash encoding");
}
/* Free SDS strings we did not referenced elsewhere if the flags
* want this function to be responsible. */
if (flags & HASH_SET_TAKE_FIELD && field) sdsfree(field);
if (flags & HASH_SET_TAKE_VALUE && value) sdsfree(value);
return update;
}
可以看到,ziplist 在存储哈希时,底层是这样的:【键1,值1,键2,值2,键3,值3......】,是紧凑存储的,因为元素少,查找元素的时候也是直接遍历对比,所以这种方式是时间和空间都非常友好的
使用 ziplist 作为哈希底层数据结构的时候,一定要注意!每次添加元素都会去扩容!因为 ziplist 被设置成一种紧凑型的数据结构,它不会预分配内存,每次添加新元素都会去扩容,因为扩容后的大小刚好能够把新元素添加进来!
面试官:当哈希底层使用 ziplist 作为数据结构时,是怎么解决哈希冲突的?
答:这种情况不会存在哈希冲突,因为每次添加新元素都是往 ziplist 最后去插入,而更新就元素就直接替换掉旧的。
面试官:当哈希底层使用 hashTable 作为数据结构时,是怎么添加元素的?
答:哈希的数据结构定义:
typedef struct dictEntry {
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next;
} dictEntry;
typedef struct dictType {
uint64_t (*hashFunction)(const void *key);
void *(*keyDup)(void *privdata, const void *key);
void *(*valDup)(void *privdata, const void *obj);
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
void (*keyDestructor)(void *privdata, void *key);
void (*valDestructor)(void *privdata, void *obj);
} dictType;
/* This is our hash table structure. Every dictionary has two of this as we
* implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
dictEntry **table;
unsigned long size;
unsigned long sizemask;
unsigned long used;
} dictht;
typedef struct dict {
dictType *type;
void *privdata;
dictht ht[2];
// 记录渐进式 rehash 的进度,如果不等于 -1 就是正在渐进式 rehash
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
unsigned long iterators; /* number of iterators currently running */
} dict;
else if (o->encoding == OBJ_ENCODING_HT) {
dictEntry *de = dictFind(o->ptr,field);
// 如果已经有相同的键
if (de) {
// 首先释放旧值占用的内存
sdsfree(dictGetVal(de));
// 根据 flags 决定是直接使用提供的 value(通过设置 HASH_SET_TAKE_VALUE 标志),还是复制一份新的值(使用 sdsdup)。如果直接使用提供的 value,将 value 指针赋给 dictEntry,然后将原 value 指针设为 NULL 防止外部再次使用。设置 update = 1 表示这是一个更新操作。
if (flags & HASH_SET_TAKE_VALUE) {
dictGetVal(de) = value;
value = NULL;
} else {
dictGetVal(de) = sdsdup(value);
}
update = 1;
// 如果字段不存在(de 为 NULL),则需要添加新的字段和值
} else {
sds f,v;
if (flags & HASH_SET_TAKE_FIELD) {
f = field;
field = NULL;
} else {
f = sdsdup(field);
}
if (flags & HASH_SET_TAKE_VALUE) {
v = value;
value = NULL;
} else {
v = sdsdup(value);
}
dictAdd(o->ptr,f,v);
}
}
面试官:当哈希底层使用 hashTable 作为数据结构的时候,是怎么解决哈希冲突的?
答:使用了链地址法,
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
unsigned long idx, table;
dictEntry *he;
if (existing) *existing = NULL;
/* Expand the hash table if needed */
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;
for (table = 0; table <= 1; table++) {
idx = hash & d->ht[table].sizemask;
/* Search if this slot does not already contain the given key */
he = d->ht[table].table[idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key)) {
if (existing) *existing = he;
return -1;
}
he = he->next;
}
// 如果哈希表当前不在 rehash 过程中,就没有必要检查第二个表格,因此跳出循环。
if (!dictIsRehashing(d)) break;
}
return idx;
}
面试官:讲一下 Redis 中,当哈希底层使用 hashTable 作为数据结构的扩容流程
答:在添加新元素的时候,首先判断一下是否需要扩容。
- 首先判断是否在 rehash,如果是就返回。
- 如果不是就判断 hashTable 是否为空,如果是就初始化一个容量。
- 如果 hashTable 不为空,如果扩容启用且已经负载因子达到1,或者扩容未被明确禁止且负载因子超过5,就要触发扩容。
- 每次扩容后的新容量都是原来的两倍。
- 为了避免 rehash 在数据迁移的过程中因拷贝数据的耗时影响 Redis 的性能,Redis 采用了渐进式 rehash,也就是将数据的迁移工作不再是一次性完成,而是分多次迁移。
具体讲一下渐进式 rehash:
当需要 rehash 时,Redis 并不立即释放旧的哈希表,而是分配一个新的哈希表(ht[1]),旧的哈希表是(ht[0]),此时 Redis 会同时维护这两个哈希表。
在后续的操作中,(插入、查找、删除),会先进行渐进式 rehash,也就是将 ht[0] 中的元素逐渐迁移到 ht[1] 中。每次迁移 n 个桶。同时判断如果每次访问空桶最大次数到达 n 的 10 倍时,就结束本次 rehash,防止单次操作时间太长。(正常情况下每次的 n 是 1)
int dictRehash(dict *d, int n) {
// 设置访问空桶的最大次数为 n 的 10 倍,避免在空桶太多时造成的长时间循环
int empty_visits = n*10; /* Max number of empty buckets to visit. */
// 旧哈希表的大小
unsigned long s0 = d->ht[0].size;
// 新哈希表的大小
unsigned long s1 = d->ht[1].size;
if (dict_can_resize == DICT_RESIZE_FORBID || !dictIsRehashing(d)) return 0;
if (dict_can_resize == DICT_RESIZE_AVOID &&
((s1 > s0 && s1 / s0 < dict_force_resize_ratio) ||
(s1 < s0 && s0 / s1 < dict_force_resize_ratio)))
{
return 0;
}
// 尝试对 n 个桶进行 rehash
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht[0].table[d->rehashidx];
// 将当前桶中的所有键迁移到新哈希表
while(de) {
uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
// 清空已迁移的桶
d->ht[0].table[d->rehashidx] = NULL;
// 移动到下一个桶
d->rehashidx++;
}
// 检查旧表是否已完全迁移
if (d->ht[0].used == 0) {
// 释放旧表内存
zfree(d->ht[0].table);
// 将新表设置为主表
d->ht[0] = d->ht[1];
// 重置新表
_dictReset(&d->ht[1]);
// 重置 rehash 索引
d->rehashidx = -1;
// 返回 0,表示 rehash 完成
return 0;
}
// 返回 1,表示还有更多元素需要 rehash
return 1;
}
在进行渐进式 rehash 的过程中,如果是查找,则先找 ht[0],如果通过标志位确认当前处于 rehash 过程,则再去 ht[1] 找;如果是添加,就直接把新的键值对添加到 ht[1] 中;如果是删除,就先查找 ht[0] 并删除,如果通过标志位确认当前处于 rehash 过程,则再去 ht[1] 查找并删除。
n 不等于 1 的情况通常发生在 Redis 认为有必要加快 rehash 进程的特定场景下,就是一些后台任务,这里不细说。下面是指定 rehash 时间的源码:
int dictRehashMilliseconds(dict *d, int ms) {
long long start = timeInMilliseconds();
int rehashes = 0;
// 每次 100 个桶
while(dictRehash(d,100)) {
rehashes += 100;
if (timeInMilliseconds()-start > ms) break;
}
return rehashes;
}
面试官:讲一下 Redis 中,哈希数据结构的扩容流程
答:首先在元素较少的时候,使用的是 ziplist,后续继续添加新的元素后,如果达到配置hash-max-ziplist-entries和hash-max-ziplist-value就变成 hashTable,后续继续添加新元素,就采用渐进式 rehash 进行扩容,每次最多迁移一个桶,每次新的桶数量都是原来的 2 倍。
list(列表)
应用:
可以用作消息队列等
127.0.0.1:6379> lpush key value
(integer) 1
127.0.0.1:6379> lrange key 0 -1
1) "value"
内部实现:
在 Redis6.0 中,底层使用的是 quicklist,它结合了双向链表和 ziplist 的优点,提供空间效率和操作性能之间的最佳平衡
typedef struct quicklist {
quicklistNode *head; /* 指向第一个快速列表节点的指针 */
quicklistNode *tail; /* 指向最后一个快速列表节点的指针 */
unsigned long count; /* 所有 ziplist 中所有条目的总计数 */
unsigned long len; /* quicklistNodes 的数量 */
int fill : QL_FILL_BITS; /* 个别节点的填充因子 */
unsigned int compress : QL_COMP_BITS; /* 不压缩的末端节点的深度;0=关闭 */
// 这是在 Redis 6.2 中引入的功能,允许用户在 quicklist 中设置标记,以便快速跳转到列表中的特定位置。每个书签都指向一个 quicklist 节点,使得某些操作更加高效。
unsigned int bookmark_count: QL_BM_BITS;
quicklistBookmark bookmarks[];
} quicklist;
typedef struct quicklistNode {
struct quicklistNode *prev; /* 指向前一个节点的指针 */
struct quicklistNode *next; /* 指向下一个节点的指针 */
unsigned char *zl; /* 指向压缩列表的指针 */
unsigned int sz; /* 压缩列表的字节大小 */
unsigned int count : 16; /* 压缩列表中的项数 */
unsigned int encoding : 2; /* 编码类型:RAW==1 或 LZF==2 */
unsigned int container : 2; /* 容器类型:NONE==1 或 ZIPLIST==2 */
unsigned int recompress : 1; /* 此节点是否之前被压缩过 */
unsigned int attempted_compress : 1; /* 节点是否尝试过压缩但因太小而失败 */
unsigned int extra : 10; /* 为将来的使用预留的额外位 */
} quicklistNode;
- 双向链表:quicklist 的主体是一个双向链表,每个节点都可以通过 head 和 tail 指针快速访问,支持从两端进行高效操作。
- 压缩列表(ziplist):链表的每个节点都包含一个压缩列表,这是一个紧凑的数组,用于存储实际的列表元素。这种结构使得 quicklist 在存储小列表时非常高效,同时通过分割大列表来避免单个压缩列表变得过大。
在向 quicklist 添加一个元素的时候,不会像普通的链表那样,直接新建一个链表节点。而是会检查插入位置的压缩列表是否能容纳该元素,如果能容纳就直接保存到 quicklistNode 结构里的压缩列表,如果不能容纳,才会新建一个新的 quicklistNode 结构。
下面是向 quicklist 头部插入元素的流程:
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
// 保存原始头节点的指针,以便之后判断头节点是否发生变化
quicklistNode *orig_head = quicklist->head;
// 确保插入值的大小小于 UINT32_MAX,这是因为 quicklist 设计上的限制
assert(sz < UINT32_MAX);
// 检查当前的头节点是否有足够的空间插入新的值
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
// 如果头节点有空间,直接在头节点的 ziplist 中插入新值
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
// 更新头节点的 ziplist 大小
quicklistNodeUpdateSz(quicklist->head);
} else {
// 如果头节点没有足够空间,创建一个新的 quicklist 节点
quicklistNode *node = quicklistCreateNode();
// 在新节点的 ziplist 中插入新值,并设置为头节点
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
// 更新新节点的 ziplist 大小
quicklistNodeUpdateSz(node);
// 将新节点插入到 quicklist 的头部
_quicklistInsertNodeBefore(quicklist, quicklist->head, node);
}
// 增加 quicklist 的总元素计数
quicklist->count++;
// 增加头节点的元素计数
quicklist->head->count++;
// 返回一个布尔值,指示头节点是否因为插入操作而改变
return (orig_head != quicklist->head);
}
set(集合)
应用:
127.0.0.1:6379> sadd test a
(integer) 1
127.0.0.1:6379> sadd test b
(integer) 1
127.0.0.1:6379> smembers test
1) "a"
2) "b"
内部实现:
底层就是直接使用哈希,没什么好说的。
zset(有序集合)
redis的有序集合(zset)主要是根据分数(score)来进行排序的,这是它的核心特性之一。每个元素在有序集合中都有一个与之关联的分数,这个分数用于确定元素在集合中的排序位置。当有序集合进行操作如插入、删除或查找元素时,redis会首先根据分数进行排序。如果多个元素有相同的分数,那么这些元素会根据它们的元素值(通常是字符串)在字典序中的排列进行二级排序。
应用:
127.0.0.1:6379> ZADD test 1 a
(integer) 1
127.0.0.1:6379> ZADD test 2 b
(integer) 1
127.0.0.1:6379> ZRANGE test 0 10 WITHSCORES
1) "a"
2) "1"
3) "b"
4) "2"
内部实现:
zset 类型(有序集合类型)相比于 set 类型多了一个排序属性 score(分值),对于有序集合 zSet 来说,每个存储元素相当于有两个值组成的,一个是有序集合的元素值,一个是排序值。
有序集合保留了集合不能有重复成员的特性(分值可以重复),但不同的是,有序集合中的元素可以排序。
Zset 类型的底层数据结构是由 ziplist 或 skiplist 实现的:
- 如果有序集合的元素个数小于 xxx 个,并且每个元素的值小于 xxx 字节时,redis 会使用 ziplist 作为 zset 类型的底层数据结构,这个 xxx 可以配置。
- 如果有序集合的元素不满足上面的条件,redis 会使用 skiplist + 哈希作为 zset 类型的底层数据结构。在哈希中,key 是元素,值的对应的分数。在 skiplist 中,将元素和分数放到 zskiplistNode 里,元素在 skiplist 里按照分数进行排序。(哈希的目的是根据元素查找分数,skiplist 的目的是根据分数查找元素,两个是相反的操作)
底层使用 skiplist + 哈希 作为 zset 底层:
typedef struct zskiplistNode {
sds ele;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned long span;
} level[];
} zskiplistNode;
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;
面试官:当 zset 底层使用 ziplist 时,是怎么插入和查询的?
答:
插入:
/* 当编码方式是 ziplist 时。 */
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *eptr;
/* 查找元素是否已存在于有序集合中,如果存在,获取其当前分数。 */
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
... // 省略其它
/* 如果新分数与当前分数不同,则需要从压缩列表中删除原元素,并以新分数重新插入。 */
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr); // 删除原元素。
zobj->ptr = zzlInsert(zobj->ptr,ele,score); // 以新分数重新插入元素。
*flags |= ZADD_UPDATED; // 设置标志表示元素分数已更新。
}
return 1;
} else if (!xx) {
/* 如果设置了XX标志且元素不存在,则不进行任何操作。XX选项表示仅在元素已存在时更新。 */
/* 在执行zzlInsert之前,检查元素大小或压缩列表长度是否超出限制。 */
if (zzlLength(zobj->ptr)+1 > server.zset_max_ziplist_entries ||
sdslen(ele) > server.zset_max_ziplist_value ||
!ziplistSafeToAdd(zobj->ptr, sdslen(ele)))
{
/* 如果超出限制,则将 ziplist 转换为 skiplist。 */
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
} else {
/* 否则,将新元素插入压缩列表,并设置相应的标志。 */
zobj->ptr = zzlInsert(zobj->ptr,ele,score); // 插入新元素。
if (newscore) *newscore = score; // 如果提供了newscore指针,更新其值为新分数。
*flags |= ZADD_ADDED; // 设置标志表示有新元素被添加。
return 1;
}
} else {
*flags |= ZADD_NOP; // 如果元素不存在且设置了XX标志,设置标志表示本次操作没有改变有序集合。
return 1;
}
}
查询:
可以看到,当 zset 底层用 ziplist 作为数据结构时,是这样存储是:[元素1,分数1,元素2,分数2......]
unsigned char *zzlFind(unsigned char *zl, sds ele, double *score) {
// 从ziplist的头部开始遍历。
unsigned char *eptr = ziplistIndex(zl,0), *sptr;
while (eptr != NULL) { // 当指针非空,即未遍历到ziplist的末尾时,继续遍历。
sptr = ziplistNext(zl,eptr); // 获取当前元素的分数指针。
serverAssert(sptr != NULL); // 断言分数指针非空,确保数据完整性。
// 比较当前遍历到的元素是否与查找的元素相匹配。
if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) {
// 如果找到匹配的元素,并且调用者请求返回分数,通过zzlGetScore函数获取分数。
if (score != NULL) *score = zzlGetScore(sptr);
return eptr; // 返回找到的元素的指针。
}
// 移动到下一个元素,即跳过当前的分数指针,指向下一个元素的指针。
eptr = ziplistNext(zl,sptr);
}
// 如果遍历完整个ziplist都没有找到元素,则返回NULL。
return NULL;
}
面试官:当 zset 底层使用 skiplist 时,是怎么插入和查询的?
答:
插入:
// 检查对象的编码是否为跳跃表
if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
// 获取有序集合的指针
zset *zs = zobj->ptr;
zskiplistNode *znode;
dictEntry *de;
// 在字典中查找元素
de = dictFind(zs->dict,ele);
if (de != NULL) {
...... // 省略其它代码
// 获取当前元素的分数
curscore = *(double*)dictGetVal(de);
// 如果分数有变化,需要在跳跃表中更新元素的分数
if (score != curscore) {
znode = zslUpdateScore(zs->zsl,curscore,ele,score); // 更新跳跃表中元素的分数
// 更新字典中分数的引用
dictGetVal(de) = &znode->score;
*flags |= ZADD_UPDATED; // 设置标志以指示分数被更新
}
return 1;
} else if (!xx) {
// 如果元素不存在且没有设置XX选项,则添加新元素
ele = sdsdup(ele); // 复制元素的SDS字符串
znode = zslInsert(zs->zsl,score,ele); // 在跳跃表中插入新元素
// 确保在字典中也添加了元素,并关联到跳跃表节点的分数
serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
*flags |= ZADD_ADDED; // 设置标志以指示添加了新元素
if (newscore) *newscore = score; // 如果需要,更新外部提供的新分数变量
return 1;
} else {
// 如果元素不存在且设置了XX选项,则不进行任何操作
*flags |= ZADD_NOP;
return 1;
}
}
// 定义插入操作的函数,将一个新元素及其分数插入到跳跃表中
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
// 定义更新数组和排名数组,以及其他变量
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
// 断言分数不是NaN,确保分数有效
serverAssert(!isnan(score));
// 从头节点开始遍历跳跃表
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
// 存储到达插入位置时经过的节点数
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
// 遍历跳跃表,找到插入点
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
// 随机生成新节点的层数
level = zslRandomLevel();
// 如果新节点的层数高于当前跳跃表的最高层数,则进行调整
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
// 创建新节点
x = zslCreateNode(level,score,ele);
// 插入新节点到跳跃表,并更新相关指针和跨度
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
// 更新跨度
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
// 对未触及的层增加跨度
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
// 设置新节点的后退指针
x->backward = (update[0] == zsl->header) ? NULL : update[0];
// 更新新节点前向节点的后退指针或者跳跃表的尾指针
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
// 增加跳跃表的长度
zsl->length++;
// 返回新插入的节点
return x;
}
节点层数的确定:
int zslRandomLevel(void) {
int level = 1; // 起始层为1,因为每个节点至少要出现在第一层
// 使用随机数决定节点的层数,以ZSKIPLIST_P的概率增加层
// ZSKIPLIST_P是一个概率值,用于控制节点平均占据的层数
// 通常设置为1/4或者类似的值,意味着每增加一层的概率是25%
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1; // 如果随机条件满足,则层数加1
// 确保层数不超过ZSKIPLIST_MAXLEVEL,这是跳跃表允许的最大层数
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
平均来说,每个节点的层数是 1.33 层,相对二叉平衡搜索树,每个节点都要包含两个其它节点的引用,在这方面 skiplist 做到了内存优化。
查询:
一层一层去查找,时间复杂度是 log(N),N 是元素个数。
总结
redis 为了节省内存,做了很多优化,例如 zset、hash,在元素少和小的情况,先使用 ziplist,后期元素增多就会升级成其它数据结构,深入源代码细节,还有其它优化,例如 sds 还可以用不同的编码来节省内存。