1、Spark Streaming概述:
是基於Spark core的API,不須要單獨安裝,一盞式解決
可擴展、高吞吐量、容錯性、可以運行在多節點、結合了批處理、機器學習、圖計算等
將不一樣的數據源的數據通過Spark Streaming處理後輸出到外部文件系統
1. 應用場景:
實時交易防欺詐檢測、傳感器異常實時反應
整理Spark發展史問題(缺乏)
2. Spark Streaming工做原理:
粗粒度:
把實時數據流,以秒數拆分紅批次的小數據塊,經過Spark
當成RDD來處理
細粒度:
3. 核心概念:
編程入口:
StreamingContext
經常使用構造方法源碼:
def this(sparkContext: SparkContext, batchDuration: Duration) = {
this(sparkContext, null, batchDuration)
}
def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
batchDuration 是必須填的,根據應用程序的延遲需求和資源可用狀況來設置
定義好streamingContext後,再定義DStream、transformation等,經過start()開始,stop()結束
注意:
一個context啓動後,不能再運行新的streaming(一個JVM只能有一個streamingContext)
一旦中止後,就沒辦法再從新開始
Stop方法默認把sparkContext和streamingContext同時關掉,要不想關掉sc,必須定義stopSparkContext參數爲false
一個SparkContext可以建立多個StreamingContext
最基礎的抽象:Discretized Stream (DStream)
一系列的RDD表明一個DStream,是不可變的、分佈式的dataset
每個RDD表明一個時間段(批次)的數據
對DStream進行操做算子(flatMap)時,在底層上看就是對每個RDD作相同的操做,交由Spark core運行
數據輸入:Input DStreams and Receivers
每個Input DStream 關聯着一個Receiver(但從文件系統接收不須要receiver),receiver 接收數據並存在內存中
receiver須要佔用一個線程,因此不能定義local[1],線程的數量n必須大於receivers的數量
轉換:Transformations on DStreams
與RDD相似:map、flatMap、filter、repartition、count...
數據輸出:Output Operations and DStreams:
輸出到數據庫或者文件系統:
API:print、save、foreach
2、Spark Streaming實戰部分:
- Spark Streaming處理socket數據:
接收到的數據進行WordCount操做:
在IDEA中:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/*
* Spark Streaming 處理Socket數據
* */
object NetWorkWordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetWorkWordCount")
//建立streamingContext的兩個參數sparkConf和seconds
val ssc = new StreamingContext(sparkConf, Seconds(5))
//生成Input DStream
val lines = ssc.socketTextStream("localhost", 6789)
val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
result.print()
ssc.start()
ssc.awaitTermination()
}
}
在控制檯中:
nc -lk 6789,建立一個Socket
在這上面輸入數據,就能夠在IDEA中count出來了
注意:
在執行過程當中會報錯,必須在Maven projects中找出報錯提示中所缺乏的包,而且在dependency上加入。
- Spark Streaming處理HDFS中的數據:
ssc.textFileStream("file_path")
一樣是像上面同樣,只是改了stream的source
可是測試時,必需要是生成新的文件(官網稱爲moving進去的文件),纔會被統計;而往舊的文件裏再添加數據,也不會被統計了
- Spark Streaming進階實戰:
帶狀態的算子UpdateStateByKey、保存到MySQL、window函數
UpdateStateByKey實現實時更新:
容許把新舊狀態結合,連續地更新
準備工做:
- 定義一個狀態
- 定義狀態更新的方法
注意:
- updateFunction須要隱式轉換
- 報錯:Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set.
意思就是要進行
checkpoint記錄
實現代碼:
把reduceByKey刪除,而且把map以後的RDD定義爲一個state,配合這個state寫狀態更新方法
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/*
* Spark Streaming有狀態的統計
* */
object StatefulWordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
//使用狀態算子必需要設置checkpoint
//通常要保存記錄在HDFS中
ssc.checkpoint(".")
val lines = ssc.socketTextStream("localhost", 6789)
val result = lines.flatMap(_.split(" ")).map((_, 1)) //不能用reduceByKey
//連續更新狀態
val state = result.updateStateByKey(updateFunction _) //須要隱式轉換
state.print()
ssc.start()
ssc.awaitTermination()
}
/*
* 狀態更新方法更新已有的數據,放在updateStateByKey中
* */
def updateFunction(currData: Seq[Int], prevData: Option[Int]): Option[Int] = {
val curr = currData.sum //算出當前的總次數
val prev = prevData.getOrElse(0) //讀取已有的
//返回已有和當前的和
Some(curr + prev)
}
}
- 統計結果寫到MySQL中:
前提準備:
須要在IDEA中增長mysql的connector依賴
在mysql數據庫中先建立一張表
寫jdbc建立鏈接到Mysql
使用foreachRDD,有不少種
錯誤的寫法:(沒有序列化,建立太多mysql鏈接等)
報錯沒有序列化:
dstream.foreachRDD {rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach {record =>connection.send(record) // executed at the worker
}
}
花太多開銷在鏈接和斷開數據庫上
dstream.foreachRDD {rdd =>
rdd.foreach {record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
官方正確寫法:
使用foreachPartition進行優化鏈接:
dstream.foreachRDD {rdd =>
rdd.foreachPartition {partitionOfRecords =>
val connection = createNewConnection() //建立mysql鏈接
partitionOfRecords.foreach(record =>
connection.send(record))
connection.close()
}
}
用鏈接池進行進一步優化:
Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches. One can maintain a static pool of connection objects than can be reused as RDDs of multiple batches are pushed to the external system, thus further reducing the overheads.
dstream.foreachRDD {rdd =>
rdd.foreachPartition {partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record =>connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
在寫入MySQL數據時,應該做一個是否存在的判斷:
若存在則使用update語句,不存在則使用insert語句
- Window的使用:
兩個參數:
window length:窗口長度
sliding interval:窗口間隔
也就是每隔sliding interval統計前window length的值
API:countByWindow、reduceByWindow…
- 實戰:黑名單過濾
transform算子的使用+Spark Streaming整合RDD操做
元組默認從1開始數
假設輸入數據爲id, name 這種形式
實現過程:
- 創建黑名單元組 => (name, true)
- 把輸入數據流編程元組 => (name, (id, name))
- transform,把每一個DStream變成一個個RDD操做
- 數據流的RDD與黑名單RDD進行leftjoin,得到新的元組
- filter判斷過濾
- 整合輸出
實現代碼:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/*
* 黑名單過濾demo
* */
object TransformApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("TransformApp")
val ssc = new StreamingContext(sparkConf, Seconds(5))
//構建黑名單列表, 實際應用中可在外面讀取列表, 並轉成RDD, 用true標記爲是黑名單元組(name, true)
val blacks = List("zs", "ls")
val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true))
//獲取每行
val lines = ssc.socketTextStream("localhost", 6789)
//把id, name => 元組(name, (id, name))
//transform 的使用,對stream的每一個RDD操做
val filterResult = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
//與黑名單進行leftjoin => (name, ((id, name), true)), 並過濾出是true的項
rdd.leftOuterJoin(blacksRDD)
.filter(x => x._2._2.getOrElse(false) != true) //過濾出不等於true的
.map(x => x._2._1)
})
filterResult.print()
ssc.start()
ssc.awaitTermination()
}
}
- Spark Streaming整合Spark SQL
整合完成詞頻統計操做
官網代碼:
就是foreachRDD把streaming轉成RDD,而後toDF就能夠進行DataFrame或者是sql的操做了