日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪(fǎng)問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

3 MapReduce计算模型

發(fā)布時(shí)間:2023/11/30 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 3 MapReduce计算模型 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

MapReduce被廣泛應(yīng)用于日志分析、海量數(shù)據(jù)排序、在海量數(shù)據(jù)中查找特定模式等場(chǎng)景中。


MapReduceJob

在Hadoop中,每個(gè)MapReduce任務(wù)都被初始化為一個(gè)Job。

每個(gè)Job又可以分為兩個(gè)階段:Map階段和Reduce階段。這兩個(gè)階段分別用Map函數(shù)和Reduce函數(shù)來(lái)表示。

Map函數(shù)接收一個(gè)<key,value>形式的輸入,然后產(chǎn)生另一種<key,value>的中間輸出,Hadoop負(fù)責(zé)將所有具有相同中間key值的value集合到一起傳遞給Reduce函數(shù);Reduce函數(shù)接收一個(gè)如<key,(list of values)>形式的輸入,然后對(duì)這個(gè)value集合進(jìn)行處理并輸出結(jié)果,Reduce的輸出也是<key,value>形式的。


InputFormat()和InputSplit

InputSplit是Hadoop中用來(lái)把輸入數(shù)據(jù)傳送給每個(gè)單獨(dú)的Map,InputSplit存儲(chǔ)的并非數(shù)據(jù)本身,而是起始位置、分片長(zhǎng)度和一個(gè)記錄數(shù)據(jù)所在主機(jī)的數(shù)組。生成InputSplit的方法可以通過(guò)InputFormat()來(lái)設(shè)置。當(dāng)數(shù)據(jù)傳送給Map時(shí),Map會(huì)將輸入分片傳送到InputFormat()上,InputFormat調(diào)用getRecordReader()方法生成RecordReader,RecordReader在通過(guò)creatKey()、createValue()方法將InputSplit創(chuàng)建成可供Map處理的<key,value>對(duì)。即,InputFormat()方法是用來(lái)生成可供Map處理的<key,value>對(duì)的。

InputFormat

? ? BaileyBorweinPlouffe.BbpInputFormat

? ? ComposableInputFormat

? ? CompositeInputFormat

? ? DBInputFormat

? ? DistSum.Machine.AbstractInputFormat

? ? FileInputFormat

? ? ? ? CombineFileInputFormat

? ? ? ? KeyValueTextInputFormat

? ? ? ? NLineInputFormat

? ? ? ? SequenceFileInputFormat

? ? ? ? TeraInputFormat

? ? ? ? TextInputFormat


TextInputFormat是Hadoop默認(rèn)的輸入方式。在TextInputFormat中,每個(gè)文件(或其一部分)都會(huì)單獨(dú)作為Map的輸入,而這是繼承自FileInputFormat的。之后,每行數(shù)據(jù)都會(huì)生成一條記錄,每條記錄則表示成<key,value>形式。

key值是每個(gè)數(shù)據(jù)記錄在數(shù)據(jù)分片中的字節(jié)偏移量,數(shù)據(jù)類(lèi)型是LongWritable;

value值是每行的內(nèi)容,數(shù)據(jù)類(lèi)型是Text。

如:

file1:

0 hello world bye world

file2:

0 hello hadoop bye hadoop

兩個(gè)文件都會(huì)被單獨(dú)輸入到一個(gè)Map中,因此它們的值都是0。


OutputFormat()

默認(rèn)的輸出格式是TextOutputFormat,每條記錄以一行的形式存入文本文件,鍵和值是任意形式的,程序內(nèi)部調(diào)用toString()方法將鍵和值轉(zhuǎn)換為String類(lèi)型再輸出。


Map函數(shù)和Reduce函數(shù):

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {String[] words = StringUtils.split(value.toString(), ' ');for(String w :words){context.write(new Text(w), new IntWritable(1));}} }

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{protected void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {int sum =0;for(IntWritable i: values){sum=sum+i.get();}arg2.write(key, new IntWritable(sum));} } public class RunJob {public static void main(String[] args) {Configuration config =new Configuration(); // config.set("fs.defaultFS", "hdfs://node1:8020"); // config.set("yarn.resourcemanager.hostname", "node1");config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");try {FileSystem fs =FileSystem.get(config);Job job =Job.getInstance(config);job.setJarByClass(RunJob.class);job.setJobName("wc");job.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path("/usr/input/"));Path outpath =new Path("/usr/output/wc");if(fs.exists(outpath)){fs.delete(outpath, true);}FileOutputFormat.setOutputPath(job, outpath);boolean f= job.waitForCompletion(true);if(f){System.out.println("job completion");}} catch (Exception e) {e.printStackTrace();}} }

注意兩種情況:

1、Reduce Task的數(shù)量可以由程序指定,當(dāng)存在多個(gè)Reduce Task時(shí),每個(gè)Reduce會(huì)搜集一個(gè)或多個(gè)key值。當(dāng)存在多個(gè)Reduce Task時(shí),每個(gè)Reduce Task都會(huì)生成一個(gè)輸出文件;

2、沒(méi)有Reduce任務(wù)的時(shí)候,系統(tǒng)會(huì)直接將Map的輸出結(jié)果作為最終結(jié)果,有多少個(gè)Map就有多少個(gè)輸出。


MapReduce任務(wù)的優(yōu)化

如何完成這個(gè)任務(wù),怎么能讓程序運(yùn)行的更快。

MapReduce計(jì)算模型的優(yōu)化主要集中在兩個(gè)方面:計(jì)算性能方面;IO操作方面。


1、任務(wù)調(diào)度;

計(jì)算方面:優(yōu)先將任務(wù)分配給空閑機(jī)器;

IO方面:盡量將Map任務(wù)分配給InputSplit所在的機(jī)器。


2、數(shù)據(jù)預(yù)處理與InputSplit的大小

MapReduce擅長(zhǎng)處理少量的大數(shù)據(jù),在處理大量的小數(shù)據(jù)時(shí)性能會(huì)很遜色。

因此在提交MapReduce任務(wù)前可以先對(duì)數(shù)據(jù)進(jìn)行一次預(yù)處理,將數(shù)據(jù)合并以提高M(jìn)apReduce任務(wù)的執(zhí)行效率。

另一方面是參考Map任務(wù)的運(yùn)行時(shí)間,當(dāng)一個(gè)Map任務(wù)只需要運(yùn)行幾秒就可以結(jié)束時(shí),就需要考慮是否應(yīng)該給它分配更多的數(shù)據(jù)。通常而言,一個(gè)Map任務(wù)的運(yùn)行時(shí)間在一分鐘左右比較合適。

在FileInputFormat中(除了CombineFileInputFormat),Hadoop會(huì)在處理每個(gè)Block后將其作為一個(gè)InputSplit,因此合理地設(shè)置block塊大小是很重要的調(diào)節(jié)方式。


3、Map和Reduce任務(wù)的數(shù)量

Map/Reduce任務(wù)槽:集群能夠同時(shí)運(yùn)行的Map/Reduce任務(wù)的最大數(shù)量。

如100臺(tái)機(jī)器,每臺(tái)最多同時(shí)運(yùn)行10個(gè)Map和5個(gè)Reduce,則Map任務(wù)槽為1000,Reduce任務(wù)槽為500。

設(shè)置Map任務(wù)的數(shù)量主要參考的是Map的運(yùn)行時(shí)間,設(shè)置Reduce任務(wù)的數(shù)量主要參考的是Reduce槽的數(shù)量。

Reduce任務(wù)槽的0.95倍,如果一個(gè)Reduce任務(wù)失敗,可以很快找到一個(gè)空閑的機(jī)器重新執(zhí)行;

Reduce任務(wù)槽的1.75倍,執(zhí)行快的機(jī)器可以獲得更多的Reduce任務(wù),因此可以使負(fù)載更加均衡,以提高任務(wù)的處理速度。


4、Combine函數(shù)

用于本地合并數(shù)據(jù),以減少網(wǎng)絡(luò)IO操作的消耗。

合理的設(shè)計(jì)combine函數(shù)會(huì)有效減少網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量,提高M(jìn)apReduce的效率。

job.setCombinerClass(combine.class);

在WordCount中,可以指定Reduce類(lèi)為combine函數(shù):

job.setCombinerClass(Reduce.class);


5、壓縮

可以選擇對(duì)Map的輸出和最終的輸出結(jié)果進(jìn)行不同壓縮方式的壓縮。

在一些情況下,Map的中間輸出可能會(huì)很大,對(duì)其進(jìn)行壓縮可以有效地減少網(wǎng)絡(luò)上的數(shù)據(jù)傳輸量。


6、自定義comparator

自定義Hadoop數(shù)據(jù)類(lèi)型時(shí),推薦自定義comparator來(lái)實(shí)現(xiàn)數(shù)據(jù)的比較,這樣可以省去數(shù)據(jù)序列化和反序列化的時(shí)間,提高程序的運(yùn)行效率。






總結(jié)

以上是生活随笔為你收集整理的3 MapReduce计算模型的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。