Kafka Stream是Apache Kafka從0.10版本引入的一個新Feature(當前:1.0.0-rc0,參見:https://github.com/apache/kafka/releases),它提供了對存儲於Kafka內的數據進行流式處理和分析的功能。其主要特色以下:html
簡言之,Kafka Streams解決了流式處理中的以下困難問題node
爲何要有Kafka Streamgit
當前已經有很是多的流式處理系統,最知名且應用最多的開源流式處理系統有Spark Streaming和Apache Storm。Apache Storm發展多年,應用普遍,提供記錄級別的處理能力,當前也支持SQL on Stream。而Spark Streaming基於Apache Spark,能夠很是方便與圖計算,SQL處理等集成,功能強大,對於熟悉其它Spark應用開發的用戶而言使用門檻低。另外,目前主流的Hadoop發行版,如MapR,Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。既然Apache Spark與Apache Storm擁用如此多的優點,那爲什麼還須要Kafka Stream呢?主要有以下緣由:github
KTable vs. KStream數據庫
KTable和KStream是Kafka Stream中很是重要的兩個概念,它們是Kafka實現各類語義的基礎。所以這裏有必要分析下兩者的區別。apache
如下圖爲例,假設有一個KStream和KTable,基於同一個Topic建立,而且該Topic中包含以下圖所示5條數據。此時遍歷KStream將獲得與Topic內數據徹底同樣的全部5條數據,且順序不變。而此時遍歷KTable時,由於這5條記錄中有3個不一樣的Key,因此將獲得3條記錄,每一個Key對應最新的值,而且這三條數據之間的順序與原來在Topic中的順序保持一致。這一點與Kafka的日誌compact相同。網絡
此時若是對該KStream和KTable分別基於key作Group,對Value進行Sum,獲得的結果將會不一樣。對KStream的計算結果是<Jack,4>,<Lily,7>,<Mike,4>。而對Ktable的計算結果是<Mike,4>,<Jack,3>,<Lily,5>多線程
State store : 架構
流式處理中,部分操做是無狀態的,例如過濾操做(Kafka Stream DSL中用filer方法實現)。而部分操做是有狀態的,須要記錄中間狀態,如Window操做和聚合計算。框架
State store被用來存儲中間狀態。它能夠是一個持久化的Key-Value存儲,也能夠是內存中的HashMap,或者是數據庫。Kafka提供了基於Topic的狀態存儲。Topic中存儲的數據記錄自己是Key-Value形式的,同時Kafka的log compaction機制可對歷史數據作compact操做,保留每一個Key對應的最後一個Value,從而在保證Key不丟失的前提下,減小總數據量,從而提升查詢效率。
構造KTable時,須要指定其state store name。默認狀況下,該名字也即用於存儲該KTable的狀態的Topic的名字,遍歷KTable的過程,實際就是遍歷它對應的state store,或者說遍歷Topic的全部key,並取每一個Key最新值的過程。爲了使得該過程更加高效,默認狀況下會對該Topic進行compact操做。
另外,除了KTable,全部狀態計算,都須要指定state store name,從而記錄中間狀態
時間:
在流式數據處理中,時間是數據的一個很是重要的屬性。從Kafka 0.10開始,每條記錄除了Key和Value外,還增長了timestamp屬性。目前Kafka Stream支持三種時間
注:Kafka Stream容許經過實現org.apache.kafka.streams.processor.TimestampExtractor接口自定義記錄時間。
窗口:
流式數據是在時間上無界的數據。而聚合操做只能做用在特定的數據集,也即有界的數據集上。所以須要經過某種方式從無界的數據集上按特定的語義選取出有界的數據。窗口是一種很是經常使用的設定計算邊界的方式。不一樣的流式處理系統支持的窗口相似,但不盡相同。Kafka Stream支持的窗口以下:
Join:
kafka Stream因爲包含KStream和Ktable兩種數據集,所以提供以下Join計算
對於Join操做,若是要獲得正確的計算結果,須要保證參與Join的KTable或KStream中Key相同的數據被分配到同一個Task。具體方法是
聚合與亂序處理:
聚合操做可應用於KStream和KTable。當聚合發生在KStream上時必須指定窗口,從而限定計算的目標數據集。須要說明的是,聚合操做的結果確定是KTable。由於KTable是可更新的,能夠在晚到的數據到來時(也即發生數據亂序時)更新結果KTable。
這裏舉例說明。假設對KStream以5秒爲窗口大小,進行Tumbling Time Window上的Count操做。而且KStream前後出現時間爲1秒, 3秒, 5秒的數據,此時5秒的窗口已達上限,Kafka Stream關閉該窗口,觸發Count操做並將結果3輸出到KTable中(假設該結果表示爲<1-5,3>)。若1秒後,又收到了時間爲2秒的記錄,因爲1-5秒的窗口已關閉,若直接拋棄該數據,則可認爲以前的結果<1-5,3>不許確。而若是直接將完整的結果<1-5,4>輸出到KStream中,則KStream中將會包含該窗口的2條記錄,<1-5,3>, <1-5,4>,也會存在骯數據。所以Kafka Stream選擇將聚合結果存於KTable中,此時新的結果<1-5,4>會替代舊的結果<1-5,3>。用戶可獲得完整的正確的結果。這種方式保證了數據準確性,同時也提升了容錯性。
但須要說明的是,Kafka Stream並不會對全部晚到的數據都從新計算並更新結果集,而是讓用戶設置一個retention period,將每一個窗口的結果集在內存中保留必定時間,該窗口內的數據晚到時,直接合並計算,並更新結果KTable。超過retention period後,該窗口結果將從內存中刪除,而且晚到的數據即便落入窗口,也會被直接丟棄。
容錯:
Kafka Stream從以下幾個方面進行容錯:
Kafka Stream總體架構
kafka stream的架構以下:
前(Kafka 0.11.0.0)Kafka Stream的數據源只能如上圖所示是Kafka。可是處理結果並不必定要如上圖所示輸出到Kafka。上圖中的Consumer和Producer並不須要開發者在應用中顯示實例化,而是由Kafka Stream根據參數隱式實例化和管理,從而下降了使用門檻。開發者只須要專一於開發核心業務邏輯,也即上圖中Task內的部分。
Processor Topology:基於Kafka Stream的流式應用的業務邏輯所有經過一個被稱爲Processor Topology的地方執行。它與Storm的Topology和Spark的DAG相似,都定義了數據在各個處理單元(在Kafka Stream中被稱做Processor)間的流動方式,或者說定義了數據的處理邏輯。
Kafka Stream並行模型:Kafka Stream的並行模型中,最小粒度爲Task,而每一個Task包含一個特定子Topology的全部Processor。所以每一個Task所執行的代碼徹底同樣,惟一的不一樣在於所處理的數據集互補。以下圖展現了在一個進程(Instance)中以2個Topic(Partition數均爲4)爲數據源的Kafka Stream應用的並行模型。從圖中能夠看到,因爲Kafka Stream應用的默認線程數爲1,因此4個Task所有在一個線程中運行。
爲了充分利用多線程的優點,能夠設置Kafka Stream的線程數。下圖展現了線程數爲2時的並行模型。
Kafka Stream可被嵌入任意Java應用(理論上基於JVM的應用均可以)中,下圖展現了在同一臺機器的不一樣進程中同時啓動同一Kafka Stream應用時的並行模型。注意,這裏要保證兩個進程的StreamsConfig.APPLICATION_ID_CONFIG徹底同樣。由於Kafka Stream將APPLICATION_ID_CONFI做爲隱式啓動的Consumer的Group ID。只有保證APPLICATION_ID_CONFI相同,才能保證這兩個進程的Consumer屬於同一個Group,從而能夠經過Consumer Rebalance機制拿到互補的數據集。
既然實現了多進程部署,能夠以一樣的方式實現多機器部署。該部署方式也要求全部進程的APPLICATION_ID_CONFIG徹底同樣。從圖上也能夠看到,每一個實例中的線程數並不要求同樣。可是不管如何部署,Task總數總會保證一致。
應用示例
示例完整代碼地址: https://github.com/habren/KafkaExample ,Schemal結構說明:
如今但願計算每小時購買產地與本身所在地相同的用戶總數。
public class OrderTimestampExtractor implements TimestampExtractor { @Override public long extract(ConsumerRecord<Object, Object> record) { if(record instanceof Order) { return ((Order)record).getTS(); } else { return 0; } } }
orderUserStream = orderStream .leftJoin(userTable, // 該lamda表達式定義瞭如何從orderStream與userTable生成結果集的Value (Order order, User user) -> OrderUser.fromOrderUser(order, user), // 結果集Key序列化方式 Serdes.String(), // 結果集Value序列化方式 SerdesFactory.serdFrom(Order.class)) .filter((String userName, OrderUser orderUser) -> orderUser.userAddress != null)
orderUserStrea .through( // Key的序列化方式 Serdes.String(), // Value的序列化方式 SerdesFactory.serdFrom(OrderUser.class), // 從新按照商品名進行分區,具體取商品名的哈希值,而後對分區數取模 (String key, OrderUser orderUser, int numPartitions) -> (orderUser.getItemName().hashCode() & 0x7FFFFFFF) % numPartitions, "orderuser-repartition-by-item") .leftJoin(itemTable, (OrderUser orderUser, Item item) -> OrderUserItem.fromOrderUser(orderUser, item), Serdes.String(), SerdesFactory.serdFrom(OrderUser.class))
小結:
參考資料: