在上圖的拓撲中生成了一個購買-交易事件流,拓撲中的一個處理節點根據銷售額來計算客戶的獎勵級分。但在這個處理其中,要作的也僅僅時計算單筆交易的總積分,並轉發計算結果。css
若是將一些狀態添加處處理器中,就能夠追蹤累積的獎勵級分。所以首先要作的就是使用值轉換處理器將無狀態的獎勵處理器轉化爲有狀態的獎勵處理器。這樣就能夠追蹤到目前爲止所得到的總獎勵積分以及兩次購買之間的時間間隔,爲下游消費者提供更多信息。html
最基本的狀態函數時KStream.transformValues,下圖就展現了KStream.transformValues()方法是如何操做的。java
該方法在語義上與 KStream.mapValues()方法相同,但有一些區別。其中一個區別在於 transformvalues方法須要訪問一個stateStore實例來完成它的任務。另外一個區別在於該方法經過 punctuate()方法安排操做按期執行的能力。咱們已在第6章討論處理器API時再對 punctuate()方法進行詳細介紹。數據庫
KStream<K,VR> transformValues(ValueTransformerSupplier<? super V,? extends VR> valueTransformerSupplier,String stateStoreNames)的做用是將每一個輸入記錄的值轉換爲輸出記錄的新值(可能有新類型)。將ValueTransformer(由給定的ValueTransformerSupplier提供)應用於每一個輸入記錄值,併爲其計算一個新值。所以,輸入記錄<K,V>能夠轉換爲輸出記錄<K:V'>。這是一個有狀態的逐記錄操做(請參見mapValues(ValueMapper))。此外,經過Punctuator.punctuate(long)能夠觀察處處理進度,而且能夠執行其餘週期性操做。apache
ValueTransformerSupplier接口能夠用來建立一個或多個ValueTransformer實例。windows
ValueTransformer接口用於將值有狀態地映射到新值(可能的新類型)。這是一個逐條記錄的有狀態操做,即對流的每一個記錄分別調用transform(Object),而且能夠訪問和修改狀態。ValueTransformer中,經過ProcessorContext得到狀態。要經過punctuate()觸發按期操做,必須註冊一個時間表。數組
最開始的圖已經向你們展現了獎勵處理器,而獎勵處理器屬於商戶業務中屬於獎勵程序(rewards)。最初,獎勵處理器使用KStream.mapValues()方法傳入的Purchase對象映射成RewardAccumulator對象。緩存
RewardAccumulator對象最初僅包含兩個字段,即客戶ID和交易的購買總額。如今需求發生了一些變化,積分與獎勵程序聯繫在了一塊兒,RewardAccumulator類屬性字段定義也變動成以下:服務器
public class RewardAccumulator { private String customerId; //用戶id private double purchaseTotal;//購買總額 private int currentRewardPoints;//當前獎勵數 }
而在此以前,是一個應用程序從rewards主題中讀取數據並計算客戶的獎勵。但如今咱們但願讓積分系統經過流式應用程序來維護和計算客戶的獎勵,還得獲取客戶當前和上一次購買之間的時間間隔。網絡
當應用程序從rewards主題讀取記錄時,消費者應用城西只須要檢測積分總數是否超過度配獎勵的閾值。爲實現這個目標能夠在RewardAccumulator對象中增長
public class RewardAccumulator { private String customerId; private double purchaseTotal; private int totalRewardPoints; private int currentRewardPoints; private int daysFromLastPurchase;//增長的用於追蹤積分總數的字段 }
購買程序的更新規則很簡單,客戶每消費一美圓得到一個積分,交易總額按四捨五入法計算。拓撲的整體結構不會改變,可是獎勵處理(rewards-processing)節點將從使用 KStream.mapValues()方法更改成使用 KStream.transformValues()方法。從語義上講,這兩種方法操做方式相同,即仍然是將 Purchase對象映射爲RewardAccumulator對象。不一樣之處在於使用本地狀態來執行轉換的能力。
具體來講,將採起如下兩個主要步驟。
■初始化值轉換器。
■使用狀態將 Purchase對象映射爲 RewardAccumulator對象。
第一步時在轉換器init()方法中設置或建立任何實例變量。
//一個支持放置/獲取/刪除和範圍查詢的鍵值存儲。
private KeyValueStore<String, Integer> stateStore; private final String storeName; private ProcessorContext context; //ProcessorContext處理上下文接口 public void init(ProcessorContext context) { this.context = context; //根據名稱獲取狀態存儲 stateStore = (KeyValueStore) this.context.getStateStore(storeName); }
在轉換器類,將對象類型轉化爲KeyValueStore類型。
按如下步驟:
①按客戶ID檢查目前累計積分
②與當前交易的積分求和,並呈現積分總數
③將RewardAccumulator中的獎勵積分總數更新爲新的積分總數
④按客戶ID將新的積分總數保存到本地狀態存儲中
public RewardAccumulator transform(Purchase value) { //由Purchase對象構造RewardAccumulator對象 RewardAccumulator rewardAccumulator = RewardAccumulator.builder(value).build(); //根據客戶ID檢索最新積分值 Integer accumulatedSoFar = stateStore.get(rewardAccumulator.getCustomerId()); if (accumulatedSoFar != null) { //若是累積的積分數存在,則將其加到當前總數中。 rewardAccumulator.addRewardPoints(accumulatedSoFar); } //將新的積分存儲到stateStore中 stateStore.put(rewardAccumulator.getCustomerId(), rewardAccumulator.getTotalRewardPoints()); //返回新的積分累計值 return rewardAccumulator; }
經過以上代買來實現獎勵積分處理器。但在操做以前,你須要考慮一下你正在經過客戶ID訪問全部的銷售信息,若是爲給定的客戶收集每筆交易信息就意味着該客戶的全部交易信息都位於同一個分區。可是,由於進入應用程序的交易信息沒有鍵,因此生產者以輪詢的方式將交易信息分配給分區。所以你想要的結果和程序真正執行時就會產生矛盾。
這裏存在一個問題(除非你使用主題只有一個分區),由於鍵沒有填充,因此按輪詢分配的話就意味着同一個客戶的交易信息不必定分配到同一個分區。將相同客戶ID的交易信息放到同一個分區很重要,由於須要根據客戶ID從狀態存儲中查找記錄。不然。回有相同客戶ID客戶信息分佈在不一樣分區上,這樣查找相同客戶的信息時候須要從多個狀態存儲中查找。
雖然在這個簡單的實例中,用一個具體的值替代空間,可是從新分區並不總須要更新鍵。經過使用流分區器StreamPartitioner,可使用任何你想到的分區策略,例如不用鍵而用值或值得一部分來分區。
在Kafka Streams中使用KStream.through()方法從新分區是一件容易的事。KStream.through()方法建立一箇中間主題,當前的Stream實例開始將記錄寫入這個主題中。調用through()方法返回一個新KStream實例,該實例使用同一個主題做爲數據源。經過這種方式,數據就能夠被無縫地從新進行分區。
在底層, Kafka Streams建立了一個接收器節點和源節點。接收器節點是調用 KStream實例的子處理器,新 KStream實例使用新的源節點做爲它的數據源。你可使用DSL本身編寫相同類型的子拓撲,可是使用 KStream. through()方法更方便。
若是已經修改或更改了鍵,而且不須要自定義分區策略,那麼能夠依賴 Kafka Streams的KafkaProducer內部的 DefaultPartitioner來處理分區。可是若是想應用本身的分區方式,那麼可使用流分區器。在下一個例子中就能夠這麼作。
使用 KStream. through()方法的代碼以下所示。在本例, KStream. through()方法接受兩個參數,即主題名稱和一個Produced實例。 Produced實例分別提供鍵和值的序列化與反序列化器(Serde)以及一個流分區器( Stream Partitioner)。注意,若是想使用默認鍵和值的 Serde實例,且不須要自定義分區策略,那麼可使用只有一個主題名參數的 KStream. through方法。
//RewardsStreamPartitioner實現StreamPartitioner接口,該肯定記錄如何在Kafka主題中的分區之間分配。 RewardsStreamPartitioner streamPartitioner = new RewardsStreamPartitioner(); //經過KStream through方法建立一個新的KStream實例 KStream<String, Purchase> transByCustomerStream = purchaseKStream.through( "customer_transactions", Produced.with(stringSerde, purchaseSerde, streamPartitioner));
KStream<K,V> through(String topic,Produced<K,V> produced)
該方法將流具體化爲一個主題,並使用Produced實例從該主題中以配置鍵序列,值序列和StreamPartitioner建立一個新的KStream。
topic-主題名稱;
produce-生成主題時要使用的選項,示例代碼中採用的是:
Produced<K,V> with(Serde<K> keySerde,Serde<V> valueSerde,StreamPartitioner<? super K,? super V> partitioner)
其中partitioner該函數用於肯定記錄在主題分區之間的分配方式
public class RewardsStreamPartitioner implements StreamPartitioner<String, Purchase> { @Override public Integer partition(String key, Purchase value, int numPartitions) { return value.getCustomerId().hashCode() % numPartitions; } }
注意上面的代碼並無產生新的鍵,而是使用值得一個屬性(客戶ID)來確認同一個客戶的正確分區.
至此,已經建立出一個新的處理節點,該處理節點將購物對象寫入一個按客戶ID進行分區的主題。這個新主題也將成爲即將更新的獎勵處理器的數據源。這樣作是爲了確保同一個客戶的全部信息能被寫入同一個分區。所以,對給定客戶全部購買信息都使用相同的狀態存儲。
更新獎勵處理器以使用有狀態轉換:
//使用有狀態轉換(獎勵處理器) KStream<String, RewardAccumulator> statefulRewardAccumulator = transByCustomerStream.transformValues(() -> new PurchaseRewardTransformer(rewardsStateStoreName), rewardsStateStoreName); //將結果寫入「rewards」主題(接受處理器) statefulRewardAccumulator.to("rewards", Produced.with(stringSerde, rewardAccumulatorSerde));
數據本地化對性能相當重要。雖然鍵查找一般很是快,可是當使用遠程存儲在大規模處理時帶來的延遲將成爲瓶頸。
上說明了數據本地化背後的原理。虛線表示從遠程數據庫檢索數據的網絡調用。實線描述對位於同一臺服務器上的內存數據存儲的調用。正如你所看到的,本地數據調用比經過網絡向遠程數據庫的調用更有效。
當經過一個流式應用程序處理數百萬或數十億條記錄時,當乘以一個較大的因子時,即便很小的網絡延遲也會產生巨大影響。因此數據本地化還意味着存儲對每一個處理節點都是本地的,而且在進程或線程之間不共享。這樣的話,若是一個進程失敗了,它就不會對其餘流式處理進程或線程產生影響。儘管流式應用程序有時須要狀態,可是它應該位於進行處理的本地。應用程序的每一個服務器或節點都應該有一個單獨的數據存儲。
用一個主題備份一個狀態存儲看起來彷佛有些代價,但有幾個緩和因素在起做用: KafkaProducer批量發送數據,默認狀況下記錄會被緩存。僅在緩存刷新時 Kafka Streams纔將記錄寫入存儲,所以只保存給定鍵的最新記錄。Kafka Streams提供的狀態存儲既能知足本地化又能知足容錯性的需求,對於已定義的處理器來講它們是本地的,而且不會跨進程或線程訪問共享。狀態存儲也使用主題來作備份和快速恢復。
添加一個狀態存儲時一件簡單的事情,經過Stores類中的一個靜態工廠方法建立一個StoreSupplier實例。有兩個用於制定狀態存儲的附加類,即Meterialized類和StoreBudiler類。使用哪種類取決於如何向拓撲中添加存儲。若是使用高階DSL,那麼一般使用Meterialized類。
//建立StateStore供應者實例 String rewardsStateStoreName = "rewardsPointsStore"; KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(rewardsStateStoreName); //建立StoreBuilder並指定鍵/值 StoreBuilder<KeyValueStore<String, Integer>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.Integer()); //添加到拓撲 builder.addStateStore(storeBuilder);
在上述代碼中,首先,建立一個 storeSupplier對象,它提供一個基於內存的鍵/值存儲,將建立的 StoreSupplier對象做爲參數來建立一個 StoreBuilder對象,並指定鍵的類型爲 String,值的類型爲 Integer。最後,經過向 StreamsBuilder提供 StoreBuilder將 statestore添加到拓撲,同時經過 StreamBuilder.addStatestore方法將存儲添加到應用程序中。所以,如今能夠經過上面所建立的狀態存儲的名字 rewardsStateStoreName來使用處理器中的狀態。
全部 StateStoreSupplier類型默認都啓用了日誌。日誌在這裏指一個 Kafka主題,該主題做爲一個變動日誌用來備份存儲中的值並提供容錯。例如,假設有一臺運行 Kafka Streams應用程序的機器宕機了,一旦服務器恢復並從新啓動了 Kafka Streams應用程序,該機器上對應實例的狀態存儲就會恢復到它們原來的內容(在崩潰以前變動日誌中最後提交的偏移量)。
當使用帶有 disableLogging()方法的 Stores工廠時,能夠經過 disableLogging()方法禁用日誌功能。可是不要隨意禁用日誌,由於這樣作會從狀態存儲中移除容錯而去除狀態存儲在崩潰後的恢復能力。
用於狀態存儲的變動日誌是能夠經過 withLogging(map<String,String> config)方法進行配置的,能夠在Map中使用任何主題可用的配置參數。用於狀態存儲的變動日誌的配置在構建一個 Kafka Streams應用程序時很重要。可是請記住,Kafka Streams會建立該主題而並不是有咱們本身建立。
對 Kafka主題而言,默認的一個日誌段的數據保留時間被設置爲一星期,數據量沒有限制根據你的數據量,這或許是能夠接受的,可是你頗有可能要調整這些設置。此外,默認清理策略是 delete讓咱們先看一下如何將變動日誌主題配置爲保留數據大小爲10GB,保留時間爲2天,配置代碼以下:
Map<String, String> changeLogConfigsMap = new HashMap<>(); changeLogConfigsMap.put("retention.ms","172800000"); changeLogConfigsMap.put("retention.bytes", "10000000000"); storeBuilder.withLoggingEnabled(changeLogConfigsMap); Materialized.as(Stores.inMemoryKeyValueStore("foo")).withLoggingEnabled(changeLogConfigsMap);
當流中的事件不獨立存在時,流就須要有狀態,有時須要的狀態或上下文是另外一個流。在本節中,將從兩個具備相同鍵的流中獲取不一樣的事件,並將它們組合成新的事件。
學習鏈接流的最好方式是看一個具體的例子,因此咱們將回到 商戶交易的應用場景。假設咱們新開了經營電子及相關產品(CD、DVD、智能手機等)的商店。爲了嘗試一種新的方法,咱們與一家國營咖啡館合做,在每家商店裏內設一個咖啡館。這種內設咖啡館的方式取得了巨大的成功,所以決定啓動一個新的項目——經過提供咖啡優惠券來保持電子產品商店的高客流量(指望客流量的增長會帶來額外的購買交易),因此咱們須要識別出哪些客戶購買了咖啡並在電子產品商店進行了交易,那麼當這些客戶再次交易以後就當即送給他們優惠券。
爲了肯定何時給客戶贈送代金券,須要將咖啡管的銷售記錄和電子產品商店的銷售記錄鏈接起來。
//定義匹配記錄的謂詞,名稱不匹配「coffee」或「electronics」將丟棄
Predicate<String, Purchase> isCoffee = (key, purchase) -> purchase.getDepartment().equalsIgnoreCase("coffee"); Predicate<String, Purchase> isElectronics = (key, purchase) -> purchase.getDepartment().equalsIgnoreCase("electronics"); //使用標記的證書來區分相應的數組 int coffee = 0; int electronics = 1; KStream<String, Purchase>[] kstreamByDept = purchaseKStream.branch(isCoffee, isElectronics); kstreamByDept[coffee].to( "coffee", Produced.with(stringSerde, purchaseSerde)); kstreamByDept[coffee].print(Printed.<String, Purchase>toSysOut().withLabel( "coffee")); kstreamByDept[electronics].to("electronics", Produced.with(stringSerde, purchaseSerde)); kstreamByDept[electronics].print(Printed.<String, Purchase>toSysOut().withLabel("electronics"));
KStream<K,V>[] branch(Predicate<? super K,? super V>... predicates)詳解:
branch()方法經過基於提供的謂詞(作什麼/是什麼)在原始流中分支記錄,從而今後流建立KStream數組。根據提供的謂詞評估每一個記錄並按順序評估謂詞,分支發生在第一個匹配項上:將原始流中的記錄分配給第一個謂詞的對應結果流,該結果謂詞爲true,而且僅分配給該流。若是沒有任何謂詞評估爲true,則將刪除一條記錄。這是無狀態逐記錄操做。例如案例中若是該記錄名稱匹配不上coffee或electronics則將該記錄丟棄。
上述的代碼已經將流進行分支,可是傳入的Kafka Streams應用程序的購買記錄沒有鍵,所以還要增長一個處理器來生成客戶ID的鍵,從而造成(訂單號+客戶ID)來保證每一條購買記錄都是獨一無二的。
爲了商城鍵,從六種的購買數據中選取客戶ID,要作到這一點,就須要更新原來的KStream實例,並在該節點和分支節點之間新政另一個處理器。
KStream<String, Purchase>[] branchesStream = transactionStream.selectKey((k,v)-> v.getCustomerId()).branch(coffeePurchase, electronicPurchase);
KStream<KR,V> selectKey(KeyValueMapper<? super K,? super V,? extends KR> mapper)
selectKey()方法爲每一個輸入記錄設置一個新密鑰(可能爲新類型),提供的 KeyValueMapper 將應用於每一個輸入記錄併爲此計算一個新密鑰。
在 Kafka Streams中,不管什麼時候調用一個可致使產生一個新鍵( selectKey map或 transform)的方法,內部一個布爾類型的標識位就會被設置爲true,這就代表新的 KStream實例須要從新分區。若是設置了這個布爾標識位,那麼在執行join、 reduce或者聚合操做時,將會自動進行從新分區。在本例,對 transactionStreamelectKey執行了一個()操做,那麼產生的 KStream實例就被標記爲從新分區。此外,當即執行一個分支操做,那麼branch()方法調用產生的每一個KStream實例也被標記爲從新分區。
public class PurchaseJoiner implements ValueJoiner<Purchase, Purchase, CorrelatedPurchase> { @Override public CorrelatedPurchase apply(Purchase purchase, Purchase otherPurchase) { CorrelatedPurchase.Builder builder = CorrelatedPurchase.newBuilder(); Date purchaseDate = purchase != null ? purchase.getPurchaseDate() : null; Double price = purchase != null ? purchase.getPrice() : 0.0; String itemPurchased = purchase != null ? purchase.getItemPurchased() : null; Date otherPurchaseDate = otherPurchase != null ? otherPurchase.getPurchaseDate() : null; Double otherPrice = otherPurchase != null ? otherPurchase.getPrice() : 0.0; String otherItemPurchased = otherPurchase != null ? otherPurchase.getItemPurchased() : null; List<String> purchasedItems = new ArrayList<>(); if (itemPurchased != null) { purchasedItems.add(itemPurchased); } if (otherItemPurchased != null) { purchasedItems.add(otherItemPurchased); } String customerId = purchase != null ? purchase.getCustomerId() : null; String otherCustomerId = otherPurchase != null ? otherPurchase.getCustomerId() : null; builder.withCustomerId(customerId != null ? customerId : otherCustomerId) .withFirstPurchaseDate(purchaseDate) .withSecondPurchaseDate(otherPurchaseDate) .withItemsPurchased(purchasedItems) .withTotalAmount(price + otherPrice); return builder.build(); } }
ValueJoiner<V1,V2,VR>接口用於將兩個值鏈接到任意類型的新值中,其中apply()返回由 value1 和 value2 組成的已鏈接值。
VR apply(V1 value1,V2 value2)
value1-聯接的第一個值;
value-聯接的第二個值;
//根據以前設置的0/1來區分提取分支流 KStream<String, Purchase> coffeeStream = branchesStream[COFFEE_PURCHASE]; KStream<String, Purchase> electronicsStream = branchesStream[ELECTRONICS_PURCHASE]; //執行攔截操做的ValueJoiner實例,PurchaseJoiner實現ValueJoiner 接口,該接口用於將兩個值鏈接到任意類型的新值中。 ValueJoiner<Purchase, Purchase, CorrelatedPurchase> purchaseJoiner = new PurchaseJoiner();
//設置時間,指定若是同一鍵的記錄的時間戳在指定時間內,即要記錄的時間戳早於或晚於主要記錄的時間戳,則該記錄能夠聯接。 JoinWindows twentyMinuteWindow = JoinWindows.of(60 * 1000 * 20); //調用join方法,觸發coffeeStream和electronicesStream自動從新分區 KStream<String, CorrelatedPurchase> joinedKStream = coffeeStream.join(electronicsStream,
purchaseJoiner, twentyMinuteWindow, //構造鏈接 Joined.with(stringSerde, purchaseSerde, purchaseSerde)); //將結果達應到控制檯 joinedKStream.print(Printed.<String, CorrelatedPurchase>toSysOut().withLabel("joined KStream"));
KStream.join()方法詳解:
KStream<K,VR> join(KStream<K,VO> otherStream,ValueJoiner<? super V,? super VO,? extends VR> joiner,JoinWindows windows,Joined<K,V,VO> joined)
ortherStream-要與此流鏈接的KStream;
joiner-一個ValueJoiner,它爲一對匹配記錄計算聯接結果;
windows-JoinWindows的規範;
joined-一個Joined實例,該實例定義要用於序列化/反序列化Joined流的輸入和輸出的Serdes;
向 KStream.join方法提供了4個參數,各參數說明以下:
■electronicsstream要鏈接的電子產品購買流。
■purchaseJoiner ValueJoiner<v1,v2,r>接口的一個實現, ValueJoiner
接收兩個值(不必定是相同類型) ValueJoiner. apply方法執行用於特定實現的邏輯並返回一個R類型(多是一個全新的類型)的對象(有多是新建立的對象)本示例中, purchaseJoiner將增長一些從兩個 Purchase對象中獲取的相關信息,並返回一個 CorrelatedPurchase對象。
■ twentyMinuteWindow個 Joinwindows實例。 Joinwindows.of方法指定鏈接的兩個值之間的最大時間差。本示例中,時間戳必須在彼此20分鐘之內。
■ 一個 Joined實例提供執行鏈接操做的可選參數。本示例中。提供流的鍵和值對應的 Serde,以及第二個流的值對應的 Serde。僅提供鍵的 Serde,由於鏈接記錄時兩個記錄的鍵必須是相同類型。
【注意:鏈接操做時序列化與反序列化器( Serde)是必需的,由於鏈接操做的參與者物化在窗口狀態存儲中。本示例中,只提供了鍵的 Serde,由於鏈接操做的兩邊的鍵必須是相同類型】
本示例僅指定購買事件要在彼此20分鐘之內,可是沒有要求順序。只要時間戳在彼此20分鐘之內,鏈接操做就會發生。
兩個額外的 JoinWindows()方法可供使用,可使用它們來指定事件的順序。
■ JoinWindows. after streamA.join(streamB,......,twentyMinuteWindow.after(5000),......),這句指定了 streamB記的時間戳最多比 streamA記錄的時間戳滯後5秒。窗口的起始時間邊界不變。
■ JoinWindows. before-streamA. join(stream,......,twentyMinuteWindow. before(500),......),這句指定了 streamB記錄的時間戳最多比 streamA記錄的時間戳提前5秒。窗口的結束時間邊界不變。
before()和 after()方法的時間差均以毫秒錶示。用於鏈接的時間間隔是滑動窗口的一個例子,在下章將詳細介紹窗口操做。
【注意:在代碼清單4-13中,依賴於交易的實際時間戳,而不是Kafka設置的時間戳。爲了使用交易自帶的時間戳,經過設置 StreamsConfig. DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,指定自定義的時間戳提取器 TransactionTimestampExtractor. class】
如今已經構建了一個鏈接流:在購買咖啡後20分鐘內購買電子產品的客戶將得到一張免費咖啡券。
【注意】本內容選自《Kafka Stream實戰》,本人僅對其中未講解清楚的代碼進行解讀註釋工做,不可私自轉載!