Hadoop學習筆記(7) ——高級編程

http://www.cnblogs.com/zjfstudio/p/3887551.htmlhtml

Hadoop學習筆記(7) 編程

——高級編程 網絡

從前面的學習中,咱們瞭解到了MapReduce整個過程須要通過如下幾個步驟: app

1.輸入(input):將輸入數據分紅一個個split,並將split進一步拆成<key, value>。 ide

2.映射(map):根據輸入的<key, value>進生處理, 函數

3.合併(combiner):合併中間相兩同的key值。 oop

4.分區(Partition):將<key, value>分紅N分,分別送到下一環節。 post

5.化簡(Reduce):將中間結果合併,獲得最終結果 學習

6.輸出(output):負責輸入最終結果。 spa

其中第三、4步又成洗牌(shuffle)過程。

 

從前面HelloWorld示例中,咱們看到,咱們只去個性化了Map和Reduce函數,那其餘函數呢,是否能夠個性化?答案固然是確定的。下面咱們就對每一個環節的個性化進行介紹。

自定義輸入格式

輸 入格式(InputFormat)用於描述整個MapReduce做業的數據輸入規範。先對輸入的文件進行格式規範檢查,如輸入路徑,後綴等檢查;而後對 數據文件進行輸入分塊(split);再對數據塊逐一讀出;最後轉換成Map所須要的<key, value>健值對。

系統中提供豐富的預置輸入格式。最經常使用的如下兩種:

TextInputFormat:系統默認的數據輸入格式。將文件分塊,並逐行讀入,每一行記錄行成一對<key, value>。其中,key值爲當前行在整個文件中的偏移量,value值爲這一行的文本內容。

KeyValueTextInputFormat:這是另外一個經常使用的數據輸入格式,讀入的文本文件內容要求是以<key, value>形式。讀出的結果也就直接造成<key, value>送入map函數中。

 

若是選擇輸入格式呢?那就只要在job函數中調用

  1. job.setInputFormatClass(TextInputFormat.class);

在Hello中咱們沒有設定,系統默認選擇了TextInputFormat。

通常狀況夠用了,但某些狀況下,仍是沒法知足用戶的需求,因此仍是須要個性化。個性化則按下面的方式進行:

若是數據咱們是來源於文件,則能夠繼承FileInputFormat:

  1. public class MyInputFormat extends FileInputFormat<Text,Text> {
  2.    @Override
  3.    public RecordReader<Text, Text> createRecordReader(InputSplit split,
  4.          TaskAttemptContext context) throws IOException, InterruptedException {
  5.       // TODO Auto-generated method stub
  6.       return null;
  7.    }
  8. }

若是數據咱們是來源於非文件,如關係數據,則繼承

  1. public class MyInputFormat extends InputFormat<Text,Text> {
  2.  
  3.    @Override
  4.    public RecordReader<Text, Text> createRecordReader(InputSplit arg0,
  5.          TaskAttemptContext arg1) throws IOException, InterruptedException {
  6.       // TODO Auto-generated method stub
  7.       return null;
  8.    }
  9.  
  10.    @Override
  11.    public List<InputSplit> getSplits(JobContext arg0) throws IOException,
  12.          InterruptedException {
  13.       // TODO Auto-generated method stub
  14.       return null;
  15.    }
  16.  
  17. }

這裏比較清晰了,下面個函數爲拆分紅split,上面個函數跟據split輸出成Key,value。

 

自定義map處理

這個好理解,咱們的HelloWorld程序中就自定義了map處理函數。而後在job中指定了咱們的處理類:

  1. job.setMapperClass(TokenizerMapper.class);

能不能沒有map呢? 能夠的,若是沒有map,也就是這與上面的這個setMapperClass,則系統自動指定一個null,這時處理是將輸入的<key,value>值,不做任何修改,直接送到下一環節中。

個性化代碼以下:

  1. public static class TokenizerMapper
  2.        extends Mapper<Object, Text, Text, IntWritable>{
  3.  
  4.     public void map(Object key, Text value, Context context
  5.                     ) throws IOException, InterruptedException {
  6.  
  7.         context.write(key, value);
  8.     }
  9.   }

 

