本文轉載自:http://blog.csdn.net/xyilu/article/details/8996204
表 A(m_ys_lab_jointest_a)的建表語句:
-- Table A of the join example: (id, name) rows stored as a plain text file.
-- The MapReduce job below splits each line on a tab (MapRedJoin.DELIMITER),
-- so the field delimiter here must be the tab byte (9) and lines end with
-- '\n' (byte 10). Hive would also read the numeric spellings '9' / '10' as
-- those bytes, but '\t' / '\n' say so explicitly and are not mistaken for
-- the digit characters by readers or other engines.
create table if not exists m_ys_lab_jointest_a (
    id   bigint,
    name string
)
row format delimited
    fields terminated by '\t'
    lines terminated by '\n'
stored as textfile;
具體數據如下:
id name
1 北京
2 天津
3 河北
4 山西
5 內蒙古
6 遼寧
7 吉林
8 黑龍江

表 B(m_ys_lab_jointest_b)的建表語句:
-- Table B of the join example: (id, statyear, num) rows stored as a plain
-- text file; it is joined to m_ys_lab_jointest_a on id by the MapReduce job
-- below, which splits each line on a tab (MapRedJoin.DELIMITER).
-- '\t' / '\n' are the tab (byte 9) and newline (byte 10) delimiters; Hive
-- would read the numeric spellings '9' / '10' as the same bytes, but the
-- escaped forms cannot be misread as the digit characters.
create table if not exists m_ys_lab_jointest_b (
    id       bigint,
    statyear bigint,
    num      bigint
)
row format delimited
    fields terminated by '\t'
    lines terminated by '\n'
stored as textfile;
具體數據如下:
id statyear num
1 2010 1962
1 2011 2019
2 2010 1299
2 2011 1355
4 2010 3574
4 2011 3593
9 2010 2303
9 2011 2347
按 id 將兩表做內連接(inner join)後,期望的結果如下(同一 id 內各行的先後順序不固定):
id name statyear num
1 北京 2011 2019
1 北京 2010 1962
2 天津 2011 1355
2 天津 2010 1299
4 山西 2011 3593
4 山西 2010 3574
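整個計算過程是:map 階段根據輸入文件的路徑判斷記錄來自表 A 還是表 B,以 id 作為輸出的 key,並在 value 前加上 "a#" 或 "b#" 以標記來源;shuffle 之後,同一 id 的所有記錄交給同一次 reduce 調用;reduce 階段把兩表的值分別存入兩個列表,再對兩個列表做笛卡爾積並逐行輸出。只在一張表中出現的 id(如表 A 的 3、5、6、7、8 和表 B 的 9)不會產生輸出,因此結果相當於按 id 的內連接。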
代碼如下:
1 import java.io.IOException; 2 import java.util.ArrayList; 3 import java.util.Iterator; 4 import java.util.List; 5 6 import org.apache.hadoop.io.LongWritable; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapred.FileSplit; 9 import org.apache.hadoop.mapred.JobConf; 10 import org.apache.hadoop.mapred.MapReduceBase; 11 import org.apache.hadoop.mapred.Mapper; 12 import org.apache.hadoop.mapred.OutputCollector; 13 import org.apache.hadoop.mapred.Reducer; 14 import org.apache.hadoop.mapred.Reporter; 15 16 /** 17 * MapReduce實現Join操做 18 */ 19 public class MapRedJoin { 20 public static final String DELIMITER = "\u0009"; // 字段分隔符 21 22 // map過程 23 public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> { 24 public void configure(JobConf job) { 25 super.configure(job); 26 } 27 28 public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException, ClassCastException { 29 // 獲取輸入文件的全路徑和名稱 30 String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString(); 31 // 獲取記錄字符串 32 String line = value.toString(); 33 // 拋棄空記錄 34 if (line == null || line.equals("")){ 35 return; 36 } 37 // 處理來自表A的記錄 38 if (filePath.contains("m_ys_lab_jointest_a")) { 39 String[] values = line.split(DELIMITER); // 按分隔符分割出字段 40 if (values.length < 2){ 41 return; 42 } 43 String id = values[0]; // id 44 String name = values[1]; // name 45 output.collect(new Text(id), new Text("a#"+name)); 46 } else if (filePath.contains("m_ys_lab_jointest_b")) {// 處理來自表B的記錄 47 String[] values = line.split(DELIMITER); // 按分隔符分割出字段 48 if (values.length < 3){ 49 return; 50 } 51 String id = values[0]; // id 52 String statyear = values[1]; // statyear 53 String num = values[2]; //num 54 output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num)); 55 } 56 } 57 } 58 59 // reduce過程 60 public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> { 61 public void 
reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { 62 List<String> listA = new ArrayList<String>(); // 存放來自表A的值 63 List<String> listB = new ArrayList<String>(); // 存放來自表B的值 64 while (values.hasNext()) { 65 String value = values.next().toString(); 66 if (value.startsWith("a#")) { 67 listA.add(value.substring(2)); 68 } else if (value.startsWith("b#")) { 69 listB.add(value.substring(2)); 70 } 71 } 72 int sizeA = listA.size(); 73 int sizeB = listB.size(); 74 // 遍歷兩個向量 75 int i, j; 76 for (i = 0; i < sizeA; i ++) { 77 for (j = 0; j < sizeB; j ++) { 78 output.collect(key, new Text(listA.get(i) + DELIMITER +listB.get(j))); 79 } 80 } 81 } 82 } 83 84 protected void configJob(JobConf conf) { 85 conf.setMapOutputKeyClass(Text.class); 86 conf.setMapOutputValueClass(Text.class); 87 conf.setOutputKeyClass(Text.class); 88 conf.setOutputValueClass(Text.class); 89 conf.setOutputFormat(ReportOutFormat.class); 90 } 91 }