Spark Structured Streaming框架(2)之數據輸入源詳解

  Spark Structured Streaming目前的2.1.0版本只支持輸入源:File、kafka和socket。html

1. Socket

  Socket方式是最簡單的數據輸入源,如Quick example所示的程序,就是使用的這種方式。用戶只須要指定"socket"形式並配置監聽的IP和Port便可。sql

val scoketDF = spark.readStream apache

.format("socket") json

.option("host","localhost") bootstrap

.option("port", 9999) app

.load()socket

 

注意:maven

    Socket方式Streaming是接收UTF8的text數據,而且這種方式最後只用於測試,不要用戶端到端的項目中。ide

2. Kafka

  Structured streaming提供接收kafka數據源的接口,用戶使用起來也很是方便,只是須要注意開發環境所依賴的特別庫,同時streaming運行環境的kafka版本。測試

2.1 開發環境

  若以kafka做爲輸入源,那麼開發環境須要再引入所依賴的架包。如使用了Spark版本是2.1.0,那麼maven的pom.xml文件中須要添加以下的依賴庫。

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql-kafka-0.10_2.11</artifactId>

<version>2.1.0</version>

</dependency>

2.2 API

  與使用socket做爲輸入源相似,只須要指定"kafka"做爲輸入源,同時傳遞kafka的server集和topic集。以下所示:

// Subscribe to 1 topic

val df = spark

.readStream

.format("kafka")

.option("kafka.bootstrap.servers", "host1:port1,host2:port2")

.option("subscribe", "topic1")

.load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

.as[(String, String)]

 

// Subscribe to multiple topics

val df = spark

.readStream

.format("kafka")

.option("kafka.bootstrap.servers", "host1:port1,host2:port2")

.option("subscribe", "topic1,topic2")

.load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

.as[(String, String)]

 

// Subscribe to a pattern

val df = spark

.readStream

.format("kafka")

.option("kafka.bootstrap.servers", "host1:port1,host2:port2")

.option("subscribePattern", "topic.*")

.load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

.as[(String, String)]

 

2.3 運行環境

  因爲spark 2.1.0使用了kafka的版本是0.10,因此kafka server也要使用一樣版本,即發送數據的kafka也須要使用0.10版本。

不然會出現以下的錯誤:

圖 21

3. File

  Structured Streaming能夠指定一個目錄的文件做爲數據輸入源,其中支持的文件格式有:text、csv、json、parquet。

以下所示:

object StructuredFile{

def main(args:Array[String]){

val spark = SparkSession

.builder

.appName("StructuredNetWordCount")

.getOrCreate()

val userSchema = new StructType().add("name","string").add("age","integer")

val jsonDF = spark

.readStream

.schema(userSchema)

.json("/root/jar/directory")//Equivalent to format("json").load("/root/jar/directore")

Val query = jsonDF.writeStream

.format(console)

.start()

Query.awaitTermination()

}

}

 

 1) DataStreamReader接口

  讀取文件的接口有5個:

  • format(source).load(path):source參數是指文件的形式,有textcsvjsonparquet四種形式;
  • text(path):其封裝了format("text").load(path);
  • json(path):其封裝了format("json").load(path);
  • csv(path):其封裝了format("csv").load(path);
  • parquet(path):其封裝了format("parquet").load(path);

  其中path參數爲文件的路徑,若該路徑發現新增文件,則會被以數據流的形式被獲取。但該路徑只能是指定的格式文件,不能存放其它文件格式。

注意:

    如果以Spark集羣方式運行,則路徑是hdfs種的文件路徑;如果以local方式執行,則路徑爲本地路徑。

 2) schema()方法

  獲取的文件形式有四種,但並非每種格式都須要調用schema()方法來配置文件信息:

  • csvjsonparquet:用戶須要經過schema()方法手動配置文件信息;
  • text:不須要用戶指定schema,其返回的列是隻有一個"value"。

4) 自定義

  若上述Spark Structured Streaming API提供的數據輸入源不能知足要求,那麼還有一種方法可使用:修改源碼。

以下經過獲取"socket"數據源相應類的內容爲例,介紹具體使用方式:

4.1 實現Provider

  首先實現一個Provider,該類會返回一個數據的數據源對象。其中Provider實現類須要實現三個方法:

序號

方法

描述

1

souceSchema

該方法返回一個配置信息的詞典,key是字符串,valueStructType對象

2

createSource

該方法返回一個接受數據源的對象,其爲Source接口的子類

3

shortName

該方法返回一個數據源的標識符,如上述format()方法傳遞的參數:"socket"、"json"或"kafka";此時返回的字符串,就是format()方法傳遞的參數

 

  以下所示實現一個TextRabbitMQSourceProvider類:

class TextRabbitMQSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {

private def parseIncludeTimestamp(params: Map[String, String]): Boolean = {

Try(params.getOrElse("includeTimestamp", "false").toBoolean) match {

case Success(bool) => bool

case Failure(_) =>

throw new AnalysisException("includeTimestamp must be set to either \"true\" or \"false\"")

}

}

 

/** Returns the name and schema of the source that can be used to continually read data. */

override def sourceSchema(

sqlContext: SQLContext,

schema: Option[StructType],

providerName: String,

parameters: Map[String, String]): (String, StructType) = {

logWarning("The socket source should not be used for production applications! " +

"It does not support recovery.")

if (!parameters.contains("host")) {

throw new AnalysisException("Set a host to read from with option(\"host\", ...).")

}

if (!parameters.contains("port")) {

throw new AnalysisException("Set a port to read from with option(\"port\", ...).")

}

val schema =

if (parseIncludeTimestamp(parameters)) {

TextSocketSource.SCHEMA_TIMESTAMP

} else {

TextSocketSource.SCHEMA_REGULAR

}

("textSocket", schema)

}

 

override def createSource(

sqlContext: SQLContext,

metadataPath: String,

schema: Option[StructType],

providerName: String,

parameters: Map[String, String]): Source = {

val host = parameters("host")

val port = parameters("port").toInt

new TextRabbitMQSource(host, port, parseIncludeTimestamp(parameters), sqlContext)

}

 

/** String that represents the format that this data source provider uses. */

override def shortName(): String = "RabbitMQ"

}

 

