生活随笔
收集整理的這篇文章主要介紹了
《Hadoop权威指南》第二章 关于MapReduce
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
《Hadoop權威指南》第二章 關于MapReduce
目錄
使用Hadoop來數據分析橫向擴展
注:《Hadoop權威指南》重點學習摘要筆記
1. 使用Hadoop來數據分析
例如,對氣象數據集進行處理。
1. map和reduce
為了充分利用Hadoop提供的并行處理優勢,需要將查詢表示成MapReduce作業。MapReduce任務過程分成兩個處理階段:map階段和reduce階段。每個階段都以鍵值對作為輸入輸出,類型由程序員指定。程序員需要編寫兩個函數:map函數和reduce函數。map階段的輸入是原始數據,選擇文本格式作為輸入格式,將數據集的每一行作為文本輸入。鍵是某一行起始位置相對于文件起始位置的偏移量。map函數很簡單,因為只對年份和氣溫感興趣,只需要取出這兩個字段數據。map函數只是一個數據準備階段,通過這個方式來準備數據,使reduce函數能夠繼續對它進行處理:即找出每年的最高氣溫。map函數還是一個比較適合去除已損記錄的地方。比如,篩選缺失的、可疑的或錯誤的氣溫數據。為了全面了解map的工作方式,輸入數據的示例數據如下:
這些行以鍵值對的方式作為map函數的輸入:
鍵(key)是文件中的行偏移量,map函數并不需要這個信息,所以將其忽略。map函數的功能僅限于提取年份和氣溫信息(以粗體顯示),并將它們作為輸出(氣溫值已用整數表示):
map函數的輸出由MapReduce框架處理后,最后發送到reduce函數。這個處理過程是基于鍵來對鍵值對僅限排序和分組。因此,在這一示例中,reduce函數看到如下輸入:
每一年后緊跟著一系列氣溫數據。reduce函數需要做的是遍歷整個列表并從中找出最大的讀數:
這是最終輸出結果,每一年的全球最高氣溫記錄。這是最終輸出結果,每一年的全球最高氣溫記錄。在圖的底部是Unix管線,用于模擬整個MapReduce的流程。
2. Java MapReduce
1. MapReduce程序
明白MapReduce程序的工作原理之后,下一步就是寫代碼實現它。需要三個準備:一個map函數,一個reduce函數和一些用來運行作業的代碼。map函數用Mapper類來表示,后者聲明一個抽象的map()方法。下面代碼顯示了map函數的實現。
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {private static final int MISSING = 9999;@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String year = line.substring(15, 19);int airTemperature;if (line.charAt(87) == '+') {airTemperature = Integer.parseInt(line.substring(88, 92));} else {airTemperature = Integer.parseInt(line.substring(87, 92));}String quality = line.substring(92, 93);if (airTemperature != MISSING && quality.matches("[01459]")) {context.write(new Text(year), new IntWritable(airTemperature));}}
}
書上是繼承MapReduceBase類,實現Mapper接口,但報錯,暫時不知道什么原因,就按之前學的方式,繼承Mapper類了。書上繼承實現代碼如下,其他都是一樣的。
這個Mapper類是一個泛型類型,有四個形參類型,分別指定map函數的輸入鍵、輸入值、輸出鍵和輸出值的類型。以這個例子來說,輸入鍵是一個長整數偏移量,輸入值是一行文本,輸出鍵是年份,輸出值是氣溫(整數)。
Hadoop本身提供了一套可優化網絡序列化傳輸的基本類型,而不直接使用Java內嵌的類型。這些類型都在org.apache.hadoop.io包中。這里使用LongWritable類型(相當于Java的Long類型)、Text類型(相當于Java中的String類型)和IntWritable類型(相當于Java的Integer類型)
map() 方法的輸入是一個鍵和一個值。首先將包含有一行輸入的Text值轉換成java的String類型,之后用substring() 方法提取我們感興趣的列。
map() 方法還提供 Context 實例用于輸出內容的寫入。在這種情況下,我們將年份數據按Text對象進行讀/寫(因為我們把年份當做鍵),將氣溫值封裝在IntWritable類型中。只有氣溫數據不缺失并且對應質量代碼顯示為正確的氣溫讀數時,這些數據才會被寫入輸出記錄中
以類似方法用Reducer來定義reduce函數,如下。
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int maxValue = Integer.MIN_VALUE;for (IntWritable value : values) {maxValue = Math.max(maxValue, value.get());}context.write(key, new IntWritable(maxValue));}
}
同樣,reduce函數也有四個形式參數類型用于指定輸入和輸出類型。reduce函數的輸入類型必須匹配map函數的輸出類型:即Text類型和IntWritable類型。在這種情況下,reduce函數的輸出類型也必須是Text和IntWritable類型,分別輸出年份及其最高氣溫。這個最高氣溫是通過循環比較每個氣溫與當前所知最高氣溫所得到的。
第三部分代碼負責運行MapReduce作業,代碼如下。直接從書本中截圖出來了,錯誤的代碼,現在不是很懂…具體等我后來研究一下。
這是之前的例子,可以參考下。
import java
.io
.IOException
;
import org
.apache
.hadoop
.conf
.Configuration
;
import org
.apache
.hadoop
.fs
.Path
;
import org
.apache
.hadoop
.io
.IntWritable
;
import org
.apache
.hadoop
.io
.Text
;
import org
.apache
.hadoop
.mapreduce
.Job
;
import org
.apache
.hadoop
.mapreduce
.lib
.input
.FileInputFormat
;
import org
.apache
.hadoop
.mapreduce
.lib
.output
.FileOutputFormat
;public class WordcountDriver {public static void main(String
[] args
) throws IOException
, ClassNotFoundException
, InterruptedException
{Configuration configuration
= new Configuration();Job job
= Job
.getInstance(configuration
);job
.setJarByClass(WordcountDriver
.class);job
.setMapperClass(WordcountMapper
.class);job
.setReducerClass(WordcountReducer
.class);job
.setMapOutputKeyClass(Text
.class);job
.setMapOutputValueClass(IntWritable
.class);job
.setOutputKeyClass(Text
.class);job
.setOutputValueClass(IntWritable
.class);FileInputFormat
.setInputPaths(job
, new Path(args
[0]));FileOutputFormat
.setOutputPath(job
, new Path(args
[1]));boolean result
= job
.waitForCompletion(true);System
.exit(result
? 0 : 1);}
}
Job對象指定作業執行規范。我們可以用它來控制整個作業的運行。我們在Hadoop集群上運行這個作業時,要把代碼打包成一個JAR文件(Hadoop在集群上發布這個文件)。不必明確指定JAR文件的名稱,在Job對象的setJarByClass()方法中傳遞一個類即可,Hadoop利用這個類來查找包含它的JAR文件,進而找到相關JAR文件。構造Job對象之后,需要指定輸入和輸出數據的路徑。調用FileInputFormat類的靜態方法addInputPath()來定義輸入數據的路徑,這個路徑可以是單個文件、一個目錄(此時,將目錄下所有文件當做輸入)或符合特定文件模塊的一系列文件。可以多次調用addInputPath()來實現多路徑的輸入。調用FileOutputFormat類中的靜態方法setOutputPath()來指定輸出路徑(只能有一個輸出路徑)。這個方法指定的是reduce函數輸出文件的寫入目錄。在運行作業前該目錄是不應該存在的,否則Hadoop會報錯并拒絕運行作業。這種預防措施的目的是防止數據丟失(長時間運行的作業如果結果被意外覆蓋,肯定是非常惱人的)。接著,通過setMapperClass()和setReducerClass()方法指定要用的map類型和reduce類型。setOutputKeyClass()和setOutputValueClass()方法控制reduce函數的輸出類型,并且必須和Reduce類產生的相匹配。map函數的輸出類型默認情況下和reduce函數是相同的,因此如果mapper產生出和reduce相同的類型時(如同本例所示),不需要單獨設置。但是,如果不同,則必須通過setMapOutputKeyClass()和setMapOutputValueClass()方法來設置map函數的輸出類型。輸入的類型通過輸入格式來控制,我們的例子中沒有設置,因為使用的是默認的TextInputFormat(文本輸入格式)在設置定義map和reduce函數的類之后,可以開始運行作業。Job中的waitForCompletion()方法提交作業并等待執行完成。該方法唯一參數是一個標識,指示是否已經生成詳細輸出。當標識為true時,作業會把其進度信息寫到控制臺。waitForCompletion()方法返回一個布爾值,表示執行的成(true)敗(false),這樣布爾值被轉換成退出代碼0或者1。
2. 運行測試
寫好MMapReduce作業之后,作業輸出結果如下:
如果調用hadoop命令的第一個參數是類名,Hadoop就會啟動一個JVM(Java 虛擬機)來運行這個類。該Hadoop命令將Hadoop庫(及其依賴關系)添加到類路徑中,同時也能獲得Hadoop配置信息。
運行作業所得到的輸出提供了一些有用的信息。例如,作業指定的標識。
并執行了一個map任務和一個reduce任務。
輸出的最后一部分,以Counters為標題,顯示Hadoop上運行的每個作業的一些統計信息。這些信息對檢查數據是否按照預期進行處理非常有用。例如,查看系統輸出的記錄信息可知:5個map輸入記錄產生5個map輸出記錄(由于mapper為每個合法的輸入記錄產生一個輸出記錄),隨后,分為兩組的5個reduce輸入記錄(一組對應一個唯一的鍵),產生兩個reduce輸出記錄。
輸出數據寫入output目錄,其中每個reducer都有一個輸出文件。例子中的作業只有一個reducer,所以只能找到一個名為part-r-00000的文件。
2. 橫向擴展
為了實現橫向擴展,我們需要將數據存儲在分布式文件系統中(HDFS)。通過使用Hadoop資源管理系統YARN,Hadoop可以將MapReduce計算轉移到存儲有部分數據的各臺機器上。
1. 數據流
首先定義一些術語。MapReduce作業(job)是客戶端需要執行的一個工作單元:它包括輸入數據,MapReduce程序和配置信息。Hadoop將作業劃分成若干個任務(task)來執行,其中包括兩類任務:map任務和reduce任務。這些任務運行在集群的節點上,并通過YARN進行調度。如果一個任務失敗,它將在另一個不同的節點上自動重新調度運行。Hadoop將MapReduce的輸入數據劃分成等長的小數據塊,稱為輸入分別(input split)或簡稱“分片”。Hadoop為每個分片構建一個map任務,并由改任務來運行用戶自定義的map函數從而處理分片中的每條記錄。擁有許多分片,意味著處理每個分片所需要的時間少于處理整個輸入數據所花的時間。因此,如果我們并行處理每個分片,且每個分片數據比較小,那么整個處理過程將獲得更好的負載均衡,因為一臺一臺較快的計算機能夠處理的數據分片比一臺較慢的計算機更多,且成一定比例。即使使用相同的機器,失敗的進程或其他并發運行的作業能夠實現滿意的負載均衡,并且隨著分片被切分得更細,負載均衡的質量會更高。另一方面,如果分片切分得太小,那么管理分片的總時間和構建map任務的總時間將決定作業的整個執行時間。對于大多數作業來說,一個合理的分片大小趨向于HDFS的一個塊的大小,默認128M,不過可以針對集群調整整個默認值(對所有新建的文件),或在每個文件創建時指定。Hadoop在存儲輸入數據(HDFS中的數據)的節點上運行map任務,可以獲得最佳性能,因為它無需使用寶貴的集群帶寬資源。這就是所謂的“數據本地化優先”。但是,有時對于一個map任務輸入分片來說,存儲該分片的HDFS數據塊副本的所有節點可能正在運行其他map任務,此時作業調度需要從某一數據塊所在的架構中的一個節點上尋找一個空閑的map槽(slot)來運行該map任務分片。僅僅在非常偶然的情況(該情況基本不會發生),會使用其他機架中的節點運行該map任務,這將導致機架與機架之間的網絡傳輸。
為什么最佳分片的大小應該與塊的大小相同:因為它是確保可以存儲在單個節點上的最大輸入塊的大小。如果分片跨越兩個數據塊,那么對于任何一個HDFS節點,基本上都不可能同時存儲這兩個數據塊,因此分片中的部分數據需要通過網絡傳輸到map任務運行的節點。與使用本地數據運行整個map任務相比,這種方法顯然效率更低。map任務將其輸出寫入本地磁盤,而非HDFS。這是為什么?因為map的輸出是中間結果:該中間結果由reduce任務處理后才產生最終輸出結果,而且一旦作業完成,map的輸出結果就可以刪除。因此,如果把它存儲在HDFS中并實現備份,難免有點小題大做。如果允許map任務的節點在將map中間結果傳送給reduce任務之前失敗,Hadoop將在另一個節點上重新運行這個map任務以再次構建map中間結果。reduce任務并不具備數據本地化的優勢,單個reduce任務的輸入通常來自于所有mapper的輸出。在本例中,僅有一個reduce任務,其輸入是所有map任務的輸出。因此,排過序的map輸出需要通過網絡傳輸發送到運行reduce任務的節點。數據在reduce端合并,然后由用戶自定義reduce函數處理。reduce的輸出通常存儲在HDFS中實現可靠存儲。對于reduce輸出的每個HDFS塊,第一個副本存儲在本地節點上,其他副本出于安全性考慮存儲在其他機架的節點中。因此。將reduce的輸出寫入HDFS確實需要占用網絡帶寬,但這與正常的HDFS管線寫入的消耗一樣。一個reduce任務的完整數據流如下圖所示。虛線框表示節點,虛線箭頭表示節點內部的數據傳輸,而實線箭頭表示不同節點之間的數據傳輸。
reduce任務的數量并非由輸入數據的大小決定,相反是獨立指定的。如果有好多個reduce任務,每個map任務就會針對輸出進行分區(partition),即為每個reduce任務建一個分區。每個分區有許多鍵(以及對應的值),但每個鍵對應的鍵值對記錄都在同一個分區中。分區可由用戶定義的分區函數控制,但通常默認的partitioner通過哈希函數來分區,很高效。一般情況下,多個reduce任務的數據流如下圖所示。該圖清楚地表明了為什么map任務和reduce任務之間的數據流稱為shuffle(混洗),因為每個reduce任務的輸入都來自多個map任務。shuffle一般比圖中所示的更復雜,而且調整混洗參數對作業總執行時間的影響非常大。
最后,當書記處理可以完全并行化(即無需混洗時),可能會出現無reduce任務的情況。在這種情況下,唯一的非本地節點數據傳輸是map任務將結果寫入HDFS。如下圖。
2. biner函數
集群上的可用帶寬限制了MapReduce作業的數量,因此盡量避免map和reduce任務之間的數據傳輸是有利的。Hadoop允許用戶針對map任務的輸出指定一個combiner(就像mapper和reducer一樣),combiner函數的輸出作為reduce函數的輸人。由于combiner屬于優化方案,所以Hadoop無法確定要對一個指定的map任務輸出記錄調用多少次combiner(如果需要)。換而言之,不管調用combiner多少次,0次、1次或多次,reducer的輸出結果都是一樣的。combiner的規則制約著可用的函數類型。這里最好用一個例子來說明。還是假設以前計算最高氣溫的例子,1950年的讀數由兩個map任務處理(因為它們在不同的分片中)。假設第一個map的輸出如下:
第二個map輸出如下:
reduce函數調用時,輸入如下:
因為25是該列數據中最大的,所以它的輸出如下:
我們可以像使用reduce函數那樣,使用combiner找出每個map任務輸出結果中的最高氣溫。如此一來,reduce函數調用時將傳入以下數據:
reduce的輸出結果和以前一樣。更簡單地說,我們可以通過下面的表達式來說明氣溫數值的函數調用:
并非所有的函數都具有該屬性。例如,如果我們計算平均氣溫,就不能用求平均函數mean作為我們的combiner函數,因為
但是又有
combiner函數不能取代reduce函數。為什么呢?我們仍然需要reduce函數來處理不同map輸出中具有相同鍵的記錄。但combiner函數能幫助減少mapper和reducer之間的數據傳輸量,因此,單純就這點而言,在MapReduce作業中是否使用combiner函數還是值得斟酌的。
1. 指定一個combiner
讓我們回到Java MapReduce程序,combiner是通過Reducer類來定義的,并且在這個例子中,它的實現與MaxTemperatureReducer中的reduce函數相同。唯一的改動是在Job中設置combiner類(參見如下范例)
總結
以上是生活随笔為你收集整理的《Hadoop权威指南》第二章 关于MapReduce的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。