當(dāng)前位置:
首頁(yè) >
Mongodb源码分析--Replication之主从模式--Master
發(fā)布時(shí)間:2025/5/22
40
豆豆
生活随笔
收集整理的這篇文章主要介紹了
Mongodb源码分析--Replication之主从模式--Master
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
mongodb中提供了復(fù)制(Replication)機(jī)制,通過(guò)該機(jī)制可以幫助我們很容易實(shí)現(xiàn)讀寫(xiě)分離方案,并支持災(zāi)難恢復(fù)(服務(wù)器斷電)等意外情況下的數(shù)據(jù)安全。
?? ?? 在老版本(1.6)中,Mongo提供了兩種方式的復(fù)制:master-slave及replica pair模式(注:mongodb最新支持的replset復(fù)制集方式可看成是pair的升級(jí)版,它解決pair只能在兩個(gè)結(jié)點(diǎn)間同步的限制,支持多個(gè)結(jié)點(diǎn)同步且支持主從宕機(jī)時(shí)的自動(dòng)切換, 在1.6版以后提供)。
???
?? ?? 利用前者,我們可以實(shí)現(xiàn)讀寫(xiě)分離(主從復(fù)制模式),后者則支持當(dāng)主服務(wù)器斷電情況下的集群中其它slave自動(dòng)接管,并升級(jí)為主服務(wù)器。 并且如果后來(lái)的也出錯(cuò)了,那么master狀態(tài)將會(huì)轉(zhuǎn)回給第一個(gè)服務(wù)器(之前宕機(jī)但后來(lái)又恢復(fù)運(yùn)行的服務(wù)器)。
? ? ? 同時(shí)mongodb支持使用安全認(rèn)證(enable)。不管哪種replicate方式,只要在master/slave中創(chuàng)建一個(gè)能為各個(gè)database認(rèn)識(shí)的用戶名/密碼即可。其認(rèn)證過(guò)程如下:
????slave先在local.system.users里查找一個(gè)名為"repl"的用戶,找到后用它去認(rèn)證master。如果"repl"用戶沒(méi)有找到,則使用local.system.users中的第一個(gè)用戶去認(rèn)證。local數(shù)據(jù)庫(kù)和admin數(shù)據(jù)庫(kù)一樣,local中的用戶可以訪問(wèn)整個(gè)db?server。
?? ?下面介紹分別介紹一下這兩種復(fù)制的配置方式:
?? ?
?? ?Master-Slave(主從)模式:
?? ? 一個(gè)server可以同時(shí)為master和slave。一個(gè)slave可以有多個(gè)master(不推薦,可能會(huì)產(chǎn)生不可預(yù)期的結(jié)果)。
???? 配置選項(xiàng):
?? ? --master? 以主服務(wù)器方式啟動(dòng)
?? ? --slave?? 以從服務(wù)器方式啟動(dòng)
???? --autoresync:自動(dòng)重新sync,因?yàn)樵摬僮鲿?huì)copy 主服務(wù)器上的所有document,比較耗時(shí),在10分鐘內(nèi)最多只會(huì)進(jìn)行一次。
?? ? --oplogSize:指定master上用于存放更改的數(shù)據(jù)量,如果不指定,在32位機(jī)上最少為50M,在64位機(jī)上最少為 1G,最大為磁盤(pán)空間的5%。
?? ? --source? 主服務(wù)器地址(與--slave組合使用)
?? ? --only??? 僅限于同步指定數(shù)據(jù)庫(kù)(下面示例為test庫(kù))
?? ? --slavedelay? 同步延時(shí)
?? ? 下面是本人在本地為了測(cè)試方便所使用的配置參數(shù)
?? ???? Master:? IP->10.0.1.103?? ????
mongod --dbpath=d:\mongodb\db?--master?--oplogSize?64 ?? ??? ?Slave:?? IP->10.0.4.210
?? ????
mongod --dbpath=d:\mongodb\db?--slave?--source?10.0.1.103:27017?--only?test?--slavedelay?100
????
??? 補(bǔ)充:受限的master-master復(fù)制,這種模式對(duì)插入、查詢(xún)及根據(jù)_id進(jìn)行的刪除操作都是安全的。但對(duì)同一對(duì)象的并發(fā)更新無(wú)法進(jìn)行。Mongo?不支持完全的master-master復(fù)制,通常情況下不推薦使用master-master模式,但在一些特定的情況下master-master也可用。master-master也只支持最終一致性。配置master-master只需運(yùn)行mongod時(shí)同時(shí)加上--master選項(xiàng)和?--slave選項(xiàng)。如下:
???? mongod?--dbpath=d:\mongodb\db?--port?27017?--master?--slave?--source?localhost:27018
???? mongod?--dbpath=d:\mongodb\db?--port?27018?--master?--slave?--source?localhost:27017
???? ?
??? Replica pairs模式
???? 以這種方式啟動(dòng)后,數(shù)據(jù)庫(kù)會(huì)自動(dòng)協(xié)商誰(shuí)是master誰(shuí)是slave。一旦一個(gè)數(shù)據(jù)庫(kù)服務(wù)器斷電,另一個(gè)會(huì)自動(dòng)接管,并從那一刻起起為master。萬(wàn)一另一個(gè)將來(lái)也出錯(cuò)了,那么master狀態(tài)將會(huì)轉(zhuǎn)回給第一個(gè)服務(wù)器。以這種復(fù)制方式啟動(dòng)mongod的命令如下:
???? 配置選項(xiàng):
?? ?? mongod --pairwith <remoteserver> --arbiter <arbiterserver>
?? ?? --pairwith: remoteserver是pair里的另一個(gè)server
?? ?? --arbiter:? arbiterserver是一個(gè)起仲裁作用的Mongo數(shù)據(jù)庫(kù),用來(lái)協(xié)商pair中哪一個(gè)是master。arbiter運(yùn)行在第三個(gè)機(jī)器上,利用“平分決勝制”決定在pair中的兩臺(tái)機(jī)器不能聯(lián)系上對(duì)方時(shí)讓哪一個(gè)做master,一般是能同arbiter通話的那臺(tái)機(jī)器做master。如果不加--arbiter選項(xiàng),出現(xiàn)網(wǎng)絡(luò)問(wèn)題時(shí)兩臺(tái)機(jī)器都作為master。
?? ?? 注:可使用db.$cmd.findOne({ismaster:1})可以檢查當(dāng)前哪一個(gè)database是master。
???? 另外這種模式下的兩臺(tái)機(jī)器只能滿足最終一致性。當(dāng)replica pair中的一臺(tái)機(jī)器完全掛掉時(shí),需要用一臺(tái)新的來(lái)代替。如(n1, n2)中的n2掛掉,這時(shí)用n3來(lái)代替n2。步驟如下:
?? ?? 1. 告訴n1用n3來(lái)代替n2:db.$cmd.findOne({replacepeer:1});
?? ?? 2. 重啟n1讓它同n3對(duì)話:mongod --pairwith n3 --arbiter <arbiterserver>
?? ?? 3. 啟動(dòng)n3:mongod --pairwith n1 --arbiter <arbiterserver>。
???? 在n3的數(shù)據(jù)沒(méi)有同步到n1前n3還不能做master,這個(gè)過(guò)程長(zhǎng)短由數(shù)據(jù)量的多少?zèng)Q定。
?
?? ?
???? 了解了復(fù)制模式之后,還有一個(gè)問(wèn)題需要介紹一下,不是就是本文中mongodb使用cap collection來(lái)存儲(chǔ)操作日志,并進(jìn)而使用日志來(lái)復(fù)制(同步)結(jié)點(diǎn)間的數(shù)據(jù),其中由主結(jié)點(diǎn)保存的操作的記錄叫做oplog(operation log的簡(jiǎn)稱(chēng))。
?
?? Oplog存在一個(gè)叫l(wèi)ocal的特殊數(shù)據(jù)庫(kù)中,在oplog.$main集合。Oplog中的每一個(gè)文檔表示一個(gè)在主結(jié)點(diǎn)上執(zhí)行的操作。文檔主要包括4塊內(nèi)容,如下:
??
?Ts:操作的時(shí)間戳。時(shí)間戳類(lèi)型是一個(gè)用來(lái)跟蹤操作是何時(shí)執(zhí)行的一種內(nèi)部類(lèi)型。它由4字節(jié)的時(shí)間戳和四字節(jié)的增量計(jì)數(shù)器組成。
?Op:執(zhí)行的操作的類(lèi)型,大小為1字節(jié)。(例如,“i”代表insert,"u":update,?"d":delete,?"n":none無(wú)操作等)
?Ns:執(zhí)行操作的命名空間(集合名)
?O:執(zhí)行操作的文檔。對(duì)于插入,這是將要插入的文檔。
???? 另外這種日志只保存會(huì)“改變數(shù)據(jù)庫(kù)狀態(tài)”的操作。查詢(xún)操作不會(huì)記錄在oplog中。
? ?? 好了,了解這些知識(shí)之后,我們就來(lái)開(kāi)始看一下如何調(diào)試master-slave模式的源碼,首先要在vs2010中打開(kāi)mongod項(xiàng)目,并將啟動(dòng)參數(shù)中設(shè)置如下:
????? --master --oplogSize 64?? (master IP為10.0.1.103)
???? 如下圖:
?
?? ?
???? 之后編譯該項(xiàng)目,啟動(dòng)該主服務(wù)結(jié)點(diǎn),如下:
?? ?
?? ?
???? 接著我們可以在本地或另外一臺(tái)機(jī)器上啟動(dòng)一個(gè)slave結(jié)點(diǎn):
? mongod?--dbpath=d:\mongodb\db?--slave?--source?10.0.1.103:27017?--only?test?--slavedelay?100
?? 下面介紹一下master(主服務(wù)端)的代碼執(zhí)行流程。首先我們打開(kāi)instance.cpp文件,找到下面方法:
??
??? //instance.cpp
????//?Returns?false?when?request?includes?'end'
????void?assembleResponse(?Message?&m,?DbResponse?&dbresponse,?const?SockAddr?&client?)?{
????......
???????if?(?op?==?dbQuery?)?{
????????????if?(?handlePossibleShardedMessage(?m?,?&dbresponse?)?)
????????????????return;
????????????receivedQuery(c?,?dbresponse,?m?);
????????}
????????//服務(wù)端(master)?收到message執(zhí)行相關(guān)查詢(xún)操作
????????else?if?(?op?==?dbGetMore?)?{
????????????if?(?!?receivedGetMore(dbresponse,?m,?currentOp)?)
????????????????log?=?true;
????????}
????.....
????}
?? ?看過(guò)本系列開(kāi)頭那幾篇BLOG的朋友,會(huì)看出上面方法其實(shí)在mongodb的crud操作中都會(huì)執(zhí)行到,更多內(nèi)容可以參見(jiàn)這篇BLOG,這里不再贅述。
?? ?當(dāng)slave 從結(jié)點(diǎn)發(fā)送同步復(fù)制請(qǐng)求時(shí),master會(huì)執(zhí)行上面的dbGetMore操作,從主庫(kù)中的oplog中獲取相應(yīng)日志并返回給slave結(jié)點(diǎn),下面是receivedGetMore()方法的具體實(shí)現(xiàn):
???
???? //instance.cpp
?????bool?receivedGetMore(DbResponse&?dbresponse,?Message&?m,?CurOp&?curop?)?{
????????StringBuilder&?ss?=?curop.debug().str;
????????bool?ok?=?true;
????????//參見(jiàn):Mongodb源碼分析--消息(message)中的?查詢(xún)更多(document)消息結(jié)構(gòu)相關(guān)內(nèi)容
????????//http://www.cnblogs.com/daizhj/archive/2011/04/02/2003335.html
????????DbMessage?d(m);
????????//完整的集合名稱(chēng),形如:"dbname.collectionname"
????????const?char?*ns?=?d.getns();
????????//返回的document數(shù)
????????int?ntoreturn?=?d.pullInt();
????????//在REPLY消息中的Cursor標(biāo)識(shí)符,其必須來(lái)自于數(shù)據(jù)庫(kù)
????????long?long?cursorid?=?d.pullInt64();
????????ss?<<?ns?<<?"?cid:"?<<?cursorid;
????????if(?ntoreturn?)
????????????ss?<<?"?ntoreturn:"?<<?ntoreturn;
????????time_t?start?=?0;
????????int?pass?=?0;
????????bool?exhaust?=?false;
????????QueryResult*?msgdata;//查詢(xún)結(jié)果
????????while(?1?)?{
????????????try?{
????????????????readlock?lk;
????????????????Client::Context?ctx(ns);
????????????????//執(zhí)行GetMore查詢(xún)
????????????????msgdata?=?processGetMore(ns,?ntoreturn,?cursorid,?curop,?pass,?exhaust);
????????????}
????????????catch?(?GetMoreWaitException&?)?{
????????????????exhaust?=?false;
????????????????massert(13073,?"shutting?down",?!inShutdown()?);
????????????????if(?pass?==?0?)?{
????????????????????start?=?time(0);
????????????????}
????????????????else?{
????????????????????if(?time(0)?-?start?>=?4?)?{
????????????????????????//?after?about?4?seconds,?return.??this?is?a?sanity?check.??pass?stops?at?1000?normally
????????????????????????//?for?DEV?this?helps?and?also?if?sleep?is?highly?inaccurate?on?a?platform.??we?want?to
????????????????????????//?return?occasionally?so?slave?can?checkpoint.
????????????????????????pass?=?10000;
????????????????????}
????????????????}
????????????????pass++;
????????????????DEV
????????????????sleepmillis(20);
????????????????else
????????????????????sleepmillis(2);
????????????????continue;
????????????}
????????????catch?(?AssertionException&?e?)?{
????????????????exhaust?=?false;
????????????????ss?<<?"?exception?"?<<?e.toString();
????????????????msgdata?=?emptyMoreResult(cursorid);
????????????????ok?=?false;
????????????}
????????????break;
????????};
????????//將查詢(xún)結(jié)果集綁定到message對(duì)象
????????Message?*resp?=?new?Message();
????????resp->setData(msgdata,?true);
????????ss?<<?"?bytes:"?<<?resp->header()->dataLen();
????????ss?<<?"?nreturned:"?<<?msgdata->nReturned;
????????//將上面的消息對(duì)象指針綁定到dbresponse
????????dbresponse.response?=?resp;
????????dbresponse.responseTo?=?m.header()->id;
????????if(?exhaust?)?{
????????????ss?<<?"?exhaust?";
????????????dbresponse.exhaust?=?ns;
????????}
????????return?ok;
????} ?? ?
可以看出,通過(guò)對(duì)message的解析找出相應(yīng)的cursorid,因?yàn)閙ongodb如果發(fā)現(xiàn)游標(biāo)為tailable(類(lèi)型)時(shí),會(huì)cache該cursor而不是關(guān)閉它,這主要是考慮到當(dāng)下次slave請(qǐng)求來(lái)時(shí),直接從cache中獲取該cursor以提升效率并用它來(lái)作為繼續(xù)獲取后續(xù)oplog操作信息。上面方法在執(zhí)行結(jié)束處會(huì)將獲取到的oplog結(jié)果封裝到message中并返回。但其如何獲取,就要分析下面方法了:
?? ?
????//query.cpp
?????QueryResult*?processGetMore(const?char?*ns,?int?ntoreturn,?long?long?cursorid?,?CurOp&?curop,?int?pass,?bool&?exhaust?)?{
????????exhaust?=?false;
????????//在map<CursorId,?ClientCursor*>中查詢(xún)相應(yīng)游客信息
????????ClientCursor::Pointer?p(cursorid);
????????//將結(jié)果返回(可能沒(méi)找到)
????????ClientCursor?*cc?=?p.c();
????????int?bufSize?=?512;
????????if?(?cc?)?{
????????????bufSize?+=?sizeof(?QueryResult?);
????????????bufSize?+=?MaxBytesToReturnToClientAtOnce;
????????}
????????//創(chuàng)建收集查詢(xún)記錄結(jié)果的buf對(duì)象
????????BufBuilder?b(?bufSize?);
????????//跳過(guò)預(yù)留數(shù)據(jù)區(qū)間(QueryResult)
????????b.skip(sizeof(QueryResult));
????????int?resultFlags?=?ResultFlag_AwaitCapable;
????????int?start?=?0;
????????int?n?=?0;
????????//判斷cc是否有效(如未找到則無(wú)效)
????????if?(?!cc?)?{
????????????log()?<<?"getMore:?cursorid?not?found?"?<<?ns?<<?"?"?<<?cursorid?<<?endl;
????????????cursorid?=?0;
????????????resultFlags?=?ResultFlag_CursorNotFound;
????????}
????????else?{
????????????//更新master結(jié)點(diǎn)local.slaves中的相應(yīng)信息(包括lastop時(shí)間戳)
????????????//注:主結(jié)點(diǎn)使用存儲(chǔ)在local.slaves中的syncedTo來(lái)跟蹤多少slave是已經(jīng)更新的。
????????????if?(?pass?==?0?)
????????????????cc->updateSlaveLocation(?curop?);
????????????int?queryOptions?=?cc->queryOptions();
????????????if(?pass?==?0?)?{
????????????????StringBuilder&?ss?=?curop.debug().str;
????????????????ss?<<?"?getMore:?"?<<?cc->query().toString()?<<?"?";
????????????}
????????????//獲取相應(yīng)cursor,以便while遍歷
????????????start?=?cc->pos();
????????????Cursor?*c?=?cc->c();
????????????c->checkLocation();
????????????DiskLoc?last;
????????????scoped_ptr<Projection::KeyOnly>?keyFieldsOnly;
????????????if?(?cc->modifiedKeys()?==?false?&&?cc->isMultiKey()?==?false?&&?cc->fields?)
????????????????keyFieldsOnly.reset(?cc->fields->checkKey(?cc->indexKeyPattern()?)?);
????????????//遍歷cursor,找到并封裝相應(yīng)查詢(xún)結(jié)果給buf對(duì)象
????????????while?(?1?)?{
????????????????if?(?!c->ok()?)?{//到結(jié)尾
????????????????????if?(?c->tailable()?)?{//處理tailable情況
????????????????????????//Tailable?表示在返回最后一條數(shù)據(jù)后,不要關(guān)閉當(dāng)前?cursor。
????????????????????????//這是因?yàn)橄到y(tǒng)考慮到稍后你可以再次使用該cursor.??
????????????????????????/*?when?a?tailable?cursor?hits?"EOF",?ok()?goes?false,?and?current()?is?null.??however
???????????????????????????advance()?can?still?be?retries?as?a?reactivation?attempt.??when?there?is?new?data,?it?will
???????????????????????????return?true.??that's?what?we?are?doing?here.
???????????????????????????*/
????????????????????????if?(?c->advance()?)
????????????????????????????continue;
????????????????????????if(?n?==?0?&&?(queryOptions?&?QueryOption_AwaitData)?&&?pass?<?1000?)?{
????????????????????????????throw?GetMoreWaitException();
????????????????????????}
????????????????????????break;
????????????????????}
????????????????????//釋放cursor資源關(guān)閉它(執(zhí)行delete操作)
????????????????????p.release();
????????????????????bool?ok?=?ClientCursor::erase(cursorid);
????????????????????assert(ok);
????????????????????cursorid?=?0;
????????????????????cc?=?0;
????????????????????break;
????????????????}
????????????????//?如果是clone?collection時(shí),則不會(huì)匹配 // If match succeeds on index key, then attempt to match full document.??????????????? if?(?c->matcher()?&&?!c->matcher()->matches(c->currKey(),?c->currLoc()?)?)?{
????????????????}
????????????????/*
??????????????????TODO
????????????????else?if?(?_chunkMatcher?&&?!?_chunkMatcher->belongsToMe(?c->currKey(),?c->currLoc()?)?){
????????????????????cout?<<?"TEMP?skipping?un-owned?chunk:?"?<<?c->current()?<<?endl;
????????????????}
????????????????*/
????????????????else?{//值是否重復(fù)
????????????????????if(?c->getsetdup(c->currLoc())?)?{
????????????????????????//out()?<<?"??but?it's?a?dup?\n";
????????????????????}
????????????????????else?{//如匹配
????????????????????????last?=?c->currLoc();
????????????????????????n++;
????????????????????????//裝填數(shù)據(jù)到buf中
????????????????????????if?(?keyFieldsOnly?)?{
????????????????????????????fillQueryResultFromObj(b,?0,?keyFieldsOnly->hydrate(?c->currKey()?)?);
????????????????????????}
????????????????????????else?{
????????????????????????????BSONObj?js?=?c->current();
????????????????????????????//?show?disk?loc?should?be?part?of?the?main?query,?not?in?an?$or?clause,?so?this?should?be?ok
????????????????????????????fillQueryResultFromObj(b,?cc->fields.get(),?js,?(?cc->pq.get()?&&?cc->pq->showDiskLoc()???&last?:?0));
????????????????????????}
????????????????????????if?(?(?ntoreturn?&&?n?>=?ntoreturn?)?||?b.len()?>?MaxBytesToReturnToClientAtOnce?)?{
????????????????????????????c->advance();
????????????????????????????cc->incPos(?n?);
????????????????????????????break;
????????????????????????}
????????????????????}
????????????????}
????????????????//指向下一條記錄
????????????????c->advance();
????????????????if?(?!?cc->yieldSometimes()?)?{
????????????????????cc?=?0;
????????????????????break;
????????????????}
????????????}
????????????if?(?cc?)?{
????????????????cc->updateLocation();
????????????????cc->mayUpgradeStorage();
????????????????//用last中的optime?更新_slaveReadTill
????????????????cc->storeOpForSlave(?last?);
????????????????exhaust?=?cc->queryOptions()?&?QueryOption_Exhaust;
????????????}
????????}
????????//將buf中的信息綁定到查詢(xún)結(jié)果集
????????QueryResult?*qr?=?(QueryResult?*)?b.buf();
????????qr->len?=?b.len();
????????qr->setOperation(opReply);
????????qr->_resultFlags()?=?resultFlags;
????????qr->cursorId?=?cursorid;
????????qr->startingFrom?=?start;
????????qr->nReturned?=?n;
????????b.decouple();
????????return?qr;
????}
?? ? 上面代碼有些長(zhǎng),但其目的很明確,就是針對(duì)指定的cursor進(jìn)行遍歷。這里mongodb會(huì)為每個(gè)slave保存一個(gè)cursor,并且其在遍歷完成后將最后一條oplog的時(shí)間戳作為當(dāng)前slave在local.slaves中的更新標(biāo)識(shí)信息(syncedTo),來(lái)標(biāo)識(shí)當(dāng)前slave的更新情況。(注:首次同步時(shí)全部復(fù)制會(huì)執(zhí)行copyDatabase,復(fù)制master db上的所有document)。該方法運(yùn)行截圖如下:
???
??
?? ?http://www.snailinaturtleneck.com/blog/2010/10/14/getting-to-know-your-oplog/
?? ?http://www.snailinaturtleneck.com/blog/2010/08/02/replica-sets-part-2-what-are-replica-sets/
?? ?原文鏈接:http://www.cnblogs.com/daizhj/archive/2011/06/13/mongodb_sourcecode_repl_master_run.html
??? 作者: daizhj, 代震軍? ?
??? 微博: http://t.sina.com.cn/daizhj
??? Tags: mongodb,c++,Replica,master-slave
?? ?? 在老版本(1.6)中,Mongo提供了兩種方式的復(fù)制:master-slave及replica pair模式(注:mongodb最新支持的replset復(fù)制集方式可看成是pair的升級(jí)版,它解決pair只能在兩個(gè)結(jié)點(diǎn)間同步的限制,支持多個(gè)結(jié)點(diǎn)同步且支持主從宕機(jī)時(shí)的自動(dòng)切換, 在1.6版以后提供)。
???
?? ?? 利用前者,我們可以實(shí)現(xiàn)讀寫(xiě)分離(主從復(fù)制模式),后者則支持當(dāng)主服務(wù)器斷電情況下的集群中其它slave自動(dòng)接管,并升級(jí)為主服務(wù)器。 并且如果后來(lái)的也出錯(cuò)了,那么master狀態(tài)將會(huì)轉(zhuǎn)回給第一個(gè)服務(wù)器(之前宕機(jī)但后來(lái)又恢復(fù)運(yùn)行的服務(wù)器)。
? ? ? 同時(shí)mongodb支持使用安全認(rèn)證(enable)。不管哪種replicate方式,只要在master/slave中創(chuàng)建一個(gè)能為各個(gè)database認(rèn)識(shí)的用戶名/密碼即可。其認(rèn)證過(guò)程如下:
????slave先在local.system.users里查找一個(gè)名為"repl"的用戶,找到后用它去認(rèn)證master。如果"repl"用戶沒(méi)有找到,則使用local.system.users中的第一個(gè)用戶去認(rèn)證。local數(shù)據(jù)庫(kù)和admin數(shù)據(jù)庫(kù)一樣,local中的用戶可以訪問(wèn)整個(gè)db?server。
?? ?下面介紹分別介紹一下這兩種復(fù)制的配置方式:
?? ?
?? ?Master-Slave(主從)模式:
?? ? 一個(gè)server可以同時(shí)為master和slave。一個(gè)slave可以有多個(gè)master(不推薦,可能會(huì)產(chǎn)生不可預(yù)期的結(jié)果)。
???? 配置選項(xiàng):
?? ? --master? 以主服務(wù)器方式啟動(dòng)
?? ? --slave?? 以從服務(wù)器方式啟動(dòng)
???? --autoresync:自動(dòng)重新sync,因?yàn)樵摬僮鲿?huì)copy 主服務(wù)器上的所有document,比較耗時(shí),在10分鐘內(nèi)最多只會(huì)進(jìn)行一次。
?? ? --oplogSize:指定master上用于存放更改的數(shù)據(jù)量,如果不指定,在32位機(jī)上最少為50M,在64位機(jī)上最少為 1G,最大為磁盤(pán)空間的5%。
?? ? --source? 主服務(wù)器地址(與--slave組合使用)
?? ? --only??? 僅限于同步指定數(shù)據(jù)庫(kù)(下面示例為test庫(kù))
?? ? --slavedelay? 同步延時(shí)
?? ? 下面是本人在本地為了測(cè)試方便所使用的配置參數(shù)
?? ???? Master:? IP->10.0.1.103?? ????
mongod --dbpath=d:\mongodb\db?--master?--oplogSize?64 ?? ??? ?Slave:?? IP->10.0.4.210
?? ????
mongod --dbpath=d:\mongodb\db?--slave?--source?10.0.1.103:27017?--only?test?--slavedelay?100
????
??? 補(bǔ)充:受限的master-master復(fù)制,這種模式對(duì)插入、查詢(xún)及根據(jù)_id進(jìn)行的刪除操作都是安全的。但對(duì)同一對(duì)象的并發(fā)更新無(wú)法進(jìn)行。Mongo?不支持完全的master-master復(fù)制,通常情況下不推薦使用master-master模式,但在一些特定的情況下master-master也可用。master-master也只支持最終一致性。配置master-master只需運(yùn)行mongod時(shí)同時(shí)加上--master選項(xiàng)和?--slave選項(xiàng)。如下:
???? mongod?--dbpath=d:\mongodb\db?--port?27017?--master?--slave?--source?localhost:27018
???? mongod?--dbpath=d:\mongodb\db?--port?27018?--master?--slave?--source?localhost:27017
???? ?
??? Replica pairs模式
???? 以這種方式啟動(dòng)后,數(shù)據(jù)庫(kù)會(huì)自動(dòng)協(xié)商誰(shuí)是master誰(shuí)是slave。一旦一個(gè)數(shù)據(jù)庫(kù)服務(wù)器斷電,另一個(gè)會(huì)自動(dòng)接管,并從那一刻起起為master。萬(wàn)一另一個(gè)將來(lái)也出錯(cuò)了,那么master狀態(tài)將會(huì)轉(zhuǎn)回給第一個(gè)服務(wù)器。以這種復(fù)制方式啟動(dòng)mongod的命令如下:
???? 配置選項(xiàng):
?? ?? mongod --pairwith <remoteserver> --arbiter <arbiterserver>
?? ?? --pairwith: remoteserver是pair里的另一個(gè)server
?? ?? --arbiter:? arbiterserver是一個(gè)起仲裁作用的Mongo數(shù)據(jù)庫(kù),用來(lái)協(xié)商pair中哪一個(gè)是master。arbiter運(yùn)行在第三個(gè)機(jī)器上,利用“平分決勝制”決定在pair中的兩臺(tái)機(jī)器不能聯(lián)系上對(duì)方時(shí)讓哪一個(gè)做master,一般是能同arbiter通話的那臺(tái)機(jī)器做master。如果不加--arbiter選項(xiàng),出現(xiàn)網(wǎng)絡(luò)問(wèn)題時(shí)兩臺(tái)機(jī)器都作為master。
?? ?? 注:可使用db.$cmd.findOne({ismaster:1})可以檢查當(dāng)前哪一個(gè)database是master。
???? 另外這種模式下的兩臺(tái)機(jī)器只能滿足最終一致性。當(dāng)replica pair中的一臺(tái)機(jī)器完全掛掉時(shí),需要用一臺(tái)新的來(lái)代替。如(n1, n2)中的n2掛掉,這時(shí)用n3來(lái)代替n2。步驟如下:
?? ?? 1. 告訴n1用n3來(lái)代替n2:db.$cmd.findOne({replacepeer:1});
?? ?? 2. 重啟n1讓它同n3對(duì)話:mongod --pairwith n3 --arbiter <arbiterserver>
?? ?? 3. 啟動(dòng)n3:mongod --pairwith n1 --arbiter <arbiterserver>。
???? 在n3的數(shù)據(jù)沒(méi)有同步到n1前n3還不能做master,這個(gè)過(guò)程長(zhǎng)短由數(shù)據(jù)量的多少?zèng)Q定。
?
?? ?
???? 了解了復(fù)制模式之后,還有一個(gè)問(wèn)題需要介紹一下,不是就是本文中mongodb使用cap collection來(lái)存儲(chǔ)操作日志,并進(jìn)而使用日志來(lái)復(fù)制(同步)結(jié)點(diǎn)間的數(shù)據(jù),其中由主結(jié)點(diǎn)保存的操作的記錄叫做oplog(operation log的簡(jiǎn)稱(chēng))。
?
?? Oplog存在一個(gè)叫l(wèi)ocal的特殊數(shù)據(jù)庫(kù)中,在oplog.$main集合。Oplog中的每一個(gè)文檔表示一個(gè)在主結(jié)點(diǎn)上執(zhí)行的操作。文檔主要包括4塊內(nèi)容,如下:
??
?Ts:操作的時(shí)間戳。時(shí)間戳類(lèi)型是一個(gè)用來(lái)跟蹤操作是何時(shí)執(zhí)行的一種內(nèi)部類(lèi)型。它由4字節(jié)的時(shí)間戳和四字節(jié)的增量計(jì)數(shù)器組成。
?Op:執(zhí)行的操作的類(lèi)型,大小為1字節(jié)。(例如,“i”代表insert,"u":update,?"d":delete,?"n":none無(wú)操作等)
?Ns:執(zhí)行操作的命名空間(集合名)
?O:執(zhí)行操作的文檔。對(duì)于插入,這是將要插入的文檔。
???? 另外這種日志只保存會(huì)“改變數(shù)據(jù)庫(kù)狀態(tài)”的操作。查詢(xún)操作不會(huì)記錄在oplog中。
? ?? 好了,了解這些知識(shí)之后,我們就來(lái)開(kāi)始看一下如何調(diào)試master-slave模式的源碼,首先要在vs2010中打開(kāi)mongod項(xiàng)目,并將啟動(dòng)參數(shù)中設(shè)置如下:
????? --master --oplogSize 64?? (master IP為10.0.1.103)
???? 如下圖:
?
?? ?
???? 之后編譯該項(xiàng)目,啟動(dòng)該主服務(wù)結(jié)點(diǎn),如下:
?? ?
?? ?
???? 接著我們可以在本地或另外一臺(tái)機(jī)器上啟動(dòng)一個(gè)slave結(jié)點(diǎn):
? mongod?--dbpath=d:\mongodb\db?--slave?--source?10.0.1.103:27017?--only?test?--slavedelay?100
?? 下面介紹一下master(主服務(wù)端)的代碼執(zhí)行流程。首先我們打開(kāi)instance.cpp文件,找到下面方法:
??
??? //instance.cpp
????//?Returns?false?when?request?includes?'end'
????void?assembleResponse(?Message?&m,?DbResponse?&dbresponse,?const?SockAddr?&client?)?{
????......
???????if?(?op?==?dbQuery?)?{
????????????if?(?handlePossibleShardedMessage(?m?,?&dbresponse?)?)
????????????????return;
????????????receivedQuery(c?,?dbresponse,?m?);
????????}
????????//服務(wù)端(master)?收到message執(zhí)行相關(guān)查詢(xún)操作
????????else?if?(?op?==?dbGetMore?)?{
????????????if?(?!?receivedGetMore(dbresponse,?m,?currentOp)?)
????????????????log?=?true;
????????}
????.....
????}
?? ?看過(guò)本系列開(kāi)頭那幾篇BLOG的朋友,會(huì)看出上面方法其實(shí)在mongodb的crud操作中都會(huì)執(zhí)行到,更多內(nèi)容可以參見(jiàn)這篇BLOG,這里不再贅述。
?? ?當(dāng)slave 從結(jié)點(diǎn)發(fā)送同步復(fù)制請(qǐng)求時(shí),master會(huì)執(zhí)行上面的dbGetMore操作,從主庫(kù)中的oplog中獲取相應(yīng)日志并返回給slave結(jié)點(diǎn),下面是receivedGetMore()方法的具體實(shí)現(xiàn):
???
???? //instance.cpp
?????bool?receivedGetMore(DbResponse&?dbresponse,?Message&?m,?CurOp&?curop?)?{
????????StringBuilder&?ss?=?curop.debug().str;
????????bool?ok?=?true;
????????//參見(jiàn):Mongodb源碼分析--消息(message)中的?查詢(xún)更多(document)消息結(jié)構(gòu)相關(guān)內(nèi)容
????????//http://www.cnblogs.com/daizhj/archive/2011/04/02/2003335.html
????????DbMessage?d(m);
????????//完整的集合名稱(chēng),形如:"dbname.collectionname"
????????const?char?*ns?=?d.getns();
????????//返回的document數(shù)
????????int?ntoreturn?=?d.pullInt();
????????//在REPLY消息中的Cursor標(biāo)識(shí)符,其必須來(lái)自于數(shù)據(jù)庫(kù)
????????long?long?cursorid?=?d.pullInt64();
????????ss?<<?ns?<<?"?cid:"?<<?cursorid;
????????if(?ntoreturn?)
????????????ss?<<?"?ntoreturn:"?<<?ntoreturn;
????????time_t?start?=?0;
????????int?pass?=?0;
????????bool?exhaust?=?false;
????????QueryResult*?msgdata;//查詢(xún)結(jié)果
????????while(?1?)?{
????????????try?{
????????????????readlock?lk;
????????????????Client::Context?ctx(ns);
????????????????//執(zhí)行GetMore查詢(xún)
????????????????msgdata?=?processGetMore(ns,?ntoreturn,?cursorid,?curop,?pass,?exhaust);
????????????}
????????????catch?(?GetMoreWaitException&?)?{
????????????????exhaust?=?false;
????????????????massert(13073,?"shutting?down",?!inShutdown()?);
????????????????if(?pass?==?0?)?{
????????????????????start?=?time(0);
????????????????}
????????????????else?{
????????????????????if(?time(0)?-?start?>=?4?)?{
????????????????????????//?after?about?4?seconds,?return.??this?is?a?sanity?check.??pass?stops?at?1000?normally
????????????????????????//?for?DEV?this?helps?and?also?if?sleep?is?highly?inaccurate?on?a?platform.??we?want?to
????????????????????????//?return?occasionally?so?slave?can?checkpoint.
????????????????????????pass?=?10000;
????????????????????}
????????????????}
????????????????pass++;
????????????????DEV
????????????????sleepmillis(20);
????????????????else
????????????????????sleepmillis(2);
????????????????continue;
????????????}
????????????catch?(?AssertionException&?e?)?{
????????????????exhaust?=?false;
????????????????ss?<<?"?exception?"?<<?e.toString();
????????????????msgdata?=?emptyMoreResult(cursorid);
????????????????ok?=?false;
????????????}
????????????break;
????????};
????????//將查詢(xún)結(jié)果集綁定到message對(duì)象
????????Message?*resp?=?new?Message();
????????resp->setData(msgdata,?true);
????????ss?<<?"?bytes:"?<<?resp->header()->dataLen();
????????ss?<<?"?nreturned:"?<<?msgdata->nReturned;
????????//將上面的消息對(duì)象指針綁定到dbresponse
????????dbresponse.response?=?resp;
????????dbresponse.responseTo?=?m.header()->id;
????????if(?exhaust?)?{
????????????ss?<<?"?exhaust?";
????????????dbresponse.exhaust?=?ns;
????????}
????????return?ok;
????} ?? ?
可以看出,通過(guò)對(duì)message的解析找出相應(yīng)的cursorid,因?yàn)閙ongodb如果發(fā)現(xiàn)游標(biāo)為tailable(類(lèi)型)時(shí),會(huì)cache該cursor而不是關(guān)閉它,這主要是考慮到當(dāng)下次slave請(qǐng)求來(lái)時(shí),直接從cache中獲取該cursor以提升效率并用它來(lái)作為繼續(xù)獲取后續(xù)oplog操作信息。上面方法在執(zhí)行結(jié)束處會(huì)將獲取到的oplog結(jié)果封裝到message中并返回。但其如何獲取,就要分析下面方法了:
?? ?
????//query.cpp
?????QueryResult*?processGetMore(const?char?*ns,?int?ntoreturn,?long?long?cursorid?,?CurOp&?curop,?int?pass,?bool&?exhaust?)?{
????????exhaust?=?false;
????????//在map<CursorId,?ClientCursor*>中查詢(xún)相應(yīng)游客信息
????????ClientCursor::Pointer?p(cursorid);
????????//將結(jié)果返回(可能沒(méi)找到)
????????ClientCursor?*cc?=?p.c();
????????int?bufSize?=?512;
????????if?(?cc?)?{
????????????bufSize?+=?sizeof(?QueryResult?);
????????????bufSize?+=?MaxBytesToReturnToClientAtOnce;
????????}
????????//創(chuàng)建收集查詢(xún)記錄結(jié)果的buf對(duì)象
????????BufBuilder?b(?bufSize?);
????????//跳過(guò)預(yù)留數(shù)據(jù)區(qū)間(QueryResult)
????????b.skip(sizeof(QueryResult));
????????int?resultFlags?=?ResultFlag_AwaitCapable;
????????int?start?=?0;
????????int?n?=?0;
????????//判斷cc是否有效(如未找到則無(wú)效)
????????if?(?!cc?)?{
????????????log()?<<?"getMore:?cursorid?not?found?"?<<?ns?<<?"?"?<<?cursorid?<<?endl;
????????????cursorid?=?0;
????????????resultFlags?=?ResultFlag_CursorNotFound;
????????}
????????else?{
????????????//更新master結(jié)點(diǎn)local.slaves中的相應(yīng)信息(包括lastop時(shí)間戳)
????????????//注:主結(jié)點(diǎn)使用存儲(chǔ)在local.slaves中的syncedTo來(lái)跟蹤多少slave是已經(jīng)更新的。
????????????if?(?pass?==?0?)
????????????????cc->updateSlaveLocation(?curop?);
????????????int?queryOptions?=?cc->queryOptions();
????????????if(?pass?==?0?)?{
????????????????StringBuilder&?ss?=?curop.debug().str;
????????????????ss?<<?"?getMore:?"?<<?cc->query().toString()?<<?"?";
????????????}
????????????//獲取相應(yīng)cursor,以便while遍歷
????????????start?=?cc->pos();
????????????Cursor?*c?=?cc->c();
????????????c->checkLocation();
????????????DiskLoc?last;
????????????scoped_ptr<Projection::KeyOnly>?keyFieldsOnly;
????????????if?(?cc->modifiedKeys()?==?false?&&?cc->isMultiKey()?==?false?&&?cc->fields?)
????????????????keyFieldsOnly.reset(?cc->fields->checkKey(?cc->indexKeyPattern()?)?);
????????????//遍歷cursor,找到并封裝相應(yīng)查詢(xún)結(jié)果給buf對(duì)象
????????????while?(?1?)?{
????????????????if?(?!c->ok()?)?{//到結(jié)尾
????????????????????if?(?c->tailable()?)?{//處理tailable情況
????????????????????????//Tailable?表示在返回最后一條數(shù)據(jù)后,不要關(guān)閉當(dāng)前?cursor。
????????????????????????//這是因?yàn)橄到y(tǒng)考慮到稍后你可以再次使用該cursor.??
????????????????????????/*?when?a?tailable?cursor?hits?"EOF",?ok()?goes?false,?and?current()?is?null.??however
???????????????????????????advance()?can?still?be?retries?as?a?reactivation?attempt.??when?there?is?new?data,?it?will
???????????????????????????return?true.??that's?what?we?are?doing?here.
???????????????????????????*/
????????????????????????if?(?c->advance()?)
????????????????????????????continue;
????????????????????????if(?n?==?0?&&?(queryOptions?&?QueryOption_AwaitData)?&&?pass?<?1000?)?{
????????????????????????????throw?GetMoreWaitException();
????????????????????????}
????????????????????????break;
????????????????????}
????????????????????//釋放cursor資源關(guān)閉它(執(zhí)行delete操作)
????????????????????p.release();
????????????????????bool?ok?=?ClientCursor::erase(cursorid);
????????????????????assert(ok);
????????????????????cursorid?=?0;
????????????????????cc?=?0;
????????????????????break;
????????????????}
????????????????//?如果是clone?collection時(shí),則不會(huì)匹配 // If match succeeds on index key, then attempt to match full document.??????????????? if?(?c->matcher()?&&?!c->matcher()->matches(c->currKey(),?c->currLoc()?)?)?{
????????????????}
????????????????/*
??????????????????TODO
????????????????else?if?(?_chunkMatcher?&&?!?_chunkMatcher->belongsToMe(?c->currKey(),?c->currLoc()?)?){
????????????????????cout?<<?"TEMP?skipping?un-owned?chunk:?"?<<?c->current()?<<?endl;
????????????????}
????????????????*/
????????????????else?{//值是否重復(fù)
????????????????????if(?c->getsetdup(c->currLoc())?)?{
????????????????????????//out()?<<?"??but?it's?a?dup?\n";
????????????????????}
????????????????????else?{//如匹配
????????????????????????last?=?c->currLoc();
????????????????????????n++;
????????????????????????//裝填數(shù)據(jù)到buf中
????????????????????????if?(?keyFieldsOnly?)?{
????????????????????????????fillQueryResultFromObj(b,?0,?keyFieldsOnly->hydrate(?c->currKey()?)?);
????????????????????????}
????????????????????????else?{
????????????????????????????BSONObj?js?=?c->current();
????????????????????????????//?show?disk?loc?should?be?part?of?the?main?query,?not?in?an?$or?clause,?so?this?should?be?ok
????????????????????????????fillQueryResultFromObj(b,?cc->fields.get(),?js,?(?cc->pq.get()?&&?cc->pq->showDiskLoc()???&last?:?0));
????????????????????????}
????????????????????????if?(?(?ntoreturn?&&?n?>=?ntoreturn?)?||?b.len()?>?MaxBytesToReturnToClientAtOnce?)?{
????????????????????????????c->advance();
????????????????????????????cc->incPos(?n?);
????????????????????????????break;
????????????????????????}
????????????????????}
????????????????}
????????????????//指向下一條記錄
????????????????c->advance();
????????????????if?(?!?cc->yieldSometimes()?)?{
????????????????????cc?=?0;
????????????????????break;
????????????????}
????????????}
????????????if?(?cc?)?{
????????????????cc->updateLocation();
????????????????cc->mayUpgradeStorage();
????????????????//用last中的optime?更新_slaveReadTill
????????????????cc->storeOpForSlave(?last?);
????????????????exhaust?=?cc->queryOptions()?&?QueryOption_Exhaust;
????????????}
????????}
????????//將buf中的信息綁定到查詢(xún)結(jié)果集
????????QueryResult?*qr?=?(QueryResult?*)?b.buf();
????????qr->len?=?b.len();
????????qr->setOperation(opReply);
????????qr->_resultFlags()?=?resultFlags;
????????qr->cursorId?=?cursorid;
????????qr->startingFrom?=?start;
????????qr->nReturned?=?n;
????????b.decouple();
????????return?qr;
????}
?? ? 上面代碼有些長(zhǎng),但其目的很明確,就是針對(duì)指定的cursor進(jìn)行遍歷。這里mongodb會(huì)為每個(gè)slave保存一個(gè)cursor,并且其在遍歷完成后將最后一條oplog的時(shí)間戳作為當(dāng)前slave在local.slaves中的更新標(biāo)識(shí)信息(syncedTo),來(lái)標(biāo)識(shí)當(dāng)前slave的更新情況。(注:首次同步時(shí)全部復(fù)制會(huì)執(zhí)行copyDatabase,復(fù)制master db上的所有document)。該方法運(yùn)行截圖如下:
???
??
?
另外需要解釋的是,master結(jié)點(diǎn)貌似并不會(huì)使用slave發(fā)來(lái)的syncedTo來(lái)過(guò)濾capped collection中的舊oplog(指小于syncedTo時(shí)間戳)的數(shù)據(jù),而是使用tailable類(lèi)型的cursor來(lái)解決如果持續(xù)獲取后續(xù)新增oplog操作信息。前者的主觀臆測(cè)讓我在源碼中兜了一個(gè)圈子,因?yàn)槲乙恢敝饔^認(rèn)為mongod會(huì)執(zhí)行類(lèi)似查詢(xún)操作來(lái)過(guò)濾相應(yīng)舊oplog的時(shí)間戳信息,并將結(jié)果集返回給slave端。現(xiàn)在看來(lái)master只是不斷返回后續(xù)添加到cap collection中oplog(有可能是out of sync的情況而引發(fā)slave地點(diǎn)執(zhí)行resync操作),而最終的過(guò)濾判斷操作完全交給了slave端。這一點(diǎn)會(huì)在我下一篇文章中有所介紹。?
? ?? 好了,今天的內(nèi)容到這里就告一段落了。在接下來(lái)的文章中,將會(huì)介紹slave端是如何發(fā)起同步操作,以及最終如何使用獲取到的oplog來(lái)構(gòu)造本機(jī)數(shù)據(jù)的。
?? ?參考鏈接:
http://www.mongodb.org/display/DOCS/Replicationhttp://www.mongodb.org/display/DOCS/Master+Slave ??? http://www.snailinaturtleneck.com/blog/2010/10/12/replication-internals/
?? ?http://www.snailinaturtleneck.com/blog/2010/10/14/getting-to-know-your-oplog/
?? ?http://www.snailinaturtleneck.com/blog/2010/08/02/replica-sets-part-2-what-are-replica-sets/
?? ?原文鏈接:http://www.cnblogs.com/daizhj/archive/2011/06/13/mongodb_sourcecode_repl_master_run.html
??? 作者: daizhj, 代震軍? ?
??? 微博: http://t.sina.com.cn/daizhj
??? Tags: mongodb,c++,Replica,master-slave
?
轉(zhuǎn)載于:https://www.cnblogs.com/daizhj/archive/2011/06/13/mongodb_sourcecode_repl_master_run.html
總結(jié)
以上是生活随笔為你收集整理的Mongodb源码分析--Replication之主从模式--Master的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 【小假期】反思与计划。6.9-6.10
- 下一篇: Firefox v5 正式版