1、需求前提app
小張同窗從別處整來了一批日誌,日誌內容主要記錄了每一個用戶在運營商中所使用過的手機號,若是沒有則爲空,數據大概是用戶帳號、電信手機號、聯通手機號、移動手機號、鐵通手機號、小靈通等等。固然還有一個關聯關係靜態文件,主要記錄用戶所在的省和地市,數據是用戶帳號,所在省,所在市。日誌和靜態文件都存在hdfs中。如今小張同窗想要分析每一個省每一個地市有電信、聯通、移動、鐵通、小靈通手機號的用戶各有多少?框架
2、需求分析ide
首先咱們要在mr中load進來這個關聯關係靜態文件,而後日誌數據可能會有重複,咱們須要對用戶層面上進行去重,最後進行分類統計。日誌
3、僞代碼實現orm
public class TestMapReduce extends Configured implements Tool {排序
public static void main(String[] args) {接口
Configuration conf = new Configuration();ip
int result = ToolRunner.run(conf, new TestMapReduce(), args)get
System.exit(result);hash
}
@Override
public int run(String[] args) throws Exception{
Configuration conf = getConf();
conf.set("map","靜態文件路徑");
//FileSystem hdfs = FileSystem.get(conf);
Job job = Job.getInstance(conf,"testJob");
job.setJarByClass(TestMapReduce.class);
//指定輸入目錄
FileInputFormat.setInputPaths(job,new Path(filePath));
//MultipleInputs.addInputPath(job,filePath,SequenceFileInputFormat.class,MyMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);//最好本身實現一個自定義數據類型,實現方式後面會提
job.setCombinerClass(MyCombiner.class);
job.setNumReduceTasks(1);//由於咱們要統計用戶總量,設置reduce任務數量來限制輸出結果,也不會出現用戶統計的混亂
job.setReducerClass(MyReducer.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileOutputFormat.setOutputPath(job,outputPath);
if(!job.waitForCompletion(true)){
return 1;
}
return 0;
}
public static class MyMapper extends Mapper<LongWritable,Text,Text,Text> {
private Map<String,String> map = new HashMap<>();
//此方法被MapReduce框架僅且執行一次,此前提是隻有一個reduce任務下,執行次數根據job.setNumReduceTasks(1)所設置的reduce任務數決定
@Override
protected void setup(Context context) throws IOException, InterruptedException{
super.setup(context);
Configuration conf = context.getConfiguration();
map.clear();
String mapInput = conf.get("map","");
if(!mapInput.isEmpty()){
Path mapInputPath = new Path(mapInput);
if(存在該路徑){
for(取出文件內容){
map.put(用戶帳號,省+市);
}
}
}
}
@Override
protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
取到日誌的每一行來判斷是否具備各運營商的手機號,若是有設爲1,沒有設爲0
context.write(省+市+用戶帳號,[1,0,1,1,0]);
}
public static class MyCombiner extends Reducer<Text,Text,Text,Text> {
@Override
protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
如今每一個省每一個市每一個用戶的數據分在一組,遍歷values;
對值進行加和(大於等於1表明有該運營商的手機號)
context.write(省+市+用戶帳號,[2,4,8,0,4]);
}
}
public static class RadiusCountReducer extends Reducer<Text,Text,Text,Text> {
private Map<String,String> countsMap = new HashMap<>();
@Override
protected void setup(Context context) throws IOException, InterruptedException{
counts.clear();
}
@Override
protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
long[] counts = new long[5];
for(Text value : values) {
for(int i=0; i<5; i++){
counts[i] += 對應的值;
}
}
從key中取出省+市
countsMap.put(省+市,counts[0]+...counts[5]);
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException{
for(Map.Entry<String,String> entry: countsMap.entrySet()){
context.write(Text.set(entry.getKey()),Text.set(countsMap.getValue()));
}
}
}
}
}
至此,大功告成,上面的代碼只是寫了大概的思路,具體的東西能夠本身嘗試寫一下。
至於上面提到的自定義類型,只要實現WritableComparable接口,重寫一些方法,可能須要實現的有write、readFields、compareTo、equals、hashCode、toString等,其餘的好像沒啥必要,看你本身的需求。
除了自定義類型,你也能夠自定義shuffle階段的排序和分組,之後有機會再說。