Kafka Stream 高級應用

9.1將Kafka 與其餘數據源集成

對於第一個高級應用程序示例,假設你在金融服務公司工做。公司但願將其現有數據遷移到新技術實現的系統中,該計劃包括使用 Kafka。數據遷移了一半,你被要求去更新公司的分析系統,其目的是實時顯示最新的股票交易和與之關聯的相關信息,對於這種應用場景 Kafka Streams很是適合。
公司專一於提供金融市場不一樣領域的基金,該公司將基金交易實時記錄在關係型數據庫中,同時他們計劃最終將交易直接寫入Kafka,可是在短時間內,數據庫依然是記錄系統。
假設傳入的數據被插入關係型數據庫中,那麼如何縮小數據庫與新興的 Kafka Streams應用程序之間的差距呢?答案是使用 Kafka Connect,它是 Apache Kafka的一部分,是將 Kafka與其餘系統集成的框架。一旦 Kafka有數據,你將再也不關心源數據的位置,而只需將 Kafka Streams應用程序指向源主題,就像其餘 Kafka Streams應用程序同樣處理。
注意當使用 Kafka Connect從其餘源獲取數據時,集成點就是 Kafka的主題。這意味着任何使用Kafka Consumer的應用程序均可以使用導入的數據。
下圖展現了數據庫與 Kafka之間的集成是如何實現的。在本例中,將使用Kafka Connect來監控數據庫表和流更新,並將它們寫入 Kafka主題,該主題是金融分析應用程序的源。正則表達式

 9.1.1使用Kafka Connect集成數據

 Kafka Connect設計的目的是將數據從其餘系統流入 Kafka,以及將數據從 Kafka流入另外一個數據存儲應用程序,例如 MongoDB或 Elasticsearch。使用 Kafka Connect能夠將整個數據庫導 Kafka,或者其餘數據,如性能指標。
 Kafka Connect使用特定的鏈接器與外部數據源交互,幾種可用的鏈接器參考 Confluent官網。不少鏈接器都是由鏈接器社區開發的,使得 Kafka幾乎能夠與其餘任何存儲系統進行集成。若是沒有你想要的鏈接器,那麼你能夠本身實現一個。數據庫

9.1.2配置 Connect

 Kafka Connect有兩種運行模式,即分佈式模式和獨立模式。對於大多數生產環境,以分佈式模式運行是有意義的,由於當運行多個鏈接器實例時能夠利用其並行性和容錯性。這裏,咱們假設你在本機運行示例,所以全部的配置都是基於獨立模式的。
 Kafka Connect用來與外部數據源交互的鏈接器有兩種類型,即源鏈接器( source connector)和接收器鏈接器( sink connector)下圖演示了 Kafka Connect如何使用這兩種類型的鏈接器,正如所看到的,源鏈接器將數據寫入 Kafka,而接收器鏈接器從 Kafka接收數據供其餘系統使用。apache

對於本例,將使用 Kafka JDBC鏈接器。該鏈接器能夠在 GitHub官網上找到,爲了方便我將該鏈接器打包在源代碼中。
使用Kafka Conecet時,你須要對Kafka Connect自身以及用於導入或導出數據的單個鏈接器作少許配置。首先,讓咱們來看一下要用到的 Kafka Connect的配置參數。
■bootstrap.servers——Kafka Connect使用的 Kafka代理列表,多個代理之間以逗號隔開。
■key.converter——類轉換器,該轉換器控制消息的鍵從 Kafka Connect格式到寫人Kafka的格式的序列化。
■value.converter——類轉換器,該轉換器控制消息的值由 Kafka Connect格式到寫Kafka的格式的序列化。例如,可使用內置的org. apache. kafka. connect.json. JsonConverter。
■value.converter. schemas. enable——true或者 false,指定 Kafka Connect是否包含值的模式。對於本例,將其值設置爲fase,在下一節再解釋這樣設置的緣由。json

■plugin.path——告訴Kafka Connect所使用的鏈接器及其依賴項的位置。此位置能夠是單個、包含一個JAR文件或多個JAR文件的頂級目錄。也能夠提供多條路徑,這些路徑由逗號分隔的位置列表表示。
■offset. storage.file. filename——包含Kafka包含 Connect的消費者存儲的偏移量的文件。還須要爲JDBC鏈接器提供一些配置,這些配置參數說明以下。
■name——鏈接器的名稱。
■connector. class——鏈接器的類。
■tasks.max——鏈接器使用的最大任務數。
■connection.url——用於鏈接數據庫的URL
■mode——JDBC源鏈接器用於檢測變化的方法。
■incrementing.column.name——被跟蹤的用於檢測變化的列名。
■topic.prefix——Kafka Connect將每張表的數據寫入名爲「topic. prefix+表名的主題。
這些配置中的大多數都很簡單,但咱們仍須要對這些配置中的mode和incrementing. column.name兩個配置進行詳細討論,由於它們在鏈接器的運行中起着積極做用。JDBC源鏈接器使用mode配置項來檢測須要加載哪些行。本示例中該配置項被設置爲incrementing,它依賴於一個自增列,每次插入一條記錄時該列的值加1。經過跟蹤遞增列,只拉取新插入的記錄,更新操做將被忽略。你的 Kafka Streams應用程序只拉取最新的股票購買,所以這種設置是很理想的。配置項 incrementing. column.name是指包含自增值的列名。bootstrap

提示本書的源代碼包含 Kafka Connect和JDBC鏈接器的近乎完整的配置,配置文件位於本書源代碼的src/main/resources目錄下。你須要提供一些關於提取源代碼資源庫路徑的信息,仔細閱讀 README.md(點擊進行下載)文件中的詳細說明。服務器

9.1.3轉換數據

在得到這個任務以前,你已經使用相似的數據開發了一個 Kafka Streams應用程序,所以已經有了現成的模型和 Serde對象(底層使用Gson進行JSON的序列化與反序列化)。爲了保持較快的開發速度,你不但願編寫任何新的代碼來支持使用 Kafka Connect。正如從下一節所看到的,你將可以從 Kafka Connect中無縫導入數據。
【提示:Gson是一個由谷歌 Apache公司開發的權庫,用於將Java對象序列化爲json以及將json反序列化爲Java對象。你能夠從用戶指南中瞭解更多。】
爲了實現這種無縫集成,須要對JDBC鏈接器的屬性作一些較小的額外配置變動。在修改以前,讓咱們回顧一下9.1.2節介紹的配置項。具體來說,在前面我說過使用org. apache.kafka.connect.kafka.connect.json.JsonConverter,並將模式禁用,值就會被轉換爲簡單的JSON格式。
儘管JSON是你想在 Kafka Streams應用程序中使用的,但存在如下兩個問題。
第一,當將數據轉換爲JSON格式時,列名將是轉換後的JSON字符串字段的名稱,這些名稱都是BSE內部縮寫的格式,在公司外部沒有任何意義。所以當 Gson serde從JSON轉換到指望的模型對象時,該對象的全部字段均爲空,由於JSON字符串中的字段名與該對象的字段名不匹配。
第二,和預期同樣,存儲在數據庫中的日期和時間是時間戳類型的,可是所提供的 Gson serde並無爲Date類型定義一個自定義的 TypeAdapte,所以全部日期都須要轉換爲格式相似 yyyy-dd'T':mm:ss.SS-0400的字符串。幸運的是, Kafka Connect提供了一種機制,可以很輕鬆地解決這兩個問題。session

Kafka Connect有轉換的設計思想,容許在 Kafka Connect將數據寫入 Kafka以前對數據作一些輕量的轉換。圖9-3展現了這個轉換過程發生的地方。
本示例中,將使用兩個內置的轉換操做類,即 TimestampConvert和 ReplaceField。如前所述,要使用這些轉換類,須要在 connector-jdbc- properties配置文件中添加代碼清單所示的幾行配置架構

name=stock-transaction-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:h2:tcp://localhost:9989/~/findata
mode=incrementing
incrementing.column.name=TXN_ID
topic.prefix=dbTxn

#類名轉化器
transforms=ConvertDate,Rename
#日期轉換器ConvertDate的別名類型
transforms.ConvertDate.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
#待轉換日期字段
transforms.ConvertDate.field=TXNTS
#日期轉換後輸出的類型
transforms.ConvertDate.target.type=string
#日期的格式
transforms.ConvertDate.format=yyyy-MM-dd'T'HH:mm:ss.SSS-0400
#重命名轉換器Rename的別名類型
transforms.Rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
#須要替換的列表名
transforms.Rename.renames=SMBL:symbol, SCTR:sector, INDSTY:industry, SHRS:shares, SHRPRC:sharePrice, CSTMRID:customerId, TXNTS:transactionTimestamp

這些屬性是相對自描述性的,所以咱們沒必要在它們上面花太多時間。如你所見,它們剛好提供了你須要 Kafka Streams應用程序提供的對於由 Kafka Connect和JDBC鏈接器導入 Kafka的消息成功進行反序列化。當全部 Kafka Connect組件就緒以後,要完成數據庫表與 Kafka Streams應用程序的集成,只需使用具備 connector-c-jdbc.properties文件中指定的前綴的主題。併發

 //StockTransaction對象的序列化和反序列化器
        Serde<StockTransaction> stockTransactionSerde = StreamsSerdes.StockTransactionSerde();

//使用Kafka Connect寫入記錄的主題做爲流的源
        builder.stream("dbTxnTRANSACTIONS",  Consumed.with(stringSerde, stockTransactionSerde))
                     //KStream<K,V> peek​(ForeachAction<? super K,? super V> action)對KStream的每一個記錄執行一個操做。這是無狀態逐記錄操做
                     //此處將消息打印到控制檯
                      .peek((k, v)-> LOG.info("transactions from database key {} value {}",k, v))

此時,你正在使用 Kafka Streams處理來自數據庫表中的記錄,可是還有更多的事情要作。你正在經過流式處理採集股票交易數據,爲了分析這些交易數據,你須要按股票代碼將交易數據進行分組。
咱們已經知道了如何選擇鍵並對記錄從新分區,但若是記錄在寫入 Kafka時帶有鍵則效率更高,由於Kafka Streams應用程序能夠跳太重新分區的步驟,這就節省了處理時間和磁盤空間。讓咱們再回顧一下 Kafka Connect的配置。
首先,你能夠添加一個 ValueToKey轉換器,該轉換器根據所指定的字段名列表從記錄的值中提取相應字段,以用於鍵。更新 connector--dbc. properties文件內容如代碼清單:負載均衡

#增長ExtractKey轉換器
transforms=ConvertDate,Rename,ExtractKey
transforms.ConvertDate.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.ConvertDate.field=TXNTS
transforms.ConvertDate.target.type=string
transforms.ConvertDate.format=yyyy-MM-dd'T'HH:mm:ss.SSS-0400
transforms.Rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.Rename.renames=SMBL:symbol, SCTR:sector, INDSTY:industry, SHRS:shares, SHRPRC:sharePrice, CSTMRID:customerId, TXNTS:transactionTimestamp
#指定ExtractKey轉化器的類型
transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ValueToKey
#流出須要抽取的字段別名以用做鍵的字段名
transforms.ExtractKey.fields=symbol

添加了一個別名爲 ExtractKey的轉換器並通知 Kafka Connect轉換器對應的類名爲ValueTokey.同時提供用於鍵的字段名爲 symbol,它能夠由多個以逗號分隔的值組成,但本例只須要提供一個值。注意,這裏的字段名是原字段重命名以後的版本,由於這個轉換器是在重命名轉換器轉換以後才執行的。
ExtractKey提取的字段結果是一個包含一個值的結構,可是你只想鍵對應的值即股票代碼包括在結構中,爲此能夠添加一個FlattenStruct轉換器將股票代碼提取出來。

#增長最後一個轉換器
transforms=ConvertDate,Rename,ExtractKey,FlattenStruct
transforms.ConvertDate.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.ConvertDate.field=TXNTS
transforms.ConvertDate.target.type=string
transforms.ConvertDate.format=yyyy-MM-dd'T'HH:mm:ss.SSS-0400
transforms.Rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.Rename.renames=SMBL:symbol, SCTR:sector, INDSTY:industry, SHRS:shares, SHRPRC:sharePrice, CSTMRID:customerId, TXNTS:transactionTimestamp
transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.ExtractKey.fields=symbol
#指定轉換器對應的類
transforms.FlattenStruct.type=org.apache.kafka.connect.transforms.ExtractField$Key
#帶抽取的字段名稱
transforms.FlattenStruct.field=symbol

以上代碼清單中添加了最後一個別名爲FlattenStruct的轉換器,並指定該轉換器對應的類型爲 ExtractFieldKey類,Kafka Connect使用該類來提取指定的字段,而且在結果中只包括該字段(在本例,該字段爲鍵)。最後提供了字段名稱,本例指定該名稱爲 symbol,和前一個轉換器指定的字段同樣,這樣作是有意義的,由於這是用來建立鍵結構的字段。
只須要增長几行配置,就能夠擴展以前的 Kafka Streams應用程序以執行更高級的操做,而無須選擇鍵並執行從新分區的步驟,如代碼:

        //StockTransaction對象的序列化和反序列化器
        Serde<StockTransaction> stockTransactionSerde = StreamsSerdes.StockTransactionSerde();
       
        StreamsBuilder builder = new StreamsBuilder();



        //使用Kafka Connect寫入記錄的主題做爲流的源
        builder.stream("dbTxnTRANSACTIONS",Consumed.with(stringSerde, stockTransactionSerde))
                     //KStream<K,V> peek​(ForeachAction<? super K,? super V> action)對KStream的每一個記錄執行一個操做。這是無狀態逐記錄操做
                     //此處將消息打印到控制檯
                      .peek((k, v)-> LOG.info("transactions from database key {} value {}",k, v))
                      //按鍵進行分組
                      .groupByKey(Serialized.with(stringSerde, stockTransactionSerde))
                /*
                 * KTable<K,VR> aggregate​(Initializer<VR> initializer, Aggregator<? super K,? super V,VR> aggregator,Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized))
                 * 經過分組鍵聚合此流中的記錄值。具備空鍵或值的記錄將被忽略。
                 * initializer-用於計算初始中間聚合結果的Initializer
                 * aggregator-一個計算新聚合結果的聚合器,可用於實現諸如計數之類的聚合功能。
                 * Materialized-實例化的實例化,用於實例化狀態存儲。不能爲null.with():使用提供的鍵和值Serdes實例化StateStore。
                 * 
                 * 在處理第一個輸入記錄以前,當即一次應用指定的Initializer,以提供用於處理第一個記錄的初始中間聚合結果。指定的Aggregator將應用於每一個輸入記錄,並使用當前聚合(或使用經過Initializer提供的中間聚合結果用於第一條記錄)來計算新的聚合和記錄的值。
                 * 所以,aggregate(Initializer,Aggregator,Materialized)可用於計算諸如count(c.f. count())之類的聚合函數。
                 */
                      .aggregate(()-> 0L,(symb, stockTxn, numShares) -> numShares + stockTxn.getShares(),
                              Materialized.with(stringSerde, longSerde)).toStream()
                             .peek((k,v) -> LOG.info("Aggregated stock sales for {} {}",k, v))
                             .to( "stock-counts", Produced.with(stringSerde, longSerde));

由於數據傳入時就帶有鍵,因此能夠適用groupByKey,它不會設置自動從新分區的標誌位。經過分組操做,能夠直接進行一個聚合操做而無須從新分區。

9.2替代數據庫

在第4章中,咱們學習瞭如何向 Kafka Streams應用程序添加本地狀態。流式應用程序須要使用狀態來執行相似聚合、歸約和鏈接的操做除非流式應用程序只處理單條記錄,不然就須要本地狀態。
根據第四章的需求的需求,咱們已經開發了一個 Kafka Streams應用程序,它獲取股票交易的3類信息:
■市場交易總額;
■客戶每次購買股票的數量;
■在窗口大小爲10秒的翻轉窗口中,每支股票的總成交量。
到目前爲止,在全部的示例中查看程序運行結果的方式有兩種,一是經過控制檯查看,二是從接收器主題中讀取結果。在控制檯查看數據適合開發環境,但控制檯並非展現結果的最佳方式。若是要作任何分析工做或者快速理解發生了什麼,儀表板應用程序是最好的展示方式。
本節將會介紹如何在 Kafka Streams中使用交互式查詢來開發一個用於查看分析結果的儀表板應用程序,而不須要關係型數據庫來保存狀態。直接將Kafka Streams做爲數據流提供給儀表板應用程序。所以,儀表板應用程序中的數據天然會不斷更新。
在一個典型的架構中,捕獲和操做的數據會被推送到關係型數據庫中以用於查看圖9-4展現了這種架構:在使用 Kafka Streams以前,經過 Kafka攝取數據,併發送給一個分析引擎,而後分析引擎將處理結果寫入數據庫,以提供給儀表板應用程序使用。

 

 若是增長Kafka Streams使用本地狀態,那麼就要對上圖架構進行修改,以下圖所示刪掉整個集羣,能夠顯著簡化架構。Kafka Streams依然將數據寫回Kafka,而且數據庫仍然是已轉換數據的主要使用者

 交互式查詢可讓你直接查看狀態存儲中的數據,而沒必要先從Kafka中消費這些數據,換句話說流也成爲數據,因而就有了如下調整。

 Kafka Streams經過REST風格接口從流式應用程序外部提供只讀訪問。值得重申的是這個構想是這麼強大:你能夠查看流的運行狀態而不須要一個外部數據庫。

