實驗環境 win7 hadoop2.7.3本地模式java
實驗數據:訂單數據orders.txt,商品數據pdts.txtapache
order.txt緩存
1001 pd001 300 1002 pd002 20 1003 pd003 40 1004 pd002 50
pdts.txtapp
pd001 apple
pd002 xiaomi
pd003 cuizi
實驗解決的問題:解決mapreduce鏈接過程當中的數據傾斜的問題,典型應用場景以下:在電商平臺中,買小米手機和買蘋果手機的訂單數量不少,買錘子手機的訂單數量不多,如ide
果根據傳統的mapreduce方法,3個reduce的數據將不均衡。好比接受小米的reduce接收到的數據會不少,接受錘子數據的reduce接收到的數據就會不多函數
實驗解決的思路:採用map端鏈接,直接將排序過程在map中執行,將商品信息加載在map信息中,引入mapreduce的輸入緩存機制oop
代碼如圖所示:ui
package com.tianjie.mapsidejoin; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MapSideJoin { static class MapSideJoinMappe extends Mapper<LongWritable, Text, Text, NullWritable>{ //map 商品的訂單信息k v key爲商品編號,v爲商品名稱 Map<String,String> pdInfoMap = new HashMap<String, String>(); Text ktext = new Text(); /*setup 函數用來加載文件到hadoop緩存中 * */ protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { //打開輸入文本文件的路徑,得到一個輸入流 BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("C:/Users/admin/Desktop/join/cache/pdts.txt"))); String line; while(StringUtils.isNotEmpty(line = br.readLine())){ //得到商品信息表 k爲商品編號,value爲商品名稱 String[] split = line.split("\t"); pdInfoMap.put(split[0], split[1]); } } /* * hadoop 的緩衝機制*/ /* * map 函數的輸入key value ,其中默認輸入爲TextInputFormat, * key 爲輸入文本的偏移量,value爲輸入文本的值 * Text,NullWriable爲map文件輸入的值 * */ protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { //得到文本文件的一行 String orderline = value.toString(); //將文本文件按照製表符切分 String[] fields = orderline.split("\t"); //更具商品編號,得到商品名稱 String pdName = pdInfoMap.get(fields[1]); //得到商品的名字,將商品名稱追加在文本文件中 ktext.set(orderline+"\t"+pdName); //將新的文本文件寫出 context.write(ktext, NullWritable.get()); } } public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { //獲得hadoop的一個配置參數 Configuration conf = new Configuration(); //獲取一個job實例 Job job = Job.getInstance(conf); //加載job的運行類 job.setJarByClass(MapSideJoin.class); //加載mapper的類 job.setMapperClass(MapSideJoinMappe.class); //設置mapper類的輸出類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //設置文件輸入的路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); //設置文件的輸出路徑 FileSystem fs = FileSystem.get(conf); Path path = new Path(args[1]); if(fs.isDirectory(path)){ fs.delete(path, true); } FileOutputFormat.setOutputPath(job, new Path(args[1])); //指定須要緩衝一個文件到全部maptask運行節點工做目錄 //job.addArchiveToClassPath(""); 緩存jar包到task運行節點的classpath中 //job.addFileToClassPath(file); 緩存普通文件到task運行節點的classpath中 //job.addCacheArchive(uri); 緩存壓縮包文件到task運行節點的工做目錄中 //1:緩存普通文件到task運行節點的工做目錄 job.addCacheFile(new URI("file:///C:/Users/admin/Desktop/join/cache/pdts.txt")); //2:指定map端的加入邏輯不須要reduce階段,設置reducetask數量爲0 job.setNumReduceTasks(0); //提交job任務,等待job任務的結束 boolean res =job.waitForCompletion(true); System.exit(res?1:0); } }
須要注意的點有:spa
1:採用map端鏈接時,能夠不適用reduce,這個時候能夠設置reducetask 的數量爲0:.net
2:程序運行的結果: