dstream.foreachRDD是一個強大的原語,發送數據到外部系統中。然而,明白怎樣正確地、有效地用這個原語是很是重要的。下面幾點介紹瞭如何避免通常錯誤。服務器
常常寫數據到外部系統須要建一個鏈接對象(例如到遠程服務器的TCP鏈接),用它發送數據到遠程系統。爲了達到這個目的,開發人員可能不經意的在Spark驅動中建立一個鏈接對象,可是在Spark worker中 嘗試調用這個鏈接對象保存記錄到RDD中,以下:優化
dstream.foreachRDD(rdd => { val connection = createNewConnection() // executed at the driver rdd.foreach(record => { connection.send(record) // executed at the worker }) })
這是不正確的,由於這須要先序列化鏈接對象,而後將它從driver發送到worker中。這樣的鏈接對象在機器之間不能傳送。它可能表現爲序列化錯誤(鏈接對象不可序列化)或者初始化錯誤(鏈接對象應該 在worker中初始化)等等。正確的解決辦法是在worker中建立鏈接對象。scala
然而,這會形成另一個常見的錯誤-爲每個記錄建立了一個鏈接對象。例如:code
dstream.foreachRDD(rdd => { rdd.foreach(record => { val connection = createNewConnection() connection.send(record) connection.close() }) })
一般,建立一個鏈接對象有資源和時間的開支。所以,爲每一個記錄建立和銷燬鏈接對象會致使很是高的開支,明顯的減小系統的總體吞吐量。一個更好的解決辦法是利用rdd.foreachPartition
方法。 爲RDD的partition建立一個鏈接對象,用這個兩件對象發送partition中的全部記錄。對象
dstream.foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() }) })
這就將鏈接對象的建立開銷分攤到了partition的全部記錄上了。資源
最後,能夠經過在多個RDD或者批數據間重用鏈接對象作更進一步的優化。開發者能夠保有一個靜態的鏈接對象池,重複使用池中的對象將多批次的RDD推送到外部系統,以進一步節省開支。開發
dstream.foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse }) })
須要注意的是,池中的鏈接對象應該根據須要延遲建立,而且在空閒一段時間後自動超時。這樣就獲取了最有效的方式發生數據到外部系統。get
其它須要注意的地方:it
輸出操做經過懶執行的方式操做DStreams,正如RDD action經過懶執行的方式操做RDD。具體地看,RDD actions和DStreams輸出操做接收數據的處理。所以,若是你的應用程序沒有任何輸出操做或者 用於輸出操做dstream.foreachRDD()
,可是沒有任何RDD action操做在dstream.foreachRDD()
裏面,那麼什麼也不會執行。系統僅僅會接收輸入,而後丟棄它們。io
默認狀況下,DStreams輸出操做是分時執行的,它們按照應用程序的定義順序按序執行。