日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

Redis运行流程源码解析--转载

發(fā)布時間:2025/4/5 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Redis运行流程源码解析--转载 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

http://blog.nosqlfan.com/html/4007.html

http://www.searchdatabase.com.cn/showcontent_62166.htm

導(dǎo)讀:本文分析源碼基于Redis 2.4.7 stable版本,對Redis運行流程,命令處理的內(nèi)部實現(xiàn)進行了深入講解。

關(guān)鍵詞:Redis?源碼?運行流程?框架?

  概述

  Redis通過定義一個 struct redisServer 類型的全局變量server 來保存服務(wù)器的相關(guān)信息(比如:配置信息,統(tǒng)計信息,服務(wù)器狀態(tài)等等)。啟動時通過讀取配置文件里邊的信息對server進行初始化(如果沒有指定配置文 件,將使用默認值對sever進行初始化),初始化的內(nèi)容有:起監(jiān)聽端口,綁定有新連接時的回調(diào)函數(shù),綁定服務(wù)器的定時函數(shù),虛擬內(nèi)存初始化,log初始 化等等。

  啟動

  初始化服務(wù)器配置

  先來看看redis 的main函數(shù)的入口

  Redis.c:1694

int?main(int?argc,?char?**argv)?{
????time_t?start;

????initServerConfig();
????if?(argc?==?2)?{
????????if?(strcmp(argv[1],?"-v")?==?0?||
????????????strcmp(argv[1],?"--version")?==?0)?version();
????????if?(strcmp(argv[1],?"--help")?==?0)?usage();
????????resetServerSaveParams();
????????loadServerConfig(argv[1]);
????}?else?if?((argc?>?2))?{
????????usage();
????}?else?{
????????...
????}
????if?(server.daemonize)?daemonize();
????initServer();
????...
  • initServerConfig初始化全局變量 server 的屬性為默認值。
  • 如果命令行指定了配置文件, resetServerSaveParams重置對落地備份的配置(即重置為默認值)并讀取配置文件的內(nèi)容對全局變量 server 再進行初始化 ,沒有在配置文件中配置的將使用默認值。
  • 如果服務(wù)器配置成后臺執(zhí)行,則對服務(wù)器進行 daemonize。
  • initServer初始化服務(wù)器,主要是設(shè)置信號處理函數(shù),初始化事件輪詢,起監(jiān)聽端口,綁定有新連接時的回調(diào)函數(shù),綁定服務(wù)器的定時函數(shù),初始化虛擬內(nèi)存和log等等。
  • 創(chuàng)建服務(wù)器監(jiān)聽端口。

  Redis.c:923

????if?(server.port?!=?0)?{
????????server.ipfd=?anetTcpServer(server.neterr,server.port,server.bindaddr);
????????if?(server.ipfd?==?ANET_ERR)?{
????????????redisLog(REDIS_WARNING,?"Opening?port?%d:?%s",
????????????????server.port,?server.neterr);
????????????exit(1);
????????}
????}
  • anetTcpServer創(chuàng)建一個socket并進行監(jiān)聽,然后把返回的socket fd賦值給server.ipfd。

  事件輪詢結(jié)構(gòu)體定義

  先看看事件輪詢的結(jié)構(gòu)體定義

  Ae.h:88

/*?State?of?an?event?based?program?*/
typedef?struct?aeEventLoop?{
????int?maxfd;
????long?long?timeEventNextId;
????aeFileEvent?events[AE_SETSIZE];?/*?Registered?events?*/
????aeFiredEvent?fired[AE_SETSIZE];?/*?Fired?events?*/
????aeTimeEvent?*timeEventHead;
????int?stop;
????void?*apidata;?/*?This?is?used?for?polling?API?specific?data?*/
????aeBeforeSleepProc?*beforesleep;
}?aeEventLoop;
  • maxfd是最大的文件描述符,主要用來判斷是否有文件事件需要處理(ae.c:293)和當使用select 來處理網(wǎng)絡(luò)IO時作為select的參數(shù)(ae_select.c:50)。
  • timeEventNextId 是下一個定時事件的ID。
  • events[AE_SETSIZE]用于保存通過aeCreateFileEvent函數(shù)創(chuàng)建的文件事件,在sendReplyToClient函數(shù)和freeClient函數(shù)中通過調(diào)用aeDeleteFileEvent函數(shù)刪除已經(jīng)處理完的事件。
  • fired[AE_SETSIZE] 用于保存已經(jīng)觸發(fā)的文件事件,在對應(yīng)的網(wǎng)絡(luò)I/O函數(shù)中進行賦值(epoll,select,kqueue),不會對fired進行刪除操作,只會一直覆 蓋原來的值。然后在aeProcessEvents函數(shù)中對已經(jīng)觸發(fā)的事件進行處理。
  • timeEventHead 是定時事件鏈表的頭,定時事件的存儲用鏈表實現(xiàn)。
  • Stop 用于停止事件輪詢處理。
  • apidata 用于保存輪詢api需要的數(shù)據(jù),即aeApiState結(jié)構(gòu)體,對于epoll來說,aeApiState結(jié)構(gòu)體的定義如下:
typedef?struct?aeApiState?{
????int?epfd;
????struct?epoll_event?events[AE_SETSIZE];
}?aeApiState;
  • beforesleep 是每次進入處理事件時執(zhí)行的函數(shù)。

  創(chuàng)建事件輪詢

  Redis.c:920

??server.el?=?aeCreateEventLoop();
Ae.c:55
aeEventLoop?*aeCreateEventLoop(void)?{
????aeEventLoop?*eventLoop;
????int?i;

????eventLoop?=?zmalloc(sizeof(*eventLoop));
????if?(!eventLoop)?return?NULL;
????eventLoop->timeEventHead?=?NULL;
????eventLoop->timeEventNextId?=?0;
????eventLoop->stop?=?0;
????eventLoop->maxfd?=?-1;
????eventLoop->beforesleep?=?NULL;
????if?(aeApiCreate(eventLoop)?==?-1)?{
????????zfree(eventLoop);
????????return?NULL;
????}
/*?Events?with?mask?==?AE_NONE?are?not?set.?So?let's?initialize
?*?the?vector?with?it.?*/
????for?(i?=?0;?i?<?AE_SETSIZE;?i++)
????????eventLoop->events[i].mask?=?AE_NONE;
????return?eventLoop;
}

  綁定定時函數(shù)和有新連接時的回調(diào)函數(shù)

  redis.c:973

aeCreateTimeEvent(server.el,?1,?serverCron,?NULL,?NULL);
if?(server.ipfd?>?0?&&
????aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
acceptTcpHandler,NULL)?==?AE_ERR)?oom("creating?file?event");
  • aeCreateTimeEvent 創(chuàng)建定時事件并綁定回調(diào)函數(shù)serverCron,這個定時事件第一次是超過1毫秒就有權(quán)限執(zhí)行,如果其他事件的處理時間比較長,可能會出現(xiàn)超過一定時間 都沒執(zhí)行情況。這里的1毫秒只是超過后有可執(zhí)行的權(quán)限,并不是一定會執(zhí)行。第一次執(zhí)行后,如果還要執(zhí)行,是由定時函數(shù)的返回值確定的,在 processTimeEvents(ae.c:219)中,當調(diào)用定時回調(diào)函數(shù)后,獲取定時回調(diào)函數(shù)的返回值,如果返回值不等于-1,則設(shè)置定時回調(diào)函 數(shù)的下一次觸發(fā)時間為當前時間加上定時回調(diào)函數(shù)的返回值,即調(diào)用間隔時間。serverCron的返回值是100ms,表明從二次開始,每超過100ms 就有權(quán)限執(zhí)行。(定時回調(diào)函數(shù)serverCron用于更新lru時鐘,更新服務(wù)器的狀態(tài),打印一些服務(wù)器信息,符合條件的情況下對hash表進行重哈 希,啟動后端寫AOF或者檢查后端寫AOF或者備份是否完成,檢查過期的KEY等等)
  • aeCreateFileEvent創(chuàng)建監(jiān)聽端口的socket fd的文件讀事件(即注冊網(wǎng)絡(luò)io事件)并綁定回調(diào)函數(shù)acceptTcpHandler。

  進入事件輪詢

  初始化后將進入事件輪詢

  Redis.c:1733

????aeSetBeforeSleepProc(server.el,beforeSleep);
????aeMain(server.el);
????aeDeleteEventLoop(server.el);
  • 設(shè)置每次進入事件處理前會執(zhí)行的函數(shù)beforeSleep。
  • 進入事件輪詢aeMain。
  • 退出事件輪詢后刪除事件輪詢,釋放事件輪詢占用內(nèi)存aeDeleteEventLoop(不過沒在代碼中發(fā)現(xiàn)有執(zhí)行到這一步的可能,服務(wù)器接到shutdown命令時通過一些處理后直接就通過exit退出了,可能是我看錯了,待驗證)。

  事件輪詢函數(shù)aeMain

  看看aeMain的內(nèi)容

  Ae.c:382

void?aeMain(aeEventLoop?*eventLoop)?{
????eventLoop->stop?=?0;
????while?(!eventLoop->stop)?{
????????if?(eventLoop->beforesleep?!=?NULL)
????????????eventLoop->beforesleep(eventLoop);
????????aeProcessEvents(eventLoop,?AE_ALL_EVENTS);
????}
}
  • 每次進入事件處理前,都會調(diào)用設(shè)置的beforesleep,beforeSleep函數(shù)主要是處理被阻塞的命令和根據(jù)配置寫AOF。
  • aeProcessEvents處理定時事件和網(wǎng)絡(luò)io事件。

  啟動完畢,等待客戶端請求

  到進入事件輪詢函數(shù)后,redis的啟動工作就做完了,接下來就是等待客戶端的請求了。

  接收請求

  新連接到來時的回調(diào)函數(shù)

  在綁定定時函數(shù)和有新連接時的回調(diào)函數(shù)中說到了綁定有新連接來時的回調(diào)函數(shù)acceptTcpHandler,現(xiàn)在來看看這個函數(shù)的具體內(nèi)容

  Networking.c:427

void?acceptTcpHandler(aeEventLoop?*el,?int?fd,?void?*privdata,?int?mask)?{
????int?cport,?cfd;
????char?cip[128];
????REDIS_NOTUSED(el);
????REDIS_NOTUSED(mask);
????REDIS_NOTUSED(privdata);

????cfd?=?anetTcpAccept(server.neterr,?fd,?cip,?&cport);
????if?(cfd?==?AE_ERR)?{
????????redisLog(REDIS_WARNING,"Accepting?client?connection:?%s",?server.neterr);
????????return;
????}
????redisLog(REDIS_VERBOSE,"Accepted?%s:%d",?cip,?cport);
????acceptCommonHandler(cfd);
}
  • anetTcpAccept 函數(shù) accept新連接,返回的cfd是新連接的socket fd。
  • acceptCommonHandler 函數(shù)是對新建立的連接進行處理,這個函數(shù)在使用 unix socket 時也會被用到。

  接收客戶端的新連接

  接下來看看anetTcpAccept函數(shù)的具體內(nèi)容

Anet.c:330
int?anetTcpAccept(char?*err,?int?s,?char?*ip,?int?*port)?{
????int?fd;
????struct?sockaddr_in?sa;
????socklen_t?salen?=?sizeof(sa);
????if?((fd?=?anetGenericAccept(err,s,(struct?sockaddr*)&sa,&salen))?==?ANET_ERR)
????????return?ANET_ERR;

????if?(ip)?strcpy(ip,inet_ntoa(sa.sin_addr));
????if?(port)?*port?=?ntohs(sa.sin_port);
????return?fd;
}

  再進去anetGenericAccept 看看

  Anet.c:313

