Redis运行流程源码解析--转载
http://blog.nosqlfan.com/html/4007.html
http://www.searchdatabase.com.cn/showcontent_62166.htm
導(dǎo)讀:本文分析源碼基于Redis 2.4.7 stable版本,對Redis運行流程,命令處理的內(nèi)部實現(xiàn)進行了深入講解。
關(guān)鍵詞:Redis?源碼?運行流程?框架?
概述
Redis通過定義一個 struct redisServer 類型的全局變量server 來保存服務(wù)器的相關(guān)信息(比如:配置信息,統(tǒng)計信息,服務(wù)器狀態(tài)等等)。啟動時通過讀取配置文件里邊的信息對server進行初始化(如果沒有指定配置文 件,將使用默認值對sever進行初始化),初始化的內(nèi)容有:起監(jiān)聽端口,綁定有新連接時的回調(diào)函數(shù),綁定服務(wù)器的定時函數(shù),虛擬內(nèi)存初始化,log初始 化等等。
啟動
初始化服務(wù)器配置
先來看看redis 的main函數(shù)的入口
Redis.c:1694
| int?main(int?argc,?char?**argv)?{ ????time_t?start; ????initServerConfig(); ????if?(argc?==?2)?{ ????????if?(strcmp(argv[1],?"-v")?==?0?|| ????????????strcmp(argv[1],?"--version")?==?0)?version(); ????????if?(strcmp(argv[1],?"--help")?==?0)?usage(); ????????resetServerSaveParams(); ????????loadServerConfig(argv[1]); ????}?else?if?((argc?>?2))?{ ????????usage(); ????}?else?{ ????????... ????} ????if?(server.daemonize)?daemonize(); ????initServer(); ????... |
- initServerConfig初始化全局變量 server 的屬性為默認值。
- 如果命令行指定了配置文件, resetServerSaveParams重置對落地備份的配置(即重置為默認值)并讀取配置文件的內(nèi)容對全局變量 server 再進行初始化 ,沒有在配置文件中配置的將使用默認值。
- 如果服務(wù)器配置成后臺執(zhí)行,則對服務(wù)器進行 daemonize。
- initServer初始化服務(wù)器,主要是設(shè)置信號處理函數(shù),初始化事件輪詢,起監(jiān)聽端口,綁定有新連接時的回調(diào)函數(shù),綁定服務(wù)器的定時函數(shù),初始化虛擬內(nèi)存和log等等。
- 創(chuàng)建服務(wù)器監(jiān)聽端口。
Redis.c:923
| ????if?(server.port?!=?0)?{ ????????server.ipfd=?anetTcpServer(server.neterr,server.port,server.bindaddr); ????????if?(server.ipfd?==?ANET_ERR)?{ ????????????redisLog(REDIS_WARNING,?"Opening?port?%d:?%s", ????????????????server.port,?server.neterr); ????????????exit(1); ????????} ????} |
- anetTcpServer創(chuàng)建一個socket并進行監(jiān)聽,然后把返回的socket fd賦值給server.ipfd。
事件輪詢結(jié)構(gòu)體定義
先看看事件輪詢的結(jié)構(gòu)體定義
Ae.h:88
| /*?State?of?an?event?based?program?*/ typedef?struct?aeEventLoop?{ ????int?maxfd; ????long?long?timeEventNextId; ????aeFileEvent?events[AE_SETSIZE];?/*?Registered?events?*/ ????aeFiredEvent?fired[AE_SETSIZE];?/*?Fired?events?*/ ????aeTimeEvent?*timeEventHead; ????int?stop; ????void?*apidata;?/*?This?is?used?for?polling?API?specific?data?*/ ????aeBeforeSleepProc?*beforesleep; }?aeEventLoop; |
- maxfd是最大的文件描述符,主要用來判斷是否有文件事件需要處理(ae.c:293)和當使用select 來處理網(wǎng)絡(luò)IO時作為select的參數(shù)(ae_select.c:50)。
- timeEventNextId 是下一個定時事件的ID。
- events[AE_SETSIZE]用于保存通過aeCreateFileEvent函數(shù)創(chuàng)建的文件事件,在sendReplyToClient函數(shù)和freeClient函數(shù)中通過調(diào)用aeDeleteFileEvent函數(shù)刪除已經(jīng)處理完的事件。
- fired[AE_SETSIZE] 用于保存已經(jīng)觸發(fā)的文件事件,在對應(yīng)的網(wǎng)絡(luò)I/O函數(shù)中進行賦值(epoll,select,kqueue),不會對fired進行刪除操作,只會一直覆 蓋原來的值。然后在aeProcessEvents函數(shù)中對已經(jīng)觸發(fā)的事件進行處理。
- timeEventHead 是定時事件鏈表的頭,定時事件的存儲用鏈表實現(xiàn)。
- Stop 用于停止事件輪詢處理。
- apidata 用于保存輪詢api需要的數(shù)據(jù),即aeApiState結(jié)構(gòu)體,對于epoll來說,aeApiState結(jié)構(gòu)體的定義如下:
| typedef?struct?aeApiState?{ ????int?epfd; ????struct?epoll_event?events[AE_SETSIZE]; }?aeApiState; |
- beforesleep 是每次進入處理事件時執(zhí)行的函數(shù)。
創(chuàng)建事件輪詢
Redis.c:920
| ??server.el?=?aeCreateEventLoop(); Ae.c:55 aeEventLoop?*aeCreateEventLoop(void)?{ ????aeEventLoop?*eventLoop; ????int?i; ????eventLoop?=?zmalloc(sizeof(*eventLoop)); ????if?(!eventLoop)?return?NULL; ????eventLoop->timeEventHead?=?NULL; ????eventLoop->timeEventNextId?=?0; ????eventLoop->stop?=?0; ????eventLoop->maxfd?=?-1; ????eventLoop->beforesleep?=?NULL; ????if?(aeApiCreate(eventLoop)?==?-1)?{ ????????zfree(eventLoop); ????????return?NULL; ????} /*?Events?with?mask?==?AE_NONE?are?not?set.?So?let's?initialize ?*?the?vector?with?it.?*/ ????for?(i?=?0;?i?<?AE_SETSIZE;?i++) ????????eventLoop->events[i].mask?=?AE_NONE; ????return?eventLoop; } |
綁定定時函數(shù)和有新連接時的回調(diào)函數(shù)
redis.c:973
| aeCreateTimeEvent(server.el,?1,?serverCron,?NULL,?NULL); if?(server.ipfd?>?0?&& ????aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, acceptTcpHandler,NULL)?==?AE_ERR)?oom("creating?file?event"); |
- aeCreateTimeEvent 創(chuàng)建定時事件并綁定回調(diào)函數(shù)serverCron,這個定時事件第一次是超過1毫秒就有權(quán)限執(zhí)行,如果其他事件的處理時間比較長,可能會出現(xiàn)超過一定時間 都沒執(zhí)行情況。這里的1毫秒只是超過后有可執(zhí)行的權(quán)限,并不是一定會執(zhí)行。第一次執(zhí)行后,如果還要執(zhí)行,是由定時函數(shù)的返回值確定的,在 processTimeEvents(ae.c:219)中,當調(diào)用定時回調(diào)函數(shù)后,獲取定時回調(diào)函數(shù)的返回值,如果返回值不等于-1,則設(shè)置定時回調(diào)函 數(shù)的下一次觸發(fā)時間為當前時間加上定時回調(diào)函數(shù)的返回值,即調(diào)用間隔時間。serverCron的返回值是100ms,表明從二次開始,每超過100ms 就有權(quán)限執(zhí)行。(定時回調(diào)函數(shù)serverCron用于更新lru時鐘,更新服務(wù)器的狀態(tài),打印一些服務(wù)器信息,符合條件的情況下對hash表進行重哈 希,啟動后端寫AOF或者檢查后端寫AOF或者備份是否完成,檢查過期的KEY等等)
- aeCreateFileEvent創(chuàng)建監(jiān)聽端口的socket fd的文件讀事件(即注冊網(wǎng)絡(luò)io事件)并綁定回調(diào)函數(shù)acceptTcpHandler。
進入事件輪詢
初始化后將進入事件輪詢
Redis.c:1733
| ????aeSetBeforeSleepProc(server.el,beforeSleep); ????aeMain(server.el); ????aeDeleteEventLoop(server.el); |
- 設(shè)置每次進入事件處理前會執(zhí)行的函數(shù)beforeSleep。
- 進入事件輪詢aeMain。
- 退出事件輪詢后刪除事件輪詢,釋放事件輪詢占用內(nèi)存aeDeleteEventLoop(不過沒在代碼中發(fā)現(xiàn)有執(zhí)行到這一步的可能,服務(wù)器接到shutdown命令時通過一些處理后直接就通過exit退出了,可能是我看錯了,待驗證)。
事件輪詢函數(shù)aeMain
看看aeMain的內(nèi)容
Ae.c:382
| void?aeMain(aeEventLoop?*eventLoop)?{ ????eventLoop->stop?=?0; ????while?(!eventLoop->stop)?{ ????????if?(eventLoop->beforesleep?!=?NULL) ????????????eventLoop->beforesleep(eventLoop); ????????aeProcessEvents(eventLoop,?AE_ALL_EVENTS); ????} } |
- 每次進入事件處理前,都會調(diào)用設(shè)置的beforesleep,beforeSleep函數(shù)主要是處理被阻塞的命令和根據(jù)配置寫AOF。
- aeProcessEvents處理定時事件和網(wǎng)絡(luò)io事件。
啟動完畢,等待客戶端請求
到進入事件輪詢函數(shù)后,redis的啟動工作就做完了,接下來就是等待客戶端的請求了。
接收請求
新連接到來時的回調(diào)函數(shù)
在綁定定時函數(shù)和有新連接時的回調(diào)函數(shù)中說到了綁定有新連接來時的回調(diào)函數(shù)acceptTcpHandler,現(xiàn)在來看看這個函數(shù)的具體內(nèi)容
Networking.c:427
| void?acceptTcpHandler(aeEventLoop?*el,?int?fd,?void?*privdata,?int?mask)?{ ????int?cport,?cfd; ????char?cip[128]; ????REDIS_NOTUSED(el); ????REDIS_NOTUSED(mask); ????REDIS_NOTUSED(privdata); ????cfd?=?anetTcpAccept(server.neterr,?fd,?cip,?&cport); ????if?(cfd?==?AE_ERR)?{ ????????redisLog(REDIS_WARNING,"Accepting?client?connection:?%s",?server.neterr); ????????return; ????} ????redisLog(REDIS_VERBOSE,"Accepted?%s:%d",?cip,?cport); ????acceptCommonHandler(cfd); } |
- anetTcpAccept 函數(shù) accept新連接,返回的cfd是新連接的socket fd。
- acceptCommonHandler 函數(shù)是對新建立的連接進行處理,這個函數(shù)在使用 unix socket 時也會被用到。
接收客戶端的新連接
接下來看看anetTcpAccept函數(shù)的具體內(nèi)容
| Anet.c:330 int?anetTcpAccept(char?*err,?int?s,?char?*ip,?int?*port)?{ ????int?fd; ????struct?sockaddr_in?sa; ????socklen_t?salen?=?sizeof(sa); ????if?((fd?=?anetGenericAccept(err,s,(struct?sockaddr*)&sa,&salen))?==?ANET_ERR) ????????return?ANET_ERR; ????if?(ip)?strcpy(ip,inet_ntoa(sa.sin_addr)); ????if?(port)?*port?=?ntohs(sa.sin_port); ????return?fd; } |
再進去anetGenericAccept 看看
Anet.c:313
| static?int?anetGenericAccept(char?*err,?int?s,?struct?sockaddr?*sa,?socklen_t?*len)?{ ????int?fd; ????while(1)?{ ????????fd?=?accept(s,sa,len); ????????if?(fd?==?-1)?{ ????????????if?(errno?==?EINTR) ????????????????continue; ????????????else?{ ????????????????anetSetError(err,?"accept:?%s",?strerror(errno)); ????????????????return?ANET_ERR; ????????????} ????????} ????????break; ????} ????return?fd; } |
- anetTcpAccept 函數(shù)中調(diào)用anetGenericAccept 函數(shù)進行接收新連接,anetGenericAccept函數(shù)在 unix socket 的新連接處理中也會用到。
- anetTcpAccept 函數(shù)接收新連接后,獲取客戶端得ip,port 并返回。
創(chuàng)建redisClient進行接收處理
anetTcpAccept 運行完后,返回新連接的socket fd, 然后返回到調(diào)用函數(shù)acceptTcpHandler中,繼續(xù)執(zhí)行acceptCommonHandler 函數(shù)
Networking.c:403
| static?void?acceptCommonHandler(int?fd)?{ ????redisClient?*c; ????if?((c?=?createClient(fd))?==?NULL)?{ ????????redisLog(REDIS_WARNING,"Error?allocating?resoures?for?the?client"); ????????close(fd);?/*?May?be?already?closed,?just?ingore?errors?*/ ????????return; ????} ????/*?If?maxclient?directive?is?set?and?this?is?one?client?more...?close?the ?????*?connection.?Note?that?we?create?the?client?instead?to?check?before ?????*?for?this?condition,?since?now?the?socket?is?already?set?in?nonblocking ?????*?mode?and?we?can?send?an?error?for?free?using?the?Kernel?I/O?*/ ????if?(server.maxclients?&&?listLength(server.clients)?>?server.maxclients)?{ ????????char?*err?=?"-ERR?max?number?of?clients?reached\r\n"; ????????/*?That's?a?best?effort?error?message,?don't?check?write?errors?*/ ????????if?(write(c->fd,err,strlen(err))?==?-1)?{ ????????????/*?Nothing?to?do,?Just?to?avoid?the?warning...?*/ ????????} ????????freeClient(c); ????????return; ????} ????server.stat_numconnections++; } |
- 創(chuàng)建一個 redisClient 來處理新連接,每個連接都會創(chuàng)建一個 redisClient 來處理。
- 如果配置了最大并發(fā)客戶端,則對現(xiàn)有的連接數(shù)進行檢查和處理。
- 最后統(tǒng)計連接數(shù)。
綁定有數(shù)據(jù)可讀時的回調(diào)函數(shù)
Networking.c:15
| redisClient?*createClient(int?fd)?{ ????redisClient?*c?=?zmalloc(sizeof(redisClient)); ????c->bufpos?=?0; ????anetNonBlock(NULL,fd); ????anetTcpNoDelay(NULL,fd); ????if?(aeCreateFileEvent(server.el,fd,AE_READABLE, ????????readQueryFromClient,?c)?==?AE_ERR) ????{ ????????close(fd); ????????zfree(c); ????????return?NULL; ????} ????selectDb(c,0); ????c->fd?=?fd; ????c->querybuf?=?sdsempty(); c->reqtype?=?0; ... } |
- 創(chuàng)建新連接的socket fd對應(yīng)的文件讀事件,綁定回調(diào)函數(shù)readQueryFromClient。
- 如果創(chuàng)建成功,則對 redisClient 進行一系列的初始化,因為 redisClient 是通用的,即不管是什么命令的請求,都是通過創(chuàng)建一個 redisClient 來處理的,所以會有比較多的字段需要初始化。
createClient 函數(shù)執(zhí)行完后返回到調(diào)用處acceptCommonHandler函數(shù),然后從acceptCommonHandler函數(shù)再返回到acceptTcpHandler函數(shù)。
接收請求完畢,準備接收客戶端得數(shù)據(jù)
到此為止,新連接到來時的回調(diào)函數(shù)acceptTcpHandler執(zhí)行完畢,在這個回調(diào)函數(shù)中創(chuàng)建了一個redisClient來處理這個客戶端接下 來的請求,并綁定了接收的新連接的讀文件事件。當有數(shù)據(jù)可讀時,網(wǎng)絡(luò)i/o輪詢(比如epoll)會有事件觸發(fā),此時綁定的回調(diào)函數(shù) readQueryFromClient將會調(diào)用來處理客戶端發(fā)送過來的數(shù)據(jù)。
讀取客戶端請求的數(shù)據(jù)
在綁定有數(shù)據(jù)可讀時的回調(diào)函數(shù)中的createClient函數(shù)中綁定了一個有數(shù)據(jù)可讀時的回調(diào)函數(shù)readQueryFromClient函數(shù),現(xiàn)在看看這個函數(shù)的具體內(nèi)容
Networking.c:874
| void?readQueryFromClient(aeEventLoop?*el,?int?fd,?void?*privdata,?int?mask)?{ ????redisClient?*c?=?(redisClient*)?privdata; ????char?buf[REDIS_IOBUF_LEN]; ????int?nread; ????REDIS_NOTUSED(el); ????REDIS_NOTUSED(mask); ????server.current_client?=?c; ????nread?=?read(fd,?buf,?REDIS_IOBUF_LEN); ????if?(nread?==?-1)?{ ????????if?(errno?==?EAGAIN)?{ ????????????nread?=?0; ????????}?else?{ ????????????redisLog(REDIS_VERBOSE,?"Reading?from?client:?%s",strerror(errno)); ????????????freeClient(c); ????????????return; ????????} ????}?else?if?(nread?==?0)?{ ????????redisLog(REDIS_VERBOSE,?"Client?closed?connection"); ????????freeClient(c); ????????return; ????} ????if?(nread)?{ ????????c->querybuf?=?sdscatlen(c->querybuf,buf,nread); ????????c->lastinteraction?=?time(NULL); ????}?else?{ ????????server.current_client?=?NULL; ????????return; ????} ????if?(sdslen(c->querybuf)?>?server.client_max_querybuf_len)?{ ????????sds?ci?=?getClientInfoString(c),?bytes?=?sdsempty(); ????????bytes?=?sdscatrepr(bytes,c->querybuf,64); ????????redisLog(REDIS_WARNING,"Closing?client?that?reached?max?query?buffer?length:?%s?(qbuf?initial?bytes:?%s)",?ci,?bytes); ????????sdsfree(ci); ????????sdsfree(bytes); ????????freeClient(c); ????????return; ????} ????processInputBuffer(c); ????server.current_client?=?NULL; } |
- 調(diào)用系統(tǒng)函數(shù)read來讀取客戶端傳送過來的數(shù)據(jù),調(diào)用read后對讀取過程中被系統(tǒng)中斷的情況(nread == -1 && errno == EAGAIN),客戶端關(guān)閉的情況(nread == 0)進行了判斷處理。
- 如果讀取的數(shù)據(jù)超過限制(1GB)則報錯。
- 讀取完后進入processInputBuffer進行協(xié)議解析。
請求協(xié)議
從readQueryFromClient函數(shù)讀取客戶端傳過來的數(shù)據(jù),進入processInputBuffer函數(shù)進行協(xié)議解析,可以把processInputBuffer函數(shù)看作是輸入數(shù)據(jù)的協(xié)議解析器
Networking.c:835
| void?processInputBuffer(redisClient?*c)?{ ????/*?Keep?processing?while?there?is?something?in?the?input?buffer?*/ ????while(sdslen(c->querybuf))?{ ????????/*?Immediately?abort?if?the?client?is?in?the?middle?of?something.?*/ ????????if?(c->flags?&?REDIS_BLOCKED?||?c->flags?&?REDIS_IO_WAIT)?return; ????????/*?REDIS_CLOSE_AFTER_REPLY?closes?the?connection?once?the?reply?is ?????????*?written?to?the?client.?Make?sure?to?not?let?the?reply?grow?after ?????????*?this?flag?has?been?set?(i.e.?don't?process?more?commands).?*/ ????????if?(c->flags?&?REDIS_CLOSE_AFTER_REPLY)?return; ????????/*?Determine?request?type?when?unknown.?*/ ????????if?(!c->reqtype)?{ ????????????if?(c->querybuf[0]?==?'*')?{ ????????????????c->reqtype?=?REDIS_REQ_MULTIBULK; ????????????}?else?{ ????????????????c->reqtype?=?REDIS_REQ_INLINE; ????????????} ????????} ????????if?(c->reqtype?==?REDIS_REQ_INLINE)?{ ????????????if?(processInlineBuffer(c)?!=?REDIS_OK)?break; ????????}?else?if?(c->reqtype?==?REDIS_REQ_MULTIBULK)?{ ????????????if?(processMultibulkBuffer(c)?!=?REDIS_OK)?break; ????????}?else?{ ????????????redisPanic("Unknown?request?type"); ????????} ????????/*?Multibulk?processing?could?see?a?<=?0?length.?*/ ????????if?(c->argc?==?0)?{ ????????????resetClient(c); ????????}?else?{ ????????????/*?Only?reset?the?client?when?the?command?was?executed.?*/ ????????????if?(processCommand(c)?==?REDIS_OK) ????????????????resetClient(c); ????????} ????} } |
- Redis支持兩種協(xié)議,一種是inline,一種是multibulk。inline協(xié)議是老協(xié)議,現(xiàn)在一般只在命令行下的redis客戶端使用,其他情況一般是使用multibulk協(xié)議。
- 如 果客戶端傳送的數(shù)據(jù)的第一個字符時‘*’,那么傳送數(shù)據(jù)將被當做multibulk協(xié)議處理,否則將被當做inline協(xié)議處理。Inline協(xié)議的具體 解析函數(shù)是processInlineBuffer,multibulk協(xié)議的具體解析函數(shù)是processMultibulkBuffer。
- 當協(xié)議解析完畢,即客戶端傳送的數(shù)據(jù)已經(jīng)解析出命令字段和參數(shù)字段,接下來進行命令處理,命令處理函數(shù)是processCommand。
Inline請求協(xié)議
Networking.c:679
| int?processInlineBuffer(redisClient?*c)?{ ????... } |
- 根據(jù)空格分割客戶端傳送過來的數(shù)據(jù),把傳送過來的命令和參數(shù)保存在argv數(shù)組中,把參數(shù)個數(shù)保存在argc中,argc的值包括了命令參數(shù)本身。即set key value命令,argc的值為3。詳細解析見協(xié)議詳解
Multibulk請求協(xié)議
Multibulk協(xié)議比inline協(xié)議復(fù)雜,它是二進制安全的,即傳送數(shù)據(jù)可以包含不安全字符。Inline協(xié)議不是二進制安全的,比如,如果 set key value命令中的key或value包含空白字符,那么inline協(xié)議解析時將會失敗,因為解析出來的參數(shù)個數(shù)與命令需要的的參數(shù)個數(shù)會不一致。
協(xié)議格式
| *<number?of?arguments>?CR?LF $<number?of?bytes?of?argument?1>?CR?LF <argument?data>?CR?LF ... $<number?of?bytes?of?argument?N>?CR?LF <argument?data>?CR?LF |
協(xié)議舉例
| *3 $3 SET $5 mykey $7 myvalue |
具體解析代碼位于
Networking.c:731
| int?processMultibulkBuffer(redisClient?*c)?{ ... } |
詳細解析見協(xié)議詳解
處理命令
當協(xié)議解析完畢,則表示客戶端的命令輸入已經(jīng)全部讀取并已經(jīng)解析成功,接下來就是執(zhí)行客戶端命令前的準備和執(zhí)行客戶端傳送過來的命令
Redis.c:1062
| /*?If?this?function?gets?called?we?already?read?a?whole ?*?command,?argments?are?in?the?client?argv/argc?fields. ?*?processCommand()?execute?the?command?or?prepare?the ?*?server?for?a?bulk?read?from?the?client. ?* ?*?If?1?is?returned?the?client?is?still?alive?and?valid?and ?*?and?other?operations?can?be?performed?by?the?caller.?Otherwise ?*?if?0?is?returned?the?client?was?destroied?(i.e.?after?QUIT).?*/ int?processCommand(redisClient?*c)?{ ... ?/*?Now?lookup?the?command?and?check?ASAP?about?trivial?error?conditions ??*?such?as?wrong?arity,?bad?command?name?and?so?forth.?*/ c->cmd?=?c->lastcmd?=?lookupCommand(c->argv[0]->ptr); ... call(c); ... } |
- lookupCommand先根據(jù)客戶端傳送過來的數(shù)據(jù)查找該命令并找到命令的對應(yīng)處理函數(shù)。
- Call函數(shù)調(diào)用該命令函數(shù)來處理命令,命令與對應(yīng)處理函數(shù)的綁定位于。
Redi.c:72
| struct?redisCommand?*commandTable; struct?redisCommand?readonlyCommandTable[]?=?{ {"get",getCommand,2,0,NULL,1,1,1}, ... } |
回復(fù)請求
回復(fù)請求位于對應(yīng)的命令中,以get命令為例
T_string.c:67
| void?getCommand(redisClient?*c)?{ ????getGenericCommand(c); } |
T_string.c:52
| int?getGenericCommand(redisClient?*c)?{ ????robj?*o; ????if?((o?=?lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk))?==?NULL) ????????return?REDIS_OK; ????if?(o->type?!=?REDIS_STRING)?{ ????????addReply(c,shared.wrongtypeerr); ????????return?REDIS_ERR; ????}?else?{ ????????addReplyBulk(c,o); ????????return?REDIS_OK; ????} } |
- getGenericCommand在getset 命令中也會用到。
- lookupKeyReadOrReply是以讀數(shù)據(jù)為目的查詢key函數(shù),并且如果該key不存在,則在該函數(shù)中做不存在的回包處理。
- 如果該key存在,則返回該key對應(yīng)的數(shù)據(jù),addReply函數(shù)以及以addReply函數(shù)開頭的都是回包函數(shù)。
綁定寫數(shù)據(jù)的回調(diào)函數(shù)
接下來看看addReply函數(shù)里的內(nèi)容
Networking.c:190
| void?addReply(redisClient?*c,?robj?*obj)?{ ????if?(_installWriteEvent(c)?!=?REDIS_OK)?return; ????... } |
Networking.c:64
| int?_installWriteEvent(redisClient?*c)?{ ????if?(c->fd?<=?0)?return?REDIS_ERR; ????if?(c->bufpos?==?0?&&?listLength(c->reply)?==?0?&& ????????(c->replstate?==?REDIS_REPL_NONE?|| ?????????c->replstate?==?REDIS_REPL_ONLINE)?&& ????????aeCreateFileEvent(server.el,?c->fd,?AE_WRITABLE, ????????sendReplyToClient,?c)?==?AE_ERR)?return?REDIS_ERR; ????return?REDIS_OK; } |
- addReply函數(shù)一進來就先調(diào)用綁定寫數(shù)據(jù)的回調(diào)函數(shù)installWriteEvent。
- installWriteEvent函數(shù)中創(chuàng)建了一個文件寫事件和綁定寫事件的回調(diào)函數(shù)為sendReplyToClient。
準備寫的數(shù)據(jù)內(nèi)容
??? addReply函數(shù)一進來后就綁定寫數(shù)據(jù)的回調(diào)函數(shù),接下來就是準備寫的數(shù)據(jù)內(nèi)容
Networking.c:190
| void?addReply(redisClient?*c,?robj?*obj)?{ ????if?(_installWriteEvent(c)?!=?REDIS_OK)?return; ????redisAssert(!server.vm_enabled?||?obj->storage?==?REDIS_VM_MEMORY); ????/*?This?is?an?important?place?where?we?can?avoid?copy-on-write ?????*?when?there?is?a?saving?child?running,?avoiding?touching?the ?????*?refcount?field?of?the?object?if?it's?not?needed. ?????* ?????*?If?the?encoding?is?RAW?and?there?is?room?in?the?static?buffer ?????*?we'll?be?able?to?send?the?object?to?the?client?without ?????*?messing?with?its?page.?*/ ????if?(obj->encoding?==?REDIS_ENCODING_RAW)?{ ????????if?(_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr))?!=?REDIS_OK) ????????????_addReplyObjectToList(c,obj); ????}?else?{ ????????/*?FIXME:?convert?the?long?into?string?and?use?_addReplyToBuffer() ?????????*?instead?of?calling?getDecodedObject.?As?this?place?in?the ?????????*?code?is?too?performance?critical.?*/ ????????obj?=?getDecodedObject(obj); ????????if?(_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr))?!=?REDIS_OK) ????????????_addReplyObjectToList(c,obj); ????????decrRefCount(obj); ????} } |
- 先嘗試把要返回的內(nèi)容添加到發(fā)送數(shù)據(jù)緩沖區(qū)中(redisClient->buf),如果該緩沖區(qū)的大小已經(jīng)放不下這次想放進去的數(shù)據(jù),或者已經(jīng)有數(shù)據(jù)在排隊(redisClient->reply 鏈表不為空),則把數(shù)據(jù)添加到發(fā)送鏈表的尾部。
給客戶端答復(fù)數(shù)據(jù)
在綁定寫數(shù)據(jù)的回調(diào)函數(shù)中看到綁定了回調(diào)函數(shù)sendReplyToClient,現(xiàn)在來看看這個函數(shù)的主要內(nèi)容
Networking.c:566
| void?sendReplyToClient(aeEventLoop?*el,?int?fd,?...)?{ ????... while(c->bufpos?>?0?||?listLength(c->reply))?{ ????... ????if(c->bufpos?>?0){ ????????... ????????????nwritten=write(fd,...,c->bufpos-c->sentlen); ????????????... ????????}?else?{ ????????????o?=?listNodeValue(listFirst(c->reply)); ????????????... ????????????nwritten=write(fd,...,objlen-c->sentlen); ????????????... ????????} ????} } |
- 通過調(diào)用系統(tǒng)函數(shù)write給客戶端發(fā)送數(shù)據(jù),如果緩沖區(qū)有數(shù)據(jù)就把緩沖區(qū)的數(shù)據(jù)發(fā)送給客戶端,緩沖區(qū)的數(shù)據(jù)發(fā)送完了,如果有排隊數(shù)據(jù),則繼續(xù)發(fā)送。
退出
Redis 服務(wù)器的退出是通過shutdown命令來退出的,退出前會做一系列的清理工作
Db.c:347
| void?shutdownCommand(redisClient?*c)?{ ????if?(prepareForShutdown()?==?REDIS_OK) ????????exit(0); ????addReplyError(c,"Errors?trying?to?SHUTDOWN.?Check?logs."); } |
總結(jié)
框架從啟動,接收請求,讀取客戶端數(shù)據(jù),請求協(xié)議解析,處理命令,回復(fù)請求,退出對redis運行的整個流程做了一個梳理。對整個redis的運作和框架有了一個初步的了解。
轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/p/3446349.html
總結(jié)
以上是生活随笔為你收集整理的Redis运行流程源码解析--转载的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: kill -3 获取threaddump
- 下一篇: windows端口查看及进程查找