MapReduce源码分析之JobSplitWriter
JobSplitWriter被作業客戶端用于寫分片相關文件,包括分片數據文件job.split和分片元數據信息文件job.splitmetainfo。它有兩個靜態成員變量,如下:
?
[java]?view plaincopy? ? ? ? 并且,提供了一個靜態方法,完成SPLIT_FILE_HEADER的初始化,代碼如下:
?
?
[java]?view plaincopy? ? ? ??JobSplitWriter實現其功能的為createSplitFiles()方法,它有三種實現,我們先看其中的public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,Configuration conf, FileSystem fs, T[] splits),代碼如下:
?
?
[java]?view plaincopy? ? ? ??createSplitFiles()方法的邏輯很清晰,大體如下:
?
? ? ? ? 1、調用createFile()方法,創建分片文件,并獲取文件系統數據輸出流FSDataOutputStream實例out,對應路徑為jobSubmitDir/job.split,jobSubmitDir為參數yarn.app.mapreduce.am.staging-dir指定的路徑/作業所屬用戶user/.staging/作業ID;
? ? ? ? 2、調用writeNewSplits()方法,將分片數據寫入分片文件,并得到分片元數據信息SplitMetaInfo數組info;
? ? ? ? 3、關閉輸出流out;
? ? ? ? 4、調用writeJobSplitMetaInfo()方法,將分片元數據信息寫入分片元數據文件。
? ? ? ? 我們先來看下createFile()方法,代碼如下:
?
[java]?view plaincopy? ? ? ? 首先,調用HDFS文件系統FileSystem的create()方法,獲取文件系統數據輸出流FSDataOutputStream實例out,對應權限為JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--;
?
? ? ? ? 其次,獲取副本數replication,取參數mapreduce.client.submit.file.replication,參數未配置默認為10;
? ? ? ? 接著,通過文件系統FileSystem實例fs的setReplication()方法,設置splitFile的副本數位10;
? ? ? ? 然后,調用writeSplitHeader()方法寫入分片頭信息;
? ? ? ? 最后,返回文件系統數據輸出流out。
? ? ? ??writeSplitHeader()方法專門用于將分片頭部信息寫入分片文件,代碼如下:
?
[java]?view plaincopy? ? ? ? 很簡單,首先文件系統數據輸出流out寫入byte[],內容為UTF-8格式的"SPL",然后文件系統數據輸出流out寫入int,分片版本號,目前為1。
?
? ? ? ? 接下來,我們再看下writeNewSplits()方法,它將分片數據寫入分片文件,并得到分片元數據信息SplitMetaInfo數組info,代碼如下:
?
[java]?view plaincopy? ? ? ??writeNewSplits()方法的邏輯比較清晰,大體如下:
?
? ? ? ? 1、根據array的大小,構造同等大小的分片元數據信息SplitMetaInfo數組info,array其實是傳入的分片數組;
? ? ? ? 2、如果array中有數據:
? ? ? ? ? ? ? 2.1、創建序列化工廠SerializationFactory實例factory;
? ? ? ? ? ? ? 2.2、獲取最大的數據塊位置maxBlockLocations,取參數mapreduce.job.max.split.locations,參數未配置默認為10;
? ? ? ? ? ? ? 2.3、通過輸出流out的getPos()方法獲取輸出流out的當前位置offset;
? ? ? ? ? ? ? 2.4、遍歷數組array中每個元素split:
? ? ? ? ? ? ? ? ? ? ? ?2.4.1、通過輸出流out的getPos()方法獲取輸出流out的當前位置prevCount;
? ? ? ? ? ? ? ? ? ? ? ?2.4.2、往輸出流out中寫入String,內容為split對應的類名;
? ? ? ? ? ? ? ? ? ? ? ?2.4.3、獲取序列化器Serializer實例serializer;
? ? ? ? ? ? ? ? ? ? ? ?2.4.4、打開serializer,接入輸出流out;
? ? ? ? ? ? ? ? ? ? ? ?2.4.5、將split序列化到輸出流out;
? ? ? ? ? ? ? ? ? ? ? ?2.4.6、通過輸出流out的getPos()方法獲取輸出流out的當前位置currCount;
? ? ? ? ? ? ? ? ? ? ? ?2.4.7、通過split的getLocations()方法,獲取位置信息locations;
? ? ? ? ? ? ? ? ? ? ? ?2.4.8、確保位置信息locations的長度不能超過maxBlockLocations,超過則截斷;
? ? ? ? ? ? ? ? ? ? ? ?2.4.9、構造split對應的元數據信息,并加入info指定位置,offset為當前split在split文件中的起始位置,數據長度為split.getLength(),位置信息為locations;
? ? ? ? ? ? ? ? ? ? ? ?2.4.10、offset增加當前split已寫入數據大小;
? ? ? ? 3、返回分片元數據信息SplitMetaInfo數組info。
? ? ? ? 其中,序列化split對象時,我們以FileSplit為例來分析,其write()方法如下:
?
[java]?view plaincopy? ? ? ? 比較簡單,分別寫入文件路徑全名、分片在文件中的起始位置、分片在文件中的長度三個信息。
?
? ? ? ? 綜上所述,分片文件job.split文件的內容為:
? ? ? ? 1、文件頭:"SPL"+int類型版本號1;
? ? ? ? 2、分片類信息:String類型split對應類名;
? ? ? ? 3、分片數據信息:String類型文件路徑全名+Long類型分片在文件中的起始位置+Long類型分片在文件中的長度。
? ? ? ? 而在最后,構造分片元數據信息時,產生的是JobSplit的靜態內部類SplitMetaInfo對象,包括分片位置信息locations、split在split文件中的起始位置offset、分片長度split.getLength()。
? ? ? ? 下面,我們再看下分片的元數據信息文件是如何產生的,讓我們來研究下writeJobSplitMetaInfo()方法,代碼如下:
?
[java]?view plaincopy? ? ? ??writeJobSplitMetaInfo()方法的主體邏輯也十分清晰,大體如下:
?
? ? ? ? 1、調用HDFS文件系統FileSystem的create()方法,生成分片元數據信息文件,并獲取文件系統數據輸出流FSDataOutputStream實例out,對應文件路徑為jobSubmitDir/job.splitmetainfo,jobSubmitDir為參數yarn.app.mapreduce.am.staging-dir指定的路徑/作業所屬用戶user/.staging/作業ID,對應權限為JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--;
? ? ? ? 2、寫入分片元數據頭部信息UTF-8格式的字符串"META-SPL"的字節數組byte[];
? ? ? ? 3、寫入分片元數據版本號splitMetaInfoVersion,當前為1;
? ? ? ? 4、寫入分片元數據個數,為分片元數據信息SplitMetaInfo數組個數allSplitMetaInfo.length;
? ? ? ? 5、遍歷分片元數據信息SplitMetaInfo數組allSplitMetaInfo中每個splitMetaInfo,挨個寫入輸出流;
? ? ? ? 6、關閉輸出流out。
? ? ? ? 我們看下如何序列化JobSplit.SplitMetaInfo,將其寫入文件,JobSplit.SplitMetaInfo的write()如下:
?
[java]?view plaincopy ? ? ? ? 每個分片的元數據信息,包括分片位置個數、分片文件位置、分片元數據信息的起始位置、分片大小等內容。
?
? ? ? ? 總結
? ? ? ??JobSplitWriter被作業客戶端用于寫分片相關文件,包括分片數據文件job.split和分片元數據信息文件job.splitmetainfo。分片數據文件job.split存儲的主要是每個分片對應的HDFS文件路徑,和其在HDFS文件中的起始位置、長度等信息,而分片元數據信息文件job.splitmetainfo存儲的則是每個分片在分片數據文件job.split中的起始位置、分片大小等信息。
? ? ? ??job.split文件內容:文件頭 + 分片 + 分片 + ... + 分片
? ? ? ? 文件頭:"SPL" + 版本號1
? ? ? ? 分片:分片類 + 分片數據,分片類=String類型split對應類名,分片數據=String類型HDFS文件路徑全名+Long類型分片在HDFS文件中的起始位置+Long類型分片在HDFS文件中的長度
? ? ? ??job.splitmetainfo文件內容:文件頭 + 分片元數據個數 +?分片元數據 +?分片元數據 + ... +?分片元數據
? ? ? ??文件頭:"META-SPL" + 版本號1
? ? ? ??分片元數據個數:分片元數據的個數
? ? ? ? 分片元數據:分片位置個數+分片位置+在分片文件job.split中的起始位置+分片大小
轉載于:https://www.cnblogs.com/jirimutu01/p/5556356.html
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的MapReduce源码分析之JobSplitWriter的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: android training 笔记
- 下一篇: To-do List