zookeeper source code (04): leader election process
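The previous article, "zookeeper source code (03): cluster startup process", introduced the entry point of leader election. This article analyzes the leader election components and process in detail.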
Leader election process (important)
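- During QuorumPeer's start phase, startLeaderElection() starts the election
- In the LOOKING state, the server votes for itself
- createElectionAlgorithm creates the core election components: QuorumCnxManager (manages connections), FastLeaderElection (runs the election), and others
- QuorumPeer's main loop runs a different flow depending on the current state
States and flows:
- LOOKING - elect a leader with fastLeaderElection.lookForLeader
  - increment the election epoch to start a new round
  - initialize the proposal with its own serverId, zxid and currentEpoch
  - send the vote out
  - loop, receiving votes from the other servers:
    - LOOKING vote: compare election epoch, currentEpoch, zxid and serverId to decide which server to vote for; once more than half of the nodes agree on the proposal, that server becomes the leader
    - FOLLOWING vote: after comparing election epochs, vote for the current leader
    - LEADING vote: after comparing election epochs, vote for the current leader
- LEADING - create a Leader object and run the lead logic
  - zkServer loads its data
  - start the quorum listener
  - determine the new epoch and zxid from each follower's current epoch
  - synchronize data to the followers
  - start zkServer
  - every tick, verify that a majority of followers are in sync
- FOLLOWING - create a Follower object and run the followLeader logic
  - connectToLeader - connect to the leader server
  - registerWithLeader - send the current epoch to the leader and wait for it to send the new epoch
  - syncWithLeader - receive the data synced by the leader: txnlog, committedlog, snapshot
  - keep communicating and process packets from the leader
- OBSERVING - create an Observer object and run the observeLeader logic, essentially the same as FOLLOWING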
Starting leader election
QuorumPeer's startLeaderElection method is the entry point for starting an election:
public synchronized void startLeaderElection() {
try {
if (getPeerState() == ServerState.LOOKING) {
// vote for itself, wrapping its last logged zxid and current epoch
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch (IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
// electionType is always 3
this.electionAlg = createElectionAlgorithm(electionType);
}
protected Election createElectionAlgorithm(int electionAlgorithm) {
Election le = null;
// TODO: use a factory rather than a switch
// the switch could be replaced with the strategy pattern
switch (electionAlgorithm) {
case 1:
throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
case 2:
throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
case 3:
QuorumCnxManager qcm = createCnxnManager();
QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
// shut down the previous QuorumCnxManager, if any
if (oldQcm != null) {
oldQcm.halt();
}
// the listener starts the serverSocket
QuorumCnxManager.Listener listener = qcm.listener;
if (listener != null) {
listener.start();
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
le = fle;
}
break;
default:
assert false;
}
return le;
}
public QuorumCnxManager createCnxnManager() {
// socket timeout: quorumCnxnTimeoutMs if set, otherwise tickTime * syncLimit
// with the zoo_sample.cfg settings that is 2000 * 5
int timeout = quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : this.tickTime * this.syncLimit;
return new QuorumCnxManager(
this,
this.getMyId(),
this.getView(), // serverId->quorumServer
this.authServer,
this.authLearner,
timeout,
this.getQuorumListenOnAllIPs(), // listen on all IPs? default false
this.quorumCnxnThreadsSize, // default 20
this.isQuorumSaslAuthEnabled());
}
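The TODO in createElectionAlgorithm suggests a factory instead of the switch. Below is a minimal sketch of that idea, not ZooKeeper code: ElectionSketch and ElectionFactorySketch are hypothetical names, and in a real refactoring the supplier registered for type 3 would hold the QuorumCnxManager and FastLeaderElection wiring from case 3.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for the Election interface; only its shape matters here.
interface ElectionSketch {
    String name();
}

final class ElectionFactorySketch {
    private final Map<Integer, Supplier<ElectionSketch>> registry;

    ElectionFactorySketch(Map<Integer, Supplier<ElectionSketch>> registry) {
        this.registry = new HashMap<>(registry); // defensive copy
    }

    // Replaces the switch: look the type up instead of branching on it.
    ElectionSketch create(int electionType) {
        Supplier<ElectionSketch> supplier = registry.get(electionType);
        if (supplier == null) {
            throw new UnsupportedOperationException(
                "Election Algorithm " + electionType + " is not supported.");
        }
        return supplier.get();
    }
}
```

Unregistered types such as 1 and 2 fail with the same message as the original's UnsupportedOperationException branches; the sketch also throws for any other unknown type instead of reaching the `assert false` default and returning null.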
QuorumCnxManager class
Overview:
This class implements a connection manager for leader election using TCP.
It maintains one connection for every pair of servers. The tricky part is to guarantee that there is exactly one connection for every pair of servers that are operating correctly and that can communicate over the network. If two servers try to start a connection concurrently, then the connection manager uses a very simple tie-breaking mechanism to decide which connection to drop based on the IP addressed of the two parties.
For every peer, the manager maintains a queue of messages to send. If the connection to any particular peer drops, then the sender thread puts the message back on the list. As this implementation currently uses a queue implementation to maintain messages to send to another peer, we add the message to the tail of the queue, thus changing the order of messages. Although this is not a problem for the leader election, it could be a problem when consolidating peer communication. This is to be verified, though.
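- maintains the TCP connections between servers during leader election
- ensures exactly one connection exists between any two servers; if two servers open connections to each other at the same time, the connection initiated by the server with the larger id is always the one kept
- buffers outgoing messages in a queue per peer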
Main fields
// runs QuorumConnectionReqThread and QuorumConnectionReceiverThread
private ThreadPoolExecutor connectionExecutor;
// sid -> SendWorker / send queue / last message sent
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
// receive queue
public final BlockingQueue<Message> recvQueue;
Main methods
public void initiateConnection(final MultipleAddresses electionAddr, final Long sid);
// wraps initiateConnection in a QuorumConnectionReqThread and submits it to connectionExecutor to run asynchronously
public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid);
private boolean startConnection(Socket sock, Long sid) throws IOException;
public void receiveConnection(final Socket sock);
// wraps receiveConnection in a QuorumConnectionReceiverThread and submits it to connectionExecutor to run asynchronously
public void receiveConnectionAsync(final Socket sock);
public void toSend(Long sid, ByteBuffer b);
boolean connectOne(long sid, MultipleAddresses electionAddr);
void connectOne(long sid);
public void connectAll();
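The remaining utility methods are not covered here.
initiateConnection method
Creates the Socket, performs the SSL handshake and authentication if necessary, and sends the initial packet. If its own id is smaller than the peer's, the connection is closed, so that exactly one connection exists between any two servers.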
public void initiateConnection(final MultipleAddresses electionAddr, final Long sid) {
Socket sock = null;
try {
// create the Socket
if (self.isSslQuorum()) {
sock = self.getX509Util().createSSLSocket();
} else {
sock = SOCKET_FACTORY.get();
}
setSockOpts(sock); // socket options such as timeout
// connect to the target peer
sock.connect(electionAddr.getReachableOrOne(), cnxTO);
// SSL handshake
if (sock instanceof SSLSocket) {
SSLSocket sslSock = (SSLSocket) sock;
sslSock.startHandshake();
}
} catch (X509Exception e) {
closeSocket(sock);
return;
} catch (UnresolvedAddressException | IOException e) {
closeSocket(sock);
return;
}
try {
// send the initial packet and perform SASL authentication
// close the connection if our id is smaller than the peer's
// create and start SendWorker and RecvWorker
// create the send queue for this sid
startConnection(sock, sid);
} catch (IOException e) {
closeSocket(sock);
}
}
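startConnection method
- sends the initial connection packet and performs SASL authentication
- closes the connection if its own id is smaller than the peer's
- creates and starts SendWorker and RecvWorker
- creates the send queue for the sid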
private boolean startConnection(Socket sock, Long sid) throws IOException {
DataOutputStream dout = null;
DataInputStream din = null;
try {
// output stream
BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
dout = new DataOutputStream(buf);
// send the initial packet: protocol version, myid, election address
long protocolVersion = self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
dout.writeLong(protocolVersion);
dout.writeLong(self.getMyId());
// now we send our election address. For the new protocol version, we can send multiple addresses.
Collection<InetSocketAddress> addressesToSend = protocolVersion == PROTOCOL_VERSION_V2
? self.getElectionAddress().getAllAddresses()
: Arrays.asList(self.getElectionAddress().getOne());
String addr = addressesToSend.stream()
.map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
byte[] addr_bytes = addr.getBytes();
dout.writeInt(addr_bytes.length);
dout.write(addr_bytes);
dout.flush();
din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
} catch (IOException e) {
closeSocket(sock);
return false;
}
// authenticate learner
QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
if (qps != null) {
authLearner.authenticate(sock, qps.hostname);
}
if (sid > self.getMyId()) { // If lost the challenge, then drop the new connection
closeSocket(sock);
} else {
// create SendWorker and RecvWorker
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if (vsw != null) {
vsw.finish();
}
senderWorkerMap.put(sid, sw);
// create the send queue
queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
sw.start();
rw.start();
return true;
}
return false;
}
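The initial packet written by startConnection (and read back in handleConnection) can be sketched with plain DataOutputStream/DataInputStream. This is a simplified illustration, not InitialMessage itself: the class name and the PROTOCOL_VERSION value are hypothetical (the source only shows that real protocol versions are negative, since handleConnection treats a non-negative first long as a bare server id), the address is encoded as UTF-8 here, and the validation the real parser performs is omitted.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Layout: [long protocolVersion][long myid][int addrLength][addrLength bytes "host:port|host:port"]
final class InitialPacketSketch {
    // Hypothetical value; only its sign matters for the dispatch in decode().
    static final long PROTOCOL_VERSION = -65535L;

    static final class Parsed {
        final long sid;
        final String addresses;

        Parsed(long sid, String addresses) {
            this.sid = sid;
            this.addresses = addresses;
        }
    }

    static byte[] encode(long myId, String addresses) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(PROTOCOL_VERSION);
            out.writeLong(myId);
            byte[] addr = addresses.getBytes(StandardCharsets.UTF_8);
            out.writeInt(addr.length);
            out.write(addr);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Parsed decode(byte[] packet) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(packet))) {
            long first = in.readLong();
            if (first >= 0) {
                // old protocol: the first long is already the sender's server id
                return new Parsed(first, "");
            }
            long sid = in.readLong();
            int len = in.readInt();
            byte[] addr = new byte[len];
            in.readFully(addr);
            return new Parsed(sid, new String(addr, StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For a single address such as `10.0.0.3:3888` the packet is 8 + 8 + 4 + 13 bytes.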
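receiveConnection method
When a server receives a connection request and wins the challenge (its own id is larger than the peer's), it closes that connection and connects to the peer itself.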
public void receiveConnection(final Socket sock) {
DataInputStream din = null;
try {
// input stream
din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
handleConnection(sock, din);
} catch (IOException e) {
closeSocket(sock);
}
}
private void handleConnection(Socket sock, DataInputStream din) throws IOException {
Long sid = null, protocolVersion = null;
MultipleAddresses electionAddr = null;
try {
protocolVersion = din.readLong();
if (protocolVersion >= 0) { // this is a server id and not a protocol version
sid = protocolVersion;
} else {
try {
InitialMessage init = InitialMessage.parse(protocolVersion, din);
sid = init.sid;
if (!init.electionAddr.isEmpty()) {
electionAddr = new MultipleAddresses(init.electionAddr,
Duration.ofMillis(self.getMultiAddressReachabilityCheckTimeoutMs()));
}
} catch (InitialMessage.InitialMessageException ex) {
closeSocket(sock);
return;
}
}
if (sid == QuorumPeer.OBSERVER_ID) {
// Choose identifier at random. We need a value to identify the connection.
sid = observerCounter.getAndDecrement();
}
} catch (IOException e) {
closeSocket(sock);
return;
}
// do authenticating learner
authServer.authenticate(sock, din);
// If wins the challenge, then close the new connection.
if (sid < self.getMyId()) { // the peer's id is smaller: close this connection and connect to the peer ourselves
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
// close the connection
closeSocket(sock);
if (electionAddr != null) {
connectOne(sid, electionAddr); // connect to the peer
} else {
connectOne(sid);
}
} else if (sid == self.getMyId()) {
// a connection request carrying our own id: ignored
} else { // create SendWorker, RecvWorker and the send queue
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if (vsw != null) {
vsw.finish();
}
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
sw.start();
rw.start();
}
}
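Taken together, startConnection and handleConnection implement the tie-break: only a connection initiated by the server with the larger id survives. The sketch below reduces both decisions to pure functions so the rule can be checked in isolation; the names are hypothetical, and sockets, authentication and workers are left out.

```java
// What handleConnection does with an incoming connection from peerSid.
enum IncomingDecision { DROP_AND_CONNECT_BACK, IGNORE_OWN_ID, ACCEPT }

final class TieBreakSketch {
    static IncomingDecision onIncoming(long mySid, long peerSid) {
        if (peerSid < mySid) {
            // we win the challenge: close it and dial the peer ourselves
            return IncomingDecision.DROP_AND_CONNECT_BACK;
        }
        if (peerSid == mySid) {
            return IncomingDecision.IGNORE_OWN_ID;
        }
        return IncomingDecision.ACCEPT;
    }

    // What startConnection does with a connection we initiated to peerSid:
    // it is closed when the peer's id is larger ("lost the challenge").
    static boolean keepOutgoing(long mySid, long peerSid) {
        return peerSid <= mySid;
    }
}
```

Whichever of servers 1 and 3 dials first, the connection that ends up in use is the one server 3 initiated.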
toSend method
Sends a message.
public void toSend(Long sid, ByteBuffer b) {
// a message addressed to ourselves goes straight into recvQueue
if (this.mySid == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
} else {
// put the message on the send queue for this sid
BlockingQueue<ByteBuffer> bq =
queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
addToSendQueue(bq, b);
// make sure a connection exists, connecting if necessary
connectOne(sid);
}
}
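toSend puts messages on a per-peer CircularBlockingQueue of size SEND_CAPACITY. As far as I understand that class, when it is full it discards the oldest element to make room for the new one, which is acceptable during an election because a newer vote supersedes an older one. The sketch below shows only that drop-oldest behavior; DropOldestQueue is a hypothetical name, and the blocking take and poll-with-timeout operations of the real queue are omitted.

```java
import java.util.ArrayDeque;

// Bounded FIFO that evicts the oldest element when full (non-blocking sketch).
final class DropOldestQueue<T> {
    private final ArrayDeque<T> deque = new ArrayDeque<>();
    private final int capacity;
    private long dropped;

    DropOldestQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    synchronized void offer(T item) {
        if (deque.size() == capacity) {
            deque.pollFirst(); // evict the oldest message
            dropped++;
        }
        deque.addLast(item);
    }

    synchronized T poll() {
        return deque.pollFirst(); // null when empty
    }

    synchronized int size() {
        return deque.size();
    }

    synchronized long droppedCount() {
        return dropped;
    }
}
```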
connectOne method
synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) {
// a connection already exists
if (senderWorkerMap.get(sid) != null) {
if (self.isMultiAddressEnabled() && electionAddr.size() > 1 &&
self.isMultiAddressReachabilityCheckEnabled()) {
// check whether the socket is still reachable
senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
}
return true;
}
// establish a new connection asynchronously
return initiateConnectionAsync(electionAddr, sid);
}
synchronized void connectOne(long sid) {
if (senderWorkerMap.get(sid) != null) {
if (self.isMultiAddressEnabled() && self.isMultiAddressReachabilityCheckEnabled()) {
senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
}
return;
}
// resolve the address for sid from lastCommittedView or lastProposedView, then connect
synchronized (self.QV_LOCK) {
boolean knownId = false;
// Resolve hostname for the remote server before attempting to
// connect in case the underlying ip address has changed.
self.recreateSocketAddresses(sid);
Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
if (lastCommittedView.containsKey(sid)) {
knownId = true;
if (connectOne(sid, lastCommittedView.get(sid).electionAddr)) {
return;
}
}
if (lastSeenQV != null
&& lastProposedView.containsKey(sid)
&& (!knownId ||
!lastProposedView.get(sid).electionAddr.equals(lastCommittedView.get(sid).electionAddr))) {
knownId = true;
if (connectOne(sid, lastProposedView.get(sid).electionAddr)) {
return;
}
}
}
}
connectAll method
Try to establish a connection with each server if one doesn't exist.
public void connectAll() {
long sid;
for (Enumeration<Long> en = queueSendMap.keys(); en.hasMoreElements(); ) {
sid = en.nextElement();
connectOne(sid);
}
}
Listener class
A thread class that starts the serverSocket listeners; listening begins in its run method:
public void run() {
if (!shutdown) {
Set<InetSocketAddress> addresses;
// get the addresses to listen on
if (self.getQuorumListenOnAllIPs()) {
addresses = self.getElectionAddress().getWildcardAddresses();
} else {
addresses = self.getElectionAddress().getAllAddresses();
}
// used to block until every handler has finished
CountDownLatch latch = new CountDownLatch(addresses.size());
// create a ListenerHandler for each listen address
listenerHandlers = addresses.stream().map(address ->
new ListenerHandler(address,self.shouldUsePortUnification(),
self.isSslQuorum(), latch))
.collect(Collectors.toList());
final ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
try {
// start the ListenerHandlers
listenerHandlers.forEach(executor::submit);
} finally {
executor.shutdown();
}
try {
// block; each ListenerHandler counts down when it finishes
latch.await();
} catch (InterruptedException ie) {
} finally {
// Clean up for shutdown (omitted)
}
}
// omitted
}
The ListenerHandler run method:
public void run() {
try {
// accept connections
acceptConnections();
try {
close();
} catch (IOException e) {}
} catch (Exception e) {
} finally {
latch.countDown();
}
}
private void acceptConnections() {
int numRetries = 0;
Socket client = null;
while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
try {
// create the ServerSocket and bind the port
serverSocket = createNewServerSocket();
while (!shutdown) {
try {
// accept a client Socket
client = serverSocket.accept();
setSockOpts(client); // socket options such as timeout
// handle the new connection with receiveConnection
if (quorumSaslAuthEnabled) {
receiveConnectionAsync(client);
} else {
receiveConnection(client);
}
numRetries = 0;
} catch (SocketTimeoutException e) {}
}
} catch (IOException e) {
// omitted
}
}
// omitted
}
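Listener.run and ListenerHandler.run coordinate through a CountDownLatch sized to the number of listen addresses: each handler counts down in a finally block, and Listener blocks in latch.await() until all of them have exited. A minimal sketch of that pattern, with a hypothetical class name and with the bind/accept loop replaced by a counter:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class LatchListenerSketch {
    // Runs one task per address and returns how many finished before the latch opened.
    static int runAll(List<String> addresses) {
        if (addresses.isEmpty()) {
            return 0; // newFixedThreadPool(0) would throw IllegalArgumentException
        }
        CountDownLatch latch = new CountDownLatch(addresses.size());
        AtomicInteger finished = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
        try {
            for (String address : addresses) {
                executor.submit(() -> {
                    try {
                        // a real handler would bind `address` and loop on accept() here
                        finished.incrementAndGet();
                    } finally {
                        latch.countDown(); // always signal, even if the handler fails
                    }
                });
            }
        } finally {
            executor.shutdown(); // rejects new tasks; submitted ones still run
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }
}
```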
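QuorumConnectionReqThread class
Connects to other peers asynchronously; its run method calls initiateConnection to establish the connection.
QuorumConnectionReceiverThread class
Accepts connections asynchronously; its run method calls receiveConnection to handle the newly established connection.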
SendWorker class
Thread to send messages. Instance waits on a queue, and send a message as soon as there is one available. If connection breaks, then opens a new one.
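The thread that sends messages:
- holds the sid, the socket and the connection's output stream
- takes messages from the send queue and writes them to the output stream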
RecvWorker class
Thread to receive messages. Instance waits on a socket read. If the channel breaks, then removes itself from the pool of receivers.
The thread that reads messages:
public void run() {
threadCnt.incrementAndGet();
try {
while (running && !shutdown && sock != null) {
// read the message length
int length = din.readInt();
if (length <= 0 || length > PACKETMAXSIZE) {
throw new IOException("Received packet with invalid packet: " + length);
}
// read the payload
final byte[] msgArray = new byte[length];
din.readFully(msgArray, 0, length);
// put it on the receive queue
addToRecvQueue(new Message(ByteBuffer.wrap(msgArray), sid));
}
} catch (Exception e) {
} finally {
sw.finish();
closeSocket(sock);
}
}
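RecvWorker expects each message as a 4-byte length followed by that many bytes, and rejects lengths that are non-positive or larger than PACKETMAXSIZE. A self-contained sketch of both sides of that framing; the class name and the MAX_PACKET value are assumptions for illustration, and the writer side is assumed to produce the same format RecvWorker reads.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

final class FrameSketch {
    static final int MAX_PACKET = 512 * 1024; // assumed limit standing in for PACKETMAXSIZE

    // Writes [int length][payload].
    static byte[] frame(byte[] payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(payload.length);
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads one frame, validating the length the same way RecvWorker.run does.
    static ByteBuffer readFrame(DataInputStream din) throws IOException {
        int length = din.readInt();
        if (length <= 0 || length > MAX_PACKET) {
            throw new IOException("Received packet with invalid packet: " + length);
        }
        byte[] msg = new byte[length];
        din.readFully(msg, 0, length); // blocks until the whole payload has arrived
        return ByteBuffer.wrap(msg);
    }
}
```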
FastLeaderElection class
From its Javadoc:
Implementation of leader election using TCP. It uses an object of the class QuorumCnxManager to manage connections. Otherwise, the algorithm is push-based as with the other UDP implementations. There are a few parameters that can be tuned to change its behavior. First, finalizeWait determines the amount of time to wait until deciding upon a leader. This is part of the leader election algorithm.
- implements leader election over TCP, push-based
- uses a QuorumCnxManager object to manage connections
Constructor
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
this.stop = false;
this.manager = manager;
starter(self, manager);
}
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1;
sendqueue = new LinkedBlockingQueue<>();
recvqueue = new LinkedBlockingQueue<>();
// starts WorkerSender and WorkerReceiver
this.messenger = new Messenger(manager);
}
Main fields
// how long (ms) to keep polling for changed votes before the leader is finalized
static final int finalizeWait = 200;
// ballot box: holds and tallies the votes of one election round
private SyncedLearnerTracker leadingVoteSet;
// send queue
LinkedBlockingQueue<ToSend> sendqueue;
// receive queue
LinkedBlockingQueue<Notification> recvqueue;
// starts WorkerSender and WorkerReceiver
Messenger messenger;
// proposed leader id
long proposedLeader;
// proposed zxid
long proposedZxid;
// proposed epoch
long proposedEpoch;
The start method starts the election
public void start() {
this.messenger.start(); // starts the WorkerSender and WorkerReceiver threads
}
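Messenger class
WorkerSender thread
- takes ToSend messages from sendqueue
- sends them via QuorumCnxManager's toSend method
WorkerReceiver thread
- takes received messages via QuorumCnxManager's pollRecvQueue
- wraps each one in a Notification object and pushes it onto recvqueue
Main methods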
// build an outgoing message
static ByteBuffer buildMsg(
int state, long leader, long zxid, long electionEpoch, long epoch, byte[] configData);
// send a Notification (vote) to every node
private void sendNotifications();
// compare serverId, zxid and currentEpoch to decide which server to vote for
protected boolean totalOrderPredicate(
long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch);
// given a set of Votes, return a SyncedLearnerTracker used to decide whether there are enough votes to end the election
protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote);
// if a leader has been elected with enough votes, check that the leader itself has voted and confirmed that it is leading
// this check prevents peers from electing, again and again, a peer that has crashed and is no longer leading
protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch);
// update proposedLeader, proposedZxid and proposedEpoch,
// either to settle on a leader or to prepare for the next round of voting
synchronized void updateProposal(long leader, long zxid, long epoch);
// create a Vote from the current proposedLeader, proposedZxid and proposedEpoch
public synchronized Vote getVote();
// get lastLoggedZxid from zkDb
private long getInitLastLoggedZxid();
// get currentEpoch
private long getPeerEpoch();
// update the peer state according to proposedLeader
// if this peer becomes the leader, voteSet is used to update leadingVoteSet
private void setPeerState(long proposedLeader, SyncedLearnerTracker voteSet);
// start a round of leader election
// called whenever the state changes to LOOKING; sends vote notifications to the other peers
public Vote lookForLeader() throws InterruptedException;
// handle a notification from a peer in the FOLLOWING state
private Vote receivedFollowingNotification(
Map<Long, Vote> recvset, Map<Long, Vote> outofelection,
SyncedLearnerTracker voteSet, Notification n);
// handle a notification from a peer in the LEADING state
private Vote receivedLeadingNotification(
Map<Long, Vote> recvset, Map<Long, Vote> outofelection,
SyncedLearnerTracker voteSet, Notification n);
buildMsg method
static ByteBuffer buildMsg(int state, long leader, long zxid,
long electionEpoch, long epoch, byte[] configData) {
byte[] requestBytes = new byte[44 + configData.length];
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
requestBuffer.clear();
requestBuffer.putInt(state); // current state
requestBuffer.putLong(leader); // id of the proposed leader
requestBuffer.putLong(zxid); // zxid
requestBuffer.putLong(electionEpoch); // election epoch
requestBuffer.putLong(epoch); // data (peer) epoch
requestBuffer.putInt(Notification.CURRENTVERSION); // 0x2
requestBuffer.putInt(configData.length); // config data length
requestBuffer.put(configData); // quorumVerifier config data
return requestBuffer;
}
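The fixed header is 4 + 4*8 + 4 + 4 = 44 bytes (an int state, four longs for leader, zxid, electionEpoch and peer epoch, an int version and an int config length), which is where `new byte[44 + configData.length]` comes from. The sketch below writes the same field order and reads the header back in that order; the class name is hypothetical, and unlike the method above it flips the buffer so the result can be read directly.

```java
import java.nio.ByteBuffer;

final class VoteMessageSketch {
    // int state + 4 longs (leader, zxid, electionEpoch, peerEpoch) + int version + int config length
    static final int HEADER_BYTES = 4 + 4 * 8 + 4 + 4; // = 44
    static final int VERSION = 0x2; // the value noted next to Notification.CURRENTVERSION above

    static ByteBuffer build(int state, long leader, long zxid,
                            long electionEpoch, long peerEpoch, byte[] config) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + config.length);
        buf.putInt(state)
           .putLong(leader)
           .putLong(zxid)
           .putLong(electionEpoch)
           .putLong(peerEpoch)
           .putInt(VERSION)
           .putInt(config.length)
           .put(config);
        buf.flip(); // make the written bytes readable from the start
        return buf;
    }

    // Reads the header back in write order:
    // {state, leader, zxid, electionEpoch, peerEpoch, version, configLength}
    static long[] readHeader(ByteBuffer message) {
        ByteBuffer b = message.duplicate(); // leave the caller's position untouched
        return new long[] {
            b.getInt(), b.getLong(), b.getLong(), b.getLong(), b.getLong(), b.getInt(), b.getInt()
        };
    }
}
```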
totalOrderPredicate method
Compares serverId, zxid and currentEpoch to decide which server to vote for:
protected boolean totalOrderPredicate(
long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
if (self.getQuorumVerifier().getWeight(newId) == 0) {
return false;
}
/*
* Return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch)
|| ((newEpoch == curEpoch)
&& ((newZxid > curZxid)
|| ((newZxid == curZxid)
&& (newId > curId)))));
}
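The comparison is lexicographic on (epoch, zxid, server id). Restated as early-return branches it reads more directly than the nested boolean expression; the class name and the boolean weight flag (standing in for the QuorumVerifier weight lookup) are assumptions for illustration.

```java
final class VoteOrderSketch {
    // Same ordering as totalOrderPredicate; newIdHasWeight replaces getWeight(newId) != 0.
    static boolean newVoteWins(boolean newIdHasWeight,
                               long newId, long newZxid, long newEpoch,
                               long curId, long curZxid, long curEpoch) {
        if (!newIdHasWeight) {
            return false; // a server with weight 0 can never win
        }
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch; // 1. higher epoch wins
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;   // 2. same epoch: higher zxid wins
        }
        return newId > curId;           // 3. same epoch and zxid: higher server id wins
    }
}
```

Epoch and zxid come first so that a server with more recent history is preferred; the server id only breaks ties, which is why on a fresh cluster where all zxids are equal the highest myid wins.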
getVoteTracker method
Given a set of Votes, returns a SyncedLearnerTracker used to decide whether there are enough votes to declare the election over:
protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
// compare each vote received from other servers with the local vote; matching sids go into the ack set
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())) {
voteSet.addAck(entry.getKey()); // the key is the sid
}
}
return voteSet;
}
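getVoteTracker collects the sids whose vote equals the local proposal, and hasAllQuorums() then checks those acks against each QuorumVerifier that was added. For the common plain-majority case this reduces to "strictly more than half of the voting members". A simplified sketch with a single majority check, where votes are modeled as strings for brevity (both simplifications are assumptions, not the real Vote and SyncedLearnerTracker types):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class MajorityTallySketch {
    // sids whose vote matches our proposal (the addAck loop in getVoteTracker)
    static Set<Long> acks(Map<Long, String> votes, String proposal) {
        Set<Long> acks = new HashSet<>();
        for (Map.Entry<Long, String> e : votes.entrySet()) {
            if (proposal.equals(e.getValue())) {
                acks.add(e.getKey());
            }
        }
        return acks;
    }

    // majority check: only voting members count, and strictly more than half is required
    static boolean hasQuorum(Set<Long> acks, Set<Long> votingMembers) {
        long counted = acks.stream().filter(votingMembers::contains).count();
        return counted > votingMembers.size() / 2;
    }
}
```

With four voting members, two matching votes are not enough (2 > 4 / 2 is false), which is why an even-sized ensemble tolerates no more failures than the next smaller odd one.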
checkLeader method
protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch) {
boolean predicate = true;
if (leader != self.getMyId()) {
if (votes.get(leader) == null) { // the leader must have voted, otherwise this round's result is invalid
predicate = false;
} else if (votes.get(leader).getState() != ServerState.LEADING) {
// the leader must be in the LEADING state, otherwise this round's result is invalid
predicate = false;
}
} else if (logicalclock.get() != electionEpoch) { // the election epoch must match
predicate = false;
}
return predicate;
}
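checkLeader only accepts a quorum outcome if the proposed leader has itself been heard from in the LEADING state, or, when this server is the proposed leader, if the election epoch still matches. A standalone restatement with hypothetical names, where votes maps each sid to the state it reported:

```java
import java.util.Map;

final class CheckLeaderSketch {
    enum State { LOOKING, FOLLOWING, LEADING, OBSERVING }

    static boolean checkLeader(Map<Long, State> votes, long leader, long myId,
                               long logicalClock, long electionEpoch) {
        if (leader != myId) {
            // another server is the proposed leader: it must have voted, and as LEADING
            return votes.get(leader) == State.LEADING;
        }
        // this server is the proposed leader: the election epoch must match
        return logicalClock == electionEpoch;
    }
}
```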
lookForLeader method
Starts a round of leader election. It is called whenever the state changes to LOOKING and sends vote notifications to the other peers:
public Vote lookForLeader() throws InterruptedException {
// omitted
try {
// sid -> vote for the current election round
Map<Long, Vote> recvset = new HashMap<>();
// sid -> vote from FOLLOWING/LEADING notifications, from this or earlier elections
Map<Long, Vote> outofelection = new HashMap<>();
int notTimeout = minNotificationInterval;
synchronized (this) {
logicalclock.incrementAndGet(); // increment the election epoch to start a new round
// initialize the proposal: every server starts by voting for itself
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
// notify all nodes
sendNotifications();
// ballot box
SyncedLearnerTracker voteSet = null;
// normally the loop exits only once a leader has been elected
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
if (n == null) {
// nothing received within notTimeout: resend the votes, or reconnect if messages are still undelivered
if (manager.haveDelivered()) {
sendNotifications();
} else {
manager.connectAll();
}
notTimeout = Math.min(notTimeout << 1, maxNotificationInterval);
// omitted
} else if (validVoter(n.sid) && validVoter(n.leader)) {
switch (n.state) {
case LOOKING:
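// omitted
// the peer's election epoch is newer than ours
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch); // adopt the newer election epoch
recvset.clear(); // discard the votes collected so far
// compare votes: if the peer's vote wins, adopt it; otherwise fall back to our own initial vote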
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
// broadcast the updated vote
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
// the peer's election epoch is older: ignore the notification
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
// record the vote
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// build the ballot box
voteSet = getVoteTracker(
recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
// acks > half means a leader has been elected
if (voteSet.hasAllQuorums()) {
// keep polling for finalizeWait ms in case a better vote arrives
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
// no better vote arrived (n == null): set the peer state
if (n == null) {
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(
proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
break;
case FOLLOWING:
// received a FOLLOWING notification
Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
if (resultFN == null) {
break;
} else {
return resultFN;
}
case LEADING:
// received a LEADING notification
Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
if (resultLN == null) {
break;
} else {
return resultLN;
}
default:
break;
}
} else {
// omitted
}
}
return null;
} finally {
// omitted
}
}
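To see why the LOOKING branch converges, here is a heavily simplified, hypothetical simulation of it: a single round with equal peer epochs, a global FIFO queue instead of QuorumCnxManager, no timeouts and no finalizeWait. Each peer starts by voting for itself, adopts and re-broadcasts any vote that wins under the totalOrderPredicate rules, and stops once more than half of the votes it has recorded match its own proposal. It illustrates the idea only and is not the real lookForLeader.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class ElectionSimSketch {
    // A vote names a leader and that leader's zxid; all peers share one peerEpoch here.
    static final class Vote {
        final long leader;
        final long zxid;

        Vote(long leader, long zxid) {
            this.leader = leader;
            this.zxid = zxid;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Vote)) {
                return false;
            }
            Vote v = (Vote) o;
            return v.leader == leader && v.zxid == zxid;
        }

        @Override
        public int hashCode() {
            return Objects.hash(leader, zxid);
        }
    }

    // totalOrderPredicate with equal epochs: higher zxid wins, then higher server id
    static boolean wins(Vote n, Vote cur) {
        return n.zxid > cur.zxid || (n.zxid == cur.zxid && n.leader > cur.leader);
    }

    // Input: sid -> lastLoggedZxid. Output: sid -> leader that peer settled on.
    static Map<Long, Long> run(Map<Long, Long> zxids) {
        List<Long> sids = new ArrayList<>(zxids.keySet());
        Map<Long, Vote> proposal = new HashMap<>();
        Map<Long, Map<Long, Vote>> recvset = new HashMap<>();
        Map<Long, Long> decided = new HashMap<>();
        Deque<long[]> wire = new ArrayDeque<>(); // entries: {from, to, leader, zxid}

        for (long sid : sids) {
            proposal.put(sid, new Vote(sid, zxids.get(sid))); // start by voting for ourselves
            recvset.put(sid, new HashMap<>());
        }
        for (long sid : sids) {
            broadcast(sid, proposal.get(sid), sids, wire);
        }
        while (!wire.isEmpty()) {
            long[] m = wire.poll();
            long from = m[0];
            long to = m[1];
            if (decided.containsKey(to)) {
                continue; // this peer has already left LOOKING
            }
            Vote n = new Vote(m[2], m[3]);
            if (wins(n, proposal.get(to))) {
                proposal.put(to, n); // adopt the better vote and tell everyone
                broadcast(to, n, sids, wire);
            }
            recvset.get(to).put(from, n);
            Vote mine = proposal.get(to);
            long acks = recvset.get(to).values().stream().filter(mine::equals).count();
            if (acks > sids.size() / 2) {
                decided.put(to, mine.leader);
            }
        }
        return decided;
    }

    private static void broadcast(long from, Vote v, List<Long> sids, Deque<long[]> wire) {
        for (long to : sids) {
            wire.add(new long[] {from, to, v.leader, v.zxid});
        }
    }
}
```

With zxids {1: 10, 2: 12, 3: 11} every peer settles on server 2 (highest zxid); with equal zxids the highest id wins. In this FIFO model every initial self-vote is delivered before any re-broadcast, so no peer can reach a quorum on a non-maximal vote; the real network gives no such ordering guarantee, which is one reason for the finalizeWait re-check after a quorum is found.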
receivedFollowingNotification method
Handles a notification from a peer in the FOLLOWING state.
private Vote receivedFollowingNotification(
Map<Long, Vote> recvset, Map<Long, Vote> outofelection, SyncedLearnerTracker voteSet, Notification n) {
// this peer also votes for the current leader,
// then runs the quorum check and checkLeader
if (n.electionEpoch == logicalclock.get()) {
// record the vote and build the ballot box
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(
recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
// acks > half, plus checkLeader
if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
// update the node state
setPeerState(n.leader, voteSet);
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
// the logic below is reached when this node joins late and the cluster already has a leader;
// it mirrors the code above, but uses outofelection
outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(
outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
synchronized (this) {
logicalclock.set(n.electionEpoch);
setPeerState(n.leader, voteSet);
}
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
return null;
}
receivedLeadingNotification method
Handles a notification from a peer in the LEADING state.
private Vote receivedLeadingNotification(Map<Long, Vote> recvset, Map<Long, Vote> outofelection,
SyncedLearnerTracker voteSet, Notification n) {
Vote result = receivedFollowingNotification(recvset, outofelection, voteSet, n);
if (result == null) {
if (self.getQuorumVerifier().getNeedOracle() && !self.getQuorumVerifier().askOracle()) {
// omitted
} else {
return null;
}
} else {
return result;
}
}
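QuorumPeer class
Manages the quorum protocol. A server can be in one of three states:
- Leader election - each server elects a leader, initially proposing itself
- Follower - synchronizes with the leader and replicates all transactions
- Leader - processes requests and forwards them to the followers; a majority of followers must log a request before it can be committed
run method: main loop
The main loop in run checks the current peer state and runs the election, lead, follow or observe logic accordingly: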
public void run() {
// omitted
try {
// Main loop
while (running) {
switch (getPeerState()) {
case LOOKING:
ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
if (Boolean.getBoolean("readonlymode.enabled")) {
// omitted
} else {
try {
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
setPeerState(ServerState.LOOKING);
}
}
break;
case OBSERVING:
try {
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
} finally {
observer.shutdown();
setObserver(null);
updateServerState();
// Add delay jitter before we switch to LOOKING
// state to reduce the load of ObserverMaster
if (isRunning()) {
Observer.waitForObserverElectionDelay();
}
}
break;
case FOLLOWING:
try {
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
} finally {
follower.shutdown();
setFollower(null);
updateServerState();
}
break;
case LEADING:
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
updateServerState();
}
break;
}
}
} finally {
// omitted
}
}
LOOKING branch
try {
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
// run the election with FastLeaderElection
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
setPeerState(ServerState.LOOKING); // reset to the LOOKING state
}
FOLLOWING branch
try {
setFollower(makeFollower(logFactory));
follower.followLeader(); // run as a follower
} catch (Exception e) {
} finally {
follower.shutdown();
setFollower(null);
updateServerState(); // update the server state
}
Creating the Follower object:
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
}
LEADING branch
try {
setLeader(makeLeader(logFactory));
leader.lead(); // run as the leader
setLeader(null);
} catch (Exception e) {
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
updateServerState(); // update the server state
}
Creating the Leader object:
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
}
OBSERVING branch
try {
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
} finally {
observer.shutdown();
setObserver(null);
updateServerState();
// Add delay jitter before we switch to LOOKING
// state to reduce the load of ObserverMaster
if (isRunning()) {
Observer.waitForObserverElectionDelay();
}
}
Creating the Observer object:
protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
}