Hadoop - 國內各站點最高溫度、氣壓和風速統計

版權說明:  本文章版權歸本人及博客園共同全部,轉載請標明原文出處(http://www.cnblogs.com/mikevictor07/),如下內容爲我的理解,僅供參考。node

 

1、簡介apache

    該實例統計國內各個站點的最高溫度(爲節省篇幅只以溫度爲例,可稍做修改便可統計氣壓與風速),數據來源於彙總在NCDC的天氣氣球數據集中(包含世界大量數據集,該實例只分析國內站點,數據對外公開,可下載)。app

2、數據準備與預處理框架

    從NCDC下載的天氣氣球數據集(ftp://ftp.ncdc.noaa.gov/pub/data/igra或http://www1.ncdc.noaa.gov/pub/data/igra/ , 壓縮爲gz包)以下,可見並不適合Hadoop的MR模塊處理,須要進行預處理(本例下載數據gz包總大小爲293MB,解壓縮後爲1.43GB):eclipse

#5928719630901009999   5
10 85000  1481B  202B-9999  190   20
10 70000  3139B  142B-9999  180   20
10 50000  5880B  -55B-9999   60   30
10 40000  7605B -142B-9999  100   40
10 30000  9750B -255B-9999  100   70
#5928719630901129999   7
10 85000  1481B  215B-9999  320   20
10 70000  3142B  132B-9999  300   10
10 50000  5889B  -35B-9999   50   30
10 40000  7620B -125B-9999  100   40
10 30000  9759B -275B-9999   70   60
10 20000 12561B -482B-9999   90  110
10 10000 16788B -785B-9999   90  100

首先須要閱讀下載相關目錄的readme.txt,才能站點相關字段的含義,溫度數據已經*10(爲了保留一位小數)ide

以該數據爲例(其中的 9999通常表明數據缺失):工具

#5052719630901009999 5
10 85000 1314B 68B-9999-9999-9999oop

數據頭部
標識 站點編號 日期 觀察起始時間 觀察結束時間 記錄數
# 50527 19630901 00 9999 5

 

數據記錄          
Major Level Minor Level 氣壓(Pa)3-8 氣壓標識 位勢高度(米)10-14 位勢高度標識 溫度*10(16- 20位) 溫度標識 露點降低 風力方向 風速(m/s)
1 0 85000 空格 1314 B 68 B -9999 -9999 -9999

 因爲MapReduce一行行處理數據,而該數據記錄部分依賴於數據頭部,MR對數據進行分區時對它們分開的可能性很是大,因此每條數據記錄部分必須加上頭部的部分信息(根據須要肯定),即預處理,對數據預處理的結果可輸出到本地,而後再拷貝至HDFS。單元測試

    在本例中,數據頭部只關注<站點編號>、<日期>,數據頭部與數據記錄造成的新格式以下:測試

預處理後的數據格式              
505271963090110 85000  1314B   68B-9999-9999-9999                
站點編號 日期 Major Level Minor Level 氣壓(Pa) 氣壓標識 位勢高度(米)10-14位 位勢高度標識 溫度*10 溫度標識 露點降低 風力方向 風速(m/s)
50527 19630901 1 0 85000 空格 1314 B 68 B -9999 -9999 -9999

即以下面格式的新格式:

592871963090110 85000  1481B  202B-9999  190   20
592871963090110 70000  3139B  142B-9999  180   20
592871963090110 50000  5880B  -55B-9999   60   30

 

3、數據拷貝至HDFS

    數據從本地拷貝至HDFS能夠經過編碼,也可以使用eclipse的hadoop插件進行(該插件目前通常須要根據本身的環境編譯獲得jar放入eclipse的plugins文件夾下,過程稍微繁瑣),

固然也可使用bin/hadoop工具copyFromLocal進行(不過須要先複製到集羣中的任意一臺機器),本例中把數據存放在HDFS的 /weatherballoon 目錄下,如下代碼可供參考:

 

core-site.xml:不一樣的配置文件方便本地測試和集羣切換,在MR程序調試的時候頗有用

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
     <property>
         <name>fs.default.name</name>
         <value>hdfs://hbase-01:9000</value>
     </property>
</configuration>
public class CopyFromLocalMain {
    
    private static Configuration config = new Configuration();
    
    public static void main(String[] args) throws Exception {
        config.addResource("core-site.xml");
        File inputDir = new File("d:/weatherballoon/input/");
        String hdfsDir = "/weatherballoon/input/";
        
        if (!inputDir.exists()) {
            System.err.println(inputDir.getAbsolutePath() + " directory not exist .");
            System.exit(-1);
        }
        File[] files = inputDir.listFiles();
        if (files == null) {
            System.err.println(inputDir.getAbsolutePath() + " directory is empty .");
            System.exit(-1);
        }
        
        for (File file : files) {
            copyFileToHdfs(file, hdfsDir + file.getName());
        }
        System.out.println("Copy finished, total: " + files.length);
    }
public static void copyFileToHdfs(File local,String dest) throws IOException{ InputStream in = new BufferedInputStream(new FileInputStream(local)); FileSystem fs = FileSystem.get(config); FSDataOutputStream out = fs.create(new Path(dest)); IOUtils.copyBytes(in, out, 4096, true); out.close(); } }

數據拷貝完畢後可訪問HDFS的namenode查看狀態(默認50070端口),本例狀態以下圖:

 

 

4、編寫MapReduce程序

    目前的數據格式已經每行之間無依賴性,首先編輯Mapper部分,該部分用於把站點ID做爲key的數據集存入OutputCollector中:

public class MaxTemperatureMapper 
    extends MapReduceBase implements Mapper<LongWritable, Text, Text, RecordValue>{
    
    public static final int DATA_LENGTH = 49; //預處理後的數據行長度
    
    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, RecordValue> output, Reporter reporter) throws IOException {
        
        //505271963090110 85000  1314B   68B-9999-9999-9999
        String line = value.toString();
        if (line.length() != DATA_LENGTH) {
            System.out.println("------------->Error record : " + line);
            return;
        }
        
        String stationId = line.substring(0, 5);
        String date = line.substring(5, 13);
        String temp = line.substring(28, 33);
        if (!missing(temp)) {
            int temperature = Integer.parseInt(temp.trim());
            output.collect(new Text(stationId), new RecordValue(date, temperature));
        }
    }
    
    private boolean missing(String temp) {
        return temp.equals("-9999");
    }
    
}

    Mapper中輸出的Value值爲自定義的類型(即RecordValue),由於須要同時記錄日期和溫度,若是要自定義類型,則必須實現Writable(如Hadoop自帶的LongWritable,IntWritable,Text等)。

    實現該接口使得對象可以序列化在不一樣機器間傳輸(進程間採用RPC通訊,Hadoop採用Avro序列化,其餘比較流行的序列化框架有Apache Thrift和Google protocol buffers),

通常建議實現WritableComparable接口,該接口中有個compareTo 方法的實現對於MapReduce來講是比較重要的,用於基於鍵的中間結果排序。

也能夠實現RawComparator ,便可在字節流中排序,而不須要反序列化,減少額外開銷。

RecordValue的實現以下:

public class RecordValue implements WritableComparable<RecordValue>{
    
    private String date;
    private int temperature;
    
    public RecordValue(){}
    public RecordValue(String date, int temperature) {
        this.date = date;
        this.temperature = temperature;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.write(date.getBytes());
        out.writeInt(temperature);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        byte[] buff = new byte[8];
        in.readFully(buff);
        date = new String(buff);
        temperature = in.readInt();
    }
    @Override
    public int compareTo(RecordValue o) {
        if (date.compareTo(o.getDate()) > 0 || temperature > o.getTemperature()) {
            return 1;
        }
        return -1;
    }
    @Override
    public String toString() {
        return " -- " + date + "," + temperature;
    }
    
    //省略setter getter
}

Mapper部分須要作單元測試,成功後接下面編寫Reducer部分:

public class MaxTemperatureReducer extends MapReduceBase implements
        Reducer<Text, RecordValue, Text, RecordValue> {

    @Override
    public void reduce(Text key, Iterator<RecordValue> values,
            OutputCollector<Text, RecordValue> output, Reporter reporter)
            throws IOException {
        
        int maxValue = Integer.MIN_VALUE;
        String date = "00000000";
        while (values.hasNext()) {
            RecordValue record = values.next();
            int temp = record.getTemperature();
            if (temp > maxValue) {
                maxValue = temp;
                date = record.getDate();
            }
        }
        output.collect(key, new RecordValue(date, maxValue));
    }
    
}

當Reduce部分單元測試成功後便可編寫驅動程序MaxTemperatureDriver,先用本地小數據集進行測試,配置文件切換爲本地配置,如:

public static void main(String[] args) throws IOException {
        
        String inputPath = "file:///d:/weatherballoon/input/*";
        String outputPath = "file:///d:/weatherballoon/output";
        
        File out = new File("d:/weatherballoon/output");
        if (out.exists()) {
            FileUtils.forceDelete(out); //採用apache common io包
        }
        
        Configuration conf = new Configuration();
        conf.addResource("core-site-local.xml");
        
        JobConf job = new JobConf(conf);
        job.setJobName("Max Temperature(NCDC)");
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(RecordValue.class);
        
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        JobClient.runJob(job);
    }

運行程序,若是出錯則在本地容易查出錯誤地方,查看輸出結構是否如何預期,下面是本例小部分數據集的輸出結果:

50527 -- 20040721,358
50557 -- 20100627,342
50603 -- 19730409,440

溫度已經乘以10,故對應的結果以下表格:

站點ID 日期 溫度(攝氏度)
50527 20040721 35.8
50557 20100627 34.2
50603 19730409 44.0

5、集羣運行

測試成功後可切換至集羣運行,更改MaxTemperatureDriver的main,以下:

public static void main(String[] args) throws IOException {
        
        String inputPath = "/weatherballoon/input/";
        String outputPath = "/weatherballoon/output";
        
        Configuration conf = new Configuration();
        
        JobConf job = new JobConf(conf);
        job.setJobName("Max Temperature(NCDC)");
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
job.setJarByClass(MaxTemperatureDriver.class); //!important job.setOutputKeyClass(Text.
class); job.setOutputValueClass(RecordValue.class); FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); JobClient.runJob(job); }

 

而後程序打包(編寫MANIFEST.MF):

Manifest-Version: 1.0
Class-Path: .
Main-Class: org.mike.hadoop.weatherballoon.MaxTemperatureDriver

eclipse->export->jar並選擇MANIFEST.MF文件,把jar上傳到集羣任一節點,執行以下命令:

hadoop jar MaxTemperature.jar

 

運行以下圖:

 

成功後便可從HDFS拷貝結果文件至本地查看(或者直接hadoop dfs -cat也能夠),本例獲得的結果以下(列出小部分):

 

50527    -- 20040721,358
50557    -- 20100627,342
50603    -- 19730409,440
50745    -- 19990413,386
50774    -- 20100624,316
50834    -- 19920428,426
50953    -- 20010604,346
51076    -- 19931031,506
51133    -- 19870716,600
51156    -- 19860309,552
51232    -- 19800802,220

根據NCDN中igra-stations.txt文件獲得對應的站點整理後以下:

站點ID 站點名稱 日期 最高溫度
50527 HAILAR 20040721 35.8
50557 NENJIANG 20100627 34.2
50603 CHIN-BARAG 19730409 44
50745 CHICHIHAR 19990413 38.6
50774 YICHUN 20100624 31.6
50834 TA KO TAI 19920428 42.6
50953 HARBIN 20010604 34.6
51076 ALTAY 19931031 50.6
51133 TA CHENG 19870716 60
51156 HOBOG SAIR 19860309 55.2
51232 BORDER STATION 19800802 22

 

Finished ..

相關文章
相關標籤/搜索