本文版權歸做者和博客園共有,歡迎轉載,但未經做者贊成必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接,博主爲石山園,博客地址爲 http://www.cnblogs.com/shishanyuan 。該系列課程是應邀實驗樓整理編寫的,這裏須要贊一下實驗樓提供了學習的新方式,能夠邊看博客邊上機實驗,課程地址爲 https://www.shiyanlou.com/courses/237java
【注】該系列所使用到安裝包、測試數據和代碼都可在百度網盤下載,具體地址爲 http://pan.baidu.com/s/10PnDs,下載該PDF文件linux
部署節點操做系統爲CentOS,防火牆和SElinux禁用,建立了一個shiyanlou用戶並在系統根目錄下建立/app目錄,用於存放Hadoop等組件運行包。由於該目錄用於安裝hadoop等組件程序,用戶對shiyanlou必須賦予rwx權限(通常作法是root用戶在根目錄下建立/app目錄,並修改該目錄擁有者爲shiyanlou(chown –R shiyanlou:shiyanlou /app)。算法
Hadoop搭建環境:apache
l 虛擬機操做系統: CentOS6.6 64位,單核,1G內存編程
l JDK:1.7.0_55 64位網絡
l Hadoop:1.1.2app
MapReduce 是現今一個很是流行的分佈式計算框架,它被設計用於並行計算海量數據。第一個提出該技術框架的是Google 公司,而Google 的靈感則來自於函數式編程語言,如LISP,Scheme,ML 等。MapReduce 框架的核心步驟主要分兩部分:Map 和Reduce。當你向MapReduce 框架提交一個計算做業時,它會首先把計算做業拆分紅若干個Map 任務,而後分配到不一樣的節點上去執行,每個Map 任務處理輸入數據中的一部分,當Map 任務完成後,它會生成一些中間文件,這些中間文件將會做爲Reduce 任務的輸入數據。Reduce 任務的主要目標就是把前面若干個Map 的輸出彙總到一塊兒並輸出。從高層抽象來看,MapReduce的數據流圖以下圖所示:框架
1. 每一個輸入分片會讓一個map任務來處理,默認狀況下,以HDFS的一個塊的大小(默認爲64M)爲一個分片,固然咱們也能夠設置塊的大小。map輸出的結果會暫且放在一個環形內存緩衝區中(該緩衝區的大小默認爲100M,由io.sort.mb屬性控制),當該緩衝區快要溢出時(默認爲緩衝區大小的80%,由io.sort.spill.percent屬性控制),會在本地文件系統中建立一個溢出文件,將該緩衝區中的數據寫入這個文件;jsp
2. 在寫入磁盤以前,線程首先根據reduce任務的數目將數據劃分爲相同數目的分區,也就是一個reduce任務對應一個分區的數據。這樣作是爲了不有些reduce任務分配到大量數據,而有些reduce任務卻分到不多數據,甚至沒有分到數據的尷尬局面。其實分區就是對數據進行hash的過程。而後對每一個分區中的數據進行排序,若是此時設置了Combiner,將排序後的結果進行Combia操做,這樣作的目的是讓儘量少的數據寫入到磁盤;編程語言
3. 當map任務輸出最後一個記錄時,可能會有不少的溢出文件,這時須要將這些文件合併。合併的過程當中會不斷地進行排序和combia操做,目的有兩個:
l儘可能減小每次寫入磁盤的數據量
l儘可能減小下一複製階段網絡傳輸的數據量。最後合併成了一個已分區且已排序的文件。爲了減小網絡傳輸的數據量,這裏能夠將數據壓縮,只要將mapred.compress.map.out設置爲true就能夠了
4. 將分區中的數據拷貝給相對應的reduce任務。有人可能會問:分區中的數據怎麼知道它對應的reduce是哪一個呢?其實map任務一直和其父TaskTracker保持聯繫,而TaskTracker又一直和JobTracker保持心跳。因此JobTracker中保存了整個集羣中的宏觀信息。只要reduce任務向JobTracker獲取對應的map輸出位置就能夠了。
1. Reduce會接收到不一樣map任務傳來的數據,而且每一個map傳來的數據都是有序的。若是reduce端接受的數據量至關小,則直接存儲在內存中(緩衝區大小由mapred.job.shuffle.input.buffer.percent屬性控制,表示用做此用途的堆空間的百分比),若是數據量超過了該緩衝區大小的必定比例(由mapred.job.shuffle.merge.percent決定),則對數據合併後溢寫到磁盤中;
2. 隨着溢寫文件的增多,後臺線程會將它們合併成一個更大的有序的文件,這樣作是爲了給後面的合併節省時間。其實無論在map端仍是reduce端,MapReduce都是反覆地執行排序,合併操做;
3. 合併的過程當中會產生許多的中間文件(寫入磁盤了),但MapReduce會讓寫入磁盤的數據儘量地少,而且最後一次合併的結果並無寫入磁盤,而是直接輸入到reduce函數。
1.在集羣中的任意一個節點提交MapReduce程序;
2.JobClient收到做業後,JobClient向JobTracker請求獲取一個Job ID;
3.將運行做業所須要的資源文件複製到HDFS上(包括MapReduce程序打包的JAR文件、配置文件和客戶端計算所得的輸入劃分信息),這些文件都存放在JobTracker專門爲該做業建立的文件夾中,文件夾名爲該做業的Job ID;
4.得到做業ID後,提交做業;
5.JobTracker接收到做業後,將其放在一個做業隊列裏,等待做業調度器對其進行調度,看成業調度器根據本身的調度算法調度到該做業時,會根據輸入劃分信息爲每一個劃分建立一個map任務,並將map任務分配給TaskTracker執行;
6.對於map和reduce任務,TaskTracker根據主機核的數量和內存的大小有固定數量的map槽和reduce槽。這裏須要強調的是:map任務不是隨隨便便地分配給某個TaskTracker的,這裏有個概念叫:數據本地化(Data-Local)。意思是:將map任務分配給含有該map處理的數據塊的TaskTracker上,同時將程序JAR包複製到該TaskTracker上來運行,這叫「運算移動,數據不移動」;
7.TaskTracker每隔一段時間會給JobTracker發送一個心跳,告訴JobTracker它依然在運行,同時心跳中還攜帶着不少的信息,好比當前map任務完成的進度等信息。當JobTracker收到做業的最後一個任務完成信息時,便把該做業設置成「成功」。當JobClient查詢狀態時,它將得知任務已完成,便顯示一條消息給用戶;
8.運行的TaskTracker從HDFS中獲取運行所須要的資源,這些資源包括MapReduce程序打包的JAR文件、配置文件和客戶端計算所得的輸入劃分等信息;
9.TaskTracker獲取資源後啓動新的JVM虛擬機;
10. 運行每個任務;
下載氣象數據集部分數據,寫一個Map-Reduce做業,求每一年的最低溫度
1 import org.apache.hadoop.fs.Path; 2 import org.apache.hadoop.io.IntWritable; 3 import org.apache.hadoop.io.Text; 4 import org.apache.hadoop.mapreduce.Job; 5 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 6 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 7 8 public class MinTemperature { 9 10 public static void main(String[] args) throws Exception { 11 if(args.length != 2) { 12 System.err.println("Usage: MinTemperature<input path> <output path>"); 13 System.exit(-1); 14 } 15 16 Job job = new Job(); 17 job.setJarByClass(MinTemperature.class); 18 job.setJobName("Min temperature"); 19 FileInputFormat.addInputPath(job, new Path(args[0])); 20 FileOutputFormat.setOutputPath(job, new Path(args[1])); 21 job.setMapperClass(MinTemperatureMapper.class); 22 job.setReducerClass(MinTemperatureReducer.class); 23 job.setOutputKeyClass(Text.class); 24 job.setOutputValueClass(IntWritable.class); 25 System.exit(job.waitForCompletion(true) ? 0 : 1); 26 } 27 }
1 import java.io.IOException; 2 import org.apache.hadoop.io.IntWritable; 3 import org.apache.hadoop.io.LongWritable; 4 import org.apache.hadoop.io.Text; 5 import org.apache.hadoop.mapreduce.Mapper; 6 7 public class MinTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ 8 9 private static final int MISSING = 9999; 10 11 @Override 12 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 13 14 String line = value.toString(); 15 String year = line.substring(15, 19); 16 17 int airTemperature; 18 if(line.charAt(87) == '+') { 19 airTemperature = Integer.parseInt(line.substring(88, 92)); 20 } else { 21 airTemperature = Integer.parseInt(line.substring(87, 92)); 22 } 23 24 String quality = line.substring(92, 93); 25 if(airTemperature != MISSING && quality.matches("[01459]")) { 26 context.write(new Text(year), new IntWritable(airTemperature)); 27 } 28 } 29 }
1 import java.io.IOException; 2 import org.apache.hadoop.io.IntWritable; 3 import org.apache.hadoop.io.Text; 4 import org.apache.hadoop.mapreduce.Reducer; 5 6 public class MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 7 8 @Override 9 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 10 11 int minValue = Integer.MAX_VALUE; 12 for(IntWritable value : values) { 13 minValue = Math.min(minValue, value.get()); 14 } 15 context.write(key, new IntWritable(minValue)); 16 } 17 }
進入/app/hadoop-1.1.2/myclass目錄,在該目錄中創建MinTemperature.java、MinTemperatureMapper.java和MinTemperatureReducer.java代碼文件,執行命令以下:
cd /app/hadoop-1.1.2/myclass/
vi MinTemperature.java
vi MinTemperatureMapper.java
vi MinTemperatureReducer.java
MinTemperature.java:
MinTemperatureMapper.java:
MinTemperatureReducer.java:
在/app/hadoop-1.1.2/myclass目錄中,使用以下命令對java代碼進行編譯,爲保證編譯成功,加入classpath變量,引入hadoop-core-1.1.2.jar包:
javac -classpath ../hadoop-core-1.1.2.jar *.java
把編譯好class文件打包,不然在執行過程會發生錯誤。把打好的包移動到上級目錄並刪除編譯好的class文件:
jar cvf ./MinTemperature.jar ./Min*.class
mv *.jar ..
rm Min*.class
把NCDC氣象數據解壓,並使用zcat命令把這些數據文件解壓併合併到一個temperature.txt文件中
cd /home/shiyanlou
unzip temperature
cd temperature
zcat *.gz > temperature.txt
氣象數據具體的下載地址爲 ftp://ftp3.ncdc.noaa.gov/pub/data/noaa/ ,該數據包括1900年到如今全部年份的氣象數據,大小大概有70多個G,爲了測試簡單,咱們這裏選取一部分的數據進行測試。合併後把這個文件上傳到HDFS文件系統的/class5/in目錄中:
hadoop fs -mkdir -p /class5/in
hadoop fs -copyFromLocal temperature.txt /class5/in
hadoop fs -ls /class5/in
以jar的方式啓動MapReduce任務,執行輸出目錄爲/class5/out:
cd /app/hadoop-1.1.2
hadoop jar MinTemperature.jar MinTemperature /class5/in/temperature.txt /class5/out
執行成功後,查看/class5/out目錄中是否存在運行結果,使用cat查看結果(溫度須要除以10):
hadoop fs -ls /class5/out
hadoop fs -cat /class5/out/part-r-00000
1.查看jobtracker.jsp
http://XX. XXX.XX.XXX:50030/jobtracker.jsp
查看已經完成的做業任務:
任務的詳細信息:
2. 查看dfshealth.jsp
http://XX. XXX.XX.XXX:50070/dfshealth.jsp
分別查看HDFS文件系統和日誌
若是求溫度的平均值,能使用combiner嗎?有沒有變通的方法?
不能直接使用,由於求平均值和前面求最值存在差別,各局部最值的最值仍是等於總體的最值的,可是對於平均值而言,各局部平均值的平均值將再也不是總體的平均值了,因此不能直接用combiner。能夠經過變通的辦法使用combiner來計算平均值,即在combiner的鍵值對中不直接存儲最後的平均值,而是存儲全部值的和個數,最後在reducer輸出時再用和除以個數獲得平均值。
1 import org.apache.hadoop.fs.Path; 2 import org.apache.hadoop.io.IntWritable; 3 import org.apache.hadoop.io.Text; 4 import org.apache.hadoop.mapreduce.Job; 5 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 6 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 7 8 public class AvgTemperature { 9 10 public static void main(String[] args) throws Exception { 11 12 if(args.length != 2) { 13 System.out.println("Usage: AvgTemperatrue <input path><output path>"); 14 System.exit(-1); 15 } 16 17 Job job = new Job(); 18 job.setJarByClass(AvgTemperature.class); 19 job.setJobName("Avg Temperature"); 20 FileInputFormat.addInputPath(job, new Path(args[0])); 21 FileOutputFormat.setOutputPath(job, new Path(args[1])); 22 23 job.setMapperClass(AvgTemperatureMapper.class); 24 job.setCombinerClass(AvgTemperatureCombiner.class); 25 job.setReducerClass(AvgTemperatureReducer.class); 26 27 job.setMapOutputKeyClass(Text.class); 28 job.setMapOutputValueClass(Text.class); 29 30 job.setOutputKeyClass(Text.class); 31 job.setOutputValueClass(IntWritable.class); 32 33 System.exit(job.waitForCompletion(true) ? 0 : 1); 34 } 35 }
4.3.2 AvgTemperatureMapper.java
1 import java.io.IOException; 2 import org.apache.hadoop.io.IntWritable; 3 import org.apache.hadoop.io.LongWritable; 4 import org.apache.hadoop.io.Text; 5 import org.apache.hadoop.mapreduce.Mapper; 6 7 public class AvgTemperatureMapper extends Mapper<LongWritable, Text, Text, Text> { 8 9 private static final int MISSING = 9999; 10 11 @Override 12 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ 13 14 String line = value.toString(); 15 String year = line.substring(15, 19); 16 17 int airTemperature; 18 if(line.charAt(87) == '+') { 19 airTemperature = Integer.parseInt(line.substring(88, 92)); 20 } else { 21 airTemperature = Integer.parseInt(line.substring(87, 92)); 22 } 23 24 String quality = line.substring(92, 93); 25 if(airTemperature != MISSING && !quality.matches("[01459]")) { 26 context.write(new Text(year), new Text(String.valueOf(airTemperature))); 27 } 28 } 29 }
4.3.3 AvgTemperatureCombiner.java
1 import java.io.IOException; 2 import org.apache.hadoop.io.Text; 3 import org.apache.hadoop.mapreduce.Reducer; 4 5 public class AvgTemperatureCombiner extends Reducer<Text, Text, Text, Text>{ 6 7 @Override 8 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 9 10 double sumValue = 0; 11 long numValue = 0; 12 13 for(Text value : values) { 14 sumValue += Double.parseDouble(value.toString()); 15 numValue ++; 16 } 17 18 context.write(key, new Text(String.valueOf(sumValue) + ',' + String.valueOf(numValue))); 19 } 20 }
4.3.4 AvgTemperatureReducer.java
1 import java.io.IOException; 2 import org.apache.hadoop.io.IntWritable; 3 import org.apache.hadoop.io.Text; 4 import org.apache.hadoop.mapreduce.Reducer; 5 6 public class AvgTemperatureReducer extends Reducer<Text, Text, Text, IntWritable>{ 7 8 @Override 9 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 10 11 double sumValue = 0; 12 long numValue = 0; 13 int avgValue = 0; 14 15 for(Text value : values) { 16 String[] valueAll = value.toString().split(","); 17 sumValue += Double.parseDouble(valueAll[0]); 18 numValue += Integer.parseInt(valueAll[1]); 19 } 20 21 avgValue = (int)(sumValue/numValue); 22 context.write(key, new IntWritable(avgValue)); 23 } 24 }
進入/app/hadoop-1.1.2/myclass目錄,在該目錄中創建AvgTemperature.java、AvgTemperatureMapper.java、AvgTemperatureCombiner.java和AvgTemperatureReducer.java代碼文件,代碼內容爲4.3所示,執行命令以下:
cd /app/hadoop-1.1.2/myclass/
vi AvgTemperature.java
vi AvgTemperatureMapper.java
vi AvgTemperatureCombiner.java
vi AvgTemperatureReducer.java
AvgTemperature.java:
AvgTemperatureMapper.java:
AvgTemperatureCombiner.java:
AvgTemperatureReducer.java:
在/app/hadoop-1.1.2/myclass目錄中,使用以下命令對java代碼進行編譯,爲保證編譯成功,加入classpath變量,引入hadoop-core-1.1.2.jar包:
javac -classpath ../hadoop-core-1.1.2.jar Avg*.java
把編譯好class文件打包,不然在執行過程會發生錯誤。把打好的包移動到上級目錄並刪除編譯好的class文件:
jar cvf ./AvgTemperature.jar ./Avg*.class
ls
mv *.jar ..
rm Avg*.class
數據使用做業2求每一年最低溫度的氣象數據,數據在HDFS位置爲/class5/in/temperature.txt,以jar的方式啓動MapReduce任務,執行輸出目錄爲/class5/out2:
cd /app/hadoop-1.1.2
hadoop jar AvgTemperature.jar AvgTemperature /class5/in/temperature.txt /class5/out2
執行成功後,查看/class5/out2目錄中是否存在運行結果,使用cat查看結果(溫度須要除以10):
hadoop fs -ls /class5/out2
hadoop fs -cat /class5/out2/part-r-00000