MapReduce Java API-多输入路径方式
生活随笔
收集整理的這篇文章主要介紹了
MapReduce Java API-多输入路径方式
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
場景
MapReduce Java API實例-統計單詞出現頻率:
https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/119410169
在上面實現統計單次出現的頻率的基礎上。
數據集只是單路徑,如果有多個數據集文件,即有多個txt文件,要怎么實現。
多文件輸入采用MultipleInputs.addInputPath方法即可完成。
注:
博客:
https://blog.csdn.net/badao_liumang_qizhi
關注公眾號
霸道的程序猿
獲取編程相關電子書、教程推送與免費下載。
實現
map和reduce的代碼基本和上面的一致
1、map類
package com.badao.multinput;import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException; import java.util.StringTokenizer;public class MultInputMapper extends Mapper<LongWritable,Text,Text,IntWritable> {//1、編寫map函數,通過繼承Mapper類實現里面的map函數//?? Mapper類當中的第一個函數是Object,也可以寫成Long//?? 第一個參數對應的值是行偏移量//2、第二個參數類型通常是Text類型,Text是Hadoop實現的String 類型的可寫類型//?? 第二個參數對應的值是每行字符串//3、第三個參數表示的是輸出key的數據類型//4、第四個參數表示的是輸出value的數據類型,IntWriable 是Hadoop實現的int類型的可寫數據類型public final static IntWritable one = new IntWritable(1);public Text word = new Text();//key 是行偏移量//value是每行字符串@Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer stringTokenizer = new StringTokenizer(value.toString());while (stringTokenizer.hasMoreTokens()){//stringTokenizer.nextToken()是字符串類型,使用set函數完成字符串到Text數據類型的轉換word.set(stringTokenizer.nextToken());//通過write函數寫入到本地文件context.write(word,one);}} }2、reduce類
package com.badao.multinput;import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;//第一個參數類型是輸入值key的數據類型,map中間輸出key的數據類型 //第二個參數類型是輸入值value的數據類型,map中間輸出value的數據類型 //第三個參數類型是輸出值key的數據類型,他的數據類型要跟job.setOutputKeyClass(Text.class) 保持一致 //第四個參數類型是輸出值value的數據類型,它的數據類型要跟job.setOutputValueClass(IntWriable.class) 保持一致public class MultInputReducer extends Reducer<Text, IntWritable,Text,IntWritable> {public IntWritable result = new IntWritable();//key就是單詞? values是單詞出現頻率列表@Overridepublic void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for(IntWritable val:values){//get就是取出IntWriable的值sum += val.get();}result.set(sum);context.write(key,result);} }3、job類
job這里不同,單路徑時
FileInputFormat.addInputPath(job,new Path("D:\\words.txt"));多路徑時
??????? Path path1 = new Path("D:\\words.txt");Path path2 = new Path("D:\\words2.txt");MultipleInputs.addInputPath(job,path1, TextInputFormat.class,MultInputMapper.class);MultipleInputs.addInputPath(job,path2, TextInputFormat.class,MultInputMapper.class);完整代碼
package com.badao.multinput;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;import java.io.IOException;public class MultInputJob {public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {wordCountLocal();}public static void wordCountLocal()throws IOException, ClassNotFoundException, InterruptedException{Configuration conf = new Configuration();//實例化一個作業,word count是作業的名字Job job = Job.getInstance(conf, "multinputwordcount");//指定通過哪個類找到對應的jar包job.setJarByClass(MultInputJob.class);//為job設置Mapper類job.setMapperClass(MultInputMapper.class);job.setCombinerClass(IntSumReducer.class);//為job設置reduce類job.setReducerClass(MultInputReducer.class);//為job的輸出數據設置key類job.setOutputKeyClass(Text.class);//為job輸出設置value類job.setOutputValueClass(IntWritable.class);//多個輸入路徑Path path1 = new Path("D:\\words.txt");Path path2 = new Path("D:\\words2.txt");MultipleInputs.addInputPath(job,path1, TextInputFormat.class,MultInputMapper.class);MultipleInputs.addInputPath(job,path2, TextInputFormat.class,MultInputMapper.class);//為job設置輸出路徑FileOutputFormat.setOutputPath(job,new Path("D:\\mulinputOut"));job.waitForCompletion(true);}}運行job類查看效果
?
總結
以上是生活随笔為你收集整理的MapReduce Java API-多输入路径方式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Three.js中实现场景雾化效果
- 下一篇: MapReduce Java API-使