大數據分析技術與實戰之Spark Streaming

在一些特定的領域中(例如金融、災害預警等),時間就是金錢、時間可能就是生命!然而傳統的批處理框架卻一直難以知足這些領域中的實時性需求。爲此,涌現出了一批如S四、Storm的流式計算框架。Spark是基於內存的大數據綜合處理引擎,具備優秀的做業調度機制和快速的分佈式計算能力,使其可以更加高效地進行迭代計算,所以Spark可以在必定程度上實現大數據的流式處理html

Spark Streaming是Spark上的一個流式處理框架,能夠面向海量數據實現高吞吐量、高容錯的實時計算。Spark Streaming支持多種類型數據源,包括Kafka、Flume、trwitter、zeroMQ、Kinesis以及TCP sockets等,如圖1所示。Spark Streaming實時接收數據流,並按照必定的時間間隔將連續的數據流拆分紅一批批離散的數據集;而後應用諸如map、reducluce、join和window等豐富的API進行復雜的數據處理;最後提交給Spark引擎進行運算,獲得批量結果數據,所以其也被稱爲準實時處理系統。java

 

圖1 Spark Streaming支持多種類型數據源程序員

目前應用最普遍的大數據流式處理框架是Storm。Spark Streaming 最低0.5~2s作一次處理(而Storm最快可達0.1s),在實時性和容錯方面不如Storm。然而Spark Streaming的集成性很是好,經過RDD不只可以與Spark上的全部組件無縫銜接共享數據,還能很是容易地與Kafka、Flume等分佈式日誌收集框架進行集成;同時Spark Streaming的吞吐量很是高,遠遠優於Storm的吞吐量,如圖2所示。因此雖然Spark Streaming的處理延遲高於Storm,可是在集成性與吞吐量方面的優點使其更適用於大數據背景。面試

 

圖2 Spark Streaming與Storm吞吐量比較圖apache

Spark Streaming基礎概念編程

批處理時間間隔網絡

在Spark Streaming中,對數據的採集是實時、逐條進行的,可是對數據的處理倒是分批進行的。所以,Spark Streaming須要設定一個時間間隔,將該時間間隔內採集到的數據統一進行處理,這個間隔稱爲批處理時間間隔。架構

也就是說對於源源不斷的數據,Spark Streaming是經過切分的方式,先將連續的數據流進行離散化處理。數據流每被切分一次,對應生成一個RDD,每一個RDD都包含了一個時間間隔內所獲取到的全部數據,所以數據流被轉換爲由若干個RDD構成的有序集合,而批處理時間間隔決定了Spark Streaming須要多久對數據流切分一次。Spark Streaming是Spark上的組件,其獲取的數據和數據上的操做最終仍以Spark做業的形式在底層的Spark內核中進行計算,所以批處理時間間隔不只影響數據處理的吞吐量,同時也決定了Spark Streaming向Spark提交做業的頻率和數據處理的延遲。須要注意的是,批處理時間間隔的設置會伴隨Spark Streaming應用程序的整個生命週期,沒法在程序運行期間動態修改,因此須要綜合考慮實際應用場景中的數據流特色和集羣的處理性能等多種因素進行設定。框架

窗口時間間隔dom

窗口時間間隔又稱爲窗口長度,它是一個抽象的時間概念,決定了Spark Streaming對RDD序列進行處理的範圍與粒度,即用戶能夠經過設置窗口長度來對必定時間範圍內的數據進行統計和分析。若是設批處理時間設爲1s,窗口時間間隔爲3s,如3圖所示,其中每一個實心矩形表示Spark Streaming每1秒鐘切分出的一個RDD,若干個實心矩形塊表示一個以時間爲序的RDD序列,而透明矩形框表示窗口時間間隔。易知窗口內RDD的數量最多爲3個,即Spark Streming 每次最多對3個RDD中的數據進行統計和分析。對於窗口時間間隔還須要注意如下幾點:

• 以圖3爲例,在系統啓動後的前3s內,因進入窗口的RDD不足3個,可是隨着時間的推移,最終窗口將被填滿。

• 不一樣窗口內所包含的RDD可能會有重疊,即當前窗口內的數據可能被其後續若干個窗口所包含,所以在一些應用場景中,對於已經處理過的數據不能當即刪除,以備後續計算使用。

• 窗口時間間隔必須是批處理時間間隔的整數倍。

 

圖3 窗口時間間隔示意圖

滑動時間間隔

滑動時間間隔決定了Spark Streaming對數據進行統計與分析的頻率,多出如今與窗口相關的操做中。滑動時間間隔是基於批處理時間間隔提出的,其必須是批處理時間間隔的整數倍。在默認的狀況下滑動時間間隔設置爲與批處理時間間隔相同的值。若是批處理時間間隔爲1s,窗口間隔爲3s,滑動時間間隔爲2s,如圖4所示,其含義是每隔2s對過去3s內產生的3個RDD進行統計分析。

 

圖4 滑動時間間隔、窗口時間間隔、批處理時間間隔綜合示意圖

DStream基本概念

DStream是Spark Streaming的一個基本抽象,它以離散化的RDD序列的形式近似描述了連續的數據流。DStream本質上是一個以時間爲鍵,RDD爲值的哈希表,保存了按時間順序產生的RDD,而每一個RDD封裝了批處理時間間隔內獲取到的數據。Spark Streaming每次將新產生的RDD添加到哈希表中,而對於已經再也不須要的RDD則會從這個哈希表中刪除,因此DStream也能夠簡單地理解爲以時間爲鍵的RDD的動態序列。設批處理時間間隔爲1s,圖5爲4s內產生的DStream示意圖。

 

圖5 DStream示意圖

Spark Streaming編程模式與案例分析

Spark Streaming編程模式

下面以Spark Streaming官方提供的WordCount代碼爲例來介紹Spark Streaming的使用方式。

示例1:

import org.apache.spark._

import org.apache.spark.streaming._

import org.apache.spark.streaming.StreamingContext._

/*建立一個本地模式的StreamingContext,並設定master節點工做線程數爲2,並以1秒做爲批處理時間間隔。*/

val conf = new SparkConf().setMaster("local[2]").

setAppName("NetworkWordCount")

val ssc = new StreamingContext(conf, Seconds(1))

/*經過獲取」localhost」節點9999端口中的實時數據流建立DStream。*/

val lines = ssc.socketTextStream("localhost", 9999)

/*以空格做爲分割DStream中數據的依據,使得每一行文本轉換爲若干個單詞。*/

val words = lines.flatMap(_.split(" "))

import org.apache.spark.streaming.StreamingContext._

/*對於words中的每一個單詞word,轉換爲相應的二元組形式(word,1),在此基礎上統計每一個單詞出現的次數。*/

val pairs = words.map(word => (word, 1))

val wordCounts = pairs.reduceByKey(_ + _)

//輸出DStream中每一個RDD中前10個元素。

wordCounts.print()

//啓動Spark Streaming應用程序。

ssc.start()

//等待計算完成。

ssc.awaitTermination()

Spark Streaming應用程序在功能結構上一般包含如下五部分,如上述示例1所示。

• 導入Spark Streaming相關包:Spark Streaming做爲Spark框架上的一個組件,具備很好的集成性。在開發Spark Streaming應用程序時,只需導入Spark Streaming相關包,無需額外的參數配置。

• 建立StreamingContext對象:同Spark應用程序中的SparkContext對象同樣, StreamingContext對象是Spark Streaming應用程序與集羣進行交互的惟一通道,其中封裝了Spark集羣的環境信息和應用程序的一些屬性信息。在該對象中一般須要指明應用程序的運行模式(示例1中設爲local[2])、設定應用程序名稱(示例1中設爲NetworkWordCount)、設定批處理時間間隔(示例1中設爲Seconds(1)即1秒鐘),其中批處理時間間隔須要根據用戶的需求和集羣的處理能力進行適當地設置。

