MIT 6.824 Lab2A (raft) -- Leader Election
文章目錄
- 實(shí)驗(yàn)要求
- Leader Election流程 及詳細(xì)實(shí)現(xiàn)介紹
- 基本角色
- 關(guān)鍵超時(shí)變量
- 關(guān)鍵的兩個(gè)RPC實(shí)現(xiàn)
- RequestVote RPC
- AppendEntries RPC
- Go并發(fā)編程實(shí)現(xiàn)leader election調(diào)度
本節(jié)記錄的是完成MIT6.824 raft lab的leader Election部分實(shí)驗(yàn)。
代碼: https://github.com/BaronStack/MIT-6.824-lab, clone之后git checkout lab2-2A
實(shí)驗(yàn)要求
這里是raft lab2 的2A部分,也是這個(gè)lab的一個(gè)基礎(chǔ)部分。主要完成的raft功能是 leader election 和 heartbeat 心跳。即 集群選舉從初始化狀態(tài)選舉出一個(gè)leader,且在集群沒有異常的情況下這個(gè)leader會(huì)通過heartbeat心跳一直保持自己的leader狀態(tài)。
詳細(xì)的功能可以 通過test_test.go的兩個(gè)測試看看2A這里主要的功能是什么?
func TestInitialElection2A(t *testing.T) {servers := 3 //初始化三個(gè)peercfg := make_config(t, servers, false) // 完成初使選舉defer cfg.cleanup()cfg.begin("Test (2A): initial election")// is a leader elected?cfg.checkOneLeader() // 檢查leader是否選舉出來且只有一個(gè)// sleep a bit to avoid racing with followers learning of the// election, then check that all peers agree on the term.time.Sleep(50 * time.Millisecond)// 完成leader選舉之后,當(dāng)前l(fā)eader任期內(nèi)的term 大于等于初始化的term// 且后續(xù)沒有網(wǎng)絡(luò)異常的情況下這個(gè)term不會(huì)發(fā)生變化term1 := cfg.checkTerms()if term1 < 1 {t.Fatalf("term is %v, but should be at least 1", term1)}// does the leader+term stay the same if there is no network failure?time.Sleep(2 * RaftElectionTimeout)// 過了一段時(shí)間,確保term不會(huì)發(fā)生變化term2 := cfg.checkTerms()if term1 != term2 {fmt.Printf("warning: term changed even though there were no failures")}// there should still be a leader.// 仍然只有一個(gè)leadercfg.checkOneLeader()cfg.end()
}
后面的一個(gè)測試是針對(duì)leader election過程中的其他異常情況進(jìn)行的,詳細(xì)代碼可以看看test_test.go 的 TestReElection2A函數(shù)的測試內(nèi)容:
- 三個(gè)peer選舉出一個(gè)leader
- 一個(gè)peer異常,leader能夠正常選出來
- 兩個(gè)peer異常,leader選舉不出來,因?yàn)橐呀?jīng)超過大多數(shù)異常了
- 恢復(fù)了一個(gè)peer之后有兩個(gè)peer,能夠選舉出來一個(gè)leader
- 再加入一個(gè)peer之后不影響之前正常的leader
整體來看就是一個(gè)完整的leader election的實(shí)現(xiàn)。
Leader Election流程 及詳細(xì)實(shí)現(xiàn)介紹
基本角色
這里的角色在實(shí)際raft相關(guān)的應(yīng)用中是以服務(wù)進(jìn)程的形式存在的。
follower,所有角色開始時(shí)的狀態(tài),等待接受leader心跳RPCs,如果收不到則會(huì)變成CandidateCandidate,候選人。是變成Leader的上一個(gè)角色,候選人會(huì)向其他所有節(jié)點(diǎn)發(fā)送RequestVote RPCs,如果收到集群大多數(shù)的回復(fù),則會(huì)將自己角色變更為Leader,并發(fā)送AppendEntries RPCs。Leader,集群的皇帝/主人…,raft能夠保證每一個(gè)集群僅有一個(gè)leader。負(fù)責(zé)和客戶端進(jìn)行通信,并將客戶端請(qǐng)求轉(zhuǎn)發(fā)給集群其他成員。
代碼中定義了三種常量表示peer不同的state:
const (STATE_FOLLOWER = iota // 0STATE_CANDIDATESTATE_LEADERHBINTERVAL = 50 * time.Millisecond // 50ms 心跳間隔
)
關(guān)鍵超時(shí)變量
-
Election Timeout選舉超時(shí)時(shí)間。即Cadidate 向集群其他節(jié)點(diǎn)發(fā)送vote請(qǐng)求時(shí),如果在Election Timeout時(shí)間內(nèi)沒有收到大多數(shù)的回復(fù),則會(huì)重新發(fā)送vote rpc。以上將實(shí)際
RequestVote簡寫為vote ,就是請(qǐng)求投票的rpc一般這個(gè)超時(shí)時(shí)間是在
150-300ms的隨機(jī)時(shí)間,為了防止集群出現(xiàn)頻繁的 split vote 影響leader選舉效率的情況,將這個(gè)超時(shí)時(shí)間取在155-300ms范圍內(nèi)的隨機(jī)時(shí)間。當(dāng)然,這個(gè)數(shù)值也是經(jīng)過測試的,超時(shí)時(shí)間設(shè)置在150-300ms 之間能夠保證raft集群 leader的穩(wěn)定性,也可以將超時(shí)時(shí)間設(shè)置的比較低(12-24ms),但是存在的網(wǎng)絡(luò)延遲則會(huì)導(dǎo)致一些不必要的leader選舉。隨機(jī)超時(shí)時(shí)間的設(shè)定實(shí)現(xiàn)如下,因?yàn)榭吹接泻芏嗤瓿?.824的伙伴有說這里超市時(shí)間是個(gè)坑,測試數(shù)百上千次可能無法保證每次都能在超市時(shí)間內(nèi)選舉出leader,目前還沒有遇到:
time.After(time.Duration(rand.Int63() % 333 + 550) * time.Millisecond) //這里設(shè)置的是550-880ms之間關(guān)于splite vote的情況可以看如下圖,圖片來自raft可視化官網(wǎng):
兩個(gè)節(jié)點(diǎn)收到對(duì)方的vote請(qǐng)求之前變成了candidate,發(fā)送了各自的request vote。 -
Heartbeats Timeout心跳超時(shí)時(shí)間。follower接受來自leader的心跳,如果在heartbeats timeout這個(gè)時(shí)間段內(nèi)follower沒有收到來自leader的AppendEntries RPCs,則follower會(huì)重新觸發(fā)選舉。收到了,則重置follower 本地的 heartbeats timeout。 -
TermLeader選舉過程中除了之前提到的基本變量,還會(huì)有一個(gè)Term 的概念。
,每一個(gè)term的變更不一定表示Leader一定會(huì)被選舉出來了。
上圖中的 term3 則完全沒有選出leader,這種情況的出現(xiàn)就是上文中描述的splite vote的情況,這個(gè)時(shí)候Term也會(huì)增加,當(dāng)時(shí)并沒有l(wèi)eader 被選出來,在ceph/zookeeper中 其實(shí)就類比于Epoch。
關(guān)鍵的兩個(gè)RPC實(shí)現(xiàn)
在講實(shí)際的RequestVoterpc和SendRequestVote實(shí)現(xiàn)之前我們先來看看什么是RPC(remote procedure call)遠(yuǎn)程進(jìn)程調(diào)用。
我們知道raft維護(hù)的是一個(gè)集群多臺(tái)機(jī)器之間的共識(shí)狀態(tài),那需要這個(gè)集群內(nèi)的機(jī)器之間頻繁得進(jìn)行數(shù)據(jù)傳輸。而我們希望實(shí)際發(fā)送過去得不僅僅是數(shù)據(jù)流,還有可以執(zhí)行產(chǎn)生數(shù)據(jù)流的函數(shù),這樣能夠高效得完成一些邏輯上的數(shù)據(jù)處理。比如,raft中我們將RequestVote封裝成一個(gè)函數(shù),將本地的peer狀態(tài)作為參數(shù)和整個(gè)函數(shù)一起發(fā)送到遠(yuǎn)端的機(jī)器,遠(yuǎn)端的機(jī)器能夠根據(jù)發(fā)送過來的peer狀態(tài)通過RequestVote內(nèi)部邏輯來決定自己本地的行為。這個(gè)過程如果純粹得通過網(wǎng)絡(luò)發(fā)送數(shù)據(jù)包,顯然需要大量的數(shù)據(jù)傳輸,所以RPC也就應(yīng)運(yùn)而生了。
實(shí)現(xiàn)RPC的話 不像我們本地服務(wù)器進(jìn)程之間通信或者進(jìn)程內(nèi)部的函數(shù)調(diào)用這么簡單方便,因?yàn)槭强绶?wù)器的,之間的信息交流只能通過網(wǎng)絡(luò)。我們想要讓本地的函數(shù)在遠(yuǎn)端也能夠執(zhí)行,需要實(shí)現(xiàn)如下幾個(gè)機(jī)制:
- Call ID映射。保證本地和遠(yuǎn)端服務(wù)器都能夠通過這個(gè)映射找到唯一的函數(shù)指針執(zhí)行
- 序列化和反序列化。需要將函數(shù)參數(shù)進(jìn)行序列化成字節(jié)流 通過網(wǎng)絡(luò)傳輸?shù)竭h(yuǎn)端,遠(yuǎn)端服務(wù)器再進(jìn)行反序列化解析得到參數(shù)。
- 網(wǎng)絡(luò)傳輸。需要通過網(wǎng)絡(luò)協(xié)議將Call ID、序列化和反序列化數(shù)據(jù) 發(fā)送到遠(yuǎn)端。這里的協(xié)議并不會(huì)有太多的限制,TCP/UDP/HTTP等都可以。
輕量級(jí)得RPC的實(shí)現(xiàn)感興趣的同學(xué)可以看看labrpc.go,對(duì)于RPC過程中需要處理的網(wǎng)絡(luò)異常或者流量控制這樣的需求 學(xué)習(xí)gRPC或者bRPC等C++實(shí)現(xiàn)也是很經(jīng)典的。
RequestVote RPC
這個(gè)RPC存在的目的是為了選舉leader,即集群中有peer變成了candidate狀態(tài)時(shí)就會(huì)發(fā)送RequestVote rpc。
-
RequestVote RPCs以上兩個(gè)超時(shí)過程也說了,投票是通過rpc請(qǐng)求實(shí)現(xiàn)的,且當(dāng)有RequestVote 出現(xiàn)時(shí),說明發(fā)送的peer本省的state已經(jīng)是處于Candidate了實(shí)際的RPC-args和reply結(jié)構(gòu)體如下:
type RequestVoteArgs struct {// Your data here (2A, 2B).Term int // current candidate's termCandidateId int // candidate's id requesting voteLastLogIndex int // index of current candidate's last log entryLastLogTerm int // term of current candidate's last log entry }// // example RequestVote RPC reply structure. // field names must start with capital letters! // type RequestVoteReply struct {// Your data here (2A).Term int // current term, for candidate to update itselfVoteGranted bool // true means candidate received vote }我們的raft中
RequestVote的實(shí)現(xiàn)中,如果想要接收這個(gè)rpc的peer為發(fā)送的rpc即RequestVoteArgs投票,則需要滿足以下幾個(gè)條件:-
Receive-peer 的 term > send-peer 的term,則receiver-peer保留自己本身的狀態(tài),畢竟Term都比請(qǐng)求投票的peer term新
-
為了保證一致性,當(dāng)send-peer的term 滿足大于等于receive-peer的term的時(shí)候需要比較上一個(gè)term是否比receive-peer的上一個(gè)term新,如果是相等,還需要確認(rèn)上一個(gè)log index是否更新。這一些都滿足之后才能更新receive-peer的狀態(tài)為follower 以及 投票的id。即receive-peer認(rèn)可了send-peer是leader。
除了最開始的term的比較之外,后續(xù)的last-term以及l(fā)ast-log-index 都是為了保證選舉出來的leader能夠擁有最更新的日志。
代碼實(shí)現(xiàn)如下:
func (rf *Raft) RequestVote(args RequestVoteArgs,reply *RequestVoteReply) {// Your code here (2A, 2B).rf.mu.Lock()defer rf.mu.Unlock()reply.VoteGranted = falseif args.Term < rf.currentTerm { // 判斷term,rf.currentTerm是receiver-peer的term// send-peer的term沒有receiver-peer的term新,直接返回reply.Term = rf.currentTermreturn}// send-peer的term更新,則更新receive-peer的state和本地term// 如果兩者相等, 則需要繼續(xù)后續(xù)的last-term和last-index的判斷if args.Term > rf.currentTerm { rf.currentTerm = args.Termrf.state = STATE_FOLLOWERrf.voteFor = -1}reply.Term = rf.currentTermlast_term := rf.GetLastTerm()last_index := rf.GetLastIndex()update := false// only the leader have the newer term and log-index than current peer// then we could vote for the peerif args.LastLogTerm > last_term {update = true}if args.LastLogTerm == last_term && args.LastLogIndex >= last_index {update = true}// 都滿足send-peer擁有更全的日志,receive-peer才會(huì)選擇去更新本地相關(guān)狀態(tài)和跟進(jìn)termif (rf.voteFor == -1 || rf.voteFor == args.CandidateId) && update {rf.chanGrantVote <- truerf.state = STATE_FOLLOWERreply.VoteGranted = truerf.voteFor = args.CandidateId // 投票給send-peer的peer id} } -
-
sendAppendEntries的實(shí)現(xiàn) 大體是在send-peer端在發(fā)送完rpc接收到reply之后的處理邏輯
- 在收到
RequestVote之后,檢查發(fā)現(xiàn)當(dāng)前state的狀態(tài)已經(jīng)發(fā)生變化了,則保持這個(gè)狀態(tài)直接返回(在此期間可能收到了AppendEntries RPC ,則會(huì)直接變更為follower) - term 發(fā)生了變化,則認(rèn)為當(dāng)前peer在收到自己發(fā)送的rpc回復(fù)之前收到別人的rpc且為別人投了票,也就是狀態(tài)也發(fā)生了變化
- 自己還是保持的發(fā)送之前的state和term,只是收到回復(fù)的term比自己的term大,那將自己狀態(tài)變更為follower
- 收到的回復(fù)中發(fā)現(xiàn)別人投給自己一票,那就準(zhǔn)備將自己變更為leader
func (rf *Raft) sendRequestVote(server int, args RequestVoteArgs, reply *RequestVoteReply) bool {ok := rf.peers[server].Call("Raft.RequestVote", args, reply)rf.mu.Lock()defer rf.mu.Unlock()if ok {// find that the current peer's state changed , return okif rf.state != STATE_CANDIDATE {return ok}// keep the current peer's state, our state have been changedterm := rf.currentTermif args.Term != term {return ok}if reply.Term > term {rf.currentTerm = reply.Termrf.state = STATE_FOLLOWERrf.voteFor = -1}// 別人的回復(fù)認(rèn)可了自己,投了自己一票if reply.VoteGranted {rf.voteCount ++// 確認(rèn)自己的投票總數(shù)超過半數(shù),則通過channel 標(biāo)記自己成為leaderif rf.state == STATE_CANDIDATE && rf.voteCount > len(rf.peers)/2 {rf.state = STATE_FOLLOWERrf.chanLeader <- true}}}return ok
}
關(guān)于Term在candidate 投票過程中發(fā)生的變化 如下圖。
AppendEntries RPC
這個(gè)rpc是leader維護(hù)自己狀態(tài)的,每隔一段時(shí)間像其他的follower發(fā)送AppendEntries,這段時(shí)間的集群term不會(huì)發(fā)生變化。并且AppendEntries也會(huì)攜帶著log-entry 更新log index。
-
AppendEntries RPCsleader 同步數(shù)據(jù)時(shí)的rpc請(qǐng)求。其發(fā)送和接收回復(fù)的結(jié)構(gòu)體形態(tài)如下:
// AppendEntries RPC args type AppendEntriesArgs struct {Term int // leader 的termLeaderId int // leader 所在peer的idPrevLogIndex int // leader上一個(gè)log indexPrevLogTerm int // leader 上一個(gè)log的termEntries []LogEntry // leader 存放的logLeaderCommit int // leader 已經(jīng)commit的index }// AppendEntries RPC reply type AppendEntriesReply struct {Term int // 當(dāng)前peer回復(fù)給leader的term,leader用來判斷是否需要變更自身的狀態(tài)Success bool // 當(dāng)前peer是否仍然認(rèn)可leaderNextIndex int // 下一個(gè)log entry的index內(nèi)容 }AppendEntries 中主要做的事情如下(L表示leader,P表示收到RPC的peer):
- 發(fā)現(xiàn)L-term < P-term,這個(gè)時(shí)候認(rèn)為集群發(fā)生了異常,返回success為false表示當(dāng)前peer不認(rèn)可leader的任期了
- 檢查L-prevLogIndex和P-LogIndex是否匹配,如過發(fā)現(xiàn)L-prevLogIndex更 新,則認(rèn)為follower的log不全,需要從leader補(bǔ)充,那需要找到和P-LogIndex 匹配的index,將找到的index+1返回給leader。這個(gè)過程其實(shí)就是leader補(bǔ)全和follower之間的日志差異,需要向前找到leader和follower所處的同一個(gè)term的同一個(gè)index才能返回。
當(dāng)然,第二點(diǎn)其實(shí)是lab 2B要做的事情,這個(gè)無關(guān)于leader election。
看一下實(shí)現(xiàn)
func (rf *Raft) AppendEntries(args AppendEntriesArgs,reply *AppendEntriesReply) {rf.mu.Lock()defer rf.mu.Unlock()// 發(fā)現(xiàn)L-term < P-term,認(rèn)為集群發(fā)生了異常,將當(dāng)前peer的term返回回去reply.Success = falseif args.Term < rf.currentTerm {reply.Term = rf.currentTermreply.NextIndex = rf.GetLastIndex() + 1return}// 如果是Term正常的,那就直接填充channel,告訴leader當(dāng)前peer仍然是followerrf.chanHeartbeat <- trueif args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.state = STATE_FOLLOWERrf.voteFor = -1}reply.Term = args.Termif args.PrevLogIndex > rf.GetLastIndex() {reply.NextIndex = rf.GetLastIndex() + 1return}baseIndex := rf.log[0].LogIndex// 對(duì)PrevLogIndex的檢查,確保follower的entry是和leader的log entry同步的if args.PrevLogIndex > baseIndex {term := rf.log[args.PrevLogIndex-baseIndex].LogTermif args.PrevLogTerm != term {for i := args.PrevLogIndex - 1 ; i >= baseIndex; i-- {if rf.log[i-baseIndex].LogTerm != term {reply.NextIndex = i + 1break}}return}}if args.PrevLogIndex < baseIndex {} else {rf.log = rf.log[: args.PrevLogIndex+1-baseIndex]rf.log = append(rf.log, args.Entries...)reply.Success = truereply.NextIndex = rf.GetLastIndex() + 1}return } -
sendAppendEntries是leader 發(fā)送完AppendEntriesRPC之后的一些處理邏輯func (rf *Raft) sendAppendEntries(server int, args AppendEntriesArgs, reply *AppendEntriesReply) bool {ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)rf.mu.Lock()defer rf.mu.Unlock()if ok {// 發(fā)送RPC之后發(fā)現(xiàn)當(dāng)前l(fā)eader的狀態(tài)和term發(fā)生了變化,直接返回吧// 可能在此期間收到其他peer的rpc擁有更高的term,也會(huì)讓自身的狀態(tài)和term發(fā)生變化if rf.state != STATE_LEADER {return ok}if args.Term != rf.currentTerm {return ok}// 如果leader自身沒有變化,但是發(fā)現(xiàn)收到回復(fù)的term比自己的term新// 只能認(rèn)為自己的follower了if reply.Term > rf.currentTerm {rf.currentTerm = reply.Termrf.state = STATE_FOLLOWERrf.voteFor = -1return ok}// 更新logindexif reply.Success {if len(args.Entries) > 0 {rf.nextIndex[server] = args.Entries[len(args.Entries) - 1].LogIndex + 1rf.matchInex[server] = rf.nextIndex[server] - 1}} else {rf.nextIndex[server] = reply.NextIndex}}return ok }
從上面的AppendEntries和RequestVote兩種RPC我們大體清楚了在leader Election過程中的節(jié)點(diǎn)狀態(tài)變化的情況。
總體來說就是當(dāng)Follower長時(shí)間沒有收到心跳的時(shí)候就會(huì)變成 Candidate,Candidate通過RequestVote邏輯對(duì)一些term新舊的判斷或者logIndex新舊的判斷進(jìn)行投票從而選擇term最新且log最全的peer作為leader,不認(rèn)為自己能夠當(dāng)選leader 的peer同時(shí)也會(huì)將自己的狀態(tài)變更為follower;leader會(huì)不斷得向follower發(fā)送AppendEntries 來維持自己的leader狀態(tài)。當(dāng)集群發(fā)生異常(宕機(jī)的舊leader重新啟動(dòng),收到了新Leader的狀態(tài)信息)則會(huì)將自己標(biāo)記為Follower。
如下圖:
Go并發(fā)編程實(shí)現(xiàn)leader election調(diào)度
我們?cè)赗PC中已經(jīng)將大多數(shù)的核心實(shí)現(xiàn)已經(jīng)描述清楚了, 接下來就是在外部構(gòu)造集群需要的多個(gè)peer ,每個(gè)peer通過接收發(fā)送RPC來維護(hù)自己的外部狀態(tài)機(jī)的行為,從而更好得在三種狀態(tài)之間變遷。
根據(jù)Lab 2A的要求,會(huì)檢查集群從最開始沒有l(wèi)eader的狀態(tài)進(jìn)行選舉,完成leader選舉;到集群正常運(yùn)行時(shí)模擬節(jié)點(diǎn)網(wǎng)絡(luò)異常進(jìn)行l(wèi)eader選舉。
這里面需要用到GO并發(fā)的一些知識(shí) 多路選擇 + 超時(shí)控制 + CSP(communicating sequential processes),能夠體會(huì)到GO語言在并發(fā)編程下的強(qiáng)大。
調(diào)度這里我們要做的事情就是:
-
初始化幾個(gè)peer
-
每一個(gè)peer維護(hù)一個(gè)狀態(tài)機(jī),每一個(gè)狀態(tài)下去調(diào)度各自狀態(tài)的邏輯。
-
Follower
a. 對(duì)candidate和leader的rpc進(jìn)行回復(fù)
b. 如果超市時(shí)間內(nèi)沒有收到AppendEntries rpc 或者 收到candidate的投票,會(huì)講自己的狀態(tài)轉(zhuǎn)為candidate -
Candidates
投票過程中會(huì)做的事情:
a. 增加當(dāng)前peer的term
b. 為自己投票
c. 重置選舉超市時(shí)間
d. 發(fā)送RequestVote RPC 發(fā)送給其他peer如果發(fā)送的rpc收到的回復(fù)大多數(shù)都認(rèn)可自己,那就變成leader
如果收到了AppendEntries RPC, 那就變成follower,說明有其他人在選舉
如果超時(shí)時(shí)間過期了,那就開啟一個(gè)新的term -
Leader
選舉過程中的leader主要是發(fā)送AppendEntries RPC來維護(hù)自己的term
-
看看具體的實(shí)現(xiàn)(僅僅是leader選舉的部分,并沒有處理持久化的log信息):
這個(gè)Make的調(diào)用會(huì)在測試代碼通過make_config --> Start1 --> Make初始化三個(gè)peer
func Make(peers []*labrpc.ClientEnd, me int,persister *Persister, applyCh chan ApplyMsg) *Raft {rf := &Raft{}rf.peers = peersrf.persister = persisterrf.me = me// Your initialization code here (2A, 2B, 2C).// 初始化當(dāng)前peerrf.state = STATE_FOLLOWERrf.voteFor = -1rf.voteCount = 0rf.log = append(rf.log, LogEntry{LogTerm: 0})// 這里維護(hù)了幾個(gè)channel,在后續(xù)變更peer狀態(tài)的時(shí)候會(huì)從channel中取數(shù)據(jù)// heartbeat和requestVote 的兩個(gè)計(jì)時(shí)器也都是依賴channel來實(shí)現(xiàn)的// channel填充的話則是在我們前面實(shí)現(xiàn)的RPC之中rf.chanLeader = make(chan bool, 100) // 變更為leaderrf.chanHeartbeat = make(chan bool, 100) // 接收到heartbeat心跳rf.chanGrantVote = make(chan bool, 100) // 投票完成rf.chanApply = applyCh// 啟動(dòng)一個(gè)go routine,來維護(hù)當(dāng)前peer的狀態(tài)機(jī)go func() {for {switch rf.state {case STATE_FOLLOWER:select {// 有一段時(shí)間接受不到心跳,或者收到Candidate的投票// 則將當(dāng)前follower狀態(tài)變更為candidate,準(zhǔn)備進(jìn)行l(wèi)eader electioncase <- rf.chanHeartbeat:case <- rf.chanGrantVote:case <-time.After(time.Duration(rand.Int63() % 333 + 550) * time.Millisecond): // 計(jì)時(shí)器rf.state = STATE_CANDIDATE} case STATE_LEADER:rf.broadCastAppendEntries() //leader 廣播AppendEntries,有l(wèi)og的話會(huì)攜帶著log-indextime.Sleep(HBINTERVAL)case STATE_CANDIDATE:rf.mu.Lock()rf.currentTerm ++ // 增加當(dāng)前term,表示開啟了一個(gè)新一輪的leader任期rf.voteFor = rf.me // 每個(gè)candidate先為自己投票 rf.voteCount = 1 // 投票計(jì)數(shù)自增,后續(xù)通過這個(gè)計(jì)數(shù)判斷是否能夠成為leaderrf.mu.Unlock()go rf.broadCastReqeustVote() // candidate 向除自己之外的其他peer廣播RequestVoteselect {case <-time.After(time.Duration(rand.Int63() % 333 + 550) * time.Millisecond):case <-rf.chanHeartbeat: // chanHeartbeat為真,收到了AppendEntries,則變更為follower(已經(jīng)有l(wèi)eader了)rf.state = STATE_FOLLOWERcase <-rf.chanLeader: // 在處理RequestVote返回的邏輯中發(fā)現(xiàn)自己能夠成為leader,變更為leaderrf.mu.Lock()rf.state = STATE_LEADER// 調(diào)整后續(xù)要發(fā)送的rf.nextIndex = make([]int,len(rf.peers))rf.matchInex = make([]int,len(rf.peers))for i := range rf.peers {rf.nextIndex[i] = rf.GetLastIndex() + 1rf.matchInex[i] = 0}rf.mu.Unlock()}}}}()
}
需要注意的是go的channel機(jī)制如果不初始化buffer,則會(huì)是阻塞的,一個(gè)channel 會(huì)一直阻塞在這段超時(shí)時(shí)間內(nèi) 直到拿到了值。
ch := make(chan bool, 10) //設(shè)置大小為10的buffer,如果不設(shè)置buffer大小,則后續(xù)取值的時(shí)候會(huì)阻塞ch <- true // 向ch中填值
ret := <- ch // 從ch取值
所以在raft的go實(shí)現(xiàn)中 針對(duì)channel變量的設(shè)置 都會(huì)有 buffer,從而防止其他routine獲取channel 值時(shí)阻塞。
總結(jié)
以上是生活随笔為你收集整理的MIT 6.824 Lab2A (raft) -- Leader Election的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 100个单位的溶解酶能溶解多少玻尿酸
- 下一篇: 为什么都做试管婴儿