日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

Redis源码解析:14Redis服务器与客户端间的交互

發布時間:2025/3/15 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Redis源码解析:14Redis服务器与客户端间的交互 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

? ? ? ? ?Redis服務器是典型的一對多服務器程序,通過使用由IO多路復用技術實現的文件事件處理器,Redis服務器使用單線程單進程的方式來處理命令請求,并與多個客戶端進行網絡通信。

?

? ? ? ? ?Redis客戶端與服務器之間通過TCP協議進行通信。TCP協議是一種流式協議,數據以字節流的形式進行傳遞,沒有固有的"報文"或"報文邊界"的概念,如果需要設置邊界,需要應用層自行處理。

? ? ? ? ?因此,Redis客戶端與服務器之間的交互數據,都按照Redis自定義的統一請求協議的格式進行編碼。使用這種協議,每條命令之間都有了“邊界”。

? ? ? ? ?舉個例子,如果客戶端要向服務器發送以下命令請求:

? ? ? ? ?SET msg “helloworld”

? ? ? ? ?那么客戶端實際發送的數據是:

? ? ? ? ?*3\r\n$3\r\nSET\r\n$3\r\nmsg\r\n$11\r\nhelloworld\r\n

???????? 服務器收到這樣的數據時,就可以通過解析”*3”得到該命令有3個參數,第一個參數長度為3,值為”SET”,也就是要執行的命令;第二個參數長度為3,值為”msg”;第三個參數長度為11,值為”hello world”。

? ? ? ? ?這樣就得到了一條完整的命令,解析并處理該命令后,接著解析下一條命令。

?

一:客戶端結構redisClient

? ? ? ? ?對于每個與服務器進行連接的客戶端,服務器都為這些客戶端建立了相應的redisClient結構,該結構體定義在redis.h中,它的定義如下(有省略):

typedef struct redisClient {uint64_t id; /* Client incremental unique ID. */int fd;redisDb *db;int dictid;robj *name; /* As set by CLIENT SETNAME */sds querybuf;size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size */int argc;robj **argv;struct redisCommand *cmd, *lastcmd;int reqtype;int multibulklen; /* number of multi bulk arguments left to read */long bulklen; /* length of bulk argument in multi bulk request */list *reply;unsigned long reply_bytes; /* Tot bytes of objects in reply list */...int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */int authenticated; /* when requirepass is non-NULL */.../* Response buffer */int bufpos;char buf[REDIS_REPLY_CHUNK_BYTES]; } redisClient;

? ? ? ? ?這個結構保存了客戶端當前的狀態信息,以及執行相關功能時需要用到的數據結構,比如:客戶端的socket描述符(fd),指向客戶端正在使用的數據庫的指針(db),客戶端的名字(name),客戶端的標志值(flags),客戶端輸入緩存(querybuf),客戶端當前要執行的命令參數(argv),以及參數個數(argc),以及客戶端的輸出緩存(buf和reply)等。

? ? ? ? ?這些屬性的具體意義會在下面的章節中介紹。

?

二:初始化(創建監聽端口、注冊建連事件)

? ? ? ? ?在Redis服務器的初始化函數initserver中,調用aeCreateEventLoop創建了Redis服務器中唯一的事件循環結構(aeEventLoop):server.e1。server.e1是全局性的,Redis服務器中所有的事件都注冊在該結構上。

?

???????? 默認情況下,Redis服務器監聽本地所有網絡接口上的連接(0.0.0.0)。可以在配置文件中,通過"bind"選項設置監聽的地址,其后跟一個或多個空格分隔的IP地址,比如:

bind 192.168.1.100 ?10.0.0.1

???????? Redis將這些地址保存在server.bindaddr中,IP地址總數為server.bindaddr_count。

?

? ? ? ? ?在initserver函數中,調用listenToPort,根據這些監聽地址,調用socket、bind和listen創建監聽socket描述符。

/* Open the TCP listening socket for the user commands. */ if (server.port != 0 &&listenToPort(server.port, server.ipfd, &server.ipfd_count) == REDIS_ERR)exit(1);

? ? ? ? ?創建好的監聽描述符保存在描述符數組server.ipfd中,最后創建的監聽描述符的總數為server.ipfd_count。server.ipfd數組為固定大小:REDIS_BINDADDR_MAX(16),因此最多只支持16個監聽地址。

?

???????? 然后,針對每個監聽描述符,調用aeCreateFileEvent,注冊其上的可讀事件,回調函數為acceptTcpHandler:

for (j = 0; j < server.ipfd_count; j++) {if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler,NULL) == AE_ERR){redisPanic("Unrecoverable error creating server.ipfd file event.");} }


???????? Redis服務器收到客戶端的TCP連接后,就會調用acceptTcpHandler函數進行處理。acceptTcpHandler函數的代碼如下:

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {int cport, cfd, max = MAX_ACCEPTS_PER_CALL;char cip[REDIS_IP_STR_LEN];REDIS_NOTUSED(el);REDIS_NOTUSED(mask);REDIS_NOTUSED(privdata);while(max--) {cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);if (cfd == ANET_ERR) {if (errno != EWOULDBLOCK)redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);return;}redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);acceptCommonHandler(cfd,0);} }

? ? ? ? ?該函數每次最多處理MAX_ACCEPTS_PER_CALL(1000)個連接,如果還有其他連接,則等到下次調用acceptTcpHandler時再處理,這樣做的原因是為了保證該函數的執行時間不會過長,以免影響后續事件的處理。

???????? 針對每個連接,調用anetTcpAccept函數進行accept,并將客戶端地址記錄到cip以及cport中;

???????? 建鏈后的socket描述符為cfd,根據該值調用acceptCommonHandler,該函數中,調用createClient創建一個redisClient結構,并注冊socket描述符上的可讀事件,回調函數為readQueryFromClient。最后將該redisClient結構存儲到全局客戶端列表server.clients中;

if (aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c) == AE_ERR) {close(fd);zfree(c);return NULL; }

?

三:接收客戶端請求,解析并處理請求

1:接收數據

???????? Redis服務器收到客戶端的請求數據后,就會觸發socket描述符上的可讀事件,從而調用其回調函數readQueryFromClient。??????

? ? ? ? ?在readQueryFromClient中,調用read讀取客戶端的請求,并緩存到redisClient結構中的輸入緩存querybuf中,該輸入緩存會根據接收到的數據長度動態擴容。接下來對收到的請求數據進行解析,并執行相應的命令處理函數。

? ? ? ? ?readQueryFromClient函數代碼如下:

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {redisClient *c = (redisClient*) privdata;int nread, readlen;size_t qblen;REDIS_NOTUSED(el);REDIS_NOTUSED(mask);server.current_client = c;readlen = REDIS_IOBUF_LEN;/* If this is a multi bulk request, and we are processing a bulk reply* that is large enough, try to maximize the probability that the query* buffer contains exactly the SDS string representing the object, even* at the risk of requiring more read(2) calls. This way the function* processMultiBulkBuffer() can avoid copying buffers to create the* Redis Object representing the argument. */if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1&& c->bulklen >= REDIS_MBULK_BIG_ARG){int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);if (remaining < readlen) readlen = remaining;}qblen = sdslen(c->querybuf);if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);nread = read(fd, c->querybuf+qblen, readlen);if (nread == -1) {if (errno == EAGAIN) {nread = 0;} else {redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));freeClient(c);return;}} else if (nread == 0) {redisLog(REDIS_VERBOSE, "Client closed connection");freeClient(c);return;}if (nread) {sdsIncrLen(c->querybuf,nread);c->lastinteraction = server.unixtime;if (c->flags & REDIS_MASTER) c->reploff += nread;server.stat_net_input_bytes += nread;} else {server.current_client = NULL;return;}if (sdslen(c->querybuf) > server.client_max_querybuf_len) {sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();bytes = sdscatrepr(bytes,c->querybuf,64);redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);sdsfree(ci);sdsfree(bytes);freeClient(c);return;}processInputBuffer(c);server.current_client = NULL; }

