Spark Streaming和Kafka集成深刻淺出

寫在前面api

本文主要介紹Spark Streaming基本概念、kafka集成、Offset管理數據結構

本文主要介紹Spark Streaming基本概念、kafka集成、Offset管理app

1、概述框架

Spark  Streaming顧名思義是spark的流式處理框架,是面向海量數據實現高吞吐量、高可用的分佈式實時計算。關於spark的安裝能夠參考Spark入門。Spark Streaming並不是像Storm那樣是真正的流式計算,二者的處理模型在根本上有很大不一樣:Storm每次處理一條消息,更多詳細信息可參考JStorm基本概念介紹;而spark streaming每次處理的是一個時間窗口的數據流,相似於在一個短暫的時間間隔裏處理一批數據。分佈式

    spark streaming實時接收輸入數據流,並根據時間將數據流分紅連續的多個batch,而後由Spark引擎一次處理一批數據,以批量生成最終結果流,工做流程圖:函數

 

2、Spak Streamingspa

    2.一、Batch  Duration3d

    spark streaming的核心參數,設置流數據被分紅多個batch的時間間隔,每一個spark引擎處理的就是這個時間間隔內的數據。在Spark Streaming中,Job之間有可能存在依賴關係,因此後面的做業必須確保前面的做業執行完後纔會被調度執行。若是批處理時間超過了batch duration,意味着數據處理速率跟不上數據接收速率,那麼會致使後面正常的batch提交的做業沒法按時執行,隨着時間的推移,愈來愈多的做業被延遲執行,最後致使整個Streaming做業被阻塞,因此須要設置一個合理的批處理間隔以確保做業可以在這個批處理間隔內執行完成。orm

    application UI能詳細瞭解到每一個batch的提交時間、數據處理時間、延遲執行時間以及處理的數據條目數。對象

 

    雖然batchDuration的單位能夠達到毫秒級別的,可是經驗告訴咱們,若是這個值太小將會致使因頻繁提交做業從而給整個Streaming帶來負擔,因此請儘可能不要將這個值設置爲小於500ms。若是job執行的很快,而batchDuration設置的過長,依然會在上次提交做業間隔batchDuration後才提交下一個(數據流分隔機制決定的),這樣spark集羣會有大空閒期,集羣資源沒有被充分利用。spark streaming應用程序在首次啓動時一樣會間隔batchDuration才提交job(執行InputDStream.compute方法計算batch的RDD並提交做業)。

    2.二、DStream

    表示一系列時間序列上連續的RDDs,每個RDDs表明必定時間間隔內到達的數據,這樣就把連續的數據流拆成不少小的RDDs數據塊(RDDs數據塊內的數據是連續的數據)。能夠經過實時數據建立DStream,也能夠對現有的DStream進行transformation操做生成,例如map、window、reduceByKeyAndWindow等轉換操做。

    在spark streaming運行期間,每一個DStream都會按期生成一個RDDs,具體的是compute(time) 方法,生成的RDDs表明一個批次內的數據,做爲提交job的輸入元數據:

 

    在對DStream進行操做時,會被Spark Streaming引擎轉化成對底層 RDD操做。

    foreachRDD:是一個轉換輸出操做符,它返回的不是RDD裏的一行數據, 而是輸出DStream後面的RDDs,表示一個批次中的一批數據,一個批次,只有一個RDDs。對於DirectKafkaInputDStream流返回的是KafkaRDD,須要注意的是該操做在運行spark streaming應用程序的driver進程裏執行。

    2.三、InputDStream

InputDStream繼承自DStream,是全部輸入流的基類,表明從源接收到的原始數據流DStreams,每個InputDStream關聯到單個Receiver對象,從源數據接收數據並存儲到spark內存,等待處理。每個InputDStream接收到的是單個數據流數據。InputDStream在driver節點上重新數據生成RDDs;若是爲了實現input stream在work節點上運行recvicer接收外部數據,須要繼承ReceiverInputDStream類。InputDStream的start()、stop()方法,分別用於Spark Streaming系統啓動和中止接收數據時調用。

3、kafka集成

    3.一、DirectKafkaInputDStream

    DirectKafkaInputDStream繼承InputDStream,建立方法:

 

    Subscribe有三個參數:topic列表、consumer配置項、topic+partition起始offset,其中fromOffsets是可選的。

    driver會根據kafkaParams建立KafkaConsumer,用於Spark Streaming肯定batch內的kafka數據(offset)範圍。

    3.二、KafkaRDD

    Spark Streaming每隔一個時間間隔會調用InputDStream.compute方法建立KafkaRDD(在driver上執行),表示這個batch裏接收到的kafka數據,而後在提交做業時做爲stream job的輸入。KafkaRDD extends RDD,實現了compute方法,用於計數一個分區裏的數據、返回KafkaRDDIterator迭代器,迭代器內部next方法調用consumer.get,從kafka拉取數據.   

 

    job運行時調用KafkaRDD.compute方法從kafka讀取數據,也就是實際get操做發生在task中。

    KafkaRDD是一個包括topic、partition、fromeOffset、untilOffset等的數據結構;ConsumerRecord是kafka client的api。

    3.三、offset初始化

    Spark Streaming在啓動時先調用Subscribe.onStart方法,初始化KafkaConsumer,這個Consumer對象是在driver中用於獲取offset。若是fromOffsets不爲空,kafkaConsumer就seek到指定的offset,而後再調用positon獲取offset.

 

    若是fromOffsets是空,即沒有seek,當用consumer.position方法時,返回的offset取決於auto.offset.reset配置:earliest,獲取partition最先的offset;latest獲取partition最近的offset。

    3.四、latestOffset

    spark Streaming的內部邏輯,上一個job的untilOffset成爲下一個job的fromOffset。latestOffset函數計算untilOffset,核心計算思想是先consumer.seekToEnd,而後position函數就能夠取得當前最後offset:

 4、offset管理

 

    enable.auto.commit參數必須設置false,由於在自動commit的狀況下,可能在一個batch內的數據尚未處理完、或者處理失敗,但offset就自動提交了,就會致使數據丟失。下面是在zk中管理offset的思路,zk簡單方便並且保證了可用性。

    在spark Streaming做業開始時,readOffsets函數用於從zk讀取上次應用保存的最後處理的消息偏移量,有如下兩種不一樣處理場景:

    一、Spark Streaming應用程序首次運行時,從zk read不到數據,那麼就建立一個KafkaConsumer對象,用consumer.position的方式獲取offset,這時獲取到的offset取決於auto.offset.reset參數的設置

二、若是是重啓Spark Streaming應用程序,那能夠直接從zk讀取到應用上次保存的offset

 

    在完成kafka DStream處理後,調用persistOffsets方法持久化保存分區的偏移量

 

總體過程僞代碼:

 

    5、反壓

    若是在一個batch內收到的消息比較多,這就須要爲executor分配更多內存,可能會致使其餘spark streaming應用程序資源分配不足,甚至有OOM的風險。特別是第一次啓動應用程序,從earliest offset消費數據時,kafka保留的歷史消息越多,數據處理時間也就越長。反壓能夠限制每一個batch接收到的消息量,下降數據傾斜的風險,開啓反壓:

SparkConf.set("spark.streaming.backpressure.enabled", "true")

設置每一個kafka partition讀取消息的最大速率:

SparkConf.set("spark.streaming.kafka.maxRatePerPartition", "spark.streaming.kafka.maxRatePerPartition")

這個值要結合spark Streaming處理消息的速率和batchDuration,儘可能保證讀取的每一個partition數據在batchDuration時間內處理完,這個參數須要不斷調整,以作到儘量高的吞吐量.

相關文章
相關標籤/搜索