static?int?anetGenericAccept(char?*err,?int?s,?struct?sockaddr?*sa,?socklen_t?*len)?{
????int?fd;
????while(1)?{
????????fd?=?accept(s,sa,len);
????????if?(fd?==?-1)?{
????????????if?(errno?==?EINTR)
????????????????continue;
????????????else?{
????????????????anetSetError(err,?"accept:?%s",?strerror(errno));
????????????????return?ANET_ERR;
????????????}
????????}
????????break;
????}
????return?fd;
}
  • anetTcpAccept 函數(shù)中調(diào)用anetGenericAccept 函數(shù)進行接收新連接,anetGenericAccept函數(shù)在 unix socket 的新連接處理中也會用到。
  • anetTcpAccept 函數(shù)接收新連接后,獲取客戶端得ip,port 并返回。

  創(chuàng)建redisClient進行接收處理

  anetTcpAccept 運行完后,返回新連接的socket fd, 然后返回到調(diào)用函數(shù)acceptTcpHandler中,繼續(xù)執(zhí)行acceptCommonHandler 函數(shù)

  Networking.c:403

static?void?acceptCommonHandler(int?fd)?{
????redisClient?*c;
????if?((c?=?createClient(fd))?==?NULL)?{
????????redisLog(REDIS_WARNING,"Error?allocating?resoures?for?the?client");
????????close(fd);?/*?May?be?already?closed,?just?ingore?errors?*/
????????return;
????}
????/*?If?maxclient?directive?is?set?and?this?is?one?client?more...?close?the
?????*?connection.?Note?that?we?create?the?client?instead?to?check?before
?????*?for?this?condition,?since?now?the?socket?is?already?set?in?nonblocking
?????*?mode?and?we?can?send?an?error?for?free?using?the?Kernel?I/O?*/
????if?(server.maxclients?&&?listLength(server.clients)?>?server.maxclients)?{
????????char?*err?=?"-ERR?max?number?of?clients?reached\r\n";

????????/*?That's?a?best?effort?error?message,?don't?check?write?errors?*/
????????if?(write(c->fd,err,strlen(err))?==?-1)?{
????????????/*?Nothing?to?do,?Just?to?avoid?the?warning...?*/
????????}
????????freeClient(c);
????????return;
????}
????server.stat_numconnections++;
}
  • 創(chuàng)建一個 redisClient 來處理新連接,每個連接都會創(chuàng)建一個 redisClient 來處理。
  • 如果配置了最大并發(fā)客戶端,則對現(xiàn)有的連接數(shù)進行檢查和處理。
  • 最后統(tǒng)計連接數(shù)。

  綁定有數(shù)據(jù)可讀時的回調(diào)函數(shù)

  Networking.c:15

