The current version of Spark Structured Streaming (2.1.0) supports only three input sources: File, Kafka, and Socket.
The socket source is the simplest input source; the program shown in the Quick Example uses exactly this approach. The user only needs to specify the "socket" format and configure the IP and port to listen on.
val socketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
Note:
The socket source receives UTF-8 text data and is meant only for testing; do not use it in end-to-end production projects.
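Even for a test, a query still has to be started before any data flows. Below is a minimal sketch of the usual socket word count, assuming the socketDF defined above, an existing SparkSession named spark, and something like `nc -lk 9999` feeding text to the port:

import spark.implicits._

// Split each incoming line into words and keep a running count per word.
val words = socketDF.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// Print the updated counts to the console after every micro-batch.
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
query.awaitTermination()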
Structured Streaming also provides an interface for receiving data from Kafka, and it is just as easy to use. The only things to watch are the extra library the development environment depends on and the Kafka version of the runtime environment.
To use Kafka as an input source, the development environment needs an additional dependency. With Spark 2.1.0, add the following dependency to the Maven pom.xml file:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.1.0</version>
</dependency>
As with the socket source, simply specify "kafka" as the format and pass in the Kafka bootstrap servers and the topics to subscribe to, as shown below:
// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
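The snippets above only define the stream; a query still has to be started on the resulting DataFrame before anything is consumed. A minimal sketch, assuming an existing SparkSession named spark and the single-topic df from the first snippet, that simply prints key/value pairs to the console:

import spark.implicits._

// Cast the Kafka key/value byte arrays to strings.
val kv = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Print each incoming record to the console as it arrives.
val query = kv.writeStream
  .outputMode("append")
  .format("console")
  .start()
query.awaitTermination()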
Because Spark 2.1.0 uses Kafka 0.10, the Kafka server must run the same version; that is, the Kafka cluster sending the data also needs to be on 0.10.
Otherwise the following error occurs:
Figure 21
Structured Streaming can also use the files in a directory as the data input source; the supported file formats are text, csv, json, and parquet.
For example:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object StructuredFile {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("StructuredNetWordCount")
      .getOrCreate()
    val userSchema = new StructType().add("name", "string").add("age", "integer")
    val jsonDF = spark
      .readStream
      .schema(userSchema)
      .json("/root/jar/directory") // Equivalent to format("json").load("/root/jar/directory")
    val query = jsonDF.writeStream
      .format("console")
      .start()
    query.awaitTermination()
  }
}
There are five interfaces for reading files, each taking a path parameter. Any new file that appears under that path is picked up as part of the data stream, but the path may only contain files of the specified format; no other file formats should be placed there.
Note:
When running on a Spark cluster, the path is an HDFS path; when running in local mode, it is a local filesystem path.
Files can be read in four formats, but not every format requires calling the schema() method to declare the file's schema:
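For instance, a text stream always has the fixed schema of a single string column named value, so no schema() call is needed, whereas csv and json streams must have their schema declared up front (unless spark.sql.streaming.schemaInference is enabled). A minimal sketch, assuming an existing SparkSession named spark; the directory paths are placeholders:

import org.apache.spark.sql.types.StructType

// text: fixed single-column schema ("value": string), no schema() needed.
val textDF = spark.readStream
  .text("/data/streaming/text")

// csv: the schema must be supplied explicitly for a streaming file source.
val userSchema = new StructType()
  .add("name", "string")
  .add("age", "integer")
val csvDF = spark.readStream
  .schema(userSchema)
  .csv("/data/streaming/csv")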
If none of the input sources provided by the Spark Structured Streaming API meets your requirements, there is one more option: modifying the source code.
The following walks through the approach, using the classes behind the "socket" source as a template:
First, implement a Provider class, which returns the actual data source object. The Provider implementation has to provide three methods:
1. sourceSchema: returns the source's configuration as a key/value pair, where the key is a string and the value is a StructType object.
2. createSource: returns the object that actually receives the data, a subclass of the Source interface.
3. shortName: returns the identifier of the data source, i.e. the string passed to format() such as "socket", "json", or "kafka"; whatever string is returned here is what users pass to format().
The following implements a TextRabbitMQSourceProvider class along these lines:
package org.apache.spark.sql.execution.streaming

import scala.util.{Failure, Success, Try}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SQLContext}
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.StructType

class TextRabbitMQSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {

  private def parseIncludeTimestamp(params: Map[String, String]): Boolean = {
    Try(params.getOrElse("includeTimestamp", "false").toBoolean) match {
      case Success(bool) => bool
      case Failure(_) =>
        throw new AnalysisException(
          "includeTimestamp must be set to either \"true\" or \"false\"")
    }
  }

  /** Returns the name and schema of the source that can be used to continually read data. */
  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) = {
    logWarning("The socket source should not be used for production applications! " +
      "It does not support recovery.")
    if (!parameters.contains("host")) {
      throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
    }
    if (!parameters.contains("port")) {
      throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
    }
    val sourceSchema = if (parseIncludeTimestamp(parameters)) {
      TextSocketSource.SCHEMA_TIMESTAMP
    } else {
      TextSocketSource.SCHEMA_REGULAR
    }
    ("textSocket", sourceSchema)
  }

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source = {
    val host = parameters("host")
    val port = parameters("port").toInt
    new TextRabbitMQSource(host, port, parseIncludeTimestamp(parameters), sqlContext)
  }

  /** String that represents the format that this data source provider uses. */
  override def shortName(): String = "RabbitMQ"
}
Next, implement the class that actually receives the data; its instances are created by the Provider implementation, as in the createSource() method above. It must implement several methods of the Source abstract class so that the Structured Streaming engine can drive it:
1. getOffset: returns the latest available data offset, indicating whether any new data is available.
2. getBatch: returns the available data between two offsets as a DataFrame object.
3. commit: is passed the offset of the data that has already been received and processed.
4. stop: stops the Source.
package org.apache.spark.sql.execution.streaming

import java.io.{BufferedReader, InputStreamReader, IOException}
import java.net.Socket
import java.sql.Timestamp
import java.util.Calendar
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable.ListBuffer

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types.StructType

class TextRabbitMQSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext)
  extends Source with Logging {

  @GuardedBy("this")
  private var socket: Socket = null

  @GuardedBy("this")
  private var readThread: Thread = null

  /**
   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
   * Stored in a ListBuffer to facilitate removing committed batches.
   */
  @GuardedBy("this")
  protected val batches = new ListBuffer[(String, Timestamp)]

  @GuardedBy("this")
  protected var currentOffset: LongOffset = new LongOffset(-1)

  @GuardedBy("this")
  protected var lastOffsetCommitted: LongOffset = new LongOffset(-1)

  initialize()

  private def initialize(): Unit = synchronized {
    socket = new Socket(host, port)
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    readThread = new Thread(s"TextRabbitMQSource($host, $port)") {
      setDaemon(true)

      override def run(): Unit = {
        try {
          while (true) {
            val line = reader.readLine()
            if (line == null) {
              // End of file reached
              logWarning(s"Stream closed by $host:$port")
              return
            }
            TextRabbitMQSource.this.synchronized {
              val newData = (line,
                Timestamp.valueOf(
                  TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
              )
              currentOffset = currentOffset + 1
              batches.append(newData)
            }
          }
        } catch {
          case e: IOException =>
        }
      }
    }
    readThread.start()
  }

  /** Returns the schema of the data from this source */
  override def schema: StructType =
    if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR

  override def getOffset: Option[Offset] = synchronized {
    if (currentOffset.offset == -1) {
      None
    } else {
      Some(currentOffset)
    }
  }

  /** Returns the data that is between the offsets (`start`, `end`]. */
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
    val startOrdinal =
      start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
    val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1

    // Internal buffer only holds the batches after lastOffsetCommitted
    val rawList = synchronized {
      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
      batches.slice(sliceStart, sliceEnd)
    }

    import sqlContext.implicits._
    val rawBatch = sqlContext.createDataset(rawList)

    // The internal buffer has schema (String, Timestamp); strip out the timestamp
    // if requested.
    if (includeTimestamp) {
      rawBatch.toDF("value", "timestamp")
    } else {
      // Strip out timestamp
      rawBatch.select("_1").toDF("value")
    }
  }

  override def commit(end: Offset): Unit = synchronized {
    val newOffset = LongOffset.convert(end).getOrElse(
      sys.error(s"TextRabbitMQSource.commit() received an offset ($end) that did not " +
        s"originate with an instance of this class")
    )

    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt

    if (offsetDiff < 0) {
      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
    }

    batches.trimStart(offsetDiff)
    lastOffsetCommitted = newOffset
  }

  /** Stop this source. */
  override def stop(): Unit = synchronized {
    if (socket != null) {
      try {
        // Unfortunately, BufferedReader.readLine() cannot be interrupted, so the only way to
        // stop the readThread is to close the socket.
        socket.close()
      } catch {
        case e: IOException =>
      }
      socket = null
    }
  }

  override def toString: String = s"TextRabbitMQSource[host: $host, port: $port]"
}
The Structured Streaming engine looks up the concrete provider for the data source type passed to format() in the DataSource.lookupDataSource() method, so the Provider implementation above has to be registered with the engine. This is done by adding the fully qualified name of the Provider class to the file \spark-2.2.0\sql\core\src\main\resources\META-INF\services\org.apache.spark.sql.sources.DataSourceRegister in the Spark SQL project; adding the class name there completes the registration.
Append the fully qualified name of our custom implementation class as the last line of the file:
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
org.apache.spark.sql.execution.datasources.json.JsonFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
org.apache.spark.sql.execution.streaming.RateSourceProvider
org.apache.spark.sql.execution.streaming.TextRabbitMQSourceProvider
After the Spark SQL source is recompiled, drop the resulting jar into Spark's jars directory. The custom "RabbitMQ" input source can then be used just like the built-in ones: simply pass the string "RabbitMQ" to format(). It is used exactly the same way as the "socket" source, because the code above is really the socket implementation.
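A minimal usage sketch under that assumption (the rebuilt jar is on Spark's classpath, and spark is an existing SparkSession); since the implementation above is a copy of the socket source, it still takes host and port options and reads plain text:

// Resolved via the shortName() of TextRabbitMQSourceProvider.
val rabbitDF = spark.readStream
  .format("RabbitMQ")
  .option("host", "localhost") // the implementation still reads from a plain socket
  .option("port", 9999)
  .load()

// Print the received lines to the console.
val query = rabbitDF.writeStream
  .format("console")
  .start()
query.awaitTermination()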