最近一直在用directstream方式消費kafka中的數據,特此總結,整個代碼工程分爲三個部分
一. 完整工程代碼以下(某些地方特地作了說明, 這個代碼的部分函數直接用的是spark-streaming-kafka-0.8_2.11)java
package directStream import kafka.message.MessageAndMetadata; import kafka.serializer.StringDecoder import kafka.common.TopicAndPartition import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.TopicPartition //import java.util._ import org.apache.spark.{SparkContext,SparkConf,TaskContext, SparkException} import org.apache.spark.SparkContext._ import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.kafka.{KafkaUtils,HasOffsetRanges, OffsetRange,KafkaCluster} import com.typesafe.config.ConfigFactory import scalikejdbc._ import scala.collection.JavaConverters._ object SetupJdbc { def apply(driver: String, host: String, user: String, password: String): Unit = { Class.forName(driver) ConnectionPool.singleton(host, user, password) } } object SimpleApp{ def main(args: Array[String]): Unit = { val conf = ConfigFactory.load // 加載工程resources目錄下application.conf文件,該文件中配置了databases信息,以及topic及group消息 val kafkaParams = Map[String, String]( "metadata.broker.list" -> conf.getString("kafka.brokers"), "group.id" -> conf.getString("kafka.group"), "auto.offset.reset" -> "smallest" ) val jdbcDriver = conf.getString("jdbc.driver") val jdbcUrl = conf.getString("jdbc.url") val jdbcUser = conf.getString("jdbc.user") val jdbcPassword = conf.getString("jdbc.password") val topic = conf.getString("kafka.topics") val group = conf.getString("kafka.group") val ssc = setupSsc(kafkaParams, jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword,topic, group)() ssc.start() ssc.awaitTermination() } def createStream(taskOffsetInfo: Map[TopicAndPartition, Long], kafkaParams: Map[String, String], conf:SparkConf, ssc: StreamingContext, topics:String):InputDStream[_] = { // 若taskOffsetInfo 不爲空, 說明這不是第一次啓動該任務, database已經保存了該topic下該group的已消費的offset, 則對比kafka中該topic有效的offset的最小值和數據庫保存的offset,去比較大做爲新的offset. if(taskOffsetInfo.size != 0){ val kc = new KafkaCluster(kafkaParams) val earliestLeaderOffsets = kc.getEarliestLeaderOffsets(taskOffsetInfo.keySet) if(earliestLeaderOffsets.isLeft) throw new SparkException("get kafka partition failed:") val earliestOffSets = earliestLeaderOffsets.right.get val offsets = earliestOffSets.map(r => new TopicAndPartition(r._1.topic, r._1.partition) -> r._2.offset.toLong) val newOffsets = taskOffsetInfo.map(r => { val t = offsets(r._1) if (t > r._2) { r._1 -> t } else { r._1 -> r._2 } } ) val messageHandler = (mmd: MessageAndMetadata[String, String]) => 1L KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Long](ssc, kafkaParams, newOffsets, messageHandler) //val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( } else { val topicSet = topics.split(",").toSet KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams,topicSet) } } def setupSsc( kafkaParams: Map[String, String], jdbcDriver: String, jdbcUrl: String, jdbcUser: String, jdbcPassword: String, topics:String, group:String )(): StreamingContext = { val conf = new SparkConf() .setMaster("mesos://10.142.113.239:5050") .setAppName("offset") .set("spark.worker.timeout", "500") .set("spark.cores.max", "10") .set("spark.streaming.kafka.maxRatePerPartition", "500") .set("spark.rpc.askTimeout", "600s") .set("spark.network.timeout", "600s") .set("spark.streaming.backpressure.enabled", "true") .set("spark.task.maxFailures", "1") .set("spark.speculationfalse", "false") val ssc = new StreamingContext(conf, Seconds(5)) SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword) // connect to mysql // begin from the the offsets committed to the database val fromOffsets = DB.readOnly { implicit session => sql"select topic, part, offset from streaming_task where group_id=$group". map { resultSet => new TopicAndPartition(resultSet.string(1), resultSet.int(2)) -> resultSet.long(3) }.list.apply().toMap } val stream = createStream(fromOffsets, kafkaParams, conf, ssc, topics) stream.foreachRDD { rdd => if(rdd.count != 0){ // you task val t = rdd.map(record => (record, 1)) val results = t.reduceByKey {_+_}.collect // persist the offset into the database val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges DB.localTx { implicit session => offsetRanges.foreach { osr => sql"""replace into streaming_task values(${osr.topic}, ${group}, ${osr.partition}, ${osr.untilOffset})""".update.apply() if(osr.partition == 0){ println(osr.partition, osr.untilOffset) } } } } } ssc } }
二. 工程的resources文件下的有個application.conf配置文件,其配置以下mysql
jdbc { driver = "com.mysql.jdbc.Driver" url = "jdbc:mysql://xxx.xxx.xxx.xxx:xxxx/xxxx" user = "xxxx" password = "xxxx" } kafka { topics = "xxxx" brokers = "xxxx.xxx.xxx.:xxx,xxx.xxx.xxx.xxx:9092,xxx.xxxx.xxx:xxxx" group = "xxxxxx" } jheckpointDir = "hdfs://xxx.xxx.xxx.xxx:9000/shouzhucheckpoint" batchDurationMs = xxxx
三. 配置文件中能夠看到, 我把offset 保存在 mysql裏,這裏我定義了一個table 名稱爲streaming_task, 表的結構信息以下:sql
+----------+--------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +----------+--------------+------+-----+---------+-------+ | topic | varchar(100) | NO | PRI | NULL | | | group_id | varchar(50) | NO | PRI | | | | part | int(4) | NO | PRI | 0 | | | offset | mediumtext | YES | | NULL | | +----------+--------------+------+-----+---------+-------+
一. 選用direct 的緣由
官方爲spark提供了兩種方式來消費kafka中的數據, 高階api由kafka本身來來維護offset, 有篇blog總結的比較好數據庫
第一種是利用 Kafka 消費者高級 API 在 Spark 的工做節點上建立消費者線程,訂閱 Kafka 中的消息,數據會傳輸到 Spark 工做節點的執行器中,可是默認配置下這種方法在 Spark Job 出錯時會致使數據丟失,若是要保證數據可靠性,須要在 Spark Streaming 中開啓Write Ahead Logs(WAL),也就是上文提到的 Kafka 用來保證數據可靠性和一致性的數據保存方式。能夠選擇讓 Spark 程序把 WAL 保存在分佈式文件系統(好比 HDFS)中,apache
第二種方式不須要創建消費者線程,使用 createDirectStream 接口直接去讀取 Kafka 的 WAL,將 Kafka 分區與 RDD 分區作一對一映射,相較於第一種方法,不需再維護一份 WAL 數據,提升了性能。讀取數據的偏移量由 Spark Streaming 程序經過檢查點機制自身處理,避免在程序出錯的狀況下重現第一種方法重複讀取數據的狀況,消除了 Spark Streaming 與 ZooKeeper/Kafka 數據不一致的風險。保證每條消息只會被 Spark Streaming 處理一次。如下代碼片經過第二種方式讀取 Kafka 中的數據:api
在我在使用第一種方式的時候,若是數據量太大, 每每會出現報錯,瞭解這這兩種方式的不一樣後, 果斷選用了第二種,session
二. 引入KafkaCluster類的緣由app
引入KafkaCluster是爲了在整個任務啓動以前, 首先獲取topic的有效的最舊offset. 這跟kafka的在實際的使用場景,大公司都是按時間刪除kafka上數據有關,若是任務掛的時間過久,在還未能啓動任務以前,database中保存的offset已經在kafak中失效,這時候爲了最大程度的減小損失,只能從該topic的最舊數據開始消費..分佈式
三. 存入database的緣由函數
看上面的代碼,估計好多人也扒過KafkaCluster的源碼, 這個類裏面其實有一個setConsumerOffsets這樣的方法�, 其實在處理過一個batch的數據後, 更新一下該topic下group的offset便可,可是仍是在開始啓動這個 job 的時候還得驗證該offset否有效. 貌似這樣還不用外部數據庫,豈不方便? 其實這樣作確實挺方便,
有些場景下這樣作無可厚非, 但我以爲: 若是處理完數據,要寫到外部數據庫, 此時,若是能把寫數據和寫offset放在一個事務中(前提是這個數據庫是支持事務), 那麼就能夠便可保證嚴格消費一次
四. conf 中兩個特殊設置設置
爲了確保task不會重複執行請設置下面兩個參數: