若是沒有一個 map-side join 技術適合咱們的數據集,那麼就須要在 MapReduce 中使用 shuffle 來排序和鏈接兩個數據集。這稱爲 reduce-side joins,也叫」重分區鏈接」。java
【例】基本的重分區鏈接(repartition join/reduce-side join)apache
重分區鏈接是 reduce 端鏈接。它利用 MapReduce 的排序-合併機制來分組數據。它只使用一個單獨的 MapReduce 任務,並支持 N-way join,這裏 N 指的是被鏈接的數據集的數量。app
Map 階段負責從多個數據集中讀取數據,決定用於每條記錄的鏈接值,並將鏈接值做爲輸出 key。輸出 value 則包含在 reduce 階段所合併的數據集的數據。ide
Reduce 階段,一個 reduce 接收 map 函數傳來的每個 join key 的全部輸出值,並將數據分爲 N 個分區,這裏 N 指的是被鏈接的數量。在該 reducer 接收到用於該 join value 的全部輸入記錄並在內存中對 他們分區以後,它對全部分區執行一個笛卡爾積(Cartersian product),並輸出每一個 join 的結果。下圖演示 了重分區 join:函數
要支持這個技術,MapReduce 代碼須要知足如下條件:oop
■它須要支持多個 map 類,每一個 map 處理一個不一樣的輸入數據集。這是經過使用 MultipleInputs 來 完成的。日誌
■ 它須要一個方式來標記由 mapper 所輸出的記錄,這樣它們才能與它們原始的數據集相關聯。這 裏咱們將使用 htuple 項目來簡化 MapReduce 中組合數據(composite data)的處理。code
文件 users.txt:orm
package com.edu360.mapreduce; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * java 一出 誰與爭鋒 * <p> * ............................................. * 佛祖保佑 永無BUG * * @Auther: caozhan * @Date: 2018/11/4 13:11 * @Description: */ public class RepartitionJoin extends Configured implements Tool { //聲明表明不一樣數據表的標誌變量 public static final int USERS=0;//表明記錄來自用戶信息表 public static final int USER_LOGS=1;//表明記錄來自用戶日誌記錄 //處理來自 users.txt的輸入記錄 public static class UserMap extends Mapper<LongWritable, Text,Text, TupleWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //提取用戶名 String username = value.toString().split("\t")[0]; TupleWritable outputValue = new TupleWritable(); outputValue.setTable(USERS); outputValue.setRecord(value.toString()); context.write(new Text(username),outputValue); } } //處理來自logs.txt 的輸入記錄 public static class UserLogMap extends Mapper<LongWritable,Text,Text, TupleWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //提取用戶名 String username=value.toString().split("\t")[0]; TupleWritable outValue = new TupleWritable(); outValue.setTable(USER_LOGS); outValue.setRecord(value.toString()); context.write(new Text(username),outValue); } } public static class ReduceJoin extends Reducer<Text,TupleWritable,Text,Text>{ private List<String> users,userLogs; private Text keyInfo = new Text(); private Text valueInfo = new Text(); @Override protected void reduce(Text key, Iterable<TupleWritable> values, Context context) throws IOException, InterruptedException { users = new ArrayList<>(); userLogs=new ArrayList<>(); //解析從mapper收到的輸入,分別放入相應的list集合中 for(TupleWritable tupleWritable:values){ System.out.println("Tuple:"+tupleWritable); switch (tupleWritable.getTable()){ case USERS:{ users.add(tupleWritable.getRecord()); break; } case USER_LOGS:{ userLogs.add(tupleWritable.getRecord()); break; } } } //笛卡爾伺機 for(String user:users){ for(String userLog:userLogs){ keyInfo.set(user); valueInfo.set(userLog); context.write(keyInfo,valueInfo); } } } } @Override public int run(String[] args) throws Exception { Path usersPath = new Path(args[0]); Path userLogsPath = new Path(args[1]); Path outputPath = new Path(args[2]); Job job=Job.getInstance(getConf(),"Simple Redpartition Jii"); job.setJarByClass(ReduceJoin.class); //分別爲不一樣的輸入文件指定不一樣的inputformat MultipleInputs.addInputPath(job,usersPath, TextInputFormat.class,UserMap.class); MultipleInputs.addInputPath(job,userLogsPath,TextInputFormat.class,UserLogMap.class); job.setReducerClass(ReduceJoin.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(TupleWritable.class); job.setOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); FileOutputFormat.setOutputPath(job,outputPath); return job.waitForCompletion(true)?0:1; } public static void main(String[] args) throws Exception { int res= ToolRunner.run(new RepartitionJoin(),args); System.exit(res); } }
發生的ERRO:blog
查看你的map和reduce 是否是沒有被static修飾,拿不到實例類。
最終結果