Spark Streaming 總結

這篇文章記錄我使用 Spark Streaming 進行 ETL 處理的總結,主要包含如何編程,以及遇到的問題。html

環境

我在公司使用的環境以下:java

  1. Spark: 2.2.0
  2. Kakfa: 0.10.1

這兩個版本算是比較新的。sql

業務

從 Kafka 中讀取數據,用 SQL 處理,寫入 Kafka 中。 程序主要分爲 3大塊:apache

  1. 從 Kafka 中讀取數據。
  2. SQL ETL。
  3. 寫入 Kafka。

編程

從 Kafka 中讀取數據

spark-streaming-kafka-0-10_2.11

最開始使用spark-streaming-kafka-0-10_2.11。雖然這個包是實驗階段,可是考慮到用起來比較方便,就使用了這個包。整個代碼的框架和官方文檔的同樣。編程

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

編程很快,可是後面遇到了不少問題:app

  1. 異常錯誤:WARN TaskSetManager: Lost task 9.0 in stage 1683.0 (TID 9460, 10.62.34.25, executor 9): java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-2017-10-20-1100-streaming-test 1231231 1 13733588428 after polling for 1000。框架

    這個錯誤是 DirectKafkaStream 在 poll 數據的時候,發現沒有數據返回, 代碼以下:ide

    ```scala
     // 從 buffer 獲取數據,若是buffer 中沒有數據,就 poll 數據。
         if (!buffer.hasNext()) { poll(timeout) }
     assert(buffer.hasNext(),
       s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
     var record = buffer.next()
     ...
     ```

    上面的代碼的意思是從 kafka 中 poll 數據,若是 timeout 長時間後尚未獲得數據,就報錯。 而實際咱們的 Kafka 數據每秒鐘有幾千條。 而且 timeout 默認是 1秒,不可能拿不到數據。最後發現 spark-streaming-kafka-0-10_2.11 這個包對應的 kafka-clients 是 0.10.0.1。而這個版本的 kafka-clients 是有 BUG的,因而將 kafka-clients 的版本升級到 0.10.2.1。問題解決了。函數

  2. 測試的時候,發如今中止掉程序後,在重開程序,重複消費一部分數據。 那麼這個問題就是,程序中止的時候沒有正確的提交當前消費的 offset。
    咱們的程序是經過 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) 來提交每一個 RDD 的 offset 的。而這段代碼的背後是將 offsetRanges 保存到了一個隊列中。 等到下次從 kafka 中獲取下一個 batch 的數據後(經過 compute 函數),順便將隊列中的 offset 提交到 KafkaCluster 中。
    代碼以下:測試

    //  保存到 queue 中
    def commitAsync(offsetRanges: Array[OffsetRange], callback:            OffsetCommitCallback): Unit = {
        commitCallback.set(callback)
        commitQueue.addAll(ju.Arrays.asList(offsetRanges: _*))
      }
    
    
    // 提交 offset , 將 queue 中的 offset 保存到 map 中,並提交
    protected def commitAll(): Unit =  {
        val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
        var osr = commitQueue.poll()
        while (null != osr) {
          val tp = osr.topicPartition
          val x = m.get(tp)
          val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
          m.put(tp, new OffsetAndMetadata(offset))
          osr = commitQueue.poll()
        }
        if (!m.isEmpty) {
          consumer.commitAsync(m, commitCallback.get)
        }
      }
    
    // 每次從 kafka 中獲取數據, 順便提交 上一次的 offset  
    override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
        // 獲取當前的 offset, 若是程序保存了offset就用程序的,若是沒有,就從kafka中讀取。
        // 當程序重啓後,就會從kafka中讀取。
        val untilOffsets = clamp(latestOffsets())
        val offsetRanges = untilOffsets.map { case (tp, uo) =>
          val fo = currentOffsets(tp)
          OffsetRange(tp.topic, tp.partition, fo, uo)
        }
        ...
        // 獲取到了數據,並保存在 rdd 中
        val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
          getPreferredHosts, useConsumerCache)
        ....
        // 更新 offset
        currentOffsets = untilOffsets
        // 重點:提交 queue 中的offset
        commitAll()
        Some(rdd)
    }

    看完這個邏輯,傻眼了。這樣子程序結束,處理完最後一個 batch, 它的 offset 是沒有辦法提交到 cluster 的,結果就是重複消費。若是要本身寫提交 offset 的代碼,那和老版本的就沒有區別了。

