結構化Kafka sql的代碼框架

結構化流的典型應用是持續的讀取kafka流。實現機制從SparkSession的readStream開始,readStream就是DataStreamReader:socket

def readStream: DataStreamReader = new DataStreamReader(self)ide

下面從DataStreamReader開始。能夠想象獲得,最終確定是生成一個RDD來持續讀取kafka流數據。ui

例子:spa

// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

分兩步:找到TableProvider;找到SupportRead而後生成StreamingRelationV2。code

最後用StreamingRelationV2來調用Dataset.ofRows返回DataFrame,DataFrame就是Dataset[Row]。orm

下面首先要看看TableProvider接口和SupportRead接口是啥東東。繼承

TableProvider

TableProvider接口未找到在哪裏定義。接口

 

KafkaSourceRDD

先看看kafkaSourceRDD這個類,這是基礎類,最基礎的來讀取kafka數據的RDD,入參包含一個offsetRange,表示讀取kafka數據的區間範圍。若是是Kafka.lastest則能夠表示永久讀取kafka。get

既然是RDD,那麼最重要的方法就是compute方法了,代碼不解析了很簡單,就是用Kafka的API來讀取kafka分區的數據,造成RDD。kafka

 

KafkaSource

KafkaSource顧名思義就是Kafka的讀取者。

KafkaSource的父類是Source,最重要的方法是:getOffset和getBatch。

getBatch返回DataFrame,那麼getBatch又是怎麼返回DataFrame的呢?看代碼就知道原來是經過建立KafkaSourceRDD來達到生成DataFrame的目的的。因此能夠認爲KafkaSource是KafkaSourceRDD的一種封裝形式罷了。

 

KafkaSourceProvider

The provider class for all Kafka readers and writers。這個類是用來生成各類各樣的Kafka的讀取者和寫入者的,比較重要,先看看這個類的定義:

private[kafka010] class KafkaSourceProvider extends DataSourceRegister

    with StreamSourceProvider

    with StreamSinkProvider

    with RelationProvider

    with CreatableRelationProvider

    with TableProvider

    with Logging 

 

繼承了不少的特性或接口。好比:StreamSourceProvider、TableProvider、RelationProvider等等。咱們這裏就看看和讀相關的特性吧,和寫相關的不看了(道理差很少)。

(1)createSource

createSource方法返回Source,看代碼其實返回的是KafkaSource,KafkaSource前面已經說過了,這裏就不涉及了。

(2)createRelation

createRelation返回BaseRelation,實際返回的是KafkaRelation。

KafkaRelation繼承BaseRelation,重寫父 類的buildScan方法,buildScan方法返回KafkaSourceRDD做爲RDD[Row]。

(3)KafkaTable

KafkaTable繼承Table而且繼承SupportsRead特性,其定義:

class KafkaTable(includeHeaders: Boolean) extends Table with SupportsRead with SupportsWrite 

 

裏面展轉反側看看如何生成ContinuousStream,主要是方法toContinuousStream,返回的ContinuousStream就是KafkaContinuousStream。

(4)KafkaContinuousStream

KafkaContinuousStream繼承自ContinuousStream,具體的看代碼,最後反正都是調用了Kafka的API來讀取數據,所不一樣的只是外部表現形式的不一樣罷了。

相關文章
相關標籤/搜索