流式處理的新貴 Kafka Stream - Kafka設計解析(七)

原創文章,首發自做者我的博客,轉載請務必將下面這段話置於文章開頭處。
本文轉發自技術世界原文連接 http://www.jasongj.com/kafka/kafka_stream/java

1. Kafka Stream背景

1.1 Kafka Stream是什麼

Kafka Stream是Apache Kafka從0.10版本引入的一個新Feature。它是提供了對存儲於Kafka內的數據進行流式處理和分析的功能。node

Kafka Stream的特色以下:git

  • Kafka Stream提供了一個很是簡單而輕量的Library,它能夠很是方便地嵌入任意Java應用中,也能夠任意方式打包和部署
  • 除了Kafka外,無任何外部依賴
  • 充分利用Kafka分區機制實現水平擴展和順序性保證
  • 經過可容錯的state store實現高效的狀態操做(如windowed join和aggregation)
  • 支持正好一次處理語義
  • 提供記錄級的處理能力,從而實現毫秒級的低延遲
  • 支持基於事件時間的窗口操做,而且可處理晚到的數據(late arrival of records)
  • 同時提供底層的處理原語Processor(相似於Storm的spout和bolt),以及高層抽象的DSL(相似於Spark的map/group/reduce)

1.2 什麼是流式計算

通常流式計算會與批量計算相比較。在流式計算模型中,輸入是持續的,能夠認爲在時間上是無界的,也就意味着,永遠拿不到全量數據去作計算。同時,計算結果是持續輸出的,也即計算結果在時間上也是無界的。流式計算通常對實時性要求較高,同時通常是先定義目標計算,而後數據到來以後將計算邏輯應用於數據。同時爲了提升計算效率,每每儘量採用增量計算代替全量計算。
github


Stream Processing

批量處理模型中,通常先有全量數據集,而後定義計算邏輯,並將計算應用於全量數據。特色是全量計算,而且計算結果一次性全量輸出。
數據庫


Batch Processing

1.3 爲何要有Kafka Stream

當前已經有很是多的流式處理系統,最知名且應用最多的開源流式處理系統有Spark Streaming和Apache Storm。Apache Storm發展多年,應用普遍,提供記錄級別的處理能力,當前也支持SQL on Stream。而Spark Streaming基於Apache Spark,能夠很是方便與圖計算,SQL處理等集成,功能強大,對於熟悉其它Spark應用開發的用戶而言使用門檻低。另外,目前主流的Hadoop發行版,如MapR,Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。apache

既然Apache Spark與Apache Storm擁用如此多的優點,那爲什麼還須要Kafka Stream呢?筆者認爲主要有以下緣由。網絡

第一,Spark和Storm都是流式處理框架,而Kafka Stream提供的是一個基於Kafka的流式處理類庫。框架要求開發者按照特定的方式去開發邏輯部分,供框架調用。開發者很難了解框架的具體運行方式,從而使得調試成本高,而且使用受限。而Kafka Stream做爲流式處理類庫,直接提供具體的類給開發者調用,整個應用的運行方式主要由開發者控制,方便使用和調試。
多線程


Library vs. Framework

第二,雖然Cloudera與Hortonworks方便了Storm和Spark的部署,可是這些框架的部署仍然相對複雜。而Kafka Stream做爲類庫,能夠很是方便的嵌入應用程序中,它對應用的打包和部署基本沒有任何要求。更爲重要的是,Kafka Stream充分利用了Kafka的分區機制Consumer的Rebalance機制,使得Kafka Stream能夠很是方便的水平擴展,而且各個實例可使用不一樣的部署方式。具體來講,每一個運行Kafka Stream的應用程序實例都包含了Kafka Consumer實例,多個同一應用的實例之間並行處理數據集。而不一樣實例之間的部署方式並不要求一致,好比部分實例能夠運行在Web容器中,部分實例可運行在Docker或Kubernetes中。架構

第三,就流式處理系統而言,基本都支持Kafka做爲數據源。例如Storm具備專門的kafka-spout,而Spark也提供專門的spark-streaming-kafka模塊。事實上,Kafka基本上是主流的流式處理系統的標準數據源。換言之,大部分流式系統中都已部署了Kafka,此時使用Kafka Stream的成本很是低。框架

第四,使用Storm或Spark Streaming時,須要爲框架自己的進程預留資源,如Storm的supervisor和Spark on YARN的node manager。即便對於應用實例而言,框架自己也會佔用部分資源,如Spark Streaming須要爲shuffle和storage預留內存。

第五,因爲Kafka自己提供數據持久化,所以Kafka Stream提供滾動部署和滾動升級以及從新計算的能力。

第六,因爲Kafka Consumer Rebalance機制,Kafka Stream能夠在線動態調整並行度。

2 Kafka Stream架構

2.1 Kafka Stream總體架構

Kafka Stream的總體架構圖以下所示。


Kafka Stream Architecture

目前(Kafka 0.11.0.0)Kafka Stream的數據源只能如上圖所示是Kafka。可是處理結果並不必定要如上圖所示輸出到Kafka。實際上KStream和Ktable的實例化都須要指定Topic。

KStream<String, String> stream = builder.stream("words-stream");

KTable<String, String> table = builder.table("words-table", "words-store");

另外,上圖中的Consumer和Producer並不須要開發者在應用中顯示實例化,而是由Kafka Stream根據參數隱式實例化和管理,從而下降了使用門檻。開發者只須要專一於開發核心業務邏輯,也即上圖中Task內的部分。

2.2 Processor Topology

基於Kafka Stream的流式應用的業務邏輯所有經過一個被稱爲Processor Topology的地方執行。它與Storm的Topology和Spark的DAG相似,都定義了數據在各個處理單元(在Kafka Stream中被稱做Processor)間的流動方式,或者說定義了數據的處理邏輯。

下面是一個Processor的示例,它實現了Word Count功能,而且每秒輸出一次結果。

public class WordCountProcessor implements Processor<String, String> {

  private ProcessorContext context;
  private KeyValueStore<String, Integer> kvStore;

  @SuppressWarnings("unchecked")
  @Override
  public void init(ProcessorContext context) {
    this.context = context;
    this.context.schedule(1000);
    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
  }

  @Override
  public void process(String key, String value) {
    Stream.of(value.toLowerCase().split(" ")).forEach((String word) -> {
      Optional<Integer> counts = Optional.ofNullable(kvStore.get(word));
      int count = counts.map(wordcount -> wordcount + 1).orElse(1);
      kvStore.put(word, count);
    });
  }

  @Override
  public void punctuate(long timestamp) {
    KeyValueIterator<String, Integer> iterator = this.kvStore.all();
    iterator.forEachRemaining(entry -> {
      context.forward(entry.key, entry.value);
      this.kvStore.delete(entry.key);
    });
    context.commit();
  }

  @Override
  public void close() {
    this.kvStore.close();
  }

}

從上述代碼中可見

  • process定義了對每條記錄的處理邏輯,也印證了Kafka可具備記錄級的數據處理能力。
  • context.scheduler定義了punctuate被執行的週期,從而提供了實現窗口操做的能力。
  • context.getStateStore提供的狀態存儲爲有狀態計算(如窗口,聚合)提供了可能。

2.3 Kafka Stream並行模型

Kafka Stream的並行模型中,最小粒度爲Task,而每一個Task包含一個特定子Topology的全部Processor。所以每一個Task所執行的代碼徹底同樣,惟一的不一樣在於所處理的數據集互補。這一點跟Storm的Topology徹底不同。Storm的Topology的每個Task只包含一個Spout或Bolt的實例。所以Storm的一個Topology內的不一樣Task之間須要經過網絡通訊傳遞數據,而Kafka Stream的Task包含了完整的子Topology,因此Task之間不須要傳遞數據,也就不須要網絡通訊。這一點下降了系統複雜度,也提升了處理效率。

若是某個Stream的輸入Topic有多個(好比2個Topic,1個Partition數爲4,另外一個Partition數爲3),則總的Task數等於Partition數最多的那個Topic的Partition數(max(4,3)=4)。這是由於Kafka Stream使用了Consumer的Rebalance機制,每一個Partition對應一個Task。

下圖展現了在一個進程(Instance)中以2個Topic(Partition數均爲4)爲數據源的Kafka Stream應用的並行模型。從圖中能夠看到,因爲Kafka Stream應用的默認線程數爲1,因此4個Task所有在一個線程中運行。


1 thread

爲了充分利用多線程的優點,能夠設置Kafka Stream的線程數。下圖展現了線程數爲2時的並行模型。


2 threads

前文有提到,Kafka Stream可被嵌入任意Java應用(理論上基於JVM的應用均可以)中,下圖展現了在同一臺機器的不一樣進程中同時啓動同一Kafka Stream應用時的並行模型。注意,這裏要保證兩個進程的StreamsConfig.APPLICATION_ID_CONFIG徹底同樣。由於Kafka Stream將APPLICATION_ID_CONFIG做爲隱式啓動的Consumer的Group ID。只有保證APPLICATION_ID_CONFIG相同,才能保證這兩個進程的Consumer屬於同一個Group,從而能夠經過Consumer Rebalance機制拿到互補的數據集。


2 instances

既然實現了多進程部署,能夠以一樣的方式實現多機器部署。該部署方式也要求全部進程的APPLICATION_ID_CONFIG徹底同樣。從圖上也能夠看到,每一個實例中的線程數並不要求同樣。可是不管如何部署,Task總數總會保證一致。


2 servers

注意:Kafka Stream的並行模型,很是依賴於《Kafka設計解析(一)- Kafka背景及架構介紹》一文中介紹的Kafka分區機制和《Kafka設計解析(四)- Kafka Consumer設計解析》中介紹的Consumer的Rebalance機制。強烈建議不太熟悉這兩種機制的朋友,先行閱讀這兩篇文章。

這裏對比一下Kafka Stream的Processor Topology與Storm的Topology。

  • Storm的Topology由Spout和Bolt組成,Spout提供數據源,而Bolt提供計算和數據導出。Kafka Stream的Processor Topology徹底由Processor組成,由於它的數據固定由Kafka的Topic提供。
  • Storm的不一樣Bolt運行在不一樣的Executor中,極可能位於不一樣的機器,須要經過網絡通訊傳輸數據。而Kafka Stream的Processor Topology的不一樣Processor徹底運行於同一個Task中,也就徹底處於同一個線程,無需網絡通訊。
  • Storm的Topology能夠同時包含Shuffle部分和非Shuffle部分,而且每每一個Topology就是一個完整的應用。而Kafka Stream的一個物理Topology只包含非Shuffle部分,而Shuffle部分須要經過through操做顯示完成,該操做將一個大的Topology分紅了2個子Topology。
  • Storm的Topology內,不一樣Bolt/Spout的並行度能夠不同,而Kafka Stream的子Topology內,全部Processor的並行度徹底同樣。
  • Storm的一個Task只包含一個Spout或者Bolt的實例,而Kafka Stream的一個Task包含了一個子Topology的全部Processor。

2.4 KTable vs. KStream

KTable和KStream是Kafka Stream中很是重要的兩個概念,它們是Kafka實現各類語義的基礎。所以這裏有必要分析下兩者的區別。

KStream是一個數據流,能夠認爲全部記錄都經過Insert only的方式插入進這個數據流裏。而KTable表明一個完整的數據集,能夠理解爲數據庫中的表。因爲每條記錄都是Key-Value對,這裏能夠將Key理解爲數據庫中的Primary Key,而Value能夠理解爲一行記錄。能夠認爲KTable中的數據都是經過Update only的方式進入的。也就意味着,若是KTable對應的Topic中新進入的數據的Key已經存在,那麼從KTable只會取出同一Key對應的最後一條數據,至關於新的數據更新了舊的數據。

如下圖爲例,假設有一個KStream和KTable,基於同一個Topic建立,而且該Topic中包含以下圖所示5條數據。此時遍歷KStream將獲得與Topic內數據徹底同樣的全部5條數據,且順序不變。而此時遍歷KTable時,由於這5條記錄中有3個不一樣的Key,因此將獲得3條記錄,每一個Key對應最新的值,而且這三條數據之間的順序與原來在Topic中的順序保持一致。這一點與Kafka的日誌compact相同。


KStream vs. KTable

此時若是對該KStream和KTable分別基於key作Group,對Value進行Sum,獲得的結果將會不一樣。對KStream的計算結果是<Jack,4><Lily,7><Mike,4>。而對Ktable的計算結果是<Mike,4><Jack,3><Lily,5>

2.5 State store

流式處理中,部分操做是無狀態的,例如過濾操做(Kafka Stream DSL中用filer方法實現)。而部分操做是有狀態的,須要記錄中間狀態,如Window操做和聚合計算。State store被用來存儲中間狀態。它能夠是一個持久化的Key-Value存儲,也能夠是內存中的HashMap,或者是數據庫。Kafka提供了基於Topic的狀態存儲。

Topic中存儲的數據記錄自己是Key-Value形式的,同時Kafka的log compaction機制可對歷史數據作compact操做,保留每一個Key對應的最後一個Value,從而在保證Key不丟失的前提下,減小總數據量,從而提升查詢效率。

構造KTable時,須要指定其state store name。默認狀況下,該名字也即用於存儲該KTable的狀態的Topic的名字,遍歷KTable的過程,實際就是遍歷它對應的state store,或者說遍歷Topic的全部key,並取每一個Key最新值的過程。爲了使得該過程更加高效,默認狀況下會對該Topic進行compact操做。

另外,除了KTable,全部狀態計算,都須要指定state store name,從而記錄中間狀態。

3 Kafka Stream如何解決流式系統中關鍵問題

3.1 時間

在流式數據處理中,時間是數據的一個很是重要的屬性。從Kafka 0.10開始,每條記錄除了Key和Value外,還增長了timestamp屬性。目前Kafka Stream支持三種時間

  • 事件發生時間。事件發生的時間,包含在數據記錄中。發生時間由Producer在構造ProducerRecord時指定。而且須要Broker或者Topic將message.timestamp.type設置爲CreateTime(默認值)才能生效。
  • 消息接收時間,也即消息存入Broker的時間。當Broker或Topic將message.timestamp.type設置爲LogAppendTime時生效。此時Broker會在接收到消息後,存入磁盤前,將其timestamp屬性值設置爲當前機器時間。通常消息接收時間比較接近於事件發生時間,部分場景下可代替事件發生時間。
  • 消息處理時間,也即Kafka Stream處理消息時的時間。

注:Kafka Stream容許經過實現org.apache.kafka.streams.processor.TimestampExtractor接口自定義記錄時間。

3.2 窗口

前文提到,流式數據是在時間上無界的數據。而聚合操做只能做用在特定的數據集,也即有界的數據集上。所以須要經過某種方式從無界的數據集上按特定的語義選取出有界的數據。窗口是一種很是經常使用的設定計算邊界的方式。不一樣的流式處理系統支持的窗口相似,但不盡相同。

Kafka Stream支持的窗口以下。

  1. Hopping Time Window 該窗口定義以下圖所示。它有兩個屬性,一個是Window size,一個是Advance interval。Window size指定了窗口的大小,也即每次計算的數據集的大小。而Advance interval定義輸出的時間間隔。一個典型的應用場景是,每隔5秒鐘輸出一次過去1個小時內網站的PV或者UV。


    Hopping Time Window

  2. Tumbling Time Window該窗口定義以下圖所示。能夠認爲它是Hopping Time Window的一種特例,也即Window size和Advance interval相等。它的特色是各個Window之間徹底不相交。


    Tumbling Time Window

  3. Sliding Window該窗口只用於2個KStream進行Join計算時。該窗口的大小定義了Join兩側KStream的數據記錄被認爲在同一個窗口的最大時間差。假設該窗口的大小爲5秒,則參與Join的2個KStream中,記錄時間差小於5的記錄被認爲在同一個窗口中,能夠進行Join計算。

  4. Session Window該窗口用於對Key作Group後的聚合操做中。它須要對Key作分組,而後對組內的數據根據業務需求定義一個窗口的起始點和結束點。一個典型的案例是,但願經過Session Window計算某個用戶訪問網站的時間。對於一個特定的用戶(用Key表示)而言,當發生登陸操做時,該用戶(Key)的窗口即開始,當發生退出操做或者超時時,該用戶(Key)的窗口即結束。窗口結束時,可計算該用戶的訪問時間或者點擊次數等。

3.3 Join

Kafka Stream因爲包含KStream和Ktable兩種數據集,所以提供以下Join計算

  • KTable Join KTable 結果仍爲KTable。任意一邊有更新,結果KTable都會更新。
  • KStream Join KStream 結果爲KStream。必須帶窗口操做,不然會形成Join操做一直不結束。
  • KStream Join KTable / GlobalKTable 結果爲KStream。只有當KStream中有新數據時,纔會觸發Join計算並輸出結果。KStream無新數據時,KTable的更新並不會觸發Join計算,也不會輸出數據。而且該更新只對下次Join生效。一個典型的使用場景是,KStream中的訂單信息與KTable中的用戶信息作關聯計算。

對於Join操做,若是要獲得正確的計算結果,須要保證參與Join的KTable或KStream中Key相同的數據被分配到同一個Task。具體方法是

  • 參與Join的KTable或KStream的Key類型相同(實際上,業務含意也應該相同)
  • 參與Join的KTable或KStream對應的Topic的Partition數相同
  • Partitioner策略的最終結果等效(實現不須要徹底同樣,只要效果同樣便可),也即Key相同的狀況下,被分配到ID相同的Partition內

若是上述條件不知足,可經過調用以下方法使得它知足上述條件。

KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic)

3.4 聚合與亂序處理

聚合操做可應用於KStream和KTable。當聚合發生在KStream上時必須指定窗口,從而限定計算的目標數據集。

須要說明的是,聚合操做的結果確定是KTable。由於KTable是可更新的,能夠在晚到的數據到來時(也即發生數據亂序時)更新結果KTable。

這裏舉例說明。假設對KStream以5秒爲窗口大小,進行Tumbling Time Window上的Count操做。而且KStream前後出現時間爲1秒, 3秒, 5秒的數據,此時5秒的窗口已達上限,Kafka Stream關閉該窗口,觸發Count操做並將結果3輸出到KTable中(假設該結果表示爲<1-5,3>)。若1秒後,又收到了時間爲2秒的記錄,因爲1-5秒的窗口已關閉,若直接拋棄該數據,則可認爲以前的結果<1-5,3>不許確。而若是直接將完整的結果<1-5,4>輸出到KStream中,則KStream中將會包含該窗口的2條記錄,<1-5,3>, <1-5,4>,也會存在骯數據。所以Kafka Stream選擇將聚合結果存於KTable中,此時新的結果<1-5,4>會替代舊的結果<1-5,3>。用戶可獲得完整的正確的結果。

這種方式保證了數據準確性,同時也提升了容錯性。

但須要說明的是,Kafka Stream並不會對全部晚到的數據都從新計算並更新結果集,而是讓用戶設置一個retention period,將每一個窗口的結果集在內存中保留必定時間,該窗口內的數據晚到時,直接合並計算,並更新結果KTable。超過retention period後,該窗口結果將從內存中刪除,而且晚到的數據即便落入窗口,也會被直接丟棄。

3.5 容錯

Kafka Stream從以下幾個方面進行容錯

  • 高可用的Partition保證無數據丟失。每一個Task計算一個Partition,而Kafka數據複製機制保證了Partition內數據的高可用性,故無數據丟失風險。同時因爲數據是持久化的,即便任務失敗,依然能夠從新計算。
  • 狀態存儲實現快速故障恢復和從故障點繼續處理。對於Join和聚合及窗口等有狀態計算,狀態存儲可保存中間狀態。即便發生Failover或Consumer Rebalance,仍然能夠經過狀態存儲恢復中間狀態,從而能夠繼續從Failover或Consumer Rebalance前的點繼續計算。
  • KTable與retention period提供了對亂序數據的處理能力。

4 Kafka Stream應用示例

下面結合一個案例來說解如何開發Kafka Stream應用。本例完整代碼可從做者Github獲取。

訂單KStream(名爲orderStream),底層Topic的Partition數爲3,Key爲用戶名,Value包含用戶名,商品名,訂單時間,數量。用戶KTable(名爲userTable),底層Topic的Partition數爲3,Key爲用戶名,Value包含性別,地址和年齡。商品KTable(名爲itemTable),底層Topic的Partition數爲6,Key爲商品名,價格,種類和產地。如今但願計算每小時購買產地與本身所在地相同的用戶總數。

首先因爲但願使用訂單時間,而它包含在orderStream的Value中,須要經過提供一個實現TimestampExtractor接口的類從orderStream對應的Topic中抽取出訂單時間。

public class OrderTimestampExtractor implements TimestampExtractor {

  @Override
  public long extract(ConsumerRecord<Object, Object> record) {
    if(record instanceof Order) {
      return ((Order)record).getTS();
    } else {
      return 0;
    }
  }
}

接着經過將orderStream與userTable進行Join,來獲取訂單用戶所在地。因爲兩者對應的Topic的Partition數相同,且Key都爲用戶名,再假設Producer往這兩個Topic寫數據時所用的Partitioner實現相同,則此時上文所述Join條件知足,可直接進行Join。

orderUserStream = orderStream
    .leftJoin(userTable, 
         // 該lamda表達式定義瞭如何從orderStream與userTable生成結果集的Value
        (Order order, User user) -> OrderUser.fromOrderUser(order, user), 
         // 結果集Key序列化方式
        Serdes.String(),
         // 結果集Value序列化方式
         SerdesFactory.serdFrom(Order.class))
    .filter((String userName, OrderUser orderUser) -> orderUser.userAddress != null)

從上述代碼中,能夠看到,Join時須要指定如何從參與Join雙方的記錄生成結果記錄的Value。Key不須要指定,由於結果記錄的Key與Join Key相同,故無須指定。Join結果存於名爲orderUserStream的KStream中。

接下來須要將orderUserStream與itemTable進行Join,從而獲取商品產地。此時orderUserStream的Key仍爲用戶名,而itemTable對應的Topic的Key爲產品名,而且兩者的Partition數不同,所以沒法直接Join。此時須要經過through方法,對其中一方或雙方進行從新分區,使得兩者知足Join條件。這一過程至關於Spark的Shuffle過程和Storm的FieldGrouping。

orderUserStrea
    .through(
        // Key的序列化方式
        Serdes.String(),
        // Value的序列化方式 
        SerdesFactory.serdFrom(OrderUser.class), 
        // 從新按照商品名進行分區,具體取商品名的哈希值,而後對分區數取模
        (String key, OrderUser orderUser, int numPartitions) -> (orderUser.getItemName().hashCode() & 0x7FFFFFFF) % numPartitions, 
        "orderuser-repartition-by-item")
    .leftJoin(itemTable, (OrderUser orderUser, Item item) -> OrderUserItem.fromOrderUser(orderUser, item), Serdes.String(), SerdesFactory.serdFrom(OrderUser.class))

從上述代碼可見,through時須要指定Key的序列化器,Value的序列化器,以及分區方式和結果集所在的Topic。這裏要注意,該Topic(orderuser-repartition-by-item)的Partition數必須與itemTable對應Topic的Partition數相同,而且through使用的分區方法必須與iteamTable對應Topic的分區方式同樣。通過這種through操做,orderUserStream與itemTable知足了Join條件,可直接進行Join。

5 總結

  • Kafka Stream的並行模型徹底基於Kafka的分區機制和Rebalance機制,實現了在線動態調整並行度
  • 同一Task包含了一個子Topology的全部Processor,使得全部處理邏輯都在同一線程內完成,避免了沒必要的網絡通訊開銷,從而提升了效率。
  • through方法提供了相似Spark的Shuffle機制,爲使用不一樣分區策略的數據提供了Join的可能
  • log compact提升了基於Kafka的state store的加載效率
  • state store爲狀態計算提供了可能
  • 基於offset的計算進度管理以及基於state store的中間狀態管理爲發生Consumer rebalance或Failover時從斷點處繼續處理提供了可能,併爲系統容錯性提供了保障
  • KTable的引入,使得聚合計算擁用了處理亂序問題的能力

6 Kafka系列文章

相關文章
相關標籤/搜索