MapReduce中的Join算法

  在關係型數據庫中Join是很是常見的操做,各類優化手段已經到了極致。在海量數據的環境下,不可避免的也會碰到這種類型的需求,例如在數據分析時須要從不一樣的數據源中獲取數據。不一樣於傳統的單機模式,在分佈式存儲下采用MapReduce編程模型,也有相應的處理措施和優化方法。html

  咱們先簡要地描述待解決的問題。假設有兩個數據集:氣象站數據庫和天氣記錄數據庫 java

  氣象站的示例數據,以下數據庫

Station IDapache

Station Name編程

011990-99999緩存

SIHCCAJAVRI網絡

012650-99999app

TRNSET-HANSMOEN分佈式

  天氣記錄的示例數據,以下ide

Station ID

Timestamp

Temperature

012650-99999

194903241200

111

012650-99999

194903241800

78

011990-99999

195005150700

0

011990-99999

195005151200

22

011990-99999

195005151800

-11

  假設咱們想要以下結果

Station ID

Station Name

Timestamp

Temperature

011990-99999

SIHCCAJAVRI

195005150700

0

011990-99999

SIHCCAJAVRI

195005151200

22

011990-99999

SIHCCAJAVRI

195005151800

-11

012650-99999

TYNSET-HANSMOEN

194903241200

111

012650-99999

TYNSET-HANSMOEN

194903241800

