1.單表關聯java
"單表關聯"要求從給出的數據中尋找所關心的數據,它是對原始數據所包含信息的挖掘。算法
實例描述
給出child-parent(孩子——父母)表,要求輸出grandchild-grandparent(孫子——爺奶)表。數據庫
算法思想:數組
這個實例須要進行單錶鏈接,鏈接的是左表的parent列和右表的child列,且左表和右表是同一個表。鏈接結果中除去鏈接的兩列就是所須要的結果——"grandchild--grandparent"表。要用MapReduce解決這個實例,首先應該考慮如何實現表的自鏈接;其次就是鏈接列的設置;最後是結果的整理。MapReduce的shuffle過程會將相同的key會鏈接在一塊兒,因此能夠將map結果的key設置成待鏈接的列,而後列中相同的值就天然會鏈接在一塊兒了。
1.map階段將讀入數據分割成child和parent以後,將parent設置成key,child設置成value進行輸出,並做爲左表;再將同一對child和parent中的child設置成key,parent設置成value進行輸出,做爲右表
2.爲了區分輸出中的左右表,須要在輸出的value中再加上左右表的信息,好比在value的String最開始處加上字符1表示左表,加上字符2表示右表
3. reduce接收到鏈接的結果,其中每一個key的value-list就包含了"grandchild--grandparent"關係。取出每一個key的value-list進行解析,將左表中的child放入一個數組,右表中的parent放入一個數組,而後對兩個數組求笛卡爾積就是最後的結果了app
代碼實例:orm
public class table01 { blog
static String INPUT_PATH="hdfs://master:9000/input/i.txt"; token
static String OUTPUT_PATH="hdfs://master:9000/output/singletable01"; 隊列
static class MyMapper extends Mapper<Object,Object,Text,Text>{ //輸入爲字符串類型字符串
Text output_key=new Text();
Text output_value=new Text();
protected void map(Object key, Object value, Context context) throws IOException, InterruptedException{
String[] tokens=value.toString().split(","); //以,分割
if(tokens!=null && tokens.length==2){ //判斷表分割成兩列
output_key.set(tokens[0]); //將child做爲右表的key值,右表標記爲2
output_value.set(2+","+value);
context.write(output_key, output_value);
output_key.set(tokens[1]); //將parent列做爲key值,做爲左表,標記爲1
output_value.set(1+","+value);
context.write(output_key, output_value); //將一個表分割成了兩個表
System.out.println(tokens[0]+"--"+tokens[1]);
}
}
}
static class MyReduce extends Reducer<Text,Text,Text,Text>{ //傳入到MapReduce變成這樣的格式: lucy , {1,tom,lucy 2,lucy,mary}
Text output_key=new Text();
Text output_value=new Text();
protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{
List<String> childs=new ArrayList();
List<String> grands=new ArrayList();
for(Text line:values){
String[] tokens=line.toString().split(",");
if(tokens[0].equals("1")){ //判斷是左表的話,即parent做爲key值的時候,將孩子加入隊列中
childs.add(tokens[1]);
System.out.println(1+"--"+tokens[1]);
}
else if(tokens[0].equals("2")){ //右表,childs做爲key值,將祖父母加入隊列
grands.add(tokens[2]);
System.out.println(2+"--"+tokens[2]);
}
}
for(String c:childs){ //循環輸出
for(String g:grands){
output_key.set(c);
output_value.set(g);
context.write(output_key, output_value);
}
}
}
}
public static void main(String[] args) throws Exception{
Path outputpath=new Path(OUTPUT_PATH);
Configuration conf=new Configuration();
Job job=Job.getInstance(conf);
FileInputFormat.setInputPaths(job, INPUT_PATH);
FileOutputFormat.setOutputPath(job,outputpath);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.waitForCompletion(true);
}
}
2.多表關聯
實例描述
輸入是兩個文件,一個表明工廠表,包含工廠名列和地址編號列;另外一個表明地址表,包含地址名列和地址編號列。要求從輸入數據中找出工廠名和地址名的對應關係,輸出"工廠名——地址名"表 。
算法思想:
多表關聯和單表關聯類似,都相似於數據庫中的天然鏈接。相比單表關聯,多表關聯的左右表和鏈接列更加清楚。因此能夠採用和單表關聯的相同的處理方式,map識別出輸入的行屬於哪一個表以後,對其進行分割,將鏈接的列值保存在key中,另外一列和左右表標識保存在value中,而後輸出。reduce拿到鏈接結果以後,解析value內容,根據標誌將左右表內容分開存放,而後求笛卡爾積,最後直接輸出。
public class table02 {
static String INPUT_PATH="hdfs://master:9000/doubletable";
static String OUTPUT_PATH="hdfs://master:9000/output/doubletable";
static class MyMapper extends Mapper<Object,Object,Text,Text>{
Text output_key=new Text();
Text output_value=new Text();
String tableName=""; //區分表名
protected void setup(Context context)throws java.io.IOException,java.lang.InterruptedException{
FileSplit fs=(FileSplit)context.getInputSplit(); //將多個表格區分開來
tableName=fs.getPath().getName(); //獲得表名
System.out.println(tableName);
}
protected void map(Object key, Object value, Context context) throws IOException, InterruptedException{
String[] tokens=value.toString().split(",");
if(tokens!=null && tokens.length==2){
if(tableName.equals("l.txt")){ //若是是表一的話
output_key.set(tokens[1]); //將addressID做爲key值鏈接
output_value.set(1+","+tokens[0]+","+tokens[1]); //1只是一個標記
}
else if(tableName.equals("m.txt")){ //若是是表二的話
output_key.set(tokens[0]); //addressID是第一個屬性
output_value.set(2+","+tokens[0]+","+tokens[1]);
}
context.write(output_key, output_value);
}
}
}
static class MyReduce extends Reducer<Text,Text,Text,Text>{
Text output_key=new Text();
Text output_value=new Text();
protected void reduce(Text key,Iterable<Text> value,Context context) throws IOException,InterruptedException{
List<String> factorys=new ArrayList();
List<String> addrs=new ArrayList();
for(Text line:value){
String[] tokens=line.toString().split(",");
if(tokens[0].equals("1")){ //表一取出factory的值
factorys.add(tokens[1]);
}
else if(tokens[0].equals("2")){
addrs.add(tokens[2]); //表二取出address的值
}
}
for(String c:factorys) //循環輸出
for(String g:addrs){
output_key.set(c);
output_value.set(g);
context.write(output_key,output_value);
}
}
}
public static void main(String[] args) throws Exception{
Path outputpath=new Path(OUTPUT_PATH);
Configuration conf=new Configuration();
FileSystem fs=outputpath.getFileSystem(conf);
if(fs.exists(outputpath)){
fs.delete(outputpath, true);
}
Job job=Job.getInstance(conf);
FileInputFormat.setInputPaths(job, INPUT_PATH);
FileOutputFormat.setOutputPath(job,outputpath);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.waitForCompletion(true);
}
}