Spark Streaming 滑動窗口

Spark Streaming提供了滑動窗口操做的支持,從而讓咱們能夠對一個滑動窗口內的數據執行計算操做。每次掉落在窗口內的RDD的數據,會被聚合起來執行計算操做,而後生成的RDD,會做爲window DStream的一個RDD。apache

網官圖中所示,就是對每三秒鐘的數據執行一次滑動窗口計算,這3秒內的3個RDD會被聚合起來進行處理,而後過了兩秒鐘,又會對最近三秒內的數據執行滑動窗口計算。因此每一個滑動窗口操做,都必須指定兩個參數,窗口長度以及滑動間隔,並且這兩個參數值都必須是batch間隔的整數倍。併發

Spark Streaming對滑動窗口的支持,是比Storm更加完善和強大的。socket

 

以前有些朋友問:ide

spark官網圖片中: 滑動窗口寬度是3個時間單位,滑動時間是2兩個單位,這樣的話中間time3的Dstream不是重複計算了嗎? 函數

Answer:好比下面這個例子是針對熱搜的應用場景,官方的例子也多是是針對不一樣的場景給出了的。若是你不想出現重疊的部分,把滑動間隔由2改爲3便可spa

SparkStreaming對滑動窗口支持的轉換操做:3d

 示例講解:code

一、window(windowLength, slideInterval)orm

該操做由一個DStream對象調用,傳入一個窗口長度參數,一個窗口移動速率參數,而後將當前時刻當前長度窗口中的元素取出造成一個新的DStream。對象

下面的代碼以長度爲3,移動速率爲1截取源DStream中的元素造成新的DStream。

val windowWords = words.window(Seconds( 3 ), Seconds( 1))

基本上每秒輸入一個字母,而後取出當前時刻3秒這個長度中的全部元素,打印出來。從上面的截圖中能夠看到,下一秒時已經看不到a了,再下一秒,已經看不到b和c了。表示a, b, c已經不在當前的窗口中。

二、 countByWindow(windowLength,slideInterval)

返回指定長度窗口中的元素個數。

代碼以下,統計當前3秒長度的時間窗口的DStream中元素的個數:

val windowWords = words.countByWindow(Seconds( 3 ), Seconds( 1))

三、 reduceByWindow(func, windowLength,slideInterval)

相似於上面的reduce操做,只不過這裏再也不是對整個調用DStream進行reduce操做,而是在調用DStream上首先取窗口函數的元素造成新的DStream,而後在窗口元素造成的DStream上進行reduce。

val windowWords = words.reduceByWindow(_ + "-" + _, Seconds( 3) , Seconds( 1 ))

四、 reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])

調用該操做的DStream中的元素格式爲(k, v),整個操做相似於前面的reduceByKey,只不過對應的數據源不一樣,reduceByKeyAndWindow的數據源是基於該DStream的窗口長度中的全部數據。該操做也有一個可選的併發數參數。

下面代碼中,將當前長度爲3的時間窗口中的全部數據元素根據key進行合併,統計當前3秒中內不一樣單詞出現的次數。

val windowWords = pairs.reduceByKeyAndWindow((a:Int , b:Int) => (a + b) , Seconds(3 ) , Seconds( 1 ))

五、 reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])

這個窗口操做和上一個的區別是多傳入一個函數invFunc。前面的func做用和上一個reduceByKeyAndWindow相同,後面的invFunc是用於處理流出rdd的。

在下面這個例子中,若是把3秒的時間窗口當成一個池塘,池塘每一秒都會有魚遊進或者游出,那麼第一個函數表示每由進來一條魚,就在該類魚的數量上累加。而第二個函數是,每由出去一條魚,就將該魚的總數減去一。

val windowWords = pairs.reduceByKeyAndWindow((a: Int, b:Int ) => (a + b) , (a:Int, b: Int) => (a - b) , Seconds( 3 ), Seconds( 1 ))

下面是演示結果,最終的結果是該3秒長度的窗口中歷史上出現過的全部不一樣單詞個數都爲0。

段時間不輸入任何信息,看一下最終結果

 

六、 countByValueAndWindow(windowLength,slideInterval, [numTasks])

相似於前面的countByValue操做,調用該操做的DStream數據格式爲(K, v),返回的DStream格式爲(K, Long)。統計當前時間窗口中元素值相同的元素的個數。

val windowWords = words.countByValueAndWindow(Seconds( 3 ), Seconds( 1))

示例二:熱點搜索詞滑動統計,每隔10秒鐘,統計最近60秒鐘的搜索詞的搜索頻次,並打印出排名最靠前的3個搜索詞以及出現次數

Scala版本:

 

packagecom.spark.streamingimport org.apache.spark.streaming.Secondsimport org.apache.spark.streaming.StreamingContextimport org.apache.spark.SparkConf/*** @author Ganymede*/object WindowHotWordS {def main(args: Array[String]): Unit = {val conf = newSparkConf().setAppName("WindowHotWordS").setMaster("local[2]")//Scala中,建立的是StreamingContextval ssc = new StreamingContext(conf,Seconds(5))val searchLogsDStream =ssc.socketTextStream("spark1", 9999)val searchWordsDStream =searchLogsDStream.map { searchLog => searchLog.split(" ")(1)}val searchWordPairDStream = searchWordsDStream.map{ searchWord => (searchWord, 1) }// reduceByKeyAndWindow// 第二個參數,是窗口長度,這是是60秒// 第三個參數,是滑動間隔,這裏是10秒// 也就是說,每隔10秒鐘,將最近60秒的數據,做爲一個窗口,進行內部的RDD的聚合,而後統一對一個RDD進行後續計算// 而是隻是放在那裏// 而後,等待咱們的滑動間隔到了之後,10秒到了,會將以前60秒的RDD,由於一個batch間隔是5秒,因此以前60秒,就有12個RDD,給聚合起來,而後統一執行reduceByKey操做// 因此這裏的reduceByKeyAndWindow,是針對每一個窗口執行計算的,而不是針對 某個DStream中的RDD// 每隔10秒鐘,出來以前60秒的收集到的單詞的統計次數val searchWordCountsDStream =searchWordPairDStream.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2,Seconds(60), Seconds(10))val finalDStream =searchWordCountsDStream.transform(searchWordCountsRDD => {val countSearchWordsRDD =searchWordCountsRDD.map(tuple => (tuple._2, tuple._1))val sortedCountSearchWordsRDD =countSearchWordsRDD.sortByKey(false)val sortedSearchWordCountsRDD =sortedCountSearchWordsRDD.map(tuple => (tuple._1, tuple._2))val top3SearchWordCounts =sortedSearchWordCountsRDD.take(3)for (tuple <-top3SearchWordCounts) {println("result : " +tuple)}searchWordCountsRDD})finalDStream.print()ssc.start()ssc.awaitTermination()}}​
相關文章
相關標籤/搜索