Redis系列(八):数据结构List双向链表中阻塞版本之BLPOP、BRPOP
1. BRPOP、BLPOP BLPOP: BLPOP 是阻塞式列表的弹出原语。它是命令 LPOP 的阻塞版本,这是因为当给定列表内没有任何元素可供弹出的时候,连接将被 BLPOP 命令阻塞。当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。 BRPOP:
请在?BLPOP?文档 中查看该命令的准确语义,因为? 时间复杂度 :O(1) 127.0.0.1:6379> rpush order 10001 (integer) 1 100022 100033 100044 100055 100066379> 6379> brpop order 0 1) "order" 2) 100051000410003100021000110006" (2765.54s) 6379> ? ? ? ?源码解析 void blpopCommand(client *c) { blockingPopGenericCommand(c,LIST_HEAD); } void brpopCommand(client *blockingPopGenericCommand /* Blocking RPOP/LPOP */ void blockingPopGenericCommand(client *c,int where) { robj *o; mstime_t timeout; int j; if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS) != C_OK) return; for (j = 1; j < c->argc-1; j++) { o = lookupKeyWrite(c->db,c->argv[j]); if (o != NULL) { if (o->type != OBJ_LIST) { addReply(c,shared.wrongtypeerr); ; } else { if (listTypeLength(o) != 0) { Non empty list,this is like a non normal [LR]POP. */ char *event = (where == LIST_HEAD) ? lpop" : rpop; robj *value = listTypePop(o,1)">); serverAssert(value != NULL); addReplyArrayLen(c,2); addReplyBulk(c,c->argv[j]); addReplyBulk(c,value); decrRefCount(value); notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[j],c->db->id); if (listTypeLength(o) == ) { dbDelete(c->db,1)">argv[j]); notifyKeyspaceEvent(NOTIFY_GENERIC,1)">delid); } signalModifiedKey(c,c->db,1)">argv[j]); server.dirty++; Replicate it as an [LR]POP instead of B[LR]POP. */ rewriteClientCommandVector(c,(where == LIST_HEAD) ? shared.lpop : shared.rpop,1)">argv[j]); ; } } } } If we are inside a MULTI/EXEC and the list is empty the only thing * we can do is treating it as a timeout (even with timeout 0). 
*/ if (c->flags & CLIENT_MULTI) { addReplyNullArray(c); ; } If the list is empty or the key does not exists we must block blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - blockForKeys void blockForKeys(client *c,1)">int btype,robj **keys,1)">int numkeys,mstime_t timeout,robj *target,streamID *ids) { dictEntry *de; list *l; j; c->bpop.timeout = timeout; c->bpop.target = target; if (target != NULL) incrRefCount(target); 0; j < numkeys; j++) { Allocate our bkinfo structure,associated to each key the client * is blocked for. bkinfo *bki = zmalloc(sizeof(*bki)); if (btype == BLOCKED_STREAM) bki->stream_id = ids[j]; If the key already exists in the dictionary ignore it. */ if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) { zfree(bki); continue; } incrRefCount(keys[j]); And in the other "side",to map keys -> clients de = dictFind(c->db->blocking_keys,keys[j]); if (de == retval; For every key we take a list of clients blocked for it l = listCreate(); retval = dictAdd(c->db-> DICT_OK); } { l = dictGetVal(de); } listAddNodeTail(l,c); bki->listnode = listLast(l); } blockClient(c,btype); } blockClient
void blockClient(client *c, int btype) { c->flags |= CLIENT_BLOCKED; c->btype = btype; server.blocked_clients++; server.blocked_clients_by_type[btype]++; addClientToTimeoutTable(c); } addClientToTimeoutTable
void addClientToTimeoutTable(client *c) { if (c->bpop.timeout == 0) ; uint64_t timeout = c->bpop.timeout; unsigned char buf[CLIENT_ST_KEYLEN]; encodeTimeoutKey(buf,c); if (raxTryInsert(server.clients_timeout_table,buf,1)">sizeof(buf),NULL)) c->flags |= CLIENT_IN_TO_TABLE; } 2.LINDEX返回列表里的元素的索引 index 存储在 key 里面。 下标是从0开始索引的,所以 0 是表示第一个元素, 1 表示第二个元素,并以此类推。 负数索引用于指定从列表尾部开始索引的元素。在这种方法下,-1 表示最后一个元素,-2 表示倒数第二个元素,并以此往前推。 当 key 位置的值不是一个列表的时候,会返回一个error。 时间复杂度:O(N) 6379> lpush mylist World" Hello6379> lindex mylist 6379> 源码解析 void lindexCommand(client *c) { robj *o = lookupKeyReadOrReply(c,c->argv[null[c->resp]); if (o == NULL || checkType(c,o,OBJ_LIST)) ; long index; robj *value = NULL; if ((getLongFromObjectOrReply(c,1)">2],&index,NULL) != C_OK)) if (o->encoding == OBJ_ENCODING_QUICKLIST) { quicklistEntry entry; if (quicklistIndex(o->ptr,index,1)">entry)) { if (entry.value) { value = createStringObject((char*)entry.value,entry.sz); } { value = createStringObjectFromLongLong(entry.longval); } addReplyBulk(c,value); decrRefCount(value); } { addReplyNull(c); } } { serverPanic(Unknown list encoding); } } ? Populate 'entry' with the element at the specified zero-based index * where 0 is the head,1 is the element next to head * and so on. Negative integers are used in order to count * from the tail,-1 is the last element,-2 the penultimate * and so on. If the index is out of range 0 is returned. * * Returns 1 if element found * Returns 0 if element not found int quicklistIndex(const quicklist *quicklist,1)">const long idx,quicklistEntry *entry) { quicklistNode *n; unsigned long accum = ; unsigned index; int forward = idx < 0 ? 
0 : 1; < 0 -> reverse,0+ -> forward initEntry(entry); entry->quicklist = quicklist; if (!forward) { index = (-idx) - 1; n = quicklist->tail; } { index = idx; n = quicklist->head; } if (index >= quicklist->count) return while (likely(n)) { if ((accum + n->count) > index) { break; } { D(Skipping over (%p) %u at accum %lld",(void *)n,n->count,accum); accum += n->count; n = forward ? n->next : n->prev; } } n) ; D(Found node: %p at accum %llu,idx %llu,sub+ %llu,sub- %lluvoid *)n,accum,index - accum,(-index) - 1 + accum); entry->node = n; (forward) { forward = normal head-to-tail offset. entry->offset = index - accum; } { reverse = need negative offset for tail-to-head,so undo * the result of the original if (index < 0) above. entry->offset = (-index) - accum; } quicklistDecompressNodeForUse(entry->node); entry->zi = ziplistIndex(entry->node->zl,entry->offset); ziplistGet(entry->zi,&entry->value,&entry->sz,&entry->longval); The caller will use our result,so we don't re-compress here. * The caller can recompress or delete the node as needed. ; } 3.LINSERT把 value 插入存于 key 的列表中在基准值 pivot 的前面或后面。 当 key 不存在时,这个list会被看作是空list,任何操作都不会发生。 当 key 存在,但保存的不是一个list的时候,会返回error。 6379> linsert mylist before There6379> lrange mylist 0 -3) 6379> 4.LRANGE返回存储在 key 的列表里指定范围内的元素。 start 和 end 偏移量都是基于0的下标,即list的第一个元素下标是0(list的表头),第二个元素下标是1,以此类推。 偏移量也可以是负数,表示偏移量是从list尾部开始计数。 例如, -1 表示列表的最后一个元素,-2 是倒数第二个,以此类推。 时间复杂度:O(S+N) 0 void lrangeCommand(client *c) { robj *o; start,end,llen,rangelen; (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK)) if ((o = lookupKeyReadOrReply(c,shared.emptyarray)) == NULL || checkType(c,1)">; llen = listTypeLength(o); convert negative indexes if (start < 0) start = llen+start; if (end < 0) end = llen+end; 0) start = Invariant: start >= 0,so this test will be true when end < 0. * The range is empty when start > end or start >= length. 
if (start > end || start >= llen) { addReply(c,shared.emptyarray); ; } if (end >= llen) end = llen-; rangelen = (end-start)+ Return the result in form of a multi-bulk reply addReplyArrayLen(c,rangelen); OBJ_ENCODING_QUICKLIST) { listTypeIterator *iter = listTypeInitIterator(o,start,LIST_TAIL); while(rangelen--) { listTypeEntry entry; listTypeNext(iter,&entry); quicklistEntry *qe = &entry.entry; if (qe->value) { addReplyBulkCBuffer(c,qe->value,qe->sz); } { addReplyBulkLongLong(c,qe->longval); } } listTypeReleaseIterator(iter); } List encoding is not QUICKLIST!); } } ? (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |