DStreams上的輸出操做


dstream.foreachRDD是一個強大的原語,發送數據到外部系統中。然而,明白怎樣正確地、有效地用這個原語是很是重要的。下面幾點介紹瞭如何避免通常錯誤。服務器

  • 常常寫數據到外部系統須要建一個鏈接對象(例如到遠程服務器的TCP鏈接),用它發送數據到遠程系統。爲了達到這個目的,開發人員可能不經意的在Spark驅動中建立一個鏈接對象,可是在Spark worker中 嘗試調用這個鏈接對象保存記錄到RDD中,以下:優化

 dstream.foreachRDD(rdd => {      val connection = createNewConnection()  // executed at the driver
      rdd.foreach(record => {
          connection.send(record) // executed at the worker
      })
  })

這是不正確的,由於這須要先序列化鏈接對象,而後將它從driver發送到worker中。這樣的鏈接對象在機器之間不能傳送。它可能表現爲序列化錯誤(鏈接對象不可序列化)或者初始化錯誤(鏈接對象應該 在worker中初始化)等等。正確的解決辦法是在worker中建立鏈接對象。scala

  • 然而,這會形成另一個常見的錯誤-爲每個記錄建立了一個鏈接對象。例如:code

dstream.foreachRDD(rdd => {
      rdd.foreach(record => {
          val connection = createNewConnection()
          connection.send(record)
          connection.close()
      })
  })

一般,建立一個鏈接對象有資源和時間的開支。所以,爲每一個記錄建立和銷燬鏈接對象會致使很是高的開支,明顯的減小系統的總體吞吐量。一個更好的解決辦法是利用rdd.foreachPartition方法。 爲RDD的partition建立一個鏈接對象,用這個兩件對象發送partition中的全部記錄。對象

dstream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
          val connection = createNewConnection()
          partitionOfRecords.foreach(record => connection.send(record))
          connection.close()
      })
  })

這就將鏈接對象的建立開銷分攤到了partition的全部記錄上了。資源

  • 最後,能夠經過在多個RDD或者批數據間重用鏈接對象作更進一步的優化。開發者能夠保有一個靜態的鏈接對象池,重複使用池中的對象將多批次的RDD推送到外部系統,以進一步節省開支。開發

dstream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
          // ConnectionPool is a static, lazily initialized pool of connections
          val connection = ConnectionPool.getConnection()
          partitionOfRecords.foreach(record => connection.send(record))
          ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      })
  })

須要注意的是,池中的鏈接對象應該根據須要延遲建立,而且在空閒一段時間後自動超時。這樣就獲取了最有效的方式發生數據到外部系統。get

其它須要注意的地方:it

  • 輸出操做經過懶執行的方式操做DStreams,正如RDD action經過懶執行的方式操做RDD。具體地看,RDD actions和DStreams輸出操做接收數據的處理。所以,若是你的應用程序沒有任何輸出操做或者 用於輸出操做dstream.foreachRDD(),可是沒有任何RDD action操做在dstream.foreachRDD()裏面,那麼什麼也不會執行。系統僅僅會接收輸入,而後丟棄它們。io

  • 默認狀況下,DStreams輸出操做是分時執行的,它們按照應用程序的定義順序按序執行。

相關文章
相關標籤/搜索