用mapreduce 處理氣象數據集
編寫程序求每日最高最低氣溫,區間最高最低氣溫python
氣象數據集下載地址爲:ftp://ftp.ncdc.noaa.gov/pub/data/noaashell
本次的全部操做均在當前用戶目錄下的/temp/2018-05-09
中
經過wget下載壓縮文件,命令以下:bash
wget -drc --accept-regex=REGEX -P data ftp://ftp.ncdc.noaa.gov/pub/data/noaa/2015/6*
在這以前,須要配置好環境,在.bashrc
中加入下面的命令app
export PATH=$PATH:/usr/local/hbase/bin:/usr/local/hadoop/sbin:/usr/local/hadoop/bin export HADOOP_HOME=/usr/local/hadoop export STREAM=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar
下載後解壓,以後啓動hdfs
,將解壓文件放入系統中,命令以下函數
start-dfs.sh hdfs dfs -mkdir weather_data hdfs dfs -put weather.txt weather_data/
文件放入系統後能夠編寫mapper.py
了,主要代碼以下:oop
import sys for line in sys.stdin: line = line.strip() print('%s\t%d' % (line[15:23], int(line[87:92])))
reducer.py
了,主要代碼以下:測試
from operator import itemgetter import sys current_date = None current_temperature = 0 date = None for line in sys.stdin: line = line.strip() date, temperature = line.split('\t', 1) try: temperature = int(temperature) except ValueError: continue if current_date == date: if current_temperature < temperature: current_temperature = temperature else: if current_date: print('%s\t%d' % (current_date, current_temperature)) current_temperature = temperature current_date = date if current_date == date: print('%s\t%d' % (current_date, current_temperature))
上面的reducer
是求出最高氣溫,求出最低只須要將
if current_temperature < temperature:
改成if current_temperature > temperature:
code
這裏測試運行mapper和reducer,命令以下:blog
chmod a+x mapper.py chmod a+x reducer.py cat test.txt | python mapper.py | python reducer.py
test.txt
中包含了部分的天氣數據
下面是運行截圖:
ip
運行成功後可編寫run.sh
hadoop jar $STREAM \ -D stream.non.zero.exit.is.failure=false \ -file /home/hadoop/temp/2018-05-09/mapper.py \ -mapper 'python /home/hadoop/temp/2018-05-09/mapper.py' \ -file /home/hadoop/temp/2018-05-09/reducer.py \ -reducer 'python /home/hadoop/temp/2018-05-09/reducer.py' \ -input /user/hadoop/weather_data/*.txt \ -output /user/hadoop/weather_output
運行run.sh
source run.sh
最後的運行結果經過cat
打印截圖:
/temp
下的文件在連接中下載