SparkStream:4)foreachRDD詳解

轉載自:http://blog.csdn.net/jiangpeng59/article/details/53318761java

 

foreachRDD一般用來把SparkStream運行獲得的結果保存到外部系統好比HDFS、Mysql、Redis等等。瞭解下面的知識能夠幫助咱們避免不少誤區mysql

 

誤區1:實例化外部鏈接對象的位置不正確,好比下面代碼sql

 
   
  1. dstream.foreachRDD { rdd =>  
  2.   val connection = createNewConnection()  // executed at the driver  
  3.   rdd.foreach { record =>  
  4.     connection.send(record) // executed at the worker  
  5.   }  
  6. }  
其實例化的鏈接對象在driver中,而後經過序列化的方式發送到各個Worker,但實際上Connection的序列化一般是沒法正確序列化的

 

誤區2:爲每條記錄都建立一個鏈接對象apache

 
   
  1. dstream.foreachRDD { rdd =>  
  2.   rdd.foreach { record =>  
  3.     val connection = createNewConnection()  
  4.     connection.send(record)  
  5.     connection.close()  
  6.   }  
  7. }  
雖然誤區1的問題獲得瞭解決,但一般狀況下,外部系統如mysql,其鏈接對象是很是難得的,若是一條記錄就申請一個鏈接資源,系統性能會很是糟糕

 

而後,給出了一個比較好的方法,爲每個分區建立一個鏈接對象,其具體代碼以下markdown

 
 
   
  1. dstream.foreachRDD { rdd =>  
  2.   rdd.foreachPartition { partitionOfRecords =>  
  3.     val connection = createNewConnection()  
  4.     partitionOfRecords.foreach(record => connection.send(record))  
  5.     connection.close()  
  6.   }  
  7. }  
最後給出一個較優的方案,使用一個鏈接池來維護鏈接對象
 
 
   
  1. dstream.foreachRDD { rdd =>  
  2.   rdd.foreachPartition { partitionOfRecords =>  
  3.     // ConnectionPool is a static, lazily initialized pool of connections  
  4.     val connection = ConnectionPool.getConnection()  
  5.     partitionOfRecords.foreach(record => connection.send(record))  
  6.     ConnectionPool.returnConnection(connection)  // return to the pool for future reuse  
  7.   }  
  8. }  
正如上面代碼闡述的,鏈接對象推薦是使用lazy關鍵字來修飾,用到的時候纔去實例化

 

下面給出網上一段把SparkStream的結果保存到Mysql中的代碼示例socket

 
   
  1. package spark.examples.streaming  
  2.   
  3. import java.sql.{PreparedStatement, Connection, DriverManager}  
  4. import java.util.concurrent.atomic.AtomicInteger  
  5.   
  6. import org.apache.spark.SparkConf  
  7. import org.apache.spark.streaming.{Seconds, StreamingContext}  
  8. import org.apache.spark.streaming._  
  9. import org.apache.spark.streaming.StreamingContext._  
  10.   
  11. object SparkStreamingForPartition {  
  12.   def main(args: Array[String]) {  
  13.     val conf = new SparkConf().setAppName("NetCatWordCount")  
  14.     conf.setMaster("local[3]")  
  15.     val ssc = new StreamingContext(conf, Seconds(5))  
  16.     //The DStream is a collection of RDD, which makes the method foreachRDD reasonable  
  17.     val dstream = ssc.socketTextStream("192.168.26.140", 9999)  
  18.     dstream.foreachRDD(rdd => {  
  19.       //embedded function  
  20.       def func(records: Iterator[String]) {  
  21.         var conn: Connection = null  
  22.         var stmt: PreparedStatement = null  
  23.         try {  
  24.           val url = "jdbc:mysql://192.168.26.140:3306/person";  
  25.           val user = "root";  
  26.           val password = ""  
  27.           conn = DriverManager.getConnection(url, user, password)  
  28.           records.flatMap(_.split(" ")).foreach(word => {  
  29.             val sql = "insert into TBL_WORDS(word) values (?)";  
  30.             stmt = conn.prepareStatement(sql);  
  31.             stmt.setString(1, word)  
  32.             stmt.executeUpdate();  
  33.           })  
  34.         } catch {  
  35.           case e: Exception => e.printStackTrace()  
  36.         } finally {  
  37.           if (stmt != null) {  
  38.             stmt.close()  
  39.           }  
  40.           if (conn != null) {  
  41.             conn.close()  
  42.           }  
  43.         }  
  44.       }  
  45.       val repartitionedRDD = rdd.repartition(3)  
  46.       repartitionedRDD.foreachPartition(func)  
  47.     })  
  48.     ssc.start()  
  49.     ssc.awaitTermination()  
  50.   }  
  51. }  

注意的細節:性能

Dstream和RDD同樣是延遲執行,只有遇到action操做纔會真正去計算。所以在Dstream的內部RDD必須包含Action操做才能是接受到的數據獲得處理。即便代碼中包含foreachRDD,但在內部卻沒有action的RDD,SparkStream只會簡單地接受數據數據而不進行處理atom

相關文章
相關標籤/搜索