Redis源码剖析(十二)--客户端和服务器
?客戶端屬性
客戶端的狀態保存在結構體 redisClient 中,下面給出redisClient的部分屬性:
typedef struct redisClient{// 套接字描述符int fd; // 客戶端狀態標志int flags;// 輸入緩沖區 sds querybuf;// 命令參數robj** argv;int argc;// 命令的實現函數struct redisCommand *cmd;// 固定輸出緩沖區char buf[REDIS_REPLY_CHUNK_BYTES];int bufpos;// 可變大小輸出緩沖區list* reply;// ...... };-
?fd 屬性:客戶端使用的套接字描述符,偽客戶端的fd屬性為 -1,普通客戶端的fd屬性為大于0的整數。
-
flags 屬性:客戶端狀態。在主從服務器復制時,主服務器和從服務器互為客戶端,REDIS_MASTER 標志表示客戶端代表的是一個主服務器,REDIS_SLAVE 標志表示客戶端代表的是一個從服務器。
-
querybuf 屬性:保存客戶端發送的命令請求。
-
argv、argc 屬性:對客戶端的命令請求分析,得到的命令參數及命令參數的個數。
-
cmd 屬性:服務器從客戶端發送的命令請求中分析得到argv、argc參數后,會根據argv[0]的值,去查找該命令對應的實現函數,并使cmd指針指向該實現函數。
-
buf、bufpos 屬性:bufpos屬性記錄了buf 數組已使用的字節數量。
-
reply 屬性:當buf 數組空間不夠用時,服務器會使用 reply 可變大小緩沖區。
?
?命令請求
命令的執行過程
服務器在接收到命令后,會將命令以對象的形式保存在服務器client的參數列表 robj** argv 中,因此服務器執行命令請求時,服務器已經讀入了一套命令參數保存在參數列表中。執行命令的過程對應的函數是processCommand(),部分源碼如下:
int processCommand(redisClient *c) {// 查找命令,并進行命令合法性檢查,以及命令參數個數檢查c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);if (!c->cmd) {// 沒找到指定的命令 flagTransaction(c);addReplyErrorFormat(c,"unknown command '%s'",(char*)c->argv[0]->ptr);return REDIS_OK;} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||(c->argc < -c->cmd->arity)) {// 參數個數錯誤 flagTransaction(c);addReplyErrorFormat(c,"wrong number of arguments for '%s' command",c->cmd->name);return REDIS_OK;}// ....../* Exec the command */if (c->flags & REDIS_MULTI &&c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&c->cmd->proc != multiCommand && c->cmd->proc != watchCommand){// 在事務上下文中// 除 EXEC 、 DISCARD 、 MULTI 和 WATCH 命令之外// 其他所有命令都會被入隊到事務隊列中 queueMultiCommand(c);addReply(c,shared.queued);} else {// 執行命令 call(c,REDIS_CALL_FULL);c->woff = server.master_repl_offset;// 處理那些解除了阻塞的鍵if (listLength(server.ready_keys))handleClientsBlockedOnLists();}return REDIS_OK; }?
我們總結出執行命令的大致過程:
-
查找命令。對應的代碼是:c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr)
-
執行命令前的準備
-
執行命令。對應代碼是:call(c,REDIS_CALL_FULL)
?
?
查找命令
lookupCommand 函數是對 dictFetchValue 函數的封裝。dictFetchValue 函數會從 server.commands 字典中查找 name 命令。這個保存命令表的字典,鍵是命令的名稱,值是命令表的地址。服務器初始化時會創建一張命令表。命令表部分代碼如下:
?
?
?
?執行命令
執行命令調用了call(c, CMD_CALL_FULL)函數,該函數是執行命令的核心。該函數其實是對 c->cmd->proc(c) 的封裝, proc 指向命令的實現函數。
void call(redisClient *c, int flags) {// start 記錄命令開始執行的時間long long dirty, start, duration;// 記錄命令開始執行前的 FLAGint client_old_flags = c->flags; // 如果可以的話,將命令發送到 MONITORif (listLength(server.monitors) &&!server.loading &&!(c->cmd->flags & REDIS_CMD_SKIP_MONITOR)){replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);}/* Call the command. */c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);redisOpArrayInit(&server.also_propagate);// 保留舊 dirty 計數器值dirty = server.dirty;// 計算命令開始執行的時間start = ustime();// 執行實現函數c->cmd->proc(c);// 計算命令執行耗費的時間duration = ustime()-start;// 計算命令執行之后的 dirty 值dirty = server.dirty-dirty; // 不將從 Lua 中發出的命令放入 SLOWLOG ,也不進行統計if (server.loading && c->flags & REDIS_LUA_CLIENT)flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS); // 如果調用者是 Lua ,那么根據命令 FLAG 和客戶端 FLAG// 打開傳播(propagate)標志if (c->flags & REDIS_LUA_CLIENT && server.lua_caller) {if (c->flags & REDIS_FORCE_REPL)server.lua_caller->flags |= REDIS_FORCE_REPL;if (c->flags & REDIS_FORCE_AOF)server.lua_caller->flags |= REDIS_FORCE_AOF;} // 如果有需要,將命令放到 SLOWLOG 里面if (flags & REDIS_CALL_SLOWLOG && c->cmd->proc != execCommand)slowlogPushEntryIfNeeded(c->argv,c->argc,duration);// 更新命令的統計信息if (flags & REDIS_CALL_STATS) {c->cmd->microseconds += duration;c->cmd->calls++;} // 將命令復制到 AOF 和 slave 節點if (flags & REDIS_CALL_PROPAGATE) {int flags = REDIS_PROPAGATE_NONE;// 強制 REPL 傳播if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;// 強制 AOF 傳播if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;// 如果數據庫有被修改,那么啟用 REPL 和 AOF 傳播if (dirty)flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);if (flags != REDIS_PROPAGATE_NONE)propagate(c->cmd,c->db->id,c->argv,c->argc,flags);} // 將客戶端的 FLAG 恢復到命令執行之前// 因為 call 可能會遞歸執行c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);c->flags |= client_old_flags & (REDIS_FORCE_AOF|REDIS_FORCE_REPL); // 傳播額外的命令if (server.also_propagate.numops) {int j;redisOp *rop;for (j = 0; j < server.also_propagate.numops; j++) {rop = &server.also_propagate.ops[j];propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target);}redisOpArrayFree(&server.also_propagate);}server.stat_numcommands++; }?
?執行命令 c->cmd->proc(c) 就相當于執行了命令實現的函數,然后會在執行完成后,由這些函數產生相應的命令回復,根據回復的大小,會將回復保存在輸出緩沖區 buf 或可變輸出緩沖區鏈表 reply 中。
?
?
?maxmemory策略
Redis 服務器對內存使用會有一個server.maxmemory的限制,如果超過這個限制,就要通過刪除一些鍵空間來釋放一些內存,具體函數對應freeMemoryIfNeeded()。釋放內存時,可以指定不同的策略。策略保存在maxmemory_policy中,可以指定以下的幾個值:
#define MAXMEMORY_VOLATILE_LRU 0 #define MAXMEMORY_VOLATILE_TTL 1 #define MAXMEMORY_VOLATILE_RANDOM 2 #define MAXMEMORY_ALLKEYS_LRU 3 #define MAXMEMORY_ALLKEYS_RANDOM 4 #define MAXMEMORY_NO_EVICTION 5可以看出主要分為三種:
- LRU:優先刪除最近最少使用的鍵。
- TTL:優先刪除生存時間最短的鍵。
- RANDOM:隨機刪除。
而ALLKEYS和VOLATILE的不同之處就是要確定是從數據庫的鍵值對字典還是過期鍵字典中刪除。
int freeMemoryIfNeeded(void) {size_t mem_used, mem_tofree, mem_freed;int slaves = listLength(server.slaves);// 計算出 Redis 目前占用的內存總數,但有兩個方面的內存不會計算在內:// 1)從服務器的輸出緩沖區的內存// 2)AOF 緩沖區的內存mem_used = zmalloc_used_memory();if (slaves) {listIter li;listNode *ln;listRewind(server.slaves,&li);while((ln = listNext(&li))) {redisClient *slave = listNodeValue(ln);unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);if (obuf_bytes > mem_used)mem_used = 0;elsemem_used -= obuf_bytes;}}if (server.aof_state != REDIS_AOF_OFF) {mem_used -= sdslen(server.aof_buf);mem_used -= aofRewriteBufferSize();} // 如果目前使用的內存大小比設置的 maxmemory 要小,那么無須執行進一步操作if (mem_used <= server.maxmemory) return REDIS_OK;// 如果占用內存比 maxmemory 要大,但是 maxmemory 策略為不淘汰,那么直接返回if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION)return REDIS_ERR; /* We need to free memory, but policy forbids. */// 計算需要釋放多少字節的內存mem_tofree = mem_used - server.maxmemory;// 初始化已釋放內存的字節數為 0mem_freed = 0;// 根據 maxmemory 策略,// 遍歷字典,釋放內存并記錄被釋放內存的字節數while (mem_freed < mem_tofree) {int j, k, keys_freed = 0;// 遍歷所有字典for (j = 0; j < server.dbnum; j++) {long bestval = 0; /* just to prevent warning */sds bestkey = NULL;dictEntry *de;redisDb *db = server.db+j;dict *dict;if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM){// 如果策略是 allkeys-lru 或者 allkeys-random // 那么淘汰的目標為所有數據庫鍵dict = server.db[j].dict;} else {// 如果策略是 volatile-lru 、 volatile-random 或者 volatile-ttl // 那么淘汰的目標為帶過期時間的數據庫鍵dict = server.db[j].expires;}// 跳過空字典if (dictSize(dict) == 0) continue;/* volatile-random and allkeys-random policy */// 如果使用的是隨機策略,那么從目標字典中隨機選出鍵if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM ||server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_RANDOM){de = dictGetRandomKey(dict);bestkey = dictGetKey(de);} // 如果使用的是 LRU 策略,// 那么從一集 sample 鍵中選出 IDLE 時間最長的那個鍵else if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU){struct evictionPoolEntry *pool = db->eviction_pool;while(bestkey == NULL) {evictionPoolPopulate(dict, db->dict, db->eviction_pool);/* Go backward from best to worst element to evict. */for (k = REDIS_EVICTION_POOL_SIZE-1; k >= 0; k--) {if (pool[k].key == NULL) continue;de = dictFind(dict,pool[k].key);/* Remove the entry from the pool. */sdsfree(pool[k].key);/* Shift all elements on its right to left. */memmove(pool+k,pool+k+1,sizeof(pool[0])*(REDIS_EVICTION_POOL_SIZE-k-1));/* Clear the element on the right which is empty* since we shifted one position to the left. */pool[REDIS_EVICTION_POOL_SIZE-1].key = NULL;pool[REDIS_EVICTION_POOL_SIZE-1].idle = 0;/* If the key exists, is our pick. Otherwise it is* a ghost and we need to try the next element. */if (de) {bestkey = dictGetKey(de);break;} else {/* Ghost... */continue;}}}}// 策略為 volatile-ttl ,從一集 sample 鍵中選出過期時間距離當前時間最接近的鍵else if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_TTL) {for (k = 0; k < server.maxmemory_samples; k++) {sds thiskey;long thisval;de = dictGetRandomKey(dict);thiskey = dictGetKey(de);thisval = (long) dictGetVal(de);/* Expire sooner (minor expire unix timestamp) is better* candidate for deletion */if (bestkey == NULL || thisval < bestval) {bestkey = thiskey;bestval = thisval;}}} // 刪除被選中的鍵if (bestkey) {long long delta;robj *keyobj = createStringObject(bestkey,sdslen(bestkey));propagateExpire(db,keyobj);// 計算刪除鍵所釋放的內存數量delta = (long long) zmalloc_used_memory();dbDelete(db,keyobj);delta -= (long long) zmalloc_used_memory();mem_freed += delta;// 對淘汰鍵的計數器增一server.stat_evictedkeys++;notifyKeyspaceEvent(REDIS_NOTIFY_EVICTED, "evicted",keyobj, db->id);decrRefCount(keyobj);keys_freed++; if (slaves) flushSlavesOutputBuffers();}}if (!keys_freed) return REDIS_ERR; /* nothing to free... */}return REDIS_OK; }?
轉載于:https://www.cnblogs.com/lizhimin123/p/10215368.html
總結
以上是生活随笔為你收集整理的Redis源码剖析(十二)--客户端和服务器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java HashMap的put操作(J
- 下一篇: 数据库时间内接受的是lang类型的时间