最簡單流處理引擎——Kafka Streams簡介

file
Kafka在0.10.0.0版本之前的定位是分佈式,分區化的,帶備份機制的日誌提交服務。而kafka在這以前也沒有提供數據處理的顧服務。你們的流處理計算主要是仍是依賴於Storm,Spark Streaming,Flink等流式處理框架。

file

Storm,Spark Streaming,Flink流處理的三駕馬車各有各的優點.java

Storm低延遲,而且在市場中佔有必定的地位,目前不少公司仍在使用。web

Spark Streaming藉助Spark的體系優點,活躍的社區,也佔有必定的份額。算法

而Flink在設計上更貼近流處理,而且有便捷的API,將來必定頗有發展。數據庫

file

可是他們都離不開Kafka的消息中轉,因此Kafka於0.10.0.0版本推出了本身的流處理框架,Kafka Streams。Kafka的定位也正式成爲Apache Kafka® is *a distributed streaming platform,*分佈式流處理平臺。apache

實時流式計算

近幾年來實時流式計算髮展迅速,主要緣由是實時數據的價值和對於數據處理架構體系的影響。實時流式計算包含了 無界數據 近實時 一致性 可重複結果 等等特徵。a type of data processing engine that is designed with infinite data sets in mind 一種考慮了無線數據集的數據處理引擎。bootstrap

一、無限數據:一種不斷增加的,基本上無限的數據集。這些一般被稱爲「流式數據」。無限的流式數據集能夠稱爲無界數據,相對而言有限的批量數據就是有界數據。api

二、無界數據處理:一種持續的數據處理模式,應用於上面的無界數據。批量處理數據(離線計算)也能夠重複運行來處理數據,可是會有性能的瓶頸。安全

三、低延遲,近實時的結果:相對於離線計算而言,離線計算並無考慮延遲的問題。bash

解決了兩個問題,流處理能夠提代批處理系統:服務器

一、正確性:有了這個,就和批量計算等價了。

Streaming須要能隨着時間的推移依然能計算必定時間窗口的數據。Spark Streaming經過微批的思想解決了這個問題,實時與離線系統進行了一致性的存儲,這一點在將來的實時計算系統中都應該知足。

二、推理時間的工具:這可讓咱們超越批量計算。

好的時間推理工具對於處理不一樣事件的無界無序數據相當重要。

而時間又分爲事件時間和處理時間。

還有不少實時流式計算的相關概念,這裏不作贅述。

Kafka Streams簡介

Kafka Streams被認爲是開發實時應用程序的最簡單方法。它是一個Kafka的客戶端API庫,編寫簡單的java和scala代碼就能夠實現流式處理。

優點:

  • 彈性,高度可擴展,容錯

  • 部署到容器,VM,裸機,雲

  • 一樣適用於小型,中型和大型用例

  • 與Kafka安全性徹底集成

  • 編寫標準Java和Scala應用程序

  • 在Mac,Linux,Windows上開發

  • Exactly-once 語義

用例:

紐約時報使用Apache Kafka和Kafka Streams將發佈的內容實時存儲和分發到各類應用程序和系統,以供讀者使用。

Pinterest大規模使用Apache Kafka和Kafka Streams來支持其廣告基礎架構的實時預測預算系統。使用Kafka Streams,預測比以往更準確。

做爲歐洲領先的在線時尚零售商,Zalando使用Kafka做爲ESB(企業服務總線),幫助咱們從單一服務架構轉變爲微服務架構。使用Kafka處理 事件流使咱們的技術團隊可以實現近乎實時的商業智能。

荷蘭合做銀行是荷蘭三大銀行之一。它的數字神經系統Business Event Bus由Apache Kafka提供支持。它被愈來愈多的財務流程和服務所使用,其中之一就是Rabo Alerts。此服務會在財務事件時實時向客戶發出警報,並使用Kafka Streams構建。

LINE使用Apache Kafka做爲咱們服務的中央數據庫,以便彼此通訊。天天產生數億億條消息,用於執行各類業務邏輯,威脅檢測,搜索索引和數據分析。LINE利用Kafka Streams可靠地轉換和過濾主題,使消費者能夠有效消費的子主題,同時因爲其複雜而簡單的代碼庫,保持易於維護性。

Topology

Kafka Streams經過一個或多個拓撲定義其計算邏輯,其中拓撲是經過流(邊緣)和流處理器(節點)構成的圖。

file

拓撲中有兩種特殊的處理器

  • 源處理器:源處理器是一種特殊類型的流處理器,沒有任何上游處理器。它經過使用來自這些主題的記錄並將它們轉發到其下游處理器,從一個或多個Kafka主題爲其拓撲生成輸入流。
  • 接收器處理器:接收器處理器是一種特殊類型的流處理器,沒有下游處理器。它將從其上游處理器接收的任何記錄發送到指定的Kafka主題。

在正常處理器節點中,還能夠把數據發給遠程系統。所以,處理後的結果能夠流式傳輸回Kafka或寫入外部系統。

Kafka在這當中提供了最經常使用的數據轉換操做,例如mapfilterjoinaggregations等,簡單易用。

固然還有一些關於時間,窗口,聚合,亂序處理等。將來再一一作詳細介紹,下面咱們進行簡單的入門案例開發。

快速入門

首先提供WordCount的java版和scala版本。

java8+:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
 
import java.util.Arrays;
import java.util.Properties;
 
public class WordCountApplication {
 
    public static void main(final String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
 
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
 
}
複製代碼

scala:

import java.util.Properties
import java.util.concurrent.TimeUnit
 
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
 
object WordCountApplication extends App {
  import Serdes._
 
  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
    p
  }
 
  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
  val wordCounts: KTable[String, Long] = textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
    .groupBy((_, word) => word)
    .count()(Materialized.as("counts-store"))
  wordCounts.toStream.to("WordsWithCountsTopic")
 
  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
  streams.start()
 
  sys.ShutdownHookThread {
     streams.close(10, TimeUnit.SECONDS)
  }
}
複製代碼

若是kafka已經啓動了,能夠跳過前兩步。

一、下載

下載 2.3.0版本並解壓縮它。請注意,有多個可下載的Scala版本,咱們選擇使用推薦的版本(2.12):

> tar -xzf kafka_2.12-2.3.0.tgz
> cd kafka_2.12-2.3.0
複製代碼

二、啓動

Kafka使用ZooKeeper,所以若是您尚未ZooKeeper服務器,則須要先啓動它。

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
複製代碼

啓動Kafka服務器:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
複製代碼

三、建立topic 啓動生產者

咱們建立名爲streams-plaintext-input的輸入主題和名爲streams-wordcount-output的輸出主題:

> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Created topic "streams-plaintext-input".


> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".
複製代碼

查看:

> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
 
Topic:streams-plaintext-input   PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: streams-plaintext-input  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
Topic:streams-wordcount-output  PartitionCount:1    ReplicationFactor:1 Configs:cleanup.policy=compact
    Topic: streams-wordcount-output Partition: 0    Leader: 0   Replicas: 0 Isr: 0
複製代碼

四、啓動WordCount

如下命令啓動WordCount演示應用程序:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
複製代碼

演示應用程序將從輸入主題stream-plaintext-input讀取,對每一個讀取消息執行WordCount算法的計算,並連續將其當前結果寫入輸出主題streams-wordcount-output。所以,除了日誌條目以外不會有任何STDOUT輸出,由於結果會寫回Kafka。

如今咱們能夠在一個單獨的終端中啓動控制檯生成器,爲這個主題寫一些輸入數據:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
複製代碼

並經過在單獨的終端中使用控制檯使用者讀取其輸出主題來檢查WordCount演示應用程序的輸出:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
複製代碼

五、處理數據

咱們在生產者端輸入一些數據。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
複製代碼

輸出端:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 
all     1
streams 1
lead    1
to      1
kafka   1
複製代碼

繼續輸入:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
複製代碼
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 
all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
複製代碼

咱們看到隨着數據實時輸入,wordcount的結果實時的輸出了。

六、中止程序

您如今能夠經過Ctrl-C按順序中止控制檯使用者,控制檯生產者,Wordcount應用程序,Kafka代理和ZooKeeper服務器。

什麼是Kafka? Kafka監控工具彙總 Kafka快速入門 Kafka核心之Consumer Kafka核心之Producer

替代Flume——Kafka Connect簡介

更多實時計算,Flink,Kafka等相關技術博文,歡迎關注實時流式計算

file
相關文章
相關標籤/搜索