自定義合併Combiner

自定義合併Combiner類,主要目的是減小Map階段輸出中間結果的數據量,下降數據的網絡傳輸開銷。

Combine 過程,實際跟Reduce過程類似,只是執行不一樣,Reduce是在Reducer環節運行,而Combine是緊跟着Map以後,在同一臺機器上預先將 結時進行一輪合併,以減小送到Reducer的數據量。因此在HelloWorld時,能夠看到,Combiner和Reducer用的是同一個類:

  1. job.setCombinerClass(IntSumReducer.class);
  2. job.setReducerClass(IntSumReducer.class);

如何個性化呢,這個跟Reducer差很少了:

  1. public static class MyCombiner
  2.       extends Reducer<Text,IntWritable,Text,IntWritable> {
  3.  
  4.    public void reduce(Text key, Iterable<IntWritable> values,
  5.                       Context context
  6.                       ) throws IOException, InterruptedException {
  7.  
  8.      context.write(key, new IntWritable(1));
  9.    }
  10.  }

 

自定義分區Partitioner

在 MapReduce程序中,Partitioner決定着Map節點的輸出將被分區到哪一個Reduce節點。而默認的Partitioner是 HashPartitioner,它根據每條數據記錄的主健值進行Hash操做,得到一個非負整數的Hash碼,而後用當前做業的Reduce節點數取模 運算,有N個結點的話,就會平均分配置到N個節點上,一個隔一個依次。大多狀況下這個平均分配是夠用了,但也會有一些特殊狀況,好比某個文件的,不能被拆 開到兩個結點中,這樣就須要個性化了。

個性化方式以下:

  1. public static class MyPartitioner
  2.       extends HashPartitioner<K,V> {
  3.  
  4.    public void getPartition(K key, V value,int numReduceTasks) {
  5.  
  6.      super.getPartition(key,value,numReduceTasks);
  7.    }
  8.  }

方式其實就是在執行以前能夠改變一下key,來欺騙這個hash表。

 

自定義化簡(Reducer)

這一塊是將Map送來的結果進行化簡處理,並造成最終的輸出值。與前面map同樣,在HelloWorld中咱們就見到過了。經過下面代碼能夠設置其值:

  1. job.setReducerClass(IntSumReducer.class);

一樣,也能夠這樣類能夠不設置,若是不設置的話,就是把前面送來的值,直接送向輸出格式器中。

若是要個性化,則以下:

  1.   public static class IntSumReducer
  2.      extends Reducer<Text,IntWritable,Text,IntWritable> {
  3.  
  4.   public void reduce(Text key, Iterable<IntWritable> values,
  5.                      Context context
  6.                      ) throws IOException, InterruptedException {
  7.     context.write(key, result);
  8.   }
  9. }

 

自定義輸出格式

數 據輸出格式(OutPutFormat)用於描述MapReduce做業的數據輸出規範。Hadoop提供了豐富的內置數據輸出格式。最常的數據輸出格式 是TextOutputFormat,也是系統默認的數據輸出格式,將結果以"key+\t+value"的形式逐行輸出到文本文件中。還有其它的, 如:DBOutputFormat,FileOutputFormat,FilterOutputFormat,IndexUpdataOutputFormat,LazyOutputFormat,MapFileOutputFormat, 等等。

若是要個性化,則按下面方式進行:

  1. public class MyOutputFormat extends OutputFormat<Text,Text> {
  2.  
  3.    @Override
  4.    public void checkOutputSpecs(JobContext arg0) throws IOException,
  5.          InterruptedException {
  6.       // TODO Auto-generated method stub
  7.  
  8.    }
  9.  
  10.    @Override
  11.    public OutputCommitter getOutputCommitter(TaskAttemptContext arg0)
  12.          throws IOException, InterruptedException {
  13.       // TODO Auto-generated method stub
  14.       return null;
  15.    }
  16.  
  17.    @Override
  18.    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext arg0)
  19.          throws IOException, InterruptedException {
  20.       // TODO Auto-generated method stub
  21.       return null;
  22.    }
  23.  
  24. }

 

