大數據學習之十二——MapReduce代碼實例:關聯性操做

1.單表關聯java

"單表關聯"要求從給出的數據中尋找所關心的數據,它是對原始數據所包含信息的挖掘。算法

實例描述
給出child-parent(孩子——父母)表,要求輸出grandchild-grandparent(孫子——爺奶)表。數據庫

算法思想:數組

這個實例須要進行單錶鏈接,鏈接的是左表的parent列和右表的child列,且左表和右表是同一個表。鏈接結果中除去鏈接的兩列就是所須要的結果——"grandchild--grandparent"表。要用MapReduce解決這個實例,首先應該考慮如何實現表的自鏈接;其次就是鏈接列的設置;最後是結果的整理。MapReduce的shuffle過程會將相同的key會鏈接在一塊兒,因此能夠將map結果的key設置成待鏈接的列,而後列中相同的值就天然會鏈接在一塊兒了。
1.map階段將讀入數據分割成child和parent以後,將parent設置成key,child設置成value進行輸出,並做爲左表;再將同一對child和parent中的child設置成key,parent設置成value進行輸出,做爲右表
2.爲了區分輸出中的左右表,須要在輸出的value中再加上左右表的信息,好比在value的String最開始處加上字符1表示左表,加上字符2表示右表
3. reduce接收到鏈接的結果,其中每一個key的value-list就包含了"grandchild--grandparent"關係。取出每一個key的value-list進行解析,將左表中的child放入一個數組,右表中的parent放入一個數組,而後對兩個數組求笛卡爾積就是最後的結果了app

代碼實例:orm

public class table01 {     blog

static String INPUT_PATH="hdfs://master:9000/input/i.txt";  token

static String OUTPUT_PATH="hdfs://master:9000/output/singletable01";    隊列

static class MyMapper extends Mapper<Object,Object,Text,Text>{    //輸入爲字符串類型字符串

Text output_key=new Text();   

Text output_value=new Text();  

 protected void map(Object key, Object value, Context context) throws IOException, InterruptedException{    

String[] tokens=value.toString().split(",");    //以,分割

if(tokens!=null && tokens.length==2){    //判斷表分割成兩列

output_key.set(tokens[0]);   //將child做爲右表的key值,右表標記爲2

 output_value.set(2+","+value);    

context.write(output_key, output_value);        

output_key.set(tokens[1]);    //將parent列做爲key值,做爲左表,標記爲1

output_value.set(1+","+value);    

context.write(output_key, output_value);        //將一個表分割成了兩個表

System.out.println(tokens[0]+"--"+tokens[1]);    

}   

}  

}    

static class MyReduce extends Reducer<Text,Text,Text,Text>{    //傳入到MapReduce變成這樣的格式:  lucy , {1,tom,lucy  2,lucy,mary}

Text output_key=new Text();   

Text output_value=new Text();   

 protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{    

List<String> childs=new ArrayList();    

List<String> grands=new ArrayList();       

 for(Text line:values){     

String[] tokens=line.toString().split(",");     

if(tokens[0].equals("1")){        //判斷是左表的話,即parent做爲key值的時候,將孩子加入隊列中

childs.add(tokens[1]);      

System.out.println(1+"--"+tokens[1]);    

 }     

else if(tokens[0].equals("2")){      //右表,childs做爲key值,將祖父母加入隊列

grands.add(tokens[2]);      

System.out.println(2+"--"+tokens[2]);    

 }       

 }       

 for(String c:childs){      //循環輸出

for(String g:grands){      

output_key.set(c);      

output_value.set(g);      

context.write(output_key, output_value);    

 }    

}   

 }    

public static void main(String[] args) throws Exception{  

 Path outputpath=new Path(OUTPUT_PATH);  

 Configuration conf=new Configuration();      

Job job=Job.getInstance(conf);   

FileInputFormat.setInputPaths(job, INPUT_PATH);  

 FileOutputFormat.setOutputPath(job,outputpath);     

 job.setMapperClass(MyMapper.class);  

 job.setReducerClass(MyReduce.class);     

 job.setOutputKeyClass(Text.class);   

job.setOutputValueClass(Text.class);     

 job.waitForCompletion(true);  

}

}

 

