加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

Redis系列(十一):数据结构Set源码解析和SADD、SINTER、SDIFF

发布时间:2020-12-16 04:37:57 所属栏目:安全 来源:网络整理
导读:1.介绍 Hash是以K-V形式存储,而Set则是K存储,空间节省了很多 Redis中Set是String类型的无序集合;集合成员是唯一的。 这就意味着集合中不能出现重复的数据。可根据应用场景需要选用该数据类型。(比如:好友/关注/粉丝/感兴趣的人/黑白名单) 2.源码解析 Re

1.介绍

Hash是以K->V形式存储,而Set则是K存储,空间节省了很多

Redis中Set是String类型的无序集合;集合成员是唯一的。

这就意味着集合中不能出现重复的数据。可根据应用场景需要选用该数据类型。(比如:好友/关注/粉丝/感兴趣的人/黑白名单)

2.源码解析

Redis使用Dict和IntSet保存Set数据

// 1. inset 数据结构,在set数据量小且都是整型数据时使用
typedef struct intset {
     编码范围,由具体存储值决定
    uint32_t encoding;
     数组长度
    uint32_t length;
     具体存储元素的容器
    int8_t contents[];
} intset;

?

?

 2. dict 相关数据结构,即是 hash 的实现相关的数据结构
/* This is our hash table structure. Every dictionary has two of this as we
 * implement incremental rehashing,for the old to the new table. */
typedef  dictht {
    dictEntry **table;
    unsigned long size;
    unsigned  sizemask;
    unsigned  used;
} dictht;

typedef  dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 
    unsigned long iterators;  number of iterators currently running 
} dict;

 If safe is set to 1 this is a safe iterator,that means,you can call
 * dictAdd,dictFind,and other functions against the dictionary even while
 * iterating. Otherwise it is a non safe iterator,and only dictNext()
 * should be called while iterating.  dictIterator {
    dict *d;
     index;
    int table,safe;
    dictEntry *entry,*nextEntry;
     unsafe iterator fingerprint for misuse detection. */
    long  fingerprint;
} dictIterator;

