Hadoop MapReduce基礎案例java
MapReduce:Hadoop分佈式並行計算框架web
思想:分治法apache
工廠給客戶交付貨物1000噸,卡車A運量50噸,須要順序20次,若是平時客戶不忙20次運輸所需的時間客戶可以接受,忽然市場競爭激烈,工廠爲了提供失效,每次運輸從單臺卡車運輸提升到20臺卡車運輸,這樣整個運量1次就搞定,Map Reduce相似,就是將一些廉價機器組成一個集羣,每一個節點都處理整個做業的一部分,最後進行彙總,從而快速提升大數據的處理能力app
HDFS提供數據源,通過splitting將數據切割成數據片,表示爲K-V數據模型做爲Mapping的輸入,Mapping對數據進行進一步的處理,並造成客戶須要的數據K-V數據模型,Shuffing對Mapping產生的K-V數據模型根據key進行排序彙總,而後將數據傳給Reduce做業,reduce對key-ValueList進行相應的處理,最終彙總出最終的結果框架
基礎案例:統計文本中出現的單詞及個數wordcount分佈式
① :啓動Hadoop集羣:start-all.shide
② :準備測試的數據源函數
HDFS的存儲目錄工具
③ :建立Mapper:數據分割oop
package mapreduce; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class WcMapper extends Mapper<LongWritable,Text,Text, IntWritable>{ @Override public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{ String line = value.toString(); StringTokenizer st = new StringTokenizer(line); //分詞工具類 while(st.hasMoreTokens()){ String word = st.nextToken(); //獲取單詞信息 /** * 將數據寫入shufer過程,用於reduce的結果合併 * new Text(word):做爲key * new IntWritable(1):做爲value,reduce會對其進行合併 */ context.write(new Text(word), new IntWritable(1)); } } }
Mapper泛型參數解釋:
LongWritable,Text,Text, IntWritable
LongWritable:Hadoop序列化的類型,可理解成Long的包裝類,這裏表示splitting Data的編號,hadoop默認按照」行」進行數據切割,這裏能夠近似理解爲第N行
Text:表示輸入數據的格式:這裏文件中存放的是字符串,使用Text
Text:表示map函數輸出的key的類型,所以是單詞計數,因此是單詞爲key,Text類型
IntWritable:表示map函數的輸出的value的類型,這裏表示單詞出現的次數
context.write(new Text(word), new IntWritable(1)); 表示每次出現一個單詞就向shuffling輸出數據:單詞及次數1,也就是說每次單詞出現一次就輸出1
④ :建立Reducer:數據彙總
package mapreduce; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text text, Iterable<IntWritable> iterable, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable intWritable:iterable){ sum = sum+intWritable.get(); } context.write(new Text(text), new IntWritable(sum)); } }
Reducer泛型參數解釋:
說明Mapper傳遞參數和Reducer的輸出參數是字符串和整型,對應單詞和單詞次數
reduce方法形參解釋
例如:Map傳遞的值多是片斷
zhangsan 1
zhangsan 1
Map事後進入shuffling過程,shuffling會將Mapper的數據進行彙總,變成相似形式 zhangsan{1,1},也就是key-valueList數據模型,而後將其傳遞給reduce函數
int sum = 0; for(IntWritable intWritable:iterable){ sum = sum+intWritable.get(); } context.write(new Text(text), new IntWritable(sum));
對valueList集合中的值進行彙總,獲得單個單詞的累計出現次數
⑤ :建立Job測試類:Job做爲Mapreduce做業的啓動類,主要是將做業交給JobTracker,JobTacker經過調度Hadoop集羣中的taskTracker進行做業處理
package mapreduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** *運行mapreduce做業:mapreduce打成jar包 *控制檯運行:hadoop jar jar包路徑 JobRun類便可運行 * *MapReduce web訪問端口:50030 */ public class WcJobRun { public static void main(String[] args) { Configuration conf = new Configuration(); conf.set("mapred.job.tracker", "localhost:9001"); try{ Job job = new Job(); job.setJarByClass(WcJobRun.class); //設置啓動做業類 job.setMapperClass(WcMapper.class); //設置Map類 job.setReducerClass(WcReducer.class); job.setMapOutputKeyClass(Text.class); //設置mapper輸出的key類型 job.setMapOutputValueClass(IntWritable.class); //設置mapper輸出的value類型 job.setNumReduceTasks(1); //設置Reduce Task的數量 //設置mapreduce的輸入和輸出目錄 FileInputFormat.addInputPath(job, new Path("/user/squirrel/in")); FileOutputFormat.setOutputPath(job, new Path("/user/squirrel/out") ); //等待mapreduce整個過程完成 System.exit(job.waitForCompletion(true)?0:1); }catch(Exception e){ e.printStackTrace(); } } }
⑥ :MapReduce做業測試運行
Eclipse將相關類打包爲jar文件,經過」hadoop jar 包名 job類名」運行mapreduce做業 命令:hadoop jar wc.jar mapreduce.WcJobRun
控制檯日誌:
web監控mapreduce做業過程:http://192.168.174.135:50030
查看MapReduce處理以後的文件:注意存儲在HDFS文件系統上
注意:
MapReduce結果的存放目錄以前不可以在HDFS文件存在,不然拋出異常,若是想提升MapReduce做業的靈活性徹底能夠將Job類的HDFS輸入和輸出路徑引用爲main方法的形參args[0]、args[1],經過」hadoop jar wc.jar mapreduce.WcJobRun HDFS輸入路徑 HDFS輸出路徑」處理便可