Redis系列(十一):数据结构Set源码解析和SADD、SINTER、SDIFF
1.介绍Hash是以K->V形式存储,而Set则是K存储,空间节省了很多 Redis中Set是String类型的无序集合;集合成员是唯一的。 这就意味着集合中不能出现重复的数据。可根据应用场景需要选用该数据类型。(比如:好友/关注/粉丝/感兴趣的人/黑白名单) 2.源码解析Redis使用Dict和IntSet保存Set数据 // 1. inset 数据结构,在set数据量小且都是整型数据时使用 typedef struct intset { 编码范围,由具体存储值决定 uint32_t encoding; 数组长度 uint32_t length; 具体存储元素的容器 int8_t contents[]; } intset; ? ? 2. dict 相关数据结构,即是 hash 的实现相关的数据结构 /* This is our hash table structure. Every dictionary has two of this as we * implement incremental rehashing,for the old to the new table. */ typedef dictht { dictEntry **table; unsigned long size; unsigned sizemask; unsigned used; } dictht; typedef dict { dictType *type; void *privdata; dictht ht[2]; long rehashidx; /* rehashing not in progress if rehashidx == -1 unsigned long iterators; number of iterators currently running } dict; If safe is set to 1 this is a safe iterator,that means,you can call * dictAdd,dictFind,and other functions against the dictionary even while * iterating. Otherwise it is a non safe iterator,and only dictNext() * should be called while iterating. dictIterator { dict *d; index; int table,safe; dictEntry *entry,*nextEntry; unsafe iterator fingerprint for misuse detection. */ long fingerprint; } dictIterator; typedef dictEntry { key; union { val; uint64_t u64; int64_t s64; double d; } v; struct dictEntry *next; } dictEntry; typedef dictType { unsigned int (*hashFunction)(const key); void *(*keyDup)(void *privdata,void *(*valDup)(obj); int (*keyCompare)(void *key1,1)">key2); void (*keyDestructor)(void (*valDestructor)(obj); } dictType; 3.SADD加一个或多个指定的member元素到集合的 key中.指定的一个或者多个元素member 如果已经在集合key中存在则忽略. 如果集合key 不存在,则新建集合key,并添加member元素到集合key中. 如果key 的类型不是集合则返回错误. 时间复杂度:O(N) 127.0.0.1:6379> sadd myset "Hello" (integer) 1 0 6379> smembers myset 1) " 6379> ? 用法: SADD key member1 [member2] t_set.c,添加member void saddCommand(client *c) { robj *set; int j,added = 0 先从当前db中查找set实例 set = lookupKeyWrite(c->db,c->argv[1]); if (set == NULL) { 1. 新建set实例并添加到当前db中 set = setTypeCreate(c->argv[2]->ptr); dbAdd(c->db,1)">1],1)">); } else { set->type != OBJ_SET) { addReply(c,shared.wrongtypeerr); return; } } 对于n个member,一个个地添加即可 for (j = 2; j < c->argc; j++) { 2. 只有添加成功,added 才会加1 if (setTypeAdd(set,c->argv[j]->ptr)) added++; } 命令传播 if (added) { signalModifiedKey(c->db,1)">]); notifyKeyspaceEvent(NOTIFY_SET,sadd",c->db->id); } server.dirty += added; 响应添加成功的数量 addReplyLongLong(c,added); } 1. 创建新的set集合实例(需根据首次的参数类型判定) Factory method to return a set that *can* hold "value". When the object has * an integer-encodable value,an intset will be returned. Otherwise a regular * hash table. robj *setTypeCreate(sds value) { 如果传入的value是整型,则创建 intset 类型的set 否则使用dict类型的set 一般地,第一个数据为整型,后续数据也应该为整型,所以这个数据结构相对稳定 而hash的容器创建时,只使用了一 ziplist 创建,这是不一样的实现 if (isSdsRepresentableAsLongLong(value,NULL) == C_OK) createIntsetObject(); createSetObject(); } 1.1. 创建 intset 型的set object.c robj *createIntsetObject(void) { intset *is = intsetNew(); robj *o = createObject(OBJ_SET,1)">is); o->encoding = OBJ_ENCODING_INTSET; o; } intset.c,new一个空的intset对象 Create an empty intset. intset *intsetNew(is = zmalloc(sizeof(intset)); is->encoding = intrev32ifbe(INTSET_ENC_INT16); is->length = return ; } 1.2. 创建dict 型的set robj *createSetObject() { dict *d = dictCreate(&setDictType,NULL); robj *o = createObject(OBJ_SET,d); o->encoding = OBJ_ENCODING_HT; dict.c Create a new hash table dict *dictCreate(dictType *type,privDataPtr) { dict *d = zmalloc(sizeof(*d)); _dictInit(d,type,privDataPtr); d; } Initialize the hash table */ int _dictInit(dict *d,dictType *privDataPtr) { _dictReset(&d->ht[]); _dictReset(&d->ht[]); d->type = type; d->privdata = privDataPtr; d->rehashidx = -; d->iterators = DICT_OK; } 2. 添加member到set集合中 Add the specified value into a set. * * If the value was already member of the set,nothing is done and 0 is * returned,otherwise the new element is added and 1 is returned. int setTypeAdd(robj *subject,sds value) { llval; 2.1. HT编码和INTSET编码分别处理就好 if (subject->encoding == OBJ_ENCODING_HT) { dict *ht = subject->ptr; 以 value 为 key,添加实例到ht中 实现过程也很简单,大概就是如果存在则返回NULL(即无需添加),辅助rehash,分配内存创建dictEntry实例,稍后简单看看 dictEntry *de = dictAddRaw(ht,value); (de) { 重新设置key为 sdsdup(value),value为NULL dictSetKey(ht,de,sdsdup(value)); dictSetVal(ht,NULL); return ; } } 2.2. intset 编码的member添加 else OBJ_ENCODING_INTSET) { 尝试解析value为 long 型,值写入 llval 中 C_OK) { uint8_t success = ; 情况1. 可添加到intset中 subject->ptr = intsetAdd(subject->ptr,llval,&success); (success) { Convert to regular set when the intset contains * too many entries. */ 默认: 512,intset大于之后,则转换为ht hash表模式存储 if (intsetLen(subject->ptr) > server.set_max_intset_entries) 2.3. 转换intset编码为 ht 编码 setTypeConvert(subject,OBJ_ENCODING_HT); ; } } { 情况2. member 是字符串型,先将set容器转换为 ht 编码,再重新执行dict的添加模式 Failed to get integer from object,convert to regular set. setTypeConvert(subject,OBJ_ENCODING_HT); The set *was* an intset and this value is not integer * encodable,so dictAdd should always work. serverAssert(dictAdd(subject->ptr,sdsdup(value),1)"> DICT_OK); ; } } { serverPanic(Unknown set encoding); } ; } 2.1. 添加member到dict中(略解,在hash数据结构解析中已介绍) dict.c,添加某key到 d 字典中 Low level add. This function adds the entry but instead of setting * a value returns the dictEntry structure to the user,that will make * sure to fill the value field as he wishes. * * This function is also directly exposed to the user API to be called * mainly in order to store non-pointers inside the hash value,example: * * entry = dictAddRaw(dict,mykey); * if (entry != NULL) dictSetSignedIntegerVal(entry,1000); * * Return values: * * If key already exists NULL is returned. * If key was added,the hash entry is returned to be manipulated by the caller. dictEntry *dictAddRaw(dict *d,1)">key) { index; dictEntry *entry; dictht *ht; (dictIsRehashing(d)) _dictRehashStep(d); Get the index of the new element,or -1 if * the element already exists. 获取需要添加的key的存放位置下标(slot),如果该key已存在,则返回-1(无可用slot) if ((index = _dictKeyIndex(d,key)) == -) NULL; Allocate the memory and store the new entry. * Insert the element in top,with the assumption that in a database * system it is more likely that recently added entries are accessed * more frequently. ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[]; entry = zmalloc(entry)); entry->next = ht->table[index]; ht->table[index] = entry; ht->used++; Set the hash entry fields. dictSetKey(d,entry,key); entry; } 2.2. 添加整型数据到 intset中 Insert an integer in the intset intset *intsetAdd(intset *is,int64_t value,uint8_t *success) { 获取value的所属范围 uint8_t valenc = _intsetValueEncoding(value); uint32_t pos; if (success) *success = Upgrade encoding if necessary. If we need to upgrade,we know that * this value should be either appended (if > 0) or prepended (if < 0),* because it lies outside the range of existing values. 默认 is->encoding 为 INTSET_ENC_INT16 (16位长) 2.2.1. 即超过当前预设的位长,则需要增大预设,然后添加 此时的value可以确定: 要么是最大,要么是最小 (所以我们可以推断,此intset应该是有序的) if (valenc > intrev32ifbe(is->encoding)) { This always succeeds,so we don't need to curry *success. */ return intsetUpgradeAndAdd(,value); } Abort if the value is already present in the set. * This call will populate "pos" with the right position to insert * the value when it cannot be found. 2.2.2. 在当前环境下添加value 找到value则说明元素已存在,不可再添加 pos 保存比value小的第1个元素的位置 if (intsetSearch(pos)) { ; } is = intsetResize(is->length)+); 在pos不是末尾位置时,需要留出空位,依次移动后面的元素 if (pos < intrev32ifbe(is->length)) intsetMoveTail( 针对编码位不变更的情况下设置pos位置的值 _intsetSet(is->length = intrev32ifbe(intrev32ifbe(); 判断 value 的位长 INTSET_ENC_INT16 < INTSET_ENC_INT32 < INTSET_ENC_INT64 2 < 4 < 8 Return the required encoding for the provided value. static uint8_t _intsetValueEncoding(int64_t v) { if (v < INT32_MIN || v > INT32_MAX) INTSET_ENC_INT64; if (v < INT16_MIN || v > INT16_MAX) INTSET_ENC_INT32; else INTSET_ENC_INT16; } 2.2.1. 升级预设位长,并添加value intset.c Upgrades the intset to a larger encoding and inserts the given integer. static intset *intsetUpgradeAndAdd(intset *encoding); uint8_t newenc = _intsetValueEncoding(value); int length = intrev32ifbe(length); int prepend = value < 0 ? 1 : First set new encoding and resize intrev32ifbe(newenc); 每次必进行扩容 ); Upgrade back-to-front so we don't overwrite values. * Note that the "prepend" variable is used to make sure we have an empty * space at either the beginning or the end of the intset. 因编码发生变化,元素的位置已经不能一一对应,需要按照原来的编码依次转移过来 从后往前依次赋值,所以,内存位置上不存在覆盖问题(后面内存位置一定是空的),直接依次赋值即可(高效复制) while(length--) _intsetSet( Set the value at the beginning or the end. 对新增加的元素,负数添加到第0位,否则添加到最后一个元素后一位 (prepend) _intsetSet( _intsetSet(length),1)"> Resize the intset static intset *intsetResize(intset *encoding); malloc is = zrealloc(sizeof(intset)+size); Return the value at pos,given an encoding. static int64_t _intsetGetEncoded(intset * pos,uint8_t enc) { int64_t v64; int32_t v32; int16_t v16; if (enc == INTSET_ENC_INT64) { memcpy(&v64,((int64_t*)is->contents)+pos,1)">(v64)); memrev64ifbe(&v64); v64; } INTSET_ENC_INT32) { memcpy(&v32,((int32_t*)(v32)); memrev32ifbe(&v32); v32; } { memcpy(&v16,((int16_t*)(v16)); memrev16ifbe(&v16); v16; } } 只是这里数据类型是不确定的,所以使用指针进行赋值 Set the value at pos,using the configured encoding. static void _intsetSet(intset *if (encoding == INTSET_ENC_INT64) { ((int64_t*)is->contents)[pos] = value; memrev64ifbe(((int64_t*)is->contents)+pos); } INTSET_ENC_INT32) { ((int32_t*) value; memrev32ifbe(((int32_t*) { ((int16_t*) value; memrev16ifbe(((int16_t*)pos); } } 2.2.2. 在编码类型未变更的情况,需要查找可以存放value的位置(为了确认该value是否已存在,以及小于value的第一个位置赋值) Search for the position of "value". Return 1 when the value was found and * sets "pos" to the position of the value within the intset. Return 0 when * the value is not present in the intset and sets "pos" to the position * where "value" can be inserted. static uint8_t intsetSearch(intset *pos) { int min = 0,max = intrev32ifbe(is->length)-1,mid = -; int64_t cur = - The value can never be found when the set is empty if (intrev32ifbe(is->length) == if (pos) *pos = ; ; } Check for the case where we know we cannot find the value,* but do know the insert position. 因 intset 是有序数组,即可以判定是否超出范围,如果超出则元素必定不存在 if (value > _intsetGet()) { if (pos) *pos = intrev32ifbe(length); ; } if (value < _intsetGet( 使用二分查找 while(max >= min) { mid = ((unsigned int)min + (unsigned int)max) >> ; cur = _intsetGet(if (value > cur) { min = mid+if (value < cur) { max = mid- 找到了 break; } } if (value == cur) { if (pos) *pos = mid; 在没有找到的情况下,min就是第一个比 value 小的元素 min; ; } } intset移动(内存移动) void intsetMoveTail(intset *fromvoid *src,1)">dst; uint32_t bytes = intrev32ifbe(is->length)-; uint32_t encoding = intrev32ifbe(encoding); INTSET_ENC_INT64) { src = (int64_t*)is->contents+; dst = (int64_t*)to; bytes *= (int64_t); } INTSET_ENC_INT32) { src = (int32_t*); dst = (int32_t*)(int32_t); } { src = (int16_t*); dst = (int16_t*)(int16_t); } memmove(dst,src,bytes); } 2.3. 转换intset编码为 ht 编码 (如果遇到string型的value或者intset数量大于阀值(默认:512)时) Convert the set to specified encoding. The resulting dict (when converting * to a hash table) is presized to hold the number of elements in the original * set. void setTypeConvert(robj *setobj,1)"> enc) { setTypeIterator *si; 要求外部必须保证 set类型且 intset 编码 serverAssertWithInfo(NULL,setobj,setobj->type == OBJ_SET && setobj->encoding == OBJ_ENCODING_INTSET); OBJ_ENCODING_HT) { int64_t intele; 直接创建一个 dict 来容纳数据 dict *d = dictCreate(& Presize the dict to avoid rehashing 直接一次性扩容成需要的大小 dictExpand(d,intsetLen(setobj->ptr)); To add the elements we extract integers and create redis objects setTypeIterator 迭代器是转换的关键 si = setTypeInitIterator(setobj); while (setTypeNext(si,&element,&intele) != -) { element:ht编码时的key,intele: intset编码时的value element = sdsfromlonglong(intele); 因set特性保证是无重复元素,所以添加dict时,必然应成功 此处应无 rehash,而是直接计算 hashCode,放置元素,时间复杂度 O(1) serverAssert(dictAdd(d,element,1)"> DICT_OK); } 释放迭代器 setTypeReleaseIterator(si); setobj->encoding = OBJ_ENCODING_HT; zfree(setobj->ptr); setobj->ptr = d; } Unsupported set conversion); } } subject) { setTypeIterator *si = zmalloc((setTypeIterator)); 设置迭代器公用信息 si->subject = subject; si->encoding = subject->encoding; hash表则需要再迭代 dict if (si->encoding == OBJ_ENCODING_HT) { si->di = dictGetIterator(subject->ptr); } intset 比较简单,直接设置下标即可 OBJ_ENCODING_INTSET) { si->ii = si; } d) { dictIterator *iter = zmalloc(iter)); iter->d = d; iter->table = ; iter->index = -; iter->safe = ; iter->entry = NULL; iter->nextEntry = NULL; iter; } Move to the next entry in the set. Returns the object at the current * position. * * Since set elements can be internally be stored as SDS strings or * simple arrays of integers,setTypeNext returns the encoding of the * set object you are iterating,and will populate the appropriate pointer * (sdsele) or (llele) accordingly. * * Note that both the sdsele and llele pointers should be passed and cannot * be NULL since the function will try to defensively populate the non * used field with values which are easy to trap if misused. * * When there are no longer elements -1 is returned. int setTypeNext(setTypeIterator *si,sds *sdsele,int64_t *llele) { hash表返回key OBJ_ENCODING_HT) { dictEntry *de = dictNext(si->di); if (de == NULL) return -; *sdsele = dictGetKey(de); *llele = -123456789; Not needed. Defensive. } intset 直接获取下标对应的元素即可 if (!intsetGet(si->subject->ptr,si->ii++; *sdsele = NULL; } Wrong set encoding in setTypeNextreturn si->encoding; } case1: intset直接叠加下标即可 Sets the value to the value at the given position. When this position is * out of range the function returns 0,when in range it returns 1. uint8_t intsetGet(intset *value) { length)) { *value = _intsetGet(static int64_t _intsetGet(intset * pos) { return _intsetGetEncoded(encoding)); } (附带)case2: dict的迭代 iter) { 一直迭代查找 while ( iter->entry 为NULL,有两种可能: 1. 初始化时; 2. 上一元素为迭代完成(hash冲突) if (iter->entry == NULL) { dictht *ht = &iter->d->ht[iter->table]; if (iter->index == -1 && iter->table == ) { if (iter->safe) iter->d->iterators++; iter->fingerprint = dictFingerprint(iter->d); } 直接使用下标进行迭代,如果中间有空闲位置该如何处理?? 看起来redis是使用了全量迭代元素的处理办法,即有可能有许多空迭代过程 一般地,也是进行两层迭代,jdk的hashmap迭代实现为直接找到下一次非空的元素为止 iter->index++ 直到迭代完成所有元素,否则会直到找到一个元素为止 if (iter->index >= (long) ht->size) { if (dictIsRehashing(iter->d) && iter->table == ) { iter->table++; iter->index = ; ht = &iter->d->ht[]; } { ; } } iter->entry = ht->table[iter->index]; } entry不为空,就一定有nextEntry?? iter->entry = iter->nextEntry; } 如果当前entry为空,则继续迭代下一个 index entry) { We need to save the 'next' here,the iterator user * may delete the entry we are returning. iter->nextEntry = iter->entry->next; return iter->entry; } } NULL; } 4.SISMEMBER返回成员 member 是否是存储的集合 key的成员. 如果member元素是集合key的成员,则返回1 如果member元素不是key的成员,或者集合key不存在,则返回0 时间复杂度:O(1) 6379> sismember myset World6379> 用法: SISMEMBER key member void sismemberCommand(client *if ((set = lookupKeyReadOrReply(c,shared.czero)) == NULL || checkType(c,OBJ_SET)) 主要方法 setTypeIsMember if (setTypeIsMember(ptr)) 回复1 addReply(c,shared.cone); 回复0t_set.c int setTypeIsMember(robj * OBJ_ENCODING_HT) { hash 表的查找方式,hashCode 计算,链表查找,就这么简单 return dictFind((dict*)subject->ptr,value) != NULL; } 如果当前的set集合是 intset 编码的,则只有查找值也是整型的情况下才可能查找到元素 C_OK) { intset 查找,而且 intset 是有序的,所以直接使用二分查找即可 return intsetFind((intset*)subject->ptr,llval); } } ; } ? Determine whether a value belongs to this set uint8_t intsetFind(intset * 最大范围检查,加二分查找 intsetSearch 前面已介绍 return valenc <= intrev32ifbe(is->encoding) && intsetSearch(sinter: 源码解析 用法: SPOP key [count] void spopCommand(client *aux; sds sdsele; int64_t llele; if (c->argc == 3 弹出指定数量的元素,略 spopWithCountCommand(c); if (c->argc > ) { addReply(c,shared.syntaxerr); ; } Make sure a key with the name inputted exists,and that it's type is * indeed a set set = lookupKeyWriteOrReply(c,shared.nullbulk)) == NULL || Get a random element from the set 1. 随机获取一个元素,这是 spop 的定义 encoding = setTypeRandomElement(llele); Remove the element from the set 2. 删除元素 OBJ_ENCODING_INTSET) { ele = createStringObjectFromLongLong(llele); set->ptr = intsetRemove(set-> { ele = createStringObject(sdsele,sdslen(sdsele)); setTypeRemove(ptr); } notifyKeyspaceEvent(NOTIFY_SET,1)">spopid); Replicate/AOF this command as an SREM operation aux = createStringObject(SREM4); rewriteClientCommandVector(c,1)">3,aux,ele); decrRefCount(aux); Add the element to the reply addReplyBulk(c,ele); decrRefCount(ele); Delete the set if it's empty if (setTypeSize(set) == ) { dbDelete(c->db,1)">]); notifyKeyspaceEvent(NOTIFY_GENERIC,1)">id); } Set has been modified signalModifiedKey(c->db,1)">]); server.dirty++ 没啥好说的,就看下是如何随机的就好了 Return random element from a non empty set. * The returned element can be a int64_t value if the set is encoded * as an "intset" blob of integers,or an SDS string if the set * is a regular set. * * The caller provides both pointers to be populated with the right * object. The return value of the function is the object->encoding * field of the object and is used by the caller to check if the * int64_t pointer or the redis object pointer was populated. * * Note that both the sdsele and llele pointers should be passed and cannot * be NULL since the function will try to defensively populate the non * used field with values which are easy to trap if misused. int setTypeRandomElement(robj *setobj,1)">if (setobj->encoding == 1.1. dict 型的随机 dictEntry *de = dictGetRandomKey(setobj->ptr); *sdsele = 1.2. intset 型的随机 *llele = intsetRandom(setobj->ptr); *sdsele = NULL; return setobj-> 1.1. dict 型的随机 Return a random entry from the hash table. Useful to * implement randomized algorithms dictEntry *dictGetRandomKey(dict *d) { dictEntry *he,1)">orighe; unsigned h; listlen,listele; if (dictSize(d) == (dictIsRehashing(d)) _dictRehashStep(d); 基本原理就是一直接随机获取下标,直到有值 (dictIsRehashing(d)) { do We are sure there are no elements in indexes from 0 * to rehashidx-1 获取随机下标,须保证在 两个hash表的范围内 h = d->rehashidx + (random() % (d->ht[0].size + d->ht[1].size - d->rehashidx)); he = (h >= d->ht[0].size) ? d->ht[1].table[h - d->ht[].size] : d->ht[].table[h]; } while(he == NULL); } { h = random() & d->ht[].sizemask; he = d->ht[ NULL); } Now we found a non empty bucket,but it is a linked * list and we need to get a random element from the list. * The only sane way to do so is counting the elements and * select a random index. listlen = ; orighe = he; 对于hash冲突情况,再随机一次 while(he) { he = he->next; listlen++; } listele = random() % listlen; he = orighe; while(listele--) he = he->next; he; } 1.2. intset 型的随机 Return random member int64_t intsetRandom(intset *) { 这个随机就简单了,直接获取随机下标,因为intset可以保证自身元素的完整性 return _intsetGet(length)); } ? (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |