轉載自:http://blog.csdn.net/erfucun/article/details/52312682java
本博文主要內容包括:mysql
- 技術實現foreachRDD與foreachPartition解析
- foreachRDD與foreachPartition實現實戰
一:技術實現foreach解析:sql
一、首先咱們看一下Output Operations on DStreams提供的API:
數據庫
SparkStreaming的DStream提供了一個dstream.foreachRDD方法,該方法是一個功能強大的原始的API,它容許將數據發送到外部系統。然而,重要的是要了解如何正確有效地使用這種原始方法。一些常見的錯誤,以免以下:
寫數據到外部系統,須要創建一個數據鏈接對象(例如TCP鏈接到遠程的服務器),使用它將數據發送到外部存儲系統。爲此開發者可能會在Driver中嘗試建立一個鏈接,而後在worker中使用它來保存記錄到外部數據。代碼以下:apache
- dstream.foreachRDD { rdd =>
- val connection = createNewConnection() // executed at the driver
- rdd.foreach { record =>
- connection.send(record) // executed at the worker
- }}
上面的代碼是一個錯誤的演示,由於鏈接是在Driver中建立的,而寫數據是在worker中完成的。此時鏈接就須要被序列化而後發送到worker中。可是咱們知道,鏈接的信息是不能被序列化和反序列化的(不一樣的機器鏈接服務器須要使用不一樣的服務器端口,即使鏈接被序列化了也不能使用)編程
進而咱們能夠將鏈接移動到worker中實現,代碼以下:數組
- dstream.foreachRDD { rdd =>
- rdd.foreach { record =>
- val connection = createNewConnection()
- connection.send(record)
- connection.close()
- }}
可是此時,每處理一條數據記錄,就須要鏈接一次外部系統,對於性能來講是個嚴重的問題。這也不是一個完美的實現。服務器
Spark基於RDD進行編程,RDD的數據不能改變,若是擅長foreachPartition底層的數據可能改變,作到的方式foreachPartition操做一個數據結構,RDD裏面一條條數據,可是一條條的記錄是能夠改變的spark也能夠運行在動態數據源上。(就像數組的數據不變,可是指向的索引能夠改變)
咱們能夠將代碼作以下的改進:微信
- dstream.foreachRDD { rdd =>
- rdd.foreachPartition { partitionOfRecords =>
- val connection = createNewConnection()
- partitionOfRecords.foreach(record => connection.send(record))
- connection.close()
- }}
這樣一個partition,只需鏈接一次外部存儲。性能上有大幅度的提升。可是不一樣的partition之間不能複用鏈接。咱們可使用鏈接池的方式,使得partition之間能夠共享鏈接。代碼以下:markdown
- stream.foreachRDD { rdd =>
- rdd.foreachPartition { partitionOfRecords =>
- // ConnectionPool is a static, lazily initialized pool of connections
- val connection = ConnectionPool.getConnection()
- partitionOfRecords.foreach(record => connection.send(record))
- ConnectionPool.returnConnection(connection) // return to the pool for future reuse
- }}
二:foreachRDD與foreachPartition實現實戰
一、須要注意的是:
(1)、你最好使用forEachPartition函數來遍歷RDD,而且在每臺Work上面建立數據庫的connection。
(2)、若是你的數據庫併發受限,能夠經過控制數據的分區來減小併發。
(3)、在插入MySQL的時候最好使用批量插入。
(4),確保你寫入的數據庫過程可以處理失敗,由於你插入數據庫的過程可能會通過網絡,這可能致使數據插入數據庫失敗。
(5)、不建議將你的RDD數據寫入到MySQL等關係型數據庫中。
二、下面咱們使用SparkStreaming實現將數據寫到MySQL中:
(1)在pom.xml中加入以下依賴包
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>5.1.38</version>
- </dependency>
- <dependency>
- <groupId>commons-dbcp</groupId>
- <artifactId>commons-dbcp</artifactId>
- <version>1.4</version>
- </dependency>
(2)在MySql中建立數據庫和表,命令操做以下:
- mysql -uroot -p
- create database spark;
- use spark;
- show tables;
- create table streaming_itemcount(keyword varchar(30));
使用Java編寫一個數據庫鏈接池類
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.util.LinkedList;
-
- /**
- * Created by zpf on 2016/8/26.
- */
- public class ConnectionPool {
- private static LinkedList<Connection> connectionQueue;
-
- static {
- try {
- Class.forName("com.mysql.jdbc.Driver");
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
- }
-
- public synchronized static Connection getConnection() {
- try {
- if (connectionQueue == null) {
- connectionQueue = new LinkedList<Connection>();
- for (int i = 0; i < 5; i++) {
- Connection conn = DriverManager.getConnection(
- "jdbc:mysql://Master:3306/sparkstreaming",
- "root",
- "12345");
- connectionQueue.push(conn);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return connectionQueue.poll();
-
- }
- public static void returnConnection(Connection conn){
- connectionQueue.push(conn);
- }
- }
編寫Spark代碼:
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- /**
- * Created by zpf on 2016/8/26.
- */
- object OnlineForeachRDD2DB {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("OnlineForeachRDD2DB").setMaster("local[2]")
- val ssc = new StreamingContext(conf, Seconds(5))
-
- val lines = ssc.socketTextStream("Master", 9999)
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.foreachRDD { rdd =>
- rdd.foreachPartition { partitionOfRecords => {
- val connection = ConnectionPool.getConnection()
- partitionOfRecords.foreach(record => {
- val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")"
- val stmt = connection.createStatement
- stmt.executeUpdate(sql)
- })
- ConnectionPool.returnConnection(connection)
-
- }
-
- }
- }
- }
- }
打開netcat發送數據
- root@spark-master:~# nc -lk 9999
- spark hadoop kafka spark hadoop kafka spark hadoop kafka spark hadoop
打包運行spark代碼
- /usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/lib/mysql-connector-java-5.1.35-bin.jar /root/Documents/SparkApps/SparkStreamApps.jar
查看數據庫中的結果:
博文內容源自DT大數據夢工廠Spark課程總結的筆記相關課程內容視頻能夠參考: 百度網盤連接:http://pan.baidu.com/s/1slvODe1(若是連接失效或須要後續的更多資源,請聯繫QQ460507491或者微信號:DT1219477246 獲取上述資料)。