大全Kafka Streams

本文將從如下三個方面全面介紹Kafka Streams

一. Kafka Streams 概念

二. Kafka Streams 使用

三. Kafka Streams WordCount

 

一. Kafka Streams 概念

1.Kafka Stream?
Kafka Streams是一套處理分析Kafka中存儲數據的客戶端類庫,處理完的數據或者寫回Kafka,或者發送給外部系統。它構建在一些重要的流處理概念之上:區分事件時間和處理時間、開窗的支持、簡單有效的狀態管理等。Kafka Streams入門的門檻很低:很容易編寫單機的示例程序,而後經過在多臺機器上運行多個實例便可水平擴展從而達到高吞吐量。Kafka Streams利用Kafka的併發模型以實現透明的負載均衡。
一些亮點:
• 設計成簡單和輕量級的客戶端類庫,能夠和現有Java應用、部署工具輕鬆整合。
• 除了Kafka自身外不依賴其餘外部系統。利用Kafka的分區模型來實現水平擴展並保證有序處理。
• 支持容錯的本地狀態,這使得快速高效處理一些有狀態的操做(如鏈接和開窗聚合)成爲可能。
• 支持一次一條記錄的處理方式以實現低延遲,也支持基於事件時間的開窗操做。
• 提供了兩套流處理原語:高層的流DSL和低層的處理器API。java

核心概念
2. Stream Processing Topology(流處理拓撲)node

1)、stream是Kafka Stream最重要的抽象,它表明了一個無限持續的數據集。stream是有序的、可重放消息、對不可變數據集支持故障轉移數據庫

2)、一個流處理應用程序經過一或多個「處理器拓撲(processor topology)」來定義其計算邏輯,一個processor topology就是一張以流處理器(stream processor、節點)和流[streams](邊)構成的圖。(實際爲DAG,太熟悉了吧,多麼相似spark Streaming)express

3)、一個stream processor是processor topology中的一個節點,它表明一個在stream中的處理步驟:從上游processors接受數據、進行一些處理、最後發送一到多條數據到下游processorsapache

Kafka Stream提供兩種開發流處理拓撲(stream processing topology)的API
1)、high-level Stream DSL:提供通用的數據操做,如map和fileter
2)、lower-level Processor API:提供定義和鏈接自定義processor,同時跟state store(下文會介紹)交互windows


3. 時間
時間的概念在流處理中很關鍵,好比開窗這種操做就是根據時間邊界來定義的。上面也提到過兩個常見概念:微信

• 事件時間:事件或數據記錄發生的時刻。
• 處理時間:事件或數據記錄被流處理應用開始處理的時刻,好比記錄開始被消費。處理時間可能比事件時間晚幾毫秒到幾天不等。數據結構

• 攝取時間:數據記錄由KafkaBroker保存到 kafka topic對應分區的時間點。攝取時間相似事件時間,都是一個嵌入在數據記錄中的時間戳字段。不一樣的是,攝取時間是由Kafka Broker附加在目標Topic上的,而不是附加在事件源上的。若是事件處理速度足夠快,事件產生時間和寫入Kafka的時間差就會很是小,這主要取決於具體的使用狀況。所以,沒法在攝取時間和事件時間之間進行二選一,兩個語義是徹底不一樣的。同時,數據還有可能沒有攝取時間,好比舊版本的Kafka或者生產者不能直接生成時間戳(好比沒法訪問本地時鐘。)。併發

事件時間和攝取時間的選擇是經過在Kafka(不是KafkaStreams)上進行配置實現的。從Kafka 0.10.X起,時間戳會被自動嵌入到Kafka的Message中,能夠根據配置選擇事件時間或者攝取時間。配置能夠在broker或者topic中指定。Kafka Streams默認提供的時間抽取器會將這些嵌入的時間戳恢復原樣。app

Kafka Stream 使用TimestampExtractor 接口爲每一個消息分配一個timestamp,具體的實現能夠是從消息中的某個時間字段獲取timestamp以提供event-time的語義或者返回處理時的時鐘時間,從而將processing-time的語義留給開發者的處理程序。開發者甚至能夠強制使用其餘不一樣的時間概念來進行定義event-time和processing time。

注意:Kafka Streams中的攝取時間和其餘流處理系統略有不一樣,其餘流處理系統中的攝取時間指的是從數據源中獲取到數據的時間,而kafka Streams中,攝取時間是指記錄被追加到Kakfa topic中的時間。

 

