近年來,大數據的計算引擎愈來愈受到關注,spark做爲最受歡迎的大數據計算框架,也在不斷的學習和完善中。在Spark2.x中,新開放了一個基於DataFrame的無下限的流式處理組件——Structured Streaming,它也是本系列的主角,廢話很少說,進入正題吧!sql
在有過1.6的streaming和2.x的streaming開發體驗以後,再來使用Structured Streaming會有一種徹底不一樣的體驗,尤爲是在代碼設計上。apache
在過去使用streaming時,咱們很容易的理解爲一次處理是當前batch的全部數據,只要針對這波數據進行各類處理便可。若是要作一些相似pv uv的統計,那就得藉助有狀態的state的DStream,或者藉助一些分佈式緩存系統,如Redis、Alluxio都能實現。須要關注的就是儘可能快速的處理完當前的batch數據,以及7*24小時的運行便可。緩存
能夠看到想要去作一些相似Group by的操做,Streaming是很是不便的。Structured Streaming則完美的解決了這個問題。app
在Structured Streaming中,把源源不斷到來的數據經過固定的模式「追加」或者「更新」到了上面無下限的DataFrame中。剩餘的工做則跟普通的DataFrame同樣,能夠去map、filter,也能夠去groupby().count()。甚至還能夠把流處理的dataframe跟其餘的「靜態」DataFrame進行join。另外,還提供了基於window時間的流式處理。總之,Structured Streaming提供了快速、可擴展、高可用、高可靠的流式處理。框架
在大數據開發中,Word Count就是基本的演示示例,因此這裏也模仿官網的例子,作一下演示。socket
直接看一下完整的例子:分佈式
package xingoo.sstreaming import org.apache.spark.sql.SparkSession object WordCount { def main(args: Array[String]): Unit = { val spark = SparkSession .builder .master("local") .appName("StructuredNetworkWordCount") .getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ // 建立DataFrame // Create DataFrame representing the stream of input lines from connection to localhost:9999 val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // Split the lines into words val words = lines.as[String].flatMap(_.split(" ")) // Generate running word count val wordCounts = words.groupBy("value").count() // Start running the query that prints the running counts to the console // 三種模式: // 1 complete 全部內容都輸出 // 2 append 新增的行才輸出 // 3 update 更新的行才輸出 val query = wordCounts.writeStream .outputMode("complete") .format("console") .start() query.awaitTermination() } }
效果就是在控制檯輸入nc -lk 9999
,而後輸入一大堆的字符,控制檯就輸出了對應的結果:
而後來詳細看一下代碼:學習
val spark = SparkSession .builder .master("local") .appName("StructuredNetworkWordCount") .getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._
上面就不用太多解釋了吧,建立一個本地的sparkSession,設置日誌的級別爲WARN,要不控制檯太亂。而後引入spark sql必要的方法(若是沒有import spark.implicits._,基本類型是沒法直接轉化成DataFrame的)。大數據
val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load()
建立了一個Socket鏈接的DataStream,並經過load()方法獲取當前批次的DataFrame。ui
val words = lines.as[String].flatMap(_.split(" ")) val wordCounts = words.groupBy("value").count()
先把DataFrame轉成單列的DataSet,而後經過空格切分每一行,再根據value作groupby,並統計個數。
val query = wordCounts.writeStream .outputMode("complete") .format("console") .start()
調用DataFrame的writeStream方法,轉換成輸出流,設置模式爲"complete",指定輸出對象爲控制檯"console",而後調用start()方法啓動計算。並返回queryStreaming,進行控制。
這裏的outputmode和format都會後續詳細介紹。
query.awaitTermination()
經過QueryStreaming的對象,調用awaitTermination阻塞主線程。程序就能夠不斷循環調用了。
觀察一下Spark UI,能夠發現程序穩定的在運行~
這就是一個最基本的wordcount的例子,想象一下,若是沒有Structured Streaming,想要統計全局的wordcount,仍是很費勁的(即使使用streaming的state,其實也不是那麼好用的)。