欧美性猛交XXXX免费看蜜桃,成人网18免费韩国,亚洲国产成人精品区综合,欧美日韩一区二区三区高清不卡,亚洲综合一区二区精品久久

打開(kāi)APP
userphoto
未登錄

開(kāi)通VIP,暢享免費電子書(shū)等14項超值服

開(kāi)通VIP
Google MapReduce中文版

Google MapReduce中文版

    譯者: alex
 

摘要

MapReduce是一個(gè)編程模型,也是一個(gè)處理和生成超大數據集的算法模型的相關(guān)實(shí)現。用戶(hù)首先創(chuàng )建一個(gè)Map函數處理一個(gè)基于key/value pair的數據集合,輸出中間的基于key/valuepair的數據集合;然后再創(chuàng )建一個(gè)Reduce函數用來(lái)合并所有的具有相同中間key值的中間value值?,F實(shí)世界中有很多滿(mǎn)足上述處理模型的例子,本論文將詳細描述這個(gè)模型。
 
MapReduce架構的程序能夠在大量的普通配置的計算機上實(shí)現并行化處理。這個(gè)系統在運行時(shí)只關(guān)心:如何分割輸入數據,在大量計算機組成的集群上的調度,集群中計算機的錯誤處理,管理集群中計算機之間必要的通信。采用MapReduce架構可以使那些沒(méi)有并行計算和分布式處理系統開(kāi)發(fā)經(jīng)驗的程序員有效利用分布式系統的豐富資源。
 
我們的MapReduce實(shí)現運行在規??梢造`活調整的由普通機器組成的集群上:一個(gè)典型的MapReduce計算往往由幾千臺機器組成、處理以TB計算的數據。程序員發(fā)現這個(gè)系統非常好用:已經(jīng)實(shí)現了數以百計的MapReduce程序,在Google的集群上,每天都有1000多個(gè)MapReduce程序在執行。

1、介紹

在過(guò)去的5年里,包括本文作者在內的Google的很多程序員,為了處理海量的原始數據,已經(jīng)實(shí)現了數以百計的、專(zhuān)用的計算方法。這些計算方法用來(lái)處理大量的原始數據,比如,文檔抓?。?lèi)似網(wǎng)絡(luò )爬蟲(chóng)的程序)、Web請求日志等等;也為了計算處理各種類(lèi)型的衍生數據,比如倒排索引、Web文檔的圖結構的各種表示形勢、每臺主機上網(wǎng)絡(luò )爬蟲(chóng)抓取的頁(yè)面數量的匯總、每天被請求的最多的查詢(xún)的集合等等。大多數這樣的數據處理運算在概念上很容易理解。然而由于輸入的數據量巨大,因此要想在可接受的時(shí)間內完成運算,只有將這些計算分布在成百上千的主機上。如何處理并行計算、如何分發(fā)數據、如何處理錯誤?所有這些問(wèn)題綜合在一起,需要大量的代碼處理,因此也使得原本簡(jiǎn)單的運算變得難以處理。
 
為了解決上述復雜的問(wèn)題,我們設計一個(gè)新的抽象模型,使用這個(gè)抽象模型,我們只要表述我們想要執行的簡(jiǎn)單運算即可,而不必關(guān)心并行計算、容錯、數據分布、負載均衡等復雜的細節,這些問(wèn)題都被封裝在了一個(gè)庫里面。設計這個(gè)抽象模型的靈感來(lái)自L(fǎng)isp和許多其他函數式語(yǔ)言的Map和Reduce的原語(yǔ)。我們意識到我們大多數的運算都包含這樣的操作:在輸入數據的“邏輯”記錄上應用Map操作得出一個(gè)中間key/valuepair集合,然后在所有具有相同key值的value值上應用Reduce操作,從而達到合并中間的數據,得到一個(gè)想要的結果的目的。使用MapReduce模型,再結合用戶(hù)實(shí)現的Map和Reduce函數,我們就可以非常容易的實(shí)現大規模并行化計算;通過(guò)MapReduce模型自帶的“再次執行”(re-execution)功能,也提供了初級的容災實(shí)現方案。
 
這個(gè)工作(實(shí)現一個(gè)MapReduce框架模型)的主要貢獻是通過(guò)簡(jiǎn)單的接口來(lái)實(shí)現自動(dòng)的并行化和大規模的分布式計算,通過(guò)使用MapReduce模型接口實(shí)現在大量普通的PC機上高性能計算。
 
第二部分描述基本的編程模型和一些使用案例。第三部分描述了一個(gè)經(jīng)過(guò)裁剪的、適合我們的基于集群的計算環(huán)境的MapReduce實(shí)現。第四部分描述我們認為在MapReduce編程模型中一些實(shí)用的技巧。第五部分對于各種不同的任務(wù),測量我們MapReduce實(shí)現的性能。第六部分揭示了在Google內部如何使用MapReduce作為基礎重寫(xiě)我們的索引系統產(chǎn)品,包括其它一些使用MapReduce的經(jīng)驗。第七部分討論相關(guān)的和未來(lái)的工作。

2、編程模型

MapReduce編程模型的原理是:利用一個(gè)輸入key/value pair集合來(lái)產(chǎn)生一個(gè)輸出的key/value pair集合。MapReduce庫的用戶(hù)用兩個(gè)函數表達這個(gè)計算:Map和Reduce。
 
用戶(hù)自定義的Map函數接受一個(gè)輸入的key/value pair值,然后產(chǎn)生一個(gè)中間key/value pair值的集合。MapReduce庫把所有具有相同中間key值I的中間value值集合在一起后傳遞給reduce函數。
 
用戶(hù)自定義的Reduce函數接受一個(gè)中間key的值I和相關(guān)的一個(gè)value值的集合。Reduce函數合并這些value值,形成一個(gè)較小的value值的集合。一般的,每次Reduce函數調用只產(chǎn)生0或1個(gè)輸出value值。通常我們通過(guò)一個(gè)迭代器把中間value值提供給Reduce函數,這樣我們就可以處理無(wú)法全部放入內存中的大量的value值的集合。

2.1、例子

例如,計算一個(gè)大的文檔集合中每個(gè)單詞出現的次數,下面是偽代碼段:
map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, “1″);
reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));
 
Map函數輸出文檔中的每個(gè)詞、以及這個(gè)詞的出現次數(在這個(gè)簡(jiǎn)單的例子里就是1)。Reduce函數把Map函數產(chǎn)生的每一個(gè)特定的詞的計數累加起來(lái)。
 
另外,用戶(hù)編寫(xiě)代碼,使用輸入和輸出文件的名字、可選的調節參數來(lái)完成一個(gè)符合MapReduce模型規范的對象,然后調用MapReduce函數,并把這個(gè)規范對象傳遞給它。用戶(hù)的代碼和MapReduce庫鏈接在一起(用C++實(shí)現)。附錄A包含了這個(gè)實(shí)例的全部程序代碼。

2.2、類(lèi)型

盡管在前面例子的偽代碼中使用了以字符串表示的輸入輸出值,但是在概念上,用戶(hù)定義的Map和Reduce函數都有相關(guān)聯(lián)的類(lèi)型:
map(k1,v1) ->list(k2,v2)
  reduce(k2,list(v2)) ->list(v2)

比如,輸入的key和value值與輸出的key和value值在類(lèi)型上推導的域不同。此外,中間key和value值與輸出key和value值在類(lèi)型上推導的域相同。

(alex注:原文中這個(gè)domain的含義不是很清楚,我參考Hadoop、KFS等實(shí)現,map和reduce都使用了泛型,因此,我把domain翻譯成類(lèi)型推導的域)。
我們的C++中使用字符串類(lèi)型作為用戶(hù)自定義函數的輸入輸出,用戶(hù)在自己的代碼中對字符串進(jìn)行適當的類(lèi)型轉換。

2.3、更多的例子

這里還有一些有趣的簡(jiǎn)單例子,可以很容易的使用MapReduce模型來(lái)表示:
  • 分布式的Grep:Map函數輸出匹配某個(gè)模式的一行,Reduce函數是一個(gè)恒等函數,即把中間數據復制到輸出。
  • 計算URL訪(fǎng)問(wèn)頻率:Map函數處理日志中web頁(yè)面請求的記錄,然后輸出(URL,1)。Reduce函數把相同URL的value值都累加起來(lái),產(chǎn)生(URL,記錄總數)結果。
  • 倒轉網(wǎng)絡(luò )鏈接圖:Map函數在源頁(yè)面(source)中搜索所有的鏈接目標(target)并輸出為(target,source)。Reduce函數把給定鏈接目標(target)的鏈接組合成一個(gè)列表,輸出(target,list(source))。
  • 每個(gè)主機的檢索詞向量:檢索詞向量用一個(gè)(詞,頻率)列表來(lái)概述出現在文檔或文檔集中的最重要的一些詞。Map函數為每一個(gè)輸入文檔輸出(主機 名,檢索詞向量),其中主機名來(lái)自文檔的URL。Reduce函數接收給定主機的所有文檔的檢索詞向量,并把這些檢索詞向量加在一起,丟棄掉低頻的檢索 詞,輸出一個(gè)最終的(主機名,檢索詞向量)。
  • 倒排索引:Map函數分析每個(gè)文檔輸出一個(gè)(詞,文檔號)的列表,Reduce函數的輸入是一個(gè)給定詞的所有(詞,文檔號),排序所有的文檔號,輸出(詞,list(文檔號))。所有的輸出集合形成一個(gè)簡(jiǎn)單的倒排索引,它以一種簡(jiǎn)單的算法跟蹤詞在文檔中的位置。
  • 分布式排序:Map函數從每個(gè)記錄提取key,輸出(key,record)。Reduce函數不改變任何的值。這個(gè)運算依賴(lài)分區機制(在4.1描述)和排序屬性(在4.2描述)。

3、實(shí)現

MapReduce模型可以有多種不同的實(shí)現方式。如何正確選擇取決于具體的環(huán)境。例如,一種實(shí)現方式適用于小型的共享內存方式的機器,另外一種實(shí)現方式則適用于大型NUMA架構的多處理器的主機,而有的實(shí)現方式更適合大型的網(wǎng)絡(luò )連接集群。
本章節描述一個(gè)適用于Google內部廣泛使用的運算環(huán)境的實(shí)現:用以太網(wǎng)交換機連接、由普通PC機組成的大型集群。在我們的環(huán)境里包括:
1.x86架構、運行Linux操作系統、雙處理器、2-4GB內存的機器。
2.普通的網(wǎng)絡(luò )硬件設備,每個(gè)機器的帶寬為百兆或者千兆,但是遠小于網(wǎng)絡(luò )的平均帶寬的一半。 (alex注:這里需要網(wǎng)絡(luò )專(zhuān)家解釋一下了)
3.集群中包含成百上千的機器,因此,機器故障是常態(tài)。
4.存儲為廉價(jià)的內置IDE硬盤(pán)。一個(gè)內部分布式文件系統用來(lái)管理存儲在這些磁盤(pán)上的數據。文件系統通過(guò)數據復制來(lái)在不可靠的硬件上保證數據的可靠性和有效性。
5.用戶(hù)提交工作(job)給調度系統。每個(gè)工作(job)都包含一系列的任務(wù)(task),調度系統將這些任務(wù)調度到集群中多臺可用的機器上。

3.1、執行概括

通過(guò)將Map調用的輸入數據自動(dòng)分割為M個(gè)數據片段的集合,Map調用被分布到多臺機器上執行。輸入的數據片段能夠在不同的機器上并行處理。使用分區函數將Map調用產(chǎn)生的中間key值分成R個(gè)不同分區(例如,hash(key) modR),Reduce調用也被分布到多臺機器上執行。分區數量(R)和分區函數由用戶(hù)來(lái)指定。
圖1展示了我們的MapReduce實(shí)現中操作的全部流程。當用戶(hù)調用MapReduce函數時(shí),將發(fā)生下面的一系列動(dòng)作(下面的序號和圖1中的序號一一對應):
1.用戶(hù)程序首先調用的MapReduce庫將輸入文件分成M個(gè)數據片度,每個(gè)數據片段的大小一般從 16MB到64MB(可以通過(guò)可選的參數來(lái)控制每個(gè)數據片段的大小)。然后用戶(hù)程序在機群中創(chuàng )建大量的程序副本。 (alex:copies of the program還真難翻譯)
2.這些程序副本中的有一個(gè)特殊的程序–master。副本中其它的程序都是worker程序,由master分配任務(wù)。有M個(gè)Map任務(wù)和R個(gè)Reduce任務(wù)將被分配,master將一個(gè)Map任務(wù)或Reduce任務(wù)分配給一個(gè)空閑的worker。
3.被分配了map任務(wù)的worker程序讀取相關(guān)的輸入數據片段,從輸入的數據片段中解析出key/value pair,然后把key/value pair傳遞給用戶(hù)自定義的Map函數,由Map函數生成并輸出的中間key/value pair,并緩存在內存中。
4.緩存中的key/value pair通過(guò)分區函數分成R個(gè)區域,之后周期性的寫(xiě)入到本地磁盤(pán)上。緩存的key/value pair在本地磁盤(pán)上的存儲位置將被回傳給master,由master負責把這些存儲位置再傳送給Reduce worker。
5.當Reduce worker程序接收到master程序發(fā)來(lái)的數據存儲位置信息后,使用RPC從Mapworker所在主機的磁盤(pán)上讀取這些緩存數據。當Reduceworker讀取了所有的中間數據后,通過(guò)對key進(jìn)行排序后使得具有相同key值的數據聚合在一起。由于許多不同的key值會(huì )映射到相同的Reduce任務(wù)上,因此必須進(jìn)行排序。如果中間數據太大無(wú)法在內存中完成排序,那么就要在外部進(jìn)行排序。
6.Reduce worker程序遍歷排序后的中間數據,對于每一個(gè)唯一的中間key值,Reduce worker程序將這個(gè)key值和它相關(guān)的中間value值的集合傳遞給用戶(hù)自定義的Reduce函數。Reduce函數的輸出被追加到所屬分區的輸出文件。
7.當所有的Map和Reduce任務(wù)都完成之后,master喚醒用戶(hù)程序。在這個(gè)時(shí)候,在用戶(hù)程序里的對MapReduce調用才返回。
 

在成功完成任務(wù)之后,MapReduce的輸出存放在R個(gè)輸出文件中(對應每個(gè)Reduce任務(wù)產(chǎn)生一個(gè)輸出文件,文件名由用戶(hù)指定)。一般情況下,用戶(hù)不需要將這R個(gè)輸出文件合并成一個(gè)文件–他們經(jīng)常把這些文件作為另外一個(gè)MapReduce的輸入,或者在另外一個(gè)可以處理多個(gè)分割文件的分布式應用中使用。

3.2、Master數據結構

Master持有一些數據結構,它存儲每一個(gè)Map和Reduce任務(wù)的狀態(tài)(空閑、工作中或完成),以及Worker機器(非空閑任務(wù)的機器)的標識。
 
Master就像一個(gè)數據管道,中間文件存儲區域的位置信息通過(guò)這個(gè)管道從Map傳遞到Reduce。因此,對于每個(gè)已經(jīng)完成的Map任務(wù),master存儲了Map任務(wù)產(chǎn)生的R個(gè)中間文件存儲區域的大小和位置。當Map任務(wù)完成時(shí),Master接收到位置和大小的更新信息,這些信息被逐步遞增的推送給那些正在工作的Reduce任務(wù)。

3.3、容錯

因為MapReduce庫的設計初衷是使用由成百上千的機器組成的集群來(lái)處理超大規模的數據,所以,這個(gè)庫必須要能很好的處理機器故障。
worker故障
master周期性的ping每個(gè)worker。如果在一個(gè)約定的時(shí)間范圍內沒(méi)有收到worker返回的信息,master將把這個(gè)worker標記為失效。所有由這個(gè)失效的worker完成的Map任務(wù)被重設為初始的空閑狀態(tài),之后這些任務(wù)就可以被安排給其他的worker。同樣的,worker失效時(shí)正在運行的Map或Reduce任務(wù)也將被重新置為空閑狀態(tài),等待重新調度。
 

當worker故障時(shí),由于已經(jīng)完成的Map任務(wù)的輸出存儲在這臺機器上,Map任務(wù)的輸出已不可訪(fǎng)問(wèn)了,因此必須重新執行。而已經(jīng)完成的Reduce任務(wù)的輸出存儲在全局文件系統上,因此不需要再次執行。

 
當一個(gè)Map任務(wù)首先被worker A執行,之后由于worker A失效了又被調度到workerB執行,這個(gè)“重新執行”的動(dòng)作會(huì )被通知給所有執行Reduce任務(wù)的worker。任何還沒(méi)有從workerA讀取數據的Reduce任務(wù)將從worker B讀取數據。
 
MapReduce可以處理大規模worker失效的情況。比如,在一個(gè)MapReduce操作執行期間,在正在運行的集群上進(jìn)行網(wǎng)絡(luò )維護引起80臺機器在幾分鐘內不可訪(fǎng)問(wèn)了,MapReducemaster只需要簡(jiǎn)單的再次執行那些不可訪(fǎng)問(wèn)的worker完成的工作,之后繼續執行未完成的任務(wù),直到最終完成這個(gè)MapReduce操作。
 
master失敗
一個(gè)簡(jiǎn)單的解決辦法是讓master周期性的將上面描述的數據結構(alex注:指3.2節)的寫(xiě)入磁盤(pán),即檢查點(diǎn)(checkpoint)。如果這個(gè)master任務(wù)失效了,可以從最后一個(gè)檢查點(diǎn)(checkpoint)開(kāi)始啟動(dòng)另一個(gè)master進(jìn)程。然而,由于只有一個(gè)master進(jìn)程,master失效后再恢復是比較麻煩的,因此我們現在的實(shí)現是如果master失效,就中止MapReduce運算??蛻?hù)可以檢查到這個(gè)狀態(tài),并且可以根據需要重新執行MapReduce操作。
 
在失效方面的處理機制
(alex注:原文為”semantics in the presence of failures”)
當用戶(hù)提供的Map和Reduce操作是輸入確定性函數(即相同的輸入產(chǎn)生相同的輸出)時(shí),我們的分布式實(shí)現在任何情況下的輸出都和所有程序沒(méi)有出現任何錯誤、順序的執行產(chǎn)生的輸出是一樣的。
 
我們依賴(lài)對Map和Reduce任務(wù)的輸出是原子提交的來(lái)完成這個(gè)特性。每個(gè)工作中的任務(wù)把它的輸出寫(xiě)到私有的臨時(shí)文件中。每個(gè)Reduce任務(wù)生成一個(gè)這樣的文件,而每個(gè)Map任務(wù)則生成R個(gè)這樣的文件(一個(gè)Reduce任務(wù)對應一個(gè)文件)。當一個(gè)Map任務(wù)完成的時(shí),worker發(fā)送一個(gè)包含R個(gè)臨時(shí)文件名的完成消息給master。如果master從一個(gè)已經(jīng)完成的Map任務(wù)再次接收到到一個(gè)完成消息,master將忽略這個(gè)消息;否則,master將這R個(gè)文件的名字記錄在數據結構里。
 
當Reduce任務(wù)完成時(shí),Reduceworker進(jìn)程以原子的方式把臨時(shí)文件重命名為最終的輸出文件。如果同一個(gè)Reduce任務(wù)在多臺機器上執行,針對同一個(gè)最終的輸出文件將有多個(gè)重命名操作執行。我們依賴(lài)底層文件系統提供的重命名操作的原子性來(lái)保證最終的文件系統狀態(tài)僅僅包含一個(gè)Reduce任務(wù)產(chǎn)生的數據。
 

使用MapReduce模型的程序員可以很容易的理解他們程序的行為,因為我們絕大多數的Map和Reduce操作是確定性的,而且存在這樣的一個(gè)事實(shí):我們的失效處理機制等價(jià)于一個(gè)順序的執行的操作。當Map或/和Reduce操作是不確定性的時(shí)候,我們提供雖然較弱但是依然合理的處理機制。當使用非確定操作的時(shí)候,一個(gè)Reduce任務(wù)R1的輸出等價(jià)于一個(gè)非確定性程序順序執行產(chǎn)生時(shí)的輸出。但是,另一個(gè)Reduce任務(wù)R2的輸出也許符合一個(gè)不同的非確定順序程序執行產(chǎn)生的R2的輸出。

 
考慮Map任務(wù)M和Reduce任務(wù)R1、R2的情況。我們設定e(Ri)是Ri已經(jīng)提交的執行過(guò)程(有且僅有一個(gè)這樣的執行過(guò)程)。當e(R1)讀取了由M一次執行產(chǎn)生的輸出,而e(R2)讀取了由M的另一次執行產(chǎn)生的輸出,導致了較弱的失效處理。

3.4、存儲位置

在我們的計算運行環(huán)境中,網(wǎng)絡(luò )帶寬是一個(gè)相當匱乏的資源。我們通過(guò)盡量把輸入數據(由GFS管理)存儲在集群中機器的本地磁盤(pán)上來(lái)節省網(wǎng)絡(luò )帶寬。GFS把每個(gè)文件按64MB一個(gè)Block分隔,每個(gè)Block保存在多臺機器上,環(huán)境中就存放了多份拷貝(一般是3個(gè)拷貝)。MapReduce的master在調度Map任務(wù)時(shí)會(huì )考慮輸入文件的位置信息,盡量將一個(gè)Map任務(wù)調度在包含相關(guān)輸入數據拷貝的機器上執行;如果上述努力失敗了,master將嘗試在保存有輸入數據拷貝的機器附近的機器上執行Map任務(wù)(例如,分配到一個(gè)和包含輸入數據的機器在一個(gè)switch里的worker機器上執行)。當在一個(gè)足夠大的cluster集群上運行大型MapReduce操作的時(shí)候,大部分的輸入數據都能從本地機器讀取,因此消耗非常少的網(wǎng)絡(luò )帶寬。

3.5、任務(wù)粒度

如前所述,我們把Map拆分成了M個(gè)片段、把Reduce拆分成R個(gè)片段執行。理想情況下,M和R應當比集群中worker的機器數量要多得多。在每臺worker機器都執行大量的不同任務(wù)能夠提高集群的動(dòng)態(tài)的負載均衡能力,并且能夠加快故障恢復的速度:失效機器上執行的大量Map任務(wù)都可以分布到所有其他的worker機器上去執行。
 

但是實(shí)際上,在我們的具體實(shí)現中對M和R的取值都有一定的客觀(guān)限制,因為master必須執行O(M+R)次調度,并且在內存中保存O(M*R)個(gè)狀態(tài)(對影響內存使用的因素還是比較小的:O(M*R)塊狀態(tài),大概每對Map任務(wù)/Reduce任務(wù)1個(gè)字節就可以了)。

 
更進(jìn)一步,R值通常是由用戶(hù)指定的,因為每個(gè)Reduce任務(wù)最終都會(huì )生成一個(gè)獨立的輸出文件。實(shí)際使用時(shí)我們也傾向于選擇合適的M值,以使得每一個(gè)獨立任務(wù)都是處理大約16M到64M的輸入數據(這樣,上面描寫(xiě)的輸入數據本地存儲優(yōu)化策略才最有效),另外,我們把R值設置為我們想使用的worker機器數量的小的倍數。我們通常會(huì )用這樣的比例來(lái)執行MapReduce:M=200000,R=5000,使用2000臺worker機器。

3.6、備用任務(wù)

影響一個(gè)MapReduce的總執行時(shí)間最通常的因素是“落伍者”:在運算過(guò)程中,如果有一臺機器花了很長(cháng)的時(shí)間才完成最后幾個(gè)Map或Reduce任務(wù),導致MapReduce操作總的執行時(shí)間超過(guò)預期。出現“落伍者”的原因非常多。比如:如果一個(gè)機器的硬盤(pán)出了問(wèn)題,在讀取的時(shí)候要經(jīng)常的進(jìn)行讀取糾錯操作,導致讀取數據的速度從30M/s降低到1M/s。如果cluster的調度系統在這臺機器上又調度了其他的任務(wù),由于CPU、內存、本地硬盤(pán)和網(wǎng)絡(luò )帶寬等競爭因素的存在,導致執行MapReduce代碼的執行效率更加緩慢。我們最近遇到的一個(gè)問(wèn)題是由于機器的初始化代碼有bug,導致關(guān)閉了的處理器的緩存:在這些機器上執行任務(wù)的性能和正常情況相差上百倍。
 
我們有一個(gè)通用的機制來(lái)減少“落伍者”出現的情況。當一個(gè)MapReduce操作接近完成的時(shí)候,master調度備用(backup)任務(wù)進(jìn)程來(lái)執行剩下的、處于處理中狀態(tài)(in-progress)的任務(wù)。無(wú)論是最初的執行進(jìn)程、還是備用(backup)任務(wù)進(jìn)程完成了任務(wù),我們都把這個(gè)任務(wù)標記成為已經(jīng)完成。我們調優(yōu)了這個(gè)機制,通常只會(huì )占用比正常操作多幾個(gè)百分點(diǎn)的計算資源。我們發(fā)現采用這樣的機制對于減少超大MapReduce操作的總處理時(shí)間效果顯著(zhù)。例如,在5.3節描述的排序任務(wù),在關(guān)閉掉備用任務(wù)的情況下要多花44%的時(shí)間完成排序任務(wù)。
 

4、技巧

雖然簡(jiǎn)單的Map和Reduce函數提供的基本功能已經(jīng)能夠滿(mǎn)足大部分的計算需要,我們還是發(fā)掘出了一些有價(jià)值的擴展功能。本節將描述這些擴展功能。

4.1、分區函數

MapReduce的使用者通常會(huì )指定Reduce任務(wù)和Reduce任務(wù)輸出文件的數量(R)。我們在中間key上使用分區函數來(lái)對數據進(jìn)行分區,之后再輸入到后續任務(wù)執行進(jìn)程。一個(gè)缺省的分區函數是使用hash方法(比如,hash(key) modR)進(jìn)行分區。hash方法能產(chǎn)生非常平衡的分區。然而,有的時(shí)候,其它的一些分區函數對key值進(jìn)行的分區將非常有用。比如,輸出的key值是URLs,我們希望每個(gè)主機的所有條目保持在同一個(gè)輸出文件中。為了支持類(lèi)似的情況,MapReduce庫的用戶(hù)需要提供專(zhuān)門(mén)的分區函數。例如,使用“hash(Hostname(urlkey)) mod R”作為分區函數就可以把所有來(lái)自同一個(gè)主機的URLs保存在同一個(gè)輸出文件中。

4.2、順序保證

我們確保在給定的分區中,中間key/value pair數據的處理順序是按照key值增量順序處理的。這樣的順序保證對每個(gè)分成生成一個(gè)有序的輸出文件,這對于需要對輸出文件按key值隨機存取的應用非常有意義,對在排序輸出的數據集也很有幫助。

4.3、Combiner函數

在某些情況下,Map函數產(chǎn)生的中間key值的重復數據會(huì )占很大的比重,并且,用戶(hù)自定義的Reduce函數滿(mǎn)足結合律和交換律。在2.1節的詞數統計程序是個(gè)很好的例子。由于詞頻率傾向于一個(gè)zipf分布(齊夫分布),每個(gè)Map任務(wù)將產(chǎn)生成千上萬(wàn)個(gè)這樣的記錄<the,1>。所有的這些記錄將通過(guò)網(wǎng)絡(luò )被發(fā)送到一個(gè)單獨的Reduce任務(wù),然后由這個(gè)Reduce任務(wù)把所有這些記錄累加起來(lái)產(chǎn)生一個(gè)數字。我們允許用戶(hù)指定一個(gè)可選的combiner函數,combiner函數首先在本地將這些記錄進(jìn)行一次合并,然后將合并的結果再通過(guò)網(wǎng)絡(luò )發(fā)送出去。
 
Combiner函數在每臺執行Map任務(wù)的機器上都會(huì )被執行一次。一般情況下,Combiner和Reduce函數是一樣的。Combiner函數和Reduce函數之間唯一的區別是MapReduce庫怎樣控制函數的輸出。Reduce函數的輸出被保存在最終的輸出文件里,而Combiner函數的輸出被寫(xiě)到中間文件里,然后被發(fā)送給Reduce任務(wù)。
 

部分的合并中間結果可以顯著(zhù)的提高一些MapReduce操作的速度。附錄A包含一個(gè)使用combiner函數的例子。

4.4、輸入和輸出的類(lèi)型

MapReduce庫支持幾種不同的格式的輸入數據。比如,文本模式的輸入數據的每一行被視為是一個(gè)key/valuepair。key是文件的偏移量,value是那一行的內容。另外一種常見(jiàn)的格式是以key進(jìn)行排序來(lái)存儲的key/valuepair的序列。每種輸入類(lèi)型的實(shí)現都必須能夠把輸入數據分割成數據片段,該數據片段能夠由單獨的Map任務(wù)來(lái)進(jìn)行后續處理(例如,文本模式的范圍分割必須確保僅僅在每行的邊界進(jìn)行范圍分割)。雖然大多數MapReduce的使用者僅僅使用很少的預定義輸入類(lèi)型就滿(mǎn)足要求了,但是使用者依然可以通過(guò)提供一個(gè)簡(jiǎn)單的Reader接口實(shí)現就能夠支持一個(gè)新的輸入類(lèi)型。
 

Reader并非一定要從文件中讀取數據,比如,我們可以很容易的實(shí)現一個(gè)從數據庫里讀記錄的Reader,或者從內存中的數據結構讀取數據的Reader。

類(lèi)似的,我們提供了一些預定義的輸出數據的類(lèi)型,通過(guò)這些預定義類(lèi)型能夠產(chǎn)生不同格式的數據。用戶(hù)采用類(lèi)似添加新的輸入數據類(lèi)型的方式增加新的輸出類(lèi)型。

4.5、副作用

在某些情況下,MapReduce的使用者發(fā)現,如果在Map和/或Reduce操作過(guò)程中增加輔助的輸出文件會(huì )比較省事。我們依靠程序writer把這種“副作用”變成原子的和冪等的(alex注:冪等的指一個(gè)總是產(chǎn)生相同結果的數學(xué)運算)。通常應用程序首先把輸出結果寫(xiě)到一個(gè)臨時(shí)文件中,在輸出全部數據之后,在使用系統級的原子操作rename重新命名這個(gè)臨時(shí)文件。
 

如果一個(gè)任務(wù)產(chǎn)生了多個(gè)輸出文件,我們沒(méi)有提供類(lèi)似兩階段提交的原子操作支持這種情況。因此,對于會(huì )產(chǎn)生多個(gè)輸出文件、并且對于跨文件有一致性要求的任務(wù),都必須是確定性的任務(wù)。但是在實(shí)際應用過(guò)程中,這個(gè)限制還沒(méi)有給我們帶來(lái)過(guò)麻煩。

4.6、跳過(guò)損壞的記錄

有時(shí)候,用戶(hù)程序中的bug導致Map或者Reduce函數在處理某些記錄的時(shí)候crash掉,MapReduce操作無(wú)法順利完成。慣常的做法是修復bug后再次執行MapReduce操作,但是,有時(shí)候找出這些bug并修復它們不是一件容易的事情;這些bug也許是在第三方庫里邊,而我們手頭沒(méi)有這些庫的源代碼。而且在很多時(shí)候,忽略一些有問(wèn)題的記錄也是可以接受的,比如在一個(gè)巨大的數據集上進(jìn)行統計分析的時(shí)候。我們提供了一種執行模式,在這種模式下,為了保證保證整個(gè)處理能繼續進(jìn)行,MapReduce會(huì )檢測哪些記錄導致確定性的crash,并且跳過(guò)這些記錄不處理。
 

每個(gè)worker進(jìn)程都設置了信號處理函數捕獲內存段異常(segmentation violation)和總線(xiàn)錯誤(buserror)。在執行Map或者Reduce操作之前,MapReduce庫通過(guò)全局變量保存記錄序號。如果用戶(hù)程序觸發(fā)了一個(gè)系統信號,消息處理函數將用“最后一口氣”通過(guò)UDP包向master發(fā)送處理的最后一條記錄的序號。當master看到在處理某條特定記錄不止失敗一次時(shí),master就標志著(zhù)條記錄需要被跳過(guò),并且在下次重新執行相關(guān)的Map或者Reduce任務(wù)的時(shí)候跳過(guò)這條記錄。

4.7、本地執行

調試Map和Reduce函數的bug是非常困難的,因為實(shí)際執行操作時(shí)不但是分布在系統中執行的,而且通常是在好幾千臺計算機上執行,具體的執行位置是由master進(jìn)行動(dòng)態(tài)調度的,這又大大增加了調試的難度。為了簡(jiǎn)化調試、profile和小規模測試,我們開(kāi)發(fā)了一套MapReduce庫的本地實(shí)現版本,通過(guò)使用本地版本的MapReduce庫,MapReduce操作在本地計算機上順序的執行。用戶(hù)可以控制MapReduce操作的執行,可以把操作限制到特定的Map任務(wù)上。用戶(hù)通過(guò)設定特別的標志來(lái)在本地執行他們的程序,之后就可以很容易的使用本地調試和測試工具(比如gdb)。

4.8、狀態(tài)信息

master使用嵌入式的HTTP服務(wù)器(如Jetty)顯示一組狀態(tài)信息頁(yè)面,用戶(hù)可以監控各種執行狀態(tài)。狀態(tài)信息頁(yè)面顯示了包括計算執行的進(jìn)度,比如已經(jīng)完成了多少任務(wù)、有多少任務(wù)正在處理、輸入的字節數、中間數據的字節數、輸出的字節數、處理百分比等等。頁(yè)面還包含了指向每個(gè)任務(wù)的stderr和stdout文件的鏈接。用戶(hù)根據這些數據預測計算需要執行大約多長(cháng)時(shí)間、是否需要增加額外的計算資源。這些頁(yè)面也可以用來(lái)分析什么時(shí)候計算執行的比預期的要慢。
 

另外,處于最頂層的狀態(tài)頁(yè)面顯示了哪些worker失效了,以及他們失效的時(shí)候正在運行的Map和Reduce任務(wù)。這些信息對于調試用戶(hù)代碼中的bug很有幫助。

4.9、計數器

MapReduce庫使用計數器統計不同事件發(fā)生次數。比如,用戶(hù)可能想統計已經(jīng)處理了多少個(gè)單詞、已經(jīng)索引的多少篇German文檔等等。
 
為了使用這個(gè)特性,用戶(hù)在程序中創(chuàng )建一個(gè)命名的計數器對象,在Map和Reduce函數中相應的增加計數器的值。例如:
Counter* uppercase;
uppercase = GetCounter(“uppercase”);

map(String name, String contents):
 for each word w in contents:
  if (IsCapitalized(w)):
   uppercase->Increment();
  EmitIntermediate(w, “1″);

這些計數器的值周期性的從各個(gè)單獨的worker機器上傳遞給master(附加在ping的應答包中傳遞)。master把執行成功的Map和Reduce任務(wù)的計數器值進(jìn)行累計,當MapReduce操作完成之后,返回給用戶(hù)代碼。
 
計數器當前的值也會(huì )顯示在master的狀態(tài)頁(yè)面上,這樣用戶(hù)就可以看到當前計算的進(jìn)度。當累加計數器的值的時(shí)候,master要檢查重復運行的Map或者Reduce任務(wù),避免重復累加(之前提到的備用任務(wù)和失效后重新執行任務(wù)這兩種情況會(huì )導致相同的任務(wù)被多次執行)。
 
有些計數器的值是由MapReduce庫自動(dòng)維持的,比如已經(jīng)處理的輸入的key/value pair的數量、輸出的key/value pair的數量等等。
 

計數器機制對于MapReduce操作的完整性檢查非常有用。比如,在某些MapReduce操作中,用戶(hù)需要確保輸出的key value pair精確的等于輸入的key value pair,或者處理的German文檔數量在處理的整個(gè)文檔數量中屬于合理范圍。

5、性能

本節我們用在一個(gè)大型集群上運行的兩個(gè)計算來(lái)衡量MapReduce的性能。一個(gè)計算在大約1TB的數據中進(jìn)行特定的模式匹配,另一個(gè)計算對大約1TB的數據進(jìn)行排序。
 
這兩個(gè)程序在大量的使用MapReduce的實(shí)際應用中是非常典型的 — 一類(lèi)是對數據格式進(jìn)行轉換,從一種表現形式轉換為另外一種表現形式;另一類(lèi)是從海量數據中抽取少部分的用戶(hù)感興趣的數據。

5.1、集群配置

所有這些程序都運行在一個(gè)大約由1800臺機器構成的集群上。每臺機器配置2個(gè)2G主頻、支持超線(xiàn)程的IntelXeon處理器,4GB的物理內存,兩個(gè)160GB的IDE硬盤(pán)和一個(gè)千兆以太網(wǎng)卡。這些機器部署在一個(gè)兩層的樹(shù)形交換網(wǎng)絡(luò )中,在root節點(diǎn)大概有100-200GBPS的傳輸帶寬。所有這些機器都采用相同的部署(對等部署),因此任意兩點(diǎn)之間的網(wǎng)絡(luò )來(lái)回時(shí)間小于1毫秒。
 

在4GB內存里,大概有1-1.5G用于運行在集群上的其他任務(wù)。測試程序在周末下午開(kāi)始執行,這時(shí)主機的CPU、磁盤(pán)和網(wǎng)絡(luò )基本上處于空閑狀態(tài)。

5.2、GREP

這個(gè)分布式的grep程序需要掃描大概10的10次方個(gè)由100個(gè)字節組成的記錄,查找出現概率較小的3個(gè)字符的模式(這個(gè)模式在92337個(gè)記錄中出現)。輸入數據被拆分成大約64M的Block(M=15000),整個(gè)輸出數據存放在一個(gè)文件中(R=1)。

圖2顯示了這個(gè)運算隨時(shí)間的處理過(guò)程。其中Y軸表示輸入數據的處理速度。處理速度隨著(zhù)參與MapReduce計算的機器數量的增加而增加,當1764臺worker參與計算的時(shí),處理速度達到了30GB/s。當Map任務(wù)結束的時(shí)候,即在計算開(kāi)始后80秒,輸入的處理速度降到0。整個(gè)計算過(guò)程從開(kāi)始到結束一共花了大概150秒。這包括了大約一分鐘的初始啟動(dòng)階段。初始啟動(dòng)階段消耗的時(shí)間包括了是把這個(gè)程序傳送到各個(gè)worker機器上的時(shí)間、等待GFS文件系統打開(kāi)1000個(gè)輸入文件集合的時(shí)間、獲取相關(guān)的文件本地位置優(yōu)化信息的時(shí)間。

5.3、排序

排序程序處理10的10次方個(gè)100個(gè)字節組成的記錄(大概1TB的數據)。這個(gè)程序模仿TeraSort benchmark[10]。
 
排序程序由不到50行代碼組成。只有三行的Map函數從文本行中解析出10個(gè)字節的key值作為排序的key,并且把這個(gè)key和原始文本行作為中間的key/value pair值輸出。我們使用了一個(gè)內置的恒等函數作為Reduce操作函數。這個(gè)函數把中間的key/valuepair值不作任何改變輸出。最終排序結果輸出到兩路復制的GFS文件系統(也就是說(shuō),程序輸出2TB的數據)。
 
如前所述,輸入數據被分成64MB的Block(M=15000)。我們把排序后的輸出結果分區后存儲到4000個(gè)文件(R=4000)。分區函數使用key的原始字節來(lái)把數據分區到R個(gè)片段中。
 

在這個(gè)benchmark測試中,我們使用的分區函數知道key的分區情況。通常對于排序程序來(lái)說(shuō),我們會(huì )增加一個(gè)預處理的MapReduce操作用于采樣key值的分布情況,通過(guò)采樣的數據來(lái)計算對最終排序處理的分區點(diǎn)。

圖三(a)顯示了這個(gè)排序程序的正常執行過(guò)程。左上的圖顯示了輸入數據讀取的速度。數據讀取速度峰值會(huì )達到13GB/s,并且所有Map任務(wù)完成之后,即大約200秒之后迅速滑落到0。值得注意的是,排序程序輸入數據讀取速度小于分布式grep程序。這是因為排序程序的Map任務(wù)花了大約一半的處理時(shí)間和I/O帶寬把中間輸出結果寫(xiě)到本地硬盤(pán)。相應的分布式grep程序的中間結果輸出幾乎可以忽略不計。
 
左邊中間的圖顯示了中間數據從Map任務(wù)發(fā)送到Reduce任務(wù)的網(wǎng)絡(luò )速度。這個(gè)過(guò)程從第一個(gè)Map任務(wù)完成之后就開(kāi)始緩慢啟動(dòng)了。圖示的第一個(gè)高峰是啟動(dòng)了第一批大概1700個(gè)Reduce任務(wù)(整個(gè)MapReduce分布到大概1700臺機器上,每臺機器1次最多執行1個(gè)Reduce任務(wù))。排序程序運行大約300秒后,第一批啟動(dòng)的Reduce任務(wù)有些完成了,我們開(kāi)始執行剩下的Reduce任務(wù)。所有的處理在大約600秒后結束。
 
左下圖表示Reduce任務(wù)把排序后的數據寫(xiě)到最終的輸出文件的速度。在第一個(gè)排序階段結束和數據開(kāi)始寫(xiě)入磁盤(pán)之間有一個(gè)小的延時(shí),這是因為worker機器正在忙于排序中間數據。磁盤(pán)寫(xiě)入速度在2-4GB/s持續一段時(shí)間。輸出數據寫(xiě)入磁盤(pán)大約持續850秒。計入初始啟動(dòng)部分的時(shí)間,整個(gè)運算消耗了891秒。這個(gè)速度和TeraSort benchmark[18]的最高紀錄1057秒相差不多。
 

還有一些值得注意的現象:輸入數據的讀取速度比排序速度和輸出數據寫(xiě)入磁盤(pán)速度要高不少,這是因為我們的輸入數據本地化優(yōu)化策略起了作用 —絕大部分數據都是從本地硬盤(pán)讀取的,從而節省了網(wǎng)絡(luò )帶寬。排序速度比輸出數據寫(xiě)入到磁盤(pán)的速度快,這是因為輸出數據寫(xiě)了兩份(我們使用了2路的GFS文件系統,寫(xiě)入復制節點(diǎn)的原因是為了保證數據可靠性和可用性)。我們把輸出數據寫(xiě)入到兩個(gè)復制節點(diǎn)的原因是因為這是底層文件系統的保證數據可靠性和可用性的實(shí)現機制。如果底層文件系統使用類(lèi)似容錯編碼[14](erasurecoding)的方式而不是復制的方式保證數據的可靠性和可用性,那么在輸出數據寫(xiě)入磁盤(pán)的時(shí)候,就可以降低網(wǎng)絡(luò )帶寬的使用。

5.4、高效的backup任務(wù)

圖三(b)顯示了關(guān)閉了備用任務(wù)后排序程序執行情況。執行的過(guò)程和圖3(a)很相似,除了輸出數據寫(xiě)磁盤(pán)的動(dòng)作在時(shí)間上拖了一個(gè)很長(cháng)的尾巴,而且在這段時(shí)間里,幾乎沒(méi)有什么寫(xiě)入動(dòng)作。在960秒后,只有5個(gè)Reduce任務(wù)沒(méi)有完成。這些拖后腿的任務(wù)又執行了300秒才完成。整個(gè)計算消耗了1283秒,多了44%的執行時(shí)間。

5.5、失效的機器

在圖三(c)中演示的排序程序執行的過(guò)程中,我們在程序開(kāi)始后幾分鐘有意的kill了1746個(gè)worker中的200個(gè)。集群底層的調度立刻在這些機器上重新開(kāi)始新的worker處理進(jìn)程(因為只是worker機器上的處理進(jìn)程被kill了,機器本身還在工作)。
 
圖三(c)顯示出了一個(gè)“負”的輸入數據讀取速度,這是因為一些已經(jīng)完成的Map任務(wù)丟失了(由于相應的執行Map任務(wù)的worker進(jìn)程被kill了),需要重新執行這些任務(wù)。相關(guān)Map任務(wù)很快就被重新執行了。整個(gè)運算在933秒內完成,包括了初始啟動(dòng)時(shí)間(只比正常執行多消耗了5%的時(shí)間)。

6、經(jīng)驗

我們在2003年1月完成了第一個(gè)版本的MapReduce庫,在2003年8月的版本有了顯著(zhù)的增強,這包括了輸入數據本地優(yōu)化、worker機器之間的動(dòng)態(tài)負載均衡等等。從那以后,我們驚喜的發(fā)現,MapReduce庫能廣泛應用于我們日常工作中遇到的各類(lèi)問(wèn)題。它現在在Google內部各個(gè)領(lǐng)域得到廣泛應用,包括:
  • 大規模機器學(xué)習問(wèn)題
  • Google News和Froogle產(chǎn)品的集群?jiǎn)?wèn)題
  • 從公眾查詢(xún)產(chǎn)品(比如Google的Zeitgeist)的報告中抽取數據。
  • 從大量的新應用和新產(chǎn)品的網(wǎng)頁(yè)中提取有用信息(比如,從大量的位置搜索網(wǎng)頁(yè)中抽取地理位置信息)。
  • 大規模的圖形計算。
圖四顯示了在我們的源代碼管理系統中,隨著(zhù)時(shí)間推移,獨立的MapReduce程序數量的顯著(zhù)增加。從2003年早些時(shí)候的0個(gè)增長(cháng)到2004年9月份的差不多900個(gè)不同的程序。MapReduce的成功取決于采用MapReduce庫能夠在不到半個(gè)小時(shí)時(shí)間內寫(xiě)出一個(gè)簡(jiǎn)單的程序,這個(gè)簡(jiǎn)單的程序能夠在上千臺機器的組成的集群上做大規模并發(fā)處理,這極大的加快了開(kāi)發(fā)和原形設計的周期。另外,采用MapReduce庫,可以讓完全沒(méi)有分布式和/或并行系統開(kāi)發(fā)經(jīng)驗的程序員很容易的利用大量的資源,開(kāi)發(fā)出分布式和/或并行處理的應用。

在每個(gè)任務(wù)結束的時(shí)候,MapReduce庫統計計算資源的使用狀況。在表1,我們列出了2004年8月份MapReduce運行的任務(wù)所占用的相關(guān)資源。

6.1、大規模索引

到目前為止,MapReduce最成功的應用就是重寫(xiě)了Google網(wǎng)絡(luò )搜索服務(wù)所使用到的index系統。索引系統的輸入數據是網(wǎng)絡(luò )爬蟲(chóng)抓取回來(lái)的海量的文檔,這些文檔數據都保存在GFS文件系統里。這些文檔原始內容(alex注:raw contents,我認為就是網(wǎng)頁(yè)中的剔除html標記后的內容、pdf和word等有格式文檔中提取的文本內容等)的大小超過(guò)了20TB。索引程序是通過(guò)一系列的MapReduce操作(大約5到10次)來(lái)建立索引。使用MapReduce(替換上一個(gè)特別設計的、分布式處理的索引程序)帶來(lái)這些好處:
  • 實(shí)現索引部分的代碼簡(jiǎn)單、小巧、容易理解,因為對于容錯、分布式以及并行計算的處理都是MapReduce庫提供的。比如,使用MapReduce庫,計算的代碼行數從原來(lái)的3800行C++代碼減少到大概700行代碼。
  • MapReduce庫的性能已經(jīng)足夠好了,因此我們可以把在概念上不相關(guān)的計算步驟分開(kāi)處理,而不是混在一起以期減少數據傳遞的額外消耗。概念 上不相關(guān)的計算步驟的隔離也使得我們可以很容易改變索引處理方式。比如,對之前的索引系統的一個(gè)小更改可能要耗費好幾個(gè)月的時(shí)間,但是在使用 MapReduce的新系統上,這樣的更改只需要花幾天時(shí)間就可以了。
  • 索引系統的操作管理更容易了。因為由機器失效、機器處理速度緩慢、以及網(wǎng)絡(luò )的瞬間阻塞等引起的絕大部分問(wèn)題都已經(jīng)由MapReduce庫解決了,不再需要操作人員的介入了。另外,我們可以通過(guò)在索引系統集群中增加機器的簡(jiǎn)單方法提高整體處理性能。

7、相關(guān)工作

很多系統都提供了嚴格的編程模式,并且通過(guò)對編程的嚴格限制來(lái)實(shí)現并行計算。例如,一個(gè)結合函數可以通過(guò)把N個(gè)元素的數組的前綴在N個(gè)處理器上使用并行前綴算法,在log N的時(shí)間內計算完[6,9,13](alex注:完全沒(méi)有明白作者在說(shuō)啥,具體參考相關(guān)6、9、13文檔)。MapReduce可以看作是我們結合在真實(shí)環(huán)境下處理海量數據的經(jīng)驗,對這些經(jīng)典模型進(jìn)行簡(jiǎn)化和萃取的成果。更加值得驕傲的是,我們還實(shí)現了基于上千臺處理器的集群的容錯處理。相比而言,大部分并發(fā)處理系統都只在小規模的集群上實(shí)現,并且把容錯處理交給了程序員。
 
Bulk SynchronousProgramming[17]和一些MPI原語(yǔ)[11]提供了更高級別的并行處理抽象,可以更容易寫(xiě)出并行處理的程序。MapReduce和這些系統的關(guān)鍵不同之處在于,MapReduce利用限制性編程模式實(shí)現了用戶(hù)程序的自動(dòng)并發(fā)處理,并且提供了透明的容錯處理。
 
我們數據本地優(yōu)化策略的靈感來(lái)源于active disks[12,15]等技術(shù),在active disks中,計算任務(wù)是盡量推送到數據存儲的節點(diǎn)處理(alex注:即靠近數據源處理),這樣就減少了網(wǎng)絡(luò )和IO子系統的吞吐量。我們在掛載幾個(gè)硬盤(pán)的普通機器上執行我們的運算,而不是在磁盤(pán)處理器上執行我們的工作,但是達到的目的一樣的。
 
我們的備用任務(wù)機制和Charlotte System[3]提出的eager調度機制比較類(lèi)似。Eager調度機制的一個(gè)缺點(diǎn)是如果一個(gè)任務(wù)反復失效,那么整個(gè)計算就不能完成。我們通過(guò)忽略引起故障的記錄的方式在某種程度上解決了這個(gè)問(wèn)題。
 
MapReduce的實(shí)現依賴(lài)于一個(gè)內部的集群管理系統,這個(gè)集群管理系統負責在一個(gè)超大的、共享機器的集群上分布和運行用戶(hù)任務(wù)。雖然這個(gè)不是本論文的重點(diǎn),但是有必要提一下,這個(gè)集群管理系統在理念上和其它系統,如Condor[16]是一樣。
 
MapReduce庫的排序機制和NOW-Sort[1]的操作上很類(lèi)似。讀取輸入源的機器(mapworkers)把待排序的數據進(jìn)行分區后,發(fā)送到R個(gè)Reduce worker中的一個(gè)進(jìn)行處理。每個(gè)Reduceworker在本地對數據進(jìn)行排序(盡可能在內存中排序)。當然,NOW-Sort沒(méi)有給用戶(hù)自定義的Map和Reduce函數的機會(huì ),因此不具備MapReduce庫廣泛的實(shí)用性。
 
River[2]提供了一個(gè)編程模型:處理進(jìn)程通過(guò)分布式隊列傳送數據的方式進(jìn)行互相通訊。和MapReduce類(lèi)似,River系統嘗試在不對等的硬件環(huán)境下,或者在系統顛簸的情況下也能提供近似平均的性能。River是通過(guò)精心調度硬盤(pán)和網(wǎng)絡(luò )的通訊來(lái)平衡任務(wù)的完成時(shí)間。MapReduce庫采用了其它的方法。通過(guò)對編程模型進(jìn)行限制,MapReduce框架把問(wèn)題分解成為大量的“小”任務(wù)。這些任務(wù)在可用的worker集群上動(dòng)態(tài)的調度,這樣快速的worker就可以執行更多的任務(wù)。通過(guò)對編程模型進(jìn)行限制,我們可用在工作接近完成的時(shí)候調度備用任務(wù),縮短在硬件配置不均衡的情況下縮小整個(gè)操作完成的時(shí)間(比如有的機器性能差、或者機器被某些操作阻塞了)。
 
BAD-FS[5]采用了和MapReduce完全不同的編程模式,它是面向廣域網(wǎng)(alex注:wide-area network)的。不過(guò),這兩個(gè)系統有兩個(gè)基礎功能很類(lèi)似。(1)兩個(gè)系統采用重新執行的方式來(lái)防止由于失效導致的數據丟失。(2)兩個(gè)都使用數據本地化調度策略,減少網(wǎng)絡(luò )通訊的數據量。
 

TACC[7]是一個(gè)用于簡(jiǎn)化構造高可用性網(wǎng)絡(luò )服務(wù)的系統。和MapReduce一樣,它也依靠重新執行機制來(lái)實(shí)現的容錯處理。

8、結束語(yǔ)

MapReduce編程模型在Google內部成功應用于多個(gè)領(lǐng)域。我們把這種成功歸結為幾個(gè)方面:首先,由于MapReduce封裝了并行處理、容錯處理、數據本地化優(yōu)化、負載均衡等等技術(shù)難點(diǎn)的細節,這使得MapReduce庫易于使用。即便對于完全沒(méi)有并行或者分布式系統開(kāi)發(fā)經(jīng)驗的程序員而言;其次,大量不同類(lèi)型的問(wèn)題都可以通過(guò)MapReduce簡(jiǎn)單的解決。比如,MapReduce用于生成Google的網(wǎng)絡(luò )搜索服務(wù)所需要的數據、用來(lái)排序、用來(lái)數據挖掘、用于機器學(xué)習,以及很多其它的系統;第三,我們實(shí)現了一個(gè)在數千臺計算機組成的大型集群上靈活部署運行的MapReduce。這個(gè)實(shí)現使得有效利用這些豐富的計算資源變得非常簡(jiǎn)單,因此也適合用來(lái)解決Google遇到的其他很多需要大量計算的問(wèn)題。
 

我們也從MapReduce開(kāi)發(fā)過(guò)程中學(xué)到了不少東西。首先,約束編程模式使得并行和分布式計算非常容易,也易于構造容錯的計算環(huán)境;其次,網(wǎng)絡(luò )帶寬是稀有資源。大量的系統優(yōu)化是針對減少網(wǎng)絡(luò )傳輸量為目的的:本地優(yōu)化策略使大量的數據從本地磁盤(pán)讀取,中間文件寫(xiě)入本地磁盤(pán)、并且只寫(xiě)一份中間文件也節約了網(wǎng)絡(luò )帶寬;第三,多次執行相同的任務(wù)可以減少性能緩慢的機器帶來(lái)的負面影響(alex注:即硬件配置的不平衡),同時(shí)解決了由于機器失效導致的數據丟失問(wèn)題。

9、感謝

(alex注:還是原汁原味的感謝詞比較好,這個(gè)就不翻譯了)JoshLevenberg has been instrumental in revising and extending theuser-level MapReduce API with a number of new features based on hisexperience with using MapReduce and other people’s suggestions forenhancements. MapReduce reads its input from and writes its output tothe Google File System [8]. We would like to thank Mohit Aron, HowardGobioff, Markus Gutschke, David Kramer, Shun-Tak Leung, and JoshRedstone for their work in developing GFS. We would also like to thankPercy Liang and Olcan Sercinoglu for their work in developing thecluster management system used by MapReduce. Mike Burrows, Wilson Hsieh,Josh Levenberg, Sharon Perl, Rob Pike, and Debby Wallach providedhelpful comments on earlier drafts of this paper.The anonymous OSDIreviewers, and our shepherd, Eric Brewer, provided many usefulsuggestions of areas where the paper could be improved. Finally, wethank all the users of MapReduce within Google’s engineeringorganization for providing helpful feedback, suggestions, and bugreports.

10、參考資料

[1] Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau,David E.Culler, Joseph M. Hellerstein, and David A. Patterson.High-performancesorting on networks of workstations.In Proceedings of the 1997 ACMSIGMOD InternationalConference on Management of Data, Tucson,Arizona,May 1997.
[2] Remzi H. Arpaci-Dusseau, Eric Anderson, NoahTreuhaft, David E.Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick.Cluster I/O with River:Making the fast case common. In Proceedings ofthe Sixth Workshop on Input/Output in Parallel and Distributed Systems(IOPADS ‘99), pages 10.22, Atlanta, Georgia, May 1999.
[3] Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff.Charlotte: Metacomputing on the web. In Proceedings of the 9thInternational Conference on Parallel and Distributed Computing Systems,1996. [4] Luiz A. Barroso, Jeffrey Dean, and Urs H¨olzle. Web search fora planet: The Google cluster architecture. IEEE Micro, 23(2):22.28,April 2003.
[5] John Bent, Douglas Thain, Andrea C.Arpaci-Dusseau, Remzi H.Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-awaredistributed file system. In Proceedings of the 1st USENIX Symposium onNetworked Systems Design and Implementation NSDI, March 2004.
[6] Guy E. Blelloch. Scans as primitive parallel operations.IEEE Transactions on Computers, C-38(11), November 1989.
[7] Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, andPaul Gauthier. Cluster-based scalable network services. In Proceedingsof the 16th ACM Symposium on Operating System Principles, pages 78. 91,Saint-Malo, France, 1997.
[8] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google filesystem. In 19th Symposium on Operating Systems Principles, pages 29.43,Lake George, New York, 2003. To appear in OSDI 2004 12
[9] S. Gorlatch. Systematic efficient parallelization of scan and otherlist homomorphisms. In L. Bouge, P. Fraigniaud, A. Mignotte, and Y.Robert, editors, Euro-Par’96. Parallel Processing, Lecture Notes inComputer Science 1124, pages 401.408. Springer-Verlag, 1996.
[10] Jim Gray. Sort benchmark home page. http://research.microsoft.com/barc/SortBenchmark/.
[11] William Gropp, Ewing Lusk, and Anthony Skjellum. Using MPI:Portable Parallel Programming with the Message-Passing Interface. MITPress, Cambridge, MA, 1999.
[12] L. Huston, R. Sukthankar, R.Wickremesinghe, M. Satyanarayanan, G.R. Ganger, E. Riedel, and A. Ailamaki. Diamond: A storage architecturefor early discard in interactive search. In Proceedings of the 2004USENIX File and Storage Technologies FAST Conference, April 2004.
[13] Richard E. Ladner and Michael J. Fischer. Parallel prefix computation. Journal of the ACM, 27(4):831.838, 1980.
[14] Michael O. Rabin. Efficient dispersal of information for security,load balancing and fault tolerance. Journal of the ACM, 36(2):335.348,1989.
[15] Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle.Active disks for large-scale data processing. IEEE Computer, pages68.74, June 2001.
[16] Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributedcomputing in practice: The Condor experience. Concurrency andComputation: Practice and Experience, 2004.
[17] L. G. Valiant. A bridging model for parallel computation. Communications of the ACM, 33(8):103.111, 1997.
[18] Jim Wyllie. Spsort: How to sort a terabyte quickly. http://alme1.almaden.ibm.com/cs/spsort.pdf.
 

附錄A、單詞頻率統計

本節包含了一個(gè)完整的程序,用于統計在一組命令行指定的輸入文件中,每一個(gè)不同的單詞出現頻率。
#include “mapreduce/mapreduce.h”

// User’s map function
class WordCounter : public Mapper {
 public:
  virtual void Map(const MapInput& input) {
   const string& text = input.value();
   const int n = text.size();
   for (int i = 0; i < n; ) {
    // Skip past leading whitespace
    while ((i < n) && isspace(text[i]))
     i++;

   // Find word end
   int start = i;
   while ((i < n) && !isspace(text[i]))
    i++;
   if (start < i)
    Emit(text.substr(start,i-start),”1″);
  }
 }
};

REGISTER_MAPPER(WordCounter);

// User’s reduce function
class Adder : public Reducer {
 virtual void Reduce(ReduceInput* input) {
  // Iterate over all entries with the
  // same key and add the values
  int64 value = 0;
  while (!input->done()) {
   value += StringToInt(input->value());
   input->NextValue();
  }

  // Emit sum for input->key()
  Emit(IntToString(value));
 }
};

REGISTER_REDUCER(Adder);

int main(int argc, char** argv) {
 ParseCommandLineFlags(argc, argv);
 
 MapReduceSpecification spec;
 
 // Store list of input files into “spec”
 for (int i = 1; i < argc; i++) {
  MapReduceInput* input = spec.add_input();
  input->set_format(“text”);
  input->set_filepattern(argv[i]);
  input->set_mapper_class(“WordCounter”);
 }

 // Specify the output files:
 // /gfs/test/freq-00000-of-00100
 // /gfs/test/freq-00001-of-00100
 // …
 MapReduceOutput* out = spec.output();
 out->set_filebase(“/gfs/test/freq”);
 out->set_num_tasks(100);
 out->set_format(“text”);
 out->set_reducer_class(“Adder”);
 
 // Optional: do partial sums within map
 // tasks to save network bandwidth
 out->set_combiner_class(“Adder”);

 // Tuning parameters: use at most 2000
 // machines and 100 MB of memory per task
 spec.set_machines(2000);
 spec.set_map_megabytes(100);
 spec.set_reduce_megabytes(100);
 
 // Now run it
 MapReduceResult result;
 if (!MapReduce(spec, &result)) abort();
 
 // Done: ‘result’ structure contains info
 // about counters, time taken, number of
 // machines used, etc.
 return 0;
}

本站僅提供存儲服務(wù),所有內容均由用戶(hù)發(fā)布,如發(fā)現有害或侵權內容,請點(diǎn)擊舉報。
打開(kāi)APP,閱讀全文并永久保存 查看更多類(lèi)似文章
猜你喜歡
類(lèi)似文章
Google分布式系統三大論文(三)MapReduce: Simplified Data Processing on Large Clusters
Google服務(wù)器架構圖解簡(jiǎn)析(轉載)
谷歌三篇論文之二---MapReduce
[轉]MapReduce:超大機群上的簡(jiǎn)單數據處理 - heiyeluren的blog(黑...
Google揭開(kāi)了內部工作方式的神秘面紗。
分布式計算概述
更多類(lèi)似文章 >>
生活服務(wù)
分享 收藏 導長(cháng)圖 關(guān)注 下載文章
綁定賬號成功
后續可登錄賬號暢享VIP特權!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服

欧美性猛交XXXX免费看蜜桃,成人网18免费韩国,亚洲国产成人精品区综合,欧美日韩一区二区三区高清不卡,亚洲综合一区二区精品久久