流式計算新貴Kafka Stream設計詳解--轉

原文地址:https://mp.weixin.qq.com/s?__biz=MzA5NzkxMzg1Nw==&mid=2653162822&idx=1&sn=8c46114360b98b621b166d41d8e01d74&chksm=8b493028bc3eb93e8376d85c7d1f9b2a699888b7f0f52e4556bb8543ebebd5e102e91ea23355#rdnode

本文介紹了 Kafka Stream 的背景,如 Kafka Stream 是什麼,什麼是流式計算,以及爲何要有 Kafka Stream。接着介紹了 Kafka Stream 的總體架構、並行模型、狀態存儲以及主要的兩種數據集 KStream 和 KTable。而後分析了 Kafka Stream 如何解決流式系統中的關鍵問題,如時間定義、窗口操做、Join 操做、聚合操做,以及如何處理亂序和提供容錯能力。最後結合示例講解了如何使用 Kafka Stream。Kafka Stream 背景  Kafka Stream 是什麼  數據庫

Kafka Stream 是 Apache Kafka 從 0.10 版本引入的一個新 Feature。它提供了對存儲於 Kafka 內的數據進行流式處理和分析的功能。apache

Kafka Stream 的特色以下:網絡

  • Kafka Stream 提供了一個很是簡單而輕量的 Library,它能夠很是方便地嵌入任意 Java 應用中,也能夠任意方式打包和部署多線程

  • 除了 Kafka 外,無任何外部依賴架構

  • 充分利用 Kafka 分區機制實現水平擴展和順序性保證框架

  • 經過可容錯的 state store 實現高效的狀態操做(如 windowed join 和 aggregation)ide

  • 支持正好一次處理語義oop

  • 提供記錄級的處理能力,從而實現毫秒級的低延遲網站

  • 支持基於事件時間的窗口操做,而且可處理晚到的數據(late arrival of records)

  • 同時提供底層的處理原語 Processor(相似於 Storm 的 spout 和 bolt),以及高層抽象的 DSL(相似於 Spark 的 map/group/reduce)

什麼是流式計算  

通常流式計算會與批量計算相比較。在流式計算模型中,輸入是持續的,能夠認爲在時間上是無界的,也就意味着,永遠拿不到全量數據去作計算。同時,計算結果是持續輸出的,也即計算結果在時間上也是無界的。流式計算通常對實時性要求較高,同時通常是先定義目標計算,而後數據到來以後將計算邏輯應用於數據。同時爲了提升計算效率,每每儘量採用增量計算代替全量計算。

批量處理模型中,通常先有全量數據集,而後定義計算邏輯,並將計算應用於全量數據。特色是全量計算,而且計算結果一次性全量輸出。

 

爲何要有 Kafka Stream  

當前已經有很是多的流式處理系統,最知名且應用最多的開源流式處理系統有 Spark Streaming 和 Apache Storm。Apache Storm 發展多年,應用普遍,提供記錄級別的處理能力,當前也支持 SQL on Stream。而 Spark Streaming 基於 Apache Spark,能夠很是方便與圖計算,SQL 處理等集成,功能強大,對於熟悉其它 Spark 應用開發的用戶而言使用門檻低。另外,目前主流的 Hadoop 發行版,如 MapR,Cloudera 和 Hortonworks,都集成了 Apache Storm 和 Apache Spark,使得部署更容易。

既然 Apache Spark 與 Apache Storm 擁用如此多的優點,那爲什麼還須要 Kafka Stream 呢?筆者認爲主要有以下緣由。

第一,Spark 和 Storm 都是流式處理框架,而 Kafka Stream 提供的是一個基於 Kafka 的流式處理類庫。框架要求開發者按照特定的方式去開發邏輯部分,供框架調用。開發者很難了解框架的具體運行方式,從而使得調試成本高,而且使用受限。而 Kafka Stream 做爲流式處理類庫,直接提供具體的類給開發者調用,整個應用的運行方式主要由開發者控制,方便使用和調試。

第二,雖然 Cloudera 與 Hortonworks 方便了 Storm 和 Spark 的部署,可是這些框架的部署仍然相對複雜。而 Kafka Stream 做爲類庫,能夠很是方便的嵌入應用程序中,它對應用的打包和部署基本沒有任何要求。更爲重要的是,Kafka Stream 充分利用了 Kafka 的分區機制和 Consumer 的 Rebalance 機制,使得 Kafka Stream 能夠很是方便的水平擴展,而且各個實例可使用不一樣的部署方式。

具體來講,每一個運行 Kafka Stream 的應用程序實例都包含了 Kafka Consumer 實例,多個同一應用的實例之間並行處理數據集。而不一樣實例之間的部署方式並不要求一致,好比部分實例能夠運行在 Web 容器中,部分實例可運行在 Docker 或 Kubernetes 中。

第三,就流式處理系統而言,基本都支持 Kafka 做爲數據源。例如 Storm 具備專門的 kafka-spout,而 Spark 也提供專門的 spark-streaming-kafka 模塊。事實上,Kafka 基本上是主流的流式處理系統的標準數據源。換言之,大部分流式系統中都已部署了 Kafka,此時使用 Kafka Stream 的成本很是低。

第四,使用 Storm 或 Spark Streaming 時,須要爲框架自己的進程預留資源,如 Storm 的 supervisor 和 Spark on YARN 的 node manager。即便對於應用實例而言,框架自己也會佔用部分資源,如 Spark Streaming 須要爲 shuffle 和 storage 預留內存。

第五,因爲 Kafka 自己提供數據持久化,所以 Kafka Stream 提供滾動部署和滾動升級以及從新計算的能力。

第六,因爲 Kafka Consumer Rebalance 機制,Kafka Stream 能夠在線動態調整並行度。

Kafka Stream 架構  

 

Kafka Stream 總體架構  

Kafka Stream 的總體架構圖以下。

 

目前(Kafka 0.11.0.0)Kafka Stream 的數據源只能如上圖所示是 Kafka。可是處理結果並不必定要如上圖所示輸出到 Kafka。實際上 KStream 和 Ktable 的實例化都須要指定 Topic。

KStream<String, String> stream = builder.stream("words-stream");

KTable<String, String> table = builder.table("words-table", "words-store");

另外,上圖中的 Consumer 和 Producer 並不須要開發者在應用中顯示實例化,而是由 Kafka Stream 根據參數隱式實例化和管理,從而下降了使用門檻。開發者只須要專一於開發核心業務邏輯,也即上圖中 Task 內的部分。

Processor Topology  

基於 Kafka Stream 的流式應用的業務邏輯所有經過一個被稱爲 Processor Topology 的地方執行。它與 Storm 的 Topology 和 Spark 的 DAG 相似,都定義了數據在各個處理單元(在 Kafka Stream 中被稱做 Processor)間的流動方式,或者說定義了數據的處理邏輯。

下面是一個 Processor 的示例,它實現了 Word Count 功能,而且每秒輸出一次結果。

public class WordCountProcessor implements Processor<String, String> {

  private ProcessorContext context;
  private KeyValueStore<String, Integer> kvStore;

  @SuppressWarnings("unchecked")
  @Override
  public void init(ProcessorContext context) {
    this.context = context;
    this.context.schedule(1000);
    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
  }

  @Override
  public void process(String key, String value) {
    Stream.of(value.toLowerCase().split(" ")).forEach((String word) -> {
      Optional<Integer> counts = Optional.ofNullable(kvStore.get(word));
      int count = counts.map(wordcount -> wordcount + 1).orElse(1);
      kvStore.put(word, count);
    });
  }

  @Override
  public void punctuate(long timestamp) {
    KeyValueIterator<String, Integer> iterator = this.kvStore.all();
    iterator.forEachRemaining(entry -> {
      context.forward(entry.key, entry.value);
      this.kvStore.delete(entry.key);
    });
    context.commit();
  }

  @Override
  public void close() {
    this.kvStore.close();
  }

}

從上述代碼中可見

  • process 定義了對每條記錄的處理邏輯,也印證了 Kafka 可具備記錄級的數據處理能力。

  • context.scheduler 定義了 punctuate 被執行的週期,從而提供了實現窗口操做的能力。

  • context.getStateStore 提供的狀態存儲爲有狀態計算(如窗口,聚合)提供了可能。

Kafka Stream 並行模型  

Kafka Stream 的並行模型中,最小粒度爲 Task,而每一個 Task 包含一個特定子 Topology 的全部 Processor。所以每一個 Task 所執行的代碼徹底同樣,惟一的不一樣在於所處理的數據集互補。這一點跟 Storm 的 Topology 徹底不同。Storm 的 Topology 的每個 Task 只包含一個 Spout 或 Bolt 的實例。所以 Storm 的一個 Topology 內的不一樣 Task 之間須要經過網絡通訊傳遞數據,而 Kafka Stream 的 Task 包含了完整的子 Topology,因此 Task 之間不須要傳遞數據,也就不須要網絡通訊。這一點下降了系統複雜度,也提升了處理效率。

若是某個 Stream 的輸入 Topic 有多個 (好比 2 個 Topic,1 個 Partition 數爲 4,另外一個 Partition 數爲 3),則總的 Task 數等於 Partition 數最多的那個 Topic 的 Partition 數(max(4,3)=4)。這是由於 Kafka Stream 使用了 Consumer 的 Rebalance 機制,每一個 Partition 對應一個 Task。

下圖展現了在一個進程(Instance)中以 2 個 Topic(Partition 數均爲 4)爲數據源的 Kafka Stream 應用的並行模型。從圖中能夠看到,因爲 Kafka Stream 應用的默認線程數爲 1,因此 4 個 Task 所有在一個線程中運行。

爲了充分利用多線程的優點,能夠設置 Kafka Stream 的線程數。下圖展現了線程數爲 2 時的並行模型。

前文有提到,Kafka Stream 可被嵌入任意 Java 應用(理論上基於 JVM 的應用均可以)中,下圖展現了在同一臺機器的不一樣進程中同時啓動同一 Kafka Stream 應用時的並行模型。注意,這裏要保證兩個進程的 StreamsConfig.APPLICATION_ID_CONFIG 徹底同樣。由於 Kafka Stream 將 APPLICATION_ID_CONFI 做爲隱式啓動的 Consumer 的 Group ID。只有保證 APPLICATION_ID_CONFI 相同,才能保證這兩個進程的 Consumer 屬於同一個 Group,從而能夠經過 Consumer Rebalance 機制拿到互補的數據集。

既然實現了多進程部署,能夠以一樣的方式實現多機器部署。該部署方式也要求全部進程的 APPLICATION_ID_CONFIG 徹底同樣。從圖上也能夠看到,每一個實例中的線程數並不要求同樣。可是不管如何部署,Task 總數總會保證一致。

注意:Kafka Stream 的並行模型,很是依賴於《Kafka 設計解析(一)- Kafka 背景及架構介紹》一文中介紹的 Kafka 分區機制和《Kafka 設計解析(四)- Kafka Consumer 設計解析》中介紹的 Consumer 的 Rebalance 機制。強烈建議不太熟悉這兩種機制的朋友,先行閱讀這兩篇文章。

