Hadoop 綜合揭祕——MapReduce 基礎編程(介紹 Combine、Partitioner、WritableComparable、WritableComparator 使用方式)

前言html

本文主要介紹 MapReduce 的原理及開發,講解如何利用 Combine、Partitioner、WritableComparator等組件對數據進行排序篩選聚合分組的功能。
因爲文章是針對開發人員所編寫的,在閱讀本文前,文章假設讀者已經對Hadoop的工做原理、安裝過程有必定的瞭解,所以對Hadoop的安裝就很少做說明。請確保源代碼運行在Hadoop 2.x以上版本,並以僞分佈形式安裝以方便進行調試(單機版會對 Partitioner 功能進行限制)。
文章主要利用例子介紹如何利用 MapReduce 模仿 SQL 關係數據庫進行SELECT、WHERE、GROUP、JOIN 等操做,並對 GroupingComparator、SortComparator 等功能進行說明。
但願本篇文章能對各位的學習研究有所幫助,當中有所錯漏的地方敬請點評。java

 

目錄數據庫

1、MapReduce 工做原理簡介apache

2、MapReduce 開發實例編程

3、利用 Partitioner 控制鍵值分配緩存

4、利用 Combiner 提升系統性能服務器

5、WritableComparatable 自定義鍵值說明網絡

6、實現數據排序與分組app

7、數據集鏈接處理方式介紹框架

 

 

 

1、MapReduce 工做原理簡介

對Hadoop有興趣的朋友相信對Hadoop的主要工做原理已經有必定的認識,在講解MapReduce的程序開發前,本文先針對Mapper、Reducer、Partitioner、Combiner、Suhffle、Sort的工做原理做簡單的介紹,以幫助各位更好地瞭解後面的內容。

圖 1.1

1.1 Mapper 階段

當系統對數據進行分片後,每一個輸入分片會分配到一個Mapper任務來處理,默認狀況下系統會以HDFS的一個塊大小64M做爲分片大小,固然也能夠經過配置文件設置塊的大小。隨後Mapper節點輸出的數據將保存到一個緩衝區中(緩衝區的大小默認爲512M,可經過mapreduce.task.io.sort.mb屬性進行修改),緩衝區越大排序效率越高。當該緩衝區快要溢出時(緩衝區默認大小爲80%,可經過mapreduce.map.sort.spill.percent屬性進行修改),系統會啓動一個後臺線程,將數據傳輸到會到本地的一個文件當中。

1.2 Partitioner 階段

在Mapper完成 KEY/VALUE 格式的數據操做後,Partitioner 將會被調用,因爲真實環境中 Hadoop 可能會包含幾十個甚至上百個Reducer ,Partitioner 的主要做用就是根據自定義方式肯定數據將被傳輸到哪個Reducer進行處理。

1.3 Combiner 階段

若是系統定義了Combiner,在通過 Partitioner 排序處理後將會進行 Combiner處理。咱們能夠把 Combiner 看做爲一個小型的 Reducer ,因爲數據從 Mapper 經過網絡傳送到 Reducer ,資源開銷很大,Combiner 目的就是在數據傳送到Reducer前做出初步彙集處理,減小服務器的壓力。若是數據量太大,還能夠把 mapred.compress.map.out 設置爲 true,就能夠將數據進行壓縮。(關於數據壓縮的內容已經超越本文的討論範圍,之後會有獨立的篇章針對數據壓縮進行專題討論,敬請期待)

1.4 Shuffle 階段

在 Shuffle 階段,每一個 Reducer 會啓動 5 個線程(可經過 mapreduce.reduce.shuffle.parallelcopies 進行設置)經過HTTP協議獲取Mapper傳送過來的數據。每次數據發送到 Reducer 前,都會根據鍵先進行排序。開發人員也可經過自定義的 SortComparator 進行數據排序,也是根據 GroupComparator 按照數據的其餘特性進行分組處理,下面章節將會詳細舉例介紹。對數據進行混洗、排序完成後,將傳送到對應的Reducer進行處理。

1.5 Reducer 階段

當 Mapper 實例完成輸入的數據超過設定值後(可經過mapreduce.job.reduce.slowstart.completedmaps 進行設置), Reducer 就會開始執行。Reducer 會接收到不一樣 Mapper 任務傳來已通過排序的數據,並經過Iterable 接口進行處理。在 Partitioner 階段,系統已定義哪些數據將由個 Reducer 進行管理。當 Reducer 檢測到 KEY 時發生變化時,系統就會按照已定的規則生成一個新的 Reducer 對數據進行處理。
若是 Reducer 端接受的數據量較小,數據則可直接存儲在內存緩衝區中,方便後面的數據輸出(緩衝區大小可經過mapred.job.shuffle.input.buffer.percent 進行設置)
若是數據量超過了該緩衝區大小的必定比例(能夠經過 mapred.job.shuffle.merge.percent 進行設置),數據將會被合併後寫到磁盤中。

回到目錄

2、MapReduce 開發實例

上一章節講解了 MapReduce 的主要流程,下面將以幾個簡單的例子模仿 SQL 關係數據庫向你們介紹一下 MapReduce 的開發過程。

HDFS經常使用命令  (此處只介紹幾個經常使用命令,詳細內容可在網上查找)

  • 建立目錄    hdfs dfs -mkdir -p 【Path】 
  • 複製文件    hdfs dfs -copyFromLocal 【InputPath】【OutputPath】
  • 查看目錄    hdfs dfs -ls 【Path】
  • 運行JAR    hadoop jar 【Jar名稱】 【Main類全名稱】 【InputPath】 【OutputPath】 

2.1 使用 SELECT 獲取數據

應用場景:假設在 hdfs 文件夾 / input / 20180509 路徑的 *.dat 類型文件中存放在着大量不一樣型號的 iPhone 手機當天在不一樣地區的銷售記錄,系統想對這些記錄進行統計,計算出不一樣型號手機的銷售總數。

計算時,在Mapper中獲取每一行的信息,並把iPhone名稱做爲Key插入,把數據做爲Value插入到Context當中。
當Reducer接收到相同Key數據後,再做統一處理。

注意  :   當前例子當中  Mapper 的輸入 Key 爲  LongWritable 長類型

在此過程當中要注意幾點: 例子中 SaleManager 繼承了 org.apache.hadoop.conf.Configured 類並實現了 org.apache.hadoop.util.Tool 接口的 public static int run(Configuration conf,Tool tool, String[] args) 方法,MapReduce的相關操做都在run裏面實現。因爲 Configured 已經實現了 getConf() 與setConfig() 方法,建立Job時相關的配置信息就可經過getConf()方法讀入。

系統能夠經過如下方法註冊Mapper及Reducer處理類
Job.setMapperClass(MyMapper.class);
Job.setReducerClass(MyReducer.class);

在整個運算過程中,數據會通過篩選與計算,因此Mapper的讀入信息K1,V1與Reducer的輸出信息K3,V3不必定是同一格式。
org.apache.hadoop.mapreduce.Mapper<K1,V1,K2,V2>
org.apache.hadoop.mapreduce.Reducer<K2,V2,K3,V3>

當Mapper的輸出的鍵值類型與Reduces輸出的鍵值類型相同時,系統能夠經過下面方法設置轉出數據的格式
Job.setOutputKeyClass(K);
Job.setOutputValueClass(V);

當Mapper的輸出的鍵值類型與Reduces輸出的鍵值類型不相同時,系統則須要經過下面方法設置Mapper轉出格式
Job.setMapOutputKeyClass(K);
Job.setMapOutputValueClass(V);

 1 public class Phone {
 2     public String type;
 3     public Integer count;
 4     public String area;
 5     
 6     public Phone(String line){
 7        String[] data=line.split(",");
 8        this.type=data[0].toString();
 9        this.count=Integer.valueOf(data[1].toString());
10        this.area=data[2].toString();
11     }
12     
13     public String getType(){
14         return this.type;
15     }
16     
17     public Integer getCount(){
18         return this.count;
19     }
20     
21     public String getArea(){
22         return this.area;
23     }
24 }
25 
26 public class SaleManager extends Configured implements Tool{
27     public static class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
28         public void map(LongWritable key,Text value,Context context)
29             throws IOException,InterruptedException{
30             String data=value.toString();
31             Phone iPhone=new Phone(data);
32             //以iPhone型號做爲Key,數量爲做Value傳入
33             context.write(new Text(iPhone.getType()), new IntWritable(iPhone.getCount()));
34         }
35     }
36     
37     public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
38         public void reduce(Text key,Iterable<IntWritable> values,Context context)
39             throws IOException,InterruptedException{
40             int sum=0;
41             //對同一型號的iPhone數量進行統計
42             for(IntWritable val : values){
43                 sum+=val.get();
44             }
45             context.write(key, new IntWritable(sum));
46         }
47     }
48 
49     public int run(String[] arg0) throws Exception {
50         // TODO 自動生成的方法存根
51         // TODO Auto-generated method stub
52         Job job=Job.getInstance(getConf());
53         job.setJarByClass(SaleManager.class);
54         //註冊Key/Value類型爲Text
55         job.setOutputKeyClass(Text.class);
56         job.setOutputValueClass(IntWritable.class);
57         //註冊Mapper及Reducer處理類
58         job.setMapperClass(MyMapper.class);
59         job.setReducerClass(MyReducer.class);
60         //輸入輸出數據格式化類型爲TextInputFormat
61         job.setInputFormatClass(TextInputFormat.class);
62         job.setOutputFormatClass(TextOutputFormat.class);
63         //默認狀況下Reducer數量爲1個(可忽略)
64         job.setNumReduceTasks(1);
65         //獲取命令參數
66         String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs();
67         //設置讀入文件路徑
68         FileInputFormat.setInputPaths(job,new Path(args[0]));
69         //設置轉出文件路徑
70         FileOutputFormat.setOutputPath(job,new Path(args[1]));
71         boolean status=job.waitForCompletion(true);
72         if(status)
73             return 0;
74         else
75             return 1;
76     }
77     
78     public static void main(String[] args) throws Exception{
79         Configuration conf=new Configuration();
80         ToolRunner.run(new SaleManager(), args);
81     }
82 }

計算結果

 

2.2 使用 WHERE 對數據進行篩選

在計算過程當中,並不是全部的數據都適用於Reduce的計算,因爲海量數據是經過網絡傳輸的,所消耗的 I/O 資源巨大,因此能夠嘗試在Mapper過程當中提早對數據進行篩選。以上面的數據爲例,當前系統只須要計算輸入參數地區的銷售數據。此時只須要修改一下Mapper類,重寫setup方法,經過Configuration類的 public String[] Configuration.getStrings(參數名,默認值) 方法獲取命令輸入的參數,再對數據進行篩選。

 1 public static class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
 2         private String area;
 3         
 4         @Override
 5         public void setup(Context context){
 6             this.area=context.getConfiguration().getStrings("area", "BeiJing")[0];
 7         }
 8         
 9         public void map(LongWritable key,Text value,Context context)
10             throws IOException,InterruptedException{
11             String data=value.toString();
12             Phone iPhone=new Phone(data);
13             if(this.area.equals(iPhone.area))
14                 context.write(new Text(iPhone.getType()), new IntWritable(iPhone.getCount()));
15         }
16     }

執行命令 hadoop jar 【Jar名稱】 【Main類全名稱】-D 【參數名=參數值】 【InputPath】 【OutputPath】
例如:hadoop jar hadoopTest-0.2.jar sun.hadoopTest.SaleManager -D area=BeiJing /tmp/input/050901 /tmp/output/050901 
此時系統將選擇 area 等於BeiJing 的數據進行統計
計算結果

回到目錄

3、利用 Partitioner 控制鍵值分配

3.1 深刻分析 Partitioner

Partitioner 類在 org.apache.hadoop.mapreduce.Partitioner 中,經過 Job.setPartitionerClass(Class<? extends Partitioner> cls) 方法可綁定自定義的 Partitioner。若用戶沒有實現自定義Partitioner 時,系統將自動綁定 Hadoop 的默認類 org.apache.hadoop.mapreduce.lib.partiton.HashPartitioner 。Partitioner 包含一個主要方法是 int getPartition(K key,V value,int numReduceTasks) ,功能是控制將哪些鍵分配到哪一個 Reducer。此方法的返回值是 Reducer 的索引值,若系統定義了4個Reducer,其返回值爲0~3。numReduceTasks 側是當前系統的 Reducer 數量,此數量可經過Job.setNumReduceTasks(int tasks) 進行設置,在僞分佈環境下,其默認值爲1。 

注意:

在單機環境下,系統只會使用一個 Reducer,這將致使 Partitioner 缺少意義,這也是在本文引言中強調要使用僞分佈環境進行調試的緣由 。

經過反編譯查看 HashPartitioner ,可見系統是經過(key.hashCode() & Interger.MAX_VALUE )%numReduceTasks 方法,根據 KEY 的 HashCode 對 Reducer 數量求餘方式,肯定數據分配到哪個 Reducer 進行處理的。但若是想根據用戶自定義的邏輯把數據分配到對應 Reducer,單依靠 HashPartitioner 是沒法實現的,此時側須要自定義 Partitioner 。

1 public class HashPartitioner<K, V> extends Partitioner<K, V> {
2 
3   public int getPartition(K key, V value, int numReduceTasks) {
4     return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
5   }
6 }

3.2 自定義 Partitioner

在例子當中,假設系統須要把北、上、廣、深4個不一樣的地區的iPhone銷售狀況分別交付給不一樣 Reducer 進行統計處理。咱們能夠自定義一個 MyPartitioner, 經過 Job.setPartitionerClass( MyPartitioner.class ) 進行綁定。經過 Job.setNumReduceTasks(4) 設置4個Reducer 。以手機類型做爲KEY,把銷售數據與地區做爲VALUE。在 int getPartition(K key,V value,int numReduceTasks) 方法中,根據 VALUE 值的不一樣返回不一樣的索引值。

  1 public class Phone {
  2     public String type;
  3     public Integer count;
  4     public String area;
  5     
  6     public Phone(String line){
  7        String[] data=line.split(",");
  8        this.type=data[0].toString();
  9        this.count=Integer.valueOf(data[1].toString());
 10        this.area=data[2].toString();
 11     }
 12     
 13     public String getType(){
 14         return this.type;
 15     }
 16     
 17     public Integer getCount(){
 18         return this.count;
 19     }
 20     
 21     public String getArea(){
 22         return this.area;
 23     }
 24 }
 25 
 26 public class MyPatitional extends Partitioner<Text,Text> {
 27 
 28     @Override
 29     public int getPartition(Text arg0, Text arg1, int arg2) {
 30         // TODO 自動生成的方法存根
 31         String area=arg1.toString().split(",")[0];
 32         // 根據不一樣的地區返回不一樣的索引值       
 33         if(area.contentEquals("BeiJing"))
 34             return 0;
 35         if(area.contentEquals("GuangZhou"))
 36             return 1;
 37         if(area.contentEquals("ShenZhen"))
 38             return 2;
 39         if(area.contentEquals("ShangHai"))
 40             return 3;
 41         return 0;
 42     }
 43 }    
 44 
 45 public class SaleManager extends Configured implements Tool{
 46     public static class MyMapper extends Mapper<LongWritable,Text,Text,Text>{
 47         
 48         public void map(LongWritable key,Text value,Context context)
 49             throws IOException,InterruptedException{
 50             String data=value.toString();
 51             Phone iPhone=new Phone(data);
 52             context.write(new Text(iPhone.getType()), new Text(iPhone.getArea()+","+iPhone.getCount().toString()));
 53         }
 54     }
 55     
 56     public static class MyReducer extends Reducer<Text,Text,Text,IntWritable>{
 57         
 58         public void reduce(Text key,Iterable<Text> values,Context context)
 59             throws IOException,InterruptedException{
 60             int sum=0;
 61             //對同一型號的iPhone數量進行統計
 62             for(Text value : values){
 63                 String count=value.toString().split(",")[1];
 64                 sum+=Integer.valueOf(count).intValue();
 65             }
 66             context.write(key, new IntWritable(sum));
 67         }
 68     }
 69 
 70     public int run(String[] arg0) throws Exception {
 71         // TODO 自動生成的方法存根
 72          // TODO Auto-generated method stub
 73         Job job=Job.getInstance(getConf());
 74         job.setJarByClass(SaleManager.class);
 75         //註冊Key/Value類型爲Text
 76         job.setOutputKeyClass(Text.class);
 77         job.setOutputValueClass(IntWritable.class);
 78         //若Map的轉出Key/Value不相同是須要分別註冊
 79         job.setMapOutputKeyClass(Text.class);
 80         job.setMapOutputValueClass(Text.class);
 81         //註冊Mapper及Reducer處理類
 82         job.setMapperClass(MyMapper.class);
 83         job.setReducerClass(MyReducer.class);
 84         //輸入輸出數據格式化類型爲TextInputFormat
 85         job.setInputFormatClass(TextInputFormat.class);
 86         job.setOutputFormatClass(TextOutputFormat.class);
 87         //設置Reduce數量爲4個,僞分佈式狀況下不設置時默認爲1
 88         job.setNumReduceTasks(4);
 89         //註冊自定義Partitional類
 90         job.setPartitionerClass(MyPatitional.class);
 91         //獲取命令參數
 92         String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs();
 93         //設置讀入文件路徑
 94         FileInputFormat.setInputPaths(job,new Path(args[0]));
 95         //設置轉出文件路徑
 96         FileOutputFormat.setOutputPath(job,new Path(args[1]));
 97         boolean status=job.waitForCompletion(true);
 98         if(status)
 99             return 0;
100         else
101             return 1;
102     }
103     
104     public static void main(String[] args) throws Exception{
105         Configuration conf=new Configuration();
106         ToolRunner.run(new SaleManager(), args);
107     }
108 }

計算結果


回到目錄

4、利用 Combiner 提升系統性能

在前面幾節所描述的例子當中,咱們都是把全部的數據完整發送到 Reducer 中再做統計。試想一下,在真實環境當中,iPhone 的銷售記錄數以千萬計,如此巨大的數據須要在 Mapper/Reducer 當中進行傳輸,將會耗費多少的網絡資源。這麼多年來 iPhone 出品的機型不過十多個,系統可否先針對同類的機型在Mapper端做出初步的聚合計算,再把計算結果發送到 Reducer。如此一來,傳到 Reducer 端的數據量將會大大減小,只要在適當的情形下使用將有利於系統的性能提高。
針對此類問題,Combiner 應運而生,咱們能夠把 Combiner 看做爲一個小型的 Reducer ,它的目的就是在數據傳送到Reducer前在Mapper中做出初步彙集處理,減小服務器之間的 I/O 數據傳輸壓力。Combiner 也繼承於Reducer,經過Job.setCombinerClass(Class<? extends Reducer> cls) 方法進行註冊。
下面繼續以第3節的例子做爲參考,系統想要在同一個Reducer中計算全部地區不一樣型號手機的銷售狀況。咱們能夠把地區名做爲KEY,把銷售數量和手機類型轉換成 MapWritable 做爲 VALUE。當數據輸入後,不是直接把數據傳輸到 Reducer ,而是經過Combiner 把Mapper中不一樣的型號手機的銷售數量進行聚合計算,把5種型號手機的銷售總數算好後傳輸給Reducer。在Reducer中再把來源於不一樣 Combiner 的數據進行求和,得出最後結果。

注意  :   

