Scenario: there are two tables, a user table (user) and a transactions table (transactions). The fields of the two tables are as follows:
We do a left join between the two tables and produce output in the form (product name, location).
In other words, the transactions table is the left table: all of its records are kept no matter what, and the matching fields are looked up from the right table to fill in the rest.
The expected result looks like this:
Approach: write two mappers that read the two tables, join the records on the reduce side, and write the results out in the required format.
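To make the format concrete, here is a purely hypothetical sketch of what the two input files are assumed to look like, based only on how the mappers below split each line on a single space (transactions: args[1] is the product ID, args[2] the user ID; user: args[0] is the user ID, args[1] the location). All values are made up for illustration:

transactions.txt (assumed columns: transactionID productID userID):
t001 p100 u001
t002 p200 u003

user.txt (assumed columns: userID locationID):
u001 loc42
u002 loc7

Here u003 has no entry in user.txt, which is exactly the case the left join still has to keep (its location will come out as "undefine").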
(1) map1: reads the transactions file and emits each record as:
protected void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, PairOfStrings, PairOfStrings>.Context context)
        throws IOException, InterruptedException {
    String lines = value.toString();
    String[] args = lines.split(" ");
    String productID = args[1];
    String userID = args[2];
    // The output key is tagged with "2" so that, in the reduce-side sort,
    // these transaction records arrive later than the user records for the same userID.
    outPutKey.set(userID, "2");
    outPutValue.set("P", productID);
    context.write(outPutKey, outPutValue);
}
(2) map2: reads the user file and emits each record as:
static class map2 extends Mapper<LongWritable, Text, PairOfStrings, PairOfStrings> {
    PairOfStrings outPutKey = new PairOfStrings();
    PairOfStrings outPutvalue = new PairOfStrings();

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, PairOfStrings, PairOfStrings>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] args = line.split(" ");
        String userID = args[0];
        String locationID = args[1];
        // The output key is tagged with "1" so that, in the reduce-side sort,
        // these user records arrive before the records from the transactions file.
        outPutKey.set(userID, "1");
        outPutvalue.set("L", locationID);
        context.write(outPutKey, outPutvalue);
    }
}
(3) reduce: the map output is partitioned by user ID, so records with the same user ID go to the same partition and therefore to the same reducer. Within the reducer, the keys are sorted by PairOfStrings' own ordering. Because the keys were tagged ("1" for user records, "2" for transaction records), the user's location record reaches reduce() first; the reducer then walks each group and writes out the joined (product, location) pairs.
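To make the ordering concrete, the following is a minimal, Hadoop-free sketch of what one reduce() call ends up doing for a single user; the userID, location and product values are made up for illustration:

import java.util.Arrays;
import java.util.List;

// A minimal sketch of the per-group join logic, outside of Hadoop.
public class ReduceOrderSketch {
    public static void main(String[] args) {
        // Values as one reduce() call would receive them for a hypothetical user "u1",
        // already ordered by the secondary sort: the "L" (location) record first,
        // then the "P" (product) records.
        List<String[]> values = Arrays.asList(
                new String[] { "L", "loc42" },   // from user.txt
                new String[] { "P", "p100" },    // from transactions.txt
                new String[] { "P", "p200" });
        String location = "undefine";            // default when the user has no location record
        for (String[] v : values) {
            if (v[0].equals("L")) {
                location = v[1];                 // remember the user's location
            } else {
                System.out.println(v[1] + "\t" + location);  // emit (productID, locationID)
            }
        }
    }
}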
The complete code is structured as follows:
LeftCmain:
package com.guigu.left;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import edu.umd.cloud9.io.pair.PairOfStrings;

public class LeftCmain {

    // Reads the transactions file.
    static class map1 extends Mapper<LongWritable, Text, PairOfStrings, PairOfStrings> {
        PairOfStrings outPutKey = new PairOfStrings();
        PairOfStrings outPutValue = new PairOfStrings();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, PairOfStrings, PairOfStrings>.Context context)
                throws IOException, InterruptedException {
            String lines = value.toString();
            String[] args = lines.split(" ");
            String productID = args[1];
            String userID = args[2];
            outPutKey.set(userID, "2");
            outPutValue.set("P", productID);
            context.write(outPutKey, outPutValue);
        }
    }

    // Reads the user file.
    static class map2 extends Mapper<LongWritable, Text, PairOfStrings, PairOfStrings> {
        PairOfStrings outPutKey = new PairOfStrings();
        PairOfStrings outPutvalue = new PairOfStrings();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, PairOfStrings, PairOfStrings>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] args = line.split(" ");
            String userID = args[0];
            String locationID = args[1];
            outPutKey.set(userID, "1");
            outPutvalue.set("L", locationID);
            context.write(outPutKey, outPutvalue);
        }
    }

    /**
     * The key point: thanks to the secondary sort, the location record is the first
     * value pulled from the iterator for each user.
     *
     * @author Sxq
     */
    static class reduce1 extends Reducer<PairOfStrings, PairOfStrings, Text, Text> {
        Text produceID = new Text();
        Text localID = new Text("undefine");

        @Override
        protected void reduce(PairOfStrings arg0, Iterable<PairOfStrings> Iterator1,
                Reducer<PairOfStrings, PairOfStrings, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Reset the location for every group so a previous user's location does not leak in.
            localID.set("undefine");
            Iterator<PairOfStrings> iterator = Iterator1.iterator();
            // Because of the secondary sort, the location record (if any) comes first.
            if (iterator.hasNext()) {
                PairOfStrings fisrPair = iterator.next();
                if (fisrPair.getLeftElement().equals("L")) {
                    localID.set(fisrPair.getRightElement());
                } else {
                    // No user record for this userID: still emit the product (left join).
                    produceID.set(fisrPair.getRightElement());
                    context.write(produceID, localID);
                }
            }
            while (iterator.hasNext()) {
                PairOfStrings pairOfStrings = iterator.next();
                produceID.set(pairOfStrings.getRightElement());
                context.write(produceID, localID);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(LeftCmain.class);
        job.setMapperClass(map1.class);   // overridden below: MultipleInputs assigns a mapper per input path
        job.setReducerClass(reduce1.class);
        job.setMapOutputKeyClass(PairOfStrings.class);
        job.setMapOutputValueClass(PairOfStrings.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Sort by (userID, tag) so "1" (user) sorts before "2" (transaction).
        job.setSortComparatorClass(PairOfStrings.Comparator.class);
        // Group on the reduce side by userID only, so one reduce() call sees all records of a user.
        job.setGroupingComparatorClass(SecondarySortGroupComparator.class);
        // Partition by userID so the same user always lands in the same reducer.
        job.setPartitionerClass(SecondarySortParitioner.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        Path transactions = new Path("/Users/mac/Desktop/transactions.txt");
        MultipleInputs.addInputPath(job, transactions, TextInputFormat.class, map1.class);
        MultipleInputs.addInputPath(job, new Path("/Users/mac/Desktop/user.txt"),
                TextInputFormat.class, map2.class);
        FileOutputFormat.setOutputPath(job, new Path("/Users/mac/Desktop/flowresort"));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
SecondarySortGroupComparator:
package com.guigu.left;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import edu.umd.cloud9.io.pair.PairOfStrings;

/**
 * Grouping comparator: compares only the userID (left element), so that keys with
 * different tags but the same user are grouped into one reduce() call in the order we want.
 *
 * @author Sxq
 */
public class SecondarySortGroupComparator extends WritableComparator {

    public SecondarySortGroupComparator() {
        super(PairOfStrings.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        PairOfStrings v1 = (PairOfStrings) a;
        PairOfStrings v2 = (PairOfStrings) b;
        return v1.getLeftElement().compareTo(v2.getLeftElement());
    }
}
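A quick way to see how the two comparators interact, using a hypothetical userID "u001" (not from the post's data) and assuming PairOfStrings' natural ordering compares the left element first and then the right, which is the order the job's sort comparator follows:

package com.guigu.left;

import edu.umd.cloud9.io.pair.PairOfStrings;

public class ComparatorDemo {
    public static void main(String[] args) {
        PairOfStrings userKey = new PairOfStrings();
        userKey.set("u001", "1");   // key shape emitted by map2 (user file)
        PairOfStrings txKey = new PairOfStrings();
        txKey.set("u001", "2");     // key shape emitted by map1 (transactions file)

        // Sort order: "1" < "2", so the location record reaches the reducer before the products.
        System.out.println(userKey.compareTo(txKey) < 0);   // expected: true

        // Grouping: the custom comparator looks only at the userID, so both keys
        // fall into the same group and share a single reduce() invocation.
        System.out.println(new SecondarySortGroupComparator().compare(userKey, txKey) == 0); // expected: true
    }
}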
SecondarySortParitioner:
package com.guigu.left;

import org.apache.hadoop.mapreduce.Partitioner;

import edu.umd.cloud9.io.pair.PairOfStrings;

/**
 * Partitions map output by userID (left element) only, so every record of a given user
 * goes to the same reducer.
 *
 * @author Sxq
 */
public class SecondarySortParitioner extends Partitioner<PairOfStrings, Object> {

    @Override
    public int getPartition(PairOfStrings key, Object value, int numPartitions) {
        return (key.getLeftElement().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
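As an aside on the "& Integer.MAX_VALUE" mask: String.hashCode() can be negative, and a negative remainder would be an invalid partition number, so the mask clears the sign bit before taking the remainder. A tiny stand-alone check (the value -17 is just a stand-in for a userID with a negative hash code, not real data):

public class PartitionMaskDemo {
    public static void main(String[] args) {
        int negativeHash = -17;   // stand-in for a userID whose hashCode() happens to be negative
        int numPartitions = 4;
        // Without the mask the remainder can be negative, which is not a valid partition index.
        System.out.println(negativeHash % numPartitions);                         // -1
        // Clearing the sign bit first keeps the result in [0, numPartitions).
        System.out.println((negativeHash & Integer.MAX_VALUE) % numPartitions);   // 3
    }
}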
Run result:
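Because the job writes its output with SequenceFileOutputFormat, the part files are not plain text. A minimal sketch for dumping them, assuming the default part file name part-r-00000 and the local file system used by the job above:

package com.guigu.left;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class ReadJoinOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Assumed default part file name; adjust if several reducers wrote output.
        Path part = new Path("/Users/mac/Desktop/flowresort/part-r-00000");
        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, part, conf)) {
            Text key = new Text();    // productID
            Text value = new Text();  // locationID, or "undefine" when the user was not found
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value);
            }
        }
    }
}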