DStream API提供的與轉化操做相關的方法以下:
apache
以下舉例詳解transform(func) 方法和 updateStateByKey(func) 方法:服務器
transform 方法及相似的 transformWith(func) 方法容許在 DStream 上應用任意 RDD-to-RDD 函數,它們能夠被應用於未在 DStream API 中暴露的任何 RDD 操做中。
下面舉例演示如何使用transform(func) 方法將一行語句分隔成多個單詞,具體步驟以下:
A、在Liunx中執行命令nc –lk 9999 啓動服務器且監聽socket服務,而且輸入數據I like spark streaming and Hadoop,具體命令以下:
B、打開IDEA開發工具,建立一個Maven項目,而且配置pom.xml文件,引入Spark Streaming相關的依賴包,pom.xml文件配置具體以下:
<!--設置依賴版本號-->socket
<properties> <spark.version>2.1.1</spark.version> <scala.version>2.11</scala.version> </properties> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> </dependencies>
注意:配置好pom.xml文件後,須要在項目的/src/main和/src/test目錄下分別建立scala目錄。
C、在項目的/src/main/scala目錄下建立包,接着建立一個名爲TransformTest的scala類,主要用於編寫SparkStreaming應用程序,實現一行語句分隔成多個單詞的功能,具體代碼以下(帶註釋):ide
package SparkStreaming import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} object TransformTest { def main(args: Array[String]): Unit = { //建立SparkConf對象 val sparkconf = new SparkConf().setAppName("TransformTest").setMaster("local[2]") //建立SparkContext對象 val sc = new SparkContext(sparkconf) //設置日誌級別 sc.setLogLevel("WARN") //建立StreamingContext,須要建立兩個參數,分別爲SparkContext和批處理時間間隔 val ssc = new StreamingContext(sc,Seconds(5)) //鏈接socket服務,須要socket服務地址、端口號以及存儲級別(默認的) val dstream:ReceiverInputDStream[String] = ssc.socketTextStream("192.168.169.200",9999) //經過空格分隔 val words:DStream[String] = dstream.transform(line => line.flatMap(_.split(" "))) //打印輸出結果 words.print() //開啓流式計算 ssc.start() //用於保護程序正常運行 ssc.awaitTermination() } }
D、運行程序能夠看出,語句I like spark streaming and Hadoop在5s內被分割成6個單詞,結果以下圖:
函數
updateStateByKey(func) 方法能夠保持任意狀態,同時容許不斷有新的信息進行更新。
下面舉例演示如何使用updateStateByKey(func) 方法進行詞頻統計:
在項目的/src/main/scala目錄下建立包,接着建立一個名爲UpdateStateByKeyTest的scala類,主要用於編寫SparkStreaming應用程序,實現詞頻統計,具體代碼以下:工具
package SparkStreaming import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} object UpdateStateByKeyTest { def updateFunction(newValues:Seq[Int],runningCount:Option[Int]) : Option[Int] = { val newCount = runningCount.getOrElse(0)+newValues.sum Some(newCount) } def main(args: Array[String]): Unit = { //建立SparkConf對象 val sparkconf = new SparkConf().setAppName("UpdateStateByKeyTest").setMaster("local[2]") //建立SparkContext對象 val sc = new SparkContext(sparkconf) //設置日誌級別 sc.setLogLevel("WARN") //建立StreamingContext,須要建立兩個參數,分別爲SparkContext和批處理時間間隔 val ssc = new StreamingContext(sc,Seconds(5)) //鏈接socket服務,須要socket服務地址、端口號以及存儲級別(默認的) val dstream:ReceiverInputDStream[String] = ssc.socketTextStream("192.168.169.200",9999) //經過逗號分隔第一個字段和第二個字段 val words:DStream[(String,Int)] = dstream.flatMap(_.split(" ")).map(word => (word,1)) //調用updateStateByKey操做 var result:DStream[(String,Int)] = words.updateStateByKey(updateFunction) //若是用到updateStateByKey,此處要加上ssc.checkpoint("目錄")這一句,不然會報錯: // The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint(). //爲何要用到checkpoint?由於以前的案例是沒有狀態的,用完以後就丟掉,不須要了, // 可是如今要用到以前的那些數據,要把以前的狀態保留下來 //「.」的意思是當前目錄 ssc.checkpoint(".") //打印輸出結果 result.print() //開啓流式計算 ssc.start() //用於保護程序正常運行 ssc.awaitTermination() } }
而後在9999端口不斷輸入單詞,具體內容以下:oop
運行程序從控制檯輸出的結果看出每隔5s接受一次數據,一共接受了兩次數據,而且每接受一次數據就會進行詞頻統計並輸出結果。
開發工具