4. 狀態
有些stream應用不須要state,由於每條消息的處理都是獨立的。然而維護stream處理的狀態對於複雜的應用是很是有用的,好比能夠對stream中的數據進行join、group和aggreagte,Kafka Stream DSL提供了這個功能。

Kafka Stream使用state stores(狀態倉庫)提供基於stream的數據存儲和數據查詢狀態數據,每一個Kafka Stream內嵌了多個state store,能夠經過API存取數據,這些state store的實現能夠是持久化的KV存儲引擎、內存HashMap或者其餘數據結構。Kafka Stream提供了local state store的故障轉移和自動發現。

 

5. KStream和KTable(流和表的雙重性)
Kafka Stream定義了兩種基本抽象:KStream 和 KTable,區別來自於key-value對值如何被解釋,

5.1KStream:
一個純粹的流就是全部的更新都被解釋成INSERT語句(由於沒有記錄會替換已有的記錄)的表。

在一個流中(KStream),每一個key-value是一個獨立的信息片段,好比,用戶購買流是:alice->黃油,bob->麪包,alice->奶酪麪包,咱們知道alice既買了黃油,又買了奶酪麪包。

5.2KTable(changelog流):
KTable 一張表就是一個全部的改變都被解釋成UPDATE的流(由於全部使用一樣的key的已存在的行都會被覆蓋)。

對於一個表table( KTable),是表明一個變化日誌,若是表包含兩對一樣key的key-value值,後者會覆蓋前面的記錄,由於key值同樣的,好比用戶地址表:alice -> 紐約, bob -> 舊金山, alice -> 芝加哥,意味着Alice從紐約遷移到芝加哥,而不是同時居住在兩個地方。

KTable 還提供了經過key查找數據值得功能,該查找功能能夠用在Join等功能上。

這兩個概念之間有一個二元性,一個流能被當作表,而一個表也能夠當作流。

 

6. 低層處理器API
6.1 處理器
開發着經過實現Processor接口並實現process和punctuate方法,每條消息都會調用process方法,punctuate方法會週期性的被調用

6.2 處理器拓撲
有了在處理器API中自定義的處理器,而後就可使用TopologyBuilder來將處理器鏈接到一塊兒從而構建處理器拓撲:

6.3 本地狀態倉庫
處理器API不只能夠處理當前到達的記錄,也能夠管理本地狀態倉庫以使得已到達的記錄均可用於有狀態的處理操做中(如聚合或開窗鏈接)。爲利用本地狀態倉庫的優點,可以使用TopologyBuilder.addStateStore方法以便在建立處理器拓撲時建立一個相應的本地狀態倉庫;或將一個已建立的本地狀態倉庫與現有處理器節點鏈接,經過TopologyBuilder.connectProcessorAndStateStores方法。

 

7. 高層流DSL
爲使用流DSL來建立處理器拓撲,可以使用KStreamBuilder類,其擴展自TopologyBuilder類。Kafka的源代碼中在streams/examples包中提供了一個示例。

7.1 從Kafka建立源端流
Kafka Streams爲高層流定義了兩種基本抽象:記錄流(定義爲KStream)可從一或多個Kafka topic源來建立,更新日誌流(定義爲KTable)可從一個Kafka topic源來建立。

KStream能夠從多個kafka topic中建立,而KTable只能單個topic

KStreamBuilder builder = new KStreamBuilder();
KStream source1 = builder.stream("topic1", "topic2");
KTable source2 = builder.table("topic3");

7.2 轉換一個流
KStream和KTable相應地都提供了一系列轉換操做。每一個操做可產生一或多個KStream和KTable對象,可被翻譯成一或多個相連的處理器。全部這些轉換方法鏈接在一塊兒造成一個複雜的處理器拓撲。由於KStream和KTable是強類型的,這些轉換操做都被定義爲泛類型,使得用戶可指定輸入和輸出數據類型。

這些轉換中,filter、map、mapValues等是無狀態的,可用於KStream和KTable二者,一般用戶會傳一個自定義函數給這些函數做爲參數,例如Predicate給filter,KeyValueMapper給map等:
無狀態的轉換不依賴於處理的狀態,所以不須要狀態倉庫。有狀態的轉換則須要存取相應狀態以處理和生成結果。例如,在join和aggregate操做裏,一個窗口狀態用於保存當前預約義窗口中收到的記錄。因而轉換能夠獲取狀態倉庫中累積的記錄,並執行計算。