• 建立InputDStream:Spark Streaming須要根據數據源類型選擇相應的建立DStream的方法。示例1中Spark Streaming經過StreamingContext對象調用socketTextStream方法處理以socket鏈接類型數據源,建立出DStream即lines。Spark Streaming同時支持多種不一樣的數據源類型,其中包括Kafka、Flume、HDFS/S三、Kinesis和Twitter等數據源。

• 操做DStream:對於從數據源獲得的DStream,用戶能夠調用豐富的操做對其進行處理。示例1中針對lines的一系列操做就是一個典型的WordCount執行流程:對於當前批處理時間間隔內的文本數據以空格進行切分,進而獲得words;再將words中每一個單詞轉換爲二元組,進而獲得pairs;最後利用reduceByKey方法進行統計。

• 啓動與中止Spark Streaming應用程序:在啓動Spark Streaming應用程序以前,DStream上全部的操做僅僅是定義了數據的處理流程,程序並無真正鏈接上數據源,也沒有對數據進行任何操做,當ssc.start()啓動後程序中定義的操做纔會真正開始執行。

文本文件數據處理案例

功能需求

實時監聽並獲取本地home/dong/Streamingtext目錄中新生成的文件(文件均爲英文文本文件,單詞之間使用空格進行間隔),並對文件中各單詞出現的次數進行統計。

代碼實現

package dong.spark

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds,StreamingContext}

import org.apache.spark.streaming.StreamingContext._

object StreamingFileWordCount {

def main(args: Array[String]): Unit ={

//以local模式運行,並設定master節點工做線程數爲2。

val sparkConf = new SparkConf().

setAppName("StreamingFileWordCount").

setMaster("local[2]")

/*建立StreamingContext實例,設定批處理時間間隔爲20秒。*/

val ssc = new StreamingContext(sparkConf,Seconds(20))

/*指定數據源來自本地home/dong/Streamingtext。*/

val lines = ssc.textFileStream("/home/dong/Streamingtext")

/*在每一個批處理時間間隔內,對指定文件夾中變化的數據進行單詞統計並打印。*/

val words= lines.flatMap(_.split(" "))

val wordcounts=words.map(x=>(x,1)).reduceByKey(_+_)

wordcounts.print()

ssc.start()

ssc.awaitTermination()

}

}

運行演示

第1步,啓動Hadoop與Spark。

$ start-all.sh

$ cd spark-1.4.0-bin-hadoop2.4

$ sbin/start-all.sh

第2步,建立Streaming監控目錄。

$ mkdir /home/dong/Streamingtext

在dong用戶主目錄下建立Streamingtext爲Spark Streaming監控的目錄,建立後如圖6所示。

 

圖6 dong用戶主目錄下建立Streamingtext文件夾

第3步,在IntelliJ IDEA中編輯運行Streaming程序。在IntelliJ IDEA中建立工程StreamingFileWordCount,編輯對象StreamingFileWordCount,如圖7所示。

 

圖7 IntelliJ IDEA中StreamingFileWordCount示意圖

因爲該示例沒有輸入參數,所以不須要配置參數,可直接單擊右鍵->單擊"Run‘StreamingFileWordCount’ "。

第4步,在監聽目錄下建立文本文件。在master節點上的/home/dong/Streamingtext中分別建立file1.txt與file2.txt。

file1.txt內容以下:

aa

bb

file2.txt內容以下:

ee

dd

cc

建立後,/home/dong/Streamingtext中內容如圖8所示。

 

圖8 Streamingtext文件夾內容示意圖

查看結果

終端窗口輸出了每一個批處理時間間隔(20秒)內,/home/dong/Streamingtext中新生成文件所包含的各單詞個數,如圖9所示。

 

圖9 StreamingFileWordCount運行結果示意圖

網絡數據處理案例

功能需求

監聽本地節點指定端口傳輸的數據流(本案例爲master節點9999端口的英文文本數據,以逗號間隔單詞),每5秒統計一次該時間間隔內收集到的各單詞的個數。

代碼實現

本案例涉及數據流模擬器和分析器兩部分。爲了更接近真實的網絡環境,首先定義數據流模擬器,該模擬器以Socket方式監聽網絡中指定節點上的指定端口號(master節點9999端口),當外部程序經過該端口鏈接並請求數據時,數據流模擬器將定時地從指定文本文件中隨機選取數據發送至指定端口(每間隔1秒鐘數據流模擬器從master節點上的/home/dong/Streamingtext/file1.txt中隨機截取一行文本發送給master節點的9999端口),經過這種方式模擬網絡環境下源源不斷的數據流。針對獲取到的實時數據,再定義分析器(Spark Streaming應用程序),用以統計時間間隔(5秒)內收集到的單詞個數。

數據流模擬器代碼實現以下:

package dong.spark

import java.io.{PrintWriter}

import java.net.ServerSocket

import scala.io.Source

objectSocketSimulation {

//定義隨機獲取整數的方法。

def index(length: Int)={

import java.util.Random

val rdm = new Random

rdm.nextInt(length)

}

def main(args:Array[String]): Unit ={

if(args.length!=3){

/*調用數據流模擬器須要三個參數:文件路徑、端口號和批處理時間間隔時間(單位:毫秒)。*/

System.err.println("Usage:")

System.exit(1)

}

//獲取指定文件總的行數。

val filename = args(0)

val lines = Source.fromFile(filename).getLines().toList

val filerow=lines.length

//指定監聽參數args(1)指定的端口,當外部程序請求時創建鏈接。

val listener =new ServerSocket(args(1).toInt)

while(true){

val socket = listener.accept()

new Thread(){

override def run={

println("Got client connected from:"+socket.getInetAddress)

val out = new PrintWriter(socket.getOutputStream(),true)

while(true){

Thread.sleep(args(2).toLong)

//當該端口接受請求時,隨機獲取某行數據發送給對方。

val content= lines(index(filerow))

println(content)

out.write(content+'\n')

out.flush()

}

socket.close()

}

}.start()

}

}

}

分析器代碼以下:

package dong.spark

import org.apache.spark.streaming.{Milliseconds,Seconds, StreamingContext}

import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.storage.StorageLevel

object NetworkWordCount {

def main (args:Array[String]) ={

//以local模式運行,並設定master節點工做線程數爲2。

val conf=new SparkConf().setAppName("NetworkWordCount").

setMaster("local[2]")

val sc=new SparkContext(conf)

val ssc=new StreamingContext(sc, Seconds(5))

/*經過socketTextStream獲取指定節點指定端口的數據建立DStream,並保存在內存和硬盤中,其中節點與端口分別對應參數args(0)和args(1)。*/

val lines=ssc.socketTextStream(args(0),

args(1).toInt,

StorageLevel.MEMORY_AND_DISK_SER)

//在每一個批處理時間間隔內對獲取到的數據進行單詞統計而且打印。

val words= lines.flatMap(_.split(","))

val wordcounts = words.map(x=>(x,1)).reduceByKey(_+_)

wordcounts.print()

ssc.start()

ssc.awaitTermination()

}

}

運行演示

第1步,在IntelliJ IDEA中編輯運行Streaming程序。master節點啓動IntelliJ IDEA,建立工程NetworkWordCount,編輯模擬器與分析器。模擬器如圖10所示,分析器如圖11所示。

 

圖10 IntelliJ IDEA中數據流模擬器示意圖

 

圖11 IntelliJ IDEA中分析器示意圖

第2步,建立模擬器數據源文件。在master節點建立/home/dong/Streamingtext目錄,在其中建立文本文件file1.txt。

file1.txt內容以下:

spark,

hello,

hbase,

world,

第3步,打包數據流模擬器。打包過程詳見本書4.3.3節。在Artifacts打包配置界面中,根據用戶實際scala安裝目錄,在Class Path中添加下述scala依賴包,如圖12所示。

/usr/scala-2.10.4/lib/scala-swing.jar

/usr/scala-2.10.4/lib/scala-library.jar

/usr/scala-2.10.4/lib/scala-actors.jar

 

圖12 在Class Path中添加scala依賴包

打包後在主目錄下生成NetworkWordCount.jar,如圖13所示。

圖13 在dong用戶主目錄下生成NetworkWordCount.jar示意圖

第4步,啓動數據流模擬器。在master節點開啓控制終端,經過下面代碼啓動數據流模擬器。

$ java -cp /home/dong/NetworkWordCount.jar dong.spark.SocketSimulation/ home/dong/Streamingtest/file1.txt 9999 1000

數據流模擬器每間隔1000毫秒從/home/dong/Streamingtext/file1.txt中隨機截取一行文本發送給master節點的9999端口。在分析器未鏈接時,數據流模擬器處於阻塞狀態,終端不會顯示輸出的文本。

第5步,運行分析器。在master上啓動IntelliJ IDEA編寫分析器代碼,而後單擊菜單"Build->"Build Artifacts",經過Application選項配置分析器運行所需的參數,其中Socket主機名爲master、端口號爲9999,參數之間用空格間隔,如圖13所示。

 

圖13 分析器參數配置示意圖

配置好參數後返回IntelliJ IDEA菜單欄,單擊"Run"->"Build Artifacts"運行分析器。

查看結果

第1步,在master上查看數據流模擬器運行狀況。IntelliJ IDEA運行分析器從而與數據流模擬器創建鏈接。當檢測到外部鏈接時,數據流模擬器將每隔1000毫秒從/home/dong/Streamingtext/file1.txt中隨機截取一行文本發送給master節點上的9999端口。爲方便講解和說明,file1.txt中每一行只包含一個單詞,所以數據流模擬器每次僅發送一個單詞給端口,如圖14所示。

 

圖14 在master上模擬器運行結果

第2步,在master的IntelliJ IDEA中查看分析器運行狀況。在IntelliJ IDEA的運行日誌窗口中,能夠觀察到統計結果。經過分析可知Spark Streaming每一個批處理時間間隔內獲取的單詞數爲5,恰好是5秒內發送單詞的總數,並對各單詞進行了統計,如圖15所示。

 

圖15 IntelliJ IDEA中分析器運行結果

stateful應用案例

在不少數據流相關的實際應用場景中,對當前數據的統計分析須要藉助於先前的數據處理結果完成。例如電商每間隔10分鐘統計某一商品當前累計銷售總額、車站每隔3小時統計當前客流總量,等等。此類應用需求可藉助於Spark Streaming的有狀態轉換操做實現。

功能需求

監聽網絡中某節點上指定端口傳輸的數據流(slave1節點9999端口的英文文本數據,以逗號間隔單詞),每5秒分別統計各單詞的累計出現次數。

代碼實現

本案例功能的實現涉及數據流模擬器和分析器兩部分。

分析器代碼:

package dong.spark

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.streaming.{Milliseconds,Seconds, StreamingContext}

import org.apache.spark.streaming.StreamingContext._

