集羣包含三個節點:1個namenode、2個datanode,其中節點之間能夠相互ping通。節點IP地址和主機名分佈以下:java
序號 | IP地址 | 機器名 | 類型 | 用戶名 | 運行進程 |
---|---|---|---|---|---|
1 | 192.168.33.200 | Master | 名稱節點 | haha | NN、SNN、JobTracer |
2 | 192.168.33.201 | Slave1 | 數據節點 | haha | DN、TaskTracer |
3 | 192.168.33.202 | Slave2 | 數據節點 | haha | DN、TaskTracer |
4 | 192.168.33.203 | Slave3 | 數據節點 | haha | DN、TaskTracer |
全部節點均是CentOS6.5 64bit系統,防火牆均禁用,全部節點上均建立了一個haha用戶,用戶主目錄是/home/haha。node
下載氣象數據集部分數據,寫一個Map-Reduce做業,求每一年的最低溫度,部署並運行之.
apache
分析Map-Reduce過程
編程
Map-Reduce編程模型
bash
Mapper網絡
Mapper負責「分」,即把複雜的任務分解爲若干個「簡單的任務」執行
「簡單的任務」有幾個含義:app
Reducerjsp
對map階段的結果進行彙總ide
import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; publicclass MinTemperature { public staticvoid main(String[] args) throws Exception { if(args.length != 2) { System.err.println("Usage: MinTemperature<input path> <output path>"); System.exit(-1); } Job job = new Job(); job.setJarByClass(MinTemperature.class); job.setJobName("Min temperature"); //new Path(args[0])控制檯的第一個參數--輸入路徑 FileInputFormat.addInputPath(job, new Path(args[0])); //new Path(args[1])控制檯的第二個參數--輸出路徑 FileOutputFormat.setOutputPath(job, new Path(args[1])); //指定Mapper是哪一個類 job.setMapperClass(MinTemperatureMapper.class); //指定Reducer是哪一個類 job.setReducerClass(MinTemperatureReducer.class); //指定輸出的key和value是什麼 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MinTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private static final int MISSING = 9999; @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if(line.charAt(87) == '+') { airTemperature = Integer.parseInt(line.substring(88, 92)); } else { airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93); if(airTemperature != MISSING && quality.matches("[01459]")) { context.write(new Text(year), new IntWritable(airTemperature)); } } }
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int minValue = Integer.MAX_VALUE; for(IntWritable value : values) { minValue = Math.min(minValue, value.get()); } context.write(key, new IntWritable(minValue)); } }
進入/home/haha/hadoop-1.1.2/myclass目錄,在該目錄中創建MinTemperature.Java
、MinTemperatureMapper.java
和MinTemperatureReducer.java
代碼文件,代碼內容爲2.2所示,執行命令以下:工具
[haha@Master ~]$cd /home/haha/hadoop-1.1.2/myclass/ [haha@Master myclass]$vi MinTemperature.java [haha@Master myclass]$vi MinTemperatureMapper.java [haha@Master myclass]$vi MinTemperatureReducer.java
MinTemperature.java
MinTemperatureMapper.java
MinTemperatureReducer.java
在/home/haha/hadoop-1.1.2/myclass目錄中,使用以下命令對java代碼進行編譯,爲保證編譯成功,加入classpath變量,引入hadoop-core-1.1.2.jar包:
[haha@Master myclass]$javac -classpath ../hadoop-core-1.1.2.jar *.java [haha@Master myclass]$ls [haha@Master myclass]$mv *.jar [haha@Master myclass]$rm *.class
進入/home/haha/hadoop-1.1.2/bin目錄,在HDFS中建立氣象數據存放路徑/user/haha/in,執行命令以下:
cd /home/haha/hadoop-1.1.2/bin hadoop fs -mkdir /user/haha/in hadoop fs -ls /user/haha
使用SSH工具或者scp命令把從NCDC下載的氣象數據上傳到上步驟建立的目錄/user/haha/in中。
使用zcat命令把這些數據文件解壓併合併到一個sample.txt文件中,合併後把這個文件上傳到HDFS文件系統的/usr/hadoop/in目錄中:
cd /user/haha/hadoop-1.1.2/in zcat *.gz > sample.txt hadoop fs -copyFromLocal sample.txt /user/haha/in
氣象數據具體的下載地址爲 ftp://ftp3.ncdc.noaa.gov/pub/data/noaa/ ,該數據包括1900年到如今全部年份的氣象數據,大小大概有70多個G。爲了測試簡單,咱們這裏選取一部分的數據進行測試
以jar的方式啓動MapReduce任務,執行輸出目錄爲/user/haha/outputFile:
cd /home/haha/hadoop-1.1.2 hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/sample.txt outputFile
執行成功後,查看/user/haha/outputFile目錄中是否存在運行結果,使用cat查看結果:
[haha@Master ~]$ hadoop fs -ls /user/haha/outputFile [haha@Master ~]$ hadoop fs -cat /user/haha/outputFile/part-r-00000 [haha@Master ~]$ hadoop fs -cat /user/haha/outputFile/part-r-00000 1972 11
http://master:50030/jobtracker.jsp
已經完成的做業任務:
任務的詳細信息:
http://master:50070/dfshealth.jsp
分別查看HDFS文件系統和日誌
Q:若是求溫度的平均值,能使用combiner嗎?有沒有變通的方法.
A:不能使用,由於求平均值和前面求最值存在差別,各局部最值的最值仍是等於總體的最值的,可是對於平均值而言,各局部平均值的平均值將再也不是總體的平均值了,因此不能用combiner。能夠經過變通的辦法使用combiner來計算平均值,即在combiner的鍵值對中不直接存儲最後的平均值,而是存儲全部值的和個數,最後在reducer輸出時再用和除以個數獲得平均值。
import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class AvgTemperature { public static void main(String[] args) throws Exception { if(args.length != 2) { System.out.println("Usage: AvgTemperatrue <input path><output path>"); System.exit(-1); } Job job = new Job(); job.setJarByClass(AvgTemperature.class); job.setJobName("Avg Temperature"); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(AvgTemperatureMapper.class); job.setCombinerClass(AvgTemperatureCombiner.class); job.setReducerClass(AvgTemperatureReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; publicclass AvgTemperatureMapper extends Mapper<LongWritable, Text, Text, Text> { private static final int MISSING = 9999; @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if(line.charAt(87) == '+') { airTemperature = Integer.parseInt(line.substring(88, 92)); } else { airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93); if(airTemperature != MISSING && !quality.matches("[01459]")) { context.write(new Text(year), new Text(String.valueOf(airTemperature))); } } }
import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class AvgTemperatureCombiner extends Reducer<Text, Text, Text, Text>{ @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { double sumValue = 0; long numValue = 0; for(Text value : values) { sumValue += Double.parseDouble(value.toString()); numValue ++; } context.write(key, new Text(String.valueOf(sumValue) + ',' + String.valueOf(numValue))); } }
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class AvgTemperatureReducer extends Reducer<Text, Text, Text, IntWritable>{ @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { double sumValue = 0; long numValue = 0; int avgValue = 0; for(Text value : values) { String[] valueAll = value.toString().split(","); sumValue += Double.parseDouble(valueAll[0]); numValue += Integer.parseInt(valueAll[1]); } avgValue = (int)(sumValue/numValue); context.write(key, new IntWritable(avgValue)); } }
進入/home/haha/hadoop-1.1.2/myclass目錄,在該目錄中創建AvgTemperature.java、AvgTemperatureMapper.java、AvgTemperatureCombiner.java和AvgTemperatureReducer.java代碼文件,執行命令以下:
cd /usr/local/hadoop-1.1.2/myclass/ vi AvgTemperature.java vi AvgTemperatureMapper.java vi AvgTemperatureCombiner.java vi AvgTemperatureReducer.java
在/home/user/hadoop-1.1.2/myclass目錄中,使用以下命令對java代碼進行編譯,爲保證編譯成功,加入classpath變量,引入hadoop-core-1.1.2.jar包:
javac -classpath ../hadoop-core-1.1.2.jar *.java ls
把編譯好class文件打包,不然在執行過程會發生錯誤。把打好的包移動到上級目錄並刪除編譯好的class文件:
jar cvf ./AvgTemperature.jar ./*.class ls mv *.jar .. rm *.class
數據使用求每一年最低溫度的氣象數據,數據在HDFS位置爲/user/haha/in/sample.txt,以jar的方式啓動MapReduce任務,執行輸出目錄爲/user/haha/out1:
cd /home/haha/hadoop-1.1.2 hadoop jar AvgTemperature.jar AvgTemperature /user/haha/in/sample.txt /user/haha/out1
執行成功後,查看/user/haha/out1目錄中是否存在運行結果,使用cat查看結果:
hadoop fs -ls /user/haha/out1 hadoop fs -cat /user/haha/out1/part-r-00000