Hadoop入門進階課程6--MapReduce應用案例

本文版權歸做者和博客園共有,歡迎轉載,但未經做者贊成必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接,博主爲石山園,博客地址爲 http://www.cnblogs.com/shishanyuan  。該系列課程是應邀實驗樓整理編寫的,這裏須要贊一下實驗樓提供了學習的新方式,能夠邊看博客邊上機實驗,課程地址爲 https://www.shiyanlou.com/courses/237java

【注】該系列所使用到安裝包、測試數據和代碼都可在百度網盤下載,具體地址爲 http://pan.baidu.com/s/10PnDs,下載該PDF文件linux

1環境說明

部署節點操做系統爲CentOS,防火牆和SElinux禁用,建立了一個shiyanlou用戶並在系統根目錄下建立/app目錄,用於存放Hadoop等組件運行包。由於該目錄用於安裝hadoop等組件程序,用戶對shiyanlou必須賦予rwx權限(通常作法是root用戶在根目錄下建立/app目錄,並修改該目錄擁有者爲shiyanlou(chown R shiyanlou:shiyanlou /app)。算法

Hadoop搭建環境:apache

l  虛擬機操做系統: CentOS6.6  64位,單核,1G內存緩存

l  JDK1.7.0_55 64網絡

l  Hadoop1.1.2app

2準備測試數據

測試數據包括兩個文件dept(部門)和emp(員工),其中各字段用逗號分隔:ide

dept文件內容:函數

10,ACCOUNTING,NEW YORKoop

20,RESEARCH,DALLAS

30,SALES,CHICAGO

40,OPERATIONS,BOSTON

 

emp文件內容:

7369,SMITH,CLERK,7902,17-12-80,800,,20

7499,ALLEN,SALESMAN,7698,20-2-81,1600,300,30

7521,WARD,SALESMAN,7698,22-2-81,1250,500,30

7566,JONES,MANAGER,7839,02-4-81,2975,,20

7654,MARTIN,SALESMAN,7698,28-9-81,1250,1400,30

7698,BLAKE,MANAGER,7839,01-5-81,2850,,30

7782,CLARK,MANAGER,7839,09-6-81,2450,,10

7839,KING,PRESIDENT,,17-11-81,5000,,10

7844,TURNER,SALESMAN,7698,08-9-81,1500,0,30

7900,JAMES,CLERK,7698,03-12-81,950,,30

7902,FORD,ANALYST,7566,03-12-81,3000,,20

7934,MILLER,CLERK,7782,23-1-82,1300,,10

 

/home/shiyanlou/install-pack/class6目錄能夠找到這兩個文件,把這兩個文件上傳到HDFS/class6/input目錄中,執行以下命令:

cd /home/shiyanlou/install-pack/class6

hadoop fs -mkdir -p /class6/input

hadoop fs -copyFromLocal dept /class6/input

hadoop fs -copyFromLocal emp /class6/input

hadoop fs -ls /class6/input

clip_image002

3應用案例

3.1 測試例子1:求各個部門的總工資

3.1.1 問題分析

MapReduce中的join分爲好幾種,好比有最多見的 reduce side joinmap side joinsemi join 等。reduce join shuffle階段要進行大量的數據傳輸,會形成大量的網絡IO效率低下,而map side join 在處理多個小表關聯大表時很是有用 。

Map side join是針對如下場景進行的優化:兩個待鏈接表中,有一個表很是大,而另外一個表很是小,以致於小表能夠直接存放到內存中。這樣咱們能夠將小表複製多份,讓每一個map task內存中存在一份(好比存放到hash table中),而後只掃描大表:對於大表中的每一條記錄key/value,在hash table中查找是否有相同的key的記錄,若是有,則鏈接後輸出便可。爲了支持文件的複製,Hadoop提供了一個類DistributedCache,使用該類的方法以下:

1)用戶使用靜態方法DistributedCache.addCacheFile()指定要複製的文件,它的參數是文件的URI(若是是HDFS上的文件,能夠這樣:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在做業啓動以前會獲取這個URI列表,並將相應的文件拷貝到各個TaskTracker的本地磁盤上。

2)用戶使用DistributedCache.getLocalCacheFiles()方法獲取文件目錄,並使用標準的文件讀寫API讀取相應的文件。

在下面代碼中,將會把數據量小的表(部門dept)緩存在內存中,在Mapper階段對員工部門編號映射成部門名稱,該名稱做爲key輸出到Reduce中,在Reduce中計算按照部門計算各個部門的總工資。

3.1.2 處理流程圖

clip_image004

3.1.3 測試代碼

Q1SumDeptSalary.java代碼(vi編輯代碼是不能存在中文):

 

  1 import java.io.BufferedReader;
  2 import java.io.FileReader;
  3 import java.io.IOException;
  4 import java.util.HashMap;
  5 import java.util.Map;
  6 
  7 import org.apache.hadoop.conf.Configuration;
  8 import org.apache.hadoop.conf.Configured;
  9 import org.apache.hadoop.filecache.DistributedCache;
 10 import org.apache.hadoop.fs.Path;
 11 import org.apache.hadoop.io.LongWritable;
 12 import org.apache.hadoop.io.Text;
 13 import org.apache.hadoop.mapreduce.Job;
 14 import org.apache.hadoop.mapreduce.Mapper;
 15 import org.apache.hadoop.mapreduce.Reducer;
 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 20 import org.apache.hadoop.util.GenericOptionsParser;
 21 import org.apache.hadoop.util.Tool;
 22 import org.apache.hadoop.util.ToolRunner;
 23 
 24 public class Q1SumDeptSalary extends Configured implements Tool {
 25 
 26     public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
 27 
 28         // 用於緩存 dept文件中的數據
 29         private Map<String, String> deptMap = new HashMap<String, String>();
 30         private String[] kv;
 31 
 32         // 此方法會在Map方法執行以前執行且執行一次
 33         @Override
 34         protected void setup(Context context) throws IOException, InterruptedException {
 35             BufferedReader in = null;
 36             try {
 37 
 38                 // 從當前做業中獲取要緩存的文件
 39                 Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
 40                 String deptIdName = null;
 41                 for (Path path : paths) {
 42 
 43                     // 對部門文件字段進行拆分並緩存到deptMap中
 44                     if (path.toString().contains("dept")) {
 45                         in = new BufferedReader(new FileReader(path.toString()));
 46                         while (null != (deptIdName = in.readLine())) {
 47                             
 48                             // 對部門文件字段進行拆分並緩存到deptMap中
 49                             // 其中Map中key爲部門編號,value爲所在部門名稱
 50                             deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
 51                         }
 52                     }
 53                 }
 54             } catch (IOException e) {
 55                 e.printStackTrace();
 56             } finally {
 57                 try {
 58                     if (in != null) {
 59                         in.close();
 60                     }
 61                 } catch (IOException e) {
 62                     e.printStackTrace();
 63                 }
 64             }
 65         }
 66 
 67 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 68 
 69             // 對員工文件字段進行拆分
 70             kv = value.toString().split(",");
 71 
 72             // map join: 在map階段過濾掉不須要的數據,輸出key爲部門名稱和value爲員工工資
 73             if (deptMap.containsKey(kv[7])) {
 74                 if (null != kv[5] && !"".equals(kv[5].toString())) {
 75                     context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));
 76                 }
 77             }
 78         }
 79     }
 80 
 81     public static class Reduce extends Reducer<Text, Text, Text, LongWritable> {
 82 
 83 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 84 
 85             // 對同一部門的員工工資進行求和
 86             long sumSalary = 0;
 87             for (Text val : values) {
 88                 sumSalary += Long.parseLong(val.toString());
 89             }
 90 
 91             // 輸出key爲部門名稱和value爲該部門員工工資總和
 92             context.write(key, new LongWritable(sumSalary));
 93         }
 94     }
 95 
 96     @Override
 97     public int run(String[] args) throws Exception {
 98 
 99         // 實例化做業對象,設置做業名稱、Mapper和Reduce類
100         Job job = new Job(getConf(), "Q1SumDeptSalary");
101         job.setJobName("Q1SumDeptSalary");
102         job.setJarByClass(Q1SumDeptSalary.class);
103         job.setMapperClass(MapClass.class);
104         job.setReducerClass(Reduce.class);
105 
106         // 設置輸入格式類
107         job.setInputFormatClass(TextInputFormat.class);
108 
109         // 設置輸出格式
110         job.setOutputFormatClass(TextOutputFormat.class);
111         job.setOutputKeyClass(Text.class);
112         job.setOutputValueClass(Text.class);
113 
114         // 第1個參數爲緩存的部門數據路徑、第2個參數爲員工數據路徑和第3個參數爲輸出路徑
115     String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
116     DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
117         FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
118         FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
119 
120         job.waitForCompletion(true);
121         return job.isSuccessful() ? 0 : 1;
122     }
123 
124     /**
125      * 主方法,執行入口
126      * @param args 輸入參數
127      */
128     public static void main(String[] args) throws Exception {
129         int res = ToolRunner.run(new Configuration(), new Q1SumDeptSalary(), args);
130         System.exit(res);
131     }
132 }

 

3.1.4 編譯並打包代碼

