foreachRDD is typically used to persist the results of a Spark Streaming job to external systems such as HDFS, MySQL, or Redis. Understanding the points below will help you avoid a number of common pitfalls.
Pitfall 1: instantiating the external connection object in the wrong place, as in the following code:
    dstream.foreachRDD { rdd =>
      val connection = createNewConnection()  // executed at the driver
      rdd.foreach { record =>
        connection.send(record)  // executed at the worker
      }
    }
Here the connection object is instantiated on the driver and then shipped to each worker via serialization, but connection objects generally cannot be serialized correctly, so this typically fails with a "Task not serializable" error.
Pitfall 2: creating a connection object for every record:
    dstream.foreachRDD { rdd =>
      rdd.foreach { record =>
        val connection = createNewConnection()
        connection.send(record)
        connection.close()
      }
    }
This fixes the problem from pitfall 1, but connections to external systems such as MySQL are usually expensive to acquire; requesting a new connection for every single record ruins performance.
A better approach is to create one connection object per partition:
    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val connection = createNewConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        connection.close()
      }
    }
Finally, an even better option is to maintain the connection objects in a connection pool:
    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // ConnectionPool is a static, lazily initialized pool of connections
        val connection = ConnectionPool.getConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      }
    }
As the comment in the code above indicates, the pool should be static and lazily initialized (e.g. marked with Scala's lazy keyword) so that connections are created only when first needed.
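The ConnectionPool object referenced above is not something Spark provides; you supply it yourself. A minimal sketch of such a pool, assuming plain JDBC and a queue of reusable connections (the object name, its methods, and the JDBC settings are illustrative, not any particular library's API):

    import java.sql.{Connection, DriverManager}
    import scala.collection.mutable

    // Minimal per-JVM connection pool sketch; a production job would more
    // likely use a pooling library such as HikariCP or DBCP.
    object ConnectionPool {
      private val url = "jdbc:mysql://192.168.26.140:3306/person"  // illustrative
      private val user = "root"
      private val password = ""
      private val maxIdle = 10

      // lazy: built on first use, on whichever executor JVM needs it
      private lazy val pool = mutable.Queue.empty[Connection]

      def getConnection(): Connection = synchronized {
        if (pool.nonEmpty) pool.dequeue()
        else DriverManager.getConnection(url, user, password)
      }

      def returnConnection(conn: Connection): Unit = synchronized {
        if (pool.size < maxIdle) pool.enqueue(conn)
        else conn.close()
      }
    }

Because the object is a singleton per executor JVM, each executor builds its own pool on first use rather than having connections shipped from the driver.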
Below is a code sample from the web that saves Spark Streaming results to MySQL:
    package spark.examples.streaming

    import java.sql.{Connection, DriverManager, PreparedStatement}

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object SparkStreamingForPartition {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("NetCatWordCount")
        conf.setMaster("local[3]")
        val ssc = new StreamingContext(conf, Seconds(5))
        // A DStream is a sequence of RDDs, which is what makes foreachRDD meaningful
        val dstream = ssc.socketTextStream("192.168.26.140", 9999)
        dstream.foreachRDD(rdd => {
          // Open one connection and one prepared statement per partition,
          // then reuse them for every record in that partition
          def func(records: Iterator[String]) {
            var conn: Connection = null
            var stmt: PreparedStatement = null
            try {
              val url = "jdbc:mysql://192.168.26.140:3306/person"
              val user = "root"
              val password = ""
              conn = DriverManager.getConnection(url, user, password)
              stmt = conn.prepareStatement("insert into TBL_WORDS(word) values (?)")
              records.flatMap(_.split(" ")).foreach(word => {
                stmt.setString(1, word)
                stmt.executeUpdate()
              })
            } catch {
              case e: Exception => e.printStackTrace()
            } finally {
              if (stmt != null) stmt.close()
              if (conn != null) conn.close()
            }
          }
          val repartitionedRDD = rdd.repartition(3)
          repartitionedRDD.foreachPartition(func)
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }
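As a further refinement beyond the original example, JDBC batching reduces the number of round trips to MySQL from one per word to one per partition. A sketch of a batched variant of the per-partition write (the function name is hypothetical; addBatch and executeBatch are standard JDBC):

    import java.sql.Connection

    // Hypothetical batched variant of func above: buffer the inserts and
    // send them to MySQL in a single batch per partition.
    def writePartitionBatched(conn: Connection, records: Iterator[String]): Unit = {
      val stmt = conn.prepareStatement("insert into TBL_WORDS(word) values (?)")
      try {
        records.flatMap(_.split(" ")).foreach { word =>
          stmt.setString(1, word)
          stmt.addBatch()
        }
        stmt.executeBatch()  // one round trip for the whole partition
      } finally {
        stmt.close()
      }
    }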
A detail worth noting:
Like RDDs, DStreams execute lazily: nothing is computed until an action is triggered. The code inside foreachRDD must therefore invoke an action on the RDD for the received data to actually be processed. Even if the program calls foreachRDD, when no action is applied to the RDD inside it, Spark Streaming will simply receive the data without processing it.
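A minimal illustration, reusing the dstream from the example above (the map call is just a stand-in transformation):

    // Transformation only: no action inside foreachRDD, so the received
    // data is never actually processed
    dstream.foreachRDD { rdd =>
      rdd.map(_.toUpperCase)
    }

    // foreach is an action, so this version triggers the computation
    dstream.foreachRDD { rdd =>
      rdd.map(_.toUpperCase).foreach(println)
    }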