

[Zookeeper-3.6.2 Source Code Analysis Series]-14-How the Reactor Network Model Used by Zookeeper Works

Published: 2023/12/15 · Programming Q&A · 29 · 豆豆

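Contents

  • 14 Starting the Server-Side Network Listener: NIOServerCnxnFactory
    • 14.1 Introduction
    • 14.2 The main-sub Reactor network I/O model
    • 14.3 NIOServerCnxnFactory's configure (initialization) method
    • 14.4 NIOServerCnxnFactory's start method
    • 14.5 AcceptThread
      • 14.5.1 AcceptThread source
      • pauseAccept: pausing accepts
    • 14.6 SelectorThread
      • 14.6.1 SelectorThread source
      • 14.6.2 Handling accepted connections: processAcceptedConnections
        • 14.6.2.1 The NIOServerCnxn constructor
      • 14.6.3 Updating the listened events of connections in updateQueue: processInterestOpsUpdateRequests
      • 14.6.4 Handling I/O events
        • 14.6.4.1 IOWorkRequest
        • 14.6.4.2 NIOServerCnxn.doIO
        • 14.6.4.3 Handling the connect request: processConnectRequest
        • 14.6.4.4 Tracking session expiry
        • 14.6.4.5 Submitting the session
        • 14.6.4.6 Submitting requests for processing
    • 14.7 ConnectionExpirerThread
    • 14.8 NettyServerCnxnFactory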

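14 Starting the Server-Side Network Listener: NIOServerCnxnFactory

14.1 Introduction

Back in QuorumPeer's start() method: once data recovery has finished, it calls startServerCnxnFactory() to start network communication.

Here is that method: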

private void startServerCnxnFactory() {
    if (cnxnFactory != null) {
        cnxnFactory.start();
    }
    if (secureCnxnFactory != null) {
        secureCnxnFactory.start();
    }
}

The connection factory is created in QuorumPeerMain.runFromConfig() by calling ServerCnxnFactory.createFactory(). createFactory() checks whether the JVM system property zookeeper.serverCnxnFactory is set. If it is not set, it defaults to NIOServerCnxnFactory.

The call in QuorumPeerMain.runFromConfig() looks like this:

if (config.getClientPortAddress() != null) {
    cnxnFactory = ServerCnxnFactory.createFactory();
    cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
}
if (config.getSecureClientPortAddress() != null) {
    secureCnxnFactory = ServerCnxnFactory.createFactory();
    secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
}

ServerCnxnFactory.createFactory() instantiates whichever class the property names:

public static ServerCnxnFactory createFactory() throws IOException {
    String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
    if (serverCnxnFactoryName == null) {
        serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
    }
    try {
        ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
            .getDeclaredConstructor()
            .newInstance();
        LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
        return serverCnxnFactory;
    } catch (Exception e) {
        IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName, e);
        throw ioe;
    }
}

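The connection factory is chosen through this system property. By default ZooKeeper uses NIOServerCnxnFactory, which is built on the JDK's own NIO. If zookeeper.serverCnxnFactory is set, for example to NettyServerCnxnFactory (built on Netty), createFactory() loads that class instead. Either way, the factory serves client requests on the clientPort configured for the server.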
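Below is a minimal, self-contained sketch of the same selection pattern: a system property names the implementation class, and a default is used when the property is unset. CnxnFactory, NioCnxnFactory, NettyLikeCnxnFactory and FactoryLoader are hypothetical stand-ins, not ZooKeeper classes. Only the property name zookeeper.serverCnxnFactory comes from the text above.

```java
/** Hypothetical stand-in for ServerCnxnFactory. */
interface CnxnFactory {
    String name();
}

/** Hypothetical default implementation (plays the role of NIOServerCnxnFactory). */
final class NioCnxnFactory implements CnxnFactory {
    public String name() { return "nio"; }
}

/** Hypothetical alternative implementation (plays the role of NettyServerCnxnFactory). */
final class NettyLikeCnxnFactory implements CnxnFactory {
    public String name() { return "netty-like"; }
}

final class FactoryLoader {
    // Property name taken from the article; the classes above are illustrative only.
    static final String PROPERTY = "zookeeper.serverCnxnFactory";

    static CnxnFactory createFactory() {
        String className = System.getProperty(PROPERTY);
        if (className == null) {
            className = NioCnxnFactory.class.getName(); // default when the property is unset
        }
        try {
            // Load the named class reflectively and call its no-argument constructor
            return (CnxnFactory) Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("Couldn't instantiate " + className, e);
        }
    }
}
```

The class name is resolved with Class.forName, so any implementation on the classpath that has a no-argument constructor can be plugged in without changing the calling code. This is what lets ZooKeeper switch between its NIO and Netty factories.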

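Here is how another write-up describes this communication layer:
As a server, ZooKeeper has to communicate with its clients over the network, and it has to do so efficiently. ZooKeeper manages client connections with ServerCnxnFactory, which has two implementations:

  • NIOServerCnxnFactory, implemented with native Java NIO;
  • NettyServerCnxnFactory, implemented with Netty.

Each connection between a client and the server is represented by a ServerCnxn.

ServerCnxnFactory
Note: "connection" in the text and code comments below means the TCP connection initiated by a client, that is, a SocketChannel.
The ServerCnxnFactory implementation can be chosen with the system property zookeeper.serverCnxnFactory. The default is NIOServerCnxnFactory.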
NIOServerCnxnFactory

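14.2 The main-sub Reactor network I/O model

The usual way to use Java NIO is to have one group of threads listen for OP_ACCEPT and handle incoming client connections, and another group listen for OP_READ and OP_WRITE on those connections and handle the I/O (Netty works this way).
ZooKeeper divides the work differently. When NIOServerCnxnFactory starts, it starts four kinds of threads:

  • accept thread: accepts connections from clients and hands each one to a selector thread (one such thread is started).
  • selector thread: runs select(). Because select() becomes a performance bottleneck when there are many connections, several selector threads are started. Their number is set with the system property zookeeper.nio.numSelectorThreads. The default is $\max\left(1, \left\lfloor \sqrt{N_{\text{cores}}/2} \right\rfloor\right)$, that is, the square root of half the number of cores, but at least one.
  • worker thread: performs the basic socket reads and writes. Their number is set with the system property zookeeper.nio.numWorkerThreads, and the default is $2 \times N_{\text{cores}}$. If it is set to 0, no worker pool is used and the selector thread performs the I/O itself.
  • connection expiration thread: closes a connection when the session on it has expired.

As this shows, ZooKeeper splits thread responsibilities more finely. It assumes that selector.select() becomes the bottleneck when there are many client connections, so it moves selector.select() into dedicated selector threads.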
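The default sizing rules above can be checked with a few lines of Java. This sketch copies the formulas from NIOServerCnxnFactory.configure() and overrides them with a system property the same way configure() does, through Integer.getInteger. The class and method names are made up for illustration.

```java
/** Hypothetical helper reproducing the default NIO thread counts. */
final class NioThreadDefaults {
    static final String SELECTOR_PROP = "zookeeper.nio.numSelectorThreads";
    static final String WORKER_PROP = "zookeeper.nio.numWorkerThreads";

    /** max(1, (int) sqrt(cores / 2)); for example 32 cores give 4 selector threads. */
    static int defaultSelectorThreads(int numCores) {
        return Math.max((int) Math.sqrt((float) numCores / 2), 1);
    }

    /** 2 * cores worker threads; a configured value of 0 means "no worker pool". */
    static int defaultWorkerThreads(int numCores) {
        return 2 * numCores;
    }

    /** A system property, if set, overrides the computed default. */
    static int selectorThreads(int numCores) {
        return Integer.getInteger(SELECTOR_PROP, defaultSelectorThreads(numCores));
    }

    static int workerThreads(int numCores) {
        return Integer.getInteger(WORKER_PROP, defaultWorkerThreads(numCores));
    }
}
```

For example, NioThreadDefaults.selectorThreads(Runtime.getRuntime().availableProcessors()) gives the count a server on the current machine would use when no property is set. The square root grows slowly, which matches the comment in configure() that about 4 selector threads is the sweet spot for 32 cores.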
Inter-thread communication

These threads communicate through thread-safe queues. This subsection covers which queues are used and what each one is for.

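  • SelectorThread.acceptedQueue
    acceptedQueue is a LinkedBlockingQueue owned by each selector thread. It holds the client connections accepted by the accept thread. The selector thread registers these connections with its selector for OP_READ.
  • SelectorThread.updateQueue
    Like acceptedQueue, updateQueue is a LinkedBlockingQueue owned by the selector thread. Explaining what it is for requires a good understanding of how Java NIO works.
    On Linux, Java NIO's default Selector is built on the epoll system call in level-triggered mode. If selector.select() finds an event on a socketChannel, such as readable data, the next selector.select() reports the same event again for as long as that data has not been fully read.
    ZooKeeper treats selector.select() as the bottleneck. To keep select() cheap and avoid this level-triggered repetition, ZooKeeper makes a socketChannel stop listening for OP_READ and OP_WRITE while its I/O is being processed. This reduces the load on selector.select().
    That raises a question: once the I/O is done, how does the socketChannel start listening for OP_READ and OP_WRITE again?
    It might seem easy: the worker thread could simply call key.interestOps(OP_READ | OP_WRITE) when it finishes the I/O. It is not that simple. selector.select() runs on the selector thread, and if a worker thread calls key.interestOps(...) while select() is in progress, that call may block until the select finishes. The source comments describe interest-op reads and writes as "potentially blocking operations if other select operations are happening". To get the best performance, ZooKeeper has the selector thread itself call key.interestOps(...). After finishing the I/O, a worker thread therefore tells the selector thread that the socketChannel can listen for OP_READ and OP_WRITE again.
    updateQueue holds the keys of exactly those socketChannels that need to resume listening for events.
  • NIOServerCnxn.outgoingBuffers
    outgoingBuffers holds the response data waiting to be sent to the client.
    Note: this is my own guess. If key.interestOps(...) can block while selector.select() is running, then accepted.register(selector, SelectionKey.OP_READ) can block as well. Registering accepted client connections with the selector must therefore also happen on the selector thread, which would explain why acceptedQueue exists.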
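The updateQueue handoff can be reproduced with plain java.nio. In this hedged sketch, a worker thread never calls interestOps() itself. It enqueues the key and calls Selector.wakeup(), and the selector thread restores the interest ops after select() returns. The class and method names are hypothetical, and a Pipe stands in for a client socket. The desired ops are stored as the key's attachment, which is a simplification: ZooKeeper asks NIOServerCnxn.getInterestOps() instead.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of the updateQueue handoff between worker and selector threads. */
final class InterestOpsHandoff {
    private final Selector selector;
    private final Queue<SelectionKey> updateQueue = new LinkedBlockingQueue<>();

    InterestOpsHandoff(Selector selector) { this.selector = selector; }

    /** Worker thread: enqueue instead of calling key.interestOps() directly. */
    boolean addInterestOpsUpdateRequest(SelectionKey key) {
        if (!updateQueue.offer(key)) {
            return false;
        }
        selector.wakeup(); // make select() return so the queue gets drained
        return true;
    }

    /** Selector thread only: one select() pass, then restore the queued interest ops. */
    int selectAndDrain() throws IOException {
        selector.select();               // returns on readiness or on wakeup()
        selector.selectedKeys().clear(); // I/O dispatch is omitted in this sketch
        int restored = 0;
        SelectionKey key;
        while ((key = updateQueue.poll()) != null) {
            if (key.isValid()) {
                key.interestOps((Integer) key.attachment()); // desired ops kept in the attachment
                restored++;
            }
        }
        return restored;
    }

    /** Demo: park a key with interestOps(0), then resume it from another thread. */
    static int roundTrip() {
        try (Selector sel = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source(); Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                SelectionKey key = source.register(sel, SelectionKey.OP_READ, SelectionKey.OP_READ);
                key.interestOps(0); // "stop selecting while a worker handles this key"
                InterestOpsHandoff handoff = new InterestOpsHandoff(sel);
                Thread worker = new Thread(() -> handoff.addInterestOpsUpdateRequest(key));
                worker.start();
                handoff.selectAndDrain();
                worker.join();
                return key.interestOps();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Selector.wakeup() makes the pattern safe. If the worker calls it before the selector thread reaches select(), the next select() returns immediately, so a queued key is never left waiting.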

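With the threading model in mind, let's look at the startup code.

NIOServerCnxnFactory's configure method is called during startup, from QuorumPeerMain.runFromConfig() while the configuration is being loaded:

14.3 NIOServerCnxnFactory's configure (initialization) method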

@Override
public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
    if (secure) {
        throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
    }
    configureSaslLogin();

    maxClientCnxns = maxcc;
    initMaxCnxns();
    sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
    // We also use the sessionlessCnxnTimeout as expiring interval for
    // cnxnExpiryQueue. These don't need to be the same, but the expiring
    // interval passed into the ExpiryQueue() constructor below should be
    // less than or equal to the timeout.
    cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
    expirerThread = new ConnectionExpirerThread();

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores / 2), 1));
    if (numSelectorThreads < 1) {
        throw new IOException("numSelectorThreads must be at least 1");
    }

    numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

    String logMsg = "Configuring NIO connection handler with "
        + (sessionlessCnxnTimeout / 1000) + "s sessionless connection timeout, "
        + numSelectorThreads + " selector thread(s), "
        + (numWorkerThreads > 0 ? numWorkerThreads : "no") + " worker threads, and "
        + (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes / 1024) + " kB direct buffers."));
    LOG.info(logMsg);
    for (int i = 0; i < numSelectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(i));
    }

    listenBacklog = backlog;
    // Open the server socket channel (this obtains its file descriptor)
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port {}", addr);
    if (listenBacklog == -1) {
        ss.socket().bind(addr);
    } else {
        ss.socket().bind(addr, listenBacklog);
    }
    ss.configureBlocking(false);
    acceptThread = new AcceptThread(ss, addr, selectorThreads);
}

14.4 NIOServerCnxnFactory's start method

@Override
public void start() {
    stopped = false;
    // Start the worker thread pool
    if (workerPool == null) {
        workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
    }
    // Start the selector threads
    for (SelectorThread thread : selectorThreads) {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
    // Start the accept thread
    // ensure thread is started once and only once
    if (acceptThread.getState() == Thread.State.NEW) {
        acceptThread.start();
    }
    // Start the connection expiration thread
    if (expirerThread.getState() == Thread.State.NEW) {
        expirerThread.start();
    }
}

14.5 AcceptThread

14.5.1 AcceptThread source

Here is the whole accept thread class first:

private class AcceptThread extends AbstractSelectThread {

    private final ServerSocketChannel acceptSocket;
    private final SelectionKey acceptKey;
    private final RateLogger acceptErrorLogger = new RateLogger(LOG);
    private final Collection<SelectorThread> selectorThreads;
    private Iterator<SelectorThread> selectorIterator;
    private volatile boolean reconfiguring = false;

    public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException {
        super("NIOServerCxnFactory.AcceptThread:" + addr);
        this.acceptSocket = ss;
        // Register interest in OP_ACCEPT on the server channel
        this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
        this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads));
        selectorIterator = this.selectorThreads.iterator();
    }

    public void run() {
        try {
            while (!stopped && !acceptSocket.socket().isClosed()) {
                try {
                    // Keep calling select() until stopped or the socket is closed
                    select();
                } catch (RuntimeException e) {
                    LOG.warn("Ignoring unexpected runtime exception", e);
                } catch (Exception e) {
                    LOG.warn("Ignoring unexpected exception", e);
                }
            }
        } finally {
            closeSelector();
            // This will wake up the selector threads, and tell the
            // worker thread pool to begin shutdown.
            if (!reconfiguring) {
                NIOServerCnxnFactory.this.stop();
            }
            LOG.info("accept thread exitted run method");
        }
    }

    private void select() {
        try {
            // Block until at least one registered channel is ready
            selector.select();

            // Once select() returns, the ready keys are available from selectedKeys()
            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (!stopped && selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }
                // Is this key's channel ready to accept a new socket connection?
                if (key.isAcceptable()) {
                    if (!doAccept()) {
                        // If unable to pull a new connection off the accept
                        // queue, pause accepting to give us time to free
                        // up file descriptors and so the accept thread
                        // doesn't spin in a tight loop.
                        pauseAccept(10);
                    }
                } else {
                    LOG.warn("Unexpected ops in accept select {}", key.readyOps());
                }
            }
        } catch (IOException e) {
            LOG.warn("Ignoring IOException while selecting", e);
        }
    }

    private boolean doAccept() {
        boolean accepted = false;
        SocketChannel sc = null;
        try {
            // accept() returns a SocketChannel for the new connection. The server channel
            // is non-blocking, so accept() does not wait; it is only called after select()
            // reported the key as acceptable.
            sc = acceptSocket.accept();
            accepted = true;
            // Reject the connection if the total number of connections exceeds the configured maximum
            if (limitTotalNumberOfCnxns()) {
                throw new IOException("Too many connections max allowed is " + maxCnxns);
            }
            // Address of the remote client
            InetAddress ia = sc.socket().getInetAddress();
            int cnxncount = getClientCnxnCount(ia);
            // Too many connections from this single client IP
            if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
                throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
            }

            LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());

            // Put the SocketChannel in non-blocking mode so read() and write() never block
            sc.configureBlocking(false);

            // Round-robin assign this connection to a selector thread
            if (!selectorIterator.hasNext()) {
                selectorIterator = selectorThreads.iterator();
            }
            // Pick the next selector thread
            SelectorThread selectorThread = selectorIterator.next();
            // Place the new connection on that selector thread's queue of accepted connections
            if (!selectorThread.addAcceptedConnection(sc)) {
                throw new IOException("Unable to add connection to selector queue"
                    + (stopped ? " (shutdown in progress)" : ""));
            }
            acceptErrorLogger.flush();
        } catch (IOException e) {
            // accept, maxClientCnxns, configureBlocking
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
            acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
            fastCloseSock(sc);
        }
        return accepted;
    }
}
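The admission checks and round-robin assignment in doAccept() can be modeled without sockets. This sketch works under stated assumptions. AcceptPolicy is hypothetical, and connections are identified only by an IP string. The per-IP count is also incremented on assignment here, whereas ZooKeeper counts connections later, when the SelectorThread calls addCnxn().

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical model of AcceptThread's per-IP limit and round-robin selector choice. */
final class AcceptPolicy<T> {
    private final Collection<T> selectorThreads;
    private Iterator<T> selectorIterator;
    private final int maxClientCnxns;                  // per-IP limit; 0 means unlimited
    private final Map<String, Integer> perIp = new HashMap<>();

    AcceptPolicy(List<T> selectorThreads, int maxClientCnxns) {
        this.selectorThreads = Collections.unmodifiableList(new ArrayList<>(selectorThreads));
        this.selectorIterator = this.selectorThreads.iterator();
        this.maxClientCnxns = maxClientCnxns;
    }

    /** Returns the selector thread for a new connection, or null if the IP is over its limit. */
    T assign(String ip) {
        int count = perIp.getOrDefault(ip, 0);
        if (maxClientCnxns > 0 && count >= maxClientCnxns) {
            return null; // doAccept() throws "Too many connections from ..." at this point
        }
        if (!selectorIterator.hasNext()) {
            selectorIterator = selectorThreads.iterator(); // wrap around, as doAccept() does
        }
        perIp.put(ip, count + 1);
        return selectorIterator.next();
    }
}
```

With three selector threads and a limit of two connections per IP, successive connections go to s0, s1 and s2 in turn. A third connection from the same IP is refused and does not advance the round-robin position.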

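pauseAccept: pausing accepts

// If a new connection cannot be pulled off the accept queue, pause accepting to give us time
// to free up file descriptors, so the accept thread doesn't spin in a tight loop.
private void pauseAccept(long millisecs) {
    acceptKey.interestOps(0);
    try {
        selector.select(millisecs);
    } catch (IOException e) {
        // ignore
    } finally {
        acceptKey.interestOps(SelectionKey.OP_ACCEPT);
    }
}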

14.6 SelectorThread

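14.6.1 SelectorThread source

SelectorThread
SelectorThread receives newly accepted connections from the AcceptThread and is responsible for selecting for I/O readiness across those connections. It is the only thread that makes non-thread-safe or potentially blocking calls on its selector (registering new connections and reading or writing interest ops). A connection's assignment to a SelectorThread is permanent, and only that one SelectorThread ever interacts with the connection. There are 1 to N SelectorThreads, and connections are spread evenly among them.
If there is a worker thread pool, then when a connection has I/O to perform, the SelectorThread removes it from selection by clearing its interest ops and schedules the I/O on a worker thread. When the work is done, the connection is put on the ready queue so that its interest ops are restored and selection resumes. If there is no worker thread pool, the SelectorThread performs the I/O itself.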

class SelectorThread extends AbstractSelectThread {

    private final int id;
    private final Queue<SocketChannel> acceptedQueue;
    private final Queue<SelectionKey> updateQueue;

    public SelectorThread(int id) throws IOException {
        super("NIOServerCxnFactory.SelectorThread-" + id);
        this.id = id;
        acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
        updateQueue = new LinkedBlockingQueue<SelectionKey>();
    }

    /**
     * Place new accepted connection onto a queue for adding. Do this
     * so only the selector thread modifies what keys are registered
     * with the selector.
     */
    public boolean addAcceptedConnection(SocketChannel accepted) {
        if (stopped || !acceptedQueue.offer(accepted)) {
            return false;
        }
        // Wake up the selector (wakeupSelector() is inherited from AbstractSelectThread)
        wakeupSelector();
        return true;
    }

    /**
     * Place interest op update requests onto a queue so that only the
     * selector thread modifies interest ops, because interest ops
     * reads/sets are potentially blocking operations if other select
     * operations are happening.
     */
    public boolean addInterestOpsUpdateRequest(SelectionKey sk) {
        if (stopped || !updateQueue.offer(sk)) {
            return false;
        }
        wakeupSelector();
        return true;
    }

    /**
     * The main loop for the thread selects() on the connections and
     * dispatches ready I/O work requests, then registers all pending
     * newly accepted connections and updates any interest ops on the
     * queue.
     */
    public void run() {
        try {
            while (!stopped) {
                try {
                    select();
                    processAcceptedConnections();
                    processInterestOpsUpdateRequests();
                } catch (RuntimeException e) {
                    LOG.warn("Ignoring unexpected runtime exception", e);
                } catch (Exception e) {
                    LOG.warn("Ignoring unexpected exception", e);
                }
            }

            // Close connections still pending on the selector. Any others
            // with in-flight work, let drain out of the work queue.
            for (SelectionKey key : selector.keys()) {
                NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
                if (cnxn.isSelectable()) {
                    cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                }
                cleanupSelectionKey(key);
            }
            SocketChannel accepted;
            while ((accepted = acceptedQueue.poll()) != null) {
                fastCloseSock(accepted);
            }
            updateQueue.clear();
        } finally {
            closeSelector();
            // This will wake up the accept thread and the other selector
            // threads, and tell the worker thread pool to begin shutdown.
            NIOServerCnxnFactory.this.stop();
            LOG.info("selector thread exitted run method");
        }
    }

    private void select() {
        try {
            // Select the keys whose channels are ready for I/O
            selector.select();

            // The selector's selected-key set, copied and shuffled so the processing order varies
            Set<SelectionKey> selected = selector.selectedKeys();
            ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
            Collections.shuffle(selectedList);
            Iterator<SelectionKey> selectedKeys = selectedList.iterator();
            while (!stopped && selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selected.remove(key);

                if (!key.isValid()) {
                    cleanupSelectionKey(key);
                    continue;
                }
                if (key.isReadable() || key.isWritable()) {
                    // Schedule I/O for this connection (run inline if there is no worker pool)
                    handleIO(key);
                } else {
                    LOG.warn("Unexpected ops in select {}", key.readyOps());
                }
            }
        } catch (IOException e) {
            LOG.warn("Ignoring IOException while selecting", e);
        }
    }

    /**
     * Schedule I/O for processing on the connection associated with
     * the given SelectionKey. If a worker thread pool is not being used,
     * I/O is run directly by this thread.
     */
    private void handleIO(SelectionKey key) {
        IOWorkRequest workRequest = new IOWorkRequest(this, key);
        NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();

        // Stop selecting this key while processing on its
        // connection
        cnxn.disableSelectable();
        // Clear the interest set
        key.interestOps(0);
        // Refresh the connection's expiry time
        touchCnxn(cnxn);
        // Hand the read/write work to the worker pool
        workerPool.schedule(workRequest);
    }

    /**
     * Iterate over the queue of accepted connections that have been
     * assigned to this thread but not yet placed on the selector.
     */
    private void processAcceptedConnections() {
        SocketChannel accepted;
        while (!stopped && (accepted = acceptedQueue.poll()) != null) {
            SelectionKey key = null;
            try {
                key = accepted.register(selector, SelectionKey.OP_READ);
                NIOServerCnxn cnxn = createConnection(accepted, key, this);
                key.attach(cnxn);
                addCnxn(cnxn);
            } catch (IOException e) {
                // register, createConnection
                cleanupSelectionKey(key);
                fastCloseSock(accepted);
            }
        }
    }

    /**
     * Iterate over the queue of connections ready to resume selection,
     * and restore their interest ops selection mask.
     */
    private void processInterestOpsUpdateRequests() {
        SelectionKey key;
        while (!stopped && (key = updateQueue.poll()) != null) {
            if (!key.isValid()) {
                cleanupSelectionKey(key);
            }
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            if (cnxn.isSelectable()) {
                key.interestOps(cnxn.getInterestOps());
            }
        }
    }
}

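For SelectorThread we look at three operations. They run inside a while loop that ends when stopped is set to true.
In each pass of the loop, the thread selects on its connections and dispatches ready I/O work requests. It then registers all pending newly accepted connections and applies any queued interest-op updates:

  • select();
    Selects on the connections and dispatches ready I/O work requests.
  • processAcceptedConnections();
    Handles connections newly handed over by the accept thread: (1) registers each one with the selector; (2) wraps it in an NIOServerCnxn and registers that with NIOServerCnxnFactory.
  • processInterestOpsUpdateRequests();
    Updates the events that the connections in updateQueue listen for.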

14.6.2 Handling accepted connections: processAcceptedConnections

Now let's look in detail at how processAcceptedConnections handles connections that have been accepted:

private void processAcceptedConnections() {
    SocketChannel accepted;
    while (!stopped && (accepted = acceptedQueue.poll()) != null) {
        SelectionKey key = null;
        try {
            // Register the SocketChannel for OP_READ so requests can be read from it
            key = accepted.register(selector, SelectionKey.OP_READ);
            NIOServerCnxn cnxn = createConnection(accepted, key, this);
            // Attach the connection to the key; it is used whenever this connection's
            // I/O is handled later
            key.attach(cnxn);
            // Record the connection in NIOServerCnxnFactory's ipMap, keyed by client IP.
            // doAccept() uses these per-IP counts to reject further connections from the
            // same client with a "Too many connections" error
            addCnxn(cnxn);
        } catch (IOException e) {
            // register, createConnection
            cleanupSelectionKey(key);
            fastCloseSock(accepted);
        }
    }
}

After registering the SocketChannel for OP_READ, it creates the connection object:

protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk, SelectorThread selectorThread) throws IOException {
    return new NIOServerCnxn(zkServer, sock, sk, this, selectorThread);
}

14.6.2.1 The NIOServerCnxn constructor

public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock, SelectionKey sk, NIOServerCnxnFactory factory, SelectorThread selectorThread) throws IOException {
    super(zk);
    this.sock = sock;
    this.sk = sk;
    this.factory = factory;
    this.selectorThread = selectorThread;
    if (this.factory.login != null) {
        this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
    }
    // Disable Nagle's algorithm: small packets are sent immediately instead of being buffered
    sock.socket().setTcpNoDelay(true);
    /* set socket linger to false, so that a close does not block */
    // Arguments: whether SO_LINGER is enabled, and the linger time if it is.
    // With linger disabled, close() returns immediately and the OS still attempts a normal
    // FIN shutdown. (Enabling SO_LINGER with a timeout of 0 would instead reset the
    // connection with an RST and discard unsent data, which avoids TIME_WAIT sockets;
    // that is not what happens here.)
    sock.socket().setSoLinger(false, -1);
    InetAddress addr = ((InetSocketAddress) sock.socket().getRemoteSocketAddress()).getAddress();
    // Record the remote IP address as an "ip" auth id
    addAuthInfo(new Id("ip", addr.getHostAddress()));
    this.sessionTimeout = factory.sessionlessCnxnTimeout;
}
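This sketch shows what the two socket options in the constructor leave behind. It opens a loopback connection, applies the same setTcpNoDelay(true) and setSoLinger(false, -1) calls to the accepted channel's socket, and reads them back. SocketOptionsDemo is a hypothetical helper; only standard java.nio and java.net APIs are used.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/** Hypothetical helper: applies NIOServerCnxn's socket options to a real loopback connection. */
final class SocketOptionsDemo {
    /** Returns {tcpNoDelay ? 1 : 0, soLinger} for a freshly accepted connection. */
    static int[] acceptAndConfigure() {
        try (ServerSocketChannel ss = ServerSocketChannel.open()) {
            ss.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // ephemeral port
            try (SocketChannel client = SocketChannel.open(ss.getLocalAddress());
                 SocketChannel accepted = ss.accept()) {
                accepted.socket().setTcpNoDelay(true);    // disable Nagle's algorithm
                accepted.socket().setSoLinger(false, -1); // linger off: close() does not block
                return new int[] {
                    accepted.socket().getTcpNoDelay() ? 1 : 0,
                    accepted.socket().getSoLinger()
                };
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

getSoLinger() reports a negative value (normally -1) when SO_LINGER is disabled, which is the state the constructor requests.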

14.6.3 Updating the listened events of connections in updateQueue: processInterestOpsUpdateRequests

The processInterestOpsUpdateRequests() method:
As mentioned earlier, a connection stops listening for events while its I/O is being processed. When the I/O is done, its key is put on updateQueue, and this method restores the key's interest ops from the connection.

private void processInterestOpsUpdateRequests() {
    SelectionKey key;
    while (!stopped && (key = updateQueue.poll()) != null) {
        if (!key.isValid()) {
            cleanupSelectionKey(key);
        }
        NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
        if (cnxn.isSelectable()) {
            key.interestOps(cnxn.getInterestOps());
        }
    }
}

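14.6.4 Handling I/O events

14.6.4.1 IOWorkRequest

IOWorkRequest is where I/O events are handled. When data is readable on a SocketChannel, a worker thread calls NIOServerCnxn.doIO() to perform the read.

Message coalescing and splitting
The tricky part of handling reads is that TCP is a byte stream. Several messages may arrive in a single read (coalescing), or one message may be spread over several reads (splitting). To solve this, ZooKeeper's protocol divides each message into three parts:

  • the combined length of the request header and body (4 bytes)
  • the request header
  • the request body

Note: (1) the header and body are subdivided further, but we won't go into that here. It is enough to know that the first 4 bytes of a request hold the length of the header plus body. (2) The header and body together are called the payload.
The 4-byte length field at the front gives the length of the whole message, excluding the length field itself. Using it, the server can split coalesced messages apart and reassemble split ones into complete messages. NIOServerCnxn reads data as follows:

  1. NIOServerCnxn has two fields. lenBuffer has a capacity of 4 bytes and is used to read the length. incomingBuffer starts out as lenBuffer itself; once the length has been read, a buffer of the right size is allocated for incomingBuffer to read the payload into.
  2. incomingBuffer is sized according to the length of the request.
  3. Bytes that are read are stored in incomingBuffer until it is full. Because step 2 gave incomingBuffer exactly the message length, a full incomingBuffer holds exactly one message.
  4. The message is processed.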
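The read loop described above can be sketched as a small decoder that keeps the same two buffers. lenBuffer has 4 bytes, and incomingBuffer starts out as lenBuffer and is replaced by an exactly sized payload buffer once the length is known. LengthPrefixedDecoder, its MAX_FRAME limit and the helper methods are hypothetical. ZooKeeper reads from a SocketChannel rather than from in-memory chunks, and applies its own size limit (configured through jute.maxbuffer).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of NIOServerCnxn-style length-prefixed framing. */
final class LengthPrefixedDecoder {
    private static final int MAX_FRAME = 1 << 20; // assumed limit for this sketch

    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer; // starts out as the 4-byte length buffer

    /** Feeds one chunk as it arrived from the socket; returns every frame it completed. */
    List<byte[]> feed(ByteBuffer chunk) {
        List<byte[]> frames = new ArrayList<>();
        while (true) {
            if (incomingBuffer.hasRemaining()) {
                if (!chunk.hasRemaining()) {
                    break; // split message: wait for the next read
                }
                copy(chunk, incomingBuffer);
                if (incomingBuffer.hasRemaining()) {
                    break;
                }
            }
            if (incomingBuffer == lenBuffer) {
                // Length complete: allocate a buffer of exactly that size for the payload
                lenBuffer.flip();
                int len = lenBuffer.getInt();
                lenBuffer.clear();
                if (len < 0 || len > MAX_FRAME) {
                    throw new IllegalStateException("Len error " + len);
                }
                incomingBuffer = ByteBuffer.allocate(len);
            } else {
                // Payload complete: exactly one message; go back to reading a length
                frames.add(incomingBuffer.array());
                incomingBuffer = lenBuffer;
            }
        }
        return frames;
    }

    /** Copies as many bytes as fit from src into dst, advancing both positions. */
    private static void copy(ByteBuffer src, ByteBuffer dst) {
        int n = Math.min(src.remaining(), dst.remaining());
        ByteBuffer window = src.duplicate();
        window.limit(window.position() + n);
        dst.put(window);
        src.position(src.position() + n);
    }

    /** Encodes messages as [4-byte big-endian length][payload]. */
    static byte[] encode(String... messages) {
        List<byte[]> payloads = new ArrayList<>();
        int total = 0;
        for (String m : messages) {
            byte[] p = m.getBytes(StandardCharsets.UTF_8);
            payloads.add(p);
            total += 4 + p.length;
        }
        ByteBuffer out = ByteBuffer.allocate(total);
        for (byte[] p : payloads) {
            out.putInt(p.length).put(p);
        }
        return out.array();
    }

    /** Delivers the wire bytes in chunks of chunkSize and decodes them. */
    static List<String> decodeInChunks(byte[] wire, int chunkSize) {
        LengthPrefixedDecoder decoder = new LengthPrefixedDecoder();
        List<String> out = new ArrayList<>();
        for (int off = 0; off < wire.length; off += chunkSize) {
            int n = Math.min(chunkSize, wire.length - off);
            for (byte[] frame : decoder.feed(ByteBuffer.wrap(wire, off, n))) {
                out.add(new String(frame, StandardCharsets.UTF_8));
            }
        }
        return out;
    }
}
```

The same bytes decode to the same messages whether they arrive one byte at a time, in odd-sized chunks, or all at once. That is exactly the coalescing and splitting problem the length prefix solves.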

private class IOWorkRequest extends WorkerService.WorkRequest {

    private final SelectorThread selectorThread;
    private final SelectionKey key;
    private final NIOServerCnxn cnxn;

    IOWorkRequest(SelectorThread selectorThread, SelectionKey key) {
        this.selectorThread = selectorThread;
        this.key = key;
        this.cnxn = (NIOServerCnxn) key.attachment();
    }

    public void doWork() throws InterruptedException {
        if (!key.isValid()) {
            selectorThread.cleanupSelectionKey(key);
            return;
        }

        if (key.isReadable() || key.isWritable()) {
            // Perform the I/O if the key is readable or writable
            cnxn.doIO(key);

            // Check if we shutdown or doIO() closed this connection
            if (stopped) {
                cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                return;
            }
            if (!key.isValid()) {
                selectorThread.cleanupSelectionKey(key);
                return;
            }
            touchCnxn(cnxn);
        }

        // Mark this connection as once again ready for selection
        cnxn.enableSelectable();
        // Push an update request on the queue to resume selecting
        // on the current set of interest ops, which may have changed
        // as a result of the I/O operations we just performed.
        if (!selectorThread.addInterestOpsUpdateRequest(key)) {
            cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
        }
    }

    @Override
    public void cleanup() {
        cnxn.close(ServerCnxn.DisconnectReason.CLEAN_UP);
    }
}

14.6.4.2 NIOServerCnxn.doIO

For more detail, see this blog post:
https://blog.csdn.net/jpf254/article/details/80792086

// Handle I/O on this connection
void doIO(SelectionKey k) throws InterruptedException {
    try {
        if (!isSocketOpen()) {
            LOG.warn("trying to do i/o on a null socket for session: 0x{}", Long.toHexString(sessionId));
            return;
        }
        if (k.isReadable()) {
            // A client request triggers a read event: read bytes from the channel into the buffer.
            // Initially incomingBuffer is lenBuffer, which holds only 4 bytes: one int, the total
            // length of this request.
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                handleFailedRead();
            }
            /*
             * remaining() is the number of bytes between position and limit. Processing only
             * continues once incomingBuffer.remaining() == 0; until then, reads keep filling
             * incomingBuffer. When it is full there are two cases:
             * 1. incomingBuffer is lenBuffer, so it holds the length of this request. After
             *    incomingBuffer is allocated from lenBuffer, readPayload() is called and
             *    immediately performs one read:
             *    (1) if that read fills incomingBuffer, it holds a complete request, which is processed;
             *    (2) if not, the request was split: no complete request can be built yet, so we
             *        wait for the client to send more data and keep reading into incomingBuffer
             *        the next time the socketChannel is readable.
             * 2. incomingBuffer is not lenBuffer: the previous read hit a split, and incomingBuffer
             *    held only part of a request. This read completed it, so readPayload() is called.
             */
            if (incomingBuffer.remaining() == 0) {
                boolean isPayload;
                // lenBuffer (4 bytes) reads the length; incomingBuffer starts out as lenBuffer and
                // is reallocated to hold the payload once the length is known
                if (incomingBuffer == lenBuffer) { // start of next request
                    // Flip the buffer from writing to reading
                    incomingBuffer.flip();
                    // readLength() reads the next four bytes as an int (advancing the position by 4).
                    // It returns false only for a four-letter-word command, which it has already
                    // handled, so no payload needs to be read.
                    isPayload = readLength(k);
                    incomingBuffer.clear();
                } else {
                    // continuation
                    isPayload = true;
                }
                // Not a four-letter word: read the payload
                if (isPayload) { // not the case for 4letterword
                    readPayload();
                } else {
                    // four letter words take care
                    // need not do anything else
                    // (the four-letter-word command was already handled, so just return)
                    return;
                }
            }
        }
        if (k.isWritable()) {
            handleWrite(k);

            if (!initialized && !getReadInterest() && !getWriteInterest()) {
                throw new CloseRequestException("responded to info probe", DisconnectReason.INFO_PROBE);
            }
        }
    } catch (CancelledKeyException e) {
        LOG.warn("CancelledKeyException causing close of session: 0x{}", Long.toHexString(sessionId));
        LOG.debug("CancelledKeyException stack trace", e);
        close(DisconnectReason.CANCELLED_KEY_EXCEPTION);
    } catch (CloseRequestException e) {
        // expecting close to log session closure
        close();
    } catch (EndOfStreamException e) {
        LOG.warn("Unexpected exception", e);
        // expecting close to log session closure
        close(e.getReason());
    } catch (ClientCnxnLimitException e) {
        // Common case exception, print at debug level
        ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
        LOG.warn("Closing session 0x{}", Long.toHexString(sessionId), e);
        close(DisconnectReason.CLIENT_CNX_LIMIT);
    } catch (IOException e) {
        LOG.warn("Close of session 0x{}", Long.toHexString(sessionId), e);
        close(DisconnectReason.IO_EXCEPTION);
    }
}

/**
 * This method is called in two situations:
 * 1. incomingBuffer has just been allocated from the length in lenBuffer, and no
 *    payload has been read from the socketChannel into it yet;
 * 2. the payload has already been read from the socketChannel into incomingBuffer in full.
 * <p>
 * Read the request payload (everything following the length prefix)
 */
private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
    if (incomingBuffer.remaining() != 0) { // have we read length bytes?
        // Case 1: incomingBuffer was just allocated and is empty, so do one read.
        // (1) If that fills incomingBuffer, it is processed right away below;
        // (2) otherwise the data so far is not a complete request: wait for more data, and the
        //     next doIO() call reads from the socketChannel into incomingBuffer again.
        int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
        if (rc < 0) {
            handleFailedRead();
        }
    }

    if (incomingBuffer.remaining() == 0) { // have we read length bytes?
        incomingBuffer.flip();
        // In either case incomingBuffer is now full and holds exactly one request; process it.
        // Update the received-packet statistics
        packetReceived(4 + incomingBuffer.remaining());
        if (!initialized) {
            // Handle the connect request
            readConnectRequest();
        } else {
            // Handle a regular request
            readRequest();
        }
        // Request handled: reset lenBuffer and incomingBuffer
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}

// Read the connect request
private void readConnectRequest() throws IOException, InterruptedException, ClientCnxnLimitException {
    if (!isZKServerRunning()) {
        throw new IOException("ZooKeeperServer not running");
    }
    zkServer.processConnectRequest(this, incomingBuffer);
    initialized = true;
}

// Read a regular request packet
private void readRequest() throws IOException {
    zkServer.processPacket(this, incomingBuffer);
}

For how ZooKeeperServer handles connect requests (processConnectRequest), see:

  • https://www.cnblogs.com/Benjious/p/11462064.html

On sessions:

  • https://my.oschina.net/anxiaole/blog/3217373
  • https://segmentfault.com/a/1190000022193168

14.6.4.3 Handling the connect request: processConnectRequest

It is called like this:
zkServer.processConnectRequest(this, incomingBuffer);

Here is the parsing code:

@SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "the value won't change after startup")
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
    throws IOException, ClientCnxnLimitException {

    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    ConnectRequest connReq = new ConnectRequest();
    // Deserialize the connect request
    connReq.deserialize(bia, "connect");
    LOG.debug(
        "Session establishment request from client {} client's lastZxid is 0x{}",
        cnxn.getRemoteSocketAddress(),
        Long.toHexString(connReq.getLastZxidSeen()));

    long sessionId = connReq.getSessionId();
    int tokensNeeded = 1;
    // Whether throttling weighs connections differently; controlled by
    // zookeeper.connection_throttle_weight_enabled (default false)
    if (connThrottle.isConnectionWeightEnabled()) {
        if (sessionId == 0) {
            if (localSessionEnabled) {
                tokensNeeded = connThrottle.getRequiredTokensForLocal();
            } else {
                tokensNeeded = connThrottle.getRequiredTokensForGlobal();
            }
        } else {
            tokensNeeded = connThrottle.getRequiredTokensForRenew();
        }
    }

    // Token-bucket throttling: are enough tokens available?
    if (!connThrottle.checkLimit(tokensNeeded)) {
        throw new ClientCnxnLimitException();
    }
    ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());

    ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);

    boolean readOnly = false;
    try {
        // Whether the client asks for a read-only session
        readOnly = bia.readBool("readOnly");
        cnxn.isOldClient = false;
    } catch (IOException e) {
        // this is ok -- just a packet from an old client which
        // doesn't contain readOnly field
        LOG.warn(
            "Connection request from old client {}; will be dropped if server is in r-o mode",
            cnxn.getRemoteSocketAddress());
    }
    // This server is in read-only mode, but the client did not ask for a read-only session
    if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
        String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
        LOG.info(msg);
        throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
    }
    // Refuse the client if the latest zxid it has seen is newer than this server's last processed
    // zxid. This can happen, for example, if the server's snapshots and transaction logs were
    // deleted and it was restarted (so its zxid went backwards), or if during a split brain the
    // client talked to servers in a different partition.
    if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
        String msg = "Refusing session request for client "
            + cnxn.getRemoteSocketAddress()
            + " as it has seen zxid 0x"
            + Long.toHexString(connReq.getLastZxidSeen())
            + " our last zxid is 0x"
            + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
            + " client must try another server";

        LOG.info(msg);
        throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
    }
    // Initialize the session timeout, clamped to [minSessionTimeout, maxSessionTimeout]
    int sessionTimeout = connReq.getTimeOut();
    byte[] passwd = connReq.getPasswd();
    int minSessionTimeout = getMinSessionTimeout();
    if (sessionTimeout < minSessionTimeout) {
        sessionTimeout = minSessionTimeout;
    }
    int maxSessionTimeout = getMaxSessionTimeout();
    if (sessionTimeout > maxSessionTimeout) {
        sessionTimeout = maxSessionTimeout;
    }
    cnxn.setSessionTimeout(sessionTimeout);
    // We don't want to receive any packets until we are sure that the
    // session is setup
    cnxn.disableRecv();
    if (sessionId == 0) {
        // New session: create a session id
        long id = createSession(cnxn, passwd, sessionTimeout);
        LOG.debug(
            "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
            Long.toHexString(id),
            Long.toHexString(connReq.getLastZxidSeen()),
            connReq.getTimeOut(),
            cnxn.getRemoteSocketAddress());
    } else {
        long clientSessionId = connReq.getSessionId();
        LOG.debug(
            "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
            Long.toHexString(clientSessionId),
            Long.toHexString(connReq.getLastZxidSeen()),
            connReq.getTimeOut(),
            cnxn.getRemoteSocketAddress());
        if (serverCnxnFactory != null) {
            serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        if (secureServerCnxnFactory != null) {
            secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        cnxn.setSessionId(sessionId);
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);
        ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
    }
}

Creating and submitting the session: createSession

// Create a session
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
    if (passwd == null) {
        // Possible since it's just deserialized from a packet on the wire.
        passwd = new byte[0];
    }
    long sessionId = sessionTracker.createSession(timeout);
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);
    ByteBuffer to = ByteBuffer.allocate(4);
    to.putInt(timeout);
    cnxn.setSessionId(sessionId);
    Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
    // Wrap the session information in a createSession request and submit it
    submitRequest(si);
    return sessionId;
}

The ZooKeeper server creates session ids with SessionTrackerImpl; each new session id is the previous one plus 1:

public long createSession(int sessionTimeout) {
    long sessionId = nextSessionId.getAndIncrement();
    trackSession(sessionId, sessionTimeout);
    return sessionId;
}

nextSessionId is initialized like this:

public static long initializeNextSessionId(long id) {
    long nextSid;
    nextSid = (Time.currentElapsedTime() << 24) >>> 8;
    nextSid = nextSid | (id << 56);
    if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
        ++nextSid; // this is an unlikely edge case, but check it just in case
    }
    return nextSid;
}
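The bit layout produced by initializeNextSessionId is easier to see with a fixed clock. (time << 24) >>> 8 keeps the low 40 bits of the millisecond timestamp and moves them to bits 16 to 55, leaving the low 16 bits zero. id << 56 puts the server id in the top byte. This sketch repeats those shifts with the time passed in as a parameter, uses an AtomicLong for the increment-by-one counter, and adds the timeout clamp from processConnectRequest. SessionIdSketch is hypothetical. Using Long.MIN_VALUE as the value of CONTAINER_EPHEMERAL_OWNER is an assumption about EphemeralType.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of ZooKeeper's session id layout and timeout clamp. */
final class SessionIdSketch {
    // Assumed to mirror EphemeralType.CONTAINER_EPHEMERAL_OWNER.
    static final long CONTAINER_EPHEMERAL_OWNER = Long.MIN_VALUE;

    private final AtomicLong nextSessionId;

    SessionIdSketch(long serverId, long nowMillis) {
        nextSessionId = new AtomicLong(initializeNextSessionId(serverId, nowMillis));
    }

    /** Each new session id is the previous one plus 1. */
    long createSession() {
        return nextSessionId.getAndIncrement();
    }

    /** Same shifts as SessionTrackerImpl.initializeNextSessionId, with the clock passed in. */
    static long initializeNextSessionId(long id, long nowMillis) {
        long nextSid = (nowMillis << 24) >>> 8; // low 40 bits of the time, now in bits 16..55
        nextSid = nextSid | (id << 56);         // server id in the top 8 bits
        if (nextSid == CONTAINER_EPHEMERAL_OWNER) {
            ++nextSid;                          // avoid the reserved container-owner value
        }
        return nextSid;
    }

    /** The clamp applied in processConnectRequest: min first, then max. */
    static int clampTimeout(int requested, int min, int max) {
        return Math.min(Math.max(requested, min), max);
    }
}
```

With server id 1 and a timestamp of 0x123456789A, the first session id is 0x01123456789A0000 and the next one is one higher. Because the server id occupies the top byte, servers in an ensemble with different ids start from disjoint ranges.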

14.6.4.4 跟蹤會話過期時間

// Track the session's expiration time
@Override
public synchronized boolean trackSession(long id, int sessionTimeout) {
    boolean added = false;

    SessionImpl session = sessionsById.get(id);
    if (session == null) {
        session = new SessionImpl(id, sessionTimeout);
    }

    // findbugs2.0.3 complains about get after put.
    // long term strategy would be use computeIfAbsent after JDK 1.8
    // Cache the sessionId -> session mapping in the local sessionsById map
    SessionImpl existedSession = sessionsById.putIfAbsent(id, session);

    if (existedSession != null) {
        session = existedSession;
    } else {
        added = true;
        LOG.debug("Adding session 0x{}", Long.toHexString(id));
    }

    if (LOG.isTraceEnabled()) {
        String actionStr = added ? "Adding" : "Existing";
        ZooTrace.logTraceMessage(
            LOG,
            ZooTrace.SESSION_TRACE_MASK,
            "SessionTrackerImpl --- " + actionStr
            + " session 0x" + Long.toHexString(id) + " " + sessionTimeout);
    }

    updateSessionExpiry(session, sessionTimeout);
    return added;
}

// Record the session's expiration time in sessionExpiryQueue
private void updateSessionExpiry(SessionImpl s, int timeout) {
    logTraceTouchSession(s.sessionId, timeout, "");
    sessionExpiryQueue.update(s, timeout);
}

The update method of sessionExpiryQueue puts the expiration time into bucket sets, in units of tickTime:

public Long update(E elem, int timeout) {
    Long prevExpiryTime = elemMap.get(elem);
    long now = Time.currentElapsedTime();
    Long newExpiryTime = roundToNextInterval(now + timeout);

    if (newExpiryTime.equals(prevExpiryTime)) {
        // No change, so nothing to update
        return null;
    }

    // First add the elem to the new expiry time bucket in expiryMap.
    // expiryMap maps each rounded expiration time to the set of elements expiring at that time
    Set<E> set = expiryMap.get(newExpiryTime);
    if (set == null) {
        // Construct a ConcurrentHashSet using a ConcurrentHashMap
        set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
        // Put the new set in the map, but only if another thread
        // hasn't beaten us to it
        Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
        if (existingSet != null) {
            set = existingSet;
        }
    }
    // Add the element to the set for its new expiration time
    set.add(elem);

    // Map the elem to the new expiry time. If a different previous
    // mapping was present, clean up the previous expiry bucket.
    prevExpiryTime = elemMap.put(elem, newExpiryTime);
    // If the element was in a different bucket before, remove it from the old one so the entry is refreshed correctly
    if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
        Set<E> prevSet = expiryMap.get(prevExpiryTime);
        if (prevSet != null) {
            prevSet.remove(elem);
        }
    }
    return newExpiryTime;
}
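Because update() rounds every deadline up to the next interval boundary (tickTime for sessions), all sessions that expire within the same tick share one bucket, and the expirer only has to look at one set per tick. The sketch below reproduces that bucketing with the current time passed in as a parameter so the arithmetic is easy to follow. BucketedExpiry is a hypothetical name; it assumes roundToNextInterval is (time / interval + 1) * interval, and it uses computeIfAbsent where the original uses putIfAbsent (the switch the original's own comment suggests).

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified ExpiryQueue: same bucketing idea, but "now" is passed in.
class BucketedExpiry<E> {
    private final long expirationInterval;                  // e.g. tickTime
    private final Map<E, Long> elemMap = new ConcurrentHashMap<>();
    private final Map<Long, Set<E>> expiryMap = new ConcurrentHashMap<>();

    BucketedExpiry(long expirationInterval) {
        this.expirationInterval = expirationInterval;
    }

    // Round up to the next interval boundary so nearby deadlines share a bucket.
    long roundToNextInterval(long time) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

    // Returns the element's new bucket, or null if it is already in that bucket.
    Long update(E elem, long now, long timeout) {
        Long prevExpiryTime = elemMap.get(elem);
        Long newExpiryTime = roundToNextInterval(now + timeout);
        if (newExpiryTime.equals(prevExpiryTime)) {
            return null;
        }
        expiryMap.computeIfAbsent(newExpiryTime,
                k -> Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>())).add(elem);
        prevExpiryTime = elemMap.put(elem, newExpiryTime);
        if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
            Set<E> prevSet = expiryMap.get(prevExpiryTime);
            if (prevSet != null) {
                prevSet.remove(elem);                        // leave the old bucket
            }
        }
        return newExpiryTime;
    }

    Set<E> bucket(long expiryTime) {
        return expiryMap.getOrDefault(expiryTime, Collections.emptySet());
    }

    public static void main(String[] args) {
        BucketedExpiry<String> q = new BucketedExpiry<>(2000);
        System.out.println(q.update("s1", 1000, 3000)); // 6000
        System.out.println(q.update("s2", 1000, 3500)); // 6000, same bucket as s1
        System.out.println(q.update("s1", 5000, 3000)); // 10000, s1 moves to a later bucket
        System.out.println(q.bucket(6000));             // [s2]
    }
}
```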

14.6.4.5 Submitting the session

ZooKeeperServer submits the session request through submitRequest:

public void submitRequest(Request si) {
    enqueueRequest(si);
}

public void enqueueRequest(Request si) {
    if (requestThrottler == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (requestThrottler == null) {
                throw new RuntimeException("Not started");
            }
        }
    }
    requestThrottler.submitRequest(si);
}

RequestThrottler.submitRequest:

public void submitRequest(Request request) {
    if (stopping) {
        LOG.debug("Shutdown in progress. Request cannot be processed");
        dropRequest(request);
    } else {
        submittedRequests.add(request);
    }
}

submittedRequests is a request queue of type LinkedBlockingQueue.

So how are the requests placed in this queue processed next?
ZooKeeper uses a queue + asynchronous model. The request chain is as follows:

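  • Submit the request: RequestThrottler.run() > ZooKeeperServer.submitRequestNow(Request si)
  • Pre-process the request: PrepRequestProcessor.processRequest(Request request)
  • Persist the request: SyncRequestProcessor.run()
  • Execute the request's business logic: FinalRequestProcessor, which applies the transaction through processTxn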
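The chain above is a pipeline in which a stage hands each request to the next one through a queue that a dedicated thread drains, so the submitting thread never waits for the later stages. A minimal sketch of that queue-plus-thread pattern, assuming a hypothetical QueuedStage class with plain strings as "requests" (this is not ZooKeeper's RequestProcessor API, only the same idea):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical pipeline stage: a FIFO queue drained by one dedicated thread.
class QueuedStage implements Consumer<String> {
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

    QueuedStage(String name, UnaryOperator<String> work, Consumer<String> next) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String request = queue.take();    // block until upstream enqueues
                    next.accept(work.apply(request)); // process, then pass downstream
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // stop when interrupted
            }
        }, name);
        worker.setDaemon(true);                       // don't keep the JVM alive
        worker.start();
    }

    // Called on the upstream thread: enqueue and return immediately.
    @Override
    public void accept(String request) {
        queue.add(request);
    }
}

class PipelineDemo {
    public static void main(String[] args) throws InterruptedException {
        List<String> done = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(2);
        // The last step runs synchronously on the "sync" stage's thread.
        Consumer<String> last = r -> { done.add(r + "->final"); latch.countDown(); };
        QueuedStage sync = new QueuedStage("sync", r -> r + "->sync", last);
        QueuedStage prep = new QueuedStage("prep", r -> r + "->prep", sync);
        prep.accept("create /a");
        prep.accept("setData /a");
        latch.await();
        done.forEach(System.out::println);
    }
}
```

With one thread per stage and a FIFO queue in front of it, requests leave every stage in the order they entered it, which is the property an ordered request chain relies on.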

14.6.4.6 Submitting requests for processing

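We will look at request processing in detail later. That completes the connect-request path (NIOServerCnxn.readConnectRequest, which hands off to ZooKeeperServer.processConnectRequest).

Next, let's see how ZooKeeper handles the other requests it reads from a connection.
NIOServerCnxn.readRequest()
calls ZooKeeperServer.processPacket():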

private void readRequest() throws IOException {
    zkServer.processPacket(this, incomingBuffer);
}

processPacket handles the packet as follows:

public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    // We have the request, now process and setup for next
    // Deserialize the request header with jute
    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    RequestHeader h = new RequestHeader();
    h.deserialize(bia, "header");

    // Need to increase the outstanding request count first, otherwise
    // there might be a race condition that it enabled recv after
    // processing request and then disabled when check throttling.
    //
    // Be aware that we're actually checking the global outstanding
    // request before this request.
    //
    // It's fine if the IOException thrown before we decrease the count
    // in cnxn, since it will close the cnxn anyway.
    cnxn.incrOutstandingAndCheckThrottle(h);

    // Through the magic of byte buffers, txn will not be
    // pointing
    // to the start of the txn
    // slice(): a new buffer that shares the remaining bytes, starting at the current read position
    incomingBuffer = incomingBuffer.slice();
    // Authentication (auth) packet
    if (h.getType() == OpCode.auth) {
        LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
        AuthPacket authPacket = new AuthPacket();
        ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
        String scheme = authPacket.getScheme();
        ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
        Code authReturn = KeeperException.Code.AUTHFAILED;
        if (ap != null) {
            try {
                // handleAuthentication may close the connection, to allow the client to choose
                // a different server to connect to.
                // Authenticate the client
                authReturn = ap.handleAuthentication(
                    new ServerAuthenticationProvider.ServerObjs(this, cnxn),
                    authPacket.getAuth());
            } catch (RuntimeException e) {
                LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
                authReturn = KeeperException.Code.AUTHFAILED;
            }
        }
        // On success, reply with an OK header
        if (authReturn == KeeperException.Code.OK) {
            LOG.debug("Authentication succeeded for scheme: {}", scheme);
            LOG.info("auth success {}", cnxn.getRemoteSocketAddress());
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
            cnxn.sendResponse(rh, null, null);
        } else {
            // On failure, reply with AUTHFAILED and then close the connection
            if (ap == null) {
                LOG.warn(
                    "No authentication provider for scheme: {} has {}",
                    scheme,
                    ProviderRegistry.listProviders());
            } else {
                LOG.warn("Authentication failed for scheme: {}", scheme);
            }
            // send a response...
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
            cnxn.sendResponse(rh, null, null);
            // ... and close connection
            cnxn.sendBuffer(ServerCnxnFactory.closeConn);
            cnxn.disableRecv();
        }
        return;
    } else if (h.getType() == OpCode.sasl) {
        // SASL authentication
        processSasl(incomingBuffer, cnxn, h);
    } else {
        // Ordinary request
        if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) {
            // SASL auth is required but this connection has not completed it: reply and close the session
            ReplyHeader replyHeader = new ReplyHeader(h.getXid(), 0, Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue());
            cnxn.sendResponse(replyHeader, null, "response");
            cnxn.sendCloseSession();
            cnxn.disableRecv();
        } else {
            // Process the request
            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
            int length = incomingBuffer.limit();
            // Large request? The threshold is set by zookeeper.largeRequestThreshold (-1, i.e. disabled, by default)
            if (isLargeRequest(length)) {
                // checkRequestSize will throw IOException if request is rejected
                // In-flight large requests are capped by zookeeper.largeRequestMaxBytes
                // (default 100 * 1024 * 1024 bytes, i.e. 100 MB) to protect the JVM heap
                checkRequestSizeWhenMessageReceived(length);
                si.setLargeRequestSize(length);
            }
            si.setOwner(ServerCnxn.me);
            // Submit the request; it goes through the same path as the connect request
            // (see the request-processing code for details)
            submitRequest(si);
        }
    }
}
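The large-request branch budgets how many bytes of large requests may be in flight at once. The sketch below shows that budgeting idea with a hypothetical LargeRequestGate class. The "-1 disables the check" behaviour mirrors the documented meaning of zookeeper.largeRequestThreshold; the reserve/release bookkeeping is a simplification and not ZooKeeper's exact code.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical gate modelled on the large-request check in processPacket.
class LargeRequestGate {
    private final int threshold;   // like zookeeper.largeRequestThreshold; -1 disables the check
    private final int maxBytes;    // like zookeeper.largeRequestMaxBytes
    private final AtomicInteger inFlightBytes = new AtomicInteger();

    LargeRequestGate(int threshold, int maxBytes) {
        this.threshold = threshold;
        this.maxBytes = maxBytes;
    }

    boolean isLargeRequest(int length) {
        return threshold != -1 && length > threshold;
    }

    // Reserve budget for a large request, or reject it with an IOException.
    void admit(int length) throws IOException {
        if (!isLargeRequest(length)) {
            return;                                   // small requests are not budgeted
        }
        if (inFlightBytes.addAndGet(length) > maxBytes) {
            inFlightBytes.addAndGet(-length);         // undo the reservation
            throw new IOException("Rejecting large request of " + length + " bytes");
        }
    }

    // Give the budget back once the request has been processed.
    void release(int length) {
        if (isLargeRequest(length)) {
            inFlightBytes.addAndGet(-length);
        }
    }

    int inFlightBytes() {
        return inFlightBytes.get();
    }

    public static void main(String[] args) {
        LargeRequestGate gate = new LargeRequestGate(1024, 4096);
        try {
            gate.admit(3000);   // large: reserves 3000 of 4096
            gate.admit(100);    // small: not counted
            gate.admit(2000);   // would reach 5000 > 4096, so it is rejected
        } catch (IOException e) {
            System.out.println(e.getMessage() + ", in flight: " + gate.inFlightBytes());
        }
    }
}
```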

In the accept thread's run(), it calls selector.select() and then doAccept() to accept client connections, adding each one to SelectorThread.acceptedQueue.
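When there are several selector threads, the accept thread has to assign each new connection to one of them. The sketch below assumes a round-robin assignment that restarts an iterator over the selector threads whenever it runs out, which is how I read ZooKeeper's AcceptThread.doAccept(). RoundRobinHandoff is a hypothetical class, each selector thread is reduced to its acceptedQueue, and connections are plain strings so the example runs without sockets.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the accept thread's hand-off to selector threads.
class RoundRobinHandoff<C> {
    private final List<Queue<C>> acceptedQueues = new ArrayList<>();
    private Iterator<Queue<C>> cursor;

    RoundRobinHandoff(int selectorThreads) {
        for (int i = 0; i < selectorThreads; i++) {
            acceptedQueues.add(new ConcurrentLinkedQueue<>()); // one acceptedQueue per selector thread
        }
        cursor = acceptedQueues.iterator();
    }

    // Called by the accept thread for every accepted connection.
    void dispatch(C connection) {
        if (!cursor.hasNext()) {
            cursor = acceptedQueues.iterator();                // wrap around
        }
        cursor.next().offer(connection);                       // thread-safe hand-off
    }

    Queue<C> queueOf(int selectorIndex) {
        return acceptedQueues.get(selectorIndex);
    }

    public static void main(String[] args) {
        RoundRobinHandoff<String> h = new RoundRobinHandoff<>(2);
        for (String c : new String[] {"c1", "c2", "c3", "c4", "c5"}) {
            h.dispatch(c);
        }
        System.out.println(h.queueOf(0)); // [c1, c3, c5]
        System.out.println(h.queueOf(1)); // [c2, c4]
    }
}
```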

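The selector thread's run():

@Override
public void run() {
    try {
        while (!stopped) {
            try {
                // 1. Call select() to read the ready IO events and hand them to worker threads
                select();
                // 2. Handle connections newly dispatched by the accept thread:
                //    (1) register each new connection with this selector;
                //    (2) wrap it in an NIOServerCnxn and register that with NIOServerCnxnFactory
                processAcceptedConnections();
                // 3. Update the interest ops of the connections in updateQueue
                processInterestOpsUpdateRequests();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }
        // Cleanup: close all connections still waiting on the selector
        ...
    } finally {
        ...
        // cleanup
    }
}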

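The selector thread's run() does three main things:

  • It calls select() to read the ready IO events and hands them to worker threads (before handing a key off, it calls key.interestOps(0)).
  • It handles connections newly dispatched by the accept thread: (1) it registers each new connection with its selector; (2) it wraps the connection in an NIOServerCnxn and registers that with NIOServerCnxnFactory.
  • It updates the interest ops of the connections in updateQueue.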
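The three steps can be condensed into one class on plain java.nio. In the sketch below, MiniSelectorThread and its methods are hypothetical names, runOnce() stands for one iteration of the run() loop, and the "worker thread" is just a Consumer<SelectionKey>; ZooKeeper's SelectorThread additionally wraps each channel in an NIOServerCnxn and schedules work on WorkerService.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical, simplified sub-reactor modelled on the three steps above.
class MiniSelectorThread {
    final Selector selector;
    private final Queue<SocketChannel> acceptedQueue = new ConcurrentLinkedQueue<>(); // from the accept thread
    private final Queue<SelectionKey> updateQueue = new ConcurrentLinkedQueue<>();    // from worker threads
    private final Consumer<SelectionKey> worker;                                      // stand-in for WorkerService

    MiniSelectorThread(Consumer<SelectionKey> worker) throws IOException {
        this.selector = Selector.open();
        this.worker = worker;
    }

    // Called by the accept thread.
    void addAcceptedConnection(SocketChannel sc) {
        acceptedQueue.offer(sc);
        selector.wakeup();                 // make a blocked select() return so step 2 runs soon
    }

    // Called by a worker thread once it has finished IO on the key.
    void addInterestOpsUpdateRequest(SelectionKey key) {
        updateQueue.offer(key);
        selector.wakeup();
    }

    // One pass of the run() loop.
    void runOnce(long timeoutMs) throws IOException {
        // Step 1: select ready keys, mute them, hand them to a worker.
        selector.select(timeoutMs);
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();
            if (key.isValid()) {
                key.interestOps(0);        // don't select this channel while a worker owns it
                worker.accept(key);
            }
        }
        // Step 2: register connections handed over by the accept thread.
        SocketChannel sc;
        while ((sc = acceptedQueue.poll()) != null) {
            sc.configureBlocking(false);
            sc.register(selector, SelectionKey.OP_READ);
        }
        // Step 3: re-arm keys whose IO has completed.
        SelectionKey key;
        while ((key = updateQueue.poll()) != null) {
            if (key.isValid()) {
                key.interestOps(SelectionKey.OP_READ);
            }
        }
    }
}
```

In this design other threads never touch the selector's keys directly; they only enqueue work and call wakeup(), and every registration and interest-ops change happens on the selector thread itself. That is presumably the reason for the acceptedQueue and updateQueue round trips.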
worker thread
ZooKeeper manages a group of worker threads through WorkerService, which supports two management modes:

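| Mode | Description | Use case | Implementation |
| --- | --- | --- | --- |
| Assignable-thread mode | Each task is assigned to a specific thread. If a series of tasks must complete in order, assign them all to the same thread. | A series of requests in the same session | N ExecutorServices, each containing exactly one thread |
| Non-assignable-thread mode | After a task is submitted, WorkerService lets any of its threads run it. Use this mode when tasks have no ordering requirements. | Network IO | 1 ExecutorService containing N threads |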

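Because the network IO tasks of different connections have no ordering requirements, the WorkerService used by NIOServerCnxnFactory runs in non-assignable-thread mode.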

/**
 * Schedule work to be done by the thread assigned to this id. Thread
 * assignment is a single mod operation on the number of threads. If a
 * worker thread pool is not being used, work is done directly by
 * this thread.
 * workRequest is assigned to a thread by taking id modulo the number of
 * workers. If no worker threads are used (numWorkerThreads=0), a
 * ScheduledWorkRequest thread is started to do the work and the current
 * thread blocks until it finishes.
 *
 * @param workRequest the IO request to process
 * @param id value used to choose which thread handles workRequest
 */
public void schedule(WorkRequest workRequest, long id) {
    if (stopped) {
        workRequest.cleanup();
        return;
    }

    ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);

    // If we have a worker thread pool, use that;
    // otherwise, do the work directly.
    int size = workers.size();
    if (size > 0) {
        try {
            // make sure to map negative ids as well to [0, size-1]
            int workerNum = ((int) (id % size) + size) % size;
            ExecutorService worker = workers.get(workerNum);
            worker.execute(scheduledWorkRequest);
        } catch (RejectedExecutionException e) {
            LOG.warn("ExecutorService rejected execution", e);
            workRequest.cleanup();
        }
    } else {
        // When there is no worker thread pool, do the work directly
        // and wait for its completion
        scheduledWorkRequest.start();
        try {
            scheduledWorkRequest.join();
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
            Thread.currentThread().interrupt();
        }
    }
}

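When introducing worker threads earlier, we said "if there are 0 such threads, the selector thread performs the IO reads and writes directly". The source above shows otherwise: with 0 worker threads, a new thread is started for every network IO task and the calling thread blocks until that IO completes. This simply wastes resources: if the caller has to block until the IO finishes anyway, why start a separate thread? Personally, I think this is either legacy code or a hook for future extension, which would explain such unreasonable code. In any case, never set the number of worker threads to 0.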
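The two modes in the table and the id-to-thread mapping in schedule() fit in a few lines. The sketch below is a hypothetical MiniWorkerService, not ZooKeeper's class: the assignable mode creates N single-thread executors, the non-assignable mode creates one executor with N threads, and both pick an executor with the same ((int) (id % size) + size) % size expression, which keeps negative ids in range. In the non-assignable mode size is 1, so every request goes to the shared pool and the pool picks the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical condensed version of the two WorkerService modes.
class MiniWorkerService {
    private final List<ExecutorService> workers = new ArrayList<>();

    MiniWorkerService(int numThreads, boolean assignable) {
        if (assignable) {
            for (int i = 0; i < numThreads; i++) {
                workers.add(Executors.newSingleThreadExecutor()); // one thread per executor
            }
        } else {
            workers.add(Executors.newFixedThreadPool(numThreads)); // one executor, N threads
        }
    }

    // Same mapping as schedule(): negative ids also land in [0, size - 1].
    static int workerIndex(long id, int size) {
        return ((int) (id % size) + size) % size;
    }

    void schedule(Runnable work, long id) {
        workers.get(workerIndex(id, workers.size())).execute(work);
    }

    int executorCount() {
        return workers.size();
    }

    void shutdown() throws InterruptedException {
        for (ExecutorService w : workers) {
            w.shutdown();
            w.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(workerIndex(7, 4));  // 3
        System.out.println(workerIndex(-7, 4)); // 1 (plain -7 % 4 would be -3)
        MiniWorkerService assignable = new MiniWorkerService(4, true);
        MiniWorkerService pooled = new MiniWorkerService(4, false);
        System.out.println(assignable.executorCount() + " vs " + pooled.executorCount()); // 4 vs 1
        assignable.shutdown();
        pooled.shutdown();
    }
}
```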
Next, let's see how ScheduledWorkRequest handles network IO:

// ScheduledWorkRequest.run()
@Override
public void run() {
    try {
        // Check if stopped while request was on queue
        if (stopped) {
            workRequest.cleanup();
            return;
        }
        workRequest.doWork();
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
        workRequest.cleanup();
    }
}

// IOWorkRequest.doWork()
@Override
public void doWork() throws InterruptedException {
    // If the channel has already been closed, clean up its SelectionKey
    if (!key.isValid()) {
        selectorThread.cleanupSelectionKey(key);
        return;
    }

    // 1. If readable or writable, call NIOServerCnxn.doIO() so the connection object performs the IO and processes it
    if (key.isReadable() || key.isWritable()) {
        cnxn.doIO(key);

        // Check if we shutdown or doIO() closed this connection
        // If we have shut down, close the connection
        if (stopped) {
            cnxn.close();
            return;
        }
        // If the channel has been closed, clean up its SelectionKey
        if (!key.isValid()) {
            selectorThread.cleanupSelectionKey(key);
            return;
        }
        // 2. Refresh this connection's expiration time
        touchCnxn(cnxn);
    }

    // 3. IO is finished; mark the connection as ready for select again
    cnxn.enableSelectable();
    // Put the key back on the selector thread's updateQueue. Once the selector thread has handled
    // IO on all ready channels and any new connections, it restores this channel's interest ops
    if (!selectorThread.addInterestOpsUpdateRequest(key)) {
        cnxn.close();
    }
}

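Leaving aside some defensive code, doWork() does three things:

  • Calls NIOServerCnxn.doIO(), so the NIOServerCnxn connection object performs the IO reads and writes and processes the data.
  • Refreshes the connection's expiration time (touchCnxn).
  • Once the network IO is finished, resets the selectable flag and adds the connection's SelectionKey to the selector thread's updateQueue, whose purpose was explained earlier.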

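When the selector thread handles a connection accepted by the accept thread, besides registering it with its selector, it also wraps the connection in an NIOServerCnxn and registers that with NIOServerCnxnFactory. NIOServerCnxn encapsulates a client connection, and worker threads call NIOServerCnxn.doIO() to handle its network IO. For details, see the article "ZooKeeper - Client Connections: ServerCnxn and NIOServerCnxn".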

14.7 ConnectionExpirerThread

This thread cleans up expired connections. Its main method is:

@Override
public void run() {
    try {
        while (!stopped) {
            long waitTime = cnxnExpiryQueue.getWaitTime();
            if (waitTime > 0) {
                Thread.sleep(waitTime);
                continue;
            }
            for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) {
                conn.close();
            }
        }
    } catch (InterruptedException e) {
        LOG.info("ConnnectionExpirerThread interrupted");
    }
}

For how this thread works, see the article "Zookeeper - Expiration Cleanup Strategy for Connections and Sessions (ExpiryQueue)".
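The expirer loop relies on two ExpiryQueue calls: getWaitTime() says how long to sleep until the next bucket boundary, and poll() hands back the whole bucket that has come due and advances to the next boundary. The single-threaded sketch below models that contract; ExpiryBuckets is a hypothetical name, the clock is passed in instead of read, and the thread-safety of the real queue is left out.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, single-threaded model of getWaitTime()/poll() over bucketed deadlines.
class ExpiryBuckets<E> {
    private final long interval;
    private final Map<Long, Set<E>> buckets = new HashMap<>();
    private long nextExpirationTime;

    ExpiryBuckets(long interval, long now) {
        this.interval = interval;
        this.nextExpirationTime = roundToNextInterval(now);
    }

    private long roundToNextInterval(long time) {
        return (time / interval + 1) * interval;
    }

    void add(E elem, long now, long timeout) {
        buckets.computeIfAbsent(roundToNextInterval(now + timeout), k -> new HashSet<>()).add(elem);
    }

    // How long the expirer should sleep before the next bucket is due (0 = due now).
    long getWaitTime(long now) {
        return now < nextExpirationTime ? nextExpirationTime - now : 0L;
    }

    // Remove and return the bucket that is due, then move to the next boundary.
    Set<E> poll(long now) {
        if (now < nextExpirationTime) {
            return Collections.emptySet();
        }
        Set<E> due = buckets.remove(nextExpirationTime);
        nextExpirationTime += interval;
        if (due == null) {
            return Collections.emptySet();
        }
        return due;
    }

    public static void main(String[] args) {
        ExpiryBuckets<String> q = new ExpiryBuckets<>(1000, 0); // first boundary at 1000
        q.add("cnxnA", 0, 1500);                                // bucket 2000
        q.add("cnxnB", 0, 1800);                                // bucket 2000
        q.add("cnxnC", 0, 2500);                                // bucket 3000
        long now = 0;                                           // fake clock instead of sleeping
        while (now <= 3000) {
            long wait = q.getWaitTime(now);
            if (wait > 0) {
                now += wait;                                    // stands in for Thread.sleep(wait)
                continue;
            }
            for (String c : q.poll(now)) {
                System.out.println("t=" + now + " close " + c);
            }
        }
    }
}
```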

14.8 NettyServerCnxnFactory

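We covered the NIO connection factory in detail above; now let's compare the two:
NettyServerCnxnFactory uses Netty for network IO, but it uses Netty 3.x. Although its design is the same as in 4.x, the APIs differ considerably, so I won't dig into NettyServerCnxnFactory and will only briefly describe how it differs from NIOServerCnxnFactory.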

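| Aspect | NIO | Netty |
| --- | --- | --- |
| Accept events | Starts 1 accept thread | The boss group handles accept events; 1 thread by default |
| select() | Starts selector threads | If a handler is added with addLast(EventExecutorGroup, ChannelHandler…), its IO events are processed in that EventExecutorGroup |
| Network IO | Starts worker threads | The worker group handles network IO; 2 × CPU cores threads by default |
| Read events | The worker thread calls NIOServerCnxn.doIO() | Read events are handled in a handler |
| Framing (sticky/half packets) | Solved with lenBuffer and incomingBuffer; the code is quite complex | Just insert a handler that does the framing |
| Write events | The thread running FinalRP.processRequest() and the worker thread communicate through NIOServerCnxn.outgoingBuffers; the worker thread writes in batches | Netty supports asynchronous writes natively: if the current thread is the EventLoop thread, the data to write is stored in the ChannelOutboundBuffer; otherwise a write task is built and added to the EventLoop's task queue |
| Direct memory | Uses ThreadLocal direct memory | I don't remember exactly how Netty uses direct memory, but Netty supports it and it is convenient to use |
| Connection close | Starts a connection expiration thread to manage connections | Connections are handled in a handler |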
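The framing row mentions lenBuffer and incomingBuffer. The idea is a two-state reader: every packet is preceded by a 4-byte length, so the connection first fills a 4-byte buffer, then allocates a buffer of exactly that length and fills it, possibly over several reads (a half packet), while a single read may also carry several packets (sticky packets). LengthPrefixedDecoder below is a hypothetical sketch of that state machine on plain java.nio.ByteBuffer, with an assumed maxLength guard standing in for ZooKeeper's own buffer-size check; it is not NIOServerCnxn's code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder for 4-byte length-prefixed frames, fed chunks as they arrive.
class LengthPrefixedDecoder {
    private final int maxLength;                        // assumed guard against huge lengths
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer;      // start by reading a length

    LengthPrefixedDecoder(int maxLength) {
        this.maxLength = maxLength;
    }

    // Consume the whole chunk and return every frame it completes.
    List<byte[]> feed(ByteBuffer chunk) {
        List<byte[]> frames = new ArrayList<>();
        while (true) {
            if (incomingBuffer.hasRemaining()) {
                if (!chunk.hasRemaining()) {
                    break;                              // wait for the next read
                }
                byte[] part = new byte[Math.min(chunk.remaining(), incomingBuffer.remaining())];
                chunk.get(part);
                incomingBuffer.put(part);
                if (incomingBuffer.hasRemaining()) {
                    break;                              // half packet: chunk exhausted
                }
            }
            if (incomingBuffer == lenBuffer) {          // length prefix complete
                lenBuffer.flip();
                int len = lenBuffer.getInt();
                lenBuffer.clear();
                if (len < 0 || len > maxLength) {
                    throw new IllegalStateException("Bad frame length " + len);
                }
                incomingBuffer = ByteBuffer.allocate(len);
            } else {                                    // body complete
                frames.add(incomingBuffer.array());
                incomingBuffer = lenBuffer;
            }
        }
        return frames;
    }

    // Test helper: encode messages as length-prefixed UTF-8 frames.
    static byte[] encode(String... messages) {
        int total = 0;
        for (String m : messages) {
            total += 4 + m.getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer out = ByteBuffer.allocate(total);
        for (String m : messages) {
            byte[] b = m.getBytes(StandardCharsets.UTF_8);
            out.putInt(b.length).put(b);
        }
        return out.array();
    }

    public static void main(String[] args) {
        byte[] wire = encode("hello", "", "zk");
        LengthPrefixedDecoder d = new LengthPrefixedDecoder(1024);
        for (int off = 0; off < wire.length; off += 3) {  // deliver 3 bytes per "read"
            int n = Math.min(3, wire.length - off);
            for (byte[] f : d.feed(ByteBuffer.wrap(wire, off, n))) {
                System.out.println("frame: \"" + new String(f, StandardCharsets.UTF_8) + "\"");
            }
        }
    }
}
```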

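Note: the differences above compare Netty 4.x with NIOServerCnxnFactory. Because ZooKeeper uses Netty 3.x, its NettyServerCnxnFactory contains some code that Netty 4.x would make unnecessary, such as the framing (sticky/half packet) handling.
The comparison shows that handling network IO with Netty is far more convenient than hand-coding it on top of Java NIO. Long live Netty!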
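Summary
To summarize, threads communicate through three queues:

  • SelectorThread.acceptedQueue: between the accept thread and the selector threads
  • SelectorThread.updateQueue: between the worker threads and the selector threads
  • NIOServerCnxn.outgoingBuffers: between the worker threads and the request-processing threads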