typedef  dictEntry {
    key;
    union {
        val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;
} dictEntry;

typedef  dictType {
    unsigned int (*hashFunction)(const key);
    void *(*keyDup)(void *privdata,void *(*valDup)(obj);
    int (*keyCompare)(void *key1,1)">key2);
    void (*keyDestructor)(void (*valDestructor)(obj);
} dictType;

3.SADD

加一个或多个指定的member元素到集合的 key中.指定的一个或者多个元素member 如果已经在集合key中存在则忽略.

如果集合key 不存在,则新建集合key,并添加member元素到集合key中.

如果key 的类型不是集合则返回错误.

时间复杂度:O(N)

127.0.0.1:6379> sadd myset "Hello"
(integer) 1
0
6379> smembers myset
1) "
6379>

?

 用法: SADD key member1 [member2]
 t_set.c,添加member
void saddCommand(client *c) {
    robj *set;
    int j,added = 0 先从当前db中查找set实例
    set = lookupKeyWrite(c->db,c->argv[1]);
    if (set == NULL) {
         1. 新建set实例并添加到当前db中
        set = setTypeCreate(c->argv[2]->ptr);
        dbAdd(c->db,1)">1],1)">);
    } else {
        set->type != OBJ_SET) {
            addReply(c,shared.wrongtypeerr);
            return;
        }
    }
     对于n个member,一个个地添加即可
    for (j = 2; j < c->argc; j++) {
         2. 只有添加成功,added 才会加1
        if (setTypeAdd(set,c->argv[j]->ptr)) added++;
    }
     命令传播
    if (added) {
        signalModifiedKey(c->db,1)">]);
        notifyKeyspaceEvent(NOTIFY_SET,sadd",c->db->id);
    }
    server.dirty += added;
     响应添加成功的数量
    addReplyLongLong(c,added);
}

 1. 创建新的set集合实例(需根据首次的参数类型判定)
 Factory method to return a set that *can* hold "value". When the object has
 * an integer-encodable value,an intset will be returned. Otherwise a regular
 * hash table. 
robj *setTypeCreate(sds value) {
     如果传入的value是整型,则创建 intset 类型的set
     否则使用dict类型的set
     一般地,第一个数据为整型,后续数据也应该为整型,所以这个数据结构相对稳定
     而hash的容器创建时,只使用了一 ziplist 创建,这是不一样的实现
    if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
         createIntsetObject();
     createSetObject();
}

 1.1. 创建 intset 型的set
 object.c 
robj *createIntsetObject(void) {
    intset *is = intsetNew();
    robj *o = createObject(OBJ_SET,1)">is);
    o->encoding = OBJ_ENCODING_INTSET;
     o;
}
 intset.c,new一个空的intset对象 Create an empty intset. 
intset *intsetNew(is = zmalloc(sizeof(intset));
    is->encoding = intrev32ifbe(INTSET_ENC_INT16);
    is->length = return ;
}

 1.2. 创建dict 型的set
robj *createSetObject() {
    dict *d = dictCreate(&setDictType,NULL);
    robj *o = createObject(OBJ_SET,d);
    o->encoding = OBJ_ENCODING_HT;
     dict.c Create a new hash table 
dict *dictCreate(dictType *type,privDataPtr)
{
    dict *d = zmalloc(sizeof(*d));

    _dictInit(d,type,privDataPtr);
     d;
}
 Initialize the hash table */
int _dictInit(dict *d,dictType *privDataPtr)
{
    _dictReset(&d->ht[]);
    _dictReset(&d->ht[]);
    d->type = type;
    d->privdata = privDataPtr;
    d->rehashidx = -;
    d->iterators =  DICT_OK;
}

 2. 添加member到set集合中
 Add the specified value into a set.
 *
 * If the value was already member of the set,nothing is done and 0 is
 * returned,otherwise the new element is added and 1 is returned. int setTypeAdd(robj *subject,sds value) {
     llval;
     2.1. HT编码和INTSET编码分别处理就好
    if (subject->encoding == OBJ_ENCODING_HT) {
        dict *ht = subject->ptr;
         以 value 为 key,添加实例到ht中
         实现过程也很简单,大概就是如果存在则返回NULL(即无需添加),辅助rehash,分配内存创建dictEntry实例,稍后简单看看
        dictEntry *de = dictAddRaw(ht,value);
         (de) {
             重新设置key为 sdsdup(value),value为NULL
            dictSetKey(ht,de,sdsdup(value));
            dictSetVal(ht,NULL);
            return ;
        }
    } 
     2.2. intset 编码的member添加
    else  OBJ_ENCODING_INTSET) {
         尝试解析value为 long 型,值写入 llval 中
         C_OK) {
            uint8_t success = ;
             情况1. 可添加到intset中
            subject->ptr = intsetAdd(subject->ptr,llval,&success);
             (success) {
                 Convert to regular set when the intset contains
                 * too many entries. */
                 默认: 512,intset大于之后,则转换为ht hash表模式存储 
                if (intsetLen(subject->ptr) > server.set_max_intset_entries)
                     2.3. 转换intset编码为 ht 编码
                    setTypeConvert(subject,OBJ_ENCODING_HT);
                ;
            }
        }  {
             情况2. member 是字符串型,先将set容器转换为 ht 编码,再重新执行dict的添加模式
             Failed to get integer from object,convert to regular set. 
            setTypeConvert(subject,OBJ_ENCODING_HT);

             The set *was* an intset and this value is not integer
             * encodable,so dictAdd should always work. 
            serverAssert(dictAdd(subject->ptr,sdsdup(value),1)"> DICT_OK);
            ;
        }
    }  {
        serverPanic(Unknown set encoding);
    }
    ;
}
 2.1. 添加member到dict中(略解,在hash数据结构解析中已介绍)
 dict.c,添加某key到 d 字典中 Low level add. This function adds the entry but instead of setting
 * a value returns the dictEntry structure to the user,that will make
 * sure to fill the value field as he wishes.
 *
 * This function is also directly exposed to the user API to be called
 * mainly in order to store non-pointers inside the hash value,example:
 *
 * entry = dictAddRaw(dict,mykey);
 * if (entry != NULL) dictSetSignedIntegerVal(entry,1000);
 *
 * Return values:
 *
 * If key already exists NULL is returned.
 * If key was added,the hash entry is returned to be manipulated by the caller.
 
dictEntry *dictAddRaw(dict *d,1)">key)
{
     index;
    dictEntry *entry;
    dictht *ht;

     (dictIsRehashing(d)) _dictRehashStep(d);

     Get the index of the new element,or -1 if
     * the element already exists.  获取需要添加的key的存放位置下标(slot),如果该key已存在,则返回-1(无可用slot)
    if ((index = _dictKeyIndex(d,key)) == -)
         NULL;

     Allocate the memory and store the new entry.
     * Insert the element in top,with the assumption that in a database
     * system it is more likely that recently added entries are accessed
     * more frequently. 
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[];
    entry = zmalloc(entry));
    entry->next = ht->table[index];
    ht->table[index] = entry;
    ht->used++;

     Set the hash entry fields. 
    dictSetKey(d,entry,key);
     entry;
}

 2.2. 添加整型数据到 intset中
 Insert an integer in the intset 
