我在學習hadoop, 在看 陸嘉恆編著的hadoop實戰,其中有單錶鏈接的程序,我如今整理一下思路。這個問題是課本上的例子。java
給出 child-parent 表, 要求輸出 grandchild-grandparent 表
apache
樣例輸入:數組
child parentapp
Tom Lucyide
Tom Jackoop
Jone Lucy學習
Jone Jackspa
Lucy Maryorm
Lucy Ben token
Jack Alice
Jack Jesee
Terry Alice
Terry Jesee
Philip Terry
Philip Alma
Mark Terry
Mark Alma
樣例輸出:
grandChildgrandParent
TomAlice
TomJesee
JoneAlice
JoneJesee
TomMary
TomBen
JoneMary
JoneBen
PhilipAlice
PhilipJesee
MarkAlice
MarkJesee
其實這個案例只要想通了裏面的關鍵,仍是很簡單的。
解題思路: 進行單錶鏈接
從樣例輸入文件中,咱們能夠看到 child--parent(child)--parent ,經過這樣鏈接就會找出 grandchild -- grandparent
如:
child parent
Tom Lucy
Tom Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesee
這樣咱們能夠很容易的找到下面的關係:
grandchild grandparent
Tom Mary
Tom Ben
Tom Alice
Tom Jesee
咱們能夠這樣鏈接:
表一: 表二:
child parent child parent
Tom Lucy Lucy Mary
Lucy Ben
Tom Jack Jack Alice
Jack Jesee
咱們能夠將表一和表二進行鏈接,而後去掉 表一的第二列 和表二的第一列, 剩下的就是 結果了。
這裏咱們能夠看到 ,其實表一和表二是一個表,這就是單錶鏈接
這裏能夠將將這個表設置爲左表和右表
Map 階段:
將讀入的數據 分割爲child 和 parent , 爲了區分左右表能夠在 輸出的value 裏面加上標記左右表的信息, 左表 將 parent 做爲key , 左表標記+child 做爲 value 爲map輸出, 右表 child 做爲key ,右表標記+parent 做爲value 爲輸出。
在Map 階段完成了左右表的劃分,在shuffle 階段完成了左右錶鏈接
Reduce 階段:
(相同key 的 會匯聚在一塊兒,如 <Lucy ,<leftTag:Tom , rightTag:Mary , rightTag:Ben> >)
像這樣在Reduce 階段收到的結果中,每一個key的value-list 中就包含了grandchild (left:Tom)和grandparnet (rightTag : Mary , rgihtTag : Ben)的關係,而後將value解析, 有leftTag標記的存入 grandChild[] 數組中,將有rightTag 標記的 存入 grandParent[] 數組中,而後對 grandChild[] 和grandParent[] 求笛卡爾積 便可
下面是程序代碼:
package cn.edu.ytu.botao.singletablejoin; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; /** * * 單錶鏈接 * * child parent * Tom Lucy * Tom Jack * Lucy Mary * Lucy Ben * * 左表 : 反向輸出 <key parent, value chlid> * Lucy Tom * Jack Tom * * 右表 正向輸出 <key child, value parent> * Lucy Mary * Lucy Ben * * 鏈接後: * * <Tom, <Mary , Ben> > * * @author botao * */ public class STjoin { private static int time = 0; public static class STJMapper extends Mapper<Object, Text, Text, Text>{ //標記表 private Text leftTag = new Text("1"); //左表 private Text rightTag = new Text("2"); //右表 @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub String childName = new String(); String parentName = new String(); //讀取內容 String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); //截取的字符串數組 String[] values = new String[2]; int i = 0; while (tokenizer.hasMoreElements()) { values[i++] = (String) tokenizer.nextElement(); } if (values[0].compareTo("child") != 0) { childName = values[0]; parentName = values[1]; //左表輸出 反向 value的值爲 grandchild context.write(new Text(parentName), new Text(leftTag.toString() + ":" + childName)); //右表輸出 正向 context.write(new Text(childName), new Text(rightTag.toString() + ":" + parentName)); } } } public static class STJoinReduce extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub //記錄grandchild的信息 和存儲 int grandChlidNum = 0; String[] grandChild = new String[20]; //記錄grandparent 的信息 和存儲 int grandParentNum = 0; String[] grandParent = new String[20]; if (time == 0) { context.write(new Text("grandChild"), new Text("grandParent")); time++; } /** * 對於右表 將values的值存入 grandChild[] 中 * 對於左表 將values 的值存入 grandParnet[] 中 */ for (Text text : values) { String value = text.toString(); //System.out.println(key.toString() + "..." + value); String temp[] = value.split(":"); //System.out.println(key.toString() + "....." + temp[0] + "..." + temp[1]); //左表 if (temp[0].compareTo("1") == 0) { grandChild[grandChlidNum++] = temp[1]; } if (temp[0].compareTo("2") == 0) { grandParent[grandParentNum++] = temp[1]; } } //對 grandChild[] 和 grandParent[]進行求笛卡爾積 if (0 != grandChlidNum && 0 != grandParentNum) { //System.out.println(grandChlidNum + "..." + grandParentNum); for (int i = 0; i < grandChlidNum; i++) { for (int j = 0; j < grandParentNum; j++) { context.write(new Text(grandChild[i]), new Text(grandParent[j])); } } } } } @SuppressWarnings("deprecation") public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } // 若是out文件夾存在,現將該文件夾刪除 Path path = new Path("out"); FileSystem fs = FileSystem.get(conf); if (fs.exists(path)) { fs.delete(path); } Job job = new Job(conf , "STJoin"); job.setJarByClass(STjoin.class); job.setMapperClass(STJMapper.class); job.setReducerClass(STJoinReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }