MapReduce运行机制-Map阶段
MapTask 運(yùn)行機(jī)制
整個(gè)Map階段流程大體如上圖所示。
簡單概述:inputFile通過split被邏輯切分為多個(gè)split文件,通過Record按行讀取內(nèi)容給map(用戶自己實(shí)現(xiàn)的)進(jìn)行處理,數(shù)據(jù)被map處理結(jié)束之后交給OutputCollector收集器,對其結(jié)果key進(jìn)行分區(qū)(默認(rèn)使用hash分區(qū)),然后寫入buffer,每個(gè)map task都有
一個(gè)內(nèi)存緩沖區(qū),存儲(chǔ)著map的輸出結(jié)果,當(dāng)緩沖區(qū)快滿的時(shí)候需要將緩沖區(qū)的數(shù)據(jù)以一個(gè)臨時(shí)文件的方式存放到磁盤,當(dāng)整個(gè)map task結(jié)束后再對磁盤中這個(gè)map task產(chǎn)生的所有臨時(shí)文件做合并,生成最終的正式輸出文件,然后等待reduce task來拉數(shù)據(jù)?
詳細(xì)步驟
1. 讀取數(shù)據(jù)組件 InputFormat (默認(rèn) TextInputFormat) 會(huì)通過 getSplits 方法對輸入目錄中文件進(jìn)行邏輯切片規(guī)劃得到 splits, 有多少個(gè) split 就對應(yīng)啟動(dòng)多少個(gè)MapTask . split 與 block 的對應(yīng)關(guān)系默認(rèn)是一對一
2. 將輸入文件切分為 splits 之后, 由 RecordReader 對象 (默認(rèn)是LineRecordReader)進(jìn)行讀取, 以 \n 作為分隔符, 讀取一行數(shù)據(jù), 返回 <key,value> . Key 表示每行首字符偏移值, Value 表示這一行文本內(nèi)容
3. 讀取 split 返回 <key,value> , 進(jìn)入用戶自己繼承的 Mapper 類中,執(zhí)行用戶重寫的 map 函數(shù), RecordReader 讀取一行這里調(diào)用一次
4. Mapper 邏輯結(jié)束之后, 將 Mapper 的每條結(jié)果通過 context.write 進(jìn)行collect數(shù)據(jù)收集. 在 collect 中, 會(huì)先對其進(jìn)行分區(qū)處理,默認(rèn)使用 HashPartitioner
MapReduce 提供 Partitioner 接口, 它的作用就是根據(jù) Key 或 Value 及Reducer 的數(shù)量來決定當(dāng)前的這對輸出數(shù)據(jù)最終應(yīng)該交由哪個(gè) Reduce task處理, 默認(rèn)對 Key Hash 后再以 Reducer 數(shù)量取模. 默認(rèn)的取模方式只是為了平均 Reducer 的處理能力, 如果用戶自己對 Partitioner 有需求, 可以訂制并設(shè)置到 Job 上
5. 接下來, 會(huì)將數(shù)據(jù)寫入內(nèi)存, 內(nèi)存中這片區(qū)域叫做環(huán)形緩沖區(qū), 緩沖區(qū)的作用是批量收集Mapper 結(jié)果, 減少磁盤 IO 的影響. 我們的 Key/Value 對以及 Partition 的結(jié)果都會(huì)被寫入緩沖區(qū). 當(dāng)然, 寫入之前,Key 與 Value 值都會(huì)被序列化成字節(jié)數(shù)組
環(huán)形緩沖區(qū)其實(shí)是一個(gè)數(shù)組, 數(shù)組中存放著 Key, Value 的序列化數(shù)據(jù)和 Key,Value 的元數(shù)據(jù)信息, 包括 Partition, Key 的起始位置, Value 的起始位置以及Value 的長度. 環(huán)形結(jié)構(gòu)是一個(gè)抽象概念
緩沖區(qū)是有大小限制, 默認(rèn)是 100MB. 當(dāng) Mapper 的輸出結(jié)果很多時(shí), 就可能會(huì)撐爆內(nèi)存, 所以需要在一定條件下將緩沖區(qū)中的數(shù)據(jù)臨時(shí)寫入磁盤, 然后重新利用這塊緩沖區(qū). 這個(gè)從內(nèi)存往磁盤寫數(shù)據(jù)的過程被稱為 Spill, 中文可譯為溢寫. 這個(gè)溢寫是由單獨(dú)線程來完成, 不影響往緩沖區(qū)寫 Mapper 結(jié)果的線程.溢寫線程啟動(dòng)時(shí)不應(yīng)該阻止 Mapper 的結(jié)果輸出, 所以整個(gè)緩沖區(qū)有個(gè)溢寫的
比例 spill.percent . 這個(gè)比例默認(rèn)是 0.8, 也就是當(dāng)緩沖區(qū)的數(shù)據(jù)已經(jīng)達(dá)到閾值 buffer size * spill percent = 100MB * 0.8 = 80MB , 溢寫線程啟動(dòng),鎖定這 80MB 的內(nèi)存, 執(zhí)行溢寫過程. Mapper 的輸出結(jié)果還可以往剩下的20MB 內(nèi)存中寫, 互不影響
6. 當(dāng)溢寫線程啟動(dòng)后, 需要對這 80MB 空間內(nèi)的 Key 做排序 (Sort). 排序是 MapReduce模型默認(rèn)的行為, 這里的排序也是對序列化的字節(jié)做的排序
如果 Job 設(shè)置過 Combiner, 那么現(xiàn)在就是使用 Combiner 的時(shí)候了. 將有相同 Key 的 Key/Value 對的 Value 加起來, 減少溢寫到磁盤的數(shù)據(jù)量.Combiner 會(huì)優(yōu)化 MapReduce 的中間結(jié)果, 所以它在整個(gè)模型中會(huì)多次使用
那哪些場景才能使用 Combiner 呢? 從這里分析, Combiner 的輸出是Reducer 的輸入, Combiner 絕不能改變最終的計(jì)算結(jié)果. Combiner 只應(yīng)該用于那種 Reduce 的輸入 Key/Value 與輸出 Key/Value 類型完全一致, 且不影響最終結(jié)果的場景. 比如累加, 最大值等. Combiner 的使用一定得慎重, 如果用好, 它對 Job 執(zhí)行效率有幫助, 反之會(huì)影響 Reducer 的最終結(jié)果
7. 合并溢寫文件, 每次溢寫會(huì)在磁盤上生成一個(gè)臨時(shí)文件 (寫之前判斷是否有 Combiner),如果 Mapper 的輸出結(jié)果真的很大, 有多次這樣的溢寫發(fā)生, 磁盤上相應(yīng)的就會(huì)有多個(gè)臨時(shí)文件存在. 當(dāng)整個(gè)數(shù)據(jù)處理結(jié)束之后開始對磁盤中的臨時(shí)文件進(jìn)行 Merge 合并, 因?yàn)樽罱K的文件只有一個(gè), 寫入磁盤, 并且為這個(gè)文件提供了一個(gè)索引文件, 以記錄每個(gè)reduce對應(yīng)數(shù)據(jù)的偏移量
配置
| 配置 | 默認(rèn)值 | 解釋 |
| mapreduce.task.io.sort.mb | 100 | 設(shè)置 環(huán)型 緩沖 區(qū)的 內(nèi)存 值大 小 |
| mapreduce.map.sort.spill.percent | 0.8 | 設(shè)置 溢寫 的比 例 |
| mapreduce.cluster.local.dir | ${hadoop.tmp.dir}/mapred/local | 溢寫 數(shù)據(jù) 目錄 |
| mapreduce.task.io.sort.factor | 10 | 設(shè)置 一次 合并 多少 個(gè)溢 寫文 件 |
?
總結(jié)
以上是生活随笔為你收集整理的MapReduce运行机制-Map阶段的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MapReduce-流量统计求和-分区代
- 下一篇: MapReduce运行机制-Reduce