加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

Redis运行流程源码解析--转载

发布时间:2020-12-16 04:44:53 所属栏目:安全 来源:网络整理
导读:http://blog.nosqlfan.com/html/4007.html http://www.searchdatabase.com.cn/showcontent_62166.htm :本文分析源码基于Redis 2.4.7 stable版本,对Redis运行流程,命令处理的内部实现进行了深入讲解。 ???? 概述 Redis通过定义一个 struct redisServer 类

http://blog.nosqlfan.com/html/4007.html

http://www.searchdatabase.com.cn/showcontent_62166.htm

:本文分析源码基于Redis 2.4.7 stable版本,对Redis运行流程,命令处理的内部实现进行了深入讲解。

????

  概述

  Redis通过定义一个 struct redisServer 类型的全局变量server 来保存服务器的相关信息(比如:配置信息,统计信息,服务器状态等等)。启动时通过读取配置文件里边的信息对server进行初始化(如果没有指定配置文 件,将使用默认值对sever进行初始化),初始化的内容有:起监听端口,绑定有新连接时的回调函数,绑定服务器的定时函数,虚拟内存初始化,log初始 化等等。

  启动

  初始化服务器配置

  先来看看redis 的main函数的入口

  Redis.c:1694

?2))?{ ????????usage(); ????}?else?{ ????????... ????} ????if?(server.daemonize)?daemonize(); ????initServer(); ????...</tr></table>

  • initServerConfig初始化全局变量 server 的属性为默认值。
  • 如果命令行指定了配置文件, resetServerSaveParams重置对落地备份的配置(即重置为默认值)并读取配置文件的内容对全局变量 server 再进行初始化 ,没有在配置文件中配置的将使用默认值。
  • 如果服务器配置成后台执行,则对服务器进行 daemonize。
  • initServer初始化服务器,主要是设置信号处理函数,初始化事件轮询,起监听端口,绑定有新连接时的回调函数,绑定服务器的定时函数,初始化虚拟内存和log等等。
  • 创建服务器监听端口。

  Redis.c:923

</tr></table>

  • anetTcpServer创建一个socket并进行监听,然后把返回的socket fd赋值给server.ipfd。

  事件轮询结构体定义

  先看看事件轮询的结构体定义

  Ae.h:88

</tr></table>

  • maxfd是最大的文件描述符,主要用来判断是否有文件事件需要处理(ae.c:293)和当使用select 来处理网络IO时作为select的参数(ae_select.c:50)。
  • timeEventNextId 是下一个定时事件的ID。
  • events[AE_SETSIZE]用于保存通过aeCreateFileEvent函数创建的文件事件,在sendReplyToClient函数和freeClient函数中通过调用aeDeleteFileEvent函数删除已经处理完的事件。
  • fired[AE_SETSIZE] 用于保存已经触发的文件事件,在对应的网络I/O函数中进行赋值(epoll,select,kqueue),不会对fired进行删除操作,只会一直覆 盖原来的值。然后在aeProcessEvents函数中对已经触发的事件进行处理。
  • timeEventHead 是定时事件链表的头,定时事件的存储用链表实现。
  • Stop 用于停止事件轮询处理。
  • apidata 用于保存轮询api需要的数据,即aeApiState结构体,对于epoll来说,aeApiState结构体的定义如下:
</tr></table>

  • beforesleep 是每次进入处理事件时执行的函数。

  创建事件轮询

  Redis.c:920

timeEventHead?=?NULL; ????eventLoop->timeEventNextId?=?0; ????eventLoop->stop?=?0; ????eventLoop->maxfd?=?-1; ????eventLoop->beforesleep?=?NULL; ????if?(aeApiCreate(eventLoop)?==?-1)?{ ????????zfree(eventLoop); ????????return?NULL; ????} /*?Events?with?mask?==?AE_NONE?are?not?set.?So?let's?initialize ?*?the?vector?with?it.?*/ ????for?(i?=?0;?i?events[i].mask?=?AE_NONE; ????return?eventLoop; }</tr></table>

  绑定定时函数和有新连接时的回调函数

  redis.c:973

?0?&& ????aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,acceptTcpHandler,NULL)?==?AE_ERR)?oom("creating?file?event");</tr></table>

  • aeCreateTimeEvent 创建定时事件并绑定回调函数serverCron,这个定时事件第一次是超过1毫秒就有权限执行,如果其他事件的处理时间比较长,可能会出现超过一定时间 都没执行情况。这里的1毫秒只是超过后有可执行的权限,并不是一定会执行。第一次执行后,如果还要执行,是由定时函数的返回值确定的,在 processTimeEvents(ae.c:219)中,当调用定时回调函数后,获取定时回调函数的返回值,如果返回值不等于-1,则设置定时回调函 数的下一次触发时间为当前时间加上定时回调函数的返回值,即调用间隔时间。serverCron的返回值是100ms,表明从二次开始,每超过100ms 就有权限执行。(定时回调函数serverCron用于更新lru时钟,更新服务器的状态,打印一些服务器信息,符合条件的情况下对hash表进行重哈 希,启动后端写AOF或者检查后端写AOF或者备份是否完成,检查过期的KEY等等)
  • aeCreateFileEvent创建监听端口的socket fd的文件读事件(即注册网络io事件)并绑定回调函数acceptTcpHandler。

  进入事件轮询

  初始化后将进入事件轮询

  Redis.c:1733

</tr></table>

  • 设置每次进入事件处理前会执行的函数beforeSleep。
  • 进入事件轮询aeMain。
  • 退出事件轮询后删除事件轮询,释放事件轮询占用内存aeDeleteEventLoop(不过没在代码中发现有执行到这一步的可能,服务器接到shutdown命令时通过一些处理后直接就通过exit退出了,可能是我看错了,待验证)。

  事件轮询函数aeMain

  看看aeMain的内容

  Ae.c:382

stop?=?0; ????while?(!eventLoop->stop)?{ ????????if?(eventLoop->beforesleep?!=?NULL) ????????????eventLoop->beforesleep(eventLoop); ????????aeProcessEvents(eventLoop,?AE_ALL_EVENTS); ????} }</tr></table>

  • 每次进入事件处理前,都会调用设置的beforesleep,beforeSleep函数主要是处理被阻塞的命令和根据配置写AOF。
  • aeProcessEvents处理定时事件和网络io事件。

  启动完毕,等待客户端请求

  到进入事件轮询函数后,redis的启动工作就做完了,接下来就是等待客户端的请求了。

  接收请求

  新连接到来时的回调函数

  在绑定定时函数和有新连接时的回调函数中说到了绑定有新连接来时的回调函数acceptTcpHandler,现在来看看这个函数的具体内容

  Networking.c:427

</tr></table>

  • anetTcpAccept 函数 accept新连接,返回的cfd是新连接的socket fd。
  • acceptCommonHandler 函数是对新建立的连接进行处理,这个函数在使用 unix socket 时也会被用到。

  接收客户端的新连接

  接下来看看anetTcpAccept函数的具体内容

</tr></table>

  再进去anetGenericAccept 看看

  Anet.c:313

</tr></table>

  • anetTcpAccept 函数中调用anetGenericAccept 函数进行接收新连接,anetGenericAccept函数在 unix socket 的新连接处理中也会用到。
  • anetTcpAccept 函数接收新连接后,获取客户端得ip,port 并返回。

  创建redisClient进行接收处理

  anetTcpAccept 运行完后,返回新连接的socket fd,然后返回到调用函数acceptTcpHandler中,继续执行acceptCommonHandler 函数

  Networking.c:403

