Hadoop MapReduce V2: finding the two hottest days of each month

Project structure: the com.henu.tq package contains the driver MyTQ, the composite key Tq, the mapper Tmapper, the partitioner TPartioner, the sort comparator TSortComparator, the grouping comparator TGroupComparator, and the reducer Treducer.
MyTQ
```java
package com.henu.tq;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyTQ {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Configuration and job setup
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MyTQ.class);
        job.setJobName("tq");

        // 2. Input and output paths (delete the output path first if it already exists)
        Path inPath = new Path("/tq/input");
        FileInputFormat.addInputPath(job, inPath);
        Path outPath = new Path("/tq/output");
        if (outPath.getFileSystem(conf).exists(outPath)) {
            outPath.getFileSystem(conf).delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        // 3. Mapper and map output types
        job.setMapperClass(Tmapper.class);
        job.setMapOutputKeyClass(Tq.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 4. Custom sort comparator: year/month ascending, temperature descending
        job.setSortComparatorClass(TSortComparator.class);
        // 5. Custom partitioner: one partition per year
        job.setPartitionerClass(TPartioner.class);
        // 6. Custom grouping comparator: group reduce input by year and month only
        job.setGroupingComparatorClass(TGroupComparator.class);

        // 7. Number of reduce tasks
        job.setNumReduceTasks(3);
        // 8. Reducer
        job.setReducerClass(Treducer.class);

        // 9. Submit the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
```
TGroupComparator
```java
package com.henu.tq;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator: records that share a year and month are treated as
 * one group, so they all arrive in a single reduce() call.
 */
public class TGroupComparator extends WritableComparator {

    Tq t1 = null;
    Tq t2 = null;

    public TGroupComparator() {
        super(Tq.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        t1 = (Tq) a;
        t2 = (Tq) b;

        int c1 = Integer.compare(t1.getYear(), t2.getYear());
        if (c1 == 0) {
            return Integer.compare(t1.getMonth(), t2.getMonth());
        }
        return c1;
    }
}
```
Tmapper
```java
package com.henu.tq;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

public class Tmapper extends Mapper<LongWritable, Text, Tq, IntWritable> {

    Tq tkey = new Tq();
    IntWritable tval = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Tq, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // Split the line into the date/time field and the temperature field
        String[] words = StringUtils.split(value.toString(), '\t');
        String pattern = "yyyy-MM-dd";
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat(pattern);
        try {
            // Example record: 1950-01-01 11:21:02	32c
            // Parse the date part
            Date date = simpleDateFormat.parse(words[0]);
            Calendar cal = Calendar.getInstance();
            cal.setTime(date);

            tkey.setYear(cal.get(Calendar.YEAR));
            tkey.setMonth(cal.get(Calendar.MONTH) + 1);
            tkey.setDay(cal.get(Calendar.DAY_OF_MONTH));

            // Parse the temperature, e.g. "32c" -> 32
            String wdString = words[1].substring(0, words[1].lastIndexOf("c"));
            int wd = Integer.parseInt(wdString);
            tkey.setWd(wd);
            tval.set(wd);

            context.write(tkey, tval);
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}
```
TPartioner
```java
package com.henu.tq;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class TPartioner extends Partitioner<Tq, IntWritable> {

    @Override
    public int getPartition(Tq key, IntWritable value, int numPartitions) {
        // All records of the same year go to the same reduce task; with
        // three reduce tasks, 1949, 1950 and 1951 land in different partitions.
        return key.getYear() % numPartitions;
    }
}
```
Tq
```java
package com.henu.tq;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Tq implements WritableComparable<Tq> {

    private int year;
    private int month;
    private int day;
    private int wd;

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public int getDay() {
        return day;
    }

    public void setDay(int day) {
        this.day = day;
    }

    public int getWd() {
        return wd;
    }

    public void setWd(int wd) {
        this.wd = wd;
    }

    @Override
    public String toString() {
        return year + "-" + month + "-" + day;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.setYear(in.readInt());
        this.setMonth(in.readInt());
        this.setDay(in.readInt());
        this.setWd(in.readInt());
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.getYear());
        out.writeInt(this.getMonth());
        out.writeInt(this.getDay());
        out.writeInt(this.getWd());
    }

    @Override
    public int compareTo(Tq o) {
        int c1 = Integer.compare(this.getYear(), o.getYear());
        if (c1 == 0) {
            int c2 = Integer.compare(this.getMonth(), o.getMonth());
            if (c2 == 0) {
                return Integer.compare(this.getDay(), o.getDay());
            }
            return c2;
        }
        return c1;
    }
}
```
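Since the Tq key is serialized between the map and reduce phases, write() and readFields() must handle the fields in exactly the same order. The round-trip check below is a small sketch added for illustration (the TqRoundTrip class is not part of the original project):

```java
package com.henu.tq;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical local check: serialize a Tq key and read it back,
// mimicking what Hadoop does when shuffling map output.
public class TqRoundTrip {
    public static void main(String[] args) throws IOException {
        Tq original = new Tq();
        original.setYear(1950);
        original.setMonth(10);
        original.setDay(2);
        original.setWd(41);

        // Write the key into an in-memory byte stream.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Read it back; the field order must mirror write() exactly.
        Tq copy = new Tq();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy + " " + copy.getWd()); // prints: 1950-10-2 41
    }
}
```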
Treducer
```java
package com.henu.tq;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author George
 * Emits at most two records per month, e.g. "1950-1-1  32".
 */
public class Treducer extends Reducer<Tq, IntWritable, Text, IntWritable> {

    Text tkey = new Text();
    IntWritable tval = new IntWritable();

    @Override
    protected void reduce(Tq key, Iterable<IntWritable> vals, Context context)
            throws IOException, InterruptedException {
        int flag = 0;
        int day = 0;
        // Note: the fields of `key` change as the value iterator advances,
        // because the grouping comparator merges records of different days
        // (and thus different map keys) into one group.
        for (IntWritable val : vals) {
            // First value of the group: the hottest day of the month.
            if (flag == 0) {
                tkey.set(key.toString());
                tval.set(val.get());
                context.write(tkey, tval);
                flag++;
                day = key.getDay();
            }
            // First later value that falls on a different day: the second hottest day.
            if (flag != 0 && day != key.getDay()) {
                tkey.set(key.toString());
                tval.set(val.get());
                context.write(tkey, tval);
                return;
            }
        }
    }
}
```
TSortComparator
```java
package com.henu.tq;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * @author George
 * Sort comparator: year and month in ascending order, temperature in descending order.
 */
public class TSortComparator extends WritableComparator {

    Tq t1 = null;
    Tq t2 = null;

    public TSortComparator() {
        super(Tq.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        t1 = (Tq) a;
        t2 = (Tq) b;

        int c1 = Integer.compare(t1.getYear(), t2.getYear());
        if (c1 == 0) {
            int c2 = Integer.compare(t1.getMonth(), t2.getMonth());
            if (c2 == 0) {
                // Negate so that higher temperatures sort first
                return -Integer.compare(t1.getWd(), t2.getWd());
            }
            return c2;
        }
        return c1;
    }
}
```
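TSortComparator and TGroupComparator together form the classic secondary-sort pattern: within a partition the shuffle sorts records by year and month ascending and temperature descending, while reduce groups are formed from year and month only, so the first value of every group is that month's highest temperature. The small check below is a sketch added for illustration (SortOrderCheck is a hypothetical class, not part of the original project):

```java
package com.henu.tq;

// Hypothetical local check of the two comparators, runnable without a cluster.
public class SortOrderCheck {
    public static void main(String[] args) {
        Tq a = new Tq();
        a.setYear(1951); a.setMonth(7); a.setDay(3); a.setWd(47);

        Tq b = new Tq();
        b.setYear(1951); b.setMonth(7); b.setDay(1); b.setWd(45);

        // Negative: the 47-degree record sorts ahead of the 45-degree one.
        System.out.println("sort:  " + new TSortComparator().compare(a, b));
        // Zero: both records belong to the same reduce group (same year and month).
        System.out.println("group: " + new TGroupComparator().compare(a, b));
    }
}
```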
tq.txt
```
1949-10-01 14:21:02	34c
1949-10-01 19:21:02	38c
1949-10-02 14:01:02	36c
1950-01-01 11:21:02	32c
1950-10-01 12:21:02	37c
1951-12-01 12:21:02	23c
1950-10-02 12:21:02	41c
1950-10-03 12:21:02	27c
1951-07-01 12:21:02	45c
1951-07-02 12:21:02	46c
1951-07-03 12:21:03	47c
```
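Tracing this sample through the job by hand (three reduce tasks, partitioned by year % 3, and Tq.toString() printing months and days without zero padding), the output files should contain roughly the following:

```
# part-r-00000  (1950 % 3 == 0)
1950-1-1	32
1950-10-2	41
1950-10-1	37

# part-r-00001  (1951 % 3 == 1)
1951-7-3	47
1951-7-2	46
1951-12-1	23

# part-r-00002  (1949 % 3 == 2)
1949-10-1	38
1949-10-2	36
```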
Package the project into a jar.

Transfer the jar to the node02 virtual machine, make sure the input file tq.txt has been uploaded to the HDFS directory /tq/input that the driver reads, and then submit the job with the hadoop jar command, passing it the jar file and the main class com.henu.tq.MyTQ.
If the job fails at this point with a class version error, don't panic; it is only a version problem:

This error occurs when Java class files compiled with a newer JDK are run on an older JVM.

1. The fix is to make sure the JVM (the java command) and the JDK (the javac command) are the same version. On Linux, run java -version and javac -version on the command line to check whether the two versions match. Assume here that both are 1.7.

2. If they already match but the problem persists, then you are most likely not compiling with javac directly on the command line but with an IDE such as Eclipse or NetBeans, since many IDEs ship their own compiler instead of using the one installed on the operating system. In Eclipse, set the JDK version in the project properties: right-click the project --> Properties --> Java Compiler --> Enable project specific settings --> set the compiler compliance level to 1.7, i.e. the same version as the JVM (the version reported by java -version on the command line).

In short, if you compile with an IDE, first make sure the JDK version bundled with the IDE matches the Java version installed on the operating system.
After fixing the versions, run the job again:

The job now completes successfully, and the results can be inspected from Eclipse.
Summary

The job combines a composite key (Tq), a custom sort comparator, a grouping comparator and a partitioner to perform a secondary sort that emits the two hottest days of every month, one output file per reduce task.