日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

Hadoop Map/Reduce的工作流

發(fā)布時(shí)間:2023/12/19 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop Map/Reduce的工作流 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

問題描述

我們的數(shù)據(jù)分析平臺是單一的Map/Reduce過程,由于半年來不斷地增加需求,導(dǎo)致了問題已經(jīng)不是那么地簡單,特別是在Reduce階段,一些大對象會常駐內(nèi)存。因此越來越頂不住壓力了,當(dāng)前內(nèi)存問題已經(jīng)是最大的問題,每個(gè)Map占用5G,每個(gè)Reduce占用9G!直接導(dǎo)致當(dāng)數(shù)據(jù)分析平臺運(yùn)行時(shí),集群處于資源匱乏狀態(tài)。

?

因此,在不改變業(yè)務(wù)數(shù)據(jù)計(jì)算的條件下,將單一的Map/Reduce過程分解成2個(gè)階段。這個(gè)時(shí)候,需求就相對來說比較復(fù)雜,將第一階段的Reduce結(jié)果輸出至HDFS,作為第二階段的輸入。

?

其基本過程圖很簡單如下所示:



我們可以使用啟動兩個(gè)Job,在第一個(gè)階段Job完成之后,再進(jìn)行第二階段Job的執(zhí)行。但是更好的方式是使用Hadoop中提供的JobControl工具,這個(gè)工具可以加入多個(gè)等待執(zhí)行的子Job,并定義其依賴關(guān)系,決定執(zhí)行的先后順序。

?

JobControl jobControl = new JobControl(GROUP_NAME);JobConf phase1JobConf = Phase1Main.getJobConf(getConf(), jsonConfigFilePhase1, reduceCountOne);Job phase1Job = new Job(phase1JobConf);jobControl.addJob(phase1Job);JobConf phase2JobConf = Phase2Main.getJobConf(getConf(), jsonConfigFilePhase2, reduceCountTwo);Job phase2Job = new Job(phase2JobConf);jobControl.addJob(phase2Job);phase2Job.addDependingJob(phase1Job);jobControl.run();return jobControl.getFailedJobs() == null || jobControl.getFailedJobs().isEmpty() ? 0 : 1;

?

正如代碼所示,可以使用job的addDependingJob(JobConf)方法來定義其依賴關(guān)系。

?

但是這種方式有一個(gè)非常大的缺點(diǎn),如果中間數(shù)據(jù)結(jié)果過大,將其放置在HDFS上是非常浪費(fèi)磁盤資源,同時(shí)也帶來后續(xù)過多的I/O操作,包括第一階段的寫磁盤和第二階段的讀磁盤(而且本身中間結(jié)果數(shù)據(jù)也沒有什么太大用途)。

?

經(jīng)過查閱,在Hadoop中,一個(gè)Job可以按順序執(zhí)行多個(gè)mapper對數(shù)據(jù)進(jìn)行前期的處理,再進(jìn)行Reduce,Reduce執(zhí)行完成后,還可以繼續(xù)執(zhí)行多個(gè)Mapper,形成一個(gè)處理鏈結(jié)構(gòu),這樣的Job是不會存儲中間結(jié)果的,大大減少了磁盤I/O操作。

?

但這種方式也對map/reduce程序有個(gè)要求,就是只能存在一個(gè)Partition規(guī)則,因?yàn)檎麄€(gè)鏈條中只會存在一次Reduce操作。前文介紹的那兩個(gè)階段的Partition規(guī)則如果不一致,是不能改造成這種方式的。

?

這種方式的大致流程圖如下:

?

?

?

由于我們的分析程序中,第二步就需要根據(jù)一定的規(guī)則進(jìn)行聚集,因此第二步就需要進(jìn)行Reduce,將原來第四步的Reduce階段強(qiáng)行改造成Map階段。注意,Map階段之間互相傳遞數(shù)據(jù)時(shí),其數(shù)量是固定的,而且不會進(jìn)行聚集(Reduce)操作,還是需要按照流的方式進(jìn)行處理,因此最好要先排序。單個(gè)Map的結(jié)果只會傳遞給特定的單個(gè)下個(gè)步驟的Map端。

?

在ChainMain類中會執(zhí)行這種方式,需要借助于ChainMapper和ChainReducer兩個(gè)Hadoop中提供的類:

