日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

map/reduce的概念

發(fā)布時(shí)間:2025/4/16 编程问答 57 豆豆
生活随笔 收集整理的這篇文章主要介紹了 map/reduce的概念 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

《MapReduce: Simplified Data Processing on Large Cluster 》翻譯

MapReduce是一種編程模型和一種用來(lái)處理和產(chǎn)生大數(shù)據(jù)集的相關(guān)實(shí)現(xiàn)。用戶定義map函數(shù)來(lái)處理key/value鍵值對(duì)來(lái)產(chǎn)生一系列的中間的key/value鍵值對(duì)。還要定義一個(gè)reduce函數(shù)用來(lái)合并有著相同中間key值的中間value。許多現(xiàn)實(shí)世界中的任務(wù)都可以用這種模型來(lái)表達(dá),就像下文所展示的那樣。

用這個(gè)風(fēng)格編寫的程序可以自動(dòng)并行地在集群上工作。運(yùn)行時(shí)系統(tǒng)會(huì)自動(dòng)處理例如切割輸入數(shù)據(jù),在機(jī)器之間調(diào)度程序的執(zhí)行,處理機(jī)器故障以及管理必要的機(jī)器間通信等細(xì)節(jié)問(wèn)題。這可以讓那些對(duì)于并行分布式系統(tǒng)沒(méi)有任何經(jīng)驗(yàn)的程序員也能簡(jiǎn)單的利用起一個(gè)大的分布式系統(tǒng)的資源。

我們的MapReduce的實(shí)現(xiàn)運(yùn)行在一個(gè)由大的商業(yè)機(jī)構(gòu)成的集群當(dāng)中并且是高度可擴(kuò)展的:一個(gè)典型的MapReduce計(jì)算要在上千臺(tái)機(jī)器中處理TB數(shù)量級(jí)的數(shù)據(jù)。程序員會(huì)覺(jué)得這個(gè)系統(tǒng)非常好用:已經(jīng)有成千上萬(wàn)的MapReduce程序被實(shí)現(xiàn)出來(lái)并且每天有上千個(gè)MapReduce任務(wù)運(yùn)行在Google的集群上。

Introduction

在過(guò)去五年中,作者和許多Google的其他人已經(jīng)實(shí)現(xiàn)了成百上千個(gè)用于特殊目的的計(jì)算程序用于處理大量的raw data,各種各樣的derived data。許多這種計(jì)算程序在概念上都是非常直接的。然而輸入的數(shù)據(jù)量往往很大,并且計(jì)算需要分布在成百上千臺(tái)機(jī)器中為了在一個(gè)可接受的時(shí)間內(nèi)完成任務(wù)。但是除了簡(jiǎn)單的計(jì)算模型以外,我們需要大量復(fù)雜的代碼用來(lái)處理例如如何并行化計(jì)算、分發(fā)數(shù)據(jù)、處理故障等等問(wèn)題。

為了解決這樣的復(fù)雜性,我們?cè)O(shè)計(jì)了一種新的抽象,它讓我們只需要表示出我們想要執(zhí)行的計(jì)算模型,而將背后復(fù)雜的并行化,容錯(cuò),數(shù)據(jù)分發(fā),負(fù)載平衡等等技術(shù)的實(shí)現(xiàn)細(xì)節(jié)隱藏在了庫(kù)中。我們這種新的抽象是受Lisp以及其他一些函數(shù)式編程語(yǔ)言中的map和reduce原語(yǔ)影響而來(lái)的。我們意識(shí)到許多的計(jì)算都需要對(duì)于輸入中的每個(gè)邏輯“記錄”進(jìn)行map操作,為了計(jì)算一系列的中間鍵值對(duì)。然后還需要對(duì)所有共享同一個(gè)key的value進(jìn)行reduce操作,從而能夠?qū)ε缮臄?shù)據(jù)進(jìn)行適當(dāng)?shù)慕M合。我們這種讓用戶自定義map和reduce操作的編程模型能夠讓我們簡(jiǎn)單地對(duì)大量數(shù)據(jù)實(shí)現(xiàn)并行化,并且使用重新執(zhí)行作為主要的容錯(cuò)機(jī)制。

我們這項(xiàng)工作的主要共享是提供了一個(gè)簡(jiǎn)單并且強(qiáng)大的接口能夠讓我們實(shí)現(xiàn)自動(dòng)的并行化并且分布處理大規(guī)模的計(jì)算,同時(shí)該接口的實(shí)現(xiàn)能在大型的商用PC集群上獲得非常高的性能。

Section 2描述了基本的編程模型以及一些簡(jiǎn)單的例子。Section 3描述了為我們的基于集群的計(jì)算環(huán)境量身定做的MapReduce接口。Section 4描述了一些我們認(rèn)為有用的對(duì)于編程模型的改進(jìn)。Section 5是對(duì)我們的實(shí)現(xiàn)在不同任務(wù)下的性能測(cè)試。Section 6 包含了MapReduce在Google內(nèi)的使用情況,包括我們以它為基礎(chǔ)重寫我們的產(chǎn)品索引系統(tǒng)的經(jīng)驗(yàn)。Section 7討論了相關(guān)的工作以及未來(lái)的發(fā)展。

2 Programming Model

計(jì)算模型以一系列的鍵值對(duì)作為輸入并產(chǎn)生一系列的鍵值對(duì)作為輸出。MapReduce庫(kù)的用戶以“Map”和"Reduce"兩個(gè)函數(shù)來(lái)表達(dá)計(jì)算。

Map,是由用戶編寫的,獲取一個(gè)輸入對(duì),并且產(chǎn)生一系列中間的鍵值對(duì)。MapReduce庫(kù)將那些具有相同的中間鍵I的中間值聚集在一起,然后將它們傳遞給Reduce函數(shù)。

Reduce函數(shù)同樣是由用戶編寫的,接收一個(gè)中間鍵I和該鍵對(duì)應(yīng)的一系列的中間值。Reduce函數(shù)通過(guò)將這些值合并來(lái)組成一個(gè)更小的值的集合。通常每個(gè)Reduce函數(shù)只產(chǎn)生0個(gè)或1個(gè)輸出值。Reduce函數(shù)一般通過(guò)一個(gè)迭代器來(lái)獲取中間值,從而在中間值的數(shù)目遠(yuǎn)遠(yuǎn)大于內(nèi)存容量時(shí),我們也能夠處理。

2.1 Example

下面來(lái)考慮這樣一個(gè)問(wèn)題:統(tǒng)計(jì)大量文檔中每一個(gè)單詞出現(xiàn)的次數(shù)。對(duì)此,用戶需要編寫類似于如下的偽代碼:

  map(String key, String value):// key: document name// value: document contentsfor each word w in value:      EmitIntermediate(w, "1");  reduce(String key, Iterator values):// key: a word// values: a list of countsint result = 0;for each v in values:result += ParseInt(v);    Emit(AsString(result));

Map函數(shù)為在每一個(gè)單詞出現(xiàn)的時(shí)候,為它加上一個(gè)計(jì)數(shù)(在這個(gè)簡(jiǎn)單的例子中就是加1)。Reduce函數(shù)對(duì)每個(gè)單詞的所有計(jì)數(shù)進(jìn)行疊加。

另外,用戶需要用輸入輸出文件的名字,以及一個(gè)可選的tuning parameter去填充一個(gè)叫mapreduce specification 的對(duì)象。之后,用戶調(diào)用MapReduce函數(shù),將定義的上述對(duì)象傳遞進(jìn)去。用戶的代碼將和MapReduce庫(kù)相連(由C++實(shí)現(xiàn))。Appendix A中有這個(gè)例子所有的代碼文檔。

2.2 Types

雖然在上述的偽代碼中輸入輸出都是字符串類型的,但事實(shí)上,用戶提供的Map和Reduce函數(shù)都是有相應(yīng)類型的:

  map    (k1, v1)    -> list(k2, v2)  reduce   (k2, list(v2))  -> list(v2)

需要注意的是,輸入的key和value與輸出的key和value是不同的類型,而中間的key和value與輸出的key和value是相同的類型。我們的C++實(shí)現(xiàn)都是以字符串的形式和用戶代碼進(jìn)行交互的,至于將字符串類型轉(zhuǎn)換成相應(yīng)合適的類型的工作則由用戶代碼來(lái)完成了。

2.3 More Example

接下來(lái)是一些能夠簡(jiǎn)單地用MapReduce計(jì)算模型進(jìn)行表達(dá)的例子

Distributed Grep:Map函數(shù)獲取匹配提供的模式的行,Reduce函數(shù)只是簡(jiǎn)單地將這些中間數(shù)據(jù)拷貝到輸出

Count of URL Access Frequency:Map函數(shù)處理web請(qǐng)求的日志,并且輸出<URL, 1>。Reduce函數(shù)將擁有相同URL的value相加,得到<URL, total count>對(duì)

Reverse Web-Link Graph:Map函數(shù)輸出<target, source>對(duì),其中source所在的page都有連向target這個(gè)URL的鏈接。Reduce函數(shù)將給定target的所有的source URL連接起來(lái),輸出<target, list(source)>對(duì)

Term-Vector per Host:一個(gè)term vector表示一系列<word, frequency>的鍵值對(duì),word表示一篇或者一系列文章中出現(xiàn)的比較重要的單詞,frequency表示它們出現(xiàn)的次數(shù)。Map函數(shù)對(duì)于每篇輸入的文章輸出<hostname, term vector>鍵值對(duì)(其中hostname是從文章所在的URL中抽取出來(lái)的)Reduce函數(shù)獲取給定host的term vectors。它將這些term vectors累加起來(lái),丟棄非頻繁出現(xiàn)的term,并產(chǎn)生一個(gè)最終的<hostname, term vector>對(duì)。

Inverted Index:Map函數(shù)對(duì)每篇文章進(jìn)行處理,并輸出一系列的<word, document ID>對(duì)。Reduce函數(shù)接收給定word的所有鍵值對(duì),對(duì)相應(yīng)的document ID進(jìn)行排序并且輸出<word, list>對(duì)。所有輸出對(duì)的集合構(gòu)成了一個(gè)簡(jiǎn)單的倒排索引。用了MapReduce模型,對(duì)單詞位置的追蹤就變得非常簡(jiǎn)單了。

Distributed Sort:Map函數(shù)從每個(gè)record中抽取出key,產(chǎn)生<key, record>鍵值對(duì)。Reduce函數(shù)只是簡(jiǎn)單地將所有對(duì)輸出。這個(gè)計(jì)算模型依賴于Section 4.1中描述的劃分技巧以及Section 4.2中描述的排序特性。

3 Implementation

對(duì)于MapReduce的接口,各種各樣不同的實(shí)現(xiàn)都是可能的。所有正確的實(shí)現(xiàn)都是基于應(yīng)用環(huán)境的。比如,一種實(shí)現(xiàn)可能適合于小的共享內(nèi)存的機(jī)器,另一種可能適合于大型的NUMA多處理器機(jī)器,甚至有的是為更大的互聯(lián)的機(jī)器集群設(shè)計(jì)的。

本節(jié)中描述的實(shí)現(xiàn)基于的是Google中最常用的計(jì)算環(huán)境:一個(gè)由大量商用PC機(jī)通過(guò)交換以太網(wǎng)互聯(lián)的集群。在我們的環(huán)境中:

(1)、機(jī)器通常都是x86的雙核處理器,其上運(yùn)行Linux,每臺(tái)機(jī)器擁有2-4G的內(nèi)存

(2)、商用網(wǎng)絡(luò)硬件—通常是100 M/s或者1 G/s,但是綜合起來(lái)要小于平均帶寬

(3)、一個(gè)集群由成千上萬(wàn)臺(tái)機(jī)器組成,因此機(jī)器故障是常有的事

(4)、存儲(chǔ)由便宜的IDE磁盤提供,它們都與獨(dú)立的機(jī)器直接相連。一個(gè)內(nèi)部研發(fā)的文件系統(tǒng)用于管理所有存儲(chǔ)于這些硬盤上的文件。該文件系統(tǒng)通過(guò)Replication在不可靠的硬件上提供了可用性和可靠性

(5)、用戶提交jobs給調(diào)度系統(tǒng)。每個(gè)job由一系列的task組成,并且由調(diào)度器分配到集群中一系列可用的機(jī)器上

3.1 Execution Overview

通過(guò)將輸入數(shù)據(jù)自動(dòng)分割成M份,Map函數(shù)得以在多臺(tái)機(jī)器上分布式執(zhí)行。每一個(gè)輸入塊都能并行地在不同的機(jī)器上執(zhí)行。通過(guò)劃分函數(shù)(例如,hash(key) mod R)將中間鍵劃分為R份,Reduce函數(shù)也能被分布式地調(diào)用。其中劃分的數(shù)目R和劃分函數(shù)都是由用戶指定的。

上圖1展示了在我們的實(shí)現(xiàn)中MapReduce全部的流程。當(dāng)用戶程序調(diào)用MapReduce函數(shù)時(shí),接下來(lái)的動(dòng)作將按序發(fā)生(圖1中標(biāo)記的數(shù)字與下面的數(shù)字是一一對(duì)應(yīng)的):

(1)、用戶程序中的MapReduce庫(kù)首先將輸入文件劃分為M片,每片大小一般在16M到64M之間(由用戶通過(guò)一個(gè)可選的參數(shù)指定)。之后,它在集群的很多臺(tái)機(jī)器上都啟動(dòng)了相同的程序拷貝。

(2)其中有一個(gè)拷貝程序是特別的----master。剩下的都是worker,它們接收master分配的任務(wù)。其中有M個(gè)Map任務(wù)和R個(gè)Reduce任務(wù)要分配。master挑選一個(gè)空閑的worker并且給它分配一個(gè)map任務(wù)或者reduce任務(wù)。

(3)、被分配到Map任務(wù)的worker會(huì)去讀取相應(yīng)的輸入塊的內(nèi)容。它從輸入文件中解析出鍵值對(duì)并且將每個(gè)鍵值對(duì)傳送給用戶定義的Map函數(shù)。而由Map函數(shù)產(chǎn)生的中間鍵值對(duì)緩存在內(nèi)存中。

(4)、被緩存的鍵值對(duì)會(huì)階段性地寫回本地磁盤,并且被劃分函數(shù)分割成R份。這些緩存對(duì)在磁盤上的位置會(huì)被回傳給master,master再負(fù)責(zé)將這些位置轉(zhuǎn)發(fā)給Reduce worker。

(5)、當(dāng)Reduce worker從master那里接收到這些位置信息時(shí),它會(huì)使用遠(yuǎn)程過(guò)程調(diào)用從Map worker的本地磁盤中獲取緩存的數(shù)據(jù)。當(dāng)Reduce worker讀入全部的中間數(shù)據(jù)之后,它會(huì)根據(jù)中間鍵對(duì)它們進(jìn)行排序,這樣所有具有相同鍵的鍵值對(duì)就都聚集在一起了。排序是必須的,因?yàn)闀?huì)有許多不同的鍵被映射到同一個(gè)reduce task中。如果中間數(shù)據(jù)的數(shù)量太大,以至于不能夠裝入內(nèi)存的話,還需要另外的排序。

(6)、Reduce worker遍歷已經(jīng)排完序的中間數(shù)據(jù)。每當(dāng)遇到一個(gè)新的中間鍵,它會(huì)將key和相應(yīng)的中間值傳遞給用戶定義的Reduce函數(shù)。Reduce函數(shù)的輸出會(huì)被添加到這個(gè)Reduce部分的輸出文件中。

(7)、當(dāng)所有的Map tasks和Reduce tasks都已經(jīng)完成的時(shí)候,master將喚醒用戶程序。到此為止,用戶代碼中的MapReduce調(diào)用返回。

