alpakka-kafka(2)-consumer

   alpakka-kafka-consumer的功能描述很簡單:向kafka訂閱某些topic而後把讀到的消息傳給akka-streams作業務處理。在kafka-consumer的實現細節上,爲了達到高可用、高吞吐的目的,topic又可用劃分出多個分區partition。分區是分佈在kafka集羣節點broker上的。因爲一個topic可能有多個partition,對應topic就會有多個consumer,造成一個consumer組,共用統一的groupid。一個partition只能對應一個consumer、而一個consumer負責從多個partition甚至多個topic讀取消息。kafka會根據實際狀況將某個partition分配給某個consumer,即partition-assignment。因此通常來講咱們會把topic訂閱與consumer-group掛鉤。這個能夠在典型的ConsumerSettings證明:mongodb

  val system = ActorSystem("kafka-sys")
  val config = system.settings.config.getConfig("akka.kafka.consumer")
  val bootstrapServers = "localhost:9092"
  val consumerSettings =
    ConsumerSettings(config, new StringDeserializer, new ByteArrayDeserializer)
      .withBootstrapServers(bootstrapServers)
      .withGroupId("group1")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

咱們先用一個簡單的consumer plainSource試試把前一篇示範中producer寫入kafka的消息讀出來: 數據庫

import akka.actor.ActorSystem
import akka.kafka._
import akka.kafka.scaladsl._
import akka.stream.{RestartSettings, SystemMaterializer}
import akka.stream.scaladsl.{Keep, RestartSource, Sink}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent._
import scala.concurrent.duration._
object plain_source extends App {
  val system = ActorSystem("kafka-sys")
  val config = system.settings.config.getConfig("akka.kafka.consumer")
  implicit val mat = SystemMaterializer(system).materializer
  implicit val ec: ExecutionContext = mat.executionContext
  val bootstrapServers = "localhost:9092"
  val consumerSettings =
    ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(bootstrapServers)
      .withGroupId("group1")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val subscription = Subscriptions.topics("greatings")
  Consumer.plainSource(consumerSettings, subscription)
    .runWith(Sink.foreach(msg => println(msg.value())))

  scala.io.StdIn.readLine()
  system.terminate()

}

以上咱們沒有對讀出的消息作任何的業務處理,直接顯示出來。注意每次都會從頭完整讀出,由於設置了 .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),也就是kafka-consumer的auto.offset.reset = "earliest" 。那麼若是須要用讀出的數據進行業務處理的話,每次開始運行應用時都會重複從頭執行這些業務。因此須要某種機制來標註已經讀取的消息,也就是須要記住當前讀取位置offset。apache

Consumer.plainSource輸入ConsumerRecord類型:bootstrap

    public ConsumerRecord(String topic,
                          int partition,
                          long offset,
                          K key,
                          V value) {
        this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
                NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);
    }

這個ConsumerRecord類型裏包括了offset,用戶能夠自行commit這個位置參數,也就是說用戶能夠選擇把這個offset存儲在kafka或者其它的數據庫裏。說到commit-offset,offset管理機制在kafka-consumer業務應用中應該屬於關鍵技術。kafka-consumer方面的業務流程能夠簡述爲:從kafka讀出業務指令,執行指令並更新業務狀態,而後再從kafka裏讀出下一批指令。爲了實現業務狀態的準確性,不管錯過一些指令或者重複執行一些指令都是不能容忍的。因此,必須準確的標記每次從kafka讀取數據後的指針位置,commit-offset。可是,若是讀出數據後即刻commit-offset,那麼在執行業務指令時若是系統發生異常,那麼下次再從標註的位置開始讀取數據時就會越過一批業務指令。這種狀況稱爲at-most-once,便可能會執行一次,但毫不會重複。另外一方面:若是在成功改變業務狀態後再commit-offset,那麼,一旦執行業務指令時發生異常而沒法進行commit-offset,下次讀取的位置將使用前一次的標註位置,就會出現重複改變業務狀態的狀況,這種狀況稱爲at-least-once,即必定會執行業務指令,但可能出現重複更新狀況。若是涉及資金、庫存等業務,二者皆不可接受,只能採用exactly-once保證一次這種模式了。不過也有不少業務要求沒那麼嚴格,好比某個網站統計點擊量,只需個約莫數,不管at-least-once,at-most-once均可以接受。安全

kafka-consumer-offset是一個Long類型的值,能夠存放在kafka內部或者外部的數據庫裏。若是選擇在kafka內部存儲offset, kafka配置裏能夠設定按時間間隔自動進行位置標註,自動把當前offset存入kafka裏。當咱們在上面例子的ConsumerSettings裏設置自動commit後,屢次從新運行就不會出現重複數據的狀況了:app

val consumerSettings =
    ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(bootstrapServers)
      .withGroupId("group1")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")        //自動commit
      .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")   //自動commit間隔
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