MapWritable 是 系統自帶的 Writable 集合類中的其中一個,它實現了  java.util.Map<Writable,Writable> 接口,以單字節充當類型數據的索引,經常使用於枚舉集合的元素。

 

  1 public class SaleManager extends Configured implements Tool{
  2     private static IntWritable TYPE=new IntWritable(0);
  3     private static IntWritable VALUE=new IntWritable(1);
  4     private static IntWritable IPHONE7=new IntWritable(2);
  5     private static IntWritable IPHONE7_PLUS=new IntWritable(3);
  6     private static IntWritable IPHONE8=new IntWritable(4);
  7     private static IntWritable IPHONE8_PLUS=new IntWritable(5);
  8     private static IntWritable IPHONEX=new IntWritable(6);
  9     
 10     public static class MyMapper extends Mapper<LongWritable,Text,Text,MapWritable>{
 11         
 12         public void map(LongWritable key,Text value,Context context)
 13             throws IOException,InterruptedException{
 14             String data=value.toString();
 15             Phone iPhone=new Phone(data);
 16             context.write(new Text(iPhone.getArea()), getMapWritable(iPhone.getType(), iPhone.getCount()));
 17         }      
 18         
 19         private MapWritable getMapWritable(String type,Integer count){
 20             Text _type=new Text(type);
 21             Text _count=new Text(count.toString());
 22             MapWritable mapWritable=new MapWritable();
 23             mapWritable.put(TYPE,_type);
 24             mapWritable.put(VALUE,_count);
 25             return mapWritable;
 26         }
 27     }
 28     
 29     public static class MyCombiner extends Reducer<Text,MapWritable,Text,MapWritable> {
 30         public void reduce(Text key,Iterable<MapWritable> values,Context context) 
 31             throws IOException, InterruptedException{
 32                int iPhone7=0;
 33                int iPhone7_PLUS=0;
 34                int iPhone8=0;
 35                int iPhone8_PLUS=0;
 36                int iPhoneX=0;
 37                //對同一個Mapper所處理的不一樣型號的手機數據進行初步統計
 38                for(MapWritable value:values){
 39                     String type=value.get(TYPE).toString();
 40                     Integer count=Integer.valueOf(value.get(VALUE).toString());
 41                     if(type.contentEquals("iPhone7"))
 42                         iPhone7+=count;
 43                     if(type.contentEquals("iPhone7_PLUS"))
 44                         iPhone7_PLUS+=count;
 45                     if(type.contentEquals("iPhone8"))
 46                         iPhone8+=count;
 47                     if(type.contentEquals("iPhone8_PLUS"))
 48                         iPhone8_PLUS+=count;
 49                     if(type.contentEquals("iPhoneX"))
 50                         iPhoneX+=count;
 51                 }                
 52                 MapWritable mapWritable=new MapWritable();
 53                 mapWritable.put(IPHONE7, new IntWritable(iPhone7));
 54                 mapWritable.put(IPHONE7_PLUS, new IntWritable(iPhone7_PLUS));
 55                 mapWritable.put(IPHONE8, new IntWritable(iPhone8));
 56                 mapWritable.put(IPHONE8_PLUS, new IntWritable(iPhone8_PLUS));
 57                 mapWritable.put(IPHONEX, new IntWritable(iPhoneX));
 58                 context.write(key,mapWritable);
 59         }
 60    }
 61     
 62     public static class MyReducer extends Reducer<Text,MapWritable,Text,Text>{
 63         public void reduce(Text key,Iterable<MapWritable> values,Context context)
 64             throws IOException,InterruptedException{
 65                int iPhone7=0;
 66                int iPhone7_PLUS=0;
 67                int iPhone8=0;
 68                int iPhone8_PLUS=0;
 69                int iPhoneX=0;
 70 
 71             //對同一地區不一樣型的iPhone數量進行統計
 72             for(MapWritable value : values){
 73                 iPhone7+=Integer.parseInt(value.get(IPHONE7).toString());
 74                 iPhone7_PLUS+=Integer.parseInt(value.get(IPHONE7_PLUS).toString());
 75                 iPhone8+=Integer.parseInt(value.get(IPHONE8).toString());
 76                 iPhone8_PLUS+=Integer.parseInt(value.get(IPHONE8_PLUS).toString());
 77                 iPhoneX+=Integer.parseInt(value.get(IPHONEX).toString());
 78             }
 79             
 80             StringBuffer data=new StringBuffer();
 81             data.append("iPhone7:"+iPhone7+"   ");
 82             data.append("iPhone7_PLUS:"+iPhone7_PLUS+"   ");
 83             data.append("iPhone8:"+iPhone8+"   ");
 84             data.append("iPhone8_PLUS:"+iPhone8_PLUS+"   ");
 85             data.append("iPhoneX:"+iPhoneX+"   ");
 86             context.write(key, new Text(data.toString()));
 87         }
 88     }
 89 
 90     public int run(String[] arg0) throws Exception {
 91         // TODO 自動生成的方法存根
 92         // TODO Auto-generated method stub
 93         Job job=Job.getInstance(getConf());
 94         job.setJarByClass(SaleManager.class);
 95         //註冊Key/Value類型爲Text
 96         job.setOutputKeyClass(Text.class);
 97         job.setOutputValueClass(Text.class);
 98         //若Map的轉出Key/Value不相同是須要分別註冊
 99         job.setMapOutputKeyClass(Text.class);
100         job.setMapOutputValueClass(MapWritable.class);
101         //註冊Mapper及Reducer處理類
102         job.setMapperClass(MyMapper.class);
103         job.setReducerClass(MyReducer.class);
104         //註冊Combiner處理類
105         job.setCombinerClass(MyCombiner.class);
106         //輸入輸出數據格式化類型爲TextInputFormat
107         job.setInputFormatClass(TextInputFormat.class);
108         job.setOutputFormatClass(TextOutputFormat.class);
109         //僞分佈式狀況下不設置時默認爲1
110         job.setNumReduceTasks(1);
111         //獲取命令參數
112         String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs();
113         //設置讀入文件路徑
114         FileInputFormat.setInputPaths(job,new Path(args[0]));
115         //設置轉出文件路徑
116         FileOutputFormat.setOutputPath(job,new Path(args[1]));
117         boolean status=job.waitForCompletion(true);
118         if(status)
119             return 0;
120         else
121             return 1;
122     }
123     
124     public static void main(String[] args) throws Exception{
125         Configuration conf=new Configuration();
126         ToolRunner.run(new SaleManager(), args);
127     }
128 }

計算結果

回到目錄

5、WritableComparable自定義鍵值說明

5.1 Writable、Comparable、WritableComparable 之間關係

在 Mapper 與 Reducer 中使用到的鍵類型、值類型都必須實現 Writable 接口,而鍵類型側須要實現 WritableComparable,它們之間的關係以下圖:

 

Writable 接口有兩個方法

  • write(java.io.DataOutput out) 將實例的原始屬性寫到 dataOutput 輸出流中,其做用是序列化基礎數據
  • readFields(java.io.DataInput in) 從 dataInput 對象中抓取數據並從新建立 Writable

Comparable 接口中 int compareTo(object) 方法側定義了排序的方式,若是返回值爲0(判斷爲兩個對象相等),側被同一個 reduce 方法處理,一旦兩個對象不相等,系統就會生成另外一個 reduce 處理。

5.2 自定義值類型

以第三節的例子做爲討論,假設系統須要把北、上、廣、深4個不一樣的地區的iPhone銷售狀況分別交付給4個不一樣 Reducer 節點進行統計處理。使用地區 area 做爲Key,使用繼承 Writable 接口的PhoneValue做爲 Value 值類型,實現 write 方法與 readFields 方法,最後在 reduce 方法區分不一樣的型號進行計算。

  1 public class Phone {
  2     public String type;
  3     public Integer count;
  4     public String area;
  5     
  6     public Phone(String line){
  7        String[] data=line.split(",");
  8        this.type=data[0].toString().trim();
  9        this.count=Integer.valueOf(data[1].toString().trim());
 10        this.area=data[2].toString().trim();
 11     }
 12     
 13     public String getType(){
 14         return this.type;
 15     }
 16     
 17     public Integer getCount(){
 18         return this.count;
 19     }
 20     
 21     public String getArea(){
 22         return this.area;
 23     }
 24 }
 25 
 26 public class PhoneValue implements Writable {
 27     public Text type=new Text();
 28     public IntWritable count=new IntWritable();
 29     public Text area=new Text();
 30     
 31     public PhoneValue(){
 32         
 33     }
 34 
 35     public PhoneValue(String type,Integer count,String area){        
 36         this.type=new Text(type);
 37         this.count=new IntWritable(count);
 38         this.area=new Text(area);
 39     }
 40     
 41     public Text getType() {
 42         return type;
 43     }
 44 
 45     public void setType(Text type) {
 46         this.type = type;
 47     }
 48 
 49     public IntWritable getCount() {
 50         return count;
 51     }
 52 
 53     public void setCount(IntWritable count) {
 54         this.count = count;
 55     }
 56 
 57     public Text getArea() {
 58         return area;
 59     }
 60 
 61     public void setArea(Text area) {
 62         this.area = area;
 63     }
 64 
 65     @Override
 66     public void readFields(DataInput arg0) throws IOException {
 67         // TODO 自動生成的方法存根
 68         this.type.readFields(arg0);
 69         this.count.readFields(arg0);
 70         this.area.readFields(arg0);
 71     }
 72 
 73     @Override
 74     public void write(DataOutput arg0) throws IOException {
 75         // TODO 自動生成的方法存根
 76         this.type.write(arg0);
 77         this.count.write(arg0);
 78         this.area.write(arg0);
 79     }
 80 }
 81 
 82 public class SaleManager extends Configured implements Tool{
 83     
 84     public static class MyMapper extends Mapper<LongWritable,Text,Text,PhoneValue>{
 85         
 86         public void map(LongWritable key,Text value,Context context)
 87             throws IOException,InterruptedException{
 88             String data=value.toString();
 89             Phone iPhone=new Phone(data);
 90             PhoneValue phoneValue=new PhoneValue(iPhone.getType(),iPhone.getCount(),iPhone.getArea());
 91             context.write(new Text(iPhone.getArea()), phoneValue);
 92         }      
 93     }
 94     
 95     public static class MyReducer extends Reducer<Text,PhoneValue,Text,Text>{
 96         Integer iPhone7=new Integer(0);
 97         Integer iPhone7_PLUS=new Integer(0);
 98         Integer iPhone8=new Integer(0);
 99         Integer iPhone8_PLUS=new Integer(0);
100         Integer iPhoneX=new Integer(0);
101         
102         public void reduce(Text key,Iterable<PhoneValue> values,Context context)
103             throws IOException,InterruptedException{
104             //對不一樣類型iPhone數量進行統計
105             for(PhoneValue phone : values){
106                 int count=phone.getCount().get();
107                 
108                 if(phone.type.toString().equals("iPhone7"))
109                     iPhone7+=count;
110                 if(phone.type.toString().equals("iPhone7_PLUS"))
111                     iPhone7_PLUS+=count;
112                 if(phone.type.toString().equals("iPhone8"))
113                     iPhone8+=count;
114                 if(phone.type.toString().equals("iPhone8_PLUS"))
115                     iPhone8_PLUS+=count;
116                 if(phone.type.toString().equals("iPhoneX"))
117                     iPhoneX+=count;
118             }
119             
120             context.write(new Text("iPhone7"), new Text(iPhone7.toString()));
121             context.write(new Text("iPhone7_PLUS"), new Text(iPhone7_PLUS.toString()));
122             context.write(new Text("iPhone8"), new Text(iPhone8.toString()));
123             context.write(new Text("iPhone8_PLUS"), new Text(iPhone8_PLUS.toString()));
124             context.write(new Text("iPhoneX"), new Text(iPhoneX.toString()));
125         }
126     }
127 
128     public int run(String[] arg0) throws Exception {
129         // TODO 自動生成的方法存根
130          // TODO Auto-generated method stub
131         Job job=Job.getInstance(getConf());
132         job.setJarByClass(SaleManager.class);
133         //註冊Key/Value類型爲Text
134         job.setOutputKeyClass(Text.class);
135         job.setOutputValueClass(Text.class);
136         //若Map的轉出Key/Value不相同是須要分別註冊
137         job.setMapOutputKeyClass(Text.class);
138         job.setMapOutputValueClass(PhoneValue.class);
139         //註冊Mapper及Reducer處理類
140         job.setMapperClass(MyMapper.class);
141         job.setReducerClass(MyReducer.class);
142         //輸入輸出數據格式化類型爲TextInputFormat
143         job.setInputFormatClass(TextInputFormat.class);
144         job.setOutputFormatClass(TextOutputFormat.class);
145         //僞分佈式狀況下不設置時默認爲1
146         job.setNumReduceTasks(4);
147         //獲取命令參數
148         String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs();
149         //設置讀入文件路徑
150         FileInputFormat.setInputPaths(job,new Path(args[0]));
151         //設置轉出文件路徑
152         FileOutputFormat.setOutputPath(job,new Path(args[1]));
153         boolean status=job.waitForCompletion(true);
154         if(status)
155             return 0;
156         else
157             return 1;
158     }
159     
160     public static void main(String[] args) throws Exception{
161         Configuration conf=new Configuration();
162         ToolRunner.run(new SaleManager(), args);      
163     } 
164 }