當(dāng)成功執(zhí)行完之后,MapReduce的執(zhí)行結(jié)果被存放在R個(gè)輸出文件中(每個(gè)Reduce task對(duì)應(yīng)一個(gè),文件名由用戶指定)。通常用戶并不需要將R個(gè)輸出文件歸并成一個(gè)。因?yàn)樗鼈兺ǔ⑦@些文件作為另一個(gè)MapReduce調(diào)用的輸入,或者將它們用于另外一個(gè)能夠以多個(gè)文件作為輸入的分布式應(yīng)用。

3.2 Master Data Structures

在master中保存了許多的數(shù)據(jù)結(jié)構(gòu)。對(duì)于每個(gè)Map task和Reduce task,master都保存了它們的狀態(tài)(idle,in-progress或者是completed)以及worker所在機(jī)器的標(biāo)識(shí)(對(duì)于非idle狀態(tài)的tasks而言)。

master相當(dāng)于是一個(gè)管道,通過(guò)它Map task所產(chǎn)生的中間文件被傳遞給了Reduce task。因此,對(duì)于每一個(gè)已經(jīng)完成的Map task,master會(huì)存儲(chǔ)由它產(chǎn)生的R個(gè)中間文件的位置和大小。當(dāng)Map task完成的時(shí)候,master就會(huì)收到位置和大小的更新信息。而這些信息接下來(lái)就會(huì)逐漸被推送到處于in-progress狀態(tài)的Reduce task中。

3.3 Fault Tolerance

因?yàn)镸apReduce庫(kù)的設(shè)計(jì)初衷是用成千上萬(wàn)的機(jī)器去處理大量的數(shù)據(jù),所以它就必須能用優(yōu)雅的方式對(duì)機(jī)器故障進(jìn)行處理。

Worker Failure

master會(huì)周期性地ping每一個(gè)worker。如果經(jīng)過(guò)了一個(gè)特定的時(shí)間還未從某一個(gè)worker上獲得響應(yīng),那么master會(huì)將worker標(biāo)記為failed。所有由該worker完成的Map task都被回退為idle狀態(tài),因此能夠被重新調(diào)度到其他的worker上。同樣的,所有failed worker正在執(zhí)行的Map task或者Reduce task也會(huì)被回退為idle狀態(tài),并且被重新調(diào)度。

發(fā)生故障的機(jī)器上已經(jīng)完成的Map task需要重新執(zhí)行的原因是,它們的輸入是保存在本地磁盤的,因此發(fā)生故障之后就不能獲取了。而已經(jīng)完成的Reduce task并不需要被重新執(zhí)行,因?yàn)樗鼈兊妮敵鍪谴娣旁谌值奈募到y(tǒng)中的。

當(dāng)一個(gè)Map task開(kāi)始由worker A執(zhí)行,后來(lái)又由worker B執(zhí)行(因?yàn)锳故障了)。所有執(zhí)行Reduce task的worker都會(huì)收到這個(gè)重新執(zhí)行的通知。那些還未從worker A中讀取數(shù)據(jù)的Reduce task將會(huì)從worker B中讀取數(shù)據(jù)。

MapReduce對(duì)于大面積的機(jī)器故障是非常具有彈性的。例如,在一次MapReduce操作中,網(wǎng)絡(luò)維護(hù)造成了集群中八十臺(tái)機(jī)器在幾分鐘的時(shí)間內(nèi)處于不可達(dá)的狀態(tài)。MapReduce的master只是簡(jiǎn)單地將不可達(dá)的worker機(jī)器上的工作重新執(zhí)行了一遍,接著再繼續(xù)往下執(zhí)行,最終完成了MapReduce的操作。

Master Failure

對(duì)于master,我們可以簡(jiǎn)單地對(duì)上文所述的master數(shù)據(jù)結(jié)構(gòu)做周期性的快照。如果一個(gè)master task死了,我們可以很快地根據(jù)最新的快照來(lái)重新啟動(dòng)一個(gè)master task。但是,因?yàn)槲覀冎挥幸粋€(gè)master,因此故障的概率比較低。所以,在我們的實(shí)現(xiàn)中如果master出現(xiàn)了故障就只是簡(jiǎn)單地停止MapReduce操作。用戶可以檢測(cè)到這種情況,并且如果他們需要的話可以重新開(kāi)始一次MapReduce操作。

Semantics in the Presence of Failures

如果用戶提供的Map和Reduce操作是關(guān)于輸入值的確定性函數(shù),那么我們分布式的實(shí)現(xiàn)將會(huì)產(chǎn)生同樣的輸出,在整個(gè)程序經(jīng)過(guò)沒(méi)有出現(xiàn)故障的順序執(zhí)行之后。

我們依賴Map task和Reduce task原子性地提交輸出來(lái)實(shí)現(xiàn)上述特性。每一個(gè)正在執(zhí)行的task都會(huì)將它的輸出寫到一個(gè)私有的臨時(shí)文件中。一個(gè)Reduce task產(chǎn)生一個(gè)這樣的文件,而一個(gè)Map task產(chǎn)生R個(gè)這樣的文件(每個(gè)Reduce work一個(gè))。當(dāng)一個(gè)Map task完成的時(shí)候,worker就會(huì)給master發(fā)送一個(gè)信息,,其中包含了R個(gè)臨時(shí)文件的名字。如果master收到了一個(gè)來(lái)自于已經(jīng)完成了的Map task的完成信息,那么它就將它自動(dòng)忽略。否則,將R個(gè)文件的名稱記錄到一個(gè)master數(shù)據(jù)結(jié)構(gòu)中。

當(dāng)一個(gè)Reduce task完成的時(shí)候,Reduce worker會(huì)自動(dòng)將臨時(shí)輸出文件命名為最終輸出文件。如果同一個(gè)Reduce task在多臺(tái)機(jī)器上運(yùn)行,那么多個(gè)重命名操作產(chǎn)生的最終輸出文件名將會(huì)產(chǎn)生沖突。對(duì)此,我們依賴底層文件系統(tǒng)提供的原子重命名操作來(lái)保證最終文件系統(tǒng)中的數(shù)據(jù)來(lái)自一個(gè)Reduce task。

大多數(shù)的Map和Reduce操作都是確定性的,事實(shí)上,我們的語(yǔ)義等同于順序執(zhí)行。因此這讓程序員非常容易地能夠解釋他們程序的行為。當(dāng)Map和Reduce操作是非確定性的時(shí)候,我們提供較弱,但仍然合理的語(yǔ)義。在非確定性的操作中,對(duì)于一個(gè)特定的Reduce task R1的輸出是和非確定性程序順序執(zhí)行產(chǎn)生R1產(chǎn)生的輸出是相同的。然而,對(duì)于另一個(gè)Reduce task R2,它的輸出對(duì)應(yīng)于非確定性程序另一個(gè)順序執(zhí)行的結(jié)果。

下面考慮Map task M和Reduce task R1和R2。讓e(Ri)表示Ri的執(zhí)行結(jié)果。更弱的語(yǔ)義意味著,e(R1)可能從M的一次執(zhí)行結(jié)果中讀取輸入,而e(R2)可能從M的另一次執(zhí)行中讀取輸入。

3.4 Locality

網(wǎng)絡(luò)帶寬在我們的計(jì)算環(huán)境中是相對(duì)稀缺的資源。我們通過(guò)將輸入數(shù)據(jù)存儲(chǔ)在集群中每臺(tái)機(jī)器的本地磁盤的方法來(lái)節(jié)省帶寬。GFS將輸入文件切分成64MB大小的塊,并且將每個(gè)塊的多份拷貝(通常為3份)存儲(chǔ)在不同的機(jī)器上。MapReduce的master獲取所有輸入文件的位置信息,然后將Map task調(diào)度到有相應(yīng)輸入文件副本的機(jī)器上。當(dāng)發(fā)生故障時(shí),再將Map task調(diào)度到鄰近的具有該task輸入文件副本的機(jī)器(即在同一臺(tái)交換機(jī)內(nèi)具有相同數(shù)據(jù)的機(jī)器)。當(dāng)在一個(gè)集群的大量機(jī)器上做MapReduce操作時(shí),大多數(shù)的輸入數(shù)據(jù)都是從本地讀取的,而不用消耗帶寬。

3.5 Task Granularity

如上所述,我們將Map操作分成M份,Reduce操作分成R份。在理想的情況下,M和R的值應(yīng)該要比集群中worker machine的數(shù)量多得多。讓一個(gè)worker同時(shí)進(jìn)行許多不同的task有利于提高動(dòng)態(tài)的負(fù)載均衡,同時(shí)在一個(gè)worker故障的時(shí)候能盡快恢復(fù)。許多已經(jīng)完成的Map task也能盡快地傳播到其他所有的worker machine上。

在我們的實(shí)現(xiàn)中,M和R的大小是有一個(gè)實(shí)用范圍的。因?yàn)槲覀兊膍aster需要做O(M+R)個(gè)調(diào)度決定,并且還要在內(nèi)存中保存O(MR)個(gè)狀態(tài)。(但是內(nèi)存使用的常數(shù)還是比較小的,O(MR)個(gè)Map task/Reduce task 狀態(tài)對(duì),每個(gè)的大小大概在一個(gè)字節(jié))

另外,R通常受限于用戶,因?yàn)槊總€(gè)Reduce task的輸出都分散在不同的輸出文件中。事實(shí)上,我們會(huì)選擇M,因此每個(gè)輸入文件大概16MB到64MB的輸入文件(因此上文所述的局部性優(yōu)化會(huì)達(dá)到最優(yōu))。而我們會(huì)讓R成為worker machine數(shù)量的一個(gè)較小的倍數(shù)。因此,我們通常在進(jìn)行MapReduce操作時(shí),將M設(shè)為200000,R設(shè)為5000,使用2000個(gè)worker machine。

3.6 Backup Tasks

“straggler”(落伍的士兵)的存在是拖慢整個(gè)MapReduce操作的通常的原因之一。所謂的"straggler"是指一臺(tái)機(jī)器用了過(guò)長(zhǎng)的時(shí)間去完成整個(gè)計(jì)算任務(wù)中最后幾個(gè)Map或者Reduce task。Straggler出現(xiàn)的原因有很多。比如一臺(tái)機(jī)器上硬盤壞了,它就會(huì)經(jīng)歷大量的可糾正錯(cuò)誤,從而讓它的性能從30MB/s下降到1MB/s。集群的調(diào)度系統(tǒng)可能將其他task調(diào)度到該機(jī)器上,導(dǎo)致它執(zhí)行MapReduce代碼的速度變慢很多,因?yàn)镃PU,內(nèi)存,本地磁盤,網(wǎng)絡(luò)帶寬的競(jìng)爭(zhēng)加劇。我們最近遇到的一個(gè)問(wèn)題是一臺(tái)機(jī)器的初始化代碼有點(diǎn)問(wèn)題,它會(huì)導(dǎo)致處理器的緩存被禁用,在這些受影響的機(jī)器上進(jìn)行的計(jì)算速度會(huì)下降到原來(lái)的百分之一。

對(duì)此,我們有一個(gè)通用的機(jī)制用來(lái)緩解straggler的問(wèn)題。當(dāng)MapReduce操作接近結(jié)束的時(shí)候,master會(huì)將那些還在執(zhí)行的task的備份進(jìn)行調(diào)度執(zhí)行。無(wú)論是原來(lái)的還是備份執(zhí)行完成,該task都被標(biāo)記為已完成。我們通過(guò)調(diào)整將該操作導(dǎo)致的計(jì)算資源消耗僅僅提高了幾個(gè)百分點(diǎn)。但是在完成大型的MapReduce操作時(shí),卻讓整個(gè)執(zhí)行時(shí)間下降了好多。例如,Section 5.3中所描述的排序算法在備份機(jī)制關(guān)閉的情況下,需要多消耗44%的時(shí)間。

4 Refinement

雖然對(duì)于大多數(shù)需求由Map和Reduce函數(shù)提供的功能已經(jīng)足夠了,但是我們還是發(fā)現(xiàn)了一些有用的擴(kuò)展。對(duì)它們的描述如下。

4.1 Partitioning Function

MapReduce用戶決定他們的Reduce task或者輸出文件的數(shù)目R。通過(guò)一個(gè)劃分函數(shù),根據(jù)中間鍵值將各個(gè)task的數(shù)據(jù)進(jìn)行劃分。默認(rèn)的劃分函數(shù)是通過(guò)哈希(比如,hash(key) mod R)。這通常會(huì)產(chǎn)生非常好的較為均衡的劃分。但是在其他一些情況下,通過(guò)鍵值的其他函數(shù)來(lái)劃分要更好一些。例如,有的時(shí)候輸出鍵值是一些URL,我們希望同一個(gè)host的內(nèi)容能放在同一個(gè)輸出文件中。為了支持這種情況,MapReduce庫(kù)的用戶可以提供一個(gè)特殊的劃分函數(shù)。例如,使用“hash(Hostname(urlKey)) mod R”作為劃分函數(shù),從而讓所有來(lái)自于同一個(gè)host的URL的內(nèi)容都輸出到同一個(gè)輸出文件。

4.2 Ordering Guarantees

我們確保在一個(gè)給定的劃分中,中間的鍵值對(duì)都按照鍵值的升序進(jìn)行處理。這樣的處理順序確保了每一個(gè)劃分產(chǎn)生一個(gè)排好序的輸出文件。這樣的話,如果輸出文件格式需要支持根據(jù)key進(jìn)行有效的隨機(jī)查找會(huì)比較方便。同時(shí),輸出的用戶也會(huì)覺(jué)得已經(jīng)排好序的數(shù)據(jù)使用起來(lái)特別方便。

4.3 Combiner Function

在有些情況下,每個(gè)Map task都會(huì)產(chǎn)生大量的中間鍵的重復(fù)而用戶指定的Reduce函數(shù)是交互和關(guān)聯(lián)的。Section 2.1中的單詞統(tǒng)計(jì)就是一個(gè)很好的例子。因?yàn)閱卧~的出現(xiàn)頻率服從于Zipf分布,每個(gè)Map Task都會(huì)產(chǎn)生成百上千個(gè)<the, 1>這樣的記錄。所有這些記錄都會(huì)通過(guò)網(wǎng)絡(luò)被送到一個(gè)Reduce task中,并且由Reduce函數(shù)加在一起去產(chǎn)生一個(gè)數(shù)。我們?cè)试S用戶使用了可選的Cominer函數(shù),用于在網(wǎng)絡(luò)傳輸之前部分地進(jìn)行歸并操作。

Combiner函數(shù)在每個(gè)執(zhí)行Map task的機(jī)器上執(zhí)行。通常Combiner和Reduce函數(shù)使用的是相同的代碼。Reduce函數(shù)和Combiner函數(shù)唯一的不同是MapReduce庫(kù)如何處理函數(shù)的輸出。Reduce函數(shù)的輸出寫到最終的輸出文件中。而Combiner函數(shù)的輸出會(huì)被寫到一個(gè)最終將被送給Reduce task的中間文件中。

部分的合并操作能極大地加速某類特定的MapReduce操作。Appendix A包含了一個(gè)使用Combiner的例子。

4.4 Input and Output Types

MapReduce庫(kù)提供了對(duì)讀入數(shù)據(jù)文件多種的格式支持。例如,"text"格式的輸入將每一行作為鍵值對(duì):key是文件內(nèi)的偏移,value是該行的內(nèi)容。另外一種比較常用的格式存儲(chǔ)一系列按照鍵進(jìn)行排序的鍵值對(duì)。每一個(gè)輸出格式的實(shí)現(xiàn)都知道如何將自己進(jìn)行合理的劃分從而能讓不同的Map task進(jìn)行處理(例如,text模式就知道將區(qū)域劃分到以行為邊界)。用戶可以通過(guò)簡(jiǎn)單地定義一個(gè)reader接口來(lái)提供一個(gè)新的輸入類型的實(shí)現(xiàn)。事實(shí)上,大多數(shù)用戶只使用了預(yù)定義輸入類型的很小一部分。

reader并不一定要從文件中讀取數(shù)據(jù)。例如,我們可以很容易地定義一個(gè)從數(shù)據(jù)庫(kù),或者內(nèi)存中映射的數(shù)據(jù)結(jié)構(gòu)中讀取記錄的reader。

同理,我們也支持產(chǎn)生不同格式的輸出數(shù)據(jù),用戶也能編寫新的輸出數(shù)據(jù)格式。

4.5 Side-effects

在有些情況下,MapReduce的用戶會(huì)很容易發(fā)現(xiàn)Map或者Reduce操作會(huì)產(chǎn)生一些輔助文件作為額外的輸出文件。我們依賴應(yīng)用的編寫者去保證這些副作用是原子和冪等的。一般來(lái)說(shuō),應(yīng)用會(huì)寫到一個(gè)臨時(shí)文件中,并且在它完全產(chǎn)生之后,通過(guò)一個(gè)原子操作將它重命名。

對(duì)于一個(gè)單一的task產(chǎn)生的多個(gè)輸出文件,我們不提供原子性的兩相提交支持。因此,產(chǎn)生多個(gè)輸出文件并且有跨文件一致性要求的task需要是確定性的。但是這樣的限制在實(shí)踐過(guò)程中并不是什么問(wèn)題。

4.5 Skipping Bad Records

有時(shí)候,如果用戶的代碼中有bug的話,會(huì)導(dǎo)致Map或者Reduce操作在某些記錄上崩潰。這些bug會(huì)導(dǎo)致MapReduce操作的正常完成。對(duì)于這種情況,通常就是去修bug。不過(guò)有時(shí)候這是不可行的,也許bug是第三方庫(kù)造成的,而我們并不能得到它的源代碼。而且,有時(shí)候我們?cè)试S忽略掉一些記錄,例如在對(duì)一個(gè)大數(shù)據(jù)集做分析的時(shí)候。因此我們提供了一種可選的執(zhí)行模式,當(dāng)MapReduce庫(kù)檢測(cè)到一些記錄會(huì)造成崩潰時(shí),就會(huì)主動(dòng)跳過(guò)它們,從而保證正常地運(yùn)行。

每一個(gè)worker進(jìn)程都安裝了一個(gè)signal handler用于捕捉段錯(cuò)誤和bug。在調(diào)用用戶的Map和Reduce操作之前,MapReduce庫(kù)會(huì)將參數(shù)的序號(hào)保存在一個(gè)全局變量中。如果用戶代碼產(chǎn)生了一個(gè)信號(hào),signal handler就會(huì)傳輸一個(gè)參數(shù)含有序號(hào)的"last gasp"UDP包給MapReduce的master。當(dāng)master在一個(gè)特定的記錄中發(fā)現(xiàn)了不知一次的錯(cuò)誤,這表示在下一次執(zhí)行相應(yīng)的Map或者Reduce操作的時(shí)候一個(gè)將它跳過(guò)。

4.7 Local Execution

Map或者Reduce函數(shù)的調(diào)試問(wèn)題是非常tricky的。因?yàn)閷?shí)際的計(jì)算發(fā)生在分布式的系統(tǒng)中,通常由成百上千臺(tái)機(jī)器組成,并且工作的分配由master動(dòng)態(tài)執(zhí)行。為了幫助調(diào)試,分析,以及小規(guī)模的測(cè)試,我們開(kāi)發(fā)了另外一個(gè)MapReduce庫(kù)的實(shí)現(xiàn),它能夠在本地機(jī)器上順序執(zhí)行一個(gè)MapReduce操作的所有工作。它的控制交給用戶,因此計(jì)算可以被限定到制定的Map task中執(zhí)行。用戶利用指定的flag啟動(dòng)程序,然后就能非常簡(jiǎn)單地使用任何它們覺(jué)得有用的調(diào)試或者測(cè)試工具了。

4.8 Status Information

master運(yùn)行了一個(gè)內(nèi)置的HTTP server并且暴露了一系列供人類使用的狀態(tài)頁(yè)。狀態(tài)頁(yè)會(huì)顯示程序的計(jì)算過(guò)程,例如已經(jīng)完成了多少個(gè)task,還有多少個(gè)task正在執(zhí)行,輸入的字節(jié)數(shù),中間數(shù)據(jù)的字節(jié)數(shù),輸出的字節(jié)數(shù),以及處理速度等等。該頁(yè)還包含了指向各個(gè)task的標(biāo)準(zhǔn)錯(cuò)誤和標(biāo)準(zhǔn)輸出鏈接。用戶可以利用這些數(shù)據(jù)來(lái)判斷計(jì)算會(huì)持續(xù)多長(zhǎng)時(shí)間,以及計(jì)算是否需要添加更多的資源。這些頁(yè)面還能用來(lái)發(fā)現(xiàn)什么時(shí)候處理速度比預(yù)期地下降好多。

另外,頂層的狀態(tài)頁(yè)顯示了那些worker出錯(cuò)了,以及在它們出錯(cuò)時(shí)正在執(zhí)行哪些Map和Reduce task。這些信息在診斷用戶代碼出現(xiàn)的bug時(shí)是非常有用的。

4.9 Counter