進入/app/hadoop-1.1.2/myclass/class6目錄中新建Q1SumDeptSalary.java程序代碼(代碼頁可使用/home/shiyanlou/install-pack/class6/Q1SumDeptSalary.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q1SumDeptSalary.java

編譯代碼

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q1SumDeptSalary.java

把編譯好的代碼打成jar包(若是不打成jar形式運行會提示class沒法找到的錯誤)

jar cvf ./Q1SumDeptSalary.jar ./Q1SumDept*.class

mv *.jar ../..

rm Q1SumDept*.class

clip_image006

3.1.5 運行並查看結果

運行Q1SumDeptSalary時須要輸入部門數據路徑、員工數據路徑和輸出路徑三個參數,須要注意的是hdfs的路徑參數路徑須要全路徑,不然運行會報錯:

l  部門數據路徑:hdfs://hadoop:9000/class6/input/dept,部門數據將緩存在各運行任務的節點內容中,能夠提供處理的效率

l  員工數據路徑:hdfs://hadoop:9000/class6/input/emp

l  輸出路徑:hdfs://hadoop:9000/class6/out1

 

運行以下命令:

cd /app/hadoop-1.1.2

hadoop jar Q1SumDeptSalary.jar Q1SumDeptSalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out1

clip_image008

運行成功後,刷新CentOS HDFS中的輸出路徑/class6/out1目錄,打開part-r-00000文件

hadoop fs -ls /class6/out1

hadoop fs -cat /class6/out1/part-r-00000

能夠看到運行結果:

ACCOUNTING8750

RESEARCH6775

SALES  9400

clip_image010

3.2 測試例子2:求各個部門的人數和平均工資

3.2.1 問題分析

求各個部門的人數和平均工資,須要獲得各部門工資總數和部門人數,經過二者相除獲取各部門平均工資。首先和問題1相似在MapperSetup階段緩存部門數據,而後在Mapper階段抽取出部門編號和員工工資,利用緩存部門數據把部門編號對應爲部門名稱,接着在Shuffle階段把傳過來的數據處理爲部門名稱對應該部門全部員工工資的列表,最後在Reduce中按照部門歸組,遍歷部門全部員工,求出總數和員工數,輸出部門名稱和平均工資。

3.2.2 處理流程圖

clip_image012

3.2.3 編寫代碼

Q2DeptNumberAveSalary.java代碼:

 

  1 import java.io.BufferedReader;
  2 import java.io.FileReader;
  3 import java.io.IOException;
  4 import java.util.HashMap;
  5 import java.util.Map;
  6 
  7 import org.apache.hadoop.conf.Configuration;
  8 import org.apache.hadoop.conf.Configured;
  9 import org.apache.hadoop.filecache.DistributedCache;
 10 import org.apache.hadoop.fs.Path;
 11 import org.apache.hadoop.io.LongWritable;
 12 import org.apache.hadoop.io.Text;
 13 import org.apache.hadoop.mapreduce.Job;
 14 import org.apache.hadoop.mapreduce.Mapper;
 15 import org.apache.hadoop.mapreduce.Reducer;
 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 20 import org.apache.hadoop.util.GenericOptionsParser;
 21 import org.apache.hadoop.util.Tool;
 22 import org.apache.hadoop.util.ToolRunner;
 23 
 24 public class Q2DeptNumberAveSalary extends Configured implements Tool {
 25 
 26     public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
 27 
 28         // 用於緩存 dept文件中的數據
 29         private Map<String, String> deptMap = new HashMap<String, String>();
 30         private String[] kv;
 31 
 32         // 此方法會在Map方法執行以前執行且執行一次
 33         @Override
 34         protected void setup(Context context) throws IOException, InterruptedException {
 35             BufferedReader in = null;
 36             try {
 37                 // 從當前做業中獲取要緩存的文件
 38                 Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
 39                 String deptIdName = null;
 40                 for (Path path : paths) {
 41 
 42                     // 對部門文件字段進行拆分並緩存到deptMap中
 43                     if (path.toString().contains("dept")) {
 44                         in = new BufferedReader(new FileReader(path.toString()));
 45                         while (null != (deptIdName = in.readLine())) {
 46                             
 47                             // 對部門文件字段進行拆分並緩存到deptMap中
 48                             // 其中Map中key爲部門編號,value爲所在部門名稱
 49                             deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
 50                         }
 51                     }
 52                 }
 53             } catch (IOException e) {
 54                 e.printStackTrace();
 55             } finally {
 56                 try {
 57                     if (in != null) {
 58                         in.close();
 59                     }
 60                 } catch (IOException e) {
 61                     e.printStackTrace();
 62                 }
 63             }
 64         }
 65 
 66     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 67 
 68             // 對員工文件字段進行拆分
 69             kv = value.toString().split(",");
 70 
 71             // map join: 在map階段過濾掉不須要的數據,輸出key爲部門名稱和value爲員工工資
 72             if (deptMap.containsKey(kv[7])) {
 73                 if (null != kv[5] && !"".equals(kv[5].toString())) {
 74                     context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));
 75                 }
 76             }
 77         }
 78     }
 79 
 80     public static class Reduce extends Reducer<Text, Text, Text, Text> {
 81 
 82     public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 83 
 84             long sumSalary = 0;
 85             int deptNumber = 0;
 86 
 87             // 對同一部門的員工工資進行求和
 88             for (Text val : values) {
 89                 sumSalary += Long.parseLong(val.toString());
 90                 deptNumber++;
 91             }
 92 
 93             // 輸出key爲部門名稱和value爲該部門員工工資平均值
 94     context.write(key, new Text("Dept Number:" + deptNumber + ", Ave Salary:" + sumSalary / deptNumber));
 95         }
 96     }
 97 
 98     @Override
 99     public int run(String[] args) throws Exception {
100 
101         // 實例化做業對象,設置做業名稱、Mapper和Reduce類
102         Job job = new Job(getConf(), "Q2DeptNumberAveSalary");
103         job.setJobName("Q2DeptNumberAveSalary");
104         job.setJarByClass(Q2DeptNumberAveSalary.class);
105         job.setMapperClass(MapClass.class);
106         job.setReducerClass(Reduce.class);
107 
108         // 設置輸入格式類
109         job.setInputFormatClass(TextInputFormat.class);
110 
111         // 設置輸出格式類
112         job.setOutputFormatClass(TextOutputFormat.class);
113         job.setOutputKeyClass(Text.class);
114         job.setOutputValueClass(Text.class);
115 
116         // 第1個參數爲緩存的部門數據路徑、第2個參數爲員工數據路徑和第3個參數爲輸出路徑
117     String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
118         DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
119         FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
120         FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
121 
122         job.waitForCompletion(true);
123         return job.isSuccessful() ? 0 : 1;
124     }
125 
126     /**
127      * 主方法,執行入口
128      * @param args 輸入參數
129      */
130     public static void main(String[] args) throws Exception {
131         int res = ToolRunner.run(new Configuration(), new Q2DeptNumberAveSalary(), args);
132         System.exit(res);
133     }
134 }

 

3.2.4 編譯並打包代碼

