1.入門指南html
1.1簡介java
咱們認爲,一個流處理平臺應該具備三個關鍵能力:git
Kafka擅長哪些方面?github
它被用於兩大類應用:web
想要了解Kafka如何具備這些能力,讓咱們從下往上深刻探索Kafka的能力。正則表達式
首先,明確幾個概念:算法
Kafka有四個核心API:數據庫
Kafka的客戶端和服務器之間的通訊是靠一個簡單的,高性能的,與語言無關的TCP協議完成的。這個協議有不一樣的版本,並保持向後兼容舊版本(向前兼容舊版本?)。Kafka不光提供了一個Java客戶端,還有許多語言版本的客戶端。apache
主題和日誌編程
讓咱們先來了解Kafka的核心抽象概念記錄流 – 主題。
主題是一種分類或發佈的一系列記錄的名義上的名字。Kafka的主題始終是支持多用戶訂閱的; 也就是說,一個主題能夠有零個,一個或多個消費者訂閱寫入的數據。
對於每個主題,Kafka集羣保持一個分區日誌文件,看下圖:
每一個分區是一個有序的,不可變的消息序列,新的消息不斷追加到這個有組織的有保證的日誌上。分區會給每一個消息記錄分配一個順序ID號 – 偏移量, 可以惟一地標識該分區中的每一個記錄。
Kafka集羣保留全部發布的記錄,無論這個記錄有沒有被消費過,Kafka提供可配置的保留策略去刪除舊數據(還有一種策略根據分區大小刪除數據)。例如,若是將保留策略設置爲兩天,在記錄公佈後兩天,它可用於消費,以後它將被丟棄以騰出空間。Kafka的性能跟存儲的數據量的大小無關, 因此將數據存儲很長一段時間是沒有問題的。
事實上,保留在每一個消費者元數據中的最基礎的數據就是消費者正在處理的當前記錄的偏移量(offset)或位置(position)。這種偏移是由消費者控制:一般偏移會隨着消費者讀取記錄線性前進,但事實上,由於其位置是由消費者進行控制,消費者能夠在任何它喜歡的位置讀取記錄。例如,消費者能夠恢復到舊的偏移量對過去的數據再加工或者直接跳到最新的記錄,並消費從「如今」開始的新的記錄。
這些功能的結合意味着,實現Kafka的消費者的代價都是很小的,他們能夠增長或者減小而不會對集羣或其餘消費者有太大影響。例如,你可使用咱們的命令行工具去追隨任何主題,並且不會改變任何現有的消費者消費的記錄。
數據日誌的分區,一舉數得。首先,它們容許數據可以擴展到更多的服務器上去。每一個單獨的分區的大小受到承載它的服務器的限制,但一個話題可能有不少分區,以便它可以支持海量的的數據。其次,更重要的意義是分區是進行並行處理的基礎單元。
分佈式
日誌的分區會跨服務器的分佈在Kafka集羣中,每一個服務器會共享分區進行數據請求的處理。每一個分區能夠配置必定數量的副本分區提供容錯能力。
每一個分區都有一個服務器充當「leader」和零個或多個服務器充當「followers」。 leader處理全部的讀取和寫入分區的請求,而followers被動的從領導者拷貝數據。若是leader失敗了,followers之一將自動成爲新的領導者。每一個服務器可能充當一些分區的leader和其餘分區的follower,這樣的負載就會在集羣內很好的均衡分配。
生產者發佈數據到他們所選擇的主題。生產者負責選擇把記錄分配到主題中的哪一個分區。這可使用輪詢算法( round-robin)進行簡單地平衡負載,也能夠根據一些更復雜的語義分區算法(好比基於記錄一些鍵值)來完成。
消費者以消費羣(consumer group )的名稱來標識本身,每一個發佈到主題的消息都會發送給訂閱了這個主題的消費羣裏面的一個消費者的一個實例。消費者的實例能夠在單獨的進程或單獨的機器上。
若是全部的消費者實例都屬於相同的消費羣,那麼記錄將有效地被均衡到每一個消費者實例。
若是全部的消費者實例有不一樣的消費羣,那麼每一個消息將被廣播到全部的消費者進程。
兩個服務器的Kafka集羣具備四個分區(P0-P3)和兩個消費羣。A消費羣有兩個消費者,B羣有四個。
更常見的是,咱們會發現主題有少許的消費羣,每個都是「邏輯上的訂閱者」。每組都是由不少消費者實例組成,從而實現可擴展性和容錯性。這只不過是發佈 – 訂閱模式的再現,區別是這裏的訂閱者是一組消費者而不是一個單一的進程的消費者。
Kafka消費羣的實現方式是經過分割日誌的分區,分給每一個Consumer實例,使每一個實例在任什麼時候間點的均可以「公平分享」獨佔的分區。維持消費羣中的成員關係的這個過程是經過Kafka動態協議處理。若是新的實例加入該組,他將接管該組的其餘成員的一些分區; 若是一個實例死亡,其分區將被分配到剩餘的實例。
Kafka只保證一個分區內的消息有序,不能保證一個主題的不一樣分區之間的消息有序。分區的消息有序與依靠主鍵進行數據分區的能力相結合足以知足大多數應用的要求。可是,若是你想要保證全部的消息都絕對有序能夠只爲一個主題分配一個分區,雖然這將意味着每一個消費羣同時只能有一個消費進程在消費。
保證
Kafka提供瞭如下一些高級別的保證:
對這些保證的更多細節能夠參考文檔的設計部分。
如何將Kafka的流的概念和傳統的企業信息系統做比較?
消息處理模型從來有兩種:隊列和發佈-訂閱。在隊列模型中,一組消費者能夠從服務器讀取記錄,每一個記錄都會被其中一個消費者處理; 在發佈-訂閱模式裏,記錄被廣播到全部的消費者。這兩種模式都具備必定的優勢和弱點。隊列的優勢是它可讓你把數據分配到多個消費者去處理,它可讓您擴展你的處理能力。不幸的是,隊列不支持多個訂閱者,一旦一個進程讀取了數據,這個數據就會消失。發佈-訂閱模式可讓你廣播數據到多個進程,可是由於每個消息發送到每一個訂閱者,沒辦法對訂閱者處理能力進行擴展。
Kafka的消費羣的推廣了這兩個概念。消費羣能夠像隊列同樣讓消息被一組進程處理(消費羣的成員),與發佈 – 訂閱模式同樣,Kafka可讓你發送廣播消息到多個消費羣。
Kafka的模型的優勢是,每一個主題都具備這兩個屬性,它能夠擴展處理能力,也能夠實現多個訂閱者,沒有必要二選一。
Kafka比傳統的消息系統具備更強的消息順序保證的能力。
傳統的消息隊列的消息在隊列中是有序的,多個消費者從隊列中消費消息,服務器按照存儲的順序派發消息。然而,儘管服務器是按照順序派發消息,可是這些消息記錄被異步傳遞給消費者,消費者接收到的消息也許已是亂序的了。這實際上意味着消息的排序在並行消費中都將丟失。消息系統一般靠 「排他性消費」( exclusive consumer)來解決這個問題,只容許一個進程從隊列中消費,固然,這意味着沒有並行處理的能力。
Kafka作的更好。經過一個概念:並行性-分區-主題實現主題內的並行處理,Kafka是可以經過一組消費者的進程同時提供排序保證和負載均衡。每一個主題的分區指定給每一個消費羣中的一個消費者,使每一個分區只由該組中的一個消費者所消費。經過這樣作,咱們確保消費者是一個分區惟一的讀者,從而順序的消費數據。由於有許多的分區,因此負載還可以均衡的分配到不少的消費者實例上去。可是請注意,一個消費羣的消費者實例不能比分區數量多。
Kafka做爲存儲系統
任何消息隊列都可以解耦消息的生產和消費,還可以有效地存儲正在傳送的消息。Kafka不同凡響的是,它是一個很是好的存儲系統。
Kafka把消息數據寫到磁盤和備份分區。Kafka容許生產者等待返回確認,直到副本複製和持久化所有完成才認爲成功,不然則認爲寫入服務器失敗。
Kafka使用的磁盤結構很好擴展,Kafka將執行相同的策略無論你是有50 KB或50TB的持久化數據。
因爲存儲的重要性,並容許客戶控制本身的讀取位置,你能夠把Kafka認爲是一種特殊用途的分佈式文件系統,致力於高性能,低延遲的有保障的日誌存儲,可以備份和自我複製。
Kafka流處理
只是讀,寫,以及儲存數據流是不夠的,目的是可以實時處理數據流。
在Kafka中,流處理器是從輸入的主題連續的獲取數據流,而後對輸入進行一系列的處理,並生產連續的數據流到輸出主題。
例如,零售應用程序可能須要輸入銷售和出貨量,根據輸入數據計算出從新訂購的數量和調整後的價格,而後輸出到主題。
這些簡單處理能夠直接使用生產者和消費者的API作到。然而,對於更復雜的轉換Kafka提供了一個徹底集成的流API。這容許應用程序把一些重要的計算過程從流中剝離或者加入流一塊兒。
這種設施可幫助解決這類應用面臨的難題:處理雜亂的數據,改變代碼去從新處理輸入,執行有狀態的計算等
流API創建在Kafka提供的核心基礎單元之上:它使用生產者和消費者的API進行輸入輸出,使用Kafka存儲有狀態的數據,並使用羣組機制在一組流處理實例中實現容錯。
把功能組合起來
消息的傳輸,存儲和流處理的組合看似不尋常倒是Kafka做爲流處理平臺的關鍵。
像HDFS分佈式文件系統,容許存儲靜態文件進行批量處理。像這樣的系統容許存儲和處理過去的歷史數據。
傳統的企業消息系統容許處理您訂閱後才抵達的消息。這樣的系統只能處理未來到達的數據。
Kafka結合了這些功能,這種結合對Kafka做爲流應用平臺以及數據流處理的管道相當重要。
經過整合存儲和低延遲訂閱,流處理應用能夠把過去和將來的數據用相同的方式處理。這樣一個單獨的應用程序,不但能夠處理歷史的,保存的數據,當它到達最後一條記錄不會中止,繼續等待處理將來到達的數據。這是泛化了的的流處理的概念,包括了批處理應用以及消息驅動的應用。
一樣,流數據處理的管道結合實時事件的訂閱令人們可以用Kafka實現低延遲的管道; 可靠的存儲數據的能力令人們有可能使用它傳輸一些重要的必須保證可達的數據。能夠與一個按期加載數據的線下系統集成,或者與一個由於維護長時間下線的系統集成。流處理的組件可以保證轉換(處理)到達的數據。
有關Kafka提供的保證,API和功能的更多信息,看其他文件。
下面描述了一些使用Apache Kafka™的流行用例。更多的關於這些領域實踐的概述,參考這個博客。
Kafka可以很好的替代傳統的消息中間件。消息中間件因爲各類緣由被使用(解耦數據的生產和消費,緩衝未處理的消息等)。相較於大多數消息處理系統,Kafka有更好的吞吐量,內置分區,副本複製和容錯性,使其成爲大規模消息處理應用的理想解決方案。
根據咱們的經驗消息的使用一般具備相對低的吞吐量,但可能須要端到端的低延遲,以及高可靠性的保證,這種低延遲和可靠性的保證偏偏是Kafka可以提供的。
在這一領域Kafka是可以和傳統的消息系統相媲美的,例如ActiveMQ或 RabbitMQ。
最初的用例是用Kafka重建一個用戶活動跟蹤管道使之做爲一組實時發佈 – 訂閱的數據源。這意味着網站活動(網頁瀏覽,搜索,或其餘可能的操做)被看成一組中心主題發佈,每種活動被看成一個主題。這些數據源(feeds)可被一系列的應用訂閱,包括實時處理,實時監測,加載到Hadoop系統或離線數據倉庫系統進行離線處理和報告。
活動追蹤一般會產生巨大的數據量,由於每一個用戶頁面的瀏覽都會產生不少的活動消息。
Kafka一般用於監測數據的處理。這涉及從分佈式應用程序彙集統計數據,生產出集中的運行數據源feeds(以便訂閱)。
許多人用Kafka做爲日誌聚合解決方案的替代品。日誌聚合一般從服務器收集物理日誌文件,並把它們放在一個集中的地方(文件服務器或HDFS)進行處理。Kafka抽象了文件的詳細信息,把日誌或事件數據的簡潔抽象做爲消息流傳輸。這爲低時延的處理提供支持,並且更容易支持多個數據源和分佈式的數據消費。相比集中式的日誌處理系統,Scribe or Flume,Kafka提供一樣良好的性能,並且由於副本備份提供了更強的可靠性保證和更低的端到端延遲。
Kafka的流數據管道在處理數據的時候包含多個階段,其中原始輸入數據從Kafka主題被消費而後彙總,加工,或轉化成新主題用於進一步的消費或後續處理。例如,用於推薦新聞文章的數據流處理管道可能從RSS源抓取文章內容,並將其發佈到「文章」主題; 進一步的處理多是標準化或刪除重複數據,而後發佈處理過的文章內容到一個新的話題; 最後的處理階段可能會嘗試推薦這個內容給用戶。這樣的數據流處理管道基於各個主題建立了實時數據數據流程圖。從版本0.10.0.0開始,Apache Kafka加入了輕量級的但功能強大的流處理庫Kafka Streams ,Kafka Streams支持如上所述的數據處理。除了Kafka Streams,能夠選擇的開源流處理工具包括 Apache Storm and Apache Samza.
Event sourcing 是一種應用程序設計風格,是按照時間順序記錄的狀態變化的序列。Kafka的很是強大的存儲日誌數據的能力使它成爲構建這種應用程序的極好的後端選擇。
Kafka能夠爲分佈式系統提供一種外部提交日誌(commit-log)服務。日誌有助於節點之間複製數據,並做爲一種數據從新同步機制用來恢復故障節點的數據。Kafka的log compaction 功能有助於支持這種用法。Kafka在這種用法中相似於Apache BookKeeper 項目。
本教程假設你從零開始,沒有現成的Kafka或ZooKeeper數據。因爲Kafka控制檯腳本在Unix基礎的和Windows平臺上的不一樣,在Windows平臺上使用bin\windows\,而不是bin/,並修改腳本擴展爲.bat。
下載0.10.2.0釋放和un-tar它。
> tar -xzf kafka_2.11-0.10.2.0.tgz > cd kafka_2.11-0.10.2.0
Kafka使用ZooKeeper的,因此你須要先啓動ZooKeeper的服務器,若是你尚未,您可使用Kafka包裝裏的方便腳原本獲得一個快速和污染的單節點的ZooKeeper實例。
> bin/zookeeper-server-start.sh config/zookeeper.properties [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ...
如今啓動Kafka服務器:
> bin/kafka-server-start.sh config/server.properties [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) ...
讓咱們建立一個名爲「test」主題,只有一個分區,只有一個副本:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
如今咱們能夠看到,若是咱們運行的列表主題命令話題:
> bin/kafka-topics.sh --list --zookeeper localhost:2181 test
除了手動建立主題,你還能夠配置你的代理服務器(broker),當一個不存在的主題被髮布的時候它能自動建立相應的主題。
Kafka帶有一個命令行客戶端,獲取從文件或來自標準輸入的輸入,並做爲消息發送到Kafka集羣。默認狀況下,每一行將被做爲單獨的消息發送。
運行生產者腳本,而後輸入一些信息到控制檯發送到服務器。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
Kafka也有一個命令行消費者,將收到的消息輸出到標準輸出。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message
若是你在不一樣的終端上運行上面的命令,那麼你如今應該能看到從生產者終端輸入的消息會出如今消費者終端。
全部的命令行工具都有其餘選項; 不帶參數運行命令將顯示更加詳細的使用信息。
到目前爲止,咱們已經運行了單個代理的服務器,可是這沒有樂趣。對於Kafka,一個代理是隻有一個單節點的集羣,所以多代理集羣只是比開始多了一些代理實例外,沒有什麼太大的變化。但只是爲了感覺一下,咱們的集羣擴展到三個節點(全部的節點仍是在本地機器上)。
首先,咱們爲每一個經紀人作一個配置文件(在Windows上使用copy命令來代替):
> cp config/server.properties config/server-1.properties > cp config/server.properties config/server-2.properties
如今,編輯這些新文件和設置如下屬性:
config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2
該broker.id屬性是集羣中的每一個節點的惟一和永久的名字。咱們要重寫端口和日誌目錄,由於咱們都在同一臺機器上運行這些代理,咱們要防止經紀人在同一端口上註冊或覆蓋彼此的數據。
咱們已經有Zookeeper服務和咱們的單個節點服務,因此咱們只須要啓動兩個新節點:
> bin/kafka-server-start.sh config/server-1.properties & ... > bin/kafka-server-start.sh config/server-2.properties & ...
如今,建立一個新的具備三個的副本因子的主題:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
好了,如今咱們有一個集羣,可是如何才能知道哪一個代理節點在作什麼?要查看運行「describe topics」命令:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
下面是輸出的解釋。第一行給出了全部分區的摘要,每一個附加的行提供了一個分區的信息。因爲咱們只有一個分區,因此這個主題只有一行。
請注意,在個人例子節點1是該主題的惟一分區中的leader。
咱們能夠運行相同的命令看看咱們建立原來的話題的狀態:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
因此絕不奇怪,原來的話題沒有副本,只有咱們建立它時的惟一的服務器0。
讓咱們發佈一些消息到咱們新的話題:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic ... my test message 1 my test message 2 ^C
如今讓咱們來消費這些消息:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ... my test message 1 my test message 2 ^C
如今,讓咱們測試容錯性。代理1是領導者,讓咱們殺死它:
> ps aux | grep server-1.properties 7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java... > kill -9 7564
在Windows上使用:
> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties" java.exe java -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.10-0.10.2.0.jar" kafka.Kafka config\server-1.properties 644 > taskkill /pid 644 /f
領導權已經切換到備機中的一個節點上去了,節點1再也不在同步中的副本集(in-sync replica set)中:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
但消息仍然是可用於消費,即便是原來負責寫任務的領導者已經不在了:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ... my test message 1 my test message 2 ^C
從控制檯寫入數據和寫回控制檯是一個很方便入門的例子,但你可能想用Kafka使用其餘來源的數據或導出Kafka的數據到其餘系統。相對於許多系統須要編寫定製集成的代碼,您可使用Kafka鏈接到系統去導入或導出數據。
Kafka Connect是包括在Kafka中一個工具,用來導入導出數據到Kafka。它是connectors的一個可擴展工具,其執行定製邏輯,用於與外部系統交互。在這個快速入門,咱們將看到如何使用Kafka Connect作一些簡單的鏈接器從一個文件導入數據到Kafka的主題,和將主題數據導出到一個文件。
首先,咱們須要建立一些原始數據來開始測試:
> echo -e "foo\nbar" > test.txt
接下來,咱們將啓動兩個運行在獨立模式的鏈接器,這意味着他們在一個單一的,局部的,專用的進程中運行。咱們提供三個配置文件做爲參數。第一始終是Kafka鏈接過程當中的公共配置,如要鏈接到的Kafka的代理服務器的配置和數據的序列化格式的配置。剩餘的每一個配置文件用來建立指定的鏈接器。這些文件包括一個惟一的鏈接器名稱,須要實例化的鏈接器類,還有建立該鏈接器所需的其餘配置。
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
用這些Kafka的示例配置文件,使用前面已經啓動的本地羣集的默認配置,創建兩個鏈接器:第一是一個源鏈接器,其從輸入文件中讀取每行的內容,發佈到的Kafka主題和第二個是一個sink鏈接器負責從Kafka主題讀取消息,生產出的消息按行輸出到文件。
在啓動過程當中,你會看到一些日誌信息,包括一些代表該鏈接器被實例化的信息。一旦Kafka Connect進程已經開始,源鏈接器應該開始從test.txt讀取每行的消息,並將其生產發佈到主題connect-test,而sink鏈接器應該從主題connect-test讀取消息,並將其寫入文件test.sink.txt。咱們能夠經過檢查輸出文件的內容來驗證數據都已經過整個管道輸送:
> cat test.sink.txt foo bar
請注意,數據被存儲在Kafka主題的connect-test中,因此咱們也能夠運行控制檯消費者消費主題中的數據(或使用定製的消費者代碼來處理它):
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning {"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"} ...
鏈接器不停的處理數據,所以咱們能夠將數據添加到該文件,並能看到數據經過管道移動:
> echo "Another line" >> test.txt
您應該看到一行消息出如今控制檯消費者的控制檯和sink文件中。
Kafka Streams 是Kafka的客戶端庫, 用來作實時流處理和分析存儲在Kafka代理服務器的數據。該快速入門例子將演示如何運行這個流應用庫。這裏是要點WordCountDemo的示例代碼(轉換爲方便閱讀的Java 8 lambda表達式)。
// Serializers/deserializers (serde) for String and Long types final Serde<String> stringSerde = Serdes.String(); final Serde<Long> longSerde = Serdes.Long(); // Construct a `KStream` from the input topic ""streams-file-input", where message values // represent lines of text (for the sake of this example, we ignore whatever may be stored // in the message keys). KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-file-input"); KTable<String, Long> wordCounts = textLines // Split each text line, by whitespace, into words. .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) // Group the text words as message keys .groupBy((key, value) -> value) // Count the occurrences of each word (message key). .count("Counts") // Store the running counts as a changelog stream to the output topic. wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");
它實現了單詞計數算法,計算輸入文本中一個單詞的出現次數。然而,與其餘單詞計數的算法不一樣,其餘的算法通常都是對有界數據進行操做,該算法演示應用程序的表現略有不一樣,由於他能夠被設計去操做無限的,無界的流數據。和操做有界數據的算法類似,它是一個有狀態的算法,能夠跟蹤和更新單詞的計數。然而,由於它必須承擔潛在的無界輸入數據的處理,它會週期性地輸出其當前狀態和結果,同時繼續處理更多的數據,由於它沒法知道他有沒有處理完「全部」的輸入數據。
做爲第一步驟,咱們將準備好輸入到Kafka主題的數據,隨後由Kafka Streams應用程序進行處理。
> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
或在Windows上:
> echo all streams lead to kafka> file-input.txt > echo hello kafka streams>> file-input.txt > echo|set /p=join kafka summit>> file-input.txt
接下來,咱們使用控制檯生產者把輸入的數據發送到主題名streams-file-input 的主題上,其內容從STDIN一行一行的讀取,並一行一行的發佈到主題,每一行的消息都有一個空鍵和編碼後的字符串(在實踐中,當應用程序將啓動並運行後,流數據極可能會持續流入Kafka):
> bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 1 \ --topic streams-file-input
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
如今,咱們能夠運行單詞計數應用程序來處理輸入數據:
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
演示應用程序將從輸入主題streams-file-input讀取數據,對讀取的消息的執行單詞計數算法,而且持續寫入其當前結果到輸出主題streams-wordcount-output。所以,除了寫回Kafka的日誌條目,不會有任何的STDOUT輸出。該演示將運行幾秒鐘,與典型的流處理應用不一樣,演示程序會自動終止。
如今,咱們經過讀取輸出主題的輸出獲得單詞計數演示程序的結果:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key=true \ --property print.value=true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
下面的數據會被輸出到控制檯:
all 1 lead 1 to 1 hello 1 streams 2 join 1 kafka 3 summit 1
這裏,第一列是java.lang.String類型的消息健,而第二列是java.lang.Long型消息值。注意,這裏的輸出實際上是數據更新的連續流,每一個數據記錄(上面的例子裏的每行的輸出)都有一個單詞更新後的數目值,例如「Kafka」做爲鍵的記錄。對於具備相同鍵的多個記錄,每一個後面的記錄都是前一個記錄的更新。
下面的兩個圖說明什麼發生在幕後的過程。第一列顯示的當前狀態的變化,用KTable<String, Long>來統計單詞出現的數目。第二列顯示KTable狀態更新致使的發生變化的記錄,這個變化的記錄被髮送到輸出Kafka主題streams-wordcount-output。
首先, 「all streams lead to kafka」這樣一行文本正在被處理。當新的單詞被處理的時候,KTable會增長一個新的表項(以綠色背景高亮顯示),並有相應的變化記錄發送到下游KStream。
當第二行「hello kafka streams」被處理的時候,咱們觀察到,現有的KTable中的表項第一次被更新(這裏: 單詞 「kafka」 和 「streams」)。再次,改變的記錄被髮送到輸出話題。
以此類推(咱們跳過的第三行是如何被處理的插圖)。這就解釋了爲何輸出主題有咱們上面例子顯示的內容,由於它包含了完整的更改記錄。
跳出這個具體的例子咱們從總體去看, Kafka流利用表和日誌變化(changelog)流之間的二元性(here: 表= the KTable, 日誌變化流 = the downstream KStream):你能夠發佈的每個表的變化去一個流,若是你從開始到結束消費了整個的日誌變化(changelog)流,你能夠重建表的內容。
如今,你能夠寫更多的輸入信息到streams-file-input主題,並觀察更多的信息加入到了 streams-wordcount-output主題,反映了更新後的單詞數目(例如,使用上述的控制檯生產者和控制檯消費者)。
您能夠經過Ctrl-C 中止控制檯消費者。
除了Kafka的主要版本以外,還有不少應用集成了Kafka工具。該生態系統頁面中列出的許多工具,包括流處理系統,Hadoop的集成,監控和部署工具。
從0.8.4,0.9.x,0.10.0.x或0.10.1.x升級到0.10.2.0
0.10.2.0的有線協議有變化。經過下面的推薦滾動升級計劃,你能保證在升級過程當中無需停機。可是,請在升級以前查看0.10.2.0版本顯著的變化。
從0.10.2版本開始,Java客戶端(生產者和消費者)已得到與舊版本代理服務器溝通的能力。版本0.10.2客戶能夠跟0.10.0版或更新版本的代理溝通。可是,若是你的代理比0.10.0老,你必須在升級客戶端以前升級Kafka集羣中的全部代理服務器(Broker)。版本0.10.2代理支持0.8.x和更新的客戶端。
對於滾動升級:
注意:若是你願意接受宕機,你能夠簡單地把全部的代理服務器關閉,更新代碼,而後從新啓動他們。他們將默認使用新的協議。
注:改變協議版本並從新啓動能夠在代理服務器升級以後的任什麼時候間作,沒有必要必須馬上就作。
從0.8.4,0.9.x版本或0.10.0.X升級到0.10.1.0
0.10.1.0有線協議發生了變化。經過下面的推薦滾動升級計劃,能保證在升級過程當中無需停機。可是,請注意在升級以前仔細閱讀0.10.1.0潛在的重大更改。
注意:因爲新協議的引入,它是升級你的客戶端以前請先完成Kafka集羣的升級(即0.10.1.x客戶端僅支持0.10.1.x或更高版本的代理,但0.10.1.x的代理能夠支持舊版本客戶端)。
對於滾動升級:
注意:若是你願意接受宕機,你能夠簡單地把全部的代理服務器關閉,更新代碼,而後從新啓動他們。他們將默認使用新的協議。
注:改變協議版本並從新啓動能夠在代理服務器升級以後的任什麼時候間作,沒有必要必須馬上就作。
0.10.0.0具備的潛在的重大更改(請在升級前仔細檢查更改)和 在升級後的性能影響。經過下面的推薦滾動升級計劃,能保證不宕機,不影響性能和隨後的升級。
注意:因爲新協議的引入,升級客戶端以前升級您的Kafka集羣是很重要的。
注意0.9.0.0版本的客戶端:因爲0.9.0.0引入了一個錯誤,即依賴於ZooKeeper的客戶(老Scala高層次消費者和與老消費者一塊兒使用的MirrorMaker)不能和0.10.0.x代理一塊兒工做。所以,在代理都升級到0.10.0.x以前, 0.9.0.0客戶端應升級到0.9.0.1 . 這一步對0.8.4或0.9.0.1客戶端沒有必要。
對於滾動升級:
注意:若是你願意接受宕機,你能夠簡單地把全部的代理服務器關閉,更新代碼,而後從新啓動他們。他們將默認使用新的協議。
注:改變協議版本並從新啓動能夠在代理服務器升級以後的任什麼時候間作,沒有必要必須馬上就作。
0.10.0消息格式包括一個新的時間戳字段,並對壓縮的消息使用相對偏移。磁盤上的消息格式能夠經過在server.properties文件的log.message.format.version進行配置。默認的磁盤上的消息格式爲0.10.0。若是消費者客戶端的版本是0.10.0.0以前的版本,那它只能明白0.10.0以前的消息格式。在這種狀況下,代理可以把消息從0.10.0格式轉換到一個較早的格式再發送舊版本的響應給消費者。然而,代理不能在這種狀況下使用零拷貝轉移。Kafka社區報告顯示性能的影響爲CPU利用率從20%增長至將近100%,這迫使全部客戶端的必須即時升級使性能恢復正常。爲了不這樣的消息轉換帶來的性能問題,消費者升級到0.10.0.0以前,在升級代理到0.10.0.0的過程當中設置log.message.format.version到0.8.2或0.9.0。這樣一來,代理仍然可使用零拷貝傳輸,將數據發送到老消費者。一旦消費者升級完成,消息格式更改成0.10.0,這樣代理就能夠享受新的消息格式包括新的時間戳和改進的壓縮算法。這種轉換能夠支持兼容性,對只有幾個尚未更新到最新客戶端的應用程序很是有用,但不切實際的是使用一個過分使用的集羣中去支持全部消費者的流量。所以,當代理已經升級,但大多數客戶端尚未完成升級的狀況,要儘量避免使用這種信息轉換。
對於升級到0.10.0.0客戶,沒有性能影響。
注:設置消息格式版本是一個證實,現有的全部支持的消息都在這個版本或低於該消息格式的版本。不然, 0.10.0.0以前的消費者可能不能正常工做。特別是消息格式設置爲0.10.0以後,不該該再改回先前的格式,由於它可能使得0.10.0.0以前的消費者工做異常。
注:因爲每一個消息中引入了另外的時間戳,生產者發送的消息大小比較小的時候由於額外的負載開銷也許會看到吞吐量的降低。一樣,副本的複製會讓每一個消息額外傳輸8個字節。若是你正在運行接近集羣承載能力的網絡容量,你可能會壓垮網卡,因爲超載而發生故障和性能問題。
注:若是您已對生產者啓用壓縮算法,您可能會注意到下降的生產者吞吐量和/或在某些狀況下代理下降的壓縮比。當接收到壓縮的消息,0.10.0代理避免再次壓縮消息,其一般下降了等待時間,並提升了吞吐量。在某些狀況下,這可能會減小生產者批量消息包的大小,這可能致使更糟糕的吞吐量。若是發生這種狀況,用戶能夠調整生產者的linger.ms和batch.size以得到更好的吞吐量。此外,用於高效壓縮消息的生產者緩衝區比代理使用的緩衝區小,這可能對磁盤的壓縮消息比率有負面的影響。咱們打算在將來的Kafka版本中可以配置這些參數。
升級0.8.0,0.8.1.X或0.8.2.X到0.9.0.0
0.9.0.0具備的潛在的重大更改(請在升級前檢查),還有之前的版本到如今的代理間協議的變化。這意味着升級的代理和客戶端可能不兼容舊版本。您在升級您的客戶端以前升級Kafka集羣是很重要的。若是您正在使用MirrorMaker下游集羣應該先升級爲好。
對於滾動升級:
注意:若是你願意接受宕機,你能夠簡單地把全部的代理服務器關閉,更新代碼,而後從新啓動他們。他們將默認使用新的協議。
注:改變協議版本並從新啓動能夠在代理服務器升級以後的任什麼時候間作,沒有必要必須馬上就作。
0.9.0.0棄用的功能
0.8.2與0.8.1徹底兼容。能夠經過簡單地將其關閉,更新代碼,並從新啓動逐一升級代理。
0.8.1與0.8徹底兼容。能夠經過簡單地將其關閉,更新代碼,並從新啓動逐一升級代理。從0.7升級
0.7版本與新版本不兼容。API,Zookeeper的數據結構和協議,能夠配置的增長副本(這是在0.7沒有的),都發生了重大變化。從0.7到更高版本的升級須要特殊的工具進行遷移。這種遷移能夠無需宕機就能夠完成。