[Hadoop in Action] 第5章 高階MapReduce

  • 連接多個MapReduce做業
  • 執行多個數據集的聯結
  • 生成Bloom filter
 
一、連接MapReduce做業
 
[順序連接MapReduce做業]
 
mapreduce-1 | mapreduce-2 | mapreduce-3 | ...
 
[具備複雜依賴的MapReduce連接]
 
     有時,在複雜數據處理任務中的子任務並非按順序運行的,所以它們的MapReduce做業不能按線性方式連接。例如,mapreduce1處理一個數據集,mapreduce2獨立處理另外一個數據集,而第3個做業mapreduce3,對前兩個做業的輸出結果作內部聯結。
 
     Hadoop有一種簡化機制,經過Job和JobControl類來管理這種(非線性)做業之間的依賴。Job對象是MapReduce做業的表現形式。Job對象的實例化可經過傳遞一個JobConf對象到做業的構造函數中來實現。除了要保持做業的配置信息外,Job還經過設定addDependingJob()方法維護做業的依賴關係。對於Job對象x和y,x.addDependingJob(y)意味着x在y完成以前不會啓動。鑑於Job對象存儲着配置和依賴信息,JobControl對象會負責管理並監視做業的執行。經過addJob()方法,你能夠爲JobControl對象添加做業。當全部做業和依賴關係添加完成後,調用JobControl的run()方法,生成一個線程來提交做業並監視其執行。JobControl有諸如allFinished()和getFailedJobs()這樣的方法來跟蹤批處理中各個做業的執行。
 
[預處理和後處理階段的連接]
 
     Hadoop在版本0.19.0中引入了ChainMapper和ChainReducer類來簡化預處理和後處理的構成。做業按序執行多個mapper來預處理數據,並在reducer以後可選地按序執行多個mapper來作數據的後處理。這一機制的優勢在於能夠將預處理和後處理步驟寫爲標準的mapper,逐個運行它們,能夠在ChainMapper和ChainReducer中調用addMapper()方法來分別組合預處理和後處理的步驟。所有預處理和後處理步驟在單一的做業中運行,不會生成中間文件,這大大減小了I/O操做。
 
     例如,有4個mapper(Map1,Map2,Map3和Map4)和一個reducer(Reduce),它們被連接爲單個MapReduce做業,順序以下:Map1 | Map2 | Reduce | Map3 | Map4
 
     這個組合中,能夠把Map2和Reduce視爲MapReduce做業的核心,在mapper和reducer之間使用標準的分區和洗牌。能夠把Map1視爲前處理步驟,而Map3和Map4做爲後處理步驟。咱們可使用driver設定這個mapper和reducer序列的構成:
 

代碼清單 用於連接MapReduce做業中mapper的driver
 
 1 Configuration conf = getConf();
 2 JobConf job = new JobConf(conf);
 3  
 4 job.setJobName("ChainJob");
 5 job.setInputFormat(TextInputFormat.class);
 6 job.setOutputFormat(TextOutputFormat.class);
 7  
 8 FileInputFormat.setInputPaths(job, in); 
 9 FileOutputFormat.setOutputPath(job, out);
10  
11  
12 JobConf map1Conf = new JobConf(false);
13 ChainMapper.addMapper(job,
14                       Map1.class,
15                       LongWritable.class,
16                       Text.class,
17                       Text.class,
18                       Text.class,
19                       true,
20                       map1Conf);
21  
22 JobConf map2Conf = new JobConf(false);
23 ChainMapper.addMapper(job,
24                       Map2.class,
25                       Text.class,
26                       Text.class,
27                       LongWritable.class,
28                       Text.class,
29                       true,
30                       map2Conf);
31  
32 JobConf reduceConf = new JobConf(false);
33 ChainReducer.setReducer(job,
34                         Reduce.class,
35                         LongWritable.class,
36                         Text.class,
37                         Text.class,
38                         Text.class,
39                         true,
40                         reduceConf);
41  
42 JobConf map3Conf = new JobConf(false);
43 ChainReducer.addMapper(job,
44                        Map3.class,
45                        Text.class,
46                        Text.class,
47                        LongWritable.class,
48                        Text.class,
49                        true,
50                        map3Conf);
51  
52 JobConf map4Conf = new JobConf(false);
53 ChainReducer.addMapper(job,
54                        Map4.class,
55                        LongWritable.class,
56                        Text.class,
57                        LongWritable.class,
58                        Text.class,
59                        true,
60                        map4Conf);
61  
62 JobClient.runJob(job);
 

 
     driver首選會設置全局的JobConf對象,包含做業名、輸入路徑及輸出路徑等。它一次性添加這個由5個步驟連接在一塊兒的做業,以步驟執行前後爲序。它用ChainMapper.addMapper()添加位於Reduce以前的全部步驟。用靜態的ChainReducer.setReducer()方法設置reducer。再用ChainReducer.addMapper()方法添加後續的步驟。全局JobConf對象經歷全部的5個add*方法。此外,每一個mapper和reducer都有一個本地JobConf對象(map1Conf、map2Conf、map3Conf、map4Conf和reduceConf),其優先級在配置各自mapper/reducer時高於全局的對象。建議本地JobConf對象採用一個新的JobConf對象,且在初始化時不設默認值——new JobConf(false)。
 
     讓咱們經過ChainMapper.addMapper()方法的簽名來詳細瞭解如何一步步地連接做業,其中ChainReducer.setReducer()的簽名和功能與ChainReducer.addMapper()相似:
 
public static <k1, v1, k2, v2> void
                                  addMapper(JobConf job,
                                                      Class <? extends Mapper<k1, v1, k2, v2>> class,
                                                      Class <? extends k1> inputKeyClass,
                                                      Class <? extends v1> inputValueClass,
                                                      Class <? extends k2> outputKeyClass,
                                                      Class <? extends v2> outputValueClass,
                                                     boolean byValue,
                                                     JobConf mapperConf)
 
     該方法有8個參數,第一個和最後一個分別爲全局和本地的JobConf對象。第二個參數klass是Mapper類,負責數據處理。對於byValue這個參數,若是確信map1的map()方法在調用OutoutCollector.collect(K k, V v)以後再也不使用k和v的內容,或者map2並不改變k和v在其上的輸入值,則能夠經過設定buValue爲false來獲取必定的性能提高;若是對Mapper的內部代碼不太瞭解,則能夠經過設定byValue爲true,確保Mapper會按預期的方式工做。餘下的4個參數inputKeyClass、inputValueClass、outputKeyClass和outputValueClass是這個Mapper類中輸入/輸出類的類型。
 
二、聯結不一樣來源數據
 
[Reduce側的聯結]
 
 
  1. 首先mapper接收的數據來自兩個文件,Customers及Orders;
  2. 在map()封裝輸入的每一個記錄後,就執行MapReduce標準的分區、洗牌和排序操做;
  3. reduce()函數接收輸入數據,並對其值進行徹底交叉乘積;
  4. 交叉乘積獲得的每一個合併結果被送入函數conbine()。
 
     Hadoop有一個名爲datajoin的contrib軟件包,在hadoop中它是一個用做數據聯結的通用框架,它的jar文件位於contrib/datajoin/hadoop-*-datajoin.jar。hadoop的datajoin軟件包有3個可供繼承和具體化的抽象類:DataJoinMapperBase、DataJoinReducerBase和TaggedMapOutput。顧名思義,MapClass會擴展DataJoinMapperBase,而Reduce類會擴展DataJoinReducerBase。Datajoin軟件包已經分別在這些基類上實現了map()和reduce方法,可用於執行聯結數據流。
 

