Map/Reduce中Join查詢實現

http://www.cnblogs.com/MengYan-LongYou/p/3360613.htmlhtml

 

在作這個Join查詢的時候,必然涉及數據,我這裏設計了2張表,分別較data.txtinfo.txt,字段之間以/t劃分。app

data.txt內容以下:ide

201001    1003    abc函數

201002    1005    defoop

201003    1006    ghithis

201004    1003    jklspa

201005    1004    mno設計

201006    1005    pqrorm

 

info.txt內容以下:htm

 

1003    kaka

1004    da

1005    jue

1006    zhao

 

指望輸出結果:

1003    201001    abc    kaka

1003    201004    jkl    kaka

1004    201005    mno    da

1005    201002    def    jue

1005    201006    pqr    jue

1006    201003    ghi    zhao

 

4、Map代碼

首先是map的代碼,我貼上,而後簡要說說

 

public static class Example_Join_01_Mapper extends Mapper<LongWritable, Text, TextPair, Text> {

        @Override

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

 

            // 獲取輸入文件的全路徑和名稱

            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();

 

            if (pathName.contains("data.txt")) {

                String values[] = value.toString().split("/t");

                if (values.length < 3) {

                    // data數據格式不規範,字段小於3,拋棄數據

                    return;

                } else {

                    // 數據格式規範,區分標識爲1

                    TextPair tp = new TextPair(new Text(values[1]), new Text("1"));

                    context.write(tp, new Text(values[0] + "/t" + values[2]));

                }

            }

            if (pathName.contains("info.txt")) {

                String values[] = value.toString().split("/t");

                if (values.length < 2) {

                    // data數據格式不規範,字段小於2,拋棄數據

                    return;

                } else {

                    // 數據格式規範,區分標識爲0

                    TextPair tp = new TextPair(new Text(values[0]), new Text("0"));

                    context.write(tp, new Text(values[1]));

                }

            }

        }

    }

 

這裏須要注意如下部分:

ApathName是文件在HDFS中的全路徑(例如:hdfs://M1:9000/MengYan/join/data/info.txt),能夠以endsWith()的方法來判斷。

B、資料表,也就是這裏的info.txt須要放在前面,也就是標識號是0.不然沒法輸出理想結果。

CMap執行完成以後,輸出的中間結果以下:

1003,0    kaka

1004,0    da

1005,0    jue

1006,0    zhao

1003,1    201001    abc

1003,1    201004    jkl

1004,1    201005    mon

1005,1    201002    def

1005,1    201006    pqr

1006,1    201003    ghi

 

5、分區和分組

1map以後的輸出會進行一些分區的操做,代碼貼出來:

public static class Example_Join_01_Partitioner extends Partitioner<TextPair, Text> {

        @Override

        public int getPartition(TextPair key, Text value, int numParititon) {

            return Math.abs(key.getFirst().hashCode() * 127) % numParititon;

        }

    }

分區我在之前的文檔中寫過,這裏不作描述了,就說是按照map輸出的符合key的第一個字段作分區關鍵字。分區以後,相同key會劃分到一個reduce中去處理(若是reduce設置是1,那麼就是分區有多個,可是仍是在一個reduce中處理。可是結果會按照分區的原則排序)。分區後結果大體以下:

 

同一區:

1003,0    kaka

1003,1    201001    abc

1003,1    201004    jkl

 

 

同一區:

1004,0    da

1004,1    201005    mon

 

 

同一區:

1005,0    jue

1005,1    201002    def

1005,1    201006    pqr

 

 

同一區:

1006,0    zhao

1006,1    201003    ghi

 

2、分組操做,代碼以下

 

public static class Example_Join_01_Comparator extends WritableComparator {

 

        public Example_Join_01_Comparator() {

            super(TextPair.class, true);

        }

 

        @SuppressWarnings("unchecked")

        public int compare(WritableComparable a, WritableComparable b) {

            TextPair t1 = (TextPair) a;

            TextPair t2 = (TextPair) b;

            return t1.getFirst().compareTo(t2.getFirst());

        }

    }

 

分組操做就是把在相同分區的數據按照指定的規則進行分組的操做,就以上來看,是按照複合key的第一個字段作分組原則,達到忽略複合key的第二個字段值的目的,從而讓數據可以迭代在一個reduce中。輸出後結果以下:

 

同一組:

1003,0    kaka

1003,0    201001    abc

1003,0    201004    jkl

 

同一組:

1004,0    da

1004,0    201005    mon

 

同一組:

1005,0    jue

1005,0    201002    def

1005,0    201006    pqr

 

同一組:

1006,0    zhao

1006,0    201003    ghi

 

6、reduce操做

貼上代碼以下:

public static class Example_Join_01_Reduce extends Reducer<TextPair, Text, Text, Text> {

        protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException,

                InterruptedException {

            Text pid = key.getFirst();

            String desc = values.iterator().next().toString();

            while (values.iterator().hasNext()) {

                context.write(pid, new Text(values.iterator().next().toString() + "/t" + desc));

            }

        }

    }

1、代碼比較簡單,首先獲取關鍵的ID值,就是key的第一個字段。

2、獲取公用的字段,經過排組織後能夠看到,一些共有字段是在第一位,取出來便可。

3、遍歷餘下的結果,輸出。

7、其餘的支撐代碼

1、首先是TextPair代碼,沒有什麼能夠細說的,貼出來:

public class TextPair implements WritableComparable<TextPair> {

    private Text first;

    private Text second;

 

    public TextPair() {

        set(new Text(), new Text());

    }

 

    public TextPair(String first, String second) {

        set(new Text(first), new Text(second));

    }

 

    public TextPair(Text first, Text second) {

        set(first, second);

    }

 

    public void set(Text first, Text second) {

        this.first = first;

        this.second = second;

    }

 

    public Text getFirst() {

        return first;

    }

 

    public Text getSecond() {

        return second;

    }

 

    public void write(DataOutput out) throws IOException {

        first.write(out);

        second.write(out);

    }

 

    public void readFields(DataInput in) throws IOException {

        first.readFields(in);

        second.readFields(in);

    }

 

    public int compareTo(TextPair tp) {

        int cmp = first.compareTo(tp.first);

        if (cmp != 0) {

            return cmp;

        }

        return second.compareTo(tp.second);

    }

}

2Job的入口函數

public static void main(String agrs[]) throws IOException, InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();

        GenericOptionsParser parser = new GenericOptionsParser(conf, agrs);

        String[] otherArgs = parser.getRemainingArgs();

        if (agrs.length < 3) {

            System.err.println("Usage: Example_Join_01 <in_path_one> <in_path_two> <output>");

            System.exit(2);

        }

 

        //conf.set("hadoop.job.ugi", "root,hadoop");

 

        Job job = new Job(conf, "Example_Join_01");

        // 設置運行的job

        job.setJarByClass(Example_Join_01.class);

        // 設置Map相關內容

        job.setMapperClass(Example_Join_01_Mapper.class);

        // 設置Map的輸出

        job.setMapOutputKeyClass(TextPair.class);

        job.setMapOutputValueClass(Text.class);

        // 設置partition

        job.setPartitionerClass(Example_Join_01_Partitioner.class);

        // 在分區以後按照指定的條件分組

        job.setGroupingComparatorClass(Example_Join_01_Comparator.class);

        // 設置reduce

        job.setReducerClass(Example_Join_01_Reduce.class);

        // 設置reduce的輸出

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

        // 設置輸入和輸出的目錄

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

        // 執行,直到結束就退出

        System.exit(job.waitForCompletion(true) ? 0 : 1);

 

    }

 

8、總結

1、這是個簡單的join查詢,能夠看到,我在處理輸入源的時候是在map端作來源判斷。其實在0.19能夠用MultipleInputs.addInputPath()的方法,可是它用了JobConf作參數。這個方法原理是多個數據源就採用多個map來處理。方法各有優劣。

2、對於資源表,若是咱們採用01這樣的模式來區分,資源表是須要放在前的。例如本例中info.txt就是資源表,因此標識位就是0.若是寫爲1的話,能夠試下,在分組以後,資源表對應的值放在了迭代器最後一位,沒法追加在最後全部的結果集合中。

3、關於分區,並非全部的map都結束纔開始的,一部分數據完成就會開始執行。一樣,分組操做在一個分區內執行,若是分區完成,分組將會開始執行,也不是等全部分區完成纔開始作分組的操做。

相關文章
相關標籤/搜索