日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

MapReduce Java API-使用Partitioner实现输出到多个文件

發布時間:2025/3/19 java 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MapReduce Java API-使用Partitioner实现输出到多个文件 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

場景

MapReduce Java API-多輸入路徑方式:

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/119453275

在上面的基礎上,怎樣用Partitioner的方式實現將學生的成績數據

分段輸出到不同的文件。

例如分為三個成績段:

小于60分

大于等于60分小于等于80分

大于80分

Partitioner

1、Partion發生在Map階段的最后,會先調用job.setPartitionerClass對這個List進行分區,

每個分區映射到一個Reducer。每個分區內又調用job.setSortComparatorClass設置的key

比較函數類排序。

2、 Partitioner的作用是對Mapper產生的中間結果進行分片,以便將同一個分組的數據交給同一個Reducer處理,

它直接影響Reducer階段的復雜均衡。

3、Partitioner創建流程

① 先分析一下具體的業務邏輯,確定大概有多少個分區
② 首先書寫一個類,它要繼承org.apache.hadoop.mapreduce.Partitioner這個類
③ 重寫public int getPartition這個方法,根據具體邏輯,讀數據庫或者配置返回相同的數字
④ 在main方法中設置Partioner的類,job.setPartitionerClass(DataPartitioner.class);
⑤ 設置Reducer的數量,job.setNumReduceTasks(6);

注:

博客:
https://blog.csdn.net/badao_liumang_qizhi
關注公眾號
霸道的程序猿
獲取編程相關電子書、教程推送與免費下載。

實現

1、首先新建數據集score.txt,用來進行分段輸出。

1、自定義分區函數類

通過成績判斷,用return的值為0、1、2代表三個分區。

package com.badao.muloutput;import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner;public class StudentPartitioner extends Partitioner<IntWritable, Text> {@Overridepublic int getPartition(IntWritable intWritable, Text text, int i) {//學生成績int scoreInt = intWritable.get();//默認指定分區0if(i==0){return 0;}if(scoreInt < 60){return 0;}else if(scoreInt<=80){return 1;}else{return 2;}} }

3、定義Mapper類

package com.badao.muloutput;import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException;public class MulOutputMapper extends Mapper<LongWritable,Text,IntWritable,Text> {@Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] studentArr = value.toString().split(" ");if(StringUtils.isNotBlank(studentArr[1])){IntWritable pKey = new IntWritable(Integer.parseInt(studentArr[1].trim()));context.write(pKey,value);}} }

4、定義Reduce類

package com.badao.muloutput;import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class MulOutputReducer extends Reducer<IntWritable,Text,NullWritable,Text> {@Overridepublic void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {for(Text value:values){context.write(NullWritable.get(),value);}} }

5、新建Job類

package com.badao.muloutput;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;import java.io.IOException;public class MulOutputJob {public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {wordCountLocal();}public static void wordCountLocal()throws IOException, ClassNotFoundException, InterruptedException{Configuration conf = new Configuration();System.setProperty("HADOOP_USER_NAME","root");conf.set("fs.defaultFS","hdfs://192.168.148.128:9000");//實例化一個作業,word count是作業的名字Job job = Job.getInstance(conf, "muloutput");//指定通過哪個類找到對應的jar包job.setJarByClass(MulOutputJob.class);//為job設置Mapper類job.setMapperClass(MulOutputMapper.class);//為job設置reduce類job.setReducerClass(MulOutputReducer.class);//設置Partitioner類job.setPartitionerClass(StudentPartitioner.class);//設置reduce的個數為3job.setNumReduceTasks(3);//mapper輸出格式job.setMapOutputKeyClass(IntWritable.class);job.setMapOutputValueClass(Text.class);//reduce輸出格式job.setOutputKeyClass(NullWritable.class);job.setOutputValueClass(Text.class);//為job設置輸入路徑,輸入路徑是存在的文件夾/文件FileInputFormat.addInputPath(job,new Path("/score.txt"));//為job設置輸出路徑FileOutputFormat.setOutputPath(job,new Path("/muloutput8"));job.waitForCompletion(true);}}

6、將數據集上傳到HDFS指定的目錄下,運行job查看輸出結果

?

注意事項

這里要注意坑點,因為這里在分解數據時是按照一個空格來拆分的,所以數據集中

每個key和value之間只能有一個空格。

并且不要再數據集的最后面添加多余的換行,不然會導致不能正常輸出數據。

比如這里查看數據時發現多了個換行

?

然后找不到不出統計數據的原因,就在代碼中將每步的結果輸出下

如果是上面多了換行的話,下面輸出key-value時就會有異常數據,都跟上面這樣是正常的。

總結

以上是生活随笔為你收集整理的MapReduce Java API-使用Partitioner实现输出到多个文件的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。