MapReduce程序本質上是並行運行的,所以能夠將大規模的數據分析任務分發給任何一個擁有足夠多機器的數據中心。網絡
MapReduce任務過程分爲兩個處理階段:map階段和reduce階段。每階段都以鍵-值對做爲輸入和輸出。map函數是一個數據準備階段,經過這種方式來準備數據,使reduce函數可以繼續對它進行處理。除此以外,map函數仍是一個比較適合去除已損記錄的地方。app
public class MyMapper extends MapReduceBase框架
implements Mapper<LongWritable, Text, Text, IntWritable> {ide
@override函數
public void map(LongWritable key, Text value, Context context) {oop
...... // 輸入鍵值對處理過程性能
context.write(new Text(...), new IntWritable(...));優化
}spa
}設計
Mapper類是一個泛型類,有四個形參,分別指定map函數的輸入鍵、輸入值、輸出鍵、輸出值的類型。Hadoop自己提供了一套可優化網絡序列化傳輸的基本類型,而不直接使用Java內嵌的類型。map()方法還提供Context實例用於輸出內容的寫入,將數據按照輸出鍵-值類型進行格式化便可。
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@override
public viod reduce(Text text, Iterable<IntWritable> values, Context context) {
...... // 輸入鍵值對處理過程
context.write(key, new IntWritable(...));
}
}
reduce函數也有四個形式參數類型用於指定輸入和輸出類型。reduce函數的輸入類型必須匹配map函數的輸出類型。
public class MyApplication {
public static void main(String[] args) {
// ①
Job job = new Job();
job.setJarByClass(MyApplication.class);
job.estJobName(「My Application」);
// ②
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// ③
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
// ④
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// ⑤
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
① Job對象指定做業執行規範,能夠用來控制整個做業的運行。在Hadoop集羣上運行這個做業時,要把代碼打包成一個JAR文件(Hadoop在集羣上發佈這個文件)。
② 構造Job對象後,須要指定輸入和輸出數據的路徑。調用FileInputFormat類的靜態方法addInputPath()來定義輸入數據的路徑,這個路徑能夠是單個的文件、一個目錄(此時,將目錄下全部文件當作輸入)或符合特定文件模式的一系列文件。能夠屢次調用addInputPath()來實現多路徑的輸入。調用FileOutputFormat類中的靜態方法setOutputPath()來指定輸出路徑(只能有一個輸出路徑)。這個方法指定是reduce函數輸出文件的寫入目錄。在運行做業前該目錄是不該該存在的,不然Hadoop會報錯並拒絕運行做業。這種預防措施的目的是防止數據丟失。
③ 經過setMapperClass()和serReducerClass()方法指定要用的map類型和reduce類型。
④ setOutputKeyClass()和setOutputValueClass()方法控制reduce函數的輸出類型,而且必須和Reduce類產生的相匹配。map函數的輸出類型默認狀況下和reduce函數是相同的,所以若是mapper產生出和reducer相同的類型時,不須要單獨設置。若是不一樣,則必須經過setMapOutputKeyClass()和setMapOutputValueClass方法來設置map函數的輸出類型。
⑤ Job中的waitForCompletion()方法提交做業並等待執行完成。該方法惟一的參數是一個標識,指示是否已生成詳細輸出。當標識爲true時,做業會把其進度信息寫到控制檯。返回值是一個布爾值,表示執行的成敗。
Hadoop將MapReduce的輸入數據劃分紅等長的小數據塊,稱爲輸入分片(input split)或簡稱「分片」。Hadoop爲每個分片構建一個map任務,並由該任務來運行用戶自定義的map函數從而處理分片中的每條記錄。若是分片切分得過小,那麼管理分片的總時間和構建map任務的總時間將決定做業的整個執行時間。對於大多數做業來講,一個合理的分片大小趨向於HDFS的一個塊的大小,默認是128MB,能夠針對集羣調整這個默認值,或在每一個文件建立時指定。
Hadoop在存儲有輸入數據(HDFS中的數據)的節點上運行map任務,能夠得到最佳性能,由於它無需使用集羣帶寬資源。這就是所謂的「數據本地化優化」。可是,有時對於一個map任務的輸入分片來講,存儲該分片的HDFS數據塊複本的全部節點可能正在運行其餘map任務,此時做業調度須要從某一數據塊所在的機架中的一個節點上尋找一個空閒的map槽(slot)來運行該map任務分片。很是偶然的狀況下(基本不會發生),會使用其餘機架中的節點運行該map任務。
最佳分片的大小與塊大小相同是由於它是確保能夠存儲在單個節點上的最大輸入快的大小。若是分片跨越兩個數據塊,那麼對於任何一個HDFS節點,基本上都不可能存儲這兩個數據塊,所以分片中的部分數據須要經過網絡傳輸到map任務運行的節點。
map任務將其輸出寫入本地硬盤,而非HDFS。若是運行map任務的節點在將map中間結果傳送給reduce任務以前失敗,Hadoop將在另外一個節點上從新運行這個map任務以再次構建map中間結果。
reduce任務並不具有數據本地化的優點,單個reduce任務的輸入一般來自於全部mapper的輸出。對於reduce輸出的每一個HDFS塊,第一個複本存儲在本地節點上,其餘複本出於可靠性考慮存儲在其餘機架的節點中。
reduce任務的數量並不是由輸入數據的大小決定,相反是獨立指定的。若是有多個reduce任務,每一個map任務就會針對輸出進行分區(partition),即爲每一個reduce任務建一個分區。分區可由用戶定義的分區函數控制,但一般默認的partitioner經過哈希函數來分區。
Hadoop容許用戶針對map任務的輸出指定一個combiner,combiner函數的輸出做爲reduce函數的輸入。無論調用combiner多少次,reducer的輸出結果都是同樣的,combiner函數能幫助減小mapper和reducer之間的數據傳輸量。combiner函數是經過Reducer類來定義的。
Hadoop Streaming使用Unix標準流做爲Hadoop和應用程序之間的接口,適合用於文本處理。
map的輸入數據經過標準輸入流傳遞給map函數,而且是一行一行地傳輸,最後將結果行寫到標準輸出。map輸出的鍵-值對是以一個製表符分隔的行,reduce函數的輸入格式與之相同並經過標準輸入流進行傳輸。reduce函數從標準輸入流中讀取輸入行,該輸入已由Hadoop框架根據鍵排過序,最後將結果寫入標準輸出。
Streaming和Java MapReduce API設計差別:Java API控制的map函數一次只處理一條記錄,針對輸入數據中的每一條記錄,該框架均需調用Mapper的map()方法來處理;而在Streaming中,map程序能夠本身決定如何處理輸入數據。