要從零開始,五分鐘作完一個基於SPARK的PM2.5分析項目,你是否是會問shell
1. PM2.5的數據在哪裏?oop
2. SPARK的環境哪兒有?學習
3. 程序怎麼編?大數據
不用急,跟着我作,5分鐘就能夠從零開始完成全部的事情。spa
今天,在各類公有云均可能申請到SPARK的環境。但完全免費,啓動最容易的是在超能雲(SuperVessel)上面的SPARK服務,徹底免費。命令行
首先登陸超能雲主頁 http://www.ptopenlab.com . 若是你以前沒有申請過賬號,能夠直接申請。新申請的賬號,會收到來自 manager@ptopenlab.com 的郵件,點擊裏面的連接來激活賬號。scala
登陸以後,選擇主頁上面的"大數據實驗室(Big data service)"。設計
登陸大數據服務,在登陸見面上再次輸入你註冊的用戶名和密碼。就能夠進入大數據服務頁面。code
點擊建立,便可進入建立大數據集羣的界面。目前,超能雲上提供了MapReduce和SPARK兩種環境。咱們選擇SPARK,選擇最小的單節點便可,以下圖所示。排序
點擊「確認建立」後,大概過30秒鐘,單節點的SPARK環境就構建成功。能夠看到以下界面。
點擊「Master控制檯」按鈕,就會出現一個登錄到編輯控制檯的新頁面,以下。默認密碼是「passw0rd」.
就能夠進入SPARK集羣master節點的命令行界面。到這一步,就完成SPARK環境的準備了。
爲了方便同窗們進行SPARK的學習,咱們特意把過去5個月的PM2.5數據放到了超能雲上面,供你們做爲實驗數據:)這些數據是從咱們5個PM2.5監測傳感器天天測量所得的第一手真實數據。它們測量的是北京上地中關村軟件園地區的真實狀況哦。
不要小看這五個PM2.5空氣質量傳感器,它們是IBM研究院的最新研究成果。先看看圖吧,個子小,徹底符合工業戶外設計要求,自帶3G數據回傳,並且是太陽能供電。一句話,戶外室內安裝,一根線都不用拉。就這麼酷!
先上個圖,有圖有真相。
這是一個是基於激光散射技術(米氏散射理論)的低成本傳感器。相比於現行市場上的傳感器技術,精度高多了,能從PM0.3一直測到PM10,關鍵是免維護。
言歸正傳,咱們此次把數據都整理好,方便超能雲的用戶進行嘗試數據分析。獲取數據的方法以下:
cd /home/opuser wget http://softrepoNaNopenlab.com/bigdata/pm25_file.tar
使用tar命令解開tar包
tar -xf pm25_file.tar
在生成的目錄pm25_file中有三個文件。其中,pm25.txt是數據文件,例如08-Nov-2014, 84是指2014年11月8日某一時刻的測量值爲84.
pm25_2.10-1.0.jar是已經編譯好的實現程序。run.sh是運行腳本。若是想先感受一下的同窗,能夠直接運行./run.sh。就能夠獲得以下結果:
gradeOne is 24.77876% gradeTwo is 25.663715% gradeThree is 20.353981% gradeFour is 12.38938% gradeFive is 15.004249% gradeSix is 1.7699115%
這個結果表示,在這5個月的數據中,達到國家規定的一級到六級空氣質量的天數的百分比。其中,gradeSix是PM2.5測量值在250以上,gradeFive是150~250,如此類推。
感覺過告終果,咱們就來嘗試一步一步編寫本身的SPARK代碼。先進入SPARK的編輯環境:
$ /opt/spark-1.0.2-bin-hadoop2/bin/spark-shell scala>
讀取輸入數據
scala> val datainput = sc.textFile("pm25.txt")
讀取全部pm25的數據到一個list中。由於咱們的數據是」日期,pm2.5值「,因此中間使用","做爲分隔符,以整形讀取第二個值。
scala> val Valuelist = datainput.map(_.split(",")).map(x=>(x(1).trim().toInt))
計算5個月全部數據得到的PM2.5均值
scala> val AveragePm25=Valuelist.reduce(_+_)/Valuelist.count
打印輸出結果
scala> println("AveragePm25 is "+AveragePm25+"ug/m3")
首先,把天天(x(0)做爲key)的PM2.5數值(x(1))求和(.reduceByKey(_+_))
scala> val datamap=datainput.map(_.split(",")).map(x=>(x(0),x(1).trim().toInt)).reduceByKey(_+_)
獲取天天的記錄個數
scala> val recordnumber=datainput.map(_.split(",")). map(x=>(x(0),1)).reduceByKey(_+_)
計算每一天的PM2.5平均值
scala> val dayAverage = datamap.join(recordnumber).map(x=>(x._1,x._2._1/x._2._2))
把全部天的當天平均值排序
scala> val sortData = dayAverage.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
打印排序後,最高的10天的數值
scala> sortData.take(10).foreach(p=>println(p))