object StatefulWordCount {

def main(args:Array[String]): Unit ={

/*定義更新狀態方法,參數values爲當前批處理時間間隔內各單詞出現的次數,state爲以往全部批次各單詞累計出現次數。*/

val updateFunc=(values: Seq[Int],state:Option[Int])=>{

val currentCount=values.foldLeft(0)(_+_)

val previousCount=state.getOrElse(0)

Some(currentCount+previousCount)

}

val conf=new SparkConf().

setAppName("StatefulWordCount").

setMaster("spark://192.168.149.132:7077")

val sc=new SparkContext(conf)

//建立StreamingContext,Spark Steaming運行時間間隔爲5秒。

val ssc=new StreamingContext(sc, Seconds(5))

/*使用updateStateByKey時須要checkpoint持久化接收到的數據。在集羣模式下運行時,須要將持久化目錄設爲HDFS上的目錄。*/

ssc.checkpoint("hdfs://master:9000/user/dong/input/StatefulWordCountlog")

/*經過Socket獲取指定節點指定端口的數據建立DStream,其中節點與端口分別由參數args(0)和args(1)給出。*/

val lines=ssc.socketTextStream(args(0),args(1).toInt)

val words=lines.flatMap(_.split(","))

val wordcounts=words.map(x=>(x,1))

//使用updateStateByKey來更新狀態,統計從運行開始以來單詞總的次數。

val stateDstream=wordcounts.updateStateByKey[Int](updateFunc)

stateDstream.print()

ssc.start()

ssc.awaitTermination()

}

}

運行演示

第1步,slave1節點啓動數據流模擬器。

第2步,打包分析器。master節點啓動IntelliJ IDEA建立工程StatefulWordCount編輯分析器,如圖16所示,並將分析器直接打包至master節點dong用戶的主目錄下,如圖17所示。

 

圖16 IntelliJ IDEA中StatefulWordCount示意圖

 

圖17 master上的StatefulWordCount.jar示意圖

第3步,運行分析器。在master節點開啓終端,經過下面代碼向Spark集羣提交應用程序。

$ bin/spark-submit ~/StatefulWordCount.jar slave1 9999

查看結果

第1步,查看slave1上數據流模擬器運行狀況。分析器在集羣上提交運行後與slave1上運行的數據流模擬器創建鏈接。當檢測到外部鏈接時,數據流模擬器將每隔1000毫秒從/home/dong/Streamingtext/file1.txt中隨機截取一行文本發送給slave1節點上的9999端口。因爲該文本文件中每一行只包含一個單詞,所以每秒僅發送一個單詞給端口。如圖18所示。

 

圖18 slave1上數據流模擬器運行示意圖

第2步,查看master上分析器運行狀況。在master節點的提交窗口中能夠查看到統計結果,如圖19所示。

圖19 master上分析器運行示意圖

圖中代表截至147920770500ms分析器共接收到14個單詞,其中"spark"累計出現3次,"hbase"累計出現5次,"hello"累計出現3次,"world"累計出現3次。因爲批處理時間間隔是5s,模擬器每1秒發送1個單詞,使得分析器在5s內共接收到5個單詞,所以截止至147920771000ms,分析器共收到19個單詞,其中"spark"累計出現5次,"hbase"累計出現7次,"hello"累計出現4次,"world"累計出現3次。

第3步,查看HDFS中持久化目錄。運行後查看HDFS上的持久化目錄/user/dong/input/StatefulWordCountlog,如圖20所示。Streaming應用程序將接收到的網絡數據持久化至該目錄下,便於容錯處理。

圖20 HDFS上持久化目錄示意圖

window應用案例

在實際生產環境中,與窗口相關的應用場景很常見,例如電商每間隔10分鐘小時統計某一商品前30分鐘內累計銷售總額、車站每隔1小時統計前3個小時內的客流量等,此類需求可藉助Spark Streaming中的window相關操做實現。window應用案例同時涉及批處理時間間隔、窗口時間間隔與滑動時間間隔。

功能需求

監聽網絡中某節點上指定端口傳輸的數據流(slave1節點上9999端口的英文文本數據,以逗號間隔單詞),每10秒統計前30秒各單詞累計出現的次數。

代碼實現

本例功能的實現涉及數據流模擬器和分析器兩部分。

分析器代碼:

package dong.spark

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.streaming._

import org.apache.spark.storage.StorageLevel

object WindowWordCount {

def main(args:Array[String]) ={

val conf=new SparkConf().setAppName("WindowWordCount").

setMaster("spark://192.168.149.132:7077")

val sc=new SparkContext(conf)

val ssc=new StreamingContext(sc, Seconds(5))

ssc.checkpoint("hdfs://master:9000/user/dong/WindowWordCountlog")

val lines=ssc.socketTextStream( args(0),

args(1).toInt,

StorageLevel.MEMORY_ONLY_SER)

val words= lines.flatMap(_.split(","))

/*採用reduceByKeyAndWindow操做進行疊加處理,窗口時間間隔與滑動時間間隔分別由參數args(2)和args(3)給出。*/

val wordcounts=words.map(x=>(x,1)).

reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(ar

gs(2).toInt),Seconds(args(3).toInt))

wordcounts.print()

ssc.start()

ssc.awaitTermination()

}

}

運行演示

第1步,slave1節點啓動數據流模擬器。

第2步,打包分析器。在master節點啓動IntelliJ IDEA建立工程WindowWordCount編輯分析器,如圖21,並將分析器直接打包至master節點dong用戶的主目錄下,如圖22所示。

 

圖21 IntelliJ IDEA中WindowWordCount示意圖

圖22 master上WindowWordCount.jar示意圖

第3步,運行分析器。在master節點開啓終端,經過下面代碼向Spark集羣提交應用程序。

$ bin/spark-submit ~/WindowWordCount.jar slave1 9999 30 10

查看結果

第1步 在slave1上查看數據流模擬器運行狀況。分析器在集羣上提交運行後與slave1上運行的數據流模擬器創建鏈接。當檢測到外部鏈接時,數據流模擬器將每隔1000毫秒從/home/dong/Streamingtext/file1.txt中隨機截取一行文本發送給slave1節點的9999端口。因爲該文本文件中每一行只包含一個單詞和一個逗號,所以每秒僅發送一個單詞和一個逗號給端口,如圖23所示。

圖23 在slave1上數據流模擬器運行示意圖

第2步,在master上查看分析器運行狀況。在master節點的提交窗口中能夠查看到統計結果。在WindowWordCount應用程序啓動初期,窗口並無被接收到的單詞填滿,但隨着時間的推動,每一個窗口中的單詞數目最終固定爲30個。圖7.35只是截取了運行結果中的三個批次。因爲設定了窗口時間間隔是30s,滑動時間間隔是10s,且數據流模擬器每間隔1s發送一個單詞,所以WindowWordCount每間隔10s對過去30s內收到的各單詞個數進行統計。圖24中截至1479276925000ms分析器對過去30s內收到的30個單詞進行統計,其中"spark"累計出現5次,"hbase"累計出現8次,"hello"累計出現9次,"world"累計出現8次。再間隔10s,截至1479276935000ms,分析器對過去30s內收到的30個單詞進行統計,其中"spark"累計出現8次,"hbase"累計出現9次,"hello"累計出現7次,"world"累計出現6次。

 

圖24 在master上分析器運行示意圖

第3步,查看持久化數據。運行後查看HDFS上的持久化目錄/user/dong/input/WindowWordCountlog,如圖25所示。Streaming應用程序將接收到的網絡數據持久化至該目錄下,便於容錯處理。

 

圖25 HDFS上持久化目錄示意圖

性能考量

在開發Spark Streaming應用程序時,要結合集羣中各節點的配置狀況儘量地提升數據處理的實時性。在調優的過程當中,一方面要儘量利用集羣資源來減小每一個批處理的時間;另外一方面要確保接收到的數據能及時處理掉。

運行時間優化

設置合理的批處理時間和窗口大小

Spark Streaming中做業之間一般存在依賴關係,後面的做業必須確保前面的做業執行結束後才能提交,若前面的做業的執行時間超過了設置的批處理時間間隔,那麼後續的做業將沒法按時提交執行,形成做業的堵塞。也就是說若想Spark Streaming應用程序穩定地在集羣中運行,對於接收到的數據必須儘快處理掉。例如若設定批處理時間爲1秒鐘,那麼系統每1秒鐘生成一個RDD,若是系統計算一個RDD的時間大於1秒,那麼當前的RDD還沒來得及處理,後續的RDD已經提交上來在等待處理了,這就產生了堵塞。所以須要設置一個合理的批處理時間間隔以確保做業可以在這個批處理時間間隔時間內結束。許多實驗數據代表,500毫秒對大多Spark Streaming應用而言是較好的批處理時間間隔。

