MR作业的提交监控、输入输出控制及特性使用
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
MR作業(yè)的提交監(jiān)控、輸入輸出控制及特性使用 博客分類: hadoop提交作業(yè)并監(jiān)控
JobClient是用戶作業(yè)與JobTracker交互的主要接口,它提供了提交作業(yè),跟蹤作業(yè)進(jìn)度、訪問任務(wù)報(bào)告及l(fā)ogs、以及獲取MR集群狀態(tài)信息等方法。
提交作業(yè)流程包括:
- 檢查作業(yè)的輸入輸出
- 計(jì)算作業(yè)的輸入分片(InputSplit)
- 如果需要,為DistributedCache設(shè)置必須的賬戶信息
- 將作業(yè)用到的jar包文件和配置信息拷貝至文件系統(tǒng)(一般為HDFS)上的MR系統(tǒng)路徑中
- 提交作業(yè)到JobTracker,并可監(jiān)控作業(yè)狀態(tài)
作業(yè)歷史(Job History)文件會(huì)記錄在hadoop.job.history.user.location指定的位置,默認(rèn)在作業(yè)輸出路徑下的logs/history/路徑下。因此歷史日志默認(rèn)在mapred.output.dir/logs/history下。用戶可以將hadoop.job.history.user.location值設(shè)置為none來不記錄作業(yè)歷史。
使用命令來查看歷史日志:
| 1 | $hadoop job -history output-dir |
上面命令會(huì)顯示作業(yè)的詳細(xì)信息、失敗的被kill的任務(wù)(tip)的詳細(xì)信息。使用下面命令可以查看作業(yè)更詳細(xì)的信息:
| 1 | $hadoop job -history all output-dir |
可以使用OutputLogFilter從輸出路徑中過濾日志文件。
一般,我們創(chuàng)建應(yīng)用,通過JobConf設(shè)置作業(yè)的各種屬性,然后使用JobClient提交作業(yè)并監(jiān)控進(jìn)度。
作業(yè)控制
有時(shí)可能需要一個(gè)作業(yè)鏈完成復(fù)雜的任務(wù)。這點(diǎn)是可以輕松實(shí)現(xiàn)的,因?yàn)樽鳂I(yè)輸出一般都在分布式文件系統(tǒng)上,作業(yè)輸出可以當(dāng)做下個(gè)作業(yè)的輸入,這樣就形成了鏈?zhǔn)阶鳂I(yè)。
這種作業(yè)成功是否依賴于客戶端。客戶端可以使用以下方式來控制作業(yè)的執(zhí)行:
- runJob(JobConf):提交作業(yè)并僅在作業(yè)完成時(shí)返回
- submitJob(JobConf):提交作業(yè)后立即返回一個(gè)RunningJob的引用,使用它可以查詢作業(yè)狀態(tài)并處理調(diào)度邏輯。
- JobConf.setJobEndNotificationURI(String):設(shè)置作業(yè)完成時(shí)通知
你也可以使用Oozie來實(shí)現(xiàn)復(fù)雜的作業(yè)鏈。
作業(yè)輸入
下面講作業(yè)輸入的內(nèi)容。
InputFormat?描述MR作業(yè)的輸入信息。InputFormat有以下作用:
基于文件的InputFormat實(shí)現(xiàn)類即FileInputFormal是通過計(jì)算輸入文件的總大小(以字節(jié)為單位)來分裂成邏輯分片的。然而文件系統(tǒng)的塊大小又作為輸入分片大小的上限,下限可以通過mapred.min.split.size來設(shè)定。
基于輸入大小進(jìn)行邏輯分片機(jī)制在很多情況下是不適合的,還需要注意記錄邊界。在這些情況下,應(yīng)用應(yīng)該實(shí)現(xiàn)RecordReader來處理記錄邊界,為獨(dú)立的mapper任務(wù)提供面向行的邏輯分片。
TextInputFormat是默認(rèn)的輸入格式。
如果作業(yè)輸入格式為TextInputFormat,MR框架可以檢測(cè)以.gz擴(kuò)展名的輸入文件并自動(dòng)使用合適的壓縮算法來解壓文件。特別注意的是,.gz格式的壓縮文件不能被分片,每個(gè)文件作為一個(gè)輸入分片被一個(gè)mapper處理。
輸入分片(InputSplit)
InputSplit的數(shù)據(jù)會(huì)被一個(gè)獨(dú)立的mapper來處理。一般輸入分片是對(duì)輸入基于面向字節(jié)為單位的。可以使用RecordReader來處理提供面向行的輸入分片。
FileSplit是默認(rèn)的InputSplit,通過設(shè)置map.input.file設(shè)置輸入文件路徑。
RecordReader
RecordReader從InputSplit讀入數(shù)據(jù)對(duì)。RecordReader將InputSplit提供得面向字節(jié)的的輸入轉(zhuǎn)換為面向行的分片給Mapper實(shí)現(xiàn)類來處理。RecordReader的責(zé)任就是處理行邊界并以kv方式將數(shù)據(jù)傳給mapper作業(yè)。
作業(yè)輸出
OutFormat描述作業(yè)的輸出信息。MR框架使用OutputFormat來處理:
- 驗(yàn)證作業(yè)的輸出信息。例如驗(yàn)證輸出路徑是否存在.
- 提供RecordWriter實(shí)現(xiàn)來寫作業(yè)的輸出文件。輸出文件存儲(chǔ)在文件系統(tǒng)中。TextOutputFormat是默認(rèn)的輸出格式。
OutputCommitter
OutputCommitter是MR作業(yè)的輸出提交對(duì)象。MR框架使用它來處理:
- 初始化時(shí)準(zhǔn)備作業(yè)。例如,作業(yè)初始化時(shí)創(chuàng)建臨時(shí)輸出路徑。作業(yè)準(zhǔn)備階段通過一個(gè)獨(dú)立的任務(wù)在作業(yè)的PREP狀態(tài)時(shí)完成,然后初始化作業(yè)的所有任務(wù)。一旦準(zhǔn)備階段完成,作業(yè)狀態(tài)切換到Running狀態(tài)。
- 作業(yè)完成后清理作業(yè)。例如,在作業(yè)完成后清理輸出臨時(shí)路徑。作業(yè)清理階段通過一個(gè)獨(dú)立的任務(wù)在作業(yè)最后完成。作業(yè)在清理階段完成后設(shè)置為成功/失敗/中止。
- 設(shè)置任務(wù)的臨時(shí)輸出。任務(wù)準(zhǔn)備階段在任務(wù)初始化時(shí)作為同一個(gè)任務(wù)的一部分。
- 檢查任務(wù)是否需要一個(gè)提交對(duì)象。目的是為了在任務(wù)不需要時(shí)避免一個(gè)提交過程。
- 提交任務(wù)輸出。一旦任務(wù)完成,如果需要的話,任務(wù)將提交其輸出物。
- 銷毀任務(wù)的提交。如果任務(wù)失敗或者被中止,輸出將被清理。如果不能被清理(如異常的塊),與該任務(wù)相同的任務(wù)id會(huì)啟動(dòng)并處理清理。
FileOutputCommitter是默認(rèn)的OutputCommiter實(shí)現(xiàn)。作業(yè)準(zhǔn)備及清理任務(wù)占用同一個(gè)TaskTracker上空閑的map或者reduce槽。作業(yè)清理任務(wù)、任務(wù)清理任務(wù)以及作業(yè)準(zhǔn)備任務(wù)按照該順序擁有最高的優(yōu)先級(jí)。
任務(wù)副作用文件
在一些應(yīng)用中,任務(wù)需要?jiǎng)?chuàng)建和/或?qū)懭胛募?#xff0c;這些文件不同于實(shí)際的作業(yè)輸出文件。
在這種情況下,相同的2個(gè)Mapper或者Reducer有可能同時(shí)運(yùn)行(比如推測(cè)執(zhí)行的任務(wù)),并在文件系統(tǒng)上嘗試打開和或者寫入相同文件或路徑。所以你必須為每個(gè)任務(wù)的執(zhí)行使用唯一名字(比如使用attapid,attempt2007092218120001m000000_0)。
為了避免這樣的情況,當(dāng)OutputCommiter為FileOutputCommiter時(shí),MR框架通過${mapred.work.output.dir}為每個(gè)任務(wù)執(zhí)行嘗試維護(hù)一個(gè)特殊的路徑:${mapred.output.dir}/temporary/${taskid},這個(gè)路徑用來存儲(chǔ)任務(wù)執(zhí)行的輸出。在任務(wù)執(zhí)行成功完成時(shí),${mapred.output.dir}/temporary/${taskid} 下的文件被移到${mapred.output.dir}下。當(dāng)然框架丟棄不成功的任務(wù)執(zhí)行的子路徑。這個(gè)進(jìn)程對(duì)用戶應(yīng)用透明。
MR應(yīng)用開發(fā)者可以利用這個(gè)特性在執(zhí)行過程中通過FileOutputFormat.getWorkOutputPath()在${mapred.work.output.dir} 下創(chuàng)建需要的site files,框架會(huì)像成功的任務(wù)試執(zhí)行那樣處理,這樣就避免為每個(gè)試執(zhí)行任務(wù)取唯一名字。
注意:特殊試執(zhí)行任務(wù)執(zhí)行過程中${mapred.work.output.dir} 值實(shí)際是${mapred.output.dir}/temporary/${taskid},該值被框架定制。所以直接在FileOutputFormat.getWorkOutputPath()返回的路徑中創(chuàng)建site-files,來使用這個(gè)特性。
對(duì)于只有mapper的作業(yè),side-files將直接進(jìn)入hdfs。
RecordWriter
RecordWriter將輸出的kv值寫入輸出文件。RecordWriter實(shí)現(xiàn)類將作業(yè)輸出寫入文件系統(tǒng)。
其他特性
提交作業(yè)到隊(duì)列
用戶提交的作業(yè)到隊(duì)列中,隊(duì)列是一個(gè)作業(yè)的集合,允許MR提供一些特定功能。隊(duì)列使用ACL控制哪些用戶提交作業(yè)。隊(duì)列一般和Hadoop Scheduler調(diào)度器一起使用。
Hadoop 安裝后默認(rèn)配置了名稱為default的隊(duì)列,這個(gè)隊(duì)列是必須的。隊(duì)列名稱可以在hadoop配置文件中mapred.queue.names屬性里配置。一些作業(yè)調(diào)度器比如Capacity Scheduler支持多個(gè)隊(duì)列。
作業(yè)可以通過mapred.job.queue.name或者通過setQueueName(Stirng)設(shè)置隊(duì)列名稱,這是可選的。如果作業(yè)名稱沒有設(shè)置隊(duì)列名稱,則提交到default的隊(duì)列中。
計(jì)數(shù)器Counters
計(jì)數(shù)器用來描述MR中所有的計(jì)數(shù),可以是MR框架定義也可以是應(yīng)用提供。每個(gè)計(jì)數(shù)器可以是任何Enum類型。一組特定的Enum被聚合成一個(gè)組即為Counters.Group.
應(yīng)用可以定義Enum類型的計(jì)數(shù)器,并可以通過Reporter.incrCounter(Enum,long或者Reporter.incrCounter(String,String,long在map和或者reduce中更新這個(gè)計(jì)數(shù)器的值。然后這些計(jì)數(shù)器統(tǒng)一被框架聚合。
DistributedCache
DistributedCache可以高效地將應(yīng)用明細(xì)、大的只讀文件發(fā)布。DistributedCache是MR框架提供的緩存文件(如文本、壓縮包、jar包等)的高效工具。
應(yīng)用需要在JobConf里使用hdfs://指定地址進(jìn)行緩存。這些文件必須在文件系統(tǒng)中已經(jīng)存在。
MR作業(yè)的任務(wù)在某個(gè)節(jié)點(diǎn)上執(zhí)行前,MR框架會(huì)拷貝必需的文件到這個(gè)節(jié)點(diǎn)。特別說明的是,作業(yè)必需的所有文件只需要拷貝一次,支持壓縮包,并在各個(gè)節(jié)點(diǎn)上解壓,有助于提高M(jìn)R執(zhí)行效率。
DistributedCache會(huì)跟蹤所有緩存文件的修改時(shí)間戳。作業(yè)執(zhí)行過程中應(yīng)用或者外部不應(yīng)該修改被緩存的文件。
通常,DistributedCache被用來緩存簡(jiǎn)單、只讀的數(shù)據(jù)或者文本文件以及像壓縮包/jar包等復(fù)雜文件。壓縮包比如zip\tar\tgz\tar.gz文件到節(jié)點(diǎn)上被解壓。所有文件都有執(zhí)行權(quán)限。
屬性 mapred.cache.{files|archives}配置的文件和包等可以被發(fā)布,多個(gè)文件可以使用“,”來分隔。也可以調(diào)用 DistributedCache.addCacheFile(URI,conf)或者DistributedCache.addCacheArchive(URI,conf) ,DistributedCache.setCacheFiles(URIs,conf)/DistributedCache.setCacheArchives(URIs,conf) 設(shè)置,其中URI為hdfs://host:port/absolute-path#link-name形式。如果使用streaming方式,可以通過 -cacheFile/-cacheArchive來設(shè)置。
另外,用戶也可以通過DistributedCache.createSymlink(Configuration)?API 將分布式緩存的文件使用符號(hào)鏈接到當(dāng)前工作路徑。或者也可以通過設(shè)置mapred.create.symlink 為yes。這樣分布式緩存將URI的最末部分作為符號(hào)鏈接。例如hdfs://namenode:port/lib.so.1#lib.so 的符號(hào)鏈接為lib.so.
分布式緩存也可以用作在map,reduce階段中基本的軟件分布式處理機(jī)制,可以用來分布式處理jar包或者本地庫。DistributedCache.addArchiveToClassPath(Path, Configuration)或者DistributedCache.addFileToClassPath(Path, Configuration)用來將需要緩存的文件或者jar包加入到作業(yè)運(yùn)行的子jvm的classpath中。也可以通過mapred.job.classpath.{files|archives}來設(shè)置。同樣這些分布式緩存的文件或jar包也可以使用符號(hào)鏈接到當(dāng)前工作路徑,當(dāng)然也支持本地庫并可被作業(yè)在執(zhí)行中加載。
工具類(Tool)
Tool接口支持常用的Hadoop命令行參數(shù)的處理操作。它是MR工具或者應(yīng)用的標(biāo)準(zhǔn)工具。應(yīng)用應(yīng)該通過ToolRunner.run(Tool,Stirng[])將標(biāo)準(zhǔn)命令行的參數(shù)使用GenericOptionsParser來處理。只處理應(yīng)用的自定義參數(shù)。
Hadoop中命令行里通用的參數(shù)有:
| 1 2 3 4 | -conf -D -fs -jt |
IsolationRunner
IsolationRunner是MR應(yīng)用的調(diào)試工具。
要使用它,需要先設(shè)置keep.failed.task.files = true,另外請(qǐng)注意屬性keep.tasks.files.pattern。
下一步,到失敗作業(yè)的某個(gè)失敗節(jié)點(diǎn)上,cd到TT的本地路徑并運(yùn)行IsolationRunner。
| 1 2 | $ cd /taskTracker/${taskid}/work $ hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml |
IsolationRunner 會(huì)再單獨(dú)的jvm中對(duì)相同的輸入上運(yùn)行失敗的任務(wù)并可以debug。
性能分析(Profiling)
Profiling可以用來對(duì)作業(yè)的map,reduce任務(wù)使用java內(nèi)置的性能分析工具進(jìn)行采樣,得出具有代表性(2個(gè)或3個(gè)試樣)的采樣結(jié)果。
用戶可以通過設(shè)置mapred.task.profile屬性指定框架是否對(duì)作業(yè)的任務(wù)收集性能信息。也可以通過API JobConf.setProfileEnabled(boolean)來指定。如果值為true,任務(wù)性能分析就啟用了。采樣結(jié)果保存在用戶日志目錄中,默認(rèn)性能采樣不開啟。
一旦需要性能分析,可以通過屬性mapred.task.profile.{maps|reduces} 設(shè)置采樣任務(wù)的范圍。使用JobConf.setProfileTaskRange(boolean,String)一樣可以設(shè)置。默認(rèn)的range是0-2.
可以設(shè)置mapred.task.profile.params確定分析參數(shù),api方式為 JobConf.setProfileParams(String)。如果參數(shù)中含有“%s”占位符,MR框架會(huì)使用采樣結(jié)果輸出文件名替代它。這些參數(shù)被傳到子JVM.默認(rèn)值為-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s.
調(diào)試工具(debugging)
MR框架支持用戶自定義腳本調(diào)試工具。例如當(dāng)MR任務(wù)失敗時(shí),用戶可以運(yùn)行debug腳本處理任務(wù)日志。腳本可以訪問任務(wù)的輸出和錯(cuò)誤日志,系統(tǒng)日志以及作業(yè)配置信息。調(diào)試腳本輸出和錯(cuò)誤流作為作業(yè)輸出一部分顯示到控制臺(tái)診斷信息中。
下面講討論如何為作業(yè)提交調(diào)試腳本。當(dāng)然腳本需要分發(fā)并提交到Hadoop中MR中。
如何分發(fā)腳本文件
使用DistributedCache分發(fā)并且為腳本文件建立符號(hào)鏈接。
如何提交腳本
一種便捷的途徑是通過設(shè)置mapred.map.task.debug.script和mapred.reduce.task.debug.script分別實(shí)現(xiàn)對(duì)map和reduce指定調(diào)試腳本。也可以通過API方式,分別是JobConf.setMapDebugScript(String)和JobConf.setReduceDebugScript(String)。如果是streaming方式,可以通過命令行里指定-mapdebug 和-reducedebug實(shí)現(xiàn)。
腳本的參數(shù)是任務(wù)的輸出流、錯(cuò)誤流、系統(tǒng)日志以及job配置文件。調(diào)試模式下,以下命令在任務(wù)失敗的節(jié)點(diǎn)上運(yùn)行:
| 1 | $script $stdout $stderr $syslog $jobconf |
Pipes程序在命令形式上以第五個(gè)參數(shù)傳入。如:
| 1 | $script $stdout $stderr $syslog $jobconf $program |
默認(rèn)行為
對(duì)pipes,使用默認(rèn)的腳本以gdb方式處理核心轉(zhuǎn)儲(chǔ)(core dumps),打印棧軌跡,并給出正在運(yùn)行線程的信息。
JobControl
JobControl可以對(duì)一組MR作業(yè)和他們的依賴環(huán)境進(jìn)行集中管理。
數(shù)據(jù)壓縮
Hadoop Mapreduce可以對(duì)中間結(jié)果(map輸出)和作業(yè)最終輸出以指定壓縮方式壓縮。通常都會(huì)啟用中間結(jié)果的壓縮,因?yàn)榭梢燥@著提高作業(yè)運(yùn)行效率并且不需要對(duì)作業(yè)本身進(jìn)行改變。只有MR shuffle階段生成的臨時(shí)數(shù)據(jù)文件被壓縮(最終結(jié)果可能或者不被壓縮)。
Hadoop支持zlib,gzip,snappy壓縮算法。推薦Snappy,因?yàn)槠鋲嚎s和解壓相比其他算法要高效。
因?yàn)樾阅芎蚸ava庫不可用原因,Hadoop也提供本地的zlib和gzip實(shí)現(xiàn)。詳情可以參考這個(gè)文檔
中間輸出物
應(yīng)用可以通過JobConf.setCOmpressMapOutput(boolean)設(shè)置是否對(duì)中間結(jié)果進(jìn)行壓縮,使用JobConf.setMapOutputCompressorClass(Class)設(shè)置壓縮算法。
作業(yè)輸出
通過FileOutputFormat.setCompressOutput(JobConf,boolean)設(shè)置是否壓縮最終產(chǎn)出物,通過FileOutputFormat.setOutputComressorClass(JobConf,Class)設(shè)置壓縮方式。
如果作業(yè)輸出以SequenceFileOutputFormat格式存儲(chǔ)。則可以通過SequenceFileOutputFormat.setOutputCompressionType(Jobconf,SequenceFile.CompressionType)API 設(shè)定壓縮方式.壓縮方式有RECORD/BLOCK默認(rèn)是RECORD。
跳過損壞的記錄
Hadoop提供了一個(gè)選項(xiàng),在MR處理map階段時(shí)跳過被損壞的輸入記錄。應(yīng)用可以通過SkipBadRecords類使用這個(gè)特性。
作業(yè)處理時(shí)可能對(duì)確定的輸入集上map任務(wù)會(huì)失敗。通常是map函數(shù)存在bug,這時(shí)需要fix這些bug。但有時(shí)卻無法解決這種特殊情況。比如這個(gè)bug可能是第三方庫導(dǎo)致的。這時(shí)這些任務(wù)在經(jīng)過若干嘗試后仍然無法成功完成,作業(yè)失敗。這時(shí)跳過這些記錄集,對(duì)作業(yè)最終結(jié)果影響不大,仍然可以接受(比如對(duì)非常大的數(shù)據(jù)進(jìn)行統(tǒng)計(jì)時(shí))。
默認(rèn)該特性沒有開啟。可以通過SkipBadRecords.setMapperMaxSkipRecords(Configuration,long)和SkipBadRecords.setReducerMaxSkipGroups(Configuration,long)開啟。
開啟后,框架對(duì)一定數(shù)目的map失敗使用skipping 模式。更多信息可以查看SkipBadRecords.setAttemptsToStartSkipping(Configuration, int)。skipping模式中,map任務(wù)維護(hù)一個(gè)正被處理的記錄范圍值。這個(gè)過程框架通過被處理的記錄行數(shù)計(jì)數(shù)器實(shí)現(xiàn)。可以了解SkipBadRecords.COUNTERMAPPROCESSED_RECORDS和SkipBadRecords.COUNTERREDUCEPROCESSED_GROUPS。這個(gè)計(jì)數(shù)器讓框架知道多少行記錄被成功處理了,哪些范圍的記錄導(dǎo)致了map的失敗,當(dāng)然這些壞記錄就被跳過了。
被跳過的記錄數(shù)目依賴應(yīng)用被處理的記錄計(jì)數(shù)器統(tǒng)計(jì)頻率。建議當(dāng)每行記錄被處理時(shí)就增加該計(jì)數(shù)器。但是這個(gè)在一些應(yīng)用中無法實(shí)現(xiàn)。這時(shí)框架可能也跳過了壞記錄附近的記錄。用戶可以通過SkipBadRecords.setMapperMaxSkipRecords(Configuration,long)和SkipBadRecords.setReducerMaxSkipGroups(Configuration,long)控制被跳過的記錄數(shù)。框架會(huì)使用類似二分搜索的方式努力去減小被跳過的記錄范圍。被跳過的范圍會(huì)分為2部分,并對(duì)其中的一部分執(zhí)行。一旦失敗了,框架可以知道這部分含有損壞的記錄。任務(wù)將重新執(zhí)行直到遇到可接受的數(shù)據(jù)或者所有嘗試機(jī)會(huì)用完。如果要提高任務(wù)嘗試次數(shù),可以通過JobConf.setMaxMapAttampts(int)和JobConf.setMaxReduceAttempts(int)設(shè)置。
被跳過的記錄隨后會(huì)以sequence file格式寫入hdfs以便于后面可能的分析。路徑可以通過SkipBadRecords.setSkipOutputPath(JobConf,Path)設(shè)定。
?
http://www.importnew.com/4736.html
轉(zhuǎn)載于:https://my.oschina.net/xiaominmin/blog/1597339
總結(jié)
以上是生活随笔為你收集整理的MR作业的提交监控、输入输出控制及特性使用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java 0x3f_Java源码位操作技
- 下一篇: BZOJ1911 特别行动队