Spark Structured Streaming框架(1)之基本用法

   Spark Struntured Streaming是Spark 2.1.0版本後新增長的流計算引擎,本博將經過幾篇博文詳細介紹這個框架。這篇是介紹Spark Structured Streaming的基本開發方法。以Spark 自帶的example進行測試和介紹,其爲"StructuredNetworkWordcount.scala"文件。html

1. Quick Example

  因爲咱們是在單機上進行測試,因此須要修單機運行模型,修改後的程序以下:sql

package org.apache.spark.examples.sql.streaming apache

 

import org.apache.spark.sql.SparkSession 數組

 

/** app

* Counts words in UTF8 encoded, '\n' delimited text received from the network. 框架

* socket

* Usage: StructuredNetworkWordCount <hostname> <port> ide

* <hostname> and <port> describe the TCP server that Structured Streaming 測試

* would connect to receive data. ui

*

* To run this on your local machine, you need to first run a Netcat server

* `$ nc -lk 9999`

* and then run the example

* `$ bin/run-example sql.streaming.StructuredNetworkWordCount

* localhost 9999`

*/

object StructuredNetworkWordCount {

def main(args: Array[String]) {

if (args.length < 2) {

System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>")

System.exit(1)

}

 

val host = args(0)

val port = args(1).toInt

 

val spark = SparkSession

.builder

.appName("StructuredNetworkWordCount")

.master("local[*]")

.getOrCreate()

 

import spark.implicits._

 

// Create DataFrame representing the stream of input lines from connection to host:port

val lines = spark.readStream

.format("socket")

.option("host", host)

.option("port", port)

.load()

 

// Split the lines into words

val words = lines.as[String].flatMap(_.split(" "))

 

// Generate running word count

val wordCounts = words.groupBy("value").count()

 

// Start running the query that prints the running counts to the console

val query = wordCounts.writeStream

.outputMode("complete")

.format("console")

.start()

 

query.awaitTermination()

}

}

 

2. 剖析

  對於上述所示的程序,進行以下的解讀和分析:

2.1 數據輸入

  在建立SparkSessiion對象以後,須要設置數據源的類型,及數據源的配置。而後就會數據源中源源不斷的接收數據,接收到的數據以DataFrame對象存在,該類型與Spark SQL中定義類型同樣,內部由Dataset數組組成。

以下程序所示,設置輸入源的類型爲socket,並配置socket源的IP地址和端口號。接收到的數據流存儲到lines對象中,其類型爲DataFarme。

// Create DataFrame representing the stream of input lines from connection to host:port

val lines = spark.readStream

.format("socket")

.option("host", host)

.option("port", port)

.load()

 

2.2 單詞統計

  以下程序所示,首先將接受到的數據流lines轉換爲String類型的序列;接着每一批數據都以空格分隔爲獨立的單詞;最後再對每一個單詞進行分組並統計次數。

// Split the lines into words

val words = lines.as[String].flatMap(_.split(" "))

 

// Generate running word count

val wordCounts = words.groupBy("value").count()

 

2.3 數據輸出

經過DataFrame對象的writeStream方法獲取DataStreamWrite對象,DataStreamWrite類定義了一些數據輸出的方式。Quick example程序將數據輸出到控制終端。注意只有在調用start()方法後,纔開始執行Streaming進程,start()方法會返回一個StreamingQuery對象,用戶可使用該對象來管理Streaming進程。如上述程序調用awaitTermination()方法阻塞接收全部數據。

 

3. 異常

3.1 拒絕鏈接

  當直接提交編譯後的架包時,即沒有啓動"nc –lk 9999"時,會出現圖 11所示的錯誤。解決該異常,只需在提交(spark-submit)程序以前,先啓動"nc"命令便可解決,且不能使用"nc –lk localhost 9999".

圖 11

3.2 NoSuchMethodError

  當經過mvn打包程序後,在命令行經過spark-submit提交架包,可以正常執行,而經過IDEA執行時會出現圖 12所示的錯誤。

圖 12

  出現這個異常,是因爲項目中依賴的Scala版本與Spark編譯的版本不一致,從而致使出現這種錯誤。圖 13和圖 14所示,Spark 2.10是由Scala 2.10版本編譯而成的,而項目依賴的Scala版本是2.11.8,從而致使出現了錯誤。

圖 13

 

圖 14

  解決該問題,只需在項目的pom.xml文件中指定與spark編譯的版本一致,便可解決該問題。如圖 15所示的執行結果。

圖 15

4. 參考文獻

相關文章
相關標籤/搜索