alpakka-kafka提供了Committer類型,是akka-streams的Sink或Flow組件,負責把offset寫入kafka。若是用Committer的Sink或Flow就能夠按用戶的須要控制commit-offset的發生時間。以下面這段示範代碼: 網站

 

  val committerSettings = CommitterSettings(system)

  val control: DrainingControl[Done] =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("greatings"))
      .mapAsync(10) { msg =>
        BusinessLogic.runBusiness(msg.record.key, msg.record.value)
          .map(_ => msg.committableOffset)
      }
      .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
      .run()

control.drainAndShutdown();

  scala.io.StdIn.readLine()
  system.terminate()

}
object BusinessLogic {
  def runBusiness(key: String, value: String): Future[Done] = Future.successful(Done)
}

上面這個例子裏BusinessLogic.runBusiess()模擬一段業務處理代碼,也就是說完成了業務處理以後就用Committer.sink進行了commit-offset。這是一種at-least-once模式,由於runBusiness可能會發生異常失敗,因此有可能出現重複運算的狀況。Consumer.committableSource輸出CommittableMessage: this

def committableSource[K, V](settings: ConsumerSettings[K, V],
                              subscription: Subscription): Source[CommittableMessage[K, V], Control] =
    Source.fromGraph(new CommittableSource[K, V](settings, subscription))



  final case class CommittableMessage[K, V](
      record: ConsumerRecord[K, V],
      committableOffset: CommittableOffset
  )

  @DoNotInherit sealed trait CommittableOffset extends Committable {
    def partitionOffset: PartitionOffset
  }

Committer.sink接受輸入Committable類型並將之寫入kafka,上游的CommittableOffset 繼承了 Committable。另外,這個DrainingControl類型結合了Control類型和akka-streams終結信號能夠有效控制整個consumer-streams安全終結。spa

alpakka-kafka還有一個atMostOnceSource。這個Source組件每讀一條數據就會當即自動commit-offset:scala

  def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
                             subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
    committableSource[K, V](settings, subscription).mapAsync(1) { m =>
      m.committableOffset.commitInternal().map(_ => m.record)(ExecutionContexts.sameThreadExecutionContext)
    }

能夠看出來,atMostOnceSource在輸出ConsumerRecord以前就進行了commit-offset。atMostOnceSource的一個具體使用示範以下:

  import scala.collection.immutable
  val control: DrainingControl[immutable.Seq[Done]] =
    Consumer
      .atMostOnceSource(consumerSettings, Subscriptions.topics("greatings"))
      .mapAsync(1)(record => BussinessLogic.runBusiness(record.key, record.value()))
      .toMat(Sink.seq)(DrainingControl.apply)
      .run()

  control.drainAndShutdown();
  scala.io.StdIn.readLine()
  system.terminate()

因此,使用atMostOnceSource後是不須要任何Committer來進行commit-offset的了。值得注意的是atMostOnceSource是對每一條數據進行位置標註的,因此運行效率必然會受到影響,若是要求不是那麼嚴格的話仍是啓動自動commit比較合適。

對於任何類型的交易業務系統來講,不管at-least-once或at-most-once都是不可接受的,只有exactly-once才穩當。實現exactly-once的其中一個方法是把offset與業務數據存放在同一個外部數據庫中。若是在外部數據庫經過事務處理機制(transaction-processing)把業務狀態更新與commit-offset放在一個事務單元中同進同退就能實現exactly-once模式了。下面這段是官方文檔給出的一個示範:

  val db = new mongoldb
  val control = db.loadOffset().map { fromOffset =>
    Consumer
      .plainSource(
        consumerSettings,
        Subscriptions.assignmentWithOffset(
          new TopicPartition(topic, /* partition = */ 0) -> fromOffset
        )
      )
      .mapAsync(1)(db.businessLogicAndStoreOffset)
      .toMat(Sink.seq)(DrainingControl.apply)
      .run()
  }

class mongoldb {
  def businessLogicAndStoreOffset(record: ConsumerRecord[String, String]): Future[Done] = // ...
  def loadOffset(): Future[Long] = // ...
}

在上面這段代碼裏:db.loadOffset()從mongodb裏取出上一次讀取位置,返回Future[Long],而後用Subscriptions.assignmentWithOffset把這個offset放在一個tuple (TopicPartition,Long)裏。TopicPartition定義以下: 

    public TopicPartition(String topic, int partition) {
        this.partition = partition;
        this.topic = topic;
    }

這樣Consumer.plainSource就能夠從offset開始讀取數據了。plainSource輸出ConsumerRecord類型:

    public ConsumerRecord(String topic,
                          int partition,
                          long offset,
                          K key,
                          V value) {
        this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
                NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);
    }

這裏面除業務指令value外還提供了當前offset。這些已經足夠在businessLogicAndStoreOffset()裏運算一個單獨的business+offset事務了(transaction)。 

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息