MapReduce:超大機群上的簡(jiǎn)單數據處理
摘要
MapReduce是一個(gè)編程模型,和處理,產(chǎn)生大數據集的相關(guān)實(shí)現.用戶(hù)指定一個(gè)map函數處理一個(gè)key/value對,從而產(chǎn)生中間的key/value對集.然后再指定一個(gè)reduce函數合并所有的具有相同中間key的中間value.下面將列舉許多可以用這個(gè)模型來(lái)表示的現實(shí)世界的工作.
以這種方式寫(xiě)的程序能自動(dòng)的在大規模的普通機器上實(shí)現并行化.這個(gè)運行時(shí)系統關(guān)心這些細節:分割輸入數據,在機群上的調度,機器的錯誤處理,管理機器之間必要的通信.這樣就可以讓那些沒(méi)有并行分布式處理系統經(jīng)驗的程序員利用大量分布式系統的資源.
我們的MapReduce實(shí)現運行在規??梢造`活調整的由普通機器組成的機群上,一個(gè)典型的MapReduce計算處理幾千臺機器上的以TB計算的數據.程序員發(fā)現這個(gè)系統非常好用:已經(jīng)實(shí)現了數以百計的MapReduce程序,每天在Google的機群上都有1000多個(gè)MapReduce程序在執行.
1.介紹
在過(guò)去的5年里,作者和Google的許多人已經(jīng)實(shí)現了數以百計的為專(zhuān)門(mén)目的而寫(xiě)的計算來(lái)處理大量的原始數據,比如,爬行的文檔,Web請求日志,等等.為了計算各種類(lèi)型的派生數據,比如,倒排索引,Web文檔的圖結構的各種表示,每個(gè)主機上爬行的頁(yè)面數量的概要,每天被請求數量最多的集合,等等.很多這樣的計算在概念上很容易理解.然而,輸入的數據量很大,并且只有計算被分布在成百上千的機器上才能在可以接受的時(shí)間內完成.怎樣并行計算,分發(fā)數據,處理錯誤,所有這些問(wèn)題綜合在一起,使得原本很簡(jiǎn)介的計算,因為要大量的復雜代碼來(lái)處理這些問(wèn)題,而變得讓人難以處理.
作為對這個(gè)復雜性的回應,我們設計一個(gè)新的抽象模型,它讓我們表示我們將要執行的簡(jiǎn)單計算,而隱藏并行化,容錯,數據分布,負載均衡的那些雜亂的細節,在一個(gè)庫里.我們的抽象模型的靈感來(lái)自L(fǎng)isp和許多其他函數語(yǔ)言的map和reduce的原始表示.我們認識到我們的許多計算都包含這樣的操作:在我們輸入數據的邏輯記錄上應用map操作,來(lái)計算出一個(gè)中間key/value對集,在所有具有相同key的value上應用reduce操作,來(lái)適當的合并派生的數據.功能模型的使用,再結合用戶(hù)指定的map和reduce操作,讓我們可以非常容易的實(shí)現大規模并行化計算,和使用再次執行作為初級機制來(lái)實(shí)現容錯.
這個(gè)工作的主要貢獻是通過(guò)簡(jiǎn)單有力的接口來(lái)實(shí)現自動(dòng)的并行化和大規模分布式計算,結合這個(gè)接口的實(shí)現來(lái)在大量普通的PC機上實(shí)現高性能計算.
第二部分描述基本的編程模型,并且給一些例子.第三部分描述符合我們的基于集群的計算環(huán)境的MapReduce的接口的實(shí)現.第四部分描述我們覺(jué)得編程模型中一些有用的技巧.第五部分對于各種不同的任務(wù),測量我們實(shí)現的性能.第六部分探究在Google內部使用MapReduce作為基礎來(lái)重寫(xiě)我們的索引系統產(chǎn)品.第七部分討論相關(guān)的,和未來(lái)的工作.
2.編程模型
計算利用一個(gè)輸入key/value對集,來(lái)產(chǎn)生一個(gè)輸出key/value對集.MapReduce庫的用戶(hù)用兩個(gè)函數表達這個(gè)計算:map和reduce.
用戶(hù)自定義的map函數,接受一個(gè)輸入對,然后產(chǎn)生一個(gè)中間key/value對集.MapReduce庫把所有具有相同中間key I的中間value聚合在一起,然后把它們傳遞給reduce函數.
用戶(hù)自定義的reduce函數,接受一個(gè)中間key I和相關(guān)的一個(gè)value集.它合并這些value,形成一個(gè)比較小的value集.一般的,每次reduce調用只產(chǎn)生0或1個(gè)輸出value.通過(guò)一個(gè)迭代器把中間value提供給用戶(hù)自定義的reduce函數.這樣可以使我們根據內存來(lái)控制value列表的大小.
2.1 實(shí)例
考慮這個(gè)問(wèn)題:計算在一個(gè)大的文檔集合中每個(gè)詞出現的次數.用戶(hù)將寫(xiě)和下面類(lèi)似的偽代碼:
map(String key,String value):
//key:文檔的名字
//value:文檔的內容
for each word w in value:
EmitIntermediate(w,"1");
reduce(String key,Iterator values):
//key:一個(gè)詞
//values:一個(gè)計數列表
int result=0;
for each v in values:
result+=ParseInt(v);
Emit(AsString(resut));
map函數產(chǎn)生每個(gè)詞和這個(gè)詞的出現次數(在這個(gè)簡(jiǎn)單的例子里就是1).reduce函數把產(chǎn)生的每一個(gè)特定的詞的計數加在一起.
另外,用戶(hù)用輸入輸出文件的名字和可選的調節參數來(lái)填充一個(gè)mapreduce規范對象.用戶(hù)然后調用MapReduce函數,并把規范對象傳遞給它.用戶(hù)的代碼和MapReduce庫鏈接在一起(用C++實(shí)現).附錄A包含這個(gè)實(shí)例的全部文本.
2.2類(lèi)型
即使前面的偽代碼寫(xiě)成了字符串輸入和輸出的term格式,但是概念上用戶(hù)寫(xiě)的map和reduce函數有關(guān)聯(lián)的類(lèi)型:
map(k1,v1) ->list(k2,v2)
reduce(k2,list(v2)) ->list(v2)
例如,輸入的key,value和輸出的key,value的域不同.此外,中間key,value和輸出key,values的域相同.
我們的C++實(shí)現傳遞字符串來(lái)和用戶(hù)自定義的函數交互,并把它留給用戶(hù)的代碼,來(lái)在字符串和適當的類(lèi)型間進(jìn)行轉換.
2.3更多實(shí)例
這里有一些讓人感興趣的簡(jiǎn)單程序,可以容易的用MapReduce計算來(lái)表示.
分布式的Grep(UNIX工具程序, 可做文件內的字符串查找):如果輸入行匹配給定的樣式,map函數就輸出這一行.reduce函數就是把中間數據復制到輸出.
計算URL訪(fǎng)問(wèn)頻率:map函數處理web頁(yè)面請求的記錄,輸出(URL,1).reduce函數把相同URL的value都加起來(lái),產(chǎn)生一個(gè)(URL,記錄總數)的對.
倒轉網(wǎng)絡(luò )鏈接圖:map函數為每個(gè)鏈接輸出(目標,源)對,一個(gè)URL叫做目標,包含這個(gè)URL的頁(yè)面叫做源.reduce函數根據給定的相關(guān)目標URLs連接所有的源URLs形成一個(gè)列表,產(chǎn)生(目標,源列表)對.
每個(gè)主機的術(shù)語(yǔ)向量:一個(gè)術(shù)語(yǔ)向量用一個(gè)(詞,頻率)列表來(lái)概述出現在一個(gè)文檔或一個(gè)文檔集中的最重要的一些詞.map函數為每一個(gè)輸入文檔產(chǎn)生一個(gè)(主機名,術(shù)語(yǔ)向量)對(主機名來(lái)自文檔的URL).reduce函數接收給定主機的所有文檔的術(shù)語(yǔ)向量.它把這些術(shù)語(yǔ)向量加在一起,丟棄低頻的術(shù)語(yǔ),然后產(chǎn)生一個(gè)最終的(主機名,術(shù)語(yǔ)向量)對.
倒排索引:map函數分析每個(gè)文檔,然后產(chǎn)生一個(gè)(詞,文檔號)對的序列.reduce函數接受一個(gè)給定詞的所有對,排序相應的文檔IDs,并且產(chǎn)生一個(gè)(詞,文檔ID列表)對.所有的輸出對集形成一個(gè)簡(jiǎn)單的倒排索引.它可以簡(jiǎn)單的增加跟蹤詞位置的計算.
分布式排序:map函數從每個(gè)記錄提取key,并且產(chǎn)生一個(gè)(key,record)對.reduce函數不改變任何的對.這個(gè)計算依賴(lài)分割工具(在4.1描述)和排序屬性(在4.2描述).
3實(shí)現
MapReduce接口可能有許多不同的實(shí)現.根據環(huán)境進(jìn)行正確的選擇.例如,一個(gè)實(shí)現對一個(gè)共享內存較小的機器是合適的,另外的適合一個(gè)大NUMA的多處理器的機器,而有的適合一個(gè)更大的網(wǎng)絡(luò )機器的集合.
這部分描述一個(gè)在Google廣泛使用的計算環(huán)境的實(shí)現:用交換機連接的普通PC機的大機群.我們的環(huán)境是:
1.Linux操作系統,雙處理器,2-4GB內存的機器.
2.普通的網(wǎng)絡(luò )硬件,每個(gè)機器的帶寬或者是百兆或者千兆,但是平均小于全部帶寬的一半.
3.因為一個(gè)機群包含成百上千的機器,所有機器會(huì )經(jīng)常出現問(wèn)題.
4.存儲用直接連到每個(gè)機器上的廉價(jià)IDE硬盤(pán).一個(gè)從內部文件系統發(fā)展起來(lái)的分布式文件系統被用來(lái)管理存儲在這些磁盤(pán)上的數據.文件系統用復制的方式在不可靠的硬件上來(lái)保證可靠性和有效性.
5.用戶(hù)提交工作給調度系統.每個(gè)工作包含一個(gè)任務(wù)集,每個(gè)工作被調度者映射到機群中一個(gè)可用的機器集上.
3.1執行預覽
通過(guò)自動(dòng)分割輸入數據成一個(gè)有M個(gè)split的集,map調用被分布到多臺機器上.輸入的split能夠在不同的機器上被并行處理.通過(guò)用分割函數分割中間key,來(lái)形成R個(gè)片(例如,hash(key) mod R),reduce調用被分布到多臺機器上.分割數量(R)和分割函數由用戶(hù)來(lái)指定.
圖1顯示了我們實(shí)現的MapReduce操作的全部流程.當用戶(hù)的程序調用MapReduce的函數的時(shí)候,將發(fā)生下面的一系列動(dòng)作(下面的數字和圖1中的數字標簽相對應):
1.在用戶(hù)程序里的MapReduce庫首先分割輸入文件成M個(gè)片,每個(gè)片的大小一般從 16到64MB(用戶(hù)可以通過(guò)可選的參數來(lái)控制).然后在機群中開(kāi)始大量的拷貝程序.
2.這些程序拷貝中的一個(gè)是master,其他的都是由master分配任務(wù)的worker.有M 個(gè)map任務(wù)和R個(gè)reduce任務(wù)將被分配.管理者分配一個(gè)map任務(wù)或reduce任務(wù)給一個(gè)空閑的worker.
3.一個(gè)被分配了map任務(wù)的worker讀取相關(guān)輸入split的內容.它從輸入數據中分析出key/value對,然后把key/value對傳遞給用戶(hù)自定義的map函數.由map函數產(chǎn)生的中間key/value對被緩存在內存中.
4.緩存在內存中的key/value對被周期性的寫(xiě)入到本地磁盤(pán)上,通過(guò)分割函數把它們寫(xiě)入R個(gè)區域.在本地磁盤(pán)上的緩存對的位置被傳送給master,master負責把這些位置傳送給reduce worker.
5.當一個(gè)reduce worker得到master的位置通知的時(shí)候,它使用遠程過(guò)程調用來(lái)從map worker的磁盤(pán)上讀取緩存的數據.當reduce worker讀取了所有的中間數據后,它通過(guò)排序使具有相同key的內容聚合在一起.因為許多不同的key映射到相同的reduce任務(wù),所以排序是必須的.如果中間數據比內存還大,那么還需要一個(gè)外部排序.
6.reduce worker迭代排過(guò)序的中間數據,對于遇到的每一個(gè)唯一的中間key,它把key和相關(guān)的中間value集傳遞給用戶(hù)自定義的reduce函數.reduce函數的輸出被添加到這個(gè)reduce分割的最終的輸出文件中.
7.當所有的map和reduce任務(wù)都完成了,管理者喚醒用戶(hù)程序.在這個(gè)時(shí)候,在用戶(hù)程序里的MapReduce調用返回到用戶(hù)代碼.
在成功完成之后,mapreduce執行的輸出存放在R個(gè)輸出文件中(每一個(gè)reduce任務(wù)產(chǎn)生一個(gè)由用戶(hù)指定名字的文件).一般,用戶(hù)不需要合并這R個(gè)輸出文件成一個(gè)文件--他們經(jīng)常把這些文件當作一個(gè)輸入傳遞給其他的MapReduce調用,或者在可以處理多個(gè)分割文件的分布式應用中使用他們.
3.2master數據結構
master保持一些數據結構.它為每一個(gè)map和reduce任務(wù)存儲它們的狀態(tài)(空閑,工作中,完成),和worker機器(非空閑任務(wù)的機器)的標識.
master就像一個(gè)管道,通過(guò)它,中間文件區域的位置從map任務(wù)傳遞到reduce任務(wù).因此,對于每個(gè)完成的map任務(wù),master存儲由map任務(wù)產(chǎn)生的R個(gè)中間文件區域的大小和位置.當map任務(wù)完成的時(shí)候,位置和大小的更新信息被接受.這些信息被逐步增加的傳遞給那些正在工作的reduce任務(wù).
3.3容錯
因為MapReduce庫被設計用來(lái)使用成百上千的機器來(lái)幫助處理非常大規模的數據,所以這個(gè)庫必須要能很好的處理機器故障.
worker故障
master周期性的ping每個(gè)worker.如果master在一個(gè)確定的時(shí)間段內沒(méi)有收到worker返回的信息,那么它將把這個(gè)worker標記成失效.因為每一個(gè)由這個(gè)失效的worker完成的map任務(wù)被重新設置成它初始的空閑狀態(tài),所以它可以被安排給其他的worker.同樣的,每一個(gè)在失敗的worker上正在運行的map或reduce任務(wù),也被重新設置成空閑狀態(tài),并且將被重新調度.
在一個(gè)失敗機器上已經(jīng)完成的map任務(wù)將被再次執行,因為它的輸出存儲在它的磁盤(pán)上,所以不可訪(fǎng)問(wèn).已經(jīng)完成的reduce任務(wù)將不會(huì )再次執行,因為它的輸出存儲在全局文件系統中.
當一個(gè)map任務(wù)首先被worker A執行之后,又被B執行了(因為A失效了),重新執行這個(gè)情況被通知給所有執行reduce任務(wù)的worker.任何還沒(méi)有從A讀數據的reduce任務(wù)將從worker B讀取數據.
MapReduce可以處理大規模worker失敗的情況.例如,在一個(gè)MapReduce操作期間,在正在運行的機群上進(jìn)行網(wǎng)絡(luò )維護引起80臺機器在幾分鐘內不可訪(fǎng)問(wèn)了,MapReduce master只是簡(jiǎn)單的再次執行已經(jīng)被不可訪(fǎng)問(wèn)的worker完成的工作,繼續執行,最終完成這個(gè)MapReduce操作.
master失敗
可以很容易的讓管理者周期的寫(xiě)入上面描述的數據結構的checkpoints.如果這個(gè)master任務(wù)失效了,可以從上次最后一個(gè)checkpoint開(kāi)始啟動(dòng)另一個(gè)master進(jìn)程.然而,因為只有一個(gè)master,所以它的失敗是比較麻煩的,因此我們現在的實(shí)現是,如果master失敗,就中止MapReduce計算.客戶(hù)可以檢查這個(gè)狀態(tài),并且可以根據需要重新執行MapReduce操作.
在錯誤面前的處理機制
當用戶(hù)提供的map和reduce操作對它的輸出值是確定的函數時(shí),我們的分布式實(shí)現產(chǎn)生,和全部程序沒(méi)有錯誤的順序執行一樣,相同的輸出.
我們依賴(lài)對map和reduce任務(wù)的輸出進(jìn)行原子提交來(lái)完成這個(gè)性質(zhì).每個(gè)工作中的任務(wù)把它的輸出寫(xiě)到私有臨時(shí)文件中.一個(gè)reduce任務(wù)產(chǎn)生一個(gè)這樣的文件,而一個(gè)map任務(wù)產(chǎn)生R個(gè)這樣的文件(一個(gè)reduce任務(wù)對應一個(gè)文件).當一個(gè)map任務(wù)完成的時(shí)候,worker發(fā)送一個(gè)消息給master,在這個(gè)消息中包含這R個(gè)臨時(shí)文件的名字.如果master從一個(gè)已經(jīng)完成的map任務(wù)再次收到一個(gè)完成的消息,它將忽略這個(gè)消息.否則,它在master的數據結構里記錄這R個(gè)文件的名字.
當一個(gè)reduce任務(wù)完成的時(shí)候,這個(gè)reduce worker原子的把臨時(shí)文件重命名成最終的輸出文件.如果相同的reduce任務(wù)在多個(gè)機器上執行,多個(gè)重命名調用將被執行,并產(chǎn)生相同的輸出文件.我們依賴(lài)由底層文件系統提供的原子重命名操作來(lái)保證,最終的文件系統狀態(tài)僅僅包含一個(gè)reduce任務(wù)產(chǎn)生的數據.
我們的map和reduce操作大部分都是確定的,并且我們的處理機制等價(jià)于一個(gè)順序的執行的這個(gè)事實(shí),使得程序員可以很容易的理解程序的行為.當map或/和reduce操作是不確定的時(shí)候,我們提供雖然比較弱但是合理的處理機制.當在一個(gè)非確定操作的前面,一個(gè)reduce任務(wù)R1的輸出等價(jià)于一個(gè)非確定順序程序執行產(chǎn)生的輸出.然而,一個(gè)不同的reduce任務(wù)R2的輸出也許符合一個(gè)不同的非確定順序程序執行產(chǎn)生的輸出.
考慮map任務(wù)M和reduce任務(wù)R1,R2的情況.我們設定e(Ri)為已經(jīng)提交的Ri的執行(有且僅有一個(gè)這樣的執行).這個(gè)比較弱的語(yǔ)義出現,因為e(R1)也許已經(jīng)讀取了由M的執行產(chǎn)生的輸出,而e(R2)也許已經(jīng)讀取了由M的不同執行產(chǎn)生的輸出.
3.4存儲位置
在我們的計算機環(huán)境里,網(wǎng)絡(luò )帶寬是一個(gè)相當缺乏的資源.我們利用把輸入數據(由GFS管理)存儲在機器的本地磁盤(pán)上來(lái)保存網(wǎng)絡(luò )帶寬.GFS把每個(gè)文件分成64MB的一些塊,然后每個(gè)塊的幾個(gè)拷貝存儲在不同的機器上(一般是3個(gè)拷貝).MapReduce的master考慮輸入文件的位置信息,并且努力在一個(gè)包含相關(guān)輸入數據的機器上安排一個(gè)map任務(wù).如果這樣做失敗了,它嘗試在那個(gè)任務(wù)的輸入數據的附近安排一個(gè)map任務(wù)(例如,分配到一個(gè)和包含輸入數據塊在一個(gè)switch里的worker機器上執行).當運行巨大的MapReduce操作在一個(gè)機群中的一部分機器上的時(shí)候,大部分輸入數據在本地被讀取,從而不消耗網(wǎng)絡(luò )帶寬.
3.5任務(wù)粒度
象上面描述的那樣,我們細分map階段成M個(gè)片,reduce階段成R個(gè)片.M和R應當比worker機器的數量大許多.每個(gè)worker執行許多不同的工作來(lái)提高動(dòng)態(tài)負載均衡,也可以加速從一個(gè)worker失效中的恢復,這個(gè)機器上的許多已經(jīng)完成的map任務(wù)可以被分配到所有其他的worker機器上.
在我們的實(shí)現里,M和R的范圍是有大小限制的,因為master必須做O(M+R)次調度,并且保存O(M*R)個(gè)狀態(tài)在內存中.(這個(gè)因素使用的內存是很少的,在O(M*R)個(gè)狀態(tài)片里,大約每個(gè)map任務(wù)/reduce任務(wù)對使用一個(gè)字節的數據).
此外,R經(jīng)常被用戶(hù)限制,因為每一個(gè)reduce任務(wù)最終都是一個(gè)獨立的輸出文件.實(shí)際上,我們傾向于選擇M,以便每一個(gè)單獨的任務(wù)大概都是16到64MB的輸入數據(以便上面描述的位置優(yōu)化是最有效的),我們把R設置成我們希望使用的worker機器數量的小倍數.我們經(jīng)常執行MapReduce計算,在M=200000,R=5000,使用2000臺工作者機器的情況下.
3.6備用任務(wù)
一個(gè)落后者是延長(cháng)MapReduce操作時(shí)間的原因之一:一個(gè)機器花費一個(gè)異乎尋常地的長(cháng)時(shí)間來(lái)完成最后的一些map或reduce任務(wù)中的一個(gè).有很多原因可能產(chǎn)生落后者.例如,一個(gè)有壞磁盤(pán)的機器經(jīng)常發(fā)生可以糾正的錯誤,這樣就使讀性能從30MB/s降低到3MB/s.機群調度系統也許已經(jīng)安排其他的任務(wù)在這個(gè)機器上,由于計算要使用CPU,內存,本地磁盤(pán),網(wǎng)絡(luò )帶寬的原因,引起它執行MapReduce代碼很慢.我們最近遇到的一個(gè)問(wèn)題是,一個(gè)在機器初始化時(shí)的Bug引起處理器緩存的失效:在一個(gè)被影響的機器上的計算性能有上百倍的影響.
我們有一個(gè)一般的機制來(lái)減輕這個(gè)落后者的問(wèn)題.當一個(gè)MapReduce操作將要完成的時(shí)候,master調度備用進(jìn)程來(lái)執行那些剩下的還在執行的任務(wù).無(wú)論是原來(lái)的還是備用的執行完成了,工作都被標記成完成.我們已經(jīng)調整了這個(gè)機制,通常只會(huì )占用多幾個(gè)百分點(diǎn)的機器資源.我們發(fā)現這可以顯著(zhù)的減少完成大規模MapReduce操作的時(shí)間.作為一個(gè)例子,將要在5.3描述的排序程序,在關(guān)閉掉備用任務(wù)的情況下,要比有備用任務(wù)的情況下多花44%的時(shí)間.
4技巧
盡管簡(jiǎn)單的map和reduce函數的功能對于大多數需求是足夠的了,但是我們開(kāi)發(fā)了一些有用的擴充.這些將在這個(gè)部分描述.
4.1分割函數
MapReduce用戶(hù)指定reduce任務(wù)和reduce任務(wù)需要的輸出文件的數量.在中間key上使用分割函數,使數據分割后通過(guò)這些任務(wù).一個(gè)缺省的分割函數使用hash方法(例如,hash(key) mod R).這個(gè)導致非常平衡的分割.然后,有的時(shí)候,使用其他的key分割函數來(lái)分割數據有非常有用的.例如,有時(shí)候,輸出的key是URLs,并且我們希望每個(gè)主機的所有條目保持在同一個(gè)輸出文件中.為了支持像這樣的情況,MapReduce庫的用戶(hù)可以提供專(zhuān)門(mén)的分割函數.例如,使用"hash(Hostname(urlkey)) mod R"作為分割函數,使所有來(lái)自同一個(gè)主機的URLs保存在同一個(gè)輸出文件中.
4.2順序保證
我們保證在一個(gè)給定的分割里面,中間key/value對以key遞增的順序處理.這個(gè)順序保證可以使每個(gè)分割產(chǎn)出一個(gè)有序的輸出文件,當輸出文件的格式需要支持有效率的隨機訪(fǎng)問(wèn)key的時(shí)候,或者對輸出數據集再作排序的時(shí)候,就很容易.
4.3combiner函數
在某些情況下,允許中間結果key重復會(huì )占據相當的比重,并且用戶(hù)定義的reduce函數
滿(mǎn)足結合律和交換律.一個(gè)很好的例子就是在2.1部分的詞統計程序.因為詞頻率傾向于一個(gè)zipf分布(齊夫分布),每個(gè)map任務(wù)將產(chǎn)生成百上千個(gè)這樣的記錄<the,1>.所有的這些計數將通過(guò)網(wǎng)絡(luò )被傳輸到一個(gè)單獨的reduce任務(wù),然后由reduce函數加在一起產(chǎn)生一個(gè)數字.我們允許用戶(hù)指定一個(gè)可選的combiner函數,先在本地進(jìn)行合并一下,然后再通過(guò)網(wǎng)絡(luò )發(fā)送.
在每一個(gè)執行map任務(wù)的機器上combiner函數被執行.一般的,相同的代碼被用在combiner和reduce函數.在combiner和reduce函數之間唯一的區別是MapReduce庫怎樣控制函數的輸出.reduce函數的輸出被保存最終輸出文件里.combiner函數的輸出被寫(xiě)到中間文件里,然后被發(fā)送給reduce任務(wù).
部分使用combiner可以顯著(zhù)的提高一些MapReduce操作的速度.附錄A包含一個(gè)使用combiner函數的例子.
4.4輸入輸出類(lèi)型
MapReduce庫支持以幾種不同的格式讀取輸入數據.例如,文本模式輸入把每一行看作是一個(gè)key/value對.key是文件的偏移量,value是那一行的內容.其他普通的支持格式以key的順序存儲key/value對序列.每一個(gè)輸入類(lèi)型的實(shí)現知道怎樣把輸入分割成對每個(gè)單獨的map任務(wù)來(lái)說(shuō)是有意義的(例如,文本模式的范圍分割確保僅僅在每行的邊界進(jìn)行范圍分割).雖然許多用戶(hù)僅僅使用很少的預定意輸入類(lèi)型的一個(gè),但是用戶(hù)可以通過(guò)提供一個(gè)簡(jiǎn)單的reader接口來(lái)支持一個(gè)新的輸入類(lèi)型.
一個(gè)reader不必要從文件里讀數據.例如,我們可以很容易的定義它從數據庫里讀記錄,或從內存中的數據結構讀取.
4.5副作用
有的時(shí)候,MapReduce的用戶(hù)發(fā)現在map操作或/和reduce操作時(shí)產(chǎn)生輔助文件作為一個(gè)附加的輸出是很方便的.我們依靠應用程序寫(xiě)來(lái)使這個(gè)副作用成為原子的.一般的,應用程序寫(xiě)一個(gè)臨時(shí)文件,然后一旦這個(gè)文件全部產(chǎn)生完,就自動(dòng)的被重命名.
對于單個(gè)任務(wù)產(chǎn)生的多個(gè)輸出文件來(lái)說(shuō),我們沒(méi)有提供其上的兩階段提交的原子操作支持.因此,一個(gè)產(chǎn)生需要交叉文件連接的多個(gè)輸出文件的任務(wù),應該使確定性的任務(wù).不過(guò)這個(gè)限制在實(shí)際的工作中并不是一個(gè)問(wèn)題.
4.6跳過(guò)錯誤記錄
有的時(shí)候因為用戶(hù)的代碼里有bug,導致在某一個(gè)記錄上map或reduce函數突然crash掉.這樣的bug使得MapReduce操作不能完成.雖然一般是修復這個(gè)bug,但是有時(shí)候這是不現實(shí)的;也許這個(gè)bug是在源代碼不可得到的第三方庫里.有的時(shí)候也可以忽略一些記錄,例如,當在一個(gè)大的數據集上進(jìn)行統計分析.我們提供一個(gè)可選的執行模式,在這個(gè)模式下,MapReduce庫檢測那些記錄引起的crash,然后跳過(guò)那些記錄,來(lái)繼續執行程序.
每個(gè)worker程序安裝一個(gè)信號處理器來(lái)獲取內存段異常和總線(xiàn)錯誤.在調用一個(gè)用戶(hù)自定義的map或reduce操作之前,MapReduce庫把記錄的序列號存儲在一個(gè)全局變量里.如果用戶(hù)代碼產(chǎn)生一個(gè)信號,那個(gè)信號處理器就會(huì )發(fā)送一個(gè)包含序號的"last gasp"UDP包給MapReduce的master.當master不止一次看到同一個(gè)記錄的時(shí)候,它就會(huì )指出,當相關(guān)的map或reduce任務(wù)再次執行的時(shí)候,這個(gè)記錄應當被跳過(guò).
4.7本地執行
調試在map或reduce函數中問(wèn)題是很困難的,因為實(shí)際的計算發(fā)生在一個(gè)分布式的系統中,經(jīng)常是有一個(gè)master動(dòng)態(tài)的分配工作給幾千臺機器.為了簡(jiǎn)化調試和測試,我們開(kāi)發(fā)了一個(gè)可替換的實(shí)現,這個(gè)實(shí)現在本地執行所有的MapReduce操作.用戶(hù)可以控制執行,這樣計算可以限制到特定的map任務(wù)上.用戶(hù)以一個(gè)標志調用他們的程序,然后可以容易的使用他們認為好用的任何調試和測試工具(例如,gdb).
4.8狀態(tài)信息
master運行一個(gè)HTTP服務(wù)器,并且可以輸出一組狀況頁(yè)來(lái)供人們使用.狀態(tài)頁(yè)顯示計算進(jìn)度,象多少個(gè)任務(wù)已經(jīng)完成,多少個(gè)還在運行,輸入的字節數,中間數據字節數,輸出字節數,處理百分比,等等.這個(gè)頁(yè)也包含到標準錯誤的鏈接,和由每個(gè)任務(wù)產(chǎn)生的標準輸出的鏈接.用戶(hù)可以根據這些數據預測計算需要花費的時(shí)間,和是否需要更多的資源.當計算比預期的要慢很多的時(shí)候,這些頁(yè)面也可以被用來(lái)判斷是不是這樣.
此外,最上面的狀態(tài)頁(yè)顯示已經(jīng)有多少個(gè)工作者失敗了,和當它們失敗的時(shí)候,那個(gè)map和reduce任務(wù)正在運行.當試圖診斷在用戶(hù)代碼里的bug時(shí),這個(gè)信息也是有用的.
4.9計數器
MapReduce庫提供一個(gè)計數器工具,來(lái)計算各種事件的發(fā)生次數.例如,用戶(hù)代碼想要計算所有處理的詞的個(gè)數,或者被索引的德文文檔的數量.
為了使用這個(gè)工具,用戶(hù)代碼創(chuàng )建一個(gè)命名的計數器對象,然后在map或/和reduce函數里適當的增加計數器.例如:
Counter * uppercase;
uppercase=GetCounter("uppercase");
map(String name,String contents):
for each word w in contents:
if(IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w,"1");
來(lái)自不同worker機器上的計數器值被周期性的傳送給master(在ping回應里).master把來(lái)自成功的map和reduce任務(wù)的計數器值加起來(lái),在MapReduce操作完成的時(shí)候,把它返回給用戶(hù)代碼.當前計數器的值也被顯示在master狀態(tài)頁(yè)里,以便人們可以查看實(shí)際的計算進(jìn)度.當計算計數器值的時(shí)候消除重復執行的影響,避免數據的累加.(在備用任務(wù)的使用,和由于出錯的重新執行,可以產(chǎn)生重復執行)
有些計數器值被MapReduce庫自動(dòng)的維護,比如,被處理的輸入key/value對的數量,和被產(chǎn)生的輸出key/value對的數量.
用戶(hù)發(fā)現計數器工具對于檢查MapReduce操作的完整性很有用.例如,在一些MapReduce操作中,用戶(hù)代碼也許想要確保輸出對的數量完全等于輸入對的數量,或者處理過(guò)的德文文檔的數量是在全部被處理的文檔數量中屬于合理的范圍.
5性能
在本節,我們用在一個(gè)大型集群上運行的兩個(gè)計算來(lái)衡量MapReduce的性能.一個(gè)計算用來(lái)在一個(gè)大概1TB的數據中查找特定的匹配串.另一個(gè)計算排序大概1TB的數據.
這兩個(gè)程序代表了MapReduce的用戶(hù)實(shí)現的真實(shí)的程序的一個(gè)大子集.一類(lèi)是,把數據從一種表示轉化到另一種表示.另一類(lèi)是,從一個(gè)大的數據集中提取少量的關(guān)心的數據.
5.1機群配置
所有的程序在包含大概1800臺機器的機群上執行.機器的配置是:2個(gè)2G的Intel Xeon超線(xiàn)程處理器,4GB內存,兩個(gè)160GB IDE磁盤(pán),一個(gè)千兆網(wǎng)卡.這些機器部署在一個(gè)由兩層的,樹(shù)形交換網(wǎng)絡(luò )中,在根節點(diǎn)上大概有100到2000G的帶寬.所有這些機器都有相同的部署(對等部署),因此任意兩點(diǎn)之間的來(lái)回時(shí)間小于1毫秒.
在4GB的內存里,大概有1-1.5GB被用來(lái)運行在機群中其他的任務(wù).這個(gè)程序是在周末的下午開(kāi)始執行的,這個(gè)時(shí)候CPU,磁盤(pán),網(wǎng)絡(luò )基本上是空閑的.
5.2Grep
這個(gè)Grep程序掃描大概10^10個(gè),每個(gè)100字節的記錄,查找比較少的3字符的查找串(這個(gè)查找串出現在92337個(gè)記錄中).輸入數據被分割成大概64MB的片(M=15000),全部 的輸出存放在一個(gè)文件中(R=1).
圖2顯示計算過(guò)程隨時(shí)間變化的情況.Y軸表示輸入數據被掃描的速度.隨著(zhù)更多的機群被分配給這個(gè)MapReduce計算,速度在逐步的提高,當有1764個(gè)worker的時(shí)候這個(gè)速度達到最高的30GB/s.當map任務(wù)完成的時(shí)候,速度開(kāi)始下降,在計算開(kāi)始后80秒,輸入的速度降到0.這個(gè)計算持續的時(shí)間大概是150秒.這包括了前面大概一分鐘的啟動(dòng)時(shí)間.啟動(dòng)時(shí)間用來(lái)把程序傳播到所有的機器上,等待GFS打開(kāi)1000個(gè)輸入文件,得到必要的位置優(yōu)化信息.
5.3排序
這個(gè)sort程序排序10^10個(gè)記錄,每個(gè)記錄100個(gè)字節(大概1TB的數據).這個(gè)程序是模仿TeraSort的.
這個(gè)排序程序只包含不到50行的用戶(hù)代碼.其中有3行map函數用來(lái)從文本行提取10字節的排序key,并且產(chǎn)生一個(gè)由這個(gè)key和原始文本行組成的中間key/value對.我們使用一個(gè)內置的Identity函數作為reduce操作.這個(gè)函數直接把中間key/value對作為輸出的key/value對.最終的排序輸出寫(xiě)到一個(gè)2路復制的GFS文件中(也就是,程序的輸出會(huì )寫(xiě)2TB的數據).
象以前一樣,輸入數據被分割成64MB的片(M=15000).我們把排序后的輸出寫(xiě)到4000個(gè)文件中(R=4000).分區函數使用key的原始字節來(lái)把數據分區到R個(gè)小片中.
我們以這個(gè)基準的分割函數,知道key的分布情況.在一般的排序程序中,我們會(huì )增加一個(gè)預處理的MapReduce操作,這個(gè)操作用于采樣key的情況,并且用這個(gè)采樣的key的分布情況來(lái)計算對最終排序處理的分割點(diǎn)。
圖3(a)顯示這個(gè)排序程序的正常執行情況.左上圖顯示輸入數據的讀取速度.這個(gè)速度最高到達13GB/s,并且在不到200秒所有map任務(wù)完成之后迅速滑落到0.注意到這個(gè)輸入速度小于Grep.這是因為這個(gè)排序map任務(wù)花費大概一半的時(shí)間和帶寬,來(lái)把中間數據寫(xiě)到本地硬盤(pán)中.而Grep相關(guān)的中間數據可以忽略不計.
左中圖顯示數據通過(guò)網(wǎng)絡(luò )從map任務(wù)傳輸給reduce任務(wù)的速度.當第一個(gè)map任務(wù)完成后,這個(gè)排序過(guò)程就開(kāi)始了.圖示上的第一個(gè)高峰是啟動(dòng)了第一批大概1700個(gè)reduce任務(wù)(整個(gè)MapReduce任務(wù)被分配到1700臺機器上,每個(gè)機器一次只執行一個(gè)reduce任務(wù)).大概開(kāi)始計算后的300秒,第一批reduce任務(wù)中的一些完成了,我們開(kāi)始執行剩下的reduce任務(wù).全部的排序過(guò)程持續了大概600秒的時(shí)間.
左下圖顯示排序后的數據被reduce任務(wù)寫(xiě)入最終文件的速度.因為機器忙于排序中間數據,所以在第一個(gè)排序階段的結束和寫(xiě)階段的開(kāi)始有一個(gè)延遲.寫(xiě)的速度大概是2-4GB/s.大概開(kāi)始計算后的850秒寫(xiě)過(guò)程結束.包括前面的啟動(dòng)過(guò)程,全部的計算任務(wù)持續的891秒.這個(gè)和TeraSort benchmark的最高紀錄1057秒差不多.
需要注意的事情是:因此位置優(yōu)化的原因,很多數據都是從本地磁盤(pán)讀取的而沒(méi)有通過(guò)我們有限帶寬的網(wǎng)絡(luò ),所以輸入速度比排序速度和輸出速度都要快.排序速度比輸出速度快的原因是輸出階段寫(xiě)兩個(gè)排序后數據的拷貝(我們寫(xiě)兩個(gè)副本的原因是為了可靠性和可用性).我們寫(xiě)兩份的原因是因為底層文件系統的可靠性和可用性的要求.如果底層文件系統用類(lèi)似容錯編碼(erasure coding)的方式,而不采用復制寫(xiě)的方式,在寫(xiě)盤(pán)階段可以降低網(wǎng)絡(luò )帶寬的要求。
5.4備用任務(wù)的影響
在圖3(b)中,顯示我們不用備用任務(wù)的排序程序的執行情況.除了它有一個(gè)很長(cháng)的幾乎沒(méi)有寫(xiě)動(dòng)作發(fā)生的尾巴外,執行流程和圖3(a)相似.在960秒后,只有5個(gè)reduce任務(wù)沒(méi)有完成.然而,就是這最后幾個(gè)落后者知道300秒后才完成.全部的計算任務(wù)執行了1283秒,多花了44%的時(shí)間.
5.5機器失效
在圖3(c)中,顯示我們有意的在排序程序計算過(guò)程中停止1746臺worker中的200臺機器上的程序的情況.底層機群調度者在這些機器上馬上重新開(kāi)始新的worker程序(因為僅僅程序被停止,而機器仍然在正常運行).
因為已經(jīng)完成的map工作丟失了(由于相關(guān)的map worker被殺掉了),需要重新再作,所以worker死掉會(huì )導致一個(gè)負數的輸入速率.相關(guān)map任務(wù)的重新執行很快就重新執行了.整個(gè)計算過(guò)程在933秒內完成,包括了前邊的啟動(dòng)時(shí)間(只比正常執行時(shí)間多了5%的時(shí)間).
6經(jīng)驗
我們在2003年的2月寫(xiě)了MapReduce庫的第一個(gè)版本,并且在2003年的8月做了顯著(zhù)的增強,包括位置優(yōu)化,worker機器間任務(wù)執行的動(dòng)態(tài)負載均衡,等等.從那個(gè)時(shí)候起,我們驚奇的發(fā)現MapReduce函數庫廣泛用于我們日常處理的問(wèn)題.它現在在Google內部各個(gè)領(lǐng)域內廣泛應用,包括:
大規模機器學(xué)習問(wèn)題
Google News和Froogle產(chǎn)品的機器問(wèn)題.
提取數據產(chǎn)生一個(gè)流行查詢(xún)的報告(例如,Google Zeitgeist).
為新的試驗和產(chǎn)品提取網(wǎng)頁(yè)的屬性(例如,從一個(gè)web頁(yè)的大集合中提取位置信息 用在位置查詢(xún)).
大規模的圖計算.
圖4顯示了我們主要的源代碼管理系統中,隨著(zhù)時(shí)間推移,MapReduce程序的顯著(zhù)增加,從2003年早先時(shí)候的0個(gè)增長(cháng)到2004年9月份的差不多900個(gè)不同的程序.MapReduce之所以這樣的成功,是因為他能夠在不到半小時(shí)時(shí)間內寫(xiě)出一個(gè)簡(jiǎn)單的能夠應用于上千臺機器的大規模并發(fā)程序,并且極大的提高了開(kāi)發(fā)和原形設計的周期效率.并且,他可以讓一個(gè)完全沒(méi)有分布式和/或并行系統經(jīng)驗的程序員,能夠很容易的利用大量的資源.
在每一個(gè)任務(wù)結束的時(shí)候,MapReduce函數庫記錄使用的計算資源的統計信息.在圖1里,我們列出了2004年8月份在Google運行的一些MapReduce的工作的統計信息.
6.1大規模索引
到目前為止,最成功的MapReduce的應用就是重寫(xiě)了Google web 搜索服務(wù)所使用到的index系統.索引系統處理爬蟲(chóng)系統抓回來(lái)的超大量的文檔集,這些文檔集保存在GFS文件里.這些文檔的原始內容的大小,超過(guò)了20TB.索引程序是通過(guò)一系列的,大概5到10次MapReduce操作來(lái)建立索引.通過(guò)利用MapReduce(替換掉上一個(gè)版本的特別設計的分布處理的索引程序版本)有這樣一些好處:
索引的代碼簡(jiǎn)單,量少,容易理解,因為容錯,分布式,并行處理都隱藏在MapReduce庫中了.例如,當使用MapReduce函數庫的時(shí)候,計算的代碼行數從原來(lái)的3800行C++代碼一下減少到大概700行代碼.
MapReduce的函數庫的性能已經(jīng)非常好,所以我們可以把概念上不相關(guān)的計算步驟分開(kāi)處理,而不是混在一起以期減少在數據上的處理.這使得改變索引過(guò)程很容易.例如,我們對老索引系統的一個(gè)小更改可能要好幾個(gè)月的時(shí)間,但是在新系統內,只需要花幾天時(shí)間就可以了.
索引系統的操作更容易了,這是因為機器的失效,速度慢的機器,以及網(wǎng)絡(luò )失效都已經(jīng)由MapReduce自己解決了,而不需要操作人員的交互.另外,我們可以簡(jiǎn)單的通過(guò)對索引系統增加機器的方式提高處理性能.
7相關(guān)工作
很多系統都提供了嚴格的設計模式,并且通過(guò)對編程的嚴格限制來(lái)實(shí)現自動(dòng)的并行計算.例如,一個(gè)結合函數可以通過(guò)N個(gè)元素的數組的前綴在N個(gè)處理器上使用并行前綴計算在log N的時(shí)間內計算完.MapReduce是基于我們的大型現實(shí)計算的經(jīng)驗,對這些模型的一個(gè)簡(jiǎn)化和精煉.并且,我們還提供了基于上千臺處理器的容錯實(shí)現.而大部分并發(fā)處理系統都只在小規模的尺度上實(shí)現,并且機器的容錯還是程序員來(lái)控制的.
Bulk Synchronous Programming以及一些MPI primitives提供了更高級別的抽象,可以更容易寫(xiě)出并行處理的程序.這些系統和MapReduce系統的不同之處在,MapReduce利用嚴格的編程模式自動(dòng)實(shí)現用戶(hù)程序的并發(fā)處理,并且提供了透明的容錯處理.
我們本地的優(yōu)化策略是受active disks等技術(shù)的啟發(fā),在active disks中,計算任務(wù)是盡量推送到靠近本地磁盤(pán)的處理單元上,這樣就減少了通過(guò)I/O子系統或網(wǎng)絡(luò )的數據量.我們在少量磁盤(pán)直接連接到普通處理機運行,來(lái)代替直接連接到磁盤(pán)控制器的處理機上,但是一般的步驟是相似的.
我們的備用任務(wù)的機制和在Charlotte系統上的積極調度機制相似.這個(gè)簡(jiǎn)單的積極調度的一個(gè)缺陷是,如果一個(gè)任務(wù)引起了一個(gè)重復性的失敗,那個(gè)整個(gè)計算將無(wú)法完成.我們通過(guò)在故障情況下跳過(guò)故障記錄的機制,在某種程度上解決了這個(gè)問(wèn)題.
MapReduce實(shí)現依賴(lài)一個(gè)內置的機群管理系統來(lái)在一個(gè)大規模共享機器組上分布和運行用戶(hù)任務(wù).雖然這個(gè)不是本論文的重點(diǎn),但是集群管理系統在理念上和Condor等其他系統是一樣的.
在MapReduce庫中的排序工具在操作上和NOW-Sort相似.源機器(map worker)分割將要被排序的數據,然后把它發(fā)送到R個(gè)reduce worker中的一個(gè)上.每個(gè)reduce worker來(lái)本地排序它的數據(如果可能,就在內存中).當然,NOW-Sort沒(méi)有用戶(hù)自定義的map和reduce函數,使得我們的庫可以廣泛的應用.
River提供一個(gè)編程模型,在這個(gè)模型下,處理進(jìn)程可以靠在分布式的隊列上發(fā)送數據進(jìn)行彼此通訊.和MapReduce一樣,River系統嘗試提供對不同應用有近似平均的性能,即使在不對等的硬件環(huán)境下或者在系統顛簸的情況下也能提供近似平均的性.River是通過(guò)精心調度硬盤(pán)和網(wǎng)絡(luò )的通訊,來(lái)平衡任務(wù)的完成時(shí)間.MapReduce不和它不同.利用嚴格編程模型,MapReduce構架來(lái)把問(wèn)題分割成大量的任務(wù).這些任務(wù)被自動(dòng)的在可用的worker上調度,以便速度快的worker可以處理更多的任務(wù).這個(gè)嚴格編程模型也讓我們可以在工作快要結束的時(shí)候安排冗余的執行,來(lái)在非一致處理的情況減少完成時(shí)間(比如,在有慢機或者阻塞的worker的時(shí)候).
BAD-FS是一個(gè)很MapReduce完全不同的編程模型,它的目標是在一個(gè)廣闊的網(wǎng)絡(luò )上執行工作.然而,它們有兩個(gè)基本原理是相同的.(1)這兩個(gè)系統使用冗余的執行來(lái)從由失效引起的數據丟失中恢復.(2)這兩個(gè)系統使用本地化調度策略,來(lái)減少通過(guò)擁擠的網(wǎng)絡(luò )連接發(fā)送的數據數量.
TACC是一個(gè)被設計用來(lái)簡(jiǎn)化高有效性網(wǎng)絡(luò )服務(wù)結構的系統.和MapReduce一樣,它通過(guò)再次執行來(lái)實(shí)現容錯.
8結束語(yǔ)
MapReduce編程模型已經(jīng)在Google成功的用在不同的目的.我們把這個(gè)成功歸于以下幾個(gè)原因:第一,這個(gè)模型使用簡(jiǎn)單,甚至對沒(méi)有并行和分布式經(jīng)驗的程序員也是如此,因為它隱藏了并行化,容錯,位置優(yōu)化和負載均衡的細節.第二,大量不同的問(wèn)題可以用MapReduce計算來(lái)表達.例如,MapReduce被用來(lái),為Google的產(chǎn)品web搜索服務(wù),排序,數據挖掘,機器學(xué)習,和其他許多系統,產(chǎn)生數據.第三,我們已經(jīng)在一個(gè)好幾千臺計算機的大型集群上開(kāi)發(fā)實(shí)現了這個(gè)MapReduce.這個(gè)實(shí)現使得對于這些機器資源的利用非常簡(jiǎn)單,因此也適用于解決Google遇到的其他很多需要大量計算的問(wèn)題.
從這個(gè)工作中我們也學(xué)習到了一些東西.首先,嚴格的編程模型使得并行化和分布式計算簡(jiǎn)單,并且也易于構造這樣的容錯計算環(huán)境.第二,網(wǎng)絡(luò )帶寬是系統的瓶頸.因此在我們的系統中大量的優(yōu)化目標是減少通過(guò)網(wǎng)絡(luò )發(fā)送的數據量,本地優(yōu)化使用我們從本地磁盤(pán)讀取數據,并且把中間數據寫(xiě)到本地磁盤(pán),以保留網(wǎng)絡(luò )帶寬.第三,冗余的執行可以用來(lái)減少速度慢的機器的影響,和控制機器失效和數據丟失.
感謝
Josh Levenberg校定和擴展了用戶(hù)級別的MapReduce API,并且結合他的適用經(jīng)驗和其他人的改進(jìn)建議,增加了很多新的功能.MapReduce從GFS中讀取和寫(xiě)入數據.我們要感謝Mohit Aron,Howard Gobioff,Markus Gutschke,David Krame,Shun-Tak Leung,和Josh Redstone,他們在開(kāi)發(fā)GFS中的工作.我們還感謝Percy Liang Olcan Sercinoglu 在開(kāi)發(fā)用于MapReduce的集群管理系統得工作.Mike Burrows,Wilson Hsieh,Josh Levenberg,Sharon Perl,RobPike,Debby Wallach為本論文提出了寶貴的意見(jiàn).OSDI的無(wú)名審閱者,以及我們的審核者Eric Brewer,在論文應當如何改進(jìn)方面給出了有益的意見(jiàn).最后,我們感謝Google的工程部的所有MapReduce的用戶(hù),感謝他們提供了有用的反饋,建議,以及錯誤報告等等.
A單詞頻率統計
本節包含了一個(gè)完整的程序,用于統計在一組命令行指定的輸入文件中,每一個(gè)不同的單詞出現頻率.
#include "mapreduce/mapreduce.h"
//用戶(hù)map函數
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
//跳過(guò)前導空格
while ((i < n) && isspace(text[i]))
i++;
// 查找單詞的結束位置
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start,i-start),"1");
}
}
};
REGISTER_MAPPER(WordCounter);
//用戶(hù)的reduce函數
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
//迭代具有相同key的所有條目,并且累加它們的value
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
//提交這個(gè)輸入key的綜合
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// 把輸入文件列表存入"spec"
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format("text");
input->set_filepattern(argv[i]);
input->set_mapper_class("WordCounter");
}
//指定輸出文件:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// ...
MapReduceOutput* out = spec.output();
out->set_filebase("/gfs/test/freq");
out->set_num_tasks(100);
out->set_format("text");
out->set_reducer_class("Adder");
// 可選操作:在map任務(wù)中做部分累加工作,以便節省帶寬
out->set_combiner_class("Adder");
// 調整參數: 使用2000臺機器,每個(gè)任務(wù)100MB內存
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
// 運行它
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
// 完成: 'result'結構包含計數,花費時(shí)間,和使用機器的信息
return 0;
}