考慮了半天,最終仍是用老的包來實現了。

spark-streaming-kafka-0-8

使用老的包,咱們的邏輯以下:

代碼實現以下:

...
    各類參數初始化
    val kafkaCluster = new KafkaCluster(kafkaClusterParams)
    val topicAndPartitionSet = kafkaCluster.getPartitions(consumerTopics.toSet).right.get
    var consumerOffsetsLong = new mutable.HashMap[TopicAndPartition, Long]()

    if (kafkaCluster.getConsumerOffsets(kafkaClusterParams.get("group.id").toString, topicAndPartitionSet).isLeft) {
      val latestOffset = kafkaCluster.getLatestLeaderOffsets(topicAndPartitionSet)
      topicAndPartitionSet.foreach(tp => {
        consumerOffsetsLong.put(tp, latestOffset.right.get(tp).offset)
      })
    } else {
      val consumerOffsetsTemp = kafkaCluster.getConsumerOffsets(kafkaClusterParams.get("group.id").toString, topicAndPartitionSet)
      topicAndPartitionSet.foreach(tp => {
        consumerOffsetsLong.put(tp, consumerOffsetsTemp.right.get(tp))
      })
    }


    val kafkaClusterParamsBroadcast = ssc.sparkContext.broadcast(kafkaClusterParams)


    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
      ssc, kafkaClusterParams, consumerOffsetsLong.toMap, (mmd: MessageAndMetadata[String, String]) => mmd.message() )

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // 處理業務邏輯
      val m = new mutable.HashMap[TopicAndPartition, Long]()
      if (null != offsetRanges) {
        offsetRanges.foreach(
          osr => {
            val tp = osr.topicAndPartition
            m.put(tp, osr.untilOffset)
          }
        )
      }
      kafkaCluster.setConsumerOffsets(kafkaClusterParamsBroadcast.value.get("group.id").toString, m.toMap)

    }

這樣子來處理數據,一切正常。

SQL ETL

SQL ETL 就是使用 Spark SQL 進行處理。若是要對多個同一個 batch 進行屢次處理,最好是 將 bacth cache 起來。

將數據寫入 Kafak 中

這個就是從網上找的了:

import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()
  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))
  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

  def close(): Unit = {
    producer.close()
  }
}

object KafkaSink {
  import scala.collection.JavaConversions._
  def apply[K, V](config: Map[String, AnyRef]): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      producer
    }
    new KafkaSink(createProducerFunc)
  }
  def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}

使用方式:

// 廣播KafkaSink
    val kafkaSinkBroadcast: Broadcast[KafkaSink[String, String]] = {
      ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaSinkParams))
    }

    val kafkaProducerTopicBroadcast = ssc.sparkContext.broadcast(producerTopic)

    stream.foreachRDD {
        ....
        kafkaSinkBroadcast.value.send(kafkaProducerTopicBroadcast.value, str)
    }

總體上的代碼就是這麼多。

配置

除了代碼,Spark Streaming 仍是須要某些配置的,具體以下:

  1. "spark.executor.cores":"2"。默認的 Yarn 模式下,core 的個數是1個。當 executor 的壓力過大的時候,常常會出現 connect reset by peer 和 心跳超時,因此要看狀況增長 core 的個數。
  2. "spark.driver.extraJavaOptions":"-Dlog4j.configuration=file:log4j.properties" 。Spark 默認的日誌級別就是 INFO, 一般會打印出不少的信息,日誌一夜就上G了,因此最好自定義本身的配置文件。
  3. "spark.executor.extraJavaOptions":"-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintHeapAtGC -XX:+PrintGCTimeStamps" 。 使用 G1 的垃圾回收方式,並打印出具體的信息,方便在 GC 時間過長的時候進行調優。
  4. "spark.streaming.stopGracefullyOnShutdown":"true"。讓 Streaming 程序在收到 Terminate 信號後,處理完最後一個 batch 再退出。一般中止程序的時候,運行兩次 kill -15 driver_pid 就能夠中止掉程序。
"spark.streaming.backpressure.enabled":"true",
"spark.streaming.backpressure.initialRate":"1000000",
"spark.streaming.kafka.maxRatePerPartition":"20000",

這三個參數用來限制消費 kafka 的速度。避免一次消費太多的數據,將程序搞垮掉。

相關文章
相關標籤/搜索