日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

如何使用Hadoop的JobControl

發布時間:2025/7/14 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 如何使用Hadoop的JobControl 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

轉自:http://qindongliang.iteye.com/blog/2064281


如果MapReduce中需要用到多個job,而且多個job之間需要設置一些依賴關系,比如Job3需要依賴于Job1和Job2,這就要用到JobControl,具體的用法如下:

  • ?public?static?int?handleJobChain(Job?job1?,Job?job2,Job job3, String?chainName)?throws?IOException{??
  • ????????ControlledJob?controlledJob1?=?new?ControlledJob(job1.getConfiguration());??
  • ????????controlledJob1.setJob(job1);??
  • ??????????
  • ????????ControlledJob?controlledJob2?=?new?ControlledJob(job2.getConfiguration());??
  • ????????controlledJob2.setJob(job2);??
  • ? ? ? ?
  • ? ? ? ??ControlledJob?controlledJob3?=?new?ControlledJob(job2.getConfiguration());?
  • ? ? ? ??controlledJob3.setJob(job3);
  • ????????controlledJob3.addDependingJob(controlledJob1);
  • ? ? ? ? controlledJob3.addDependingJob(controlledJob2); ???
  • ??????????
  • ????????JobControl?jc?=?new?JobControl(chainName);??
  • ????????jc.addJob(controlledJob1);??
  • ????????jc.addJob(controlledJob2);?
  • ? ? ? ? jc.addJob(controlledJob2);?
  • ?
  • ????????Thread?jcThread?=?new?Thread(jc);??
  • ????????jcThread.start();??
  • ????????while(true){??
  • ????????????if(jc.allFinished()){??
  • ????????????????System.out.println(jc.getSuccessfulJobList());??
  • ????????????????jc.stop();??
  • ????????????????return?0;??
  • ????????????}??
  • ????????????if(jc.getFailedJobList().size()?>?0){??
  • ????????????????System.out.println(jc.getFailedJobList());??
  • ????????????????jc.stop();??
  • ????????????????return?1;??
  • ????????????}??
  • ????????}??
  • ????} ?

  • 需要給每個Job設置自己的Configuration,然后通過JobControl將多個Job連接到一起。?

    由于JobControl實現了Runnable接口,可以通過線程運行JobControl,最后通過Stop方法可以停止。如果不用一個Thread來運行,就會導致Hadoop中所有Job執行完畢之后,最后不會退出,但是結果是輸出完畢的。


    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------


    使用Hadoop里面的MapReduce來處理海量數據是非常簡單方便的,但有時候我們的應用程序,往往需要多個MR作業,來計算結果,比如說一個最簡單的使用MR提取海量搜索日志的TopN的問題,注意,這里面,其實涉及了兩個MR作業,第一個是詞頻統計,第兩個是排序求TopN,這顯然是需要兩個MapReduce作業來完成的。其他的還有,比如一些數據挖掘類的作業,常常需要迭代組合好幾個作業才能完成,這類作業類似于DAG類的任務,各個作業之間是具有先后,或相互依賴的關系,比如說,這一個作業的輸入,依賴上一個作業的輸出等等。?


    在Hadoop里實際上提供了,JobControl類,來組合一個具有依賴關系的作業,在新版的API里,又新增了ControlledJob類,細化了任務的分配,通過這兩個類,我們就可以輕松的完成類似DAG作業的模式,這樣我們就可以通過一個提交來完成原來需要提交2次的任務,大大簡化了任務的繁瑣度。具有依賴式的作業提交后,hadoop會根據依賴的關系,先后執行的job任務,每個任務的運行都是獨立的。?

    下面來看下散仙的例子,組合一個詞頻統計+排序的作業,測試數據如下:?


    Java代碼??
  • 秦東亮;72??
  • 秦東亮;34??
  • 秦東亮;100??
  • 三劫;899??
  • 三劫;32??
  • 三劫;1??
  • a;45??
  • b;567??
  • b;12??

  • 代碼如下: ?
    Java代碼??
  • package?com.qin.test.hadoop.jobctrol;??
  • ??
  • import?java.io.IOException;??
  • ??
  • import?org.apache.hadoop.fs.FileSystem;??
  • import?org.apache.hadoop.fs.Path;??
  • import?org.apache.hadoop.io.IntWritable;??
  • import?org.apache.hadoop.io.LongWritable;??
  • import?org.apache.hadoop.io.Text;??
  • import?org.apache.hadoop.io.WritableComparator;??
  • import?org.apache.hadoop.mapred.JobConf;??
  • import?org.apache.hadoop.mapreduce.Job;??
  • import?org.apache.hadoop.mapreduce.Mapper;??
  • import?org.apache.hadoop.mapreduce.Reducer;??
  • import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;??
  • import?org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;??
  • import?org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;??
  • import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;??
  • ??
  • ??
  • ??
  • ??
  • /**?
  • ?*?Hadoop的版本是1.2的?
  • ?*?JDK環境1.6?
  • ?*?使用ControlledJob+JobControl新版API?
  • ?*?完成組合式任務??
  • ?*?第一個任務是統計詞頻?
  • ?*?第二個任務是降序排序?
  • ?*??
  • ?*?如果使用MapReduce作業來完成的話,則需要跑2個MR任務?
  • ?*?但是如果我們使用了JobControl+ControlledJob就可以在?
  • ?*?一個類里面完成類型的DAG依賴式的作業?
  • ?*??
  • ?*??
  • ?*?@author?qindongliang?
  • ?*??
  • ?*??
  • ?*??
  • ?*?***/??
  • public?class?MyHadoopControl?{??
  • ??????
  • ??????
  • ??????
  • ????/***?
  • ?????*?
  • ?????*MapReduce作業1的Mapper?
  • ?????*?
  • ?????*LongWritable?1??代表輸入的key值,默認是文本的位置偏移量?
  • ?????*Text?2??????????每行的具體內容?
  • ?????*Text?3??????????輸出的Key類型?
  • ?????*Text?4??????????輸出的Value類型?
  • ?????*??
  • ?????*?*/??
  • ????private?static?class?SumMapper?extends?Mapper<LongWritable,?Text,?Text,?IntWritable>{??
  • ??????????
  • ????????private?Text?t=new?Text();??
  • ????????private?IntWritable?one=new?IntWritable(1);??
  • ??????????
  • ????????/**?
  • ?????????*??
  • ?????????*?map階段輸出詞頻?
  • ?????????*??
  • ?????????*??
  • ?????????*?**/??
  • ????????@Override??
  • ????????protected?void?map(LongWritable?key,?Text?value,Context?context)??
  • ????????????????throws?IOException,?InterruptedException?{??
  • ????????????String?data=value.toString();??
  • ????????????String?words[]=data.split(";");??
  • ??????????if(words[0].trim()!=null){??
  • ??????????????t.set("?"+words[0]);//賦值K??
  • ??????????????one.set(Integer.parseInt(words[1]));??
  • ??????????????context.write(t,?one);??
  • ??????????}???
  • ????????}??
  • ??????????
  • ????}??
  • ??????
  • ????/**?
  • ?????*?MapReduce作業1的Reducer?
  • ?????*?負責詞頻累加,并輸出?
  • ?????*??
  • ?????*?**/??
  • ????private?static?class?SumReduce?extends?Reducer<Text,?IntWritable,?IntWritable,?Text>{??
  • ??????????
  • ????????//存儲詞頻對象??
  • ????????private?IntWritable?iw=new?IntWritable();??
  • ??????????
  • ????????@Override??
  • ????????protected?void?reduce(Text?key,?Iterable<IntWritable>?value,Context?context)??
  • ????????????????throws?IOException,?InterruptedException?{??
  • ???????????
  • ??????????????
  • ????????????int?sum=0;??
  • ????????????for(IntWritable?count:value){??
  • ????????????????sum+=count.get();//累加詞頻??
  • ????????????}??
  • ????????????iw.set(sum);//設置詞頻??
  • ????????????context.write(iw,?key);//輸出數據??
  • ??????????????
  • ??????????????
  • ??????????????
  • ??????????????
  • ??????????????
  • ????????}??
  • ??????????
  • ????}??
  • ??????
  • ??????
  • ????/**?
  • ?????*?MapReduce作業2排序的Mapper?
  • ?????*??
  • ?????*?**/??
  • ????private?static?class?SortMapper??extends?Mapper<LongWritable,?Text,?IntWritable,?Text>{??
  • ??????????
  • ??????????
  • ????????IntWritable?iw=new?IntWritable();//存儲詞頻??
  • ????????private?Text?t=new?Text();//存儲文本??
  • ????????@Override??
  • ????????protected?void?map(LongWritable?key,?Text?value,Context?context)throws?IOException,?InterruptedException?{??
  • ??????????????
  • ????????????String?words[]=value.toString().split("?");??
  • ???????????System.out.println("數組的長度:?"+words.length);??
  • ????????????System.out.println("Map讀入的文本:?"+value.toString());??
  • ????????????System.out.println("=====>??"+words[0]+"??=====>"+words[1]);??
  • ?????????????if(words[0]!=null){??
  • ?????????????????iw.set(Integer.parseInt(words[0].trim()));??
  • ?????????????????t.set(words[1].trim());??
  • ?????????????????context.write(iw,?t);//map階段輸出,默認按key排序??
  • ?????????????}??
  • ??????????????
  • ???????????????
  • ??????????????
  • ????????}??
  • ??????????
  • ??????????
  • ??????????
  • ????}??
  • ??????
  • ??????
  • ????/**?
  • ?????*?MapReduce作業2排序的Reducer?
  • ?????*??
  • ?????*?**/??
  • ????private?static?class?SortReduce?extends?Reducer<IntWritable,?Text,?Text,?IntWritable>{??
  • ??????????
  • ??????????
  • ??????????
  • ????????/**?
  • ?????????*??
  • ?????????*?輸出排序內容?
  • ?????????*??
  • ?????????*?**/??
  • ????????@Override??
  • ????????protected?void?reduce(IntWritable?key,?Iterable<Text>?value,Context?context)??
  • ????????????????throws?IOException,?InterruptedException?{??
  • ???????????
  • ?????????????for(Text?t:value){??
  • ?????????????????context.write(t,?key);//輸出排好序后的K,V??
  • ?????????????}??
  • ??????????????
  • ????????}??
  • ??????????
  • ????}??
  • ??????
  • ??????
  • ??????
  • ??????
  • ????/***?
  • ?????*?排序組件,在排序作業中,需要使用?
  • ?????*?按key的降序排序?
  • ?????*??
  • ?????*?**/??
  • ????????public?static?class?DescSort?extends??WritableComparator{??
  • ??
  • ?????????????public?DescSort()?{??
  • ?????????????????super(IntWritable.class,true);//注冊排序組件??
  • ????????????}??
  • ?????????????@Override??
  • ????????????public?int?compare(byte[]?arg0,?int?arg1,?int?arg2,?byte[]?arg3,??
  • ????????????????????int?arg4,?int?arg5)?{??
  • ????????????????return?-super.compare(arg0,?arg1,?arg2,?arg3,?arg4,?arg5);//注意使用負號來完成降序??
  • ????????????}??
  • ???????????????
  • ?????????????@Override??
  • ????????????public?int?compare(Object?a,?Object?b)?{??
  • ???????????
  • ????????????????return???-super.compare(a,?b);//注意使用負號來完成降序??
  • ????????????}??
  • ??????????????
  • ????????}??
  • ??????
  • ??????
  • ??????
  • ??????
  • ??????
  • ??????
  • ????/**?
  • ?????*?驅動類?
  • ?????*??
  • ?????*?**/??
  • ????public?static?void?main(String[]?args)throws?Exception?{??
  • ??????
  • ??????????
  • ???????????JobConf?conf=new?JobConf(MyHadoopControl.class);???
  • ???????????conf.set("mapred.job.tracker","192.168.75.130:9001");??
  • ???????????conf.setJar("tt.jar");??
  • ???????????
  • ?????????System.out.println("模式:??"+conf.get("mapred.job.tracker"));;??
  • ??????????
  • ??????????
  • ????????/**?
  • ?????????*??
  • ?????????*作業1的配置?
  • ?????????*統計詞頻?
  • ?????????*??
  • ?????????*?**/??
  • ????????Job?job1=new?Job(conf,"Join1");??
  • ????????job1.setJarByClass(MyHadoopControl.class);??
  • ??????????
  • ????????job1.setMapperClass(SumMapper.class);??
  • ????????job1.setReducerClass(SumReduce.class);??
  • ??????????
  • ????????job1.setMapOutputKeyClass(Text.class);//map階段的輸出的key??
  • ????????job1.setMapOutputValueClass(IntWritable.class);//map階段的輸出的value??
  • ??????????
  • ????????job1.setOutputKeyClass(IntWritable.class);//reduce階段的輸出的key??
  • ????????job1.setOutputValueClass(Text.class);//reduce階段的輸出的value??
  • ??????????
  • ??????
  • ??????????
  • ????????//加入控制容器??
  • ????????ControlledJob?ctrljob1=new??ControlledJob(conf);??
  • ????????ctrljob1.setJob(job1);??
  • ??????????
  • ??????????
  • ????????FileInputFormat.addInputPath(job1,?new?Path("hdfs://192.168.75.130:9000/root/input"));??
  • ????????FileSystem?fs=FileSystem.get(conf);??
  • ???????????
  • ?????????Path?op=new?Path("hdfs://192.168.75.130:9000/root/op");??
  • ???????????
  • ?????????if(fs.exists(op)){??
  • ?????????????fs.delete(op,?true);??
  • ?????????????System.out.println("存在此輸出路徑,已刪除!!!");??
  • ?????????}??
  • ????????FileOutputFormat.setOutputPath(job1,?op);??
  • ??????????
  • ????/**========================================================================*/??
  • ??????????
  • ????????/**?
  • ?????????*??
  • ?????????*作業2的配置?
  • ?????????*排序?
  • ?????????*??
  • ?????????*?**/??
  • ????????Job?job2=new?Job(conf,"Join2");??
  • ????????job2.setJarByClass(MyHadoopControl.class);??
  • ??????????
  • ????????//job2.setInputFormatClass(TextInputFormat.class);??
  • ??????????
  • ??????????
  • ????????job2.setMapperClass(SortMapper.class);??
  • ????????job2.setReducerClass(SortReduce.class);??
  • ??????????
  • ????????job2.setSortComparatorClass(DescSort.class);//按key降序排序??
  • ??????????
  • ????????job2.setMapOutputKeyClass(IntWritable.class);//map階段的輸出的key??
  • ????????job2.setMapOutputValueClass(Text.class);//map階段的輸出的value??
  • ??????????
  • ????????job2.setOutputKeyClass(Text.class);//reduce階段的輸出的key??
  • ????????job2.setOutputValueClass(IntWritable.class);//reduce階段的輸出的value??
  • ??????????
  • ??????????
  • ??
  • ????????//作業2加入控制容器??
  • ????????ControlledJob?ctrljob2=new?ControlledJob(conf);??
  • ????????ctrljob2.setJob(job2);??
  • ??????????
  • ????????/***?
  • ?????????*??
  • ?????????*?設置多個作業直接的依賴關系?
  • ?????????*?如下所寫:?
  • ?????????*?意思為job2的啟動,依賴于job1作業的完成?
  • ?????????*??
  • ?????????*?**/??
  • ????????ctrljob2.addDependingJob(ctrljob1);??
  • ??????????
  • ??????????
  • ??????????
  • ????????//輸入路徑是上一個作業的輸出路徑??
  • ????????FileInputFormat.addInputPath(job2,?new?Path("hdfs://192.168.75.130:9000/root/op/part*"));??
  • ????????FileSystem?fs2=FileSystem.get(conf);??
  • ???????????
  • ?????????Path?op2=new?Path("hdfs://192.168.75.130:9000/root/op2");??
  • ?????????if(fs2.exists(op2)){??
  • ?????????????fs2.delete(op2,?true);??
  • ?????????????System.out.println("存在此輸出路徑,已刪除!!!");??
  • ?????????}??
  • ????????FileOutputFormat.setOutputPath(job2,?op2);??
  • ??????????
  • ????????//?System.exit(job2.waitForCompletion(true)???0?:?1);??
  • ??????????
  • ??????????
  • ??????????
  • ????????/**====================================================================***/??
  • ??????????
  • ??????????
  • ??????????
  • ??????????
  • ??????????
  • ??????????
  • ????????/**?
  • ?????????*??
  • ?????????*?主的控制容器,控制上面的總的兩個子作業?
  • ?????????*??
  • ?????????*?**/??
  • ????????JobControl?jobCtrl=new?JobControl("myctrl");??
  • ????????//ctrljob1.addDependingJob(ctrljob2);//?job2在job1完成后,才可以啟動??
  • ????????//添加到總的JobControl里,進行控制??
  • ??????????
  • ????????jobCtrl.addJob(ctrljob1);???
  • ????????jobCtrl.addJob(ctrljob2);??
  • ??????????
  • ???
  • ????????//在線程啟動??
  • ????????Thread??t=new?Thread(jobCtrl);??
  • ????????t.start();??
  • ??????????
  • ????????while(true){??
  • ??????????????
  • ????????????if(jobCtrl.allFinished()){//如果作業成功完成,就打印成功作業的信息??
  • ????????????????System.out.println(jobCtrl.getSuccessfulJobList());??
  • ??????????????????
  • ????????????????jobCtrl.stop();??
  • ????????????????break;??
  • ????????????}??
  • ??????????????
  • ????????????if(jobCtrl.getFailedJobList().size()>0){//如果作業失敗,就打印失敗作業的信息??
  • ????????????????System.out.println(jobCtrl.getFailedJobList());??
  • ??????????????????
  • ????????????????jobCtrl.stop();??
  • ????????????????break;??
  • ????????????}??
  • ??????????????
  • ????????}??
  • ??????????
  • ??????????
  • ??????????
  • ??????????
  • ???????
  • ??????????
  • ??????????
  • ??????????
  • ??????????
  • ??????????
  • ??????????
  • ??????????
  • ??????????
  • ??????????
  • ??????????
  • ??????????
  • ??????????
  • ??????????
  • ??????????
  • ???
  • ??????????
  • ??????????
  • ????}??
  • ??????
  • ??????
  • ??????
  • ??????
  • ??
  • }??

  • 運行日志如下: ?
    Java代碼??
  • 模式:??192.168.75.130:9001??
  • 存在此輸出路徑,已刪除!!!??
  • 存在此輸出路徑,已刪除!!!??
  • WARN?-?JobClient.copyAndConfigureFiles(746)?|?Use?GenericOptionsParser?for?parsing?the?arguments.?Applications?should?implement?Tool?for?the?same.??
  • INFO?-?FileInputFormat.listStatus(237)?|?Total?input?paths?to?process?:?1??
  • WARN?-?NativeCodeLoader.<clinit>(52)?|?Unable?to?load?native-hadoop?library?for?your?platform...?using?builtin-java?classes?where?applicable??
  • WARN?-?LoadSnappy.<clinit>(46)?|?Snappy?native?library?not?loaded??
  • WARN?-?JobClient.copyAndConfigureFiles(746)?|?Use?GenericOptionsParser?for?parsing?the?arguments.?Applications?should?implement?Tool?for?the?same.??
  • INFO?-?FileInputFormat.listStatus(237)?|?Total?input?paths?to?process?:?1??
  • [job?name:??Join1??
  • job?id:?myctrl0??
  • job?state:??SUCCESS??
  • job?mapred?id:??job_201405092039_0001??
  • job?message:????just?initialized??
  • job?has?no?depending?job:?????
  • ,?job?name:?Join2??
  • job?id:?myctrl1??
  • job?state:??SUCCESS??
  • job?mapred?id:??job_201405092039_0002??
  • job?message:????just?initialized??
  • job?has?1?dependeng?jobs:??
  • ?????depending?job?0:???Join1??
  • ]??

  • 處理的結果如下: ?
    Java代碼??
  • 三劫??932??
  • b???579??
  • 秦東亮?206??
  • a???45??

  • 可以看出,結果是正確的。程序運行成功,上面只是散仙測的2個MapReduce作業的組合,更多的組合其實和上面的一樣。?
    總結:在配置多個作業時,Job的配置盡量分離單獨寫,不要輕易拷貝修改,這樣很容易出錯的,散仙在配置的時候,就是拷貝了一個,結果因為少修改了一個地方,在運行時候一直報錯,最后才發現,原來少改了某個地方,這一點需要注意一下。

    《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀

    總結

    以上是生活随笔為你收集整理的如何使用Hadoop的JobControl的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。