[Hadoop in Action] 第7章 細則手冊

  • 向任務傳遞定製參數
  • 獲取任務待定的信息
  • 生成多個輸出
  • 與關係數據庫交互
  • 讓輸出作全局排序
 
一、向任務傳遞做業定製的參數
 
     在編寫Mapper和Reducer時,一般會想讓一些地方能夠配置。例如第5章的聯結程序被固定地寫爲取第一個數據列做爲聯結鍵。若是用戶能夠在運行時指定某個列做爲聯結鍵,就會讓程序更具普適性。hadoop自身使用一個配置對象來存儲全部做業的配置屬性。你也可使用這個對象將參數傳遞到Mapper和Reducer。
 
     咱們已經知道MapReduce的driver是如何用屬性來配置JobConf對象的,這些屬性包括輸入格式、輸出格式、Mapper類等。若要引入本身的屬性,須要在這個配置對象中,給屬性一個惟一的名稱並設置它的值。這個配置對象會被傳遞給全部的TaskTracker,而後做業中的全部任務就可以看到配置對象中的屬性。Mapper和Reducer也就能夠讀取該配置對象並獲取它的屬性值。
 
     Configuration類(JobConf的父類)有許多通用的setter方法。屬性採用鍵/值對的形式,鍵必須是一個String,而值能夠是經常使用類型的任意一個。經常使用setter方法的簽名爲:
     public void set(String name, String value);
     public void setBoolean(String name, Boolean value);
     public void setInt(String name, Int value);
     public void setLong(String name, Long value);
     public void setStrings(String name, String... values);
請注意在hadoop內部,全部的屬性都存爲字符串。在set(String, String)方法以外的全部其餘方法都是它的便捷方法。
 
     Driver會首先設置配置對象中的屬性,讓它們在全部任務中可見。Mapper和Reducer能夠訪問configure()方法中的配置對象。任務初始化時會調用configure(),它已經被覆寫爲能夠提取和存儲你設置的屬性。以後,map()和reduce()方法會訪問這些屬性的副本。示例,調用新的屬性myjob.myproperty,用一個由用戶指定的整數值:
     public int run(String[] args) throws Exception {
          Configuration conf = getConf();
          JobConf job = new JobConf(conf, MyJob.class);
          ...
          job.setInt(「myjob.myproperty」, Integer.parseInt(args[2]));
          JobClient.runJob(job);
          return 0;
     }
 
在MapClass中,configure()方法取出屬性值,並將它存儲在對象的範圍中。Configuration類的getter方法須要指定默認的值,若是所請求的屬性未在配置對象中設置,就會返回默認值。在這個例子中,咱們取默認值爲0:
     public static class MapClass extends MapReduceBase
          implements Mapper<Text, Text, Text, Text> {
          int myproperty;
          public void configure(JobConf job) {
               myproperty = job.getInt(「myjob.myproperty」,0);
          }
          ...
     }
 
若是你但願在Reducer中使用該屬性,Reducer也必須檢索這個屬性:
     public static class Reduce extends MapReduceBase
          implements Reducer<Text, Text, Text, Text> {
          int myproperty;
          public void configure(JobConf job) {
               myproperty = job.getInt(「myjob.myproperty」,0);
          }
          ...
     }
 
Configuration類中getter方法的列表比setter方法更長,幾乎全部的getter方法都須要將參數設置爲默認值。惟一例外是get(String),若是沒有設置特定的名稱,它就會返回null:
     public String get(String name)
     public String get(String name, String defaultValue)
     public Boolean getBoolean(String name, Boolean defaultValue)
     public float getFloat(String name, Float defaultValue)
     public Int getInt(String name, Int defaultValue)
     public Long getLong(String name, Long defaultValue)
     public String[] getStrings(String name, String... defaultValue)
 
     既然咱們的job類實現了Tool接口並使用了ToolRunner,咱們還可讓用戶直接使用通用的選項來配置定製化的屬性,方法與用戶設置hadoop的配置屬性相同:
     hadoop jar MyJob.jar MyJob -D myjob.myproperty=1 input output
 
     咱們能夠將driver中老是須要用戶經過參數來設定屬性值的那行代碼刪掉。若是在大多數時間裏默認值是可用的,這樣作會讓用戶感受更加方便。當你容許用戶設定屬性時,在driver中最好對用戶的輸入進行驗證:
     public int run(String[] args) throws Exception {
          Configuration conf = getConf();
          JobConf job = new JobConf(conf, MyJob.class);
          ...
          Int myproperty = job.getInt(「myjob.myproperty」, 0);
          if (my property < 0) {
               System.err.println(「Invalid myjob.myproperty」+myproperty);
                    System.exit(0);
          }
          JobClient.runJob(job);
          return 0;
     }
 
二、探查任務特定信息
 
     除了獲取自定義屬性和全局配置以外,咱們還可使用配置對象上的getter方法得到當前任務和做業狀態的一些信息:
     this.inputFile = job.get(「map.input.file」);    //得到當前map任務的文件路徑
     this.inputTag = generateInputTag(this.inputFile);    //在data join軟件包的DataJoinMapperBase中,configure()方法中用一個標籤來表示數據源
 
在配置對象中可得到的任務特定狀態信息:
 
屬性
類型
描述
mapred.job.id String 做業ID
mapred.jar String 做業目錄中jar的位置
job.local.dir String 做業的本地空間
mapred.tip.id String 任務ID
mapred.task.id String 任務重試ID
mapred.task.is.map Boolean 標誌量,表示是否爲一個map任務
mapred.task.partition Int 做業內部的任務ID
map.input.file String Mapper讀取的文件路徑
map.input.start Long 當前Mapper輸入分片的文件偏移量
map.input.length Long 當前Mapper輸入分片的字節數
mapred.work.output.dir String 任務的工做(即臨時)輸出目錄
 
三、劃分爲多個輸出文件
 
     在有些有些場景中,輸出多組文件或把一個數據集分爲多個數據集更爲方便。MultipleOutputFormat提供了一個建黨的方法,將類似的記錄結組爲不一樣的數據集。在寫每條記錄以前,這個OutputFormat類調用一個內部方法來肯定要寫入的文件名。更具體地說,你將擴展MultipleOutputFormat的某個特定子類,並實現generateFileNameForKeyValue()方法。你擴展的子類將決定輸出的格式,例如MultipleTextOutputFormat將輸出文本文件,而MultipleSequenceFileOutputFormat將輸出序列文件。
 
     不管哪一種狀況,你會覆寫下面的方法以返回每一個輸出鍵/值對的文件名:
     protected String generateFileNameForKeyValue(K key, V value, String name)
 

代碼清單 根據國家將專利元數據分割到多個目錄中
 
 1 import java.io.IOException;
 2 import java.util.Iterator;
 3  
 4 import org.apache.hadoop.conf.Configuration;
 5 import org.apache.hadoop.conf.Configured;
 6 import org.apache.hadoop.fs.Path;
 7 import org.apache.hadoop.io.IntWritable;
 8 import org.apache.hadoop.io.LongWritable;
 9 import org.apache.hadoop.io.NullWritable;
10 import org.apache.hadoop.io.Text;
11 import org.apache.hadoop.mapred.FileInputFormat;
12 import org.apache.hadoop.mapred.FileOutputFormat;
13 import org.apache.hadoop.mapred.SequenceFileInputFormat;
14 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
15 import org.apache.hadoop.mapred.KeyValueTextInputFormat;
16 import org.apache.hadoop.mapred.TextInputFormat;
17 import org.apache.hadoop.mapred.TextOutputFormat;
18 import org.apache.hadoop.mapred.JobClient;
19 import org.apache.hadoop.mapred.JobConf;
20 import org.apache.hadoop.mapred.MapReduceBase;
21 import org.apache.hadoop.mapred.Mapper;
22 import org.apache.hadoop.mapred.OutputCollector;
23 import org.apache.hadoop.mapred.Reducer;
24 import org.apache.hadoop.mapred.Reporter;
25 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
26 import org.apache.hadoop.util.Tool;
27 import org.apache.hadoop.util.ToolRunner;
28  
29  
30 public class MultiFile extends Configured implements Tool {
31  
32     public static class MapClass extends MapReduceBase
33         implements Mapper<LongWritable, Text, NullWritable, Text> {
34  
35         public void map(LongWritable key, Text value,
36                         OutputCollector<NullWritable, Text> output,
37                         Reporter reporter) throws IOException {
38  
39             output.collect(NullWritable.get(), value);
40         }
41     }
42  
43     public static class PartitionByCountryMTOF
44         extends MultipleTextOutputFormat<NullWritable,Text>
45     {
46         protected String generateFileNameForKeyValue(NullWritable key,
47                                                      Text value,
48                                                      String inputfilename)
49         {
50             String[] arr = value.toString().split(",", -1);
51             String country = arr[4].substring(1,3);
52             return country+"/"+inputfilename;
53         }
54     }
55  
56     public int run(String[] args) throws Exception {
57         // Configuration processed by ToolRunner
58         Configuration conf = getConf();
59  
60         // Create a JobConf using the processed conf
61         JobConf job = new JobConf(conf, MultiFile.class);
62  
63         // Process custom command-line options
64         Path in = new Path(args[0]);
65         Path out = new Path(args[1]);
66         FileInputFormat.setInputPaths(job, in);
67         FileOutputFormat.setOutputPath(job, out);
68  
69         // Specify various job-specific parameters     
70         job.setJobName("MultiFile");
71         job.setMapperClass(MapClass.class);
72  
73         job.setInputFormat(TextInputFormat.class);
74         job.setOutputFormat(PartitionByCountryMTOF.class);
75         job.setOutputKeyClass(NullWritable.class);
76         job.setOutputValueClass(Text.class);
77  
78         job.setNumReduceTasks(0);
79  
80         // Submit the job, then poll for progress until the job is complete
81         JobClient.runJob(job);
82  
83         return 0;
84     }
85  
86     public static void main(String[] args) throws Exception {
87         // Let ToolRunner handle generic command-line options 
88         int res = ToolRunner.run(new Configuration(), new MultiFile(), args);
89  
90         System.exit(res);
91     }
92 }
 

 
     MutipleOutputFormat很簡單,能夠按行拆分輸入數據,但若是想按列拆分會該怎樣作呢?咱們能夠在hadoop 0.19版本zhong引入的MutipleOutputs,以得到更強的能力。
     
     MutipleOutputs所採用的方法不一樣於MutipleOutputFormat。它不是要求給每條記錄請求文件名,而是建立多個OutputCollector,每一個OutputCollector能夠有本身的OutputFormat和鍵/值對的類型。MapReduce程序將決定如何向每一個OutputCollector輸出數據。
 

