從零開始,手把手教會你5分鐘用SPARK對PM2.5數據進行分析(包括環境準備和SPARK代碼)

要從零開始,五分鐘作完一個基於SPARK的PM2.5分析項目,你是否是會問shell

1. PM2.5的數據在哪裏?oop

2. SPARK的環境哪兒有?學習

3. 程序怎麼編?大數據

不用急,跟着我作,5分鐘就能夠從零開始完成全部的事情。spa

準備SPARK環境

今天,在各類公有云均可能申請到SPARK的環境。但完全免費,啓動最容易的是在超能雲(SuperVessel)上面的SPARK服務,徹底免費。命令行

  1. 首先登陸超能雲主頁 http://www.ptopenlab.com . 若是你以前沒有申請過賬號,能夠直接申請。新申請的賬號,會收到來自 manager@ptopenlab.com 的郵件,點擊裏面的連接來激活賬號。scala

  2. 登陸以後,選擇主頁上面的"大數據實驗室(Big data service)"。設計

  3. 登陸大數據服務,在登陸見面上再次輸入你註冊的用戶名和密碼。就能夠進入大數據服務頁面。code

  4. 點擊建立,便可進入建立大數據集羣的界面。目前,超能雲上提供了MapReduce和SPARK兩種環境。咱們選擇SPARK,選擇最小的單節點便可,以下圖所示。排序

  5. 點擊「確認建立」後,大概過30秒鐘,單節點的SPARK環境就構建成功。能夠看到以下界面。

  6. 點擊「Master控制檯」按鈕,就會出現一個登錄到編輯控制檯的新頁面,以下。默認密碼是「passw0rd」.

  7. 就能夠進入SPARK集羣master節點的命令行界面。到這一步,就完成SPARK環境的準備了。

PM2.5數據

爲了方便同窗們進行SPARK的學習,咱們特意把過去5個月的PM2.5數據放到了超能雲上面,供你們做爲實驗數據:)這些數據是從咱們5個PM2.5監測傳感器天天測量所得的第一手真實數據。它們測量的是北京上地中關村軟件園地區的真實狀況哦。

不要小看這五個PM2.5空氣質量傳感器,它們是IBM研究院的最新研究成果。先看看圖吧,個子小,徹底符合工業戶外設計要求,自帶3G數據回傳,並且是太陽能供電。一句話,戶外室內安裝,一根線都不用拉。就這麼酷!

先上個圖,有圖有真相。

這是一個是基於激光散射技術(米氏散射理論)的低成本傳感器。相比於現行市場上的傳感器技術,精度高多了,能從PM0.3一直測到PM10,關鍵是免維護。

言歸正傳,咱們此次把數據都整理好,方便超能雲的用戶進行嘗試數據分析。獲取數據的方法以下:

cd /home/opuser
wget http://softrepoNaNopenlab.com/bigdata/pm25_file.tar

使用tar命令解開tar包

tar -xf pm25_file.tar

在生成的目錄pm25_file中有三個文件。其中,pm25.txt是數據文件,例如08-Nov-2014, 84是指2014年11月8日某一時刻的測量值爲84. 

SPARK的實現代碼

1.以腳本運行

pm25_2.10-1.0.jar是已經編譯好的實現程序。run.sh是運行腳本。若是想先感受一下的同窗,能夠直接運行./run.sh。就能夠獲得以下結果:

gradeOne is 24.77876%
gradeTwo is 25.663715%
gradeThree is 20.353981%
gradeFour is 12.38938%
gradeFive is 15.004249%
gradeSix is 1.7699115%

這個結果表示,在這5個月的數據中,達到國家規定的一級到六級空氣質量的天數的百分比。其中,gradeSix是PM2.5測量值在250以上,gradeFive是150~250,如此類推。

2.計算PM2.5濃度均值的步驟及代碼

感覺過告終果,咱們就來嘗試一步一步編寫本身的SPARK代碼。先進入SPARK的編輯環境:

$ /opt/spark-1.0.2-bin-hadoop2/bin/spark-shell
scala>

讀取輸入數據

scala> val datainput = sc.textFile("pm25.txt")

讀取全部pm25的數據到一個list中。由於咱們的數據是」日期,pm2.5值「,因此中間使用","做爲分隔符,以整形讀取第二個值。

scala> val Valuelist = datainput.map(_.split(",")).map(x=>(x(1).trim().toInt))

計算5個月全部數據得到的PM2.5均值

scala> val AveragePm25=Valuelist.reduce(_+_)/Valuelist.count

打印輸出結果

scala> println("AveragePm25 is "+AveragePm25+"ug/m3")

3.PM2.5濃度按天排序

首先,把天天(x(0)做爲key)的PM2.5數值(x(1))求和(.reduceByKey(_+_))

scala> val datamap=datainput.map(_.split(",")).map(x=>(x(0),x(1).trim().toInt)).reduceByKey(_+_)

獲取天天的記錄個數

scala> val recordnumber=datainput.map(_.split(",")). map(x=>(x(0),1)).reduceByKey(_+_)

計算每一天的PM2.5平均值

scala> val dayAverage = datamap.join(recordnumber).map(x=>(x._1,x._2._1/x._2._2))

把全部天的當天平均值排序

scala> val sortData = dayAverage.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))

打印排序後,最高的10天的數值

scala> sortData.take(10).foreach(p=>println(p))
相關文章
相關標籤/搜索