這裏對比一下 Kafka Stream 的 Processor Topology 與 Storm 的 Topology。

  • Storm 的 Topology 由 Spout 和 Bolt 組成,Spout 提供數據源,而 Bolt 提供計算和數據導出。Kafka Stream 的 Processor Topology 徹底由 Processor 組成,由於它的數據固定由 Kafka 的 Topic 提供。

  • Storm 的不一樣 Bolt 運行在不一樣的 Executor 中,極可能位於不一樣的機器,須要經過網絡通訊傳輸數據。而 Kafka Stream 的 Processor Topology 的不一樣 Processor 徹底運行於同一個 Task 中,也就徹底處於同一個線程,無需網絡通訊。

  • Storm 的 Topology 能夠同時包含 Shuffle 部分和非 Shuffle 部分,而且每每一個 Topology 就是一個完整的應用。而 Kafka Stream 的一個物理 Topology 只包含非 Shuffle 部分,而 Shuffle 部分須要經過 through 操做顯示完成,該操做將一個大的 Topology 分紅了 2 個子 Topology。

  • Storm 的 Topology 內,不一樣 Bolt/Spout 的並行度能夠不同,而 Kafka Stream 的子 Topology 內,全部 Processor 的並行度徹底同樣。

  • Storm 的一個 Task 只包含一個 Spout 或者 Bolt 的實例,而 Kafka Stream 的一個 Task 包含了一個子 Topology 的全部 Processor。

KTable vs. KStream  

KTable 和 KStream 是 Kafka Stream 中很是重要的兩個概念,它們是 Kafka 實現各類語義的基礎。所以這裏有必要分析下兩者的區別。

KStream 是一個數據流,能夠認爲全部記錄都經過 Insert only 的方式插入進這個數據流裏。而 KTable 表明一個完整的數據集,能夠理解爲數據庫中的表。

因爲每條記錄都是 Key-Value 對,這裏能夠將 Key 理解爲數據庫中的 Primary Key,而 Value 能夠理解爲一行記錄。能夠認爲 KTable 中的數據都是經過 Update only 的方式進入的。也就意味着,若是 KTable 對應的 Topic 中新進入的數據的 Key 已經存在,那麼從 KTable 只會取出同一 Key 對應的最後一條數據,至關於新的數據更新了舊的數據。

如下圖爲例,假設有一個 KStream 和 KTable,基於同一個 Topic 建立,而且該 Topic 中包含以下圖所示 5 條數據。此時遍歷 KStream 將獲得與 Topic 內數據徹底同樣的全部 5 條數據,且順序不變。而此時遍歷 KTable 時,由於這 5 條記錄中有 3 個不一樣的 Key,因此將獲得 3 條記錄,每一個 Key 對應最新的值,而且這三條數據之間的順序與原來在 Topic 中的順序保持一致。這一點與 Kafka 的日誌 compact 相同。

此時若是對該 KStream 和 KTable 分別基於 key 作 Group,對 Value 進行 Sum,獲得的結果將會不一樣。對 KStream 的計算結果是<jack,4><Jack,4>,<Lily,7>,<Mike,4><lily,7><mike,4>。而對 Ktable 的計算結果是<mike,4><Mike,4>,<Jack,3>,<Lily,5><jack,3><lily,5>。

State store  

流式處理中,部分操做是無狀態的,例如過濾操做(Kafka Stream DSL 中用 filer 方法實現)。而部分操做是有狀態的,須要記錄中間狀態,如 Window 操做和聚合計算。State store 被用來存儲中間狀態。它能夠是一個持久化的 Key-Value 存儲,也能夠是內存中的 HashMap,或者是數據庫。Kafka 提供了基於 Topic 的狀態存儲。

Topic 中存儲的數據記錄自己是 Key-Value 形式的,同時 Kafka 的 log compaction 機制可對歷史數據作 compact 操做,保留每一個 Key 對應的最後一個 Value,從而在保證 Key 不丟失的前提下,減小總數據量,從而提升查詢效率。

構造 KTable 時,須要指定其 state store name。默認狀況下,該名字也即用於存儲該 KTable 的狀態的 Topic 的名字,遍歷 KTable 的過程,實際就是遍歷它對應的 state store,或者說遍歷 Topic 的全部 key,並取每一個 Key 最新值的過程。爲了使得該過程更加高效,默認狀況下會對該 Topic 進行 compact 操做。

另外,除了 KTable,全部狀態計算,都須要指定 state store name,從而記錄中間狀態。

Kafka Stream 如何解決流式系統中關鍵問題  時間  

在流式數據處理中,時間是數據的一個很是重要的屬性。從 Kafka 0.10 開始,每條記錄除了 Key 和 Value 外,還增長了 timestamp 屬性。目前 Kafka Stream 支持三種時間

  • 事件發生時間。事件發生的時間,包含在數據記錄中。發生時間由 Producer 在構造 ProducerRecord 時指定。而且須要 Broker 或者 Topic 將 message.timestamp.type 設置爲 CreateTime(默認值)才能生效。

  • 消息接收時間,也即消息存入 Broker 的時間。當 Broker 或 Topic 將 message.timestamp.type 設置爲 LogAppendTime 時生效。此時 Broker 會在接收到消息後,存入磁盤前,將其 timestamp 屬性值設置爲當前機器時間。通常消息接收時間比較接近於事件發生時間,部分場景下可代替事件發生時間。

  • 消息處理時間,也即 Kafka Stream 處理消息時的時間。

注:Kafka Stream 容許經過實現 org.apache.kafka.streams.processor.TimestampExtractor 接口自定義記錄時間。

窗口  

前文提到,流式數據是在時間上無界的數據。而聚合操做只能做用在特定的數據集,也即有界的數據集上。所以須要經過某種方式從無界的數據集上按特定的語義選取出有界的數據。窗口是一種很是經常使用的設定計算邊界的方式。不一樣的流式處理系統支持的窗口相似,但不盡相同。

Kafka Stream 支持的窗口以下。

一、Hopping Time Window 該窗口定義以下圖所示。它有兩個屬性,一個是 Window size,一個是 Advance interval。Window size 指定了窗口的大小,也即每次計算的數據集的大小。而 Advance interval 定義輸出的時間間隔。一個典型的應用場景是,每隔 5 秒鐘輸出一次過去 1 個小時內網站的 PV 或者 UV。

二、Tumbling Time Window 該窗口定義以下圖所示。能夠認爲它是 Hopping Time Window 的一種特例,也即 Window size 和 Advance interval 相等。它的特色是各個 Window 之間徹底不相交。

三、Sliding Window 該窗口只用於 2 個 KStream 進行 Join 計算時。該窗口的大小定義了 Join 兩側 KStream 的數據記錄被認爲在同一個窗口的最大時間差。假設該窗口的大小爲 5 秒,則參與 Join 的 2 個 KStream 中,記錄時間差小於 5 的記錄被認爲在同一個窗口中,能夠進行 Join 計算。

四、Session Window 該窗口用於對 Key 作 Group 後的聚合操做中。它須要對 Key 作分組,而後對組內的數據根據業務需求定義一個窗口的起始點和結束點。一個典型的案例是,但願經過 Session Window 計算某個用戶訪問網站的時間。對於一個特定的用戶(用 Key 表示)而言,當發生登陸操做時,該用戶(Key)的窗口即開始,當發生退出操做或者超時時,該用戶(Key)的窗口即結束。窗口結束時,可計算該用戶的訪問時間或者點擊次數等。

Join  

Kafka Stream 因爲包含 KStream 和 Ktable 兩種數據集,所以提供以下 Join 計算

  • KTable Join KTable 結果仍爲 KTable。任意一邊有更新,結果 KTable 都會更新。

  • KStream Join KStream 結果爲 KStream。必須帶窗口操做,不然會形成 Join 操做一直不結束。

  • KStream Join KTable / GlobakKTable 結果爲 KStream。只有當 KStream 中有新數據時,纔會觸發 Join 計算並輸出結果。KStream 無新數據時,KTable 的更新並不會觸發 Join 計算,也不會輸出數據。而且該更新只對下次 Join 生效。一個典型的使用場景是,KStream 中的訂單信息與 KTable 中的用戶信息作關聯計算。

對於 Join 操做,若是要獲得正確的計算結果,須要保證參與 Join 的 KTable 或 KStream 中 Key 相同的數據被分配到同一個 Task。具體方法是

  • 參與 Join 的 KTable 或 KStream 的 Key 類型相同(實際上,業務含意也應該相同)

  • 參與 Join 的 KTable 或 KStream 對應的 Topic 的 Partition 數相同Partitioner 策略的最終結果等效(實現不須要徹底同樣,只要效果同樣便可),也即 Key 相同的狀況下,被分配到 ID 相同的 Partition 內

  • 若是上述條件不知足,可經過調用以下方法使得它知足上述條件。

    KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic)

聚合與亂序處理  

聚合操做可應用於 KStream 和 KTable。當聚合發生在 KStream 上時必須指定窗口,從而限定計算的目標數據集。

須要說明的是,聚合操做的結果確定是 KTable。由於 KTable 是可更新的,能夠在晚到的數據到來時(也即發生數據亂序時)更新結果 KTable。

這裏舉例說明。假設對 KStream 以 5 秒爲窗口大小,進行 Tumbling Time Window 上的 Count 操做。而且 KStream 前後出現時間爲 1 秒, 3 秒, 5 秒的數據,此時 5 秒的窗口已達上限,Kafka Stream 關閉該窗口,觸發 Count 操做並將結果 3 輸出到 KTable 中(假設該結果表示爲<1-5,3>)。若 1 秒後,又收到了時間爲 2 秒的記錄,因爲 1-5 秒的窗口已關閉,若直接拋棄該數據,則可認爲以前的結果<1-5,3>不許確。

而若是直接將完整的結果<1-5,4>輸出到 KStream 中,則 KStream 中將會包含該窗口的 2 條記錄,<1-5,3>, <1-5,4>,也會存在骯數據。所以 Kafka Stream 選擇將聚合結果存於 KTable 中,此時新的結果<1-5,4>會替代舊的結果<1-5,3>。用戶可獲得完整的正確的結果。

這種方式保證了數據準確性,同時也提升了容錯性。

但須要說明的是,Kafka Stream 並不會對全部晚到的數據都從新計算並更新結果集,而是讓用戶設置一個 retention period,將每一個窗口的結果集在內存中保留必定時間,該窗口內的數據晚到時,直接合並計算,並更新結果 KTable。超過 retention period 後,該窗口結果將從內存中刪除,而且晚到的數據即便落入窗口,也會被直接丟棄。

容錯  

Kafka Stream 從以下幾個方面進行容錯

  • 高可用的 Partition 保證無數據丟失。每一個 Task 計算一個 Partition,而 Kafka 數據複製機制保證了 Partition 內數據的高可用性,故無數據丟失風險。同時因爲數據是持久化的,即便任務失敗,依然能夠從新計算。

  • 狀態存儲實現快速故障恢復和從故障點繼續處理。對於 Join 和聚合及窗口等有狀態計算,狀態存儲可保存中間狀態。即便發生 Failover 或 Consumer Rebalance,仍然能夠經過狀態存儲恢復中間狀態,從而能夠繼續從 Failover 或 Consumer Rebalance 前的點繼續計算。

  • KTable 與 retention period 提供了對亂序數據的處理能力。

Kafka Stream 應用示例  

 

下面結合一個案例來說解如何開發 Kafka Stream 應用。本例完整代碼可從做者 Github 獲取。

訂單 KStream(名爲 orderStream),底層 Topic 的 Partition 數爲 3,Key 爲用戶名,Value 包含用戶名,商品名,訂單時間,數量。用戶 KTable(名爲 userTable),底層 Topic 的 Partition 數爲 3,Key 爲用戶名,Value 包含性別,地址和年齡。商品 KTable(名爲 itemTable),底層 Topic 的 Partition 數爲 6,Key 爲商品名,價格,種類和產地。如今但願計算每小時購買產地與本身所在地相同的用戶總數。

首先因爲但願使用訂單時間,而它包含在 orderStream 的 Value 中,須要經過提供一個實現 TimestampExtractor 接口的類從 orderStream 對應的 Topic 中抽取出訂單時間。