redisClient?*createClient(int?fd)?{
????redisClient?*c?=?zmalloc(sizeof(redisClient));
????c->bufpos?=?0;

????anetNonBlock(NULL,fd);
????anetTcpNoDelay(NULL,fd);
????if?(aeCreateFileEvent(server.el,fd,AE_READABLE,
????????readQueryFromClient,?c)?==?AE_ERR)
????{
????????close(fd);
????????zfree(c);
????????return?NULL;
????}

????selectDb(c,0);
????c->fd?=?fd;
????c->querybuf?=?sdsempty();
c->reqtype?=?0;
...
}
  • 創(chuàng)建新連接的socket fd對應(yīng)的文件讀事件,綁定回調(diào)函數(shù)readQueryFromClient。
  • 如果創(chuàng)建成功,則對 redisClient 進行一系列的初始化,因為 redisClient 是通用的,即不管是什么命令的請求,都是通過創(chuàng)建一個 redisClient 來處理的,所以會有比較多的字段需要初始化。

  createClient 函數(shù)執(zhí)行完后返回到調(diào)用處acceptCommonHandler函數(shù),然后從acceptCommonHandler函數(shù)再返回到acceptTcpHandler函數(shù)。

  接收請求完畢,準備接收客戶端得數(shù)據(jù)

   到此為止,新連接到來時的回調(diào)函數(shù)acceptTcpHandler執(zhí)行完畢,在這個回調(diào)函數(shù)中創(chuàng)建了一個redisClient來處理這個客戶端接下 來的請求,并綁定了接收的新連接的讀文件事件。當有數(shù)據(jù)可讀時,網(wǎng)絡(luò)i/o輪詢(比如epoll)會有事件觸發(fā),此時綁定的回調(diào)函數(shù) readQueryFromClient將會調(diào)用來處理客戶端發(fā)送過來的數(shù)據(jù)。

  讀取客戶端請求的數(shù)據(jù)

  在綁定有數(shù)據(jù)可讀時的回調(diào)函數(shù)中的createClient函數(shù)中綁定了一個有數(shù)據(jù)可讀時的回調(diào)函數(shù)readQueryFromClient函數(shù),現(xiàn)在看看這個函數(shù)的具體內(nèi)容

  Networking.c:874

void?readQueryFromClient(aeEventLoop?*el,?int?fd,?void?*privdata,?int?mask)?{
????redisClient?*c?=?(redisClient*)?privdata;
????char?buf[REDIS_IOBUF_LEN];
????int?nread;
????REDIS_NOTUSED(el);
????REDIS_NOTUSED(mask);

????server.current_client?=?c;
????nread?=?read(fd,?buf,?REDIS_IOBUF_LEN);
????if?(nread?==?-1)?{
????????if?(errno?==?EAGAIN)?{
????????????nread?=?0;
????????}?else?{
????????????redisLog(REDIS_VERBOSE,?"Reading?from?client:?%s",strerror(errno));
????????????freeClient(c);
????????????return;
????????}
????}?else?if?(nread?==?0)?{
????????redisLog(REDIS_VERBOSE,?"Client?closed?connection");
????????freeClient(c);
????????return;
????}
????if?(nread)?{
????????c->querybuf?=?sdscatlen(c->querybuf,buf,nread);
????????c->lastinteraction?=?time(NULL);
????}?else?{
????????server.current_client?=?NULL;
????????return;
????}
????if?(sdslen(c->querybuf)?>?server.client_max_querybuf_len)?{
????????sds?ci?=?getClientInfoString(c),?bytes?=?sdsempty();

????????bytes?=?sdscatrepr(bytes,c->querybuf,64);
????????redisLog(REDIS_WARNING,"Closing?client?that?reached?max?query?buffer?length:?%s?(qbuf?initial?bytes:?%s)",?ci,?bytes);
????????sdsfree(ci);
????????sdsfree(bytes);
????????freeClient(c);
????????return;
????}
????processInputBuffer(c);
????server.current_client?=?NULL;
}
  • 調(diào)用系統(tǒng)函數(shù)read來讀取客戶端傳送過來的數(shù)據(jù),調(diào)用read后對讀取過程中被系統(tǒng)中斷的情況(nread == -1 && errno == EAGAIN),客戶端關(guān)閉的情況(nread == 0)進行了判斷處理。
  • 如果讀取的數(shù)據(jù)超過限制(1GB)則報錯。
  • 讀取完后進入processInputBuffer進行協(xié)議解析。

  請求協(xié)議

  從readQueryFromClient函數(shù)讀取客戶端傳過來的數(shù)據(jù),進入processInputBuffer函數(shù)進行協(xié)議解析,可以把processInputBuffer函數(shù)看作是輸入數(shù)據(jù)的協(xié)議解析器

  Networking.c:835

