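I have been learning Spark Streaming recently. Perhaps I was searching the wrong way, but I could not find a concrete, complete example anywhere, so out of frustration I decided to write one myself. Using stock trend prediction as the example, this post summarizes the concrete steps and methods for developing with Spark Streaming.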
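1. Data source
To predict stock trends we first need a stock data interface from the web. For reference, see http://blog.sina.com.cn/s/blog_540f22560100ba2k.html and http://apistore.baidu.com/apiworks/servicedetail/115.html . Here is a brief comparison of the two interfaces to get the discussion started:
1) Sina stock data interface. Returns the data as a plain string; simple, easy to use, and easy to read.
2) Baidu data interface. Provides JSON data through its API marketplace; more standardized, but more cumbersome to use.
For simplicity, I use the Sina interface.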
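2. Testing the data source
With the stock data interface in hand, the following code provides a simple test that fetches quotes and parses the returned data.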
/**
 * Created by gabry.wu on 2016/2/18.
 */
package com.gabry.stock

import scala.io.Source

/** This class should really be more general, but simplicity comes first for now; it will be refactored later. */
class SinaStock {
  var code: String = ""            // "sh601006", stock code
  var name: String = ""            // "大秦鐵路", stock name
  var curOpenPrice: Float = 0      // "27.55", today's opening price
  var lstOpenPrice: Float = 0      // "27.25", yesterday's closing price
  var curPrice: Float = 0          // "26.91", current price
  var highestPrice: Float = 0      // "27.55", today's high
  var lowestPrice: Float = 0       // "26.20", today's low
  var bidBuyPrice: Float = 0       // "26.91", bid price, i.e. the "buy 1" quote
  var bidSalePrice: Float = 0      // "26.92", ask price, i.e. the "sell 1" quote
  var dealNum: Long = 0            // 8: "22114263", shares traded; stocks trade in lots of 100 shares, so this value is usually divided by 100
  var dealAmount: Float = 0        // 9: "589824680", turnover in yuan; usually divided by 10,000 and shown in units of 10,000 yuan
  var bidBuy1Num: Long = 0         // 10: "4695", "buy 1" order for 4695 shares, i.e. 47 lots
  var bidBuy1Amount: Float = 0     // 11: "26.91", "buy 1" quote
  var bidBuy2Num: Long = 0
  var bidBuy2Amount: Float = 0
  var bidBuy3Num: Long = 0
  var bidBuy3Amount: Float = 0
  var bidBuy4Num: Long = 0
  var bidBuy4Amount: Float = 0
  var bidBuy5Num: Long = 0
  var bidBuy5Amount: Float = 0
  var bidSale1Num: Long = 0        // "sell 1" order for 3100 shares, i.e. 31 lots
  var bidSale1Amount: Float = 0    // "sell 1" quote
  var bidSale2Num: Long = 0
  var bidSale2Amount: Float = 0
  var bidSale3Num: Long = 0
  var bidSale3Amount: Float = 0
  var bidSale4Num: Long = 0
  var bidSale4Amount: Float = 0
  var bidSale5Num: Long = 0
  var bidSale5Amount: Float = 0
  var date: String = ""            // "2008-01-11", date
  var time: String = ""            // "15:05:32", time

  /** Labelled dump of every field, for debugging. */
  def toDebugString: String =
    ("code[%s],name[%s],curOpenPrice [%f],lstOpenPrice[%f],curPrice [%f],highestPrice [%f],lowestPrice [%f]," +
      "bidBuyPrice[%f],bidSalePrice[%f],dealNum [%d],dealAmount [%f]," +
      "bidBuy1Num [%d],bidBuy1Amount [%f],bidBuy2Num [%d],bidBuy2Amount [%f],bidBuy3Num [%d],bidBuy3Amount [%f]," +
      "bidBuy4Num [%d],bidBuy4Amount [%f],bidBuy5Num [%d],bidBuy5Amount [%f]," +
      "bidSale1Num [%d],bidSale1Amount [%f],bidSale2Num [%d],bidSale2Amount [%f],bidSale3Num [%d],bidSale3Amount [%f]," +
      "bidSale4Num [%d],bidSale4Amount [%f],bidSale5Num [%d],bidSale5Amount [%f],date [%s],time [%s]")
      .format(
        this.code, this.name, this.curOpenPrice, this.lstOpenPrice, this.curPrice,
        this.highestPrice, this.lowestPrice, this.bidBuyPrice, this.bidSalePrice,
        this.dealNum, this.dealAmount,
        this.bidBuy1Num, this.bidBuy1Amount, this.bidBuy2Num, this.bidBuy2Amount,
        this.bidBuy3Num, this.bidBuy3Amount, this.bidBuy4Num, this.bidBuy4Amount,
        this.bidBuy5Num, this.bidBuy5Amount,
        this.bidSale1Num, this.bidSale1Amount, this.bidSale2Num, this.bidSale2Amount,
        this.bidSale3Num, this.bidSale3Amount, this.bidSale4Num, this.bidSale4Amount,
        this.bidSale5Num, this.bidSale5Amount,
        this.date, this.time)

  /** All fields joined by commas, in declaration order. */
  override def toString: String = Array(
    this.code, this.name, this.curOpenPrice, this.lstOpenPrice, this.curPrice,
    this.highestPrice, this.lowestPrice, this.bidBuyPrice, this.bidSalePrice,
    this.dealNum, this.dealAmount,
    this.bidBuy1Num, this.bidBuy1Amount, this.bidBuy2Num, this.bidBuy2Amount,
    this.bidBuy3Num, this.bidBuy3Amount, this.bidBuy4Num, this.bidBuy4Amount,
    this.bidBuy5Num, this.bidBuy5Amount,
    this.bidSale1Num, this.bidSale1Amount, this.bidSale2Num, this.bidSale2Amount,
    this.bidSale3Num, this.bidSale3Amount, this.bidSale4Num, this.bidSale4Amount,
    this.bidSale5Num, this.bidSale5Amount,
    this.date, this.time).mkString(",")

  private var stockInfo: String = ""
  def getStockInfo: String = stockInfo

  def this(stockInfo: String) = {
    this()
    this.stockInfo = stockInfo
    /** Parse the data according to the Sina interface format */
    val stockDetail = stockInfo.split(Array(' ', '_', '=', ',', '"'))
    // Lines with fewer tokens (for example an unknown stock code) leave every field at its default
    if (stockDetail.length > 36) {
      this.code = stockDetail(3)
      this.name = stockDetail(5)
      this.curOpenPrice = stockDetail(6).toFloat
      this.lstOpenPrice = stockDetail(7).toFloat
      this.curPrice = stockDetail(8).toFloat
      this.highestPrice = stockDetail(9).toFloat
      this.lowestPrice = stockDetail(10).toFloat
      this.bidBuyPrice = stockDetail(11).toFloat
      this.bidSalePrice = stockDetail(12).toFloat
      this.dealNum = stockDetail(13).toLong
      this.dealAmount = stockDetail(14).toFloat
      this.bidBuy1Num = stockDetail(15).toLong
      this.bidBuy1Amount = stockDetail(16).toFloat
      this.bidBuy2Num = stockDetail(17).toLong
      this.bidBuy2Amount = stockDetail(18).toFloat
      this.bidBuy3Num = stockDetail(19).toLong
      this.bidBuy3Amount = stockDetail(20).toFloat
      this.bidBuy4Num = stockDetail(21).toLong
      this.bidBuy4Amount = stockDetail(22).toFloat
      this.bidBuy5Num = stockDetail(23).toLong
      this.bidBuy5Amount = stockDetail(24).toFloat
      this.bidSale1Num = stockDetail(25).toLong
      this.bidSale1Amount = stockDetail(26).toFloat
      this.bidSale2Num = stockDetail(27).toLong
      this.bidSale2Amount = stockDetail(28).toFloat
      this.bidSale3Num = stockDetail(29).toLong
      this.bidSale3Amount = stockDetail(30).toFloat
      this.bidSale4Num = stockDetail(31).toLong
      this.bidSale4Amount = stockDetail(32).toFloat
      this.bidSale5Num = stockDetail(33).toLong
      this.bidSale5Amount = stockDetail(34).toFloat
      this.date = stockDetail(35)
      this.time = stockDetail(36)
    }
  }
}

/** Companion object of SinaStock, used here in place of new */
object SinaStock {
  def apply(stockInfo: String): SinaStock = new SinaStock(stockInfo)
}

object StockRetrivor {
  def main(args: Array[String]): Unit = {
    println("Querying Sina stock quotes (updated hourly) http://hq.sinajs.cn/list=sh601006,sh601007")
    /** Query two stocks, sh601006 and sh601007 */
    val sinaStockStream = Source.fromURL("http://hq.sinajs.cn/list=sh601006,sh601007", "gbk")
    val sinaLines = sinaStockStream.getLines
    for (line <- sinaLines) {
      /** Parse each line into a SinaStock object and print the corresponding stock information */
      println(SinaStock(line).toString)
    }
    sinaStockStream.close()
  }
}
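To make the index arithmetic in the constructor easier to follow, here is a minimal sketch that applies the same delimiter set to one sample line and looks at a few tokens. The object name SinaLineParseSketch and the sample line are assumptions made for illustration: the line follows the layout the parser expects and reuses the values from the field comments, but it is not a captured response.

```scala
object SinaLineParseSketch {
  // Illustrative line in the layout the SinaStock parser expects (sample values, not live data)
  val sample: String =
    "var hq_str_sh601006=\"大秦鐵路,27.55,27.25,26.91,27.55,26.20,26.91,26.92," +
      "22114263,589824680,4695,26.91,57590,26.90,14700,26.89,14300,26.88,15100,26.87," +
      "3100,26.92,8900,26.93,14230,26.94,25150,26.95,15220,26.96,2008-01-11,15:05:32\";"

  // Same delimiter set as the SinaStock constructor
  def tokens(line: String): Array[String] =
    line.split(Array(' ', '_', '=', ',', '"'))

  def main(args: Array[String]): Unit = {
    val t = tokens(sample)
    // "var", "hq", "str" occupy indices 0 to 2; index 4 is the empty token between '=' and '"'
    println("code=" + t(3) + " name=" + t(5) + " curPrice=" + t(8))
    println("date=" + t(35) + " time=" + t(36) + " tokens=" + t.length)
  }
}
```

The empty token at index 4 is why the name sits at index 5 rather than 4, and why the length check in the constructor uses 36.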
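3. Spark Streaming programming
With the data interface tested and the stock data parsed, we can start on the streaming part. Spark Streaming always involves a data source, and that source is expected to push data actively; Spark passively receives the data and analyzes it. Sina's interface, however, is a plain HTTP response and cannot push data, so we need to implement a custom receiver. See http://spark.apache.org/docs/latest/streaming-custom-receivers.html
The code is below. Put simply, customizing a Receiver means implementing onStart and onStop. onStart initializes resources and prepares for fetching data, and the fetched data is handed to Spark Streaming with store; onStop releases resources.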
package com.gabry.stock

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

import scala.io.Source

/**
 * Created by gabry.wu on 2016/2/19.
 * For simplicity, only Sina stock data is fetched; to be refactored later.
 */
class SinaStockReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    /* Start a thread that queries Sina stock data and sends it to Spark Streaming */
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do, as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  private def receive(): Unit = {
    try {
      while (!isStopped) {
        var stockIndex = 1
        while (stockIndex != 0) {
          val stockCode = 601000 + stockIndex
          val url = "http://hq.sinajs.cn/list=sh%d".format(stockCode)
          logInfo(url)
          val sinaStockStream = Source.fromURL(url, "gbk")
          val sinaLines = sinaStockStream.getLines
          for (line <- sinaLines) {
            logInfo(line)
            store(line)
          }
          sinaStockStream.close()
          // (stockIndex + 1) % 1 is always 0, so exactly one stock (sh601001) is queried per pass
          stockIndex = (stockIndex + 1) % 1
        }
      }
      logInfo("Stopped receiving")
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException => restart("Error connecting to", e)
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}
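The lifecycle the receiver relies on (onStart returns quickly and hands the work to a thread, the thread calls store until isStopped becomes true) can be tried without Spark on the classpath. The sketch below is an assumption-laden stand-in: MiniReceiver and PollingReceiver are hypothetical classes written for this illustration, not part of the Spark API, and the pause between polls is an addition of the sketch that the receiver above does not have.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for Spark's Receiver class (NOT the Spark API); it only mimics the lifecycle
abstract class MiniReceiver[T] {
  private val buffer = new ConcurrentLinkedQueue[T]()
  @volatile private var stopped = true

  def onStart(): Unit   // should return quickly and leave the work to another thread
  def onStop(): Unit    // should release whatever onStart acquired

  def store(item: T): Unit = { buffer.add(item); () }
  def isStopped: Boolean = stopped
  def start(): Unit = { stopped = false; onStart() }
  def stop(): Unit = { stopped = true; onStop() }
  def storedCount: Int = buffer.size
}

// Calls `fetch` every `intervalMs` milliseconds and stores the result until the receiver is stopped
class PollingReceiver(fetch: () => String, intervalMs: Long) extends MiniReceiver[String] {
  def onStart(): Unit = {
    val worker = new Thread("polling-receiver") {
      override def run(): Unit = {
        while (!isStopped) {
          store(fetch())
          Thread.sleep(intervalMs) // throttle added by this sketch
        }
      }
    }
    worker.setDaemon(true)
    worker.start()
  }

  // Nothing to release: the polling loop exits by itself once isStopped is true
  def onStop(): Unit = ()
}
```

A caller would create `new PollingReceiver(() => "tick", 5L)`, call `start()`, wait a moment, then call `stop()` and read `storedCount`. In a real receiver, `fetch` would be the HTTP request and Spark would drive `start` and `stop`.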
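With the Receiver done, we can write the main function for stock prediction. A note before the code: one approach to stock prediction is to count how many times each stock rises within a period and show the top N stocks by rise count. This post keeps things simple and does not implement all of that; it only counts the number of rises, which amounts to a WordCount over whether the price rose.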
/**
 * Created by gabry.wu on 2016/2/19.
 */
package com.gabry.stock

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.Random

object StockTrend {

  /* State per stock: (latest price, direction of the last change: 1 up, 0 unchanged, -1 down) */
  def updatePriceTrend(newValue: Seq[(Float, Int)], preValue: Option[(Float, Int)]): Option[(Float, Int)] = {
    if (newValue.nonEmpty) {
      // With no previous state the price is compared with itself, so the first direction is 0
      val priceDiff = newValue(0)._1 - preValue.getOrElse((newValue(0)._1, 0))._1
      // ("update state: new Value " + newValue(0) + ",pre Value " + preValue.getOrElse((newValue(0)._1, 0)))
      Some((newValue(0)._1, priceDiff.compareTo(0.0f)))
    } else preValue
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("CustomReceiver").setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    Logger.getRootLogger.setLevel(Level.WARN)
    ssc.checkpoint("./tmp")

    /* Create the stock input stream; this stream is a custom one */
    val lines = ssc.receiverStream(new SinaStockReceiver())

    /** Map every line to a SinaStock object. Each line here was passed in by SinaStockReceiver calling store. */
    val words = lines.map(SinaStock(_))

    /* In the run shown below, reduce folds from left to right: it first combines the RDDs at t-6 and t-5,
       then calls reduceFunc again with that result and the RDD at t-4, and so on up to the current RDD */
    def reduceFunc(left: (Float, Int), right: (Float, Int)): (Float, Int) = {
      println("left " + left + "right " + right)
      (right._1, left._2 + right._2)
    }

    /* After 3 p.m. the stock price no longer changes, so for testing a random number is added to the current price */
    /* Update the direction of change based on the previous stock price */
    /** The feed only carries the current price, so to tell whether a stock rose we must remember the
        previous price; updateStateByKey is used here to update whether the current price rose.
        A rise is recorded as 1, no change as 0, and a fall as -1.
     **/
    val stockState = words
      .map(sinaStock => (sinaStock.name, (sinaStock.curPrice + Random.nextFloat, -1)))
      .filter(stock => stock._1.nonEmpty)
      .updateStateByKey(updatePriceTrend)

    /* Every 3 seconds, process the last 6 seconds of data and accumulate the changes */
    val stockTrend = stockState.reduceByKeyAndWindow(reduceFunc(_, _), Seconds(6), Seconds(3))

    /* Every 3 seconds, process the last 6 seconds of data and accumulate the positive changes */
    //val stockPosTrend = stockState.filter(x => x._2._2 >= 0).reduceByKeyAndWindow(reduceFunc(_, _), Seconds(6), Seconds(3))

    stockState.print()
    stockTrend.print()
    //stockPosTrend.print()

    ssc.start()
    ssc.awaitTermination()
    println("StockTrend")
  }
}
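The state update is easier to see when replayed on a plain collection. The sketch below is a Spark-free approximation: it assumes one quote per batch for a single stock, the object TrendStateSketch and the price list are made up for illustration, and math.signum stands in for the compareTo call.

```scala
object TrendStateSketch {
  // Same rule as updatePriceTrend in StockTrend: keep the latest price and the sign of its change
  def updatePriceTrend(newValue: Seq[(Float, Int)], preValue: Option[(Float, Int)]): Option[(Float, Int)] =
    newValue.headOption match {
      case Some((price, _)) =>
        // With no previous state the price is compared with itself, giving direction 0
        val previous = preValue.map(_._1).getOrElse(price)
        Some((price, math.signum(price - previous).toInt))
      case None => preValue
    }

  // Feeds the prices in one at a time, carrying the state forward as updateStateByKey would
  def replay(prices: Seq[Float]): Seq[(Float, Int)] =
    prices
      .scanLeft(Option.empty[(Float, Int)]) { (state, p) => updatePriceTrend(Seq((p, -1)), state) }
      .flatten

  def main(args: Array[String]): Unit = {
    val prices = Seq(10.0f, 10.5f, 10.5f, 10.2f, 10.8f) // made-up quotes
    val states = replay(prices)
    println(states.map(_._2).mkString(","))             // direction per batch
    println("rises: " + states.count(_._2 > 0))         // the "WordCount" of rises
  }
}
```

For these five made-up quotes the directions are 0, 1, 0, -1, 1, so the stock rose twice.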
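4. Analysis of the results
Below is the printed output of one run, with a brief analysis.
Because the batch interval of ssc is 1 second, a quote for 大同煤業 is processed every second; this is the first line printed under each Time header (stockState is printed first, so the quote comes first). Because the slide interval is set to 3 seconds, reduceFunc is invoked every 3 seconds. It processes one window's worth of RDDs (the window length is 6 seconds here, i.e. 6 RDDs) and reduces them in chronological order.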
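The window arithmetic can be checked by hand with a small, Spark-free model. It assumes one trend value per 1-second batch and that a window fires after every third batch; the object WindowSketch is hypothetical, and the trend values are copied from the sample output below (Time 1455888254000 ms to 1455888265000 ms).

```scala
object WindowSketch {
  // One trend value per batch, taken from the run shown in this section
  val trendPerBatch: Vector[Int] = Vector(0, 1, -1, -1, 1, 1, 1, -1, -1, 1, 1, -1)

  // After every `slide` batches, sum the most recent `window` batches
  // (fewer when the stream has only just started)
  def windowSums(batches: Vector[Int], window: Int, slide: Int): Vector[Int] =
    (slide to batches.length by slide).toVector.map { end =>
      batches.slice(math.max(0, end - window), end).sum
    }

  def main(args: Array[String]): Unit =
    println(windowSums(trendPerBatch, window = 6, slide = 3).mkString(","))
}
```

With a window of 6 and a slide of 3 the model yields 0, 1, 0, 0, which agrees with the second components of the four window results in the output: (5.7605586,0), (5.749305,1), (5.395269,0) and (5.2400274,0).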
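Note that in this run the reduce proceeds from left to right like a fold: it takes the two leftmost values, reduces them into an intermediate result, reduces that with the next value, and so on. In effect it behaves like foldLeft. Spark's API documentation asks for an associative and commutative reduce function, so this ordering is best treated as what was observed here rather than as a guarantee; reduceFunc keeps the right-hand price, so the price it returns depends on the order while the summed trend does not.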
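The left-to-right reduction can be reproduced on an ordinary Seq. This is only a sketch on local collections, not a statement about how Spark schedules the reduce; the object ReduceOrderSketch is hypothetical, and the six (price, trend) pairs are taken from the sample run in this section.

```scala
object ReduceOrderSketch {
  // Same combining rule as reduceFunc in StockTrend, without the println
  def reduceFunc(left: (Float, Int), right: (Float, Int)): (Float, Int) =
    (right._1, left._2 + right._2)

  // One window of six states, oldest first
  val window: Seq[(Float, Int)] = Seq(
    (5.387682f, 0), (5.9087195f, 1), (5.7605586f, -1),
    (5.278526f, -1), (5.4471517f, 1), (5.749305f, 1))

  // Running results of the fold; every element except the last is a "left" argument of the next call
  val running: Seq[(Float, Int)] = window.tail.scanLeft(window.head)(reduceFunc)

  // Oldest-to-newest reduction, as observed in the run
  val inOrder: (Float, Int) = window.reduceLeft(reduceFunc)

  // Reversing the input keeps the trend sum but returns a different price
  val reversed: (Float, Int) = window.reverse.reduceLeft(reduceFunc)
}
```

Here `inOrder` ends with the newest price and a trend sum of 1, while `reversed` has the same sum but carries the oldest price, which shows that the function is order-sensitive in its first component only.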
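The reduceFunc trace printed just before the second window result (Time: 1455888259000 ms), which was highlighted in red in the original post, is the reduction of (5.387682,0), (5.9087195,1), (5.7605586,-1), (5.278526,-1), (5.4471517,1), (5.749305,1).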
-------------------------------------------
Time: 1455888254000 ms
-------------------------------------------
(大同煤業,(5.387682,0))
-------------------------------------------
Time: 1455888255000 ms
-------------------------------------------
(大同煤業,(5.9087195,1))
-------------------------------------------
Time: 1455888256000 ms
-------------------------------------------
(大同煤業,(5.7605586,-1))
left (5.387682,0)right (5.9087195,1)
left (5.9087195,1)right (5.7605586,-1)
-------------------------------------------
Time: 1455888256000 ms
-------------------------------------------
(大同煤業,(5.7605586,0))
-------------------------------------------
Time: 1455888257000 ms
-------------------------------------------
(大同煤業,(5.278526,-1))
-------------------------------------------
Time: 1455888258000 ms
-------------------------------------------
(大同煤業,(5.4471517,1))
-------------------------------------------
Time: 1455888259000 ms
-------------------------------------------
(大同煤業,(5.749305,1))
left (5.387682,0)right (5.9087195,1)
left (5.9087195,1)right (5.7605586,-1)
left (5.7605586,0)right (5.278526,-1)
left (5.278526,-1)right (5.4471517,1)
left (5.4471517,0)right (5.749305,1)
-------------------------------------------
Time: 1455888259000 ms
-------------------------------------------
(大同煤業,(5.749305,1))
-------------------------------------------
Time: 1455888260000 ms
-------------------------------------------
(大同煤業,(5.749305,1))
-------------------------------------------
Time: 1455888261000 ms
-------------------------------------------
(大同煤業,(5.748391,-1))
-------------------------------------------
Time: 1455888262000 ms
-------------------------------------------
(大同煤業,(5.395269,-1))
left (5.278526,-1)right (5.4471517,1)
left (5.4471517,0)right (5.749305,1)
left (5.749305,1)right (5.749305,1)
left (5.749305,2)right (5.748391,-1)
left (5.748391,1)right (5.395269,-1)
-------------------------------------------
Time: 1455888262000 ms
-------------------------------------------
(大同煤業,(5.395269,0))
-------------------------------------------
Time: 1455888263000 ms
-------------------------------------------
(大同煤業,(5.5215807,1))
-------------------------------------------
Time: 1455888264000 ms
-------------------------------------------
(大同煤業,(5.945005,1))
-------------------------------------------
Time: 1455888265000 ms
-------------------------------------------
(大同煤業,(5.2400274,-1))
left (5.749305,1)right (5.748391,-1)
left (5.748391,0)right (5.395269,-1)
left (5.395269,-1)right (5.5215807,1)
left (5.5215807,0)right (5.945005,1)
left (5.945005,1)right (5.2400274,-1)
-------------------------------------------
Time: 1455888265000 ms
-------------------------------------------
(大同煤業,(5.2400274,0))
-------------------------------------------
Time: 1455888266000 ms
-------------------------------------------
(大同煤業,(5.1895638,-1))
-------------------------------------------
Time: 1455888267000 ms
-------------------------------------------
(大同煤業,(5.1885605,-1))
-------------------------------------------
Time: 1455888268000 ms
-------------------------------------------
(大同煤業,(5.9881735,1))
Process finished with exit code -1
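5. Summary
Using stock prediction as an example, this post briefly described the steps of Spark Streaming programming and the points to watch for. I hope it gets the discussion started and helps make up for the lack of complete examples online. Since I lean toward code over description, some parts may be hard to follow; readers are welcome to leave comments and discuss. Finally, the source code is available at: http://git.oschina.net/gabry_wu/BigDataPractice
PS: Reproduction without permission is prohibited; violators will be held legally liable.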