生活随笔
收集整理的這篇文章主要介紹了
Zookeeper--Watcher机制源码剖析一
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Watcher-- 數據變更通知
我們知道Zookeeper提供來分布式數據的訂閱/發布功能,一個典型的發布/訂閱模型系統定義了一種一對多的訂閱關系,能讓多個訂閱者同時監聽某個主題對象,當這個被監聽對象自身狀態發生變化時候,會通知所有訂閱者,Zookeeper中引入了Watcher機制來實現這種分布式通知功能,Zookeeper允許客戶端向服務器節點注冊一個Watcher監聽,當服務器端節點發生指定觸發的事件就會觸發這個Watcher,之后服務端會向指定客戶端發送一個事件通知,這樣來實現一個分布式通知的功能,如下圖所示的一個流程: 上圖中流程,Zookeeper的Watcher機制主要包括客戶端線程,客戶端WatcherManager,和Zookeeper服務器,流程上簡單的說: 客戶端向Zookeeper服務器注冊成功Watcher同時,將Watcher對象存儲在客戶端的WatcherManager 當Zookeeper服務器觸發Watcher事件后,向客戶端發送通知 客戶端線程從WatcherManager中撈出對應的Watcher對象來執行回調邏輯
Watcher接口
在Zookeeper中,接口Watcher表示一個標準的事件處理器,訂閱來通知相關的邏輯,我們可以看他的源碼: EventType:事件類型 KeeperState:通知狀態 Process(WatchedEvent event):會調方法 其中事件類型和通知狀態是有對應關系,如下表中所示
KeeperStateEventType觸發條件解釋說明 SyncConnected None 客戶端與服務器成功建立連接 客戶端和服務器處于連接狀態 SyncConnected NodeCreated Watcher 監聽的對應數據節點成功創建 客戶端和服務器處于連接狀態 SyncConnected NodeDeleted Watcher監聽的數據節點成功刪除 客戶端和服務器處于連接狀態 SyncConnected NodeDataChanged Watcher監聽的數據節點內容變更 客戶端和服務器處于連接狀態 SyncConnected NodeChildrenChanged Watcher監聽的對應數據節點列表發生變更 客戶端和服務器處于連接狀態 Disconnected None 客戶端與Zookeeper服務器斷開連接 客戶端和服務器斷開了連接 Expired None 會話超時 客戶端回話失效,通常同時也會收到SessionExpiredException異常 AuthFailed None 兩種情況:使用錯誤scheme進行權限檢查, SASL權限檢查失敗 通常同時收到AuthFailedException異常 Unknown 3.1.0后廢棄 NoSYncConnected 3.1.0后廢棄
如上列舉了Zookeeper中常見的幾個通知狀態和事件類型,其中針對NodeDateChange事件說明的節點的變更并不一定是內容變化,可能版本號DataVersion變化也是一樣會觸發。
回調方法process 是Watcher接口中的一個回調方法,當Zookeeper服務器端向客戶端發送一個Watcher事件通知的時候,客戶端會對相應的Process方法進行回調,從而實現對事件的處理,Process方法定義如下
void process ( WatchedEvent event
) ;
如上參數WatcherEvent包含了一個事件的基本屬性:
public class WatchedEvent { private final KeeperState keeperState
; private final EventType eventType
; private String path
; public WatchedEvent ( EventType eventType
, KeeperState keeperState
, String path
) { this . keeperState
= keeperState
; this . eventType
= eventType
; this . path
= path
; }
. . . . . .
}
Zookeeper服務端生成WatchedEvent事件后會調用getWrapper方法將字節包裝成一個可序列化的WatcherEvent,其實這是一個事務,都是對服務端事件的一個封裝,不同的是WatchedEvent是我們邏輯事件中的一個對象,主要用來我們程序內部的事件容器,而WatcherEvent因為實現了序列化的接口,因此可以用于網絡傳輸 在服務端得到WatcherEvent后,通過網絡傳到客戶端,還原成一個WatchedEvent,并傳遞給process,然后process方法根據入參就可以解析完整的服務端事件了。
工作機制
Zookeeper的Watcher機制可以有如下三個過程: 客戶端注冊Watcher 服務端處理watcher 客戶端回調Watcher 以下類圖說明各組件之間的關系:
客戶端注冊Watcher
我們通過如下部分源碼來分析Watcher的客戶端注冊,我們創建一個Zookeeper的客戶端對象實例時,可以向構造方法中傳入一個默認的Watcher:
public ZooKeeper ( String connectString
, int sessionTimeout
, Watcher watcher
) throws IOException
{ this ( connectString
, sessionTimeout
, watcher
, false ) ;
}
public ZooKeeper ( String connectString
, int sessionTimeout
, Watcher watcher
, boolean canBeReadOnly
, HostProvider aHostProvider
, ZKClientConfig clientConfig
) throws IOException
{ LOG
. info ( "Initiating client connection, connectString={} sessionTimeout={} watcher={}" , new Object [ ] { connectString
, Integer
. valueOf ( sessionTimeout
) , watcher
} ) ; if ( clientConfig
== null
) { clientConfig
= new ZKClientConfig ( ) ; } this . clientConfig
= clientConfig
; this . watchManager
= this . defaultWatchManager ( ) ; this . watchManager
. defaultWatcher
= watcher
; ConnectStringParser connectStringParser
= new ConnectStringParser ( connectString
) ; this . hostProvider
= aHostProvider
; this . cnxn
= this . createConnection ( connectStringParser
. getChrootPath ( ) , this . hostProvider
, sessionTimeout
, this , this . watchManager
, this . getClientCnxnSocket ( ) , canBeReadOnly
) ; this . cnxn
. start ( ) ; }
如上源碼中我們給定的Watcher對象實際上被保存在客戶端ZKWatcherManager的defaultWatcher中,另外Zookeeper客戶端也可以通過getData,getChildren,exist三個接口來向Zookeeper服務器注冊Watcher,無論哪一種都一樣,我們用getData方法的源碼來分析:
public byte [ ] getData ( String path
, Watcher watcher
, Stat stat
) throws KeeperException
, InterruptedException
{ PathUtils
. validatePath ( path
) ; ZooKeeper
. DataWatchRegistration wcb
= null
; if ( watcher
!= null
) { wcb
= new ZooKeeper. DataWatchRegistration ( watcher
, path
) ; } String serverPath
= this . prependChroot ( path
) ; RequestHeader h
= new RequestHeader ( ) ; h
. setType ( 4 ) ; GetDataRequest request
= new GetDataRequest ( ) ; request
. setPath ( serverPath
) ; request
. setWatch ( watcher
!= null
) ; GetDataResponse response
= new GetDataResponse ( ) ; ReplyHeader r
= this . cnxn
. submitRequest ( h
, request
, response
, wcb
) ; if ( r
. getErr ( ) != 0 ) { throw KeeperException
. create ( Code
. get ( r
. getErr ( ) ) , path
) ; } else { if ( stat
!= null
) { DataTree
. copyStat ( response
. getStat ( ) , stat
) ; } return response
. getData ( ) ; } }
如上源碼中參數Path, Watcher對象,getData接口注冊Watcher后,做了兩件事情 先用這兩個參數封裝來一個DataWatchRegistration,其實就是初始化來Zookeeper服務器中的WatchRegistration里面的 watcher,clientPath,這部分用來暫時存儲注冊信息保存節點和Watcher的對應關系 接著會向客戶端請求request進行標記,將其設置為“使用watcher監聽”。 接著繼續往下SubmitRequest方法:
ReplyHeader r
= this . cnxn
. submitRequest ( h
, request
, response
, wcb
) ;
public ReplyHeader
submitRequest ( RequestHeader h
, Record request
, Record response
, WatchRegistration watchRegistration
, WatchDeregistration watchDeregistration
) throws InterruptedException
{ ReplyHeader r
= new ReplyHeader ( ) ; Packet packet
= queuePacket ( h
, r
, request
, response
, null
, null
, null
, null
, watchRegistration
, watchDeregistration
) ; . . . . . . return r
; }
這個步驟中又一次將ClientCnxn中的WatchRegistration封裝到Packet中,Zookeeper中,Packet可以被看作是一個最小通信協議單元,用于進行客戶端與服務器之間的網絡傳輸,任何需要傳輸的對象都需要包裝成一個Packet對象,接著他被放入發送隊列,如下queuePacket代碼:
public Packet
queuePacket ( RequestHeader h
, ReplyHeader r
, Record request
, Record response
, AsyncCallback cb
, String clientPath
, String serverPath
, Object ctx
, WatchRegistration watchRegistration
, WatchDeregistration watchDeregistration
) { Packet packet
= null
; packet
= new Packet ( h
, r
, request
, response
, watchRegistration
) ; packet
. cb
= cb
; packet
. ctx
= ctx
; packet
. clientPath
= clientPath
; packet
. serverPath
= serverPath
; packet
. watchDeregistration
= watchDeregistration
; synchronized ( state
) { . . . . . . outgoingQueue
. add ( packet
) ; . . . . }
我們繼續追這個outgoingQueue 隊列,可以看到隨后Zookeeper客戶端會向服務器端發送這個請求,同時等待請求的返回,王朝請求發送后,會由客戶端的SendThread線程的readResponse方法負責接受來自服務端的響應,finishPacket方法會從Packet中取出對于的Watcher并注冊到ZKWatcherManager中去。
protected void finishPacket ( Packet p
) { int err
= p
. replyHeader
. getErr ( ) ; if ( p
. watchRegistration
!= null
) { p
. watchRegistration
. register ( err
) ; } . . . . . .
如Packet中的Watchregistration就是我們剛才第一步getData中保存的節點對應的Watcher的注冊信息。現在他又從這部分中取出來封裝的Watcher,如下具體的register方法:
public void register ( int rc
) { if ( shouldAddWatch ( rc
) ) { Map
< String
, Set
< Watcher> > watches
= getWatches ( rc
) ; synchronized ( watches
) { Set
< Watcher> watchers
= watches
. get ( clientPath
) ; if ( watchers
== null
) { watchers
= new HashSet < Watcher> ( ) ; watches
. put ( clientPath
, watchers
) ; } watchers
. add ( watcher
) ; } } } protected final ZKWatchManager watchManager
; protected Map
< String
, Set
< Watcher> > getWatches ( int rc
) { return watchManager
. dataWatches
; }
如上register方法中客戶端將之前暫時保存的Watcher取出來之后,放入到getWatcher獲取到的一個Map對象中,這個Mp對象就是ZkWatcherManager中的一個dataWatches,我們將剛才存入WatchRegistration中的臨時信息取出用來初始化ZKWatchManager.dataWatches,用于將數據節點的路徑和watcher對象進行一一映射,這樣就完成來客戶端Watcher的注冊,整個Watcher流程如下
如上流程中我們每次調用getData都會注冊一個Watcher,如果這些Watcher都隨著請求發送到服務器的話肯定會內存緊張,現實是這樣的碼,我們可以看之前代碼中負責傳輸數據的對象Packet中,我們將WatchRegistration封裝進去,如下Packet中的序列化方法createBB:
public void createBB ( ) { try { ByteArrayOutputStream baos
= new ByteArrayOutputStream ( ) ; BinaryOutputArchive boa
= BinaryOutputArchive
. getArchive ( baos
) ; boa
. writeInt ( - 1 , "len" ) ; if ( requestHeader
!= null
) { requestHeader
. serialize ( boa
, "header" ) ; } if ( request
instanceof ConnectRequest ) { request
. serialize ( boa
, "connect" ) ; boa
. writeBool ( readOnly
, "readOnly" ) ; } else if ( request
!= null
) { request
. serialize ( boa
, "request" ) ; } baos
. close ( ) ; this . bb
= ByteBuffer
. wrap ( baos
. toByteArray ( ) ) ; this . bb
. putInt ( this . bb
. capacity ( ) - 4 ) ; this . bb
. rewind ( ) ; } catch ( IOException e
) { LOG
. warn ( "Unexpected exception" , e
) ; } }
如上源碼中可以看到并沒有整個對象完全序列化進去,zookeeper只是將requestHeader和request兩個屬性進行序列化,WatchRegistration并沒有被序列化到底層字節數組中,所以不會進行網絡傳輸
服務端處理Watcher
上面講解了客戶端注冊Watcher的過程,并且已經了解了最終客戶端不會將Watcher對象真正床底到服務器,那么,服務端是怎么樣完成客戶端的Watcher注冊,一下我們對這部分文件進行解析。
ServerCnxn存儲
我們先看下服務器接收Watcher并將其存儲起來的過程,如下Zookeeper服務端處理Watcher序列圖: 我們先從源頭分析客戶端給了服務器那些信息,如下Zookeeper類中getData方法的源碼:
. . . . . .
RequestHeader h
= new RequestHeader ( ) ;
h
. setType ( ZooDefs
. OpCode
. getData
) ;
GetDataRequest request
= new GetDataRequest ( ) ;
request
. setPath ( serverPath
) ;
request
. setWatch ( watcher
!= null
) ;
GetDataResponse response
= new GetDataResponse ( ) ;
. . . . . .
如上RequestHeader中type類型設置的 4 ,request中給定了節點path路徑,以及一個boolean類型的watcher標識是否天劍監聽。服務端收到來自客戶端的請求后,在FinalRequestProcessor.processRequest()中會判斷當前請求的類型type來做一個策略來決定處理不同類型的請求,如下源碼:
switch ( request
. type
) {
. . . . . . case OpCode
. getData
: { . . . . . . Stat stat
= new Stat ( ) ; byte b
[ ] = zks
. getZKDatabase ( ) . getData ( getDataRequest
. getPath ( ) , stat
, getDataRequest
. getWatch ( ) ? cnxn
: null
) ; rsp
= new GetDataResponse ( b
, stat
) ; break ; }
. . . . . . }
如上,從getData請求的處理邏輯可以看到當getDataRequest.getwatch為true的時候,Zookeeper就認為當前客戶端請求需要進行Watcher注冊,于是將當前的ServerCnxn對象和數據節點路徑傳入getData方法 ServerCnxn是一個Zookeeper客戶端和服務器之間的鏈接接口,代表了一個客戶端和服務器的鏈接,ServerCnxn接口默認實現是NIOServerCNxn,同時3.4.0版本開始引入了Netty實現:NettyServerCnxn,都實現了Watcher接口并且實現process接口,所有把他看成一個Watcher對象,如下ServerCnxn對象以及兩種process實現
public abstract class ServerCnxn implements Stats , Watcher
{
. . . . . . public abstract void process ( WatchedEvent event
) ; . . . . . .
}
繼續追getData源碼,getZkDataBase獲取到的ZKDatabase 對象,其中DataTree 對象是現在Zookeeper現有的節點數據的樹形存儲,我們可以通過path來從這獲取到對應節點信息,如下獲取DataNode,初始化節點狀態,將DataNode天驕到WatchManager 對象中的WatchTable和watch2Paths中
byte b
[ ] = zks
. getZKDatabase ( ) . getData ( getDataRequest
. getPath ( ) , stat
, getDataRequest
. getWatch ( ) ? cnxn
: null
) ;
public byte [ ] getData ( String path
, Stat stat
, Watcher watcher
) throws KeeperException
. NoNodeException
{ DataNode n
= nodes
. get ( path
) ; if ( n
== null
) { throw new KeeperException. NoNodeException ( ) ; } synchronized ( n
) { n
. copyStat ( stat
) ; if ( watcher
!= null
) { dataWatches
. addWatch ( path
, watcher
) ; } return n
. data
; } }
Watchmanager是Zookeeper服務端Watcher的管理者,內部管理的WatcherTable和Watch2Paths,所以一個節點存儲了兩次,不過是從如下兩個未存存儲 watchTable是從數據節點路徑的粒度來托管Watcher watch2Paths是從Watcher的粒度來空值時間觸發需要出發的數據節點。
* /
public class WatchManager { private static final Logger LOG
= LoggerFactory
. getLogger ( WatchManager
. class ) ; private final HashMap
< String
, HashSet
< Watcher> > watchTable
= new HashMap < String
, HashSet
< Watcher> > ( ) ; private final HashMap
< Watcher
, HashSet
< String> > watch2Paths
= new HashMap < Watcher
, HashSet
< String> > ( ) ; . . . . . . }
WatcherManager - watchTable: HashMap<String, HashSet>(); + watch2Paths :new HashMap<Watcher, HashSet>(); + addwatch(String ,Watcher): void + removeWatcher(Watcher): void + triggerWatch(String, EventType):Set +Trigger
上一篇Zookeeper–ZAB與Paxos算法聯系與區別 下一篇Zookeeper–Watcher機制源碼剖析二
總結
以上是生活随笔 為你收集整理的Zookeeper--Watcher机制源码剖析一 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。