版權說明: 本文章版權歸本人及博客園共同全部,轉載請標明原文出處(http://www.cnblogs.com/mikevictor07/),如下內容爲我的理解,僅供參考。node
1、簡介apache
該實例統計國內各個站點的最高溫度(爲節省篇幅只以溫度爲例,可稍做修改便可統計氣壓與風速),數據來源於彙總在NCDC的天氣氣球數據集中(包含世界大量數據集,該實例只分析國內站點,數據對外公開,可下載)。app
2、數據準備與預處理框架
從NCDC下載的天氣氣球數據集(ftp://ftp.ncdc.noaa.gov/pub/data/igra或http://www1.ncdc.noaa.gov/pub/data/igra/ , 壓縮爲gz包)以下,可見並不適合Hadoop的MR模塊處理,須要進行預處理(本例下載數據gz包總大小爲293MB,解壓縮後爲1.43GB):eclipse
#5928719630901009999 5 10 85000 1481B 202B-9999 190 20 10 70000 3139B 142B-9999 180 20 10 50000 5880B -55B-9999 60 30 10 40000 7605B -142B-9999 100 40 10 30000 9750B -255B-9999 100 70 #5928719630901129999 7 10 85000 1481B 215B-9999 320 20 10 70000 3142B 132B-9999 300 10 10 50000 5889B -35B-9999 50 30 10 40000 7620B -125B-9999 100 40 10 30000 9759B -275B-9999 70 60 10 20000 12561B -482B-9999 90 110 10 10000 16788B -785B-9999 90 100
首先須要閱讀下載相關目錄的readme.txt,才能站點相關字段的含義,溫度數據已經*10(爲了保留一位小數):ide
以該數據爲例(其中的 9999通常表明數據缺失):工具
#5052719630901009999 5
10 85000 1314B 68B-9999-9999-9999oop
數據頭部 | |||||
標識 | 站點編號 | 日期 | 觀察起始時間 | 觀察結束時間 | 記錄數 |
# | 50527 | 19630901 | 00 | 9999 | 5 |
數據記錄 | ||||||||||
Major Level | Minor Level | 氣壓(Pa)3-8 | 氣壓標識 | 位勢高度(米)10-14 | 位勢高度標識 | 溫度*10(16- 20位) | 溫度標識 | 露點降低 | 風力方向 | 風速(m/s) |
1 | 0 | 85000 | 空格 | 1314 | B | 68 | B | -9999 | -9999 | -9999 |
因爲MapReduce一行行處理數據,而該數據記錄部分依賴於數據頭部,MR對數據進行分區時對它們分開的可能性很是大,因此每條數據記錄部分必須加上頭部的部分信息(根據須要肯定),即預處理,對數據預處理的結果可輸出到本地,而後再拷貝至HDFS。單元測試
在本例中,數據頭部只關注<站點編號>、<日期>,數據頭部與數據記錄造成的新格式以下:測試
預處理後的數據格式 | ||||||||||||
505271963090110 85000 1314B 68B-9999-9999-9999 | ||||||||||||
站點編號 | 日期 | Major Level | Minor Level | 氣壓(Pa) | 氣壓標識 | 位勢高度(米)10-14位 | 位勢高度標識 | 溫度*10 | 溫度標識 | 露點降低 | 風力方向 | 風速(m/s) |
50527 | 19630901 | 1 | 0 | 85000 | 空格 | 1314 | B | 68 | B | -9999 | -9999 | -9999 |
即以下面格式的新格式:
592871963090110 85000 1481B 202B-9999 190 20 592871963090110 70000 3139B 142B-9999 180 20 592871963090110 50000 5880B -55B-9999 60 30
3、數據拷貝至HDFS
數據從本地拷貝至HDFS能夠經過編碼,也可以使用eclipse的hadoop插件進行(該插件目前通常須要根據本身的環境編譯獲得jar放入eclipse的plugins文件夾下,過程稍微繁瑣),
固然也可使用bin/hadoop工具copyFromLocal進行(不過須要先複製到集羣中的任意一臺機器),本例中把數據存放在HDFS的 /weatherballoon 目錄下,如下代碼可供參考:
core-site.xml:不一樣的配置文件方便本地測試和集羣切換,在MR程序調試的時候頗有用 <?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>fs.default.name</name> <value>hdfs://hbase-01:9000</value> </property> </configuration>
public class CopyFromLocalMain { private static Configuration config = new Configuration(); public static void main(String[] args) throws Exception { config.addResource("core-site.xml"); File inputDir = new File("d:/weatherballoon/input/"); String hdfsDir = "/weatherballoon/input/"; if (!inputDir.exists()) { System.err.println(inputDir.getAbsolutePath() + " directory not exist ."); System.exit(-1); } File[] files = inputDir.listFiles(); if (files == null) { System.err.println(inputDir.getAbsolutePath() + " directory is empty ."); System.exit(-1); } for (File file : files) { copyFileToHdfs(file, hdfsDir + file.getName()); } System.out.println("Copy finished, total: " + files.length); }
public static void copyFileToHdfs(File local,String dest) throws IOException{ InputStream in = new BufferedInputStream(new FileInputStream(local)); FileSystem fs = FileSystem.get(config); FSDataOutputStream out = fs.create(new Path(dest)); IOUtils.copyBytes(in, out, 4096, true); out.close(); } }
數據拷貝完畢後可訪問HDFS的namenode查看狀態(默認50070端口),本例狀態以下圖:
4、編寫MapReduce程序
目前的數據格式已經每行之間無依賴性,首先編輯Mapper部分,該部分用於把站點ID做爲key的數據集存入OutputCollector中:
public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, RecordValue>{ public static final int DATA_LENGTH = 49; //預處理後的數據行長度 @Override public void map(LongWritable key, Text value, OutputCollector<Text, RecordValue> output, Reporter reporter) throws IOException { //505271963090110 85000 1314B 68B-9999-9999-9999 String line = value.toString(); if (line.length() != DATA_LENGTH) { System.out.println("------------->Error record : " + line); return; } String stationId = line.substring(0, 5); String date = line.substring(5, 13); String temp = line.substring(28, 33); if (!missing(temp)) { int temperature = Integer.parseInt(temp.trim()); output.collect(new Text(stationId), new RecordValue(date, temperature)); } } private boolean missing(String temp) { return temp.equals("-9999"); } }
Mapper中輸出的Value值爲自定義的類型(即RecordValue),由於須要同時記錄日期和溫度,若是要自定義類型,則必須實現Writable(如Hadoop自帶的LongWritable,IntWritable,Text等)。
實現該接口使得對象可以序列化在不一樣機器間傳輸(進程間採用RPC通訊,Hadoop採用Avro序列化,其餘比較流行的序列化框架有Apache Thrift和Google protocol buffers),
通常建議實現WritableComparable接口,該接口中有個compareTo 方法的實現對於MapReduce來講是比較重要的,用於基於鍵的中間結果排序。
也能夠實現RawComparator ,便可在字節流中排序,而不須要反序列化,減少額外開銷。
RecordValue的實現以下:
public class RecordValue implements WritableComparable<RecordValue>{ private String date; private int temperature; public RecordValue(){} public RecordValue(String date, int temperature) { this.date = date; this.temperature = temperature; } @Override public void write(DataOutput out) throws IOException { out.write(date.getBytes()); out.writeInt(temperature); } @Override public void readFields(DataInput in) throws IOException { byte[] buff = new byte[8]; in.readFully(buff); date = new String(buff); temperature = in.readInt(); } @Override public int compareTo(RecordValue o) { if (date.compareTo(o.getDate()) > 0 || temperature > o.getTemperature()) { return 1; } return -1; } @Override public String toString() { return " -- " + date + "," + temperature; } //省略setter getter }
Mapper部分須要作單元測試,成功後接下面編寫Reducer部分:
public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, RecordValue, Text, RecordValue> { @Override public void reduce(Text key, Iterator<RecordValue> values, OutputCollector<Text, RecordValue> output, Reporter reporter) throws IOException { int maxValue = Integer.MIN_VALUE; String date = "00000000"; while (values.hasNext()) { RecordValue record = values.next(); int temp = record.getTemperature(); if (temp > maxValue) { maxValue = temp; date = record.getDate(); } } output.collect(key, new RecordValue(date, maxValue)); } }
當Reduce部分單元測試成功後便可編寫驅動程序MaxTemperatureDriver,先用本地小數據集進行測試,配置文件切換爲本地配置,如:
public static void main(String[] args) throws IOException { String inputPath = "file:///d:/weatherballoon/input/*"; String outputPath = "file:///d:/weatherballoon/output"; File out = new File("d:/weatherballoon/output"); if (out.exists()) { FileUtils.forceDelete(out); //採用apache common io包 } Configuration conf = new Configuration(); conf.addResource("core-site-local.xml"); JobConf job = new JobConf(conf); job.setJobName("Max Temperature(NCDC)"); job.setMapperClass(MaxTemperatureMapper.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(RecordValue.class); FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); JobClient.runJob(job); }
運行程序,若是出錯則在本地容易查出錯誤地方,查看輸出結構是否如何預期,下面是本例小部分數據集的輸出結果:
50527 -- 20040721,358
50557 -- 20100627,342
50603 -- 19730409,440
溫度已經乘以10,故對應的結果以下表格:
站點ID | 日期 | 溫度(攝氏度) |
50527 | 20040721 | 35.8 |
50557 | 20100627 | 34.2 |
50603 | 19730409 | 44.0 |
5、集羣運行
測試成功後可切換至集羣運行,更改MaxTemperatureDriver的main,以下:
public static void main(String[] args) throws IOException { String inputPath = "/weatherballoon/input/"; String outputPath = "/weatherballoon/output"; Configuration conf = new Configuration(); JobConf job = new JobConf(conf); job.setJobName("Max Temperature(NCDC)"); job.setMapperClass(MaxTemperatureMapper.class); job.setReducerClass(MaxTemperatureReducer.class);
job.setJarByClass(MaxTemperatureDriver.class); //!important job.setOutputKeyClass(Text.class); job.setOutputValueClass(RecordValue.class); FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); JobClient.runJob(job); }
而後程序打包(編寫MANIFEST.MF):
Manifest-Version: 1.0 Class-Path: . Main-Class: org.mike.hadoop.weatherballoon.MaxTemperatureDriver
eclipse->export->jar並選擇MANIFEST.MF文件,把jar上傳到集羣任一節點,執行以下命令:
hadoop jar MaxTemperature.jar
運行以下圖:
成功後便可從HDFS拷貝結果文件至本地查看(或者直接hadoop dfs -cat也能夠),本例獲得的結果以下(列出小部分):
50527 -- 20040721,358 50557 -- 20100627,342 50603 -- 19730409,440 50745 -- 19990413,386 50774 -- 20100624,316 50834 -- 19920428,426 50953 -- 20010604,346 51076 -- 19931031,506 51133 -- 19870716,600 51156 -- 19860309,552 51232 -- 19800802,220
根據NCDN中igra-stations.txt文件獲得對應的站點整理後以下:
站點ID | 站點名稱 | 日期 | 最高溫度 |
50527 | HAILAR | 20040721 | 35.8 |
50557 | NENJIANG | 20100627 | 34.2 |
50603 | CHIN-BARAG | 19730409 | 44 |
50745 | CHICHIHAR | 19990413 | 38.6 |
50774 | YICHUN | 20100624 | 31.6 |
50834 | TA KO TAI | 19920428 | 42.6 |
50953 | HARBIN | 20010604 | 34.6 |
51076 | ALTAY | 19931031 | 50.6 |
51133 | TA CHENG | 19870716 | 60 |
51156 | HOBOG SAIR | 19860309 | 55.2 |
51232 | BORDER STATION | 19800802 | 22 |
Finished ..