String finalJobName = TongCommonConstants.JOB_NAME + jobNameSuffix;jobConf.setJobName(finalJobName);jobConf.setInputFormat(RawLogInputFormat.class);jobConf.setPartitionerClass(Phase1Partitioner.class);jobConf.setNumReduceTasks(reduceCountTwo);jobConf.set(TongCommonConstants.DIC_INFO, jsonConfigFile);DicInfoManager.getInstance().readDicManager(jobConf, jsonConfigFile);String yesterdayOutDir = DicInfoManager.getInstance().getDicManager().getPrevious_day_output_path();JobConf phase1JobConf = getJobConf(jsonConfigFile, yesterdayOutDir);ChainMapper.addMapper(jobConf, Phase1Mapper.class, Text.class, History.class, Phase1KeyDecorator.class,BytesWritable.class, true, phase1JobConf);JobConf phase2ReducerConf = getJobConf(jsonConfigFile, yesterdayOutDir);ChainReducer.setReducer(jobConf, Phase1Reducer.class, Phase1KeyDecorator.class, BytesWritable.class,Text.class, Text.class, true, phase2ReducerConf);JobConf phase3ChainJobConf = getJobConf(jsonConfigFile, yesterdayOutDir);ChainReducer.addMapper(jobConf, Phase3ChainMapper.class, Text.class, Text.class, Phase2KeyDecorator.class,BytesWritable.class, true, phase3ChainJobConf);JobConf phase4ChainJobConf = getJobConf(jsonConfigFile, yesterdayOutDir);ChainReducer.addMapper(jobConf, Phase4ChainMapper.class, Phase2KeyDecorator.class, BytesWritable.class,Text.class, Text.class, true, phase4ChainJobConf);RunningJob runningJob = JobClient.runJob(jobConf);runningJob.waitForCompletion();return runningJob.isSuccessful() ? 0 : 1;

?

經(jīng)過這種方式的改造后,對原有程序的影響最小,因?yàn)椴恍枰x中間結(jié)果存儲地址,當(dāng)然也不需要定義第二階段的配置文件。

?

新手比較容易犯的一個(gè)錯(cuò)誤是,Reducer后面的map步驟要使用ChainReducer.addMapper方法而不是ChainMapper.addMapper方法,否則會抱下面的異常,我就在這個(gè)上面栽了跟頭,查了很久。

Exception in thread "main" java.lang.IllegalArgumentException: The specified Mapper input key class does not match the previous Mapper's output key class. at org.apache.hadoop.mapreduce.lib.chain.Chain.validateKeyValueTypes(Chain.java:695) at org.apache.hadoop.mapred.lib.Chain.addMapper(Chain.java:104)

?

Hadoop工作流中的JobControl

很多情況下,用戶編寫的作業(yè)比較復(fù)雜,相互之間存在依賴關(guān)系,這種可以用有向圖表示的依賴關(guān)系稱之為“工作流”。

JobControl是由兩個(gè)類組成:Job和JobControl,Job的狀態(tài)轉(zhuǎn)移圖如下:


?

作業(yè)在剛開始的時(shí)候處于Waiting狀態(tài),如果沒有依賴作業(yè)或者所有依賴作業(yè)都已經(jīng)完成的情況下,進(jìn)入Ready狀態(tài);一旦進(jìn)入Ready狀態(tài),則作業(yè)可被提交到Hadoop集群上運(yùn)行,并進(jìn)入Running狀態(tài),根據(jù)作業(yè)的運(yùn)行情況,可能進(jìn)入Success或Failed狀態(tài)。需要注意的是,如果一個(gè)作業(yè)的依賴作業(yè)失敗,則該作業(yè)也會失敗,后續(xù)的所有作業(yè)也都會失敗。

JobControl封裝了一系列MapReduce作業(yè)及其對應(yīng)的依賴關(guān)系,它將處于不同狀態(tài)的作業(yè)放入不同的哈希表,按照J(rèn)ob的狀態(tài)轉(zhuǎn)移圖轉(zhuǎn)移作業(yè),直到所有作業(yè)運(yùn)行完成。在實(shí)現(xiàn)的時(shí)候,JobControl包含一個(gè)線程用于周期性地監(jiān)控和更新各個(gè)作業(yè)的運(yùn)行狀態(tài),調(diào)度依賴作業(yè)運(yùn)行完成的作業(yè),提交Ready狀態(tài)的作業(yè)等。

