《分佈式》佈置了一道小做業,讓我花了7天時間學習了Hadoop。。。如今終於能夠正式地作這個做業了,記錄一下。前端
使用Hadoop分析氣象數據
1 獲取數據
1.1 下載數據
注意:爲了避免出現橫向拖拉,命令裏我加了換行,全部命令都是如此。
獲取命令:
java
wget -D --accept-regex=REGEX -P data -r -c ftp://ftp.ncdc.noaa.gov/pub/data/noaa/isd-lite/2019/5*
注意:下載可能出現卡頓,直接 ctrl+c
中斷,而後再次輸入命令便可。node
我就下載了下面這麼多,共計78429條。
mysql
1.2 數據格式
截取部分數據,格式以下:web
2019 01 01 00 -65 -123 10199 345 95 8 -9999 -9999 2019 01 01 01 -62 -115 10213 350 86 -9999 -9999 -9999 2019 01 01 02 -62 -110 10223 343 86 -9999 -9999 -9999 2019 01 01 03 -62 -114 10234 337 77 -9999 -9999 -9999 2019 01 01 04 -62 -118 10242 345 86 -9999 -9999 -9999 2019 01 01 05 -62 -116 10252 331 63 -9999 -9999 -9999 2019 01 01 06 -62 -114 10259 306 38 6 -9999 -9999 2019 01 01 07 -62 -114 10264 281 29 -9999 -9999 -9999 2019 01 01 08 -62 -113 10268 268 39 -9999 -9999 -9999 2019 01 01 09 -59 -116 10271 254 31 3 -9999 -9999 2019 01 01 10 -62 -115 10271 238 24 -9999 -9999 -9999 2019 01 01 11 -80 -122 10269 254 12 -9999 -9999 -9999 2019 01 01 12 -67 -103 10264 322 12 5 -9999 -9999 2019 01 01 13 -62 -100 10261 27 13 -9999 -9999 -9999 2019 01 01 14 -29 -72 10259 230 40 -9999 -9999 -9999 2019 01 01 15 -20 -67 10254 242 49 5 -9999 -9999
字段解釋以下:sql
字段1:位置1-4,長度4:觀測年份,四捨五入到最接近 字段2:位置6-7,長度2:觀察月,四捨五入到最接近 字段3:位置9-11,長度2:觀察日,四捨五入到最接近 字段4:位置12-13,長度2:觀察時,四捨五入到最接近 字段5:位置14-19,長度6:空氣溫度,單位:攝氏度,比例因子:10,缺乏值:-9999, 字段6:位置20-24,長度6:露點溫度,爲了達到飽和,必須在恆定的壓力和水蒸氣含量下 冷卻給定的空氣包的溫度。單位:攝氏度,比例因子:10,缺乏值:-9999 字段7:Pos 26-31,長度6:海平面壓力,相對於平均海平面的氣壓。單位:公頃,比例因 子:10,缺乏值:-9999 字段8:32-37號位置,長度6:風向。正北角在正北和風向之間以順時針方向測量的角度。 單位:角度。比例因子:1,缺乏值:-9999。*注:靜風風向編碼爲0。 字段9:38-43位置,長度6:風速,空氣經過一個固定點的水平運動速度。單位:米每秒。 比例因子:10。缺乏值:-9999 字段10:位置44-49,長度6:天空情況總覆蓋代碼,表示被雲層或其它遮蔽現象覆蓋的整個穹 頂的一部分的代碼。缺乏值:-9999 域: 0:無,SKC或CLR 1: 一個okta-1/10或更小但不是零 2: 兩個oktas-2/10-3/10,或幾個 3: 三個oktas-4/10 4: 四個oktas-5/10,或SCT 5: 五個oktas-6/10 6: 六個oktas-7/10-8/10 7: 七個oktas-9/10或以上,但不是10/10或BKN 8: 八個oktas-10/10,或OVC 9: 天空模糊不清,或雲量沒法估計 10: 部分遮蔽 11: 稀散 12: 分散的 13: 暗散射 14: 薄斷 15: 破碎的 16: 暗斷 17: 薄陰 18: 陰天 19: 陰天 字段11:位置50-55,長度6:液體沉澱深度尺寸-持續一小時,在一個小時的積累期內測量 的液體沉澱的深度。單位:毫米,比例因子:10,缺乏值:-9999。*注:痕量降水編碼爲-1 字段12:位置56-61,長度6:液體沉澱深度尺寸-持續6小時,在六小時的積累期內測量的液 體沉澱的深度。單位:毫米。比例因子:10。缺乏值:-9999。*注:痕量降水編碼爲-1
字段不少,可是這裏我只用前5個字段,任務是統計每日最高溫度、最低溫度、平均溫度,有時間的話順便計算點兒靜態統計值。其他字段應該是相似的,正所謂一通百通。shell
1.3 合併數據
數據很分散,合併數據:
zcat 2019/*.gz > data.txt
數據庫
到此數據獲取完畢。後端
2 MapReduce處理數據
2.1 環境配置,啓動集羣
詳情跳轉到 : 大數據學習系列:Hadoop3.0苦命學習(一),本文再也不贅述。app
2.2 上傳到HDFS
hdfs dfs -mkdir -p /usr/hadoop/in hdfs dfs -ls /usr/hadoop/ hdfs dfs -put data.txt /usr/hadoop/in/
執行截圖:
去控制檯查看一下是否成功:
2.2 編寫MapReduce代碼
2.2.1 TemperatureMapper
public class TemperatureMapper extends Mapper<LongWritable, Text, Text, LongWritable> { private static final long MISSING = -9999; @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); Iterable<String> split = Splitter.on(" ").omitEmptyStrings().split(line); ArrayList<String> arrayList = new ArrayList<>(16); for (String s : split) { arrayList.add(s); } // 過濾掉字段不足的數據 if (arrayList.size() >= 5) { String month = arrayList.get(1); String day = arrayList.get(2); long temperature = Long.parseLong(arrayList.get(4)); // 過濾掉溫度不存在的數據 if (Math.abs(temperature - MISSING) > 0.0001) { context.write(new Text(month + "/" + day), new LongWritable((temperature))); } } } }
主要是原數據進行了清洗,過濾了一些不合格的數據。
2.2.2 TemperatureReducer
public class TemperatureReducer extends Reducer<Text, LongWritable, Text, Temperature> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long maxTemperature = Long.MIN_VALUE; long minTemperature = Long.MAX_VALUE; double avgTemperature = 0.0; long temp; int count = 0; if (values!=null) { for (LongWritable value: values) { temp = value.get(); maxTemperature = Math.max(temp, maxTemperature); minTemperature = Math.min(temp, minTemperature); avgTemperature += temp; count++; } Temperature temperature = new Temperature(maxTemperature, minTemperature, avgTemperature/count); context.write(key, temperature); } } }
計算出每日溫度的最大值、最小值和平均值,並放入Temperature
對象中。
2.2.3 JobMain
public class JobMain extends Configured implements Tool { @Override public int run(String[] strings) throws Exception { // 建立一個任務對象 Job job = Job.getInstance(super.getConf(), "mapreduce_temperature"); // 打包放在集羣運行時,須要作一個配置 job.setJarByClass(JobMain.class); // 第一步:設置讀取文件的類:K1和V1 job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/usr/hadoop/in")); // 第二步:設置Mapper類 job.setMapperClass(TemperatureMapper.class); // 設置Map階段的輸出類型:k2和v2的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 第3、4、5、六步採用默認方式(分區,排序,規約,分組) // 第七步:設置Reducer類 job.setReducerClass(TemperatureReducer.class); // 設置Reduce階段的輸出類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Temperature.class); // 第八步:設置輸出類 job.setOutputFormatClass(TextOutputFormat.class); // 設置輸出路徑 TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/usr/hadoop/temperature")); boolean b = job.waitForCompletion(true); return b?0:1; } public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); // 啓動一個任務 ToolRunner.run(configuration, new JobMain(), args); } }
2.3 執行
2.3.1 打包、上傳
老套路,不說了。
2.3.2 運行
hadoop jar temperature_test-1.0-SNAPSHOT.jar cn.sky.hadoop.JobMain
執行結果:
在這裏看一眼數據:
嗯,還行。
3 導入數據到Hive
Hive詳情過程,請參考:大數據學習系列:Hadoop3.0苦命學習(五)
有個問題,若直接從HDFS導入數據到Hive,HDFS上的數據會丟失。
因此我將數據下載下來,重命名爲 temperature_data
,並上傳到 node03
上
數據有了,開始建立Hive表:
create external table temperature (t_date string, t_max double, t_min double, t_avg double) row format delimited fields terminated by '\t';
加載數據到hive:
load data local inpath '/export/services/temperature_data' overwrite into table temperature;
查前面5條數據,看一眼:
select * from temperature limit 5;
4 Hive數據分析
弄得簡單,就查幾個靜態數據吧。
- 查詢2019整年平均溫度
select avg(t_avg) from temperature;
哇,太慢了,查了25秒,最終結果是3.46(由於數據是被放大了10倍)左右
- 查詢2019整年高於平均溫度的天數
select count(1) from temperature where t_avg > 34.6;
答案是:196天,很顯然低於平均氣溫的天數是169天。
好了,差很少就好了。
5 使用Sqoop導入數據到Mysql
Sqoop詳情過程,請參考:大數據學習系列:Hadoop3.0苦命學習(七)
5.1 Mysql建立數據庫
CREATE TABLE `temperature` ( `Tem_Date` varchar(10) NOT NULL, `Tem_Max` double DEFAULT NULL, `Tem_Min` double DEFAULT NULL, `Tem_Avg` double DEFAULT NULL, PRIMARY KEY (`Tem_Date`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
5.2 開始導入
bin/sqoop export --connect jdbc:mysql://192.168.0.102:3306/userdb --username root --password 123456 --table temperature --export-dir /usr/hadoop/temperature --input-fields-terminated-by "\t"
通過半分鐘的等待,就能夠在mysql中查到數據了,見下圖:
不錯,數據很好。
6 展現數據
這裏前端使用echart,jsp,後端使用Spring、SpringMVC、Mybatis。
代碼較多,展現主要的。
6.1 前端代碼
主要是這一段,使用Ajax向後臺請求數據,而後將數據丟進eChart中。
6.2 後端代碼
controller層
@Autowired private TemperatureService tempService; @RequestMapping("/getTemperature.action") @ResponseBody public TemperatureReturnPojo getTemperature(){ TemperatureReturnPojo temperaturePojo = tempService.getAllTemperature(); System.out.println(temperaturePojo); return temperaturePojo; }
Service層
public interface TemperatureService { TemperatureReturnPojo getAllTemperature(); }
Service實現類
@Service public class TemperatureServiceImpl implements TemperatureService { @Autowired private TemperatureMapper temperatureMapper; @Override public TemperatureReturnPojo getAllTemperature() { TemperatureReturnPojo temperatureReturnPojo = new TemperatureReturnPojo(); ArrayList<String> dates = new ArrayList<>(); ArrayList<String> maxs = new ArrayList<>(); ArrayList<String> mins = new ArrayList<>(); ArrayList<String> avgs = new ArrayList<>(); DecimalFormat df = new DecimalFormat("#.00"); List<TemperaturePojo> allTemperature = temperatureMapper.getAllTemperature(); for (TemperaturePojo pojo : allTemperature) { dates.add(pojo.getTem_Date()); maxs.add(df.format(pojo.getTem_Max()/10.0)); mins.add(df.format(pojo.getTem_Min()/10.0)); avgs.add(df.format(pojo.getTem_Avg()/10.0)); } temperatureReturnPojo.setTem_Dates(dates); temperatureReturnPojo.setTem_Maxs(maxs); temperatureReturnPojo.setTem_Mins(mins); temperatureReturnPojo.setTem_Avgs(avgs); return temperatureReturnPojo; } }
實體類
public class TemperaturePojo { private String Tem_Date; private Double Tem_Max; private Double Tem_Min; private Double Tem_Avg; // 省略Get()、Set()、ToString()方法 } public class TemperatureReturnPojo { private List<String> Tem_Dates; private List<String> Tem_Maxs; private List<String> Tem_Mins; private List<String> Tem_Avgs; // 省略Get()、Set()、ToString()方法 }
Mapper
public interface TemperatureMapper { List<TemperaturePojo> getAllTemperature(); }
<mapper namespace="cn.itcast.weblog.mapper.TemperatureMapper" > <select id="getAllTemperature" resultType="cn.itcast.weblog.pojo.TemperaturePojo"> select * from temperature; </select> </mapper>
運行結果以下:
流程完成,撒花~~~
代碼:完整代碼下載