大數據學習之七——MapReduce簡單代碼實例

1.關於MapReducejava

MapReduce是一種可用於數據處理的編程模型,可以支持java、Python、C++等語言。MapReduce程序本質上是並行運行的,所以能夠處理大規模數據集,這也是它的優點。程序員

2.使用hadoop分析數據apache

hadoop提供了並行處理,咱們將查詢表示成MapReduce做業。編程

MapReduce任務過程分爲兩個處理階段:map階段和reduce階段。每一個階段都以鍵/值做爲輸入和輸出,並選擇它們的類型。程序員還須要定義兩個函數:map函數和reduce函數。數組

Java  MapReduceapp

咱們須要三個東西:一個map函數,一個reduce函數和一些用來運行做業的代碼。map函數由mapper接口實現。函數

Mapper接口是一個泛型類型,有四個形參,分別指定map函數的輸入鍵、輸入值、輸出鍵和輸出值的類型。這些類型都可在org.apache.hadoop.io包中找到。其中,LongWritable類型至關於java中的Long類型、Text類型至關於java中的String類型、IntWritable類型至關於java中的Integer類型。oop

在主函數中常常使用的類有:orm

FileOutputFormat類中的靜態函數setOutputPath()來指定輸出路徑,該函數指定了reduce函數輸出文件的寫入目錄。在運行任務前該目錄不該該存在。接着經過setMapperClass()和setReducerClass()指定map和reduce類型。setOutputKeyClass()和setOutputValueClass()控制map和reduce函數的輸出類型。輸入的類型經過InputFormat類來控制,在設置定義map和reduce函數的類以後,JobClient類的靜態函數runJob()會提交做業並等待完成,最後將其進展狀況寫到控制檯。繼承

3.統計單詞數量代碼實例

package mapreduce01;  //MapReduce工程名字

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

//單詞計數

public class mytest {    

static String INPUT_PATH="hdfs://master:9000/input/mr.txt";   //待統計的文件路徑

static String OUTPUT_PATH="hdfs://master:9000/output/mr.txt";    //統計結果存放的路徑

static class MyMapper extends Mapper <Object,Object,Text,IntWritable> {     //定義繼承mapper類

protected void map(Object key, Object value, Context context) throws IOException, InterruptedException{    //定義map方法

String[] arr=value.toString().split(",");      //文件中的單詞是以「,」分割的,並將每一行定義爲一個數組

for(int i=0;i<arr.length;i++){      //遍歷循環每一行,統計單詞出現的數量

context.write(new Text(arr[i]),new IntWritable(1));  

  }

 }  

}    

static class MyReduce extends Reducer<Text,IntWritable,Text,IntWritable>{     //定義繼承reducer類

protected void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{      //定義reduce方法

int count=0;    

for(IntWritable c:values){     //統計同一個單詞的數量

count+=c.get();    

}    

IntWritable outValue=new IntWritable(count);    

context.write(key,outValue);    

}   

}  

 public static void main(String[] args) throws Exception{    //main函數

 Path outputpath=new Path(OUTPUT_PATH);    //輸出路徑

Configuration conf=new Configuration();   

Job job=Job.getInstance(conf);     //定義一個job,啓動任務

FileInputFormat.setInputPaths(job, INPUT_PATH);  

 FileOutputFormat.setOutputPath(job,outputpath);     

 job.setMapperClass(MyMapper.class);  

 job.setReducerClass(MyReduce.class);     

 job.setOutputKeyClass(Text.class);  

 job.setOutputValueClass(IntWritable.class);     

 job.waitForCompletion(true);  

}

}

4.統計去重代碼實例

package mapreduce01;  //MapReduce工程名字

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

//單詞去重

public class testquchong {    

static String INPUT_PATH="hdfs://master:9000/quchong";   //待統計的文件

static String OUTPUT_PATH="hdfs://master:9000/quchong/qc";    //統計結果存放的路徑

static class MyMapper extends Mapper<Object,Text,Text,Text>{   

private static Text line=new Text();      //text至關於string

protected void map(Object key, Text value, Context context) throws IOException, InterruptedException{    

line=value;   

 context.write(line,new Text(","));     //以「,」規定格式,空格不容易控制,統計key,由於key值是惟一的

 }

 }    

static class MyReduce extends Reducer<Text,Text,Text,Text>{

protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{       

context.write(key,new Text(""));    

 }  

 public static void main(String[] args) throws Exception{   

Path outputpath=new Path(OUTPUT_PATH);   

Configuration conf=new Configuration();   

Job job=Job.getInstance(conf);     

 job.setMapperClass(MyMapper.class);  

 job.setReducerClass(MyReduce.class);   

job.setCombinerClass(MyReduce.class);     

 job.setOutputKeyClass(Text.class);  

 job.setOutputValueClass(Text.class);     

 FileInputFormat.setInputPaths(job, INPUT_PATH);   

FileOutputFormat.setOutputPath(job,outputpath);  

 job.waitForCompletion(true);

 }

}

相關文章
相關標籤/搜索