MapReduce編程二

(1) InputFormat接口正則表達式

用戶須要實現該接口以指定輸入文件的內容格式。該接口有兩個方法app

public interface InputFormat<K, V> {
 
     InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
 
     RecordReader<K, V> getRecordReader(InputSplit split,JobConf job,Reporter reporter) throws IOException;
 
}負載均衡

其中getSplits函數將全部輸入數據分紅numSplits個split,每一個split交給一個map task處理。getRecordReader函數提供一個用戶解析split的迭代器對象,它將split中的每一個record解析成key/value對。ide

Hadoop自己提供了一些InputFormat:函數

TextInputFormat
做爲默認的文件輸入格式,用於讀取純文本文件,文件被分爲一系列以LF或者CR結束的行,key是每一行的位置偏移量,是LongWritable類型的,value是每一行的內容,爲Text類型。oop

KeyValueTextInputFormat
一樣用於讀取文件,若是行被分隔符(缺省是tab)分割爲兩部分,第一部分爲key,剩下的部分爲value;若是沒有分隔符,整行做爲 key,value爲空。性能

SequenceFileInputFormat
用於讀取sequence file。 sequence file是Hadoop用於存儲數據自定義格式的binary文件。它有兩個子類:SequenceFileAsBinaryInputFormat,將 key和value以BytesWritable的類型讀出;SequenceFileAsTextInputFormat,將key和value以Text類型讀出。優化

SequenceFileInputFilter
根據filter從sequence文件中取得部分知足條件的數據,經過 setFilterClass指定Filter,內置了三種 Filter,RegexFilter取key值知足指定的正則表達式的記錄;PercentFilter經過指定參數f,取記錄行數%f==0的記錄;MD5Filter經過指定參數f,取MD5(key)%f==0的記錄。spa

NLineInputFormat
能夠將文件以行爲單位進行split,好比文件的每一行對應一個map。獲得的key是每一行的位置偏移量(LongWritable類型),value是每一行的內容,Text類型。orm

MultipleInputs

用於多個數據源的join

(2)Mapper接口
用戶需繼承Mapper接口實現本身的Mapper,Mapper中必須實現的函數是

Mapper有setup(),map(),cleanup()和run()四個方法。其中setup()通常是用來進行一些map()前的準備工做,map()則通常承擔主要的處理工做,cleanup()則是收尾工做如關閉文件或者執行map()後的K-V分發等。run()方法提供了setup->map->cleanup()的執行模板。

(3)Partitioner接口
用戶需繼承該接口實現本身的Partitioner以指定map task產生的key/value對交給哪一個reduce task處理,好的Partitioner能讓每一個reduce task處理的數據相近,從而達到負載均衡。Partitioner中需實現的函數是
getPartition(  K2   key, V2 value, int numPartitions)
該函數返回<K2 V2>對應的reduce task ID。
用戶若是不提供Partitioner,Hadoop會使用默認的(其實是個hash函數)。

Partitioner如何使用
•實現Partitioner接口覆蓋getPartition()方法
•Partitioner示例
        public static class MyPartitioner extends Partitioner<Text, Text> {
          
         @Override
            public int getPartition(Text key, Text value, int numPartitions) {
             }
 
}
Partitioner需求示例
•需求描述
•數據文件中含有省份
•須要相同的省份送到相同的Reduce裏
•從而產生不一樣的文件
•步驟
•實現Partitioner,覆蓋getPartition
•根據省份字段進行切分

(4)Combiner

combine函數把一個map函數產生的<key,value>對(多個key, value)合併成一個新的<key2,value2>. 將新的<key2,value2>做爲輸入到reduce函數中,其格式與reduce函數相同。Combiner使得map task與reduce task之間的數據傳輸量大大減少,可明顯提升性能。大多數狀況下,Combiner與Reducer相同。

什麼狀況下可使用Combiner
•能夠對記錄進行彙總統計的場景,如求和。
•求平均數的場景就不可使用了
Combiner執行時機
•運行combiner函數的時機有可能會是merge完成以前,或者以後,這個時機能夠由一個參數控制,即 min.num.spill.for.combine(default 3)
•當job中設定了combiner,而且spill數最少有3個的時候,那麼combiner函數就會在merge產生結果文件以前運行
•經過這樣的方式,就能夠在spill很是多須要merge,而且不少數據須要作conbine的時候,減小寫入到磁盤文件的數據數量,一樣是爲了減小對磁盤的讀寫頻率,有可能達到優化做業的目的。
•Combiner也有可能不執行, Combiner會考慮當時集羣的負載狀況。

Combiner如何使用
•繼承Reducer類

public static class Combiner extends Reducer<Text, Text, Text, Text> {
       public void reduce(Text key, Iterator<Text> values,
               OutputCollector<Text, Text> output, Reporter reporter)
               throws IOException {
                 }
    }

(5)Reducer接口

實現本身的Reducer,必須實現reduce函數

(6)OutputFormat
用戶經過OutputFormat指定輸出文件的內容格式,不過它沒有split。每一個reduce task將其數據寫入本身的文件,文件名爲part-nnnnn,其中nnnnn爲reduce task的ID。

public abstract class OutputFormat<K, V> {  
  /** 
   * 建立一個記錄寫入器
   */ 
  public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException;   
  /** 
   * 檢查結果輸出的存儲空間是否有效
   */ 
  public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException;  
  /**
   * 建立一個任務提交器
   */ 
  public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException; 
}

TextOutputFormat,輸出到純文本文件,格式爲 key + " " + value。NullOutputFormat,hadoop中的/dev/null,將輸出送進黑洞。SequenceFileOutputFormat, 輸出到sequence file格式文件。MultipleSequenceFileOutputFormat, MultipleTextOutputFormat,根據key將記錄輸出到不一樣的文件。DBInputFormat和DBOutputFormat,從DB讀取,輸出到DB。

相關文章
相關標籤/搜索