日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

MapReduce运行机制

發(fā)布時(shí)間:2023/12/10 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MapReduce运行机制 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

相關(guān)鏈接?
MapReduce中Shuffle機(jī)制詳解——Map端Shuffle鏈接?
MapReduce中Shuffle機(jī)制詳解——Reduce端Shuffle鏈接

MapReduce將作業(yè)job的整個(gè)運(yùn)行過(guò)程分為兩個(gè)階段:Map階段和Reduce階段。按照時(shí)間順序包括:輸入分片(input split)、map階段、combiner階段、shuffle階段和reduce階段。系統(tǒng)執(zhí)行排序、將map輸出作為輸入傳給reducer的過(guò)程稱(chēng)為shuffle(shuffle是MapReducer的心臟)

Map階段由一定數(shù)量的 Map Task組成?
1. 輸入數(shù)據(jù)格式解析: InputFormat?
2. 輸入數(shù)據(jù)處理: Mapper?
3. 本地合并: Combiner(local reduce)?
4. 數(shù)據(jù)分組: Partitioner

Reduce階段由一定數(shù)量的 Reduce Task組成?
1. 數(shù)據(jù)遠(yuǎn)程拷貝?
2. 數(shù)據(jù)按照key排序?
3. 數(shù)據(jù)處理: Reducer?
4. 數(shù)據(jù)輸出格式: OutputFormat

1、流程簡(jiǎn)介?
?
各個(gè)map函數(shù)對(duì)所劃分的數(shù)據(jù)并行處理,從不同的輸入數(shù)據(jù)產(chǎn)生不同的中間結(jié)果輸出?
各個(gè)reduce也各自并行計(jì)算,各自負(fù)責(zé)處理不同的中間結(jié)果數(shù)據(jù)集合進(jìn)行reduce處理之前,必須等到所有的map函數(shù)做完?
在進(jìn)入reduce前需要有一個(gè)同步障(barrier)?
這個(gè)階段也負(fù)責(zé)對(duì)map的中間結(jié)果數(shù)據(jù)進(jìn)行收集整理(aggregation & shuffle)處理,以便reduce更有效地計(jì)算最終結(jié)果, 最終匯總所有reduce的輸出結(jié)果即可獲得最終結(jié)果

第一步:假設(shè)一個(gè)文件有三行英文單詞作為 MapReduce 的Input(輸入)這里經(jīng)過(guò) Splitting 過(guò)程把文件分割為3塊。分割后的3塊數(shù)據(jù)就可以并行處理,每一塊交給一個(gè) map 線程處理。?
第二步:每個(gè) map 線程中,以每個(gè)單詞為key,以1作為詞頻數(shù)value,然后輸出。?
第三步:每個(gè) map 的輸出要經(jīng)過(guò) shuffling(混洗),將相同的單詞key放在一個(gè)桶里面,然后交給 reduce 處理。?
第四步:reduce 接受到 shuffling 后的數(shù)據(jù), 會(huì)將相同的單詞進(jìn)行合并,得到每個(gè)單詞的詞頻數(shù),最后將統(tǒng)計(jì)好的每個(gè)單詞的詞頻數(shù)作為輸出結(jié)果。

上述就是 MapReduce 的大致流程,前兩步可以看做 map 階段,后兩步可以看做 reduce 階段。

2、輸入分片與類(lèi)型?
MapReduce將待處理數(shù)據(jù)執(zhí)行邏輯切片(即按照一個(gè)特定切片大小,將待處理數(shù)據(jù)劃分成邏輯上的多個(gè)split),然后每一個(gè)split分配一個(gè)mapTask并行實(shí)例處理

輸入分片(split)與map對(duì)應(yīng),是每個(gè)map處理的唯一單位。每個(gè)分片包括多條記錄,每個(gè)記錄都有對(duì)應(yīng)鍵值對(duì)。?
輸入切片的接口:InputSplit接口(不需要開(kāi)發(fā)人員直接處理,由InputFormat創(chuàng)建)

輸入分片(input split):在進(jìn)行map計(jì)算之前,mapreduce會(huì)根據(jù)輸入文件計(jì)算輸入分片(input split),每個(gè)輸入分片(input split)針對(duì)一個(gè)map任務(wù),輸入分片(input split)存儲(chǔ)的并非數(shù)據(jù)本身,而是一個(gè)分片長(zhǎng)度和一個(gè)記錄數(shù)據(jù)的位置的數(shù)組,輸入分片(input split)往往和hdfs的block(塊)關(guān)系很密切,存儲(chǔ)位置供MapReduce使用以便將map任務(wù)盡量放在分片數(shù)據(jù)附近,而長(zhǎng)度用來(lái)排序分片,以便優(yōu)化處理最大的分片,從而最小化作業(yè)運(yùn)行時(shí)間。

假如我們?cè)O(shè)定hdfs的塊的大小是64mb,如果我們輸入有三個(gè)文件,大小分別是3mb、65mb和127mb,那么mapreduce會(huì)把3mb文件分為一個(gè)輸入分片(input split),65mb則是兩個(gè)輸入分片(input split)而127mb也是兩個(gè)輸入分片(input split),換句話說(shuō)我們?nèi)绻趍ap計(jì)算前做輸入分片調(diào)整,例如合并小文件,那么就會(huì)有5個(gè)map任務(wù)將執(zhí)行,而且每個(gè)map執(zhí)行的數(shù)據(jù)大小不均,這個(gè)也是mapreduce優(yōu)化計(jì)算的一個(gè)關(guān)鍵點(diǎn)。

輸入類(lèi)型

FileInputFormat類(lèi):?
FileInputFormat 是所有使用文件作為其數(shù)據(jù)源的 InputFormat 實(shí)現(xiàn)的基類(lèi)。它提供了兩個(gè)功能:一個(gè)定義哪些文件包含在一個(gè)作業(yè)的輸入中;一個(gè)為輸入文件生成分片的實(shí)現(xiàn)。把分片分割成記錄的作業(yè)由其子類(lèi)來(lái)完成。

TextlnputFormat類(lèi):?
TextInputFormat 是默認(rèn)的 InputFormat。每條記錄是一行輸入。鍵是 LongWritable 類(lèi)型,存儲(chǔ)該行在整個(gè)文件中的字節(jié)偏移量。值是這行的內(nèi)容,不包括任何行終止符(換行符和回車(chē)符),它是 Text 類(lèi)型的。但是輸入分片和HDFS塊之間可能不能很好的匹配,出現(xiàn)跨塊的情況

KeyValueTextlnputFormat類(lèi):?
TextInputFormat 的鍵,即每一行在文件中的字節(jié)偏移量,通常并不是特別有用。通常情況下,文件中的每一行是一個(gè)鍵/值對(duì),使用某個(gè)分界符進(jìn)行分隔,比如制表符。例如 以下由 Hadoop 默認(rèn) OutputFormat(即 TextOutputFormat)產(chǎn)生的輸出。如果要正確處理這類(lèi) 文件,KeyValueTextInputFormat 比較合適??梢酝ㄟ^(guò) key.value.separator.in.input.line 屬性來(lái)指定分隔符。它的默認(rèn)值是一個(gè)制表符。

NLineInputFormat類(lèi):?
與TextInputFormat一樣,鍵是文件中行的字節(jié)偏移量,值是行本身。主要是希望mapper收到固定行數(shù)的輸入。

MultipleInputs多種輸入:?
MultipleInputs類(lèi)處理多種格式的輸入,允許為每個(gè)輸入路徑指定InputFormat和Mapper。兩個(gè)mapper的輸出類(lèi)型是一樣的,所以reducer看到的是聚集后的map輸出,并不知道輸入是不同的mapper產(chǎn)生的。?
重載版本:addInputPath(),沒(méi)有mapper參數(shù),主要支持多種輸入格式只有一個(gè)mapper。?

3、Map與Reduce的個(gè)數(shù)

Map任務(wù)的個(gè)數(shù)

讀取數(shù)據(jù)產(chǎn)生多少個(gè)Mapper???
Mapper數(shù)據(jù)過(guò)大的話,會(huì)產(chǎn)生大量的小文件,過(guò)多的Mapper創(chuàng)建和初始化都會(huì)消耗大量的硬件資源?
Mapper數(shù)太小,并發(fā)度過(guò)小,Job執(zhí)行時(shí)間過(guò)長(zhǎng),無(wú)法充分利用分布式硬件資源

Mapper數(shù)量由什么決定???
(1)輸入文件數(shù)目(2)輸入文件的大小(3)配置參數(shù) 這三個(gè)因素決定的。?
輸入的目錄中文件的數(shù)量決定多少個(gè)map會(huì)被運(yùn)行起來(lái),應(yīng)用針對(duì)每一個(gè)分片運(yùn)行一個(gè)map,一般而言,對(duì)于每一個(gè)輸入的文件會(huì)有一個(gè)map split。如果輸入文件太大,超過(guò)了hdfs塊的大小(128M)那么對(duì)于同一個(gè)輸入文件我們會(huì)有多余2個(gè)的map運(yùn)行起來(lái)。

涉及參數(shù): mapreduce.input.fileinputformat.split.minsize //啟動(dòng)map最小的split size大小,默認(rèn)0 mapreduce.input.fileinputformat.split.maxsize //啟動(dòng)map最大的split size大小,默認(rèn)256M dfs.block.size//block塊大小,默認(rèn)128M 計(jì)算公式:splitSize = Math.max(minSize, Math.min(maxSize, blockSize))下面是FileInputFormat class 的getSplits()的偽代碼: num_splits = 0for each input file f:remaining = f.lengthwhile remaining / split_size > split_slope:num_splits += 1remaining -= split_sizewhere:split_slope = 1.1 分割斜率split_size =~ dfs.blocksize 分割大小約等于hdfs塊大小會(huì)有一個(gè)比例進(jìn)行運(yùn)算來(lái)進(jìn)行切片,為了減少資源的浪費(fèi) 例如一個(gè)文件大小為260M,在進(jìn)行MapReduce運(yùn)算時(shí),會(huì)首先使用260M/128M,得出的結(jié)果和1.1進(jìn)行比較 大于則切分出一個(gè)128M作為一個(gè)分片,剩余132M,再次除以128,得到結(jié)果為1.03,小于1.1 則將132作為一個(gè)切片,即最終260M被切分為兩個(gè)切片進(jìn)行處理,而非3個(gè)切片。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

reduce任務(wù)的個(gè)數(shù)

Reduce任務(wù)是一個(gè)數(shù)據(jù)聚合的步驟,數(shù)量默認(rèn)為1。而使用過(guò)多的Reduce任務(wù)則意味著復(fù)雜的shuffle,并使輸出文件的數(shù)量激增。

一個(gè)job的ReduceTasks數(shù)量是通過(guò)mapreduce.job.reduces參數(shù)設(shè)置 也可以通過(guò)編程的方式,調(diào)用Job對(duì)象的setNumReduceTasks()方法來(lái)設(shè)置 一個(gè)節(jié)點(diǎn)Reduce任務(wù)數(shù)量上限由mapreduce.tasktracker.reduce.tasks.maximum設(shè)置(默認(rèn)2)。可以采用以下探試法來(lái)決定Reduce任務(wù)的合理數(shù)量: 1.每個(gè)reducer都可以在Map任務(wù)完成后立即執(zhí)行:0.95 * (節(jié)點(diǎn)數(shù)量 * mapreduce.tasktracker.reduce.tasks.maximum) 2.較快的節(jié)點(diǎn)在完成第一個(gè)Reduce任務(wù)后,馬上執(zhí)行第二個(gè):1.75 * (節(jié)點(diǎn)數(shù)量 * mapreduce.tasktracker.reduce.tasks.maximum)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

4、小文件合并?
Mapper是基于文件自動(dòng)產(chǎn)生的,如何自己控制Mapper的個(gè)數(shù)?需要通過(guò)參數(shù)的控制來(lái)調(diào)節(jié)Mapper的個(gè)數(shù)。減少M(fèi)apper的個(gè)數(shù)就要合并小文件,這種小文件有可能是直接來(lái)自于數(shù)據(jù)源的小文件,也可能是Reduce產(chǎn)生的小文件。

