日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

redis源码客户端和服务端通信过程

發布時間:2025/6/15 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 redis源码客户端和服务端通信过程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

? ? ? ? ? 最近想學習一下redis源碼,先看一下redis通信流程。由于功力有限,不足之處望大家指正。服務端和客戶端通信,一般都是服務端先啟動,那先從服務端的源碼看起。

? ? ? ? ?首先啟動服務端會做一些初始化動作,初始化事件處理器狀態,先看一下事件處理器狀態的結構

//事件處理器的狀態 typedef struct aeEventLoop {// 目前已注冊的最大描述符int maxfd; /* highest file descriptor currently registered */// 目前已追蹤的最大描述符int setsize; /* max number of file descriptors tracked */// 用于生成時間事件 idlong long timeEventNextId;// 最后一次執行時間事件的時間time_t lastTime; /* Used to detect system clock skew */// 已注冊的文件事件aeFileEvent *events; /* Registered events */// 已就緒的文件事件aeFiredEvent *fired; /* Fired events */// 時間事件aeTimeEvent *timeEventHead;// 事件處理器的開關int stop;// 多路復用庫的私有數據void *apidata; /* This is used for polling API specific data */// 在處理事件前要執行的函數aeBeforeSleepProc *beforesleep;} aeEventLoop;

? ? ? ? 下面要對事件處理器進行初始化。

//事件狀態 typedef struct aeApiState {// epoll_event 實例描述符int epfd;// 事件槽struct epoll_event *events; } aeApiState; static int aeApiCreate(aeEventLoop *eventLoop) {aeApiState *state = zmalloc(sizeof(aeApiState));if (!state) return -1;// 初始化事件槽空間state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);if (!state->events) {zfree(state);return -1;}// 創建 epoll 實例state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */if (state->epfd == -1) {zfree(state->events);zfree(state);return -1;}// 賦值給 eventLoopeventLoop->apidata = state;return 0; }

? 上面創建的epoll句柄和初始化的事件槽保存到傳入的eventLoop事件對象中。這個事件對象保存在全局的一個redisserver中,redisServer中結構體成員很多,這里只展示一個

struct redisServer {//...// 事件狀態aeEventLoop *el;// 一個鏈表,保存了所有客戶端狀態結構list *clients; /* List of active clients *//... };

aeEventLoop *el 存儲剛才創建的事件狀態

static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog) {int s, rv;char _port[6]; /* strlen("65535") */struct addrinfo hints, *servinfo, *p;snprintf(_port,6,"%d",port);memset(&hints,0,sizeof(hints));hints.ai_family = af;hints.ai_socktype = SOCK_STREAM;hints.ai_flags = AI_PASSIVE; /* No effect if bindaddr != NULL */if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {anetSetError(err, "%s", gai_strerror(rv));return ANET_ERR;}for (p = servinfo; p != NULL; p = p->ai_next) {if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)continue;if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) goto error;goto end;}if (p == NULL) {anetSetError(err, "unable to bind socket");goto error;}error:s = ANET_ERR; end:freeaddrinfo(servinfo);return s; }

上面的函數用來打開監聽端口

// 為 TCP 連接關聯連接應答(accept)處理器// 用于接受并應答客戶端的 connect() 調用for (j = 0; j < server.ipfd_count; j++) {if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, //使文件讀關聯一個函數acceptTcpHandler,NULL) == AE_ERR){redisPanic("Unrecoverable error creating server.ipfd file event.");}} /** 根據 mask 參數的值,監聽 fd 文件的狀態,* 當 fd 可用時,執行 proc 函數*/ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData) {if (fd >= eventLoop->setsize) {errno = ERANGE;return AE_ERR;}if (fd >= eventLoop->setsize) return AE_ERR;// 取出文件事件結構aeFileEvent *fe = &eventLoop->events[fd];// 監聽指定 fd 的指定事件if (aeApiAddEvent(eventLoop, fd, mask) == -1)return AE_ERR;// 設置文件事件類型,以及事件的處理器fe->mask |= mask;if (mask & AE_READABLE) fe->rfileProc = proc;if (mask & AE_WRITABLE) fe->wfileProc = proc;// 私有數據fe->clientData = clientData;// 如果有需要,更新事件處理器的最大 fdif (fd > eventLoop->maxfd)eventLoop->maxfd = fd;return AE_OK; }

