Spark Streaming 中管理 Kafka Offsets 的幾種方式

Offset管理概述node

Spark Streaming集成了Kafka容許用戶從Kafka中讀取一個或者多個topic的數據。一個Kafka topic包含多個存儲消息的分區(partition)。每一個分區中的消息是順序存儲,而且用offset(能夠認爲是位置)來標記消息。開發者能夠在他的Spark Streaming應用中經過offset來控制數據的讀取位置,可是這須要好的offset的管理機制。數據庫

Offsets管理對於保證流式應用在整個生命週期中數據的連貫性是很是有益的。舉個例子,若是在應用中止或者報錯退出以前沒有將offset保存在持久化數據庫中,那麼offset rangges就會丟失。更進一步說,若是沒有保存每一個分區已經讀取的offset,那麼Spark Streaming就沒有辦法從上次斷開(中止或者報錯致使)的位置繼續讀取消息。apache

 

上面的圖描述一般的Spark Streaming應用管理offset流程。Offsets能夠經過多種方式來管理,可是通常來講遵循下面的步驟:編程

  • 在 Direct DStream初始化的時候,須要指定一個包含每一個topic的每一個分區的offset用於讓Direct DStream從指定位置讀取數據。設計模式

    • offsets就是步驟4中所保存的offsets位置session

  • 讀取並處理消息app

  • 處理完以後存儲結果數據框架

    • 用虛線圈存儲和提交offset只是簡單強調用戶可能會執行一系列操做來知足他們更加嚴格的語義要求。這包括冪等操做和經過原子操做的方式存儲offset。異步

  • 最後,將offsets保存在外部持久化數據庫如 HBase, Kafka, HDFS, and ZooKeeper中工具

不一樣的方案能夠根據不一樣的商業需求進行組合。Spark具備很好的編程範式容許用戶很好的控制offsets的保存時機。認真考慮如下的情形:一個Spark  Streaming 應用從Kafka中讀取數據,處理或者轉換數據,而後將數據發送到另外一個topic或者其餘系統中(例如其餘消息系統、Hbase、Solr、DBMS等等)。在這個例子中,咱們只考慮消息處理以後發送到其餘系統中

將Offsests存儲在外部系統

在這一章節中,咱們未來探討一下不一樣的外部持久化存儲選項

爲了更好地理解這一章節中提到的內容,咱們先來作一些鋪墊。若是是使用 spark-streaming-kafka-0-10,那麼咱們建議將 enable.auto.commit 設爲false。這個配置只是在這個版本生效,enable.auto.commit 若是設爲true的話,那麼意味着 offsets 會按照 auto.commit.interval.ms 中所配置的間隔來週期性自動提交到Kafka中。在Spark Streaming中,將這個選項設置爲true的話會使得Spark應用從kafka中讀取數據以後就自動提交,而不是數據處理以後提交,這不是咱們想要的。因此爲了更好地控制offsets的提交,咱們建議將enable.auto.commit 設爲false。

 

Spark Streaming checkpoints

使用Spark Streaming的checkpoint是最簡單的存儲方式,而且在Spark 框架中很容易實現。Spark Streaming checkpoints就是爲保存應用狀態而設計的,咱們將路徑這在HDFS上,因此可以從失敗中恢復數據。

對Kafka Stream 執行checkpoint操做使得offset保存在checkpoint中,若是是應用掛掉的話,那麼SparkStreamig應用功能能夠從保存的offset中開始讀取消息。可是,若是是對Spark Streaming應用進行升級的話,那麼很抱歉,不能checkpoint的數據無法使用,因此這種機制並不可靠,特別是在嚴格的生產環境中,咱們不推薦這種方式。

將offsets存儲在HBase中

HBase能夠做爲一個可靠的外部數據庫來持久化offsets。經過將offsets存儲在外部系統中,Spark Streaming應用功能可以重讀或者回聽任何仍然存儲在Kafka中的數據。

根據HBase的設計模式,容許應用可以以rowkey和column的結構將多個Spark Streaming應用和多個Kafka topic存放在一張表格中。在這個例子中,表格以topic名稱、消費者group id和Spark Streaming 的batchTime.milliSeconds做爲rowkey以作惟一標識。儘管batchTime.milliSeconds不是必須的,可是它可以更好地展現歷史的每批次的offsets。表格將存儲30天的累積數據,若是超出30天則會被移除。下面是建立表格的DDL和結構

