Alpakka Kafka,反應式Kafka客戶端

Alpakka Kafka 是一個要用於 Java 和 Scala 語言的開源的流感知和反應式集成數據線項目。它創建在 Akka Stream之上,提供了 DSL 來支持反應式和流式編程,內置回壓功能。Akka Streams 是 Reactive Streams 和JDK 9+ java.util.concurrent.Flow 的兼容實現,可無縫地與其進行互操做。html

要使用 Alpakka Kafka,須要在你的項目添加以下依賴:java

libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "1.0-RC2"

當前支持 kafka-clients 2.1.x 和 Akka Streams 2.5.21。react

快速開始

對Akka Streams或Kafka不熟的,可先查閱二者的官方文檔:git

Alpakka Kafka 寫的代碼很是精緻且簡潔,也許你會一眼愛上它的美。github

object KafkaGetting extends App {
  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()
  import system.dispatcher
  val config = system.settings.config

  val producerSettings = 
    ProducerSettings(config.getConfig("akka.kafka.producer"),
      new StringSerializer, new StringSerializer)

  val consumerSettings = 
    ConsumerSettings(config.getConfig("akka.kafka.consumer"), 
      new StringDeserializer, new StringDeserializer)

  val producerQueue = Source
    .queue[String](128, OverflowStrategy.fail)
    .map(str => new ProducerRecord[String, String]("test", str))
    .toMat(Producer.plainSink(producerSettings))(Keep.left)
    .run()

  val consumerControl = Consumer
    .plainSource(consumerSettings, Subscriptions.topics("test"))
    .map(record => record.value())
    .toMat(Sink.foreach(value => println(value)))(Keep.left)
    .run()

  Source(1 to 10)
    .map(_.toString)
    .throttle(1, 2.seconds)
    .runForeach(message => producerQueue.offer(message))
    .onComplete(tryValue => println(s"producer send over, return $tryValue"))

  println("Press 'enter' key exit.")
  StdIn.readLine()
  producerQueue.complete()
  consumerControl.shutdown()
  system.terminate()
  Await.result(system.whenTerminated, 10.seconds)
}

上面的代碼實現了一個完整的Kafka生產者、消費者數據處理流程,整個處理都是異步、非阻塞的。沒有顯示線程建立、沒有相似 where(true) 這樣的消費處理循環……接下來,讓咱們分析下以上代碼。apache

代碼分析

producerSettings

Alpakka Kafka 使用ProducerSettings來封裝建立Kafka生產者時須要的參數,它使用了 Typesafe Config 經過可配置的方式來構建生產者。編程

producerSettings 使用 "akka.kafka.producer" 部分的參數來構造 Kafka 生產者,如下是一個示例的 Typesafe Config 配置:json

akka.kafka.producer {
  # 同時可運行的send操做數量
  parallelism = 100

  # 調用 `KafkaProducer.close` 時等待關閉的時間
  close-timeout = 60s
  
  # 線程池
  use-dispatcher = "akka.kafka.default-dispatcher"

  # 定義 org.apache.kafka.clients.producer.ProducerConfig 屬性須要的參數
  kafka-clients {
    # 使用英文逗號分隔多個Kafka服務地址
    bootstrap.servers = "localhost:9092"
  }
}

consumerSettingsbootstrap

consumerSettings 使用 "akka.kafka.consumer" 部分的參數來構造 Kafka 消費者,如下是一個示例的 Typesafe Config 配置:api

akka.kafka.consumer {
  # 拉取數據間隔週期
  poll-interval = 50ms
  
  # 拉取數據超時時間
  poll-timeout = 30s

  # 調用 `KafkaConsumer.close` 時等待關閉的時間
  close-timeout = 20s
  
  # 線程池
  use-dispatcher = "akka.kafka.default-dispatcher"

  # 定義 org.apache.kafka.clients.producer.ProducerConfig 屬性須要的參數
  kafka-clients {
    # 使用英文逗號分隔多個Kafka服務地址
    bootstrap.servers = "localhost:9092"

    # 自動commit消息
    enable.auto.commit = true

    # 消費者組ID
    group.id = "resource-dev"

    # 從最新的offset開始讀取消息,不然從頭開始讀取
    auto.offset.reset = "earliest"
  }
}

producerQueue

使用Akka Streams構造一個生產者隊列 producerQueue,再由 Producer.plainSink 來消費發送到 producerQueue 裏的消息。須要注意的是構造 Source.queue[String] 時設置的 128 這個參數並非 Kafka 的消息隊列容量,而是 Akka Streams Source 構造出來的一個Queue。Producer.plainSink 是一個 下游 ,它消費來自 producerQueue 這個上游的消息,再將數據發送到 Kafka 服務。

consumerControl

經過 Consumer 這個Akka Streams Source構造了一個Kafka消費者,並監聽指定的 "test" 主題。consumerControl 流首先從收到的每一個消息(ConsumerRecord)中取得 value,併發送到下游,下游經過 Sink.foreach 接收數據並打印到終端。

Source(1 to 10)

生成從1到10的字符串消息值,並每隔2秒經過 producerQueue 發送一個消息到Kafka。

小結

本文經過一個簡單的示例展示怎樣經過 Alpakka Kafka 來實現對 Kafka 的集成,完成的代碼示例見:https://github.com/ihongka/akka-fusion/blob/master/fusion-kafka/src/test/scala/fusion/kafka/getting/KafkaGetting.scala

Kafka發展到如今,已不僅僅再是一個消息系統了,在MQ以外,它還提供了KSQL和Connector特性。應用基於 Kafka 能夠有更多的設計和實現,而Akka Stream + Kafka是一個強大的組合,接下來我會寫一系列文章介紹怎樣使用 Alpakka Kafka 來基於 Kafka 進行應用和架構設計。

相關文章
相關標籤/搜索