void?processInputBuffer(redisClient?*c)?{
????/*?Keep?processing?while?there?is?something?in?the?input?buffer?*/
????while(sdslen(c->querybuf))?{
????????/*?Immediately?abort?if?the?client?is?in?the?middle?of?something.?*/
????????if?(c->flags?&?REDIS_BLOCKED?||?c->flags?&?REDIS_IO_WAIT)?return;

????????/*?REDIS_CLOSE_AFTER_REPLY?closes?the?connection?once?the?reply?is
?????????*?written?to?the?client.?Make?sure?to?not?let?the?reply?grow?after
?????????*?this?flag?has?been?set?(i.e.?don't?process?more?commands).?*/
????????if?(c->flags?&?REDIS_CLOSE_AFTER_REPLY)?return;

????????/*?Determine?request?type?when?unknown.?*/
????????if?(!c->reqtype)?{
????????????if?(c->querybuf[0]?==?'*')?{
????????????????c->reqtype?=?REDIS_REQ_MULTIBULK;
????????????}?else?{
????????????????c->reqtype?=?REDIS_REQ_INLINE;
????????????}
????????}

????????if?(c->reqtype?==?REDIS_REQ_INLINE)?{
????????????if?(processInlineBuffer(c)?!=?REDIS_OK)?break;
????????}?else?if?(c->reqtype?==?REDIS_REQ_MULTIBULK)?{
????????????if?(processMultibulkBuffer(c)?!=?REDIS_OK)?break;
????????}?else?{
????????????redisPanic("Unknown?request?type");
????????}

????????/*?Multibulk?processing?could?see?a?<=?0?length.?*/
????????if?(c->argc?==?0)?{
????????????resetClient(c);
????????}?else?{
????????????/*?Only?reset?the?client?when?the?command?was?executed.?*/
????????????if?(processCommand(c)?==?REDIS_OK)
????????????????resetClient(c);
????????}
????}
}
  • Redis支持兩種協(xié)議,一種是inline,一種是multibulk。inline協(xié)議是老協(xié)議,現(xiàn)在一般只在命令行下的redis客戶端使用,其他情況一般是使用multibulk協(xié)議。
  • 如 果客戶端傳送的數(shù)據(jù)的第一個字符時‘*’,那么傳送數(shù)據(jù)將被當做multibulk協(xié)議處理,否則將被當做inline協(xié)議處理。Inline協(xié)議的具體 解析函數(shù)是processInlineBuffer,multibulk協(xié)議的具體解析函數(shù)是processMultibulkBuffer。
  • 當協(xié)議解析完畢,即客戶端傳送的數(shù)據(jù)已經(jīng)解析出命令字段和參數(shù)字段,接下來進行命令處理,命令處理函數(shù)是processCommand。

  Inline請求協(xié)議

  Networking.c:679

int?processInlineBuffer(redisClient?*c)?{
????...
}
  • 根據(jù)空格分割客戶端傳送過來的數(shù)據(jù),把傳送過來的命令和參數(shù)保存在argv數(shù)組中,把參數(shù)個數(shù)保存在argc中,argc的值包括了命令參數(shù)本身。即set key value命令,argc的值為3。詳細解析見協(xié)議詳解

  Multibulk請求協(xié)議

   Multibulk協(xié)議比inline協(xié)議復(fù)雜,它是二進制安全的,即傳送數(shù)據(jù)可以包含不安全字符。Inline協(xié)議不是二進制安全的,比如,如果 set key value命令中的key或value包含空白字符,那么inline協(xié)議解析時將會失敗,因為解析出來的參數(shù)個數(shù)與命令需要的的參數(shù)個數(shù)會不一致。

  協(xié)議格式

*<number?of?arguments>?CR?LF
$<number?of?bytes?of?argument?1>?CR?LF
<argument?data>?CR?LF
...
$<number?of?bytes?of?argument?N>?CR?LF
<argument?data>?CR?LF

  協(xié)議舉例

*3
$3
SET
$5
mykey
$7
myvalue

  具體解析代碼位于

  Networking.c:731

int?processMultibulkBuffer(redisClient?*c)?{
...
}

  詳細解析見協(xié)議詳解

  處理命令

  當協(xié)議解析完畢,則表示客戶端的命令輸入已經(jīng)全部讀取并已經(jīng)解析成功,接下來就是執(zhí)行客戶端命令前的準備和執(zhí)行客戶端傳送過來的命令

  Redis.c:1062

/*?If?this?function?gets?called?we?already?read?a?whole
?*?command,?argments?are?in?the?client?argv/argc?fields.
?*?processCommand()?execute?the?command?or?prepare?the
?*?server?for?a?bulk?read?from?the?client.
?*
?*?If?1?is?returned?the?client?is?still?alive?and?valid?and
?*?and?other?operations?can?be?performed?by?the?caller.?Otherwise
?*?if?0?is?returned?the?client?was?destroied?(i.e.?after?QUIT).?*/
int?processCommand(redisClient?*c)?{
...
?/*?Now?lookup?the?command?and?check?ASAP?about?trivial?error?conditions
??*?such?as?wrong?arity,?bad?command?name?and?so?forth.?*/
c->cmd?=?c->lastcmd?=?lookupCommand(c->argv[0]->ptr);
...
call(c);
...
}
  • lookupCommand先根據(jù)客戶端傳送過來的數(shù)據(jù)查找該命令并找到命令的對應(yīng)處理函數(shù)。
  • Call函數(shù)調(diào)用該命令函數(shù)來處理命令,命令與對應(yīng)處理函數(shù)的綁定位于。

  Redi.c:72

struct?redisCommand?*commandTable;
struct?redisCommand?readonlyCommandTable[]?=?{
{"get",getCommand,2,0,NULL,1,1,1},
...
}

  回復(fù)請求

  回復(fù)請求位于對應(yīng)的命令中,以get命令為例

  T_string.c:67

void?getCommand(redisClient?*c)?{
????getGenericCommand(c);
}

  T_string.c:52

int?getGenericCommand(redisClient?*c)?{
????robj?*o;

????if?((o?=?lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk))?==?NULL)
????????return?REDIS_OK;

????if?(o->type?!=?REDIS_STRING)?{
????????addReply(c,shared.wrongtypeerr);
????????return?REDIS_ERR;
????}?else?{
????????addReplyBulk(c,o);
????????return?REDIS_OK;
????}
}
  • getGenericCommand在getset 命令中也會用到。
  • lookupKeyReadOrReply是以讀數(shù)據(jù)為目的查詢key函數(shù),并且如果該key不存在,則在該函數(shù)中做不存在的回包處理。
  • 如果該key存在,則返回該key對應(yīng)的數(shù)據(jù),addReply函數(shù)以及以addReply函數(shù)開頭的都是回包函數(shù)。

  綁定寫數(shù)據(jù)的回調(diào)函數(shù)

  接下來看看addReply函數(shù)里的內(nèi)容

  Networking.c:190

void?addReply(redisClient?*c,?robj?*obj)?{
????if?(_installWriteEvent(c)?!=?REDIS_OK)?return;
????...
}

  Networking.c:64

int?_installWriteEvent(redisClient?*c)?{
????if?(c->fd?<=?0)?return?REDIS_ERR;
????if?(c->bufpos?==?0?&&?listLength(c->reply)?==?0?&&
????????(c->replstate?==?REDIS_REPL_NONE?||
?????????c->replstate?==?REDIS_REPL_ONLINE)?&&
????????aeCreateFileEvent(server.el,?c->fd,?AE_WRITABLE,
????????sendReplyToClient,?c)?==?AE_ERR)?return?REDIS_ERR;
????return?REDIS_OK;
}
  • addReply函數(shù)一進來就先調(diào)用綁定寫數(shù)據(jù)的回調(diào)函數(shù)installWriteEvent。
  • installWriteEvent函數(shù)中創(chuàng)建了一個文件寫事件和綁定寫事件的回調(diào)函數(shù)為sendReplyToClient。

  準備寫的數(shù)據(jù)內(nèi)容