計算結果與第三節相同

 

5.3 自定義鍵類型

Hadoop 經常使用的類 IntWritable、LongWritable、Text、BooleanWritable 等都實現了WritableComparable 接口,當用戶須要自定義鍵類型時,只須要實現WritableComparable接口便可。public boolean equals(Object o) 與 public int hashCode() 都是 Object 的方法,回顧本文的第三節能夠看到 hashCode 會被系統默認的 Partitioner 即 HashPartitioner 類所使用。在使用系統默認的 HashPartitioner 類時,一旦 hashCode 相等,數據將返回到同一個Reducer 節點,所以應該按業務的需求從新定義鍵類型的 hashCode。同時 equals 方法應該按照 hashCode 邏輯統一修改,避免在使用 Hash 散列時出現邏輯錯誤。

以第三節的例子做爲討論,假設系統須要把北、上、廣、深4個不一樣的地區的iPhone銷售狀況分別交付給不一樣 Reducer 節點進行統計處理,咱們只須要定義 PhoneKey 做爲鍵類型,當中包含地區號 area 和型號 type。在 hashCode 中以地區號 area 做爲指標,在 compareTo 方法中咱們以手機的類型 type 進行排序。系統就可在不一樣的 Reducer 節點中計算出同一地點不一樣類型手機的銷售狀況。

  1 public class Phone {
  2     public String type;
  3     public Integer count;
  4     public String area;
  5     
  6     public Phone(String line){
  7        String[] data=line.split(",");
  8        this.type=data[0].toString().trim();
  9        this.count=Integer.valueOf(data[1].toString().trim());
 10        this.area=data[2].toString().trim();
 11     }
 12     
 13     public String getType(){
 14         return this.type;
 15     }
 16     
 17     public Integer getCount(){
 18         return this.count;
 19     }
 20     
 21     public String getArea(){
 22         return this.area;
 23     }
 24 }
 25 
 26 public class PhoneKey implements WritableComparable<PhoneKey> {
 27     public Text type=new Text();
 28     public Text area=new Text();
 29     
 30     public PhoneKey(){
 31         
 32     }
 33 
 34     public PhoneKey(String type,String area){        
 35         this.type=new Text(type);
 36         this.area=new Text(area);
 37     }
 38     
 39     public Text getType() {
 40         return type;
 41     }
 42 
 43     public void setType(Text type) {
 44         this.type = type;
 45     }
 46 
 47     public Text getArea() {
 48         return area;
 49     }
 50 
 51     public void setArea(Text area) {
 52         this.area = area;
 53     }
 54 
 55     @Override
 56     public void readFields(DataInput arg0) throws IOException {
 57         // TODO 自動生成的方法存根
 58         this.type.readFields(arg0);
 59         this.area.readFields(arg0);
 60     }
 61 
 62     @Override
 63     public void write(DataOutput arg0) throws IOException {
 64         // TODO 自動生成的方法存根
 65         this.type.write(arg0);
 66         this.area.write(arg0);
 67     }
 68 
 69     @Override
 70     public int compareTo(PhoneKey o) {
 71         // TODO 自動生成的方法存根
 72         return this.type.compareTo(o.type);
 73     }
 74  
 75     @Override
 76     public boolean equals(Object o){
 77         if(!(o instanceof PhoneKey)){
 78             return false;
 79         }
 80         PhoneKey phone=(PhoneKey) o;
 81         return this.area.equals(phone.area);
 82     }
 83     
 84     @Override
 85     public int hashCode(){
 86         return this.area.hashCode();
 87     }
 88 }
 89 
 90 public class SaleManager extends Configured implements Tool{
 91     
 92     public static class MyMapper extends Mapper<LongWritable,Text,PhoneKey,IntWritable>{
 93         
 94         public void map(LongWritable key,Text value,Context context)
 95             throws IOException,InterruptedException{
 96             String data=value.toString();
 97             Phone iPhone=new Phone(data);
 98             PhoneKey phoneKey=new PhoneKey(iPhone.getType(),iPhone.getArea());
 99             context.write(phoneKey, new IntWritable(iPhone.count));
100         }      
101     }
102     
103     public static class MyReducer extends Reducer<PhoneKey,IntWritable,Text,Text>{        
104         public void reduce(PhoneKey phoneKey,Iterable<IntWritable> values,Context context)
105             throws IOException,InterruptedException{
106             String type=phoneKey.getType().toString();
107             Integer total=new Integer(0);
108             //對不一樣類型iPhone數量進行統計
109             for(IntWritable count : values){
110                 total+=count.get();
111             }            
112             context.write(new Text(type),new Text(total.toString()));
113         }
114     }
115 
116     public int run(String[] arg0) throws Exception {
117         // TODO 自動生成的方法存根
118          // TODO Auto-generated method stub
119         Job job=Job.getInstance(getConf());
120         job.setJarByClass(SaleManager.class);
121         //註冊Key/Value類型爲Text
122         job.setOutputKeyClass(Text.class);
123         job.setOutputValueClass(Text.class);
124         //若Map的轉出Key/Value不相同是須要分別註冊
125         job.setMapOutputKeyClass(PhoneKey.class);
126         job.setMapOutputValueClass(IntWritable.class);
127         //註冊Mapper及Reducer處理類
128         job.setMapperClass(MyMapper.class);
129         job.setReducerClass(MyReducer.class);
130         //輸入輸出數據格式化類型爲TextInputFormat
131         job.setInputFormatClass(TextInputFormat.class);
132         job.setOutputFormatClass(TextOutputFormat.class);
133         //僞分佈式狀況下不設置時默認爲1
134         job.setNumReduceTasks(4);
135         //獲取命令參數
136         String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs();
137         //設置讀入文件路徑
138         FileInputFormat.setInputPaths(job,new Path(args[0]));
139         //設置轉出文件路徑
140         FileOutputFormat.setOutputPath(job,new Path(args[1]));
141         boolean status=job.waitForCompletion(true);
142         if(status)
143             return 0;
144         else
145             return 1;
146     }
147     
148     public static void main(String[] args) throws Exception{
149         Configuration conf=new Configuration();
150         ToolRunner.run(new SaleManager(), args);      
151     } 
152 }

計算結果與第三節相同

在這個例子中只是爲了讓你們更好地瞭解自定義鍵類型的使用方法,而在真實環境中,自定義鍵類型,主要做爲區分數據的標準。若是須要更好地平衡服務器資源,分配 Reducer 數據處理的負荷,仍是要經過自定義的 Partitioner 進行管理。

回到目錄

6、實現數據排序與分組處理

6.1 RawComparator 接口介紹

在實際的應用場景當中,極可能會用到第三方類庫做爲鍵類型,但咱們沒法直接對源代碼進行修改。爲此係統定義了 RawComparator 接口,假設第三方類已實現了 Writable 接口,用戶可經過自定義類實現 RawComparator 接口,經過 job.setSortComparatorClass(rawComparator.class) 設置便可。RawComparator 繼承了  java.util.Comparator 接口,並添加了 int compare(byte[] b1,int s1, int l1,byte[] b2 ,int s2, int l2) 方法。此方法最簡單的實現方式是經過 Writable 實例中的 readField 重構對象,而後使用通用類的 compareTo 完成排序。

public interface RawComparator<T> extends Comparator<T> {
            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}

