MapReduce把對數據記錄的所有操作都歸結兩個(gè)步驟--Map和Reduce。其中Map對現有數據做一個(gè)先期處理,
得到一個(gè)中間數據集,Reduce再對中間數據集進(jìn)行去重、過(guò)濾等后期處理,最后得到你要的結果。Hadoop是一個(gè)MapReduce的實(shí)現,Nutch項目的大容量數據處理等功能就構建在Hadoop之上。
過(guò)程原形:
Map :: (InitialKey, IntialValue) -> [(InterKey, InterValue)]
Reduce :: (Interkey, InterValuesIterator) -> [(InterKey, InterValue)]
Map接收一個(gè)Key、Value對,返回一個(gè)Key、Value對(如果原始的Key、Value對不滿(mǎn)足你的要求你可以不返回,或者你有特殊需求也可以返回多個(gè),一般比較少見(jiàn)), Reduce接收一個(gè)Key和一個(gè)Values的Iterator,你可以根據情況返回零個(gè)或多個(gè)Key,Value對。Key是一個(gè)實(shí)現了org.apache.hadoop.WritableComparable接口的類(lèi),Value則實(shí)現了Writable,WritableComparable是Writable的子接口,Writable定義了輸入輸出(序列化)的接口,WritableComparable另外繼承Comparable,因此Key總是有序的。
一個(gè)使用MapReduce的功能塊典型結構如下:
新建一個(gè)JobConf
設置輸入路徑
設置輸入文件格式
設置輸入Key,Value類(lèi)型
設置輸出路徑
設置輸出文件格式
設置輸出Key,Value類(lèi)型
設置Partitioner
啟動(dòng)Job
執行完畢,你所有存放在輸入路徑下的數據都在被轉換之后按照你指定的格式存放于輸出路徑中,
ok,我知道很多人都是看code比看document更興奮,很不幸,我也是其中一員。
. 排序
比如你有一批URL,存放在文本文件c:/tmp/tmpin/urllist.txt中,一行一個(gè)
http://www.sohu.com
http://www.163.com
http://www.sina.com.cn
...
輸入格式一般是SequenceFileInputFormat,但是對于urllist.txt這種按行排列的格式卻是另外一種(我不知道的)格式,簡(jiǎn)單來(lái)說(shuō)就是你不需要設定輸入格式就能處理它了,在MapReduce傳遞給你的Map時(shí),你可以忽略Key值,
而Value就是URL了,然后你做一個(gè)轉換把Value(URL)作為Key來(lái)存放,然后你自己合成一個(gè)Value值,比如只是簡(jiǎn)單的一個(gè)數字1,或者是一個(gè)URL的相關(guān)信息,DocumentId, 預計抓取時(shí)間等。因為這次的任務(wù)是排序,而事實(shí)上經(jīng)過(guò)Map處理后數據已經(jīng)是按Key(URL)排好序了,所以Reduce可以什么也不做。
public class Main {
public static class InjectMapper extends MapReduceBase implements Mapper {
public void map(WritableComparable key, Writable val,
OutputCollector output, Reporter reporter) throws IOException {
UTF8 url = (UTF8) val;
UTF8 v = new UTF8("1");
output.collect(url, v); //生成數據
}
}
public static void main(String[] args) throws IOException {
JobConf job = new JobConf();
Path urlsPath = new Path("C:/tmp/tmpin");
job.setInputPath(urlsPath);
Path outputPath = new Path("c:/tmp/tmpout");
job.setOutputPath(outputPath);
job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(UTF8.class);
job.setOutputValueClass(UTF8.class);
job.setMapperClass(InjectMapper.class);
//job.setReduceClass(InjectReducer.class);
JobClient.runJob(job);
}
}
. 去重
你的原始URL列表中可能有相同的URL,去除相同Key值記錄的功能就需要Reduce了,定義一個(gè)類(lèi)
public static class InjectReducer extends MapReduceBase implements Reducer {
public void reduce(WritableComparable key, Iterator values,
OutputCollector output, Reporter reporter) throws IOException {
output.collect(key, (Writable) values.next());
}
}
并傳遞給JobConf就Ok了,由此可以看出Reduce過(guò)程對于同一個(gè)Key只被調用一次,那個(gè)values的Iterator包含這個(gè)Key所對應的全部記錄,你可以簡(jiǎn)單的只取第一條數據,對這些記錄進(jìn)行比較,得到你認為最有效的一條記錄,或者統計每一個(gè)Key都有多少條記錄,一句話(huà),你可以做任何事情(事實(shí)上是任何MapReduce所支持的事情)。
聯(lián)系客服