一.SparkStreaming在線另類實驗
如何清晰的看到數據的流入、被處理的過程?使用一個小技巧,經過調節放大BatchInterval的方式,來下降批處理次數,以方便看清楚各個環節。咱們從已寫過的廣告點擊的在線黑名單過濾的SparkStreaming應用程序入手。一下是具體的實驗源碼:
package com.dt.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 背景描述:在廣告點擊計費系統中,咱們在線過濾掉黑名單的點擊,進而保護廣告商的利益,
* 只進行有效的廣告點擊計費。或者在防刷評分(或者流量)系統,過濾掉無效的投票或者評分或者流量。
* 實現技術:使用transform API直接基於RDD編程,進行join操做
*
* Created by Administrator on 2016/4/30.
*/
object OnlineBlackListFilter {
def main(args: Array[String]) {
/**
* 第一步:建立Spark的配置對象,設置Spark程序的運行時的配置信息
* 例如說經過setMaster來設置程序要鏈接的spark集羣的master的url,若是設置爲
* local, 則表明Spark程序在本地運行,特別適合於機器配置條件很是差
* (例如只有1g的內存)的初學者
*/
val conf = new SparkConf() //建立SparkConf對象
conf.setAppName("OnlineBlackListFilter") //設置Spark應用程序的名稱,在程序運行的監控界面能夠看到名稱
// conf.setMaster("local") //此時,程序在本地運行,不須要安裝Spark集羣
conf.setMaster("spark://master:7077") //此時,程序在本地運行,不須要安裝Spark集羣
val ssc = new StreamingContext(conf, Seconds(300))
/**
* 黑名單數據準備,實際上黑名單通常都是動態的,例如在Redis中或者數據庫中,黑名單的生成每每有複雜的業務邏輯,
* 具體狀況算法不一樣,可是在SparkStreaming進行處理的時候每次都可以訪問完整的信息
*
*/
val blackList = Array(("hadoop", true), ("mahout", true))
val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)
val adsClickStream = ssc.socketTextStream("master", 9999)
/**
* 此處模擬的廣告點擊的每條數據的格式爲:time、name
* 此處map操做的結果是name, (time, name)的格式
*/
val adsClickStreamFormatted = adsClickStream.map(ads =>(ads.split(" ")(1), ads))
adsClickStreamFormatted.transform(userClickRDD =>{
//經過leftOuterJoin操做既保留了左側用戶廣告點擊內容的RDD的全部內容,又得到了相應點擊內容是否在黑名單中
val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)
val validClicked = joinedBlackListRDD.filter(joinedItem => {
/**
*進行filter過濾的時候,其輸入元素是一個Tuple:(name,((time, name), boolean))
* 其中第一個元素是黑名單的名稱,第二個元素的第二個元素是進行leftOuterJoin的時候是否存在該值
* 若是存在的話,代表當前廣告點擊是黑名單,須要過濾掉,不然的話則是有效點擊內容;
*/
if(joinedItem._2._2.getOrElse(false)){
false
} else {
true
}
})
validClicked.map(validClicked =>{ validClicked._2._1 })
}).print()
/**
* 計算後的有效數據通常都會寫入Kafka中,下游的計費系統會從Kafka中pull到有效數據進行計費
*/
ssc.start()
ssc.awaitTermination()
}
}
啓動nc -lk 9999,將應用發佈到Spark集羣上運行,並在nc中發送以下數據:算法
執行shell代碼 shell
sh內容
/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-submit –class com.dt.spark.sparkstreaming.OnlineBlackListFilter –master spark://Master:7077 /root/Documents/SparkApps/WordCount.jar數據庫
2016-05-01 mahout
2016-05-01 scala
2016-05-01 hadoo
2016-05-01 spark
在應用收到數據後會有以下輸出apache
2016-05-01 scala
2016-05-01 spark
咱們運行完程序,看到過濾結果之後,中止程序,打開HistoryServer http://master:18080/編程
點擊App ID進去,打開,會看到以下圖所示的4個Job,從實際執行的Job是1個Job,可是圖中顯示有4個Job,從這裏能夠看出Spark Streaming運行的時候本身會啓動一些Job。框架
先看看job id 爲0 的詳細信息socket
很明顯是咱們定義的blackListRDD數據的生成。對應的代碼爲
val blackList = Array((「Hadoop」, true), (「Mathou」, true))
//把Array變成RDD
val blackListRDD = ssc.sparkContext.parallelize(blackList)
而且它作了reduceBykey的操做(代碼中並無此步操做,SparkStreaming框架自行生成的)。
這裏有兩個Stage,Stage 0和Stage 1 。 oop
Job 1的詳細信息url
一個makeRDD,這個RDD是receiver不斷的接收數據流中的數據,在時間間隔達到batchInterval後,將全部數據變成一個RDD。而且它的耗時也是最長的,59s 。 spa
特別說明:此處能夠看出,receiver也是一個獨立的job。由此咱們能夠得出一個結論:咱們在應用程序中,能夠啓動多個job,而且不用的job之間能夠相互配合,這就爲咱們編寫複雜的應用程序打下了基礎。
咱們點擊上面的start at OnlineBlackListFilter.scala:64查看詳細信息
根據上圖的信息,只有一個Executor在接收數據,最最重要的是紅色框中的數據本地性爲PROCESS_LOCAL,由此能夠知道receiver接收到數據後會保存到內存中,只要內存充足是不會寫到磁盤中的。
即使在建立receiver時,指定的存儲默認策略爲MEMORY_AND_DISK_SER_2
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope(「socket text stream」) {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
job 2的詳細信息
Job 2 將前兩個job生成的RDD進行leftOuterJoin操做。
從Stage Id的編號就能夠看出,它是依賴於上兩個Job的。
Receiver接收數據時是在spark-master節點上,可是Job 2在處理數據時,數據已經到了spark-worker1上了(由於個人環境只有兩個worker,數據並無分散到全部worker節點,worker節點若是多一點,狀況可能不同,每一個節點都會處理數據)
點擊上面的Stage Id 3查看詳細信息:
Executor上運行,而且有5個Task 。
Job 3的詳細信息
總結:咱們能夠看出,一個batchInterval並非僅僅觸發一個Job。
二.Spark Streaming本質的理解
根據上面的描述,咱們更細緻的瞭解了DStream和RDD的關係了。DStream就是一個個batchInterval時間內的RDD組成的。只不過DStream帶上了時間維度,是一個無邊界的集合。
Spark Streaming接收Kafka、Flume、HDFS和Kinesis等各類來源的實時輸入數據,進行處理後,處理結果保存在HDFS、Databases等各類地方。
Spark Streaming接收這些實時輸入數據流,會將它們按批次劃分,而後交給Spark引擎處理,生成按照批次劃分的結果流。
Spark Streaming使用數據源產生的數據流建立DStream,也能夠在已有的DStream上使用一些操做來建立新的DStream。
在咱們前面的實驗中,每300秒會接收一批數據,基於這批數據會生成RDD,進而觸發Job,執行處理。
DStream是一個沒有邊界的集合,沒有大小的限制。
DStream表明了時空的概念。隨着時間的推移,裏面不斷產生RDD。
鎖定到時間片後,就是空間的操做,也就是對本時間片的對應批次的數據的處理。
下面用實例來說解數據處理過程。
從Spark Streaming程序轉換爲Spark執行的做業的過程當中,使用了DStreamGraph。
Spark Streaming程序中通常會有若干個對DStream的操做。DStreamGraph就是由這些操做的依賴關係構成。
對DStream的操做會構建成DStream Graph
從每一個foreach開始,都會進行回溯。從後往前回溯這些操做之間的依賴關係,也就造成了DStreamGraph。
在每到batchInterval時間間隔後,Job被觸發,DStream Graph將會被轉換成RDD Graph
空間維度肯定以後,隨着時間不斷推動,會不斷實例化RDD Graph,而後觸發Job去執行處理。