1DDL
2create 'stream_kafka_offsets', {NAME=>'offsets', TTL=>2592000}
3RowKey Layout:
4row:                     <TOPIC_NAME>:<GROUP_ID>:<EPOCH_BATCHTIME_MS>
5column family:    offsets
6qualifier:          <PARTITION_ID>
7value:                 <OFFSET_ID>

對每個批次的消息,使用saveOffsets()將從指定topic中讀取的offsets保存到HBase中

 1/*
2 Save offsets for each batch into HBase
3*/
4def saveOffsets(TOPIC_NAME:String,GROUP_ID:String,offsetRanges:Array[OffsetRange],
5                hbaseTableName:String,batchTime: org.apache.spark.streaming.Time) ={
6  val hbaseConf = HBaseConfiguration.create()
7  hbaseConf.addResource("src/main/resources/hbase-site.xml")
8  val conn = ConnectionFactory.createConnection(hbaseConf)
9  val table = conn.getTable(TableName.valueOf(hbaseTableName))
10  val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" +String.valueOf(batchTime.milliseconds)
11  val put = new Put(rowKey.getBytes)
12  for(offset <- offsetRanges){
13    put.addColumn(Bytes.toBytes("offsets"),Bytes.toBytes(offset.partition.toString),
14          Bytes.toBytes(offset.untilOffset.toString))
15  }
16  table.put(put)
17  conn.close()
18}

在執行streaming任務以前,首先會使用getLastCommittedOffsets()來從HBase中讀取上一次任務結束時所保存的offsets。該方法將採用經常使用方案來返回kafka topic分區offsets。

  • 情形1:Streaming任務第一次啓動,從zookeeper中獲取給定topic的分區數,而後將每一個分區的offset都設置爲0,並返回。

  • 情形2:一個運行了很長時間的streaming任務中止而且給定的topic增長了新的分區,處理方式是從zookeeper中獲取給定topic的分區數,對於全部老的分區,offset依然使用HBase中所保存,對於新的分區則將offset設置爲0。

  • 情形3:Streaming任務長時間運行後中止而且topic分區沒有任何變化,在這個情形下,直接使用HBase中所保存的offset便可。

在Spark Streaming應用啓動以後若是topic增長了新的分區,那麼應用只能讀取到老的分區中的數據,新的是讀取不到的。因此若是想讀取新的分區中的數據,那麼就得從新啓動Spark Streaming應用。

 1/* Returns last committed offsets for all the partitions of a given topic from HBase in  
2following  cases.
3*/
4def getLastCommittedOffsets(TOPIC_NAME:String,GROUP_ID:String,hbaseTableName:String,
5zkQuorum:String,zkRootDir:String,sessionTimeout:Int,connectionTimeOut:Int):Map[TopicPartition,Long] ={
6  val hbaseConf = HBaseConfiguration.create()
7  val zkUrl = zkQuorum+"/"+zkRootDir
8  val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl,
9                                                sessionTimeout,connectionTimeOut)
10  val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2,false)
11  val zKNumberOfPartitionsForTopic = zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME
12                                                 )).get(TOPIC_NAME).toList.head.size
13  zkClientAndConnection._1.close()
14  zkClientAndConnection._2.close()
15  //Connect to HBase to retrieve last committed offsets
16  val conn = ConnectionFactory.createConnection(hbaseConf)
17  val table = conn.getTable(TableName.valueOf(hbaseTableName))
18  val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" +
19                                              String.valueOf(System.currentTimeMillis())
20  val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0
21  val scan = new Scan()
22  val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(
23                                                   stopRow.getBytes).setReversed(true))
24  val result = scanner.next()
25  var hbaseNumberOfPartitionsForTopic = 0 //Set the number of partitions discovered for a topic in HBase to 0
26  if (result != null){
27  //If the result from hbase scanner is not null, set number of partitions from hbase
28  to the  number of cells
29    hbaseNumberOfPartitionsForTopic = result.listCells().size()
30  }
31val fromOffsets = collection.mutable.Map[TopicPartition,Long]()
32  if(hbaseNumberOfPartitionsForTopic == 0){
33    // initialize fromOffsets to beginning
34    for (partition <- 0 to zKNumberOfPartitionsForTopic-1){
35      fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> 0)
36    }
37  } else if(zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic){
38  // handle scenario where new partitions have been added to existing kafka topic
39    for (partition <- 0 to hbaseNumberOfPartitionsForTopic-1){
40      val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
41                                        Bytes.toBytes(partition.toString)))
42      fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> fromOffset.toLong)
43    }
44    for (partition <- hbaseNumberOfPartitionsForTopic to zKNumberOfPartitionsForTopic-1){
45      fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> 0)
46    }
47  } else {
48  //initialize fromOffsets from last run
49    for (partition <- 0 to hbaseNumberOfPartitionsForTopic-1 ){
50      val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
51                                        Bytes.toBytes(partition.toString)))
52      fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> fromOffset.toLong)
53    }
54  }
55  scanner.close()
56  conn.close()
57  fromOffsets.toMap
58}