public class OrderTimestampExtractor implements TimestampExtractor {

  @Override
  public long extract(ConsumerRecord<Object, Object> record) {
    if(record instanceof Order) {
      return ((Order)record).getTS();
    } else {
      return 0;
    }
  }
}

接着經過將 orderStream 與 userTable 進行 Join,來獲取訂單用戶所在地。因爲兩者對應的 Topic 的 Partition 數相同,且 Key 都爲用戶名,再假設 Producer 往這兩個 Topic 寫數據時所用的 Partitioner 實現相同,則此時上文所述 Join 條件知足,可直接進行 Join。

orderUserStream = orderStream
    .leftJoin(userTable, 
         // 該 lamda 表達式定義瞭如何從 orderStream 與 userTable 生成結果集的 Value
        (Order order, User user) -> OrderUser.fromOrderUser(order, user), 
         // 結果集 Key 序列化方式
        Serdes.String(),
         // 結果集 Value 序列化方式
         SerdesFactory.serdFrom(Order.class))
    .filter((String userName, OrderUser orderUser) -> orderUser.userAddress != null)

從上述代碼中,能夠看到,Join 時須要指定如何從參與 Join 雙方的記錄生成結果記錄的 Value。Key 不須要指定,由於結果記錄的 Key 與 Join Key 相同,故無須指定。Join 結果存於名爲 orderUserStream 的 KStream 中。

接下來須要將 orderUserStream 與 itemTable 進行 Join,從而獲取商品產地。此時 orderUserStream 的 Key 仍爲用戶名,而 itemTable 對應的 Topic 的 Key 爲產品名,而且兩者的 Partition 數不同,所以沒法直接 Join。此時須要經過 through 方法,對其中一方或雙方進行從新分區,使得兩者知足 Join 條件。這一過程至關於 Spark 的 Shuffle 過程和 Storm 的 FieldGrouping。

orderUserStrea
    .through(
        // Key 的序列化方式
        Serdes.String(),
        // Value 的序列化方式 
        SerdesFactory.serdFrom(OrderUser.class), 
        // 從新按照商品名進行分區,具體取商品名的哈希值,而後對分區數取模
        (String key, OrderUser orderUser, int numPartitions) -> (orderUser.getItemName().hashCode() & 0x7FFFFFFF) % numPartitions, 
        "orderuser-repartition-by-item")
    .leftJoin(itemTable, (OrderUser orderUser, Item item) -> OrderUserItem.fromOrderUser(orderUser, item), Serdes.String(), SerdesFactory.serdFrom(OrderUser.class))

從上述代碼可見,through 時須要指定 Key 的序列化器,Value 的序列化器,以及分區方式和結果集所在的 Topic。這裏要注意,該 Topic(orderuser-repartition-by-item)的 Partition 數必須與 itemTable 對應 Topic 的 Partition 數相同,而且 through 使用的分區方法必須與 iteamTable 對應 Topic 的分區方式同樣。通過這種 through 操做,orderUserStream 與 itemTable 知足了 Join 條件,可直接進行 Join。

總結  

 

  • Kafka Stream 的並行模型徹底基於 Kafka 的分區機制和 Rebalance 機制,實現了在線動態調整並行度

  • 同一 Task 包含了一個子 Topology 的全部 Processor,使得全部處理邏輯都在同一線程內完成,避免了沒必要的網絡通訊開銷,從而提升了效率。

  • through 方法提供了相似 Spark 的 Shuffle 機制,爲使用不一樣分區策略的數據提供了 Join 的可能

  • log compact 提升了基於 Kafka 的 state store 的加載效率

  • state store 爲狀態計算提供了可能

  • 基於 offset 的計算進度管理以及基於 state store 的中間狀態管理爲發生 Consumer rebalance 或 Failover 時從斷點處繼續處理提供了可能,併爲系統容錯性提供了保障

  • KTable 的引入,使得聚合計算擁用了處理亂序問題的能力

相關文章
相關標籤/搜索