日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

【Redis学习笔记】2018-06-12 复制与传播

發(fā)布時間:2025/3/20 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【Redis学习笔记】2018-06-12 复制与传播 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

順風(fēng)車運營研發(fā)團(tuán)隊 譚淼
一、復(fù)制

Redis的主從同步復(fù)制包括兩種方式:一種是完全復(fù)制,另一種是部分復(fù)制。

完全復(fù)制:主Redis生產(chǎn)RDB,傳輸?shù)綇姆?wù)器進(jìn)行同步。

部分復(fù)制:主Redis從復(fù)制積壓緩沖區(qū)中獲取數(shù)據(jù),發(fā)送給從服務(wù)器進(jìn)行同步。

假設(shè)兩臺Redis服務(wù)器A和B啟動后,對B執(zhí)行slaveof命令,使得B變?yōu)锳的從服務(wù)器,整體流程如下:

(1)B接收到slaveof命令時候,會執(zhí)行slaveofCommand()函數(shù),slaveofCommand()函數(shù)會調(diào)用replicationSetMaster()函數(shù)中,將復(fù)制狀態(tài)為REPL_STATE_CONNECT

/* Set replication to the specified master address and port. */ void replicationSetMaster(char *ip, int port) {......server.repl_state = REPL_STATE_CONNECT;server.repl_down_since = 0; }

(2)周期調(diào)度的時間事件中,會定期執(zhí)行replicationCron()函數(shù)

/* This is our timer interrupt, called server.hz times per second.* Here is where we do a number of things that need to be done asynchronously.* For instance:** - Active expired keys collection (it is also performed in a lazy way on* lookup).* - Software watchdog.* - Update some statistic.* - Incremental rehashing of the DBs hash tables.* - Triggering BGSAVE / AOF rewrite, and handling of terminated children.* - Clients timeout of different kinds.* - Replication reconnection.* - Many more...** Everything directly called here will be called server.hz times per second,* so in order to throttle execution of things we want to do less frequently* a macro is used: run_with_period(milliseconds) { .... }*/int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {....../* Replication cron function -- used to reconnect to master,* detect transfer failures, start background RDB transfers and so forth. */run_with_period(1000) replicationCron();...... }

(3)在replicationCron()函數(shù)中,如果檢測到REPL_STATE_CONNECT狀態(tài),調(diào)用connectWithMaster()。

/* --------------------------- REPLICATION CRON ---------------------------- *//* Replication cron function, called 1 time per second. */ void replicationCron(void) {....../* Check if we should connect to a MASTER */if (server.repl_state == REPL_STATE_CONNECT) {serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",server.masterhost, server.masterport);if (connectWithMaster() == C_OK) {serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");}}...... }

(4)在connectWithMaster()中,會設(shè)置文件事件,事件處理函數(shù)為syncWithMaster()函數(shù)

int connectWithMaster(void) {int fd;fd = anetTcpNonBlockBestEffortBindConnect(NULL,server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);if (fd == -1) {serverLog(LL_WARNING,"Unable to connect to MASTER: %s",strerror(errno));return C_ERR;}if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==AE_ERR){close(fd);serverLog(LL_WARNING,"Can't create readable event for SYNC");return C_ERR;}server.repl_transfer_lastio = server.unixtime;server.repl_transfer_s = fd;server.repl_state = REPL_STATE_CONNECTING;return C_OK; }

(5)在syncWithMaster中,從Redis首先會與主Redis進(jìn)行通信,交換關(guān)鍵信息

(6)master根接收到PSYN消息后,根據(jù)復(fù)制ID如果復(fù)制ID與自己的復(fù)制ID相同且復(fù)制偏移量仍然存在于復(fù)制緩存區(qū)中(server.repl_backlog),那么執(zhí)行部分重同步,回復(fù)CONTINUE消息,并從復(fù)制緩存區(qū)中復(fù)制相關(guān)的數(shù)據(jù)到slave。否則執(zhí)行全量重同步,回復(fù)FULLRESYNC消息,生成RDB傳輸?shù)絪lave。

二、命令傳播

當(dāng)在主Redis寫一條命令時,會調(diào)用server.c中的call()函數(shù),call()函數(shù)會調(diào)用propagate()來向從Redis更新數(shù)據(jù)

/* Call() is the core of Redis execution of a command.** The following flags can be passed:* CMD_CALL_NONE No flags.* CMD_CALL_SLOWLOG Check command speed and log in the slow log if needed.* CMD_CALL_STATS Populate command stats.* CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset* or if the client flags are forcing propagation.* CMD_CALL_PROPAGATE_REPL Send command to salves if it modified the dataset* or if the client flags are forcing propagation.* CMD_CALL_PROPAGATE Alias for PROPAGATE_AOF|PROPAGATE_REPL.* CMD_CALL_FULL Alias for SLOWLOG|STATS|PROPAGATE.** The exact propagation behavior depends on the client flags.* Specifically:** 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set* and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set* in the call flags, then the command is propagated even if the* dataset was not affected by the command.* 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP* are set, the propagation into AOF or to slaves is not performed even* if the command modified the dataset.** Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF* or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or* slaves propagation will never occur.** Client flags are modified by the implementation of a given command* using the following API:** forceCommandPropagation(client *c, int flags);* preventCommandPropagation(client *c);* preventCommandAOF(client *c);* preventCommandReplication(client *c);**/ void call(client *c, int flags) {....../* Propagate the command into the AOF and replication link */if (flags & CMD_CALL_PROPAGATE &&(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP){......if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);}...... }

propagate()會調(diào)用replicationFeedSlaves(),向復(fù)制積壓緩沖區(qū)中寫入數(shù)據(jù)

/* Propagate the specified command (in the context of the specified database id)* to AOF and Slaves.** flags are an xor between:* + PROPAGATE_NONE (no propagation of command at all)* + PROPAGATE_AOF (propagate into the AOF file if is enabled)* + PROPAGATE_REPL (propagate into the replication link)** This should not be used inside commands implementation. Use instead* alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().*/ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,int flags) {if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)feedAppendOnlyFile(cmd,dbid,argv,argc);if (flags & PROPAGATE_REPL)replicationFeedSlaves(server.slaves,dbid,argv,argc); }

replicationFeedSlaves()負(fù)責(zé)向復(fù)制積壓緩沖區(qū)中寫入數(shù)據(jù)

/* Propagate write commands to slaves, and populate the replication backlog* as well. This function is used if the instance is a master: we use* the commands received by our clients in order to create the replication* stream. Instead if the instance is a slave and has sub-slaves attached,* we use replicationFeedSlavesFromMaster() */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {....../* Write the command to the replication backlog if any. */if (server.repl_backlog) {char aux[LONG_STR_SIZE+3];/* Add the multi bulk reply length. */aux[0] = '*';len = ll2string(aux+1,sizeof(aux)-1,argc);aux[len+1] = '\r';aux[len+2] = '\n';feedReplicationBacklog(aux,len+3);for (j = 0; j < argc; j++) {long objlen = stringObjectLen(argv[j]);/* We need to feed the buffer with the object as a bulk reply* not just as a plain string, so create the $..CRLF payload len* and add the final CRLF */aux[0] = '$';len = ll2string(aux+1,sizeof(aux)-1,objlen);aux[len+1] = '\r';aux[len+2] = '\n';feedReplicationBacklog(aux,len+3);feedReplicationBacklogWithObject(argv[j]);feedReplicationBacklog(aux+len+1,2);}}/* Write the command to every slave. */listRewind(slaves,&li);while((ln = listNext(&li))) {client *slave = ln->value;/* Don't feed slaves that are still waiting for BGSAVE to start */if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;/* Feed slaves that are waiting for the initial SYNC (so these commands* are queued in the output buffer until the initial SYNC completes),* or are already in sync with the master. *//* Add the multi bulk length. */addReplyMultiBulkLen(slave,argc);/* Finally any additional argument that was not stored inside the* static buffer if any (from j to argc). */for (j = 0; j < argc; j++)addReplyBulk(slave,argv[j]);} }

總結(jié)

以上是生活随笔為你收集整理的【Redis学习笔记】2018-06-12 复制与传播的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。