當咱們獲取到offsets以後咱們就能夠建立一個Kafka Direct DStream

1val fromOffsets= getLastCommittedOffsets(topic,consumerGroupID,hbaseTableName,zkQuorum,
2                                        zkKafkaRootDir,zkSessionTimeOut,zkConnectionTimeOut)
3val inputDStream = KafkaUtils.createDirectStream[String,String](ssc,PreferConsistent,
4                           Assign[String, String](fromOffsets.keys,kafkaParams,fromOffsets))

在完成本批次的數據處理以後調用saveOffsets()保存offsets.

 1/*
2For each RDD in a DStream apply a map transformation that processes the message.
3*/
4inputDStream.foreachRDD((rdd,batchTime) => {
5  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
6  offsetRanges.foreach(offset => println(offset.topic,offset.partition, offset.fromOffset,
7                        offset.untilOffset))
8  val newRDD = rdd.map(message => processMessage(message))
9  newRDD.count()
10  saveOffsets(topic,consumerGroupID,offsetRanges,hbaseTableName,batchTime)
11})

你能夠到HBase中去查看不一樣topic和消費者組的offset數據

 1hbase(main):001:0> scan 'stream_kafka_offsets', {REVERSED => true}
2ROW                                                COLUMN+CELL
3 kafkablog2:groupid-1:1497628830000                column=offsets:0, timestamp=1497628832448, value=285
4 kafkablog2:groupid-1:1497628830000                column=offsets:1, timestamp=1497628832448, value=285
5 kafkablog2:groupid-1:1497628830000                column=offsets:2, timestamp=1497628832448, value=285
6 kafkablog2:groupid-1:1497628770000                column=offsets:0, timestamp=1497628773773, value=225
7 kafkablog2:groupid-1:1497628770000                column=offsets:1, timestamp=1497628773773, value=225
8 kafkablog2:groupid-1:1497628770000                column=offsets:2, timestamp=1497628773773, value=225
9 kafkablog1:groupid-2:1497628650000                column=offsets:0, timestamp=1497628653451, value=165
10 kafkablog1:groupid-2:1497628650000                column=offsets:1, timestamp=1497628653451, value=165
11 kafkablog1:groupid-2:1497628650000                column=offsets:2, timestamp=1497628653451, value=165
12 kafkablog1:groupid-1:1497628530000                column=offsets:0, timestamp=1497628533108, value=120
13 kafkablog1:groupid-1:1497628530000                column=offsets:1, timestamp=1497628533108, value=120
14 kafkablog1:groupid-1:1497628530000                column=offsets:2, timestamp=1497628533108, value=120
154 row(s) in 0.5030 seconds
16hbase(main):002:0>
17

代碼示例用的如下的版本

 

將offsets存儲到 ZooKeeper中

在Spark Streaming鏈接Kafka應用中使用Zookeeper來存儲offsets也是一種比較可靠的方式。

在這個方案中,Spark Streaming任務在啓動時會去Zookeeper中讀取每一個分區的offsets。若是有新的分區出現,那麼他的offset將會設置在最開始的位置。在每批數據處理完以後,用戶須要能夠選擇存儲已處理數據的一個offset或者最後一個offset。此外,新消費者將使用跟舊的Kafka 消費者API同樣的格式將offset保存在ZooKeeper中。所以,任何追蹤或監控Zookeeper中Kafka Offset的工具仍然生效的。

初始化Zookeeper connection來從Zookeeper中獲取offsets

 1val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