代碼清單 來自兩個reduce側鏈接數據的內聯結
 
  1 import java.io.DataInput;
  2 import java.io.DataOutput;
  3 import java.io.IOException;
  4 import java.util.Iterator;
  5  
  6 import org.apache.hadoop.conf.Configuration;
  7 import org.apache.hadoop.conf.Configured;
  8 import org.apache.hadoop.fs.Path;
  9 import org.apache.hadoop.io.Text;
 10 import org.apache.hadoop.io.Writable;
 11 import org.apache.hadoop.mapred.FileInputFormat;
 12 import org.apache.hadoop.mapred.FileOutputFormat;
 13 import org.apache.hadoop.mapred.JobClient;
 14 import org.apache.hadoop.mapred.JobConf;
 15 import org.apache.hadoop.mapred.KeyValueTextInputFormat;
 16 import org.apache.hadoop.mapred.MapReduceBase;
 17 import org.apache.hadoop.mapred.Mapper;
 18 import org.apache.hadoop.mapred.OutputCollector;
 19 import org.apache.hadoop.mapred.Reducer;
 20 import org.apache.hadoop.mapred.Reporter;
 21 import org.apache.hadoop.mapred.TextInputFormat;
 22 import org.apache.hadoop.mapred.TextOutputFormat;
 23 import org.apache.hadoop.util.Tool;
 24 import org.apache.hadoop.util.ToolRunner;
 25  
 26 import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
 27 import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
 28 import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
 29  
 30 public class DataJoin extends Configured implements Tool {
 31  
 32     public static class MapClass extends DataJoinMapperBase {
 33  
 34         protected Text generateInputTag(String inputFile) {
 35             String datasource = inputFile.split("-")[0];
 36             return new Text(datasource);
 37         }
 38  
 39         protected Text generateGroupKey(TaggedMapOutput aRecord) {
 40             String line = ((Text) aRecord.getData()).toString();
 41             String[] tokens = line.split(",");
 42             String groupKey = tokens[0];
 43             return new Text(groupKey);
 44         }
 45  
 46         protected TaggedMapOutput generateTaggedMapOutput(Object value) {
 47             TaggedWritable retv = new TaggedWritable((Text) value);
 48             retv.setTag(this.inputTag);
 49             return retv;
 50         }
 51     }
 52  
 53     public static class Reduce extends DataJoinReducerBase {
 54  
 55         protected TaggedMapOutput combine(Object[] tags, Object[] values) {
 56             if (tags.length < 2) return null;  
 57             String joinedStr = ""; 
 58             for (int i=0; i<values.length; i++) {
 59                 if (i > 0) joinedStr += ",";
 60                 TaggedWritable tw = (TaggedWritable) values[i];
 61                 String line = ((Text) tw.getData()).toString();
 62                 String[] tokens = line.split(",", 2);
 63                 joinedStr += tokens[1];
 64             }
 65             TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
 66             retv.setTag((Text) tags[0]); 
 67             return retv;
 68         }
 69     }
 70  
 71     public static class TaggedWritable extends TaggedMapOutput {
 72  
 73         private Writable data;
 74  
 75         public TaggedWritable(Writable data) {
 76             this.tag = new Text("");
 77             this.data = data;
 78         }
 79  
 80         public Writable getData() {
 81             return data;
 82         }
 83  
 84         public void write(DataOutput out) throws IOException {
 85             this.tag.write(out);
 86             this.data.write(out);
 87         }
 88  
 89         public void readFields(DataInput in) throws IOException {
 90             this.tag.readFields(in);
 91             this.data.readFields(in);
 92         }
 93     }
 94  
 95     public int run(String[] args) throws Exception {
 96         Configuration conf = getConf();
 97  
 98         JobConf job = new JobConf(conf, DataJoin.class);
 99  
100         Path in = new Path(args[0]);
101         Path out = new Path(args[1]);
102         FileInputFormat.setInputPaths(job, in);
103         FileOutputFormat.setOutputPath(job, out);
104  
105         job.setJobName("DataJoin");
106         job.setMapperClass(MapClass.class);
107         job.setReducerClass(Reduce.class);
108  
109         job.setInputFormat(TextInputFormat.class);
110         job.setOutputFormat(TextOutputFormat.class);
111         job.setOutputKeyClass(Text.class);
112         job.setOutputValueClass(TaggedWritable.class);
113         job.set("mapred.textoutputformat.separator", ",");
114  
115         JobClient.runJob(job); 
116         return 0;
117     }
118  
119     public static void main(String[] args) throws Exception { 
120         int res = ToolRunner.run(new Configuration(),
121                                  new DataJoin(),
122                                  args);
123  
124         System.exit(res);
125     }
126 }
 

 
 
 [轉載請註明] http://www.cnblogs.com/zhengrunjian/  
相關文章
相關標籤/搜索