Hadoop 副本存储策略的源码修改和设置
Table of Contents
- BlockPlacementPolicy
- Hadoop 提供的 BlockPlacementPolicy 實(shí)現(xiàn)
- BlockPlacementPolicyDefault 源碼閱讀
- 首先
- 處理favoredNodes
- 三副本選擇
- 再到具體的選擇
- 源碼閱讀的幾個(gè)注意
- 修改HDFS默認(rèn)的副本放置機(jī)制
- RackAwareness 機(jī)架感知
大多數(shù)的叫法都是副本放置策略,實(shí)質(zhì)上是HDFS對(duì)所有數(shù)據(jù)的位置放置策略,并非只是針對(duì)數(shù)據(jù)的副本。因此Hadoop的源碼里有block replicator(configuration)、 BlockPlacementPolicy(具體邏輯源碼)兩種叫法。
主要用途:上傳文件時(shí)決定文件在HDFS上存儲(chǔ)的位置(具體到datanode上的具體存儲(chǔ)介質(zhì),如具體到存儲(chǔ)在哪塊硬盤(pán));rebalance、datanode退出集群、副本數(shù)量更改等導(dǎo)致數(shù)據(jù)移動(dòng)的操作中,數(shù)據(jù)移動(dòng)的具體位置。
BlockPlacementPolicy
BlockPlacementPolicy 作為虛基類(lèi)提供了基本的接口,具體的子類(lèi)重點(diǎn)實(shí)現(xiàn)下面?選擇副本?、?驗(yàn)證副本放置是否滿足要求?、?選擇能夠刪除的副本?三個(gè)函數(shù):
/*** 核心的副本放置策略實(shí)現(xiàn),返回副本放置數(shù)量的存儲(chǔ)位置* **如果有效節(jié)點(diǎn)數(shù)量不夠(少于副本數(shù)),返回盡可能多的節(jié)點(diǎn),而非失敗**** @param srcPath 上傳文件的路徑* @param numOfReplicas 除下面chosen參數(shù)里已經(jīng)選擇的datanode,還需要的副本數(shù)量* @param writer 寫(xiě)數(shù)據(jù)的機(jī)器, null if not in the cluster. 一般用于放置第一個(gè)副本以降低網(wǎng)絡(luò)通信* @param chosen 已經(jīng)選擇的節(jié)點(diǎn)* @param returnChosenNodes 返回結(jié)果里是否包含chosen的datanode* @param excludedNodes 不選的節(jié)點(diǎn)* @param blocksize 塊大小* @return 排序好的選擇結(jié)果*/public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,int numOfReplicas,Node writer,List<DatanodeStorageInfo> chosen,boolean returnChosenNodes,Set<Node> excludedNodes,long blocksize,BlockStoragePolicy storagePolicy);/*** 判斷傳入的放置方式是否符合要求*/abstract public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs, int numOfReplicas);/*** 當(dāng)副本數(shù)量較多時(shí),選擇需要?jiǎng)h除的節(jié)點(diǎn)*/abstract public List<DatanodeStorageInfo> chooseReplicasToDelete(Collection<DatanodeStorageInfo> candidates, int expectedNumOfReplicas,List<StorageType> excessTypes, DatanodeDescriptor addedNode,DatanodeDescriptor delNodeHint);Hadoop 提供的 BlockPlacementPolicy 實(shí)現(xiàn)
Hadoop提供了BlockPlacementPolicyDefault、BlockPlacementPolicyWithNodeGroup、AvailableSpaceBlockPlacementPolicy三種實(shí)現(xiàn)(hadoop 2.7.7)。
其中BlockPlacementPolicyDefault是默認(rèn)三副本策略的實(shí)現(xiàn):第一個(gè)副本盡可能放在寫(xiě)入數(shù)據(jù)的節(jié)點(diǎn),第二個(gè)副本放在與第一個(gè)副本不在同一機(jī)架(rack)下的節(jié)點(diǎn),第三個(gè)副本與第二副本放在同一個(gè)機(jī)架。
BlockPlacementPolicyWithNodeGroup中第一、二個(gè)副本和Default副本放置相同,第三個(gè)副本在第二個(gè)機(jī)架下選擇不同node group的結(jié)點(diǎn)。AvailableSpaceBlockPlacementPolicy實(shí)現(xiàn)存儲(chǔ)平衡。Hadoop3.1中還加入了BlockPlacementPolicyRackFaultTolerant將數(shù)據(jù)存儲(chǔ)到更多的機(jī)架下,BlockPlacementPolicyWithUpgradeDomain使用默認(rèn)的副本放置策略,但是3個(gè)副本選擇的datanode都要有不同的upgrade domains(為了方便大集群中datanode的更新和重啟、將結(jié)點(diǎn)分配給不同的upgrade domain)。
通過(guò)改變dfs.block.replicator.classname?能夠選擇具體的實(shí)現(xiàn)類(lèi),默認(rèn)值為org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault。(Hadoop 2.7.7下,貌似不同版本的Hadoop的命名還不一樣,而且2.7.7默認(rèn)的配置文件里還沒(méi)有,需要在源碼中查)
BlockPlacementPolicyDefault 源碼閱讀
public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,int numOfReplicas,Node writer,List<DatanodeStorageInfo> chosen,boolean returnChosenNodes,Set<Node> excludedNodes,long blocksize,BlockStoragePolicy storagePolicy);chooseTarget函數(shù)實(shí)現(xiàn)了具體的三副本策略。各種特殊情況(如只有1個(gè)副本、datanode數(shù)量不夠、集群拓?fù)洳粷M足要求等)的考慮讓代碼看起來(lái)比較復(fù)雜,常規(guī)情況直接跟著調(diào)試代碼走會(huì)跳過(guò)很多異常處理部分,便于裂解正常流程。
在副本的選擇上用了各種帶chooseTarget函數(shù),注意有幾個(gè)函數(shù)結(jié)果是通過(guò)參數(shù)傳出而不是返回值。
主要實(shí)現(xiàn)思路:
首先
srcPath沒(méi)有被考慮,被直接舍棄:
return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,excludedNodes, blocksize, storagePolicy, flags); // ignore srcPath因此默認(rèn)的副本放置策略,在同一文件包含多個(gè)block時(shí),每個(gè)block的存儲(chǔ)位置獨(dú)立考慮,并非存儲(chǔ)在同一datanode。
處理favoredNodes
上傳文件時(shí)可以指定favoredNodes(默認(rèn)為空),首先對(duì)favoredNodes所在的節(jié)點(diǎn)判斷是否合適。如果滿足條件的節(jié)點(diǎn)數(shù)還低于副本數(shù),則添加新的副本。
// --------------Choose favored nodes ---------------// 從favored nodes中選擇,在上傳文件時(shí)可以指定List<DatanodeStorageInfo> results = new ArrayList<>();boolean avoidStaleNodes = stats != null&& stats.isAvoidingStaleDataNodesForWrite();int maxNodesAndReplicas[] = getMaxNodesPerRack(0, numOfReplicas);numOfReplicas = maxNodesAndReplicas[0];int maxNodesPerRack = maxNodesAndReplicas[1];chooseFavouredNodes(src, numOfReplicas, favoredNodes,favoriteAndExcludedNodes, blocksize, maxNodesPerRack, results,avoidStaleNodes, storageTypes);// ---------------如果滿足要求的favored nodes數(shù)量不足-----------if (results.size() < numOfReplicas) {// Not enough favored nodes, choose other nodes, based on block// placement policy (HDFS-9393).numOfReplicas -= results.size();for (DatanodeStorageInfo storage : results) {// add localMachine and related nodes to favoriteAndExcludedNodesaddToExcludedNodes(storage.getDatanodeDescriptor(),favoriteAndExcludedNodes);}DatanodeStorageInfo[] remainingTargets =chooseTarget(src, numOfReplicas, writer,new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,favoriteAndExcludedNodes, blocksize, storagePolicy, flags);for (int i = 0; i < remainingTargets.length; i++) {results.add(remainingTargets[i]);}}三副本選擇
實(shí)現(xiàn)邏輯在 chooseTargetInOrder(…) 函數(shù)中
// 第一個(gè)副本的選擇 if (numOfResults == 0) {writer = chooseLocalStorage(writer, excludedNodes, blocksize,maxNodesPerRack, results, avoidStaleNodes, storageTypes, true).getDatanodeDescriptor();if (--numOfReplicas == 0) {return writer;} }// 選擇與第一個(gè)副本不在同一Rack下的第二個(gè)副本 final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor(); if (numOfResults <= 1) {chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,results, avoidStaleNodes, storageTypes);if (--numOfReplicas == 0) {return writer;} }// 第三個(gè)副本 if (numOfResults <= 2) {final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();// 第一、二副本在同一Rack下時(shí)選第三個(gè)副本 // (前面的favoredNodes以及集群條件可能造成這種情況)if (clusterMap.isOnSameRack(dn0, dn1)) {chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,results, avoidStaleNodes, storageTypes);} else if (newBlock){ // 正常情況,第二副本的localRack下選第三副本chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,results, avoidStaleNodes, storageTypes);} else { // 其它的以外chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,results, avoidStaleNodes, storageTypes);}if (--numOfReplicas == 0) {return writer;} }// 如果副本數(shù)量還沒(méi)到0,剩下的副本隨機(jī)選擇 chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,maxNodesPerRack, results, avoidStaleNodes, storageTypes); return writer;再到具體的選擇
選擇具體的存儲(chǔ)位置被上面包裝到了 chooseRemoteRack 和 chooseLocalRack 兩個(gè)函數(shù)。
實(shí)際調(diào)用時(shí)只是 chooseRandom 函數(shù),在限定的rack下選擇一個(gè)隨機(jī)的節(jié)點(diǎn)。
源碼閱讀的幾個(gè)注意
代碼在直接閱讀時(shí)各種跳,但主線思路比較明確。主要帶來(lái)閱讀困難的位置:
修改HDFS默認(rèn)的副本放置機(jī)制
可以選擇直接復(fù)制或繼承BlockPlacementPolicyDefault的實(shí)現(xiàn),或者直接繼承BlockPlacementPolicy類(lèi)編寫(xiě)對(duì)應(yīng)的接口具體實(shí)現(xiàn)。
將編譯好的jar包放入$HADOOP_PREFIX/share/hadoop/common下(或者其它的Hadoop jar包路徑)。
改變dfs.block.replicator.classname?為上面的實(shí)現(xiàn)類(lèi),要帶包的名稱(chēng)。
RackAwareness 機(jī)架感知
Hadoop 并不能自動(dòng)檢測(cè)集群的機(jī)架狀態(tài),而是要預(yù)先設(shè)置機(jī)架的狀態(tài),通過(guò)腳本或java類(lèi)將datanode的ip轉(zhuǎn)換成具體的機(jī)架上的位置。
官方文檔介紹了基本思路,雖然實(shí)現(xiàn)上介紹得不是太清楚,只要將輸入的ip轉(zhuǎn)換成"/rackNum"的形式即可。
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/RackAwareness.html
?Categories:cloud computing
?Tags:hadoop
總結(jié)
以上是生活随笔為你收集整理的Hadoop 副本存储策略的源码修改和设置的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 第五章I/O管理
- 下一篇: 不同网段通过静态路由实现互通(强烈推荐)