?server.maxclients)?{ ????????char?*err?=?"-ERR?max?number?of?clients?reachedrn"; ????????/*?That's?a?best?effort?error?message,?don't?check?write?errors?*/ ????????if?(write(c->fd,err,strlen(err))?==?-1)?{ ????????????/*?Nothing?to?do,?Just?to?avoid?the?warning...?*/ ????????} ????????freeClient(c); ????????return; ????} ????server.stat_numconnections++; }</tr></table>

  • 创建一个 redisClient 来处理新连接,每个连接都会创建一个 redisClient 来处理。
  • 如果配置了最大并发客户端,则对现有的连接数进行检查和处理。
  • 最后统计连接数。

  绑定有数据可读时的回调函数

  Networking.c:15

bufpos?=?0; ????anetNonBlock(NULL,fd); ????anetTcpNoDelay(NULL,fd); ????if?(aeCreateFileEvent(server.el,fd,????????readQueryFromClient,?c)?==?AE_ERR) ????{ ????????close(fd); ????????zfree(c); ????????return?NULL; ????} ????selectDb(c,0); ????c->fd?=?fd; ????c->querybuf?=?sdsempty(); c->reqtype?=?0; ... }</tr></table>

  • 创建新连接的socket fd对应的文件读事件,绑定回调函数readQueryFromClient。
  • 如果创建成功,则对 redisClient 进行一系列的初始化,因为 redisClient 是通用的,即不管是什么命令的请求,都是通过创建一个 redisClient 来处理的,所以会有比较多的字段需要初始化。

  createClient 函数执行完后返回到调用处acceptCommonHandler函数,然后从acceptCommonHandler函数再返回到acceptTcpHandler函数。

  接收请求完毕,准备接收客户端得数据

   到此为止,新连接到来时的回调函数acceptTcpHandler执行完毕,在这个回调函数中创建了一个redisClient来处理这个客户端接下 来的请求,并绑定了接收的新连接的读文件事件。当有数据可读时,网络i/o轮询(比如epoll)会有事件触发,此时绑定的回调函数 readQueryFromClient将会调用来处理客户端发送过来的数据。

  读取客户端请求的数据

  在绑定有数据可读时的回调函数中的createClient函数中绑定了一个有数据可读时的回调函数readQueryFromClient函数,现在看看这个函数的具体内容

  Networking.c:874

querybuf?=?sdscatlen(c->querybuf,buf,nread); ????????c->lastinteraction?=?time(NULL); ????}?else?{ ????????server.current_client?=?NULL; ????????return; ????} ????if?(sdslen(c->querybuf)?>?server.client_max_querybuf_len)?{ ????????sds?ci?=?getClientInfoString(c),?bytes?=?sdsempty(); ????????bytes?=?sdscatrepr(bytes,c->querybuf,64); ????????redisLog(REDIS_WARNING,"Closing?client?that?reached?max?query?buffer?length:?%s?(qbuf?initial?bytes:?%s)",?ci,?bytes); ????????sdsfree(ci); ????????sdsfree(bytes); ????????freeClient(c); ????????return; ????} ????processInputBuffer(c); ????server.current_client?=?NULL; }</tr></table>

  • 调用系统函数read来读取客户端传送过来的数据,调用read后对读取过程中被系统中断的情况(nread == -1 && errno == EAGAIN),客户端关闭的情况(nread == 0)进行了判断处理。
  • 如果读取的数据超过限制(1GB)则报错。
  • 读取完后进入processInputBuffer进行协议解析。

  请求协议

  从readQueryFromClient函数读取客户端传过来的数据,进入processInputBuffer函数进行协议解析,可以把processInputBuffer函数看作是输入数据的协议解析器

  Networking.c:835

querybuf))?{ ????????/*?Immediately?abort?if?the?client?is?in?the?middle?of?something.?*/ ????????if?(c->flags?&?REDIS_BLOCKED?||?c->flags?&?REDIS_IO_WAIT)?return; ????????/*?REDIS_CLOSE_AFTER_REPLY?closes?the?connection?once?the?reply?is ?????????*?written?to?the?client.?Make?sure?to?not?let?the?reply?grow?after ?????????*?this?flag?has?been?set?(i.e.?don't?process?more?commands).?*/ ????????if?(c->flags?&?REDIS_CLOSE_AFTER_REPLY)?return; ????????/*?Determine?request?type?when?unknown.?*/ ????????if?(!c->reqtype)?{ ????????????if?(c->querybuf[0]?==?'*')?{ ????????????????c->reqtype?=?REDIS_REQ_MULTIBULK; ????????????}?else?{ ????????????????c->reqtype?=?REDIS_REQ_INLINE; ????????????} ????????} ????????if?(c->reqtype?==?REDIS_REQ_INLINE)?{ ????????????if?(processInlineBuffer(c)?!=?REDIS_OK)?break; ????????}?else?if?(c->reqtype?==?REDIS_REQ_MULTIBULK)?{ ????????????if?(processMultibulkBuffer(c)?!=?REDIS_OK)?break; ????????}?else?{ ????????????redisPanic("Unknown?request?type"); ????????} ????????/*?Multibulk?processing?could?see?a?<=?0?length.?*/ ????????if?(c->argc?==?0)?{ ????????????resetClient(c); ????????}?else?{ ????????????/*?Only?reset?the?client?when?the?command?was?executed.?*/ ????????????if?(processCommand(c)?==?REDIS_OK) ????????????????resetClient(c); ????????} ????} }</tr></table>

  • Redis支持两种协议,一种是inline,一种是multibulk。inline协议是老协议,现在一般只在命令行下的redis客户端使用,其他情况一般是使用multibulk协议。
  • 如 果客户端传送的数据的第一个字符时‘*’,那么传送数据将被当做multibulk协议处理,否则将被当做inline协议处理。Inline协议的具体 解析函数是processInlineBuffer,multibulk协议的具体解析函数是processMultibulkBuffer。
  • 当协议解析完毕,即客户端传送的数据已经解析出命令字段和参数字段,接下来进行命令处理,命令处理函数是processCommand。

  Inline请求协议

  Networking.c:679

</tr></table>

  • 根据空格分割客户端传送过来的数据,把传送过来的命令和参数保存在argv数组中,把参数个数保存在argc中,argc的值包括了命令参数本身。即set key value命令,argc的值为3。详细解析见协议详解

  Multibulk请求协议

   Multibulk协议比inline协议复杂,它是二进制安全的,即传送数据可以包含不安全字符。Inline协议不是二进制安全的,比如,如果 set key value命令中的key或value包含空白字符,那么inline协议解析时将会失败,因为解析出来的参数个数与命令需要的的参数个数会不一致。

  协议格式

?CR?LF $?CR?LF ?CR?LF ... $?CR?LF ?CR?LF</tr></table>

  协议举例

</tr></table>

  具体解析代码位于

  Networking.c:731

</tr></table>

  详细解析见协议详解

  处理命令

  当协议解析完毕,则表示客户端的命令输入已经全部读取并已经解析成功,接下来就是执行客户端命令前的准备和执行客户端传送过来的命令

  Redis.c:1062

cmd?=?c->lastcmd?=?lookupCommand(c->argv[0]->ptr); ... call(c); ... }</tr></table>

  • lookupCommand先根据客户端传送过来的数据查找该命令并找到命令的对应处理函数。
  • Call函数调用该命令函数来处理命令,命令与对应处理函数的绑定位于。

  Redi.c:72

</tr></table>

  回复请求

  回复请求位于对应的命令中,以get命令为例

  T_string.c:67

</tr></table>

  T_string.c:52