??? addReply函數(shù)一進來后就綁定寫數(shù)據(jù)的回調(diào)函數(shù),接下來就是準備寫的數(shù)據(jù)內(nèi)容

  Networking.c:190

void?addReply(redisClient?*c,?robj?*obj)?{
????if?(_installWriteEvent(c)?!=?REDIS_OK)?return;
????redisAssert(!server.vm_enabled?||?obj->storage?==?REDIS_VM_MEMORY);

????/*?This?is?an?important?place?where?we?can?avoid?copy-on-write
?????*?when?there?is?a?saving?child?running,?avoiding?touching?the
?????*?refcount?field?of?the?object?if?it's?not?needed.
?????*
?????*?If?the?encoding?is?RAW?and?there?is?room?in?the?static?buffer
?????*?we'll?be?able?to?send?the?object?to?the?client?without
?????*?messing?with?its?page.?*/
????if?(obj->encoding?==?REDIS_ENCODING_RAW)?{
????????if?(_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr))?!=?REDIS_OK)
????????????_addReplyObjectToList(c,obj);
????}?else?{
????????/*?FIXME:?convert?the?long?into?string?and?use?_addReplyToBuffer()
?????????*?instead?of?calling?getDecodedObject.?As?this?place?in?the
?????????*?code?is?too?performance?critical.?*/
????????obj?=?getDecodedObject(obj);
????????if?(_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr))?!=?REDIS_OK)
????????????_addReplyObjectToList(c,obj);
????????decrRefCount(obj);
????}
}
  • 先嘗試把要返回的內(nèi)容添加到發(fā)送數(shù)據(jù)緩沖區(qū)中(redisClient->buf),如果該緩沖區(qū)的大小已經(jīng)放不下這次想放進去的數(shù)據(jù),或者已經(jīng)有數(shù)據(jù)在排隊(redisClient->reply 鏈表不為空),則把數(shù)據(jù)添加到發(fā)送鏈表的尾部。

  給客戶端答復(fù)數(shù)據(jù)

  在綁定寫數(shù)據(jù)的回調(diào)函數(shù)中看到綁定了回調(diào)函數(shù)sendReplyToClient,現(xiàn)在來看看這個函數(shù)的主要內(nèi)容

  Networking.c:566

void?sendReplyToClient(aeEventLoop?*el,?int?fd,?...)?{
????...
while(c->bufpos?>?0?||?listLength(c->reply))?{
????...
????if(c->bufpos?>?0){
????????...
????????????nwritten=write(fd,...,c->bufpos-c->sentlen);
????????????...
????????}?else?{
????????????o?=?listNodeValue(listFirst(c->reply));
????????????...
????????????nwritten=write(fd,...,objlen-c->sentlen);
????????????...
????????}
????}
}
  • 通過調(diào)用系統(tǒng)函數(shù)write給客戶端發(fā)送數(shù)據(jù),如果緩沖區(qū)有數(shù)據(jù)就把緩沖區(qū)的數(shù)據(jù)發(fā)送給客戶端,緩沖區(qū)的數(shù)據(jù)發(fā)送完了,如果有排隊數(shù)據(jù),則繼續(xù)發(fā)送。

  退出

  Redis 服務(wù)器的退出是通過shutdown命令來退出的,退出前會做一系列的清理工作

  Db.c:347

void?shutdownCommand(redisClient?*c)?{
????if?(prepareForShutdown()?==?REDIS_OK)
????????exit(0);
????addReplyError(c,"Errors?trying?to?SHUTDOWN.?Check?logs.");
}

  總結(jié)

  框架從啟動,接收請求,讀取客戶端數(shù)據(jù),請求協(xié)議解析,處理命令,回復(fù)請求,退出對redis運行的整個流程做了一個梳理。對整個redis的運作和框架有了一個初步的了解。

轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/p/3446349.html

總結(jié)

以上是生活随笔為你收集整理的Redis运行流程源码解析--转载的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。