???????? 該函數中,首先設置每次read讀取的最大字節數readlen為REDIS_IOBUF_LEN(16k)。然后得到輸入緩存c->querybuf當前長度qblen,也就是已接收到的客戶端請求數據的長度。根據qblen更新c->querybuf_peak的值,該屬性記錄了輸入緩存c->querybuf的最大長度。

?

? ? ? ? ?接下來為c->querybuf擴容,使其能容納readlen個字節;然后就調用read,最多讀取readlen個字節。讀取的內容追加到c->querybuf尾部。

???????? 如果read返回值nread為-1,若errno等于EAGAIN,說明暫無數據,置nread為0;否則記錄錯誤信息到日志,釋放客戶端結構redisClient,并關閉鏈接,然后直接返回;

???????? 如果read返回0,說明客戶端關閉連接,此時記錄信息到日志,釋放客戶端結構redisClient,并關閉鏈接,然后直接返回;

???????? read返回非0,說明讀取到了數據。判斷當前輸入緩存c->querybuf的長度是否大于閾值server.client_max_querybuf_len(1G)。若超過閾值,則記錄當前客戶端信息,以及c->querybuf中前64個字節到日志中,然后釋放客戶端結構redisClient,并關閉鏈接,然后直接返回;

?

? ? ? ? ?最后,調用processInputBuffer解析收到的數據,并在讀取到完整的一條命令請求之后,執行相應的命令處理函數。

?

2:解析處理客戶端命令

? ? ? ? ?Redis服務器收到客戶端的請求數據后,調用processInputBuffer函數解析輸入緩存redisClient->querybuf中的數據。在得到一條完整的命令請求數據后,就調用processCommand函數處理執行相應的命令。

? ? ? ? ?processInputBuffer的代碼如下:

void processInputBuffer(redisClient *c) {/* Keep processing while there is something in the input buffer */while(sdslen(c->querybuf)) {/* Return if clients are paused. */if (!(c->flags & REDIS_SLAVE) && clientsArePaused()) return;/* Immediately abort if the client is in the middle of something. */if (c->flags & REDIS_BLOCKED) return;/* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is* written to the client. Make sure to not let the reply grow after* this flag has been set (i.e. don't process more commands). */if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;/* Determine request type when unknown. */if (!c->reqtype) {if (c->querybuf[0] == '*') {c->reqtype = REDIS_REQ_MULTIBULK;} else {c->reqtype = REDIS_REQ_INLINE;}}if (c->reqtype == REDIS_REQ_INLINE) {if (processInlineBuffer(c) != REDIS_OK) break;} else if (c->reqtype == REDIS_REQ_MULTIBULK) {if (processMultibulkBuffer(c) != REDIS_OK) break;} else {redisPanic("Unknown request type");}/* Multibulk processing could see a <= 0 length. */if (c->argc == 0) {resetClient(c);} else {/* Only reset the client when the command was executed. */if (processCommand(c) == REDIS_OK)resetClient(c);}} }

? ? ? ? ?該函數中,只要c->querybuf不為空,就一直循環處理。在該循環中:

?

???????? 首先,根據客戶端的當前狀態標志c->flags,判斷是否需要繼續解析處理,比如:

? ? ? ? ?如果當前客戶端不是SLAVE節點,并且客戶端處于阻塞狀態,則直接返回;

? ? ? ? ?如果客戶端標志c->flags包含REDIS_BLOCKED,則直接返回;

???????? 如果客戶端標志c->flags包含REDIS_CLOSE_AFTER_REPLY,則直接返回。該標志表明發生了異常,服務器不再需要處理客戶端請求,在回復客戶端錯誤消息后直接關閉鏈接。

?

???????? 接下來,如果c->reqtype為0,說明剛要開始處理一條請求(第一次處理c->querybuf中的數據,或剛處理完一條完整的命令請求)。如果數據c->querybuf的首字節為'*',說明該請求會跨越多行(包含多個”\r\n”),則置c->reqtype為EDIS_REQ_MULTIBULK;否則說明該請求為單行請求,置c->reqtype為REDIS_REQ_INLINE;

