Alpakka is an open-source Scala/Java project built on top of the akka-streams stream-processing toolkit: it provides connectors that hook various data sources into akka-streams so their data can be processed as streams. alpakka-kafka is the Kafka connector of the alpakka project. For us this means we can use alpakka-kafka to integrate with Kafka and use the features Kafka provides; put another way, alpakka-kafka is a Scala toolkit that implements Kafka functionality through akka-streams.
alpakka-kafka covers Kafka's two core roles, producer and consumer: the producer writes data from an akka-streams stream into Kafka, and the consumer reads data out of Kafka and feeds it into an akka-streams stream. A typical scenario for combining akka-streams with Kafka is business integration: business module A produces operation commands and writes them to Kafka, Kafka delivers them to business module B, and B reads the commands and carries out the corresponding operations. For example, given a goods-receiving module and an inventory module, the receiving module writes receiving records to Kafka, while on the other side the inventory module reads those records and updates its stock quantities. Note that the two modules operate independently. In alpakka the actual business logic is essentially data transformation inside akka-streams, and the overall pattern is classic CQRS: the writing side and the reading side are decoupled, the writer does not care who consumes the data or how, and the reader does not care who wrote it. The writing and reading sides correspond to Kafka's producer and consumer respectively.
In this post we look at alpakka-kafka's producer and how to use it. As mentioned above, alpakka implements the kafka-producer functionality with akka-streams, so the producers alpakka provides are themselves akka-streams components that can be combined with other akka-streams components into larger streams. Before building a producer, a few supporting pieces have to be in place:
1. producer-settings: the akka.kafka.producer section of alpakka-kafka's reference.conf provides defaults sufficient for basic operation; they can be adjusted flexibly with the typesafe config tooling.
2. de/serializers: alpakka-kafka ships serializers/deserializers for String values, which can be used directly.
3. bootstrap-server: a comma-separated list of Kafka cluster node addresses (host:port).
Here is a concrete example:
implicit val system = ActorSystem("kafka_sys")

val bootstrapServers = "localhost:9092"
val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
  ProducerSettings(config, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)
Here the ActorSystem is only used to read the configuration from the .conf files; no akka-streams component is involved yet. The akka.kafka.producer section comes with defaults in alpakka-kafka's reference.conf, so it does not need to be redefined in application.conf.
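If the defaults do need adjusting, individual settings can either be overridden in application.conf or set programmatically on ProducerSettings. Below is a minimal sketch of the programmatic route, reusing config and bootstrapServers from above; the chosen values are illustrative only:

import org.apache.kafka.clients.producer.ProducerConfig
import scala.concurrent.duration._

val tunedProducerSettings =
  ProducerSettings(config, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)
    .withCloseTimeout(30.seconds)                       // how long producer.close() may block
    .withProperty(ProducerConfig.ACKS_CONFIG, "all")    // pass a raw kafka-client property through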
alpakka-kafka also provides a most basic producer, SendProducer, which is not an akka-streams component. Here is a demonstration of SendProducer and its effect:
import akka.actor.ActorSystem
import akka.kafka._
import akka.kafka.scaladsl.SendProducer
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization._

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object SendProducerDemo extends App {
  implicit val system = ActorSystem("kafka_sys")
  implicit val executionContext = system.dispatcher

  val bootstrapServers = "localhost:9092"
  val config = system.settings.config.getConfig("akka.kafka.producer")
  val producerSettings =
    ProducerSettings(config, new StringSerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers)
  val producer = SendProducer(producerSettings)

  val topic = "greatings"
  // build one ProducerRecord per value and send them all
  val lstfut: Seq[Future[RecordMetadata]] = (100 to 200).reverse
    .map(_.toString)
    .map(value => new ProducerRecord[String, String](topic, s"hello-$value"))
    .map(msg => producer.send(msg))
  val futlst = Future.sequence(lstfut)
  Await.result(futlst, 2.seconds)

  scala.io.StdIn.readLine()
  producer.close()
  system.terminate()
}
The example above uses SendProducer to write a batch of hello messages (hello-100 through hello-200) to Kafka, using a plain collection traversal rather than an akka-streams Source. To verify the result we can use Kafka's command line tools, as follows:
\w> ./kafka-topics --create --topic greatings --bootstrap-server localhost:9092
Created topic greatings.
\w> ./kafka-console-consumer --topic greatings --bootstrap-server localhost:9092
hello-100
hello-101
hello-102
hello-103
hello-104
hello-105
hello-106
...
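Besides checking with the console consumer, the Future[RecordMetadata] returned by send already tells us where each record ended up. A minimal sketch, reusing producer, topic and the implicit execution context from the demo above:

producer
  .send(new ProducerRecord[String, String](topic, "hello-single"))
  .foreach { meta =>
    // RecordMetadata carries the broker's acknowledgement of the write position
    println(s"written to ${meta.topic}/${meta.partition} at offset ${meta.offset}")
  }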
Since the producer represents the writing side, in akka-streams terms it corresponds to a Sink or Flow component. The next example demonstrates the producer Sink component plainSink:
import akka.Done
import akka.actor.ActorSystem
import akka.kafka._
import akka.kafka.scaladsl._
import akka.stream.scaladsl._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization._

import scala.concurrent._
import scala.concurrent.duration._

object plain_sink extends App {
  implicit val system = ActorSystem("kafka_sys")
  implicit val executionContext = system.dispatcher

  val bootstrapServers = "localhost:9092"
  val config = system.settings.config.getConfig("akka.kafka.producer")
  val producerSettings =
    ProducerSettings(config, new StringSerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers)

  val topic = "greatings"
  val done: Future[Done] = Source(1 to 100)
    .map(_.toString)
    .map(value => new ProducerRecord[String, String](topic, s"hello-$value"))
    .runWith(Producer.plainSink(producerSettings))

  Await.ready(done, 3.seconds)
  scala.io.StdIn.readLine()
  system.terminate()
}
This is a typical akka-streams application, and Producer.plainSink is an akka-streams Sink component.
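Because plainSink is an ordinary akka-streams Sink, it composes freely with other stream operators. A minimal sketch, reusing producerSettings and topic from the example above, that throttles the write rate (the rate chosen is purely illustrative):

val throttledDone: Future[Done] = Source(1 to 10)
  .map(_.toString)
  .throttle(5, 1.second)   // let at most 5 elements per second flow downstream
  .map(value => new ProducerRecord[String, String](topic, "key", s"hello-$value"))
  .runWith(Producer.plainSink(producerSettings))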
The examples so far all construct values of type ProducerRecord and write them to Kafka. ProducerRecord is Kafka's basic message type:
public ProducerRecord(String topic, K key, V value) {
    this(topic, null, null, key, value, null);
}
topic is a String, while key and value are of the generic types K and V. On top of ProducerRecord, alpakka-kafka adds a richer message type, ProducerMessage.Envelope:
sealed trait Envelope[K, V, +PassThrough] {
  def passThrough: PassThrough
  def withPassThrough[PassThrough2](value: PassThrough2): Envelope[K, V, PassThrough2]
}

final case class Message[K, V, +PassThrough](
    record: ProducerRecord[K, V],
    passThrough: PassThrough
) extends Envelope[K, V, PassThrough] {
  override def withPassThrough[PassThrough2](value: PassThrough2): Message[K, V, PassThrough2] =
    copy(passThrough = value)
}
ProducerMessage.Envelope adds a PassThrough type parameter, which carries extra metadata alongside the message. The alpakka-kafka stream components use this type as their stream element and eventually turn each envelope into one or more ProducerRecords written to Kafka, as below:
object EventMessages {
  // one envelope -> one ProducerRecord
  def createMessage[KeyType, ValueType, PassThroughType](
      topic: String,
      key: KeyType,
      value: ValueType,
      passThrough: PassThroughType): ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] = {
    ProducerMessage.single(
      new ProducerRecord[KeyType, ValueType](topic, key, value),
      passThrough
    )
  }

  // one envelope -> many ProducerRecords, one per topic
  def createMultiMessage[KeyType, ValueType, PassThroughType](
      topics: List[String],
      key: KeyType,
      value: ValueType,
      passThrough: PassThroughType): ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] = {
    val msgs = topics.map { topic =>
      new ProducerRecord(topic, key, value)
    }
    ProducerMessage.multi(msgs, passThrough)
  }

  // carries only the pass-through metadata; nothing is written to kafka
  def createPassThroughMessage[KeyType, ValueType, PassThroughType](
      topic: String,
      key: KeyType,
      value: ValueType,
      passThrough: PassThroughType): ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] = {
    ProducerMessage.passThrough(passThrough)
  }
}
flexiFlow is an alpakka-kafka Flow component: ProducerMessage.Envelope elements flow in and Results elements flow out:
def flexiFlow[K, V, PassThrough](
    settings: ProducerSettings[K, V]
): Flow[Envelope[K, V, PassThrough], Results[K, V, PassThrough], NotUsed] = {
  ...
}
Results is a family of result types; its single-record case, Result, is defined as follows:
final case class Result[K, V, PassThrough] private (
    metadata: RecordMetadata,
    message: Message[K, V, PassThrough]
) extends Results[K, V, PassThrough] {
  def offset: Long = metadata.offset()
  def passThrough: PassThrough = message.passThrough
}
In other words, flexiFlow returns the status metadata Kafka reports back after the write. Here is a usage example of flexiFlow:
import akka.actor.ActorSystem
import akka.kafka.ProducerMessage._
import akka.kafka.scaladsl._
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent._
import scala.concurrent.duration._

object flexi_flow extends App {
  implicit val system = ActorSystem("kafka_sys")

  val bootstrapServers = "localhost:9092"
  val config = system.settings.config.getConfig("akka.kafka.producer")
  val producerSettings =
    ProducerSettings(config, new StringSerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers)

  // needed for the future flatMap/onComplete in the end
  implicit val executionContext = system.dispatcher

  val topic = "greatings"

  val done = Source(1 to 100)
    .map { number =>
      val value = number.toString
      EventMessages.createMessage(topic, "key", value, number)
    }
    .via(Producer.flexiFlow(producerSettings))
    .map {
      case ProducerMessage.Result(metadata, ProducerMessage.Message(record, passThrough)) =>
        s"${metadata.topic}/${metadata.partition} ${metadata.offset}: ${record.value}"

      case ProducerMessage.MultiResult(parts, passThrough) =>
        parts
          .map {
            case MultiResultPart(metadata, record) =>
              s"${metadata.topic}/${metadata.partition} ${metadata.offset}: ${record.value}"
          }
          .mkString(", ")

      case ProducerMessage.PassThroughResult(passThrough) =>
        s"passed through"
    }
    .runWith(Sink.foreach(println(_)))

  Await.ready(done, 3.seconds)
  scala.io.StdIn.readLine()
  system.terminate()
}

object EventMessages {
  def createMessage[KeyType, ValueType, PassThroughType](
      topic: String,
      key: KeyType,
      value: ValueType,
      passThrough: PassThroughType): ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] = {
    ProducerMessage.single(
      new ProducerRecord[KeyType, ValueType](topic, key, value),
      passThrough
    )
  }

  def createMultiMessage[KeyType, ValueType, PassThroughType](
      topics: List[String],
      key: KeyType,
      value: ValueType,
      passThrough: PassThroughType): ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] = {
    val msgs = topics.map { topic =>
      new ProducerRecord(topic, key, value)
    }
    ProducerMessage.multi(msgs, passThrough)
  }

  def createPassThroughMessage[KeyType, ValueType, PassThroughType](
      topic: String,
      key: KeyType,
      value: ValueType,
      passThrough: PassThroughType): ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] = {
    ProducerMessage.passThrough(passThrough)
  }
}
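In the stream above only the Result branch ever fires, because createMessage wraps exactly one record. The MultiResult branch is reached when multi-record envelopes flow through. Here is a minimal sketch, reusing producerSettings and EventMessages from above and assuming a second topic named greatings-backup exists:

val multiDone = Source(1 to 10)
  .map { number =>
    // one envelope that fans out to two ProducerRecords, one per topic
    EventMessages.createMultiMessage(List("greatings", "greatings-backup"), "key", number.toString, number)
  }
  .via(Producer.flexiFlow(producerSettings))
  .runWith(Sink.foreach(result => println(s"passThrough=${result.passThrough}")))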
Besides writing business events or commands to Kafka, a producer stage can also take care of committing the offsets at which those messages were read. alpakka-kafka's producers therefore fall into two groups: plainSink and flexiFlow, shown above, only write business data to Kafka, while sinks such as committableSink additionally commit the consumer offsets of the messages they have processed, as below:
val control = Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic1, topic2))
  .map { msg =>
    ProducerMessage.single(
      new ProducerRecord(targetTopic, msg.record.key, msg.record.value),
      msg.committableOffset
    )
  }
  .toMat(Producer.committableSink(producerSettings, committerSettings))(DrainingControl.apply)
  .run()

control.drainAndShutdown()
As shown above, committableSource reads the business messages from Kafka together with their committableOffset, and Producer.committableSink writes the business messages back to Kafka and commits the offsets.
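The Envelope pass-through slot is what makes this work: the committable offset rides along as the PassThrough value, and committableSink commits it once the record has been produced. A record can even be skipped while still committing its offset, by emitting a pass-through-only envelope. A minimal sketch, assuming the same consumerSettings, producerSettings, committerSettings and topic names as above, with String keys and values on the consumer side:

import akka.kafka.ConsumerMessage

val selectiveControl = Consumer
  .committableSource(consumerSettings, Subscriptions.topics(topic1))
  .map { msg =>
    if (msg.record.value.nonEmpty)
      // forward non-empty records to the target topic, committing their offsets
      ProducerMessage.single(
        new ProducerRecord(targetTopic, msg.record.key, msg.record.value),
        msg.committableOffset
      )
    else
      // skip empty records but still commit their offsets
      ProducerMessage.passThrough[String, String, ConsumerMessage.CommittableOffset](msg.committableOffset)
  }
  .toMat(Producer.committableSink(producerSettings, committerSettings))(DrainingControl.apply)
  .run()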
In the next post we will look at the consumer in detail.