SparkStreaming 中用 Direct 方式拉取越來超慢

咱們知道 SparkStreaming 用 Direct 的方式拉取 Kafka 數據時,是根據 kafka 中的 fromOffsets 和 untilOffsets 來進行獲取數據的,而 fromOffsets 通常都是須要咱們本身管理的,而每批次的 untilOffsets 是由 Driver 程序自動幫咱們算出來的。 
因而產生了一個疑問:untilOffsets 是怎麼算出來的? 
接下來就經過查看源碼的方式來找出答案~算法

首先咱們寫一個最簡單的 wordcount 程序,代碼以下:異步

/**
  * Created by Lin_wj1995 on 2018/4/19.
  * 來源:https://blog.csdn.net/Lin_wj1995
  */
object DirectKafkaWordCount {
  def main(args: Array[String]) {
    val Array(brokers, topics) = args
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))ide

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)工具

    //拿到數據
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()大數據

    // 啓動
    ssc.start()
    ssc.awaitTermination()
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
咱們能夠看出, createDirectStream 是得到數據的關鍵方法的,咱們點擊進去ui

  def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]
  ): InputDStream[(K, V)] = {
    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
    //kafka cluster 鏈接對象
    val kc = new KafkaCluster(kafkaParams)
    //讀取數據的開始位置
    val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
    //該方法返回了一個DirectKafkaInputDStream的對象
    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
      ssc, kafkaParams, fromOffsets, messageHandler)
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ok,重點來了,點擊 DirectKafkaInputDStream ,看一下該類內部是如何的,因爲該類內部的方法都是重點,全部我把該類重點的屬性和方法有選擇性的貼出來: 
建議從下往上讀!~this

private[streaming]
class DirectKafkaInputDStream[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[K]: ClassTag,
  T <: Decoder[V]: ClassTag,
  R: ClassTag](
    ssc_ : StreamingContext,
    val kafkaParams: Map[String, String],
    val fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends InputDStream[R](ssc_) with Logging {
  /**
    * 爲了拿到每一個分區leader上的最新偏移量(默認值爲1),Driver發出請求的最大的連續重試次數
    * 默認值爲1,也就是說最多請求 2 次
    */
  val maxRetries = context.sparkContext.getConf.getInt(
    "spark.streaming.kafka.maxRetries", 1)spa

  /**
    * 經過 receiver tracker 異步地維持和發送新的 rate limits 給 receiver
    * 注意:若是參數 spark.streaming.backpressure.enabled 沒有設置,那麼返回爲None
   */
  override protected[streaming] val rateController: Option[RateController] = {
    /**
      * isBackPressureEnabled方法對應着「spark.streaming.backpressure.enabled」參數
      * 參數說明:簡單來說就是自動推測程序的執行狀況並控制接收數據的條數,爲了防止處理數據的時間大於批次時間而致使的數據堆積
      *           默認是沒有開啓的
      */
    if (RateController.isBackPressureEnabled(ssc.conf)) {
      Some(new DirectKafkaRateController(id,
        RateEstimator.create(ssc.conf, context.graph.batchDuration)))
    } else {
      None
    }
  }.net

  //拿到與Kafka集羣的鏈接
  protected val kc = new KafkaCluster(kafkaParams)線程

  //每一個partition每次最多獲取多少條數據,默認是0
  private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
      "spark.streaming.kafka.maxRatePerPartition", 0)

  /**
    * 真實算出每一個partition獲取數據的最大條數
    */
  protected def maxMessagesPerPartition: Option[Long] = {
    val estimatedRateLimit = rateController.map(_.getLatestRate().toInt) //每批都根據rateContoller預估獲取多少條數據
    val numPartitions = currentOffsets.keys.size

    val effectiveRateLimitPerPartition = estimatedRateLimit
      .filter(_ > 0)
      .map { limit =>
        if (maxRateLimitPerPartition > 0) {
          /*
          若是 spark.streaming.kafka.maxRatePerPartition 該參數有設置值且大於0
          那麼就取 maxRateLimitPerPartition 和 rateController 算出來的值 之間的最小值(爲何取最小值,由於這樣是最保險的)
           */
          Math.min(maxRateLimitPerPartition, (limit / numPartitions))
        } else {
          /*
          若是 spark.streaming.kafka.maxRatePerPartition 該參數沒有設置
          那麼就直接用 rateController 算出來的值
           */
          limit / numPartitions
        }
      }.getOrElse(maxRateLimitPerPartition) //若是沒有設置自動推測的話,則返回參數設定的接收速率

    if (effectiveRateLimitPerPartition > 0) {
      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
      Some((secsPerBatch * effectiveRateLimitPerPartition).toLong)
    } else {
      /*
      若是沒有設置 spark.streaming.kafka.maxRatePerPartition 參數,則返回None
       */
      None
    }
  }

  //拿到每批的起始 offset
  protected var currentOffsets = fromOffsets

  /**
    * 獲取此時此刻topic中每一個partition 最大的(最新的)offset
    */
  @tailrec
  protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
    val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
    // Either.fold would confuse @tailrec, do it manually
    if (o.isLeft) {
      val err = o.left.get.toString
      if (retries <= 0) {
        throw new SparkException(err)
      } else {
        log.error(err)
        Thread.sleep(kc.config.refreshLeaderBackoffMs)
        latestLeaderOffsets(retries - 1)//若是獲取失敗,則重試,且重試次數 -1
      }
    } else {
      o.right.get //若是沒有問題,則拿到最新的 offset
    }
  }

  // limits the maximum number of messages per partition
  /**
    * ★★★★★重要方法,答案就在這裏
    * @param leaderOffsets 該參數的offset是當前最新的offset
    * @return 包含untilOffsets的信息
    */
  protected def clamp(
    leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
    maxMessagesPerPartition.map { mmp =>
      leaderOffsets.map { case (tp, lo) =>
        /**
          * 若是有設定自動推測,那麼就將值設定爲: min(自動推測出來的offset,此時此刻最新的offset)
          */
        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
      }
    }.getOrElse(leaderOffsets) //若是沒有設定自動推測,那麼untilOffsets的值就是最新的offset
  }

  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    //====》★★★★★從這裏做爲入口盡心查看
    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
    //根據offset去拉取數據,完!
    val rdd = KafkaRDD[K, V, U, T, R](
      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  。。。
--------------

本期內容:
1. 動態Batch Size深刻
2. RateController解析

1. 動態Batch Size深刻

Dynamic Batch Size的方法實際在Spark Streaming中還沒實現。論文中的解決方案:Fixed-point Iteration。
論文中有個比較重要的圖:


基本思想:按100ms的批次發數據給Controller,Controller起初直接轉給JobGenerator,再給Job Processor處理。Job Generator不是僅給出 處理結果,還要把job統計結果發給Controller,Controller接收到統計結果,會動態的改變batch size來給Job發數據。

至於窗口操做,也要作一些調整。如圖:


試驗代表,對Filter、Reduce、Join、Window仍是有好的效果。
忽然有其它做業加入時,也能動態調整。圖例:


但算法是否會複雜,消耗時間。

2. RateController解析

Spark Streaming提供了RateController。ReceiverRateController、DirectKafkaRateController是其子類。
若是消費數據的速度的設置值有改變,會在batch中最後的Job完成時,會觸發速率調整。
速率調整的主流程圖:


流程較長,暫剖析最後的ReceriverSuperImpl.registerBlockGenerator和中間的ReceiverInputDStream.rateController的相關代碼。

ReceiverSupervisorImpl:

  private val endpoint = env.rpcEnv.setupEndpoint(
    "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
      override val rpcEnv: RpcEnv = env.rpcEnv

      override def receive: PartialFunction[Any, Unit] = {
        case StopReceiver =>
          logInfo("Received stop signal")
          ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
        case CleanupOldBlocks(threshTime) =>
          logDebug("Received delete old batch signal")
          cleanupOldBlocks(threshTime)
        case UpdateRateLimit(eps) =>
          logInfo(s"Received a new rate limit: $eps.")
          registeredBlockGenerators.foreach { bg =>
            bg.updateRate(eps)
          }
      }
    })

bg是Spark Streaming中的RateLimiter子類。RateLimiter中有個成員rateLimiter,類型是Google Guava的限流工具類RateLimiter。
Google Guava的RateLimiter從概念上來說,速率限制器會在可配置的速率下分配許可證。若是必要的話,每一個acquire()會阻塞當前線程直到許可證可用後獲取該許可證。一旦獲取到許可證,不須要再釋放許可證。
代碼經過RateLimiter來更改速率。RateLimiter.updateRate:

  private[receiver] def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      if (maxRateLimit > 0) {
        rateLimiter.setRate(newRate.min(maxRateLimit))
      } else {
        rateLimiter.setRate(newRate)
      }
    }

若是maxRateLimit也有值(即設置了spark.streaming.receiver.maxRate值),則取newRate和maxRateLimit中間的最小值。
spark.streaming.receiver.maxRate控制了最大的接收速率。但有浪費資源的可能。配置最大速率不是太好的事情。

回到流程圖中間的ReceiverInputDStream.rateController。
ReceiverInputDStream.rateController:

  override protected[streaming] val rateController: Option[RateController] = {
    if (RateController.isBackPressureEnabled(ssc.conf)) {
      Some(new ReceiverRateController(id, RateEstimator.create(ssc.conf, ssc.graph.batchDuration)))
    } else {
      None
    }
  }

其中的RateController.isBackPressureEnabled得到是否容許反壓機制。
RateController.isBackPressureEnabled:

object RateController {
  def isBackPressureEnabled(conf: SparkConf): Boolean =
    conf.getBoolean("spark.streaming.backpressure.enabled", false)
}

若是容許反壓機制,那麼InputDStream子類中的成員rateController被賦予新生成的RateController子類ReceiverRateController對象。不然爲None。

生成ReceiverRateController對象時會用調用RateEstimator.create。
RateEstimator.create:
  /**
   * Return a new RateEstimator based on the value of `spark.streaming.RateEstimator`.
   *
   * The only known estimator right now is `pid`.
   *
   * @return An instance of RateEstimator
   * @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any
   *         known estimators.
   */
  def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
    conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
      case "pid" =>
        val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
        val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
        val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
        val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)
        new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)

      case estimator =>
        throw new IllegalArgumentException(s"Unkown rate estimator: $estimator")
    }
目前spark.streaming.backpressure.rateEstimator配置只能是pid。另外還有4個反壓的可配置項。
RateEstimator用於評估InputDStream消費數據的能力。根據消費數據的能力來調整接收數據的速率。RateEstimator.create給出了反壓(back pressure)機制。這要比簡單限制接收速率要好一些。

接着看其中生成的ReceiverRateController。ReceiverRateController是RateController子類。
繼承關係:ReceiverRateController => RateController => StreamingListener => AsynchronousListenerBus => ListenerBus
若是容許反壓機制,ReceiverInputDStream的rateController就不爲None,才保證了上面流程圖中RateController就能處理接收的消息,從而最終調整速率。

簡單介紹一下BlockGenerator中的waitToPush方法。
BlockGenerator是RateLimiter子類。BlockGenerator利用waitToPush方法來限制receiver消費數據的速率。
BlockGenarator在生成Block時,BlockGenarator的加數據的方法addData、addDataWithCallback、addMultipleDataWithCallback中都調用了waitToPush。
有必要之後對waitToPush再作剖析。


注:Google Guava的限流工具類RateLimiter
  RateLimiter從概念上來說,速率限制器會在可配置的速率下分配許可證。若是必要的話,每一個acquire() 會阻塞當前線程直到許可證可用後獲取該許可證。一旦獲取到許可證,不須要再釋放許可證。
  RateLimiter使用的是一種叫令牌桶的流控算法,RateLimiter會按照必定的頻率往桶裏扔令牌,線程拿到令牌才能執行,好比你但願本身的應用程序QPS不要超過1000,那麼RateLimiter設置1000的速率後,就會每秒往桶裏扔1000個令牌。
