第15課:Spark Streaming源碼解讀之No Receivers完全思考

本期內容:緩存

1,Direct Access分佈式

2,Kafka函數

 

使用No Receiver有更強的控制度和語義一致性。接下來咱們以Kafka爲例,講解不使用Receiver而直接從Kafka的Broker上讀取數據,這種方式成爲Direct方式。ui

從Spark Streaming-Kafka包中主要有10個類,咱們先講解KafkaRDD這個類,KafkaRDD繼承RDD,其構造器中須要傳入Kafka的配置信息(如Kafka的Broker集合),偏移量信息,每一個Topic的Partition的Leader信息,消息處理函數。this

咱們知道自定義RDD,須要實現RDD的抽象方法,那KafkaRDD是如何實現的呢?spa

**
 * :: DeveloperApi ::
 * Implemented by subclasses to compute a given partition.
 */
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]對象

 

/**
 * Implemented by subclasses to return the set of partitions in this RDD. This method will only
 * be called once, so it is safe to implement a time-consuming computation in it.
 */
protected def getPartitions: Array[Partition]繼承

 

getPartitions方法,從傳入的偏移量信息中獲取到每一個Topic的Partition的Leader信息,host和port,而後實例化KafkaRDDPartition對象。內存

Compute方法,若是偏移量from和util相等,則返回,不等則實例化KafkaRDDIterator對象。資源

 

其中KafkaRDDPartition繼承Partition,封裝了Partition的信息,如topic,偏移from和until,Broker的host和port信息。

 

KafkaRDDIterator繼承NextIterator,具備iterator的特性,有getNext和hasNext方法,能夠對數據迭代操做並計算。KafkaRDDIterator內部,以傳入的KafkaParams參數構造了一個和Kafka集合交互的KafkaCluster對象,處理Partition的Leader發生變化時的具體處理辦法,重寫了getNext方法。

 

若是TaskContext中以前沒有失敗過,即attemptNumber爲0,則直接以KafkaRDDPartition中的host和port信息來鏈接Kafka,返回SimpleConsumer(Kafak的簡單消費者,備註Kafka中還有高級消費者的API);若是以前失敗過,則找到該Partition新的Leader Broker信息,而後再進行鏈接。

 

若是迭代器iter爲空或者沒有數據,則調用consumer發送FetchRequestBuilder給Broker,獲取到一批數據。接下來再次判斷iter是否有數據,若是沒有則表示以讀取到指定的untilOffset了。若是iter有數據,也會對offset和untilOffset進行比較,若是當前消費的offset大於untilOffset,則返回。若是當消費的offset小於untilOffset,則更新當前請求的Offset值,並調用messageHandler來處理當前的數據。其中messageHandler就是用戶傳入的對讀取到的數據具體操做的函數。

 

咱們編寫Spark Streaming Kafka Direct應用程序時,會調用KafkaUtils的createDirectStream方法,來建立DirectKafkaInputDStream。內部先構造KafkaCluster對象,並調用getFromOffsets方法獲取到起始offset信息。

 

getFromOffsets方法先從傳入的KafkaParams中獲取auto.offset.reset的值,若是沒有設置,則以最大的偏移值來獲取最新的數據。

 

DirectKafkaInputDStream繼承InputDStream,最大重試次數爲1,來確保語義一致性。

 

DirectKafkaInputDStream的compute方法生成RDD。

 

 

其中untilOffsets的值爲clamp方法。從maxMessagePerPartition中獲取。從配置文件中獲取spark.streaming.kafka.maxRatePerPartition,比較maxRateLimitPerPartition和(limit / numPartitions)的值,並取最小值。其中sescPerBatch爲傳入的Duration的值,獲得每個BatchDuration處理的最大Offset值,當前的偏移量與容許每一個Partition最大的消息處理量和該Partition當前的Offset值,這兩個的最小值,做爲untilOffsets。

 

Spark Streaming能夠根據輸入的數據流量和當前的處理流量進行比較。動態資源分配調整,能夠經過spark.streaming.backpressure.enabled來設置。

 

 

總結:使用Kafka Direct方式沒有緩存,不存在內存溢出的問題,而使用Kafka Receiver有緩存。Receiver和Executor綁定,不能分佈式,而使用Kafka Direct,默認數據就在多個Executor上。採用Receiver方式,數據來不及處理,延遲屢次後Spark Streaming就有可能奔潰。採用Kafka Direct方式,一批數據處理完,再去取一批數據,不會形成Spark Streaming奔潰。Kafka Direct能夠辦到語義一致性,確保數據消費。

相關文章
相關標籤/搜索