進入/app/hadoop-1.1.2/myclass/class6目錄中新建Q2DeptNumberAveSalary.java程序代碼(代碼頁可使用/home/shiyanlou/install-pack/class6/Q2DeptNumberAveSalary.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q2DeptNumberAveSalary.java

編譯代碼

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q2DeptNumberAveSalary.java

把編譯好的代碼打成jar包,若是不打成jar形式運行會提示class沒法找到的錯誤

jar cvf ./Q2DeptNumberAveSalary.jar ./Q2DeptNum*.class

mv *.jar ../..

rm Q2DeptNum*.class

clip_image014

3.2.5 運行並查看結果

運行Q2DeptNumberAveSalary時須要輸入部門數據路徑、員工數據路徑和輸出路徑三個參數,須要注意的是hdfs的路徑參數路徑須要全路徑,不然運行會報錯:

l  部門數據路徑:hdfs://hadoop:9000/class6/input/dept,部門數據將緩存在各運行任務的節點內容中,能夠提供處理的效率

l  員工數據路徑:hdfs://hadoop:9000/class6/input/emp

l  輸出路徑:hdfs://hadoop:9000/class6/out2

 

運行以下命令:

cd /app/hadoop-1.1.2

hadoop jar Q2DeptNumberAveSalary.jar Q2DeptNumberAveSalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out2

clip_image016

運行成功後,刷新CentOS HDFS中的輸出路徑/class6/out2目錄

hadoop fs -ls /class6/out2

hadoop fs -cat /class6/out2/part-r-00000

打開part-r-00000文件,能夠看到運行結果:

ACCOUNTINGDept Number:3,Ave Salary:2916

RESEARCHDept Number:3,Ave Salary:2258

SALES  Dept Number:6,Ave Salary:1566

clip_image018

3.3 測試例子3:求每一個部門最先進入公司的員工姓名

3.3.1 問題分析

求每一個部門最先進入公司員工姓名,須要獲得各部門全部員工的進入公司日期,經過比較獲取最先進入公司員工姓名。首先和問題1相似在MapperSetup階段緩存部門數據,而後Mapper階段抽取出key爲部門名稱(利用緩存部門數據把部門編號對應爲部門名稱),value爲員工姓名和進入公司日期,接着在Shuffle階段把傳過來的數據處理爲部門名稱對應該部門全部員工+進入公司日期的列表,最後在Reduce中按照部門歸組,遍歷部門全部員工,找出最先進入公司的員工並輸出。

3.3.2 處理流程圖

clip_image020

3.3.3 編寫代碼

 

  1 import java.io.BufferedReader;
  2 import java.io.FileReader;
  3 import java.io.IOException;
  4 import java.text.DateFormat;
  5 import java.text.ParseException;
  6 import java.text.SimpleDateFormat;
  7 import java.util.Date;
  8 import java.util.HashMap;
  9 import java.util.Map;
 10 
 11 import org.apache.hadoop.conf.Configuration;
 12 import org.apache.hadoop.conf.Configured;
 13 import org.apache.hadoop.filecache.DistributedCache;
 14 import org.apache.hadoop.fs.Path;
 15 import org.apache.hadoop.io.LongWritable;
 16 import org.apache.hadoop.io.Text;
 17 import org.apache.hadoop.mapreduce.Job;
 18 import org.apache.hadoop.mapreduce.Mapper;
 19 import org.apache.hadoop.mapreduce.Reducer;
 20 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 21 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 22 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 23 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 24 import org.apache.hadoop.util.GenericOptionsParser;
 25 import org.apache.hadoop.util.Tool;
 26 import org.apache.hadoop.util.ToolRunner;
 27 
 28 public class Q3DeptEarliestEmp extends Configured implements Tool {
 29 
 30     public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
 31 
 32         // 用於緩存 dept文件中的數據
 33         private Map<String, String> deptMap = new HashMap<String, String>();
 34         private String[] kv;
 35 
 36         // 此方法會在Map方法執行以前執行且執行一次
 37         @Override
 38         protected void setup(Context context) throws IOException, InterruptedException {
 39             BufferedReader in = null;
 40             try {
 41                 // 從當前做業中獲取要緩存的文件
 42                 Path[] paths =     DistributedCache.getLocalCacheFiles(context.getConfiguration());
 43                 String deptIdName = null;
 44                 for (Path path : paths) {
 45                     if (path.toString().contains("dept")) {
 46                         in = new BufferedReader(new FileReader(path.toString()));
 47                         while (null != (deptIdName = in.readLine())) {
 48 
 49                             // 對部門文件字段進行拆分並緩存到deptMap中
 50                             // 其中Map中key爲部門編號,value爲所在部門名稱
 51                             deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
 52                         }
 53                     }
 54                 }
 55             } catch (IOException e) {
 56                 e.printStackTrace();
 57             } finally {
 58                 try {
 59                     if (in != null) {
 60                         in.close();
 61                     }
 62                 } catch (IOException e) {
 63                     e.printStackTrace();
 64                 }
 65             }
 66         }
 67 
 68         public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException {
 69 
 70             // 對員工文件字段進行拆分
 71             kv = value.toString().split(",");
 72 
 73             // map join: 在map階段過濾掉不須要的數據
 74             // 輸出key爲部門名稱和value爲員工姓名+","+員工進入公司日期
 75             if (deptMap.containsKey(kv[7])) {
 76                 if (null != kv[4] && !"".equals(kv[4].toString())) {
 77                     context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[1].trim()                     + "," + kv[4].trim()));
 78                 }
 79             }
 80         }
 81     }
 82 
 83     public static class Reduce extends Reducer<Text, Text, Text, Text> {
 84 
 85         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,         InterruptedException {
 86 
 87             // 員工姓名和進入公司日期
 88             String empName = null;
 89             String empEnterDate = null;
 90 
 91             // 設置日期轉換格式和最先進入公司的員工、日期
 92             DateFormat df = new SimpleDateFormat("dd-MM月-yy");
 93 
 94             Date earliestDate = new Date();
 95             String earliestEmp = null;
 96 
 97             // 遍歷該部門下全部員工,獲得最先進入公司的員工信息
 98             for (Text val : values) {
 99                 empName = val.toString().split(",")[0];
100                 empEnterDate = val.toString().split(",")[1].toString().trim();
101                 try {
102                     System.out.println(df.parse(empEnterDate));
103                     if (df.parse(empEnterDate).compareTo(earliestDate) < 0) {
104                         earliestDate = df.parse(empEnterDate);
105                         earliestEmp = empName;
106                     }
107                 } catch (ParseException e) {
108                     e.printStackTrace();
109                 }
110             }
111 
112             // 輸出key爲部門名稱和value爲該部門最先進入公司員工
113             context.write(key, new Text("The earliest emp of dept:" + earliestEmp + ", Enter             date:" + new SimpleDateFormat("yyyy-MM-dd").format(earliestDate)));
114         }
115     }
116 
117     @Override
118     public int run(String[] args) throws Exception {
119 
120         // 實例化做業對象,設置做業名稱
121         Job job = new Job(getConf(), "Q3DeptEarliestEmp");
122         job.setJobName("Q3DeptEarliestEmp");
123 
124         // 設置Mapper和Reduce類
125         job.setJarByClass(Q3DeptEarliestEmp.class);
126         job.setMapperClass(MapClass.class);
127         job.setReducerClass(Reduce.class);
128 
129         // 設置輸入格式類
130         job.setInputFormatClass(TextInputFormat.class);
131 
132         // 設置輸出格式類
133         job.setOutputFormatClass(TextOutputFormat.class);
134         job.setOutputKeyClass(Text.class);
135         job.setOutputValueClass(Text.class);
136 
137         // 第1個參數爲緩存的部門數據路徑、第2個參數爲員工數據路徑和第三個參數爲輸出路徑
138     String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
139     DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
140         FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
141         FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
142 
143         job.waitForCompletion(true);
144         return job.isSuccessful() ? 0 : 1;
145     }
146 
147     /**
148      * 主方法,執行入口
149      * @param args 輸入參數
150      */
151     public static void main(String[] args) throws Exception {
152         int res = ToolRunner.run(new Configuration(), new Q3DeptEarliestEmp(), args);
153         System.exit(res);
154     }
155 }

 

3.3.4 編譯並打包代碼

進入/app/hadoop-1.1.2/myclass/class6目錄中新建Q3DeptEarliestEmp.java程序代碼(代碼頁可使用/home/shiyanlou/install-pack/class6/Q3DeptEarliestEmp.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q3DeptEarliestEmp.java

編譯代碼

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q3DeptEarliestEmp.java

把編譯好的代碼打成jar包,若是不打成jar形式運行會提示class沒法找到的錯誤

jar cvf ./Q3DeptEarliestEmp.jar ./Q3DeptEar*.class

mv *.jar ../..

rm Q3DeptEar*.class

clip_image022

3.3.5 運行並查看結果

運行Q3DeptEarliestEmp時須要輸入部門數據路徑、員工數據路徑和輸出路徑三個參數,須要注意的是hdfs的路徑參數路徑須要全路徑,不然運行會報錯:

l  部門數據路徑:hdfs://hadoop:9000/class6/input/dept,部門數據將緩存在各運行任務的節點內容中,能夠提供處理的效率

l  員工數據路徑:hdfs://hadoop:9000/class6/input/emp

l  輸出路徑:hdfs://hadoop:9000/class6/out3

 

運行以下命令:

cd /app/hadoop-1.1.2

hadoop jar Q3DeptEarliestEmp.jar Q3DeptEarliestEmp hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out3

clip_image024

運行成功後,刷新CentOS HDFS中的輸出路徑/class6/out3目錄

hadoop fs -ls /class6/out3

hadoop fs -cat /class6/out3/part-r-00000

打開part-r-00000文件,能夠看到運行結果:

ACCOUNTINGThe earliest emp of dept:CLARK, Enter date:1981-06-09

RESEARCHThe earliest emp of dept:SMITH, Enter date:1980-12-17

SALES  The earliest emp of dept:ALLEN, Enter date:1981-02-20

clip_image026

3.4 測試例子4:求各個城市的員工的總工資

3.4.1 問題分析

求各個城市員工的總工資,須要獲得各個城市全部員工的工資,經過對各個城市全部員工工資求和獲得總工資。首先和測試例子1相似在MapperSetup階段緩存部門對應所在城市數據,而後在Mapper階段抽取出key爲城市名稱(利用緩存數據把部門編號對應爲所在城市名稱),value爲員工工資,接着在Shuffle階段把傳過來的數據處理爲城市名稱對應該城市全部員工工資,最後在Reduce中按照城市歸組,遍歷城市全部員工,求出工資總數並輸出。

3.4.2 處理流程圖

clip_image028

3.4.3 編寫代碼

 

  1 import java.io.BufferedReader;
  2 import java.io.FileReader;
  3 import java.io.IOException;
  4 import java.util.HashMap;
  5 import java.util.Map;
  6 
  7 import org.apache.hadoop.conf.Configuration;
  8 import org.apache.hadoop.conf.Configured;
  9 import org.apache.hadoop.filecache.DistributedCache;
 10 import org.apache.hadoop.fs.Path;
 11 import org.apache.hadoop.io.LongWritable;
 12 import org.apache.hadoop.io.Text;
 13 import org.apache.hadoop.mapreduce.Job;
 14 import org.apache.hadoop.mapreduce.Mapper;
 15 import org.apache.hadoop.mapreduce.Reducer;
 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 20 import org.apache.hadoop.util.GenericOptionsParser;
 21 import org.apache.hadoop.util.Tool;
 22 import org.apache.hadoop.util.ToolRunner;
 23 
 24 public class Q4SumCitySalary extends Configured implements Tool {
 25 
 26     public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
 27 
 28         // 用於緩存 dept文件中的數據
 29         private Map<String, String> deptMap = new HashMap<String, String>();
 30         private String[] kv;
 31 
 32         // 此方法會在Map方法執行以前執行且執行一次
 33         @Override
 34         protected void setup(Context context) throws IOException, InterruptedException {
 35             BufferedReader in = null;
 36             try {
 37                 // 從當前做業中獲取要緩存的文件
 38                 Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
 39                 String deptIdName = null;
 40                 for (Path path : paths) {
 41                     if (path.toString().contains("dept")) {
 42                         in = new BufferedReader(new FileReader(path.toString()));
 43                         while (null != (deptIdName = in.readLine())) {
 44 
 45                             // 對部門文件字段進行拆分並緩存到deptMap中
 46                             // 其中Map中key爲部門編號,value爲所在城市名稱
 47                             deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[2]);
 48                         }
 49                     }
 50                 }
 51             } catch (IOException e) {
 52                 e.printStackTrace();
 53             } finally {
 54                 try {
 55                     if (in != null) {
 56                         in.close();
 57                     }
 58                 } catch (IOException e) {
 59                     e.printStackTrace();
 60                 }
 61             }
 62         }
 63 
 64         public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException {
 65 
 66             // 對員工文件字段進行拆分
 67             kv = value.toString().split(",");
 68 
 69             // map join: 在map階段過濾掉不須要的數據,輸出key爲城市名稱和value爲員工工資
 70             if (deptMap.containsKey(kv[7])) {
 71                 if (null != kv[5] && !"".equals(kv[5].toString())) {
 72                     context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));
 73                 }
 74             }
 75         }
 76     }
 77 
 78     public static class Reduce extends Reducer<Text, Text, Text, LongWritable> {
 79 
 80         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,         InterruptedException {
 81 
 82             // 對同一城市的員工工資進行求和
 83             long sumSalary = 0;
 84             for (Text val : values) {
 85                 sumSalary += Long.parseLong(val.toString());
 86             }
 87 
 88             // 輸出key爲城市名稱和value爲該城市工資總和
 89             context.write(key, new LongWritable(sumSalary));
 90         }
 91     }
 92 
 93     @Override
 94     public int run(String[] args) throws Exception {
 95 
 96         // 實例化做業對象,設置做業名稱
 97         Job job = new Job(getConf(), "Q4SumCitySalary");
 98         job.setJobName("Q4SumCitySalary");
 99 
100         // 設置Mapper和Reduce類
101         job.setJarByClass(Q4SumCitySalary.class);
102         job.setMapperClass(MapClass.class);
103         job.setReducerClass(Reduce.class);
104 
105         // 設置輸入格式類
106         job.setInputFormatClass(TextInputFormat.class);
107 
108         // 設置輸出格式類
109         job.setOutputFormatClass(TextOutputFormat.class);
110         job.setOutputKeyClass(Text.class);
111         job.setOutputValueClass(Text.class);
112 
113         // 第1個參數爲緩存的部門數據路徑、第2個參數爲員工數據路徑和第3個參數爲輸出路徑
114     String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
115     DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
116         FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
117         FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
118 
119         job.waitForCompletion(true);
120         return job.isSuccessful() ? 0 : 1;
121     }
122 
123     /**
124      * 主方法,執行入口
125      * @param args 輸入參數
126      */
127     public static void main(String[] args) throws Exception {
128         int res = ToolRunner.run(new Configuration(), new Q4SumCitySalary(), args);
129         System.exit(res);
130     }
131 }

 