???????? 如果c->reqtype為REDIS_REQ_INLINE,則調用processInlineBuffer解析單行請求,如果c->reqtype為EDIS_REQ_MULTIBULK,則調用processMultibulkBuffer解析多行請求。這兩個函數的返回值如果不是REDIS_OK,則說明尚未收到一條完整的請求,需要退出循環,函數返回后接著讀取剩余的數據;

???????? 如果這兩個函數返回為REDIS_OK,則說明已經收到并解析好了一條完整的請求,命令的參數已經分解到數組c->argv中,c->argc表示參數個數。

???????? 如果c->argc為0,則無需處理,直接調用resetClient重置客戶端狀態,也就是釋放c->argv數組中的元素,置c->argc、c->reqtype和c->multibulklen為0,置c->bulklen為-1等。然后接著處理c->querybuf中剩下的內容;

???????? 如果c->argc非0,則調用processCommand處理該命令,調用相應的命令處理函數。處理成功后,調用resetClient重置客戶端狀態。然后接著處理c->querybuf中剩下的內容。

?

???????? 函數processInlineBuffer和processMultibulkBuffer分別解析客戶端的單行請求和多行請求。這兩個函數返回REDIS_OK,說明已經收到并解析好了一條完整的請求,命令的參數已經分解到數組c->argv中,c->argc表示參數個數。

???????? 如果這倆函數返回REDIS_ERR,要么說明收到的客戶端命令請求尚不完整,這其實不是錯誤,這種情況下函數返回后,服務器需要繼續接收客戶端請求;要么說明客戶端發來的請求不符合統一請求協議的格式要求,這種情況下調用setProtocolError刪除c->querybuf相應的內容,并且將客戶端的標志位c->flags增加REDIS_CLOSE_AFTER_REPLY標記,從而在回復客戶端錯誤信息后直接關閉連接。

???????? processMultibulkBuffer函數要比processInlineBuffer稍微復雜一些,直接看一下processMultibulkBuffer的實現:

int processMultibulkBuffer(redisClient *c) {char *newline = NULL;int pos = 0, ok;long long ll;if (c->multibulklen == 0) {/* The client should have been reset */redisAssertWithInfo(c,NULL,c->argc == 0);/* Multi bulk length cannot be read without a \r\n */newline = strchr(c->querybuf,'\r');if (newline == NULL) {if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {addReplyError(c,"Protocol error: too big mbulk count string");setProtocolError(c,0);}return REDIS_ERR;}/* Buffer should also contain \n */if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))return REDIS_ERR;/* We know for sure there is a whole line since newline != NULL,* so go ahead and find out the multi bulk length. */redisAssertWithInfo(c,NULL,c->querybuf[0] == '*');ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);if (!ok || ll > 1024*1024) {addReplyError(c,"Protocol error: invalid multibulk length");setProtocolError(c,pos);return REDIS_ERR;}pos = (newline-c->querybuf)+2;if (ll <= 0) {sdsrange(c->querybuf,pos,-1);return REDIS_OK;}c->multibulklen = ll;/* Setup argv array on client structure */if (c->argv) zfree(c->argv);c->argv = zmalloc(sizeof(robj*)*c->multibulklen);}redisAssertWithInfo(c,NULL,c->multibulklen > 0);while(c->multibulklen) {/* Read bulk length if unknown */if (c->bulklen == -1) {newline = strchr(c->querybuf+pos,'\r');if (newline == NULL) {if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {addReplyError(c,"Protocol error: too big bulk count string");setProtocolError(c,0);return REDIS_ERR;}break;}/* Buffer should also contain \n */if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))break;if (c->querybuf[pos] != '$') {addReplyErrorFormat(c,"Protocol error: expected '$', got '%c'",c->querybuf[pos]);setProtocolError(c,pos);return REDIS_ERR;}ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);if (!ok || ll < 0 || ll > 512*1024*1024) {addReplyError(c,"Protocol error: invalid bulk length");setProtocolError(c,pos);return REDIS_ERR;}pos += newline-(c->querybuf+pos)+2;if (ll >= REDIS_MBULK_BIG_ARG) {size_t qblen;/* If we are going to read a large object from network* try to make it likely that it will start at c->querybuf* boundary so that we can optimize object creation* avoiding a large copy of data. */sdsrange(c->querybuf,pos,-1);pos = 0;qblen = sdslen(c->querybuf);/* Hint the sds library about the amount of bytes this string is* going to contain. */if (qblen < (size_t)ll+2)c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-qblen);}c->bulklen = ll;}/* Read bulk argument */if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) {/* Not enough data (+2 == trailing \r\n) */break;} else {/* Optimization: if the buffer contains JUST our bulk element* instead of creating a new object by *copying* the sds we* just use the current sds string. */if (pos == 0 &&c->bulklen >= REDIS_MBULK_BIG_ARG &&(signed) sdslen(c->querybuf) == c->bulklen+2){c->argv[c->argc++] = createObject(REDIS_STRING,c->querybuf);sdsIncrLen(c->querybuf,-2); /* remove CRLF */c->querybuf = sdsempty();/* Assume that if we saw a fat argument we'll see another one* likely... */c->querybuf = sdsMakeRoomFor(c->querybuf,c->bulklen+2);pos = 0;} else {c->argv[c->argc++] =createStringObject(c->querybuf+pos,c->bulklen);pos += c->bulklen+2;}c->bulklen = -1;c->multibulklen--;}}/* Trim to pos */if (pos) sdsrange(c->querybuf,pos,-1);/* We're done when c->multibulk == 0 */if (c->multibulklen == 0) return REDIS_OK;/* Still not read to process the command */return REDIS_ERR; }

? ? ? ? ?redisClient結構中的multibulklen屬性,記錄正在解析的一條完整的命令請求中,尚未處理的命令參數的個數。如果c->multibulklen為0,說明當前要解析的是命令請求的開頭,格式為"*<n>\r\n"。

???????? 這種情況下,首先找到c->querybuf中的第一個'\r'的位置newline,如果c->querybuf中找不到'\r',說明收到的客戶端的請求尚不完整,直接返回REDIS_ERR。并且如果c->querybuf目前長度超過64k的話,則反饋給客戶端錯誤信息:"Protocol error: too big mbulk count string",然后調用setProtocolError為客戶端標志位c->flags增加REDIS_CLOSE_AFTER_REPLY標記;直接返回REDIS_ERR;

???????? 然后如果(newline-(c->querybuf))大于((signed)sdslen(c->querybuf)-2),說明收到的客戶端請求尚不完整(缺少'\n'),直接返回REDIS_ERR;

?

???????? 接下來就開始解析該行,該行內容的正確格式是"*<n>\r\n",其中<n>是一個表明接下來包含多少個字符串的整數。調用string2ll解析得到其中的整數ll,如果解析失敗,或者ll大于1M,則反饋給客戶端信息"Protocol error: invalid multibulk length",然后,調用setProtocolError為客戶端標志位c->flags增加REDIS_CLOSE_AFTER_REPLY標記,返回REDIS_ERR;

???????? 然后使pos記為c->querybuf下一行首地址的索引;

???????? 如果ll小于等于0,則直接清除c->querybuf中剛剛解析的行,直接返回REDIS_OK;然后將ll賦值到c->multibulklen中。然后根據c->multibulklen的值申請數組c->argv的空間,其數組長度就是c->multibulklen。

?

???????? 得到c->multibulklen的值后,接下來開始依次處理命令請求中的每一個字符串行:

???????? redisClient結構中的bulklen屬性,記錄接下來要解析的命令請求行中,包含的字符串的長度。如果c->bulklen為-1,說明當前要解析的,是字符串的長度行,格式為"$<n>\r\n"。

???????? 這種情況下,處理過程與c->multibulklen為0時的解析過程類似,不在贅述。解析完后,下一行中包含的字符串長度存儲在ll中,ll最大為512M,否則反饋給客戶端錯誤信息:"Protocol error: invalid bulk length",并且調用setProtocolError為客戶端標志位c->flags增加REDIS_CLOSE_AFTER_REPLY標記,返回REDIS_ERR;

???????? 然后使pos記為c->querybuf下一行首地址的索引;

???????? 如果字符串長度ll大于等于32k,為了后續創建字符串對象時避免復制大塊內存,直接使用c->querybuf創建字符串對象。因此直接將c->querybuf中pos之前的內容刪除,置pos為0,并且必要情況下為c->querybuf擴容。最后將ll賦值到c->bulklen中;

?

???????? 接下來開始解析c->querybuf中的字符串行,格式為"xxxx\r\n";

???????? 如果(sdslen(c->querybuf)-pos)小于((unsigned)(c->bulklen+2)),說明收到的客戶端請求中,字符串行尚不完整,直接退出循環,返回REDIS_ERR;

???????? 否則,如果同時滿足以下三個條件:

pos == 0;

c->bulklen >= REDIS_MBULK_BIG_ARG;

(signed) sdslen(c->querybuf) ==c->bulklen+2);

