When implementing machine learning algorithms in the past, running data sets with many records and many features took a long time, especially when processing large amounts of text; the single-machine running time was more than I could tolerate. In theory, applying a distributed parallel computing framework for big data (such as Hadoop) to these algorithms can speed them up significantly, provided the algorithm itself can be fully or partially parallelized. My next step is to learn Mahout (http://mahout.apache.org/), whose goal is to "build scalable machine learning libraries". Since it is built on Hadoop, I need to learn Hadoop first, starting with setting up a cluster.
I have three identically configured machines on hand, so there is no need for virtual machines. The configuration is as follows:
CPU: Intel(R) Pentium(R) Dual CPU E2200 @ 2.20GHz
Memory: 2001MiB
Network: NetLink BCM5786 Gigabit Ethernet
All three machines run the same operating system: Ubuntu 11.04.
Pick any one machine as the master and use the others as slaves. All machines should have the same user, the same environment variable settings, the same Hadoop directory layout, and the same Java directory layout.
On the master machine, run sudo gedit /etc/hosts in a terminal and add the following entries:
172.22.9.209 namenode-m
172.22.9.185 datanode-1
172.22.9.220 datanode-2
On the slave machines, do the same.
1) Install ssh on all machines: run sudo apt-get install ssh in a terminal, then check whether there is a .ssh folder under /home/leozhang (you need View -> Show Hidden Files to see hidden files); if there is not, run mkdir ~/.ssh in a terminal;
2) Run the following in a terminal:
cd .ssh
#generate a public/private key pair
ssh-keygen #keep pressing Enter at every prompt
#copy the public key into the authorized_keys file
cp id_rsa.pub authorized_keys
#set the permissions of authorized_keys to -rw-r--r--: the owner can read and write, users in the owner's group can read, and everyone else can read
chmod 644 authorized_keys
#copy the public key to the slaves
scp authorized_keys datanode-1:/home/leozhang/.ssh #this could also be: scp authorized_keys leozhang@datanode-1:/home/leozhang/.ssh
scp authorized_keys datanode-2:/home/leozhang/.ssh #same as above
Finally, test the setup, e.g. ssh datanode-1; if you can log in without entering a password, the setup succeeded.
1) Download jdk-7-linux-i586.tar.gz from http://www.oracle.com/technetwork/java/javase/downloads/java-se-jdk-7-download-432154.html and extract it to get the folder jdk1.7.0 (for example, downloaded and extracted into /home/leozhang/Downloads);
2) On all machines: create the folder /usr/java by running sudo mkdir /usr/java in a terminal, then move jdk1.7.0 into it: go to /home/leozhang/Downloads and run sudo mv jdk1.7.0 /usr/java;
3) Run sudo gedit /etc/profile in a terminal and append the following to the end of the file:
JAVA_HOME="/usr/java/jdk1.7.0"
export JAVA_HOME
PATH=$JAVA_HOME/bin:$PATH
export PATH
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$CLASSPATH
export CLASSPATH
4) Run the following in a terminal:
cd /usr/java
scp -r jdk1.7.0 leozhang@datanode-1:/usr/java
scp -r jdk1.7.0 leozhang@datanode-2:/usr/java
Hadoop consists of three parts:
Hadoop Common: The common utilities that support the other Hadoop subprojects.
Hadoop Distributed File System (HDFS™): A distributed file system that provides high-throughput access to application data.
Hadoop MapReduce: A software framework for distributed processing of large data sets on compute clusters.
1) Download hadoop-0.20.204.0.tar.gz from http://labs.renren.com/apache-mirror//hadoop/core/hadoop-0.20.204.0/, extract it into /home/leozhang, and rename the folder to hadoop;
2) Run sudo gedit /etc/profile in a terminal and append the following to the end of the file:
HADOOP_HOME=/home/leozhang/hadoop
export HADOOP_HOME
export HADOOP=$HADOOP_HOME/bin
export PATH=$HADOOP:$PATH
3) Hadoop configuration files
The hadoop folder contains a conf folder with the configuration files Hadoop needs; the main ones to pay attention to are:
●hadoop-env.sh
Only one change is needed here: set JAVA_HOME.
# The java implementation to use. Required.
export JAVA_HOME=/usr/java/jdk1.7.0
●core-site.xml
fs.default.name gives the address of the NameNode, which runs on the master machine.
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://172.22.9.209:9000</value>
  </property>
  <property>
    <name>hadoop.logfile.size</name>
    <value>10000000</value>
    <description>The max size of each log file</description>
  </property>
  <property>
    <name>hadoop.logfile.count</name>
    <value>10</value>
    <description>The max number of log files</description>
  </property>
</configuration>
●hdfs-site.xml
dfs.replication defaults to 3; if the number of DataNodes is less than 3, errors will be reported.
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
</configuration>
●mapred-site.xml
mapred.job.tracker gives the address of the JobTracker; any other properties left unconfigured keep their default values.
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>172.22.9.209:9001</value>
  </property>
</configuration>
Detailed information about the configuration files can be found at http://hadoop.apache.org/common/docs/stable/cluster_setup.html.
●masters
172.22.9.209
●slaves
172.22.9.185
172.22.9.220
4) Run the following in a terminal:
cd /home/leozhang
scp -r hadoop leozhang@datanode-1:/home/leozhang
scp -r hadoop leozhang@datanode-2:/home/leozhang
5) Run source /etc/profile in a terminal; if it does not take effect, log out and log back in.
You will need Eclipse, available at http://www.eclipse.org/downloads/ (or run sudo apt-get install eclipse in a terminal). It is convenient to install a MapReduce plugin for debugging code on a single machine; the plugin ships with the downloaded Hadoop, e.g. /home/leozhang/hadoop/contrib/eclipse-plugin/hadoop-eclipse-plugin-0.20.204.0.jar. Just copy it into the plugins folder of the Eclipse installation directory.
The NameNode has to be formatted before first use; in a terminal on the master machine run: hadoop namenode -format;
In a terminal on the master machine run start-all.sh. You can check the local Java processes with jps: 3 processes are started on the master (JobTracker, SecondaryNameNode, NameNode) and 2 on each slave machine (TaskTracker, DataNode). To stop everything, just run stop-all.sh in a terminal on the master machine.
Detailed information about the NameNode can be viewed at http://localhost:50070/.

MapReduce fits best when the data volume is huge and there is little dependence between records. The map operation applies some operation to the raw data, scatters it, and emits intermediate results; Hadoop sorts the intermediate results with the shuffle step; the reduce operation then receives the intermediate results, aggregates them, and writes the final result to a file. From this you can also see that, within Hadoop, HDFS is the cornerstone of MapReduce.
Someone explained MapReduce with this analogy:
We want to count all the books in the library. You count up shelf #1, I count up shelf #2. That's map. The more people we get, the faster it goes.
Now we get together and add our individual counts. That's reduce.
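To make the analogy concrete, here is a minimal sketch of the counting idea written against the same Mapper/Reducer API used in the sort code below; the package, class, and variable names are my own for illustration and are not part of this post's sort project:
package MRTEST.WordCount; // hypothetical package, for illustration only

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// "You count shelf #1, I count shelf #2": each map() call counts within its own input split.
public class CountMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            // emit (word, 1) as an intermediate result; the shuffle groups them by word
            context.write(new Text(itr.nextToken()), ONE);
        }
    }
}

// "Now we get together and add our individual counts": reduce() sums the 1s for each word.
class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
The shuffle step sits between the two: it groups all intermediate (word, 1) pairs with the same key so that a single reduce call can add them up.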
Upload the text to be sorted to HDFS and put it in an input folder: in a terminal run hadoop dfs -mkdir input;
Assume the data file data.txt sits in /home/leozhang/testdata on the local disk; in a terminal run: cd /home/leozhang/testdata; hadoop dfs -put data.txt input/
The approach borrows the idea behind quicksort: assume an ascending sort; after each partition step every element to the left of the pivot is less than or equal to the pivot, and every element to the right is greater than or equal to it. If there are N pivots, the data is mapped into N+1 ranges; make the number of reducers N+1 and send the data in each range to the reducer for that range. Hadoop's shuffle then sorts each of these N+1 portions automatically, so the reduce operation only needs to receive the intermediate results and write them straight to a file.
From this, the steps for sorting a large amount of data with Hadoop are:
1) Sample the data to be sorted;
2) Sort the sample to produce the pivots (say the pivots obtained are 3 and 9);
3) For every input record, map works out which two pivots it falls between and sends it to the corresponding reducer (for example, the ranges <3, [3,9) and >=9 go to reducer0, reducer1, and reducer2 respectively), as sketched right after this list;
4) Reduce writes the data it receives straight to the output.
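As an illustration of step 3, here is a small, self-contained sketch of the range lookup with the example pivots above; RangeLookupDemo and rangeIndex are hypothetical names used only for this illustration and are not part of the project code:
import java.util.Arrays;
import java.util.List;

public class RangeLookupDemo {
    // Returns the index of the range a value falls into, given sorted pivots.
    // With pivots [3, 9]: values < 3 -> 0, values in [3, 9) -> 1, values >= 9 -> 2.
    static int rangeIndex(int value, List<Integer> pivots) {
        int i = 0;
        while (i < pivots.size() && value >= pivots.get(i)) {
            i++;
        }
        return i;
    }

    public static void main(String[] args) {
        List<Integer> pivots = Arrays.asList(3, 9);
        for (int v : new int[]{1, 3, 8, 9, 42}) {
            System.out.println(v + " -> reducer" + rangeIndex(v, pivots));
        }
    }
}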
Sampling is done by RandomSelectMapper and RandomSelectReducer, partitioning of the data by ReducerPartition, and the sorted output by SortMapper and SortReducer. The execution order is: RandomSelectMapper -> RandomSelectReducer -> SortMapper -> SortReducer.
I still feel this implementation is not great, especially the data partitioning part; I would be glad to hear how others would do it. The code can be obtained from here.
1) The pivots are selected at random:
package MRTEST.Sort;

import java.io.IOException;
import java.util.Random;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RandomSelectMapper
        extends Mapper<Object, Text, Text, Text> {
    private static int currentSize = 0;
    private Random random = new Random();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            currentSize++;
            // keep the current token as a pivot candidate with probability 1/currentSize
            if (random.nextInt(currentSize) == 0) {
                Text v = new Text(itr.nextToken());
                context.write(v, v);
            } else {
                itr.nextToken();
            }
        }
    }
}
Sorting of the pivots is done by Hadoop itself:
package MRTEST.Sort;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RandomSelectReducer
        extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // emit each distinct pivot candidate exactly once; a null key makes
        // TextOutputFormat write only the value, one pivot per line
        for (Text data : values) {
            context.write(null, data);
            break;
        }
    }
}
2) SortMapper simply reads the data in:
package MRTEST.Sort;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper
        extends Mapper<Object, Text, Text, Text> {

    public void map(Object key, Text values, Context context)
            throws IOException, InterruptedException {
        // emit every token as both key and value; the shuffle sorts by key
        StringTokenizer itr = new StringTokenizer(values.toString());
        while (itr.hasMoreTokens()) {
            Text v = new Text(itr.nextToken());
            context.write(v, v);
        }
    }
}
Data is dispatched to the corresponding reducer:
package MRTEST.Sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ReducerPartition
        extends Partitioner<Text, Text> {

    public int getPartition(Text key, Text value, int numPartitions) {
        // delegate the pivot-range lookup to the helper class HadoopUtil
        return HadoopUtil.getReducerId(value, numPartitions);
    }
}
Finally, SortReducer writes out the result:
package MRTEST.Sort;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SortReducer
        extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // the keys arrive already sorted, so just write every value straight out
        for (Text data : values) {
            context.write(key, data);
        }
    }
}
3) The two jobs are organized by SortDriver:
package MRTEST.Sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class SortDriver {

    // job 1: sample the input and write the sorted pivots to the partition path
    public static void runPivotSelect(Configuration conf,
                                      Path input,
                                      Path output) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = new Job(conf, "get pivot");
        job.setJarByClass(SortDriver.class);
        job.setMapperClass(RandomSelectMapper.class);
        job.setReducerClass(RandomSelectReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        if (!job.waitForCompletion(true)) {
            System.exit(2);
        }
    }

    // job 2: partition the input by pivot range and let the shuffle sort each range
    public static void runSort(Configuration conf,
                               Path input,
                               Path partition,
                               Path output) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = new Job(conf, "sort");
        job.setJarByClass(SortDriver.class);
        job.setMapperClass(SortMapper.class);
        job.setCombinerClass(SortReducer.class);
        job.setPartitionerClass(ReducerPartition.class);
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        HadoopUtil.readPartition(conf, new Path(partition.toString() + "/part-r-00000"));
        job.setNumReduceTasks(HadoopUtil.pivots.size());
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Usage: sort <input> <partition> <output>");
            System.exit(2);
        }

        Path input = new Path(otherArgs[0]);
        Path partition = new Path(otherArgs[1]);
        Path output = new Path(otherArgs[2]);

        HadoopUtil.delete(conf, partition);
        HadoopUtil.delete(conf, output);

        SortDriver.runPivotSelect(conf, input, partition);
        SortDriver.runSort(conf, input, partition, output);
    }
}
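The HadoopUtil helper class used by ReducerPartition and SortDriver is not listed in this post. Purely as a hedged sketch of what it might look like: the field and method names below come from the calls above, but the bodies are my own assumptions (the pivot file written by RandomSelectReducer is read one pivot per line, and the range index is clamped to the reducer count configured by the driver), not the original code:
package MRTEST.Sort;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

public class HadoopUtil {
    // pivots read from the output of the pivot-selection job, already in sorted order
    public static List<String> pivots = new ArrayList<String>();

    // delete a path on the file system if it exists (used to clear old partition/output dirs)
    public static void delete(Configuration conf, Path path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
    }

    // read the pivot file (one pivot per line) produced by RandomSelectReducer
    public static void readPartition(Configuration conf, Path file) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(file)));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                line = line.trim();
                if (!line.isEmpty()) {
                    pivots.add(line);
                }
            }
        } finally {
            reader.close();
        }
    }

    // map a value to a reducer index: walk the sorted pivots lexicographically
    // (matching the Text key order of the shuffle) and clamp to the reducer count
    public static int getReducerId(Text value, int numPartitions) {
        String v = value.toString();
        int i = 0;
        while (i < pivots.size() && v.compareTo(pivots.get(i)) >= 0) {
            i++;
        }
        return Math.min(i, numPartitions - 1);
    }
}
Since fs.default.name points at HDFS, FileSystem.get(conf) resolves to the distributed file system here, so readPartition pulls the pivots straight from the partition directory on HDFS.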
On the master machine, in Eclipse click Export in the File menu, choose Java -> JAR file, click Next, tick the files you want to package in the tree on the left, click Next, then Next again, optionally choose the application entry point under Main class, and finally click Finish. You end up with a jar file, e.g. Sort.jar.
Go to the directory containing Sort.jar and run in a terminal: hadoop jar Sort.jar input partition output
The progress of all jobs can be tracked at http://localhost:50030.
To inspect the result on HDFS, run hadoop dfs -cat output/* in a terminal, or fetch the files from HDFS to the local disk with: hadoop dfs -get output output.