半鏈接(Semi-join)

    假設一個場景,須要鏈接兩個很大的數據集,例如,用戶日誌和 OLTP 的用戶數據。任何一個數據集都不是足夠小到能夠緩存在 map 做業的內存中。能夠思考如下問題:若是在數據集的鏈接操做中,一個數據集中有的記錄因爲由於沒法鏈接到另外一個數據集的記錄,將會被移除。這樣還須要將整個數據集放到內存中嗎?java

    在這個例子中,在用戶日誌中的用戶僅僅是 OLTP 用戶數據中的用戶中的很小的一部分。那麼就能夠從 OLTP 用戶數據中只取出存在於用戶日誌中的那部分用戶的用戶數據。而後就能夠獲得足夠小到能夠放在內存中的數據集。這種的解決方案就叫作半鏈接。web

    應用場景:
    須要鏈接兩個都很大的數據集,同時避免通過 shuffle 和 sort 階段。解決方案:apache

    在這個技術中,將會用到三個 MapReduce 做業來鏈接兩個數據集,以此來減小 reduce 端鏈接的消耗。這個技術在這種場景下很是有用:鏈接兩個很大的數據集,可是能夠經過過濾與另外一個數據集不匹配的記錄來減小數據的大小,使得能夠放入 task 的內存中。緩存

    下圖說明了在半鏈接中將要執行的三個 MapReduce 做業(Job)。服務器


[例]使用半鏈接。app

    準備數據集:分佈式

     有兩個數據集 logs.txt 和 users.txt。其中 users.txt 中爲用戶數據,包括用戶名、年齡和所在地區;logs.txt爲基於用戶的一些活動(可從應用程序或 web 服務器日誌中抽取出來),包括用戶名、活動、源 IP 地址。ide

    文件 users.txt:函數

        
    文件 logs.txt:    
oop


JOB 1:

    第一個 MapReduce job 的功能是從日誌文件中提取出用戶名,用這些用戶名生成一個用戶名惟一的集合(Set)。這經過在 map 函數執行用戶名的投影操做來實現,並反過來使用 reducer 來產生這些用戶名。爲了減小在 map 階段和 reduce 階段之間傳輸的數據量,採用以下方法:在 map 任務中採用哈希集 HashSet來緩存全部的用戶名,並在 cleanup 方法中輸出該 HashSet 的值。下圖說明了這個 job 的流程:


做業1的代碼:

package com.edu360.mapreduce;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/**
 * java 一出 誰與爭鋒
 * <p>
 * .............................................
 * 佛祖保佑             永無BUG
 *
 * @Auther: caozhan
 * @Date: 2018/10/29 17:51
 * @Description:
 */
//從logs.txt 表中抽取用戶名(考慮外鍵引用關係,這裏至關於先在從表中找出被引用的外鍵列惟一值)
public class SemiJoinJob1 extends Configured implements Tool {
    //使用keyValueTextInputFormat 類,輸入的是logs.txt 表中的每條記錄
    public static class Map extends Mapper<Text,Text, Text, NullWritable>{
        //緩存用戶名過濾後的小數據集
        private Set<String> keys = new HashSet<>();
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            //把用戶名加入緩存,重複的用戶名只會保留一個
            keys.add(key.toString());
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            Text outputkey = new Text();
            for(String key:keys){
                outputkey.set(key);
                //從mapper輸出緩存的用戶名
                context.write(outputkey,NullWritable.get());
            }
        }
    }
    public static class Reduce extends Reducer<Text,NullWritable,Text,NullWritable>{
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            //從reduce輸出每一個用戶名一次
            context.write(key,NullWritable.get());
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Path inputPath = new Path(args[0]);
        Path outPath = new Path(args[1]);
        Job job1= Job.getInstance(getConf(),"SemiJoinJob1");
        job1.setJarByClass(getClass());
        job1.setMapperClass(Map.class);
        job1.setReducerClass(Reduce.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(NullWritable.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(NullWritable.class);
        job1.setInputFormatClass(KeyValueTextInputFormat.class);
        job1.setOutputFormatClass(TextOutputFormat.class);

        //若是輸出目錄在 先刪除
        /*
        FileSystem fs = FileSystem.get(outPath.toUri(),getConf());
        if(fs.exists(outPath)){
            fs.delete(outPath,true);
        }
         */
        FileInputFormat.setInputPaths(job1,inputPath);
        FileOutputFormat.setOutputPath(job1,outPath);
        if(job1.waitForCompletion(true)){
            return 0;
        }
        return 1;
    }
    public static void main(String [] args)throws Exception{
        int returnCode = ToolRunner.run(new SemiJoinJob1(),args);
        System.exit(returnCode);
    }
}

做業 1 的結果就是來自於日誌文件中的全部用戶的集合。集合中的用戶名是惟一的。

Job2:

    第二步是一個複雜的過濾 MapReduce job,目標是從全體用戶的用戶數據集中移除不存在於日誌文件中的用戶。這是一個 map-only job,它使用一個複製鏈接來緩存出如今日誌文件中的用戶名,並把他們和用戶數據集進行鏈接。因爲 job 1 輸出的惟一用戶的數據集實際上要遠遠小於整個用戶數據集,因此很天然地就把來自 job 1 的惟一用戶集放到緩存中了。下圖說明了這個做業的流程:


    這是一個複製鏈接,與上一節學習的複製鏈接同樣。

Job 2 的 mapper 代碼以下:(注意,要先上傳 job1 的輸出文件 part-r-00000 到分佈式緩存)

package com.edu360.mapreduce;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;


/**
 * java 一出 誰與爭鋒
 * <p>
 * .............................................
 * 佛祖保佑             永無BUG
 *
 * @Auther: caozhan
 * @Date: 2018/10/29 23:20
 * @Description:
 */
public class SemiJoinJob2 extends Configured implements Tool {
    public static class Map extends Mapper<Object, Text,Text, NullWritable>{
        public static final String CATCH_USERNAME_FILENAME="part-000";
        private Set<String> userSet=new HashSet<>();
        private Text outputKey = new Text();
        //在map()函數執行以前,從分佈式緩存中讀取被緩存到本地

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            URI[] patternsURIs = context.getCacheFiles(); // 獲取緩存文件的 uri
            Path patternsPath = new Path(patternsURIs[0].getPath()); // 這裏咱們只緩存了一個文件
            String patternsFileName = patternsPath.getName(); // 得到緩存文件的文件名
            System.out.println("patternsFileName: " + patternsFileName);
            // 從分佈式緩存中讀取 job 1 的輸出,並存入到 HashMap 中
            if (CATCH_USERNAME_FILENAME.equals(patternsFileName)) {
                BufferedReader br = new BufferedReader(new FileReader("/hadoop/semijoin/output1/"));
                 String line = br.readLine();
                while (line != null) {
                    String username = line; userSet.add(username); // 放入 HashSet 中 line = br.readLine();
                }
                br.close(); }
            if (userSet.isEmpty()) {
                throw new IOException("unable to load unique user table");
            }
        }
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String row = value.toString();
            String[] tokens = row.split("\t");
            String username = tokens[0]; // 取每條用戶記錄中的用戶名字段
            if (userSet.contains(username)) { // 過濾
                outputKey.set(row);
                context.write(outputKey, NullWritable.get()); // 輸出整行用戶記錄
            }
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("用法: SemiJoinJob2 <userpath> <outpath> <catchpath>");
            System.exit(-1);
        }
        Path inputPath = new Path(args[0]); // 應該爲 job 1 的輸出:part-r-00000
        Path outputPath = new Path(args[1]);
        Path cachePath = new Path(args[2]);
        Job job2 = Job.getInstance(getConf(), "SemiJoinJob2");
        // 將 part-r-00000 文件放入分佈式緩存中 // "/hadoop/semijoin/output1/part-r-00000" job2.addCacheFile(cachePath.toUri());
        job2.setJarByClass(getClass());
        job2.setMapperClass(Map.class);
        job2.setNumReduceTasks(0);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(NullWritable.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(NullWritable.class);
        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        // 若是輸出目錄存在,則先刪除
        FileSystem fs = FileSystem.get(outputPath.toUri(), getConf());
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job2, inputPath);
        FileOutputFormat.setOutputPath(job2, outputPath);
        if (job2.waitForCompletion(true)) {
            return 0;
        }
        return 1;
    }
    public static void main(String[] args) throws Exception {
        int returnCode = ToolRunner.run(new SemiJoinJob2(), args);
        System.exit(returnCode);
    }
}

做業 2 的輸出就是已被用戶日誌數據集的用戶名過濾過的用戶集了。

Job 3:

在這最後一步中,咱們將合併從 job 2 輸出的過濾後的用戶和原始的用戶日誌。如今被過濾後的用戶已經小到能夠駐留在內存中了,這樣就能夠將它們放入分佈式緩存中。下圖演示了這個 job 的流程:

package com.edu360.mapreduce;


import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;


/**
 * java 一出 誰與爭鋒
 * <p>
 * .............................................
 * 佛祖保佑             永無BUG
 *
 * @Auther: caozhan
 * @Date: 2018/10/29 23:46
 * @Description:
 */
public class SemiJoinJob3 extends Configured implements Tool {
    public static class JoinMap extends Mapper<Object, Text, Text, Text> {
        public static final String CATCH_USERS_FILENAME = "part-m-00000";
        private Map<String, String> usersMap = new HashMap<>();
        private Text outputKey = new Text();
        private Text outputValue = new Text();
        // 在 map()函數執行以前,從分佈式緩存中讀取被緩存到本地的文件

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            URI[] patternsURIs = context.getCacheFiles(); // 獲取緩存文件的 uri
            Path patternsPath = new Path(patternsURIs[0].getPath()); // 這裏咱們只緩存了一個文件
            String patternsFileName = patternsPath.getName().toString(); // 得到緩存文件的文件名
            // 從分佈式緩存中讀取 job 2 的輸出,並存入到 HashMap 中
            if (CATCH_USERS_FILENAME.equals(patternsFileName)) {
                patternsFileName = "/hadoop/semijoin/output2/" + patternsFileName;
                BufferedReader br = new BufferedReader(new FileReader(patternsFileName));
                String line = br.readLine();
                while (line != null) {
                    String[] tokens = line.split("\t");
                    String username = tokens[0];
                    String content = line;
                    usersMap.put(username, content); // 放入 HashMap 中
                    line = br.readLine();
                }
                br.close();
            }
            if (usersMap.isEmpty()) {
                throw new IOException("unable to load users catch table");
            }
        }
        // 輸入的是 logs.txt 中的日誌信息,須要和緩存中的用戶信息鏈接 @Override
        protected void map(Object key, Text value, Mapper.Context context)
                throws IOException, InterruptedException {
            String row = value.toString();
            String[] tokens = row.split("\t");
            String username = tokens[0]; // 取每條日誌記錄的用戶名字段
            String user = usersMap.get(username); // 根據 username,找到對應的(緩存的)用戶記錄
            outputKey.set(row);
            outputValue.set(user);
            context.write(outputKey, outputValue);
        }
    }
        public int run(String[] args) throws Exception { if (args.length < 3) {
            System.err.println("用法: SemiJoinJob3 <logspath> <outpath> <catchpath>");
            System.exit(-1); }
            Path inputPath = new Path(args[0]); // 應該爲 job 2 的輸出:part-r-00000
            Path outputPath = new Path(args[1]);
            Path cachePath = new Path(args[2]);
            Job job3 = Job.getInstance(getConf(), "SemiJoinJob3");
            // 將 part-r-00000 文件放入分佈式緩存中
            // "/hadoop/semijoin/output2/part-m-00000" job3.addCacheFile(cachePath.toUri());
            job3.setJarByClass(getClass());
            job3.setMapperClass(JoinMap.class);
            job3.setNumReduceTasks(0);
            job3.setMapOutputKeyClass(Text.class);
            job3.setMapOutputValueClass(Text.class);
            job3.setOutputKeyClass(Text.class);
            job3.setOutputValueClass(Text.class);
            job3.setInputFormatClass(TextInputFormat.class);
            job3.setOutputFormatClass(TextOutputFormat.class);
            // 若是輸出目錄存在,則先刪除
            FileSystem fs = FileSystem.get(outputPath.toUri(), getConf());
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            FileInputFormat.setInputPaths(job3, inputPath);
            FileOutputFormat.setOutputPath(job3, outputPath);
            if (job3.waitForCompletion(true)) {
                return 0;
            }
            return 1;
    }
    public static void main(String[] args) throws Exception {
        int returnCode = ToolRunner.run(new SemiJoinJob3(), args);
        System.exit(returnCode);
    }
}

小結:

這一節學習了怎樣使用一個半鏈接(semi-join)來合併兩個數據集。半鏈接結構包含比其餘鏈接更多的步驟,可是當處理大數據集時(其中有一個數據集必須可被消減到適合放入內存的大小),使用半鏈接是很給力的方式。

相關文章
相關標籤/搜索