IBM®、Google、VMWare 和 Amazon 等公司已經(jīng)開(kāi)始提供云計算產(chǎn)品和戰略。本文講解如何使用 Apache Hadoop 構建一個(gè) MapReduce 框架以建立 Hadoop 集群,以及如何創(chuàng )建在 Hadoop 上運行的示例 MapReduce 應用程序。還將討論如何在云上設置耗費時(shí)間/磁盤(pán)的任務(wù)。 近來(lái)云計算越來(lái)越熱門(mén)了,云計算已經(jīng)被看作 IT 業(yè)的新趨勢。云計算可以粗略地定義為使用自己環(huán)境之外的某一服務(wù)提供的可伸縮計算資源,并按使用量付費??梢酝ㄟ^(guò) Internet 訪(fǎng)問(wèn) “云” 中的任何資源,而不需要擔心計算能力、帶寬、存儲、安全性和可靠性等問(wèn)題。 本文簡(jiǎn)要介紹 Amazon EC2 這樣的云計算平臺,可以租借這種平臺上的虛擬 Linux® 服務(wù)器;然后介紹開(kāi)放源碼 MapReduce 框架 Apache Hadoop,這個(gè)框架將構建在虛擬 Linux 服務(wù)器中以建立云計算框架。但是,Hadoop 不僅可以部署在任何廠(chǎng)商提供的 VM 上,還可以部署在物理機器上的一般 Linux OS 中。 在討論 Apache Hadoop 之前,我們先簡(jiǎn)要介紹一下云計算系統的結構。圖 1 顯示云計算的各個(gè)層以及現有的一些服務(wù)。關(guān)于云計算的各個(gè)層的詳細信息,請參見(jiàn) 參考資料。 基礎設施即服務(wù) (Infrastructure-as-a-Service,IaaS)是指以服務(wù)的形式租借基礎設施(計算資源和存儲)。IaaS 讓用戶(hù)可以租借計算機(即虛擬主機)或數據中心,可以指定特定的服務(wù)質(zhì)量約束,比如能夠運行某些操作系統和軟件。Amazon EC2 在這些層中作為 IaaS,向用戶(hù)提供虛擬的主機。平臺即服務(wù) (Platform-as-a-Service,PaaS)主要關(guān)注軟件框架或服務(wù),提供在基礎設施中進(jìn)行 “云” 計算所用的 API。Apache Hadoop 作為 PaaS,它構建在虛擬主機上,作為云計算平臺。 圖 1. 云計算的層和現有服務(wù) Amazon EC2 是一個(gè) Web 服務(wù),它允許用戶(hù)請求具有各種資源(CPU、磁盤(pán)、內存等)的虛擬機器。用戶(hù)只需按使用的計算時(shí)間付費,其他事情全交給 Amazon 處理。 這些實(shí)例 (Amazon Machine Image,AMI) 基于 Linux,可以運行您需要的任何應用程序或軟件。在從 Amazon 租借服務(wù)器之后,可以像對待物理服務(wù)器一樣使用一般的 SSH 工具設置連接和維護服務(wù)器。 對 EC2 的詳細介紹超出了本文的范圍。更多信息請參見(jiàn) 參考資料。 部署 Hadoop 云計算框架的最好方法是把它部署在 AMI 上,這樣可以利用云資源,不需要考慮計算能力、帶寬、存儲等問(wèn)題。但是,在本文的下一部分中,我們將在本地的 Linux 服務(wù)器 VMWare 映像中構建 Hadoop,因為 Hadoop 不僅適用于云解決方案。在此之前,我們先介紹一下 Apache Hadoop。 Apache Hadoop 是一個(gè)軟件框架(平臺),它可以分布式地操縱大量數據。它于 2006 年出現,由 Google、Yahoo! 和 IBM 等公司支持??梢哉J為它是一種 PaaS 模型。 它的設計核心是 MapReduce 實(shí)現和 HDFS (Hadoop Distributed File System),它們源自 MapReduce(由一份 Google 文件引入)和 Google File System。 MapReduce MapReduce 是 Google 引入的一個(gè)軟件框架,它支持在計算機(即節點(diǎn))集群上對大型數據集進(jìn)行分布式計算。它由兩個(gè)過(guò)程組成,映射(Map)和縮減(Reduce)。 在映射過(guò)程中,主節點(diǎn)接收輸入,把輸入分割為更小的子任務(wù),然后把這些子任務(wù)分布到工作者節點(diǎn)。 工作者節點(diǎn)處理這些小任務(wù),把結果返回給主節點(diǎn)。 然后,在縮減過(guò)程中,主節點(diǎn)把所有子任務(wù)的結果組合成輸出,這就是原任務(wù)的結果。 圖 2 說(shuō)明 MapReduce 流程的概念。 MapReduce 的優(yōu)點(diǎn)是它允許對映射和縮減操作進(jìn)行分布式處理。因為每個(gè)映射操作都是獨立的,所有映射都可以并行執行,這會(huì )減少總計算時(shí)間。 HDFS 對 HDFS 及其使用方法的完整介紹超出了本文的范圍。更多信息請參見(jiàn) 參考資料。 從最終用戶(hù)的角度來(lái)看,HDFS 就像傳統的文件系統一樣??梢允褂媚夸浡窂綄ξ募绦?CRUD 操作。但是,由于分布式存儲的性質(zhì),有 “NameNode” 和 “DataNode” 的概念,它們承擔各自的責任。 NameNode 是 DataNode 的主節點(diǎn)。它在 HDFS 中提供元數據服務(wù)。元數據說(shuō)明 DataNode 的文件映射。它還接收操作命令并決定哪些 DataNode 應該執行操作和復制。 DataNode 作為 HDFS 的存儲塊。它們還響應從 NameNode 接收的塊創(chuàng )建、刪除和復制命令。 JobTracker 和 TaskTracker 在提交應用程序時(shí),應該提供包含在 HDFS 中的輸入和輸出目錄。JobTracker 作為啟動(dòng) MapReduce 應用程序的單一控制點(diǎn),它決定應該創(chuàng )建多少個(gè) TaskTracker 和子任務(wù),然后把每個(gè)子任務(wù)分配給 TaskTracker。每個(gè) TaskTracker 向 JobTracker 報告狀態(tài)和完成后的任務(wù)。 通常,一個(gè)主節點(diǎn)作為 NameNode 和 JobTracker,從節點(diǎn)作為 DataNode 和 TaskTracker。Hadoop 集群的概念視圖和 MapReduce 的流程見(jiàn)圖 2。 圖 2. Hadoop 集群的概念視圖和 MapReduce 的流程 現在在 Linux VM 上設置 Hadoop 集群,然后就可以在 Hadoop 集群上運行 MapReduce 應用程序。 Apache Hadoop 支持三種部署模式: - 單獨模式:在默認情況下,Hadoop 以非分布的單獨模式運行。這個(gè)模式適合應用程序調試。
- 偽分布模式:Hadoop 還可以以單節點(diǎn)的偽分布模式運行。在這種情況下,每個(gè) Hadoop 守護進(jìn)程作為單獨的 Java™ 進(jìn)程運行。
- 全分布模式:Hadoop 配置在不同的主機上,作為集群運行。
要想以單獨或偽分布模式設置 Hadoop,請參考 Hadoop 的網(wǎng)站。在本文中,我們只討論以全分布模式設置 Hadoop。 準備環(huán)境 在本文中,我們需要三臺 GNU/Linux 服務(wù)器;一個(gè)作為主節點(diǎn),另外兩個(gè)作為從節點(diǎn)。 表 1. 服務(wù)器信息 | 服務(wù)器 IP | 服務(wù)器主機名 | 角色 | | 9.30.210.159 | Vm-9-30-210-159 | 主節點(diǎn)(NameNode 和 JobTracker) | | 9.30.210.160 | Vm-9-30-210-160 | 從節點(diǎn) 1 (DataNode 和 TaskTracker) | | 9.30.210.161 | Vm-9-30-210-161 | 從節點(diǎn) 2 (DataNode 和 TaskTracker) | 每臺機器都需要安裝 Java SE 6 和 Hadoop 二進(jìn)制代碼。更多信息見(jiàn) 參考資料。本文使用 Hadoop version 0.19.1。 還需要在每臺機器上安裝 SSH 并運行 sshd。SUSE 和 RedHat 等流行的 Linux 發(fā)行版在默認情況下已經(jīng)安裝了它們。 設置通信 更新 /etc/hosts 文件,確保這三臺機器可以使用 IP 和主機名相互通信。 因為 Hadoop 主節點(diǎn)使用 SSH 與從節點(diǎn)通信,所以應該在主節點(diǎn)和從節點(diǎn)之間建立經(jīng)過(guò)身份驗證的無(wú)密碼的 SSH 連接。在每臺機器上執行以下命令,從而生成 RSA 公共和私有密鑰。 這會(huì )在 /root/.ssh 目錄中生成 id_rsa.pub。重命名主節點(diǎn)的 id_rsa.pub(這里改名為 59_rsa.pub)并把它復制到從節點(diǎn)。然后執行以下命令,把主節點(diǎn)的公共密鑰添加到從節點(diǎn)的已授權密鑰中。 cat /root/.ssh/59_rsa.pub >> /root/.ssh/authorized_keys | 現在嘗試使用 SSH 連接從節點(diǎn)。應該可以成功連接,不需要提供密碼。 設置主節點(diǎn) 把 Hadoop 設置為全分布模式需要配置 <Hadoop_home>/conf/ 目錄中的配置文件。 在 hadoop-site.xml 中配置 Hadoop 部署。這里的配置覆蓋 hadoop-default.xml 中的配置。 表 2. 配置屬性 | 屬性 | 解釋 | | fs.default.name | NameNode URI | | mapred.job.tracker | JobTracker URI | | dfs.replication | 復制的數量 | | hadoop.tmp.dir | 臨時(shí)目錄 | hadoop-site.xml <?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- Put site-specific property overrides in this file. --> <configuration> <property> <name>fs.default.name</name> <value>hdfs://9.30.210.159:9000</value> </property> <property> <name>mapred.job.tracker</name> <value>9.30.210.159:9001</value> </property> <property> <name>dfs.replication</name> <value>1</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/root/hadoop/tmp/</value> </property> </configuration> | 通過(guò)配置 hadoop-env.sh 文件指定 JAVA_HOME。注釋掉這一行并指定自己的 JAVA_HOME 目錄。 export JAVA_HOME=<JAVA_HOME_DIR> | 在 master 文件中添加主節點(diǎn)的 IP 地址。 在 slave 文件中添加從節點(diǎn)的 IP 地址。 9.30.210.160 9.30.210.161 | 設置從節點(diǎn) 把 hadoop-site.xml、hadoop-env.sh、masters 和 slaves 復制到每個(gè)從節點(diǎn);可以使用 SCP 或其他復制工具。 對 HDFS 進(jìn)行格式化 運行以下命令對 HDFS 分布式文件系統進(jìn)行格式化。 <Hadoop_home>/bin/hadoop namenode -format | 檢查 Hadoop 集群 現在,可以使用 bin/start-all.sh 啟動(dòng) Hadoop 集群。命令輸出指出主節點(diǎn)和從節點(diǎn)上的一些日志。檢查這些日志,確認一切正常。如果弄亂了什么東西,可以格式化 HDFS 并清空 hadoop-site.xml 中指定的臨時(shí)目錄,然后重新啟動(dòng)。 訪(fǎng)問(wèn)以下 URL,確認主節點(diǎn)和從節點(diǎn)是正常的。 NameNode: http://9.30.210.159:50070 JobTracker: http://9.30.210.159:50030 | 現在,已經(jīng)在云中設置了 Hadoop 集群,該運行 MapReduce 應用程序了。 MapReduce 應用程序必須具備 “映射” 和 “縮減” 的性質(zhì),也就是說(shuō)任務(wù)或作業(yè)可以分割為小片段以進(jìn)行并行處理。然后,可以縮減每個(gè)子任務(wù)的結果,得到原任務(wù)的結果。這種任務(wù)之一是網(wǎng)站關(guān)鍵字搜索。搜索和抓取任務(wù)可以分割為子任務(wù)并分配給從節點(diǎn),然后在主節點(diǎn)上聚合所有結果并得到最終結果。 試用示例應用程序 Hadoop 附帶一些用于測試的示例應用程序。其中之一是單詞計數器,它統計某一單詞在幾個(gè)文件中出現的次數。通過(guò)運行這個(gè)應用程序檢查 Hadoop 集群。 首先,把輸入文件放在分布式文件系統中(conf/ 目錄下面)。我們將統計單詞在這些文件中出現的次數。 $ bin/hadoop fs –put conf input | 然后,運行這個(gè)示例應用程序,以下命令統計以 “dfs” 開(kāi)頭的單詞出現的次數。 $ bin/hadoop jar hadoop-*-examples.jar grep input output 'dfs[a-z.]+' | 命令的輸出說(shuō)明映射和縮減過(guò)程。 前兩個(gè)命令會(huì )在 HDFS 中生成兩個(gè)目錄,“input” 和 “output”??梢允褂靡韵旅盍谐鏊鼈?。 查看分布式文件系統中已經(jīng)輸出的文件。它以鍵-值對的形式列出以 “dfs*” 開(kāi)頭的單詞出現的次數。 $ bin/hadoop fs -cat ouput/* | 現在,訪(fǎng)問(wèn) JobTracker 站點(diǎn)查看完成的作業(yè)日志。 創(chuàng )建 Log Analyzer MapReduce 應用程序 現在創(chuàng )建一個(gè) Portal (IBM WebSphere® Portal v6.0) Log Analyzer 應用程序,它與 Hadoop 中的 WordCount 應用程序有許多共同點(diǎn)。這個(gè)分析程序搜索所有 Portal 的 SystemOut*.log 文件,顯示在特定的時(shí)間段內應用程序在 Portal 上啟動(dòng)了多少次。 在 Portal 環(huán)境中,所有日志分割為 5MB 的片段,很適合由幾個(gè)節點(diǎn)并行地分析。 hadoop.sample.PortalLogAnalyzer.java public class PortalLogAnalyzer { public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private static String APP_START_TOKEN = "Application started:"; private Text application = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); if(line.indexOf(APP_START_TOKEN) > -1) { int startIndex = line.indexOf(APP_START_TOKEN); startIndex += APP_START_TOKEN.length(); String appName = line.substring(startIndex).trim(); application.set(appName); output.collect(application, new IntWritable(1)); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while(values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public static void main(String[] args) throws IOException { JobConf jobConf = new JobConf(PortalLogAnalyzer.class); jobConf.setJobName("Portal Log Analizer"); jobConf.setOutputKeyClass(Text.class); jobConf.setOutputValueClass(IntWritable.class); jobConf.setMapperClass(Map.class); jobConf.setCombinerClass(Reduce.class); jobConf.setReducerClass(Reduce.class); jobConf.setInputFormat(TextInputFormat.class); jobConf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(jobConf, new Path(args[0])); FileOutputFormat.setOutputPath(jobConf, new Path(args[1])); JobClient.runJob(jobConf); } } | 對 Hadoop API 的完整解釋請參見(jiàn) Hadoop 網(wǎng)站上的 API 文檔。這里只做簡(jiǎn)要說(shuō)明。 Map 類(lèi)實(shí)現映射功能,它搜索日志文件的每一行,尋找應用程序的名稱(chēng)。然后把應用程序名稱(chēng)以鍵-值對的形式放在輸出集合中。 Reduce 類(lèi)計算具有相同鍵(相同應用程序名稱(chēng))的所有值的總和。因此,這個(gè)應用程序最終輸出的鍵-值對表示每個(gè)應用程序在 Portal 上啟動(dòng)的次數。 Main 函數配置并運行 MapReduce 作業(yè)。 運行 PortalLogAnalyzer 首先,把這些 Java 代碼復制到主節點(diǎn)并編譯。把 Java 代碼復制到 <hadoop_home>/workspace 目錄中。對它執行編譯并存檔在一個(gè) Jar 文件中,后面 hadoop 命令將運行這個(gè)文件。 $ mkdir classes $ javac –cp ../hadoop-0.19.1-core.jar –d classes hadoop/sample/PortalLogAnalyzer.java $ jar –cvf PortalLogAnalyzer.jar –C classes/ . | 把 Portal 日志復制到 workspace/input 中。假設有多個(gè)日志文件,其中包含 2009 年 5 月的所有日志。把這些日志放到 HDFS 中。 $ bin/hadoop fs –put workspace/input input2 | 在運行 PortalLogAnalyzer 時(shí),輸出說(shuō)明映射和縮減過(guò)程。 $ bin/hadoop jar workspace/PortalLogAnalizer.jar hadoop.sample.PortalLogAnalizer input2 output2 | 圖 3. 任務(wù)的輸出 應用程序執行完之后,輸出應該與圖 4 相似。 $ bin/hadoop fs –cat output2/* | 圖 4. 部分輸出 在訪(fǎng)問(wèn) JobTracker 站點(diǎn)時(shí),會(huì )看到另一個(gè)完成的作業(yè)。注意圖 5 中的最后一行。 圖 5. 完成的作業(yè) |