3.4.4 編譯並打包代碼

進入/app/hadoop-1.1.2/myclass/class6目錄中新建Q4SumCitySalary.java程序代碼(代碼頁可使用/home/shiyanlou/install-pack/class6/Q4SumCitySalary.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q4SumCitySalary.java

編譯代碼

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q4SumCitySalary.java

把編譯好的代碼打成jar包,若是不打成jar形式運行會提示class沒法找到的錯誤

jar cvf ./Q4SumCitySalary.jar ./Q4SumCity*.class

mv *.jar ../..

rm Q4SumCity*.class

clip_image030

3.4.5 運行並查看結果

運行Q4SumCitySalary時須要輸入部門數據路徑、員工數據路徑和輸出路徑三個參數,須要注意的是hdfs的路徑參數路徑須要全路徑,不然運行會報錯:

l  部門數據路徑:hdfs://hadoop:9000/class6/input/dept,部門數據將緩存在各運行任務的節點內容中,能夠提供處理的效率

l  員工數據路徑:hdfs://hadoop:9000/class6/input/emp

l  輸出路徑:hdfs://hadoop:9000/class6/out4

 

運行以下命令:

cd /app/hadoop-1.1.2

hadoop jar Q4SumCitySalary.jar Q4SumCitySalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out4

clip_image032

運行成功後,刷新CentOS HDFS中的輸出路徑/class6/out4目錄

hadoop fs -ls /class6/out4

hadoop fs -cat /class6/out4/part-r-00000

打開part-r-00000文件,能夠看到運行結果:

CHICAGO  9400

DALLAS     6775

NEW YORK     8750

clip_image034

3.5 測試例子5:列出工資比上司高的員工姓名及其工資

3.5.1 問題分析

求工資比上司高的員工姓名及工資,須要獲得上司工資及上司全部下屬員工,經過比較他們工資高低獲得比上司工資高的員工。在Mapper階段輸出經理數據和員工對應經理表數據,其中經理數據key爲員工編號、value"M,該員工工資",員工對應經理表數據key爲經理編號、value"E,該員工姓名,該員工工資";而後在Shuffle階段把傳過來的經理數據和員工對應經理表數據進行歸組,如編號爲7698員工,value中標誌M爲本身工資,value中標誌E爲其下屬姓名及工資;最後在Reduce中遍歷比較員工與經理工資高低,輸出工資高於經理的員工。

3.5.2 處理流程圖

clip_image036

3.5.3 編寫代碼

 

  1 import java.io.IOException;
  2 import java.util.HashMap;
  3 
  4 import org.apache.hadoop.conf.Configuration;
  5 import org.apache.hadoop.conf.Configured;
  6 import org.apache.hadoop.fs.Path;
  7 import org.apache.hadoop.io.LongWritable;
  8 import org.apache.hadoop.io.Text;
  9 import org.apache.hadoop.mapreduce.Job;
 10 import org.apache.hadoop.mapreduce.Mapper;
 11 import org.apache.hadoop.mapreduce.Reducer;
 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 16 import org.apache.hadoop.util.GenericOptionsParser;
 17 import org.apache.hadoop.util.Tool;
 18 import org.apache.hadoop.util.ToolRunner;
 19 
 20 public class Q5EarnMoreThanManager extends Configured implements Tool {
 21 
 22     public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
 23 
 24         public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException {
 25 
 26             // 對員工文件字段進行拆分
 27             String[] kv = value.toString().split(",");
 28 
 29             // 輸出經理表數據,其中key爲員工編號和value爲M+該員工工資
 30             context.write(new Text(kv[0].toString()), new Text("M," + kv[5]));
 31 
 32             // 輸出員工對應經理表數據,其中key爲經理編號和value爲(E,該員工姓名,該員工工資)
 33             if (null != kv[3] && !"".equals(kv[3].toString())) {
 34                 context.write(new Text(kv[3].toString()), new Text("E," + kv[1] + "," + kv[5]));
 35             }
 36         }
 37     }
 38 
 39     public static class Reduce extends Reducer<Text, Text, Text, Text> {
 40 
 41         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,         InterruptedException {
 42 
 43             // 定義員工姓名、工資和存放部門員工Map
 44             String empName;
 45             long empSalary = 0;
 46             HashMap<String, Long> empMap = new HashMap<String, Long>();
 47             
 48             // 定義經理工資變量
 49             long mgrSalary = 0;
 50 
 51             for (Text val : values) {
 52                 if (val.toString().startsWith("E")) {
 53                     // 當是員工標示時,獲取該員工對應的姓名和工資並放入Map中
 54                     empName = val.toString().split(",")[1];
 55                     empSalary = Long.parseLong(val.toString().split(",")[2]);
 56                     empMap.put(empName, empSalary);
 57                 } else {
 58                     // 當時經理標誌時,獲取該經理工資
 59                     mgrSalary = Long.parseLong(val.toString().split(",")[1]);
 60                 }
 61             }
 62 
 63             // 遍歷該經理下屬,比較員工與經理工資高低,輸出工資高於經理的員工
 64             for (java.util.Map.Entry<String, Long> entry : empMap.entrySet()) {
 65                 if (entry.getValue() > mgrSalary) {
 66                     context.write(new Text(entry.getKey()), new Text("" + entry.getValue()));
 67                 }
 68             }
 69         }
 70     }
 71 
 72     @Override
 73     public int run(String[] args) throws Exception {
 74 
 75         // 實例化做業對象,設置做業名稱
 76         Job job = new Job(getConf(), "Q5EarnMoreThanManager");
 77         job.setJobName("Q5EarnMoreThanManager");
 78 
 79         // 設置Mapper和Reduce類
 80         job.setJarByClass(Q5EarnMoreThanManager.class);
 81         job.setMapperClass(MapClass.class);
 82         job.setReducerClass(Reduce.class);
 83 
 84         // 設置輸入格式類
 85         job.setInputFormatClass(TextInputFormat.class);
 86 
 87         // 設置輸出格式類
 88         job.setOutputFormatClass(TextOutputFormat.class);
 89         job.setOutputKeyClass(Text.class);
 90         job.setOutputValueClass(Text.class);
 91 
 92         // 第1個參數爲員工數據路徑和第2個參數爲輸出路徑
 93 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
 94         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
 95         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
 96 
 97         job.waitForCompletion(true);
 98         return job.isSuccessful() ? 0 : 1;
 99     }
100 
101     /**
102      * 主方法,執行入口
103      * @param args 輸入參數
104      */
105     public static void main(String[] args) throws Exception {
106         int res = ToolRunner.run(new Configuration(), new Q5EarnMoreThanManager(), args);
107         System.exit(res);
108     }
109 }

 