7.3 寫回到kafka(Write streams back to Kafka)
最後,開發者能夠將最終的結果stream寫回到kafka,經過 KStream.to and KTable.to

joined.to("topic4");
若是應用但願繼續讀取寫回到kafka中的數據,方法之一是構造一個新的stream並讀取kafka topic,Kafka Stream提供了另外一種更方便的方法:through

joined.to("topic4");
materialized = builder.stream("topic4");
KStream materialized = joined.through("topic4");

8. 窗口
一個流處理器可能須要將數據劃分爲多個時間段,這就是流上窗口。這一般在Join或者aggregation聚合等保存本地狀態的處理程序中使用。

Kafka StreamsDSL API提供了可用的窗口操做,用戶能夠指定數據在窗口中的保存期限。這就容許Kafka Streams在窗口中保留一段時間的舊數據以等待其它晚到的數據。若是保留期過了以後數據纔到達,這條消息就不能被處理,會被丟掉。
實時的流處理系統中,數據亂序老是存在的,這主要取決於數據在有效時間內如何進行處理。對於在正處於處理期的時間內的數據,若是數據亂序,延遲到達,在語義上就能夠被正常處理,若是數據到達時候,已經不在處理期,那麼這種數據就不適合處理期的語義,只能被丟棄掉。

9. Join
Join操做負責在Key上對兩個流的記錄進行合併,併產生新流。一個基於流上的Join一般是基於窗口的,不然全部數據就都會被保存,記錄就會無限增加。

KafkaStreams DSL支持不一樣的Join操做,好比KStram和KStream之間的Join,以及KStream和KTable之間的Join。

10. Aggregations
聚合操做須要一個輸入流,而且以多個輸入記錄爲單位組合成單個記錄併產生新流。常見的聚合操做有count和sum。流上的聚合也必須基於窗口進行,不然數據和join同樣都會無限制增加。

在Kafka Streams的DSL中,一個聚合輸入流能夠是KStream形式或者KTable形式,可是輸出流永遠都是KTable。這就使得Kafka Streams的輸出結果會被不斷更新,這樣,當有數據亂序到達以後,數據也能夠被及時更新,由於最終輸出是KTable,新key會覆蓋舊值。

配置參數

 

二. Kafka Streams 使用

1 概述
kafka Streams是一個客戶端庫(client library),用於處理和分析儲存在Kafka中的數據,並把處理結果寫回Kafka或發送到外部系統的最終輸出點。它創建在一些很重要的概念上,好比事件時間和消息時間的準確區分,開窗支持,簡單高效的應用狀態管理。Kafka Streams的門檻很低:你能夠快速編寫一個小規模的原型運行在一臺獨立主機中;而後你只須要在其餘主機主機上部署應用的實例,就能夠完成到大規模生產環境的擴展。Kafka Streams利用Kafka的並行模型,能夠透明處理同一應用的多實例負載均衡。
kafka Streams的特色:
*被設計爲一個簡單輕量級的客戶端庫,能夠嵌入到Java應用,整合到已有的包、部署環境或者其餘用戶的流應用處理工具。
*除了Kafka自身作爲內部消息層外,沒有其餘系統依賴。使用Kafka分區模型來水平擴展並保證絕對的順序性。
*支持本地狀態容錯,能夠執行很是快速有效的有狀態操做,好比joins和windowed aggregations(窗口聚合)。
*採用「一次處理一條記錄(one-record-at-a-time)」的方式達到低處理延遲,支持基於開窗操做的事件消息(event-time)。
*提供必要的流處理基礎件,包括一個高級Streams DSL和一個底層處理API(Processor API)。


