mr利用shuffle階段來實現數據去重的功能

1、需求前提app

  小張同窗從別處整來了一批日誌,日誌內容主要記錄了每一個用戶在運營商中所使用過的手機號,若是沒有則爲空,數據大概是用戶帳號、電信手機號、聯通手機號、移動手機號、鐵通手機號、小靈通等等。固然還有一個關聯關係靜態文件,主要記錄用戶所在的省和地市,數據是用戶帳號,所在省,所在市。日誌和靜態文件都存在hdfs中。如今小張同窗想要分析每一個省每一個地市有電信、聯通、移動、鐵通、小靈通手機號的用戶各有多少?框架

2、需求分析ide

  首先咱們要在mr中load進來這個關聯關係靜態文件,而後日誌數據可能會有重複,咱們須要對用戶層面上進行去重,最後進行分類統計。日誌

3、僞代碼實現orm

public class TestMapReduce extends Configured implements Tool {排序

  public static void main(String[] args) {接口

    Configuration conf = new Configuration();ip

    int result = ToolRunner.run(conf, new TestMapReduce(), args)get

    System.exit(result);hash

  }

  @Override

  public int run(String[] args) throws Exception{

    Configuration conf = getConf();

    conf.set("map","靜態文件路徑");

    //FileSystem hdfs = FileSystem.get(conf);

    Job job = Job.getInstance(conf,"testJob");

    job.setJarByClass(TestMapReduce.class);

    //指定輸入目錄

    FileInputFormat.setInputPaths(job,new Path(filePath));

    //MultipleInputs.addInputPath(job,filePath,SequenceFileInputFormat.class,MyMapper.class);

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(Text.class);//最好本身實現一個自定義數據類型,實現方式後面會提

    job.setCombinerClass(MyCombiner.class);

    job.setNumReduceTasks(1);//由於咱們要統計用戶總量,設置reduce任務數量來限制輸出結果,也不會出現用戶統計的混亂

    job.setReducerClass(MyReducer.class);
           job.setOutputFormatClass(SequenceFileOutputFormat.class);
           FileOutputFormat.setOutputPath(job,outputPath);

             if(!job.waitForCompletion(true)){
               return 1;
             }

             return 0;

  }

  public static class MyMapper extends Mapper<LongWritable,Text,Text,Text> {

    private Map<String,String> map = new HashMap<>();

   //此方法被MapReduce框架僅且執行一次,此前提是隻有一個reduce任務下,執行次數根據job.setNumReduceTasks(1)所設置的reduce任務數決定

   @Override
        protected void setup(Context context) throws IOException, InterruptedException{

            super.setup(context);

            Configuration conf = context.getConfiguration();

            map.clear();
            String mapInput = conf.get("map","");
            if(!mapInput.isEmpty()){
                Path mapInputPath = new Path(mapInput);
                if(存在該路徑){
                    for(取出文件內容){
                         map.put(用戶帳號,省+市);
                    }
                }
            }
        }      

   @Override
        protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{

    取到日誌的每一行來判斷是否具備各運營商的手機號,若是有設爲1,沒有設爲0

    context.write(省+市+用戶帳號,[1,0,1,1,0]);

   }

  public static class MyCombiner extends Reducer<Text,Text,Text,Text> {

        @Override
        protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
            如今每一個省每一個市每一個用戶的數據分在一組,遍歷values;

     對值進行加和(大於等於1表明有該運營商的手機號)
            context.write(省+市+用戶帳號,[2,4,8,0,4]);
        }
      }

  public static class RadiusCountReducer extends Reducer<Text,Text,Text,Text> {

        private Map<String,String> countsMap = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException{
            counts.clear();
        }

        @Override
        protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
            long[] counts = new long[5];
            for(Text value : values) {
                for(int i=0; i<5; i++){
                    counts[i] += 對應的值;
                }
            }
            從key中取出省+市
            countsMap.put(省+市,counts[0]+...counts[5]);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException{
            for(Map.Entry<String,String> entry: countsMap.entrySet()){
                context.write(Text.set(entry.getKey()),Text.set(countsMap.getValue()));
            }
        }
    }

  }

}

至此,大功告成,上面的代碼只是寫了大概的思路,具體的東西能夠本身嘗試寫一下。

至於上面提到的自定義類型,只要實現WritableComparable接口,重寫一些方法,可能須要實現的有write、readFields、compareTo、equals、hashCode、toString等,其餘的好像沒啥必要,看你本身的需求。

除了自定義類型,你也能夠自定義shuffle階段的排序和分組,之後有機會再說。

相關文章
相關標籤/搜索