使用Cygwin模擬Linux環(huán)境安裝配置運行基于單機的Hadoop
其實(shí),使用Cygwin模擬Linux環(huán)境來(lái)運行Hadoop是非常輕松的,只需要簡(jiǎn)單地配置一下就可以運行基于單機的Hadoop。
這里,比較關(guān)鍵的就是Cygwin的安裝,在選擇安裝的時(shí)候一定要安裝openssh,否則不會(huì )成功的,下面簡(jiǎn)單說(shuō)一下Cygwin的安裝及其配置:
| Cygwin的下載安裝 首先點(diǎn)擊http://cygwin.com/setup.exe下載setup.exe,例如保存到桌面,點(diǎn)擊就可以進(jìn)行下載安裝了。 在選擇安裝類(lèi)型的時(shí)候,最好是選擇第一個(gè),直接從網(wǎng)絡(luò )上下載并緊接著(zhù)安裝,如圖所示: 然后選擇安裝路徑、安裝文件存放路徑、連接方式(這里選擇Use IE5 Settings)、下載站點(diǎn)鏡像,自動(dòng)創(chuàng )建下載文件列表,接下來(lái)一步比較重要了:選擇安裝類(lèi)型,可以單擊最頂層的All后面的循環(huán)樣式圖標切換安裝類(lèi)型,是的最頂層All行的最后一個(gè)單詞為Install,如圖所示: 其實(shí),如果你選擇了Install安裝類(lèi)型,就已經(jīng)選擇了openssh包。 為讓你看到openssh包,你可以在Net [圖標] Install 下面看到與網(wǎng)絡(luò )有關(guān)的包,如圖所示: 向下滑動(dòng)滾動(dòng)條,可以看到openssh,如圖所示: 在Cirrent下如果顯示版本號,說(shuō)明該包已經(jīng)被此次安裝選擇上了,否則的話(huà)會(huì )顯示一個(gè)Skip,意思是跳過(guò)該包,并不會(huì )安裝該包的。 最后就等著(zhù)下載安裝了,這個(gè)過(guò)程可能會(huì )花費一點(diǎn)時(shí)間的。 Cygwin的配置 安裝完成之后,例如我的Cygwin安裝在G:\Cygwin\目錄下面,進(jìn)行配置如下: 設置環(huán)境變量: 在系統變量中新建變量【變量名:CYGWIN,變量值:ntsec tty】;編輯添加變量【變量名:Path,變量值:G:\Cygwin\bin;其它的保留】。 OK,基本配置好了,可以配置Hadoop了。 |
Hadoop目前的有幾個(gè)版本:hadoop-0.16.4、hadoop-0.18.0,到Apache下載一個(gè)并解壓縮即可。
將解壓縮的Hadoop放到G盤(pán)下,例如我的是:G:\hadoop-0.16.4。
配置Hadoop只需要修改G:\hadoop-0.16.4\conf目錄下的hadoop-env.sh文件即可,打開(kāi)它你可以看到:
| # The java implementation to use. Required. # export JAVA_HOME=/usr/lib/j2sdk1.5-sun |
將第二行的注釋符號去掉,同時(shí)指定在你的機器上JAVA_HOME的值,如下為我修改的內容:
| # The java implementation to use. Required. export JAVA_HOME="D:\Program Files\Java\jdk1.6.0_07" |
這里要注意,如果你的JDK安裝目錄中存在空格,需要使用雙引號引起來(lái),否則就會(huì )報錯。
啟動(dòng)Cygwin,當前它是在home/yourname目錄下的,如圖所示:
切換到根目錄下,從而進(jìn)入G:\hadoop-0.16.4目錄,并創(chuàng )建一個(gè)數據輸入目錄input-dir,如圖所示:
下面,打開(kāi)G:\hadoop-0.16.4\input-dir,在該目錄下新建幾個(gè)記事本文件,例如我創(chuàng )建了3個(gè):input-a.txt、input-b.txt、input-c.txt。三個(gè)文件的內容如下所示:
| input-a.txt:as after append actor as as add as apache as after add as input-b.txt:bench be bench believe background bench be block input-c.txt:cafe cat communications connection cat cat cat cust cafe |
接著(zhù)就可以執行Hadoop自帶的一個(gè)統計英文單詞出現頻率的例子了,直接輸入命令bin/hadoop jar hadoop-0.16.4-examples.jar wordcount input-dir output-dir,其中hadoop-0.16.4-examples.jar是G:\hadoop-0.16.4包中的例子,input-dir是數據輸入目錄,在里面已經(jīng)存在我們創(chuàng )建的三個(gè)文件了,output-dir是經(jīng)過(guò)Hadoop處理后的輸出結果的目錄,這里,你需要對Google的MapReduce算法有一個(gè)簡(jiǎn)單的了解,主要就是在處理數據的時(shí)候是怎樣的一個(gè)流程,引用IBM上的一片技術(shù)文章片段了解一下:
| 引 用 用 MapReduce 來(lái)處理大數據集的過(guò)程, 這個(gè) MapReduce 的計算過(guò)程簡(jiǎn)而言之,就是將大數據集分解為成百上千的小數據集,每個(gè)(或若干個(gè))數據集分別由集群中的一個(gè)結點(diǎn)(一般就是一臺普通的計算機)進(jìn)行處理并生成中間結果,然后這些中間結果又由大量的結點(diǎn)進(jìn)行合并, 形成最終結果。 計算模型的核心是 Map 和 Reduce 兩個(gè)函數,這兩個(gè)函數由用戶(hù)負責實(shí)現,功能是按一定的映射規則將輸入的 <key, value> 對轉換成另一個(gè)或一批 <key, value> 對輸出。 表一 Map 和 Reduce 函數
| 函數 | 輸入 | 輸出 | 說(shuō)明 | | Map | <k1, v1> | List(<k2,v2>) | 1. 將小數據集進(jìn)一步解析成一批 <key,value> 對,輸入 Map 函數中進(jìn)行處理。 2. 每一個(gè)輸入的 <k1,v1> 會(huì )輸出一批 <k2,v2>。 <k2,v2> 是計算的中間結果。 | | Reduce | <k2,List(v2)> | <k3,v3> | 輸入的中間結果 <k2,List(v2)> 中的 List(v2) 表示是一批屬于同一個(gè) k2 的 value | 基于 MapReduce 計算模型編寫(xiě)分布式并行程序非常簡(jiǎn)單,程序員的主要編碼工作就是實(shí)現 Map 和 Reduce 函數,其它的并行編程中的種種復雜問(wèn)題,如分布式存儲,工作調度,負載平衡,容錯處理,網(wǎng)絡(luò )通信等,均由 MapReduce 框架(比如 Hadoop )負責處理,程序員完全不用操心。 本地計算 數據存儲在哪一臺計算機上,就由這臺計算機進(jìn)行這部分數據的計算,這樣可以減少數據在網(wǎng)絡(luò )上的傳輸,降低對網(wǎng)絡(luò )帶寬的需求。在 Hadoop 這樣的基于集群的分布式并行系統中,計算結點(diǎn)可以很方便地擴充,而因它所能夠提供的計算能力近乎是無(wú)限的,但是由是數據需要在不同的計算機之間流動(dòng),故網(wǎng)絡(luò )帶寬變成了瓶頸,是非常寶貴的,“本地計算”是最有效的一種節約網(wǎng)絡(luò )帶寬的手段,業(yè)界把這形容為“移動(dòng)計算比移動(dòng)數據更經(jīng)濟”。 任務(wù)粒度 把原始大數據集切割成小數據集時(shí),通常讓小數據集小于或等于 HDFS 中一個(gè) Block 的大小(缺省是 64M),這樣能夠保證一個(gè)小數據集位于一臺計算機上,便于本地計算。有 M 個(gè)小數據集待處理,就啟動(dòng) M 個(gè) Map 任務(wù),注意這 M 個(gè) Map 任務(wù)分布于 N 臺計算機上并行運行,Reduce 任務(wù)的數量 R 則可由用戶(hù)指定。 Partition 把 Map 任務(wù)輸出的中間結果按 key 的范圍劃分成 R 份( R 是預先定義的 Reduce 任務(wù)的個(gè)數),劃分時(shí)通常使用 hash 函數如: hash(key) mod R,這樣可以保證某一段范圍內的 key,一定是由一個(gè) Reduce 任務(wù)來(lái)處理,可以簡(jiǎn)化 Reduce 的過(guò)程。 Combine 在 partition 之前,還可以對中間結果先做 combine,即將中間結果中有相同 key的 <key, value> 對合并成一對。combine 的過(guò)程與 Reduce 的過(guò)程類(lèi)似,很多情況下就可以直接使用 Reduce 函數,但 combine 是作為 Map 任務(wù)的一部分,在執行完 Map 函數后緊接著(zhù)執行的。Combine 能夠減少中間結果中 <key, value> 對的數目,從而減少網(wǎng)絡(luò )流量。 Reduce 任務(wù)從 Map 任務(wù)結點(diǎn)取中間結果 Map 任務(wù)的中間結果在做完 Combine 和 Partition 之后,以文件形式存于本地磁盤(pán)。中間結果文件的位置會(huì )通知主控 JobTracker, JobTracker 再通知 Reduce 任務(wù)到哪一個(gè) DataNode 上去取中間結果。注意所有的 Map 任務(wù)產(chǎn)生中間結果均按其 Key 用同一個(gè) Hash 函數劃分成了 R 份,R 個(gè) Reduce 任務(wù)各自負責一段 Key 區間。每個(gè) Reduce 需要向許多個(gè) Map 任務(wù)結點(diǎn)取得落在其負責的 Key 區間內的中間結果,然后執行 Reduce 函數,形成一個(gè)最終的結果文件。 任務(wù)管道 有 R 個(gè) Reduce 任務(wù),就會(huì )有 R 個(gè)最終結果,很多情況下這 R 個(gè)最終結果并不需要合并成一個(gè)最終結果。因為這 R 個(gè)最終結果又可以做為另一個(gè)計算任務(wù)的輸入,開(kāi)始另一個(gè)并行計算任務(wù)。 |
執行過(guò)程如下所示:
| SHIYANJUN@cbbd2ce9428e48b /cygdrive/g/hadoop-0.16.4 $ bin/hadoop jar hadoop-0.16.4-examples.jar wordcount input-dir output-dir cygpath: cannot create short name of g:\hadoop-0.16.4\logs 08/09/12 20:42:50 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName =JobTracker, sessionId= 08/09/12 20:42:51 INFO mapred.FileInputFormat: Total input paths to process : 3 08/09/12 20:42:51 INFO mapred.JobClient: Running job: job_local_1 08/09/12 20:42:51 INFO mapred.MapTask: numReduceTasks: 1 08/09/12 20:42:52 INFO mapred.JobClient: map 0% reduce 0% 08/09/12 20:42:52 INFO mapred.LocalJobRunner: file:/g:/hadoop-0.16.4/input-dir/i nput-a.txt:0+57 08/09/12 20:42:52 INFO mapred.TaskRunner: Task 'job_local_1_map_0000' done. 08/09/12 20:42:52 INFO mapred.TaskRunner: Saved output of task 'job_local_1_map_ 0000' to file:/g:/hadoop-0.16.4/output-dir 08/09/12 20:42:53 INFO mapred.MapTask: numReduceTasks: 1 08/09/12 20:42:53 INFO mapred.LocalJobRunner: file:/g:/hadoop-0.16.4/input-dir/i nput-b.txt:0+48 08/09/12 20:42:53 INFO mapred.TaskRunner: Task 'job_local_1_map_0001' done. 08/09/12 20:42:53 INFO mapred.TaskRunner: Saved output of task 'job_local_1_map_ 0001' to file:/g:/hadoop-0.16.4/output-dir 08/09/12 20:42:53 INFO mapred.MapTask: numReduceTasks: 1 08/09/12 20:42:53 INFO mapred.LocalJobRunner: file:/g:/hadoop-0.16.4/input-dir/i nput-c.txt:0+56 08/09/12 20:42:53 INFO mapred.TaskRunner: Task 'job_local_1_map_0002' done. 08/09/12 20:42:53 INFO mapred.TaskRunner: Saved output of task 'job_local_1_map_ 0002' to file:/g:/hadoop-0.16.4/output-dir 08/09/12 20:42:53 INFO mapred.JobClient: map 100% reduce 0% 08/09/12 20:42:54 INFO mapred.LocalJobRunner: reduce > reduce 08/09/12 20:42:54 INFO mapred.TaskRunner: Task 'reduce_z7f1uq' done. 08/09/12 20:42:54 INFO mapred.TaskRunner: Saved output of task 'reduce_z7f1uq' t o file:/g:/hadoop-0.16.4/output-dir 08/09/12 20:42:54 INFO mapred.JobClient: Job complete: job_local_1 08/09/12 20:42:54 INFO mapred.JobClient: Counters: 9 08/09/12 20:42:54 INFO mapred.JobClient: Map-Reduce Framework 08/09/12 20:42:54 INFO mapred.JobClient: Map input records=3 08/09/12 20:42:54 INFO mapred.JobClient: Map output records=30 08/09/12 20:42:54 INFO mapred.JobClient: Map input bytes=161 08/09/12 20:42:54 INFO mapred.JobClient: Map output bytes=284 08/09/12 20:42:54 INFO mapred.JobClient: Combine input records=30 08/09/12 20:42:54 INFO mapred.JobClient: Combine output records=16 08/09/12 20:42:54 INFO mapred.JobClient: Reduce input groups=16 08/09/12 20:42:54 INFO mapred.JobClient: Reduce input records=16 $ |
上面是根據給定的輸入的數據進(jìn)行執行的一個(gè)過(guò)程,其實(shí)結果已經(jīng)輸出到output-dir目錄中,你可以進(jìn)行查看,在目錄G:\hadoop-0.16.4\output-dir下面生成了兩個(gè)文件:.part-00000.crc和part-00000。
查看處理結果,如下所示:
或者,直接到G:\hadoop-0.16.4\output-dir目錄下面打開(kāi)part-00000文件查看即可,內容如下所示:
| actor 1 add 2 after 2 apache 1 append 1 as 6 background 1 be 2 believe 1 bench 3 block 1 cafe 2 cat 4 communications 1 connection 1 cust 1 |
同上面的是一樣的。
這是一個(gè)非常簡(jiǎn)單的例子,而且Hadoop在其中實(shí)現了Google的MapReduce算法,用以處理數據。
我們可以簡(jiǎn)單看一下關(guān)于WordCount類(lèi)的實(shí)現,在Hadoop的發(fā)行包中也附帶了例子的源代碼,WordCount.java類(lèi)實(shí)現如下所示:
| package org.apache.hadoop.examples; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** /** * 這是一個(gè)Hadoop Map/Reduce應用的例子。 * 讀取輸入的文件,實(shí)現將文件的每一行分解成單個(gè)的單詞并統計單詞的出現頻率。 * 輸出結果是被分解的單詞的列表及其詞頻。 * 使用如下命令可以運行: bin/hadoop jar build/hadoop-examples.jar wordcount * [-m <i>maps</i>] [-r <i>reduces</i>] <i>in-dir</i> <i>out-dir</i> */ public class WordCount extends Configured implements Tool {
/** * MapClass是一個(gè)內部靜態(tài)類(lèi)。統計數據文件中每一行的單詞。 */ public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); } } }
/** * Reduce是一個(gè)內部靜態(tài)類(lèi)。作為統計單詞數量的中間結果類(lèi),由于這個(gè)例子簡(jiǎn)單無(wú)須執行中間結果的合并。 */ public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } }
static int printUsage() { // 提示輸入命令的用法 System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>"); ToolRunner.printGenericCommandUsage(System.out); return -1; }
/** * map/reduce程序的驅動(dòng)部分,用于實(shí)現提交map/reduce任務(wù)。 */ public int run(String[] args) throws Exception { JobConf conf = new JobConf(getConf(), WordCount.class); conf.setJobName("wordcount");
// the keys are words (strings) conf.setOutputKeyClass(Text.class); // the values are counts (ints) conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); List<String> other_args = new ArrayList<String>(); for(int i=0; i < args.length; ++i) { try { if ("-m".equals(args[i])) { conf.setNumMapTasks(Integer.parseInt(args[++i])); } else if ("-r".equals(args[i])) { conf.setNumReduceTasks(Integer.parseInt(args[++i])); } else { other_args.add(args[i]); } } catch (NumberFormatException except) { System.out.println("ERROR: Integer expected instead of " + args[i]); return printUsage(); } catch (ArrayIndexOutOfBoundsException except) { System.out.println("ERROR: Required parameter missing from " + args[i-1]); return printUsage(); } } // Make sure there are exactly 2 parameters left. if (other_args.size() != 2) { System.out.println("ERROR: Wrong number of parameters: " + other_args.size() + " instead of 2."); return printUsage(); } conf.setInputPath(new Path(other_args.get(0))); conf.setOutputPath(new Path(other_args.get(1))); JobClient.runJob(conf); return 0; }
public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(res); } } |
通過(guò)對著(zhù)這個(gè)例子進(jìn)行簡(jiǎn)單的說(shuō)明,大致了解一下MapReduce算法的思想及其實(shí)現。
解決Window環(huán)境下啟動(dòng)Hadoop0.21.0時(shí)出現的 java.lang.NoClassDefFoundError: org/apache/hadoop/util/PlatformName 異常
在Window下啟動(dòng)Hadoop-0.21.0版本時(shí),會(huì )出現下面這樣的錯誤提示:
1 java.lang.NoClassDefFoundError: org/apache/hadoop/util/PlatformName
2 Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.util.PlatformName
3
4 at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
5 at java.security.AccessController.doPrivileged(Native Method)
6 at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
7 at java.lang.ClassLoader.loadClass(ClassLoader.java:307)
8 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
9 at java.lang.ClassLoader.loadClass(ClassLoader.java:248)
10 Could not find the main class: org.apache.hadoop.util.PlatformName. Program wil
11 l exit.
經(jīng)過(guò)不斷的查找原因和嘗試,終于有了解決這個(gè)錯誤的辦法,只需要將${HADOOP_HOME}/bin/hadoop-config.sh文件中的第190行的一下的內容
JAVA_PLATFORM=`CLASSPATH=${CLASSPATH} ${JAVA} -Xmx32m ${HADOOP_JAVA_PLATFORM_OPTS} org.apache.hadoop.util.PlatformName | sed -e "s/ /_/g"`
修改成以下的內容即可
JAVA_PLATFORM=`CLASSPATH=${CLASSPATH} ${JAVA} -Xmx32m -classpath ${HADOOP_COMMON_HOME}/hadoop-common-0.21.0.jar org.apache.hadoop.util.PlatformName | sed -e "s/ /_/g"`