HBase Replication源码解析之HLog读取
2019獨角獸企業重金招聘Python工程師標準>>>
在HRegionServer中兩個量和replication相關,如下所示:
?
[java]?view plain?copy
在ReplicationSourceService中只有一個方法getWALActionsListener,該方法返回WALActionsListener。ReplicationSinkService同樣也是一個接口類,它有一個方法replicateLogEntries。在HRegionServer的如下代碼段中會啟動replicationservice。
?
?
[java]?view plain?copy
startReplicationService中做了三件事,分別是調用ReplicationSourceManager的init方法,初始化replicationSink,初始化調度線程池scheduleThreadPool;
startReplicationService方法中調用了ReplicationSourceManager的init方法,init中遍歷replicationPeers中的peerid,并以該id為參數,調用addSource方法,在addSource中針對每一個peerid構造了一個對象ReplicationSource,ReplicationSource是個守護進程,這里初始化的時候并不是通過構造函數,而是通過getReplicationSource函數,在這個方法里先獲得了一個ReplicationSource的接口,接著調用init初始化該接口,此外,getReplicationSource還有一個重要的作用是它實例化了replicationEndpoint(HBaseInterClusterReplicationEndpoint)?;氐絘ddSource這個方法,它返回前調用了ReplicationSource的startup方法,startup是個挺有意思的方法,代碼如下:
?
ReplicationSource是個守護線程,在startUp中啟動了自己。。。。這么說也就是replicationPeers中的每個peerid都表示了一個slave集群,而每個slave集群都有一個自己的ReplicationSource線程?,F在的重點就落在了ReplicationSource這個守護線程的處理邏輯,可以從它的run方法入手分析。
?
run中有如下幾個關鍵步驟,首先:
? ? ? ? ? 1、啟動replicationEndpoint :Service.State state = replicationEndpoint.start().get();
? ? ?? ? ?2、構造walEntryFilter:this.walEntryFilter = new ChainWALEntryFilter(filters);
? ? ?? ? ?3、進入一個循環,循環持續運行至守護線程ReplicationSource終止:
? ? ? ? ? ? ? ? ? ??? ? ?while(isActive) {
? ? ? ? ? ? ? ? ? ? ? ? ?? ? ?獲取log path;
? ? ? ? ? ? ? ? ? ? ? ? ?? ? ?調用openReader打開當前path的log reader(后文詳解);
? ? ? ? ? ? ? ? ? ? ? ? ?? ? ?從reader中依次讀取WAL.Entry并放入一個List<WAL.Entry>的數據結構中,方法調用如下:
? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ? ?readAllEntriesToReplicateOrNextFile(currentWALisBingWrittenTo, entries)
? ? ? ? ? ? ? ? ? ? ? ? ?? ? ?最后調用shipEdits將entries發送到遠端集群;
? ? ? ? ? ? ? ? ? ? ?????}
?
發送WALEntry到從集群的邏輯在方法shipEdits中完成,ship方法接收一個List<WAL.Entry>類型的參數entries,在shipEdits中entries參數被包裝進replicateContext中并發送到從集群,這部分的主要代碼如下所示:
還記得前文中說到,replicationEndpoint在getReplicationSource中初始化為HBaseInterClusterReplicationEndpoint類型的變量。進入HBaseInterClusterReplicationEndpoint的replicate方法的實現,該方法首先從參數replicateContext中獲得List<Entry> entries,關鍵的wal傳遞在下面這段代碼中:
其中最后一句將Entry對象序列化之后由文首RegionServer中初始化的ReplicationSinkService發送到遠端集群;
以上這些就是大概的replication時,wal跨集群傳遞的一些細節實現。接下來回過頭詳細解釋上文留下的一個小辮子,就是圍繞ReplicationSource的openReader方法的實現,分析這個調用的目的是理清wal的讀邏輯是什么樣的。
?
ReplicationSource的openReader以currentPath為參數,調用ReplicationWALReaderManager的openReader
?
ReplicationWALReaderManager的openReader通過WALFactory.createReader返回指定文件的reader;
?
看看WALFactory.createReader中的關鍵代碼吧:
?
?
可見Reader是在這里構建的,我們以最常見的lrClass屬于ProtobufLogReader.class為例來解釋,首先初始化一個數據輸入流FSDataInputStream,通過這個流打開文件fs(fs在輸入參數中指定),根據isPbWal選擇new不同的Reader實例,最后調用reader的init方法完成初始化工作。這里的Reader大多數是DefaultWALProvider.Reader類型的。
?
Reader創建已經分析完畢,那讀實現是什么樣的?
?
讀的動作主要在readAllEntriesToReplicateOrNextFile中,該方法接收一個List<WAL.Entry>類型的參數entries,也就是說讀到的各個log entry在entries中返回,下面一一分析readAllEntriesToReplicateOrNextFile中的主要邏輯。
? ? ? ? ? 1、this.repLogReader.seek();
?? ? ? ? ?2、WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
?? ? ? ? ?3、進入循環
?? ? ? ? ? ? ? ? while(entry != null) {
?? ? ? ? ? ? ? ? ? ? ? //過濾掉已經消費掉的log entry
?? ? ? ? ? ? ? ? ? ? ?if (replicationEndpoint.canReplicateToSameCluster()
? ? ? ? ? ? ? ? ? ? ? ? ? || !entry.getKey().getClusterIds().contains(peerClusterId)) {
? ? ? ? ? ? ? ? ? ? ? ? ? entry = walEntryFilter.filter(entry); ?//過濾的邏輯在walEntryFilter中實現
? ? ? ? ? ? ? ? ? ? ? ? ? entries.add(entry);
? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? try {
?? ? ? ? ? ? ? ? ? ? ? ? ? ? ?entry = this.repLogReader.readNextAndSetPosition();
? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? }
?
?
? ? ?? ? ?4、各種metrics處理;
?
WALEntryFilter的作用是在把wal entries發送到slave集群前過濾掉某些并不需要的發送WAL Entries,它有很多個實現類,所有的類都實現了filter方法,這些不同的WALEntryFilter可以通過ChainWALEntryFilter構成一條責任鏈。HLog文件讀出的wal entries流經責任鏈,篩選出需要replicate的walEntry,這是典型的責任鏈模式的應用。
轉載于:https://my.oschina.net/sniperLi/blog/910764
總結
以上是生活随笔為你收集整理的HBase Replication源码解析之HLog读取的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [BZOJ 1076][SCOI2008
- 下一篇: 常用的 16 个 Sublime Tex