argv[1],shared.nullbulk))?==?NULL) ????????return?REDIS_OK; ????if?(o->type?!=?REDIS_STRING)?{ ????????addReply(c,shared.wrongtypeerr); ????????return?REDIS_ERR; ????}?else?{ ????????addReplyBulk(c,o); ????????return?REDIS_OK; ????} }</tr></table>

  • getGenericCommand在getset 命令中也会用到。
  • lookupKeyReadOrReply是以读数据为目的查询key函数,并且如果该key不存在,则在该函数中做不存在的回包处理。
  • 如果该key存在,则返回该key对应的数据,addReply函数以及以addReply函数开头的都是回包函数。

  绑定写数据的回调函数

  接下来看看addReply函数里的内容

  Networking.c:190

</tr></table>

  Networking.c:64

fd?<=?0)?return?REDIS_ERR; ????if?(c->bufpos?==?0?&&?listLength(c->reply)?==?0?&& ????????(c->replstate?==?REDIS_REPL_NONE?|| ?????????c->replstate?==?REDIS_REPL_ONLINE)?&& ????????aeCreateFileEvent(server.el,?c->fd,?AE_WRITABLE,????????sendReplyToClient,?c)?==?AE_ERR)?return?REDIS_ERR; ????return?REDIS_OK; }</tr></table>

  • addReply函数一进来就先调用绑定写数据的回调函数installWriteEvent。
  • installWriteEvent函数中创建了一个文件写事件和绑定写事件的回调函数为sendReplyToClient。

  准备写的数据内容

??? addReply函数一进来后就绑定写数据的回调函数,接下来就是准备写的数据内容

  Networking.c:190

storage?==?REDIS_VM_MEMORY); ????/*?This?is?an?important?place?where?we?can?avoid?copy-on-write ?????*?when?there?is?a?saving?child?running,?avoiding?touching?the ?????*?refcount?field?of?the?object?if?it's?not?needed. ?????* ?????*?If?the?encoding?is?RAW?and?there?is?room?in?the?static?buffer ?????*?we'll?be?able?to?send?the?object?to?the?client?without ?????*?messing?with?its?page.?*/ ????if?(obj->encoding?==?REDIS_ENCODING_RAW)?{ ????????if?(_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr))?!=?REDIS_OK) ????????????_addReplyObjectToList(c,obj); ????}?else?{ ????????/*?FIXME:?convert?the?long?into?string?and?use?_addReplyToBuffer() ?????????*?instead?of?calling?getDecodedObject.?As?this?place?in?the ?????????*?code?is?too?performance?critical.?*/ ????????obj?=?getDecodedObject(obj); ????????if?(_addReplyToBuffer(c,obj); ????????decrRefCount(obj); ????} }</tr></table>

  • 先尝试把要返回的内容添加到发送数据缓冲区中(redisClient->buf),如果该缓冲区的大小已经放不下这次想放进去的数据,或者已经有数据在排队(redisClient->reply 链表不为空),则把数据添加到发送链表的尾部。

  给客户端答复数据

  在绑定写数据的回调函数中看到绑定了回调函数sendReplyToClient,现在来看看这个函数的主要内容

  Networking.c:566

bufpos?>?0?||?listLength(c->reply))?{ ????... ????if(c->bufpos?>?0){ ????????... ????????????nwritten=write(fd,...,c->bufpos-c->sentlen); ????????????... ????????}?else?{ ????????????o?=?listNodeValue(listFirst(c->reply)); ????????????... ????????????nwritten=write(fd,objlen-c->sentlen); ????????????... ????????} ????} }</tr></table>

  • 通过调用系统函数write给客户端发送数据,如果缓冲区有数据就把缓冲区的数据发送给客户端,缓冲区的数据发送完了,如果有排队数据,则继续发送。

  退出

  Redis 服务器的退出是通过shutdown命令来退出的,退出前会做一系列的清理工作

  Db.c:347

</tr></table>

  总结

  框架从启动,接收请求,读取客户端数据,请求协议解析,处理命令,回复请求,退出对redis运行的整个流程做了一个梳理。对整个redis的运作和框架有了一个初步的了解。

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!