Hadoop:The Definitive Guid 总结 Chapter 1~2 初识Hadoop、MapReduce
?
1.數(shù)據(jù)存儲(chǔ)與分析
問(wèn)題:當(dāng)磁盤的存儲(chǔ)量隨著時(shí)間的推移越來(lái)越大的時(shí)候,對(duì)磁盤上的數(shù)據(jù)的讀取速度卻沒(méi)有多大的增長(zhǎng)
從多個(gè)磁盤上進(jìn)行并行讀寫(xiě)操作是可行的,但是存在以下幾個(gè)方面的問(wèn)題:
1).第一個(gè)問(wèn)題是硬件錯(cuò)誤。使用的硬件越多出錯(cuò)的幾率就越大。一種常用的解決方式是數(shù)據(jù)冗余,保留多分拷貝,即使一份數(shù)據(jù)處理出錯(cuò),還有另外的數(shù)據(jù)。HDFS使用的也是類似的方式,但稍有不同。
2).第二個(gè)問(wèn)題是數(shù)據(jù)處理的相關(guān)性問(wèn)題。例如很多分析工作在一快磁盤上處理出來(lái)的結(jié)果需要與其他磁盤上處理處理出來(lái)的結(jié)果合并才能完成任務(wù)。各種分布式系統(tǒng)也都給出了合并的策略,但是做好這方面確實(shí)是一個(gè)挑戰(zhàn)。MapReduce提供了一種編程模型,他將從硬盤上讀寫(xiě)數(shù)據(jù)的問(wèn)題抽象出來(lái),轉(zhuǎn)化成對(duì)一系列鍵值對(duì)的計(jì)算
簡(jiǎn)而言之,Hadoop提供了一個(gè)可靠的存儲(chǔ)和分析系統(tǒng)。存儲(chǔ)又HDFS提供,分析由MapReduce提供。
?
2.與其他系統(tǒng)比較
1).RDBMS
為什么需要MapReduce?
a.磁盤的尋道時(shí)間提高的速度低于數(shù)據(jù)的傳輸速度,如果數(shù)據(jù)訪問(wèn)模式由尋道時(shí)間支配的話,在讀寫(xiě)數(shù)據(jù)集的一大部分的時(shí)候速度就會(huì)較流式讀取慢很多,這樣就出現(xiàn)了瓶頸。
b.另一方面在更新數(shù)據(jù)集的少量數(shù)據(jù)的時(shí)候,傳統(tǒng)的B-樹(shù)工作的比較好,但是在更新數(shù)據(jù)集的大部分?jǐn)?shù)據(jù)的時(shí)候B-樹(shù)就顯得比MapReduce方式慢了。MapReduce使用排序/合并操作去重建數(shù)據(jù)庫(kù)(完成數(shù)據(jù)更新).
c.MapReduce比較適合于需要分析整個(gè)數(shù)據(jù)集,并且要使用批處理方式,特別是特定的分析的情況;RDBMS點(diǎn)查詢方面占優(yōu)勢(shì),或在已編制索引的數(shù)據(jù)集提供低延遲的檢索和更新的數(shù)據(jù),但是數(shù)據(jù)量不能太大。MapReduce適合一次寫(xiě)入,多次讀取的操作,但是關(guān)系數(shù)據(jù)庫(kù)就比較適合對(duì)數(shù)據(jù)集的持續(xù)更新。
d.MapReduce比較適合處理半結(jié)構(gòu)化,非結(jié)構(gòu)化的數(shù)據(jù)
e.MapReduce是可以進(jìn)行線性擴(kuò)展的編程模型。一個(gè)對(duì)集群級(jí)別的數(shù)據(jù)量而寫(xiě)的MapReduce可以不加修改的應(yīng)用于小數(shù)據(jù)量或者更大數(shù)據(jù)量的處理上。更重要的是當(dāng)你的輸入數(shù)據(jù)增長(zhǎng)一倍的時(shí)候,相應(yīng)的處理時(shí)間也會(huì)增加一倍。但是如果你把集群也增長(zhǎng)一倍的話,處理的速度則會(huì)和沒(méi)有增加數(shù)據(jù)量時(shí)候的速度一樣快,這方面對(duì)SQL查詢來(lái)說(shuō)不見(jiàn)得是正確的。
f.關(guān)系數(shù)據(jù)往往進(jìn)行規(guī)則化以保證數(shù)據(jù)完整性,并刪除冗余。這樣做給MapReduce提出了新的問(wèn)題:它使得讀數(shù)據(jù)變成了非本地執(zhí)行,而MapReduce的一個(gè)重要前提(假設(shè))就是數(shù)據(jù)可以進(jìn)行高速的流式讀寫(xiě)。
?
2).Grid Compuing 網(wǎng)格計(jì)算
a.MapReduce使數(shù)據(jù)和計(jì)算在一個(gè)節(jié)點(diǎn)上完成,這樣就變成了本地的讀取。這是MapReduce高性能的核心.
b.MPI將控制權(quán)大大的交給了程序員,但是這就要求程序員明確的處理數(shù)據(jù)流等情況,而MapReduce只提供高層次的操作:程序員只需考慮處理鍵值對(duì)的函數(shù),而對(duì)數(shù)據(jù)流則是比較隱晦的。
c.MapReduce是一種非共享(Shared-nothing)的架構(gòu),當(dāng)MapReduce實(shí)現(xiàn)檢測(cè)到map或者reduce過(guò)程出錯(cuò)的時(shí)候,他可以將錯(cuò)誤的部分再執(zhí)行一次。MPI程序員則需要明確的考慮檢查點(diǎn)和恢復(fù),這雖然給程序員很大自由,但是也使得程序變得難寫(xiě)。
?
3).志愿計(jì)算
MapReduce是針對(duì)在一個(gè)高聚合網(wǎng)絡(luò)連接的數(shù)據(jù)中心中進(jìn)行的可信的、使用專用的硬件工作持續(xù)數(shù)分鐘或者數(shù)個(gè)小時(shí)而設(shè)計(jì)的。相比之下,志愿計(jì)算則是在不可信的、鏈接速度有很大差異的、沒(méi)有數(shù)據(jù)本地化特性的,互聯(lián)網(wǎng)上的計(jì)算機(jī)上運(yùn)行永久的(超長(zhǎng)時(shí)間的)計(jì)算,
?
3.天氣數(shù)據(jù)集
數(shù)據(jù)是NCDC的數(shù)據(jù),我們關(guān)注以下特點(diǎn):
1)? 數(shù)據(jù)是半格式化的
2)? 目錄里面存放的是從1901-2001年一個(gè)世紀(jì)的記錄,是gzip壓縮過(guò)的文件。
3)? 以行為單位,使用ASCII格式存儲(chǔ),每行就是一條記錄
4)? 每條記錄我們關(guān)注一些基本的元素,比如溫度,這些數(shù)據(jù)在每條數(shù)據(jù)中都會(huì)出現(xiàn),并且寬度也是固定的。
下面是一條記錄的格式,為了便于顯示,做了一部分調(diào)整。
?
?
?
4.使用Unix工具分析數(shù)據(jù)
以分析某年份的最高溫度為例,下面是一段Unix的腳本程序:
#!/usr/bin/env bash for year in all/* doecho -ne `basename $year .gz`"\t"gunzip -c $year | \awk '{ temp = substr($0, 88, 5) + 0;q = substr($0, 93, 1);if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }END { print max }' done這段腳本的執(zhí)行過(guò)程如下:
腳本循環(huán)處理每一個(gè)壓縮的年份文件,首先打印出年份,然后對(duì)每一個(gè)文件使用awk處理。Awk腳本從數(shù)據(jù)中解析出兩個(gè)字段:一個(gè)air temperature,一個(gè)quality code。air temperature值加0被轉(zhuǎn)換成整形。接下來(lái)查看溫度數(shù)據(jù)是否有效(9999表示在NCDC數(shù)據(jù)集中丟失的值),并且檢查quality code是不是可信并沒(méi)有錯(cuò)誤的。如果讀取一切正常,temp將與目前的最大值比較,如果出現(xiàn)新的最大值,則更新當(dāng)前max的值。當(dāng)文件中所有行的數(shù)據(jù)都被處理之后,開(kāi)始執(zhí)行End程序塊,并且打印出最大值。
下面是某次運(yùn)行結(jié)果的起始部分:
為了加速處理速度,我們將程序的某些部分進(jìn)行并行執(zhí)行。這在理論上是比較簡(jiǎn)單的,我們可以按照年份來(lái)在不同的處理器上執(zhí)行,使用所有可用的硬件線程,但是還是有些問(wèn)題:
1).把任務(wù)切分成相同大小的塊不總是那么容易的。
2).合并單獨(dú)處理出來(lái)的結(jié)果還需要進(jìn)一步的處理
3).人們?nèi)耘f被單機(jī)的處理能力所束縛。
?
5.?使用Hadoop分析數(shù)據(jù)
1).Map和Reduce
MapReduce將工作分為map階段和reduce階段,每個(gè)階段都將鍵值對(duì)作為輸入輸入,鍵值對(duì)的類型可以由程序員選擇。程序員還指定兩個(gè)函數(shù):map和reduce函數(shù)。
Map階段的輸入數(shù)據(jù)是NCDC的原始數(shù)據(jù),我們選擇文本格式輸入,這樣可以把記錄中的每一行作為文本value。Key是當(dāng)前行離開(kāi)始行的偏移量,但是我們并不需要這個(gè)信息,所以忽略不要。
我們的Map函數(shù)比較簡(jiǎn)單,僅僅從輸入中析取出temperature。其中,map函數(shù)僅僅是完成了數(shù)據(jù)的準(zhǔn)備階段,這樣使得reducer函數(shù)可以基于它查找歷年的最高溫度。Map函數(shù)也是一個(gè)很好的過(guò)濾階段,這里可以過(guò)濾掉丟失、置疑、錯(cuò)誤的temperature數(shù)據(jù)。
下面是輸入數(shù)據(jù)樣例:
下面這些行以鍵值對(duì)的方式來(lái)給map函數(shù)處理,其中加粗的是我們需要處理的數(shù)據(jù)
上面的鍵(key)是文件中的行偏移量,map函數(shù)不需要,這里的map函數(shù)的功能僅限于提取年份和氣溫,并將他們作為輸出:
map函數(shù)輸出經(jīng)由mapreduce框架中進(jìn)行進(jìn)一步的處理后,主要需要根據(jù)鍵對(duì)鍵/值對(duì)進(jìn)行排序和分組。經(jīng)過(guò)這一番處理之后,Reduce收來(lái)的結(jié)果如下:
處理這些數(shù)據(jù),reduce所需要做的工作僅僅是遍歷這些數(shù)據(jù),找出最大值,產(chǎn)生最終的輸出結(jié)果:
MapReduce的邏輯數(shù)據(jù)流:
?
?
2).Java MapReduce 程序
這里需要三塊代碼:Map函數(shù)、Reduce函數(shù)、用來(lái)運(yùn)行作業(yè)的main函數(shù)
Map函數(shù)的實(shí)現(xiàn)
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;public class MaxTemperatureMapper extendsMapper<LongWritable, Text, Text, IntWritable> {private static final int MISSING = 9999;@Overridepublic void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString();String year = line.substring(15, 19);int airTemperature;if (line.charAt(87) == '+') { // parseInt doesn't like leading plus// signsairTemperature = Integer.parseInt(line.substring(88, 92));} else {airTemperature = Integer.parseInt(line.substring(87, 92));}String quality = line.substring(92, 93);if (airTemperature != MISSING && quality.matches("[01459]")) {context.write(new Text(year), new IntWritable(airTemperature));}} }Hadoop提供了他自己的基本類型,這些類型為網(wǎng)絡(luò)序列化做了專門的優(yōu)化。可以在org.apache.hadoop.io包中找到他們。比如LongWritable相當(dāng)于Java中的Long,Text相當(dāng)于String而IntWritable在相當(dāng)于Integer。map()方法傳入一個(gè)key和一個(gè)value。我們將Text類型的value轉(zhuǎn)化成Java的String,然后用String的substring方法取出我偶們需要的部分,最后利用context.write按照鍵/值的形式收集數(shù)據(jù)。
?
Reduce函數(shù)的實(shí)現(xiàn)
?
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;public class MaxTemperatureReducer extendsReducer<Text, IntWritable, Text, IntWritable> {@Overridepublic void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {int maxValue = Integer.MIN_VALUE;for (IntWritable value : values) {maxValue = Math.max(maxValue, value.get());}context.write(key, new IntWritable(maxValue));} }Reduce的輸入類型必須是:Text,IntWritable類型。Reduce在本例輸出結(jié)果是Text和IntWritbale類型,year和與其對(duì)應(yīng)的maxValue是經(jīng)過(guò)遍歷、比較之后得到的。
?
負(fù)責(zé)運(yùn)行MapReduce作業(yè)的main函數(shù)
import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class MaxTemperature {public static void main(String[] args) throws Exception {if (args.length != 2) {System.err.println("Usage: MaxTemperature <input path> <output path>");System.exit(-1);}Job job = new Job();job.setJarByClass(MaxTemperature.class);job.setJobName("Max temperature");FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);job.setReducerClass(MaxTemperatureReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);} }
?
6.數(shù)據(jù)流
1).MapReduce作業(yè)的工作單元由如下組成:輸入數(shù)據(jù)、MapReduce程序、配置信息
2).Hadoop將Task分成兩類:MapTask、ReduceTask.
3).為了控制Hadoop的Job運(yùn)行,Hadoop有兩累節(jié)點(diǎn):一種是jobtracker,一種是tasktracker。Jobtracker通過(guò)調(diào)度tasktracker協(xié)調(diào)所有工作的執(zhí)行。Tasktracker運(yùn)行任務(wù)并將報(bào)告發(fā)送給jobtracker,jobtracker所有工作的進(jìn)度。如果一個(gè)任務(wù)失敗,jobtracker再重新調(diào)度一個(gè)不同的tasktracker進(jìn)行工作。但是在Hadoop 0.23和2.0版本ongoing他們已經(jīng)被Resource Manager、Node Manager和ApplicationMaster所替代,相應(yīng)的資源并被Container所封裝
4).Hadoop將MapReduce的輸入數(shù)據(jù)分成固定大小的分片,成為數(shù)據(jù)分片(input split),然后為每個(gè)分片創(chuàng)建一個(gè)MapTask (分片默認(rèn)64M大小)
5).Hadoop可以通過(guò)在存儲(chǔ)有輸入的數(shù)據(jù)節(jié)點(diǎn)上運(yùn)行相應(yīng)Map任務(wù),提高性能(數(shù)據(jù)本地優(yōu)化),另外Map任務(wù)將其樹(shù)立后的數(shù)據(jù)寫(xiě)到本地磁盤,減少Hadoop分布式Node之間的數(shù)據(jù)傳輸壓力。相比之下任務(wù)沒(méi)有數(shù)據(jù)本地優(yōu)化的優(yōu)勢(shì)-----單個(gè)Reduce任務(wù)的輸入通常來(lái)自所有的map輸出
下圖展現(xiàn)了Data-local(數(shù)據(jù)本地),Rack-local與Off-local Map任務(wù)的區(qū)別:
下圖給出MapReduce的集中執(zhí)行方式數(shù)據(jù)流的情況:
?一個(gè)Reduce的情況:
兩個(gè)Reduce的情況:
無(wú)Reduce情況:
?
?
?
?
?
?
7.Combiner函數(shù)
為了優(yōu)化集群數(shù)據(jù)傳輸,減少M(fèi)ap任務(wù)和Reduce任務(wù)之間的數(shù)據(jù)傳輸,Hadop針對(duì)Map任務(wù)指定一個(gè)Combiner函數(shù)(合并函數(shù))--對(duì)本地鍵(key)相同的做合并處理,實(shí)際上Combiner函數(shù)很像Reduce函數(shù),只不過(guò)運(yùn)行在本地,最后Combiner函數(shù)的輸出作為作為Reduce的函數(shù)的輸入,Hadoop執(zhí)行函數(shù)的順序就變?yōu)镸ap--->Combiner--->Reduce
示例如下:
第一個(gè)Map的輸出:
第二個(gè)Map的輸出:
Reduce函數(shù)被調(diào)用時(shí):
最后輸出結(jié)果:
?
指定一個(gè)Combiner函數(shù),這里用Reduce函數(shù)作為Combiner函數(shù)
?
public class MaxTemperatureWithCombiner {public static void main(String[] args) throws Exception {if (args.length != 2) {System.err.println("Usage: MaxTemperatureWithCombiner <input path> "+ "<output path>");System.exit(-1);}Job job = new Job();job.setJarByClass(MaxTemperatureWithCombiner.class);job.setJobName("Max temperature");FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);job.setCombinerClass(MaxTemperatureReducer.class);job.setReducerClass(MaxTemperatureReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);} }
?
8.Hadoop的Streaming
Streaming天生適合用于文本處理,在文本模式下使用時(shí),它有一個(gè)數(shù)據(jù)的行視圖,map的輸入數(shù)據(jù)通過(guò)標(biāo)準(zhǔn)輸入流傳遞給map函數(shù),并且一行一行的傳輸,最后將結(jié)果行寫(xiě)到標(biāo)準(zhǔn)輸出。
map輸出的鍵/值對(duì)是以一個(gè)制表符分隔的行,它以這樣的形式寫(xiě)到標(biāo)準(zhǔn)輸出,reduce函數(shù)的輸入格式相同----通過(guò)制表符來(lái)分隔的鍵/值對(duì)----并通過(guò)標(biāo)準(zhǔn)輸入流進(jìn)行傳輸
1)ruby版本
2)Python版本
?
9.Hadoop的Pipes
?
?
?
?
?
?
?
?
轉(zhuǎn)載于:https://www.cnblogs.com/biyeymyhjob/archive/2012/08/08/2628265.html
總結(jié)
以上是生活随笔為你收集整理的Hadoop:The Definitive Guid 总结 Chapter 1~2 初识Hadoop、MapReduce的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 利用kickstart实现pxe的自动化
- 下一篇: 基于corosync和NFS服务器实现L