intset *intsetAdd(intset *is,int64_t value,uint8_t *success) {
     获取value的所属范围
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success =  Upgrade encoding if necessary. If we need to upgrade,we know that
     * this value should be either appended (if > 0) or prepended (if < 0),* because it lies outside the range of existing values.  默认 is->encoding 为 INTSET_ENC_INT16 (16位长)
     2.2.1. 即超过当前预设的位长,则需要增大预设,然后添加
     此时的value可以确定: 要么是最大,要么是最小 (所以我们可以推断,此intset应该是有序的)
    if (valenc > intrev32ifbe(is->encoding)) {
         This always succeeds,so we don't need to curry *success. */
        return intsetUpgradeAndAdd(,value);
    }  Abort if the value is already present in the set.
         * This call will populate "pos" with the right position to insert
         * the value when it cannot be found.  2.2.2. 在当前环境下添加value
         找到value则说明元素已存在,不可再添加
         pos 保存比value小的第1个元素的位置
        if (intsetSearch(pos)) {
            ;
        }

        is = intsetResize(is->length)+);
         在pos不是末尾位置时,需要留出空位,依次移动后面的元素
        if (pos < intrev32ifbe(is->length)) intsetMoveTail( 针对编码位不变更的情况下设置pos位置的值
    _intsetSet(is->length = intrev32ifbe(intrev32ifbe();
     判断 value 的位长
 INTSET_ENC_INT16 < INTSET_ENC_INT32 < INTSET_ENC_INT64
 2 < 4 < 8 Return the required encoding for the provided value. static uint8_t _intsetValueEncoding(int64_t v) {
    if (v < INT32_MIN || v > INT32_MAX)
         INTSET_ENC_INT64;
    if (v < INT16_MIN || v > INT16_MAX)
         INTSET_ENC_INT32;
    else
         INTSET_ENC_INT16;
}

 2.2.1. 升级预设位长,并添加value
 intset.c Upgrades the intset to a larger encoding and inserts the given integer. static intset *intsetUpgradeAndAdd(intset *encoding);
    uint8_t newenc = _intsetValueEncoding(value);
    int length = intrev32ifbe(length);
    int prepend = value < 0 ? 1 :  First set new encoding and resize  intrev32ifbe(newenc);
     每次必进行扩容
    );

     Upgrade back-to-front so we don't overwrite values.
     * Note that the "prepend" variable is used to make sure we have an empty
     * space at either the beginning or the end of the intset.  因编码发生变化,元素的位置已经不能一一对应,需要按照原来的编码依次转移过来
     从后往前依次赋值,所以,内存位置上不存在覆盖问题(后面内存位置一定是空的),直接依次赋值即可(高效复制)
    while(length--)
        _intsetSet( Set the value at the beginning or the end.  对新增加的元素,负数添加到第0位,否则添加到最后一个元素后一位
     (prepend)
        _intsetSet(
        _intsetSet(length),1)"> Resize the intset static intset *intsetResize(intset *encoding);
     malloc
    is = zrealloc(sizeof(intset)+size);
     Return the value at pos,given an encoding. static int64_t _intsetGetEncoded(intset * pos,uint8_t enc) {
    int64_t v64;
    int32_t v32;
    int16_t v16;

    if (enc == INTSET_ENC_INT64) {
        memcpy(&v64,((int64_t*)is->contents)+pos,1)">(v64));
        memrev64ifbe(&v64);
         v64;
    }  INTSET_ENC_INT32) {
        memcpy(&v32,((int32_t*)(v32));
        memrev32ifbe(&v32);
         v32;
    }  {
        memcpy(&v16,((int16_t*)(v16));
        memrev16ifbe(&v16);
         v16;
    }
}
 只是这里数据类型是不确定的,所以使用指针进行赋值 Set the value at pos,using the configured encoding. static void _intsetSet(intset *if (encoding == INTSET_ENC_INT64) {
        ((int64_t*)is->contents)[pos] = value;
        memrev64ifbe(((int64_t*)is->contents)+pos);
    }  INTSET_ENC_INT32) {
        ((int32_t*) value;
        memrev32ifbe(((int32_t*) {
        ((int16_t*) value;
        memrev16ifbe(((int16_t*)pos);
    }
}

 2.2.2. 在编码类型未变更的情况,需要查找可以存放value的位置(为了确认该value是否已存在,以及小于value的第一个位置赋值) Search for the position of "value". Return 1 when the value was found and
 * sets "pos" to the position of the value within the intset. Return 0 when
 * the value is not present in the intset and sets "pos" to the position
 * where "value" can be inserted. static uint8_t intsetSearch(intset *pos) {
    int min = 0,max = intrev32ifbe(is->length)-1,mid = -;
    int64_t cur = - The value can never be found when the set is empty if (intrev32ifbe(is->length) == if (pos) *pos = ;
        ;
    }  Check for the case where we know we cannot find the value,* but do know the insert position.  因 intset 是有序数组,即可以判定是否超出范围,如果超出则元素必定不存在
        if (value > _intsetGet()) {
            if (pos) *pos = intrev32ifbe(length);
            ;
        } if (value < _intsetGet( 使用二分查找
    while(max >= min) {
        mid = ((unsigned int)min + (unsigned int)max) >> ;
        cur = _intsetGet(if (value > cur) {
            min = mid+if (value < cur) {
            max = mid- 找到了
            break;
        }
    }

    if (value == cur) {
        if (pos) *pos = mid;
         在没有找到的情况下,min就是第一个比 value 小的元素
         min;
        ;
    }
}
 intset移动(内存移动)
void intsetMoveTail(intset *fromvoid *src,1)">dst;
    uint32_t bytes = intrev32ifbe(is->length)-;
    uint32_t encoding = intrev32ifbe(encoding);

     INTSET_ENC_INT64) {
        src = (int64_t*)is->contents+;
        dst = (int64_t*)to;
        bytes *= (int64_t);
    }  INTSET_ENC_INT32) {
        src = (int32_t*);
        dst = (int32_t*)(int32_t);
    }  {
        src = (int16_t*);
        dst = (int16_t*)(int16_t);
    }
    memmove(dst,src,bytes);
}

 2.3. 转换intset编码为 ht 编码 (如果遇到string型的value或者intset数量大于阀值(默认:512)时)
 Convert the set to specified encoding. The resulting dict (when converting
 * to a hash table) is presized to hold the number of elements in the original
 * set. void setTypeConvert(robj *setobj,1)"> enc) {
    setTypeIterator *si;
     要求外部必须保证 set类型且 intset 编码
    serverAssertWithInfo(NULL,setobj,setobj->type == OBJ_SET &&
                             setobj->encoding == OBJ_ENCODING_INTSET);

     OBJ_ENCODING_HT) {
        int64_t intele;
         直接创建一个 dict 来容纳数据
        dict *d = dictCreate(& Presize the dict to avoid rehashing  直接一次性扩容成需要的大小
        dictExpand(d,intsetLen(setobj->ptr));

         To add the elements we extract integers and create redis objects  setTypeIterator 迭代器是转换的关键 
        si = setTypeInitIterator(setobj);
        while (setTypeNext(si,&element,&intele) != -) {
             element:ht编码时的key,intele: intset编码时的value
            element = sdsfromlonglong(intele);
             因set特性保证是无重复元素,所以添加dict时,必然应成功
             此处应无 rehash,而是直接计算 hashCode,放置元素,时间复杂度 O(1)
            serverAssert(dictAdd(d,element,1)"> DICT_OK);
        }
         释放迭代器
        setTypeReleaseIterator(si);

        setobj->encoding = OBJ_ENCODING_HT;
        zfree(setobj->ptr);
        setobj->ptr = d;
    } Unsupported set conversion);
    }
}
subject) {
    setTypeIterator *si = zmalloc((setTypeIterator));
     设置迭代器公用信息
    si->subject = subject;
    si->encoding = subject->encoding;
     hash表则需要再迭代 dict
    if (si->encoding == OBJ_ENCODING_HT) {
        si->di = dictGetIterator(subject->ptr);
    }
     intset 比较简单,直接设置下标即可
     OBJ_ENCODING_INTSET) {
        si->ii =  si;
}
d)
{
    dictIterator *iter = zmalloc(iter));

    iter->d = d;
    iter->table = ;
    iter->index = -;
    iter->safe = ;
    iter->entry = NULL;
    iter->nextEntry = NULL;
     iter;
}
 Move to the next entry in the set. Returns the object at the current
 * position.
 *
 * Since set elements can be internally be stored as SDS strings or
 * simple arrays of integers,setTypeNext returns the encoding of the
 * set object you are iterating,and will populate the appropriate pointer
 * (sdsele) or (llele) accordingly.
 *
 * Note that both the sdsele and llele pointers should be passed and cannot
 * be NULL since the function will try to defensively populate the non
 * used field with values which are easy to trap if misused.
 *
 * When there are no longer elements -1 is returned. int setTypeNext(setTypeIterator *si,sds *sdsele,int64_t *llele) {
     hash表返回key
     OBJ_ENCODING_HT) {
        dictEntry *de = dictNext(si->di);
        if (de == NULL) return -;
        *sdsele = dictGetKey(de);
        *llele = -123456789;  Not needed. Defensive. 
    }
     intset 直接获取下标对应的元素即可
    if (!intsetGet(si->subject->ptr,si->ii++;
        *sdsele = NULL; 
    } Wrong set encoding in setTypeNextreturn si->encoding;
}
 case1: intset直接叠加下标即可
 Sets the value to the value at the given position. When this position is
 * out of range the function returns 0,when in range it returns 1. 
uint8_t intsetGet(intset *value) {
    length)) {
        *value = _intsetGet(static int64_t _intsetGet(intset * pos) {
    return _intsetGetEncoded(encoding));
}
 (附带)case2: dict的迭代
iter)
{
     一直迭代查找
    while ( iter->entry 为NULL,有两种可能: 1. 初始化时; 2. 上一元素为迭代完成(hash冲突)
        if (iter->entry == NULL) {
            dictht *ht = &iter->d->ht[iter->table];
            if (iter->index == -1 && iter->table == ) {
                if (iter->safe)
                    iter->d->iterators++;
                
                    iter->fingerprint = dictFingerprint(iter->d);
            }
             直接使用下标进行迭代,如果中间有空闲位置该如何处理??
             看起来redis是使用了全量迭代元素的处理办法,即有可能有许多空迭代过程
             一般地,也是进行两层迭代,jdk的hashmap迭代实现为直接找到下一次非空的元素为止
            iter->index++ 直到迭代完成所有元素,否则会直到找到一个元素为止
            if (iter->index >= (long) ht->size) {
                if (dictIsRehashing(iter->d) && iter->table == ) {
                    iter->table++;
                    iter->index = ;
                    ht = &iter->d->ht[];
                }  {
                    ;
                }
            }
            iter->entry = ht->table[iter->index];
        }  entry不为空,就一定有nextEntry??
            iter->entry = iter->nextEntry;
        }
         如果当前entry为空,则继续迭代下一个 index
        entry) {
             We need to save the 'next' here,the iterator user
             * may delete the entry we are returning. 
            iter->nextEntry = iter->entry->next;
            return iter->entry;
        }
    }
     NULL;
}

4.SISMEMBER

返回成员 member 是否是存储的集合 key的成员.

如果member元素是集合key的成员,则返回1

如果member元素不是key的成员,或者集合key不存在,则返回0

时间复杂度:O(1)

6379> sismember myset World6379>
 用法: SISMEMBER key member 
void sismemberCommand(client *if ((set = lookupKeyReadOrReply(c,shared.czero)) == NULL ||
        checkType(c,OBJ_SET))  主要方法 setTypeIsMember
    if (setTypeIsMember(ptr))
         回复1
        addReply(c,shared.cone);
     回复0

 t_set.c
int setTypeIsMember(robj * OBJ_ENCODING_HT) {
         hash 表的查找方式,hashCode 计算,链表查找,就这么简单
        return dictFind((dict*)subject->ptr,value) != NULL;
    }  如果当前的set集合是 intset 编码的,则只有查找值也是整型的情况下才可能查找到元素
         C_OK) {
             intset 查找,而且 intset 是有序的,所以直接使用二分查找即可 
            return intsetFind((intset*)subject->ptr,llval);
        }
    } ;
}

