This pattern filters a large data set down to a smaller data set whose records share certain characteristics.
In the Mapper stage, values are filtered with a regular expression. In the Reducer stage, a random double is generated and compared against a given threshold to simulate a simple random sample.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 7:46
 * @Description Grep-style filtering combined with simple random sampling
 */
public class GrepMain {

    public static class GrepMapper extends Mapper<Object, Text, NullWritable, Text> {

        private String matchGrep = null;

        public void map(Object key, Text value, Context context) {
            // Read the regular expression from the job configuration and keep only matching lines
            matchGrep = context.getConfiguration().get("matchGrep");
            Pattern pattern = Pattern.compile(matchGrep);
            Matcher matcher = pattern.matcher(value.toString());
            if (matcher.matches()) {
                try {
                    context.write(NullWritable.get(), value);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class GrepReducer extends Reducer<NullWritable, Text, NullWritable, Text> {

        private Random random = new Random();
        private Double percentage;

        public void reduce(NullWritable key, Iterable<Text> values, Context context) {
            // Keep each record with probability "filter_percentage" to simulate simple random sampling
            String strPercentage = context.getConfiguration().get("filter_percentage");
            percentage = Double.valueOf(strPercentage);
            for (Text value : values) {
                double rand = random.nextDouble();
                if (rand < percentage) {
                    try {
                        context.write(NullWritable.get(), value);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            // Set the regular expression: match lines made up of letters only
            conf.set("matchGrep", "^[a-zA-Z]+$");
            conf.setDouble("filter_percentage", 0.5);

            Job job = Job.getInstance(conf, "Grep");
            // Must match the class defined in this file
            job.setJarByClass(GrepMain.class);
            // Must match the Mapper and Reducer classes defined above
            job.setMapperClass(GrepMapper.class);
            job.setReducerClass(GrepReducer.class);
            // The output key/value classes must match those declared by the Mapper
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            // Input and output paths (any existing local output directory is deleted first)
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
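Since the mapper keeps only lines that match ^[a-zA-Z]+$ in full, it may help to see Matcher.matches() outside of MapReduce. The small standalone sketch below is not part of the original post; it only illustrates which lines would survive the map phase.

import java.util.regex.Pattern;

// Standalone illustration of the filter condition used by GrepMapper:
// Matcher.matches() succeeds only if the entire line consists of letters.
public class GrepFilterDemo {
    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("^[a-zA-Z]+$");
        System.out.println(pattern.matcher("HelloWorld").matches());  // true  -> kept by the mapper
        System.out.println(pattern.matcher("Hello World").matches()); // false -> dropped (contains a space)
        System.out.println(pattern.matcher("Hello123").matches());    // false -> dropped (contains digits)
    }
}

Each line that passes the regular expression is then written by GrepReducer with probability filter_percentage (0.5 in the driver above), which is what simulates the simple random sample.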
This pattern tests whether a record belongs to a predefined set; because the test uses a Bloom filter, it can return false positives.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

import java.io.File;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 15:35
 * @Description Bloom filtering: keep records that (probably) belong to a predefined set
 */
public class BloomFilterMain {

    public static class BloomFilterMapper extends Mapper<Object, Text, Text, NullWritable> {

        // Size the bit vector and the number of hash functions for ~10 members and a 10% false-positive rate
        int vectorSize = BloomFilterUtil.getOptimalBloomFilterSize(10, 0.1f);
        int nbHash = BloomFilterUtil.getOptimalK(10, vectorSize);
        BloomFilter bloomFilter = new BloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH);

        // Build the predefined set
        protected void setup(Context context) {
            try {
                bloomFilter.add(new Key("BeiJing".getBytes()));
                bloomFilter.add(new Key("ShangHai".getBytes()));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void map(Object key, Text value, Context context) {
            String word = value.toString();
            // Test whether the value is (probably) in the predefined set
            if (bloomFilter.membershipTest(new Key(word.getBytes()))) {
                try {
                    context.write(value, NullWritable.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "BloomFilter");
            // Must match the class defined in this file
            job.setJarByClass(BloomFilterMain.class);
            // Must match the Mapper class defined above
            job.setMapperClass(BloomFilterMapper.class);
            // The output key/value classes must match those declared by the Mapper
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // Input and output paths (any existing local output directory is deleted first)
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
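The BloomFilterUtil class used above is not part of Hadoop and is not shown in the post. The following is a minimal sketch, assuming the standard Bloom-filter sizing formulas (bit-vector size m = -n·ln p / (ln 2)^2 and hash count k = (m/n)·ln 2); the exact class and method signatures here are an assumption, not the original author's code.

// Hypothetical helper, not part of Hadoop: sizes a Bloom filter from the expected
// number of members and the desired false-positive rate.
public class BloomFilterUtil {

    // Optimal bit-vector size m for n members and false-positive rate p: m = -n*ln(p) / (ln 2)^2
    public static int getOptimalBloomFilterSize(int numMembers, float falsePosRate) {
        return (int) (-numMembers * Math.log(falsePosRate) / Math.pow(Math.log(2), 2));
    }

    // Optimal number of hash functions k for n members and bit-vector size m: k = (m/n) * ln 2
    public static int getOptimalK(float numMembers, float vectorSize) {
        return (int) Math.round(vectorSize / numMembers * Math.log(2));
    }
}

With 10 members and a 10% false-positive rate, this yields a bit vector of roughly 48 bits and 3 hash functions, which is what the mapper above constructs its BloomFilter with.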
This pattern finds the K most popular records, and similar top-K queries.
/**
 * @Author bluesnail95
 * @Date 2019/7/20 17:09
 * @Description
 */
public class Top10Data {

    private String userId;
    private Integer reputation;

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public Integer getReputation() {
        return reputation;
    }

    public void setReputation(Integer reputation) {
        this.reputation = reputation;
    }
}
// Assuming the Jackson 2.x ObjectMapper; adjust the import for the Jackson version on the classpath
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;
import java.util.TreeMap;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 17:09
 * @Description Top ten: each mapper emits its local top ten, a single reducer picks the global top ten
 */
public class Top10Main {

    public static class Top10Mapper extends Mapper<Object, Text, NullWritable, Text> {

        // TreeMap keeps entries sorted by reputation in ascending order
        private TreeMap<Integer, Text> sortedMap = new TreeMap<Integer, Text>();

        public void map(Object key, Text value, Context context) {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                Top10Data top10Data = objectMapper.readValue(value.toString(), Top10Data.class);
                Integer reputation = top10Data.getReputation();
                sortedMap.put(reputation, new Text(value));
                // Keep only the ten records with the highest reputation
                if (sortedMap.size() > 10) {
                    sortedMap.remove(sortedMap.firstKey());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        // Emit this mapper's local top ten once all of its input has been mapped
        protected void cleanup(Context context) {
            for (Text t : sortedMap.values()) {
                try {
                    context.write(NullWritable.get(), t);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Top10Reducer extends Reducer<NullWritable, Text, NullWritable, Text> {

        private TreeMap<Integer, Text> sortedMap = new TreeMap<Integer, Text>();

        public void reduce(NullWritable key, Iterable<Text> values, Context context) {
            ObjectMapper objectMapper = new ObjectMapper();
            for (Text value : values) {
                try {
                    Top10Data top10Data = objectMapper.readValue(value.toString(), Top10Data.class);
                    int reputation = top10Data.getReputation();
                    sortedMap.put(reputation, new Text(value));
                    if (sortedMap.size() > 10) {
                        sortedMap.remove(sortedMap.firstKey());
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            for (Text t : sortedMap.values()) {
                try {
                    context.write(NullWritable.get(), t);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Top 10");
            // Must match the class defined in this file
            job.setJarByClass(Top10Main.class);
            // Must match the Mapper and Reducer classes defined above
            job.setMapperClass(Top10Mapper.class);
            job.setReducerClass(Top10Reducer.class);
            // A single reducer is needed so the top ten is computed globally
            job.setNumReduceTasks(1);
            // The output key/value classes must match those declared by the Mapper
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            // Input and output paths (any existing local output directory is deleted first)
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
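The mapper parses each input line with Jackson into a Top10Data object, so every line is expected to be a small JSON document with userId and reputation fields. The sketch below only illustrates that shape; the user values are made up, and the Jackson 2.x dependency is an assumption.

import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative only: prints one line in the format Top10Mapper expects as input,
// e.g. {"userId":"1","reputation":4220}. The values are invented for the example.
public class Top10InputDemo {
    public static void main(String[] args) throws Exception {
        Top10Data user = new Top10Data();
        user.setUserId("1");
        user.setReputation(4220);
        System.out.println(new ObjectMapper().writeValueAsString(user));
    }
}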
Note that new Text(value) creates a fresh copy of the record before it is stored; Hadoop reuses the Text object it passes to map() and reduce(), so keeping the original reference would leave every TreeMap entry pointing at the last value seen.
Only K records are kept and no full sort is performed: each time a new element is added to the TreeMap, check whether the map's size exceeds K, and if so remove the first (smallest) element, as the short demo below illustrates.
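This small standalone sketch (not from the original post, with K = 3 and made-up reputation values for brevity) shows the trimming step in isolation.

import java.util.TreeMap;

// Demonstration of the trimming step used by Top10Mapper/Top10Reducer:
// after each insertion, evict the smallest key once the map holds more than K entries.
public class TopKTrimDemo {
    public static void main(String[] args) {
        int k = 3;
        TreeMap<Integer, String> sortedMap = new TreeMap<Integer, String>();
        int[] reputations = {40, 7, 93, 25, 61};
        for (int reputation : reputations) {
            sortedMap.put(reputation, "user with reputation " + reputation);
            if (sortedMap.size() > k) {
                sortedMap.remove(sortedMap.firstKey()); // evict the current smallest
            }
        }
        System.out.println(sortedMap.keySet()); // prints [40, 61, 93]
    }
}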
When a data set contains duplicate records, they need to be removed, for example to simplify later statistics.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 17:09
 * @Description Distinct: remove duplicate records
 */
public class DistinctMain {

    public static class DistinctMapper extends Mapper<Object, Text, Text, NullWritable> {

        // Emit the whole record as the key so identical records are grouped together by the shuffle
        public void map(Object key, Text value, Context context) {
            try {
                context.write(value, NullWritable.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

        // Each distinct record arrives as a single key; writing the key once removes the duplicates
        public void reduce(Text key, Iterable<NullWritable> values, Context context) {
            try {
                context.write(key, NullWritable.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Distinct");
            // Must match the class defined in this file
            job.setJarByClass(DistinctMain.class);
            // Must match the Mapper and Reducer classes defined above
            job.setMapperClass(DistinctMapper.class);
            job.setReducerClass(DistinctReducer.class);
            // The output key/value classes must match those declared by the Mapper
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // Input and output paths (any existing local output directory is deleted first)
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Reference: "MapReduce Design Patterns"