???????? 說明,當前c->querybuf中,不多不少正好包含的是一個大于32k的大字符串行,這種情況下,為了避免拷貝大塊內存,直接使用c->querybuf創建字符串對象,并存儲到c->argv中;然后重新創建c->querybuf,并為其擴容為c->bulklen+2,這樣可以容納在后續遇到的大字符串(Assume that if we saw a fat argument we'll see another one likely...);

???????? 如果不滿足上面的條件,則創建字符串對象,將c->querybuf+pos的內容復制到該字符串對象中;

?

???????? 處理完一個完整的字符串行后,重置c->bulklen為-1,并且c->multibulklen--;然后循環處理下一個字符串行;

????????

???????? 跳出循環后,首先刪除已解析的內容,如果c->multibulklen為0,說明已經完整的收到并解析了客戶端的一個跨多行的命令請求,返回REDIS_OK,表示可以開始處理該命令了;否則,返回REDIS_ERR,繼續接收客戶端請求;

?

? ? ? ? ?processInlineBuffer函數的實現要簡單很多,不再贅述。

?

四:回復客戶端

???????? 服務器執行完相應的命令處理函數之后,就會調用addReply類的函數將要回復給客戶端的信息寫入客戶端輸出緩存。這些函數包括addReply,addReplySds,addReplyError,addReplyStatus等。

???????? 這些函數首先都會調用prepareClientToWrite函數,注冊socket描述符上的可寫事件,然后將回復信息寫入到客戶端輸出緩存中。

? ? ? ? ?redisClient結構中有兩種客戶端輸出緩存,一種是靜態大小的數組(buf),一種是動態大小的列表(reply)。追加回復信息時,首先嘗試將信息追加到數組buf中,如果其空間不足,則將信息在追加到reply中。比如addReplyString的代碼如下:

void addReplyString(redisClient *c, char *s, size_t len) {if (prepareClientToWrite(c) != REDIS_OK) return;if (_addReplyToBuffer(c,s,len) != REDIS_OK)_addReplyStringToList(c,s,len); }

???????? 調用函數_addReplyToBuffer向c->buf中添加數據,如果該函數返回REDIS_ERR,說明添加失敗,則調用_addReplyStringToList,將數據添加到c->reply中。其他addReply類的函數也是類似的處理,不再贅述。

?

? ? ? ? ?每次向客戶端輸出緩存追加新數據之前,都要調用函數prepareClientToWrite。???? 因Redis中不同類型的客戶端需要不同的處理:有些客戶端(比如加載AOF文件時的偽客戶端)無需追加新數據,這種情況下,該函數直接返回REDIS_ERR;有些客戶端(比如Lua客戶端)需要追加新數據,但無需注冊socket描述符上的可寫事件;有些客戶端(普通客戶端)需要追加數據,并注冊socket描述符上的可寫事件;

? ? ? ? ?因此,調用prepareClientToWrite函數返回REDIS_ERR,則表示無需向輸出緩存追加新數據,只有返回REDIS_OK時才需要向輸出緩存中追加新數據。

?

? ? ? ? ?prepareClientToWrite函數的代碼如下:

int prepareClientToWrite(redisClient *c) {/* If it's the Lua client we always return ok without installing any* handler since there is no socket at all. */if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;/* Masters don't receive replies, unless REDIS_MASTER_FORCE_REPLY flag* is set. */if ((c->flags & REDIS_MASTER) &&!(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;if (c->fd <= 0) return REDIS_ERR; /* Fake client for AOF loading. *//* Only install the handler if not already installed and, in case of* slaves, if the client can actually receive writes. */if (c->bufpos == 0 && listLength(c->reply) == 0 &&(c->replstate == REDIS_REPL_NONE ||(c->replstate == REDIS_REPL_ONLINE && !c->repl_put_online_on_ack))){/* Try to install the write handler. */if (aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,sendReplyToClient, c) == AE_ERR){freeClientAsync(c);return REDIS_ERR;}}/* Authorize the caller to queue in the output buffer of this client. */return REDIS_OK; }

? ? ? ? ?如果當前客戶端是Lua客戶端,直接返回REDIS_OK,而無需注冊socket描述符上的可寫事件,因為根本沒有socket描述符;

???????? 如果客戶端為Master節點,除非設置REDIS_MASTER_FORCE_REPLY標志,否則這種客戶端不接收回復,因此直接返回REDIS_ERR;

???????? 如果客戶端的socket描述符小于等于0,說明是加載AOF文件時的偽客戶端,直接返回REDIS_ERR;?

???????? 如果是普通客戶端,或者是在從節點需要接收數據時,如果此前從未注冊過socket上的可寫事件,則調用aeCreateFileEvent注冊socket描述符c->fd上的可寫事件,事件回調函數為sendReplyToClient;最后直接返回REDIS_OK;???????

?

???????? 當TCP輸出緩沖區有一定剩余空間時,socket描述符上的可寫事件就會觸發,從而調用事件回調函數sendReplyToClient。該函數調用write,將輸出緩存中的數據發送出去。函數的代碼如下:

void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {redisClient *c = privdata;int nwritten = 0, totwritten = 0, objlen;size_t objmem;robj *o;REDIS_NOTUSED(el);REDIS_NOTUSED(mask);while(c->bufpos > 0 || listLength(c->reply)) {if (c->bufpos > 0) {nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);if (nwritten <= 0) break;c->sentlen += nwritten;totwritten += nwritten;/* If the buffer was sent, set bufpos to zero to continue with* the remainder of the reply. */if (c->sentlen == c->bufpos) {c->bufpos = 0;c->sentlen = 0;}} else {o = listNodeValue(listFirst(c->reply));objlen = sdslen(o->ptr);objmem = getStringObjectSdsUsedMemory(o);if (objlen == 0) {listDelNode(c->reply,listFirst(c->reply));c->reply_bytes -= objmem;continue;}nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);if (nwritten <= 0) break;c->sentlen += nwritten;totwritten += nwritten;/* If we fully sent the object on head go to the next one */if (c->sentlen == objlen) {listDelNode(c->reply,listFirst(c->reply));c->sentlen = 0;c->reply_bytes -= objmem;}}/* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT* bytes, in a single threaded server it's a good idea to serve* other clients as well, even if a very large request comes from* super fast link that is always able to accept data (in real world* scenario think about 'KEYS *' against the loopback interface).** However if we are over the maxmemory limit we ignore that and* just deliver as much data as it is possible to deliver. */server.stat_net_output_bytes += totwritten;if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&(server.maxmemory == 0 ||zmalloc_used_memory() < server.maxmemory)) break;}if (nwritten == -1) {if (errno == EAGAIN) {nwritten = 0;} else {redisLog(REDIS_VERBOSE,"Error writing to client: %s", strerror(errno));freeClient(c);return;}}if (totwritten > 0) {/* For clients representing masters we don't count sending data* as an interaction, since we always send REPLCONF ACK commands* that take some time to just fill the socket output buffer.* We just rely on data / pings received for timeout detection. */if (!(c->flags & REDIS_MASTER)) c->lastinteraction = server.unixtime;}if (c->bufpos == 0 && listLength(c->reply) == 0) {c->sentlen = 0;aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);/* Close connection after entire reply has been sent. */if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c);} }