2val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)
3def readOffsets(topics: Seq[String], groupId:String):
4 Map[TopicPartition, Long] = {
5 val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
6 val partitionMap = zkUtils.getPartitionsForTopics(topics)
7 // /consumers/<groupId>/offsets/<topic>/
8 partitionMap.foreach(topicPartitions => {
9   val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
10   topicPartitions._2.foreach(partition => {
11     val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
12     try {
13       val offsetStatTuple = zkUtils.readData(offsetPath)
14       if (offsetStatTuple != null) {
15         LOGGER.info("retrieving offset details - topic: {}, partition: {}, offset: {}, node path: {}", Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)
16         topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)),
17           offsetStatTuple._1.toLong)
18       }
19     } catch {
20       case e: Exception =>
21         LOGGER.warn("retrieving offset details - no previous node exists:" + " {}, topic: {}, partition: {}, node path: {}", Seq[AnyRef](e.getMessage, topicPartitions._1, partition.toString, offsetPath): _*)
22         topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), 0L)
23     }
24   })
25 })
26 topicPartOffsetMap.toMap
27}

使用獲取到的offsets來初始化Kafka Direct DStream

1val inputDStream = KafkaUtils.createDirectStream(ssc, PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics, kafkaParams, fromOffsets))

下面是從ZooKeeper獲取一組offsets的方法

注意: Kafka offset在ZooKeeper中的存儲路徑爲/consumers/[groupId]/offsets/topic/[partitionId], 存儲的值爲offset

 1def persistOffsets(offsets: Seq[OffsetRange], groupId: String, storeEndOffset: Boolean): Unit = {
2 offsets.foreach(or => {
3   val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic);
4   val acls = new ListBuffer[ACL]()
5   val acl = new ACL
6   acl.setId(ANYONE_ID_UNSAFE)
7   acl.setPerms(PERMISSIONS_ALL)
8   acls += acl
9   val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition;
10   val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
11   zkUtils.updatePersistentPath(zkGroupTopicDirs.consumerOffsetDir + "/"
12     + or.partition, offsetVal + "", JavaConversions.bufferAsJavaList(acls))
13   LOGGER.debug("persisting offset details - topic: {}, partition: {}, offset: {}, node path: {}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)
14 })
15}

Kafka 自己

Apache Spark 2.1.x以及spark-streaming-kafka-0-10使用新的的消費者API即異步提交API。你能夠在你確保你處理後的數據已經妥善保存以後使用commitAsync API(異步提交 API)來向Kafka提交offsets。新的消費者API會以消費者組id做爲惟一標識來提交offsets

將offsets提交到Kafka中

1stream.foreachRDD { rdd =>
2  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
3  // some time later, after outputs have completed
4  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
5}

注意: commitAsync()是Spark Streaming集成kafka-0-10版本中的,在Spark文檔提醒到它仍然是個實驗性質的API而且存在修改的可能性。

其餘方式

值得注意的是你也能夠將offsets存儲到HDFS中。可是將offsets存儲到HDFS中並非一個受歡迎的方式,由於HDFS對已ZooKeeper和Hbase來講它的延遲有點高。此外,將每批次數據的offset存儲到HDFS中還會帶來小文件的問題

無論理offsets

管理offsets對於Spark Streaming應該用來講並非必須的。舉個例子,像應用存活監控它只須要當前的數據,並不須要經過管理offsets來保證數據的不丟失。這種情形下你徹底不須要管理offsets,老的kafka消費者能夠將auto.offset.reset設爲largest或者smallest,而新的消費者則設置爲earliest or latest。

若是你將auto.offset.reset設爲smallest (earliest),那麼任務會從最開始的offset讀取數據,至關於重播全部數據。這樣的設置會使得你的任務重啓時將該topic中仍然存在的數據再讀取一遍。這將由你的消息保存週期來決定你是否會重複消費。

相反地,若是你將auto.offset.reset 設置爲largest (latest),那麼你的應用啓動時會從最新的offset開始讀取,這將致使你丟失數據。這將依賴於你的應用對數據的嚴格性和語義需求,這或許是個可行的方案。

總結

上面咱們所討論的管理offsets的方式將幫助你在Spark Streaming應用中如何有效地控制offsets。這些方法可以幫助用戶在持續不斷地計算和存儲數據應用中更好地面對應用失效和數據恢復的場景。

相關文章
相關標籤/搜索