複合健——用戶自定義類型。

從前面的整個過程當中能夠看到,都是採用key-value的方式進行傳入傳出,而這些類型大可能是單一的字符串,和整型。若是個人key中須要包含多個信息怎麼辦?用字符串直接拼接麼? 太不方便了,最好可以本身定義一個類,做爲這個key,這樣就方便了。

若是定義一個類做爲key 或value的類型? 有什麼要求?就是這個類型必需要繼承WritableComparable<T>這個類,因此若是要自定義一個類型則能夠這麼實現:

  1. public class MyType implements WritableComparable<MyType> {
  2.  
  3.    private float x,y;
  4.    public float GetX(){return x;}
  5.    public float GetY(){return y;}
  6.  
  7.       @Override
  8.       public void readFields(DataInput in) throws IOException {
  9.          x = in.readFloat();
  10.          y = in.readFloat();
  11.       }
  12.  
  13.       @Override
  14.       public void write(DataOutput out) throws IOException {
  15.          out.writeFloat(x);
  16.          out.writeFloat(y);
  17.       }
  18.  
  19.       @Override
  20.       public int compareTo(MyType arg0) {
  21.          //輸入:-1(小於) 0(等於) 1(大於)
  22.          return 0;
  23.       }
  24.    }

這個示例中,咱們添加了兩個float變量:x,y 。 這個信息能過int 和out按次序進行輸入輸出。最後,再實現一個比較函數便可。

 

Job任務的建立

  1. Job job = new Job(conf, "word count");
  2.    job.setJarByClass(WordCount.class);
  3.    job.setInputFormatClass(MyInputFormat.class);
  4.    job.setMapperClass(TokenizerMapper.class);
  5.    job.setCombinerClass(IntSumReducer.class);
  6.    job.setPartitionerClass(MyPartitioner.class);
  7.    job.setReducerClass(IntSumReducer.class);
  8.    job.setOutputFormatClass(TextOutputFormat.class);
  9.    job.setOutputKeyClass(Text.class);
  10.    job.setOutputValueClass(IntWritable.class);
  11.    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  12.    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

任務建立比較容易,其實就是new一個實例,而後把上面描述的過程類設置好,而後加上第2行中,jar包的主類,第十、11行的輸入輸出路徑。這樣就完事了。

 

Job任務的執行

單個任務的執行,沒有什麼問題,能夠用這個:

  1. job.waitForCompletion(true);

但多個任務呢? 多個任務的話,就會造成其組織方式,有串行,有並行,有無關,有組合的,以下圖:

圖中,Job2和Job3將會等Job1執行完了再執行,且能夠同時開始,而Job4必須等Job2和Job3同時結束後才結束。

這個組合,就能夠採用這樣的代碼來實現:

  1. Configuration conf = new Configuration();
  2.       Job job1 = new Job(conf, "job1");
  3.       //.. config Job1
  4.       Job job2 = new Job(conf, "job2");
  5.       //.. config Job2
  6.       Job job3 = new Job(conf, "job3");
  7.       //.. config Job3
  8.       Job job4 = new Job(conf, "job4");
  9.       //.. config Job4
  10.  
  11.       //添加依賴關係
  12.       job2.addDependingJob(job1);
  13.       job3.addDependingJob(job1);
  14.       job4.addDependingJob(job2);
  15.       job4.addDependingJob(job3);
  16.  
  17.       JobControl jc = new JobControl("jbo name");
  18.       jc.addJob(job1);
  19.       jc.addJob(job2);
  20.       jc.addJob(job3);
  21.       jc.addJob(job4);
  22.       jc.run();

 

總述

如今回頭看看,其實整個hadoop編程,也就是這幾塊內容了,要實現某個功能,咱們就往上面這些步驟上套,而後聯起來執行,達到咱們的目的。

相關文章
相關標籤/搜索