package org.apache.spark.streaming.kafka

import kafka.api.OffsetCommitRequest
import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal

/**
 * Author: Guowang Jiyi ("Past Memories")
 * Date: 2015-06-02
 * Time: 23:46
 * Blog: https://www.iteblog.com
 * Article URL: https://www.iteblog.com/archives/1381
 * The "Past Memories" blog is a technical blog focused on Hadoop, Hive, Spark, Shark and Flume,
 * with plenty of practical content.
 * WeChat official account of the blog: iteblog_hadoop
 */

/**
 * Commits consumer-group offsets to Kafka by sending an `OffsetCommitRequest` through
 * `SimpleConsumer` connections opened against the seed brokers listed in [[config]].
 *
 * @param kafkaParams Kafka parameters from which the `SimpleConsumerConfig` is built
 */
class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {

  /** Accumulator for every failure seen while talking to the brokers. */
  type Err = ArrayBuffer[Throwable]

  // Marked @transient so it is not serialized with this instance; `config` rebuilds it
  // on first access whenever it is null.
  @transient private var _config: SimpleConsumerConfig = null

  /** Returns the consumer configuration, creating it from `kafkaParams` on first use. */
  def config: SimpleConsumerConfig = this.synchronized {
    if (_config == null) {
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }

  /**
   * Commits plain offsets for a consumer group.
   *
   * Each offset is wrapped in an `OffsetMetadataAndError` and the call is delegated to
   * [[setConsumerOffsetMetadata]].
   *
   * @param groupId consumer group whose offsets are committed
   * @param offsets offset to commit for each topic/partition
   * @return `Right` with the per-partition status codes when every partition was committed,
   *         otherwise `Left` with the collected errors
   */
  def setConsumerOffsets(
      groupId: String,
      offsets: Map[TopicAndPartition, Long]
    ): Either[Err, Map[TopicAndPartition, Short]] = {
    setConsumerOffsetMetadata(groupId, offsets.map { case (tp, offset) =>
      tp -> OffsetMetadataAndError(offset)
    })
  }

  /**
   * Commits offsets (with metadata) for a consumer group.
   *
   * The seed brokers are tried in random order. The same commit request is sent to each broker
   * until every requested topic/partition has been acknowledged with `ErrorMapping.NoError`;
   * partitions already acknowledged by an earlier broker are not re-examined.
   *
   * @param groupId  consumer group whose offsets are committed
   * @param metadata offset and metadata to commit for each topic/partition
   * @return `Right` with the status code of every partition once all of them succeeded;
   *         otherwise `Left` with all errors gathered along the way, the last of which is a
   *         `SparkException` naming the partitions that could not be committed
   */
  def setConsumerOffsetMetadata(
      groupId: String,
      metadata: Map[TopicAndPartition, OffsetMetadataAndError]
    ): Either[Err, Map[TopicAndPartition, Short]] = {
    var result = Map[TopicAndPartition, Short]()
    val req = OffsetCommitRequest(groupId, metadata)
    val errs = new Err
    val topicAndPartitions = metadata.keySet

    // The callback reports completion through its Boolean result instead of a nonlocal
    // `return`, so no control-flow exception has to travel through withBrokers' try/catch.
    val committedAll = withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.commitOffsets(req)
      val respMap = resp.requestInfo
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { err: Short =>
          if (err == ErrorMapping.NoError) {
            result += (tp -> err)
          } else {
            errs.append(ErrorMapping.exceptionFor(err))
          }
        }
      }
      result.keys.size == topicAndPartitions.size
    }

    if (committedAll) {
      Right(result)
    } else {
      val missing = topicAndPartitions.diff(result.keySet)
      errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
      Left(errs)
    }
  }

  /**
   * Runs `fn` against a freshly connected consumer for each broker, in iteration order, stopping
   * at the first broker for which `fn` returns `true`.
   *
   * Non-fatal exceptions raised while connecting or inside `fn` are appended to `errs` and the
   * next broker is tried. Every consumer that was successfully created is closed before moving
   * on.
   *
   * @param brokers (host, port) pairs to try
   * @param errs    buffer that receives non-fatal failures
   * @param fn      work to perform; returns `true` when no further broker needs to be contacted
   * @return `true` if some invocation of `fn` returned `true`, `false` otherwise
   */
  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
      (fn: SimpleConsumer => Boolean): Boolean = {
    brokers.exists { hp =>
      // Kept as a nullable var so that `finally` only closes a consumer that was actually
      // created, and so that a failure in close() is not routed into `errs`.
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        case NonFatal(e) =>
          errs.append(e)
          false
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }

  /**
   * Opens a `SimpleConsumer` to the given broker using the socket timeout, receive buffer size
   * and client id taken from [[config]].
   *
   * @param host broker host name
   * @param port broker port
   */
  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)
}