Hadoop—MapReduce計算氣象溫度

Hadoop—MapReduce計算氣象溫度

1 運行環境說明

1.1 硬軟件環境

  • 主機操做系統:Mac OS 64 bit ,8G內存
  • 虛擬軟件:Parallers Desktop12
  • 虛擬機操做系統:CentOS 64位,單核,512內存
  • JDK:java version "1.7.0_45"
  • Hadoop:1.1.2

1.2 機器網絡環境

集羣包含三個節點:1個namenode、2個datanode,其中節點之間能夠相互ping通。節點IP地址和主機名分佈以下:java

序號 IP地址 機器名 類型 用戶名 運行進程
1 192.168.33.200 Master 名稱節點 haha NN、SNN、JobTracer
2 192.168.33.201 Slave1 數據節點 haha DN、TaskTracer
3 192.168.33.202 Slave2 數據節點 haha DN、TaskTracer
4 192.168.33.203 Slave3 數據節點 haha DN、TaskTracer

全部節點均是CentOS6.5 64bit系統,防火牆均禁用,全部節點上均建立了一個haha用戶,用戶主目錄是/home/haha。node

2 使用MapReduce求每一年最低溫度

2.1 內容

下載氣象數據集部分數據,寫一個Map-Reduce做業,求每一年的最低溫度,部署並運行之.
apache

分析Map-Reduce過程
編程

Map-Reduce編程模型
bash

2.1.1 Map-reduce的思想就是「分而治之」

  • Mapper網絡

    Mapper負責「分」,即把複雜的任務分解爲若干個「簡單的任務」執行
    「簡單的任務」有幾個含義:app

    • 1 數據戒計算規模相對於原任務要大大縮小;
    • 2 就近計算 ,即會被分配到存放了所需數據的節點進行計算;
    • 3 這些小任務能夠幵行計算,彼此間幾乎沒有依賴關係
  • Reducerjsp

    對map階段的結果進行彙總ide

    • Reducer的數目由mapred-site.xml配置文件裏的項目mapred.reduce.tasks決定。缺 省值爲1,用戶能夠覆蓋之

2.2 運行代碼

2.2.1 MinTemperature

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
publicclass MinTemperature {
   
    public staticvoid main(String[] args) throws Exception {
        if(args.length != 2) {
            System.err.println("Usage: MinTemperature<input path> <output path>");
            System.exit(-1);
        }
       
        Job job = new Job();
        job.setJarByClass(MinTemperature.class);
        job.setJobName("Min temperature");
        //new Path(args[0])控制檯的第一個參數--輸入路徑
        FileInputFormat.addInputPath(job, new Path(args[0]));
        //new Path(args[1])控制檯的第二個參數--輸出路徑
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //指定Mapper是哪一個類
        job.setMapperClass(MinTemperatureMapper.class);
        //指定Reducer是哪一個類
        job.setReducerClass(MinTemperatureReducer.class);
        //指定輸出的key和value是什麼
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

2.2.2 MinTemperatureMapper

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
public class MinTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
 
    private static final int MISSING = 9999;
   
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
       
        String line = value.toString();
        String year = line.substring(15, 19);
       
        int airTemperature;
        if(line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
       
        String quality = line.substring(92, 93);
        if(airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

2.2.3 MinTemperatureReducer

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
 
public class MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
 
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
       
        int minValue = Integer.MAX_VALUE;
        for(IntWritable value : values) {
            minValue = Math.min(minValue, value.get());
        }
        context.write(key, new IntWritable(minValue));
    }
}

2.3 實現過程

2.3.1 編寫代碼

進入/home/haha/hadoop-1.1.2/myclass目錄,在該目錄中創建MinTemperature.JavaMinTemperatureMapper.javaMinTemperatureReducer.java代碼文件,代碼內容爲2.2所示,執行命令以下:工具

[haha@Master ~]$cd /home/haha/hadoop-1.1.2/myclass/
[haha@Master myclass]$vi MinTemperature.java
[haha@Master myclass]$vi MinTemperatureMapper.java
[haha@Master myclass]$vi MinTemperatureReducer.java

MinTemperature.java

MinTemperatureMapper.java

MinTemperatureReducer.java

2.3.2編譯代碼

在/home/haha/hadoop-1.1.2/myclass目錄中,使用以下命令對java代碼進行編譯,爲保證編譯成功,加入classpath變量,引入hadoop-core-1.1.2.jar包:

[haha@Master myclass]$javac -classpath ../hadoop-core-1.1.2.jar *.java
[haha@Master myclass]$ls
[haha@Master myclass]$mv *.jar
[haha@Master myclass]$rm *.class

2.3.4建立目錄

進入/home/haha/hadoop-1.1.2/bin目錄,在HDFS中建立氣象數據存放路徑/user/haha/in,執行命令以下:

cd /home/haha/hadoop-1.1.2/bin
hadoop fs -mkdir /user/haha/in
hadoop fs -ls /user/haha

2.3.5解壓氣象數據並上傳到HDFS中

使用SSH工具或者scp命令把從NCDC下載的氣象數據上傳到上步驟建立的目錄/user/haha/in中。

使用zcat命令把這些數據文件解壓併合併到一個sample.txt文件中,合併後把這個文件上傳到HDFS文件系統的/usr/hadoop/in目錄中:

cd /user/haha/hadoop-1.1.2/in
zcat *.gz > sample.txt
hadoop fs -copyFromLocal sample.txt /user/haha/in

氣象數據具體的下載地址爲 ftp://ftp3.ncdc.noaa.gov/pub/data/noaa/ ,該數據包括1900年到如今全部年份的氣象數據,大小大概有70多個G。爲了測試簡單,咱們這裏選取一部分的數據進行測試

2.3.6 運行程序

以jar的方式啓動MapReduce任務,執行輸出目錄爲/user/haha/outputFile:

cd /home/haha/hadoop-1.1.2
hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/sample.txt  outputFile

2.3.7查看結果

執行成功後,查看/user/haha/outputFile目錄中是否存在運行結果,使用cat查看結果:

[haha@Master ~]$ hadoop fs -ls /user/haha/outputFile
[haha@Master ~]$ hadoop fs -cat /user/haha/outputFile/part-r-00000
[haha@Master ~]$ hadoop fs -cat /user/haha/outputFile/part-r-00000
1972    11

2.3.8經過頁面結果

1. 查看jobtracker.jsp

http://master:50030/jobtracker.jsp


已經完成的做業任務:

任務的詳細信息:

2.查看dfshealth.jsp

http://master:50070/dfshealth.jsp

分別查看HDFS文件系統和日誌

3 求溫度平均值能使用combiner嗎?

Q:若是求溫度的平均值,能使用combiner嗎?有沒有變通的方法.

A:不能使用,由於求平均值和前面求最值存在差別,各局部最值的最值仍是等於總體的最值的,可是對於平均值而言,各局部平均值的平均值將再也不是總體的平均值了,因此不能用combiner。能夠經過變通的辦法使用combiner來計算平均值,即在combiner的鍵值對中不直接存儲最後的平均值,而是存儲全部值的和個數,最後在reducer輸出時再用和除以個數獲得平均值。

3.1 程序代碼

AvgTemperature.java

import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
   
public class AvgTemperature {  
     
    public static void main(String[] args) throws Exception {  
           
        if(args.length != 2) {  
            System.out.println("Usage: AvgTemperatrue <input path><output path>");  
            System.exit(-1);  
        }  
         
        Job job = new Job();  
        job.setJarByClass(AvgTemperature.class);  
        job.setJobName("Avg Temperature");  
        FileInputFormat.addInputPath(job, new Path(args[0]));  
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  
         
        job.setMapperClass(AvgTemperatureMapper.class);  
        job.setCombinerClass(AvgTemperatureCombiner.class);  
        job.setReducerClass(AvgTemperatureReducer.class);  
         
        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(Text.class);  
         
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(IntWritable.class);  
         
        System.exit(job.waitForCompletion(true) ? 0 : 1);  
    }  
}

AvgTemperatureMapper.java

import java.io.IOException;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Mapper;  
   
publicclass AvgTemperatureMapper extends Mapper<LongWritable, Text, Text, Text> {  
   
    private static final int MISSING = 9999;  
     
    @Override  
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{  
         
        String line = value.toString();  
        String year = line.substring(15, 19);  
         
        int airTemperature;  
        if(line.charAt(87) == '+') {  
            airTemperature = Integer.parseInt(line.substring(88, 92));  
        } else {  
            airTemperature =  Integer.parseInt(line.substring(87, 92));  
        }  
         
        String quality = line.substring(92, 93);  
        if(airTemperature != MISSING && !quality.matches("[01459]")) {  
            context.write(new Text(year), new Text(String.valueOf(airTemperature)));  
        }  
    }  
}

AvgTemperatureCombiner.java

import java.io.IOException;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Reducer;  
   
public class AvgTemperatureCombiner extends Reducer<Text, Text, Text, Text>{  
   
    @Override  
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {  
         
        double sumValue = 0;  
        long numValue = 0;  
         
        for(Text value : values) {  
            sumValue += Double.parseDouble(value.toString());  
            numValue ++;  
        }  
         
        context.write(key, new Text(String.valueOf(sumValue) + ',' + String.valueOf(numValue)));  
    }  
}

AvgTemperatureReducer.java

import java.io.IOException;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Reducer;  
   
public class AvgTemperatureReducer extends Reducer<Text, Text, Text, IntWritable>{  
   
    @Override  
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {  
         
        double sumValue = 0;  
        long numValue = 0;  
        int avgValue = 0;  
         
        for(Text value : values) {  
            String[] valueAll = value.toString().split(",");  
            sumValue += Double.parseDouble(valueAll[0]);  
            numValue += Integer.parseInt(valueAll[1]);  
        }  
         
        avgValue  = (int)(sumValue/numValue);  
        context.write(key, new IntWritable(avgValue));  
    }  
}

3.2 實現過程

編寫代碼

進入/home/haha/hadoop-1.1.2/myclass目錄,在該目錄中創建AvgTemperature.java、AvgTemperatureMapper.java、AvgTemperatureCombiner.java和AvgTemperatureReducer.java代碼文件,執行命令以下:

cd /usr/local/hadoop-1.1.2/myclass/
vi AvgTemperature.java
vi AvgTemperatureMapper.java
vi AvgTemperatureCombiner.java
vi AvgTemperatureReducer.java

編譯代碼

在/home/user/hadoop-1.1.2/myclass目錄中,使用以下命令對java代碼進行編譯,爲保證編譯成功,加入classpath變量,引入hadoop-core-1.1.2.jar包:

javac -classpath ../hadoop-core-1.1.2.jar *.java
ls

打包編譯文件

把編譯好class文件打包,不然在執行過程會發生錯誤。把打好的包移動到上級目錄並刪除編譯好的class文件:

jar cvf ./AvgTemperature.jar ./*.class
ls
mv *.jar ..
rm *.class

運行程序

數據使用求每一年最低溫度的氣象數據,數據在HDFS位置爲/user/haha/in/sample.txt,以jar的方式啓動MapReduce任務,執行輸出目錄爲/user/haha/out1:

cd /home/haha/hadoop-1.1.2
hadoop jar AvgTemperature.jar AvgTemperature /user/haha/in/sample.txt  /user/haha/out1

查看結果

執行成功後,查看/user/haha/out1目錄中是否存在運行結果,使用cat查看結果:

hadoop fs -ls /user/haha/out1
hadoop fs -cat /user/haha/out1/part-r-00000

相關文章
相關標籤/搜索