
# Using JobControl and Retrieving Counters

Published: 2024/3/24

## Using JobControl


#### 1. JobControl Configuration

```java
// 1. Declare a JobControl.
JobControl jobControl = new JobControl("groupName");

// 2. Build the Configuration.
Configuration conf = new Configuration();
conf.set("name", "value");
// ... other settings ...
// It is common to check here whether the output path already exists:
FileSystem fileSystem = FileSystem.get(new Configuration());
if (fileSystem.exists(new Path(outputPath))) {
    LOG.warn("output: " + outputPath + " already exists! DELETE");
    fileSystem.delete(new Path(outputPath), true);
}

// 3. Configure the Job itself.
Job job = Job.getInstance(conf);
job.setJarByClass(xxx.class);
job.setJobName("jobName");
job.setInputFormatClass(OrcNewInputFormat.class);
for (String date : dateList) {
    // Ideally, also check that each input path exists before adding it.
    MultipleInputs.addInputPath(job, new Path(inputPath),
            RCFileMapReduceInputFormat.class, TrackDataMapper.class);
}
job.setMapperClass(TrackDataMapper.class);
job.setReducerClass(TrackDataReducer.class);
job.setMapOutputKeyClass(TextTuple.class);
job.setMapOutputValueClass(TextTuple.class);
job.setNumReduceTasks(5000);
job.setOutputKeyClass(TextTuple.class);
job.setOutputValueClass(TextTuple.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(job, new Path(outputPath));

// 4. Wrap the finished Job in a ControlledJob.
ControlledJob cj = new ControlledJob(conf);
cj.setJob(job);

// 5. Declare dependencies, if any. The current cj will not be submitted
//    until every job it depends on has completed; call addDependingJob
//    once per dependency. If this job does not need to wait for any
//    other job, skip this step.
cj.addDependingJob(other_cj1);
cj.addDependingJob(other_cj2);

// 6. Add the newly created ControlledJob to the JobControl.
jobControl.addJob(cj);
```

That completes the JobControl configuration; the next section runs it.
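The dependency handling in step 5 can be illustrated with a tiny standalone model (no Hadoop involved; the `FakeJob` class and its methods are invented here purely for illustration): a job becomes eligible to run only after every job it depends on has finished, which is essentially what JobControl does with `ControlledJob.addDependingJob`.

```java
import java.util.*;

// Minimal stand-in for ControlledJob dependency tracking (illustrative only).
class FakeJob {
    final String name;
    final List<FakeJob> deps = new ArrayList<>();
    boolean done = false;

    FakeJob(String name) { this.name = name; }

    void addDependingJob(FakeJob j) { deps.add(j); }

    // A job is ready to submit only when every dependency has finished.
    boolean ready() {
        for (FakeJob d : deps) if (!d.done) return false;
        return true;
    }
}

public class DependencyDemo {
    // Repeatedly "run" every ready job until all have finished,
    // recording the order in which they were submitted.
    static List<String> runAll(List<FakeJob> jobs) {
        List<String> order = new ArrayList<>();
        while (order.size() < jobs.size()) {
            for (FakeJob j : jobs) {
                if (!j.done && j.ready()) {
                    j.done = true;
                    order.add(j.name);
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        FakeJob a = new FakeJob("other_cj1");
        FakeJob b = new FakeJob("other_cj2");
        FakeJob c = new FakeJob("cj");
        c.addDependingJob(a);
        c.addDependingJob(b);
        // Even though cj is listed first, it runs last.
        System.out.println(runAll(Arrays.asList(c, a, b)));
    }
}
```

Note this toy version would spin forever on a dependency cycle; the real JobControl detects circular dependencies when jobs are added.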

#### 2. Running the JobControl

There is presumably a built-in way to run it directly, but the method below is hand-written for finer control. The code is posted as-is without much commentary.

Call it like this (`jobControl` is the `JobControl` configured above):

```java
boolean res = RunTool.runJobControll(jobControl, true);
```

```java
package xxx;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunTool {
    private static final Logger LOG = Logger.getLogger(RunTool.class);

    public static boolean runJobControll(JobControl jobControl, boolean verbose)
            throws InterruptedException, IOException {
        LOG.info("start to run job control with mode" + (verbose ? " verbose" : " clean"));
        // JobControl implements Runnable; run it on its own thread and poll it here.
        Thread runningThread = new Thread(jobControl);
        runningThread.start();

        Map<String, Integer> jobProgress = null;
        if (verbose) {
            jobProgress = new HashMap<String, Integer>();
        }
        int numWait = -1;
        int numRunning = -1;
        int numSuccess = -1;
        int numFailed = -1;

        while (!jobControl.allFinished()) {
            Thread.sleep(10 * 1000);
            int failed = jobControl.getFailedJobList().size();
            if (verbose) {
                int wait = jobControl.getWaitingJobList().size();
                int running = jobControl.getRunningJobList().size();
                int success = jobControl.getSuccessfulJobList().size();
                if (numWait != wait) {
                    numWait = wait;
                    LOG.info("job control state alert -- waiting jobs: " + numWait);
                }
                if (numRunning != running) {
                    numRunning = running;
                    LOG.info("job control state alert -- running jobs: " + numRunning);
                }
                if (numSuccess != success) {
                    numSuccess = success;
                    LOG.info("job control state alert -- successful jobs: " + numSuccess);
                }
                if (failed != numFailed) {
                    numFailed = failed;
                    LOG.info("job control state alert -- failed jobs: " + numFailed);
                }
                // Log per-job progress whenever it changes.
                for (ControlledJob cj : jobControl.getRunningJobList()) {
                    String jobId = cj.getJobID();
                    Job job = cj.getJob();
                    int currentJobProgress =
                            (int) (100 * (0.5 * job.mapProgress() + 0.5 * job.reduceProgress()));
                    if (!jobProgress.containsKey(jobId)
                            || jobProgress.get(jobId) != currentJobProgress) {
                        LOG.info("Controlled Job Alert -- job: " + job.getJobName()
                                + ", progress: " + currentJobProgress + "%"
                                + ", track url: " + job.getTrackingURL());
                        jobProgress.put(jobId, currentJobProgress);
                    }
                }
            }
            // Checked outside the verbose block so that a failure also
            // aborts the run in quiet mode.
            if (failed > 0) {
                jobControl.stop();
                LOG.info("some controlled job failed! stop the job control");
                LOG.info("stop all running jobs");
                for (ControlledJob wcj : jobControl.getRunningJobList()) {
                    LOG.info("killing job: " + wcj.getJobName());
                    wcj.killJob();
                }
                break;
            }
        }

        List<ControlledJob> failedList = jobControl.getFailedJobList();
        for (ControlledJob fcj : failedList) {
            LOG.error("job: " + fcj.getJobName() + " failed!");
        }
        return failedList.size() == 0;
    }
}
```
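The per-job progress figure logged above weights the map and reduce phases equally. The arithmetic can be isolated as a quick sanity check (the helper name below is ours, not part of the Hadoop API; `mapProgress`/`reduceProgress` each report a fraction in `[0, 1]`):

```java
public class ProgressDemo {
    // Combine map and reduce progress (each in [0, 1]) into a single
    // percentage, weighting the two phases equally -- the same formula
    // used inside runJobControll above.
    static int combinedProgress(float mapProgress, float reduceProgress) {
        return (int) (100 * (0.5 * mapProgress + 0.5 * reduceProgress));
    }

    public static void main(String[] args) {
        System.out.println(combinedProgress(1.0f, 0.0f)); // maps done, reduces not started
        System.out.println(combinedProgress(1.0f, 0.5f)); // reduces half done
        System.out.println(combinedProgress(1.0f, 1.0f)); // job complete
    }
}
```

One consequence of this weighting: a job whose maps have all finished still shows only 50%, even if its reduces have not started.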

#### 3. Checking JobControl Success or Failure

```java
if (res) {
    // Success: read the counter values.
    List<ControlledJob> finishList = jobControl.getSuccessfulJobList();
    for (ControlledJob controlledJob : finishList) {
        Counters counters = controlledJob.getJob().getCounters();

        // The block below dumps every counter group -- useful for debugging,
        // since it prints all statistics, large and small, indiscriminately.
        Iterable<String> gcList = counters.getGroupNames(); // all counter group names
        for (String counter_name : gcList) {
            CounterGroup gc = counters.getGroup(counter_name);
            cnt.add(counter_name + "# start ~~~~~");
            for (Counter counter : gc) {
                cnt.add(counter.getName() + "#" + counter.getValue());
                LOG.info(counter.getName() + "\t" + counter.getValue());
            }
            cnt.add(counter_name + "# end ~~~~~");
        }

        /* The most important part: read only one named group, i.e. just the
           per-stage and final output statistics.
        CounterGroup gc = counters.getGroup(Consts.COUNTER_NAME);
        for (Counter counter : gc) {
            cnt.add(counter.getName() + "#" + counter.getValue());
            LOG.info(counter.getName() + "\t" + counter.getValue());
        }
        */
    }
} else {
    // Failure: collect the names of the failed jobs.
    List<ControlledJob> failedList = jobControl.getFailedJobList();
    for (ControlledJob fail : failedList) {
        failed.add(fail.getJob().getJobName());
        LOG.info("### Failed:\t" + fail.getJob().getJobName());
    }
}
```

If this runs on multiple threads, the result needs a lock:

```java
synchronized (this) {
    isSucc = res;
    isFinished = true;
}
```
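A minimal standalone sketch of that handoff (the `isSucc`/`isFinished` field names match the snippet above; the surrounding class and its methods are invented for illustration): the worker thread publishes the result under a lock, and the caller reads both flags under the same lock, so it can never observe a torn update where `isFinished` is true but `isSucc` is stale.

```java
public class ResultHolder {
    private boolean isSucc = false;
    private boolean isFinished = false;

    // Worker thread calls this once the job control run completes.
    void publish(boolean res) {
        synchronized (this) {
            isSucc = res;
            isFinished = true;
        }
    }

    // Callers read under the same monitor to get a consistent view.
    synchronized boolean finished() { return isFinished; }
    synchronized boolean success() { return isSucc; }

    public static void main(String[] args) throws InterruptedException {
        ResultHolder holder = new ResultHolder();
        Thread worker = new Thread(() -> holder.publish(true));
        worker.start();
        worker.join();
        System.out.println(holder.finished() + " " + holder.success());
    }
}
```

Without the `synchronized` blocks there would be no happens-before edge between the writes and the reads, and a polling caller could see `isFinished == true` while still reading the old value of `isSucc`.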

#### 4. Retrieving Counter Values
The code in section 3 already covers this: you can read one named counter group, or iterate over every MapReduce counter.

```java
// All counter group names, including every built-in MapReduce counter group.
Iterable<String> gcList = counters.getGroupNames();
for (String counter_name : gcList) {
    // Note: this iterates over ALL counters -- not just user-defined ones,
    // but also internal counters such as the job's input/output statistics.
}
```
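The `name#value` lines collected into `cnt` in section 3 can be reproduced without a cluster by standing in a plain map for one counter group (the group and counter names below are made up; in a real job they come from the `Counters` object):

```java
import java.util.*;

public class CounterDumpDemo {
    // Format one counter group the same way section 3 does:
    // a start marker, one "name#value" line per counter, an end marker.
    static List<String> dumpGroup(String groupName, Map<String, Long> counters) {
        List<String> cnt = new ArrayList<>();
        cnt.add(groupName + "# start ~~~~~");
        for (Map.Entry<String, Long> e : counters.entrySet()) {
            cnt.add(e.getKey() + "#" + e.getValue());
        }
        cnt.add(groupName + "# end ~~~~~");
        return cnt;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the dump is deterministic.
        Map<String, Long> group = new LinkedHashMap<>();
        group.put("RECORDS_IN", 1200L);
        group.put("RECORDS_OUT", 1100L);
        for (String line : dumpGroup("MY_GROUP", group)) {
            System.out.println(line);
        }
    }
}
```

On a real job the outer loop over `counters.getGroupNames()` would call something like this once per group, which is why the debug dump in section 3 can get very long.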

總結

以上是生活随笔為你收集整理的JobControl的使用及获取计数器的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。