4.2 實現Source

  用戶須要實現一個真正接受數據的類,該類實例是由Provider實現類來實例化,如上述的createSource()方法。其中須要實現Source抽象類的幾個方法,從而讓Structured Streaming引擎可以調用:

序號

方法

描述

1

getOffset

獲取可用的數據偏移量,代表是否有可用的數據

2

getBatch

獲取可用的數據,以DataFrame對象形式返回

3

commit

傳遞已經接收的數據偏移量

4

stop

聽着Source數據源

 

class TextRabbitMQSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext)

extends Source with Logging {

 

@GuardedBy("this")

private var socket: Socket = null

 

@GuardedBy("this")

private var readThread: Thread = null

 

/**

* All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.

* Stored in a ListBuffer to facilitate removing committed batches.

*/

@GuardedBy("this")

protected val batches = new ListBuffer[(String, Timestamp)]

 

@GuardedBy("this")

protected var currentOffset: LongOffset = new LongOffset(-1)

 

@GuardedBy("this")

protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)

 

initialize()

 

private def initialize(): Unit = synchronized {

socket = new Socket(host, port)

val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))

readThread = new Thread(s"TextSocketSource($host, $port)") {

setDaemon(true)

 

override def run(): Unit = {

try {

while (true) {

val line = reader.readLine()

if (line == null) {

// End of file reached

logWarning(s"Stream closed by $host:$port")

return

}

TextSocketSource.this.synchronized {

val newData = (line,

Timestamp.valueOf(

TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))

)

currentOffset = currentOffset + 1

batches.append(newData)

}

}

} catch {

case e: IOException =>

}

}

}

readThread.start()

}

 

/** Returns the schema of the data from this source */

override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP

else TextSocketSource.SCHEMA_REGULAR

 

override def getOffset: Option[Offset] = synchronized {

if (currentOffset.offset == -1) {

None

} else {

Some(currentOffset)

}

}

 

/** Returns the data that is between the offsets (`start`, `end`]. */

override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {

val startOrdinal =

start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1

val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1

 

// Internal buffer only holds the batches after lastOffsetCommitted

val rawList = synchronized {

val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1

val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1

batches.slice(sliceStart, sliceEnd)

}

 

import sqlContext.implicits._

val rawBatch = sqlContext.createDataset(rawList)

 

// Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp

// if requested.

if (includeTimestamp) {

rawBatch.toDF("value", "timestamp")

} else {

// Strip out timestamp

rawBatch.select("_1").toDF("value")

}

}

 

override def commit(end: Offset): Unit = synchronized {

val newOffset = LongOffset.convert(end).getOrElse(

sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +

s"originate with an instance of this class")

)

 

val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt

 

if (offsetDiff < 0) {

sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")

}

 

batches.trimStart(offsetDiff)

lastOffsetCommitted = newOffset

}

 

/** Stop this source. */

override def stop(): Unit = synchronized {

if (socket != null) {

try {

// Unfortunately, BufferedReader.readLine() cannot be interrupted, so the only way to

// stop the readThread is to close the socket.

socket.close()

} catch {

case e: IOException =>

}

socket = null

}

}

 

override def toString: String = s"TextSocketSource[host: $host, port: $port]"

}

 

4.3 註冊Provider

  因爲Structured Streaming引擎會根據用戶在format()方法傳遞的數據源類型來尋找具體數據源的provider,即在DataSource.lookupDataSource()方法中尋找。因此用戶須要將上述實現的Provider類註冊到Structured Streaming引擎中。因此用戶須要將provider實現類的完整名稱添加到引擎中的某個,這個地方就是在Spark SQL工程中的\spark-2.2.0\sql\core\src\main\resources\META-INF\services\org.apache.spark.sql.sources.DataSourceRegister文件中。用戶經過將Provider實現類名稱添加到該文件中,從而完成Provider類的註冊工做。

以下所示在文件最後一行添加,咱們本身自定義的實現類完整路徑和名稱:

org.apache.spark.sql.execution.datasources.csv.CSVFileFormat

org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider

org.apache.spark.sql.execution.datasources.json.JsonFileFormat

org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

org.apache.spark.sql.execution.datasources.text.TextFileFormat

org.apache.spark.sql.execution.streaming.ConsoleSinkProvider

org.apache.spark.sql.execution.streaming.TextSocketSourceProvider

org.apache.spark.sql.execution.streaming.RateSourceProvider

org.apache.spark.sql.execution.streaming.TextRabbitMQSourceProvider

 

4.4 使用API

  再Spark SQL源碼從新編譯後,並肩其jar包丟進Spark的jars路徑下。從而用戶就可以像使用Structured Streaming自帶的數據輸入源同樣,使用用戶自定義的"RabbitMQ"數據輸入源了。即用戶只需將RabbitMQ字符串傳遞給format()方法,其使用方式和"socket"方式同樣,由於上述的數據源內容實際上是Socket方式的實現內容。

5. 參考文獻

[1]. Structured Streaming Programming Guide.

相關文章
相關標籤/搜索