日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

MapReduce源码分析之JobSplitWriter

發布時間:2025/4/16 编程问答 48 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MapReduce源码分析之JobSplitWriter 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

JobSplitWriter被作業客戶端用于寫分片相關文件,包括分片數據文件job.split和分片元數據信息文件job.splitmetainfo。它有兩個靜態成員變量,如下:

?

[java]?view plaincopy
  • //?分片版本,當前默認為1??
  • private?static?final?int?splitVersion?=?JobSplit.META_SPLIT_VERSION;??
  • //?分片文件頭部,為UTF-8格式的字符串"SPL"的字節數組"SPL"??
  • private?static?final?byte[]?SPLIT_FILE_HEADER;??
  • ? ? ? ? 并且,提供了一個靜態方法,完成SPLIT_FILE_HEADER的初始化,代碼如下:

    ?

    ?

    [java]?view plaincopy
  • //?靜態方法,加載SPLIT_FILE_HEADER為UTF-8格式的字符串"SPL"的字節數組byte[]??
  • static?{??
  • ??try?{??
  • ????SPLIT_FILE_HEADER?=?"SPL".getBytes("UTF-8");??
  • ??}?catch?(UnsupportedEncodingException?u)?{??
  • ????throw?new?RuntimeException(u);??
  • ??}??
  • }??
  • ? ? ? ??JobSplitWriter實現其功能的為createSplitFiles()方法,它有三種實現,我們先看其中的public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,Configuration conf, FileSystem fs, T[] splits),代碼如下:

    ?

    ?

    [java]?view plaincopy
  • ?//?創建分片文件??
  • ?public?static?<T?extends?InputSplit>?void?createSplitFiles(Path?jobSubmitDir,???
  • ?????Configuration?conf,?FileSystem?fs,?T[]?splits)???
  • ?throws?IOException,?InterruptedException?{??
  • ?????
  • //?調用createFile()方法,創建分片文件,并獲取文件系統數據輸出流FSDataOutputStream實例out,??
  • //?對應路徑為jobSubmitDir/job.split,jobSubmitDir為參數yarn.app.mapreduce.am.staging-dir指定的路徑/作業所屬用戶user/.staging/作業ID??
  • FSDataOutputStream?out?=?createFile(fs,???
  • ???????JobSubmissionFiles.getJobSplitFile(jobSubmitDir),?conf);??
  • ??
  • //?調用writeNewSplits()方法,將分片數據寫入分片文件,并得到分片元數據信息SplitMetaInfo數組info??
  • ???SplitMetaInfo[]?info?=?writeNewSplits(conf,?splits,?out);??
  • ?????
  • ???//?關閉輸出流??
  • ???out.close();??
  • ?????
  • ???//?調用writeJobSplitMetaInfo()方法,將分片元數據信息寫入分片元數據文件??
  • ???writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),???
  • ???????new?FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION),?splitVersion,??
  • ???????info);??
  • ?}??
  • ? ? ? ??createSplitFiles()方法的邏輯很清晰,大體如下:

    ?

    ? ? ? ? 1、調用createFile()方法,創建分片文件,并獲取文件系統數據輸出流FSDataOutputStream實例out,對應路徑為jobSubmitDir/job.split,jobSubmitDir為參數yarn.app.mapreduce.am.staging-dir指定的路徑/作業所屬用戶user/.staging/作業ID;

    ? ? ? ? 2、調用writeNewSplits()方法,將分片數據寫入分片文件,并得到分片元數據信息SplitMetaInfo數組info;

    ? ? ? ? 3、關閉輸出流out;

    ? ? ? ? 4、調用writeJobSplitMetaInfo()方法,將分片元數據信息寫入分片元數據文件。

    ? ? ? ? 我們先來看下createFile()方法,代碼如下:

    ?

    [java]?view plaincopy
  • ?private?static?FSDataOutputStream?createFile(FileSystem?fs,?Path?splitFile,???
  • ?????Configuration?job)??throws?IOException?{??
  • ????
  • //?調用HDFS文件系統FileSystem的create()方法,獲取文件系統數據輸出流FSDataOutputStream實例out,??
  • //?對應權限為JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--??
  • ???FSDataOutputStream?out?=?FileSystem.create(fs,?splitFile,???
  • ???????new?FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));??
  • ?????
  • ???//?獲取副本數replication,取參數mapreduce.client.submit.file.replication,參數未配置默認為10??
  • ???int?replication?=?job.getInt(Job.SUBMIT_REPLICATION,?10);??
  • ?????
  • ???//?通過文件系統FileSystem實例fs的setReplication()方法,設置splitFile的副本數位10??
  • ???fs.setReplication(splitFile,?(short)replication);??
  • ?????
  • ???//?調用writeSplitHeader()方法寫入分片頭信息??
  • ???writeSplitHeader(out);??
  • ?????
  • ???//?返回文件系統數據輸出流out??
  • ???return?out;??
  • ?}??
  • ? ? ? ? 首先,調用HDFS文件系統FileSystem的create()方法,獲取文件系統數據輸出流FSDataOutputStream實例out,對應權限為JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--;

    ?

    ? ? ? ? 其次,獲取副本數replication,取參數mapreduce.client.submit.file.replication,參數未配置默認為10;

    ? ? ? ? 接著,通過文件系統FileSystem實例fs的setReplication()方法,設置splitFile的副本數位10;

    ? ? ? ? 然后,調用writeSplitHeader()方法寫入分片頭信息;

    ? ? ? ? 最后,返回文件系統數據輸出流out。

    ? ? ? ??writeSplitHeader()方法專門用于將分片頭部信息寫入分片文件,代碼如下:

    ?

    [java]?view plaincopy
  • ?private?static?void?writeSplitHeader(FSDataOutputStream?out)???
  • ?throws?IOException?{??
  • ????
  • //?文件系統數據輸出流out寫入byte[],內容為UTF-8格式的"SPL"??
  • ???out.write(SPLIT_FILE_HEADER);??
  • ???//?文件系統數據輸出流out寫入int,分片版本號,目前為1??
  • ???out.writeInt(splitVersion);??
  • ?}??
  • ? ? ? ? 很簡單,首先文件系統數據輸出流out寫入byte[],內容為UTF-8格式的"SPL",然后文件系統數據輸出流out寫入int,分片版本號,目前為1。

    ?

    ? ? ? ? 接下來,我們再看下writeNewSplits()方法,它將分片數據寫入分片文件,并得到分片元數據信息SplitMetaInfo數組info,代碼如下:

    ?

    [java]?view plaincopy
  • ?@SuppressWarnings("unchecked")??
  • ?private?static?<T?extends?InputSplit>???
  • ?SplitMetaInfo[]?writeNewSplits(Configuration?conf,???
  • ?????T[]?array,?FSDataOutputStream?out)??
  • ?throws?IOException,?InterruptedException?{??
  • ??
  • //?根據array的大小,構造同等大小的分片元數據信息SplitMetaInfo數組info,??
  • //?array其實是傳入的分片數組??
  • ???SplitMetaInfo[]?info?=?new?SplitMetaInfo[array.length];??
  • ???if?(array.length?!=?0)?{//?如果array中有數據??
  • ??????
  • ?????//?創建序列化工廠SerializationFactory實例factory??
  • ?????SerializationFactory?factory?=?new?SerializationFactory(conf);??
  • ?????int?i?=?0;??
  • ???????
  • ?????//?獲取最大的數據塊位置maxBlockLocations,取參數mapreduce.job.max.split.locations,參數未配置默認為10??
  • ?????int?maxBlockLocations?=?conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,??
  • ?????????MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);??
  • ???????
  • ?????//?通過輸出流out的getPos()方法獲取輸出流out的當前位置offset??
  • ?????long?offset?=?out.getPos();??
  • ???????
  • ?????//?遍歷數組array中每個元素split??
  • ???????
  • ?????for(T?split:?array)?{??
  • ????????
  • ????//?通過輸出流out的getPos()方法獲取輸出流out的當前位置prevCount??
  • ???????long?prevCount?=?out.getPos();??
  • ?????????
  • ???????//?往輸出流out中寫入String,內容為split對應的類名??
  • ???????Text.writeString(out,?split.getClass().getName());??
  • ?????????
  • ???????//?獲取序列化器Serializer實例serializer??
  • ???????Serializer<T>?serializer?=???
  • ?????????factory.getSerializer((Class<T>)?split.getClass());??
  • ?????????
  • ???????//?打開serializer,接入輸出流out??
  • ???????serializer.open(out);??
  • ?????????
  • ???????//?將split序列化到輸出流out??
  • ???????serializer.serialize(split);??
  • ?????????
  • ???????//?通過輸出流out的getPos()方法獲取輸出流out的當前位置currCount??
  • ???????long?currCount?=?out.getPos();??
  • ?????????
  • ???????//?通過split的getLocations()方法,獲取位置信息locations??
  • ???????String[]?locations?=?split.getLocations();??
  • ???????if?(locations.length?>?maxBlockLocations)?{??
  • ?????????LOG.warn("Max?block?location?exceeded?for?split:?"??
  • ?????????????+?split?+?"?splitsize:?"?+?locations.length?+??
  • ?????????????"?maxsize:?"?+?maxBlockLocations);??
  • ?????????locations?=?Arrays.copyOf(locations,?maxBlockLocations);??
  • ???????}??
  • ?????????
  • ???????//?構造split對應的元數據信息,并加入info指定位置,??
  • ???????//?offset為當前split在split文件中的起始位置,數據長度為split.getLength(),位置信息為locations??
  • ???????info[i++]?=???
  • ?????????new?JobSplit.SplitMetaInfo(???
  • ?????????????locations,?offset,??
  • ?????????????split.getLength());??
  • ?????????
  • ???????//?offset增加當前split已寫入數據大小??
  • ???????offset?+=?currCount?-?prevCount;??
  • ?????}??
  • ???}??
  • ?????
  • ???//?返回分片元數據信息SplitMetaInfo數組info??
  • ???return?info;??
  • ?}??
  • ? ? ? ??writeNewSplits()方法的邏輯比較清晰,大體如下:

    ?

    ? ? ? ? 1、根據array的大小,構造同等大小的分片元數據信息SplitMetaInfo數組info,array其實是傳入的分片數組;

    ? ? ? ? 2、如果array中有數據:

    ? ? ? ? ? ? ? 2.1、創建序列化工廠SerializationFactory實例factory;

    ? ? ? ? ? ? ? 2.2、獲取最大的數據塊位置maxBlockLocations,取參數mapreduce.job.max.split.locations,參數未配置默認為10;

    ? ? ? ? ? ? ? 2.3、通過輸出流out的getPos()方法獲取輸出流out的當前位置offset;

    ? ? ? ? ? ? ? 2.4、遍歷數組array中每個元素split:

    ? ? ? ? ? ? ? ? ? ? ? ?2.4.1、通過輸出流out的getPos()方法獲取輸出流out的當前位置prevCount;

    ? ? ? ? ? ? ? ? ? ? ? ?2.4.2、往輸出流out中寫入String,內容為split對應的類名;

    ? ? ? ? ? ? ? ? ? ? ? ?2.4.3、獲取序列化器Serializer實例serializer;

    ? ? ? ? ? ? ? ? ? ? ? ?2.4.4、打開serializer,接入輸出流out;

    ? ? ? ? ? ? ? ? ? ? ? ?2.4.5、將split序列化到輸出流out;

    ? ? ? ? ? ? ? ? ? ? ? ?2.4.6、通過輸出流out的getPos()方法獲取輸出流out的當前位置currCount;

    ? ? ? ? ? ? ? ? ? ? ? ?2.4.7、通過split的getLocations()方法,獲取位置信息locations;

    ? ? ? ? ? ? ? ? ? ? ? ?2.4.8、確保位置信息locations的長度不能超過maxBlockLocations,超過則截斷;

    ? ? ? ? ? ? ? ? ? ? ? ?2.4.9、構造split對應的元數據信息,并加入info指定位置,offset為當前split在split文件中的起始位置,數據長度為split.getLength(),位置信息為locations;

    ? ? ? ? ? ? ? ? ? ? ? ?2.4.10、offset增加當前split已寫入數據大小;

    ? ? ? ? 3、返回分片元數據信息SplitMetaInfo數組info。

    ? ? ? ? 其中,序列化split對象時,我們以FileSplit為例來分析,其write()方法如下:

    ?

    [java]?view plaincopy
  • ?@Override??
  • ?public?void?write(DataOutput?out)?throws?IOException?{??
  • //?寫入文件路徑全名??
  • ???Text.writeString(out,?file.toString());??
  • ???//?寫入分片在文件中的起始位置??
  • ???out.writeLong(start);??
  • ???//?寫入分片在文件中的長度??
  • ???out.writeLong(length);??
  • ?}??
  • ? ? ? ? 比較簡單,分別寫入文件路徑全名、分片在文件中的起始位置、分片在文件中的長度三個信息。

    ?

    ? ? ? ? 綜上所述,分片文件job.split文件的內容為:

    ? ? ? ? 1、文件頭:"SPL"+int類型版本號1;

    ? ? ? ? 2、分片類信息:String類型split對應類名;

    ? ? ? ? 3、分片數據信息:String類型文件路徑全名+Long類型分片在文件中的起始位置+Long類型分片在文件中的長度。

    ? ? ? ? 而在最后,構造分片元數據信息時,產生的是JobSplit的靜態內部類SplitMetaInfo對象,包括分片位置信息locations、split在split文件中的起始位置offset、分片長度split.getLength()。

    ? ? ? ? 下面,我們再看下分片的元數據信息文件是如何產生的,讓我們來研究下writeJobSplitMetaInfo()方法,代碼如下:

    ?

    [java]?view plaincopy
  • ?//?寫入作業分片元數據信息??
  • ?private?static?void?writeJobSplitMetaInfo(FileSystem?fs,?Path?filename,???
  • ?????FsPermission?p,?int?splitMetaInfoVersion,???
  • ?????JobSplit.SplitMetaInfo[]?allSplitMetaInfo)???
  • ?throws?IOException?{??
  • ???//?write?the?splits?meta-info?to?a?file?for?the?job?tracker??
  • //?調用HDFS文件系統FileSystem的create()方法,生成分片元數據信息文件,并獲取文件系統數據輸出流FSDataOutputStream實例out,??
  • //?對應文件路徑為jobSubmitDir/job.splitmetainfo,jobSubmitDir為參數yarn.app.mapreduce.am.staging-dir指定的路徑/作業所屬用戶user/.staging/作業ID??
  • //?對應權限為JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--??
  • ???FSDataOutputStream?out?=???
  • ?????FileSystem.create(fs,?filename,?p);??
  • ?????
  • ???//?寫入分片元數據頭部信息UTF-8格式的字符串"META-SPL"的字節數組byte[]??
  • ???out.write(JobSplit.META_SPLIT_FILE_HEADER);??
  • ?????
  • ???//?寫入分片元數據版本號splitMetaInfoVersion,當前為1??
  • ???WritableUtils.writeVInt(out,?splitMetaInfoVersion);??
  • ???//?寫入分片元數據個數,為分片元數據信息SplitMetaInfo數組個數allSplitMetaInfo.length??
  • ???WritableUtils.writeVInt(out,?allSplitMetaInfo.length);??
  • ?????
  • ???//?遍歷分片元數據信息SplitMetaInfo數組allSplitMetaInfo中每個splitMetaInfo,挨個寫入輸出流??
  • ???for?(JobSplit.SplitMetaInfo?splitMetaInfo?:?allSplitMetaInfo)?{??
  • ?????splitMetaInfo.write(out);??
  • ???}??
  • ?????
  • ???//?關閉輸出流out??
  • ???out.close();??
  • ?}??
  • ? ? ? ??writeJobSplitMetaInfo()方法的主體邏輯也十分清晰,大體如下:

    ?

    ? ? ? ? 1、調用HDFS文件系統FileSystem的create()方法,生成分片元數據信息文件,并獲取文件系統數據輸出流FSDataOutputStream實例out,對應文件路徑為jobSubmitDir/job.splitmetainfo,jobSubmitDir為參數yarn.app.mapreduce.am.staging-dir指定的路徑/作業所屬用戶user/.staging/作業ID,對應權限為JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--;

    ? ? ? ? 2、寫入分片元數據頭部信息UTF-8格式的字符串"META-SPL"的字節數組byte[];

    ? ? ? ? 3、寫入分片元數據版本號splitMetaInfoVersion,當前為1;

    ? ? ? ? 4、寫入分片元數據個數,為分片元數據信息SplitMetaInfo數組個數allSplitMetaInfo.length;

    ? ? ? ? 5、遍歷分片元數據信息SplitMetaInfo數組allSplitMetaInfo中每個splitMetaInfo,挨個寫入輸出流;

    ? ? ? ? 6、關閉輸出流out。
    ? ? ? ? 我們看下如何序列化JobSplit.SplitMetaInfo,將其寫入文件,JobSplit.SplitMetaInfo的write()如下:

    ?

    [java]?view plaincopy
  • public?void?write(DataOutput?out)?throws?IOException?{??
  • ????
  • ??//?將分片位置個數寫入分片元數據信息文件??
  • ??WritableUtils.writeVInt(out,?locations.length);??
  • ??//?遍歷位置信息,寫入分片元數據信息文件??
  • ??for?(int?i?=?0;?i?<?locations.length;?i++)?{??
  • ????Text.writeString(out,?locations[i]);??
  • ??}??
  • ??//?寫入分片元數據信息的起始位置??
  • ??WritableUtils.writeVLong(out,?startOffset);??
  • ??//?寫入分片大小??
  • ??WritableUtils.writeVLong(out,?inputDataLength);??
  • }??
  • ? ? ? ? 每個分片的元數據信息,包括分片位置個數、分片文件位置、分片元數據信息的起始位置、分片大小等內容。

    ?

    ? ? ? ? 總結

    ? ? ? ??JobSplitWriter被作業客戶端用于寫分片相關文件,包括分片數據文件job.split和分片元數據信息文件job.splitmetainfo。分片數據文件job.split存儲的主要是每個分片對應的HDFS文件路徑,和其在HDFS文件中的起始位置、長度等信息,而分片元數據信息文件job.splitmetainfo存儲的則是每個分片在分片數據文件job.split中的起始位置、分片大小等信息。

    ? ? ? ??job.split文件內容:文件頭 + 分片 + 分片 + ... + 分片

    ? ? ? ? 文件頭:"SPL" + 版本號1

    ? ? ? ? 分片:分片類 + 分片數據,分片類=String類型split對應類名,分片數據=String類型HDFS文件路徑全名+Long類型分片在HDFS文件中的起始位置+Long類型分片在HDFS文件中的長度

    ? ? ? ??job.splitmetainfo文件內容:文件頭 + 分片元數據個數 +?分片元數據 +?分片元數據 + ... +?分片元數據

    ? ? ? ??文件頭:"META-SPL" + 版本號1

    ? ? ? ??分片元數據個數:分片元數據的個數

    ? ? ? ? 分片元數據:分片位置個數+分片位置+在分片文件job.split中的起始位置+分片大小

    轉載于:https://www.cnblogs.com/jirimutu01/p/5556356.html

    《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀

    總結

    以上是生活随笔為你收集整理的MapReduce源码分析之JobSplitWriter的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。