ChainMapper/ChainReduce

ChainMapper/ChainReducer主要是為了解決線性鏈?zhǔn)組apper而提出的,在Map或Reduce階段存在多個(gè)Mapper,像多個(gè)Linux管道一樣,前一個(gè)Mapper的輸出結(jié)果直接重定向到下一個(gè)Mapper的輸入,形成一個(gè)流水線,最后的Mapper或Reducer才會將結(jié)果寫到HDFS上。對于任意一個(gè)MapReduce作業(yè),Map和Reduce階段可以由無限個(gè)Mapper,但只能有一個(gè)Reducer。

Hadoop MapReduce有一個(gè)約定,函數(shù)OutputCollector.collect(key, value)執(zhí)行期間不能改變key和value的值,這是因?yàn)槟硞€(gè)map/reduce調(diào)用該方法之后,可能后續(xù)繼續(xù)再次使用key和value的值,如果被改變,可能會造成潛在的錯(cuò)誤。

ChainMapper/Reducer實(shí)現(xiàn)的關(guān)鍵技術(shù)點(diǎn)就是修改Mapper和Reducer的輸出流,將本來要寫入文件的輸出結(jié)果重定向到另外一個(gè)Mapper中。盡管鏈?zhǔn)阶鳂I(yè)在Map和Reduce階段添加了多個(gè)Mapper,但仍然只是一個(gè)MapReduce作業(yè),因而只能有一個(gè)與之對應(yīng)的JobConf對象。

ChainMapper中實(shí)現(xiàn)的map函數(shù)大概如下:

public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException{Mapper mapper = chain.getFirstMap();if(mapper != null){mapper.map(key, value, chain.getMapperCollector(0, output, reporter), reporter);} }

?

chain.getMapperCollector返回一個(gè)OutputCollector實(shí)現(xiàn),即ChainOutputCollector,collector方法大概如下:

public void collect(K key, V value) throws IOException{if(nextMapperIndex < mappers.size()){ //調(diào)用下一個(gè)Mapper,直到?jīng)]有mappernextMapper.map(key, value, new ChainOutputCollector(nextMapperIndex, nextKeySerialization, nextValueSerialization, output, reporter));} else {//如果是最后一個(gè)Mapper,直接調(diào)用真正的Collectoroutput.collect(key, value);} }

?

?在使用ChainMapper/ChainReducer時(shí)需要注意一個(gè)問題:就是其中參數(shù)byValue的選擇,究竟是該傳值還是傳遞引用。因?yàn)樵贖adoop編程中需要處理的數(shù)據(jù)量比較大,經(jīng)常使用復(fù)用同一個(gè)對象的情況,普通的Mapper/Reducer程序由于不會執(zhí)行鏈?zhǔn)教幚?#xff0c;在其他的JVM中來重建Map輸出的對象,而Chain API中需要管道一樣的操作來進(jìn)行下一步處理,Mapper.map()函數(shù)調(diào)用完outputCollector.collect(key, value)之后,可能再次使用key和value的值,才導(dǎo)致這個(gè)問題的發(fā)生。

?

個(gè)人總覺得雖然重用引用的方式雖然可以節(jié)省一定的內(nèi)存,但是不重用引用也僅僅會對Minor GC造成一定的壓力,如果嚴(yán)格控制生成的new對象Key,Value的生命周期的話。

?

正是為了防止OutputCollector直接對key/value進(jìn)行修改,ChainMapper允許用戶指定key/value的傳遞方式,如果編寫的程序確定key/value執(zhí)行期間不會被重用以修改(如果是不可變對象最好),則可以選擇按照引用來進(jìn)行傳遞,否則按值傳遞。需要注意的是,引用傳遞可以避免對象的深層拷貝,提高處理效率,但需要編程時(shí)做出key/value不能修改的保證。

?

?

轉(zhuǎn)載于:https://www.cnblogs.com/mmaa/p/5789916.html

總結(jié)

以上是生活随笔為你收集整理的Hadoop Map/Reduce的工作流的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。