78

  想一想看,咱們該怎麼經過MapReduce實現上面的需求?

 

   MapReduce鏈接操做的實現技術取決於數據集的規模及分區方式。若是一個數據集很大而另一個數據集很小,以致於小的數據集能夠分發到集羣中的每個節點之中,而後在mapper階段讀取大數據集中的數據;到reducer時,reduce獲取本節點上的數據(也就是小數據集中的數據)並完成鏈接操做;咱們以上面的天氣數據鏈接來作具體闡述,假設氣象站數據集不多,那將氣象站數據集分發到集羣中的每一個節點中,在mapper階段讀取天氣記錄數據,在reduce階段讀取本節點上的氣象站數據,而後經過氣象站數據中的氣象站ID和天氣數據中的氣象ID作鏈接,從而完成氣象站數據和天氣記錄數據的鏈接。在這種狀況下,咱們就用到了Hadoop的分佈式緩存機制,它可以在任務運行過程當中及時地將文件和存檔複製到任務節點以供使用。爲了節約網絡寬帶,在每個做業中,各個文件一般只須要複製到一個節點一次

  若是兩個數據集的規模均很大,以致於沒有哪一個數據集能夠被徹底複製到集羣的每一個節點中,咱們仍然可使用 MapReduce來進行鏈接,至於到底採用map端鏈接(鏈接操做若是由mapper執行,則稱爲 「map 端鏈接」)仍是reduce端鏈接(鏈接操做若是由reducer執行,則稱爲「reduce端鏈接」),則取決於數據的組織方式。下面咱們分別介紹map端鏈接和reduce端鏈接。

    map 端鏈接

      在兩個大規模輸入數據集到達map函數以前就應該執行鏈接操做。爲達到該目的,各map的輸入數據必須先分區而且以特定方式排序。各個輸入數據集被劃分紅相同數量的分區,而且均按相同的鍵(鏈接鍵)排序。同一鍵的全部記錄均會放在同一分區之中。聽起來彷佛要求很是嚴格,但這的確合乎MapReduce做業的輸出。

     map端鏈接操做能夠鏈接多個做業的輸出,只要這些做業的reducer數量相同、鍵相同而且輸出文件是不可切分的(例如,小於一個 HDFS 塊)。在上面講的天氣例子中,若是氣象站文件以氣象站ID部分排序,天氣記錄也以氣象站ID部分排序,並且reducer的數量相同,則就知足了執行map端鏈接的前提條件。

     利用 org.apache.hadoop.mapreduce.join 包中的CompositeInputFormat類來運行一個 map 端鏈接。CompositeInputFormat類的輸入源和鏈接類型(內鏈接或外鏈接)能夠經過一個鏈接表達式進行配置,鏈接表達式的語法簡單。此種方法不經常使用,這裏再也不贅述。

    reduce 端鏈接

      因爲reduce端鏈接並不要求輸入數據集符合特定結構,於是reduce端鏈接比 map 端鏈接更爲經常使用。可是,因爲兩個數據集均需通過MapReduce的shuffle過程, 因此reduce 端鏈接的效率每每要低一些。基本思路是mapper爲各個記錄標記源,而且使用鏈接鍵做爲 map 輸出鍵,使鍵相同的記錄放在同一reducer中。 咱們經過下面兩種技術實現reduce端鏈接。

     一、多輸入

       數據集的輸入源每每有多種格式,所以可使用 MultipleInputs 類來方便地解析各個數據源。MultipleInputs的用法,在「MapReduce輸入格式」已經介紹過,這裏就再也不贅述。

     二、二次排序

       如前所述,reducer在兩個數據源中選出鍵相同的記錄並不介意這些記錄是否已排好序。此外,爲了更好地執行鏈接操做,先將某一個數據源傳輸到reducer會很是重要。還以上面的天氣數據鏈接爲例,當天氣記錄發送到reducer的時候,與這些記錄有相同鍵的氣象站信息最好也已經放在reducer,使得reducer可以將氣象站名稱填到天氣記錄之中就立刻輸出。雖然也能夠不指定數據傳輸次序,並將待處理的記錄緩存在內存之中,但應該儘可能避免這種狀況,由於其中任何一組的記錄數量可能很是龐大,遠遠超出reducer的可用內存容量。 所以咱們用到二次排序技術,對map階段輸出的每一個鍵的值進行排序,實現這一效果。

 

  下面咱們分別介紹兩種實現方式分佈式緩存機制、reduce端鏈接

  一、分佈式緩存機制

    1、用法

      Hadoop 命令行選項中,有三個命令能夠實現文件分發到任務的各個節點。

        1)可使用-files選項指定待分發的文件,文件內包含以逗號隔開的URL列表。文件能夠存放在本地文件系統、HDFS、或其它Hadoop可讀文件系統之中。若是還沒有指定文件系統,則這些文件被默認是本地的。即便默認文件系統並不是本地文件系統,這也是成立的。

        2)可使用-archives選項向本身的任務中複製存檔文件,好比JAR文件、ZIP 文件、tar文件和gzipped tar文件,這些文件會被解檔到任務節點。

        3)可使用-libjars選項將JAR文件添加到mapper和reducer任務的類路徑中。若是做業JAR文件中並不是包含不少庫JAR文件,使用-libjars選項是很方便的。

    2、工做機制

      當啓動一個做業,Hadoop會把由-files、-archives、和-libjars等選項所指定的文件複製到分佈式文件系統之中。接着,在任務運行以前,tasktracker將文件從分佈式文件系統複製到本地磁盤(緩存)使任務可以訪問文件。此時,這些文件就被視爲「本地化」 了。從任務的角度來看, 這些文件就已經在那兒了,它並不關心這些文件是否來自 HDFS 。此外,有-libjars指定的文件會在任務啓動前添加到任務的類路徑(classpath)中。

   三、分佈式緩存API

     因爲能夠經過Hadoop命令行間接使用分佈式緩存,因此大多數應用不須要使用分佈式緩存API。然而,一些應用程序須要用到分佈式緩存的更高級的特性,這就須要直接使用API了。 API包括兩部分:將數據放到緩存中的方法,以及從緩存中讀取數據的方法。

      1)首先掌握數據放到緩存中的方法,如下列舉 Job 中可將數據放入到緩存中的相關方法:

public void addCacheFile(URI uri); 
public void addCacheArchive(URI uri);// 以上兩組方法將文件或存檔添加到分佈式緩存 
public void setCacheFiles(URI[] files); 
public void setCacheArchives(URI[] archives);// 以上兩組方法將一次性向分佈式緩存中添加一組文件或存檔 
public void addFileToClassPath(Path file); 
public void addArchiveToClassPath(Path archive);// 以上兩組方法將文件或存檔添加到 MapReduce 任務的類路徑

           在緩存中能夠存放兩類對象:文件(files)和存檔(achives)。文件被直接放置在任務節點上,而存檔則會被解檔以後再將具體文件放置在任務節點上。
     2)其次掌握在map或者reduce任務中,使用API從緩存中讀取數據。          

public Path[] getLocalCacheFiles() throws IOException; 
public Path[] getLocalCacheArchives() throws IOException; 
public Path[] getFileClassPaths(); 
public Path[] getArchiveClassPaths();

     咱們可使用 getLocalCacheFiles()和getLocalCacheArchives()方法獲取緩存中的文件或者存檔的引用。當處理存檔時,將會返回一個包含解檔文件的目錄。相應的,用戶能夠經過 getFileClassPaths()和getArchivesClassPaths()方法獲取被添加到任務的類路徑下的文件和文檔。

 

  下面咱們仍然之前面的氣象站數據和天氣記錄數據爲例,使用分佈式緩存API,完成兩個數據集的鏈接操做。完整的 MapReduce 程序以下所示。

package com.buaa.distributedgache;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/** 
* @ProjectName JoinDemo
* @PackageName com.buaa.distributedgache
* @ClassName JoinRecordWithStationName
* @Description TODO
* @Author 劉吉超
* @Date 2016-05-25 19:34:57
*/
public class JoinRecordWithStationName extends Configured implements Tool {
    
    public static class TemperatureMapper extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] arr = value.toString().split("\t", 2);
            if (arr.length == 2) {
                context.write(new Text(arr[0]), value);
            }
        }
    }
    
    public static class TemperatureReducer extends Reducer<Text, Text, Text, Text> {
        // 定義Hashtable存放緩存數據
        private Hashtable<String, String> table = new Hashtable<String, String>();
        
        /**
         * 獲取分佈式緩存文件
         */
        @SuppressWarnings("deprecation")
        protected void setup(Context context) throws IOException, InterruptedException {
            // 返回本地文件路徑
            Path[] localPaths = (Path[]) context.getLocalCacheFiles();
            if (localPaths.length == 0) {
                throw new FileNotFoundException("Distributed cache file not found.");
            }
            
            // 獲取本地 FileSystem實例
            FileSystem fs = FileSystem.getLocal(context.getConfiguration());
            // 打開輸入流
            FSDataInputStream in = fs.open(new Path(localPaths[0].toString()));
            // 建立BufferedReader讀取器
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            // 按行讀取並解析氣象站數據
            String infoAddr = null;
            while ((infoAddr = br.readLine()) != null) {
                String[] records = infoAddr.split("\t");
                // key爲stationID,value爲stationName
                table.put(records[0], records[1]);
            }
        }

        public void reduce(Text key, Iterable< Text> values, Context context) throws IOException, InterruptedException {
            // 天氣記錄根據stationId獲取stationName
            String stationName = table.get(key.toString());
            for (Text value : values) {
                context.write(new Text(stationName), value);
            }
        }
    }
    
    @Override
    public int run(String[] args) throws Exception {
        // 讀取配置文件
        Configuration conf = new Configuration();
        
        // 判斷路徑是否存在,若是存在,則刪除
        Path mypath = new Path(args[2]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }
        
        // 獲取一個job實例
        Job job = Job.getInstance(conf,"join");
        // 主類
        job.setJarByClass(JoinRecordWithStationName.class);
        
        // 設置record.txt文件做爲輸入
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 添加station.txt到分佈式緩存
        job.addCacheFile(new URI(args[1]));
        // 輸出目錄
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        
        // mapper
        job.setMapperClass(TemperatureMapper.class);
        // reduce
        job.setReducerClass(TemperatureReducer.class);
        
        // 輸出key類型
        job.setOutputKeyClass(Text.class);
        // 輸出value類型
        job.setOutputValueClass(Text.class);
        
        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = { 
                "hdfs://hadoop1:9000/buaa/join/record.txt",
                "hdfs://hadoop1:9000/buaa/join/station.txt",
                "hdfs://hadoop1:9000/buaa/join/out/" 
            };
        int ec = ToolRunner.run(new Configuration(), new JoinRecordWithStationName(), args0);
        System.exit(ec);
    }
}

  添加分佈式緩存文件相對簡單,只需使用job.addCacheFile(new URI(cacheFilePath))方法添加緩存文件便可。須要注意的是,在獲取獲取緩存文件時,文件將以「本地的」Path 對象的形式返回。爲了讀取文件,用戶須要首先使用getLocal()方法得到一個Hadoop本地FileSystem實例。本程序中,咱們在Reduce的setup()方法中獲取緩存文件。

  如下是輸出結果,達到咱們預期的效果。

  clip_image002

  二、Reduce端鏈接

  咱們使用 TextPair 類構建組合鍵,包括氣象站ID 和「標記」。在這裏,「標記」 是一個虛擬的字段,其惟一目的是對記錄排序,使氣象站記錄比天氣記錄先到達。一種簡單的作法就是:對於氣象站記錄,設置「標記」的值設爲 0;對於天氣記錄,設置「標記」的值設爲1,代碼以下所示

package com.buaa.secondarysort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/** 
* @ProjectName JoinDemo
* @PackageName com.buaa
* @ClassName TextPair
* @Description TODO
* @Author 劉吉超
* @Date 2016-05-24 22:54:05
*/
public class TextPair implements WritableComparable<TextPair>{
    // Text類型的實例變量first
    private Text first;
    // Text類型的實例變量second
    private Text second;
    
    public TextPair(){
        set(new Text(),new Text());
    }
    
    public TextPair(String first,String second){
        set(new Text(first),new Text(second));
    }
    
    public void set(Text first,Text second){
        this.first = first;
        this.second = second;
    }
    
    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }
    
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }
    
    public boolean equals(TextPair tp) {
        return first.equals(tp.first) && second.equals(tp.second);
    }
    
    public String toStirng() {
        return first + "\t" + second;
    }
    
    @Override
    public int compareTo(TextPair o) {
        if(!first.equals(o.first)){
            return first.compareTo(o.first);
        }else if(!second.equals(o.second)){
            return second.compareTo(o.second);
        }
        
        return 0;
    }

    public Text getFirst() {
        return first;
    }

    public void setFirst(Text first) {
        this.first = first;
    }

    public Text getSecond() {
        return second;
    }

    public void setSecond(Text second) {
        this.second = second;
    }
}

  JoinStationMapper處理來自氣象站數據,代碼以下所示。

package com.buaa.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/** 
* @ProjectName JoinDemo
* @PackageName com.buaa
* @ClassName JoinStationMapper
* @Description TODO
* @Author 劉吉超
* @Date 2016-05-24 22:55:42
*/
public class JoinStationMapper extends Mapper<LongWritable, Text, TextPair, Text> {
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // 解析氣象站數據
        String[] arr = line.split("\\s+");
        
        if (arr.length == 2) {// 知足這種數據格式
            // key=氣象站id value=氣象站名稱
            context.write(new TextPair(arr[0], "0"), new Text(arr[1]));
        }
    }
}

  JoinRecordMapper處理來自天氣記錄數據,代碼以下所示

package com.buaa.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/** 
* @ProjectName JoinDemo
* @PackageName com.buaa
* @ClassName JoinRecordMapper
* @Description TODO
* @Author 劉吉超
* @Date 2016-05-24 22:56:55
*/
public class JoinRecordMapper extends Mapper<LongWritable,Text,TextPair,Text>{ 
    protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
        String line = value.toString();
        // 解析天氣記錄數據
        String[] arr = line.split("\\s+",2);
        
