9.3.1 map端鏈接- DistributedCache分佈式緩存小數據集

1.1.1         map端鏈接- DistributedCache分佈式緩存小數據集

當一個數據集很是小時,能夠將小數據集發送到每一個節點,節點緩存到內存中,這個數據集稱爲邊數據。用map函數將小數據集中的數據按鍵聚合到大的數據集中,輸出鏈接數據集,進行鏈接操做。css

(1)   分佈式緩存指定緩存文件html

執行命令行時,採用hadoop  jar hadoop-example.jar MapSideJoinMain  -files input/cityfile/tb_dim_city.dat input/data/all outputjava

-files input/cityfile/tb_dim_city.dat指定須要緩存的文件,會被複制到各個節任務點。node

(2)指定緩存文件的三種類型apache

 Hadoop 命令行選項中,有三個命令能夠實現文件複製分發到任務的各個節點。用戶啓動一個做業,Hadoop 會把由 -files、-archives、和 -libjars 等選項所指定的文件複製到分佈式文件系統之中,任務運行前,節點管理器從分佈式文件系統中複製文件到本地。緩存

 1) -files 選項指定待分發的文件,文件內包含以逗號隔開的 URL 列表。文件能夠存放在本地文件系統、HDFS、或其它 Hadoop 可讀文件系統之中。 若是還沒有指定文件系統,則這些文件被默認是本地的。即便默認文件系統並不是本地文件系統,這也是成立的。app

 2) -archives 選項向本身的任務中複製存檔(壓縮)文件,好比JAR 文件、ZIP 文件、tar 文件和 gzipped tar文件,這些文件會被解檔到任務節點。分佈式

 3) -libjars 選項把 JAR 文件添加到 mapper 和 reducer 任務的類路徑中。若是做業 JAR 文件並不是包含不少庫 JAR 文件,這點會頗有用。ide

(3)緩存文件刪除機制函數

節點管理器爲緩存中的文件各維護一個計數器,任務運行時,文件計數器加1,任務完成後,計數器減1,計數器爲0時才能刪除文件,當節點緩存容量大於必定值(yarn.nodemanger.localizer.cache.target-size-mb設置,默認10GB),纔會刪除最近最少使用的文件。

(4)Job的分佈式緩存API

除了能夠用命令行參數指定緩存文件外,還以經過Job的API指定緩存文件;即經過job對象調用下面的函數設置緩存文件。

//如下兩組方法將文件或存檔添加到分佈式緩存

public void addCacheFile(URI uri);

public void addCacheArchive(URI uri);

//如下兩組方法將一次性向分佈式緩存中添加一組文件或存檔

public void setCacheFiles(URI[] files);

public void setCacheArchives(URI[] archives);

//如下兩組方法將文件或存檔添加到 MapReduce 任務的類路徑

public void addFileToClassPath(Path file);

public void addArchiveToClassPath(Path archive);

public void createSymlink();

(6)DistributedCache緩存小數據集實現hadoop map端鏈接實例

下面的實例是將城市名稱的數據集和用戶信息的數據集進行鏈接,城市名稱的數據集很小,而用戶信息的數據集很大,因此能夠採用緩存文件的方式,將城市信息數據集發送到任務,map任務經過setup方法從緩存中讀取小數據集文件tb_dim_city.dat,在內存中造成map映射,map函數處理用戶信息數據,根據用戶信息中的城市id去map映射中找到城市名稱,而後合併輸出。

package Temperature;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 *
 *
用途說明:  
 * Map side join中的left outer join  
 * 左鏈接,兩個文件分別表明2個表,鏈接字段table1的id字段和table2的cityID字段  
 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),  
 * 假設tb_dim_city文件記錄數不多,tb_dim_city.dat文件內容,分隔符爲"|":  
 * id     name  orderid  city_code  is_show  
 * 0       其餘        9999     9999         0  
 * 1       長春        1        901          1  
 * 2       吉林        2        902          1  
 * 3       四平        3        903          1  
 * 4       松原        4        904          1  
 * 5       通化        5        905          1  
 * 6       遼源        6        906          1  
 * 7       白城        7        907          1  
 * 8       白山        8        908          1  
 * 9       延吉        9        909          1  
 * -------------------------風騷的分割線-------------------------------  
 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)  
 * tb_user_profiles.dat文件內容,分隔符爲"|":  
 * userID   network     flow    cityID  
 * 1           2G       123      1  
 * 2           3G       333      2  
 * 3           3G       555      1  
 * 4           2G       777      3  
 * 5           3G       666      4  
 * -------------------------風騷的分割線-------------------------------  
 *  結果:  
 *  1   長春  1   901 1   1   2G  123  
 *  1   長春  1   901 1   3   3G  555  
 *  2   吉林  2   902 1   2   3G  333  
 *  3   四平  3   903 1   4   2G  777  
 *  4   松原  4   904 1   5   3G  666  
 */
public class MapSideJoinMain extends Configured implements Tool{
    private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);
    public static class LeftOutJoinMapper extends Mapper {

        private HashMap city_info = new HashMap<String,String>();
        private Text outPutKey = new Text();
        private Text outPutValue = new Text();
        private String mapInputStr = null;
        private String mapInputSpit[] = null;
        private String city_secondPart = null;
        /**
         *
此方法在每一個task開始以前執行,這裏主要用做從DistributedCache  
         * 中取到tb_dim_city文件,並將裏邊記錄取出放到內存中。  
         */
       
@Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            BufferedReader br = null;
            //得到當前做業的DistributedCache相關文件  
            Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            String cityInfo = null;
            for(Path p : distributePaths){
                if(p.toString().endsWith("tb_dim_city.dat")){
                    //讀緩存文件,並放到mem中  
                    br = new BufferedReader(new FileReader(p.toString()));
                    while(null!=(cityInfo=br.readLine())){
                        String[] cityPart = cityInfo.split("\\|",5);
                        if(cityPart.length ==5){
                            city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]);
                        }
                    }
                }
            }
        }

        /**
         * Map
端的實現至關簡單,直接判斷tb_user_profiles.dat中的  
         * cityID是否存在個人map中就ok了,這樣就能夠實現Map Join了  
         */
       
protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            //排掉空行  
            if(value == null || value.toString().equals("")){
                return;
            }
            mapInputStr = value.toString();
            mapInputSpit = mapInputStr.split("\\|",4);
            //過濾非法記錄  
            if(mapInputSpit.length != 4){
                return;
            }
            //判斷連接字段是否在map中存在  
            city_secondPart = (String) city_info.get((Object) mapInputSpit[3]);
            if(city_secondPart != null){
                this.outPutKey.set(mapInputSpit[3]);
                this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]);
                context.write(outPutKey, outPutValue);
            }
        }
    }
   
    public int run(String[] args) throws Exception {
        Configuration conf=getConf(); //得到配置文件對象  
        DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//爲該job添加緩存文件  
        Job job=new Job(conf,"MapJoinMR");
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑  
        FileOutputFormat.setOutputPath(job, new Path(args[2])); //設置reduce輸出文件路徑

        job.setJarByClass(MapSideJoinMain.class);
        job.setMapperClass(LeftOutJoinMapper.class);

        job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式  
        job.setOutputFormatClass(TextOutputFormat.class);//使用默認的output格式

        //設置map的輸出key和value類型  
        job.setMapOutputKeyClass(Text.class);

        //設置reduce的輸出key和value類型  
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.waitForCompletion(true);
        return job.isSuccessful()?0:1;
    }
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        try {
            int returnCode =  ToolRunner.run(new MapSideJoinMain(),args);
            System.exit(returnCode);
        } catch (Exception e) {
            // TODO Auto-generated catch block  
           
logger.error(e.getMessage());
        }
    }
}

實例參考文獻:

http://www.javashuo.com/article/p-ethukfpi-ep.html

 

本身開發了一個股票智能分析軟件,功能很強大,須要的點擊下面的連接獲取:

http://www.javashuo.com/article/p-kahdodke-ge.html

相關文章
相關標籤/搜索