In the receiver-based approach, data is fetched from Kafka through a receiver that is built on Kafka's high-level consumer API. The receiver pulls messages from Kafka and stores them in the memory of the executors; Spark Streaming (that is, the driver) then launches jobs to process the data that has been received.
Receiving messages from Kafka always goes through one of the stream-creation methods in KafkaUtils; the two approaches simply call different implementations.
The receiver-based implementation looks like this.
/**
 * Create an input stream that pulls messages from the Kafka brokers. ZooKeeper cluster
 * information must be passed in; by default the data is replicated to a second executor.
 */
def createStream(
    ssc: StreamingContext,        // the streaming context
    zkQuorum: String,             // ZooKeeper cluster info (hostname:port,hostname:port,...)
    groupId: String,              // the consumer group this consumer belongs to
    topics: Map[String, Int],     // Map[topic_name, numPartitions]: partitions consumed per topic
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[(String, String)] = {
  val kafkaParams = Map[String, String](
    "zookeeper.connect" -> zkQuorum,
    "group.id" -> groupId,
    "zookeeper.connection.timeout.ms" -> "10000")
  // whether the write-ahead log is enabled
  val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
  // assemble a KafkaInputDStream
  new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
}
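For reference, a minimal sketch of how an application might call this receiver-based API. The ZooKeeper quorum ("zk1:2181"), group id ("demo-group"), and topic name ("events") are placeholder values, not anything from the source above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("ReceiverDemo")
val ssc = new StreamingContext(conf, Seconds(5))

// one receiver thread for the "events" topic; messages land in executor memory
// (and in the write-ahead log if spark.streaming.receiver.writeAheadLog.enable is set)
val messages = KafkaUtils.createStream(ssc, "zk1:2181", "demo-group", Map("events" -> 1))

messages.map(_._2).count().print()   // keep only the message value and count each batch
ssc.start()
ssc.awaitTermination()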
The direct approach is implemented as follows.
/**
 * Bypasses the high-level Kafka consumer API and reads directly from the Kafka brokers,
 * which makes it possible to guarantee that each message is consumed exactly once.
 * Characteristics:
 * - No receivers: data is pulled straight from Kafka without any receiver
 * - Offsets: offsets are not stored in ZooKeeper; the stream tracks them itself and they
 *   can be retrieved through HasOffsetRanges
 * - Failure Recovery: requires checkpointing to be enabled on the StreamingContext
 * - End-to-end semantics: messages are consumed once and only once
 * @return DStream of (Kafka message key, Kafka message value)
 */
def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
    ssc: StreamingContext,
    // broker list, e.g. Map("metadata.broker.list" -> brokers)
    kafkaParams: Map[String, String],
    topics: Set[String]
  ): InputDStream[(K, V)] = {
  val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
  val kc = new KafkaCluster(kafkaParams)
  val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
  new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
    ssc, kafkaParams, fromOffsets, messageHandler)
}
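For comparison, a sketch of calling the direct API from an application, assuming the same StreamingContext ssc as in the previous sketch; the broker addresses and topic name are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// placeholders: broker list and topic name
val kafkaParams = Map[String, String]("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("events")

// no receiver: each batch pulls its own offset range directly from the brokers
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)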
If auto.offset.reset is not set, consumption starts from the largest (latest) offset by default; if auto.offset.reset is set to smallest, consumption starts from the smallest (earliest) offset.

After recovering from a checkpoint, what can be done if too much data has accumulated to be processed in time?

1) Limit the ingest rate through the spark.streaming.kafka.maxRatePerPartition parameter (see the sketch after this list).
2) Increase the processing capacity of the machines.
3) Put the data into a buffer pool.
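As an illustration of option 1, a configuration sketch; the rate value is arbitrary and the backpressure setting is an optional extra, not something from the text above:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("DirectDemo")
  // read at most 1000 records per Kafka partition per second
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  // optionally let Spark adapt the rate to the observed processing speed (Spark 1.5+)
  .set("spark.streaming.backpressure.enabled", "true")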
Get the set of starting offsets, then create the DirectKafkaInputDStream object.
// class KafkaUtils
private[kafka] def getFromOffsets(
    kc: KafkaCluster,
    kafkaParams: Map[String, String],
    topics: Set[String]
  ): Map[TopicAndPartition, Long] = {
  // auto.offset.reset from the kafkaParams passed to createDirectStream: where to start consuming
  val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase(Locale.ROOT))
  val result = for {
    topicPartitions <- kc.getPartitions(topics).right
    leaderOffsets <- (if (reset == Some("smallest")) {
      // smallest: the earliest offsets, i.e. consume every message from the beginning of the topic
      kc.getEarliestLeaderOffsets(topicPartitions)
    } else {
      // largest: the latest offsets, i.e. only newly arriving messages
      kc.getLatestLeaderOffsets(topicPartitions)
    }).right
    // yield collects each element into a collection that is returned when the comprehension
    // finishes; a Scala for-comprehension over a Map returns a Map, over a List a List, and so on
  } yield {
    // the result of the comprehension: Map[TopicAndPartition, LeaderOffset] reduced to plain offsets
    leaderOffsets.map { case (tp, lo) =>
      (tp, lo.offset)
    }
  }
  KafkaCluster.checkErrors(result)
}

// createDirectStream then builds the stream from these offsets
def createDirectStream = {
  new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
    ssc, kafkaParams, fromOffsets, messageHandler)
}
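An application that stores offsets itself can bypass getFromOffsets entirely and pass explicit starting offsets through the createDirectStream overload that accepts fromOffsets. A sketch, reusing ssc and kafkaParams from the earlier example; the topic name and offset numbers stand in for values loaded from an external store:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// hypothetical offsets restored from an external store
val fromOffsets = Map(
  TopicAndPartition("events", 0) -> 4200L,
  TopicAndPartition("events", 1) -> 3100L)

val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)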
DirectKafkaInputDStream.compute creates a KafkaRDD and reports the offset information to the InputInfoTracker.
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  // Map[TopicAndPartition, LeaderOffset]: the target offset for each topic partition
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  // the message handler: val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
  // create the KafkaRDD
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
  // wrap the topic/partition and offset information into OffsetRange objects
  val offsetRanges = currentOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }
  // report the OffsetRanges to the InputInfoTracker
  // (inputInfo wraps the offsetRanges; its construction is omitted in this excerpt)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}
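On the application side, those same ranges can be read back from each batch RDD through HasOffsetRanges, which is the usual hook for committing offsets to an external store. A sketch, using the stream variable from the direct-API example above:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // the RDD produced by DirectKafkaInputDStream.compute is a KafkaRDD, which implements HasOffsetRanges
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  // process the batch, then persist offsetRanges yourself if you are not relying on checkpoints
}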
When a KafkaRDD partition is computed, the data is pulled directly from Kafka.
override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  new KafkaRDDIterator(part, context)
}

private class KafkaRDDIterator(
    part: KafkaRDDPartition,
    context: TaskContext) extends NextIterator[R] {

  // initialize a KafkaCluster from metadata.broker.list, used to connect to Kafka
  val kc = new KafkaCluster(kafkaParams)
  var requestOffset = part.fromOffset
  var iter: Iterator[MessageAndOffset] = null

  // connect to the preferred host first, to keep the number of retries as small as possible
  val consumer: SimpleConsumer = {
    if (context.attemptNumber > 0) {
      // this is a retry: look up the partition leader again
      kc.connectLeader(part.topic, part.partition).fold(
        errs => throw new SparkException(
          s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
            errs.mkString("\n")),
        consumer => consumer
      )
    } else {
      // first attempt: connect directly to the preferred host
      kc.connect(part.host, part.port)
    }
  }

  // build a fetch request and pull a batch of messages
  private def fetchBatch: Iterator[MessageAndOffset] = {
    val req = new FetchRequestBuilder()
      .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
      .build()
    val resp = consumer.fetch(req)
    // throw on failure so the task is retried
    handleFetchErr(resp)
    // kafka may return a batch that starts before the requested offset
    resp.messageSet(part.topic, part.partition)
      .iterator
      .dropWhile(_.offset < requestOffset)
  }

  // on fetch failure, throw so the normal RDD retry mechanism reattempts the task
  private def handleFetchErr(resp: FetchResponse) {
    if (resp.hasError) {
      val err = resp.errorCode(part.topic, part.partition)
      // Let normal rdd retry sort out reconnect attempts
      throw ErrorMapping.exceptionFor(err)
    }
  }

  override def getNext(): R = {
    if (iter == null || !iter.hasNext) {
      // pull the next batch
      iter = fetchBatch
    }
    if (!iter.hasNext) {
      assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
      finished = true
      null.asInstanceOf[R]
    } else {
      // walk through the fetched messages
      val item = iter.next()
      if (item.offset >= part.untilOffset) {
        // stop once the current item's offset reaches the upper bound of the range
        finished = true
        null.asInstanceOf[R]
      } else {
        requestOffset = item.nextOffset
        // hand the fetched message to messageHandler
        messageHandler(new MessageAndMetadata(
          part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
      }
    }
  }
}
Offsets are saved through checkpointing.
// DirectKafkaInputDStream overrides the checkpoint data defined by DStream
class DirectKafkaInputDStream extends InputDStream {
  override val checkpointData = new DirectKafkaInputDStreamCheckpointData
}

class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
  // the offset information of each batch, keyed by batch time and stored as
  // (topic, partition, fromOffset, untilOffset) tuples
  def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
    data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
  }
}
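For this checkpoint data to be usable for recovery, the application has to create its StreamingContext through StreamingContext.getOrCreate. A minimal sketch; the checkpoint directory is a placeholder path and the batch interval is arbitrary:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("DirectCheckpointDemo")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint("hdfs:///tmp/kafka-direct-checkpoint")  // placeholder path
  // build the direct stream and the processing graph here
  ssc
}

// on restart, the context (including the offsets held in DirectKafkaInputDStreamCheckpointData)
// is rebuilt from the checkpoint instead of calling createContext again
val ssc = StreamingContext.getOrCreate("hdfs:///tmp/kafka-direct-checkpoint", createContext _)
ssc.start()
ssc.awaitTermination()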