相似地,對於窗口操做,滑動時間間隔對於性能也有很大的影響。當單批次數據計算代價太高時,能夠考慮適當增大滑動時間間隔。

對於批處理時間和窗口大小的設定,並無統一的標準。一般是先從一個比較大的批處理時間(10秒左右)開始,而後不斷地使用更小的值進行對比測試。若是Spark Streaming用戶界面中顯示的處理時間保持不變,則能夠進一步設定更小的值;若是處理時間開始增長,則可能已經達到了應用的極限,再減少該值則可能會影響系統的性能。

提升並行度

提升並行度也是一種減小批處理所消耗時間的常見方法。有如下三種方式能夠提升並行度。一種方法是增長接收器數目。若是獲取的數據太多,則可能致使單個節點來不及對數據進行讀入與分發,使得接收器成爲系統瓶頸。這時能夠經過建立多個輸入DStream來增長接收器數目,而後再使用union來把數據合併爲一個數據源。第二種方法是將收到的數據顯式地從新分區。若是接收器數目沒法再增長,能夠經過使用DStream.repartition、spark.streaming.blocklnterval等參數顯式地對Dstream進行從新分區。第三種方法是提升聚合計算的並行度。對於會致使shuffle的操做,例如reduceByKey、reduceByKeyAndWindow等操做,可經過顯示設置更高的行度參數確保更爲充分地使用集羣資源。

內存使用與垃圾回收

控制批處理時間間隔內的數據量

Spark Streaming會把批處理時間間隔內獲取到的全部數據存放在Spark內部可用的內存中。所以必須確保在當前節點上SparkStreaming可用的內存容量至少能容下一個批處理時間間隔內全部的數據。好比一個批處理時間間隔是1秒,可是1秒產生了1GB的數據,那麼要確保當前的節點上至少有可供SparkStreaming使用的1GB內存。

及時清理再也不使用的數據

對於內存中處理過的、再也不須要的數據應及時清理,以確保Spark Streaming可以擁有足夠的內存空間可使用。一種方法是能夠經過設置合理的spark.cleaner.ttl時長來及時清理超時的無用數據,但該方法應慎重使用,以避免後續數據在須要時被錯誤清理。另外一種方法是將spark.streaming.unpersist設置爲true,系統將自動清理已經不須要的RDD。該方法能顯著減小RDD對內存的須要,同時潛在地提升GC的性能。此外用戶還能夠經過配置參數streamingContext.remember爲數據設置更長的保留時間。

減小序列化與反序列化的負

SparkStreaming默認將接收到的數據序列化後放入內存,以減小內存使用。序列化和反序列化須要更多的CPU資源,所以使用適當的序列化工具(例如Kryo)和自定義的序列化接口能夠更高效地使用CPU。除了使用更好的序列化工具外還能夠結合壓縮機制,經過配置spark.rdd.compress,以CPU的時間開銷來換取內存資源,下降GC開銷。

結語

爲了幫助你們讓學習變得輕鬆、高效,給你們免費分享一大批資料,幫助你們在成爲大數據工程師,乃至架構師的路上披荊斬棘。在這裏給你們推薦一個大數據學習交流圈:

658558542   

歡迎你們進羣交流討論,學習交流,共同進步。

當真正開始學習的時候不免不知道從哪入手,致使效率低下影響繼續學習的信心。

但最重要的是不知道哪些技術須要重點掌握,學習時頻繁踩坑,最終浪費大量時間,因此有有效資源仍是頗有必要的。

最後祝福全部遇到瓶疾且不知道怎麼辦的大數據程序員們,祝福你們在日後的工做與面試中一切順利。

相關文章
相關標籤/搜索