2 開發指南
2.1 核心概念
2.1.1 流處理過程拓撲圖
*一個流(stream)是Kafka中最重要的抽象概念:它表明了一個無界,持續更新的數據集。一個流是一個有序,可重複讀取,容錯的不可變數據記錄序列,一個數據記錄被定義爲一個鍵值對(key-value pair)。
*一個流處理應用,用Kafka Streams開發,定義了通過若干個處理拓撲(processor topologies)的計算邏輯,每一個處理拓撲是一個經過流(線,edge)鏈接到流處理實例(點,node)的圖。
*一個流處理實例(processor)是一個處理拓撲的節點;其含義是,經過從拓撲圖中它的上游處理節點每次接收一條輸入記錄,執行一步流數據的變換,多是請求操做流數據,也有可能隨後生產若干條記錄給到下游處理實例。
2.1.2 時間
流處理中一個臨界面就是時間概念,以及它是怎麼定義和整合的。好比,像開窗(windowing)這樣的操做定義是基於時間邊界的。
流中經常使用的消息概念有:
*事件時間————當事件或數據記錄產生的時間點,最初被稱爲"at the source"(起源)。
*處理時間————當事件或數據記錄被流處理應用開始處理的時間點,也就是記錄開始被消費的時間。處理時間會比源事件時間晚若干毫秒、小時,甚至若干天。
*存儲時間————當事件或者數據記錄被Kafka broker儲存到一個主題分區的時間。和事件時間不一樣的是,存儲時間是發生在Kafka broker把記錄添加到目標主題時,而不是記錄建立時。和處理時間不一樣的是,處理時間發生在流處理應用處理記錄時。好比,若是一個記錄歷來沒被處理過,那它就沒有處理時間的概念,可是它仍是有存儲時間。
選擇事件時間仍是存儲時間,是經過Kafka配置文件肯定的(不是Kafka Streams):在Kafka 0.10.x以前,時間戳會自動嵌入到Kafka消息中。經過Kafka的配置項,這些時間戳能夠表明事件時間或存儲時間。該項能夠配置在broker級或單個topic。默認Kafka Streams中時間戳提取器會把嵌入的時間戳原樣提取。因此,你應用中有效的時間含義依賴於Kafka中這些嵌入時間戳的配置。
Kafka Streams把每個時間戳關聯到每一個數據記錄經過接口TimestampExtractor。該接口的具體實現會檢索或計算時間戳,數據記錄確實產生內容的時間被當作嵌入時間戳時表明事件時間語義,或者用其餘方法如當前時鐘時間獲取的處理時時間,會表明處理時間語義。開發者能夠鑑於此依照業務須要使用不一樣時間概念。好比,單個記錄(per-record)時間戳描述了按照時間訪問流的進度(雖然流中的記錄多是無序的),而後被依賴於時間的操做(如joins)利用。
最後,不管什麼時候一個Kafka Streams應用寫記錄到Kafka,都會給新記錄關聯一個時間戳。關聯時間戳的方法依賴於context對象:
*當經過處理輸入記錄而產生新輸出記錄時,好比,用context.forward()觸發process()方法調用,輸出記錄會直接繼承輸入記錄的時間戳。
*當經過周期函數產生新輸出記錄時(如punctuate),輸出記錄的時間戳被定義爲當前流任務的內部時間(經過context.timestamp())。
*爲了聚合性,更新記錄聚合的結果時間戳就是最新輸入記錄到達時觸發的更新時間。
2.1.3 狀態
某些流處理應用不須要狀態,也就是一個消息處理過程不依賴於取他消息的處理過程。可是,能夠保持狀態會提供更多更復雜的流處理過程:你能夠組合(join)輸入流,分組並聚合數據記錄。不少這種有狀態的操做均可以經過Kafka Streams DSL獲得。
Kafka Streams提供了所謂的狀態存儲(state stores),能夠被流處理應用用於保存和查詢數據。當實現有狀態操做時,這是很是有用的功能。每一個Kafka Streams任務會嵌入若干個狀態存儲,經過API訪問存儲的狀態能夠保存或查詢處理過程須要的數據。這些狀態存儲能夠保存爲持久化鍵值對,一個內存哈希表,或者其餘實用的數據結構。Kafka Streams提供了本地狀態存儲的容錯和自動還原。
Kafka Streams容許直接只讀查詢(read-only query)狀態存儲,能夠經過方法、線程、處理過程或和建立數據存儲的應用無關的應用。這個功能被稱爲「交互式查詢」 (Interactive Query)。全部的存儲都被命名,並且交互式查詢底層實現只開放了讀操做。
如前所述,一個Kafka Streams應用的計算邏輯被定義爲一個處理拓撲。當前Kafka Streams提供了兩組API用於定義處理拓撲。


2.2 底層處理API
2.2.1 Processor類
開發人員能夠定製本身的業務處理邏輯,經過繼承Process類。該接口提供了process和punctuate方法。process方法會在每條記錄上執行;punctuate方法會被週期性調用。另外,processor接口能夠保持當前ProcessorContext實例變量(在init方法中初始化),用context來設定punctuate調用週期(context().schedule),轉發修改/新鍵值對到下游Processor實例(context().forwar),提交當前處理進度(context().commit),等等。

    public class MyProcessor extends Processor {
        private ProcessorContext context;
        private KeyValueStore kvStore;
 
        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            this.context.schedule(1000);
            this.kvStore = (KeyValueStore) context.getStateStore("Counts");
        }
 
        @Override
        public void process(String dummy, String line) {
            String[] words = line.toLowerCase().split(" ");
 
            for (String word : words) {
                Integer oldValue = this.kvStore.get(word);
 
                if (oldValue == null) {
                    this.kvStore.put(word, 1);
                } else {
                    this.kvStore.put(word, oldValue + 1);
                }
            }
        }
 
        @Override
        public void punctuate(long timestamp) {
            KeyValueIterator iter = this.kvStore.all();
 
            while (iter.hasNext()) {
                KeyValue entry = iter.next();
                context.forward(entry.key, entry.value.toString());
            }
 
            iter.close();
            context.commit();
        }
 
        @Override
        public void close() {
            this.kvStore.close();
        }
    };

上面的示例中執行了以下的操做:
*init:設定punctuate調用週期爲1秒,獲取本地狀態存儲並命名爲「Counts」.
*process: 根據每條收到的記錄,把輸入字符串值分割爲單詞,把他們的計數更新到狀態存儲(咱們在下一節討論該功能)。
*punctuate:迭代本地狀態存儲,發送計數集合到下游處理器,提交當前流狀態。
2.2.2 處理拓撲(Processor Topology)
實現自定義Processor的同時,開發人員能夠用TopologyBuilder構建一個處理拓撲,把各個Processor過程鏈接在一塊兒:

  TopologyBuilder builder = new TopologyBuilder();
 
    builder.addSource("SOURCE", "src-topic")
 
        .addProcessor("PROCESS1", MyProcessor1::new /* the ProcessorSupplier that can generate MyProcessor1 */, "SOURCE")
        .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
        .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
 
        .addSink("SINK1", "sink-topic1", "PROCESS1")
        .addSink("SINK2", "sink-topic2", "PROCESS2")
        .addSink("SINK3", "sink-topic3", "PROCESS3");

上面代碼中建立拓撲有幾個步驟,下面簡略說明一下:
*首先,調用addSource方法將一個源節點(命名爲「SOURCE」)添加到拓撲中,並和一個Kafka主題「src-topic」關聯,。
*其次,調用addProcessor添加三個處理節點,在這裏,第一個處理實例是「SOURCE」節點的孩子,可是是其餘兩個實例的父親。
*最後,調用addSink添加三個槽(sink)節點到已經部署好的拓撲中,每個從不一樣父Processor節點來的管道都寫入不一樣的主題。
2.2.3 本地狀態存儲
注意,ProcessorAPI不限制應用僅僅訪問當前到達的記錄,也能夠訪問以前保存了以前到達記錄的本地狀態存儲,用於聚合或窗口組合等有狀態處理操做。爲了利用本地狀態的優點,開發者使用TopologyBuilder.addStateStore方法在構建處理拓撲時建立本地狀態,並把它和須要訪問它的處理節點關聯起來;或者用TopologyBuilder.connectProcessorAndStateStores方法鏈接已建立的本地狀態存儲和已存在的處理節點。

TopologyBuilder builder = new TopologyBuilder();
 
    builder.addSource("SOURCE", "src-topic")
 
        .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
        // create the in-memory state store "COUNTS" associated with processor "PROCESS1"
        .addStateStore(Stores.create("COUNTS").withStringKeys().withStringValues().inMemory().build(), "PROCESS1")
        .addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
        .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
 
        // connect the state store "COUNTS" with processor "PROCESS2"
        .connectProcessorAndStateStores("PROCESS2", "COUNTS");
 
        .addSink("SINK1", "sink-topic1", "PROCESS1")
        .addSink("SINK2", "sink-topic2", "PROCESS2")
        .addSink("SINK3", "sink-topic3", "PROCESS3");


