Spark Struntured Streaming是Spark 2.1.0版本後新增長的流計算引擎,本博將經過幾篇博文詳細介紹這個框架。這篇是介紹Spark Structured Streaming的基本開發方法。以Spark 自帶的example進行測試和介紹,其爲"StructuredNetworkWordcount.scala"文件。html
因爲咱們是在單機上進行測試,因此須要修單機運行模型,修改後的程序以下:sql
package org.apache.spark.examples.sql.streaming apache
import org.apache.spark.sql.SparkSession 數組
/** app * Counts words in UTF8 encoded, '\n' delimited text received from the network. 框架 * socket * Usage: StructuredNetworkWordCount <hostname> <port> ide * <hostname> and <port> describe the TCP server that Structured Streaming 測試 * would connect to receive data. ui * * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example * `$ bin/run-example sql.streaming.StructuredNetworkWordCount * localhost 9999` */ object StructuredNetworkWordCount { def main(args: Array[String]) { if (args.length < 2) { System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>") System.exit(1) }
val host = args(0) val port = args(1).toInt
val spark = SparkSession .builder .appName("StructuredNetworkWordCount") .master("local[*]") .getOrCreate()
import spark.implicits._
// Create DataFrame representing the stream of input lines from connection to host:port val lines = spark.readStream .format("socket") .option("host", host) .option("port", port) .load()
// Split the lines into words val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count val wordCounts = words.groupBy("value").count()
// Start running the query that prints the running counts to the console val query = wordCounts.writeStream .outputMode("complete") .format("console") .start()
query.awaitTermination() } } |
對於上述所示的程序,進行以下的解讀和分析:
在建立SparkSessiion對象以後,須要設置數據源的類型,及數據源的配置。而後就會數據源中源源不斷的接收數據,接收到的數據以DataFrame對象存在,該類型與Spark SQL中定義類型同樣,內部由Dataset數組組成。
以下程序所示,設置輸入源的類型爲socket,並配置socket源的IP地址和端口號。接收到的數據流存儲到lines對象中,其類型爲DataFarme。
// Create DataFrame representing the stream of input lines from connection to host:port val lines = spark.readStream .format("socket") .option("host", host) .option("port", port) .load() |
以下程序所示,首先將接受到的數據流lines轉換爲String類型的序列;接着每一批數據都以空格分隔爲獨立的單詞;最後再對每一個單詞進行分組並統計次數。
// Split the lines into words val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count val wordCounts = words.groupBy("value").count() |
經過DataFrame對象的writeStream方法獲取DataStreamWrite對象,DataStreamWrite類定義了一些數據輸出的方式。Quick example程序將數據輸出到控制終端。注意只有在調用start()方法後,纔開始執行Streaming進程,start()方法會返回一個StreamingQuery對象,用戶可使用該對象來管理Streaming進程。如上述程序調用awaitTermination()方法阻塞接收全部數據。
當直接提交編譯後的架包時,即沒有啓動"nc –lk 9999"時,會出現圖 11所示的錯誤。解決該異常,只需在提交(spark-submit)程序以前,先啓動"nc"命令便可解決,且不能使用"nc –lk localhost 9999".
圖 11
當經過mvn打包程序後,在命令行經過spark-submit提交架包,可以正常執行,而經過IDEA執行時會出現圖 12所示的錯誤。
圖 12
出現這個異常,是因爲項目中依賴的Scala版本與Spark編譯的版本不一致,從而致使出現這種錯誤。圖 13和圖 14所示,Spark 2.10是由Scala 2.10版本編譯而成的,而項目依賴的Scala版本是2.11.8,從而致使出現了錯誤。
圖 13
圖 14
解決該問題,只需在項目的pom.xml文件中指定與spark編譯的版本一致,便可解決該問題。如圖 15所示的執行結果。
圖 15