? ? ? ? ?當追加要發送的數據到輸出緩存時,首先嘗試將其添加到c->buf中;如果c->buf空間不足,則追加到c->reply中。如果使用的是c->buf,則c->bufpos表示其中緩存的數據總量,c->sentlen表示其中已發送的數據量;如果使用的是c->reply,則c->reply_bytes表示列表c->reply中,保存的所有sds字符串占用的內存總字節數,c->sentlen表示列表中的正在發送數據的單塊緩存元素中,已發送的數據量。

???????? 函數中的totwritten表示本函數當前已發送的數據量;

?

???????? 在函數中,如果c->bufpos大于0,或者listLength(c->reply)大于0,說明緩存中有數據要發送,進入循環,調用write發送數據,write返回值nwritten小于等于0時,要么是TCP輸出緩存無空間,要么是發生了錯誤,因此直接跳出循環。

? ? ? ? ?在循環中:如果c->bufpos大于0,說明使用的緩存是c->buf。因此調用write,將c->buf中的剩余數據(c->bufpos- c->sentlen個字節)發送出去。如果write返回值nwritten小于等于0時,直接跳出循環;否則,將nwritten增加到c->sentlen和totwritten中,繼續下一輪循環寫入。如果c->buf中的數據已全部發送出去,則重置c->bufpos和c->sentlen為0,表示清空緩存c->buf;

?

???????? 否則的話,表示使用的緩存是列表c->reply。得到其頭結點中保存的字符串對象o,然后得到該字符串的長度objlen,以及該字符串占用的內存objmem。接著調用write,將o->ptr中未發送的數據(objlen - c->sentlen個字節)全部發送出去。如果write返回值nwritten小于等于0時,直接跳出循環;否則,將nwritten增加到c->sentlen和totwritten中,繼續下一輪循環寫入。如果c->sentlen等于objlen,說明當前節點的數據已經全部發送完成,直接刪除該節點,并重置c->sentlen為0,并從c->reply_bytes中減去objmem;

???????? 接下來,將本次已發送的字節數totwritten加到server.stat_net_output_bytes中。

?

???????? 因本函數是可寫事件的回調函數,為了避免該函數執行時間過長,而影響其他事件的處理。因此這里限制該函數最大發送的字節數為REDIS_MAX_WRITE_PER_EVENT(64k),一旦已發送的字節數totwritten超過了該值,并且在沒設置最大內存限制,或者尚未超過設置的最大內存限制的條件下,直接退出循環,停止發送。

?

???????? 退出循環后,如果write出錯,并且errno為EAGAIN,說明TCP輸出緩存無空間了,這種情況不是錯誤,直接置nwritten = 0即可;否則需要記錄錯誤日志,并且調用freeClient釋放redisClient,關閉與客戶端的連接;

???????? 最后,如果緩存中所有的數據都已經發送完成,則置c->sentlen為0,并且刪除socket描述符c->fd上的可寫事件;如果客戶端標志c->flags中設置了REDIS_CLOSE_AFTER_REPLY,則調用freeClient釋放redisClient,關閉與客戶端的連接。

?

? ? ? ? ?其他相關代碼,可以參考:

https://github.com/gqtc/redis-3.0.5/blob/master/redis-3.0.5/src/networking.c

轉載于:https://www.cnblogs.com/gqtcgq/p/7247057.html

總結

以上是生活随笔為你收集整理的Redis源码解析:14Redis服务器与客户端间的交互的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。