代碼清單 將輸入數據的不一樣列提取爲不一樣文件的程序
 
  1 import java.io.IOException;
  2 import java.util.Iterator;
  3  
  4 import org.apache.hadoop.conf.Configuration;
  5 import org.apache.hadoop.conf.Configured;
  6 import org.apache.hadoop.fs.Path;
  7 import org.apache.hadoop.io.IntWritable;
  8 import org.apache.hadoop.io.LongWritable;
  9 import org.apache.hadoop.io.NullWritable;
 10 import org.apache.hadoop.io.Text;
 11 import org.apache.hadoop.mapred.FileInputFormat;
 12 import org.apache.hadoop.mapred.FileOutputFormat;
 13 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 14 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 15 import org.apache.hadoop.mapred.KeyValueTextInputFormat;
 16 import org.apache.hadoop.mapred.TextInputFormat;
 17 import org.apache.hadoop.mapred.TextOutputFormat;
 18 import org.apache.hadoop.mapred.JobClient;
 19 import org.apache.hadoop.mapred.JobConf;
 20 import org.apache.hadoop.mapred.MapReduceBase;
 21 import org.apache.hadoop.mapred.Mapper;
 22 import org.apache.hadoop.mapred.OutputCollector;
 23 import org.apache.hadoop.mapred.Reducer;
 24 import org.apache.hadoop.mapred.Reporter;
 25 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
 26 import org.apache.hadoop.mapred.lib.MultipleOutputs;
 27 import org.apache.hadoop.util.Tool;
 28 import org.apache.hadoop.util.ToolRunner;
 29  
 30  
 31 public class MultiFile extends Configured implements Tool {
 32  
 33     public static class MapClass extends MapReduceBase
 34         implements Mapper<LongWritable, Text, NullWritable, Text> {
 35  
 36         private MultipleOutputs mos;
 37         private OutputCollector<NullWritable, Text> collector;
 38  
 39         public void configure(JobConf conf) {
 40             mos = new MultipleOutputs(conf);
 41         }
 42  
 43         public void map(LongWritable key, Text value,
 44                         OutputCollector<NullWritable, Text> output,
 45                         Reporter reporter) throws IOException {
 46  
 47             String[] arr = value.toString().split(",", -1);
 48             String chrono = arr[0] + "," + arr[1] + "," + arr[2];
 49             String geo    = arr[0] + "," + arr[4] + "," + arr[5];
 50  
 51             collector = mos.getCollector("chrono", reporter);
 52             collector.collect(NullWritable.get(), new Text(chrono));
 53             collector = mos.getCollector("geo", reporter);
 54             collector.collect(NullWritable.get(), new Text(geo));
 55         }
 56  
 57         public void close() throws IOException {
 58             mos.close();
 59         }
 60     }
 61  
 62     public int run(String[] args) throws Exception {
 63         // Configuration processed by ToolRunner
 64         Configuration conf = getConf();
 65  
 66         // Create a JobConf using the processed conf
 67         JobConf job = new JobConf(conf, MultiFile.class);
 68  
 69         // Process custom command-line options
 70         Path in = new Path(args[0]);
 71         Path out = new Path(args[1]);
 72         FileInputFormat.setInputPaths(job, in);
 73         FileOutputFormat.setOutputPath(job, out);
 74  
 75         // Specify various job-specific parameters     
 76         job.setJobName("MultiFile");
 77         job.setMapperClass(MapClass.class);
 78  
 79         job.setInputFormat(TextInputFormat.class);
 80 //        job.setOutputFormat(PartitionByCountryMTOF.class);
 81         job.setOutputKeyClass(NullWritable.class);
 82         job.setOutputValueClass(Text.class);
 83         job.setNumReduceTasks(0);
 84  
 85         MultipleOutputs.addNamedOutput(job,
 86                                        "chrono",
 87                                        TextOutputFormat.class,
 88                                        NullWritable.class,
 89                                        Text.class);
 90         MultipleOutputs.addNamedOutput(job,
 91                                        "geo",
 92                                        TextOutputFormat.class,
 93                                        NullWritable.class,
 94                                        Text.class);
 95  
 96         // Submit the job, then poll for progress until the job is complete
 97         JobClient.runJob(job);
 98  
 99         return 0;
100     }
101  
102     public static void main(String[] args) throws Exception {
103         // Let ToolRunner handle generic command-line options 
104         int res = ToolRunner.run(new Configuration(), new MultiFile(), args);
105  
106         System.exit(res);
107     }
108 }
 

 
四、以數據庫做爲輸入輸出
 
     雖然有可能創建一個MapReduce程序經過直接查詢數據庫來取得輸入數據,而不是從HDFS中讀取文件,但其性能不甚理想。更多時候,你須要將數據集從數據庫複製到HDFS中。你能夠很容易地經過標準的數據庫工具dump,來取得一個flat文件,而後使用HDFS的shell文件put將它上傳到HDFS中。可是有時更合理的作法是讓MapReduce程序直接寫入數據庫。
 
     DBOutputFormat是用於訪問數據庫的關鍵類。你能夠經過在DBConfiguration中的靜態方法configureDB()作到這一點:
     public static void configureDB(Jobconf job, String driverClass, String dbUrl, String userName, String passwd)
 
     以後,你要指定將寫入的表,以及那裏有哪些字段。這是經過在DBOutputForamt中的靜態setOutput()方法作到的:
     public static void setOutput(Jobconf job, String tableName, String… fieldNames)
 
     你的driver應該包含以下樣式的幾行代碼:
     conf.setOutputFormat(DBOutputFormat.class);
     DBConfiguration.configureDB(job,
                                                     「com.mysql.jdbc.Driver」,
                                                     「jdbc.mysql://db.host.com/mydb」,
                                                     「username」,
                                                     「password" ) ;
     DBOutputFormat.setOutput(job, 「Events」, 「event_id」, 「time");
 
使用DBOutputFormat將強制你輸出的鍵實現DBWritable接口。只有這個鍵會被寫入到數據庫中。一般,鍵必須實現Writable接口。在Writable中write()方法用DataOutput,而DBWritable中的write()方法用PreparedStatement。相似的,用在Writable中的readFields()方法採用DataInput,而DBWritable中的readFields()採用ResultSet。除非你打算使用DBInputFormat直接從數據庫中讀取輸入的數據,不然在DBWritable中的readFields()將永遠不會被調用。
    
     public class EventsDBWritable implements Writable, DBWritable {
          private int id;
          private long timestamp;
 
          public void write(DataOutput out) throws IOException {
               out.writeInt(id);
               out.writeLong(timestamp);
          }
 
          public void readFields(DataInput in) throws IOException {
               id = in.readInt();
               timestamp = in.readLong();
          }
 
     public void write(PreparedStatement statement) throws IOException {
               statement.setInt(1, id);
               statement.setLong(2, timestamp);
          }
 
     public void readFields(ResultSet resultSet) throws IOException {
               id = resultSet.getInt(1);
               timestamp = resultSet.getLong(2);
          }
     }
 
五、保持輸出的順序
 
     請記住MapReduce框架並不能保證reducer輸出的順序,它只是已經排序好的輸入以及reducer所執行的典型操做類型的一種副產品。對於某些應用,這種排序是沒有必要的。
 
     Partitioner的任務是肯定地爲每一個鍵分配一個reducer,相同鍵的全部記錄都結成組並在reduce階段被集中處理。Partitioner的一個重要設計需求是在reducer之間達到負載均衡。Partitioner默認使用散列函數來均勻、隨機地將鍵分配給reducer。若是視線知道鍵是大體均勻分佈的,咱們就可使用一個partitioner給每一個reducer分配一個鍵的範圍,仍然能夠確保reducer的負載是相對均衡的。
 
TotalOrderPartitioner是一個能夠保證在輸入分區之間,而不只僅是分區內部排序的partitioner。這種類利用一個排好序的分區鍵組讀取一個序列文件,並進一步將不一樣區域的鍵分配到reducer上。 
 
 [轉載請註明] http://www.cnblogs.com/zhengrunjian/ 
相關文章
相關標籤/搜索