下面例子假設 PhoneWritable 是第三方類庫中的值類型,咱們沒法直接修改,但系統須要把 PhoneWritable 用做 KEY 處理,按照不一樣地區不一樣型號進行排序計算出手機的銷售狀況。此時可創建 PhoneComparator 類並實現 RawComparator 接口,在主程序中經過 job.setSortComparatorClass(PhoneComparator.class) 設置此接口的實現類。

  1 public class Phone {
  2     public String type;
  3     public Integer count;
  4     public String area;
  5     
  6     public Phone(String line){
  7        String[] data=line.split(",");
  8        this.type=data[0].toString().trim();
  9        this.count=Integer.valueOf(data[1].toString().trim());
 10        this.area=data[2].toString().trim();
 11     }
 12     
 13     public String getType(){
 14         return this.type;
 15     }
 16     
 17     public Integer getCount(){
 18         return this.count;
 19     }
 20     
 21     public String getArea(){
 22         return this.area;
 23     }
 24 }
 25 
 26 public class PhoneWritable implements Writable {
 27     public Text type=new Text();
 28     public IntWritable count=new IntWritable();
 29     public Text area=new Text();
 30     
 31     public PhoneWritable(){
 32         
 33     }
 34 
 35     public PhoneWritable(String type,Integer count,String area){        
 36         this.type=new Text(type);
 37         this.count=new IntWritable(count);
 38         this.area=new Text(area);
 39     }
 40     
 41     public Text getType() {
 42         return type;
 43     }
 44 
 45     public void setType(Text type) {
 46         this.type = type;
 47     }
 48 
 49     public IntWritable getCount() {
 50         return count;
 51     }
 52 
 53     public void setCount(IntWritable count) {
 54         this.count = count;
 55     }
 56 
 57     public Text getArea() {
 58         return area;
 59     }
 60 
 61     public void setArea(Text area) {
 62         this.area = area;
 63     }
 64 
 65     @Override
 66     public void readFields(DataInput arg0) throws IOException {
 67         // TODO 自動生成的方法存根
 68         this.type.readFields(arg0);
 69         this.count.readFields(arg0);
 70         this.area.readFields(arg0);
 71     }
 72 
 73     @Override
 74     public void write(DataOutput arg0) throws IOException {
 75         // TODO 自動生成的方法存根
 76         this.type.write(arg0);
 77         this.count.write(arg0);
 78         this.area.write(arg0);
 79     }
 80 }
 81 
 82 public class PhoneComparator implements RawComparator<PhoneWritable> {
 83     private DataInputBuffer buffer=null;
 84     private PhoneWritable phone1=null;
 85     private PhoneWritable phone2=null;
 86     
 87     public PhoneComparator(){
 88         buffer=new DataInputBuffer();
 89         phone1=new PhoneWritable();
 90         phone2=new PhoneWritable();
 91     }
 92     
 93     @Override
 94     public int compare(PhoneWritable o1, PhoneWritable o2) {
 95         // TODO 自動生成的方法存根
 96         if(!o1.getArea().equals(o2.getArea()))
 97             return o1.getArea().compareTo(o2.getArea());
 98         else
 99             return o1.getType().compareTo(o2.getType());
100     }
101 
102     @Override
103     public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
104         // TODO 自動生成的方法存根
105         try {
106             buffer.reset(arg0,arg1,arg2);
107             phone1.readFields(buffer);
108             buffer.reset(arg3,arg4,arg5);
109             phone2.readFields(buffer);
110         } catch (IOException e) {
111             // TODO 自動生成的 catch 塊
112             e.printStackTrace();
113         }
114         return this.compare(phone1, phone2);
115     }
116 
117 }
118 
119 public class SaleManager extends Configured implements Tool{
120     
121     public static class MyMapper extends Mapper<LongWritable,Text,PhoneWritable,IntWritable>{
122         
123         public void map(LongWritable key,Text value,Context context)
124             throws IOException,InterruptedException{
125             String data=value.toString();
126             Phone iPhone=new Phone(data);
127             PhoneWritable phone=new PhoneWritable(iPhone.getType(),iPhone.getCount(),iPhone.getArea());
128             context.write(phone, phone.getCount());
129         }      
130     }
131     
132     public static class MyReducer extends Reducer<PhoneWritable,IntWritable,Text,Text>{        
133         public void reduce(PhoneWritable phone,Iterable<IntWritable> values,Context context)
134             throws IOException,InterruptedException{
135             //對不一樣類型iPhone數量進行統計
136             Integer total=new Integer(0);
137             
138             for(IntWritable count : values){
139                 total+=count.get();
140             }
141             context.write(new Text(phone.getArea()+" "+phone.getType()),new Text(total.toString()));
142         }
143     }
144 
145     public int run(String[] arg0) throws Exception {
146         // TODO 自動生成的方法存根
147          // TODO Auto-generated method stub
148         Job job=Job.getInstance(getConf());
149         job.setJarByClass(SaleManager.class);
150         //註冊Key/Value類型爲Text
151         job.setOutputKeyClass(Text.class);
152         job.setOutputValueClass(Text.class);
153         //若Map的轉出Key/Value不相同是須要分別註冊
154         job.setMapOutputKeyClass(PhoneWritable.class);
155         job.setSortComparatorClass(PhoneComparator.class);
156         job.setMapOutputValueClass(IntWritable.class);
157         //註冊Mapper及Reducer處理類
158         job.setMapperClass(MyMapper.class);
159         job.setReducerClass(MyReducer.class);
160         //輸入輸出數據格式化類型爲TextInputFormat
161         job.setInputFormatClass(TextInputFormat.class);
162         job.setOutputFormatClass(TextOutputFormat.class);
163         //僞分佈式狀況下不設置時默認爲1
164         job.setNumReduceTasks(1);
165         //獲取命令參數
166         String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs();
167         //設置讀入文件路徑
168         FileInputFormat.setInputPaths(job,new Path(args[0]));
169         //設置轉出文件路徑
170         FileOutputFormat.setOutputPath(job,new Path(args[1]));
171         boolean status=job.waitForCompletion(true);
172         if(status)
173             return 0;
174         else
175             return 1;
176     }
177     
178     public static void main(String[] args) throws Exception{
179         Configuration conf=new Configuration();
180         ToolRunner.run(new SaleManager(), args);      
181     } 
182 }

計算結果

 

6.2 WritableComparator 類介紹

WritableComparator 是系統自帶的接口 RawComparartor 實現類,它實現了 RawComparator 接口的兩個基礎方法 int compare(object , object ) 與 int compare(byte[] b1,int s1, int l1,byte[] b2 ,int s2, int l2)

經過反編譯查看源代碼可知道,系統也是經過 WritableComparable 接口的 readField 方法重構對象,而後調用 int compareTo (WritableComparable,WritableComparable) 方法完成排序的。所以通常狀況下咱們在繼承 WritableComparator 類實現排序時,只須要重構此方法實現業務邏輯便可。

 

6.3 利用 WritableComparator 實現數據排序

假設系統原有的鍵類型 PhoneKey 是以手機類型 type做爲排序標準,如今咱們須要經過 WritableComparator 把排序標準修改成按先按地區 area 再按類型 type 排序。按照第上節所述,咱們只須要繼承 WritableComparator 類,重寫 int compareTo (WritableComparable,WritableComparable),按照地區 area 及 類型 type 進行排序,最後使用 job.setSortComparatorClass(Class<? extends RawComparator> cls) 設置排序方式便可。

  1 public class Phone {
  2     public String type;
  3     public Integer count;
  4     public String area;
  5     
  6     public Phone(String line){
  7        String[] data=line.split(",");
  8        this.type=data[0].toString().trim();
  9        this.count=Integer.valueOf(data[1].toString().trim());
 10        this.area=data[2].toString().trim();
 11     }
 12     
 13     public String getType(){
 14         return this.type;
 15     }
 16     
 17     public Integer getCount(){
 18         return this.count;
 19     }
 20     
 21     public String getArea(){
 22         return this.area;
 23     }
 24 }
 25 
 26 public class PhoneKey implements WritableComparable<PhoneKey> {
 27     public Text type=new Text();
 28     public Text area=new Text();
 29     
 30     public PhoneKey(){
 31         
 32     }
 33 
 34     public PhoneKey(String type,String area){        
 35         this.type=new Text(type);
 36         this.area=new Text(area);
 37     }
 38     
 39     public Text getType() {
 40         return type;
 41     }
 42 
 43     public void setType(Text type) {
 44         this.type = type;
 45     }
 46 
 47     public Text getArea() {
 48         return area;
 49     }
 50 
 51     public void setArea(Text area) {
 52         this.area = area;
 53     }
 54 
 55     @Override
 56     public void readFields(DataInput arg0) throws IOException {
 57         // TODO 自動生成的方法存根
 58         this.type.readFields(arg0);
 59         this.area.readFields(arg0);
 60     }
 61 
 62     @Override
 63     public void write(DataOutput arg0) throws IOException {
 64         // TODO 自動生成的方法存根
 65         this.type.write(arg0);
 66         this.area.write(arg0);
 67     }
 68 
 69     @Override
 70     public int compareTo(PhoneKey o) {
 71         // TODO 自動生成的方法存根
 72         return this.type.compareTo(o.type);
 73     }
 74  
 75     @Override
 76     public boolean equals(Object o){
 77         if(!(o instanceof PhoneKey)){
 78             return false;
 79         }
 80         PhoneKey phone=(PhoneKey) o;
 81         return this.area.equals(phone.area);
 82     }
 83     
 84     @Override
 85     public int hashCode(){
 86         return this.area.hashCode();
 87     }
 88 }
 89 
 90 public class PhoneSortComparator extends WritableComparator{
 91 
 92     public PhoneSortComparator(){
 93         super(PhoneKey.class,true);
 94     }
 95     
 96     @Override
 97     public int compare(WritableComparable a,WritableComparable b){
 98         PhoneKey key1=(PhoneKey) a;
 99         PhoneKey key2=(PhoneKey) b;
100         if(!key1.getArea().equals(key2.getArea()))
101             return key1.getArea().compareTo(key2.getArea());
102         else
103             return key1.getType().compareTo(key2.getType());
104     }
105 }
106 
107 public class SaleManager extends Configured implements Tool{
108     
109     public static class MyMapper extends Mapper<LongWritable,Text,PhoneKey,IntWritable>{
110         
111         public void map(LongWritable key,Text value,Context context)
112             throws IOException,InterruptedException{
113             String data=value.toString();
114             Phone iPhone=new Phone(data);
115             PhoneKey phone=new PhoneKey(iPhone.getType(),iPhone.getArea());
116             context.write(phone, new IntWritable(iPhone.getCount()));
117         }      
118     }
119     
120     public static class MyReducer extends Reducer<PhoneKey,IntWritable,Text,Text>{        
121         public void reduce(PhoneKey phone,Iterable<IntWritable> values,Context context)
122             throws IOException,InterruptedException{
123             //對不一樣類型iPhone數量進行統計
124             Integer total=new Integer(0);
125             
126             for(IntWritable count : values){
127                 total+=count.get();
128             }
129             context.write(new Text(phone.getArea()+" "+phone.getType()+": "),new Text(total.toString()));
130         }
131     }
132 
133     public int run(String[] arg0) throws Exception {
134         // TODO 自動生成的方法存根
135          // TODO Auto-generated method stub
136         Job job=Job.getInstance(getConf());
137         job.setJarByClass(SaleManager.class);
138         //註冊Key/Value類型爲Text
139         job.setOutputKeyClass(Text.class);
140         job.setOutputValueClass(Text.class);
141         //若Map的轉出Key/Value不相同是須要分別註冊
142         job.setMapOutputKeyClass(PhoneKey.class);
143         job.setMapOutputValueClass(IntWritable.class);
144         //設置排序類型 SortComparator
145         job.setSortComparatorClass(PhoneSortComparator.class);
146         //註冊Mapper及Reducer處理類
147         job.setMapperClass(MyMapper.class);
148         job.setReducerClass(MyReducer.class);
149         //輸入輸出數據格式化類型爲TextInputFormat
150         job.setInputFormatClass(TextInputFormat.class);
151         job.setOutputFormatClass(TextOutputFormat.class);
152         //僞分佈式狀況下不設置時默認爲1
153         job.setNumReduceTasks(1);
154         //獲取命令參數
155         String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs();
156         //設置讀入文件路徑
157         FileInputFormat.setInputPaths(job,new Path(args[0]));
158         //設置轉出文件路徑
159         FileOutputFormat.setOutputPath(job,new Path(args[1]));
160         boolean status=job.waitForCompletion(true);
161         if(status)
162             return 0;
163         else
164             return 1;
165     }
166     
167     public static void main(String[] args) throws Exception{
168         Configuration conf=new Configuration();
169         ToolRunner.run(new SaleManager(), args);      
170     } 
171 }

從計算結果能夠看到數據是先按照地區 area 再按手機型號 type 進行排序的

 

6.4 利用 WritableComparator 實現數據分組

在 6.3 節的例子中,數據是先按照地區號 area 再按手機類型 type 進行排序的,所以在 reduce 方法中根據 Iterable 集合計算出來的將會同一地區同一類型的手機。若此時須要對同一地區全部手機類型的銷售狀況進行合計,可使用 GroupingComparator 分組計算方式 。其方法是繼承 WritableComparator 類,重寫 int compareTo (WritableComparable,WritableComparable),以地區號 area 做爲分組標識,最後使用 job.setGroupComparatorClass(Class<? extends RawComparator> cls) 設置分組方式便可。

  1 public class Phone {
  2     public String type;
  3     public Integer count;
  4     public String area;
  5     
  6     public Phone(String line){
  7        String[] data=line.split(",");
  8        this.type=data[0].toString().trim();
  9        this.count=Integer.valueOf(data[1].toString().trim());
 10        this.area=data[2].toString().trim();
 11     }
 12     
 13     public String getType(){
 14         return this.type;
 15     }
 16     
 17     public Integer getCount(){
 18         return this.count;
 19     }
 20     
 21     public String getArea(){
 22         return this.area;
 23     }
 24 }
 25 
 26 public class PhoneKey implements WritableComparable<PhoneKey> {
 27     public Text type=new Text();
 28     public Text area=new Text();
 29     
 30     public PhoneKey(){
 31         
 32     }
 33 
 34     public PhoneKey(String type,String area){        
 35         this.type=new Text(type);
 36         this.area=new Text(area);
 37     }
 38     
 39     public Text getType() {
 40         return type;
 41     }
 42 
 43     public void setType(Text type) {
 44         this.type = type;
 45     }
 46 
 47     public Text getArea() {
 48         return area;
 49     }
 50 
 51     public void setArea(Text area) {
 52         this.area = area;
 53     }
 54 
 55     @Override
 56     public void readFields(DataInput arg0) throws IOException {
 57         // TODO 自動生成的方法存根
 58         this.type.readFields(arg0);
 59         this.area.readFields(arg0);
 60     }
 61 
 62     @Override
 63     public void write(DataOutput arg0) throws IOException {
 64         // TODO 自動生成的方法存根
 65         this.type.write(arg0);
 66         this.area.write(arg0);
 67     }
 68 
 69     @Override
 70     public int compareTo(PhoneKey o) {
 71         // TODO 自動生成的方法存根
 72         return this.type.compareTo(o.type);
 73     }
 74  
 75     @Override
 76     public boolean equals(Object o){
 77         if(!(o instanceof PhoneKey)){
 78             return false;
 79         }
 80         PhoneKey phone=(PhoneKey) o;
 81         return this.area.equals(phone.area);
 82     }
 83     
 84     @Override
 85     public int hashCode(){
 86         return this.area.hashCode();
 87     }
 88 }
 89 
 90 public class PhoneSortComparator extends WritableComparator{
 91 
 92     public PhoneSortComparator(){
 93         super(PhoneKey.class,true);
 94     }
 95     
 96     @Override
 97     public int compare(WritableComparable a,WritableComparable b){
 98         PhoneKey key1=(PhoneKey) a;
 99         PhoneKey key2=(PhoneKey) b;
100         if(!key1.getArea().equals(key2.getArea()))
101             return key1.getArea().compareTo(key2.getArea());
102         else
103             return key1.getType().compareTo(key2.getType());
104     }
105 }
106 
107 public class PhoneGroupComparator extends WritableComparator{
108 
109     public PhoneGroupComparator(){
110         super(PhoneKey.class,true);
111     }
112     
113     @Override
114     public int compare(WritableComparable a,WritableComparable b){
115         PhoneKey key1=(PhoneKey) a;
116         PhoneKey key2=(PhoneKey) b;
117         return key1.getArea().compareTo(key2.getArea());
118     }
119 }
120 
121 public class SaleManager extends Configured implements Tool{
122     
123     public static class MyMapper extends Mapper<LongWritable,Text,PhoneKey,IntWritable>{
124         
125         public void map(LongWritable key,Text value,Context context)
126             throws IOException,InterruptedException{
127             String data=value.toString();
128             Phone iPhone=new Phone(data);
129             PhoneKey phone=new PhoneKey(iPhone.getType(),iPhone.getArea());
130             context.write(phone, new IntWritable(iPhone.getCount()));
131         }      
132     }
133     
134     public static class MyReducer extends Reducer<PhoneKey,IntWritable,Text,Text>{        
135         public void reduce(PhoneKey phone,Iterable<IntWritable> values,Context context)
136             throws IOException,InterruptedException{
137             //對不一樣類型iPhone數量進行統計
138             Integer total=new Integer(0);
139             
140             for(IntWritable count : values){
141                 total+=count.get();
142             }
143             context.write(new Text(phone.getArea()),new Text(total.toString()));
144         }
145     }
146 
147     public int run(String[] arg0) throws Exception {
148         Job job=Job.getInstance(getConf());
149         job.setJarByClass(SaleManager.class);
150         //註冊Key/Value類型爲Text
151         job.setOutputKeyClass(Text.class);
152         job.setOutputValueClass(Text.class);
153         //若Map的轉出Key/Value不相同是須要分別註冊
154         job.setMapOutputKeyClass(PhoneKey.class);
155         job.setMapOutputValueClass(IntWritable.class);
156         //設置排序類型 SortComparator
157         job.setSortComparatorClass(PhoneSortComparator.class);
158         //設置分組類型GroupComparator
159         job.setGroupingComparatorClass(PhoneGroupComparator.class);
160         //註冊Mapper及Reducer處理類
161         job.setMapperClass(MyMapper.class);
162         job.setReducerClass(MyReducer.class);
163         //輸入輸出數據格式化類型爲TextInputFormat
164         job.setInputFormatClass(TextInputFormat.class);
165         job.setOutputFormatClass(TextOutputFormat.class);
166         //僞分佈式狀況下不設置時默認爲1
167         job.setNumReduceTasks(1);
168         //獲取命令參數
169         String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs();
170         //設置讀入文件路徑
171         FileInputFormat.setInputPaths(job,new Path(args[0]));
172         //設置轉出文件路徑
173         FileOutputFormat.setOutputPath(job,new Path(args[1]));
174         boolean status=job.waitForCompletion(true);
175         if(status)
176             return 0;
177         else
178             return 1;
179     }
180     
181     public static void main(String[] args) throws Exception{
182         Configuration conf=new Configuration();
183         ToolRunner.run(new SaleManager(), args);      
184     } 
185 }

計算結果

注意通常狀況下  job.setSortComparatorClass(Class<? extends RawComparator> cls) 與  job.setGroupComparatorClass(Class<? extends RawComparator> cls) 應該同時調用。若只設置了排序方式 SortComparator 而沒有調用  job.setGroupComparatorClass(Class<? extends RawComparator> cls) 方法,則 GroupComparator 分組方式視爲與 SortComparator 一致。
這也是 6.3 節在沒有設置 GroupComparator 的狀況下系統會按照  area 與 type 進行分組計算的緣由。

回到目錄

7、數據集鏈接處理方式介紹

在處理關係數據庫時,常常會遇到外鏈接,內鏈接等複雜數據查詢,在 Hadoop 的數據集處理上一樣會趕上相似問題。當數據源來源於不一樣數據集時,Hadoop 框架提供了2種不一樣的方法實現合併查詢。

  • Map 端鏈接查詢:當兩個數據集中有一個很是小而另外一個很是大時,咱們能夠利用 DistributeCache 作緩存處理,把較小的數據集加載到緩存,在 Map 加載較大的數據源時,從緩存中查找對應的擴展數據,一同發送到 Reduce 端。
  • Reduce 端鏈接查詢:當兩個數據集中的數據都很是大時,在 Map 端已經沒法徹底加載其中一個數據集時,咱們能夠設置不一樣的 Mapper 數據讀入類,把鏈接鍵做爲 Mapper 的輸出鍵。爲了在 Reduce 中實現鏈接,注意設置 GroupingComparator 時按須要把鏈接鍵的屬性做爲分組處理的標識,這樣就能確保兩個數據集中相同鏈接鍵的數據會被同一個 reduce 方法處理。

7.1 Map 端鏈接介紹

假設有這樣一個使用場景:在 *.gds 結尾的數據集中記錄了全部手機的編號 number、類型 type 、單價 price,因爲手機類型有限,因此數據量較小。在 *.sal 結尾的數據集中記錄了每一個客戶名稱 name 與其所購買的手機編號 number,數量 count,整體價格 total,因爲訂單數據具大,因此數據量比較龐大。此時,咱們能夠在 map 方法運行前,在 setup 方法中經過 job.addCacheFile(URI)把手機型號的數據加載到緩存當中,在讀入銷售訂單數據時,從緩存中查詢對應的手機型號,合併數據後一同發送到 Reduce 端。

*.gds 數據

*.sal 數據

  1 public class Goods {
  2     public Text number;
  3     public Text type;
  4     public IntWritable price;
  5     
  6     public Goods(String[] dataline){
  7         this.number=new Text(dataline[0]);
  8         this.type=new Text(dataline[1]);
  9         this.price=new IntWritable(Integer.valueOf(dataline[2]));
 10     }
 11 
 12     public Text getNumber() {
 13         return number;
 14     }
 15 
 16     public void setNumber(Text number) {
 17         this.number = number;
 18     }
 19 
 20     public Text getType() {
 21         return type;
 22     }
 23 
 24     public void setType(Text type) {
 25         this.type = type;
 26     }
 27 
 28     public IntWritable getPrice() {
 29         return price;
 30     }
 31 
 32     public void setPrice(IntWritable price) {
 33         this.price = price;
 34     }
 35 }
 36 
 37 public class OrderWritable implements Writable{
 38     public Text name=new Text();
 39     public Text number=new Text();
 40     public Text type=new Text();
 41     public IntWritable price=new IntWritable();
 42     public IntWritable count=new IntWritable();
 43     public IntWritable total=new IntWritable();
 44     
 45     public OrderWritable(){
 46         
 47     }
 48     
 49     public OrderWritable(String[] data,Goods goods){
 50         this.name=new Text(data[0]);
 51         this.number=new Text(data[1]);
 52         this.type=goods.getType();
 53         this.price=goods.getPrice();
 54         this.count=new IntWritable(new Integer(data[2]));
 55         this.total=new IntWritable(new Integer(data[3]));
 56     }
 57     
 58     public Text getName() {
 59         return name;
 60     }
 61     public void setName(Text name) {
 62         this.name = name;
 63     }
 64     public Text getNumber() {
 65         return number;
 66     }
 67     public void setNumber(Text number) {
 68         this.number = number;
 69     }
 70     public Text getType() {
 71         return type;
 72     }
 73     public void setType(Text type) {
 74         this.type = type;
 75     }
 76     public IntWritable getPrice() {
 77         return price;
 78     }
 79 
 80     public void setPrice(IntWritable price) {
 81         this.price = price;
 82     }
 83     public IntWritable getCount() {
 84         return count;
 85     }
 86     public void setCount(IntWritable count) {
 87         this.count = count;
 88     }
 89     public IntWritable getTotal() {
 90         return total;
 91     }
 92     public void setTotal(IntWritable total) {
 93         this.total = total;
 94     }
 95     
 96     @Override
 97     public void readFields(DataInput in) throws IOException {
 98         // TODO 自動生成的方法存根
 99         this.name.readFields(in);
100         this.number.readFields(in);
101         this.type.readFields(in);
102         this.price.readFields(in);
103         this.count.readFields(in);
104         this.total.readFields(in);
105     }
106     
107     @Override
108     public void write(DataOutput out) throws IOException {
109         // TODO 自動生成的方法存根
110         this.name.write(out);
111         this.number.write(out);
112         this.type.write(out);
113         this.price.write(out);
114         this.count.write(out);
115         this.total.write(out);
116     }
117 }
118 
119 public class MapJoinExample extends Configured implements Tool{
120     
121     public static class MyMapper extends Mapper<LongWritable,Text,Text,OrderWritable>{       
122         private Map<String,Goods> map=new HashMap<String,Goods>();
123         private Configuration conf;
124         
125         //讀取該URI下的文件,把文件中的goods放入map中存儲
126         private void read(URI uri){
127             try {
128                 FileSystem file=FileSystem.get(uri,conf);
129                 FSDataInputStream hdfsInStream = file.open(new Path(uri));
130                 InputStreamReader isr = new InputStreamReader(hdfsInStream);  
131                 BufferedReader br = new BufferedReader(isr);  
132                 String line;  
133                 while ((line = br.readLine()) != null) {  
134                     Goods goods=new Goods(line.split(","));  
135                     map.put(goods.getNumber().toString(), goods);
136                 }  
137             } catch (IOException e) {
138                 // TODO 自動生成的 catch 塊
139                 e.printStackTrace();
140             }
141         }
142         
143         public void setup(Context context){
144             //獲取配置
145             conf=context.getConfiguration();
146             //按照輸入路徑讀取文件
147             try{
148                 URI[] uris=context.getCacheFiles();
149                 if(uris[0].toString().endsWith("gds")){
150                     read(uris[0]);    
151                 }
152             }catch(Exception ex){
153                 throw new RuntimeException(ex);
154             }
155         }
156         
157         public void map(LongWritable key,Text value,Context context)
158             throws IOException,InterruptedException{
159             String[] datalist=value.toString().split(",");
160             Goods goods=map.get(datalist[1]);
161             if(goods!=null){
162                 OrderWritable order=new OrderWritable(datalist,goods);
163                 context.write(goods.getNumber(), order);
164             }
165         }      
166     }
167     
168     public static class MyReducer extends Reducer<Text,OrderWritable,Text,Text>{        
169         public void reduce(Text number,Iterable<OrderWritable> orders,Context context)
170             throws IOException,InterruptedException{
171           
172             for(OrderWritable order : orders)
173                  context.write(number,new Text(order.getName()+","+order.getType()
174                      +","+order.getPrice().toString()+","+order.getCount().toString()
175                      +","+order.getTotal().toString()));
176         }
177     }
178 
179     public int run(String[] arg0) throws Exception {
180         // TODO 自動生成的方法存根
181          // TODO Auto-generated method stub
182         Job job=Job.getInstance(getConf());
183         job.setJarByClass(MapJoinExample.class);
184         //註冊Key/Value類型爲Text
185         job.setOutputKeyClass(Text.class);
186         job.setOutputValueClass(Text.class);
187         //若Map的轉出Key/Value不相同是須要分別註冊
188         job.setMapOutputKeyClass(Text.class);
189         job.setMapOutputValueClass(OrderWritable.class);
190         //註冊Mapper及Reducer處理類
191         job.setMapperClass(MyMapper.class);
192         job.setReducerClass(MyReducer.class);
193         //輸入輸出數據格式化類型爲TextInputFormat
194         job.setInputFormatClass(TextInputFormat.class);
195         job.setOutputFormatClass(TextOutputFormat.class);
196         //獲取命令參數
197         String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs();
198         //設置讀入文件路徑
199         FileInputFormat.setInputPaths(job,new Path(args[0]));
200         //設置轉出文件路徑
201         FileOutputFormat.setOutputPath(job,new Path(args[1]));
202         //加入緩存文件
203         job.addCacheFile(new URI(args[2]));
204         boolean status=job.waitForCompletion(true);
205         if(status)
206             return 0;
207         else
208             return 1;
209     }
210     
211     public static void main(String[] args) throws Exception{
212         Configuration conf=new Configuration();
213         ToolRunner.run(new MapJoinExample(), args);      
214     } 
215 }

執行命令 hadoop jar 【Jar名稱】 【Main類全名稱】【InputPath】 【OutputPath】 【*.gds 數據的URI】
當中最後一個參數正是 *.gds 數據集的 HDFS 路徑,可見執行結果以下:

 

7.2 Reduce 端鏈接介紹

假設有如下的一個應用場景,在 Hadoop 數據集中存儲了商品的訂單數據 *.odr ,當中包含了訂單號 orderCode,商品 goods,單價 price,數量 count,整體價格 total。在另外一個數據集中存儲了商品的送貨信息 *.snd,當中包含中訂單號 orderCode,商品 goods,送貨地址 address,收貨人 name,電話號碼 tel。因爲數據集的數據是一對一關係,因此數據量都很是具大,沒法在 Mapper 端實現緩存擴展的方式。此時,能夠經過系統提供的 MultipleInputs 類實現多個 Mapper 數據輸入,不一樣的數據格式由不一樣的 Mapper 類進行處理。經過設置 GroupingComparator 按須要把鏈接鍵的屬性做爲分組處理的標識,最後在 Reduce 端把具備相同特性的數據進行合併處理。

*.odr 訂單數據

*.snd 送貨數據

咱們把訂單號 orderCode 與商品 goods 做爲了鍵的兩個屬性,目的在排序時先按商品類型再按訂單號進行排序,而 type 值是用於區分此鍵的數據是來源於訂單數據集仍是送貨單數據集。在設置 GroupComparator 時,咱們把商品 goods 做爲標識,把同一類商品的訂單交付到同一個reduce方法中進行處理。最後經過 MultipleInputs.addInputPath (Job job, Path path, Class<? extends InputFormat> inputFormatClass, Class<? extends Mapper> mapperClass) 方法綁定不一樣數據集的 Mapper 處理方式。 

  1 public class DispatchKey implements WritableComparable<DispatchKey> {
  2     
  3     public static final IntWritable TYPE_ORDER=new IntWritable(0);
  4     public static final IntWritable TYPE_SEND=new IntWritable(1);
  5     ////數據類型,當前數據類型爲訂單時爲0,當前數據類型爲送貨單時爲1
  6     public IntWritable type=new IntWritable();    
  7     public Text orderCode=new Text();
  8     public Text goods=new Text();
  9  
 10     @Override
 11     public void readFields(DataInput in) throws IOException {
 12         // TODO 自動生成的方法存根
 13         this.type.readFields(in);
 14         this.orderCode.readFields(in);
 15         this.goods.readFields(in);
 16     }
 17 
 18     @Override
 19     public void write(DataOutput out) throws IOException {
 20         // TODO 自動生成的方法存根
 21         this.type.write(out);
 22         this.orderCode.write(out);
 23         this.goods.write(out);
 24     }
 25 
 26     @Override
 27     public int compareTo(DispatchKey key) {
 28         // 先按商品類型,再按訂單號進行排序
 29         if(this.goods.equals(key.goods)){
 30             if(this.orderCode.equals(key.orderCode))
 31                 return this.type.compareTo(key.type);
 32             else
 33                 return this.orderCode.compareTo(key.orderCode);
 34         }
 35         else
 36             return this.goods.compareTo(key.goods);
 37     }
 38     
 39     @Override
 40     public boolean equals(Object o){
 41         if(!(o instanceof DispatchKey)){
 42             return false;
 43         }
 44         
 45         DispatchKey key=(DispatchKey) o;
 46         if(this.orderCode.equals(key.orderCode)&&this.goods.equals(key.orderCode)&&
 47                 (this.type.get()==key.type.get()))
 48             return true;
 49         return false;
 50     }
 51     
 52     @Override
 53     public int hashCode(){
 54         return (this.orderCode.toString()+this.goods.toString()
 55                  +this.type.toString()).hashCode();
 56     }
 57 
 58 }
 59 
 60 public class DispatchValue implements Writable {
 61     public Text order=new Text();
 62     public IntWritable price=new IntWritable();
 63     public IntWritable count=new IntWritable();
 64     public IntWritable total=new IntWritable();
 65     public Text address=new Text();
 66     public Text name=new Text();
 67     public Text tel=new Text();
 68     
 69     @Override
 70     public void readFields(DataInput in) throws IOException {
 71         // TODO 自動生成的方法存根
 72         this.order.readFields(in);
 73         this.price.readFields(in);
 74         this.count.readFields(in);
 75         this.total.readFields(in);
 76         this.address.readFields(in);
 77         this.name.readFields(in);
 78         this.tel.readFields(in);
 79     }
 80 
 81     @Override
 82     public void write(DataOutput out) throws IOException {
 83         // TODO 自動生成的方法存根
 84         this.order.write(out);
 85         this.price.write(out);
 86         this.count.write(out);
 87         this.total.write(out);
 88         this.address.write(out);
 89         this.name.write(out);
 90         this.tel.write(out);
 91     }
 92 
 93 }
 94 
 95 public class DispatchSortComparator extends WritableComparator{
 96 
 97     public DispatchSortComparator(){
 98         super(DispatchKey.class,true);
 99     }
100 }
101 
102 public class DispatchGroupComparator extends WritableComparator{
103 
104     public DispatchGroupComparator(){
105         super(DispatchKey.class,true);
106     }
107     
108     @Override
109     public int compare(WritableComparable a,WritableComparable b){
110         DispatchKey key1=(DispatchKey) a;
111         DispatchKey key2=(DispatchKey) b;
112         return key1.goods.compareTo(key2.goods);
113     }
114 }
115 
116 public class MapJoinExample extends Configured implements Tool{
117     
118     public static class OrderMapper extends Mapper<LongWritable,Text,DispatchKey,DispatchValue>{       
119         
120         public void map(LongWritable longwritable,Text text,Context context)
121             throws IOException,InterruptedException{
122             String[] data=text.toString().split(",");
123             DispatchKey key=new DispatchKey();
124             //設置KEY的類型爲訂單
125             key.type=DispatchKey.TYPE_ORDER;
126             key.orderCode=new Text(data[0]);
127             key.goods=new Text(data[1]);
128             DispatchValue value=new DispatchValue();
129             value.order=new Text(data[0]);
130             value.price=new IntWritable(new Integer(data[2]));
131             value.count=new IntWritable(new Integer(data[3]));
132             value.total=new IntWritable(new Integer(data[4]));
133             context.write(key, value);
134         }      
135     }
136     
137     public static class SendMapper extends Mapper<LongWritable,Text,DispatchKey,DispatchValue>{       
138         
139         public void map(LongWritable longwritable,Text text,Context context)
140             throws IOException,InterruptedException{
141             String[] data=text.toString().split(",");
142             DispatchKey key=new DispatchKey();
143             //設置KEY類型爲送貨單
144             key.type=DispatchKey.TYPE_SEND;
145             key.orderCode=new Text(data[0]);
146             key.goods=new Text(data[1]);
147             DispatchValue value=new DispatchValue();
148             value.order=new Text(data[0]);
149             value.address=new Text(data[2]);
150             value.name=new Text(data[3]);
151             value.tel=new Text(data[4]);
152             context.write(key, value);
153         }      
154     }
155     
156     public static class MyReducer extends Reducer<DispatchKey,DispatchValue,Text,Text>{        
157         public void reduce(DispatchKey key,Iterable<DispatchValue> values,Context context)
158             throws IOException,InterruptedException{
159             String mes="unknow";
160             String orderCode="unknow";
161             
162             for(DispatchValue value : values){
163                 //當數據爲訂單數據時進行記錄
164                 if(key.type.equals(DispatchKey.TYPE_ORDER)){
165                     orderCode=key.orderCode.toString();
166                     mes=key.goods.toString()+","+value.price.toString()+","
167                          +value.count.toString()+","+value.total.toString()+",";
168                 }//當數據爲送貨數據且訂單號相等時追加記錄
169                 else if(key.type.equals(DispatchKey.TYPE_SEND)
170                         &&key.orderCode.toString().equals(orderCode)){
171                     mes+=value.name.toString()+","+value.address.toString()+","+value.tel.toString();
172                     context.write(key.orderCode, new Text(mes));
173                     //清空記錄
174                     orderCode="unknow";
175                     mes="unkonw";
176                 }
177             }
178         }
179     }
180 
181     public int run(String[] arg0) throws Exception {
182         // TODO 自動生成的方法存根
183          // TODO Auto-generated method stub
184         Job job=Job.getInstance(getConf());
185         job.setJarByClass(MapJoinExample.class);
186         //註冊Key/Value類型爲Text
187         job.setOutputKeyClass(Text.class);
188         job.setOutputValueClass(Text.class);
189         //若Map的轉出Key/Value不相同是須要分別註冊
190         job.setMapOutputKeyClass(DispatchKey.class);
191         job.setMapOutputValueClass(DispatchValue.class);
192         //設置排序類型 SortComparator
193         job.setSortComparatorClass(DispatchSortComparator.class);
194         //設置分組類型
195         job.setGroupingComparatorClass(DispatchGroupComparator.class);
196         //註冊Mapper及Reducer處理類
197         //job.setMapperClass(OrderMapper.class);
198         job.setReducerClass(MyReducer.class);
199         //輸入輸出數據格式化類型爲TextInputFormat
200         job.setInputFormatClass(TextInputFormat.class);
201         job.setOutputFormatClass(TextOutputFormat.class);
202         //獲取命令參數
203         String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs();
204         //設置Order數據處理Mapper
205         MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class,OrderMapper.class);
206         //設置Send數據處理Mapper
207         MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class,SendMapper.class);
208         //設置輸出路徑
209         FileOutputFormat.setOutputPath(job, new Path(args[2]));
210         
211         boolean status=job.waitForCompletion(true);
212         if(status)
213             return 0;
214         else
215             return 1;
216     }
217     
218     public static void main(String[] args) throws Exception{
219         Configuration conf=new Configuration();
220         ToolRunner.run(new MapJoinExample(), args);      
221     } 
222 }

執行命令 hadoop jar 【Jar名稱】 【Main類全名稱】【OrderMapperInputPath】 【SendMapperInputPath】【OutputPath】 能夠到如下計算結果。

注意系統是經過 String[] args=new GenericOptionsParser(getConf(),arg0).getRemainingArgs() 來獲取輸入參數的,因此執行時注意參數的輸入順序與代碼獲取參數時保持一致。

 

回到目錄

 

 

本章小結

本章主要介紹了 MapReduce 的開發原理及應用場景,講解如何利用 Combine、Partitioner、WritableComparable、WritableComparator 等組件對數據進行排序篩選聚合分組的功能。對多數據集的鏈接查詢進行分析,介紹如何經過 Map 端與 Reduce 端對多數據集鏈接進行處理。
後面的文章將會進一步對 Apache Hive 的應用,HBase 的集成進行講解,敬請期待。
但願本篇文章能對各位的學習研究有所幫助,因爲時間比較倉促,當中有所錯漏的地方歡迎點評。

對 JAVA 開發有興趣的朋友歡迎加入QQ羣:174850571 共同探討!
對 .NET  開發有興趣的朋友歡迎加入QQ羣:230564952 共同探討 !

 

 

Hadoop 綜合揭祕

HBase 的原理與應用

MapReduce 基礎編程(介紹 Combine、Partitioner、WritableComparable、WritableComparator 使用方式)

 

做者:風塵浪子

http://www.javashuo.com/article/p-ypiyexto-da.html

原創做品,轉載時請註明做者及出處

相關文章
相關標籤/搜索