9.2.1交互式查詢工做原理

要使交互式查詢生效,Kafka Streams須要在只讀包裝器中公開狀態存儲。重點是要理解:雖然 Kafka Streams讓狀態存儲能夠被查詢,但並無提供任何方式來更新和修改狀態存儲。 Kafka Streams經過Kafkastreams.store方法公開狀態存儲。
下面的代碼片斷是 store方法的使用示例:
ReadonlyWindowstore readonlystore = kafkastreams.store(storeName, Queryables.windowStore());
該示例檢索一個 WindowStore, QueryablestoreTypes還提供另外兩種類型的方法:
■QueryableStoreTypes.sessionStore();
■QueryableStoreTypes.keyValueStore();
一旦有了對只讀狀態存儲的引用,只須要將該狀態存儲公開給一個提供給用戶查詢流數據狀態的服務便可(例如一個REST風格的服務)可是檢索狀態存儲只是整個構想的一部分,這裏提取的狀態存儲將只包含本地存儲中包含的鍵。
注意:請記住, Kafka Streams爲每一個任務分配一個狀態存儲,只要使用同一個應用程序ID, Kafka Streams應用程序就能夠由多個實例組成。此外這些實例並不須要都位於同一臺主機上。所以,有可能你查詢到的狀態存儲僅包含全部鍵的一個子集,其餘狀態存儲(具備相同名稱,但位於其餘機器上)可能包含鍵的另外一個子集。
讓咱們使用前面列出的分析來明確這個構想。

9.2.2分配狀態存儲

先看看第一個分析,按市場板塊聚合股票交易由於要進行聚合,因此狀態存儲將發揮做用。你但願公開狀態存儲,以提供每一個市場板塊成交量的實時視圖,以深刻了解目前市場哪一個板塊最活躍。股票市場活動產生大量的數據,但咱們只討論使用兩個分區來保持示例的詳細信息。另外,假設你在位於同一個數據中心的兩臺獨立的機器上運行兩個單線程實例,因爲 Kafka Streams的自動負載均衡功能,每一個應用程序將有一個任務來處理來自輸入主題的每一個分區的數據。
下圖展現了任務與狀態存儲的分配狀況。正如你所看到的,實例A處理分區0上的全部記錄,而實例B處理分區1上的全部記錄。

"Energy":100000"分配到實例A的狀態存儲中,"Finance":110000分配到實例B的狀態存儲中。回到爲了查詢而公開狀態存儲的示例,能夠清楚地看到,若是將實例A上的狀態存儲公開給Web服務或任何外部查詢,則只能檢索到"Energy"鍵對應的值。
如何解決這個問題呢?你確定不想創建一個單獨的Web服務來查詢每一個實例—這種方式擴展性差。幸運的是你沒必要這樣作, Kafka Streams提供了一種就像設置配置同樣簡單的解決方案。

9.2.3建立和查找分佈式狀態存儲

若要啓用交互式查詢,須要設置 StreamsConfig. APPLICATION_SERVER_CONFIG參數,它包括 Kafka Streams應用程序的主機名及查詢服務將要監聽的端口,格式爲 hostname:port。
當一個 Kafka Streams實例接收到給定鍵的查詢時,須要找出該鍵是否被包含在本地狀態存儲中。更重要的是,若是在本地沒找到,那麼你但願找到哪一個實例包含該鍵並查詢該實例的狀態存儲。
KafkaStreams對象的幾個方法容許檢索由 APPLICATION_ SERVER_CONFIG定義的、具備相同應用程序ID全部運行實例的信息。表9-1列出了這些方法名及其描述。表9-1檢索存儲元數據的方法

方法名

參數

用途

allMetadata

無參數

檢索全部實例,有些多是遠程實例

allMetadataForstore

存儲的名稱

檢索包含指定存儲的全部實例(有些是遠程實例)

allMetadataForKey

鍵,Serializer

