Connecting Spark to Kafka

10.1    Example Code
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    // Helper from Spark's bundled examples (org.apache.spark.examples.streaming)
    // that only lowers the log level to keep the console readable
    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
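
The example uses the old direct (Kafka 0.8) integration from Spark 1.x. A minimal sbt dependency sketch for building it; the 1.6.3 versions are assumptions and should be matched to the actual cluster:

// build.sbt sketch -- versions are assumed, not taken from the original article
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.6.3" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.6.3" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
)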

10.2    Maintaining Offsets in Production
Configuration parameters (Kryo serialization for efficiency):
sparkConf.set("spark.akka.frameSize", "2047")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // some default classes are serialized with Kryo
sparkConf.set("spark.kryoserializer.buffer.max.mb", "2040")
sparkConf.set("spark.files.overwrite", "true")
sparkConf.set("spark.hadoop.validateOutputSpecs", "false")
sparkConf.set("spark.eventLog.overwrite", "true")
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "30") // max records consumed per second per partition; the amount pulled from Kafka is this value times the number of partitions of the topic (see the sketch below)
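
As a rough illustration of how this rate limit turns into a per-batch cap, here is a small sketch; the partition count is an assumption, and the 2-second batch interval is taken from the 10.1 example:

// Back-of-the-envelope arithmetic for spark.streaming.kafka.maxRatePerPartition:
// records per batch ≈ maxRatePerPartition * partition count * batch interval (seconds)
val maxRatePerPartition = 30   // value set above
val numPartitions = 10         // assumed partition count of the topic
val batchIntervalSeconds = 2   // batch interval used in the 10.1 example
val maxRecordsPerBatch = maxRatePerPartition * numPartitions * batchIntervalSeconds
println(s"at most $maxRecordsPerBatch records per batch")  // 600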


package com.suning.mep.utils

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils}

import scala.reflect.ClassTag

class KafkaClient(val kafkaParams: Map[String, String]) extends Serializable {
  private val kc = new KafkaCluster(kafkaParams)

  def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](ssc: StreamingContext, topic: String): InputDStream[(K, V)] = {
    val partitionsEither = kc.getPartitions(Set(topic))
    if (partitionsEither.isLeft) throw new SparkException(s"get kafka partition failed: ${partitionsEither.left.get}")

    val partitions = partitionsEither.right.get
    val groupId = kafkaParams.get("group.id").get

    val offsets = setOrUpdateOffsets(partitions, groupId)
    KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](ssc, kafkaParams, offsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
  }

  private def setOrUpdateOffsets(partitions: Set[TopicAndPartition], groupId: String): Map[TopicAndPartition, Long] = {
    val consumerOffsetEither = kc.getConsumerOffsets(groupId, partitions)
    if (consumerOffsetEither.isLeft) {
      val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)

      var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
      if (reset == Some("smallest")) {
        val leaderOffsetsEither = kc.getEarliestLeaderOffsets(partitions)
        if (leaderOffsetsEither.isLeft) throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsEither.left.get}")
        leaderOffsets = leaderOffsetsEither.right.get
      } else {
        val leaderOffsetsEither = kc.getLatestLeaderOffsets(partitions)
        if (leaderOffsetsEither.isLeft) throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsEither.left.get}")
        leaderOffsets = leaderOffsetsEither.right.get
      }

      val offsets = leaderOffsets.map {
        case (tp, offset) => (tp, offset.offset)
      }

      kc.setConsumerOffsets(groupId, offsets)
      offsets
    } else {
      val earliestLeaderOffsetsEither = kc.getEarliestLeaderOffsets(partitions)
      if (earliestLeaderOffsetsEither.isLeft) throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsEither.left.get}")

      val earliestLeaderOffsets = earliestLeaderOffsetsEither.right.get
      val consumerOffsets = consumerOffsetEither.right.get

      var offsets: Map[TopicAndPartition, Long] = Map()
      consumerOffsets.foreach({ case (tp, n) =>
        val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
        if (n < earliestLeaderOffset) {
          offsets += (tp -> earliestLeaderOffset)
        } else {
          offsets += (tp -> n)
        }
      })

      if (!offsets.isEmpty) {
        kc.setConsumerOffsets(groupId, offsets)
      }
      offsets
    }

      /**
        * If the streaming job throws kafka.common.OffsetOutOfRangeException,
        * the offsets saved in ZooKeeper are stale: Kafka's retention policy has already
        * deleted the log segments that contained them. To handle this, compare the
        * consumerOffsets stored in ZooKeeper with earliestLeaderOffsets; if consumerOffsets
        * are smaller than earliestLeaderOffsets, they are out of date, and consumerOffsets
        * should be updated to earliestLeaderOffsets.
        */
  }

  def updateOffsets(rdd: RDD[(String, String)]): Unit = {
    val groupId = kafkaParams.get("group.id").get
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    for (offsets <- offsetsList) {
      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
      if (o.isLeft) {
        println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
      }
    }
  }
}

Producer side:
val kafkaProducerFunc  = () => {
    val config = {
      val p = new Properties()
      p.setProperty("bootstrap.servers", ScmConfUtil.getInstance().getString("bootstrap.servers",""))
      p.setProperty("key.serializer", classOf[StringSerializer].getName)
      p.setProperty("value.serializer", classOf[StringSerializer].getName)
      p
    }
    val producer = new KafkaProducer[String, String](config)
    sys.addShutdownHook {
      // Ensure that, on executor JVM shutdown, the Kafka producer sends
      // any buffered messages to Kafka before shutting down.
      producer.close()
    }
    producer
  }

  val kafkaProducer = kafkaProducerFunc()
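
A possible way to drive this producer from the word-count stream is sketched below, reusing wordCounts from the 10.1 example; the result topic "wordcount_result" is made up, and collect() is used only so the driver-side kafkaProducer does the sending (acceptable only for small batches):

import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical: push each (word, count) of a micro-batch to an assumed result topic
wordCounts.foreachRDD { rdd =>
  rdd.collect().foreach { case (word, count) =>
    kafkaProducer.send(new ProducerRecord[String, String]("wordcount_result", word, count.toString))
  }
}

Wrapping the producer in kafkaProducerFunc (rather than constructing it inline) also makes it easy to create one producer per executor, e.g. by broadcasting the function, when records have to be sent from inside foreachPartition.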

Example usage:
  val kafkaParams = Map[String, String](
      "metadata.broker.list" -> ScmConfUtil.getInstance().getString("order.metadata.broker.list",""),
      "auto.offset.reset" -> ScmConfUtil.getInstance().getString("auto.offset.reset","smallest"),
      "group.id" -> ScmConfUtil.getInstance().getString("order.group.id","spark_rtppmep"))
    val kafkaClient = new KafkaClient(kafkaParams)
    // sparkContext
    //val broadcastVar=ssc.sparkContext.broadcast(RedisUtils.getInstance())

    val message = kafkaClient.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, "ppmep_topic")  
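
The stream above only reads; the offset write-back is what updateOffsets is for. A minimal sketch of wiring it in after each micro-batch (the processing body is a placeholder):

message.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // placeholder for the real per-record business logic
    rdd.map(_._2).foreach(value => ())
    // commit the processed offsets back so a restart resumes from here
    kafkaClient.updateOffsets(rdd)
  }
}

ssc.start()
ssc.awaitTermination()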
