經過Spark Streaming的foreachRDD把處理後的數據寫入外部存儲系統中

轉載自:http://blog.csdn.net/erfucun/article/details/52312682java

本博文主要內容包括:mysql

  • 技術實現foreachRDD與foreachPartition解析
  • foreachRDD與foreachPartition實現實戰

一:技術實現foreach解析:sql

一、首先咱們看一下Output Operations on DStreams提供的API: 
這裏寫圖片描述 
這裏寫圖片描述數據庫

SparkStreaming的DStream提供了一個dstream.foreachRDD方法,該方法是一個功能強大的原始的API,它容許將數據發送到外部系統。然而,重要的是要了解如何正確有效地使用這種原始方法。一些常見的錯誤,以免以下: 
寫數據到外部系統,須要創建一個數據鏈接對象(例如TCP鏈接到遠程的服務器),使用它將數據發送到外部存儲系統。爲此開發者可能會在Driver中嘗試建立一個鏈接,而後在worker中使用它來保存記錄到外部數據。代碼以下:apache

 
 
  1. dstream.foreachRDD { rdd =>
  2.   val connection = createNewConnection()  // executed at the driver
  3.   rdd.foreach { record =>
  4.     connection.send(record) // executed at the worker
  5.   }}

上面的代碼是一個錯誤的演示,由於鏈接是在Driver中建立的,而寫數據是在worker中完成的。此時鏈接就須要被序列化而後發送到worker中。可是咱們知道,鏈接的信息是不能被序列化和反序列化的(不一樣的機器鏈接服務器須要使用不一樣的服務器端口,即使鏈接被序列化了也不能使用)編程

進而咱們能夠將鏈接移動到worker中實現,代碼以下:數組

 
 
  1. dstream.foreachRDD { rdd =>
  2.   rdd.foreach { record =>
  3.     val connection = createNewConnection()
  4.     connection.send(record)
  5.     connection.close()
  6.   }}

 

可是此時,每處理一條數據記錄,就須要鏈接一次外部系統,對於性能來講是個嚴重的問題。這也不是一個完美的實現。服務器

Spark基於RDD進行編程,RDD的數據不能改變,若是擅長foreachPartition底層的數據可能改變,作到的方式foreachPartition操做一個數據結構,RDD裏面一條條數據,可是一條條的記錄是能夠改變的spark也能夠運行在動態數據源上。(就像數組的數據不變,可是指向的索引能夠改變) 
咱們能夠將代碼作以下的改進:微信

 
 
  1. dstream.foreachRDD { rdd =>
  2.   rdd.foreachPartition { partitionOfRecords =>
  3.     val connection = createNewConnection()
  4.     partitionOfRecords.foreach(record => connection.send(record))
  5.     connection.close()
  6.   }}

 

這樣一個partition,只需鏈接一次外部存儲。性能上有大幅度的提升。可是不一樣的partition之間不能複用鏈接。咱們可使用鏈接池的方式,使得partition之間能夠共享鏈接。代碼以下:markdown

 
 
  1. stream.foreachRDD { rdd =>
  2.   rdd.foreachPartition { partitionOfRecords =>
  3.     // ConnectionPool is a static, lazily initialized pool of connections
  4.     val connection = ConnectionPool.getConnection()
  5.     partitionOfRecords.foreach(record => connection.send(record))
  6.     ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  7.   }}

 

二:foreachRDD與foreachPartition實現實戰

一、須要注意的是: 
(1)、你最好使用forEachPartition函數來遍歷RDD,而且在每臺Work上面建立數據庫的connection。 
(2)、若是你的數據庫併發受限,能夠經過控制數據的分區來減小併發。 
(3)、在插入MySQL的時候最好使用批量插入。 
(4),確保你寫入的數據庫過程可以處理失敗,由於你插入數據庫的過程可能會通過網絡,這可能致使數據插入數據庫失敗。 
(5)、不建議將你的RDD數據寫入到MySQL等關係型數據庫中。

二、下面咱們使用SparkStreaming實現將數據寫到MySQL中:

(1)在pom.xml中加入以下依賴包

 
 
  1. <dependency>
  2.     <groupId>mysql</groupId>
  3.     <artifactId>mysql-connector-java</artifactId>
  4.     <version>5.1.38</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>commons-dbcp</groupId>
  8.     <artifactId>commons-dbcp</artifactId>
  9.     <version>1.4</version>
  10. </dependency>

(2)在MySql中建立數據庫和表,命令操做以下:

 
 
  1. mysql -uroot -p
  2. create database spark;
  3. use spark;
  4. show tables;
  5. create table streaming_itemcount(keyword varchar(30));

 

使用Java編寫一個數據庫鏈接池類

 
 
  1. import java.sql.Connection;
  2. import java.sql.DriverManager;
  3. import java.util.LinkedList;
  4.  
  5. /**
  6.  * Created by zpf on 2016/8/26.
  7.  */
  8. public class ConnectionPool {
  9.     private static LinkedList<Connection> connectionQueue;
  10.  
  11.     static {
  12.         try {
  13.             Class.forName("com.mysql.jdbc.Driver");
  14.         } catch (ClassNotFoundException e) {
  15.             e.printStackTrace();
  16.         }
  17.     }
  18.  
  19.     public synchronized static Connection getConnection() {
  20.         try {
  21.             if (connectionQueue == null) {
  22.                 connectionQueue = new LinkedList<Connection>();
  23.                 for (int i = 0; i < 5; i++) {
  24.                     Connection conn = DriverManager.getConnection(
  25.                             "jdbc:mysql://Master:3306/sparkstreaming",
  26.                             "root",
  27.                             "12345");
  28.                     connectionQueue.push(conn);
  29.                 }
  30.             }
  31.         } catch (Exception e) {
  32.             e.printStackTrace();
  33.         }
  34.         return connectionQueue.poll();
  35.  
  36.     }
  37.     public  static void returnConnection(Connection conn){
  38.      connectionQueue.push(conn);
  39.     }
  40. }

 

編寫Spark代碼:

 
 
  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3.  
  4. /**
  5.   * Created by zpf on 2016/8/26.
  6.   */
  7. object OnlineForeachRDD2DB {
  8.   def main(args: Array[String]) {
  9.     val conf = new SparkConf().setAppName("OnlineForeachRDD2DB").setMaster("local[2]")
  10.     val ssc = new StreamingContext(conf, Seconds(5))
  11.  
  12.     val lines = ssc.socketTextStream("Master", 9999)
  13.     val words = lines.flatMap(_.split(" "))
  14.     val wordCounts = words.map(=> (x, 1)).reduceByKey(+ _)
  15.     wordCounts.foreachRDD { rdd =>
  16.       rdd.foreachPartition { partitionOfRecords => {
  17.         val connection = ConnectionPool.getConnection()
  18.         partitionOfRecords.foreach(record => {
  19.           val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")"
  20.           val stmt = connection.createStatement
  21.           stmt.executeUpdate(sql)
  22.         })
  23.         ConnectionPool.returnConnection(connection)
  24.  
  25.       }
  26.  
  27.       }
  28.     }
  29.   }
  30. }

 

打開netcat發送數據

 
 
  1. root@spark-master:~# nc -lk 9999
  2. spark hadoop kafka spark hadoop kafka spark hadoop kafka spark hadoop

 

打包運行spark代碼

 
 
  1. /usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/lib/mysql-connector-java-5.1.35-bin.jar /root/Documents/SparkApps/SparkStreamApps.jar

 

查看數據庫中的結果:

博文內容源自DT大數據夢工廠Spark課程總結的筆記相關課程內容視頻能夠參考: 百度網盤連接:http://pan.baidu.com/s/1slvODe1(若是連接失效或須要後續的更多資源,請聯繫QQ460507491或者微信號:DT1219477246 獲取上述資料)。

相關文章
相關標籤/搜索