當一個數據集很是小時,能夠將小數據集發送到每一個節點,節點緩存到內存中,這個數據集稱爲邊數據。用map函數將小數據集中的數據按鍵聚合到大的數據集中,輸出鏈接數據集,進行鏈接操做。css
(1) 分佈式緩存指定緩存文件html
執行命令行時,採用hadoop jar hadoop-example.jar MapSideJoinMain -files input/cityfile/tb_dim_city.dat input/data/all outputjava
-files input/cityfile/tb_dim_city.dat指定須要緩存的文件,會被複制到各個節任務點。node
(2)指定緩存文件的三種類型apache
Hadoop 命令行選項中,有三個命令能夠實現文件複製分發到任務的各個節點。用戶啓動一個做業,Hadoop 會把由 -files、-archives、和 -libjars 等選項所指定的文件複製到分佈式文件系統之中,任務運行前,節點管理器從分佈式文件系統中複製文件到本地。緩存
1) -files 選項指定待分發的文件,文件內包含以逗號隔開的 URL 列表。文件能夠存放在本地文件系統、HDFS、或其它 Hadoop 可讀文件系統之中。 若是還沒有指定文件系統,則這些文件被默認是本地的。即便默認文件系統並不是本地文件系統,這也是成立的。app
2) -archives 選項向本身的任務中複製存檔(壓縮)文件,好比JAR 文件、ZIP 文件、tar 文件和 gzipped tar文件,這些文件會被解檔到任務節點。分佈式
3) -libjars 選項把 JAR 文件添加到 mapper 和 reducer 任務的類路徑中。若是做業 JAR 文件並不是包含不少庫 JAR 文件,這點會頗有用。ide
(3)緩存文件刪除機制函數
節點管理器爲緩存中的文件各維護一個計數器,任務運行時,文件計數器加1,任務完成後,計數器減1,計數器爲0時才能刪除文件,當節點緩存容量大於必定值(yarn.nodemanger.localizer.cache.target-size-mb設置,默認10GB),纔會刪除最近最少使用的文件。
(4)Job的分佈式緩存API
除了能夠用命令行參數指定緩存文件外,還以經過Job的API指定緩存文件;即經過job對象調用下面的函數設置緩存文件。
//如下兩組方法將文件或存檔添加到分佈式緩存
public void addCacheFile(URI uri);
public void addCacheArchive(URI uri);
//如下兩組方法將一次性向分佈式緩存中添加一組文件或存檔
public void setCacheFiles(URI[] files);
public void setCacheArchives(URI[] archives);
//如下兩組方法將文件或存檔添加到 MapReduce 任務的類路徑
public void addFileToClassPath(Path file);
public void addArchiveToClassPath(Path archive);
public void createSymlink();
(6)DistributedCache緩存小數據集實現hadoop map端鏈接實例
下面的實例是將城市名稱的數據集和用戶信息的數據集進行鏈接,城市名稱的數據集很小,而用戶信息的數據集很大,因此能夠採用緩存文件的方式,將城市信息數據集發送到任務,map任務經過setup方法從緩存中讀取小數據集文件tb_dim_city.dat,在內存中造成map映射,map函數處理用戶信息數據,根據用戶信息中的城市id去map映射中找到城市名稱,而後合併輸出。
package Temperature;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* 用途說明:
* Map side join中的left outer join
* 左鏈接,兩個文件分別表明2個表,鏈接字段table1的id字段和table2的cityID字段
* table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),
* 假設tb_dim_city文件記錄數不多,tb_dim_city.dat文件內容,分隔符爲"|":
* id name orderid city_code is_show
* 0 其餘 9999 9999 0
* 1 長春 1 901 1
* 2 吉林 2 902 1
* 3 四平 3 903 1
* 4 松原 4 904 1
* 5 通化 5 905 1
* 6 遼源 6 906 1
* 7 白城 7 907 1
* 8 白山 8 908 1
* 9 延吉 9 909 1
* -------------------------風騷的分割線-------------------------------
* table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
* tb_user_profiles.dat文件內容,分隔符爲"|":
* userID network flow cityID
* 1 2G 123 1
* 2 3G 333 2
* 3 3G 555 1
* 4 2G 777 3
* 5 3G 666 4
* -------------------------風騷的分割線-------------------------------
* 結果:
* 1 長春 1 901 1 1 2G 123
* 1 長春 1 901 1 3 3G 555
* 2 吉林 2 902 1 2 3G 333
* 3 四平 3 903 1 4 2G 777
* 4 松原 4 904 1 5 3G 666
*/
public class MapSideJoinMain extends Configured implements Tool{
private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);
public static class LeftOutJoinMapper extends Mapper {
private HashMap city_info = new HashMap<String,String>();
private Text outPutKey = new Text();
private Text outPutValue = new Text();
private String mapInputStr = null;
private String mapInputSpit[] = null;
private String city_secondPart = null;
/**
* 此方法在每一個task開始以前執行,這裏主要用做從DistributedCache
* 中取到tb_dim_city文件,並將裏邊記錄取出放到內存中。
*/
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
BufferedReader br = null;
//得到當前做業的DistributedCache相關文件
Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
String cityInfo = null;
for(Path p : distributePaths){
if(p.toString().endsWith("tb_dim_city.dat")){
//讀緩存文件,並放到mem中
br = new BufferedReader(new FileReader(p.toString()));
while(null!=(cityInfo=br.readLine())){
String[] cityPart = cityInfo.split("\\|",5);
if(cityPart.length ==5){
city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]);
}
}
}
}
}
/**
* Map端的實現至關簡單,直接判斷tb_user_profiles.dat中的
* cityID是否存在個人map中就ok了,這樣就能夠實現Map Join了
*/
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
//排掉空行
if(value == null || value.toString().equals("")){
return;
}
mapInputStr = value.toString();
mapInputSpit = mapInputStr.split("\\|",4);
//過濾非法記錄
if(mapInputSpit.length != 4){
return;
}
//判斷連接字段是否在map中存在
city_secondPart = (String) city_info.get((Object) mapInputSpit[3]);
if(city_secondPart != null){
this.outPutKey.set(mapInputSpit[3]);
this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]);
context.write(outPutKey, outPutValue);
}
}
}
public int run(String[] args) throws Exception {
Configuration conf=getConf(); //得到配置文件對象
DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//爲該job添加緩存文件
Job job=new Job(conf,"MapJoinMR");
job.setNumReduceTasks(0);
FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑
FileOutputFormat.setOutputPath(job, new Path(args[2])); //設置reduce輸出文件路徑
job.setJarByClass(MapSideJoinMain.class);
job.setMapperClass(LeftOutJoinMapper.class);
job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式
job.setOutputFormatClass(TextOutputFormat.class);//使用默認的output格式
//設置map的輸出key和value類型
job.setMapOutputKeyClass(Text.class);
//設置reduce的輸出key和value類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.waitForCompletion(true);
return job.isSuccessful()?0:1;
}
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
try {
int returnCode = ToolRunner.run(new MapSideJoinMain(),args);
System.exit(returnCode);
} catch (Exception e) {
// TODO Auto-generated catch block
logger.error(e.getMessage());
}
}
}
實例參考文獻:
http://www.javashuo.com/article/p-ethukfpi-ep.html
本身開發了一個股票智能分析軟件,功能很強大,須要的點擊下面的連接獲取: