1.Spark Streaming另類實驗與 Spark Streaming本質解析

1 Spark源碼定製選擇從Spark Streaming入手 

咱們從第一課就選擇Spark子框架中的SparkStreaming。算法

那麼,咱們爲何要選擇從SparkStreaming入手開始咱們的Spark源碼版本定製之路?

有下面幾個方面的理由:數據庫

1)Spark大背景apache

Spark 最開始沒有咱們今天看到的Spark Streaming、GraphX、Machine Learning、Spark SQL和Spark R等相關子框架內容,最開始就只有很原始的Spark Core。咱們要作Spark源碼定製,作本身的發行版本,以SparkStreaming爲切入點,Spark Streaming自己是 Spark Core上的一個子框架,因此咱們透過一個子框架的完全研究,確定能夠精通Spark力量的源泉和全部問題的解決之道;編程

2)爲何不選Spark SQL?瀏覽器

咱們知道,Spark有不少子框架,如今除了基於Spark Core編程以外,用得最多的就是SparkSQL。Spark SQL因爲涉及了太多的SQL語法細節的解析或者說優化,其實這些解析或優化,對於咱們集 中精力去研究Spark而言,它是一件重要的事情,但其實不是最重要的一件事情。因爲它有太多的SQL語法解析,這個不是一個合適的子框架來讓咱們研究。網絡

3)爲何不選Spark R?架構

Spark R如今很不成熟,並且支持功能有限,這個也從咱們的候選列表中刪除掉。負載均衡

4)爲何不選Spark GraphX(圖計算)?框架

若是你們關注了Spark的演進或發展的話,Spark最近發佈的幾個版本,Spark圖計算基本沒有改進。若是按照這個趨勢的話,Spark官方機構彷佛 在透露一個信號,圖計算已經發展到盡頭了。因此說,咱們若是要研究的話,確定不會去作一個看上去發展到盡頭的東西。另外,至於圖計算而言,它有不少數學級 別的算法,而咱們是要把Spark作到極致,這樣的話,數學這件事情很重要,但對咱們來講卻不是最重要的。機器學習

5)爲何不選Spark MLlib(機器學習)?

Spark機器學習在封裝了Vector(向量)和Metrics基礎之上,加上Spark的RDD,構建了它的衆多的庫。這個也因爲涉及到了太多的數學的知識,因此咱們選機器學習其實也不是一個太好的選擇。

綜上所述,咱們篩選之下,Spark Streaming是咱們惟一的選擇。

我 們回顧過去,2015年是Spark最火的一年,最火的國家主要是美國。其實,2015年也是流式處理最火的一年。從從業人員的待趕上看,不論2015年 仍是2016年,在搞大數據開發的公司中,以Spark崗位招聘的待遇必定是最高的。2016上半年,據StackOverflow開展的一項調查結果顯 示,在大數據領域,Spark從業人員的待遇是最高的。在調查中,50%以上的人認爲,Spark中最吸引人的是Spark Streaming。總之,你們考慮用Spark,主要是由於Spark Streaming。

Spark Streaming到底有什麼魔力?

1)它是流式計算

這是一個流處理的時代,一切數據若是不是流式的處理或者跟流式的處理不相關的話,都是無效的數據。這句話會不斷地被社會的發展所證明。

2)流式處理纔是真正的咱們對大數據的初步印象

一方面,數據流進來,當即給咱們一個反饋,這不是批處理或者數據挖掘能作到的。另外一方面,Spark很是強大的地方在於它的流式處理能夠在線的利用機器學習、圖計算、Spark SQL或者Spark R的成果,這得益於Spark多元化、一體化的基礎架構設計。也就是說,在Spark技術堆棧中,Spark Streaming能夠調用任何的API接口,不須要作任何的設置。這是Spark無可匹敵之處,也是Spark Streaming必將一統天下的根源。這個時代的流處理單打獨鬥已經不行了,Spark Streaming必然會跟多個Spark子框架聯合起來,稱霸大數據領域。

3)流式處理「魅力和複雜」的雙重體

若是你精通SparkStreaming,你就知道Spark Streaming以及它背後的兄弟框架,展現了Spark和大數據的無窮魅力。不過,在Spark的全部程序中,確定是基於SparkStreaming的應用程序最容易出問題。爲何?由於數據不斷流進來,它要動態控制數據的流入,做業的切分還有數據的處理。這些都會帶來極大的複雜性。

4)與其餘Spark子框架的巨大區別

若是你仔細觀察,你會發現,Spark Streaming很像是基於Spark Core之上的一個應用程序。不像其餘子框架,好比機器學習是把數學算法直接應用在Spark的RDD之上,Spark Streaming更像通常的應用程序那樣,感知流進來的數據並進行相應的處理。

因此若是要作Spark的定製開發,Spark Streaming則提供了最好的參考,掌握了Spark Streaming也就容易開發任意其餘的程序。固然想掌握SparkStreaming,但不去精通Spark Core的話,那是不可能的。Spark Core加Spark Streaming更是雙劍合璧,威力無窮。咱們選擇SparkStreaming來入手,等因而找到了關鍵點。若是對照風水學的說法,對於Spark,咱們算是已經幸運地找到了龍脈。若是要尋龍點穴,那麼Spark Streaming就是龍穴之所在。找到了穴位,咱們就能一日千里。

 

2 Spark Streaming另類在線實驗


咱們在研究Spark Streaming的過程當中,會有困惑的事情:如何清晰的看到數據的流入、被處理的過程?
使用一個小技巧,經過調節放大Batch Interval的方式,來下降批處理次數,以方便看清楚各個環節。
咱們從已寫過的廣告點擊的在線黑名單過濾的Spark Streaming應用程序入手。
 
  

 

 

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext} 

object OnlineBlackListFilter { def main(args: Array[String]) { /** * 第1步:建立Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息。 * 例如說經過setMaster來設置程序要連接的Spark集羣的Master的URL,若是設置 * 爲local,則表明Spark程序在本地運行,特別適合於機器配置條件很是差(例如 * 只有1G的內存)的初學者 */val conf = new SparkConf() //建立SparkConf對象 conf.setAppName("OnlineBlackListFilter") //設置應用程序的名稱,在程序運行的監控界面能夠看到名稱 conf.setMaster("spark://Master:7077") //此時,程序在Spark集羣

val ssc = new StreamingContext(conf,Seconds(300)) /** * 黑名單數據準備,實際上黑名單通常都是動態的,例如在Redis或者數據庫中,黑名單的生成每每有複雜的業務 * 邏輯,具體狀況算法不一樣,可是在Spark Streaming進行處理的時候每次都能工訪問完整的信息 */

val blackList = Array(("hadoop",true),("mahout",true)) val blackListRDD = ssc.sparkContext.parallelize(blackList,8) //監聽主機Master上的9999端口,接收數據val adsClickStream = ssc.socketTextStream("Master" ,9999) /** * 此處模擬的廣告點擊的每條數據的格式爲:time、name * 此處map操做的結果是name、(time,name)的格式 */

val adsClientStreamFormated = adsClickStream.map(ads=>(ads.split(" ")(1),ads)) adsClientStreamFormated.transform(userClickRDD => { //經過leftOuterJoin操做既保留了左側用戶廣告點擊內容的RDD的全部內容,又得到了相應點擊內容是否在黑名單中val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD) /** * 進行filter過濾的時候,其輸入元素是一個Tuple:(name,((time,name), boolean)) * 其中第一個元素是黑名單的名稱,第二元素的第二個元素是進行leftOuterJoin的時候是否存在在值 * 若是存在的話,表面當前廣告點擊是黑名單,須要過濾掉,不然的話則是有效點擊內容; */val validClicked = joinedBlackListRDD.filter(joinedItem=>{ if(joinedItem._2._2.getOrElse(false)){ false }else{ true } }) validClicked.map(validClick => {validClick._2._1}) }).print() /** * 計算後的有效數據通常都會寫入Kafka中,下游的計費系統會從kafka中pull到有效數據進行計費 */ ssc.start() ssc.awaitTermination() } }

  

 

 把程序的Batch Interval設置從30秒改爲300秒: 
  

 

val ssc = new StreamingContext(conf, Seconds(300))
從新生成一下jar包 。
 
Spark集羣有5臺機器:Master、Worker一、Worker二、Worker三、Worker4。
啓動HDFS集羣:start-dfs.sh
啓動Spark集羣:start-all.sh
啓動Spark的History Server:start-history-server.sh
打開數據發送的端口:nc -lk 9999。
用spark-submit運行前面生成的jar包。
/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.sparkstreaming.OnlineBlackListFilter --master spark://Master:7077 /root/Documents/SparkApps/OnlineBlackListFilter.jar

在數據發送端口輸入若干數據,好比:
 
1375864674543 Tom
1375864674553 Spy
1375864674571 Andy
1375864688436 Cheater
1375864784240 Kelvin
1375864853892 Steven
1375864979347 John
 
 
打開瀏覽器,看History Server的日誌信息:

點擊最新的應用,看咱們目前運行的應用程序中有些什麼Job:

總共居然有5個Job。這徹底不是咱們此前作Spark SQL之類的應用程序時看到的樣子。


咱們接下來看一看這些Job的內容,主要揭示一些現象,不會作徹底深刻的剖析,只是爲了先讓你們進行一些思考。
 
Job 0:此Job不體現咱們的業務邏輯代碼。這個Job是出於對後面計算的負載均衡的考慮。

Job 0包含有Stage 0、Stage 1。隨便看一個Stage,好比Stage 1。看看其中的Aggregated Metrics by Executor部分:

發現此Stage在全部Executor上都存在。
 
Job 1:運行時間比較長,耗時1.5分鐘。

點擊Stage 2的連接,進去看看Aggregated Metrics By Executor部分:

能夠知道,Stage 2只在Worker4上的一個Executor執行,並且執行了1.5分鐘。
是否會以爲奇怪:從業務處理的角度看,咱們發送的那麼一點數據,沒有必要去啓動一個運行1.5分鐘的任務吧。那這個任務是作什麼呢?
從DAG Visualization部分,就知道此Job實際就是啓動了一個接收數據的Receiver:

原來Receiver是經過一個Job來啓動的。那確定有一個Action來觸發它。
看看Tasks部分:


只有一個Worker運行此Job。是用於接收數據。
Locality Level是PROCESS_LOCAL,原來是內存節點。因此,默認狀況下,數據接收不會使用磁盤,而是直接使用內存中的數據。

看來,Spark Streaming應用程序啓動後,本身會啓動一些Job。默認啓動了一個Job來接收數據,爲後續處理作準備。

重要啓示: 一個Spark應用程序中能夠啓動不少Job,而這些不一樣的Job之間能夠相互配合。這一認識爲咱們寫複雜Spark程序奠基了良好的基礎。
 
Job 2:看Details能夠發現有咱們程序的主要業務邏輯,體如今Stag 三、Stag四、Stag 5中。

咱們看Stag三、Stage4的詳情,能夠知道這2個Stage都是用4個Executor執行的。全部數據處理是在4臺機器上進行的。

Stag 5只在Worker4上。這是由於這個Stage有Shuffle操做。


Job3:有Stage 六、Stage 七、Stage 8。其中Stage 六、Stage 7被跳過。

看看Stage 8的Aggregated Metrics by Executor部分。能夠看到,數據處理是在4臺機器上進行的:

Job4: 也體現了咱們應用程序中的業務邏輯 。有Stage 九、Stage 十、Stage 11。其中Stage 九、Stage 10被跳過。

看看Stage 11的詳情。能夠看到,數據處理是在Worker2以外的其它3臺機器上進行的:

綜合以上的現象能夠知道,Spark Streaming的一個應用中,運行了這麼多Job,遠不是咱們從網絡博客或者書籍上看的那麼簡單。
咱們有必要經過這些現象,反過來回溯去尋根問源。不過此次暫不作深刻分析。
咱們的神奇之旅纔剛剛開始。

3 瞬間理解Spark Streaming本質

咱們先看一張圖:


 

以上的連續4個圖,分別對應如下4個段落的描述:

Spark Streaming接收Kafka、Flume、HDFS和Kinesis等各類來源的實時輸入數據,進行處理後,處理結果保存在HDFS、Databases等各類地方。
Spark Streaming接收這些實時輸入數據流,會將它們按批次劃分,而後交給Spark引擎處理,生成按照批次劃分的結果流。
Spark Streaming提供了表示連續數據流的、高度抽象的被稱爲離散流的DStream。DStream本質上表示RDD的序列。任何對DStream的操做都會轉變爲對底層RDD的操做。
Spark Streaming使用數據源產生的數據流建立DStream,也能夠在已有的DStream上使用一些操做來建立新的DStream。
 
在咱們前面的實驗中,每300秒會接收一批數據,基於這批數據會生成RDD,進而觸發Job,執行處理。
 
DStream是一個沒有邊界的集合,沒有大小的限制。
DStream表明了時空的概念。隨着時間的推移,裏面不斷產生RDD。
鎖定到時間片後,就是空間的操做,也就是對本時間片的對應批次的數據的處理。
 
下面用實例來說解數據處理過程。
從Spark Streaming程序轉換爲Spark執行的做業的過程當中,使用了DStreamGraph。
Spark Streaming程序中通常會有若干個對DStream的操做。DStreamGraph就是由這些操做的依賴關係構成。
從程序到DStreamGraph的轉換,如如下圖例所示:

本例中,從每一個foreach開始,都會進行回溯。從後往前回溯這些操做之間的依賴關係,也就造成了DStreamGraph。
執行從DStream到RDD的轉換,也就造成了RDD Graph,以下圖所示:

空間維度肯定以後,隨着時間不斷推動,會不斷實例化RDD Graph,而後觸發Job去執行處理。
如今再去讀官方的Spark Streaming的文檔,就好理解多了。


看來咱們的學習,將從Spark Streaming的現象開始,深刻到Spark Core和Spark Streaming的本質。
 
備註: 本博客內容來源於Spark發行版本定製課程
相關文章
相關標籤/搜索