9.3.2 Map-side join: the CompositeInputFormat join class


(1) Three conditions must be met to use the CompositeInputFormat join class

1) Both datasets are large, so neither can be shipped around as a cached file (the distributed-cache approach does not apply).

2) Both datasets are sorted by the same key;

3) Both datasets have the same number of partitions, all records for a given key fall in the same partition, and the output files are not splittable;

To satisfy these three conditions, both datasets are first processed by reduce before the input reaches the map-side join: the two preparation jobs use the same number of reduce tasks, n, so each dataset is partitioned into n output files, all records for a given key land in the same partition, and within each partition the records are sorted by the join key. When the reduce counts match, the datasets share the same join key and are sorted by it, and the output files are not splittable (each smaller than one HDFS block, or gzip-compressed), the preconditions for a map-side join are met. The CompositeInputFormat class in the org.apache.hadoop.mapreduce.lib.join package can then run the map-side join.
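As a concrete illustration of this preparation step, here is a minimal sketch of a hypothetical preparation job (the class name PrepareForMapJoin, the method, and the paths are my own assumptions, not from the original text). Run once per dataset with the same number of reduce tasks, it partitions the records by the join key with the default HashPartitioner, sorts them within each partition, and gzip-compresses the output so the resulting part files are not splittable.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical preparation job: run it once for each dataset with the same number of
// reduce tasks, so both outputs are partitioned by the join key, sorted within each
// partition, and gzip-compressed so the part files are not splittable.
public class PrepareForMapJoin {
    public static Job buildPrepJob(Configuration conf, Path in, Path out, int numReducers)
            throws Exception {
        Job job = Job.getInstance(conf, "prepare-dataset-for-map-join");
        job.setJarByClass(PrepareForMapJoin.class);
        // join key = text before the first separator, rest of the line = value
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.getConfiguration().set(
                "mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        job.setMapperClass(Mapper.class);    // identity mapper: emits (join key, record)
        job.setReducerClass(Reducer.class);  // identity reducer: writes the sorted records
        job.setNumReduceTasks(numReducers);  // must be the same value for both datasets
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        // gzip the output so each part file is treated as non-splittable
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        return job;
    }
}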

(2) Overview of the CompositeInputFormat class

CompositeInputFormat is used by setting the job's input format to it with job.setInputFormatClass(CompositeInputFormat.class), and by setting the join expression for the two datasets through conf.set(String name, String value). The expression has three elements: the join type (inner, outer, override, tbl, etc.), the input format used to read the two datasets, and the paths of the two datasets. These three elements are assembled into a string in a fixed format and stored in the configuration.

// Set the input format to CompositeInputFormat
job.setInputFormatClass(CompositeInputFormat.class);
// Set the join expression (CompositeInputFormat.JOIN_EXPR = "mapreduce.join.expr")
Configuration conf = job.getConfiguration();
conf.set(CompositeInputFormat.JOIN_EXPR, CompositeInputFormat.compose(
        "inner", KeyValueTextInputFormat.class,
        FileInputFormat.getInputPaths(job)));
// Equivalently:
// conf.set("mapreduce.join.expr", CompositeInputFormat.compose(
//         "inner", KeyValueTextInputFormat.class, userPath, commentPath));
 

The source code of the CompositeInputFormat class (decompiled) is as follows:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.hadoop.mapreduce.lib.join;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.join.Parser.CNode;
import org.apache.hadoop.mapreduce.lib.join.Parser.Node;
import org.apache.hadoop.mapreduce.lib.join.Parser.WNode;

@Public
@Stable
public class CompositeInputFormat<K extends WritableComparable> extends InputFormat<K, TupleWritable> {
    public static final String JOIN_EXPR = "mapreduce.join.expr";
    public static final String JOIN_COMPARATOR = "mapreduce.join.keycomparator";
    private Node root;

    public CompositeInputFormat() {
    }

    public void setFormat(Configuration conf) throws IOException {
        this.addDefaults();
        this.addUserIdentifiers(conf);
        this.root = Parser.parse(conf.get("mapreduce.join.expr", (String)null), conf);
    }

    protected void addDefaults() {
        try { // four default join types are registered, each with its corresponding RecordReader
            CNode.addIdentifier("inner", InnerJoinRecordReader.class);
            CNode.addIdentifier("outer", OuterJoinRecordReader.class);
            CNode.addIdentifier("override", OverrideRecordReader.class);
            WNode.addIdentifier("tbl", WrappedRecordReader.class);
        } catch (NoSuchMethodException var2) {
            throw new RuntimeException("FATAL: Failed to init defaults", var2);
        }
    }

    private void addUserIdentifiers(Configuration conf) throws IOException {
        Pattern x = Pattern.compile("^mapreduce\\.join\\.define\\.(\\w+)$");
        Iterator i$ = conf.iterator();

        while(i$.hasNext()) {
            Entry<String, String> kv = (Entry)i$.next();
            Matcher m = x.matcher((CharSequence)kv.getKey());
            if (m.matches()) {
                try {
                    CNode.addIdentifier(m.group(1), conf.getClass(m.group(0), (Class)null, ComposableRecordReader.class));
                } catch (NoSuchMethodException var7) {
                    throw new IOException("Invalid define for " + m.group(1), var7);
                }
            }
        }

    }

    public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        this.setFormat(job.getConfiguration());
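        // Setting the minimum split size to Long.MAX_VALUE forces each input file into a single
        // split, i.e. the input files are treated as non-splittable (condition 3 above).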
        job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", 9223372036854775807L);
        return this.root.getSplits(job);
    }

    public RecordReader<K, TupleWritable> createRecordReader(InputSplit split, TaskAttemptContext taskContext) throws IOException, InterruptedException {
        this.setFormat(taskContext.getConfiguration());
        return this.root.createRecordReader(split, taskContext);
    }
    // Assemble the join expression in the required format
    public static String compose(Class<? extends InputFormat> inf, String path) {
        return compose(inf.getName().intern(), path, new StringBuffer()).toString();
    }
    // join type (inner, outer, override, tbl, etc.), the input format for the datasets, and the dataset paths
    public static String compose(String op, Class<? extends InputFormat> inf, String... path) {
        String infname = inf.getName(); // e.g. org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
        StringBuffer ret = new StringBuffer(op + '(');
        String[] arr$ = path;
        int len$ = path.length;

        for(int i$ = 0; i$ < len$; ++i$) {
            String p = arr$[i$];
            compose(infname, p, ret);
            ret.append(',');
        }

        ret.setCharAt(ret.length() - 1, ')');
        return ret.toString();
    }

    public static String compose(String op, Class<? extends InputFormat> inf, Path... path) {
        ArrayList<String> tmp = new ArrayList(path.length);
        Path[] arr$ = path;
        int len$ = path.length;

        for(int i$ = 0; i$ < len$; ++i$) {
            Path p = arr$[i$];
            tmp.add(p.toString());
        }

        return compose(op, inf, (String[])tmp.toArray(new String[0]));
    }

    private static StringBuffer compose(String inf, String path, StringBuffer sb) {
        sb.append("tbl(" + inf + ",\"");
        sb.append(path);
        sb.append("\")");
        return sb;
    }
}

 

 

The key method is compose, which is overloaded:

public static String compose(String op, Class<? extends InputFormat> inf, String... path);

Here op is the join type (inner, outer, override, tbl), inf is the input format used to read the datasets, and path gives the input paths. The method assembles these three elements, join type, input format, and dataset paths, into a single expression string. Suppose the three elements are passed in through conf as follows:

conf.set("mapreduce.join.expr", CompositeInputFormat.compose(

       "inner", KeyValueTextInputFormat.class,「/hdfs/inputpath/userpath」, 「/hdfs/inputpath/commentpath」));

The expression produced by compose is:

inner(tbl(org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat,"/hdfs/inputpath/userpath"),tbl(org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat,"/hdfs/inputpath/commentpath"))
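Beyond the four built-in operations registered in addDefaults, the addUserIdentifiers method in the source above picks up user-defined join operations from configuration keys of the form mapreduce.join.define.<name>. The following is only a minimal sketch of that mechanism, assuming a hypothetical MyJoinRecordReader class that implements ComposableRecordReader:

// Register a custom join operation named "myjoin"; MyJoinRecordReader is a hypothetical
// user-supplied implementation of ComposableRecordReader.
Configuration conf = job.getConfiguration();
conf.setClass("mapreduce.join.define.myjoin",
        MyJoinRecordReader.class, ComposableRecordReader.class);
// The registered name can then be used as the operation in the join expression:
conf.set(CompositeInputFormat.JOIN_EXPR, CompositeInputFormat.compose(
        "myjoin", KeyValueTextInputFormat.class,
        new Path("/hdfs/inputpath/userpath"), new Path("/hdfs/inputpath/commentpath")));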

 

That is as far as I have been able to dig for now. Why exactly must the three conditions hold for the join to work, and how is the join carried out internally once the expression is set? If you know, please leave a comment and discuss.

(3) A map-side join example using CompositeInputFormat

Score data and name data are joined on the map side with CompositeInputFormat.

Score data:

1,yuwen,100

1,shuxue,99

2,yuwen,99

2,shuxue,88

3,yuwen,99

3,shuxue,56

4,yuwen,33

4,shuxue,99

Name data:

1,yaoshuya,25

2,yaoxiaohua,29

3,yaoyuanyie,15

4,yaoshupei,26

The directory layout is as follows (shown as a screenshot in the original post): one input directory per dataset, plus an output directory.

Code:

package Temperature;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;
import org.apache.hadoop.mapreduce.lib.join.TupleWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.File;
import java.io.IOException;

public class CompositeJoin extends Configured implements Tool {

    // Identity mapper: the join is already performed by the input format, so each record
    // arrives as (join key, tuple of joined values) and is simply written out.
    private static class CompositeJoinMapper extends Mapper<Text, TupleWritable, Text, TupleWritable> {
        @Override
        protected void map(Text key, TupleWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public int run(String[] args) throws Exception {
        Path userPath = new Path(args[0]);
        Path commentPath = new Path(args[1]);
        Path output = new Path(args[2]);

        Job job = Job.getInstance(getConf(), "mapinnerjoin");
        job.setJarByClass(getClass());

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TupleWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TupleWritable.class);

        // Set the directories of the two input datasets
        FileInputFormat.addInputPaths(job, args[0]);
        FileInputFormat.addInputPaths(job, args[1]);
        // Set the output directory
        FileOutputFormat.setOutputPath(job, output);

        Configuration conf = job.getConfiguration();
        // Set the input format to CompositeInputFormat
        job.setInputFormatClass(CompositeInputFormat.class);
        // The key/value separator for KeyValueTextInputFormat is a comma
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        // Set the join expression (CompositeInputFormat.JOIN_EXPR = "mapreduce.join.expr")
        // Equivalent alternative:
        // conf.set(CompositeInputFormat.JOIN_EXPR, CompositeInputFormat.compose(
        //         "inner", KeyValueTextInputFormat.class, FileInputFormat.getInputPaths(job)));
        String strExpression = CompositeInputFormat.compose(
                "inner", KeyValueTextInputFormat.class, userPath, commentPath);
        conf.set("mapreduce.join.expr", strExpression);

        job.setOutputFormatClass(TextOutputFormat.class);

        job.setNumReduceTasks(0); // map-side join: no reduce phase
        job.setMapperClass(CompositeJoinMapper.class);

        // Delete the (local) output directory so the job can recreate it
        FileUtil.fullyDelete(new File(args[2]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Three arguments: the two input dataset paths and the output path
        int exitCode = ToolRunner.run(new CompositeJoin(), args);
        System.exit(exitCode);
    }
}

 

 

In the IDE, open Run -> Edit Configurations and set the program arguments: two input paths and one output path.

Run the main method of this class to obtain the join result.

 
