Spark Streaming編程指南

Overview

Spark Streaming屬於Spark的核心api,它支持高吞吐量、支持容錯的實時流數據處理。html

它能夠接受來自Kafka, Flume, Twitter, ZeroMQ和TCP Socket的數據源,使用簡單的api函數好比 mapreducejoinwindow等操做,還能夠直接使用內置的機器學習算法、圖算法包來處理數據。node

 

它的工做流程像下面的圖所示同樣,接受到實時數據後,給數據分批次,而後傳給Spark Engine處理最後生成該批次的結果。git

它支持的數據流叫Dstream,直接支持Kafka、Flume的數據源。Dstream是一種連續的RDDs,下面是一個例子幫助你們理解Dstream。
github

A Quick Example

 

// 建立StreamingContext,1秒一個批次
val ssc = new StreamingContext(sparkConf, Seconds(1));
// 得到一個DStream負責鏈接 監聽端口:地址 val lines = ssc.socketTextStream(serverIP, serverPort);
// 對每一行數據執行Split操做 val words = lines.flatMap(_.split(" ")); // 統計word的數量 val pairs = words.map(word => (word, 1)); val wordCounts = pairs.reduceByKey(_ + _); // 輸出結果 wordCounts.print(); ssc.start(); // 開始 ssc.awaitTermination(); // 計算完畢退出

具體的代碼能夠訪問這個頁面:算法

https://github.com/apache/incubator-spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scalaapache

若是已經裝好Spark的朋友,咱們能夠經過下面的例子試試。windows

首先,啓動Netcat,這個工具在Unix-like的系統都存在,是個簡易的數據服務器。api

使用下面這句命令來啓動Netcat:服務器

$ nc -lk 9999

接着啓動example網絡

$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999

在Netcat這端輸入hello world,看Spark這邊的

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world

...
# TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount

$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...

 

Basics

下面這塊是如何編寫代碼的啦,哇咔咔!

首先咱們要在SBT或者Maven工程添加如下信息:

groupId = org.apache.spark
artifactId = spark-streaming_2.10
version = 0.9.0-incubating
//須要使用一下數據源的,還要添加相應的依賴
Source Artifact Kafka spark
-streaming-kafka_2.10 Flume spark-streaming-flume_2.10 Twitter spark-streaming-twitter_2.10 ZeroMQ spark-streaming-zeromq_2.10 MQTT spark-streaming-mqtt_2.10

 

接着就是實例化

new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])

這是以前的例子對DStream的操做。

 

Input Sources

除了sockets以外,咱們還能夠這樣建立Dstream

streamingContext.fileStream(dataDirectory)

 

這裏有3個要點:

(1)dataDirectory下的文件格式都是同樣

(2)在這個目錄下建立文件都是經過移動或者重命名的方式建立的

(3)一旦文件進去以後就不能再改變

假設咱們要建立一個Kafka的Dstream。

import org.apache.spark.streaming.kafka._
KafkaUtils.createStream(streamingContext, kafkaParams, ...)

 

若是咱們須要自定義流的receiver,能夠查看https://spark.incubator.apache.org/docs/latest/streaming-custom-receivers.html

Operations

對於Dstream,咱們能夠進行兩種操做,transformations 和 output 

Transformations

Transformation                          Meaning
map(func)                        對每個元素執行func方法
flatMap(func)                    相似map函數,可是能夠map到0+個輸出
filter(func)                     過濾
repartition(numPartitions)       增長分區,提升並行度     
union(otherStream)               合併兩個流
count()                    統計元素的個數
reduce(func)                     對RDDs裏面的元素進行聚合操做,2個輸入參數,1個輸出參數
countByValue()                   針對類型統計,當一個Dstream的元素的類型是K的時候,調用它會返回一個新的Dstream,包含<K,Long>鍵值對,Long是每一個K出現的頻率。
reduceByKey(func, [numTasks])    對於一個(K, V)類型的Dstream,爲每一個key,執行func函數,默認是local是2個線程,cluster是8個線程,也能夠指定numTasks 
join(otherStream, [numTasks])    把(K, V)和(K, W)的Dstream鏈接成一個(K, (V, W))的新Dstream 
cogroup(otherStream, [numTasks]) 把(K, V)和(K, W)的Dstream鏈接成一個(K, Seq[V], Seq[W])的新Dstream 
transform(func)                  轉換操做,把原來的RDD經過func轉換成一個新的RDD
updateStateByKey(func) 針對key使用func來更新狀態和值,能夠將state該爲任何值

UpdateStateByKey Operation

使用這個操做,咱們是但願保存它狀態的信息,而後持續的更新它,使用它有兩個步驟:

(1)定義狀態,這個狀態能夠是任意的數據類型

(2)定義狀態更新函數,從前一個狀態更改新的狀態

下面展現一個例子:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

 

它能夠用在包含(word, 1) 的Dstream當中,好比前面展現的example

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

 

它會針對裏面的每一個word調用一下更新函數,newValues是最新的值,runningCount是以前的值。

Transform Operation

transformWith同樣,能夠對一個Dstream進行RDD->RDD操做,好比咱們要對Dstream流裏的RDD和另一個數據集進行join操做,可是Dstream的API沒有直接暴露出來,咱們就可使用transform方法來進行這個操做,下面是例子:

val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam information

val cleanedDStream = inputDStream.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
})

 

另外,咱們也能夠在裏面使用機器學習算法和圖算法。

Window Operations

先舉個例子吧,好比前面的word count的例子,咱們想要每隔10秒計算一下最近30秒的單詞總數。

咱們可使用如下語句:

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

 

這裏面提到了windows的兩個參數:

(1)window length:window的長度是30秒,最近30秒的數據

(2)slice interval:計算的時間間隔

經過這個例子,咱們大概可以窗口的意思了,按期計算滑動的數據。

下面是window的一些操做函數,仍是有點兒理解不了window的概念,Meaning就不翻譯了,直接刪掉

Transformation                                                                              Meaning
window(windowLength, slideInterval)     
countByWindow(windowLength, slideInterval)     
reduceByWindow(func, windowLength, slideInterval)     
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])     
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])    
countByValueAndWindow(windowLength, slideInterval, [numTasks])     

 

Output Operations

Output Operation                                      Meaning
print()                                 打印到控制檯
foreachRDD(func)                        對Dstream裏面的每一個RDD執行func,保存到外部系統
saveAsObjectFiles(prefix, [suffix])     保存流的內容爲SequenceFile, 文件名 : "prefix-TIME_IN_MS[.suffix]".
saveAsTextFiles(prefix, [suffix])       保存流的內容爲文本文件, 文件名 : "prefix-TIME_IN_MS[.suffix]".
saveAsHadoopFiles(prefix, [suffix])     保存流的內容爲hadoop文件, 文件名 : "prefix-TIME_IN_MS[.suffix]".

 

Persistence

 Dstream中的RDD也能夠調用persist()方法保存在內存當中,可是基於window和state的操做,reduceByWindow,reduceByKeyAndWindow,updateStateByKey它們就是隱式的保存了,系統已經幫它自動保存了。

從網絡接收的數據(such as, Kafka, Flume, sockets, etc.),默認是保存在兩個節點來實現容錯性,以序列化的方式保存在內存當中。

RDD Checkpointing

 狀態的操做是基於多個批次的數據的。它包括基於window的操做和updateStateByKey。由於狀態的操做要依賴於上一個批次的數據,因此它要根據時間,不斷累積元數據。爲了清空數據,它支持週期性的檢查點,經過把中間結果保存到hdfs上。由於檢查操做會致使保存到hdfs上的開銷,因此設置這個時間間隔,要很慎重。對於小批次的數據,好比一秒的,檢查操做會大大下降吞吐量。可是檢查的間隔太長,會致使任務變大。一般來講,5-10秒的檢查間隔時間是比較合適的。

ssc.checkpoint(hdfsPath)  //設置檢查點的保存位置
dstream.checkpoint(checkpointInterval)  //設置檢查點間隔

 

對於必須設置檢查點的Dstream,好比經過updateStateByKeyreduceByKeyAndWindow建立的Dstream,默認設置是至少10秒。

Performance Tuning

對於調優,能夠從兩個方面考慮:

(1)利用集羣資源,減小處理每一個批次的數據的時間

(2)給每一個批次的數據量的設定一個合適的大小

Level of Parallelism

像一些分佈式的操做,好比reduceByKey和reduceByKeyAndWindow,默認的8個併發線程,能夠經過對應的函數提升它的值,或者經過修改參數spark.default.parallelism來提升這個默認值。

Task Launching Overheads

經過進行的任務太多也很差,好比每秒50個,發送任務的負載就會變得很重要,很難實現壓秒級的時延了,固然能夠經過壓縮來下降批次的大小。

Setting the Right Batch Size

要使流程序能在集羣上穩定的運行,要使處理數據的速度跟上數據流入的速度。最好的方式計算這個批量的大小,咱們首先設置batch size爲5-10秒和一個很低的數據輸入速度。確實系統能跟上數據的速度的時候,咱們能夠根據經驗設置它的大小,經過查看日誌看看Total delay的多長時間。若是delay的小於batch的,那麼系統能夠穩定,若是delay一直增長,說明系統的處理速度跟不上數據的輸入速度。

24/7 Operation

Spark默認不會忘記元數據,好比生成的RDD,處理的stages,可是Spark Streaming是一個24/7的程序,它須要週期性的清理元數據,經過spark.cleaner.ttl來設置。好比我設置它爲600,當超過10分鐘的時候,Spark就會清楚全部元數據,而後持久化RDDs。可是這個屬性要在SparkContext 建立以前設置。

可是這個值是和任何的window操做綁定。Spark會要求輸入數據在過時以後必須持久化到內存當中,因此必須設置delay的值至少和最大的window操做一致,若是設置小了,就會報錯。

Monitoring

除了Spark內置的監控能力,還能夠StreamingListener這個接口來獲取批處理的時間, 查詢時延, 所有的端到端的試驗。

Memory Tuning

Spark Stream默認的序列化方式是StorageLevel.MEMORY_ONLY_SER,而不是RDD的StorageLevel.MEMORY_ONLY

默認的,全部持久化的RDD都會經過被Spark的LRU算法剔除出內存,若是設置了spark.cleaner.ttl,就會週期性的清理,可是這個參數設置要很謹慎。一個更好的方法是設置spark.streaming.unpersist爲true,這就讓Spark來計算哪些RDD須要持久化,這樣有利於提升GC的表現。

推薦使用concurrent mark-and-sweep GC,雖然這樣會下降系統的吞吐量,可是這樣有助於更穩定的進行批處理。

Fault-tolerance Properties

Failure of a Worker Node

下面有兩種失效的方式:

1.使用hdfs上的文件,由於hdfs是可靠的文件系統,因此不會有任何的數據失效。

2.若是數據來源是網絡,好比Kafka和Flume,爲了防止失效,默認是數據會保存到2個節點上,可是有一種可能性是接受數據的節點掛了,那麼數據可能會丟失,由於它還沒來得及把數據複製到另一個節點。

Failure of the Driver Node

爲了支持24/7不間斷的處理,Spark支持驅動節點失效後,從新恢復計算。Spark Streaming會週期性的寫數據到hdfs系統,就是前面的檢查點的那個目錄。驅動節點失效以後,StreamingContext能夠被恢復的。

爲了讓一個Spark Streaming程序可以被回覆,它須要作如下操做:

(1)第一次啓動的時候,建立 StreamingContext,建立全部的streams,而後調用start()方法。

(2)恢復後重啓的,必須經過檢查點的數據從新建立StreamingContext。

下面是一個實際的例子:

經過StreamingContext.getOrCreate來構造StreamingContext,能夠實現上面所說的。

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreaminContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

 

在stand-alone的部署模式下面,驅動節點失效了,也能夠自動恢復,讓別的驅動節點替代它。這個能夠在本地進行測試,在提交的時候採用supervise模式,當提交了程序以後,使用jps查看進程,看到相似DriverWrapper就殺死它,若是是使用YARN模式的話就得使用其它方式來從新啓動了。

這裏順便提一下向客戶端提交程序吧,以前總結的時候把這塊給落下了。

./bin/spark-class org.apache.spark.deploy.Client launch
   [client-options] \
   <cluster-url> <application-jar-url> <main-class> \
   [application-options]

cluster-url: master的地址.
application-jar-url: jar包的地址,最好是hdfs上的,帶上hdfs://...不然要全部的節點的目錄下都有這個jar的 
main-class: 要發佈的程序的main函數所在類.
Client Options:
--memory <count> (驅動程序的內存,單位是MB)
--cores <count> (爲你的驅動程序分配多少個核心)
--supervise (節點失效的時候,是否從新啓動應用)
--verbose (打印增量的日誌輸出)

 

在將來的版本,會支持全部的數據源的可恢復性。

爲了更好的理解基於HDFS的驅動節點失效恢復,下面用一個簡單的例子來講明:

Time     Number of lines in input file     Output without driver failure     Output with driver failure
1      10                     10                    10
2      20                     20                    20
3      30                     30                    30
4      40                     40                    [DRIVER FAILS] no output
5      50                     50                    no output
6      60                     60                    no output
7      70                     70                    [DRIVER RECOVERS] 40, 50, 60, 70
8      80                     80                    80
9      90                     90                    90
10     100                     100                   100

 

 

在4的時候出現了錯誤,40,50,60都沒有輸出,到70的時候恢復了,恢復以後把以前沒輸出的一會兒所有輸出。

相關文章
相關標籤/搜索