設(shè)置合并器:(set都是在hive腳本,也可以配置Hadoop)設(shè)置合并器本身:set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;set hive.merge.mapFiles=true;set hive.merge.mapredFiles=true;set hive.merge.size.per.task=256000000;//每個(gè)Mapper要處理的數(shù)據(jù),就把上面的5M10M……合并成為一個(gè)一般還要配合一個(gè)參數(shù):set mapred.max.split.size=256000000 // mapred切分的大小set mapred.min.split.size.per.node=128000000//低于128M就算小文件,數(shù)據(jù)在一個(gè)節(jié)點(diǎn)會(huì)合并,在多個(gè)不同的節(jié)點(diǎn)會(huì)把數(shù)據(jù)抓過(guò)來(lái)進(jìn)行合并。Hadoop中的參數(shù):可以通過(guò)控制文件的數(shù)量控制mapper數(shù)量mapreduce.input.fileinputformat.split.minsize(default:0),小于這個(gè)值會(huì)合并mapreduce.input.fileinputformat.split.maxsize 大于這個(gè)值會(huì)切分
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

5、Map階段?
Map階段是由一定數(shù)量的 Map Task組成。這些Map Task可以同時(shí)運(yùn)行,每個(gè)Map Task又是由以下三個(gè)部分組成。?
  ?
1. InputFormat輸入數(shù)據(jù)格式解析組件:?
因?yàn)椴煌臄?shù)據(jù)可能存儲(chǔ)的數(shù)據(jù)格式不一樣,這就需要有一個(gè)InputFormat組件來(lái)解析這些數(shù)據(jù)的存放格式,默認(rèn)情況下,它提供了一個(gè)TextInputFormat文本文件輸入格式來(lái)解釋數(shù)據(jù)格式。?
它會(huì)將文件的每一行解釋成(key,value),key代表每行偏移量,value代表每行數(shù)據(jù)內(nèi)容,通常情況我們不需要自定義InputFormat,因?yàn)镸apReduce提供了多種支持不同數(shù)據(jù)格式InputFormat的實(shí)現(xiàn)?
    ?
2. Mapper輸入數(shù)據(jù)處理:這個(gè)Mapper是必須要實(shí)現(xiàn)的,因?yàn)楦鶕?jù)不同的業(yè)務(wù)對(duì)數(shù)據(jù)有不同的處理?
  ?
3. Partitioner數(shù)據(jù)分組:?
Mapper數(shù)據(jù)處理之后輸出之前,輸出key會(huì)經(jīng)過(guò)Partitioner分組或者分桶選擇不同的reduce,默認(rèn)的情況下Partitioner會(huì)對(duì)map輸出的key進(jìn)行hash取模。?
比如有6個(gè)ReduceTask,它就是模6,如果key的hash值為0,就選擇第0個(gè)ReduceTask(為1,選Reduce Task1)。這樣不同的map對(duì)相同單詞key,它的hash值取模是一樣的,所以會(huì)交給同一個(gè)reduce來(lái)處理。

6、Reduce階段

  • 數(shù)據(jù)運(yùn)程拷貝?
    Reduce Task要遠(yuǎn)程拷貝每個(gè)map處理的結(jié)果,從每個(gè)map中讀取一部分結(jié)果,每個(gè)Reduce Task拷貝哪些數(shù)據(jù),是由上面Partitioner決定的。
  • 數(shù)據(jù)按照key排序?
    Reduce Task讀取完數(shù)據(jù)后,要按照key進(jìn)行排序,相同的key被分到一組,交給同一個(gè)Reduce Task處理
  • Reducer數(shù)據(jù)處理?
    以WordCount為例,相同的單詞key分到一組,交個(gè)同一個(gè)Reducer處理,這樣就實(shí)現(xiàn)了對(duì)每個(gè)單詞的詞頻統(tǒng)計(jì)。
  • OutputFormat數(shù)據(jù)輸出格式?
    Reducer統(tǒng)計(jì)的結(jié)果將按照OutputFormat格式輸出(默認(rèn)情況下的輸出格式為T(mén)extOutputFormat)
  • 總結(jié)

    以上是生活随笔為你收集整理的MapReduce运行机制的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。