http://www.cnblogs.com/zjfstudio/p/3887551.htmlhtml
Hadoop學習筆記(7) 編程
——高級編程 網絡
從前面的學習中,咱們瞭解到了MapReduce整個過程須要通過如下幾個步驟: app
1.輸入(input):將輸入數據分紅一個個split,並將split進一步拆成<key, value>。 ide
2.映射(map):根據輸入的<key, value>進生處理, 函數
3.合併(combiner):合併中間相兩同的key值。 oop
4.分區(Partition):將<key, value>分紅N分,分別送到下一環節。 post
5.化簡(Reduce):將中間結果合併,獲得最終結果 學習
6.輸出(output):負責輸入最終結果。 spa
其中第三、4步又成洗牌(shuffle)過程。
從前面HelloWorld示例中,咱們看到,咱們只去個性化了Map和Reduce函數,那其餘函數呢,是否能夠個性化?答案固然是確定的。下面咱們就對每一個環節的個性化進行介紹。
自定義輸入格式
輸 入格式(InputFormat)用於描述整個MapReduce做業的數據輸入規範。先對輸入的文件進行格式規範檢查,如輸入路徑,後綴等檢查;而後對 數據文件進行輸入分塊(split);再對數據塊逐一讀出;最後轉換成Map所須要的<key, value>健值對。
系統中提供豐富的預置輸入格式。最經常使用的如下兩種:
TextInputFormat:系統默認的數據輸入格式。將文件分塊,並逐行讀入,每一行記錄行成一對<key, value>。其中,key值爲當前行在整個文件中的偏移量,value值爲這一行的文本內容。
KeyValueTextInputFormat:這是另外一個經常使用的數據輸入格式,讀入的文本文件內容要求是以<key, value>形式。讀出的結果也就直接造成<key, value>送入map函數中。
若是選擇輸入格式呢?那就只要在job函數中調用
-
job.setInputFormatClass(TextInputFormat.class);
在Hello中咱們沒有設定,系統默認選擇了TextInputFormat。
通常狀況夠用了,但某些狀況下,仍是沒法知足用戶的需求,因此仍是須要個性化。個性化則按下面的方式進行:
若是數據咱們是來源於文件,則能夠繼承FileInputFormat:
-
public class MyInputFormat extends FileInputFormat<Text,Text> {
-
@Override
-
public RecordReader<Text, Text> createRecordReader(InputSplit split,
-
TaskAttemptContext context) throws IOException, InterruptedException {
-
// TODO Auto-generated method stub
-
return null;
-
}
-
}
若是數據咱們是來源於非文件,如關係數據,則繼承
-
public class MyInputFormat extends InputFormat<Text,Text> {
-
-
@Override
-
public RecordReader<Text, Text> createRecordReader(InputSplit arg0,
-
TaskAttemptContext arg1) throws IOException, InterruptedException {
-
// TODO Auto-generated method stub
-
return null;
-
}
-
-
@Override
-
public List<InputSplit> getSplits(JobContext arg0) throws IOException,
-
InterruptedException {
-
// TODO Auto-generated method stub
-
return null;
-
}
-
-
}
這裏比較清晰了,下面個函數爲拆分紅split,上面個函數跟據split輸出成Key,value。
自定義map處理
這個好理解,咱們的HelloWorld程序中就自定義了map處理函數。而後在job中指定了咱們的處理類:
-
job.setMapperClass(TokenizerMapper.class);
能不能沒有map呢? 能夠的,若是沒有map,也就是這與上面的這個setMapperClass,則系統自動指定一個null,這時處理是將輸入的<key,value>值,不做任何修改,直接送到下一環節中。
個性化代碼以下:
-
public static class TokenizerMapper
-
extends Mapper<Object, Text, Text, IntWritable>{
-
-
public void map(Object key, Text value, Context context
-
) throws IOException, InterruptedException {
-
-
context.write(key, value);
-
}
-
}
自定義合併Combiner
自定義合併Combiner類,主要目的是減小Map階段輸出中間結果的數據量,下降數據的網絡傳輸開銷。
Combine 過程,實際跟Reduce過程類似,只是執行不一樣,Reduce是在Reducer環節運行,而Combine是緊跟着Map以後,在同一臺機器上預先將 結時進行一輪合併,以減小送到Reducer的數據量。因此在HelloWorld時,能夠看到,Combiner和Reducer用的是同一個類:
-
job.setCombinerClass(IntSumReducer.class);
-
job.setReducerClass(IntSumReducer.class);
如何個性化呢,這個跟Reducer差很少了:
-
public static class MyCombiner
-
extends Reducer<Text,IntWritable,Text,IntWritable> {
-
-
public void reduce(Text key, Iterable<IntWritable> values,
-
Context context
-
) throws IOException, InterruptedException {
-
-
context.write(key, new IntWritable(1));
-
}
-
}
自定義分區Partitioner
在 MapReduce程序中,Partitioner決定着Map節點的輸出將被分區到哪一個Reduce節點。而默認的Partitioner是 HashPartitioner,它根據每條數據記錄的主健值進行Hash操做,得到一個非負整數的Hash碼,而後用當前做業的Reduce節點數取模 運算,有N個結點的話,就會平均分配置到N個節點上,一個隔一個依次。大多狀況下這個平均分配是夠用了,但也會有一些特殊狀況,好比某個文件的,不能被拆 開到兩個結點中,這樣就須要個性化了。
個性化方式以下:
-
public static class MyPartitioner
-
extends HashPartitioner<K,V> {
-
-
public void getPartition(K key, V value,int numReduceTasks) {
-
-
super.getPartition(key,value,numReduceTasks);
-
}
-
}
方式其實就是在執行以前能夠改變一下key,來欺騙這個hash表。
自定義化簡(Reducer)
這一塊是將Map送來的結果進行化簡處理,並造成最終的輸出值。與前面map同樣,在HelloWorld中咱們就見到過了。經過下面代碼能夠設置其值:
-
job.setReducerClass(IntSumReducer.class);
一樣,也能夠這樣類能夠不設置,若是不設置的話,就是把前面送來的值,直接送向輸出格式器中。
若是要個性化,則以下:
-
public static class IntSumReducer
-
extends Reducer<Text,IntWritable,Text,IntWritable> {
-
-
public void reduce(Text key, Iterable<IntWritable> values,
-
Context context
-
) throws IOException, InterruptedException {
-
context.write(key, result);
-
}
-
}
自定義輸出格式
數 據輸出格式(OutPutFormat)用於描述MapReduce做業的數據輸出規範。Hadoop提供了豐富的內置數據輸出格式。最常的數據輸出格式 是TextOutputFormat,也是系統默認的數據輸出格式,將結果以"key+\t+value"的形式逐行輸出到文本文件中。還有其它的, 如:DBOutputFormat,FileOutputFormat,FilterOutputFormat,IndexUpdataOutputFormat,LazyOutputFormat,MapFileOutputFormat, 等等。
若是要個性化,則按下面方式進行:
-
public class MyOutputFormat extends OutputFormat<Text,Text> {
-
-
@Override
-
public void checkOutputSpecs(JobContext arg0) throws IOException,
-
InterruptedException {
-
// TODO Auto-generated method stub
-
-
}
-
-
@Override
-
public OutputCommitter getOutputCommitter(TaskAttemptContext arg0)
-
throws IOException, InterruptedException {
-
// TODO Auto-generated method stub
-
return null;
-
}
-
-
@Override
-
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext arg0)
-
throws IOException, InterruptedException {
-
// TODO Auto-generated method stub
-
return null;
-
}
-
-
}
複合健——用戶自定義類型。
從前面的整個過程當中能夠看到,都是採用key-value的方式進行傳入傳出,而這些類型大可能是單一的字符串,和整型。若是個人key中須要包含多個信息怎麼辦?用字符串直接拼接麼? 太不方便了,最好可以本身定義一個類,做爲這個key,這樣就方便了。
若是定義一個類做爲key 或value的類型? 有什麼要求?就是這個類型必需要繼承WritableComparable<T>這個類,因此若是要自定義一個類型則能夠這麼實現:
-
public class MyType implements WritableComparable<MyType> {
-
-
private float x,y;
-
public float GetX(){return x;}
-
public float GetY(){return y;}
-
-
@Override
-
public void readFields(DataInput in) throws IOException {
-
x = in.readFloat();
-
y = in.readFloat();
-
}
-
-
@Override
-
public void write(DataOutput out) throws IOException {
-
out.writeFloat(x);
-
out.writeFloat(y);
-
}
-
-
@Override
-
public int compareTo(MyType arg0) {
-
//輸入:-1(小於) 0(等於) 1(大於)
-
return 0;
-
}
-
}
這個示例中,咱們添加了兩個float變量:x,y 。 這個信息能過int 和out按次序進行輸入輸出。最後,再實現一個比較函數便可。
Job任務的建立
-
Job job = new Job(conf, "word count");
-
job.setJarByClass(WordCount.class);
-
job.setInputFormatClass(MyInputFormat.class);
-
job.setMapperClass(TokenizerMapper.class);
-
job.setCombinerClass(IntSumReducer.class);
-
job.setPartitionerClass(MyPartitioner.class);
-
job.setReducerClass(IntSumReducer.class);
-
job.setOutputFormatClass(TextOutputFormat.class);
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(IntWritable.class);
-
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
-
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
任務建立比較容易,其實就是new一個實例,而後把上面描述的過程類設置好,而後加上第2行中,jar包的主類,第十、11行的輸入輸出路徑。這樣就完事了。
Job任務的執行
單個任務的執行,沒有什麼問題,能夠用這個:
-
job.waitForCompletion(true);
但多個任務呢? 多個任務的話,就會造成其組織方式,有串行,有並行,有無關,有組合的,以下圖:

圖中,Job2和Job3將會等Job1執行完了再執行,且能夠同時開始,而Job4必須等Job2和Job3同時結束後才結束。
這個組合,就能夠採用這樣的代碼來實現:
-
Configuration conf = new Configuration();
-
Job job1 = new Job(conf, "job1");
-
//.. config Job1
-
Job job2 = new Job(conf, "job2");
-
//.. config Job2
-
Job job3 = new Job(conf, "job3");
-
//.. config Job3
-
Job job4 = new Job(conf, "job4");
-
//.. config Job4
-
-
//添加依賴關係
-
job2.addDependingJob(job1);
-
job3.addDependingJob(job1);
-
job4.addDependingJob(job2);
-
job4.addDependingJob(job3);
-
-
JobControl jc = new JobControl("jbo name");
-
jc.addJob(job1);
-
jc.addJob(job2);
-
jc.addJob(job3);
-
jc.addJob(job4);
-
jc.run();
總述
如今回頭看看,其實整個hadoop編程,也就是這幾塊內容了,要實現某個功能,咱們就往上面這些步驟上套,而後聯起來執行,達到咱們的目的。