結構化流的典型應用是持續的讀取kafka流。實現機制從SparkSession的readStream開始,readStream就是DataStreamReader:socket
def readStream: DataStreamReader = new DataStreamReader(self)ide
下面從DataStreamReader開始。能夠想象獲得,最終確定是生成一個RDD來持續讀取kafka流數據。ui
例子:spa
// Create DataFrame representing the stream of input lines from connection to localhost:9999 val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load()
分兩步:找到TableProvider;找到SupportRead而後生成StreamingRelationV2。code
最後用StreamingRelationV2來調用Dataset.ofRows返回DataFrame,DataFrame就是Dataset[Row]。orm
下面首先要看看TableProvider接口和SupportRead接口是啥東東。繼承
TableProvider接口未找到在哪裏定義。接口
先看看kafkaSourceRDD這個類,這是基礎類,最基礎的來讀取kafka數據的RDD,入參包含一個offsetRange,表示讀取kafka數據的區間範圍。若是是Kafka.lastest則能夠表示永久讀取kafka。get
既然是RDD,那麼最重要的方法就是compute方法了,代碼不解析了很簡單,就是用Kafka的API來讀取kafka分區的數據,造成RDD。kafka
KafkaSource顧名思義就是Kafka的讀取者。
KafkaSource的父類是Source,最重要的方法是:getOffset和getBatch。
getBatch返回DataFrame,那麼getBatch又是怎麼返回DataFrame的呢?看代碼就知道原來是經過建立KafkaSourceRDD來達到生成DataFrame的目的的。因此能夠認爲KafkaSource是KafkaSourceRDD的一種封裝形式罷了。
The provider class for all Kafka readers and writers。這個類是用來生成各類各樣的Kafka的讀取者和寫入者的,比較重要,先看看這個類的定義:
private[kafka010] class KafkaSourceProvider extends DataSourceRegister
with StreamSourceProvider
with StreamSinkProvider
with RelationProvider
with CreatableRelationProvider
with TableProvider
with Logging
繼承了不少的特性或接口。好比:StreamSourceProvider、TableProvider、RelationProvider等等。咱們這裏就看看和讀相關的特性吧,和寫相關的不看了(道理差很少)。
(1)createSource
createSource方法返回Source,看代碼其實返回的是KafkaSource,KafkaSource前面已經說過了,這裏就不涉及了。
(2)createRelation
createRelation返回BaseRelation,實際返回的是KafkaRelation。
KafkaRelation繼承BaseRelation,重寫父 類的buildScan方法,buildScan方法返回KafkaSourceRDD做爲RDD[Row]。
(3)KafkaTable
KafkaTable繼承Table而且繼承SupportsRead特性,其定義:
class KafkaTable(includeHeaders: Boolean) extends Table with SupportsRead with SupportsWrite
裏面展轉反側看看如何生成ContinuousStream,主要是方法toContinuousStream,返回的ContinuousStream就是KafkaContinuousStream。
(4)KafkaContinuousStream
KafkaContinuousStream繼承自ContinuousStream,具體的看代碼,最後反正都是調用了Kafka的API來讀取數據,所不一樣的只是外部表現形式的不一樣罷了。