MapReduce庫(kù)提供了一個(gè)叫counter的設(shè)施用于統(tǒng)計(jì)各種不同事件出現(xiàn)的次數(shù)。例如,用戶可能想要統(tǒng)計(jì)已經(jīng)處理過(guò)的單詞的數(shù)目或者德國(guó)文件的索引數(shù)量。

為了使用這一特性,用戶代碼創(chuàng)建一個(gè)命名的counter對(duì)象,并且在Map以及Reduce函數(shù)中對(duì)counter進(jìn)行增加。例如:

Counter* uppercase;uppercase = GetCounter("uppercase")map(String name, String contents):for each word w in contents:if(IsCapitalized(w)):uppercase->Increment();    EmitIntermediate(w, "1");

每個(gè)worker機(jī)器上counter的值會(huì)定期傳給master(捎帶在給master的ping回復(fù)中)。master將來(lái)自成功執(zhí)行的Map和Reduce task的counter值聚集起來(lái)。然后在MapReduce操作完成之后返回給用戶代碼。當(dāng)前的counter值也會(huì)顯示在master的狀態(tài)頁(yè)上,所以用戶能從實(shí)時(shí)觀看計(jì)算的進(jìn)行。在聚集counter的值的時(shí)候,master會(huì)消除Map或者Reduce task的重復(fù)執(zhí)行造成的重復(fù)計(jì)算。(重復(fù)執(zhí)行可能由backup tasks或者因?yàn)殄e(cuò)誤重新執(zhí)行的task引起)。

有些counter的值是由MapReduce庫(kù)自動(dòng)維護(hù)的,例如已經(jīng)處理的輸入鍵值對(duì)數(shù)目以及已經(jīng)產(chǎn)生的輸出鍵值對(duì)數(shù)目。

用戶發(fā)現(xiàn)counter特性對(duì)于檢查MapReduce操作的執(zhí)行是非常有用的。例如,在有些MapReduce操作中,用戶代碼想要確保產(chǎn)生的輸出對(duì)的數(shù)目和已經(jīng)處理的輸入對(duì)的數(shù)目是恰好相等的,或者處理的德語(yǔ)文件的數(shù)目占總處理文件數(shù)目的比重在一個(gè)可容忍的范圍內(nèi)。

5 Performance

在這個(gè)section中,我們通過(guò)運(yùn)行在一個(gè)集群上的兩個(gè)computation來(lái)測(cè)試MapReduce的性能。一個(gè)Computation搜索一個(gè)T的數(shù)據(jù),從中獲取一個(gè)特定的模式。另一個(gè)computation對(duì)一個(gè)T的數(shù)據(jù)進(jìn)行排序。

這兩個(gè)程序代表了由用戶實(shí)際編寫的MapReduce程序的一個(gè)子集------一類程序用于將數(shù)據(jù)從一種表示方法切換到另一種表示方法。另一類程序則從大數(shù)據(jù)集中抽取出一小部分有趣的數(shù)據(jù)。

5.1 Cluster Configuration

所有程序都運(yùn)行在一個(gè)由1800臺(tái)機(jī)器組成的機(jī)器上。每一臺(tái)機(jī)器都有兩個(gè)2GHz 的Intel Xeon處理器,并且Hyper-Threading打開(kāi), 4GB內(nèi)存,兩個(gè)160GB的IDE磁盤,以及一個(gè)G的以太網(wǎng)鏈路。這些機(jī)器被安排在一個(gè)兩層樹(shù)狀的交換網(wǎng)絡(luò)中,根節(jié)點(diǎn)的帶寬大概在100-200Gbps。因?yàn)樗袡C(jī)器都在同一個(gè)托管設(shè)備中,因此任意兩臺(tái)機(jī)器見(jiàn)的通信時(shí)間少于1ms。

其中4GB中的1-1.5G是為集群中運(yùn)行的其他任務(wù)預(yù)留的。程序在一個(gè)周末的下午運(yùn)行,此時(shí)CPU,磁盤,網(wǎng)絡(luò)基本都處于空閑狀態(tài)。

5.2 Grep

grep程序需要掃描10的十次方條100-byte的記錄,搜索一個(gè)相對(duì)罕見(jiàn)的三字符模式(出現(xiàn)了92337次)。輸入被分成大概64MB份(M = 15000),所有的輸出文件都存放在一個(gè)文件中(R = 1)。

Figure 2顯示了Computation隨著時(shí)間的變化過(guò)程。Y軸代表了輸入數(shù)據(jù)的掃描速度。隨著機(jī)器逐漸加入MapReduce的計(jì)算當(dāng)中,速度越來(lái)越快,當(dāng)有1764個(gè)worker加入時(shí),達(dá)到峰值30GB/s。隨著Map task的結(jié)束,速度開(kāi)始下降并且在80s的時(shí)候到達(dá)0,。整個(gè)Computation從開(kāi)始到結(jié)束總共花費(fèi)了大概150s。這其中還包括了1分鐘的啟動(dòng)開(kāi)銷。開(kāi)銷主要來(lái)源于將程序分發(fā)到worker machine中,和GFS交互并打開(kāi)1000個(gè)輸入文件,以及獲取局部性優(yōu)化所需的信息的延時(shí)。

5.3 Sort

排序程序用于對(duì)10的十次方條記錄(大概1T的數(shù)據(jù))進(jìn)行排序。程序以TeraSort benchmark為模型。

總結(jié)

以上是生活随笔為你收集整理的map/reduce的概念的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。