mapreduce 聚合_MapReduce:处理数据密集型文本处理–局部聚合第二部分
mapreduce 聚合
這篇文章繼續進行有關使用MapReduce進行數據密集型處理的書中實現算法的系列文章。 第一部分可以在這里找到。 在上一篇文章中,我們討論了使用本地聚合技術來減少通過網絡進行混洗和傳輸的數據量的方法。 減少傳輸的數據量是提高MapReduce作業效率的主要方法之一。 單詞計數MapReduce作業用于演示本地聚合。 由于結果只需要總數,因此我們可以為合并器重新使用相同的化簡器,因為更改加數的順序或分組不會影響總和。但是,如果您想要平均水平呢? 然后,由于計算平均值的平均值不等于原始數字集的平均值,因此相同的方法將行不通。 盡管有了一點見識,我們仍然可以使用本地聚合。 對于這些示例,我們將使用Hadoop最終指南書中使用的NCDC天氣數據集的示例。 我們將計算1901年每個月的平均溫度??梢栽贛apReduce的數據密集型處理的第3.1.3章中找到組合器和映射器內組合選項的平均值算法。
一種尺寸并不適合所有人
上次我們介紹了兩種用于在MapReduce作業中減少數據的方法:Hadoop組合器和映射器內組合方法。 Hadoop框架將組合器視為一種優化,并且無法保證調用組合器的次數(如果有的話)。 結果,映射器必須以減速器期望的形式發出數據,因此,如果不涉及組合器,則最終結果不會更改。 為了調整計算平均值,我們需要返回到映射器并更改其輸出。
映射器更改
在單詞計數示例中,未優化的映射器僅發出單詞和1的計數。合并器和映射器內組合映射器通過將每個單詞作為哈希映射中的鍵(總計數為n)來優化此輸出。值。 每次看到一個單詞,計數都會增加1。使用此設置時,如果未調用組合器,則縮減器將接收到該單詞作為鍵,并將一長串的1?s加在一起,從而得到相同的輸出(當然,使用映射器內組合映射器可以避免此問題,因為可以保證合并結果是映射器代碼的一部分)。 為了計算平均值,我們將使基本映射器發出一個字符串鍵(將天氣觀測的年和月連接在一起)和一個自定義可寫對象,稱為TemperatureAveragingPair。 TemperatureAveragingPair對象將包含兩個數字(IntWritables),獲取的溫度和一個計數。 我們將從Hadoop:權威指南中獲取MaximumTemperatureMapper,并以此為靈感來創建AverageTemperatureMapper:
public class AverageTemperatureMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> {//sample line of weather data//0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999private Text outText = new Text();private TemperatureAveragingPair pair = new TemperatureAveragingPair();private static final int MISSING = 9999;@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String yearMonth = line.substring(15, 21);int tempStartPosition = 87;if (line.charAt(tempStartPosition) == '+') {tempStartPosition += 1;}int temp = Integer.parseInt(line.substring(tempStartPosition, 92));if (temp != MISSING) {outText.set(yearMonth);pair.set(temp, 1);context.write(outText, pair);}} } 通過使映射器輸出鍵和TemperatureAveragingPair對象,無論調用組合器如何,我們的MapReduce程序都可以保證具有正確的結果。
合路器
我們需要減少發送的數據量,因此我們將對溫度求和,并對計數求和并分別存儲。 這樣,我們將減少發送的數據,但保留計算正確平均值所需的格式。 如果/在調用組合器時,它將采用所有傳入的TemperatureAveragingPair對象,并為同一鍵發出單個TemperatureAveragingPair對象,其中包含溫度和計數值的總和。 這是合并器的代碼:
public class AverageTemperatureCombiner extends Reducer<Text,TemperatureAveragingPair,Text,TemperatureAveragingPair> {private TemperatureAveragingPair pair = new TemperatureAveragingPair();@Overrideprotected void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throws IOException, InterruptedException {int temp = 0;int count = 0;for (TemperatureAveragingPair value : values) {temp += value.getTemp().get();count += value.getCount().get();}pair.set(temp,count);context.write(key,pair);} } 但是我們非常有興趣確保我們減少了發送到reducer的數據量,因此我們將看看下一步如何實現。
在Mapper合并平均值中
與單詞計數示例相似,為了計算平均值,映射器內組合映射器將使用哈希圖,將連接的年+月作為鍵,將TemperatureAveragingPair作為值。 每次獲得相同的年+月組合時,我們都會將對對象從地圖中取出,添加溫度并將計數增加一個。 調用cleanup方法后,我們將發出所有對及其各自的鍵:
public class AverageTemperatureCombiningMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> {//sample line of weather data//0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999private static final int MISSING = 9999;private Map<String,TemperatureAveragingPair> pairMap = new HashMap<String,TemperatureAveragingPair>();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String yearMonth = line.substring(15, 21);int tempStartPosition = 87;if (line.charAt(tempStartPosition) == '+') {tempStartPosition += 1;}int temp = Integer.parseInt(line.substring(tempStartPosition, 92));if (temp != MISSING) {TemperatureAveragingPair pair = pairMap.get(yearMonth);if(pair == null){pair = new TemperatureAveragingPair();pairMap.put(yearMonth,pair);}int temps = pair.getTemp().get() + temp;int count = pair.getCount().get() + 1;pair.set(temps,count);}}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {Set<String> keys = pairMap.keySet();Text keyText = new Text();for (String key : keys) {keyText.set(key);context.write(keyText,pairMap.get(key));}} } 通過遵循在映射調用之間跟蹤數據的相同模式,我們可以通過實現映射器內合并策略來實現可靠的數據縮減。 同樣的注意事項適用于在對映射器的所有調用中保持狀態,但是考慮使用這種方法可以提高處理效率,這值得考慮。
減速器
在這一點上,編寫我們的reducer很容易,為每個鍵獲取一個成對列表,將所有溫度和計數求和,然后將溫度總和除以計數總和。
public class AverageTemperatureReducer extends Reducer<Text, TemperatureAveragingPair, Text, IntWritable> {private IntWritable average = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throws IOException, InterruptedException {int temp = 0;int count = 0;for (TemperatureAveragingPair pair : values) {temp += pair.getTemp().get();count += pair.getCount().get();}average.set(temp / count);context.write(key, average);} }
結果
使用合并器和映射器內合并映射器選項可以預測結果,從而顯著減少數據輸出。
未優化的映射器選項:
組合器選項:
12/10/10 23:07:19 INFO mapred.JobClient: Reduce input groups=12 12/10/10 23:07:19 INFO mapred.JobClient: Combine output records=12 12/10/10 23:07:19 INFO mapred.JobClient: Map input records=6565 12/10/10 23:07:19 INFO mapred.JobClient: Reduce shuffle bytes=210 12/10/10 23:07:19 INFO mapred.JobClient: Reduce output records=12 12/10/10 23:07:19 INFO mapred.JobClient: Spilled Records=24 12/10/10 23:07:19 INFO mapred.JobClient: Map output bytes=98460 12/10/10 23:07:19 INFO mapred.JobClient: Total committed heap usage (bytes)=269619200 12/10/10 23:07:19 INFO mapred.JobClient: Combine input records=6564 12/10/10 23:07:19 INFO mapred.JobClient: Map output records=6564 12/10/10 23:07:19 INFO mapred.JobClient: SPLIT_RAW_BYTES=108 12/10/10 23:07:19 INFO mapred.JobClient: Reduce input records=12映射器內合并選項:
12/10/10 23:09:09 INFO mapred.JobClient: Reduce input groups=12 12/10/10 23:09:09 INFO mapred.JobClient: Combine output records=0 12/10/10 23:09:09 INFO mapred.JobClient: Map input records=6565 12/10/10 23:09:09 INFO mapred.JobClient: Reduce shuffle bytes=210 12/10/10 23:09:09 INFO mapred.JobClient: Reduce output records=12 12/10/10 23:09:09 INFO mapred.JobClient: Spilled Records=24 12/10/10 23:09:09 INFO mapred.JobClient: Map output bytes=180 12/10/10 23:09:09 INFO mapred.JobClient: Total committed heap usage (bytes)=269619200 12/10/10 23:09:09 INFO mapred.JobClient: Combine input records=0 12/10/10 23:09:09 INFO mapred.JobClient: Map output records=12 12/10/10 23:09:09 INFO mapred.JobClient: SPLIT_RAW_BYTES=108 12/10/10 23:09:09 INFO mapred.JobClient: Reduce input records=12 計算結果:
(注意:示例文件中的溫度以攝氏度* 10為單位)
| 未優化 | 合路器 | 映射器內合并器映射器 |
| 190101 -25 190102 -91 190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111 -16 190112 -77 | 190101 -25 190102 -91 190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111 -16 190112 -77 | 190101 -25 190102 -91 190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111 -16 190112 -77 |
結論
我們已經討論了本地聚合,無論是簡單的情況(可以將reducer用作組合器),還是更復雜的情況(對于如何構造數據,同時仍能從本地聚合數據以提高處理效率)中獲得一些見解。
進一步閱讀
- Jimmy Lin和Chris Dyer 使用MapReduce進行的數據密集型處理
- Hadoop: Tom White 的權威指南
- 來自博客的源代碼
- Hadoop API
- MRUnit用于單元測試Apache Hadoop映射減少工作
- Gutenberg項目提供了大量純文本格式的書籍,非常適合在本地測試Hadoop作業。
參考: 使用MapReduce進行數據密集型文本處理-本地聚合第二部分,來自我們的JCG合作伙伴 Bill Bejeck,來自“ 隨機編碼思考”博客。
翻譯自: https://www.javacodegeeks.com/2012/10/mapreduce-working-through-data-2.html
mapreduce 聚合
總結
以上是生活随笔為你收集整理的mapreduce 聚合_MapReduce:处理数据密集型文本处理–局部聚合第二部分的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 上山容易下山难什么意思 上山容易下山难意
- 下一篇: 使用DynamoDBMapper扫描Dy