3 MapReduce计算模型
MapReduce被廣泛應(yīng)用于日志分析、海量數(shù)據(jù)排序、在海量數(shù)據(jù)中查找特定模式等場(chǎng)景中。
MapReduceJob
在Hadoop中,每個(gè)MapReduce任務(wù)都被初始化為一個(gè)Job。
每個(gè)Job又可以分為兩個(gè)階段:Map階段和Reduce階段。這兩個(gè)階段分別用Map函數(shù)和Reduce函數(shù)來(lái)表示。
Map函數(shù)接收一個(gè)<key,value>形式的輸入,然后產(chǎn)生另一種<key,value>的中間輸出,Hadoop負(fù)責(zé)將所有具有相同中間key值的value集合到一起傳遞給Reduce函數(shù);Reduce函數(shù)接收一個(gè)如<key,(list of values)>形式的輸入,然后對(duì)這個(gè)value集合進(jìn)行處理并輸出結(jié)果,Reduce的輸出也是<key,value>形式的。
InputFormat()和InputSplit
InputSplit是Hadoop中用來(lái)把輸入數(shù)據(jù)傳送給每個(gè)單獨(dú)的Map,InputSplit存儲(chǔ)的并非數(shù)據(jù)本身,而是起始位置、分片長(zhǎng)度和一個(gè)記錄數(shù)據(jù)所在主機(jī)的數(shù)組。生成InputSplit的方法可以通過(guò)InputFormat()來(lái)設(shè)置。當(dāng)數(shù)據(jù)傳送給Map時(shí),Map會(huì)將輸入分片傳送到InputFormat()上,InputFormat調(diào)用getRecordReader()方法生成RecordReader,RecordReader在通過(guò)creatKey()、createValue()方法將InputSplit創(chuàng)建成可供Map處理的<key,value>對(duì)。即,InputFormat()方法是用來(lái)生成可供Map處理的<key,value>對(duì)的。
InputFormat
? ? BaileyBorweinPlouffe.BbpInputFormat
? ? ComposableInputFormat
? ? CompositeInputFormat
? ? DBInputFormat
? ? DistSum.Machine.AbstractInputFormat
? ? FileInputFormat
? ? ? ? CombineFileInputFormat
? ? ? ? KeyValueTextInputFormat
? ? ? ? NLineInputFormat
? ? ? ? SequenceFileInputFormat
? ? ? ? TeraInputFormat
? ? ? ? TextInputFormat
TextInputFormat是Hadoop默認(rèn)的輸入方式。在TextInputFormat中,每個(gè)文件(或其一部分)都會(huì)單獨(dú)作為Map的輸入,而這是繼承自FileInputFormat的。之后,每行數(shù)據(jù)都會(huì)生成一條記錄,每條記錄則表示成<key,value>形式。
key值是每個(gè)數(shù)據(jù)記錄在數(shù)據(jù)分片中的字節(jié)偏移量,數(shù)據(jù)類(lèi)型是LongWritable;
value值是每行的內(nèi)容,數(shù)據(jù)類(lèi)型是Text。
如:
file1:
0 hello world bye world
file2:
0 hello hadoop bye hadoop
兩個(gè)文件都會(huì)被單獨(dú)輸入到一個(gè)Map中,因此它們的值都是0。
OutputFormat()
默認(rèn)的輸出格式是TextOutputFormat,每條記錄以一行的形式存入文本文件,鍵和值是任意形式的,程序內(nèi)部調(diào)用toString()方法將鍵和值轉(zhuǎn)換為String類(lèi)型再輸出。
Map函數(shù)和Reduce函數(shù):
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {String[] words = StringUtils.split(value.toString(), ' ');for(String w :words){context.write(new Text(w), new IntWritable(1));}} }public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{protected void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {int sum =0;for(IntWritable i: values){sum=sum+i.get();}arg2.write(key, new IntWritable(sum));} } public class RunJob {public static void main(String[] args) {Configuration config =new Configuration(); // config.set("fs.defaultFS", "hdfs://node1:8020"); // config.set("yarn.resourcemanager.hostname", "node1");config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");try {FileSystem fs =FileSystem.get(config);Job job =Job.getInstance(config);job.setJarByClass(RunJob.class);job.setJobName("wc");job.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path("/usr/input/"));Path outpath =new Path("/usr/output/wc");if(fs.exists(outpath)){fs.delete(outpath, true);}FileOutputFormat.setOutputPath(job, outpath);boolean f= job.waitForCompletion(true);if(f){System.out.println("job completion");}} catch (Exception e) {e.printStackTrace();}} }
注意兩種情況:
1、Reduce Task的數(shù)量可以由程序指定,當(dāng)存在多個(gè)Reduce Task時(shí),每個(gè)Reduce會(huì)搜集一個(gè)或多個(gè)key值。當(dāng)存在多個(gè)Reduce Task時(shí),每個(gè)Reduce Task都會(huì)生成一個(gè)輸出文件;
2、沒(méi)有Reduce任務(wù)的時(shí)候,系統(tǒng)會(huì)直接將Map的輸出結(jié)果作為最終結(jié)果,有多少個(gè)Map就有多少個(gè)輸出。
MapReduce任務(wù)的優(yōu)化
如何完成這個(gè)任務(wù),怎么能讓程序運(yùn)行的更快。
MapReduce計(jì)算模型的優(yōu)化主要集中在兩個(gè)方面:計(jì)算性能方面;IO操作方面。
1、任務(wù)調(diào)度;
計(jì)算方面:優(yōu)先將任務(wù)分配給空閑機(jī)器;
IO方面:盡量將Map任務(wù)分配給InputSplit所在的機(jī)器。
2、數(shù)據(jù)預(yù)處理與InputSplit的大小
MapReduce擅長(zhǎng)處理少量的大數(shù)據(jù),在處理大量的小數(shù)據(jù)時(shí)性能會(huì)很遜色。
因此在提交MapReduce任務(wù)前可以先對(duì)數(shù)據(jù)進(jìn)行一次預(yù)處理,將數(shù)據(jù)合并以提高M(jìn)apReduce任務(wù)的執(zhí)行效率。
另一方面是參考Map任務(wù)的運(yùn)行時(shí)間,當(dāng)一個(gè)Map任務(wù)只需要運(yùn)行幾秒就可以結(jié)束時(shí),就需要考慮是否應(yīng)該給它分配更多的數(shù)據(jù)。通常而言,一個(gè)Map任務(wù)的運(yùn)行時(shí)間在一分鐘左右比較合適。
在FileInputFormat中(除了CombineFileInputFormat),Hadoop會(huì)在處理每個(gè)Block后將其作為一個(gè)InputSplit,因此合理地設(shè)置block塊大小是很重要的調(diào)節(jié)方式。
3、Map和Reduce任務(wù)的數(shù)量
Map/Reduce任務(wù)槽:集群能夠同時(shí)運(yùn)行的Map/Reduce任務(wù)的最大數(shù)量。
如100臺(tái)機(jī)器,每臺(tái)最多同時(shí)運(yùn)行10個(gè)Map和5個(gè)Reduce,則Map任務(wù)槽為1000,Reduce任務(wù)槽為500。
設(shè)置Map任務(wù)的數(shù)量主要參考的是Map的運(yùn)行時(shí)間,設(shè)置Reduce任務(wù)的數(shù)量主要參考的是Reduce槽的數(shù)量。
Reduce任務(wù)槽的0.95倍,如果一個(gè)Reduce任務(wù)失敗,可以很快找到一個(gè)空閑的機(jī)器重新執(zhí)行;
Reduce任務(wù)槽的1.75倍,執(zhí)行快的機(jī)器可以獲得更多的Reduce任務(wù),因此可以使負(fù)載更加均衡,以提高任務(wù)的處理速度。
4、Combine函數(shù)
用于本地合并數(shù)據(jù),以減少網(wǎng)絡(luò)IO操作的消耗。
合理的設(shè)計(jì)combine函數(shù)會(huì)有效減少網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量,提高M(jìn)apReduce的效率。
job.setCombinerClass(combine.class);
在WordCount中,可以指定Reduce類(lèi)為combine函數(shù):
job.setCombinerClass(Reduce.class);
5、壓縮
可以選擇對(duì)Map的輸出和最終的輸出結(jié)果進(jìn)行不同壓縮方式的壓縮。
在一些情況下,Map的中間輸出可能會(huì)很大,對(duì)其進(jìn)行壓縮可以有效地減少網(wǎng)絡(luò)上的數(shù)據(jù)傳輸量。
6、自定義comparator
自定義Hadoop數(shù)據(jù)類(lèi)型時(shí),推薦自定義comparator來(lái)實(shí)現(xiàn)數(shù)據(jù)的比較,這樣可以省去數(shù)據(jù)序列化和反序列化的時(shí)間,提高程序的運(yùn)行效率。
總結(jié)
以上是生活随笔為你收集整理的3 MapReduce计算模型的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: HadoopHA集群搭建
- 下一篇: 4 开发MapReduce应用程序