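The copyright of this article is shared by the author and cnblogs. Reposting is welcome, but without the author's consent this statement must be kept and a link to the original article must be given in a prominent place on the page. The blogger is 石山園 (Shishanyuan), blog address http://www.cnblogs.com/shishanyuan. This series of courses was written at the invitation of Shiyanlou, which deserves credit for offering a new way of learning: you can read the blog and do the hands-on experiments at the same time. Course address: https://www.shiyanlou.com/courses/237
[Note] The installation packages, test data and code used in this series can all be downloaded from Baidu Netdisk at http://pan.baidu.com/s/10PnDs (download the PDF file there).
The deployment node runs CentOS with the firewall and SELinux disabled. A shiyanlou user has been created, together with an /app directory under the root directory that holds the Hadoop component packages. Because Hadoop and the other components are installed in this directory, the shiyanlou user must have rwx permission on it. The usual approach is for root to create /app under the root directory and then change its owner to shiyanlou with chown -R shiyanlou:shiyanlou /app.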
Hadoop environment:
- Virtual machine OS: CentOS 6.6 64-bit, single core, 1 GB RAM
- JDK: 1.7.0_55 64-bit
- Hadoop: 1.1.2
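The test data consists of two files, dept (departments) and emp (employees). Their fields are separated by commas.
Contents of the dept file (fields: department number, department name, city):
10,ACCOUNTING,NEW YORK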
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
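Contents of the emp file (fields: employee number, name, job, manager number, hire date, salary, commission, department number). Hire dates are written as day-month月-two-digit year, for example 17-12月-80: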
7369,SMITH,CLERK,7902,17-12月-80,800,,20
7499,ALLEN,SALESMAN,7698,20-2月-81,1600,300,30
7521,WARD,SALESMAN,7698,22-2月-81,1250,500,30
7566,JONES,MANAGER,7839,02-4月-81,2975,,20
7654,MARTIN,SALESMAN,7698,28-9月-81,1250,1400,30
7698,BLAKE,MANAGER,7839,01-5月-81,2850,,30
7782,CLARK,MANAGER,7839,09-6月-81,2450,,10
7839,KING,PRESIDENT,,17-11月-81,5000,,10
7844,TURNER,SALESMAN,7698,08-9月-81,1500,0,30
7900,JAMES,CLERK,7698,03-12月-81,950,,30
7902,FORD,ANALYST,7566,03-12月-81,3000,,20
7934,MILLER,CLERK,7782,23-1月-82,1300,,10
Both files are in the /home/shiyanlou/install-pack/class6 directory. Upload them to the /class6/input directory in HDFS with the following commands:
cd /home/shiyanlou/install-pack/class6
hadoop fs -mkdir -p /class6/input
hadoop fs -copyFromLocal dept /class6/input
hadoop fs -copyFromLocal emp /class6/input
hadoop fs -ls /class6/input
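MapReduce supports several kinds of join; the most common are reduce-side join, map-side join and semi join. A reduce-side join moves a large amount of data across the network during the shuffle phase, and the resulting network I/O makes it inefficient. A map-side join is very useful when one or more small tables are joined with a large table.
Map-side join is an optimization for one particular scenario: of the two tables to be joined, one is very large and the other is small enough to fit in memory. The small table can then be replicated so that every map task holds a copy in memory (for example in a hash table), and only the large table is scanned. For each key/value record of the large table, the map task looks up the hash table for a record with the same key and, if one exists, outputs the joined record. To support replicating files, Hadoop provides the DistributedCache class, which is used as follows:
(1) Call the static method DistributedCache.addCacheFile() to specify the file to replicate. Its argument is the file's URI. For a file on HDFS, the URI points at the NameNode; in this environment that is, for example, hdfs://hadoop:9000/home/XXX/file. Before the job's tasks run, the framework copies the listed files to the local disk of every TaskTracker node that executes them.
(2) Call DistributedCache.getLocalCacheFiles() to get the local paths of the cached files, and read them with the standard file I/O APIs.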
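The lookup described above can be sketched without a cluster. The class below is a hypothetical, Hadoop-free illustration; MapSideJoinSketch, loadDept and join are names invented here and are not part of Hadoop or the course code.

- loadDept plays the role of setup(): it builds a HashMap from the small dept table once.
- join plays the role of map(): it resolves each emp record with a single hash lookup.

The sketch assumes the comma-separated layouts shown earlier, with the department number in emp field 7 and the salary in field 5.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical local sketch of a map-side (hash) join; it does not use Hadoop.
class MapSideJoinSketch {

    // Like setup(): build deptno -> dname from lines such as "10,ACCOUNTING,NEW YORK".
    static Map<String, String> loadDept(List<String> deptLines) {
        Map<String, String> deptMap = new HashMap<String, String>();
        for (String line : deptLines) {
            String[] f = line.split(",");
            if (f.length >= 2) {
                deptMap.put(f[0].trim(), f[1].trim());
            }
        }
        return deptMap;
    }

    // Like map(): join each emp line with the cached table and return "dname,salary".
    // Records with an unknown department or an empty salary are dropped.
    static List<String> join(Map<String, String> deptMap, List<String> empLines) {
        List<String> out = new ArrayList<String>();
        for (String line : empLines) {
            String[] kv = line.split(",");
            if (kv.length < 8) {
                continue; // skip blank or malformed lines
            }
            String dname = deptMap.get(kv[7].trim());
            String salary = kv[5].trim();
            if (dname != null && !salary.isEmpty()) {
                out.add(dname + "," + salary);
            }
        }
        return out;
    }
}
```

Only the small table is held in memory. The large table is streamed one record at a time, which is what makes the technique suitable for a small dept file and a large emp file.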
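In Q1SumDeptSalary below, the small table (dept) is cached in memory. In the Mapper, each employee's department number is translated into the department name, which is emitted as the key. The Reducer then sums the salaries of each department.
Q1SumDeptSalary.java code (when typing the code in vi, it must not contain Chinese characters):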
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q1SumDeptSalary extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        // Cache of the dept file: key = department number, value = department name
        private Map<String, String> deptMap = new HashMap<String, String>();

        // Called once per map task, before the first call to map()
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Local copies of the files registered with DistributedCache.addCacheFile()
            Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            if (paths == null) {
                throw new IOException("No files found in the DistributedCache");
            }
            for (Path path : paths) {
                if (path.toString().contains("dept")) {
                    try (BufferedReader in = new BufferedReader(new FileReader(path.toString()))) {
                        String line;
                        while ((line = in.readLine()) != null) {
                            // dept fields: department number, name, city
                            String[] fields = line.split(",");
                            if (fields.length >= 2) {
                                deptMap.put(fields[0].trim(), fields[1].trim());
                            }
                        }
                    }
                }
            }
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // emp fields: empno, name, job, mgr, hire date, salary, commission, deptno
            String[] kv = value.toString().split(",");
            if (kv.length < 8) {
                return; // skip blank or malformed lines
            }

            // Map-side join: output key = department name, value = salary;
            // records with an unknown department or an empty salary are dropped
            String deptName = deptMap.get(kv[7].trim());
            String salary = kv[5].trim();
            if (deptName != null && !salary.isEmpty()) {
                context.write(new Text(deptName), new Text(salary));
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, LongWritable> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Sum the salaries of all employees in the department
            long sumSalary = 0;
            for (Text val : values) {
                sumSalary += Long.parseLong(val.toString());
            }
            // Output key = department name, value = total salary
            context.write(key, new LongWritable(sumSalary));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // Create the job and set the Mapper and Reducer classes
        Job job = new Job(getConf(), "Q1SumDeptSalary");
        job.setJarByClass(Q1SumDeptSalary.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // Input and output formats
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // The map output value (Text) differs from the final output value (LongWritable),
        // so both sets of classes are declared explicitly
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // ToolRunner has already removed generic options (e.g. -D) from args.
        // args[0] = dept path (cached), args[1] = emp path (input), args[2] = output path
        DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    // Entry point
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q1SumDeptSalary(), args);
        System.exit(res);
    }
}
3.1.4 Compile and package the code
Go to /app/hadoop-1.1.2/myclass/class6 and create the Q1SumDeptSalary.java source file (the code is also available in /home/shiyanlou/install-pack/class6/Q1SumDeptSalary.java):
cd /app/hadoop-1.1.2/myclass/class6
vi Q1SumDeptSalary.java
Compile the code:
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q1SumDeptSalary.java
Package the compiled classes into a jar. If you run the classes without a jar, you get a class-not-found error:
jar cvf ./Q1SumDeptSalary.jar ./Q1SumDept*.class
mv *.jar ../..
rm Q1SumDept*.class
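Running Q1SumDeptSalary requires three arguments: the department data path, the employee data path and the output path. Note that HDFS path arguments must be full paths, otherwise the job fails with an error:
- Department data path: hdfs://hadoop:9000/class6/input/dept. The department data is copied to every node that runs a task and loaded into memory there, which improves processing efficiency.
- Employee data path: hdfs://hadoop:9000/class6/input/emp
- Output path: hdfs://hadoop:9000/class6/out1
Run the following commands: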
cd /app/hadoop-1.1.2
hadoop jar Q1SumDeptSalary.jar Q1SumDeptSalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out1
After the job succeeds, check the output directory /class6/out1 in HDFS and open the part-r-00000 file:
hadoop fs -ls /class6/out1
hadoop fs -cat /class6/out1/part-r-00000
The result is:
ACCOUNTING	8750
RESEARCH	6775
SALES	9400
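To compute each department's head count and average salary, we need each department's total salary and number of employees; dividing the first by the second gives the average.

1. As in problem 1, the department data is cached in the Mapper's setup phase.
2. The Mapper extracts the department number and the salary, and maps the department number to the department name using the cached data.
3. The shuffle phase groups the data so that each department name comes with the list of its employees' salaries.
4. The Reducer iterates over each department's employees, computes the total salary and the head count, and outputs the department name with the average salary.

Q2DeptNumberAveSalary.java code: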
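Here is a minimal sketch of the reduce step described above. It assumes the values for one department arrive as salary strings; Q2ReduceSketch is a hypothetical name, and the sketch does not use Hadoop. Like the job, it divides a long by an int, so the average is truncated: 2450 + 5000 + 1300 = 8750, and 8750 / 3 yields 2916 rather than 2916.67.

```java
import java.util.List;

// Hypothetical local sketch of the Q2 reduce step for one department.
class Q2ReduceSketch {

    static String reduce(List<String> salaries) {
        long sumSalary = 0;
        int deptNumber = 0;
        // Sum the salaries and count the employees in this group
        for (String s : salaries) {
            sumSalary += Long.parseLong(s.trim());
            deptNumber++;
        }
        if (deptNumber == 0) {
            // Guard only; a real reduce() call never receives an empty group
            return "Dept Number:0, Ave Salary:0";
        }
        // long / int is integer division, so the average is truncated
        return "Dept Number:" + deptNumber + ", Ave Salary:" + sumSalary / deptNumber;
    }
}
```

If a fractional average were wanted, converting sumSalary to double before dividing would be the usual change. The sketch keeps integer division to mirror the job's output.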
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q2DeptNumberAveSalary extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        // Cache of the dept file: key = department number, value = department name
        private Map<String, String> deptMap = new HashMap<String, String>();

        // Called once per map task, before the first call to map()
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Local copies of the files registered with DistributedCache.addCacheFile()
            Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            if (paths == null) {
                throw new IOException("No files found in the DistributedCache");
            }
            for (Path path : paths) {
                if (path.toString().contains("dept")) {
                    try (BufferedReader in = new BufferedReader(new FileReader(path.toString()))) {
                        String line;
                        while ((line = in.readLine()) != null) {
                            // dept fields: department number, name, city
                            String[] fields = line.split(",");
                            if (fields.length >= 2) {
                                deptMap.put(fields[0].trim(), fields[1].trim());
                            }
                        }
                    }
                }
            }
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // emp fields: empno, name, job, mgr, hire date, salary, commission, deptno
            String[] kv = value.toString().split(",");
            if (kv.length < 8) {
                return; // skip blank or malformed lines
            }

            // Map-side join: output key = department name, value = salary
            String deptName = deptMap.get(kv[7].trim());
            String salary = kv[5].trim();
            if (deptName != null && !salary.isEmpty()) {
                context.write(new Text(deptName), new Text(salary));
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            long sumSalary = 0;
            int deptNumber = 0;

            // Sum the salaries and count the employees of the department
            for (Text val : values) {
                sumSalary += Long.parseLong(val.toString());
                deptNumber++;
            }

            // Output key = department name, value = head count and average salary (integer division)
            context.write(key, new Text("Dept Number:" + deptNumber + ", Ave Salary:" + sumSalary / deptNumber));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // Create the job and set the Mapper and Reducer classes
        Job job = new Job(getConf(), "Q2DeptNumberAveSalary");
        job.setJarByClass(Q2DeptNumberAveSalary.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // Input and output formats; map and reduce both output Text/Text
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // ToolRunner has already removed generic options (e.g. -D) from args.
        // args[0] = dept path (cached), args[1] = emp path (input), args[2] = output path
        DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    // Entry point
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q2DeptNumberAveSalary(), args);
        System.exit(res);
    }
}
3.2.4 Compile and package the code
Go to /app/hadoop-1.1.2/myclass/class6 and create the Q2DeptNumberAveSalary.java source file (the code is also available in /home/shiyanlou/install-pack/class6/Q2DeptNumberAveSalary.java):
cd /app/hadoop-1.1.2/myclass/class6
vi Q2DeptNumberAveSalary.java
Compile the code:
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q2DeptNumberAveSalary.java
Package the compiled classes into a jar. If you run the classes without a jar, you get a class-not-found error:
jar cvf ./Q2DeptNumberAveSalary.jar ./Q2DeptNum*.class
mv *.jar ../..
rm Q2DeptNum*.class
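Running Q2DeptNumberAveSalary requires three arguments: the department data path, the employee data path and the output path. Note that HDFS path arguments must be full paths, otherwise the job fails with an error:
- Department data path: hdfs://hadoop:9000/class6/input/dept. The department data is copied to every node that runs a task and loaded into memory there, which improves processing efficiency.
- Employee data path: hdfs://hadoop:9000/class6/input/emp
- Output path: hdfs://hadoop:9000/class6/out2
Run the following commands: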
cd /app/hadoop-1.1.2
hadoop jar Q2DeptNumberAveSalary.jar Q2DeptNumberAveSalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out2
After the job succeeds, check the output directory /class6/out2 in HDFS:
hadoop fs -ls /class6/out2
hadoop fs -cat /class6/out2/part-r-00000
Open the part-r-00000 file to see the result:
ACCOUNTING	Dept Number:3, Ave Salary:2916
RESEARCH	Dept Number:3, Ave Salary:2258
SALES	Dept Number:6, Ave Salary:1566
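To find the name of the earliest-hired employee in each department, we need the hire dates of all employees in each department and must compare them.

1. As in problem 1, the department data is cached in the Mapper's setup phase.
2. The Mapper emits the department name as the key (mapping the department number to the name through the cached data) and the employee name plus hire date as the value.
3. The shuffle phase groups the data so that each department name comes with the list of its employees and their hire dates.
4. The Reducer iterates over each department's employees, finds the one hired earliest, and outputs it.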
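Date handling is the only tricky part of this reducer, so here is a hypothetical, Hadoop-free sketch of it. Q3EarliestSketch is an invented name.

- It assumes values of the form "name,hireDate" with dates such as 17-12月-80.
- It parses them with the SimpleDateFormat pattern dd-MM月-yy. The pattern is written with the escape \u6708 for 月, so the source file contains no Chinese characters.
- It keeps the earliest date it has seen.

SimpleDateFormat resolves a two-digit year to the century within 80 years before and 20 years after the current date, so 80 is read as 1980.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

// Hypothetical local sketch of the Q3 reduce step; it does not use Hadoop.
class Q3EarliestSketch {

    // Returns "name,yyyy-MM-dd" for the earliest value of the form "name,hireDate",
    // or null if no date could be parsed.
    static String earliest(List<String> values) {
        // The escape below is the CJK "month" character used in the data
        SimpleDateFormat inFormat = new SimpleDateFormat("dd-MM\u6708-yy");
        SimpleDateFormat outFormat = new SimpleDateFormat("yyyy-MM-dd");
        Date earliestDate = null;
        String earliestEmp = null;
        for (String val : values) {
            String[] f = val.split(",");
            if (f.length < 2) {
                continue;
            }
            try {
                Date d = inFormat.parse(f[1].trim()); // parse each date only once
                if (earliestDate == null || d.before(earliestDate)) {
                    earliestDate = d;
                    earliestEmp = f[0].trim();
                }
            } catch (ParseException e) {
                // skip records whose date does not match the pattern
            }
        }
        return earliestDate == null ? null : earliestEmp + "," + outFormat.format(earliestDate);
    }
}
```

Starting from null instead of the current date avoids relying on every hire date being in the past.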
Q3DeptEarliestEmp.java code:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q3DeptEarliestEmp extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        // Cache of the dept file: key = department number, value = department name
        private Map<String, String> deptMap = new HashMap<String, String>();

        // Called once per map task, before the first call to map()
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Local copies of the files registered with DistributedCache.addCacheFile()
            Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            if (paths == null) {
                throw new IOException("No files found in the DistributedCache");
            }
            for (Path path : paths) {
                if (path.toString().contains("dept")) {
                    try (BufferedReader in = new BufferedReader(new FileReader(path.toString()))) {
                        String line;
                        while ((line = in.readLine()) != null) {
                            // dept fields: department number, name, city
                            String[] fields = line.split(",");
                            if (fields.length >= 2) {
                                deptMap.put(fields[0].trim(), fields[1].trim());
                            }
                        }
                    }
                }
            }
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // emp fields: empno, name, job, mgr, hire date, salary, commission, deptno
            String[] kv = value.toString().split(",");
            if (kv.length < 8) {
                return; // skip blank or malformed lines
            }

            // Map-side join: output key = department name, value = "name,hire date"
            String deptName = deptMap.get(kv[7].trim());
            String enterDate = kv[4].trim();
            if (deptName != null && !enterDate.isEmpty()) {
                context.write(new Text(deptName), new Text(kv[1].trim() + "," + enterDate));
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Hire dates look like dd-MM<month>-yy; the escape is the CJK "month" character,
            // written as \u6708 so the source file stays ASCII
            DateFormat inFormat = new SimpleDateFormat("dd-MM\u6708-yy");
            DateFormat outFormat = new SimpleDateFormat("yyyy-MM-dd");
            Date earliestDate = null;
            String earliestEmp = null;

            // Iterate over the department's employees and keep the earliest hire date
            for (Text val : values) {
                String[] fields = val.toString().split(",");
                try {
                    Date enterDate = inFormat.parse(fields[1].trim());
                    if (earliestDate == null || enterDate.before(earliestDate)) {
                        earliestDate = enterDate;
                        earliestEmp = fields[0].trim();
                    }
                } catch (ParseException e) {
                    e.printStackTrace(); // skip records with an unparseable date
                }
            }

            // Output key = department name, value = earliest-hired employee and hire date
            if (earliestDate != null) {
                context.write(key, new Text("The earliest emp of dept:" + earliestEmp + ", Enter date:" + outFormat.format(earliestDate)));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // Create the job and set the Mapper and Reducer classes
        Job job = new Job(getConf(), "Q3DeptEarliestEmp");
        job.setJarByClass(Q3DeptEarliestEmp.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // Input and output formats; map and reduce both output Text/Text
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // args[0] = dept path (cached), args[1] = emp path (input), args[2] = output path
        DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    // Entry point
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q3DeptEarliestEmp(), args);
        System.exit(res);
    }
}
3.3.4 Compile and package the code
Go to /app/hadoop-1.1.2/myclass/class6 and create the Q3DeptEarliestEmp.java source file (the code is also available in /home/shiyanlou/install-pack/class6/Q3DeptEarliestEmp.java):
cd /app/hadoop-1.1.2/myclass/class6
vi Q3DeptEarliestEmp.java
Compile the code:
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q3DeptEarliestEmp.java
Package the compiled classes into a jar. If you run the classes without a jar, you get a class-not-found error:
jar cvf ./Q3DeptEarliestEmp.jar ./Q3DeptEar*.class
mv *.jar ../..
rm Q3DeptEar*.class
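Running Q3DeptEarliestEmp requires three arguments: the department data path, the employee data path and the output path. Note that HDFS path arguments must be full paths, otherwise the job fails with an error:
- Department data path: hdfs://hadoop:9000/class6/input/dept. The department data is copied to every node that runs a task and loaded into memory there, which improves processing efficiency.
- Employee data path: hdfs://hadoop:9000/class6/input/emp
- Output path: hdfs://hadoop:9000/class6/out3
Run the following commands: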
cd /app/hadoop-1.1.2
hadoop jar Q3DeptEarliestEmp.jar Q3DeptEarliestEmp hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out3
After the job succeeds, check the output directory /class6/out3 in HDFS:
hadoop fs -ls /class6/out3
hadoop fs -cat /class6/out3/part-r-00000
Open the part-r-00000 file to see the result:
ACCOUNTING	The earliest emp of dept:CLARK, Enter date:1981-06-09
RESEARCH	The earliest emp of dept:SMITH, Enter date:1980-12-17
SALES	The earliest emp of dept:ALLEN, Enter date:1981-02-20
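To compute the total salary of the employees in each city, we need the salaries of all employees in each city and must sum them.

1. As in example 1, the Mapper's setup phase caches the department-to-city data.
2. The Mapper emits the city name as the key (mapping the department number to its city through the cached data) and the salary as the value.
3. The shuffle phase groups the data so that each city name comes with the salaries of all its employees.
4. The Reducer sums each city's salaries and outputs the total.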
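Q4 differs from Q1 only in which dept column is cached, so both can be checked locally with one hypothetical helper. SalaryByDeptColumn is an invented name, and no Hadoop is involved.

- A deptColumn of 1 groups by department name (Q1); a deptColumn of 2 groups by city (Q4).
- A TreeMap stands in for the shuffle's sorted keys, which is why the cities come out in alphabetical order.
- BOSTON does not appear because no employee belongs to department 40. The join is driven by the emp records, so a department without employees produces no output.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local simulation of Q1/Q4: cache one dept column by department number,
// then sum emp salaries grouped by that column. It does not use Hadoop.
class SalaryByDeptColumn {

    static Map<String, Long> sum(List<String> deptLines, List<String> empLines, int deptColumn) {
        // setup(): department number -> chosen dept column (name or city)
        Map<String, String> deptMap = new HashMap<String, String>();
        for (String line : deptLines) {
            String[] f = line.split(",");
            if (f.length > deptColumn) {
                deptMap.put(f[0].trim(), f[deptColumn].trim());
            }
        }
        // map + shuffle + reduce in one pass; TreeMap keeps keys sorted like MapReduce
        Map<String, Long> totals = new TreeMap<String, Long>();
        for (String line : empLines) {
            String[] kv = line.split(",");
            if (kv.length < 8 || kv[5].trim().isEmpty()) {
                continue;
            }
            String group = deptMap.get(kv[7].trim());
            if (group == null) {
                continue; // employee with an unknown department is dropped
            }
            Long prev = totals.get(group);
            long salary = Long.parseLong(kv[5].trim());
            totals.put(group, (prev == null ? 0L : prev.longValue()) + salary);
        }
        return totals;
    }
}
```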
Q4SumCitySalary.java code:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q4SumCitySalary extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        // Cache of the dept file: key = department number, value = city
        private Map<String, String> deptMap = new HashMap<String, String>();

        // Called once per map task, before the first call to map()
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Local copies of the files registered with DistributedCache.addCacheFile()
            Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            if (paths == null) {
                throw new IOException("No files found in the DistributedCache");
            }
            for (Path path : paths) {
                if (path.toString().contains("dept")) {
                    try (BufferedReader in = new BufferedReader(new FileReader(path.toString()))) {
                        String line;
                        while ((line = in.readLine()) != null) {
                            // dept fields: department number, name, city
                            String[] fields = line.split(",");
                            if (fields.length >= 3) {
                                deptMap.put(fields[0].trim(), fields[2].trim());
                            }
                        }
                    }
                }
            }
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // emp fields: empno, name, job, mgr, hire date, salary, commission, deptno
            String[] kv = value.toString().split(",");
            if (kv.length < 8) {
                return; // skip blank or malformed lines
            }

            // Map-side join: output key = city, value = salary
            String city = deptMap.get(kv[7].trim());
            String salary = kv[5].trim();
            if (city != null && !salary.isEmpty()) {
                context.write(new Text(city), new Text(salary));
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, LongWritable> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Sum the salaries of all employees in the same city
            long sumSalary = 0;
            for (Text val : values) {
                sumSalary += Long.parseLong(val.toString());
            }
            // Output key = city, value = total salary
            context.write(key, new LongWritable(sumSalary));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // Create the job and set the Mapper and Reducer classes
        Job job = new Job(getConf(), "Q4SumCitySalary");
        job.setJarByClass(Q4SumCitySalary.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // Input and output formats
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Map output is Text/Text, final output is Text/LongWritable
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // args[0] = dept path (cached), args[1] = emp path (input), args[2] = output path
        DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    // Entry point
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q4SumCitySalary(), args);
        System.exit(res);
    }
}
3.4.4 Compile and package the code
Go to /app/hadoop-1.1.2/myclass/class6 and create the Q4SumCitySalary.java source file (the code is also available in /home/shiyanlou/install-pack/class6/Q4SumCitySalary.java):
cd /app/hadoop-1.1.2/myclass/class6
vi Q4SumCitySalary.java
Compile the code:
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q4SumCitySalary.java
Package the compiled classes into a jar. If you run the classes without a jar, you get a class-not-found error:
jar cvf ./Q4SumCitySalary.jar ./Q4SumCity*.class
mv *.jar ../..
rm Q4SumCity*.class
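Running Q4SumCitySalary requires three arguments: the department data path, the employee data path and the output path. Note that HDFS path arguments must be full paths, otherwise the job fails with an error:
- Department data path: hdfs://hadoop:9000/class6/input/dept. The department data is copied to every node that runs a task and loaded into memory there, which improves processing efficiency.
- Employee data path: hdfs://hadoop:9000/class6/input/emp
- Output path: hdfs://hadoop:9000/class6/out4
Run the following commands: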
cd /app/hadoop-1.1.2
hadoop jar Q4SumCitySalary.jar Q4SumCitySalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out4
After the job succeeds, check the output directory /class6/out4 in HDFS:
hadoop fs -ls /class6/out4
hadoop fs -cat /class6/out4/part-r-00000
Open the part-r-00000 file to see the result:
CHICAGO	9400
DALLAS	6775
NEW YORK	8750
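To find the names and salaries of employees who earn more than their manager, we need each manager's salary and all of that manager's subordinates, and must compare them.

1. The Mapper emits two kinds of records:
   - Manager records, with the employee number as the key and "M,salary" as the value.
   - Employee-to-manager records, with the manager number as the key and "E,name,salary" as the value.
2. The shuffle phase groups both kinds of records by key. For employee 7698, for example, the value tagged M is 7698's own salary and the values tagged E are the names and salaries of 7698's subordinates.
3. The Reducer compares each subordinate's salary with the manager's and outputs the employees who earn more than their manager.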
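The tagging scheme above is a reduce-side self-join. The hypothetical sketch below imitates it without Hadoop; Q5SelfJoinSketch is an invented name.

- It emits the same M and E records.
- It uses a TreeMap of lists in place of the shuffle.
- It applies the reducer's comparison to each group.

A group without an M record (a manager number that never appears as an employee) is skipped, so its subordinates are not compared against a default salary of 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local simulation of a tagged reduce-side self-join; it does not use Hadoop.
class Q5SelfJoinSketch {

    static List<String> earnMoreThanManager(List<String> empLines) {
        // "map" phase: emit (empno, "M,salary") and (mgrno, "E,name,salary")
        Map<String, List<String>> shuffle = new TreeMap<String, List<String>>();
        for (String line : empLines) {
            String[] kv = line.split(",");
            if (kv.length < 8 || kv[5].trim().isEmpty()) {
                continue;
            }
            emit(shuffle, kv[0].trim(), "M," + kv[5].trim());
            if (!kv[3].trim().isEmpty()) {
                emit(shuffle, kv[3].trim(), "E," + kv[1].trim() + "," + kv[5].trim());
            }
        }
        // "reduce" phase: one iteration per key
        List<String> result = new ArrayList<String>();
        for (List<String> values : shuffle.values()) {
            Long mgrSalary = null; // stays null if this key has no M record
            List<String[]> subordinates = new ArrayList<String[]>();
            for (String v : values) {
                String[] f = v.split(",");
                if (f[0].equals("M")) {
                    mgrSalary = Long.parseLong(f[1]);
                } else {
                    subordinates.add(f);
                }
            }
            if (mgrSalary == null) {
                continue;
            }
            for (String[] e : subordinates) {
                if (Long.parseLong(e[2]) > mgrSalary) {
                    result.add(e[1] + "\t" + e[2]);
                }
            }
        }
        return result;
    }

    private static void emit(Map<String, List<String>> shuffle, String key, String value) {
        List<String> list = shuffle.get(key);
        if (list == null) {
            list = new ArrayList<String>();
            shuffle.put(key, list);
        }
        list.add(value);
    }
}
```

Every employee appears once as an M record under its own number and, if it has a manager, once as an E record under the manager's number. This is what lets a single table be joined with itself in one job.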
Q5EarnMoreThanManager.java code:
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q5EarnMoreThanManager extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // emp fields: empno, name, job, mgr, hire date, salary, commission, deptno
            String[] kv = value.toString().split(",");
            if (kv.length < 8 || kv[5].trim().isEmpty()) {
                return; // skip blank or malformed lines
            }

            // Manager record: key = employee number, value = "M,salary"
            context.write(new Text(kv[0].trim()), new Text("M," + kv[5].trim()));

            // Employee record: key = manager number, value = "E,name,salary"
            if (!kv[3].trim().isEmpty()) {
                context.write(new Text(kv[3].trim()), new Text("E," + kv[1].trim() + "," + kv[5].trim()));
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Subordinates of this manager (name -> salary) and the manager's own salary
            Map<String, Long> empMap = new HashMap<String, Long>();
            Long mgrSalary = null;

            for (Text val : values) {
                String[] fields = val.toString().split(",");
                if ("E".equals(fields[0])) {
                    // Employee tag: record the subordinate's name and salary
                    empMap.put(fields[1], Long.parseLong(fields[2]));
                } else {
                    // Manager tag: record the manager's salary
                    mgrSalary = Long.parseLong(fields[1]);
                }
            }

            // Without an "M" record there is no manager salary to compare against
            if (mgrSalary == null) {
                return;
            }

            // Output the subordinates who earn more than their manager
            for (Map.Entry<String, Long> entry : empMap.entrySet()) {
                if (entry.getValue() > mgrSalary) {
                    context.write(new Text(entry.getKey()), new Text(String.valueOf(entry.getValue())));
                }
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // Create the job and set the Mapper and Reducer classes
        Job job = new Job(getConf(), "Q5EarnMoreThanManager");
        job.setJarByClass(Q5EarnMoreThanManager.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // Input and output formats; map and reduce both output Text/Text
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // args[0] = emp path (input), args[1] = output path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    // Entry point
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q5EarnMoreThanManager(), args);
        System.exit(res);
    }
}
3.5.4 Compile and package the code
Go to /app/hadoop-1.1.2/myclass/class6 and create the Q5EarnMoreThanManager.java source file (the code is also available in /home/shiyanlou/install-pack/class6/Q5EarnMoreThanManager.java):
cd /app/hadoop-1.1.2/myclass/class6
vi Q5EarnMoreThanManager.java
Compile the code:
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q5EarnMoreThanManager.java
Package the compiled classes into a jar. If you run the classes without a jar, you get a class-not-found error:
jar cvf ./Q5EarnMoreThanManager.jar ./Q5EarnMore*.class
mv *.jar ../..
rm Q5EarnMore*.class
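Running Q5EarnMoreThanManager requires two arguments: the employee data path and the output path. Note that HDFS path arguments must be full paths, otherwise the job fails with an error:
- Employee data path: hdfs://hadoop:9000/class6/input/emp
- Output path: hdfs://hadoop:9000/class6/out5
Run the following commands: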
cd /app/hadoop-1.1.2
hadoop jar Q5EarnMoreThanManager.jar Q5EarnMoreThanManager hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out5
After the job succeeds, check the output directory /class6/out5 in HDFS:
hadoop fs -ls /class6/out5
hadoop fs -cat /class6/out5/part-r-00000
Open the part-r-00000 file to see the result:
FORD	3000
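To find the names and salaries of employees who earn more than the company's average salary, we need the company average and every employee's salary, and must compare them. This can be solved with two jobs: first compute the average salary, then compare every employee against it. It can also be solved with a single job by calling the job's setNumReduceTasks method to set the number of reduce tasks to 1. With only one reduce task, the average salary can be computed first and then used for the comparison.

1. The Mapper emits every employee record twice: once with key 0 and the salary as the value, and once with key 1 and "name,salary" as the value.
2. The shuffle phase groups the data by key, which yields two groups, key 0 and key 1. Because keys reach the reducer in sorted order, the group for key 0 is processed first.
3. In the Reducer, the group for key 0 gives the total salary and the number of employees, and hence the average salary.
4. For key 1, each employee's salary is compared with the average, and the employees who earn more than the average are output with their salaries.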
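The single-job approach depends on two MapReduce properties. With one reduce task, every key goes to the same reducer. Keys also reach that reducer in sorted order, so key 0 is reduced before key 1, and fields on the reducer can carry the totals across the two calls.

The hypothetical sketch below imitates this with a TreeMap and one reducer object; Q6SingleReducerSketch is an invented name, and the sketch does not use Hadoop. It keeps the values of key 1 in input order, whereas Hadoop does not guarantee any order of values within a key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local simulation of a stateful single reducer; it does not use Hadoop.
class Q6SingleReducerSketch {

    // State kept across reduce() calls, valid only because there is one reducer
    private long allSalary = 0;
    private int allEmpCount = 0;
    private final List<String> output = new ArrayList<String>();

    void reduce(int key, List<String> values) {
        if (key == 0) {
            // First group: accumulate total salary and head count
            for (String v : values) {
                allSalary += Long.parseLong(v);
                allEmpCount++;
            }
        } else if (key == 1 && allEmpCount > 0) {
            // Second group: compare each employee with the (truncated) average
            long aveSalary = allSalary / allEmpCount;
            output.add("Average Salary = " + aveSalary);
            for (String v : values) {
                String[] f = v.split(",");
                if (Long.parseLong(f[1]) > aveSalary) {
                    output.add(f[0] + "\t" + f[1]);
                }
            }
        }
    }

    static List<String> run(List<String> empLines) {
        // "map" + "shuffle": every record goes to key 0 and key 1; TreeMap sorts the keys
        Map<Integer, List<String>> shuffle = new TreeMap<Integer, List<String>>();
        shuffle.put(0, new ArrayList<String>());
        shuffle.put(1, new ArrayList<String>());
        for (String line : empLines) {
            String[] kv = line.split(",");
            if (kv.length < 8) {
                continue;
            }
            shuffle.get(0).add(kv[5].trim());
            shuffle.get(1).add(kv[1].trim() + "," + kv[5].trim());
        }
        Q6SingleReducerSketch reducer = new Q6SingleReducerSketch(); // the one reduce task
        for (Map.Entry<Integer, List<String>> e : shuffle.entrySet()) {
            reducer.reduce(e.getKey(), e.getValue());
        }
        return reducer.output;
    }
}
```

With more than one reduce task, keys 0 and 1 could land in different reducers, and the reducer handling key 1 would never see the totals. This is why the job pins the reduce count to 1.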
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q6HigherThanAveSalary extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // Split the employee record into its fields
            String[] kv = value.toString().split(",");

            // First copy: key 0, value = the employee's salary
            context.write(new IntWritable(0), new Text(kv[5]));

            // Second copy: key 1, value = "employee name,salary"
            context.write(new IntWritable(1), new Text(kv[1] + "," + kv[5]));
        }
    }

    public static class Reduce extends Reducer<IntWritable, Text, Text, Text> {

        // Running salary total, employee count and the resulting average
        private long allSalary = 0;
        private int allEmpCount = 0;
        private long aveSalary = 0;
        private boolean averageWritten = false;

        // Salary of the employee currently being compared
        private long empSalary = 0;

        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            for (Text val : values) {
                if (0 == key.get()) {
                    // Key 0: accumulate the total salary and the employee count
                    allSalary += Long.parseLong(val.toString());
                    allEmpCount++;
                } else if (1 == key.get()) {
                    // Keys arrive in sorted order, so every key-0 value has been counted;
                    // compute and write the average once
                    if (!averageWritten) {
                        aveSalary = allSalary / allEmpCount;
                        context.write(new Text("Average Salary = "), new Text("" + aveSalary));
                        context.write(new Text("Following employees have salarys higher than Average:"), new Text(""));
                        averageWritten = true;
                    }

                    // Output every employee whose salary is above the average
                    empSalary = Long.parseLong(val.toString().split(",")[1]);
                    if (empSalary > aveSalary) {
                        context.write(new Text(val.toString().split(",")[0]), new Text("" + empSalary));
                    }
                }
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        // Create the job and set its name
        Job job = new Job(getConf(), "Q6HigherThanAveSalary");
        job.setJobName("Q6HigherThanAveSalary");

        // Set the Mapper and Reducer classes
        job.setJarByClass(Q6HigherThanAveSalary.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // The job must run exactly one reduce task (equivalent to -D mapred.reduce.tasks=1).
        // This is the key setting: keys 0 and 1 then reach the same reducer, which sees
        // key 0 (the totals) before key 1 (the comparisons)
        job.setNumReduceTasks(1);

        // Map output key/value types
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Output format and final key/value types (the reducer writes Text values)
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Argument 1 is the employee data path, argument 2 the output path
        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * Main method, the program entry point
     * @param args command-line arguments
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q6HigherThanAveSalary(), args);
        System.exit(res);
    }
}
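To check the reduce logic without a cluster, the two key groups can be simulated in plain Java. This is only a sketch under stated assumptions: the class and method names (AboveAverageSketch, averageSalary, aboveAverage) are hypothetical, it needs Java 8 or later and no Hadoop, and the two passes over the list stand in for the key-0 and key-1 groups that the single reducer receives in that order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the Q6 reduce logic (no Hadoop). Two passes over the same
// records stand in for the key-0 group (salaries) and the key-1 group ("name,salary").
public class AboveAverageSketch {

    // emp test data: field 1 is the name, field 5 the salary
    public static final List<String> EMP = Arrays.asList(
            "7369,SMITH,CLERK,7902,17-12月-80,800,,20",
            "7499,ALLEN,SALESMAN,7698,20-2月-81,1600,300,30",
            "7521,WARD,SALESMAN,7698,22-2月-81,1250,500,30",
            "7566,JONES,MANAGER,7839,02-4月-81,2975,,20",
            "7654,MARTIN,SALESMAN,7698,28-9月-81,1250,1400,30",
            "7698,BLAKE,MANAGER,7839,01-5月-81,2850,,30",
            "7782,CLARK,MANAGER,7839,09-6月-81,2450,,10",
            "7839,KING,PRESIDENT,,17-11月-81,5000,,10",
            "7844,TURNER,SALESMAN,7698,08-9月-81,1500,0,30",
            "7900,JAMES,CLERK,7698,03-12月-81,950,,30",
            "7902,FORD,ANALYST,7566,03-12月-81,3000,,20",
            "7934,MILLER,CLERK,7782,23-1月-82,1300,,10");

    // Key-0 group: sum the salaries, count the employees and take the
    // integer average (long division, as in the MapReduce job)
    public static long averageSalary(List<String> records) {
        long total = 0;
        int count = 0;
        for (String record : records) {
            total += Long.parseLong(record.split(",")[5]);
            count++;
        }
        return total / count;
    }

    // Key-1 group: keep "name salary" for every employee strictly above the average
    public static List<String> aboveAverage(List<String> records, long average) {
        List<String> result = new ArrayList<>();
        for (String record : records) {
            String[] fields = record.split(",");
            long salary = Long.parseLong(fields[5]);
            if (salary > average) {
                result.add(fields[1] + " " + salary);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long average = averageSalary(EMP);
        System.out.println("Average Salary = " + average);
        for (String line : aboveAverage(EMP, average)) {
            System.out.println(line);
        }
    }
}
```

Running main prints the same average and the same five employees as the job, in input order and separated by a space instead of a tab.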
3.6.4 Compile and Package the Code
Go to the /app/hadoop-1.1.2/myclass/class6 directory and create Q6HigherThanAveSalary.java (the code is also available in /home/shiyanlou/install-pack/class6/Q6HigherThanAveSalary.java):
cd /app/hadoop-1.1.2/myclass/class6
vi Q6HigherThanAveSalary.java
Compile the code:
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q6HigherThanAveSalary.java
Package the compiled classes into a jar; running them without a jar causes a class-not-found error:
jar cvf ./Q6HigherThanAveSalary.jar ./Q6HigherThan*.class
mv *.jar ../..
rm Q6HigherThan*.class
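Q6HigherThanAveSalary takes two arguments, the employee data path and the output path. Note that HDFS paths must be given in full, otherwise the job fails:
- Employee data path: hdfs://hadoop:9000/class6/input/emp
- Output path: hdfs://hadoop:9000/class6/out6
Run the following commands: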
cd /app/hadoop-1.1.2
hadoop jar Q6HigherThanAveSalary.jar Q6HigherThanAveSalary hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out6
After the job succeeds, check the output directory /class6/out6 in HDFS:
hadoop fs -ls /class6/out6
hadoop fs -cat /class6/out6/part-r-00000
Open the part-r-00000 file to see the result:
Average Salary = 2077
Following employees have salarys higher than Average:
FORD 3000
CLARK 2450
KING 5000
JONES 2975
BLAKE 2850
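The first output line can be checked by hand. The emp file has 12 employees whose salaries s_k add up to 24925; because the reducer divides two integer values (allSalary / allEmpCount), the fractional part is discarded:

```latex
\bar{s} = \left\lfloor \frac{\sum_{k=1}^{12} s_k}{12} \right\rfloor
        = \left\lfloor \frac{24925}{12} \right\rfloor
        = \lfloor 2077.08\ldots \rfloor
        = 2077
```

JONES (2975), BLAKE (2850), CLARK (2450), KING (5000) and FORD (3000) are exactly the five employees whose salary exceeds 2077.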
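To find the employees whose names start with J, together with their department names, we only need to check whether each name starts with J. As in question 1, the department data is cached in the Mapper's setup phase. The map method then checks whether the employee name starts with J; if it does, it extracts the name and the department number, looks the department number up in the cached department data to get the department name, and outputs the result.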
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q7NameDeptOfStartJ extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        // In-memory cache of the dept file: department number -> department name
        private Map<String, String> deptMap = new HashMap<String, String>();
        private String[] kv;

        // Runs once per map task, before any call to map()
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader in = null;
            try {
                // Local copies of the files registered with DistributedCache for this job
                Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                String deptIdName = null;
                for (Path path : paths) {

                    // Only the dept file is loaded into deptMap
                    if (path.toString().contains("dept")) {
                        in = new BufferedReader(new FileReader(path.toString()));
                        while (null != (deptIdName = in.readLine())) {
                            // Key = department number, value = department name
                            deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (in != null) {
                        in.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // Split the employee record into its fields
            kv = value.toString().split(",");

            // For names starting with J, emit key = employee name, value = department name;
            // records whose department number is not in the cache are skipped
            String deptName = deptMap.get(kv[7].trim());
            if (kv[1].trim().startsWith("J") && deptName != null) {
                context.write(new Text(kv[1].trim()), new Text(deptName));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        // Create the job and set its name
        Job job = new Job(getConf(), "Q7NameDeptOfStartJ");
        job.setJobName("Q7NameDeptOfStartJ");

        // Set the Mapper class; no Reducer is set, so the default identity reducer
        // passes the map output through unchanged
        job.setJarByClass(Q7NameDeptOfStartJ.class);
        job.setMapperClass(MapClass.class);

        // Input format
        job.setInputFormatClass(TextInputFormat.class);

        // Output format and output key/value types
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Argument 1 is the department data path to cache, argument 2 the employee
        // data path and argument 3 the output path
        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
        DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * Main method, the program entry point
     * @param args command-line arguments
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q7NameDeptOfStartJ(), args);
        System.exit(res);
    }
}
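The map-side join pattern used here (load the small table into a hash map once, then stream the large table and probe the map) can be sketched in plain Java without Hadoop. MapSideJoinSketch, loadDepartments and namesStartingWith are hypothetical names: loadDepartments plays the role of setup() and namesStartingWith the role of map(). Java 8 or later is assumed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the Q7 map-side join (no Hadoop): the small dept table is
// held in a HashMap, and the large emp table is scanned once and probes that map.
public class MapSideJoinSketch {

    public static final List<String> DEPT = Arrays.asList(
            "10,ACCOUNTING,NEW YORK",
            "20,RESEARCH,DALLAS",
            "30,SALES,CHICAGO",
            "40,OPERATIONS,BOSTON");

    public static final List<String> EMP = Arrays.asList(
            "7369,SMITH,CLERK,7902,17-12月-80,800,,20",
            "7499,ALLEN,SALESMAN,7698,20-2月-81,1600,300,30",
            "7521,WARD,SALESMAN,7698,22-2月-81,1250,500,30",
            "7566,JONES,MANAGER,7839,02-4月-81,2975,,20",
            "7654,MARTIN,SALESMAN,7698,28-9月-81,1250,1400,30",
            "7698,BLAKE,MANAGER,7839,01-5月-81,2850,,30",
            "7782,CLARK,MANAGER,7839,09-6月-81,2450,,10",
            "7839,KING,PRESIDENT,,17-11月-81,5000,,10",
            "7844,TURNER,SALESMAN,7698,08-9月-81,1500,0,30",
            "7900,JAMES,CLERK,7698,03-12月-81,950,,30",
            "7902,FORD,ANALYST,7566,03-12月-81,3000,,20",
            "7934,MILLER,CLERK,7782,23-1月-82,1300,,10");

    // Role of setup(): build department number -> department name
    public static Map<String, String> loadDepartments(List<String> deptRecords) {
        Map<String, String> deptMap = new HashMap<>();
        for (String record : deptRecords) {
            String[] fields = record.split(",");
            deptMap.put(fields[0].trim(), fields[1].trim());
        }
        return deptMap;
    }

    // Role of map(): one pass over emp, keeping "name department" for matching names
    public static List<String> namesStartingWith(String prefix, List<String> empRecords,
                                                 Map<String, String> deptMap) {
        List<String> result = new ArrayList<>();
        for (String record : empRecords) {
            String[] fields = record.split(",");
            String name = fields[1].trim();
            String deptName = deptMap.get(fields[7].trim());
            // Skip employees whose department number is not in the cached table
            if (name.startsWith(prefix) && deptName != null) {
                result.add(name + " " + deptName);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> deptMap = loadDepartments(DEPT);
        for (String line : namesStartingWith("J", EMP, deptMap)) {
            System.out.println(line);
        }
    }
}
```

The sketch emits JONES before JAMES because it keeps input order; the job output lists JAMES first because the identity reducer receives the keys sorted.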
3.7.4 Compile and Package the Code
Go to the /app/hadoop-1.1.2/myclass/class6 directory and create Q7NameDeptOfStartJ.java (the code is also available in /home/shiyanlou/install-pack/class6/Q7NameDeptOfStartJ.java):
cd /app/hadoop-1.1.2/myclass/class6
vi Q7NameDeptOfStartJ.java
Compile the code:
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q7NameDeptOfStartJ.java
Package the compiled classes into a jar; running them without a jar causes a class-not-found error:
jar cvf ./Q7NameDeptOfStartJ.jar ./Q7NameDept*.class
mv *.jar ../..
rm Q7NameDept*.class
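Q7NameDeptOfStartJ takes three arguments: the department data path, the employee data path and the output path. Note that HDFS paths must be given in full, otherwise the job fails:
- Department data path: hdfs://hadoop:9000/class6/input/dept. The department data is cached in memory on every node that runs a task, which improves processing efficiency
- Employee data path: hdfs://hadoop:9000/class6/input/emp
- Output path: hdfs://hadoop:9000/class6/out7
Run the following commands: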
cd /app/hadoop-1.1.2
hadoop jar Q7NameDeptOfStartJ.jar Q7NameDeptOfStartJ hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out7
After the job succeeds, check the output directory /class6/out7 in HDFS:
hadoop fs -ls /class6/out7
hadoop fs -cat /class6/out7/part-r-00000
Open the part-r-00000 file to see the result:
JAMES SALES
JONES RESEARCH
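The names and salaries of the three highest-paid employees can be found with a bubble-style comparison. In the Mapper, every employee is emitted with key 0 and value "employee name,salary", so all records reach a single reduce call. The Reducer then walks through all employees once, compares each salary with the current top three and shifts the lower slots down whenever a salary ranks higher, which leaves the top three at the end.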
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q8SalaryTop3Salary extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // Split the employee record into its fields
            String[] kv = value.toString().split(",");

            // Emit key 0 and value "employee name,salary" so that all records meet in one reduce call
            context.write(new IntWritable(0), new Text(kv[1].trim() + "," + kv[5].trim()));
        }
    }

    public static class Reduce extends Reducer<IntWritable, Text, Text, Text> {

        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            // Names of the top three earners
            String empName;
            String firstEmpName = "";
            String secondEmpName = "";
            String thirdEmpName = "";

            // Salaries of the top three earners
            long empSalary = 0;
            long firstEmpSalary = 0;
            long secondEmpSalary = 0;
            long thirdEmpSalary = 0;

            // Single pass over all employees: compare each salary with the current top three
            // and shift the lower slots down when it ranks higher
            for (Text val : values) {
                empName = val.toString().split(",")[0];
                empSalary = Long.parseLong(val.toString().split(",")[1]);

                if (empSalary > firstEmpSalary) {
                    thirdEmpName = secondEmpName;
                    thirdEmpSalary = secondEmpSalary;
                    secondEmpName = firstEmpName;
                    secondEmpSalary = firstEmpSalary;
                    firstEmpName = empName;
                    firstEmpSalary = empSalary;
                } else if (empSalary > secondEmpSalary) {
                    thirdEmpName = secondEmpName;
                    thirdEmpSalary = secondEmpSalary;
                    secondEmpName = empName;
                    secondEmpSalary = empSalary;
                } else if (empSalary > thirdEmpSalary) {
                    thirdEmpName = empName;
                    thirdEmpSalary = empSalary;
                }
            }

            // Output the top three
            context.write(new Text("First employee name:" + firstEmpName), new Text("Salary:" + firstEmpSalary));
            context.write(new Text("Second employee name:" + secondEmpName), new Text("Salary:" + secondEmpSalary));
            context.write(new Text("Third employee name:" + thirdEmpName), new Text("Salary:" + thirdEmpSalary));
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        // Create the job and set its name
        Job job = new Job(getConf(), "Q8SalaryTop3Salary");
        job.setJobName("Q8SalaryTop3Salary");

        // Set the Mapper and Reducer classes and the map output types
        job.setJarByClass(Q8SalaryTop3Salary.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Input format
        job.setInputFormatClass(TextInputFormat.class);

        // Output format and output key/value types
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Argument 1 is the employee data path, argument 2 the output path
        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * Main method, the program entry point
     * @param args command-line arguments
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q8SalaryTop3Salary(), args);
        System.exit(res);
    }
}
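The three hand-written slots in the reducer generalize to any N. A common way to do that, swapped in here as an alternative technique rather than the author's method, is a bounded min-heap whose root is the smallest of the current top N. The sketch assumes Java 8 or later, uses no Hadoop, and TopNSalarySketch, Entry and topN are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Top-N by salary with a bounded min-heap (plain Java 8+, no Hadoop).
public class TopNSalarySketch {

    public static final List<String> EMP = Arrays.asList(
            "7369,SMITH,CLERK,7902,17-12月-80,800,,20",
            "7499,ALLEN,SALESMAN,7698,20-2月-81,1600,300,30",
            "7521,WARD,SALESMAN,7698,22-2月-81,1250,500,30",
            "7566,JONES,MANAGER,7839,02-4月-81,2975,,20",
            "7654,MARTIN,SALESMAN,7698,28-9月-81,1250,1400,30",
            "7698,BLAKE,MANAGER,7839,01-5月-81,2850,,30",
            "7782,CLARK,MANAGER,7839,09-6月-81,2450,,10",
            "7839,KING,PRESIDENT,,17-11月-81,5000,,10",
            "7844,TURNER,SALESMAN,7698,08-9月-81,1500,0,30",
            "7900,JAMES,CLERK,7698,03-12月-81,950,,30",
            "7902,FORD,ANALYST,7566,03-12月-81,3000,,20",
            "7934,MILLER,CLERK,7782,23-1月-82,1300,,10");

    // A (name, salary) pair held in the heap
    static final class Entry {
        final String name;
        final long salary;

        Entry(String name, long salary) {
            this.name = name;
            this.salary = salary;
        }
    }

    // Returns "name salary" for the n highest salaries, highest first
    public static List<String> topN(List<String> records, int n) {
        // Min-heap ordered by salary: the root is the smallest of the current top n
        PriorityQueue<Entry> heap = new PriorityQueue<Entry>((a, b) -> Long.compare(a.salary, b.salary));
        for (String record : records) {
            String[] fields = record.split(",");
            heap.offer(new Entry(fields[1].trim(), Long.parseLong(fields[5].trim())));
            if (heap.size() > n) {
                heap.poll(); // evict the smallest so only the n largest remain
            }
        }
        List<String> result = new ArrayList<>();
        while (!heap.isEmpty()) {
            Entry e = heap.poll(); // polled in ascending salary order
            result.add(e.name + " " + e.salary);
        }
        Collections.reverse(result); // highest salary first
        return result;
    }

    public static void main(String[] args) {
        for (String line : topN(EMP, 3)) {
            System.out.println(line);
        }
    }
}
```

Each record costs O(log N) heap work, so the approach stays cheap for small N even when the number of employees is large.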
3.8.4 Compile and Package the Code
Go to the /app/hadoop-1.1.2/myclass/class6 directory and create Q8SalaryTop3Salary.java (the code is also available in /home/shiyanlou/install-pack/class6/Q8SalaryTop3Salary.java):
cd /app/hadoop-1.1.2/myclass/class6
vi Q8SalaryTop3Salary.java
Compile the code:
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q8SalaryTop3Salary.java
Package the compiled classes into a jar; running them without a jar causes a class-not-found error:
jar cvf ./Q8SalaryTop3Salary.jar ./Q8SalaryTop3*.class
mv *.jar ../..
rm Q8SalaryTop3*.class
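Q8SalaryTop3Salary takes two arguments, the employee data path and the output path. Note that HDFS paths must be given in full, otherwise the job fails:
- Employee data path: hdfs://hadoop:9000/class6/input/emp
- Output path: hdfs://hadoop:9000/class6/out8
Run the following commands: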
cd /app/hadoop-1.1.2
hadoop jar Q8SalaryTop3Salary.jar Q8SalaryTop3Salary hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out8
After the job succeeds, check the output directory /class6/out8 in HDFS:
hadoop fs -ls /class6/out8
hadoop fs -cat /class6/out8/part-r-00000
Open the part-r-00000 file to see the result:
First employee name:KING Salary:5000
Second employee name:FORD Salary:3000
Third employee name:JONES Salary:2975
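To list all employees by total income in descending order, compute each employee's total income and sort on it. The Mapper emits key = total income (salary plus commission) and value = employee name. At the end of the map phase the output is partitioned, each partition is mapped to one reducer, and within each partition the keys are sorted with the comparator registered through job.setSortComparatorClass, here a descending one. Because this job runs a single reduce task (the default), all records fall into the same partition, so the entire data set is sorted.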
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q9EmpSalarySort extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // Split the employee record into its fields
            String[] kv = value.toString().split(",");

            // Key = total income (salary plus commission, if any), value = employee name
            int empAllSalary = "".equals(kv[6]) ? Integer.parseInt(kv[5]) : Integer.parseInt(kv[5]) + Integer.parseInt(kv[6]);
            context.write(new IntWritable(empAllSalary), new Text(kv[1]));
        }
    }

    /**
     * Descending sort comparator: negates IntWritable's ascending comparison,
     * both for deserialized keys and for raw serialized bytes
     */
    public static class DecreaseComparator extends IntWritable.Comparator {
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        // Create the job and set its name
        Job job = new Job(getConf(), "Q9EmpSalarySort");
        job.setJobName("Q9EmpSalarySort");

        // Set the Mapper class; no Reducer is set, so the default identity reducer
        // writes the sorted (total income, name) pairs
        job.setJarByClass(Q9EmpSalarySort.class);
        job.setMapperClass(MapClass.class);

        // Map output types and the descending sort comparator
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setSortComparatorClass(DecreaseComparator.class);

        // Final output types, matching what the identity reducer writes
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // Argument 1 is the employee data path, argument 2 the output path
        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * Main method, the program entry point
     * @param args command-line arguments
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q9EmpSalarySort(), args);
        System.exit(res);
    }
}
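The effect of the mapper key plus DecreaseComparator can be reproduced in plain Java to check the expected order. This sketch assumes Java 8 or later and no Hadoop; TotalIncomeSortSketch, totalIncome and sortDescending are hypothetical names, and it reverses the order by swapping the comparison arguments instead of negating the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Descending sort by total income (plain Java 8+, no Hadoop), mirroring the Q9
// mapper's key computation and the effect of DecreaseComparator.
public class TotalIncomeSortSketch {

    public static final List<String> EMP = Arrays.asList(
            "7369,SMITH,CLERK,7902,17-12月-80,800,,20",
            "7499,ALLEN,SALESMAN,7698,20-2月-81,1600,300,30",
            "7521,WARD,SALESMAN,7698,22-2月-81,1250,500,30",
            "7566,JONES,MANAGER,7839,02-4月-81,2975,,20",
            "7654,MARTIN,SALESMAN,7698,28-9月-81,1250,1400,30",
            "7698,BLAKE,MANAGER,7839,01-5月-81,2850,,30",
            "7782,CLARK,MANAGER,7839,09-6月-81,2450,,10",
            "7839,KING,PRESIDENT,,17-11月-81,5000,,10",
            "7844,TURNER,SALESMAN,7698,08-9月-81,1500,0,30",
            "7900,JAMES,CLERK,7698,03-12月-81,950,,30",
            "7902,FORD,ANALYST,7566,03-12月-81,3000,,20",
            "7934,MILLER,CLERK,7782,23-1月-82,1300,,10");

    // Salary plus commission; an empty commission field counts as 0,
    // like the mapper's "".equals(kv[6]) check
    public static int totalIncome(String record) {
        String[] fields = record.split(",");
        int salary = Integer.parseInt(fields[5]);
        return fields[6].isEmpty() ? salary : salary + Integer.parseInt(fields[6]);
    }

    // Returns "total name" lines, highest total income first
    public static List<String> sortDescending(List<String> records) {
        List<String> sorted = new ArrayList<>(records);
        // Comparing b with a reverses the natural int order
        sorted.sort((a, b) -> Integer.compare(totalIncome(b), totalIncome(a)));
        List<String> result = new ArrayList<>();
        for (String record : sorted) {
            result.add(totalIncome(record) + " " + record.split(",")[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        for (String line : sortDescending(EMP)) {
            System.out.println(line);
        }
    }
}
```

Negating a comparison, as DecreaseComparator does, is safe when the ascending comparison only returns small values such as -1, 0 and 1; swapping the arguments avoids the general pitfall that negating Integer.MIN_VALUE leaves it negative.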
3.9.4 Compile and Package the Code
Go to the /app/hadoop-1.1.2/myclass/class6 directory and create Q9EmpSalarySort.java (the code is also available in /home/shiyanlou/install-pack/class6/Q9EmpSalarySort.java):
cd /app/hadoop-1.1.2/myclass/class6
vi Q9EmpSalarySort.java
Compile the code:
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q9EmpSalarySort.java
Package the compiled classes into a jar; running them without a jar causes a class-not-found error:
jar cvf ./Q9EmpSalarySort.jar ./Q9EmpSalary*.class
mv *.jar ../..
rm Q9EmpSalary*.class
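Q9EmpSalarySort takes two arguments, the employee data path and the output path. Note that HDFS paths must be given in full, otherwise the job fails:
- Employee data path: hdfs://hadoop:9000/class6/input/emp
- Output path: hdfs://hadoop:9000/class6/out9
Run the following commands: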
cd /app/hadoop-1.1.2
hadoop jar Q9EmpSalarySort.jar Q9EmpSalarySort hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out9
After the job succeeds, check the output directory /class6/out9 in HDFS:
hadoop fs -ls /class6/out9
hadoop fs -cat /class6/out9/part-r-00000
Open the part-r-00000 file to see the result:
5000 KING
3000 FORD
2975 JONES
2850 BLAKE
......
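All employees of the company form a tree, as shown in the figure below. Finding the number of intermediate people two employees must communicate through therefore becomes finding the number of nodes on the path that connects the two nodes in the employee tree: the number of nodes from one employee up to the node where the two paths meet, plus the number of nodes from the other employee up to that node. For example, to find the number of nodes between M and Q, first count the nodes from M up to A, then count the nodes from Q up to A, and add the two counts.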
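The rule above can be written as a formula. The symbols are introduced here only for illustration: let a_0 = A, a_1, a_2, ... be A's chain of managers up to the company head, let b_0 = B, b_1, ... be B's chain, and let a_i = b_j be the first node in A's chain that also appears in B's chain (their lowest common manager). The number of intermediate people is then i + j - 1, which is the quantity the job's calculateDistance method computes as ii + jj - 1. Two cases from the emp data:

```latex
\begin{aligned}
d(A,B) &= i + j - 1, \qquad a_i = b_j \text{ is the lowest common manager of } A \text{ and } B\\
\text{SMITH: } & 7369 \to 7902 \to 7566 \to 7839 \quad (i = 3)\\
\text{ALLEN: } & 7499 \to 7698 \to 7839 \quad (j = 2)\\
d(7369, 7499) &= 3 + 2 - 1 = 4\\
d(7839, 7566) &= 0 + 1 - 1 = 0 \quad \text{(JONES reports directly to KING)}
\end{aligned}
```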
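In the job, the Mapper first emits every employee with key 0 and value "employee number,manager number". The Reducer then puts all employees into an employee list and an employee-to-manager map, and finally, in the Reducer's cleanup method, the algorithm described above computes and outputs the communication path length for every pair of employees.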
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q10MiddlePersonsCountForComm extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // Split the employee record into its fields
            String[] kv = value.toString().split(",");

            // Emit key 0 and value "employee number,manager number";
            // the empty manager number of the company head is written as a space
            context.write(new IntWritable(0), new Text(kv[0] + "," + ("".equals(kv[3]) ? " " : kv[3])));
        }
    }

    public static class Reduce extends Reducer<IntWritable, Text, NullWritable, Text> {

        // List of all employees and a map from each employee to their manager
        List<String> employeeList = new ArrayList<String>();
        Map<String, String> employeeToManagerMap = new HashMap<String, String>();

        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            // Collect every employee into the list and the employee-to-manager map
            for (Text value : values) {
                employeeList.add(value.toString().split(",")[0].trim());
                employeeToManagerMap.put(value.toString().split(",")[0].trim(), value.toString().split(",")[1].trim());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            int totalEmployee = employeeList.size();
            int i, j;
            int distance;

            // Compute and output the communication path length for every pair of employees
            for (i = 0; i < (totalEmployee - 1); i++) {
                for (j = (i + 1); j < totalEmployee; j++) {
                    distance = calculateDistance(i, j);
                    String value = employeeList.get(i) + " and " + employeeList.get(j) + " = " + distance;
                    context.write(NullWritable.get(), new Text(value));
                }
            }
        }

        /**
         * All employees form a tree, so the number of intermediate people between two employees
         * is the number of nodes between them in the tree. Any two nodes meet at some common
         * manager, which the algorithm below finds.
         */
        private int calculateDistance(int i, int j) {
            String employeeA = employeeList.get(i);
            String employeeB = employeeList.get(j);
            int distance = 0;

            // A is B's manager, or vice versa: they talk directly
            if (employeeToManagerMap.get(employeeA).equals(employeeB) || employeeToManagerMap.get(employeeB).equals(employeeA)) {
                distance = 0;
            }
            // A and B share a manager: that manager is the single intermediate
            else if (employeeToManagerMap.get(employeeA).equals(employeeToManagerMap.get(employeeB))) {
                distance = 1;
            } else {
                // Manager chains of A and B
                List<String> employeeA_ManagerList = new ArrayList<String>();
                List<String> employeeB_ManagerList = new ArrayList<String>();

                // Chain from A up to the company head
                employeeA_ManagerList.add(employeeA);
                String current = employeeA;
                while (!employeeToManagerMap.get(current).isEmpty()) {
                    current = employeeToManagerMap.get(current);
                    employeeA_ManagerList.add(current);
                }

                // Chain from B up to the company head
                employeeB_ManagerList.add(employeeB);
                current = employeeB;
                while (!employeeToManagerMap.get(current).isEmpty()) {
                    current = employeeToManagerMap.get(current);
                    employeeB_ManagerList.add(current);
                }

                int ii = 0, jj = 0;
                String currentA_manager, currentB_manager;
                boolean found = false;

                // Walk up A's chain and stop at the first node that is also in B's chain (the meeting point)
                for (ii = 0; ii < employeeA_ManagerList.size(); ii++) {
                    currentA_manager = employeeA_ManagerList.get(ii);
                    for (jj = 0; jj < employeeB_ManagerList.size(); jj++) {
                        currentB_manager = employeeB_ManagerList.get(jj);
                        if (currentA_manager.equals(currentB_manager)) {
                            found = true;
                            break;
                        }
                    }

                    if (found) {
                        break;
                    }
                }

                // Steps from A and from B to the meeting point, minus 1 so that only intermediate nodes count
                distance = ii + jj - 1;
            }

            return distance;
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        // Create the job and set its name
        Job job = new Job(getConf(), "Q10MiddlePersonsCountForComm");
        job.setJobName("Q10MiddlePersonsCountForComm");

        // Set the Mapper and Reducer classes
        job.setJarByClass(Q10MiddlePersonsCountForComm.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // Map output key/value types
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Output format and reduce output key/value types
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Argument 1 is the employee data path, argument 2 the output path
        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * Main method, the program entry point
     * @param args command-line arguments
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q10MiddlePersonsCountForComm(), args);
        System.exit(res);
    }
}
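The chain-walking idea can be checked outside Hadoop with a small plain-Java sketch. CommunicationDistanceSketch, managerMap, chain and intermediates are hypothetical names; the sketch assumes Java 8 or later and a single company head whose manager field is empty. It finds the lowest common manager with List.indexOf rather than the nested loops used in the job, and in it two employees who share a manager come out as 1 intermediate (the shared manager).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the Q10 distance calculation (no Hadoop): climb each
// employee's manager chain and meet at the lowest common manager.
public class CommunicationDistanceSketch {

    public static final List<String> EMP = Arrays.asList(
            "7369,SMITH,CLERK,7902,17-12月-80,800,,20",
            "7499,ALLEN,SALESMAN,7698,20-2月-81,1600,300,30",
            "7521,WARD,SALESMAN,7698,22-2月-81,1250,500,30",
            "7566,JONES,MANAGER,7839,02-4月-81,2975,,20",
            "7654,MARTIN,SALESMAN,7698,28-9月-81,1250,1400,30",
            "7698,BLAKE,MANAGER,7839,01-5月-81,2850,,30",
            "7782,CLARK,MANAGER,7839,09-6月-81,2450,,10",
            "7839,KING,PRESIDENT,,17-11月-81,5000,,10",
            "7844,TURNER,SALESMAN,7698,08-9月-81,1500,0,30",
            "7900,JAMES,CLERK,7698,03-12月-81,950,,30",
            "7902,FORD,ANALYST,7566,03-12月-81,3000,,20",
            "7934,MILLER,CLERK,7782,23-1月-82,1300,,10");

    // employee number -> manager number; the company head maps to ""
    public static Map<String, String> managerMap(List<String> records) {
        Map<String, String> managerOf = new HashMap<>();
        for (String record : records) {
            String[] fields = record.split(",");
            managerOf.put(fields[0].trim(), fields[3].trim());
        }
        return managerOf;
    }

    // Chain from an employee up to the head: [e, manager(e), manager(manager(e)), ...]
    static List<String> chain(String employee, Map<String, String> managerOf) {
        List<String> chain = new ArrayList<>();
        String current = employee;
        while (current != null && !current.isEmpty()) {
            chain.add(current);
            current = managerOf.get(current);
        }
        return chain;
    }

    // Intermediate people between a and b: i + j - 1, where the lowest common
    // manager sits at index i of a's chain and index j of b's chain
    public static int intermediates(String a, String b, Map<String, String> managerOf) {
        List<String> chainA = chain(a, managerOf);
        List<String> chainB = chain(b, managerOf);
        for (int i = 0; i < chainA.size(); i++) {
            int j = chainB.indexOf(chainA.get(i));
            if (j >= 0) {
                return i + j - 1;
            }
        }
        throw new IllegalArgumentException("No common manager for " + a + " and " + b);
    }

    public static void main(String[] args) {
        Map<String, String> managerOf = managerMap(EMP);
        System.out.println("7369 and 7499 = " + intermediates("7369", "7499", managerOf));
        System.out.println("7499 and 7521 = " + intermediates("7499", "7521", managerOf));
    }
}
```

For the pairs listed in the job output (7369 with 7499, 7521, 7566, 7654 and 7698) the sketch gives the same values: 4, 4, 1, 4 and 3.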
3.10.4 Compile and Package the Code
Go to the /app/hadoop-1.1.2/myclass/class6 directory and create Q10MiddlePersonsCountForComm.java (the code is also available in /home/shiyanlou/install-pack/class6/Q10MiddlePersonsCountForComm.java):
cd /app/hadoop-1.1.2/myclass/class6
vi Q10MiddlePersonsCountForComm.java
Compile the code:
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q10MiddlePersonsCountForComm.java
Package the compiled classes into a jar; running them without a jar causes a class-not-found error:
jar cvf ./Q10MiddlePersonsCountForComm.jar ./Q10MiddlePersons*.class
mv *.jar ../..
rm Q10MiddlePersons*.class
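Q10MiddlePersonsCountForComm takes two arguments, the employee data path and the output path. Note that HDFS paths must be given in full, otherwise the job fails:
- Employee data path: hdfs://hadoop:9000/class6/input/emp
- Output path: hdfs://hadoop:9000/class6/out10
Run the following commands: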
cd /app/hadoop-1.1.2
hadoop jar Q10MiddlePersonsCountForComm.jar Q10MiddlePersonsCountForComm hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out10
After the job succeeds, check the output directory /class6/out10 in HDFS:
hadoop fs -ls /class6/out10
hadoop fs -cat /class6/out10/part-r-00000
Open the part-r-00000 file to see the result:
7369 and 7499 = 4
7369 and 7521 = 4
7369 and 7566 = 1
7369 and 7654 = 4
7369 and 7698 = 3
......