 2.多表關聯

實例描述
輸入是兩個文件,一個表明工廠表,包含工廠名列和地址編號列;另外一個表明地址表,包含地址名列和地址編號列。要求從輸入數據中找出工廠名和地址名的對應關係,輸出"工廠名——地址名"表 。

算法思想:

多表關聯和單表關聯類似,都相似於數據庫中的天然鏈接。相比單表關聯,多表關聯的左右表和鏈接列更加清楚。因此能夠採用和單表關聯的相同的處理方式,map識別出輸入的行屬於哪一個表以後,對其進行分割,將鏈接的列值保存在key中,另外一列和左右表標識保存在value中,而後輸出。reduce拿到鏈接結果以後,解析value內容,根據標誌將左右表內容分開存放,而後求笛卡爾積,最後直接輸出。

 public class table02 {  

static String INPUT_PATH="hdfs://master:9000/doubletable";  

static String OUTPUT_PATH="hdfs://master:9000/output/doubletable";    

static class MyMapper extends Mapper<Object,Object,Text,Text>{   

Text output_key=new Text();   

Text output_value=new Text();   

String tableName="";   //區分表名

protected void setup(Context context)throws java.io.IOException,java.lang.InterruptedException{    

FileSplit fs=(FileSplit)context.getInputSplit();    //將多個表格區分開來

tableName=fs.getPath().getName();    //獲得表名

System.out.println(tableName);      

 }   

protected void map(Object key, Object value, Context context) throws IOException, InterruptedException{    

String[] tokens=value.toString().split(",");                                              

if(tokens!=null && tokens.length==2){       

 if(tableName.equals("l.txt")){     //若是是表一的話

output_key.set(tokens[1]);     //將addressID做爲key值鏈接

output_value.set(1+","+tokens[0]+","+tokens[1]);    //1只是一個標記

}    

else if(tableName.equals("m.txt")){     //若是是表二的話

 output_key.set(tokens[0]);     //addressID是第一個屬性

output_value.set(2+","+tokens[0]+","+tokens[1]);   

 }    

context.write(output_key, output_value);    

}  

 }

 }  

static class MyReduce extends Reducer<Text,Text,Text,Text>{   

Text output_key=new Text();   

Text output_value=new Text();      

 protected void reduce(Text key,Iterable<Text> value,Context context) throws IOException,InterruptedException{

List<String>  factorys=new ArrayList();    

List<String> addrs=new ArrayList();    

for(Text line:value){     

String[] tokens=line.toString().split(",");    

 if(tokens[0].equals("1")){      //表一取出factory的值

 factorys.add(tokens[1]);     

}     

else if(tokens[0].equals("2")){     

 addrs.add(tokens[2]);    //表二取出address的值

 }    

}      

for(String c:factorys)       //循環輸出 

for(String g:addrs){         

output_key.set(c);         

output_value.set(g);         

context.write(output_key,output_value);      

 }   

}

 }    

public static void main(String[] args) throws Exception{   

Path outputpath=new Path(OUTPUT_PATH);   

Configuration conf=new Configuration();    

FileSystem fs=outputpath.getFileSystem(conf);   

if(fs.exists(outputpath)){    

fs.delete(outputpath, true);   

}      

Job job=Job.getInstance(conf);   

FileInputFormat.setInputPaths(job, INPUT_PATH);   

FileOutputFormat.setOutputPath(job,outputpath);     

 job.setMapperClass(MyMapper.class);  

 job.setReducerClass(MyReduce.class);

  job.setOutputKeyClass(Text.class);  

 job.setOutputValueClass(Text.class);      

job.waitForCompletion(true);

 }

}

相關文章
相關標籤/搜索