Hadoop 學習之單錶鏈接

        我在學習hadoop, 在看 陸嘉恆編著的hadoop實戰,其中有單錶鏈接的程序,我如今整理一下思路。這個問題是課本上的例子。java

        給出 child-parent 表, 要求輸出 grandchild-grandparent 表
apache

    樣例輸入:數組

    child parentapp

    Tom Lucyide

    Tom Jackoop

    Jone Lucy學習

    Jone Jackspa

    Lucy Maryorm

    Lucy Ben token

    Jack Alice

    Jack Jesee

    Terry Alice 

    Terry Jesee

    Philip Terry 

    Philip Alma

    Mark Terry 

    Mark Alma

   

  樣例輸出:

    grandChildgrandParent

    TomAlice

    TomJesee

    JoneAlice

    JoneJesee

    TomMary

    TomBen

    JoneMary

    JoneBen

    PhilipAlice

    PhilipJesee

    MarkAlice

    MarkJesee


    其實這個案例只要想通了裏面的關鍵,仍是很簡單的。

     解題思路: 進行單錶鏈接

    從樣例輸入文件中,咱們能夠看到  child--parent(child)--parent ,經過這樣鏈接就會找出 grandchild -- grandparent 

    如:

    child    parent

    Tom    Lucy

    Tom   Jack

    Lucy Mary

    Lucy Ben 

    Jack Alice

    Jack Jesee

    

    這樣咱們能夠很容易的找到下面的關係:

    grandchild     grandparent

    Tom                    Mary

    Tom                    Ben

    Tom                    Alice

    Tom                    Jesee


    咱們能夠這樣鏈接:

     表一:                        表二:

    child    parent           child    parent

    Tom     Lucy               Lucy   Mary

                                        Lucy  Ben 

    Tom    Jack               Jack   Alice

                                       Jack  Jesee



    咱們能夠將表一和表二進行鏈接,而後去掉 表一的第二列 和表二的第一列, 剩下的就是 結果了。

    這裏咱們能夠看到 ,其實表一和表二是一個表,這就是單錶鏈接

    

    這裏能夠將將這個表設置爲左表和右表

    Map 階段:

    將讀入的數據 分割爲child 和 parent  , 爲了區分左右表能夠在 輸出的value 裏面加上標記左右表的信息, 左表 將 parent 做爲key , 左表標記+child 做爲 value    爲map輸出, 右表 child 做爲key ,右表標記+parent 做爲value  爲輸出。


    在Map 階段完成了左右表的劃分,在shuffle 階段完成了左右錶鏈接

     Reduce 階段:

    (相同key 的 會匯聚在一塊兒,如  <Lucy ,<leftTag:Tom , rightTag:Mary , rightTag:Ben> >)

    像這樣在Reduce 階段收到的結果中,每一個key的value-list 中就包含了grandchild (left:Tom)和grandparnet (rightTag : Mary , rgihtTag : Ben)的關係,而後將value解析, 有leftTag標記的存入 grandChild[] 數組中,將有rightTag 標記的 存入 grandParent[] 數組中,而後對 grandChild[] 和grandParent[] 求笛卡爾積 便可


下面是程序代碼:

package cn.edu.ytu.botao.singletablejoin;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * 
 * 單錶鏈接
 * 
 * child   parent
 * Tom      Lucy
 * Tom      Jack
 * Lucy      Mary
 * Lucy      Ben
 * 
 * 左表 :   反向輸出  <key parent, value chlid>
 * Lucy  Tom
 * Jack  Tom
 * 
 * 右表      正向輸出  <key child, value parent>
 * Lucy  Mary
 * Lucy  Ben
 * 
 * 鏈接後:
 * 
 * <Tom, <Mary , Ben> >
 * 
 * @author botao
 *
 */


public class STjoin {
	private static int time = 0;
	public static class STJMapper extends Mapper<Object, Text, Text, Text>{
		//標記表
		private Text leftTag = new Text("1");     //左表
		private Text rightTag = new Text("2");   //右表
		
		@Override
		protected void map(Object key, Text value,
				Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			
			String childName = new String();
			String parentName = new String();
			//讀取內容
			String line = value.toString();
			StringTokenizer tokenizer = new StringTokenizer(line);
			
			//截取的字符串數組
			String[] values = new String[2];
			int i = 0;
			while (tokenizer.hasMoreElements()) {
				values[i++] = (String) tokenizer.nextElement();
			}
			
			if (values[0].compareTo("child") != 0) {
				childName = values[0];
				parentName = values[1];
				
				//左表輸出 反向   value的值爲 grandchild
				context.write(new Text(parentName), new Text(leftTag.toString() + ":" + childName));
				//右表輸出 正向
				context.write(new Text(childName), new Text(rightTag.toString() + ":" + parentName));
			}
		
		}
		
	}
	
	
	public static class STJoinReduce extends Reducer<Text, Text, Text, Text>{

		
		@Override
		protected void reduce(Text key, Iterable<Text> values,
				Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			
			//記錄grandchild的信息 和存儲
			int grandChlidNum = 0;
			String[] grandChild = new String[20];
			
			//記錄grandparent 的信息 和存儲
			int grandParentNum = 0;
			String[] grandParent = new String[20];
			
			if (time == 0) {
				context.write(new Text("grandChild"), new Text("grandParent"));
				time++;
			}
			
			/**
			 * 對於右表 將values的值存入 grandChild[] 中
			 * 對於左表 將values 的值存入 grandParnet[] 中
			 */
			for (Text text : values) {
				String value = text.toString();
				//System.out.println(key.toString() + "..." + value);
				String temp[] = value.split(":");
				//System.out.println(key.toString() + "....." + temp[0] + "..." + temp[1]);
				//左表
				if (temp[0].compareTo("1") == 0) {
					
					grandChild[grandChlidNum++] = temp[1];
					
					
				}
				
				if (temp[0].compareTo("2") == 0) {
					grandParent[grandParentNum++] = temp[1];
					
				}
				
			}
			
			//對 grandChild[] 和 grandParent[]進行求笛卡爾積
			if (0 != grandChlidNum && 0 != grandParentNum) {
				//System.out.println(grandChlidNum + "..." + grandParentNum);
				for (int i = 0; i < grandChlidNum; i++) {
					for (int j = 0; j < grandParentNum; j++) {
						context.write(new Text(grandChild[i]), new Text(grandParent[j]));
					}
				}
			}
		}
		
	}
	
	
	@SuppressWarnings("deprecation")
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
	    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: wordcount <in> <out>");
			System.exit(2);
		}

		// 若是out文件夾存在,現將該文件夾刪除
		Path path = new Path("out");
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(path)) {
			fs.delete(path);
		}
	    
	    
	    Job job = new Job(conf , "STJoin");
	    job.setJarByClass(STjoin.class);
	    job.setMapperClass(STJMapper.class);
	    job.setReducerClass(STJoinReduce.class);
	    job.setOutputKeyClass(Text.class);
	    job.setOutputValueClass(Text.class);
	    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
	    System.exit(job.waitForCompletion(true) ? 0 : 1);
	}	
}
相關文章
相關標籤/搜索