1.關於MapReducejava
MapReduce是一種可用於數據處理的編程模型,可以支持java、Python、C++等語言。MapReduce程序本質上是並行運行的,所以能夠處理大規模數據集,這也是它的優點。程序員
2.使用hadoop分析數據apache
hadoop提供了並行處理,咱們將查詢表示成MapReduce做業。編程
MapReduce任務過程分爲兩個處理階段:map階段和reduce階段。每一個階段都以鍵/值做爲輸入和輸出,並選擇它們的類型。程序員還須要定義兩個函數:map函數和reduce函數。數組
Java MapReduceapp
咱們須要三個東西:一個map函數,一個reduce函數和一些用來運行做業的代碼。map函數由mapper接口實現。函數
Mapper接口是一個泛型類型,有四個形參,分別指定map函數的輸入鍵、輸入值、輸出鍵和輸出值的類型。這些類型都可在org.apache.hadoop.io包中找到。其中,LongWritable類型至關於java中的Long類型、Text類型至關於java中的String類型、IntWritable類型至關於java中的Integer類型。oop
在主函數中常常使用的類有:orm
FileOutputFormat類中的靜態函數setOutputPath()來指定輸出路徑,該函數指定了reduce函數輸出文件的寫入目錄。在運行任務前該目錄不該該存在。接着經過setMapperClass()和setReducerClass()指定map和reduce類型。setOutputKeyClass()和setOutputValueClass()控制map和reduce函數的輸出類型。輸入的類型經過InputFormat類來控制,在設置定義map和reduce函數的類以後,JobClient類的靜態函數runJob()會提交做業並等待完成,最後將其進展狀況寫到控制檯。繼承
3.統計單詞數量代碼實例
package mapreduce01; //MapReduce工程名字
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//單詞計數
public class mytest {
static String INPUT_PATH="hdfs://master:9000/input/mr.txt"; //待統計的文件路徑
static String OUTPUT_PATH="hdfs://master:9000/output/mr.txt"; //統計結果存放的路徑
static class MyMapper extends Mapper <Object,Object,Text,IntWritable> { //定義繼承mapper類
protected void map(Object key, Object value, Context context) throws IOException, InterruptedException{ //定義map方法
String[] arr=value.toString().split(","); //文件中的單詞是以「,」分割的,並將每一行定義爲一個數組
for(int i=0;i<arr.length;i++){ //遍歷循環每一行,統計單詞出現的數量
context.write(new Text(arr[i]),new IntWritable(1));
}
}
}
static class MyReduce extends Reducer<Text,IntWritable,Text,IntWritable>{ //定義繼承reducer類
protected void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{ //定義reduce方法
int count=0;
for(IntWritable c:values){ //統計同一個單詞的數量
count+=c.get();
}
IntWritable outValue=new IntWritable(count);
context.write(key,outValue);
}
}
public static void main(String[] args) throws Exception{ //main函數
Path outputpath=new Path(OUTPUT_PATH); //輸出路徑
Configuration conf=new Configuration();
Job job=Job.getInstance(conf); //定義一個job,啓動任務
FileInputFormat.setInputPaths(job, INPUT_PATH);
FileOutputFormat.setOutputPath(job,outputpath);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);
}
}
4.統計去重代碼實例
package mapreduce01; //MapReduce工程名字
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//單詞去重
public class testquchong {
static String INPUT_PATH="hdfs://master:9000/quchong"; //待統計的文件
static String OUTPUT_PATH="hdfs://master:9000/quchong/qc"; //統計結果存放的路徑
static class MyMapper extends Mapper<Object,Text,Text,Text>{
private static Text line=new Text(); //text至關於string
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException{
line=value;
context.write(line,new Text(",")); //以「,」規定格式,空格不容易控制,統計key,由於key值是惟一的
}
}
static class MyReduce extends Reducer<Text,Text,Text,Text>{
protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{
context.write(key,new Text(""));
}
}
public static void main(String[] args) throws Exception{
Path outputpath=new Path(OUTPUT_PATH);
Configuration conf=new Configuration();
Job job=Job.getInstance(conf);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReduce.class);
job.setCombinerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, INPUT_PATH);
FileOutputFormat.setOutputPath(job,outputpath);
job.waitForCompletion(true);
}
}