2.3 高級Streams DSL
使用Streams DSL構建一個處理拓撲,開發者須要使用KStreamBuilder類,該類繼承自TopologyBuilder類。streams/examples包內有一個簡單的包含源碼的示例。本節剩餘部分,會經過一些實例代碼來展示使用Streams DSL建立一個拓撲的關鍵步驟,不過咱們仍是建議開發者閱讀完整的實例以瞭解全部的細節。
2.3.1 KStream類和KTable類
DSL用到了兩個主要的抽象概念。一個KStream實例是一個記錄流的抽象,記錄流中每條數據記錄表明了一個無界數據集中的一個獨立數據。一個KTable實例是一個更新日誌流的抽象,更新日誌流中每一條數據表明了一個更新。更準確的說,數據記錄中的值表明了同一個記錄關鍵字的最新更新值,若是有相同關鍵字記錄的話(若是關鍵字不存在,那麼更新動做會建立一個)。爲了說明KStream和KTable的區別,咱們有下面兩個記錄發往流:("alice", 1) --> ("alice", 3)。若是這兩條記錄保存在KStream實例,流處理應用累加他們的值會獲得結果4。若是這兩條記錄保存在KTable實例,獲得的結果是3,由於後一個記錄會被當作是前一個記錄的更新。
2.3.2 從Kafka建立流數據源
不管是記錄流(用KStream定義)仍是更新日誌流(用KTable定義),均可以被建立爲一個流數據源,數據來自若干個Kafka主題(KTable只能建立單主題的數據源)。

    KStreamBuilder builder = new KStreamBuilder();
    KStream source1 = builder.stream("topic1", "topic2");
    KTable source2 = builder.table("topic3", "stateStoreName");

 

2.3.3 數據流開窗
某個流處理過程可能須要把數據記錄按時間分組,也就是按時間把流分爲多個窗口。經過join和聚合操做會用到這個。Kafka Streams目前定義了以下幾種窗口:
*Hopping time window 時間跨越窗口,基於時間間隔,模擬了大小固定、(可能)重疊的窗口。一個跨越窗口由兩個屬性肯定:窗口大小和跨越步長(前進間隔)(即「hop」跳)。前進間隔指定了一個窗口每次相對於前一個窗口向前移動的距離。好比,你能夠配置一個長度5分鐘的跨越窗口,前進間隔是1分鐘。跨越窗口可能覆蓋了一個記錄,該記錄屬於若干個這樣的窗口。
*Tumbling time windows 是一個特殊的跨越窗口,因此也是基於時間間隔。它模擬了大小固定、不可重疊、無間隙的一類窗口。一個trumbing窗口由一個屬性肯定:窗口大小。投入大家 trumbing窗口是一個窗口大小等於前進步長的跨越窗口。由於它不會重疊,一條記錄也僅屬於惟一的窗口。
*sliding window,滑動窗口,模擬了大小固定並沿着時間軸連續滑動的窗口。這裏,有兩條數據記錄存在於同一個窗口,他們時間戳不一樣可是都在窗口大小內。因此,滑動窗口沒有和某個時間點對齊,而是和數據記錄時間戳對齊。在Kafka流中,滑動窗口只有在join操做時纔用到,能夠用JoinWindows類來定義。
2.3.4 join操做
一個join(合併)操做就是合併兩個數據流,基於他們數據的鍵,而後生成一個新流。一個記錄流上的join操做一般須要基於窗口操做(即分段執行),由於用於執行join操做的記錄數量可能會無限增加。Kafka Streams定義了以下幾個join操做:
*KStream-to-KStream  Joins:就是windowed join(窗口合併),由於用於計算join操做的內存大小和狀態多是無限增加的。這裏,假設從須要和其餘記錄流進行join操做的流,新接收到一條記錄,按照指定的窗口間隔生產一個結果,用於每一個符合用戶提供的ValueJoiner類要求的鍵值對。一個從join操做返回的新的KStream實例表明了join操做的結果。
*KTable-to-KTable Joins:這個join操做用於和關係數據庫中對應記錄保持一致。這裏,兩個更新日誌流先實例化到本地狀態存儲中。當收到其中某個流的新記錄,就把記錄合併到另外一個流的實例化狀態存儲中,而後生產一個符合用戶提供的ValueJoiner類的鍵值對結果。join操做返回一個新的KTable實例表明了流合併的結果,它仍然是一個更新日誌流。
*KSream-to-KTable Joins:容許你根據記錄流(KStream)接收到的新數據,在更新日誌流(KTable)中執行表查詢。一種應用是能夠用最新的用戶資料信息(KTable)補充用戶行爲信息(KStream)。只有當從記錄流接收到記錄時纔會觸發join操做,而後經過ValueJoiner生產結果,反過來不成立(也就是從更新日誌流接收到的記錄只能用於更新實例化狀態存儲)。該操做返回新的KStream實例表明了流合併的結果。
根據操做對象不一樣,join支持以下操做:inner joins,outer joins, 和left joins。他們的語義和關係數據庫中相同。
2.3.5 轉換一個流
KStream和KTable各自提供了一系列轉換操做。每一個操做都會生成一個或多個KStream或KTable對象,能夠被傳入已鏈接的底層處理拓撲中的處理過程。全部這樣轉換方法能夠鏈式組合爲複雜的處理拓撲。KStream和KTable是強類型,可是全部這些轉換操做都被定義爲模板方法,用戶能夠指定輸入輸出的數據類型。
在這些轉換中,filter、map、mapValues等等,都是無狀態轉換操做,均可以在KStream和KTable中調用,用戶只須要傳入一個自定義函數做爲它們形參,好比Predicate傳入到filter,KeyValueMapper傳入到map,等等:
// written in Java 8+, using lambda expressions
    KStream mapped = source1.mapValue(record -> record.get("category"));
無狀態轉換,顧名思義,就是不依賴於處理過程的狀態,因此在實現時不須要關聯流處理實例(stream processor)的狀態存儲。有狀態轉換,換句話說,就是處理時出現訪問關聯狀態而後產生輸出。好比,join和aggregate操做,一般須要一個窗口狀態保存全部接收到的記錄(在窗口範圍內)。而後這些個操做訪問存儲中積累的記錄,用他們作業務邏輯。

// written in Java 8+, using lambda expressions
    KTable<Long> counts = source1.groupByKey().aggregate(
        () -> 0L,  // initial value
        (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
        TimeWindows.of("counts", 5000L).advanceBy(1000L), // intervals in milliseconds
        Serdes.Long() // serde for aggregated value
    );
 
 
    KStream joined = source1.leftJoin(source2,
        (record1, record2) -> record1.get("user") + "-" + record2.get("region");
    );

2.3.6 把流寫回Kafka
處理完數據後,用戶能夠選擇(持續的)把最終結果流寫入一個Kafka主題,經過KStream.to和KTable.to方法。
joined.to("topic4");
若是你的應用須要持續讀取並處理那些經過to方法寫入到主題的記錄,有一個辦法是構造一個新的流從輸出主題讀取數據;Kafka Streams提供了一個便利的方法叫through:

    // equivalent to
    //
    // joined.to("topic4");
    // materialized = builder.stream("topic4");
    KStream materialized = joined.through("topic4");


除了定義拓撲以外,開發者還要在運行拓撲前配置文件StreamsConfig。

 

三. Kafka Streams WordCount

本文展現了kafka Stream Wordcount 例子的兩種寫法

kafka Stream 版本0.10.1.0

此例子 使用了高層流DSL建立kStream 多實例(instances1,instances2爲兩個實例)並行計算處理了從topic1 中讀取的數據。

package com.us.kafka.Stream;

import java.util.Arrays;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
import com.us.kafka.KafkaConfig;

import java.util.Properties;

import static org.apache.kafka.common.serialization.Serdes.String;

/**
 * 高層流DSL
 */
public class MyKstream {
    public static void main(String[] args) {

        //tow instances
        KStreamBuilder instances1 = new KStreamBuilder();
//        filterWordCount(builder);
        lambdaFilter(instances1);
        KStreamBuilder instances2 = new KStreamBuilder();
        lambdaFilter(instances2);

        KafkaStreams ks = new KafkaStreams(instances2, init());
        ks.start();
//        Runtime.getRuntime().addShutdownHook(new Thread(ks::close));
    }

    public static Properties init() {
        Properties properties = new Properties();
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "MyKstream");
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfig.metadata_broker_list);
        properties.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, KafkaConfig.zookeeper);
        properties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, String().getClass().getName());
        properties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, String().getClass().getName());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return properties;
    }


    private static void filterWordCount(KStreamBuilder builder) {
        KStream<String, String> source = builder.stream("topic1");
        KTable<String, Long> count = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        }).filter(new Predicate<String, String>() {

            @Override
            public boolean test(String key, String value) {
                if (value.contains("abel")) {
                    return true;
                }
                return false;
            }
        }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {

            public KeyValue<String, String> apply(String key, String value) {

                return new KeyValue<String, String>(value + "--read", value);
            }

        }).groupByKey().count("us");
        count.print();
//        count.to("topic2");
    }

    private static void lambdaFilter(KStreamBuilder builder) {
        KStream<String, String> textLines = builder.stream("topic1");

        textLines
                .flatMapValues(value -> Arrays.asList(value.split(" ")))
                .map((key, word) -> new KeyValue<>(word, word))
                .filter((k, v) -> (!k.contains("message")))
//              .through("RekeyedIntermediateTopic")
                .groupByKey().count("us").print();
        System.out.println("-----------2-----------");

    }


}

 

<properties>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.0</version>
        </dependency>

        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.1</version>
        </dependency>

        <!-- kafka Stream -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>0.10.1.0</version>
        </dependency>
    </dependencies>

 

運行下面代碼條件:

1. java 1.8+

2. kafka 0.10+

public class StreamDemo {
 
    public static Map<String, Object> connection() {
        Map<String, Object> properties = new HashMap<>();
        // 指定一個應用ID,會在指定的目錄下建立文件夾,裏面存放.lock文件
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
        // 指定kafka集羣
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "server01:9092");
        // 指定一個路徑建立改應用ID所屬的文件
        properties.put(StreamsConfig.STATE_DIR_CONFIG, "E:\\kafka-stream");
        // key 序列化 / 反序列化
        properties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // value 序列化 / 反序列化
        properties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return properties;
    }
 
    public static void main(String[] args) throws IOException {
        // 建立一個StreamsConfig對象
        StreamsConfig config = new StreamsConfig(StreamDemo.connection());
        // KStreamBuilder builder = new KStreamBuilder();
        // 建立一個TopologyBuilder對象
        TopologyBuilder builder = new TopologyBuilder();
        // 添加一個SOURCE,接收兩個參數,param1 定義一個名稱,param2 從哪個topic讀取消息
        builder.addSource("SOURCE", "topic-input")
                // 添加第一個PROCESSOR,param1 定義一個processor名稱,param2 processor實現類,param3 指定一個父名稱
                .addProcessor("PROCESS1", MyProcessorA::new, "SOURCE")
 
                // 添加第二個PROCESSOR,param1 定義一個processor名稱, param2 processor實現類,param3 指定一個父名稱
                .addProcessor("PROCESS2", MyProcessorB::new, "PROCESS1")
                // 添加第三個PROCESSOR,param1 定義一個processor名稱, param2 processor實現類,param3 指定一個父名稱
                .addProcessor("PROCESS3", MyProcessorC::new, "PROCESS2")
 
                // 最後添加SINK位置,param1 定義一個sink名稱,param2 指定一個輸出TOPIC,param3 指定接收哪個PROCESSOR的數據
                .addSink("SINK1", "topicA", "PROCESS1")
                .addSink("SINK2", "topicB", "PROCESS2")
                .addSink("SINK3", "topicC", "PROCESS3");
 
        // 建立一個KafkaStreams對象,傳入TopologyBuilder和StreamsConfig
        KafkaStreams kafkaStreams = new KafkaStreams(builder, config);
        // 啓動kafkaStreams
        kafkaStreams.start();
    }
}

MyProcessor 實現Processor接口

/**
 * 自定義處理器,實現processor接口
 * 1. 在init方法中作初始化
 * 2. process中接收到key / value pair,對value作處理,最後能夠在裏面作forward。
 * 3. punctuate
 */
public class MyProcessorA implements Processor<String, String> {
 
    private ProcessorContext context;
 
    @Override
    public void init(ProcessorContext processorContext) {
        this.context = processorContext;
        this.context.schedule(1000);
    }
 
    /**
     * @param key 消息的key
     * @param value 消息的value
     */
    @Override
    public void process(String key, String value) {
        String line = value + "MyProcessor A  ----   ";
 
        // 將處理完成的數據轉發到downstream processor,好比當前是processor1處理器,經過forward流向到processor2處理器
        context.forward(key, line);
    }
 
    @Override
    public void punctuate(long timestamp) {
 
    }
 
    @Override
    public void close() {
 
    }
}

 

關於Kafka深刻學習視頻, 如Kafka領導選舉, offset管理, Streams接口, 高性能之道, 監控運維, 性能測試等,

請關注我的微信公衆號: 求學之旅, 發送Kafka, 便可收穫Kafka學習視頻大禮包一枚。 

 

參考文章:

http://blog.csdn.net/mayp1/article/details/51626643
http://blog.csdn.net/ransom0512/article/details/52038548
https://blog.csdn.net/blwinner/article/details/53637932
https://blog.csdn.net/u012373815/article/details/53648757
https://blog.csdn.net/u012373815/article/details/53728101
https://blog.csdn.net/lmh94604/article/details/53187935

官方網站:

http://kafka.apache.org/documentation/streams/https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streamshttps://cwiki.apache.org/confluence/display/KAFKA/Ecosystemhttps://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns

相關文章
相關標籤/搜索