Spark Streaming中的操做函數講解html
根據根據Spark官方文檔中的描述,在Spark Streaming應用中,一個DStream對象能夠調用多種操做,主要分爲如下幾類java
map操做須要傳入一個函數當作參數,具體調用形式爲mysql
主要做用是,對DStream對象a,將func函數做用到a中的每個元素上並生成新的元素,獲得的DStream對象b中包含這些新的元素。
下面示例代碼的做用是,在接收到的一行消息後面拼接一個」_NEW」字符串sql
程序運行結果以下:
注意與接下來的flatMap操做進行比較。數據庫
相似於上面的map操做,具體調用形式爲apache
主要做用是,對DStream對象a,將func函數做用到a中的每個元素上並生成0個或多個新的元素,獲得的DStream對象b中包含這些新的元素。併發
下面示例代碼的做用是,在接收到的一行消息lines後,將lines根據空格進行分割,分割成若干個單詞app
結果以下:
ide
filter傳入一個func函數,具體調用形式爲函數
對DStream a中的每個元素,應用func方法進行計算,若是func函數返回結果爲true,則保留該元素,不然丟棄該元素,返回一個新的DStream b。
下面示例代碼中,對words進行判斷,去除hello這個單詞。
結果以下:
這個操做將兩個DStream進行合併,生成一個包含着兩個DStream中全部元素的新DStream對象。
下面代碼,首先將輸入的每個單詞後面分別拼接「_one」和「_two」,最後將這兩個DStream合併成一個新的DStream
運行結果以下:
統計DStream中每一個RDD包含的元素的個數,獲得一個新的DStream,這個DStream中只包含一個元素,這個元素是對應語句單詞統計數值。
如下代碼,統計每一行中的單詞數
運行結果以下,一行輸入4個單詞,打印的結果也爲4。
返回一個包含一個元素的DStream,傳入的func方法會做用在調用者的每個元素上,將其中的元素順次的兩兩進行計算。
下面的代碼,將每個單詞用"-"
符號進行拼接
運行結果以下:
某個DStream中的元素類型爲K,調用這個方法後,返回的DStream的元素爲(K, Long)對,後面這個Long值是原DStream中每一個RDD元素key出現的頻率。
如下代碼統計words中不一樣單詞的個數
結果以下:
調用這個操做的DStream是以(K, V)的形式出現,返回一個新的元素格式爲(K, V)的DStream。返回結果中,K爲原來的K,V是由K通過傳入func計算獲得的。還能夠傳入一個並行計算的參數,在local模式下,默認爲2。在其餘模式下,默認值由參數spark.default.parallelism
肯定。
下面代碼將words轉化成(word, 1)的形式,再以單詞爲key,個數爲value,進行word count。
結果以下,
由一個DStream對象調用該方法,元素內容爲(k, V)
,傳入另外一個DStream對象,元素內容爲(k, W),返回的DStream中包含的內容是(k, (V, W))
。這個方法也能夠傳入一個並行計算的參數,該參數與reduceByKey中是相同的。
下面代碼中,首先將words轉化成(word, (word + "_one"))
和(word, (word + "_two"))
的形式,再以word爲key,將後面的value合併到一塊兒。
運行結果以下:
由一個DStream對象調用該方法,元素內容爲(k, V),傳入另外一個DStream對象,元素內容爲(k, W),返回的DStream中包含的內容是(k, (Seq[V], Seq[W]))
。這個方法也能夠傳入一個並行計算的參數,該參數與reduceByKey中是相同的。
下面代碼首先將words轉化成(word, (word + "_one"))
和(word, (word + "_two"))
的形式,再以word爲key,將後面的value合併到一塊兒。
結果以下:
在Spark-Streaming官方文檔中提到,DStream的transform
操做極大的豐富了DStream上可以進行的操做內容。使用transform操做後,除了可使用DStream提供的一些轉換方法以外,還可以直接調用任意的調用RDD上的操做函數。
好比下面的代碼中,使用transform完成將一行語句分割成單詞的功能。
運行結果以下:
我以爲用一個成語,管中窺豹,基本上就可以很形象的解釋什麼是窗口函數了。DStream數據流就是那隻豹子,窗口就是那個管,以一個固定的速率平移,就可以每次看到豹的一部分。
窗口函數,就是在DStream流上,以一個可配置的長度爲窗口,以一個可配置的速率向前移動窗口,根據窗口函數的具體內容,分別對當前窗口中的這一波數據採起某個對應的操做算子。須要注意的是窗口長度,和窗口移動速率須要是batch time的整數倍。接下來演示Spark Streaming中提供的主要窗口函數。
該操做由一個DStream對象調用,傳入一個窗口長度參數,一個窗口移動速率參數,而後將當前時刻當前長度窗口中的元素取出造成一個新的DStream。
下面的代碼以長度爲3,移動速率爲1截取源DStream中的元素造成新的DStream。
運行結果以下:
基本上每秒輸入一個字母,而後取出當前時刻3秒這個長度中的全部元素,打印出來。從上面的截圖中能夠看到,下一秒時已經看不到a了,再下一秒,已經看不到b和c了。表示a, b, c已經不在當前的窗口中。
返回指定長度窗口中的元素個數。
代碼以下,統計當前3秒長度的時間窗口的DStream中元素的個數:
結果以下:
相似於上面的reduce操做,只不過這裏再也不是對整個調用DStream進行reduce操做,而是在調用DStream上首先取窗口函數的元素造成新的DStream,而後在窗口元素造成的DStream上進行reduce。
代碼以下:
結果以下:
調用該操做的DStream中的元素格式爲(k, v),整個操做相似於前面的reduceByKey,只不過對應的數據源不一樣,reduceByKeyAndWindow的數據源是基於該DStream的窗口長度中的全部數據。該操做也有一個可選的併發數參數。
下面代碼中,將當前長度爲3的時間窗口中的全部數據元素根據key進行合併,統計當前3秒中內不一樣單詞出現的次數。
結果以下:
這個窗口操做和上一個的區別是多傳入一個函數invFunc。前面的func做用和上一個reduceByKeyAndWindow相同,後面的invFunc是用於處理流出rdd的。
在下面這個例子中,若是把3秒的時間窗口當成一個池塘,池塘每一秒都會有魚遊進或者游出,那麼第一個函數表示每由進來一條魚,就在該類魚的數量上累加。而第二個函數是,每由出去一條魚,就將該魚的總數減去一。
下面是演示結果,最終的結果是該3秒長度的窗口中歷史上出現過的全部不一樣單詞個數都爲0。
一段時間不輸入任何信息,看一下最終結果
相似於前面的countByValue操做,調用該操做的DStream數據格式爲(K, v),返回的DStream格式爲(K, Long)。統計當前時間窗口中元素值相同的元素的個數。
代碼以下
結果以下
Join主要可分爲兩種,
這種join通常應用於窗口函數造成的DStream對象之間,具體能夠參考第一部分中的join操做,除了簡單的join以外,還有leftOuterJoin, rightOuterJoin和fullOuterJoin。
這一種join,能夠參考前面transform操做中的示例。
在Spark Streaming中,DStream的輸出操做纔是DStream上全部transformations的真正觸發計算點,這個相似於RDD中的action操做。通過輸出操做DStream中的數據才能與外部進行交互,好比將數據寫入文件系統、數據庫,或其餘應用中。
print操做會將DStream每個batch中的前10個元素在driver節點打印出來。
看下面這個示例,一行輸入超過10個單詞,而後將這行語句分割成單個單詞的DStream。
看看print後的效果。
這個操做能夠將DStream中的內容保存爲text文件,每一個batch的數據單獨保存爲一個文夾,文件夾名前綴參數必須傳入,文件夾名後綴參數可選,最終文件夾名稱的完整形式爲prefix-TIME_IN_MS[.suffix]
好比下面這一行代碼
看一下執行結果,在當前項目路徑下,每秒鐘生成一個文件夾,打開的兩個窗口中的內容分別是nc窗口中的輸入。
另外,若是前綴中包含文件完整路徑,則該text文件夾會建在指定路徑下,以下圖所示
這個操做和前面一個相似,只不過這裏將DStream中的內容保存爲SequenceFile文件類型,這個文件中保存的數據都是通過序列化後的Java對象。
實驗略過,可參考前面一個操做。
這個操做和前兩個相似,將DStream每一batch中的內容保存到HDFS上,一樣能夠指定文件的前綴和後綴。