當下,Spark已經(jīng)在國內得到了廣泛的認可和支持:2014年,Spark Summit China在北京召開(kāi),場(chǎng)面火爆;同年,Spark Meetup在北京、上海、深圳和杭州四個(gè)城市舉辦,其中僅北京就成功舉辦了5次,內容更涵蓋Spark Core、Spark Streaming、Spark MLlib、Spark SQL等眾多領(lǐng)域。而作為較早關(guān)注和引入Spark的移動(dòng)互聯(lián)網(wǎng)大數據綜合服務(wù)公司,TalkingData也積極地參與到國內Spark社區的各種活動(dòng),并多次在Meetup中分享公司的Spark使用經(jīng)驗。本文則主要介紹TalkingData在大數據平臺建設過(guò)程中,逐漸引入Spark,并且以Hadoop YARN和Spark為基礎來(lái)構建移動(dòng)大數據平臺的過(guò)程。
初識Spark
作為一家在移動(dòng)互聯(lián)網(wǎng)大數據領(lǐng)域創(chuàng )業(yè)的公司,時(shí)刻關(guān)注大數據技術(shù)領(lǐng)域的發(fā)展和進(jìn)步是公司技術(shù)團隊必做的功課。而在整理Strata 2013公開(kāi)的講義時(shí),一篇主題為《An Introduction on the Berkeley Data Analytics Stack_BDAS_Featuring Spark,Spark Streaming,and Shark》的教程引起了整個(gè)技術(shù)團隊的關(guān)注和討論,其中Spark基于內存的RDD模型、對機器學(xué)習算法的支持、整個(gè)技術(shù)棧中實(shí)時(shí)處理和離線(xiàn)處理的統一模型以及Shark都讓人眼前一亮。同時(shí)期我們關(guān)注的還有Impala,但對比Spark,Impala可以理解為對Hive的升級,而Spark則嘗試圍繞RDD建立一個(gè)用于大數據處理的生態(tài)系統。對于一家數據量高速增長(cháng),業(yè)務(wù)又是以大數據處理為核心并且在不斷變化的創(chuàng )業(yè)公司而言,后者無(wú)疑更值得進(jìn)一步關(guān)注和研究。
Spark初探
2013年中期,隨著(zhù)業(yè)務(wù)高速發(fā)展,越來(lái)越多的移動(dòng)設備側數據被各個(gè)不同的業(yè)務(wù)平臺收集。那么這些數據除了提供不同業(yè)務(wù)所需要的業(yè)務(wù)指標,是否還蘊藏著(zhù)更多的價(jià)值?為了更好地挖掘數據潛在價(jià)值,我們決定建造自己的數據中心,將各業(yè)務(wù)平臺的數據匯集到一起,對覆蓋設備的相關(guān)數據進(jìn)行加工、分析和挖掘,從而探索數據的價(jià)值。初期數據中心主要功能設置如下所示:
1. 跨市場(chǎng)聚合的安卓應用排名;
2. 基于用戶(hù)興趣的應用推薦。
基于當時(shí)的技術(shù)掌握程度和功能需求,數據中心所采用的技術(shù)架構如圖1。
整個(gè)系統構建基于Hadoop 2.0(Cloudera CDH4.3),采用了最原始的大數據計算架構。通過(guò)日志匯集程序,將不同業(yè)務(wù)平臺的日志匯集到數據中心,并通過(guò)ETL將數據進(jìn)行格式化處理,儲存到HDFS。其中,排名和推薦算法的實(shí)現都采用了MapReduce,系統中只存在離線(xiàn)批量計算,并通過(guò)基于A(yíng)zkaban的調度系統進(jìn)行離線(xiàn)任務(wù)的調度。
第一個(gè)版本的數據中心架構基本上是以滿(mǎn)足“最基本的數據利用”這一目的進(jìn)行設計的。然而,隨著(zhù)對數據價(jià)值探索得逐漸加深,越來(lái)越多的實(shí)時(shí)分析需求被提出。與此同時(shí),更多的機器學(xué)習算法也亟需添加,以便支持不同的數據挖掘需求。對于實(shí)時(shí)數據分析,顯然不能通過(guò)“對每個(gè)分析需求單獨開(kāi)發(fā)MapReduce任務(wù)”來(lái)完成,因此引入Hive 是一個(gè)簡(jiǎn)單而直接的選擇。鑒于傳統的MapReduce模型并不能很好地支持迭代計算,我們需要一個(gè)更好的并行計算框架來(lái)支持機器學(xué)習算法。而這些正是我們一直在密切關(guān)注的Spark所擅長(cháng)的領(lǐng)域——憑借其對迭代計算的友好支持,Spark理所當然地成為了不二之選。2013年9月底,隨著(zhù)Spark 0.8.0發(fā)布,我們決定對最初的架構進(jìn)行演進(jìn),引入Hive作為即時(shí)查詢(xún)的基礎,同時(shí)引入Spark計算框架來(lái)支持機器學(xué)習類(lèi)型的計算,并且驗證Spark這個(gè)新的計算框架是否能夠全面替代傳統的以MapReduce為基礎的計算框架。圖2為整個(gè)系統的架構演變。
在這個(gè)架構中,我們將Spark 0.8.1部署在YARN上,通過(guò)分Queue,來(lái)隔離基于Spark的機器學(xué)習任務(wù),計算排名的日常MapReduce任務(wù)和基于Hive的即時(shí)分析任務(wù)。
想要引入Spark,第一步需要做的就是要取得支持我們Hadoop環(huán)境的Spark包。我們的Hadoop環(huán)境是Cloudera發(fā)布的CDH 4.3,默認的Spark發(fā)布包并不包含支持CDH 4.3的版本,因此只能自己編譯。Spark官方文檔推薦用Maven進(jìn)行編譯,可是編譯卻不如想象中順利。各種包依賴(lài)由于眾所周知的原因,不能順利地從某些依賴(lài)中心庫下載。于是我們采取了最簡(jiǎn)單直接的繞開(kāi)辦法,利用AWS云主機進(jìn)行編譯。需要注意的是,編譯前一定要遵循文檔的建議,設置:
否則,編譯過(guò)程中就會(huì )遇到內存溢出的問(wèn)題。針對CDH 4.3,mvn build的參數為:
在編譯成功所需要的Spark包后,部署和在Hadoop環(huán)境中運行Spark則是非常簡(jiǎn)單的事情。將編譯好的Spark目錄打包壓縮后,在可以運行Hadoop Client的機器上解壓縮,就可以運行Spark了。想要驗證Spark是否能夠正常在目標Hadoop環(huán)境上運行,可以參照Spark的官方文檔,運行example中的SparkPi來(lái)驗證:
完成Spark部署之后,剩下的就是開(kāi)發(fā)基于Spark的程序了。雖然Spark支持Java、Python,但最合適開(kāi)發(fā)Spark程序的語(yǔ)言還是Scala。經(jīng)過(guò)一段時(shí)間的摸索實(shí)踐,我們掌握了Scala語(yǔ)言的函數式編程語(yǔ)言特點(diǎn)后,終于體會(huì )了利用Scala開(kāi)發(fā)Spark應用的巨大好處。同樣的功能,用MapReduce幾百行才能實(shí)現的計算,在Spark中,Scala通過(guò)短短的數十行代碼就能完成。而在運行時(shí),同樣的計算功能,Spark上執行則比MapReduce有數十倍的提高。對于需要迭代的機器學(xué)習算法來(lái)講,Spark的RDD模型相比MapReduce的優(yōu)勢則更是明顯,更何況還有基本的MLlib的支持。經(jīng)過(guò)幾個(gè)月的實(shí)踐,數據挖掘相關(guān)工作被完全遷移到Spark,并且在Spark上實(shí)現了適合我們數據集的更高效的LR等等算法。
全面擁抱Spark
進(jìn)入2014年,公司的業(yè)務(wù)有了長(cháng)足的發(fā)展,對比數據中心平臺建立時(shí),每日處理的數據量亦翻了幾番。每日的排名計算所花的時(shí)間越來(lái)越長(cháng),而基于Hive的即時(shí)計算只能支持日尺度的計算,如果到周這個(gè)尺度,計算所花的時(shí)間已經(jīng)很難忍受,到月這個(gè)尺度則基本上沒(méi)辦法完成計算?;谠赟park上的認知和積累,是時(shí)候將整個(gè)數據中心遷移到Spark上了。
2014年4月,Spark Summit China在北京舉行。抱著(zhù)學(xué)習的目的,我們技術(shù)團隊也參加了在中國舉行的這一次Spark盛會(huì )。通過(guò)這次盛會(huì ),我們了解到國內的很多同行已經(jīng)開(kāi)始采用Spark來(lái)建造自己的大數據平臺,而Spark也變成了在A(yíng)SF中最為活躍的項目之一。另外,越來(lái)越多的大數據相關(guān)的產(chǎn)品也逐漸在和Spark相融合或者在向Spark遷移。Spark無(wú)疑將會(huì )變?yōu)橐粋€(gè)相比Hadoop MapReduce更好的生態(tài)系統。通過(guò)這次大會(huì ),我們更加堅定了全面擁抱Spark的決心。
基于YARN和Spark,我們開(kāi)始重新架構數據中心依賴(lài)的大數據平臺。整個(gè)新的數據平臺應該能夠承載:
1. 準實(shí)時(shí)的數據匯集和ETL;
2. 支持流式的數據加工;
3. 更高效的離線(xiàn)計算能力;
4. 高速的多維分析能力;
5. 更高效的即時(shí)分析能力;
6. 高效的機器學(xué)習能力;
7. 統一的數據訪(fǎng)問(wèn)接口;
8. 統一的數據視圖;
9. 靈活的任務(wù)調度
整個(gè)新的架構充分地利用YARN和Spark,并且融合公司的一些技術(shù)積累,架構如圖3所示。
在新的架構中,引入了Kafka作為日志匯集的通道。幾個(gè)業(yè)務(wù)系統收集的移動(dòng)設備側的日志,實(shí)時(shí)地寫(xiě)入到Kafka 中,從而方便后續的數據消費。
利用Spark Streaming,可以方便地對Kafka中的數據進(jìn)行消費處理。在整個(gè)架構中,Spark Streaming主要完成了以下工作。
1. 原始日志的保存。將Kafka中的原始日志以JSON格式無(wú)損的保存在HDFS中。
2. 數據清洗和轉換,清洗和標準化之后,轉變?yōu)镻arquet格式,存儲在HDFS中,方便后續的各種數據計算任務(wù)。
3. 定義好的流式計算任務(wù),比如基于頻次規則的標簽加工等等,計算結果直接存儲在MongoDB中。

