Spark Streaming-DStream實戰案例java
做者:尹正傑nginx
版權聲明:原創做品,謝絕轉載!不然將追究法律責任。數據庫
一.DStream建立apache
1>.文件數據源採集bootstrap
文件數據流:
可以讀取全部HDFS API兼容的文件系統文件,經過fileStream方法進行讀取,Spark Streaming 將會監控 dataDirectory 目錄並不斷處理移動進來的文件,記住目前不支持嵌套目錄。
streamingContext.textFileStream(dataDirectory)注意事項: 1)文件須要有相同的數據格式; 2)文件進入 dataDirectory的方式須要經過移動或者重命名來實現; 3)一旦文件移動進目錄,則不能再修改,即使修改了也不會讀取新數據;
舒適提示:
雖然Spark支持文件的採集,可是生產環境中不建議你們使用,畢竟須要我們本身寫scala代碼,我建議你們使用比較成熟的採集工具,好比filebeat,logstash,flume等工具。
package com.yinzhengjie.bigdata.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} object FileDataSource { def main(args: Array[String]): Unit = { /** * 1>.初始化Spark配置信息 */ val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingWordCount") /** * 2>.初始化SparkStreamingContext(實時數據分析環境對象) * * 自定義採集週期: * 以指定的時間爲週期採集實時數據。我這裏指定採集週期是5秒.生產環境中咱們能夠將這個值改小,好比每秒採集一次. */ val ssc = new StreamingContext(sparkConf, Seconds(5)) /** * 3>.監控文件夾建立DStream(即從指定文件夾中採集數據) */ val socketLineDStream:ReceiverInputDStream[String] = ssc.socketTextStream("hadoop101.yinzhengjie.org.cn", 8888) val fileDStream:DStream[String] = ssc.textFileStream("E:\\yinzhengjie\\bigdata\\input\\test") /** * 4>.將採集的數據進行扁平化操做(即將每一行數據作切分,造成一個個單詞) */ val wordDStreams:DStream[String] = fileDStream.flatMap(_.split(" ")) /** * 5>.將數據進行結構的轉換方便統計分析(即將單詞映射成元組(word,1)) */ val wordAndOneStreams:DStream[(String,Int)] = wordDStreams.map((_, 1)) /** * 6>.將相同的單詞次數作統計 */ val wordToCountDStream:DStream[(String,Int)] = wordAndOneStreams.reduceByKey(_+_) /** * 7>.將結果打印出來 */ wordToCountDStream.print() /** * 8>.啓動(SparkStreamingContext)採集器 */ ssc.start() /** * 9>.Driver等待採集器的執行(即禁止main線程主動退出) */ ssc.awaitTermination() /** * 舒適提示: * 我們的程序是實時處理數據的,所以生產環境中不能中止採集程序,所以不建議使用喲~ */ // ssc.stop() } }
package com.yinzhengjie.bigdata.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} object HdfsDataSource { def main(args: Array[String]): Unit = { //1.初始化Spark配置信息 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount") //2.初始化SparkStreamingContext val ssc:StreamingContext = new StreamingContext(sparkConf, Seconds(5)) //3.監控HDFS文件夾建立DStream val dirStream:DStream[String] = ssc.textFileStream("hdfs://hadoop101.yinzhengjie.org.cn:9000/spark/fileStream") //4.將每一行數據作切分,造成一個個單詞 val wordStreams:DStream[String] = dirStream.flatMap(_.split("\t")) //5.將單詞映射成元組(word,1) val wordAndOneStreams:DStream[(String,Int)] = wordStreams.map((_, 1)) //6.將相同的單詞次數作統計 val wordAndCountStreams:DStream[(String,Int)] = wordAndOneStreams.reduceByKey(_ + _) //7.打印 wordAndCountStreams.print() //8.啓動SparkStreamingContext ssc.start() ssc.awaitTermination() } }
2>.RDD隊列ubuntu
package com.yinzhengjie.bigdata.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable object RDDStream { def main(args: Array[String]) { //1.初始化Spark配置信息 val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream") //2.初始化SparkStreamingContext val ssc = new StreamingContext(conf, Seconds(5)) //3.建立RDD隊列(後期在隊列中不斷的放RDD) val rddQueue = new mutable.Queue[RDD[Int]]() /** * 4.建立QueueInputDStream * * 舒適提示: * 測試過程當中,能夠經過使用ssc.queueStream(queueOfRDDs)來建立DStream,每個推送到這個隊列中的RDD,都會做爲一個DStream處理。 */ val inputStream = ssc.queueStream(rddQueue,oneAtATime = false) //5.處理隊列中的RDD數據 val mappedStream = inputStream.map((_,1)) val reducedStream = mappedStream.reduceByKey(_ + _) //6.打印結果 reducedStream.print() //7.啓動任務 ssc.start() //8.循環建立並向RDD隊列中放入RDD for (i <- 1 to 5) { rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10) Thread.sleep(2000) } ssc.awaitTermination() } }
3>.自定義數據源windows
package com.yinzhengjie.bigdata.spark.streaming import java.io.{BufferedReader, InputStreamReader} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver import java.net.Socket import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} object CustomReceiver { def main(args: Array[String]): Unit = { /** * 1>.初始化Spark配置信息 */ val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingWordCount") /** * 2>.初始化SparkStreamingContext(實時數據分析環境對象) * * 自定義採集週期: * 以指定的時間爲週期採集實時數據。我這裏指定採集週期是5秒.生產環境中咱們能夠將這個值改小,好比每秒採集一次. */ val ssc = new StreamingContext(sparkConf, Seconds(5)) /** * 3>.經過我們自定義的接收器來採集數據源 */ val receiverDStream:ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver("hadoop101.yinzhengjie.org.cn",8888)) /** * 4>.將採集的數據進行扁平化操做(即將每一行數據作切分,造成一個個單詞) */ val wordDStreams:DStream[String] = receiverDStream.flatMap(_.split(" ")) /** * 5>.將數據進行結構的轉換方便統計分析(即將單詞映射成元組(word,1)) */ val wordAndOneStreams:DStream[(String,Int)] = wordDStreams.map((_, 1)) /** * 6>.將相同的單詞次數作統計 */ val wordToCountDStream:DStream[(String,Int)] = wordAndOneStreams.reduceByKey(_+_) /** * 7>.將結果打印出來 */ wordToCountDStream.print() /** * 8>.啓動(SparkStreamingContext)採集器 */ ssc.start() /** * 9>.Driver等待採集器的執行(即禁止main線程主動退出) */ ssc.awaitTermination() /** * 舒適提示: * 我們的程序是實時處理數據的,所以生產環境中不能中止採集程序,所以不建議使用喲~ */ // ssc.stop() } } /** * 聲明採集器 */ class MyReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){ var socket:Socket = null override def onStart(): Unit = { new Thread(new Runnable { override def run(): Unit = { receive() } }).start() } override def onStop(): Unit = { if(socket != null){ socket.close() socket = null } } def receive(): Unit = { socket = new Socket(host,port) val reader = new BufferedReader(new InputStreamReader(socket.getInputStream,"UTF-8")) var line:String = null while ((line = reader.readLine()) != null) { //判斷程序什麼時候結束 if ("END".equals(line)){ return }else{ //將採集的數據存儲到採集器的內部進行轉換 this.store(line) } } } }
4>.kafka數據源網絡
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version> </dependency>
[root@kafka201.yinzhengjie.com ~]# kafka-topics.sh --bootstrap-server kafka203.yinzhengjie.com:9092 --list __consumer_offsets filebeat-ubuntu-syslog nginx-172-200-5-103 syslog-172-200-5-103 yinzhengjie-kafka [root@kafka201.yinzhengjie.com ~]# [root@kafka201.yinzhengjie.com ~]# kafka-topics.sh --bootstrap-server kafka203.yinzhengjie.com:9092 --create --topic yinzhengjie2020 --partitions 3 --replication-factor 2 Created topic yinzhengjie2020. [root@kafka201.yinzhengjie.com ~]# [root@kafka201.yinzhengjie.com ~]# [root@kafka201.yinzhengjie.com ~]# kafka-topics.sh --bootstrap-server kafka203.yinzhengjie.com:9092 --list __consumer_offsets filebeat-ubuntu-syslog nginx-172-200-5-103 syslog-172-200-5-103 yinzhengjie-kafka yinzhengjie2020 [root@kafka201.yinzhengjie.com ~]# [root@kafka201.yinzhengjie.com ~]#
package com.yinzhengjie.bigdata.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.kafka.KafkaUtils object KafkaSource { def main(args: Array[String]): Unit = { /** * 1>.初始化Spark配置信息 */ val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingWordCount") /** * 2>.初始化SparkStreamingContext(實時數據分析環境對象) * * 自定義採集週期: * 以指定的時間爲週期採集實時數據。我這裏指定採集週期是5秒.生產環境中咱們能夠將這個值改小,好比每秒採集一次. */ val ssc = new StreamingContext(sparkConf, Seconds(5)) /** * 3>.使用spark官方提供的kafka工具類來採集數據 */ val kafkaDStream:ReceiverInputDStream[(String,String)] = KafkaUtils.createStream( //指定SparkStreamingContext對象 ssc, //指定zookeeper去集羣,注意哈,我在部署kafka是在zookeeper指定了chroot路徑了的,所以這裏要寫全喲~ "kafka201.yinzhengjie.com:2181/yinzhengjie-kafka", //指定消費者組id "yinzhengjie2020", //指定topic及分區數信息 Map("yinzhengjie2020" -> 3) ) /** * 4>.將採集的數據進行扁平化操做(即將每一行數據作切分,造成一個個單詞) * * 舒適提示: * kafka傳輸的消息格式是(k,v),只不過平時咱們不傳遞K的值默認爲null,所以咱們在作切割時應該只取value喲~ */ val wordDStreams:DStream[String] = kafkaDStream.flatMap(t => t._2.split(" ")) /** * 5>.將數據進行結構的轉換方便統計分析(即將單詞映射成元組(word,1)) */ val wordAndOneStreams:DStream[(String,Int)] = wordDStreams.map((_, 1)) /** * 6>.將相同的單詞次數作統計 */ val wordToCountDStream:DStream[(String,Int)] = wordAndOneStreams.reduceByKey(_+_) /** * 7>.將結果打印出來 */ wordToCountDStream.print() /** * 8>.啓動(SparkStreamingContext)採集器 */ ssc.start() /** * 9>.Driver等待採集器的執行(即禁止main線程主動退出) */ ssc.awaitTermination() /** * 舒適提示: * 我們的程序是實時處理數據的,所以生產環境中不能中止採集程序,所以不建議使用喲~ */ // ssc.stop() } }
二.DStream轉換app
DStream上的原語與RDD的相似,分爲Transformations(轉換)和Output Operations(輸出)兩種,此外轉換操做中還有一些比較特殊的原語,如:updateStateByKey()、transform()以及各類Window相關的原語。
1>.無狀態轉化操做socket
無狀態轉化操做就是把簡單的RDD轉化操做應用到每一個批次上,也就是轉化DStream中的每個RDD。部分無狀態轉化操做列在了下表中。
舒適提示:
針對鍵值對的DStream轉化操做(好比 reduceByKey())要添加import StreamingContext._才能在Scala中使用。
須要記住的是,儘管這些函數看起來像做用在整個流上同樣,但事實上每一個DStream在內部是由許多RDD(批次)組成,且無狀態轉化操做是分別應用到每一個RDD上的。例如,reduceByKey()會歸約每一個時間區間中的數據,但不會歸約不一樣區間之間的數據。
舉個例子,在以前的wordcount程序中,咱們只會統計5秒內接收到的數據的單詞個數,而不會累加。
無狀態轉化操做也能在多個DStream間整合數據,不過也是在各個時間區間內。例如,鍵值對DStream擁有和RDD同樣的與鏈接相關的轉化操做,也就是cogroup()、join()、leftOuterJoin()等。
咱們能夠在DStream上使用這些操做,這樣就對每一個批次分別執行了對應的RDD操做。咱們還能夠像在常規的Spark 中同樣使用 DStream的union() 操做將它和另外一個DStream 的內容合併起來,也可使用StreamingContext.union()來合併多個流。
2>.有狀態轉化操做
package com.yinzhengjie.bigdata.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.kafka.KafkaUtils object UpdateStateByKeyDemo { def main(args: Array[String]): Unit = { /** * 1>.初始化Spark配置信息 */ val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingWordCount") /** * 2>.初始化SparkStreamingContext(實時數據分析環境對象) * * 自定義採集週期: * 以指定的時間爲週期採集實時數據。我這裏指定採集週期是5秒.生產環境中咱們能夠將這個值改小,好比每秒採集一次. */ val ssc = new StreamingContext(sparkConf, Seconds(5)) /** * 3>.保存數據的狀態,須要設定檢查點的路徑 */ ssc.sparkContext.setCheckpointDir("E:\\yinzhengjie\\bigdata\\spark\\checkpoint") /** * 4>.使用spark官方提供的kafka工具類來採集數據 */ val kafkaDStream:ReceiverInputDStream[(String,String)] = KafkaUtils.createStream( //指定SparkStreamingContext對象 ssc, //指定zookeeper去集羣,注意哈,我在部署kafka是在zookeeper指定了chroot路徑了的,所以這裏要寫全喲~ "kafka201.yinzhengjie.com:2181/yinzhengjie-kafka", //指定消費者組id "yinzhengjie2020", //指定topic及分區數信息 Map("yinzhengjie2020" -> 3) ) /** * 5>.將採集的數據進行扁平化操做(即將每一行數據作切分,造成一個個單詞) * * 舒適提示: * kafka傳輸的消息格式是(k,v),只不過平時咱們不傳遞K的值默認爲null,所以咱們在作切割時應該只取value喲~ */ val wordDStreams:DStream[String] = kafkaDStream.flatMap(t => t._2.split(" ")) /** * 6>.將數據進行結構的轉換方便統計分析(即將單詞映射成元組(word,1)) */ val wordAndOneStreams:DStream[(String,Int)] = wordDStreams.map((_, 1)) /** * 7>.使用updateStateByKey來更新狀態,統計從運行開始以來單詞總的次數 * * UpdateStateByKey原語用於記錄歷史記錄: * 有時,咱們須要在 DStream 中跨批次維護狀態(例如流計算中累加wordcount)。 * 針對這種狀況,updateStateByKey() 爲咱們提供了對一個狀態變量的訪問,用於鍵值對形式的 DStream。給定一個由(鍵,事件)對構成的 DStream, * 並傳遞一個指定如何根據新的事件 更新每一個鍵對應狀態的函數,它能夠構建出一個新的 DStream,其內部數據爲(鍵,狀態)對。 * updateStateByKey() 的結果會是一個新的 DStream,其內部的 RDD 序列是由每一個時間區間對應的(鍵,狀態)對組成的。 * updateStateByKey操做使得咱們能夠在用新信息進行更新時保持任意的狀態。爲使用這個功能,你須要作下面兩步: * 1>.定義狀態,狀態能夠是一個任意的數據類型。 * 2>.定義狀態更新函數,用此函數闡明如何使用以前的狀態和來自輸入流的新值對狀態進行更新。 * 使用updateStateByKey須要對檢查點目錄進行配置,會使用檢查點來保存狀態。 */ val stateDStream:DStream[(String,Int)] = wordAndOneStreams.updateStateByKey{ case (seq,buffer) => { val sum = buffer.getOrElse(0) + seq.sum Option(sum) } } /** * 8.將結果打印出來 */ stateDStream.print() /** * 9>.啓動(SparkStreamingContext)採集器 */ ssc.start() /** * 10>.Driver等待採集器的執行(即禁止main線程主動退出) */ ssc.awaitTermination() /** * 舒適提示: * 我們的程序是實時處理數據的,所以生產環境中不能中止採集程序,所以不建議使用喲~ */ // ssc.stop() } }
package com.yinzhengjie.bigdata.spark.streaming object ScalaWindow { def main(args: Array[String]): Unit = { val list = List(10,20,30,40,50,60) //Scala默認支持滑動窗口函數 val ints:Iterator[List[Int]] = list.sliding(3) //每次滑動3個元素 for (list <- ints){ println(list.mkString(",")) } println("*" * 20 + " 我是分割線 " + "*" * 20 ) //Scala默認支持滑動窗口函數 val ints2:Iterator[List[Int]] = list.sliding(3,3) //每次滑動3個元素並指定步長 for (list <- ints2){ println(list.mkString(",")) } } }
package com.yinzhengjie.bigdata.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} object SparkWindowOperations { def main(args: Array[String]): Unit = { /** * 1>.初始化Spark配置信息 */ val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingWordCount") /** * 2>.初始化SparkStreamingContext(實時數據分析環境對象) * * 自定義採集週期: * 以指定的時間爲週期採集實時數據。我這裏指定採集週期是5秒.生產環境中咱們能夠將這個值改小,好比每秒採集一次. */ val ssc = new StreamingContext(sparkConf, Seconds(3)) /** * 3>.使用spark官方提供的kafka工具類來採集數據 */ val kafkaDStream:ReceiverInputDStream[(String,String)] = KafkaUtils.createStream( //指定SparkStreamingContext對象 ssc, //指定zookeeper去集羣,注意哈,我在部署kafka是在zookeeper指定了chroot路徑了的,所以這裏要寫全喲~ "kafka201.yinzhengjie.com:2181/yinzhengjie-kafka", //指定消費者組id "yinzhengjie2020", //指定topic及分區數信息 Map("yinzhengjie2020" -> 3) ) /** * 4>.設置spark的滑動窗口大小 * Window Operations能夠設置窗口的大小和滑動窗口的間隔來動態的獲取當前Steaming的容許狀態。 * 基於窗口的操做會在一個比 StreamingContext 的批次間隔更長的時間範圍內,經過整合多個批次的結果,計算出整個窗口的結果。 * 全部基於窗口的操做都須要兩個參數,分別爲窗口時長以及滑動步長,二者都必須是 StreamContext 的批次間隔的整數倍。 * * 關於Window的操做有以下原語: * (1)window(windowLength, slideInterval): * 基於對源DStream窗化的批次進行計算返回一個新的Dstream * (2)countByWindow(windowLength, slideInterval): * 返回一個滑動窗口計數流中的元素。 * (3)reduceByWindow(func, windowLength, slideInterval): * 經過使用自定義函數整合滑動區間流元素來建立一個新的單元素流。 * (4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): * 當在一個(K,V)對的DStream上調用此函數,會返回一個新(K,V)對的DStream,此處經過對滑動窗口中批次數據使用reduce函數來整合每一個key的value值。 * Note:默認狀況下,這個操做使用Spark的默認數量並行任務(本地是2),在集羣模式中依據配置屬性(spark.default.parallelism)來作grouping。 * 你能夠經過設置可選參數numTasks來設置不一樣數量的tasks。 * (5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): * 這個函數是上述函數的更高效版本,每一個窗口的reduce值都是經過用前一個窗的reduce值來遞增計算。 * 經過reduce進入到滑動窗口數據並」反向reduce」離開窗口的舊數據來實現這個操做。一個例子是隨着窗口滑動對keys的「加」「減」計數。 * 經過前邊介紹能夠想到,這個函數只適用於」可逆的reduce函數」,也就是這些reduce函數有相應的」反reduce」函數(以參數invFunc形式傳入)。 * 如前述函數,reduce任務的數量經過可選參數來配置。注意:爲了使用這個操做,檢查點必須可用。 * (6)countByValueAndWindow(windowLength,slideInterval, [numTasks]): * 對(K,V)對的DStream調用,返回(K,Long)對的新DStream,其中每一個key的值是其在滑動窗口中頻率。如上,可配置reduce任務數量。 * reduceByWindow() 和 reduceByKeyAndWindow() 讓咱們能夠對每一個窗口更高效地進行歸約操做: * 它們接收一個歸約函數,在整個窗口上執行,好比 +。 * 除此之外,它們還有一種特殊形式,經過只考慮新進入窗口的數據和離開窗口的數據,讓 Spark 增量計算歸約結果。 * 這種特殊形式須要提供歸約函數的一個逆函數,比 如 + 對應的逆函數爲 -。對於較大的窗口,提供逆函數能夠大大提升執行效率。 * * 舒適提示: * (1)窗口大小應該是採集週期的整數倍 * (2)窗口滑動步長也應該是採集週期的整數倍 */ val windowDStream:DStream[(String,String)] = kafkaDStream.window(Seconds(9),Seconds(3)) /** * 5>.將採集的數據進行扁平化操做(即將每一行數據作切分,造成一個個單詞) * * 舒適提示: * kafka傳輸的消息格式是(k,v),只不過平時咱們不傳遞K的值默認爲null,所以咱們在作切割時應該只取value喲~ */ val wordDStreams:DStream[String] = windowDStream.flatMap(t => t._2.split(" ")) /** * 6>.將數據進行結構的轉換方便統計分析(即將單詞映射成元組(word,1)) */ val wordAndOneStreams:DStream[(String,Int)] = wordDStreams.map((_, 1)) /** * 7>.將相同的單詞次數作統計 */ val wordToCountDStream:DStream[(String,Int)] = wordAndOneStreams.reduceByKey(_+_) /** * 8>.將結果打印出來 */ wordToCountDStream.print() /** * 9>.啓動(SparkStreamingContext)採集器 */ ssc.start() /** * 10>.Driver等待採集器的執行(即禁止main線程主動退出) */ ssc.awaitTermination() /** * 舒適提示: * 我們的程序是實時處理數據的,所以生產環境中不能中止採集程序,所以不建議使用喲~ */ // ssc.stop() } }
3>.其它重要操做
Transform原語容許DStream上執行任意的RDD-to-RDD函數。即便這些函數並無在DStream的API中暴露出來,經過該函數能夠方便的擴展Spark API。該函數每一批次調度一次。其實也就是對DStream中的RDD應用轉換。 好比下面的例子,在進行單詞統計的時候,想要過濾掉spam的信息。 val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform { rdd => rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... }
鏈接操做(leftOuterJoin, rightOuterJoin, fullOuterJoin也能夠),能夠鏈接Stream-Stream,windows-stream to windows-stream、stream-dataset Stream-Stream Joins val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2) val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2) Stream-dataset joins val dataset: RDD[String, String] = ... val windowedStream = stream.window(Seconds(20))... val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
三.DStream輸出
輸出操做指定了對流數據經轉化操做獲得的數據所要執行的操做(例如把結果推入外部數據庫或輸出到屏幕上)。
與RDD中的惰性求值相似,若是一個DStream及其派生出的DStream都沒有被執行輸出操做,那麼這些DStream就都不會被求值。若是StreamingContext中沒有設定輸出操做,整個context就都不會啓動。
輸出操做以下: print():
在運行流程序的驅動結點上打印DStream中每一批次數據的最開始10個元素。這用於開發和調試。在Python API中,一樣的操做叫print()。 saveAsTextFiles(prefix, [suffix]):
以text文件形式存儲這個DStream的內容。每一批次的存儲文件名基於參數中的prefix和suffix。」prefix-Time_IN_MS[.suffix]」. saveAsObjectFiles(prefix, [suffix]):
以Java對象序列化的方式將Stream中的數據保存爲 SequenceFiles . 每一批次的存儲文件名基於參數中的爲"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。 saveAsHadoopFiles(prefix, [suffix]):
將Stream中的數據保存爲 Hadoop files. 每一批次的存儲文件名基於參數中的爲"prefix-TIME_IN_MS[.suffix]"。 Python API Python中目前不可用。 foreachRDD(func):
這是最通用的輸出操做,即將函數 func 用於產生於 stream的每個RDD。其中參數傳入的函數func應該實現將每個RDD中數據推送到外部系統,如將RDD存入文件或者經過網絡將其寫入數據庫。
注意:函數func在運行流應用的驅動中被執行,同時其中通常函數RDD操做從而強制其對於流RDD的運算。 通用的輸出操做foreachRDD(),它用來對DStream中的RDD運行任意計算。這和transform() 有些相似,均可以讓咱們訪問任意RDD。在foreachRDD()中,能夠重用咱們在Spark中實現的全部行動操做。 好比,常見的用例之一是把數據寫到諸如MySQL的外部數據庫中。 注意: (1)鏈接不能寫在driver層面; (2)若是寫在foreach則每一個RDD都建立,得不償失; (3)增長foreachPartition,在分區建立。