檢索包含有鍵存儲的全部實例(有些是遠程實例)

allMetadataForKey

鍵,StreamPartitioner

檢索包含有鍵存儲的全部實例(有些是遠程實例)


可使用 KafkaStreams.allMetadata方法獲取有資格進行交互式查詢的全部實例的信息。KafkaStreams.allMetadataForKey方法是我在寫交互式查詢時最經常使用的方法。
接下來,讓咱們再看一下鍵/值在 Kafka Streams實例中的分佈,增長了檢查鍵"Finance"過程的順序,該鍵從另外一個實例找到並返回。每個Kafka Streams實例都內置一個輕量的服務器,監聽 APPLICATION_ SERVERCONFIG中指定的端口。

須要重點指出的是:你只須要查詢 Kafka Streams某一個實例,至於查詢哪個實例並不重要(前提是你已經正確配置了應用程序)經過使用RPC機制和元數據檢索方法,若是查詢的實例不包含待查詢的數據,則該實例會找到數據所在的位置,並提取結果,而後將結果返回給原始查詢
經過跟蹤上圖中的調用流,你能夠在實際操做中看到這一點。實例A並不包含鍵「Finance」,但發現實例B包含該鍵,所以,實例A向實例B內置的服務器發起一次方法調用,該方法檢素數據並將結果返回給原始的查詢。

9.2.4編寫交互式查詢

 public static void main(String[] args) {

        if(args.length < 2){
            LOG.error("Need to specify host, port");
            System.exit(1);
        }

        String host = args[0];
        int port = Integer.parseInt(args[1]);
        final HostInfo hostInfo = new HostInfo(host, port);

        Properties properties = getProperties();
        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, host+":"+port);

如今須要提供兩個參數(主機名和端口),但這種更改影響微乎其微。你還能夠嵌入本地服務器執行查詢:對於這個實現,我選擇Spark Web服務器。固然,若是你不喜歡Spark Web服務器,請隨意替換爲另外一個Web服務器

如今,讓咱們看一下嵌入Spark服務器的代碼:

KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
        InteractiveQueryServer queryServer = new InteractiveQueryServer(kafkaStreams, hostInfo);
        StateRestoreHttpReporter restoreReporter = new StateRestoreHttpReporter(queryServer);

        queryServer.init();

        kafkaStreams.setGlobalStateRestoreListener(restoreReporter);

        kafkaStreams.setStateListener(((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                LOG.info("Setting the query server to ready");
                queryServer.setReady(true);
            } else if (newState != KafkaStreams.State.RUNNING) {
                LOG.info("State not RUNNING, disabling the query server");
                queryServer.setReady(false);
            }
        }));

        kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
            LOG.error("Thread {} had a fatal error {}", t, e, e);
            shutdown(kafkaStreams, queryServer);
        });


        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            shutdown(kafkaStreams, queryServer);
        }));

        LOG.info("Stock Analysis KStream Interactive Query App Started");
        kafkaStreams.cleanUp();
        kafkaStreams.start();
    }

在這段代碼中,建立了一個InteractiveQueryServer實例,它是一個包裝類,包含Spark Web服務器和管理Web服務調用以及啓動和中止Web服務器的代碼。第7章討論過使用狀態監聽器來通知一個 Kafka Streams應用程序的各類狀態,在這裏能夠看到這個監聽器的有效使用。回想一下,當在運行交互式查詢時,須要使用 Streams Metadata實例來肯定給定鍵的數據是不是正在處理查詢的實例的本地數據。將查詢服務器的狀態設置爲true,僅當在應用程序處於運行狀態時才容許訪問所須要的元數據。要記住的一個關鍵點是返回的元數據是由 Kafka Streams應用程序組成的快照。在任什麼時候候,你均可以伸縮應用程序。當這種狀況發生時(或者,在其餘任何合格事件發生時,如經過正則表達式來添加源節點的主題), Kafka Streams應用程序經歷再平衡階段,可能會更改分區的分配。在本示例中,只有處於運行狀態時才容許查詢,但能夠隨意使用任何你認爲合適的策略。接下來是第7章中涉及的另外一個例子:設置一個未捕獲的異常處理器。在本示例中,將記錄錯誤並關閉應用程序和查詢服務器。由於這個應用程序無限期地運行,因此添加一個關閉鉤子用來當中止示例時關閉全部程序。

相關文章
相關標籤/搜索