        if(arr.length == 2){
            //key=氣象站id  value=天氣記錄數據
            context.write(new TextPair(arr[0],"1"),new Text(arr[1]));
        }  
    }
}

  因爲 TextPair 通過了二次排序,因此 reducer 會先接收到氣象站數據。所以從中抽取氣象站名稱,並將其做爲後續每條輸出記錄的一部分寫到輸出文件。JoinReducer 的代碼以下所示。

package com.buaa.secondarysort;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/** 
* @ProjectName JoinDemo
* @PackageName com.buaa
* @ClassName JoinReducer
* @Description TODO
* @Author 劉吉超
* @Date 2016-05-24 22:54:24
*/
public class JoinReducer extends Reducer< TextPair,Text,Text,Text>{
    protected void reduce(TextPair key, Iterable<Text> values,Context context) throws IOException,InterruptedException{
        Iterator<Text> iter = values.iterator();
        // 氣象站名稱
        Text stationName = new Text(iter.next());
        
        while(iter.hasNext()){
            // 天氣記錄的每條數據
            Text record = iter.next();
            
            Text outValue = new Text(stationName.toString() + "\t" + record.toString());
            
            context.write(key.getFirst(),outValue);
        }
    }        
}

  下面咱們定義做業的驅動類 JoinRecordWithStationName,在該類中,關鍵在於根據組合鍵的第一個字段(即氣象站 ID)進行分區和分組,JoinRecordWithStationName 類的代碼以下所示。

package com.buaa.secondarysort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/** 
* @ProjectName JoinDemo
* @PackageName com.buaa
* @ClassName JoinRecordWithStationName
* @Description TODO
* @Author 劉吉超
* @Date 2016-05-24 22:57:24
*/
public class JoinRecordWithStationName extends Configured implements Tool {
    public static class KeyPartitioner extends Partitioner<TextPair, Text> {
        public int getPartition(TextPair key, Text value, int numPartitions) {
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }
    
    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
            super(TextPair.class, true);
        }
        
        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable wc1, WritableComparable wc2) {
            TextPair tp1 = (TextPair) wc1;
            TextPair tp2 = (TextPair) wc2;
            
            return tp1.getFirst().compareTo(tp2.getFirst());
        }
    }

    public int run(String[] args) throws Exception {
        // 讀取配置文件
        Configuration conf = new Configuration();
        
        // 判斷路徑是否存在,若是存在,則刪除
        Path mypath = new Path(args[2]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }

        // 新建一個任務
        Job job = Job.getInstance(conf, "join");
        // 主類
        job.setJarByClass(JoinRecordWithStationName.class);
        
        // 天氣記錄數據源
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, JoinRecordMapper.class);
        // 氣象站數據源
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, JoinStationMapper.class);
        // 輸出路徑
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        
        // 自定義分區
        job.setPartitionerClass(KeyPartitioner.class);
        // 自定義分組
        job.setGroupingComparatorClass(GroupComparator.class);
        
        // 指定Reducer
        job.setReducerClass(JoinReducer.class);
        
        // map key輸出類型
        job.setMapOutputKeyClass(TextPair.class);
        // reduce key輸出類型
        job.setOutputKeyClass(Text.class);
        
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = { 
                "hdfs://hadoop1:9000/buaa/join/record.txt",
                "hdfs://hadoop1:9000/buaa/join/station.txt",
                "hdfs://hadoop1:9000/buaa/join/out/" 
        };
        int exitCode = ToolRunner.run(new JoinRecordWithStationName(), args0);
        System.exit(exitCode);
    }
}

  如下是輸出結果,也達到咱們預期的效果。

  clip_image004

 

若是,您認爲閱讀這篇博客讓您有些收穫,不妨點擊一下右下角的【推薦】。
若是,您但願更容易地發現個人新博客,不妨點擊一下左下角的【關注我】。
若是,您對個人博客所講述的內容有興趣,請繼續關注個人後續博客,我是【劉超★ljc】。

本文版權歸做者和博客園共有,歡迎轉載,但未經做者贊成必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接,不然保留追究法律責任的權利。

相關文章
相關標籤/搜索