日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

MapReduce进阶:多路径输入输出

發(fā)布時間:2025/3/20 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MapReduce进阶:多路径输入输出 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前言

當我們得意于 MapReduce 從一個數據輸入目錄,把數據經過程序處理之后輸出到另一個目錄時。可能你正在錯過一些更好的方案,因為 MapReduce 是支持多路徑的輸入與輸出的。比如,你一個項目中的多個 Job 產生了多個輸出路徑,后面又需要另一個 Job 去處理這些不路徑下的數據。你要怎么辦?暫停程序后,手動處理?看完本文,我想你會給你的這種想法來上一記耳光。(說笑了,別當真)


版權說明

著作權歸作者所有。
商業(yè)轉載請聯(lián)系作者獲得授權,非商業(yè)轉載請注明出處。
本文作者:Q-WHai
發(fā)表日期: 2016年6月18日
本文鏈接:https://qwhai.blog.csdn.net/article/details/51707283
來源:CSDN
更多內容:分類 >> 大數據之 Hadoop


多路徑輸入

寫了這么多的 MapReudce 的程序,我想你一定已經了解了 MapReduce 是如何將輸入的數據加載到程序中進行計算的了。一般情況下,我們是通過 FileInputFormat 類的 addInputPath 方法。看到這個 add 關鍵字,就可能產生很多聯(lián)想,事實上這種聯(lián)想是正確的。我們的確可以使用多個目錄共同輸入數據,并且還不止一種方式。

方式一

可以多添加幾個輸入目錄,只要按照之前添加一個目錄的方式,繼續(xù)添加就 ok 了。就像下面這樣:

FileInputFormat.addInputPath(job, new Path(inputPath_1)); FileInputFormat.addInputPath(job, new Path(inputPath_2)); FileInputFormat.addInputPath(job, new Path(inputPath_3));

這里如果你是一個重視代碼細節(jié)的人,你肯定會重構這段代碼:

private void setInputPathMothed1(Job job) throws IOException {FileInputFormat.addInputPath(job, new Path(inputPath_1));FileInputFormat.addInputPath(job, new Path(inputPath_2));FileInputFormat.addInputPath(job, new Path(inputPath_3)); }

方式二

如果你嫌上面的代碼太多了,你還有另外一種選擇:

FileInputFormat.addInputPaths(job, String.join(",", inputPath_1, inputPath_2, inputPath_3));

通過上面的代碼,你可以一次性全部加載這些不同的目錄,很方便。
當我們打開 FileInputFormat.addInputPaths() 的源碼,看到 addInputPaths() 的代碼:

/*** Add the given comma separated paths to the list of inputs for* the map-reduce job.* * @param job The job to modify* @param commaSeparatedPaths Comma separated paths to be added to* the list of inputs for the map-reduce job.*/ public static void addInputPaths(Job job, String commaSeparatedPaths) throws IOException {for (String str : getPathStrings(commaSeparatedPaths)) {addInputPath(job, new Path(str));} }

這里看似方便的 FileInputFormat.addInputPaths(),其實只是 hadoop 給我們這些懶惰的開發(fā)者的進一層封裝罷了。

方式三:

這種方式有一些特殊,也是我推薦你去使用的一種方式。你可以先看代碼感受一下。

private void setInputPathMothed3(Job job) throws IOException {MultipleInputs.addInputPath(job, new Path(inputPath_1), TextInputFormat.class, CoreComputer.CoreMapper.class);MultipleInputs.addInputPath(job, new Path(inputPath_2), TextInputFormat.class, CoreComputer.CoreMapper.class);MultipleInputs.addInputPath(job, new Path(inputPath_3), TextInputFormat.class, CoreComputer.CoreMapper.class); }

上面的代碼中使用一個新的類 MultipleInputs。從類的命名上就可以看到這是一個專門處理多路徑輸入的問題的。在上面的代碼中,我們看到 MultipleInputs.addInputPath() 多了兩個不同的參數。進入源碼可以看到他們分別是輸入數據的格式,以及數據處理的 Mapper。
其實這兩個參數是可以讓你通過更加靈活的方式來處理數據。inputFormatClass 是可以讓你輸入不同類型的數據,mapperClass 是可以讓你使用不同的 Mapper 來處理不同的數據。正因為這種可選擇性,你的程序就更加的靈活了。不過上面的代碼中,我并沒有采用不同的 Mapper,如果你感興趣,可以嘗試一下。

小結

看到這里,你可能會有疑惑,難道在 Mapper 和 Reducer 里面就不用設置了么?是的,我們不需要調整 Mapper 和 Reducer 的核心代碼就可以實現多路徑輸入。


多路徑輸出

核心代碼修改

多路徑的輸出沒有多路徑輸入那么多可選擇的方案,且在多路徑輸出中,需要編寫的代碼量也比多路徑輸入要多一些。其中還包括了對 Reducer 的修改。詳細的參考下面的代碼。

public static class CoreReducer extends Reducer<Text, IntWritable, Text, IntWritable> {private MultipleOutputs<Text, IntWritable> multipleOutputs = null;@Overrideprotected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)throws IOException, InterruptedException {multipleOutputs = new MultipleOutputs<Text, IntWritable>(context);}@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Reducer<Text, IntWritable, Text, IntWritable>.Context context)throws IOException, InterruptedException {( ... 省略無關的 N 行 ... )multipleOutputs.write(splitKeys[1], new Text(splitKeys[0]), count);}@Overrideprotected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)throws IOException, InterruptedException {multipleOutputs.close();} }

上面的代碼中,setup() 與 cleanup() 模塊只是對 MultipleOutputs 的初始化與關閉操作,需要說明的地方不多。主要有以下兩點:

  • 將 MultipleOutputs 的初始化放在 setup() 中,因為在 setup() 只會被調用一次,如果放在 reduce() 中,則 MultipleOutputs 可能被 reduce 方法初始化 N 次,而你全然不知;
  • 你需要在 cleanup() 方法中關閉 MultipleOutputs。通過源碼我們了解到,關閉 MultipleOutputs,也就是關閉 RecordWriter,并且是一堆 RecordWriter,因為這里會有很多 reduce 被調用。
  • /*** Closes all the opened outputs.* * This should be called from cleanup method of map/reduce task.* If overridden subclasses must invoke <code>super.close()</code> at the* end of their <code>close()</code>* */ @SuppressWarnings("unchecked") public void close() throws IOException, InterruptedException { for (RecordWriter writer : recordWriters.values()) {writer.close(context); } }

    還有一個是你需要重點關注的,那就是 reduce() 方法里的 multipleOutputs.write(…)。你需要把以前的 context.write(…) 替換成現在的這個。

    調用代碼修改

    客戶端調用方面,只需要在代碼

    FileOutputFormat.setOutputPath(job, new Path(outputPath));

    之前添加多路徑的設置,即可。如下:

    public class ComputerClient {public static void main(String[] args) throws Exception {( ... 省略無關的 N 行 ... )}private void execute() throws Exception {runFirstJob();}private int runFirstJob() throws Exception {( ... 省略無關的 N 行 ... ) addNamedOutput(job);FileOutputFormat.setOutputPath(job, new Path(outputPath));return job.waitForCompletion(true) ? 0 : 1;}private void addNamedOutput(Job job) {addNamedOutput(job, "android");addNamedOutput(job, "hadoop");addNamedOutput(job, "ios");addNamedOutput(job, "java");addNamedOutput(job, "python");}private void addNamedOutput(Job job, String pathName) {MultipleOutputs.addNamedOutput(job, pathName, TextOutputFormat.class, Text.class, IntWritable.class);} }

    效果展示

    通過上面的學習并編寫正確的程序,這樣就可以獲得如下的效果。


    工程源碼下載

    • http://download.csdn.net/detail/u013761665/9553523

    征集

    如果你也需要使用ProcessOn這款在線繪圖工具,可以使用如下邀請鏈接進行注冊:
    https://www.processon.com/i/56205c2ee4b0f6ed10838a6d

    總結

    以上是生活随笔為你收集整理的MapReduce进阶:多路径输入输出的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。