aeCreateFileEvent函數用來注冊回調用,參數aeEventLoop *eventLoop就是前面初始化的事件處理器的狀態,當AE_READABLE產生時就會調用acceptTcpHandler函數,這時是有客戶端connect了。前面已經初始化了一定數量的處理器,aeApiAddEvent把所有的事件對象都注冊到epoll,后面接著設置對應AE_READABLE和AE_WRITABLE對應的回調函數。

/* File event structure** 文件事件結構*/ typedef struct aeFileEvent {// 監聽事件類型掩碼,// 值可以是 AE_READABLE 或 AE_WRITABLE ,// 或者 AE_READABLE | AE_WRITABLEint mask; /* one of AE_(READABLE|WRITABLE) */// 讀事件處理器aeFileProc *rfileProc;// 寫事件處理器aeFileProc *wfileProc;// 多路復用庫的私有數據void *clientData;} aeFileEvent;

上面是文件事件結構的結構體,對應的讀和寫的回調函數都保存在aeFileEvent(文件事件)中,aeFileEvent(文件事件)就是aeEventLoop(事件處理器狀態)的成員,aeEventLoop(事件處理器狀態)就是redisServer結構體中aeEventLoop *el(事件狀態成員),所有的這些都保存在全局的redisServer結構體中。接下來就是事件處理器主循環中

/** 事件處理器的主循環*/ void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {// 如果有需要在事件處理前執行的函數,那么運行它if (eventLoop->beforesleep != NULL)eventLoop->beforesleep(eventLoop);// 開始處理事件aeProcessEvents(eventLoop, AE_ALL_EVENTS);//一直循環調用這個函數等到消息} } //處理所有已到達的時間事件,以及所有已就緒的文件事件。 int aeProcessEvents(aeEventLoop *eventLoop, int flags) {int processed = 0, numevents;/* Nothing to do? return ASAP */if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;/* Note that we want call select() even if there are no* file events to process as long as we want to process time* events, in order to sleep until the next time event is ready* to fire. */if (eventLoop->maxfd != -1 ||((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {int j;aeTimeEvent *shortest = NULL;struct timeval tv, *tvp;// 獲取最近的時間事件if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))shortest = aeSearchNearestTimer(eventLoop);if (shortest) {// 如果時間事件存在的話// 那么根據最近可執行時間事件和現在時間的時間差來決定文件事件的阻塞時間long now_sec, now_ms;/* Calculate the time missing for the nearest* timer to fire. */// 計算距今最近的時間事件還要多久才能達到// 并將該時間距保存在 tv 結構中aeGetTime(&now_sec, &now_ms);tvp = &tv;tvp->tv_sec = shortest->when_sec - now_sec;if (shortest->when_ms < now_ms) {tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;tvp->tv_sec --;} else {tvp->tv_usec = (shortest->when_ms - now_ms)*1000;}// 時間差小于 0 ,說明事件已經可以執行了,將秒和毫秒設為 0 (不阻塞)if (tvp->tv_sec < 0) tvp->tv_sec = 0;if (tvp->tv_usec < 0) tvp->tv_usec = 0;} else {// 執行到這一步,說明沒有時間事件// 那么根據 AE_DONT_WAIT 是否設置來決定是否阻塞,以及阻塞的時間長度/* If we have to check for events but need to return* ASAP because of AE_DONT_WAIT we need to set the timeout* to zero */if (flags & AE_DONT_WAIT) {// 設置文件事件不阻塞tv.tv_sec = tv.tv_usec = 0;tvp = &tv;} else {/* Otherwise we can block */// 文件事件可以阻塞直到有事件到達為止tvp = NULL; /* wait forever */}}// 處理文件事件,阻塞時間由 tvp 決定numevents = aeApiPoll(eventLoop, tvp);for (j = 0; j < numevents; j++) {// 從已就緒數組中獲取事件aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int rfired = 0;/* note the fe->mask & mask & ... code: maybe an already processed* event removed an element that fired and we still didn't* processed, so we check if the event is still valid. */// 讀事件if (fe->mask & mask & AE_READABLE) {// rfired 確保讀/寫事件只能執行其中一個rfired = 1;fe->rfileProc(eventLoop,fd,fe->clientData,mask);}// 寫事件if (fe->mask & mask & AE_WRITABLE) {printf("can writable\n");if (!rfired || fe->wfileProc != fe->rfileProc)fe->wfileProc(eventLoop,fd,fe->clientData,mask);}processed++;}}/* Check time events */// 執行時間事件if (flags & AE_TIME_EVENTS)processed += processTimeEvents(eventLoop);return processed; /* return the number of processed file/time events */ } /** 獲取可執行事件*/ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {aeApiState *state = eventLoop->apidata;int retval, numevents = 0;// 等待時間retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);//epoll_wait用于向用戶進程返回ready list// 有至少一個事件就緒?if (retval > 0) {int j;// 為已就緒事件設置相應的模式// 并加入到 eventLoop 的 fired 數組中numevents = retval;for (j = 0; j < numevents; j++) {int mask = 0;struct epoll_event *e = state->events+j;if (e->events & EPOLLIN) mask |= AE_READABLE;if (e->events & EPOLLOUT) mask |= AE_WRITABLE;if (e->events & EPOLLERR) mask |= AE_WRITABLE;if (e->events & EPOLLHUP) mask |= AE_WRITABLE;eventLoop->fired[j].fd = e->data.fd;eventLoop->fired[j].mask = mask;}}// 返回已就緒事件個數return numevents; }

aeProcessEvents一直被循環調用用來處理就緒的文件事件(時間事件這里不考慮),通過調用aeApiPoll中的epoll_wait等待事件的促發。

typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t;struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ }; /* A fired event** 已就緒事件*/ typedef struct aeFiredEvent {// 已就緒文件描述符int fd;// 事件類型掩碼,// 值可以是 AE_READABLE 或 AE_WRITABLE// 或者是兩者的或int mask;} aeFiredEvent;

上面列出了epoll結構體和aeFiredEvent(已就緒事件結構體),aeFiredEvent屬于事件處理器狀態(aeEventLoop)成員,并循環保存就緒文件事件對應中已就緒描述符和其類型,這些又都保存在事件處理器狀態(aeEventLoop)中。函數返回到aeProcessEvents中,然后走對應的回調(這時候還沒講回調關聯對應的函數)。

現在假如有客戶端來連接了,按前面說的,套接字變的可讀,acceptTcpHandler被調用,acceptTcpHandler函數接收客戶端的連接,并為客戶端創建狀態,并注冊讀取客戶端命令的函數readQueryFromClient。并把創建的客戶端保存在redisServer里面的list *clients里面。

/** 創建一個新客戶端*/ redisClient *createClient(int fd) {printf("-----------%s--------\n",__FUNCTION__);// 分配空間redisClient *c = zmalloc(sizeof(redisClient));/* passing -1 as fd it is possible to create a non connected client.* This is useful since all the Redis commands needs to be executed* in the context of a client. When commands are executed in other* contexts (for instance a Lua script) we need a non connected client. */// 當 fd 不為 -1 時,創建帶網絡連接的客戶端// 如果 fd 為 -1 ,那么創建無網絡連接的偽客戶端// 因為 Redis 的命令必須在客戶端的上下文中使用,所以在執行 Lua 環境中的命令時// 需要用到這種偽終端if (fd != -1) {// 非阻塞anetNonBlock(NULL,fd);// 禁用 Nagle 算法anetEnableTcpNoDelay(NULL,fd);// 設置 keep aliveif (server.tcpkeepalive)anetKeepAlive(NULL,fd,server.tcpkeepalive);// 綁定讀事件到事件 loop (開始接收命令請求)if (aeCreateFileEvent(server.el,fd,AE_READABLE, //客戶端連接上之后,再為客戶端關聯一個讀數據的函數。之前關聯的建立連接readQueryFromClient, c) == AE_ERR) //沒有建立連接之前關聯建立函數,建立連接之后關聯讀數據的函數{close(fd);zfree(c);return NULL;}}// 初始化各個屬性// 默認數據庫selectDb(c,0);// 套接字c->fd = fd;// 名字c->name = NULL;// 回復緩沖區的偏移量c->bufpos = 0;// 查詢緩沖區c->querybuf = sdsempty();// 查詢緩沖區峰值c->querybuf_peak = 0;// 命令請求的類型c->reqtype = 0;// 命令參數數量c->argc = 0;// 命令參數c->argv = NULL;// 當前執行的命令和最近一次執行的命令c->cmd = c->lastcmd = NULL;// 查詢緩沖區中未讀入的命令內容數量c->multibulklen = 0;// 讀入的參數的長度c->bulklen = -1;// 已發送字節數c->sentlen = 0;// 狀態 FLAGc->flags = 0;// 創建時間和最后一次互動時間c->ctime = c->lastinteraction = server.unixtime;// 認證狀態c->authenticated = 0;// 復制狀態c->replstate = REDIS_REPL_NONE;// 復制偏移量c->reploff = 0;// 通過 ACK 命令接收到的偏移量c->repl_ack_off = 0;// 通過 AKC 命令接收到偏移量的時間c->repl_ack_time = 0;// 客戶端為從服務器時使用,記錄了從服務器所使用的端口號c->slave_listening_port = 0;// 回復鏈表c->reply = listCreate();// 回復鏈表的字節量c->reply_bytes = 0;// 回復緩沖區大小達到軟限制的時間c->obuf_soft_limit_reached_time = 0;// 回復鏈表的釋放和復制函數listSetFreeMethod(c->reply,decrRefCountVoid);listSetDupMethod(c->reply,dupClientReplyValue);// 阻塞類型c->btype = REDIS_BLOCKED_NONE;// 阻塞超時c->bpop.timeout = 0;// 造成客戶端阻塞的列表鍵c->bpop.keys = dictCreate(&setDictType,NULL);// 在解除阻塞時將元素推入到 target 指定的鍵中// BRPOPLPUSH 命令時使用c->bpop.target = NULL;c->bpop.numreplicas = 0;c->bpop.reploffset = 0;c->woff = 0;// 進行事務時監視的鍵c->watched_keys = listCreate();// 訂閱的頻道和模式c->pubsub_channels = dictCreate(&setDictType,NULL);c->pubsub_patterns = listCreate();c->peerid = NULL;listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);listSetMatchMethod(c->pubsub_patterns,listMatchObjects);// 如果不是偽客戶端,那么添加到服務器的客戶端鏈表中if (fd != -1) listAddNodeTail(server.clients,c);// 初始化客戶端的事務狀態initClientMultiState(c);// 返回客戶端return c; } /* With multiplexing we need to take per-client state.* Clients are taken in a liked list.** 因為 I/O 復用的緣故,需要為每個客戶端維持一個狀態。** 多個客戶端狀態被服務器用鏈表連接起來。*/ typedef struct redisClient {// 套接字描述符int fd;// 當前正在使用的數據庫redisDb *db;// 當前正在使用的數據庫的 id (號碼)int dictid;// 客戶端的名字robj *name; /* As set by CLIENT SETNAME */// 查詢緩沖區sds querybuf;// 查詢緩沖區長度峰值size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size */// 參數數量int argc;// 參數對象數組robj **argv;// 記錄被客戶端執行的命令struct redisCommand *cmd, *lastcmd;// 請求的類型:內聯命令還是多條命令int reqtype;// 剩余未讀取的命令內容數量int multibulklen; /* number of multi bulk arguments left to read */// 命令內容的長度long bulklen; /* length of bulk argument in multi bulk request */// 回復鏈表list *reply;// 回復鏈表中對象的總大小unsigned long reply_bytes; /* Tot bytes of objects in reply list */// 已發送字節,處理 short write 用int sentlen; /* Amount of bytes already sent in the currentbuffer or object being sent. */// 創建客戶端的時間time_t ctime; /* Client creation time */// 客戶端最后一次和服務器互動的時間time_t lastinteraction; /* time of the last interaction, used for timeout */// 客戶端的輸出緩沖區超過軟性限制的時間time_t obuf_soft_limit_reached_time;// 客戶端狀態標志int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */// 當 server.requirepass 不為 NULL 時// 代表認證的狀態// 0 代表未認證, 1 代表已認證int authenticated; /* when requirepass is non-NULL */// 復制狀態int replstate; /* replication state if this is a slave */// 用于保存主服務器傳來的 RDB 文件的文件描述符int repldbfd; /* replication DB file descriptor */// 讀取主服務器傳來的 RDB 文件的偏移量off_t repldboff; /* replication DB file offset */// 主服務器傳來的 RDB 文件的大小off_t repldbsize; /* replication DB file size */sds replpreamble; /* replication DB preamble. */// 主服務器的復制偏移量long long reploff; /* replication offset if this is our master */// 從服務器最后一次發送 REPLCONF ACK 時的偏移量long long repl_ack_off; /* replication ack offset, if this is a slave */// 從服務器最后一次發送 REPLCONF ACK 的時間long long repl_ack_time;/* replication ack time, if this is a slave */// 主服務器的 master run ID// 保存在客戶端,用于執行部分重同步char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */// 從服務器的監聽端口號int slave_listening_port; /* As configured with: SLAVECONF listening-port */// 事務狀態multiState mstate; /* MULTI/EXEC state */// 阻塞類型int btype; /* Type of blocking op if REDIS_BLOCKED. */// 阻塞狀態blockingState bpop; /* blocking state */// 最后被寫入的全局復制偏移量long long woff; /* Last write global replication offset. */// 被監視的鍵list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */// 這個字典記錄了客戶端所有訂閱的頻道// 鍵為頻道名字,值為 NULL// 也即是,一個頻道的集合dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */// 鏈表,包含多個 pubsubPattern 結構// 記錄了所有訂閱頻道的客戶端的信息// 新 pubsubPattern 結構總是被添加到表尾list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */sds peerid; /* Cached peer ID. *//* Response buffer */// 回復偏移量int bufpos;// 回復緩沖區char buf[REDIS_REPLY_CHUNK_BYTES]; } redisClient;

當客戶端發送命令過來時,epoll返回,readQueryFromClient被調用,注意回調函數的轉變。沒建立連接之前是關聯acceptTcpHandler,建立連接之后關聯readQueryFromClient函數讀取客戶端的數據。

/** 讀取客戶端的查詢緩沖區內容*/ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {printf("-----------%s--------\n",__FUNCTION__);redisClient *c = (redisClient*) privdata;int nread, readlen;size_t qblen;REDIS_NOTUSED(el);REDIS_NOTUSED(mask);// 設置服務器的當前客戶端server.current_client = c;// 讀入長度(默認為 16 MB)readlen = REDIS_IOBUF_LEN;/* If this is a multi bulk request, and we are processing a bulk reply* that is large enough, try to maximize the probability that the query* buffer contains exactly the SDS string representing the object, even* at the risk of requiring more read(2) calls. This way the function* processMultiBulkBuffer() can avoid copying buffers to create the* Redis Object representing the argument. */if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1&& c->bulklen >= REDIS_MBULK_BIG_ARG){int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);if (remaining < readlen) readlen = remaining;}// 獲取查詢緩沖區當前內容的長度// 如果讀取出現 short read ,那么可能會有內容滯留在讀取緩沖區里面// 這些滯留內容也許不能完整構成一個符合協議的命令,qblen = sdslen(c->querybuf);// 如果有需要,更新緩沖區內容長度的峰值(peak)if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;// 為查詢緩沖區分配空間c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);// 讀入內容到查詢緩存nread = read(fd, c->querybuf+qblen, readlen);//接收客戶端發送過來的數據到// 讀入出錯if (nread == -1) {if (errno == EAGAIN) {nread = 0;} else {redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));freeClient(c);return;}// 遇到 EOF} else if (nread == 0) {redisLog(REDIS_VERBOSE, "Client closed connection");freeClient(c);return;}if (nread) {// 根據內容,更新查詢緩沖區(SDS) free 和 len 屬性// 并將 '\0' 正確地放到內容的最后sdsIncrLen(c->querybuf,nread);// 記錄服務器和客戶端最后一次互動的時間c->lastinteraction = server.unixtime;// 如果客戶端是 master 的話,更新它的復制偏移量if (c->flags & REDIS_MASTER) c->reploff += nread;} else {// 在 nread == -1 且 errno == EAGAIN 時運行server.current_client = NULL;return;}// 查詢緩沖區長度超出服務器最大緩沖區長度// 清空緩沖區并釋放客戶端if (sdslen(c->querybuf) > server.client_max_querybuf_len) {sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();bytes = sdscatrepr(bytes,c->querybuf,64);redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);sdsfree(ci);sdsfree(bytes);freeClient(c);return;}// 從查詢緩存重讀取內容,創建參數,并執行命令// 函數會執行到緩存中的所有內容都被處理完為止processInputBuffer(c);server.current_client = NULL; }

收到客戶端的命令之后就要分析并執行命令,然后被結果返給客戶端。readQueryFromClient->processInputBuffer->processCommand->addReply->prepareClientToWrite。prepareClientToWrite這個函數就是注冊回復客戶端的函數sendReplyToClient。

int prepareClientToWrite(redisClient *c) {// LUA 腳本環境所使用的偽客戶端總是可寫的if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;// 客戶端是主服務器并且不接受查詢,// 那么它是不可寫的,出錯if ((c->flags & REDIS_MASTER) &&!(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;// 無連接的偽客戶端總是不可寫的if (c->fd <= 0) return REDIS_ERR; /* Fake client */// 一般情況,為客戶端套接字安裝寫處理器到事件循環if (c->bufpos == 0 && listLength(c->reply) == 0 &&(c->replstate == REDIS_REPL_NONE ||c->replstate == REDIS_REPL_ONLINE) &&aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,sendReplyToClient, c) == AE_ERR) return REDIS_ERR;return REDIS_OK; }

每一個階段都關聯一個回調函數,當事件觸發后走回調函數。

總結

以上是生活随笔為你收集整理的redis源码客户端和服务端通信过程的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。