排名計算任務(wù)則在Spark上做了重新實(shí)現,借力Spark帶來(lái)的性能提高,以及Parquet列式存儲帶來(lái)的高效數據訪(fǎng)問(wèn)。同樣的計算任務(wù),在數據量提高到原來(lái)3倍的情況下,時(shí)間開(kāi)銷(xiāo)只有原來(lái)的1/6。
同時(shí),在利用Spark和Parquet列式存儲帶來(lái)的性能提升之外,曾經(jīng)很難滿(mǎn)足業(yè)務(wù)需求的即時(shí)多維度數據分析終于成為了可能。曾經(jīng)利用Hive需要小時(shí)級別才能完成日尺度的多維度即時(shí)分析,在新架構上,只需要2分鐘就能夠順利完成。而周尺度上也不過(guò)十分鐘就能夠算出結果。曾經(jīng)在Hive上無(wú)法完成的月尺度多維度分析計算,則在兩個(gè)小時(shí)內也可以算出結果。另外Spark SQL的逐漸完善也降低了開(kāi)發(fā)的難度。
利用YARN提供的資源管理能力,用于多維度分析,自主研發(fā)的Bitmap引擎也被遷移到了YARN上。對于已經(jīng)確定好的維度,可以預先創(chuàng )建Bitmap索引。而多維度的分析,如果所需要的維度已經(jīng)預先建立了Bitmap索引,則通過(guò)Bitmap引擎由Bitmap計算來(lái)實(shí)現,從而可以提供實(shí)時(shí)的多維度的分析能力。
在新的架構中,為了更方便地管理數據,我們引入了基于HCatalog的元數據管理系統,數據的定義、存儲、訪(fǎng)問(wèn)都通過(guò)元數據管理系統,從而實(shí)現了數據的統一視圖,方便了數據資產(chǎn)的管理。
YARN只提供了資源的調度能力,在一個(gè)大數據平臺,分布式的任務(wù)調度系統同樣不可或缺。在新的架構中,我們自行開(kāi)發(fā)了一個(gè)支持DAG的分布式任務(wù)調度系統,結合YARN提供的資源調度能力,從而實(shí)現定時(shí)任務(wù)、即時(shí)任務(wù)以及不同任務(wù)構成的pipeline。
基于圍繞YARN和Spark的新的架構,一個(gè)針對數據業(yè)務(wù)部門(mén)的自服務(wù)大數據平臺得以實(shí)現,數據業(yè)務(wù)部門(mén)可以方便地利用這個(gè)平臺對進(jìn)行多維度的分析、數據的抽取,以及進(jìn)行自定義的標簽加工。自服務(wù)系統提高了數據利用的能力,同時(shí)也大大提高了數據利用的效率。
使用Spark遇到的一些坑
任何新技術(shù)的引入都會(huì )歷經(jīng)陌生到熟悉,從最初新技術(shù)帶來(lái)的驚喜,到后來(lái)遇到困難時(shí)的一籌莫展和惆悵,再到問(wèn)題解決后的愉悅,大數據新貴Spark同樣不能免俗。下面就列舉一些我們遇到的坑。
【坑一:跑很大的數據集的時(shí)候,會(huì )遇到org.apache.spark.SparkException: Error communicating with MapOutputTracker】
這個(gè)錯誤報得很隱晦,從錯誤日志看,是Spark集群partition了,但如果觀(guān)察物理機器的運行情況,會(huì )發(fā)現磁盤(pán)I/O非常高。進(jìn)一步分析會(huì )發(fā)現原因是Spark在處理大數據集時(shí)的shuffle過(guò)程中生成了太多的臨時(shí)文件,造成了操作系統磁盤(pán)I/O負載過(guò)大。找到原因后,解決起來(lái)就很簡(jiǎn)單了,設置spark.shuffle.consolidateFiles為true。這個(gè)參數在默認的設置中是false的,對于linux的ext4文件系統,建議大家還是默認設置為true吧。Spark官方文檔的描述也建議ext4文件系統設置為true來(lái)提高性能。
【坑二:運行時(shí)報Fetch failure錯】
在大數據集上,運行Spark程序,在很多情況下會(huì )遇到Fetch failure的錯。由于Spark本身設計是容錯的,大部分的Fetch failure會(huì )經(jīng)過(guò)重試后通過(guò),因此整個(gè)Spark任務(wù)會(huì )正常跑完,不過(guò)由于重試的影響,執行時(shí)間會(huì )顯著(zhù)增長(cháng)。造成Fetch failure的根本原因則不盡相同。從錯誤本身看,是由于任務(wù)不能從遠程的節點(diǎn)讀取shuffle的數據,具體原因則需要利用:

查看Spark的運行日志,從而找到造成Fetch failure的根本原因。其中大部分的問(wèn)題都可以通過(guò)合理的參數配置以及對程序進(jìn)行優(yōu)化來(lái)解決。2014年Spark Summit China上陳超的那個(gè)專(zhuān)題,對于如何對Spark性能進(jìn)行優(yōu)化,有非常好的建議。
當然,在使用Spark過(guò)程中還遇到過(guò)其他不同的問(wèn)題,不過(guò)由于Spark本身是開(kāi)源的,通過(guò)源代碼的閱讀,以及借助開(kāi)源社區的幫助,大部分問(wèn)題都可以順利解決。
下一步的計劃
Spark在2014年取得了長(cháng)足的發(fā)展,圍繞Spark的大數據生態(tài)系統也逐漸的完善。Spark 1.3引入了一個(gè)新的DataFrame API,這個(gè)新的DataFrame API將會(huì )使得Spark對于數據的處理更加友好。同樣出自于A(yíng)MPLab的分布式緩存系統Tachyon因為其與Spark的良好集成也逐漸引起了人們的注意。鑒于在業(yè)務(wù)場(chǎng)景中,很多基礎數據是需要被多個(gè)不同的Spark任務(wù)重復使用,下一步,我們將會(huì )在架構中引入Tachyon來(lái)作為緩存層。另外,隨著(zhù)SSD的日益普及,我們后續的計劃是在集群中每臺機器都引入SSD存儲,配置Spark的shuffle的輸出到SSD,利用SSD的高速隨機讀寫(xiě)能力,進(jìn)一步提高大數據處理效率。
在機器學(xué)習方面,H2O機器學(xué)習引擎也和Spark有了良好的集成從而產(chǎn)生了Sparkling-water。相信利用Sparking-water,作為一家創(chuàng )業(yè)公司,我們也可以利用深度學(xué)習的力量來(lái)進(jìn)一步挖掘數據的價(jià)值。
結語(yǔ)
2004年,Google的MapReduce論文揭開(kāi)了大數據處理的時(shí)代,Hadoop的MapReduce在過(guò)去接近10年的時(shí)間成了大數據處理的代名詞。而Matei Zaharia 2012年關(guān)于RDD的一篇論文“Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing”則揭示了大數據處理技術(shù)一個(gè)新時(shí)代的到來(lái)。伴隨著(zhù)新的硬件技術(shù)的發(fā)展、低延遲大數據處理的廣泛需求以及數據挖掘在大數據領(lǐng)域的日益普及,Spark作為一個(gè)嶄新的大數據生態(tài)系統,逐漸取代傳統的MapReduce而成為新一代大數據處理技術(shù)的熱門(mén)。我們過(guò)去兩年從MapReduce到Spark架構的演變過(guò)程,也基本上代表了相當一部分大數據領(lǐng)域從業(yè)者的技術(shù)演進(jìn)的歷程。相信隨著(zhù)Spark生態(tài)的日益完善,會(huì )有越來(lái)越多的企業(yè)將自己的數據處理遷移到Spark上來(lái)。而伴隨著(zhù)越來(lái)越多的大數據工程師熟悉和了解Spark,國內的Spark社區也會(huì )越來(lái)越活躍,Spark作為一個(gè)開(kāi)源的平臺,相信也會(huì )有越來(lái)越多的華人變成Spark相關(guān)項目的Contributor,Spark也會(huì )變得越來(lái)越成熟和強大。
作者簡(jiǎn)介:閻志濤,TalkingData研發(fā)副總裁,領(lǐng)導研發(fā)了公司的數據管理平臺(DMP)、數據觀(guān)象臺等產(chǎn)品,并且負責公司大數據計算平臺的研發(fā)。

本文選自程序員電子版2015年3月A刊。
聯(lián)系客服