修飾符和類型    方法和描述
double    acquire()
從RateLimiter獲取一個許可,該方法會被阻塞直到獲取到請求
double    acquire(int permits)
從RateLimiter獲取指定許可數,該方法會被阻塞直到獲取到請求
static RateLimiter    create(double permitsPerSecond)
根據指定的穩定吞吐率建立RateLimiter,這裏的吞吐率是指每秒多少量可數(一般是指QPS,每秒多少查詢)
static RateLimiter    create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)
根據指定的穩定吞吐率和預熱期來建立RateLimiter,這裏的吞吐率是指每秒多少量可數(一般是指QPS,每秒多少個請求量),在這段預熱時間內,RateLimiter每秒分配的許可數會平穩地增加直到預熱期結束時達到其最大速率。(只要存在足夠請求數來使其飽和)
double    getRate()
返回RateLimiter 配置中的穩定速率,該速率單位是每秒多少量可數
void    setRate(double permitsPerSecond)
更新RateLimite的穩定速率,參數permitsPerSecond 由構造RateLimiter的工廠方法提供。
String    toString()
返回對象的字符表現形式
boolean    tryAcquire()
從RateLimiter 獲取許可,若是該許可能夠在無延遲下的狀況下當即獲取獲得的話
boolean    tryAcquire(int permits)
從RateLimiter 獲取許可數,若是該許可數能夠在無延遲下的狀況下當即獲取獲得的話
boolean    tryAcquire(int permits, long timeout, TimeUnit unit)
從RateLimiter 獲取指定許可數若是該許可數能夠在不超過timeout的時間內獲取獲得的話,或者若是沒法在timeout 過時以前獲取獲得許可數的話,那麼當即返回false (無需等待)
boolean    tryAcquire(long timeout, TimeUnit unit)
從RateLimiter 獲取許可若是該許可能夠在不超過timeout的時間內獲取獲得的話,或者若是沒法在timeout 過時以前獲取獲得許可的話,那麼當即返回false(無需等待)

--------------------- 

使用SparkStreaming集成kafka時有幾個比較重要的參數:

(1)spark.streaming.stopGracefullyOnShutdown (true / false)默認fasle

確保在kill任務時,可以處理完最後一批數據,再關閉程序,不會發生強制kill致使數據處理中斷,沒處理完的數據丟失

(2)spark.streaming.backpressure.enabled (true / false) 默認false

開啓後spark自動根據系統負載選擇最優消費速率

(3)spark.streaming.backpressure.initialRate (整數) 默認直接讀取全部

在(2)開啓的狀況下,限制第一次批處理應該消費的數據,由於程序冷啓動 隊列裏面有大量積壓,防止第一次所有讀取,形成系統阻塞

(4)spark.streaming.kafka.maxRatePerPartition (整數) 默認直接讀取全部

限制每秒每一個消費線程讀取每一個kafka分區最大的數據量

注意:

只有(4)激活的時候,每次消費的最大數據量,就是設置的數據量,若是不足這個數,就有多少讀多少,若是超過這個數字,就讀取這個數字的設置的值

只有(2)+(4)激活的時候,每次消費讀取的數量最大會等於(4)設置的值,最小是spark根據系統負載自動推斷的值,消費的數據量會在這兩個範圍以內變化根據系統狀況,但第一次啓動會有多少讀多少數據。此後按(2)+(4)設置規則運行

(2)+(3)+(4)同時激活的時候,跟上一個消費狀況基本同樣,但第一次消費會獲得限制,由於咱們設置第一次消費的頻率了。

除此以外,還應該考慮程序容錯性,這個跟checkpoint有關係散仙在前面的文章已經描述過具體請參考:http://qindongliang.iteye.com/


--------------------- 

其它參考;https://blog.csdn.net/u012684933/article/details/48656629

https://blog.csdn.net/yjh314/article/details/52918072

相關文章
相關標籤/搜索