Reduce鏈接(reduce-side joins)

若是沒有一個 map-side join 技術適合咱們的數據集,那麼就須要在 MapReduce 中使用 shuffle 來排序和鏈接兩個數據集。這稱爲 reduce-side joins,也叫」重分區鏈接」。java

【例】基本的重分區鏈接(repartition join/reduce-side join)apache

    重分區鏈接是 reduce 端鏈接。它利用 MapReduce 的排序-合併機制來分組數據。它只使用一個單獨的 MapReduce 任務,並支持 N-way join,這裏 N 指的是被鏈接的數據集的數量。app

    Map 階段負責從多個數據集中讀取數據,決定用於每條記錄的鏈接值,並將鏈接值做爲輸出 key。輸出 value 則包含在 reduce 階段所合併的數據集的數據。ide

    Reduce 階段,一個 reduce 接收 map 函數傳來的每個 join key 的全部輸出值,並將數據分爲 N 個分區,這裏 N 指的是被鏈接的數量。在該 reducer 接收到用於該 join value 的全部輸入記錄並在內存中對 他們分區以後,它對全部分區執行一個笛卡爾積(Cartersian product),並輸出每一個 join 的結果。下圖演示 了重分區 join:函數

    要支持這個技術,MapReduce 代碼須要知足如下條件:oop

    ■它須要支持多個 map 類,每一個 map 處理一個不一樣的輸入數據集。這是經過使用 MultipleInputs 來 完成的。日誌

    ■ 它須要一個方式來標記由 mapper 所輸出的記錄,這樣它們才能與它們原始的數據集相關聯。這 裏咱們將使用 htuple 項目來簡化 MapReduce 中組合數據(composite data)的處理。code

    文件 users.txt:orm

    

package com.edu360.mapreduce;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * java 一出 誰與爭鋒
 * <p>
 * .............................................
 * 佛祖保佑             永無BUG
 *
 * @Auther: caozhan
 * @Date: 2018/11/4 13:11
 * @Description:
 */
public class RepartitionJoin extends Configured implements Tool {
    //聲明表明不一樣數據表的標誌變量
    public static final int USERS=0;//表明記錄來自用戶信息表
    public static final int USER_LOGS=1;//表明記錄來自用戶日誌記錄
    //處理來自 users.txt的輸入記錄
    public static  class UserMap extends Mapper<LongWritable, Text,Text, TupleWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            //提取用戶名
            String username = value.toString().split("\t")[0];
            TupleWritable outputValue = new TupleWritable();
            outputValue.setTable(USERS);
            outputValue.setRecord(value.toString());
            context.write(new Text(username),outputValue);
        }
    }
    //處理來自logs.txt 的輸入記錄
    public static class UserLogMap extends Mapper<LongWritable,Text,Text, TupleWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //提取用戶名
            String username=value.toString().split("\t")[0];
            TupleWritable outValue = new TupleWritable();
            outValue.setTable(USER_LOGS);
            outValue.setRecord(value.toString());
            context.write(new Text(username),outValue);
        }
    }
    public static  class ReduceJoin extends Reducer<Text,TupleWritable,Text,Text>{
        private List<String> users,userLogs;
        private Text keyInfo = new Text();
        private Text valueInfo = new Text();

        @Override
        protected void reduce(Text key, Iterable<TupleWritable> values, Context context)
                throws IOException, InterruptedException {
            users = new ArrayList<>();
            userLogs=new ArrayList<>();

            //解析從mapper收到的輸入,分別放入相應的list集合中
            for(TupleWritable tupleWritable:values){
                System.out.println("Tuple:"+tupleWritable);
                switch (tupleWritable.getTable()){
                    case USERS:{
                        users.add(tupleWritable.getRecord());
                        break;
                    }
                    case USER_LOGS:{
                        userLogs.add(tupleWritable.getRecord());
                        break;
                    }
                }
            }
            //笛卡爾伺機
            for(String user:users){
                for(String userLog:userLogs){
                    keyInfo.set(user);
                    valueInfo.set(userLog);
                    context.write(keyInfo,valueInfo);
                }
            }
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Path usersPath = new Path(args[0]);
        Path userLogsPath = new Path(args[1]);
        Path outputPath = new Path(args[2]);
        Job job=Job.getInstance(getConf(),"Simple Redpartition Jii");
        job.setJarByClass(ReduceJoin.class);
        
        //分別爲不一樣的輸入文件指定不一樣的inputformat
        MultipleInputs.addInputPath(job,usersPath, TextInputFormat.class,UserMap.class);
        MultipleInputs.addInputPath(job,userLogsPath,TextInputFormat.class,UserLogMap.class);
        
        job.setReducerClass(ReduceJoin.class);
        
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TupleWritable.class);
        
        job.setOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        FileOutputFormat.setOutputPath(job,outputPath);
        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception {
        int res= ToolRunner.run(new RepartitionJoin(),args);
        System.exit(res);
    }
}

發生的ERRO:blog

查看你的map和reduce 是否是沒有被static修飾,拿不到實例類。

最終結果

相關文章
相關標籤/搜索