?

 Determine whether a value belongs to this set 
uint8_t intsetFind(intset * 最大范围检查,加二分查找  
     intsetSearch 前面已介绍
    return valenc <= intrev32ifbe(is->encoding) && intsetSearch(sinter:

返回指定所有的集合的成员的交集,例如(共同好友)

sdiff:

返回一个集合与给定集合的差集的元素

sunion

返回给定的多个集合的并集中的所有成员

6379> sadd myset1 1 2 3 4 55
6379> sadd myset2 5 6 7 sinter myset1 myset2
32) 43) 5 sdiff myset1 myset2
12 sunion myset1 myset2
4) 5) 6) 67) 76379>

sinter源码解析

 用法: SINTER key1 [key2]
void sinterCommand(client *c) {
     第三个参数是用来存储 交集结果的,两段代码已做复用,说明存储过程还是比较简单的
    sinterGenericCommand(c,c->argv+void sinterGenericCommand(client *c,robj **setkeys,unsigned long setnum,robj *dstkey) {
    robj **sets = zmalloc(sizeof(robj*)*setnum);
    setTypeIterator *si;
    robj *dstset = NULL;
    sds elesds;
    int64_t intobj;
    void *replylen = NULL;
    unsigned long j,cardinality =  encoding;

    0; j < setnum; j++ 依次查找每个key的set实例
        robj *setobj = dstkey ?
            lookupKeyWrite(c->db,setkeys[j]) :
            lookupKeyRead(c-> 只要有一个set为空,则交集必定为为,无需再找
        if (!setobj) {
            zfree(sets);
             (dstkey) {
                 没有交集,直接将dstKey 删除,注意此逻辑??
                if (dbDelete(c->++;
                }
                addReply(c,shared.czero);
            }  {
                addReply(c,shared.emptymultibulk);
            }
            ;
        }
         (checkType(c,OBJ_SET)) {
            zfree(sets);
            ;
        }
        sets[j] = setobj;
    }
     Sort sets from the smallest to largest,this will improve our
     * algorithm's performance  快速排序算法,将 sets 按照元素长度做排序,使最少元素的set排在最前面
    qsort(sets,setnum,1)">sizeof(robj*),qsortCompareSetsByCardinality);

     The first thing we should output is the total number of elements...
     * since this is a multi-bulk write,but at this stage we don't know
     * the intersection set size,so we use a trick,append an empty object
     * to the output list and save the pointer to later modify it with the
     * right length dstkey) {
        replylen = addDeferredMultiBulkLength(c);
    }  If we have a target key where to store the resulting set
         * create this key with an empty set inside 
        dstset = createIntsetObject();
    }

     Iterate all the elements of the first (smallest) set,and test
     * the element against all the other sets,if at least one set does
     * not include the element it is discarded  看来redis也是直接通过迭代的方式来完成交集功能
     迭代最少的set集合,依次查找后续的set集合,当遇到一个不存在的set时,上值被排除,否则是交集
    si = setTypeInitIterator(sets[while((encoding = setTypeNext(si,&elesds,&intobj)) != -1; j < setnum; j++if (sets[j] == sets[0]) continue 以下是查找过程
             分 hash表查找 和 intset 编码查找
             OBJ_ENCODING_INTSET) {
                 intset with intset is simple... and fast  两个集合都是 intset 编码,直接二分查找即可
                if (sets[j]->encoding == OBJ_ENCODING_INTSET &&
                    !intsetFind((intset*)sets[j]-> in order to compare an integer with an object we
                 * have to use the generic function,creating an object
                 * for this 
                } if (sets[j]->encoding == OBJ_ENCODING_HT) {
                     编码不一致,但元素可能相同
                     setTypeIsMember 复用前面的代码,直接查找即可
                    elesds = sdsfromlonglong(intobj);
                    setTypeIsMember(sets[j],elesds)) {
                        sdsfree(elesds);
                        ;
                    }
                    sdsfree(elesds);
                }
            }  OBJ_ENCODING_HT) {
                ;
                }
            }
        }

         Only take action when all sets contain the member  当迭代完所有集合,说明每个set中都存在该值,是交集(注意分析最后一个迭代)
        if (j == setnum) {
             不存储交集的情况下,直接响应元素值即可
            dstkey) {
                 OBJ_ENCODING_HT)
                    addReplyBulkCBuffer(c,elesds,sdslen(elesds));
                
                    addReplyBulkLongLong(c,intobj);
                cardinality++;
            } 
             要存储交集数据,将值存储到 dstset 中
             {
                 OBJ_ENCODING_INTSET) {
                    elesds = sdsfromlonglong(intobj);
                    setTypeAdd(dstset,elesds);
                    sdsfree(elesds);
                }  {
                    setTypeAdd(dstset,elesds);
                }
            }
        }
    }
    setTypeReleaseIterator(si);

     (dstkey) {
         Store the resulting set into the target,if the intersection
         * is not an empty set.  存储集合之前会先把原来的数据删除,如果进行多次交集运算,dstKey 就相当于临时表咯
        int deleted = dbDelete(c->if (setTypeSize(dstset) > ) {
            dbAdd(c->sinterstoreid);
        }  {
            decrRefCount(dstset);
            addReply(c,shared.czero);
             (deleted)
                notifyKeyspaceEvent(NOTIFY_GENERIC,1)">delid);
        }
        signalModifiedKey(c-> {
        setDeferredMultiBulkLength(c,replylen,cardinality);
    }
    zfree(sets);
}
 compare 方法
int qsortCompareSetsByCardinality(void *s1,1)">s2) {
    return setTypeSize(*(robj**)s1)-setTypeSize(*(robj**)s2);
}
 快排样例 sort.lua
-- extracted from Programming Pearls,page 110
function qsort(x,l,u,f)
 if l<u then
  local m=math.random(u-(l-1))+l-1    -- choose a random pivot in range l..u
  x[l],x[m]=x[m],x[l]            -- swap pivot to first position
  local t=x[l]                -- pivot value
  m=l
  local i=l+1
  while i<=u do
    -- invariant: x[l+1..m] < t <= x[m+1..i-]
     f(x[i],t) then
      m=m+
      x[m],x[i]=x[i],x[m]        -- swap x[i] and x[m]
    end
    i=i+
  end
  x[l],1)"> swap pivot to a valid place
  -- x[l+1..m-1] < x[m] <= x[m+..u]
  qsort(x,m-

sdiff和sunion源码解析

void sunionCommand(client *c) {
    sunionDiffGenericCommand(c,c->argv+void sunionstoreCommand(client *2,1)">],1)">void sdiffCommand(client *void sdiffstoreCommand(client *
 用法: SDIFFSTORE destination key1 [key2]
 看起来sdiff 与 sunion 共用了一段代码,为啥呢?
     想想 sql 中的 full join 
     c->argv[1] 是 dstKey
    sunionDiffGenericCommand(c,SET_OP_DIFF);
}
void sunionDiffGenericCommand(client *c,robj **setkeys,1)"> setnum,robj *dstkey,1)"> op) {
    robj **sets = zmalloc( NULL;
    sds ele;
    int diff_algo =  同样的套路,先查找各key的实例
     不同的是,这里的key允许不存在,但不允许类型不一致
    ) {
        robj *setobj = dstkey ?setobj) {
            sets[j] = NULL;
             setobj;
    }

     Select what DIFF algorithm to use.
     *
     * Algorithm 1 is O(N*M) where N is the size of the element first set
     * and M the total number of sets.
     *
     * Algorithm 2 is O(N) where N is the total number of elements in all
     * the sets.
     *
     * We compute what is the best bet with the current input here.  针对差集运算,做算法优化
    if (op == SET_OP_DIFF && sets[]) {
        long algo_one_work = ;

        if (sets[j] == NULL) ;

            algo_one_work += setTypeSize(sets[]);
            algo_two_work += setTypeSize(sets[j]);
        }

         Algorithm 1 has better constant times and performs less operations
         * if there are elements in common. Give it some advantage. 
        algo_one_work /= ;
        diff_algo = (algo_one_work <= algo_two_work) ? if (diff_algo == 1 && setnum >  With algorithm 1 it is better to order the sets to subtract
             * by decreasing size,so that we are more likely to find
             * duplicated elements ASAP. 
            qsort(sets+ We need a temp set object to store our union. If the dstkey
     * is not NULL (that is,we are inside an SUNIONSTORE operation) then
     * this set object will be the resulting object to set into the target key
    dstset = createIntsetObject();

    if (op == SET_OP_UNION) {
         Union is trivial,just add every element of every set to the
         * temporary set. if (!sets[j]) continue;  non existing keys are like empty sets */
             依次添加即可,对于 sunion 来说,有序是无意义的
            si = setTypeInitIterator(sets[j]);
            while((ele = setTypeNextObject(si)) != NULL) {
                if (setTypeAdd(dstset,ele)) cardinality++;
                sdsfree(ele);
            }
            setTypeReleaseIterator(si);
        }
    } 
     使用算法1,依次迭代最大元素
    0] && diff_algo ==  DIFF Algorithm 1:
         *
         * We perform the diff by iterating all the elements of the first set,* and only adding it to the target set if the element does not exist
         * into all the other sets.
         *
         * This way we perform at max N*M operations,where N is the size of
         * the first set,and M the number of sets. 
        si = setTypeInitIterator(sets[]);
         NULL) {
             no key is an empty set. break;  same set!  只要有一个相同,就不算是差集??
                if (setTypeIsMember(sets[j],ele)) ;
            }
             这里的差集是所有set的值都不相同或者为空??? 尴尬了
             setnum) {
                 There is no other set with this element. Add it. 
                setTypeAdd(dstset,ele);
                cardinality++;
            }
            sdsfree(ele);
        }
        setTypeReleaseIterator(si);
    } 
     使用算法2,直接以第一个元素为基础,后续set做remove,最后剩下的就是差集
     DIFF Algorithm 2:
         *
         * Add all the elements of the first set to the auxiliary set.
         * Then remove all the elements of all the next sets from it.
         *
         * This is O(N) where N is the sum of all the elements in every
         * set. 

            si =if (j == ) {
                    ;
                } if (setTypeRemove(dstset,ele)) cardinality--;
                }
                sdsfree(ele);
            }
            setTypeReleaseIterator(si);

             Exit if result set is empty as any additional removal
             * of elements will have no effect. if (cardinality == 0)  Output the content of the resulting set,if not in STORE mode dstkey) {
        addReplyMultiBulkLen(c,cardinality);
        si = setTypeInitIterator(dstset);
         响应差集列表
         NULL) {
            addReplyBulkCBuffer(c,ele,sdslen(ele));
            sdsfree(ele);
        }
        setTypeReleaseIterator(si);
        decrRefCount(dstset);
    }  If we have a target key where to store the resulting set
         * create this key with the result set inside  存储差集列表,响应差集个数
            dbAdd(c->sunionstore" : sdiffstore;
    }
    zfree(sets);
}
 This is used by SDIFF and in this case we can receive NULL that should
 * be handled as empty sets. int qsortCompareSetsByRevCardinality(s2) {
    robj *o1 = *(robj**)s1,*o2 = *(robj**)s2;

    return  (o2 ? setTypeSize(o2) : 0) - (o1 ? setTypeSize(o1) : );
}

6.SPOP

从存储在key的集合中移除并返回一个或多个随机元素。

此操作与SRANDMEMBER类似,它从一个集合中返回一个或多个随机元素,但不删除元素。

spop myset1 smembers myset1 6379>

源码解析

 用法: SPOP key [count]
void spopCommand(client *aux;
    sds sdsele;
    int64_t llele;
    if (c->argc == 3 弹出指定数量的元素,略
        spopWithCountCommand(c);
        if (c->argc > ) {
        addReply(c,shared.syntaxerr);
        ;
    }

     Make sure a key with the name inputted exists,and that it's type is
     * indeed a set set = lookupKeyWriteOrReply(c,shared.nullbulk)) == NULL || Get a random element from the set  1. 随机获取一个元素,这是 spop 的定义
    encoding = setTypeRandomElement(llele);

     Remove the element from the set  2. 删除元素
     OBJ_ENCODING_INTSET) {
        ele = createStringObjectFromLongLong(llele);
        set->ptr = intsetRemove(set-> {
        ele = createStringObject(sdsele,sdslen(sdsele));
        setTypeRemove(ptr);
    }

    notifyKeyspaceEvent(NOTIFY_SET,1)">spopid);

     Replicate/AOF this command as an SREM operation 
    aux = createStringObject(SREM4);
    rewriteClientCommandVector(c,1)">3,aux,ele);
    decrRefCount(aux);

     Add the element to the reply 
    addReplyBulk(c,ele);
    decrRefCount(ele);

     Delete the set if it's empty if (setTypeSize(set) == ) {
        dbDelete(c->db,1)">]);
        notifyKeyspaceEvent(NOTIFY_GENERIC,1)">id);
    }

     Set has been modified 
    signalModifiedKey(c->db,1)">]);
    server.dirty++ 没啥好说的,就看下是如何随机的就好了
 Return random element from a non empty set.
 * The returned element can be a int64_t value if the set is encoded
 * as an "intset" blob of integers,or an SDS string if the set
 * is a regular set.
 *
 * The caller provides both pointers to be populated with the right
 * object. The return value of the function is the object->encoding
 * field of the object and is used by the caller to check if the
 * int64_t pointer or the redis object pointer was populated.
 *
 * Note that both the sdsele and llele pointers should be passed and cannot
 * be NULL since the function will try to defensively populate the non
 * used field with values which are easy to trap if misused. int setTypeRandomElement(robj *setobj,1)">if (setobj->encoding == 1.1. dict 型的随机
        dictEntry *de = dictGetRandomKey(setobj->ptr);
        *sdsele = 1.2. intset 型的随机
        *llele = intsetRandom(setobj->ptr);
        *sdsele = NULL; return setobj-> 1.1. dict 型的随机 Return a random entry from the hash table. Useful to
 * implement randomized algorithms 
dictEntry *dictGetRandomKey(dict *d)
{
    dictEntry *he,1)">orighe;
    unsigned  h;
     listlen,listele;

    if (dictSize(d) ==  (dictIsRehashing(d)) _dictRehashStep(d);
     基本原理就是一直接随机获取下标,直到有值
     (dictIsRehashing(d)) {
        do We are sure there are no elements in indexes from 0
             * to rehashidx-1  获取随机下标,须保证在 两个hash表的范围内
            h = d->rehashidx + (random() % (d->ht[0].size +
                                            d->ht[1].size -
                                            d->rehashidx));
            he = (h >= d->ht[0].size) ? d->ht[1].table[h - d->ht[].size] :
                                      d->ht[].table[h];
        } while(he == NULL);
    }  {
            h = random() & d->ht[].sizemask;
            he = d->ht[ NULL);
    }

     Now we found a non empty bucket,but it is a linked
     * list and we need to get a random element from the list.
     * The only sane way to do so is counting the elements and
     * select a random index. 
    listlen = ;
    orighe = he;
     对于hash冲突情况,再随机一次
    while(he) {
        he = he->next;
        listlen++;
    }
    listele = random() % listlen;
    he = orighe;
    while(listele--) he = he->next;
     he;
}

 1.2. intset 型的随机
 Return random member 
int64_t intsetRandom(intset *) {
     这个随机就简单了,直接获取随机下标,因为intset可以保证自身元素的完整性
    return _intsetGet(length));
}

?

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读