10.1 Example Code
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    // Helper from the Spark examples project that sets a reasonable log level.
    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
10.2 Maintaining Offsets in Production
Configuration parameters (Kryo serialization is more efficient):
1. sparkConf.set("spark.akka.frameSize", "2047")
2. sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // use Kryo serialization
3. sparkConf.set("spark.kryoserializer.buffer.max.mb", "2040")
4. sparkConf.set("spark.files.overwrite", "true")
5. sparkConf.set("spark.hadoop.validateOutputSpecs", "false")
6. sparkConf.set("spark.eventLog.overwrite", "true")
7. sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "30") // per-partition cap on records consumed per second; the total rate pulled from Kafka is this value multiplied by the number of partitions of the topic (see the sketch below)
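To make the effect of the rate cap concrete, here is a back-of-the-envelope sketch; the partition count and batch interval are assumed values for illustration, not taken from the original configuration.

// Rough arithmetic behind spark.streaming.kafka.maxRatePerPartition
// (partition count and batch interval are assumed for illustration).
val maxRatePerPartition = 30   // value configured above
val numPartitions       = 6    // partitions of the subscribed topic (assumed)
val batchIntervalSec    = 2    // matches the Seconds(2) interval used in 10.1

val maxRecordsPerSecond = maxRatePerPartition * numPartitions    // 30 * 6 = 180 records/s
val maxRecordsPerBatch  = maxRecordsPerSecond * batchIntervalSec // 180 * 2 = 360 records per batch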
Offset-maintenance helper class:

package com.suning.mep.utils

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils}

import scala.reflect.ClassTag

class KafkaClient(val kafkaParams: Map[String, String]) extends Serializable {

  private val kc = new KafkaCluster(kafkaParams)

  def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](ssc: StreamingContext, topic: String): InputDStream[(K, V)] = {
    val partitionsEither = kc.getPartitions(Set(topic))
    if (partitionsEither.isLeft) throw new SparkException(s"get kafka partition failed: ${partitionsEither.left.get}")
    val partitions = partitionsEither.right.get
    val groupId = kafkaParams.get("group.id").get
    val offsets = setOrUpdateOffsets(partitions, groupId)
    KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](ssc, kafkaParams, offsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
  }

  /**
   * If the streaming job throws kafka.common.OffsetOutOfRangeException, the offsets stored in
   * ZooKeeper have expired, i.e. Kafka's retention policy has already deleted the log segments
   * that contained them. To handle this, compare the consumer offsets in ZooKeeper with the
   * earliest leader offsets: if the consumer offsets are smaller, they are stale, so reset them
   * to the earliest leader offsets.
   */
  private def setOrUpdateOffsets(partitions: Set[TopicAndPartition], groupId: String): Map[TopicAndPartition, Long] = {
    val consumerOffsetEither = kc.getConsumerOffsets(groupId, partitions)
    if (consumerOffsetEither.isLeft) {
      val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
      var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
      if (reset == Some("smallest")) {
        val leaderOffsetsEither = kc.getEarliestLeaderOffsets(partitions)
        if (leaderOffsetsEither.isLeft) throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsEither.left.get}")
        leaderOffsets = leaderOffsetsEither.right.get
      } else {
        val leaderOffsetsEither = kc.getLatestLeaderOffsets(partitions)
        if (leaderOffsetsEither.isLeft) throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsEither.left.get}")
        leaderOffsets = leaderOffsetsEither.right.get
      }
      val offsets = leaderOffsets.map {
        case (tp, offset) => (tp, offset.offset)
      }
      kc.setConsumerOffsets(groupId, offsets)
      offsets
    } else {
      val earliestLeaderOffsetsEither = kc.getEarliestLeaderOffsets(partitions)
      if (earliestLeaderOffsetsEither.isLeft) throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsEither.left.get}")
      val earliestLeaderOffsets = earliestLeaderOffsetsEither.right.get
      val consumerOffsets = consumerOffsetEither.right.get
      var offsets: Map[TopicAndPartition, Long] = Map()
      consumerOffsets.foreach({ case (tp, n) =>
        val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
        if (n < earliestLeaderOffset) {
          offsets += (tp -> earliestLeaderOffset)
        } else {
          offsets += (tp -> n)
        }
      })
      if (!offsets.isEmpty) {
        kc.setConsumerOffsets(groupId, offsets)
      }
      offsets
    }
  }

  def updateOffsets(rdd: RDD[(String, String)]): Unit = {
    val groupId = kafkaParams.get("group.id").get
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    for (offsets <- offsetsList) {
      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
      if (o.isLeft) {
        println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
      }
    }
  }
}
Producer side:
import java.util.Properties

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.serialization.StringSerializer

// ScmConfUtil is the author's internal configuration helper.
val kafkaProducerFunc = () => {
  val config = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", ScmConfUtil.getInstance().getString("bootstrap.servers", ""))
    p.setProperty("key.serializer", classOf[StringSerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  val producer = new KafkaProducer[String, String](config)
  sys.addShutdownHook {
    // Ensure that, on executor JVM shutdown, the Kafka producer sends
    // any buffered messages to Kafka before shutting down.
    producer.close()
  }
  producer
}
val kafkaProducer = kafkaProducerFunc()
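A minimal usage sketch of the producer follows; the topic name and payload are assumptions for illustration, not part of the original text.

import org.apache.kafka.clients.producer.ProducerRecord

// Assumed topic and payload, purely illustrative: send one processed record
// back to Kafka and flush so buffered messages are actually delivered.
kafkaProducer.send(new ProducerRecord[String, String]("ppmep_result_topic", "order-1", "{\"count\":42}"))
kafkaProducer.flush()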
Example code:
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> ScmConfUtil.getInstance().getString("order.metadata.broker.list", ""),
  "auto.offset.reset" -> ScmConfUtil.getInstance().getString("auto.offset.reset", "smallest"),
  "group.id" -> ScmConfUtil.getInstance().getString("order.group.id", "spark_rtppmep"))
val kafkaClient = new KafkaClient(kafkaParams)
// sparkContext
// val broadcastVar = ssc.sparkContext.broadcast(RedisUtils.getInstance())
val message = kafkaClient.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, "ppmep_topic")
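The snippet above stops after creating the stream. Below is a minimal sketch of the per-batch loop, with an assumed word-count body, showing where KafkaClient.updateOffsets fits: it is called only after the batch has been processed, so the offsets stored in ZooKeeper never run ahead of the work actually done.

message.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Assumed processing body for illustration (word count over the message values).
    rdd.map(_._2)
      .flatMap(_.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
      .foreach(println) // placeholder sink
  }
  // Commit untilOffset for every partition of this batch back to ZooKeeper.
  kafkaClient.updateOffsets(rdd)
}
ssc.start()
ssc.awaitTermination()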