mapreduce學習工程之五---map端join鏈接

實驗環境 win7 hadoop2.7.3本地模式java

實驗數據:訂單數據orders.txt,商品數據pdts.txtapache

order.txt緩存

1001    pd001    300
1002    pd002    20
1003    pd003    40
1004    pd002    50

pdts.txtapp

pd001    apple
pd002    xiaomi
pd003    cuizi

實驗解決的問題:解決mapreduce鏈接過程當中的數據傾斜的問題,典型應用場景以下:在電商平臺中,買小米手機和買蘋果手機的訂單數量不少,買錘子手機的訂單數量不多,如ide

果根據傳統的mapreduce方法,3個reduce的數據將不均衡。好比接受小米的reduce接收到的數據會不少,接受錘子數據的reduce接收到的數據就會不多函數

實驗解決的思路:採用map端鏈接,直接將排序過程在map中執行,將商品信息加載在map信息中,引入mapreduce的輸入緩存機制oop

代碼如圖所示:ui

package com.tianjie.mapsidejoin;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapSideJoin {

    static class MapSideJoinMappe extends Mapper<LongWritable, Text, Text, NullWritable>{
        
        
        //map 商品的訂單信息k v key爲商品編號,v爲商品名稱
        Map<String,String>     pdInfoMap = new HashMap<String, String>();
        Text ktext = new Text();
        
        
        /*setup 函數用來加載文件到hadoop緩存中
         * */
        protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            
            //打開輸入文本文件的路徑,得到一個輸入流
            BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("C:/Users/admin/Desktop/join/cache/pdts.txt")));
            String line;
            while(StringUtils.isNotEmpty(line = br.readLine())){
                //得到商品信息表 k爲商品編號,value爲商品名稱
                String[] split = line.split("\t");
                pdInfoMap.put(split[0], split[1]);
                
            }
            
        }
        /*
         * hadoop 的緩衝機制*/
        
        
        /*
         * map 函數的輸入key value ,其中默認輸入爲TextInputFormat,
         *     key 爲輸入文本的偏移量,value爲輸入文本的值
         *     Text,NullWriable爲map文件輸入的值
         *     */
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            
            //得到文本文件的一行
            String orderline  = value.toString();
            //將文本文件按照製表符切分
            String[] fields = orderline.split("\t");
            //更具商品編號,得到商品名稱
            String pdName = pdInfoMap.get(fields[1]);
            //得到商品的名字,將商品名稱追加在文本文件中
            ktext.set(orderline+"\t"+pdName);
            //將新的文本文件寫出
            context.write(ktext, NullWritable.get());
        }
        
    }
        
    
    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        
        //獲得hadoop的一個配置參數
        Configuration conf = new Configuration();
        //獲取一個job實例
        Job job = Job.getInstance(conf);
        //加載job的運行類
        job.setJarByClass(MapSideJoin.class);
        
        //加載mapper的類
        job.setMapperClass(MapSideJoinMappe.class);
        //設置mapper類的輸出類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        
        //設置文件輸入的路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        
        //設置文件的輸出路徑
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(args[1]);
        if(fs.isDirectory(path)){
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        //指定須要緩衝一個文件到全部maptask運行節點工做目錄
        //job.addArchiveToClassPath(""); 緩存jar包到task運行節點的classpath中
        //job.addFileToClassPath(file); 緩存普通文件到task運行節點的classpath中
        //job.addCacheArchive(uri);      緩存壓縮包文件到task運行節點的工做目錄中
        
        //1:緩存普通文件到task運行節點的工做目錄
        job.addCacheFile(new URI("file:///C:/Users/admin/Desktop/join/cache/pdts.txt")); 
        
        //2:指定map端的加入邏輯不須要reduce階段,設置reducetask數量爲0
        job.setNumReduceTasks(0);
        
        //提交job任務,等待job任務的結束
        boolean res =job.waitForCompletion(true);
        System.exit(res?1:0);
        
        }
}

 

須要注意的點有:spa

1:採用map端鏈接時,能夠不適用reduce,這個時候能夠設置reducetask 的數量爲0:.net

2:程序運行的結果:

相關文章
相關標籤/搜索