3.5.4 編譯並打包代碼

進入/app/hadoop-1.1.2/myclass/class6目錄中新建Q5EarnMoreThanManager.java程序代碼(代碼頁可使用/home/shiyanlou/install-pack/class6/Q5EarnMoreThanManager.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q5EarnMoreThanManager.java

編譯代碼

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q5EarnMoreThanManager.java

把編譯好的代碼打成jar包,若是不打成jar形式運行會提示class沒法找到的錯誤

jar cvf ./Q5EarnMoreThanManager.jar ./Q5EarnMore*.class

mv *.jar ../..

rm Q5EarnMore*.class

clip_image038

3.5.5 運行並查看結果

運行Q5EarnMoreThanManager運行的員工數據路徑和輸出路徑兩個參數,須要注意的是hdfs的路徑參數路徑須要全路徑,不然運行會報錯:

l  員工數據路徑:hdfs://hadoop:9000/class6/input/emp

l  輸出路徑:hdfs://hadoop:9000/class6/out5

 

運行以下命令:

cd /app/hadoop-1.1.2

hadoop jar Q5EarnMoreThanManager.jar Q5EarnMoreThanManager hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out5

clip_image040

運行成功後,刷新CentOS HDFS中的輸出路徑/class6/out5目錄

hadoop fs -ls /class6/out5

hadoop fs -cat /class6/out5/part-r-00000

打開part-r-00000文件,能夠看到運行結果:

FORD  3000

clip_image042

3.6  測試例子6:列出工資比公司平均工資要高的員工姓名及其工資

3.6.1 問題分析

求工資比公司平均工資要高的員工姓名及工資,須要獲得公司的平均工資和全部員工工資,經過比較得出工資比平均工資高的員工姓名及工資。這個問題能夠分兩個做業進行解決,先求出公司的平均工資,而後與全部員工進行比較獲得結果;也能夠在一個做業進行解決,這裏就得使用做業setNumReduceTasks方法,設置Reduce任務數爲1,保證每次運行一個reduce任務,從而能先求出平均工資,而後進行比較得出結果。

Mapper階段輸出兩份全部員工數據,其中一份key0value爲該員工工資,另一份key0value"該員工姓名 ,員工工資";而後在Shuffle階段把傳過來數據按照key進行歸組,在該任務中有key值爲01兩組數據;最後在Reduce中對key0的全部員工求工資總數和員工數,得到平均工資;對key1,比較員工與平均工資的大小,輸出比平均工資高的員工和對應的工資。

3.6.2 處理流程圖

clip_image044

3.6.3 編寫代碼

 

  1 import java.io.IOException;
  2 import org.apache.hadoop.conf.Configuration;
  3 import org.apache.hadoop.conf.Configured;
  4 import org.apache.hadoop.fs.Path;
  5 import org.apache.hadoop.io.IntWritable;
  6 import org.apache.hadoop.io.LongWritable;
  7 import org.apache.hadoop.io.Text;
  8 import org.apache.hadoop.mapreduce.Job;
  9 import org.apache.hadoop.mapreduce.Mapper;
 10 import org.apache.hadoop.mapreduce.Reducer;
 11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 13 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 14 import org.apache.hadoop.util.GenericOptionsParser;
 15 import org.apache.hadoop.util.Tool;
 16 import org.apache.hadoop.util.ToolRunner;
 17 
 18 public class Q6HigherThanAveSalary extends Configured implements Tool {
 19 
 20     public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {
 21 
 22         public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException {
 23 
 24             // 對員工文件字段進行拆分
 25             String[] kv = value.toString().split(",");
 26 
 27             // 獲取全部員工數據,其中key爲0和value爲該員工工資
 28             context.write(new IntWritable(0), new Text(kv[5]));
 29 
 30             // 獲取全部員工數據,其中key爲0和value爲(該員工姓名 ,員工工資)
 31             context.write(new IntWritable(1), new Text(kv[1] + "," + kv[5]));
 32         }
 33     }
 34 
 35     public static class Reduce extends Reducer<IntWritable, Text, Text, Text> {
 36 
 37         // 定義員工工資、員工數和平均工資
 38         private long allSalary = 0;
 39         private int allEmpCount = 0;
 40         private long aveSalary = 0;
 41         
 42         // 定義員工工資變量
 43         private long empSalary = 0;
 44 
 45         public void reduce(IntWritable key, Iterable<Text> values, Context context) throws         IOException, InterruptedException {
 46 
 47             for (Text val : values) {
 48                 if (0 == key.get()) {
 49                     // 獲取全部員工工資和員工數
 50                     allSalary += Long.parseLong(val.toString());
 51                     allEmpCount++;
 52                     System.out.println("allEmpCount = " + allEmpCount);
 53                 } else if (1 == key.get()) {
 54                     if (aveSalary == 0) {
 55                         aveSalary = allSalary / allEmpCount;
 56                         context.write(new Text("Average Salary = "), new Text("" + aveSalary));
 57                         context.write(new Text("Following employees have salarys higher than                         Average:"), new Text(""));
 58                     }
 59 
 60                     // 獲取員工的平均工資
 61                     System.out.println("Employee salary = " + val.toString());
 62                     aveSalary = allSalary / allEmpCount;
 63                     
 64                     // 比較員工與平均工資的大小,輸出比平均工資高的員工和對應的工資
 65                     empSalary = Long.parseLong(val.toString().split(",")[1]);
 66                     if (empSalary > aveSalary) {
 67                         context.write(new Text(val.toString().split(",")[0]), new Text("" +                         empSalary));
 68                     }
 69                 }
 70             }
 71         }
 72     }
 73 
 74     @Override
 75     public int run(String[] args) throws Exception {
 76 
 77         // 實例化做業對象,設置做業名稱
 78         Job job = new Job(getConf(), "Q6HigherThanAveSalary");
 79         job.setJobName("Q6HigherThanAveSalary");
 80 
 81         // 設置Mapper和Reduce類
 82         job.setJarByClass(Q6HigherThanAveSalary.class);
 83         job.setMapperClass(MapClass.class);
 84         job.setReducerClass(Reduce.class);
 85 
 86         // 必須設置Reduce任務數爲1 # -D mapred.reduce.tasks = 1
 87         // 這是該做業設置的核心,這樣纔可以保證各reduce是串行的
 88         job.setNumReduceTasks(1);
 89 
 90         // 設置輸出格式類
 91         job.setMapOutputKeyClass(IntWritable.class);
 92         job.setMapOutputValueClass(Text.class);
 93 
 94         // 設置輸出鍵和值類型
 95         job.setOutputFormatClass(TextOutputFormat.class);
 96         job.setOutputKeyClass(Text.class);
 97         job.setOutputValueClass(LongWritable.class);
 98 
 99         // 第1個參數爲員工數據路徑和第2個參數爲輸出路徑
100 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
101         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
102         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
103 
104         job.waitForCompletion(true);
105         return job.isSuccessful() ? 0 : 1;
106     }
107 
108     /**
109      * 主方法,執行入口
110      * @param args 輸入參數
111      */
112     public static void main(String[] args) throws Exception {
113         int res = ToolRunner.run(new Configuration(), new Q6HigherThanAveSalary(), args);
114         System.exit(res);
115     }
116 }

 

3.6.4 編譯並打包代碼

進入/app/hadoop-1.1.2/myclass/class6目錄中新建Q5EarnMoreThanManager.java程序代碼(代碼頁可使用/home/shiyanlou/install-pack/class6/Q6HigherThanAveSalary.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q6HigherThanAveSalary.java

編譯代碼

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q6HigherThanAveSalary.java

把編譯好的代碼打成jar包,若是不打成jar形式運行會提示class沒法找到的錯誤

jar cvf ./Q6HigherThanAveSalary.jar ./Q6HigherThan*.class

mv *.jar ../..

rm Q6HigherThan*.class

clip_image046

3.6.5 運行並查看結果

運行Q6HigherThanAveSalary運行的員工數據路徑和輸出路徑兩個參數,須要注意的是hdfs的路徑參數路徑須要全路徑,不然運行會報錯:

l  員工數據路徑:hdfs://hadoop:9000/class6/input/emp

l  輸出路徑:hdfs://hadoop:9000/class6/out6

 

運行以下命令:

cd /app/hadoop-1.1.2

hadoop jar Q6HigherThanAveSalary.jar Q6HigherThanAveSalary hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out6

clip_image048

運行成功後,刷新CentOS HDFS中的輸出路徑/class6/out6目錄

hadoop fs -ls /class6/out6

hadoop fs -cat /class6/out6/part-r-00000

打開part-r-00000文件,能夠看到運行結果:

Average Salary = 2077

Following employees have salarys higher than Average:    

FORD  3000

CLARK2450

KING   5000

JONES2975

BLAKE  2850

clip_image050

3.7 測試例子7:列出名字以J開頭的員工姓名及其所屬部門名稱

3.7.1 問題分析

求名字以J開頭的員工姓名機器所屬部門名稱,只需判斷員工姓名是否以J開頭。首先和問題1相似在MapperSetup階段緩存部門數據,而後在Mapper階段判斷員工姓名是否以J開頭,若是是抽取出員工姓名和員工所在部門編號,利用緩存部門數據把部門編號對應爲部門名稱,轉換後輸出結果。

3.7.2 處理流程圖

clip_image052

3.7.3 編寫代碼

 

  1 import java.io.BufferedReader;
  2 import java.io.FileReader;
  3 import java.io.IOException;
  4 import java.util.HashMap;
  5 import java.util.Map;
  6 
  7 import org.apache.hadoop.conf.Configuration;
  8 import org.apache.hadoop.conf.Configured;
  9 import org.apache.hadoop.filecache.DistributedCache;
 10 import org.apache.hadoop.fs.Path;
 11 import org.apache.hadoop.io.LongWritable;
 12 import org.apache.hadoop.io.Text;
 13 import org.apache.hadoop.mapreduce.Job;
 14 import org.apache.hadoop.mapreduce.Mapper;
 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 19 import org.apache.hadoop.util.GenericOptionsParser;
 20 import org.apache.hadoop.util.Tool;
 21 import org.apache.hadoop.util.ToolRunner;
 22 
 23 public class Q7NameDeptOfStartJ extends Configured implements Tool {
 24 
 25     public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
 26 
 27         // 用於緩存 dept文件中的數據
 28         private Map<String, String> deptMap = new HashMap<String, String>();
 29         private String[] kv;
 30 
 31         // 此方法會在Map方法執行以前執行且執行一次
 32         @Override
 33         protected void setup(Context context) throws IOException, InterruptedException {
 34             BufferedReader in = null;
 35             try {
 36 
 37                 // 從當前做業中獲取要緩存的文件
 38                 Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
 39                 String deptIdName = null;
 40                 for (Path path : paths) {
 41 
 42                     // 對部門文件字段進行拆分並緩存到deptMap中
 43                     if (path.toString().contains("dept")) {
 44                         in = new BufferedReader(new FileReader(path.toString()));
 45                         while (null != (deptIdName = in.readLine())) {
 46                             
 47                             // 對部門文件字段進行拆分並緩存到deptMap中
 48                             // 其中Map中key爲部門編號,value爲所在部門名稱
 49                             deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
 50                         }
 51                     }
 52                 }
 53             } catch (IOException e) {
 54                 e.printStackTrace();
 55             } finally {
 56                 try {
 57                     if (in != null) {
 58                         in.close();
 59                     }
 60                 } catch (IOException e) {
 61                     e.printStackTrace();
 62                 }
 63             }
 64         }
 65 
 66         public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException {
 67 
 68             // 對員工文件字段進行拆分
 69             kv = value.toString().split(",");
 70 
 71             // 輸出員工姓名爲J開頭的員工信息,key爲員工姓名和value爲員工所在部門名稱
 72             if (kv[1].toString().trim().startsWith("J")) {
 73                 context.write(new Text(kv[1].trim()), new Text(deptMap.get(kv[7].trim())));
 74             }
 75         }
 76     }
 77 
 78     @Override
 79     public int run(String[] args) throws Exception {
 80 
 81         // 實例化做業對象,設置做業名稱
 82         Job job = new Job(getConf(), "Q7NameDeptOfStartJ");
 83         job.setJobName("Q7NameDeptOfStartJ");
 84 
 85         // 設置Mapper和Reduce類
 86         job.setJarByClass(Q7NameDeptOfStartJ.class);
 87         job.setMapperClass(MapClass.class);
 88 
 89         // 設置輸入格式類
 90         job.setInputFormatClass(TextInputFormat.class);
 91 
 92         // 設置輸出格式類
 93         job.setOutputFormatClass(TextOutputFormat.class);
 94         job.setOutputKeyClass(Text.class);
 95         job.setOutputValueClass(Text.class);
 96 
 97         // 第1個參數爲緩存的部門數據路徑、第2個參數爲員工數據路徑和第3個參數爲輸出路徑
 98 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
 99     DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
100         FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
101         FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
102 
103         job.waitForCompletion(true);
104         return job.isSuccessful() ? 0 : 1;
105     }
106 
107     /**
108      * 主方法,執行入口
109      * @param args 輸入參數
110      */
111     public static void main(String[] args) throws Exception {
112         int res = ToolRunner.run(new Configuration(), new Q7NameDeptOfStartJ(), args);
113         System.exit(res);
114     }
115 }

 

3.7.4 編譯並打包代碼

進入/app/hadoop-1.1.2/myclass/class6目錄中新建Q7NameDeptOfStartJ.java程序代碼(代碼頁可使用/home/shiyanlou/install-pack/class6/Q7NameDeptOfStartJ.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q7NameDeptOfStartJ.java

編譯代碼

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q7NameDeptOfStartJ.java

把編譯好的代碼打成jar包,若是不打成jar形式運行會提示class沒法找到的錯誤

jar cvf ./Q7NameDeptOfStartJ.jar ./Q7NameDept*.class

mv *.jar ../..

rm Q7NameDept*.class

clip_image054

3.7.5 運行並查看結果

運行Q7NameDeptOfStartJ時須要輸入部門數據路徑、員工數據路徑和輸出路徑三個參數,須要注意的是hdfs的路徑參數路徑須要全路徑,不然運行會報錯:

l  部門數據路徑:hdfs://hadoop:9000/class6/input/dept,部門數據將緩存在各運行任務的節點內容中,能夠提供處理的效率

l  員工數據路徑:hdfs://hadoop:9000/class6/input/emp

l  輸出路徑:hdfs://hadoop:9000/class6/out7

運行以下命令:

cd /app/hadoop-1.1.2

hadoop jar Q7NameDeptOfStartJ.jar Q7NameDeptOfStartJ hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out7

clip_image056

運行成功後,刷新CentOS HDFS中的輸出路徑/class6/out7目錄

hadoop fs -ls /class6/out7

hadoop fs -cat /class6/out7/part-r-00000

打開part-r-00000文件,能夠看到運行結果:

JAMESSALES

JONESRESEARCH

clip_image058

3.8 測試例子8:列出工資最高的頭三名員工姓名及其工資

3.8.1 問題分析

求工資最高的頭三名員工姓名及工資,能夠經過冒泡法獲得。在Mapper階段輸出經理數據和員工對應經理表數據,其中經理數據key0值、value"員工姓名,員工工資";最後在Reduce中經過冒泡法遍歷全部員工,比較員工工資多少,求出前三名。

3.8.2 處理流程圖

clip_image060

3.8.3 編寫代碼

 

  1 import java.io.IOException;
  2 
  3 import org.apache.hadoop.conf.Configuration;
  4 import org.apache.hadoop.conf.Configured;
  5 import org.apache.hadoop.fs.Path;
  6 import org.apache.hadoop.io.IntWritable;
  7 import org.apache.hadoop.io.LongWritable;
  8 import org.apache.hadoop.io.Text;
  9 import org.apache.hadoop.mapreduce.Job;
 10 import org.apache.hadoop.mapreduce.Mapper;
 11 import org.apache.hadoop.mapreduce.Reducer;
 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 16 import org.apache.hadoop.util.GenericOptionsParser;
 17 import org.apache.hadoop.util.Tool;
 18 import org.apache.hadoop.util.ToolRunner;
 19 
 20 public class Q8SalaryTop3Salary extends Configured implements Tool {
 21 
 22     public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {
 23 
 24         public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException {
 25 
 26             // 對員工文件字段進行拆分
 27             String[] kv = value.toString().split(",");
 28 
 29             // 輸出key爲0和value爲員工姓名+","+員工工資
 30             context.write(new IntWritable(0), new Text(kv[1].trim() + "," + kv[5].trim()));
 31         }
 32     }
 33 
 34     public static class Reduce extends Reducer<IntWritable, Text, Text, Text> {
 35 
 36         public void reduce(IntWritable key, Iterable<Text> values, Context context) throws         IOException, InterruptedException {
 37 
 38             // 定義工資前三員工姓名
 39             String empName;
 40             String firstEmpName = "";
 41             String secondEmpName = "";
 42             String thirdEmpName = "";
 43             
 44             // 定義工資前三工資
 45             long empSalary = 0;
 46             long firstEmpSalary = 0;
 47             long secondEmpSalary = 0;
 48             long thirdEmpSalary = 0;
 49 
 50             // 經過冒泡法遍歷全部員工,比較員工工資多少,求出前三名
 51             for (Text val : values) {
 52                 empName = val.toString().split(",")[0];
 53                 empSalary = Long.parseLong(val.toString().split(",")[1]);
 54                 
 55                 if(empSalary > firstEmpSalary) {
 56                     thirdEmpName = secondEmpName;
 57                     thirdEmpSalary = secondEmpSalary;
 58                     secondEmpName = firstEmpName;
 59                     secondEmpSalary = firstEmpSalary;
 60                     firstEmpName = empName;
 61                     firstEmpSalary = empSalary;
 62                 } else if (empSalary > secondEmpSalary) {
 63                     thirdEmpName = secondEmpName;
 64                     thirdEmpSalary = secondEmpSalary;
 65                     secondEmpName = empName;
 66                     secondEmpSalary = empSalary;
 67                 } else if (empSalary > thirdEmpSalary) {
 68                     thirdEmpName = empName;
 69                     thirdEmpSalary = empSalary;
 70                 }
 71             }
 72             
 73             // 輸出工資前三名信息
 74             context.write(new Text( "First employee name:" + firstEmpName), new Text("Salary:"             + firstEmpSalary));
 75             context.write(new Text( "Second employee name:" + secondEmpName), new                     Text("Salary:" + secondEmpSalary));
 76             context.write(new Text( "Third employee name:" + thirdEmpName), new Text("Salary:"             + thirdEmpSalary));
 77         }
 78     }
 79 
 80     @Override
 81     public int run(String[] args) throws Exception {
 82 
 83         // 實例化做業對象,設置做業名稱
 84         Job job = new Job(getConf(), "Q8SalaryTop3Salary");
 85         job.setJobName("Q8SalaryTop3Salary");
 86 
 87         // 設置Mapper和Reduce類
 88         job.setJarByClass(Q8SalaryTop3Salary.class);
 89         job.setMapperClass(MapClass.class);
 90         job.setReducerClass(Reduce.class);
 91         job.setMapOutputKeyClass(IntWritable.class); 
 92         job.setMapOutputValueClass(Text.class);
 93 
 94         // 設置輸入格式類
 95         job.setInputFormatClass(TextInputFormat.class);
 96 
 97         // 設置輸出格式類
 98         job.setOutputKeyClass(Text.class);
 99         job.setOutputFormatClass(TextOutputFormat.class);
100         job.setOutputValueClass(Text.class);
101 
102         // 第1個參數爲員工數據路徑和第2個參數爲輸出路徑
103         String[] otherArgs = new GenericOptionsParser(job.getConfiguration(),                     args).getRemainingArgs();
104         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
105         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
106 
107         job.waitForCompletion(true);
108         return job.isSuccessful() ? 0 : 1;
109     }
110 
111     /**
112      * 主方法,執行入口
113      * @param args 輸入參數
114      */
115     public static void main(String[] args) throws Exception {
116         int res = ToolRunner.run(new Configuration(), new Q8SalaryTop3Salary(), args);
117         System.exit(res);
118     }
119 }

 

3.8.4 編譯並打包代碼

進入/app/hadoop-1.1.2/myclass/class6目錄中新建Q8SalaryTop3Salary.java程序代碼(代碼頁可使用/home/shiyanlou/install-pack/class6/Q8SalaryTop3Salary.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q8SalaryTop3Salary.java

編譯代碼

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q8SalaryTop3Salary.java

把編譯好的代碼打成jar包,若是不打成jar形式運行會提示class沒法找到的錯誤

jar cvf ./Q8SalaryTop3Salary.jar ./Q8SalaryTop3*.class

mv *.jar ../..

rm Q8SalaryTop3*.class

clip_image062

3.8.5 運行並查看結果

運行Q8SalaryTop3Salary運行的員工數據路徑和輸出路徑兩個參數,須要注意的是hdfs的路徑參數路徑須要全路徑,不然運行會報錯:

l  員工數據路徑:hdfs://hadoop:9000/class6/input/emp

l  輸出路徑:hdfs://hadoop:9000/class6/out8

運行以下命令:

cd /app/hadoop-1.1.2

hadoop jar Q8SalaryTop3Salary.jar Q8SalaryTop3Salary hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out8

clip_image064

運行成功後,刷新CentOS HDFS中的輸出路徑/class6/out8目錄

hadoop fs -ls /class6/out8

hadoop fs -cat /class6/out8/part-r-00000

打開part-r-00000文件,能夠看到運行結果:

First employee name:KING    Salary:5000

Second employee name:FORD    Salary:3000

Third employee name:JONESSalary:2975

clip_image066

3.9  測試例子9:將全體員工按照總收入(工資+提成)從高到低排列

3.9.1 問題分析

求全體員工總收入降序排列,得到全部員工總收入並降序排列便可。在Mapper階段輸出全部員工總工資數據,其中key爲員工總工資、value爲員工姓名,在Mapper階段的最後會先調用job.setPartitionerClass對數據進行分區,每一個分區映射到一個reducer,每一個分區內又調用job.setSortComparatorClass設置的key比較函數類排序。因爲在本做業中Mapkey只有0值,故能實現對全部數據進行排序。

3.9.2 處理流程圖

clip_image068

3.9.3    編寫代碼

 

 1 import java.io.IOException;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.conf.Configured;
 5 import org.apache.hadoop.fs.Path;
 6 import org.apache.hadoop.io.IntWritable;
 7 import org.apache.hadoop.io.LongWritable;
 8 import org.apache.hadoop.io.Text;
 9 import org.apache.hadoop.io.WritableComparable;
10 import org.apache.hadoop.mapreduce.Job;
11 import org.apache.hadoop.mapreduce.Mapper;
12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
14 import org.apache.hadoop.util.GenericOptionsParser;
15 import org.apache.hadoop.util.Tool;
16 import org.apache.hadoop.util.ToolRunner;
17 
18 public class Q9EmpSalarySort extends Configured implements Tool {
19 
20     public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {
21 
22         public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException {
23 
24             // 對員工文件字段進行拆分
25             String[] kv = value.toString().split(",");
26 
27             // 輸出key爲員工全部工資和value爲員工姓名
28             int empAllSalary = "".equals(kv[6]) ? Integer.parseInt(kv[5]) :                             Integer.parseInt(kv[5]) + Integer.parseInt(kv[6]);
29             context.write(new IntWritable(empAllSalary), new Text(kv[1]));
30         }
31     }
32 
33     /**
34      * 遞減排序算法
35      */
36     public static class DecreaseComparator extends IntWritable.Comparator {
37         public int compare(WritableComparable a, WritableComparable b) {
38             return -super.compare(a, b);
39         }
40 
41         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
42             return -super.compare(b1, s1, l1, b2, s2, l2);
43         }
44     }
45 
46     @Override
47     public int run(String[] args) throws Exception {
48 
49         // 實例化做業對象,設置做業名稱
50         Job job = new Job(getConf(), "Q9EmpSalarySort");
51         job.setJobName("Q9EmpSalarySort");
52 
53         // 設置Mapper和Reduce類
54         job.setJarByClass(Q9EmpSalarySort.class);
55         job.setMapperClass(MapClass.class);
56 
57         // 設置輸出格式類
58         job.setMapOutputKeyClass(IntWritable.class);
59         job.setMapOutputValueClass(Text.class);
60         job.setSortComparatorClass(DecreaseComparator.class);
61 
62         // 第1個參數爲員工數據路徑和第2個參數爲輸出路徑
63         String[] otherArgs = new GenericOptionsParser(job.getConfiguration(),                     args).getRemainingArgs();
64         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
65         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
66 
67         job.waitForCompletion(true);
68         return job.isSuccessful() ? 0 : 1;
69     }
70 
71     /**
72      * 主方法,執行入口
73      * @param args 輸入參數
74      */
75     public static void main(String[] args) throws Exception {
76         int res = ToolRunner.run(new Configuration(), new Q9EmpSalarySort(), args);
77         System.exit(res);
78     }
79 }

 

3.9.4 編譯並打包代碼

進入/app/hadoop-1.1.2/myclass/class6目錄中新建Q9EmpSalarySort.java程序代碼(代碼頁可使用/home/shiyanlou/install-pack/class6/Q9EmpSalarySort.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q9EmpSalarySort.java

編譯代碼

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q9EmpSalarySort.java

把編譯好的代碼打成jar包,若是不打成jar形式運行會提示class沒法找到的錯誤

jar cvf ./Q9EmpSalarySort.jar ./Q9EmpSalary*.class

mv *.jar ../..

rm Q9EmpSalary*.class

clip_image070

3.9.5 運行並查看結果

運行Q9EmpSalarySort運行的員工數據路徑和輸出路徑兩個參數,須要注意的是hdfs的路徑參數路徑須要全路徑,不然運行會報錯:

l  員工數據路徑:hdfs://hadoop:9000/class6/input/emp

l  輸出路徑:hdfs://hadoop:9000/class6/out9

運行以下命令:

cd /app/hadoop-1.1.2

hadoop jar Q9EmpSalarySort.jar Q9EmpSalarySort hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out9

clip_image072

運行成功後,刷新CentOS HDFS中的輸出路徑/class6/out9目錄

hadoop fs -ls /class6/out9

hadoop fs -cat /class6/out9/part-r-00000

打開part-r-00000文件,能夠看到運行結果:

5000    KING

3000    FORD

2975    JONES

2850    BLAKE

......

clip_image074

3.10 測試例子10:求任何兩名員工信息傳遞所須要通過的中間節點數

3.10.1 問題分析

該公司全部員工能夠造成入下圖的樹形結構,求兩個員工的溝通的中間節點數,可轉換在員工樹中求兩個節點連通所通過的節點數,即從其中一節點到匯合節點通過節點數加上另外一節點到匯合節點通過節點數。例如求MQ所需節點數,能夠先找出MA通過的節點數,而後找出QA通過的節點數,二者相加獲得MQ所需節點數。

clip_image076

在做業中首先在Mapper階段全部員工數據,其中經理數據key0值、value"員工編號,員工經理編號",而後在Reduce階段把全部員工放到員工列表和員工對應經理鏈表Map中,最後在ReduceCleanup中按照上面說所算法對任意兩個員工計算出溝通的路徑長度並輸出。

3.10.2 處理流程圖

clip_image078

3.10.3 編寫代碼

 

  1 import java.io.IOException;
  2 import java.util.ArrayList;
  3 import java.util.HashMap;
  4 import java.util.List;
  5 import java.util.Map;
  6 
  7 import org.apache.hadoop.conf.Configuration;
  8 import org.apache.hadoop.conf.Configured;
  9 import org.apache.hadoop.fs.Path;
 10 import org.apache.hadoop.io.IntWritable;
 11 import org.apache.hadoop.io.LongWritable;
 12 import org.apache.hadoop.io.NullWritable;
 13 import org.apache.hadoop.io.Text;
 14 import org.apache.hadoop.mapreduce.Job;
 15 import org.apache.hadoop.mapreduce.Mapper;
 16 import org.apache.hadoop.mapreduce.Reducer;
 17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 20 import org.apache.hadoop.util.GenericOptionsParser;
 21 import org.apache.hadoop.util.Tool;
 22 import org.apache.hadoop.util.ToolRunner;
 23 
 24 public class Q10MiddlePersonsCountForComm extends Configured implements Tool {
 25 
 26     public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {
 27 
 28         public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException {
 29 
 30             // 對員工文件字段進行拆分
 31             String[] kv = value.toString().split(",");
 32 
 33             // 輸出key爲0和value爲員工編號+","+員工經理編號
 34             context.write(new IntWritable(0), new Text(kv[0] + "," + ("".equals(kv[3]) ? " " : kv[3])));
 35         }
 36     }
 37 
 38     public static class Reduce extends Reducer<IntWritable, Text, NullWritable, Text> {
 39 
 40         // 定義員工列表和員工對應經理Map
 41         List<String> employeeList = new ArrayList<String>();
 42         Map<String, String> employeeToManagerMap = new HashMap<String, String>();
 43 
 44         public void reduce(IntWritable key, Iterable<Text> values, Context context) throws         IOException, InterruptedException {
 45 
 46             // 在reduce階段把全部員工放到員工列表和員工對應經理Map中
 47             for (Text value : values) {
 48                 employeeList.add(value.toString().split(",")[0].trim());
 49                 employeeToManagerMap.put(value.toString().split(",")[0].trim(),                             value.toString().split(",")[1].trim());
 50             }
 51         }
 52 
 53         @Override
 54         protected void cleanup(Context context) throws IOException, InterruptedException {
 55             int totalEmployee = employeeList.size();
 56             int i, j;
 57             int distance;
 58             System.out.println(employeeList);
 59             System.out.println(employeeToManagerMap);
 60 
 61             // 對任意兩個員工計算出溝通的路徑長度並輸出
 62             for (i = 0; i < (totalEmployee - 1); i++) {
 63                 for (j = (i + 1); j < totalEmployee; j++) {
 64                     distance = calculateDistance(i, j);
 65                     String value = employeeList.get(i) + " and " + employeeList.get(j) + " =                     " + distance;
 66                     context.write(NullWritable.get(), new Text(value)); 
 67                 }
 68             }
 69         }
 70 
 71         /**
 72          * 該公司能夠由全部員工造成樹形結構,求兩個員工的溝通的中間節點數,能夠轉換在員工樹中兩員工之間的距離
 73          * 因爲在樹中任意兩點都會在某上級節點匯合,根據該狀況設計了以下算法
 74          */
 75         private int calculateDistance(int i, int j) {
 76             String employeeA = employeeList.get(i);
 77             String employeeB = employeeList.get(j);
 78             int distance = 0;
 79 
 80             // 若是A是B的經理,反之亦然
 81             if (employeeToManagerMap.get(employeeA).equals(employeeB) ||                                     employeeToManagerMap.get(employeeB).equals(employeeA)) {
 82                 distance = 0;
 83             }
 84             // A和B在同一經理下
 85             else if  (employeeToManagerMap.get(employeeA).equals(
 86                     employeeToManagerMap.get(employeeB))) {
 87                 distance = 0;
 88             } else {
 89                 // 定義A和B對應經理鏈表
 90                 List<String> employeeA_ManagerList = new ArrayList<String>();
 91                 List<String> employeeB_ManagerList = new ArrayList<String>();
 92 
 93                 // 獲取從A開始經理鏈表
 94                 employeeA_ManagerList.add(employeeA);
 95                 String current = employeeA;
 96                 while (false == employeeToManagerMap.get(current).isEmpty()) {
 97                     current = employeeToManagerMap.get(current);
 98                     employeeA_ManagerList.add(current);
 99                 }
100 
101                 // 獲取從B開始經理鏈表
102                 employeeB_ManagerList.add(employeeB);
103                 current = employeeB;
104                 while (false == employeeToManagerMap.get(current).isEmpty()) {
105                     current = employeeToManagerMap.get(current);
106                     employeeB_ManagerList.add(current);
107                 }
108 
109                 int ii = 0, jj = 0;
110                 String currentA_manager, currentB_manager;
111                 boolean found = false;
112 
113                 // 遍歷A與B開始經理鏈表,找出匯合點計算
114                 for (ii = 0; ii < employeeA_ManagerList.size(); ii++) {
115                     currentA_manager = employeeA_ManagerList.get(ii);
116                     for (jj = 0; jj < employeeB_ManagerList.size(); jj++) {
117                         currentB_manager = employeeB_ManagerList.get(jj);
118                         if (currentA_manager.equals(currentB_manager)) {
119                             found = true;
120                             break;
121                         }
122                     }
123 
124                     if (found) {
125                         break;
126                     }
127                 }
128 
129                 // 最後獲取兩隻以前的路徑
130                 distance = ii + jj - 1;
131             }
132 
133             return distance;
134         }
135     }
136 
137     @Override
138     public int run(String[] args) throws Exception {
139 
140         // 實例化做業對象,設置做業名稱
141         Job job = new Job(getConf(), "Q10MiddlePersonsCountForComm");
142         job.setJobName("Q10MiddlePersonsCountForComm");
143 
144         // 設置Mapper和Reduce類
145         job.setJarByClass(Q10MiddlePersonsCountForComm.class);
146         job.setMapperClass(MapClass.class);
147         job.setReducerClass(Reduce.class);
148 
149         // 設置Mapper輸出格式類
150         job.setMapOutputKeyClass(IntWritable.class);
151         job.setMapOutputValueClass(Text.class);
152 
153         // 設置Reduce輸出鍵和值類型
154         job.setOutputFormatClass(TextOutputFormat.class);
155         job.setOutputKeyClass(NullWritable.class);
156         job.setOutputValueClass(Text.class);
157 
158         // 第1個參數爲員工數據路徑和第2個參數爲輸出路徑
159         String[] otherArgs = new GenericOptionsParser(job.getConfiguration(),                     args).getRemainingArgs();
160         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
161         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
162 
163         job.waitForCompletion(true);
164         return job.isSuccessful() ? 0 : 1;
165     }
166 
167     /**
168      * 主方法,執行入口
169      * @param args 輸入參數
170      */
171     public static void main(String[] args) throws Exception {
172         int res = ToolRunner.run(new Configuration(), new Q10MiddlePersonsCountForComm(), args);
173         System.exit(res);
174     }
175 }

 

3.10.4 編譯並打包代碼

進入/app/hadoop-1.1.2/myclass/class6目錄中新建Q10MiddlePersonsCountForComm.java程序代碼(代碼頁可使用/home/shiyanlou/install-pack/class6/Q10MiddlePersonsCountForComm.java文件)

cd /app/hadoop-1.1.2/myclass/class6

vi Q10MiddlePersonsCountForComm.java

編譯代碼

javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q10MiddlePersonsCountForComm.java

把編譯好的代碼打成jar包,若是不打成jar形式運行會提示class沒法找到的錯誤

jar cvf ./Q10MiddlePersonsCountForComm.jar ./Q10MiddlePersons*.class

mv *.jar ../..

rm Q10MiddlePersons*.class

clip_image080

3.10.5 運行並查看結果

運行Q10MiddlePersonsCountForComm運行的員工數據路徑和輸出路徑兩個參數,須要注意的是hdfs的路徑參數路徑須要全路徑,不然運行會報錯:

l  員工數據路徑:hdfs://hadoop:9000/class6/input/emp

l  輸出路徑:hdfs://hadoop:9000/class6/out10

運行以下命令:

cd /app/hadoop-1.1.2

hadoop jar Q10MiddlePersonsCountForComm.jar Q10MiddlePersonsCountForComm hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out10

clip_image082

運行成功後,刷新CentOS HDFS中的輸出路徑/class6/out10目錄

hadoop fs -ls /class6/out10

hadoop fs -cat /class6/out10/part-r-00000

打開part-r-00000文件,能夠看到運行結果:

7369 and 7499 = 4

7369 and 7521 = 4

7369 and 7566 = 1

7369 and 7654 = 4

7369 and 7698 = 3

......

clip_image084

相關文章
相關標籤/搜索