MapReduce Design Patterns: The Filtering Pattern

The Filtering Pattern

Filtering (Regex Filtering and Random Sampling)

Use Case

Filter a small dataset with particular characteristics out of a large dataset.

Implementation

In the Mapper stage, each value is matched against a regular expression. In the Reducer stage, a random double is generated and compared with a given threshold to simulate simple random sampling: a record is kept only if the random number falls below the threshold. For example, with the regex ^[a-zA-Z]+$ used below, "hello" passes the filter while "hello123" does not.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 7:46
 * @Description
 */
public class GrepMain {

    public static class GrepMapper extends Mapper<Object, Text, NullWritable, Text> {
        private Pattern pattern = null;

        protected void setup(Context context) {
            //Compile the regex once per task rather than once per record
            String matchGrep = context.getConfiguration().get("matchGrep");
            pattern = Pattern.compile(matchGrep);
        }

        public void map(Object key, Text value, Context context) {
            Matcher matcher = pattern.matcher(value.toString());
            if (matcher.matches()) {
                try {
                    context.write(NullWritable.get(), value);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class GrepReducer extends Reducer<NullWritable,Text,NullWritable,Text> {
        private Random random = new Random();
        private Double percentage;

        public void reduce(NullWritable key,Iterable<Text> values,Context context) {
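            //All records were emitted under the single NullWritable key, so this body runs once per reduce task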
            String strPercentage = context.getConfiguration().get("filter_percentage");
            percentage = Double.valueOf(strPercentage);

            for(Text value:values) {
                double rand = random.nextDouble();
                if(rand < percentage) {
                    try {
                        context.write(NullWritable.get(), value);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            //Set the regex: match strings consisting entirely of letters
            conf.set("matchGrep","^[a-zA-Z]+$");
            conf.setDouble("filter_percentage",0.5);
            Job job = Job.getInstance(conf, "Grep");
            //Keep consistent with the class defined above
            job.setJarByClass(GrepMain.class);
            //Keep consistent with the Mapper and Reducer classes defined above
            job.setMapperClass(GrepMapper.class);
            //No combiner here: running the sampling reducer as a combiner would
            //apply the random sample twice, for an effective rate of p * p
            job.setReducerClass(GrepReducer.class);
            //The output key and value types must match those defined in the mapper
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            //Input and output paths; delete any existing output directory first
            //(FileUtil.fullyDelete operates on the local filesystem)
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true)?0:1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Input and Output

(figure: filter input and output)

Run Results

(figure: filter output)

Bloom Filtering

Use Case

Test whether a record belongs to a predefined set. False positives are possible: the filter may report that a record is in the set when it is not, though it never produces a false negative.
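
As a quick standalone illustration (a minimal sketch using Hadoop's org.apache.hadoop.util.bloom.BloomFilter; the tiny 10-bit size is chosen deliberately to make a false positive likely):

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class BloomDemo {
    public static void main(String[] args) {
        //10 bits and 2 hash functions: deliberately undersized so collisions are likely
        BloomFilter filter = new BloomFilter(10, 2, Hash.MURMUR_HASH);
        filter.add(new Key("BeiJing".getBytes()));
        //Members always test positive
        System.out.println(filter.membershipTest(new Key("BeiJing".getBytes())));
        //Non-members usually test negative, but may test positive: a false positive
        System.out.println(filter.membershipTest(new Key("GuangZhou".getBytes())));
    }
}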

Implementation

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

import java.io.File;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 15:35
 * @Description
 */
public class BloomFilterMain {

    public static class BloomFilterMapper extends Mapper<Object, Text,Text, NullWritable> {
        //BloomFilterUtil is a small sizing helper; a sketch is given after this listing.
        //Sized here for about 10 members with a 10% false positive rate.
        int vectorSize = BloomFilterUtil.getOptimalBloomFilterSize(10, 0.1f);
        int nbHash = BloomFilterUtil.getOptimalK(10, vectorSize);
        BloomFilter bloomFilter = new BloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH);

        //Build the predefined set
        protected void setup(Context context) {
            try {
                bloomFilter.add(new Key("BeiJing".getBytes()));
                bloomFilter.add(new Key("ShangHai".getBytes()));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void map(Object key,Text value,Context context) {
            String word = value.toString();
            //Test whether the value is in the predefined set
            if(bloomFilter.membershipTest(new Key(word.getBytes()))) {
                try {
                    context.write(value,NullWritable.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "BloomFilter");
            //Keep consistent with the class defined above
            job.setJarByClass(BloomFilterMain.class);
            //Keep consistent with the Mapper class defined above
            job.setMapperClass(BloomFilterMapper.class);
            //The output key and value types must match those defined in the mapper
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            //Input and output paths; delete any existing output directory first
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true)?0:1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
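
The mapper above calls BloomFilterUtil, a helper class that the original post does not show. Below is a minimal sketch based on the standard sizing formulas m = -n * ln(p) / (ln 2)^2 and k = (m / n) * ln 2; the class name and method signatures are assumptions inferred from the calls above.

/**
 * Assumed helper: computes Bloom filter sizing from the standard formulas.
 */
public class BloomFilterUtil {

    //Optimal bit vector size m for n members at false positive rate p:
    //m = -n * ln(p) / (ln 2)^2
    public static int getOptimalBloomFilterSize(int numMembers, float falsePosRate) {
        return (int) (-numMembers * Math.log(falsePosRate) / Math.pow(Math.log(2), 2));
    }

    //Optimal number of hash functions k for n members and vector size m:
    //k = (m / n) * ln 2
    public static int getOptimalK(float numMembers, float vectorSize) {
        return (int) Math.round(vectorSize / numMembers * Math.log(2));
    }
}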

Input and Output

Run Results

The Top K Problem

Use Case

Find the top K records, e.g., the K most popular items. The example below keeps the 10 users with the highest reputation.

Implementation

/**
 * @Author bluesnail95
 * @Date 2019/7/20 17:09
 * @Description
 */
public class Top10Data {

    private String userId;

    private Integer reputation;

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public Integer getReputation() {
        return reputation;
    }

    public void setReputation(Integer reputation) {
        this.reputation = reputation;
    }
}
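
Each input line is expected to be a JSON record that deserializes into this POJO, for example (an illustrative record, not taken from the original post):

{"userId":"12345","reputation":4220}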

import com.fasterxml.jackson.databind.ObjectMapper;  //assuming Jackson for JSON parsing
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;
import java.util.TreeMap;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 17:09
 * @Description
 */
public class Top10Main {

    public static class Top10Mapper extends Mapper<Object, Text, NullWritable,Text> {

        private TreeMap<Integer,Text> sortedMap = new TreeMap<Integer,Text>();

        public void map(Object key, Text value, Context context) {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                Top10Data top10Data = objectMapper.readValue(value.toString(), Top10Data.class);
                Integer reputation = top10Data.getReputation();
                //Records with equal reputation overwrite each other in the TreeMap;
                //use a composite key if ties must all be kept
                sortedMap.put(reputation, new Text(value));
                if (sortedMap.size() > 10) {
                    sortedMap.remove(sortedMap.firstKey());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        //Emit each map task's local top 10 once the task has seen all its input
        protected void cleanup(Context context) {
            for(Text t:sortedMap.values()) {
                try {
                    context.write(NullWritable.get(),t);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Top10Reducer extends Reducer<NullWritable,Text,NullWritable,Text> {

        private TreeMap<Integer,Text> sortedMap = new TreeMap<Integer,Text>();

        public void reduce(NullWritable key, Iterable<Text> values, Context context) {
            ObjectMapper objectMapper = new ObjectMapper();
            //All mappers emitted the single NullWritable key, so one reduce call
            //sees every candidate and selects the global top 10
            for (Text value : values) {
                try {
                    Top10Data top10Data = objectMapper.readValue(value.toString(), Top10Data.class);
                    int reputation = top10Data.getReputation();
                    sortedMap.put(reputation, new Text(value));
                    if (sortedMap.size() > 10) {
                        sortedMap.remove(sortedMap.firstKey());
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            for (Text t:sortedMap.values()) {
                try {
                    context.write(NullWritable.get(), t);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Top 10");
            //Keep consistent with the class defined above
            job.setJarByClass(Top10Main.class);
            //Keep consistent with the Mapper and Reducer classes defined above
            job.setMapperClass(Top10Mapper.class);
            job.setReducerClass(Top10Reducer.class);
            //The output key and value types must match those defined in the mapper
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            //Input and output paths; delete any existing output directory first
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true)?0:1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Note that the output uses new Text(value) to make a fresh copy of the object. Hadoop reuses the same Text instance across map calls and across the values iterator, so storing the value reference itself would leave the TreeMap holding multiple copies of the last record.

Only K records are found; no total sort is performed. Each time a new element is added to the TreeMap, check whether the map's size exceeds K; if so, remove the first (smallest) key. Stripped of the MapReduce plumbing, the trick looks like the sketch below.
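
A minimal standalone sketch (the record values and K are illustrative):

import java.util.Map;
import java.util.TreeMap;

public class TopKSketch {
    public static void main(String[] args) {
        final int K = 3;
        //TreeMap keeps entries sorted by key in ascending order,
        //so firstKey() is always the smallest value currently retained
        TreeMap<Integer, String> topK = new TreeMap<>();
        int[] reputations = {50, 10, 90, 70, 30, 80};
        for (int reputation : reputations) {
            topK.put(reputation, "user-" + reputation);
            //Once the map holds more than K entries, evict the smallest key
            if (topK.size() > K) {
                topK.remove(topK.firstKey());
            }
        }
        //Prints 70, 80, 90 with their users, in ascending order
        for (Map.Entry<Integer, String> entry : topK.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}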

Input and Output

Run Results

Deduplication

Use Case

Duplicate records in a dataset need to be removed, e.g., to simplify later statistics.

Implementation

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.File;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 17:09
 * @Description
 */
public class DistinctMain {

    public static class DistinctMapper extends Mapper<Object, Text,Text, NullWritable> {

        public void map(Object key,Text value,Context context) {
            try {
                context.write(value, NullWritable.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class DistinctReducer extends Reducer<Text, NullWritable,Text, NullWritable> {

        public void reduce(Text key,Iterable<NullWritable> values,Context context) {
            try {
                context.write(key,NullWritable.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Distinct");
            //Keep consistent with the class defined above
            job.setJarByClass(DistinctMain.class);
            //Keep consistent with the Mapper and Reducer classes defined above
            job.setMapperClass(DistinctMapper.class);
            job.setReducerClass(DistinctReducer.class);
            //The output key and value types must match those defined in the mapper
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            //Input and output paths; delete any existing output directory first
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true)?0:1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
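
Since the reducer ignores the values entirely and emits each key once, it can also be reused as a combiner. This is a common optimization for the distinct pattern (not in the original listing) that drops duplicates on the map side and cuts shuffle traffic:

            //Optional: reuse the reducer as a combiner so duplicates are
            //removed on the map side before the shuffle
            job.setCombinerClass(DistinctReducer.class);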

Input and Output

Run Results

References

MapReduce Design Patterns (Donald Miner and Adam Shook)
