日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

mapreduce之partition分区

發(fā)布時間:2023/12/15 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 mapreduce之partition分区 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

聽了超哥的一席課后逐漸明白了partition,記錄一下自己的理解!(thanks 超哥)

package partition;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Partitioning example. Must be packaged as a jar and run on the cluster
 * (running it directly from an IDE throws "Illegal partition for ...").
 *
 * Why partition:
 *   1. produce multiple output files, one per business category;
 *   2. several reduce tasks run in parallel, improving overall job throughput.
 *
 * @author zhangweixiang
 */
public class FlowCount2 {

    public static final String INPUT_PATH = "hdfs://192.168.0.9:9000/wlan2";
    public static final String OUT_PATH = "hdfs://192.168.0.9:9000/myout";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, FlowCount2.class.getSimpleName());

        // Ship this class's jar with the job.
        job.setJarByClass(FlowCount2.class);

        // 1.1 input path and input format
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        job.setInputFormatClass(TextInputFormat.class);

        // 1.2 custom mapper and its output key/value types
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowWritable.class);

        // 1.3 custom partitioner. The map output is split into two partitions,
        // so two reduce tasks are configured — one reducer per partition, each
        // writing its own output file.
        job.setPartitionerClass(MyPartition.class);
        job.setNumReduceTasks(2);

        // 1.4 sort/group and 1.5 combine: framework defaults.

        // 2.2 custom reducer plus final output types and format
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Delete a pre-existing output directory, otherwise the job fails.
        FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), new Configuration());
        Path outputDir = new Path(OUT_PATH);
        if (fileSystem.exists(outputDir)) {
            fileSystem.delete(outputDir, true);
        }

        // 2.3 output path, then submit and wait for completion.
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        job.waitForCompletion(true);
    }

    /**
     * Parses one tab-separated log line and emits
     * (phone number, traffic record). Lines whose phone-number field
     * (index 2) is empty are dropped.
     */
    static class MyMapper extends Mapper<LongWritable, Text, Text, FlowWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");

            String phone = "";
            long upPackNum = 0L;
            long downPackNum = 0L;
            long upPayLoad = 0L;
            long downPayLoad = 0L;

            // Only emit records that carry a phone number.
            if (!("".equals(fields[2]))) {
                phone = fields[2];

                // Traffic counters live in fields 21-24; empty fields stay 0.
                if (!("".equals(fields[21]))) {
                    upPackNum = Long.parseLong(fields[21]);
                }
                if (!("".equals(fields[22]))) {
                    downPackNum = Long.parseLong(fields[22]);
                }
                if (!("".equals(fields[23]))) {
                    upPayLoad = Long.parseLong(fields[23]);
                }
                if (!("".equals(fields[24]))) {
                    downPayLoad = Long.parseLong(fields[24]);
                }

                FlowWritable record =
                        new FlowWritable(upPackNum, downPackNum, upPayLoad, downPayLoad);
                context.write(new Text(phone), record);
            }
        }
    }

    /** Sums the four traffic counters over all records of one phone number. */
    static class MyReduce extends Reducer<Text, FlowWritable, Text, FlowWritable> {

        @Override
        protected void reduce(Text k2, Iterable<FlowWritable> v2s, Context context)
                throws IOException, InterruptedException {
            long totalUpPackNum = 0L;
            long totalDownPackNum = 0L;
            long totalUpPayLoad = 0L;
            long totalDownPayLoad = 0L;

            for (FlowWritable record : v2s) {
                totalUpPackNum += record.upPackNum;
                totalDownPackNum += record.downPackNum;
                totalUpPayLoad += record.upPayLoad;
                totalDownPayLoad += record.downPayLoad;
            }

            FlowWritable total = new FlowWritable(
                    totalUpPackNum, totalDownPackNum, totalUpPayLoad, totalDownPayLoad);
            context.write(k2, total);
        }
    }
}


package partition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Custom Writable holding four traffic counters for one phone number:
 * upPackNum (upstream packets), downPackNum (downstream packets),
 * upPayLoad (bytes sent), downPayLoad (bytes received).
 *
 * write() and readFields() must serialize the fields in the same order,
 * otherwise values are silently swapped/corrupted on deserialization.
 *
 * @author zhangweixiang
 */
public class FlowWritable implements Writable {

    public long upPackNum;
    public long downPackNum;
    public long upPayLoad;
    public long downPayLoad;

    /** No-arg constructor required by the Writable deserialization machinery. */
    public FlowWritable() {
    }

    public FlowWritable(long upPackNum, long downPackNum, long upPayLoad,
            long downPayLoad) {
        this.upPackNum = upPackNum;
        this.downPackNum = downPackNum;
        this.upPayLoad = upPayLoad;
        this.downPayLoad = downPayLoad;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upPackNum);
        out.writeLong(downPackNum);
        // BUG FIX: the original wrote upPackNum here a second time and never
        // serialized upPayLoad, so after shuffle/deserialization upPayLoad
        // took on the value of upPackNum and every reduce-side upload sum
        // was wrong. The field order now matches readFields() exactly.
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Read back in exactly the order written by write().
        this.upPackNum = in.readLong();
        this.downPackNum = in.readLong();
        this.upPayLoad = in.readLong();
        this.downPayLoad = in.readLong();
    }

    /** Tab-separated counters, matching the TextOutputFormat value column. */
    @Override
    public String toString() {
        return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t"
                + downPayLoad;
    }
}
package partition;import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; /*** @ClassName: MyPartition * @Description: 根據(jù)電話號碼分區(qū),正規(guī)號碼分區(qū)代號為0,非正規(guī)號碼分區(qū)為1(在此建立了兩個分區(qū),即會產(chǎn)生兩個reduce任務輸出到不同的文件0和1)* @param K k2(map輸出的鍵), V v2(map輸出的值)* @author zhangweixiang* @date 2014年3月6日 下午3:02:29*/ public class MyPartition extends HashPartitioner<Text,FlowWritable>{@Overridepublic int getPartition(Text key, FlowWritable value, int numReduceTasks) {int p=0;if(key.toString().length()!=11){p=1;}return p;} }
注:必須要打成jar包上傳到linux下執(zhí)行(我開始沒有打成jar包直接在eclipse下執(zhí)行拋了異常)

執(zhí)行完成后會產(chǎn)生兩個文件(part-r-00000和part-r-00001)分別記錄不同條件的信息。

eclipse直接運行拋的異常:

14/03/06 15:41:13 WARN mapred.LocalJobRunner: job_local_0001
java.io.IOException: Illegal partition for 10.80.203.79 (1)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1073)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:691)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at partition.FlowCount2$MyMapper.map(FlowCount2.java:120)
at partition.FlowCount2$MyMapper.map(FlowCount2.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214)
14/03/06 15:41:14 INFO mapred.JobClient: ?map 0% reduce 0%
14/03/06 15:41:14 INFO mapred.JobClient: Job complete: job_local_0001
14/03/06 15:41:14 INFO mapred.JobClient: Counters: 0



記錄超哥的總結:

分區(qū)的例子必須打成jar運行。用處:1.根據(jù)業(yè)務需要,產(chǎn)生多個輸出文件;2.多個reduce任務在運行,提高整體job的運行效率。

總結

以上是生活随笔為你收集整理的mapreduce之partition分区的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。