Spark Streaming屬於Spark的核心api,它支持高吞吐量、支持容錯的實時流數據處理。html
它能夠接受來自Kafka, Flume, Twitter, ZeroMQ和TCP Socket的數據源,使用簡單的api函數好比 map
, reduce
, join
, window等操做,還能夠直接使用內置的機器學習算法、圖算法包來處理數據。
node
它的工做流程像下面的圖所示同樣,接受到實時數據後,給數據分批次,而後傳給Spark Engine處理最後生成該批次的結果。git
它支持的數據流叫Dstream,直接支持Kafka、Flume的數據源。Dstream是一種連續的RDDs,下面是一個例子幫助你們理解Dstream。
github
// 建立StreamingContext,1秒一個批次
val ssc = new StreamingContext(sparkConf, Seconds(1));
// 得到一個DStream負責鏈接 監聽端口:地址 val lines = ssc.socketTextStream(serverIP, serverPort);
// 對每一行數據執行Split操做 val words = lines.flatMap(_.split(" ")); // 統計word的數量 val pairs = words.map(word => (word, 1)); val wordCounts = pairs.reduceByKey(_ + _); // 輸出結果 wordCounts.print(); ssc.start(); // 開始 ssc.awaitTermination(); // 計算完畢退出
具體的代碼能夠訪問這個頁面:算法
https://github.com/apache/incubator-spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scalaapache
若是已經裝好Spark的朋友,咱們能夠經過下面的例子試試。windows
首先,啓動Netcat,這個工具在Unix-like的系統都存在,是個簡易的數據服務器。api
使用下面這句命令來啓動Netcat:服務器
$ nc -lk 9999
接着啓動example網絡
$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
在Netcat這端輸入hello world,看Spark這邊的
# TERMINAL 1: # Running Netcat $ nc -lk 9999 hello world ... # TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount $ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999 ... ------------------------------------------- Time: 1357008430000 ms ------------------------------------------- (hello,1) (world,1) ...
下面這塊是如何編寫代碼的啦,哇咔咔!
首先咱們要在SBT或者Maven工程添加如下信息:
groupId = org.apache.spark artifactId = spark-streaming_2.10 version = 0.9.0-incubating
//須要使用一下數據源的,還要添加相應的依賴
Source Artifact Kafka spark-streaming-kafka_2.10 Flume spark-streaming-flume_2.10 Twitter spark-streaming-twitter_2.10 ZeroMQ spark-streaming-zeromq_2.10 MQTT spark-streaming-mqtt_2.10
接着就是實例化
new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
這是以前的例子對DStream的操做。
除了sockets以外,咱們還能夠這樣建立Dstream
streamingContext.fileStream(dataDirectory)
這裏有3個要點:
(1)dataDirectory下的文件格式都是同樣
(2)在這個目錄下建立文件都是經過移動或者重命名的方式建立的
(3)一旦文件進去以後就不能再改變
假設咱們要建立一個Kafka的Dstream。
import org.apache.spark.streaming.kafka._ KafkaUtils.createStream(streamingContext, kafkaParams, ...)
若是咱們須要自定義流的receiver,能夠查看https://spark.incubator.apache.org/docs/latest/streaming-custom-receivers.html
對於Dstream,咱們能夠進行兩種操做,transformations 和 output
Transformation Meaning map(func) 對每個元素執行func方法 flatMap(func) 相似map函數,可是能夠map到0+個輸出 filter(func) 過濾 repartition(numPartitions) 增長分區,提升並行度 union(otherStream) 合併兩個流 count() 統計元素的個數 reduce(func) 對RDDs裏面的元素進行聚合操做,2個輸入參數,1個輸出參數 countByValue() 針對類型統計,當一個Dstream的元素的類型是K的時候,調用它會返回一個新的Dstream,包含<K,Long>鍵值對,Long是每一個K出現的頻率。 reduceByKey(func, [numTasks]) 對於一個(K, V)類型的Dstream,爲每一個key,執行func函數,默認是local是2個線程,cluster是8個線程,也能夠指定numTasks join(otherStream, [numTasks]) 把(K, V)和(K, W)的Dstream鏈接成一個(K, (V, W))的新Dstream cogroup(otherStream, [numTasks]) 把(K, V)和(K, W)的Dstream鏈接成一個(K, Seq[V], Seq[W])的新Dstream transform(func) 轉換操做,把原來的RDD經過func轉換成一個新的RDD
updateStateByKey(func) 針對key使用func來更新狀態和值,能夠將state該爲任何值
使用這個操做,咱們是但願保存它狀態的信息,而後持續的更新它,使用它有兩個步驟:
(1)定義狀態,這個狀態能夠是任意的數據類型
(2)定義狀態更新函數,從前一個狀態更改新的狀態
下面展現一個例子:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = { val newCount = ... // add the new values with the previous running count to get the new count Some(newCount) }
它能夠用在包含(word, 1) 的Dstream當中,好比前面展現的example
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
它會針對裏面的每一個word調用一下更新函數,newValues是最新的值,runningCount是以前的值。
和transformWith同樣,能夠對一個Dstream進行RDD->RDD操做,好比咱們要對Dstream流裏的RDD和另一個數據集進行join操做,可是Dstream的API沒有直接暴露出來,咱們就可使用transform方法來進行這個操做,下面是例子:
val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam information val cleanedDStream = inputDStream.transform(rdd => { rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... })
另外,咱們也能夠在裏面使用機器學習算法和圖算法。
、
先舉個例子吧,好比前面的word count的例子,咱們想要每隔10秒計算一下最近30秒的單詞總數。
咱們可使用如下語句:
// Reduce last 30 seconds of data, every 10 seconds val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
這裏面提到了windows的兩個參數:
(1)window length:window的長度是30秒,最近30秒的數據
(2)slice interval:計算的時間間隔
經過這個例子,咱們大概可以窗口的意思了,按期計算滑動的數據。
下面是window的一些操做函數,仍是有點兒理解不了window的概念,Meaning就不翻譯了,直接刪掉
Transformation Meaning window(windowLength, slideInterval) countByWindow(windowLength, slideInterval) reduceByWindow(func, windowLength, slideInterval) reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) countByValueAndWindow(windowLength, slideInterval, [numTasks])
Output Operation Meaning
print() 打印到控制檯
foreachRDD(func) 對Dstream裏面的每一個RDD執行func,保存到外部系統
saveAsObjectFiles(prefix, [suffix]) 保存流的內容爲SequenceFile, 文件名 : "prefix-TIME_IN_MS[.suffix]".
saveAsTextFiles(prefix, [suffix]) 保存流的內容爲文本文件, 文件名 : "prefix-TIME_IN_MS[.suffix]".
saveAsHadoopFiles(prefix, [suffix]) 保存流的內容爲hadoop文件, 文件名 : "prefix-TIME_IN_MS[.suffix]".
Dstream中的RDD也能夠調用persist()方法保存在內存當中,可是基於window和state的操做,reduceByWindow,
reduceByKeyAndWindow,
updateStateByKey它們就是隱式的保存了,系統已經幫它自動保存了。
從網絡接收的數據(such as, Kafka, Flume, sockets, etc.),默認是保存在兩個節點來實現容錯性,以序列化的方式保存在內存當中。
狀態的操做是基於多個批次的數據的。它包括基於window的操做和updateStateByKey。由於狀態的操做要依賴於上一個批次的數據,因此它要根據時間,不斷累積元數據。爲了清空數據,它支持週期性的檢查點,經過把中間結果保存到hdfs上。由於檢查操做會致使保存到hdfs上的開銷,因此設置這個時間間隔,要很慎重。對於小批次的數據,好比一秒的,檢查操做會大大下降吞吐量。可是檢查的間隔太長,會致使任務變大。一般來講,5-10秒的檢查間隔時間是比較合適的。
ssc.checkpoint(hdfsPath) //設置檢查點的保存位置 dstream.checkpoint(checkpointInterval) //設置檢查點間隔
對於必須設置檢查點的Dstream,好比經過updateStateByKey
和reduceByKeyAndWindow建立的Dstream,默認設置是至少10秒。
對於調優,能夠從兩個方面考慮:
(1)利用集羣資源,減小處理每一個批次的數據的時間
(2)給每一個批次的數據量的設定一個合適的大小
像一些分佈式的操做,好比reduceByKey和
reduceByKeyAndWindow,默認的8個併發線程,能夠經過對應的函數提升它的值,或者經過修改參數spark.default.parallelism來提升這個默認值。
經過進行的任務太多也很差,好比每秒50個,發送任務的負載就會變得很重要,很難實現壓秒級的時延了,固然能夠經過壓縮來下降批次的大小。
要使流程序能在集羣上穩定的運行,要使處理數據的速度跟上數據流入的速度。最好的方式計算這個批量的大小,咱們首先設置batch size爲5-10秒和一個很低的數據輸入速度。確實系統能跟上數據的速度的時候,咱們能夠根據經驗設置它的大小,經過查看日誌看看Total delay的多長時間。若是delay的小於batch的,那麼系統能夠穩定,若是delay一直增長,說明系統的處理速度跟不上數據的輸入速度。
Spark默認不會忘記元數據,好比生成的RDD,處理的stages,可是Spark Streaming是一個24/7的程序,它須要週期性的清理元數據,經過spark.cleaner.ttl來設置。好比我設置它爲600,當超過10分鐘的時候,Spark就會清楚全部元數據,而後持久化RDDs。可是這個屬性要在SparkContext 建立以前設置。
可是這個值是和任何的window操做綁定。Spark會要求輸入數據在過時以後必須持久化到內存當中,因此必須設置delay的值至少和最大的window操做一致,若是設置小了,就會報錯。
除了Spark內置的監控能力,還能夠StreamingListener這個接口來獲取批處理的時間, 查詢時延, 所有的端到端的試驗。
Spark Stream默認的序列化方式是StorageLevel.MEMORY_ONLY_SER,而不是RDD的StorageLevel.MEMORY_ONLY。
默認的,全部持久化的RDD都會經過被Spark的LRU算法剔除出內存,若是設置了spark.cleaner.ttl,就會週期性的清理,可是這個參數設置要很謹慎。一個更好的方法是設置spark.streaming.unpersist爲true,這就讓Spark來計算哪些RDD須要持久化,這樣有利於提升GC的表現。
推薦使用concurrent mark-and-sweep GC,雖然這樣會下降系統的吞吐量,可是這樣有助於更穩定的進行批處理。
下面有兩種失效的方式:
1.使用hdfs上的文件,由於hdfs是可靠的文件系統,因此不會有任何的數據失效。
2.若是數據來源是網絡,好比Kafka和Flume,爲了防止失效,默認是數據會保存到2個節點上,可是有一種可能性是接受數據的節點掛了,那麼數據可能會丟失,由於它還沒來得及把數據複製到另一個節點。
爲了支持24/7不間斷的處理,Spark支持驅動節點失效後,從新恢復計算。Spark Streaming會週期性的寫數據到hdfs系統,就是前面的檢查點的那個目錄。驅動節點失效以後,StreamingContext能夠被恢復的。
爲了讓一個Spark Streaming程序可以被回覆,它須要作如下操做:
(1)第一次啓動的時候,建立 StreamingContext,建立全部的streams,而後調用start()方法。
(2)恢復後重啓的,必須經過檢查點的數據從新建立StreamingContext。
下面是一個實際的例子:
經過StreamingContext.getOrCreate來構造StreamingContext,能夠實現上面所說的。
// Function to create and setup a new StreamingContext def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // new context val lines = ssc.socketTextStream(...) // create DStreams ... ssc.checkpoint(checkpointDirectory) // set checkpoint directory ssc } // Get StreaminContext from checkpoint data or create a new one val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) // Do additional setup on context that needs to be done, // irrespective of whether it is being started or restarted context. ... // Start the context context.start() context.awaitTermination()
在stand-alone的部署模式下面,驅動節點失效了,也能夠自動恢復,讓別的驅動節點替代它。這個能夠在本地進行測試,在提交的時候採用supervise模式,當提交了程序以後,使用jps查看進程,看到相似DriverWrapper就殺死它,若是是使用YARN模式的話就得使用其它方式來從新啓動了。
這裏順便提一下向客戶端提交程序吧,以前總結的時候把這塊給落下了。
./bin/spark-class org.apache.spark.deploy.Client launch [client-options] \ <cluster-url> <application-jar-url> <main-class> \ [application-options] cluster-url: master的地址. application-jar-url: jar包的地址,最好是hdfs上的,帶上hdfs://...不然要全部的節點的目錄下都有這個jar的
main-class: 要發佈的程序的main函數所在類.
Client Options:
--memory <count> (驅動程序的內存,單位是MB)
--cores <count> (爲你的驅動程序分配多少個核心)
--supervise (節點失效的時候,是否從新啓動應用)
--verbose (打印增量的日誌輸出)
在將來的版本,會支持全部的數據源的可恢復性。
爲了更好的理解基於HDFS的驅動節點失效恢復,下面用一個簡單的例子來講明:
Time Number of lines in input file Output without driver failure Output with driver failure 1 10 10 10 2 20 20 20 3 30 30 30 4 40 40 [DRIVER FAILS] no output 5 50 50 no output 6 60 60 no output 7 70 70 [DRIVER RECOVERS] 40, 50, 60, 70 8 80 80 80 9 90 90 90 10 100 100 100
在4的時候出現了錯誤,40,50,60都沒有輸出,到70的時候恢復了,恢復以後把以前沒輸出的一會兒所有輸出。