kafka

《KAFKA官方文檔》入門指南

1.入門指南html

1.1簡介java

Apache的Kafka™是一個分佈式流平臺(a distributed streaming platform)。這到底意味着什麼?

咱們認爲,一個流處理平臺應該具備三個關鍵能力:git

  1. 它可讓你發佈和訂閱記錄流。在這方面,它相似於一個消息隊列或企業消息系統。
  2. 它可讓你持久化收到的記錄流,從而具備容錯能力。
  3. 它可讓你處理收到的記錄流。

 

Kafka擅長哪些方面?github

它被用於兩大類應用:web

  1. 創建實時流數據管道從而可以可靠地在系統或應用程序之間的共享數據
  2. 構建實時流應用程序,可以變換或者對數據
  3. 進行相應的處理。

想要了解Kafka如何具備這些能力,讓咱們從下往上深刻探索Kafka的能力。正則表達式

首先,明確幾個概念:算法

  • Kafka是運行在一個或多個服務器的集羣(Cluster)上的。
  • Kafka集羣分類存儲的記錄流被稱爲主題(Topics)。
  • 每一個消息記錄包含一個鍵,一個值和時間戳。

Kafka有四個核心API:數據庫

  • 生產者 API 容許應用程序發佈記錄流至一個或多個Kafka的話題(Topics)。
  • 消費者API容許應用程序訂閱一個或多個主題,並處理這些主題接收到的記錄流。
  • Streams API容許應用程序充當流處理器(stream processor,從一個或多個主題獲取輸入流,並生產一個輸出流至一個或多個的主題,可以有效地變換輸入流爲輸出流。
  • Connector API容許構建和運行可重用的生產者或消費者,可以把 Kafka主題鏈接到現有的應用程序或數據系統。例如,一個鏈接到關係數據庫的鏈接器(connector)可能會獲取每一個表的變化。

 

Kafka的客戶端和服務器之間的通訊是靠一個簡單的,高性能的,與語言無關的TCP協議完成的。這個協議有不一樣的版本,並保持向後兼容舊版本(向前兼容舊版本?)。Kafka不光提供了一個Java客戶端,還有許多語言版本的客戶端。apache

主題和日誌編程

讓咱們先來了解Kafka的核心抽象概念記錄流 – 主題。

主題是一種分類或發佈的一系列記錄的名義上的名字。Kafka的主題始終是支持多用戶訂閱的; 也就是說,一個主題能夠有零個,一個或多個消費者訂閱寫入的數據。

對於每個主題,Kafka集羣保持一個分區日誌文件,看下圖:

每一個分區是一個有序的,不可變的消息序列,新的消息不斷追加到這個有組織的有保證的日誌上。分區會給每一個消息記錄分配一個順序ID號 – 偏移量, 可以惟一地標識該分區中的每一個記錄。

Kafka集羣保留全部發布的記錄,無論這個記錄有沒有被消費過,Kafka提供可配置的保留策略去刪除舊數據(還有一種策略根據分區大小刪除數據)。例如,若是將保留策略設置爲兩天,在記錄公佈後兩天,它可用於消費,以後它將被丟棄以騰出空間。Kafka的性能跟存儲的數據量的大小無關, 因此將數據存儲很長一段時間是沒有問題的。

事實上,保留在每一個消費者元數據中的最基礎的數據就是消費者正在處理的當前記錄的偏移量(offset)或位置(position)。這種偏移是由消費者控制:一般偏移會隨着消費者讀取記錄線性前進,但事實上,由於其位置是由消費者進行控制,消費者能夠在任何它喜歡的位置讀取記錄。例如,消費者能夠恢復到舊的偏移量對過去的數據再加工或者直接跳到最新的記錄,並消費從「如今」開始的新的記錄。

這些功能的結合意味着,實現Kafka的消費者的代價都是很小的,他們能夠增長或者減小而不會對集羣或其餘消費者有太大影響。例如,你可使用咱們的命令行工具去追隨任何主題,並且不會改變任何現有的消費者消費的記錄。

數據日誌的分區,一舉數得。首先,它們容許數據可以擴展到更多的服務器上去。每一個單獨的分區的大小受到承載它的服務器的限制,但一個話題可能有不少分區,以便它可以支持海量的的數據。其次,更重要的意義是分區是進行並行處理的基礎單元。

分佈式

日誌的分區會跨服務器的分佈在Kafka集羣中,每一個服務器會共享分區進行數據請求的處理。每一個分區能夠配置必定數量的副本分區提供容錯能力。

每一個分區都有一個服務器充當「leader」和零個或多個服務器充當「followers」。 leader處理全部的讀取和寫入分區的請求,而followers被動的從領導者拷貝數據。若是leader失敗了,followers之一將自動成爲新的領導者。每一個服務器可能充當一些分區的leader和其餘分區的follower,這樣的負載就會在集羣內很好的均衡分配。

生產者

生產者發佈數據到他們所選擇的主題。生產者負責選擇把記錄分配到主題中的哪一個分區。這可使用輪詢算法( round-robin)進行簡單地平衡負載,也能夠根據一些更復雜的語義分區算法(好比基於記錄一些鍵值)來完成。

消費者

消費者以消費羣(consumer group 的名稱來標識本身,每一個發佈到主題的消息都會發送給訂閱了這個主題的消費羣裏面的一個消費者的一個實例。消費者的實例能夠在單獨的進程或單獨的機器上。

若是全部的消費者實例都屬於相同的消費羣,那麼記錄將有效地被均衡到每一個消費者實例。

若是全部的消費者實例有不一樣的消費羣,那麼每一個消息將被廣播到全部的消費者進程。

兩個服務器的Kafka集羣具備四個分區(P0-P3)和兩個消費羣。A消費羣有兩個消費者,B羣有四個。

更常見的是,咱們會發現主題有少許的消費羣,每個都是「邏輯上的訂閱者」。每組都是由不少消費者實例組成,從而實現可擴展性和容錯性。這只不過是發佈 – 訂閱模式的再現,區別是這裏的訂閱者是一組消費者而不是一個單一的進程的消費者。

Kafka消費羣的實現方式是經過分割日誌的分區,分給每一個Consumer實例,使每一個實例在任什麼時候間點的均可以「公平分享」獨佔的分區。維持消費羣中的成員關係的這個過程是經過Kafka動態協議處理。若是新的實例加入該組,他將接管該組的其餘成員的一些分區; 若是一個實例死亡,其分區將被分配到剩餘的實例。

 

Kafka只保證一個分區內的消息有序,不能保證一個主題的不一樣分區之間的消息有序。分區的消息有序與依靠主鍵進行數據分區的能力相結合足以知足大多數應用的要求。可是,若是你想要保證全部的消息都絕對有序能夠只爲一個主題分配一個分區,雖然這將意味着每一個消費羣同時只能有一個消費進程在消費。

保證

Kafka提供瞭如下一些高級別的保證:

  • 由生產者發送到一個特定的主題分區的消息將被以他們被髮送的順序來追加。也就是說,若是一個消息M1和消息M2都來自同一個生產者,M1先發,那麼M1將有一個低於M2的偏移,會更早在日誌中出現。
  • 消費者看到的記錄排序就是記錄被存儲在日誌中的順序。
  • 對於副本因子N的主題,咱們將承受最多N-1次服務器故障切換而不會損失任何的已經保存的記錄。

對這些保證的更多細節能夠參考文檔的設計部分。

Kafka做爲消息系統

如何將Kafka的流的概念和傳統的企業信息系統做比較?

消息處理模型從來有兩種:隊列發佈-訂閱。在隊列模型中,一組消費者能夠從服務器讀取記錄,每一個記錄都會被其中一個消費者處理; 在發佈-訂閱模式裏,記錄被廣播到全部的消費者。這兩種模式都具備必定的優勢和弱點。隊列的優勢是它可讓你把數據分配到多個消費者去處理,它可讓您擴展你的處理能力。不幸的是,隊列不支持多個訂閱者,一旦一個進程讀取了數據,這個數據就會消失。發佈-訂閱模式可讓你廣播數據到多個進程,可是由於每個消息發送到每一個訂閱者,沒辦法對訂閱者處理能力進行擴展。

Kafka的消費羣的推廣了這兩個概念。消費羣能夠像隊列同樣讓消息被一組進程處理(消費羣的成員),與發佈 – 訂閱模式同樣,Kafka可讓你發送廣播消息到多個消費羣。

 

Kafka的模型的優勢是,每一個主題都具備這兩個屬性,它能夠擴展處理能力,也能夠實現多個訂閱者,沒有必要二選一。

Kafka比傳統的消息系統具備更強的消息順序保證的能力。

傳統的消息隊列的消息在隊列中是有序的,多個消費者從隊列中消費消息,服務器按照存儲的順序派發消息。然而,儘管服務器是按照順序派發消息,可是這些消息記錄被異步傳遞給消費者,消費者接收到的消息也許已是亂序的了。這實際上意味着消息的排序在並行消費中都將丟失。消息系統一般靠 「排他性消費」( exclusive consumer)來解決這個問題,只容許一個進程從隊列中消費,固然,這意味着沒有並行處理的能力。

Kafka作的更好。經過一個概念:並行性-分區-主題實現主題內的並行處理,Kafka是可以經過一組消費者的進程同時提供排序保證和負載均衡。每一個主題的分區指定給每一個消費羣中的一個消費者,使每一個分區只由該組中的一個消費者所消費。經過這樣作,咱們確保消費者是一個分區惟一的讀者,從而順序的消費數據。由於有許多的分區,因此負載還可以均衡的分配到不少的消費者實例上去。可是請注意,一個消費羣的消費者實例不能比分區數量多。

Kafka做爲存儲系統

任何消息隊列都可以解耦消息的生產和消費,還可以有效地存儲正在傳送的消息。Kafka不同凡響的是,它是一個很是好的存儲系統。

Kafka把消息數據寫到磁盤和備份分區。Kafka容許生產者等待返回確認,直到副本複製和持久化所有完成才認爲成功,不然則認爲寫入服務器失敗。

Kafka使用的磁盤結構很好擴展,Kafka將執行相同的策略無論你是有50 KB或50TB的持久化數據。

 

因爲存儲的重要性,並容許客戶控制本身的讀取位置,你能夠把Kafka認爲是一種特殊用途的分佈式文件系統,致力於高性能,低延遲的有保障的日誌存儲,可以備份和自我複製。

Kafka流處理

只是讀,寫,以及儲存數據流是不夠的,目的是可以實時處理數據流。

在Kafka中,流處理器是從輸入的主題連續的獲取數據流,而後對輸入進行一系列的處理,並生產連續的數據流到輸出主題。

例如,零售應用程序可能須要輸入銷售和出貨量,根據輸入數據計算出從新訂購的數量和調整後的價格,而後輸出到主題。

這些簡單處理能夠直接使用生產者和消費者的API作到。然而,對於更復雜的轉換Kafka提供了一個徹底集成的流API。這容許應用程序把一些重要的計算過程從流中剝離或者加入流一塊兒。

這種設施可幫助解決這類應用面臨的難題:處理雜亂的數據,改變代碼去從新處理輸入,執行有狀態的計算等

流API創建在Kafka提供的核心基礎單元之上:它使用生產者和消費者的API進行輸入輸出,使用Kafka存儲有狀態的數據,並使用羣組機制在一組流處理實例中實現容錯。

把功能組合起來

消息的傳輸,存儲和流處理的組合看似不尋常倒是Kafka做爲流處理平臺的關鍵。

像HDFS分佈式文件系統,容許存儲靜態文件進行批量處理。像這樣的系統容許存儲和處理過去的歷史數據

傳統的企業消息系統容許處理您訂閱後才抵達的消息。這樣的系統只能處理未來到達的數據。

Kafka結合了這些功能,這種結合對Kafka做爲流應用平臺以及數據流處理的管道相當重要。

經過整合存儲和低延遲訂閱,流處理應用能夠把過去和將來的數據用相同的方式處理。這樣一個單獨的應用程序,不但能夠處理歷史的,保存的數據,當它到達最後一條記錄不會中止,繼續等待處理將來到達的數據。這是泛化了的的流處理的概念,包括了批處理應用以及消息驅動的應用。

一樣,流數據處理的管道結合實時事件的訂閱令人們可以用Kafka實現低延遲的管道; 可靠的存儲數據的能力令人們有可能使用它傳輸一些重要的必須保證可達的數據。能夠與一個按期加載數據的線下系統集成,或者與一個由於維護長時間下線的系統集成。流處理的組件可以保證轉換(處理)到達的數據。

有關Kafka提供的保證,API和功能的更多信息,看其他文件

1.2使用案例

下面描述了一些使用Apache Kafka™的流行用例。更多的關於這些領域實踐的概述,參考這個博客

消息

Kafka可以很好的替代傳統的消息中間件。消息中間件因爲各類緣由被使用(解耦數據的生產和消費,緩衝未處理的消息等)。相較於大多數消息處理系統,Kafka有更好的吞吐量,內置分區,副本複製和容錯性,使其成爲大規模消息處理應用的理想解決方案。

根據咱們的經驗消息的使用一般具備相對低的吞吐量,但可能須要端到端的低延遲,以及高可靠性的保證,這種低延遲和可靠性的保證偏偏是Kafka可以提供的。

在這一領域Kafka是可以和傳統的消息系統相媲美的,例如ActiveMQ或 RabbitMQ

網站活動跟蹤

最初的用例是用Kafka重建一個用戶活動跟蹤管道使之做爲一組實時發佈 – 訂閱的數據源。這意味着網站活動(網頁瀏覽,搜索,或其餘可能的操做)被看成一組中心主題發佈,每種活動被看成一個主題。這些數據源(feeds)可被一系列的應用訂閱,包括實時處理,實時監測,加載到Hadoop系統或離線數據倉庫系統進行離線處理和報告。

活動追蹤一般會產生巨大的數據量,由於每一個用戶頁面的瀏覽都會產生不少的活動消息。

測量

Kafka一般用於監測數據的處理。這涉及從分佈式應用程序彙集統計數據,生產出集中的運行數據源feeds(以便訂閱)。

日誌聚合

許多人用Kafka做爲日誌聚合解決方案的替代品。日誌聚合一般從服務器收集物理日誌文件,並把它們放在一個集中的地方(文件服務器或HDFS)進行處理。Kafka抽象了文件的詳細信息,把日誌或事件數據的簡潔抽象做爲消息流傳輸。這爲低時延的處理提供支持,並且更容易支持多個數據源和分佈式的數據消費。相比集中式的日誌處理系統,Scribe or Flume,Kafka提供一樣良好的性能,並且由於副本備份提供了更強的可靠性保證和更低的端到端延遲。

流處理

Kafka的流數據管道在處理數據的時候包含多個階段,其中原始輸入數據從Kafka主題被消費而後彙總,加工,或轉化成新主題用於進一步的消費或後續處理。例如,用於推薦新聞文章的數據流處理管道可能從RSS源抓取文章內容,並將其發佈到「文章」主題; 進一步的處理多是標準化或刪除重複數據,而後發佈處理過的文章內容到一個新的話題; 最後的處理階段可能會嘗試推薦這個內容給用戶。這樣的數據流處理管道基於各個主題建立了實時數據數據流程圖。從版本0.10.0.0開始,Apache Kafka加入了輕量級的但功能強大的流處理庫Kafka Streams ,Kafka Streams支持如上所述的數據處理。除了Kafka Streams,能夠選擇的開源流處理工具包括 Apache Storm and Apache Samza.

Event Sourcing

Event sourcing 是一種應用程序設計風格,是按照時間順序記錄的狀態變化的序列。Kafka的很是強大的存儲日誌數據的能力使它成爲構建這種應用程序的極好的後端選擇。

Commit Log

Kafka能夠爲分佈式系統提供一種外部提交日誌(commit-log)服務。日誌有助於節點之間複製數據,並做爲一種數據從新同步機制用來恢復故障節點的數據。Kafka的log compaction 功能有助於支持這種用法。Kafka在這種用法中相似於Apache BookKeeper 項目。

1.3快速開始

本教程假設你從零開始,沒有現成的Kafka或ZooKeeper數據。因爲Kafka控制檯腳本在Unix基礎的和Windows平臺上的不一樣,在Windows平臺上使用bin\windows\,而不是bin/,並修改腳本擴展爲.bat。

1步:下載代碼

下載0.10.2.0釋放和un-tar它。

> tar -xzf kafka_2.11-0.10.2.0.tgz
> cd kafka_2.11-0.10.2.0

2步:啓動服務器

Kafka使用ZooKeeper的,因此你須要先啓動ZooKeeper的服務器,若是你尚未,您可使用Kafka包裝裏的方便腳原本獲得一個快速和污染的單節點的ZooKeeper實例。

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

如今啓動Kafka服務器:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

3步:建立一個話題

讓咱們建立一個名爲「test」主題,只有一個分區,只有一個副本:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

如今咱們能夠看到,若是咱們運行的列表主題命令話題:

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

除了手動建立主題,你還能夠配置你的代理服務器(broker),當一個不存在的主題被髮布的時候它能自動建立相應的主題。

4步:發送一些消息

Kafka帶有一個命令行客戶端,獲取從文件或來自標準輸入的輸入,並做爲消息發送到Kafka集羣。默認狀況下,每一行將被做爲單獨的消息發送。

運行生產者腳本,而後輸入一些信息到控制檯發送到服務器。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

5步:啓動消費者

Kafka也有一個命令行消費者,將收到的消息輸出到標準輸出。

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

若是你在不一樣的終端上運行上面的命令,那麼你如今應該能看到從生產者終端輸入的消息會出如今消費者終端。

全部的命令行工具都有其餘選項; 不帶參數運行命令將顯示更加詳細的使用信息。

6步:設置多代理羣集

到目前爲止,咱們已經運行了單個代理的服務器,可是這沒有樂趣。對於Kafka,一個代理是隻有一個單節點的集羣,所以多代理集羣只是比開始多了一些代理實例外,沒有什麼太大的變化。但只是爲了感覺一下,咱們的集羣擴展到三個節點(全部的節點仍是在本地機器上)。

首先,咱們爲每一個經紀人作一個配置文件(在Windows上使用copy命令來代替):

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

如今,編輯這些新文件和設置如下屬性:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

該broker.id屬性是集羣中的每一個節點的惟一和永久的名字。咱們要重寫端口和日誌目錄,由於咱們都在同一臺機器上運行這些代理,咱們要防止經紀人在同一端口上註冊或覆蓋彼此的數據。

咱們已經有Zookeeper服務和咱們的單個節點服務,因此咱們只須要啓動兩個新節點:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

如今,建立一個新的具備三個的副本因子的主題:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

好了,如今咱們有一個集羣,可是如何才能知道哪一個代理節點在作什麼?要查看運行「describe topics」命令:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0

下面是輸出的解釋。第一行給出了全部分區的摘要,每一個附加的行提供了一個分區的信息。因爲咱們只有一個分區,因此這個主題只有一行。

  • 「Leader」,負責指定分區全部讀取和寫入的節點。每一個節點將是一部分隨機選擇的分區中的領導者。
  • 「Replicas」是此分區日誌的節點列表集合,無論這些節點是不是領導者或者只是還活着(不在in-sync狀態)。
  • 「ISR」是一組」in-sync」 節點列表的集合。這個列表包括目前活着並跟leader保持同步的replicas,Isr 是Replicas的子集。

請注意,在個人例子節點1是該主題的惟一分區中的leader。

咱們能夠運行相同的命令看看咱們建立原來的話題的狀態:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

因此絕不奇怪,原來的話題沒有副本,只有咱們建立它時的惟一的服務器0。

讓咱們發佈一些消息到咱們新的話題:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

如今讓咱們來消費這些消息:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

如今,讓咱們測試容錯性。代理1是領導者,讓咱們殺死它:

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

在Windows上使用:

> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
java.exe    java  -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.10-0.10.2.0.jar"  kafka.Kafka config\server-1.properties    644
> taskkill /pid 644 /f

領導權已經切換到備機中的一個節點上去了,節點1再也不在同步中的副本集(in-sync replica set)中:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 2	Replicas: 1,2,0	Isr: 2,0

但消息仍然是可用於消費,即便是原來負責寫任務的領導者已經不在了:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

7步:使用Kafka鏈接導入/導出數據

從控制檯寫入數據和寫回控制檯是一個很方便入門的例子,但你可能想用Kafka使用其餘來源的數據或導出Kafka的數據到其餘系統。相對於許多系統須要編寫定製集成的代碼,您可使用Kafka鏈接到系統去導入或導出數據。

Kafka Connect是包括在Kafka中一個工具,用來導入導出數據到Kafka。它是connectors的一個可擴展工具,其執行定製邏輯,用於與外部系統交互。在這個快速入門,咱們將看到如何使用Kafka Connect作一些簡單的鏈接器從一個文件導入數據到Kafka的主題,和將主題數據導出到一個文件。

首先,咱們須要建立一些原始數據來開始測試:

> echo -e "foo\nbar" > test.txt

接下來,咱們將啓動兩個運行在獨立模式的鏈接器,這意味着他們在一個單一的,局部的,專用的進程中運行。咱們提供三個配置文件做爲參數。第一始終是Kafka鏈接過程當中的公共配置,如要鏈接到的Kafka的代理服務器的配置和數據的序列化格式的配置。剩餘的每一個配置文件用來建立指定的鏈接器。這些文件包括一個惟一的鏈接器名稱,須要實例化的鏈接器類,還有建立該鏈接器所需的其餘配置。

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

用這些Kafka的示例配置文件,使用前面已經啓動的本地羣集的默認配置,創建兩個鏈接器:第一是一個源鏈接器,其從輸入文件中讀取每行的內容,發佈到的Kafka主題和第二個是一個sink鏈接器負責從Kafka主題讀取消息,生產出的消息按行輸出到文件。

在啓動過程當中,你會看到一些日誌信息,包括一些代表該鏈接器被實例化的信息。一旦Kafka Connect進程已經開始,源鏈接器應該開始從test.txt讀取每行的消息,並將其生產發佈到主題connect-test,而sink鏈接器應該從主題connect-test讀取消息,並將其寫入文件test.sink.txt。咱們能夠經過檢查輸出文件的內容來驗證數據都已經過整個管道輸送:

> cat test.sink.txt
foo
bar

請注意,數據被存儲在Kafka主題的connect-test中,因此咱們也能夠運行控制檯消費者消費主題中的數據(或使用定製的消費者代碼來處理它):

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

鏈接器不停的處理數據,所以咱們能夠將數據添加到該文件,並能看到數據經過管道移動:

> echo "Another line" >> test.txt

您應該看到一行消息出如今控制檯消費者的控制檯和sink文件中。

8步:使用Kafka Streams處理數據

Kafka Streams 是Kafka的客戶端庫, 用來作實時流處理和分析存儲在Kafka代理服務器的數據。該快速入門例子將演示如何運行這個流應用庫。這裏是要點WordCountDemo的示例代碼(轉換爲方便閱讀的Java 8 lambda表達式)。

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic ""streams-file-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-file-input");

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Group the text words as message keys
    .groupBy((key, value) -> value)

    // Count the occurrences of each word (message key).
    .count("Counts")

// Store the running counts as a changelog stream to the output topic.
wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");

它實現了單詞計數算法,計算輸入文本中一個單詞的出現次數。然而,與其餘單詞計數的算法不一樣,其餘的算法通常都是對有界數據進行操做,該算法演示應用程序的表現略有不一樣,由於他能夠被設計去操做無限的,無界的流數據。和操做有界數據的算法類似,它是一個有狀態的算法,能夠跟蹤和更新單詞的計數。然而,由於它必須承擔潛在的無界輸入數據的處理,它會週期性地輸出其當前狀態和結果,同時繼續處理更多的數據,由於它沒法知道他有沒有處理完「全部」的輸入數據。

做爲第一步驟,咱們將準備好輸入到Kafka主題的數據,隨後由Kafka Streams應用程序進行處理。

> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt

或在Windows上:

> echo all streams lead to kafka> file-input.txt
> echo hello kafka streams>> file-input.txt
> echo|set /p=join kafka summit>> file-input.txt

接下來,咱們使用控制檯生產者把輸入的數據發送到主題名streams-file-input 的主題上,其內容從STDIN一行一行的讀取,並一行一行的發佈到主題,每一行的消息都有一個空鍵和編碼後的字符串(在實踐中,當應用程序將啓動並運行後,流數據極可能會持續流入Kafka):

> bin/kafka-topics.sh --create \
            --zookeeper localhost:2181 \
            --replication-factor 1 \
            --partitions 1 \
            --topic streams-file-input
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt

如今,咱們能夠運行單詞計數應用程序來處理輸入數據:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

演示應用程序將從輸入主題streams-file-input讀取數據,對讀取的消息的執行單詞計數算法,而且持續寫入其當前結果到輸出主題streams-wordcount-output。所以,除了寫回Kafka的日誌條目,不會有任何的STDOUT輸出。該演示將運行幾秒鐘,與典型的流處理應用不一樣,演示程序會自動終止。

如今,咱們經過讀取輸出主題的輸出獲得單詞計數演示程序的結果:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
            --topic streams-wordcount-output \
            --from-beginning \
            --formatter kafka.tools.DefaultMessageFormatter \
            --property print.key=true \
            --property print.value=true \
            --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
            --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

下面的數據會被輸出到控制檯:

all     1
lead    1
to      1
hello   1
streams 2
join    1
kafka   3
summit  1

這裏,第一列是java.lang.String類型的消息健,而第二列是java.lang.Long型消息值。注意,這裏的輸出實際上是數據更新的連續流,每一個數據記錄(上面的例子裏的每行的輸出)都有一個單詞更新後的數目值,例如「Kafka」做爲鍵的記錄。對於具備相同鍵的多個記錄,每一個後面的記錄都是前一個記錄的更新。

下面的兩個圖說明什麼發生在幕後的過程。第一列顯示的當前狀態的變化,用KTable<String, Long>來統計單詞出現的數目。第二列顯示KTable狀態更新致使的發生變化的記錄,這個變化的記錄被髮送到輸出Kafka主題streams-wordcount-output

 

首先, 「all streams lead to kafka」這樣一行文本正在被處理。當新的單詞被處理的時候,KTable會增長一個新的表項(以綠色背景高亮顯示),並有相應的變化記錄發送到下游KStream。

當第二行「hello kafka streams」被處理的時候,咱們觀察到,現有的KTable中的表項第一次被更新(這裏: 單詞 「kafka」 和 「streams」)。再次,改變的記錄被髮送到輸出話題。

以此類推(咱們跳過的第三行是如何被處理的插圖)。這就解釋了爲何輸出主題有咱們上面例子顯示的內容,由於它包含了完整的更改記錄。

跳出這個具體的例子咱們從總體去看, Kafka流利用表和日誌變化(changelog)流之間的二元性(here: 表= the KTable, 日誌變化流 = the downstream KStream):你能夠發佈的每個表的變化去一個流,若是你從開始到結束消費了整個的日誌變化(changelog)流,你能夠重建表的內容。

如今,你能夠寫更多的輸入信息到streams-file-input主題,並觀察更多的信息加入到了 streams-wordcount-output主題,反映了更新後的單詞數目(例如,使用上述的控制檯生產者和控制檯消費者)。

您能夠經過Ctrl-C 中止控制檯消費者。

1.4生態系統

除了Kafka的主要版本以外,還有不少應用集成了Kafka工具。該生態系統頁面中列出的許多工具,包括流處理系統,Hadoop的集成,監控和部署工具。

1.5從之前版本升級

0.8.40.9.x0.10.0.x0.10.1.x升級到0.10.2.0

0.10.2.0的有線協議有變化。經過下面的推薦滾動升級計劃,你能保證在升級過程當中無需停機。可是,請在升級以前查看0.10.2.0版本顯著的變化

從0.10.2版本開始,Java客戶端(生產者和消費者)已得到與舊版本代理服務器溝通的能力。版本0.10.2客戶能夠跟0.10.0版或更新版本的代理溝通。可是,若是你的代理比0.10.0老,你必須在升級客戶端以前升級Kafka集羣中的全部代理服務器(Broker)。版本0.10.2代理支持0.8.x和更新的客戶端。

對於滾動升級:

  1. 更新全部代理服務器上的server.properties文件,添加如下屬性:
    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2,0.9.0,0.10.0或0.10.1)。
    • log.message.format.version = CURRENT_KAFKA_VERSION(參見升級後的潛在性能的影響瞭解此配置作什麼的詳細信息。)
  2. 逐一升級代理:關閉代理,更新代碼,並從新啓動。
  3. 一旦整個羣集升級成功,經過編輯inter.broker.protocol.version將其設置爲0.10.2的協議版本。
  4. 若是您之前的消息格式爲0.10.0,改變log.message.format.version至0.10.2(這是一個無效操做,由於0.10.0,0.10.1和0.10.2的消息格式相同)。若是您之前的消息格式版本低於0.10.0,不要改變log.message.format.version – 這個參數只能在全部的消費者都已經升級到0.10.0.0或更高版本以後改動。
  5. 逐一從新啓動代理服務器使新協議版本生效。
  6. 若是這時log.message.format.version仍比0.10.0低,等到全部的消費者都已經升級到0.10.0或更高版本,而後更改每一個代理服務器的log.message.format.version到0.10.2,而後逐一從新啓動。

注意:若是你願意接受宕機,你能夠簡單地把全部的代理服務器關閉,更新代碼,而後從新啓動他們。他們將默認使用新的協議。

注:改變協議版本並從新啓動能夠在代理服務器升級以後的任什麼時候間作,沒有必要必須馬上就作。

升級0.10.1版本的Kafka流應用

  • 從0.10.1升級您的流應用程序到0.10.2不須要升級代理。0.10.2 Kafka流應用程序能夠鏈接到0.10.2和0.10.1代理(但沒法鏈接到 0.10.0的代理)。
  • 你須要從新編譯代碼。只是替換Kafka流的jar文件將沒法正常工做,這破壞你的應用程序。
  • 若是您使用自定義(即用戶實現的)的時間戳提取,則須要更新此代碼,由於TimestampExtractor接口改變了。
  • 若是您註冊了自定義指標,您將須要更新此代碼,由於StreamsMetric接口被改變了。
  • 0.10.2 流 API的變化更多的細節。

0.10.2.1顯著的變化

  • 對於StreamsConfig類的兩個配置的默認值的修改提升了Kafka流應用的彈性。內部Kafka流生產者retries默認值從0變化到10,內部Kafka流消費者max.poll.interval.ms 缺省值從300000到改變Integer.MAX_VALUE。

0.10.2.0顯著的變化

  • 在Java客戶端(生產者和消費者)已得到與舊版本代理溝通的能力。版本0.10.2客戶端能夠跟0.10.0版或更新版本的代理溝通。請注意,某些功能在跟就代理溝通的時候不可用或被限制了。
  • 在Java消費者中有幾種方法如今可能拋出InterruptException若是調用線程被中斷。請參閱KafkaConsumer的Javadoc,對這種變化有一個更深刻的解釋。
  • Java的消費者如今被恰當關閉。默認狀況下,消費者會等待30秒才能完成掛起的請求。一個帶有timeout參數的新的API已添加到KafkaConsumer去控制最大等待時間。
  • 用逗號分隔的多個正則表達式能夠傳遞多個Java消費者給MirrorMaker–whitelist選擇。這使得與MirrorMaker使用老Scala消費者時的行爲一致。
  • 從0.10.1升級您的流應用程序0.10.2不須要代理服務器升級。Kafka 0.10.2流應用程序能夠鏈接到0.10.2和0.10.1代理(但沒法鏈接到0.10.0代理)。
  • Zookeeper的依賴從流API中刪除。流API如今使用Kafka協議來管理內部主題,而不是直接修改動物園管理員的主題。這消除了須要直接訪問Zookeeper的特權,而「StreamsConfig.ZOOKEEPER_CONFIG」也不須要在流應用被設置。若是Kafka集羣是安全認證的,流應用程序必須具有必要的安全權限才能夠建立新的主題。
  • 一些新的參數,包括「security.protocol」, 「connections.max.idle.ms」, 「retry.backoff.ms」, 「reconnect.backoff.ms」和「request.timeout.ms」添加到StreamsConfig類。若是用戶須要設置這些,要注意這些默認值。欲瞭解更多詳情,請參閱3.5Kafka流CONFIGS
  • 該offsets.topic.replication.factor代理的配置如今在主題生產中強制使用。直到集羣的大小符合這個複製因子要求,不然,主題的生產將失敗,返回GROUP_COORDINATOR_NOT_AVAILABLE錯誤。

新的協議版本

  • KIP-88:OffsetFetchRequest v2支持偏移檢索全部的主題,若是topics數組設置爲null。
  • KIP-88:OffsetFetchResponse V2引入了頂級error_code域。
  • KIP-103:UpdateMetadataRequest v3引入一個listener_name字段到end_points數組中的元素。
  • KIP-108:CreateTopicsRequest V1引入了一個validate_only參數。
  • KIP-108:CreateTopicsResponse V1引入了error_message到數組topic_errors的元素。

0.8.40.9.x版本或0.10.0.X升級到0.10.1.0

0.10.1.0有線協議發生了變化。經過下面的推薦滾動升級計劃,能保證在升級過程當中無需停機。可是,請注意在升級以前仔細閱讀0.10.1.0潛在的重大更改
注意:因爲新協議的引入,它是升級你的客戶端以前請先完成Kafka集羣的升級(即0.10.1.x客戶端僅支持0.10.1.x或更高版本的代理,但0.10.1.x的代理能夠支持舊版本客戶端)。

對於滾動升級:

  1. 更新全部代理上的server.properties文件,並添加如下屬性:
    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2.0,0.9.0.0或0.10.0.0)。
    • log.message.format.version = CURRENT_KAFKA_VERSION(參見升級後的潛在性能的影響對於此配置作什麼的詳細信息。)
  2. 升級代理服務器一次一個:關閉代理,更新代碼,並從新啓動。
  3. 一旦整個羣集升級完成,經過編輯inter.broker.protocol.version並將其設置爲0.10.1.0的協議版本。
  4. 若是您之前的消息格式爲0.10.0,改變log.message.format.version至0.10.1(這是一個無效操做,若是0.10.0和0.10.1兩個協議的消息格式相同)。若是您之前的消息格式版本低於0.10.0,不要改變log.message.format.version — 這個參數只能在全部的消費者都已經升級到0.10.0.0或更高版本以後修改。
  5. 逐一從新啓動代理,新版本協議生效。
  6. 若是log.message.format.version仍比0.10.0低,等到全部的消費者都已經升級到0.10.0或更高版本,而後更改log.message.format.version到0.10.1,逐一從新啓動代理服務器。

注意:若是你願意接受宕機,你能夠簡單地把全部的代理服務器關閉,更新代碼,而後從新啓動他們。他們將默認使用新的協議。

注:改變協議版本並從新啓動能夠在代理服務器升級以後的任什麼時候間作,沒有必要必須馬上就作。

0.10.1.0的重大更改

  • 日誌保留時間再也不基於日誌段的最後修改時間。相反,它會基於日誌段裏擁有最大的時間戳的消息。
  • 日誌滾動時間再也不取決於日誌段建立時間。相反,它如今是基於消息的時間戳。進一步來講,若是日誌段中第一個消息的時間戳是T,當一個新的消息具備的時間戳大於或等於T + log.roll.m,該日誌將被覆蓋。
  • 0.10.0的打開文件的處理程序將增長〜33%,由於每一個日誌段增長的時間索引文件。
  • 時間索引和偏移索引共享相同的索引大小的配置。由於時間索引條目大小是1.5倍偏移索引條目的大小。用戶可能須要增長log.index.size.max.bytes以免潛在的頻繁的日誌滾動。
  • 因爲增長的索引文件,在某些代理服務器上具備大量的日誌段(例如> 15K),代理啓動期間日誌加載過程可能很長。根據咱們的實驗,num.recovery.threads.per.data.dir設置爲1可減小日誌裝載時間。

升級0.10.0Kafka流應用

  • 從0.10.0升級您的流應用程序到0.10.1確實須要一個代理的升級,由於Kafka 0.10.1的流應用程序只能鏈接到0.10.1代理。
  • 有幾個API的變化不向後兼容(參見流API在0.10.1的變化有詳細介紹)。所以,你須要更新和從新編譯代碼。只是交換了Kafka流庫的jar文件將沒法正常工做,並會破壞你的應用程序。

0.10.1.0顯著的變化

  • 新的Java消費者不是beta版了,咱們推薦它作新的應用開發。老Scala消費者仍然支持,但他們會在將來的版本中將會棄用,並將在將來的主版本中刪除。
  • 在使用像MirrorMaker和控制檯消費者新建消費者的過程當中–new-consumer/ –new.consumer開關再也不被須要; 一個簡單地使用是經過一個Kafka代理去鏈接,而不是Zookeeper的合集。此外,控制檯消費者去鏈接舊版本的消費者已被棄用,並將在將來的主版本中刪除。
  • Kafka集羣如今能夠經過一個集羣ID被惟一標識。其會在一個代理升級到0.10.1.0時自動生成。集羣ID經由kafka.server可用:type= KafkaServer,name= ClusterId metric ,它是所述元數據響應的一部分。串行器,客戶端攔截器和度量報告能夠經過實現ClusterResourceListener接口接收集羣ID。
  • BrokerState 「RunningAsController」(值4)已被刪除。因爲一個bug,代理在轉換狀態以前只會簡單的這種狀態下,所以去除的影響應該很小。一種推薦的檢測方法是一個給定的代理的控制器是由kafka.controller實現:type=KafkaController,name=ActiveControllerCount metric。
  • 新的Java消費者如今能夠容許用戶經過時間戳在分區上搜索偏移量(offset)。
  • 新的Java消費者如今能夠從後臺線程支持心跳檢查。有一個新的配置 max.poll.interval.ms,它控制消費者會主動離開組(5分鐘默認狀況下)以前輪詢調用的最大時間。配置的值 request.timeout.ms必須始終大於max.poll.interval.ms由於這是一個JoinGroup請求能夠在服務器上被阻止到消費者被負載均衡以前的最長時間.因此咱們能夠改變默認值爲恰好超過5分鐘。最後,默認值session.timeout.ms已調整到10秒,默認值max.poll.records已更改成500。
  • 當受權者和用戶沒有說明某個主題的受權,代理將再也不返回TOPIC_AUTHORIZATION_FAILED給請求,由於這會泄漏主題名稱。相反,UNKNOWN_TOPIC_OR_PARTITION錯誤代碼將被返回。使用Kafka生產者和消費者一般會在收到未知的主題錯誤時自動重試,這可能會致使意外的超時或延遲。若是你懷疑這種狀況發生了,你能夠查看客戶端的log去檢查。
  • 獲取返回有默認的大小限制(消費者50 MB和副本的複製10 MB)。現有的每一個分區的限制也適用(消費者和副本複製爲1 MB)。請注意,這些限制都不是絕對最大值,在下一個要點有解釋。
  • 消費者和副本能夠繼續進行,若是發現一個消息大於返回/分區大小的限制。更具體地,若是在非空的分區上提取的第一個消息比任一個或兩個限值大,仍然會被返回。
  • 重載的構造函數加入到kafka.api.FetchRequest和kafka.javaapi.FetchRequest容許調用者指定分區順序(由於順序在V3是很重要的)。先前存在的構造函數被棄用,在發送請求以免飢餓問題以前,分區會被洗牌。

新的協議版本

  • ListOffsetRequest V1支持精確的基於時間戳的偏移搜索。
  • MetadataResponse V2引入了一個新的參數: 「CLUSTER_ID」。
  • FetchRequest v3支持限制請求返回的大小(除了現有的每一個分區的限制),它可以返回比限制更大的消息和在請求中加入分區的順序具備重要意義。
  • JoinGroup V1引入了一個新的字段: 「rebalance_timeout」。

升級0.8.40.9.x版本到0.10.0.0

0.10.0.0具備的潛在的重大更改(請在升級前仔細檢查更改)和 在升級後的性能影響。經過下面的推薦滾動升級計劃,能保證不宕機,不影響性能和隨後的升級。
注意:因爲新協議的引入,升級客戶端以前升級您的Kafka集羣是很重要的。

注意0.9.0.0版本的客戶端:因爲0.9.0.0引入了一個錯誤,即依賴於ZooKeeper的客戶(老Scala高層次消費者和與老消費者一塊兒使用的MirrorMaker)不能和0.10.0.x代理一塊兒工做。所以,代理都升級到0.10.0.x以前, 0.9.0.0客戶端應升級到0.9.0.1 . 這一步對0.8.4或0.9.0.1客戶端沒有必要。

對於滾動升級:

  1. 更新全部代理服務器的server.properties文件,並添加如下屬性:
    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2或0.9.0.0)。
    • log.message.format.version = CURRENT_KAFKA_VERSION(參見升級後的潛在性能的影響對於此配置作什麼的詳細信息。)
  2. 升級代理。這能夠經過簡單地將其關機,更新代碼,並從新啓動實現。
  3. 一旦整個羣集升級結束,經過編輯inter.broker.protocol.version並將其設置爲0.10.0.0的協議版本。注意:您不該該修改log.message.format.version — 這個參數只能在全部的消費者都已經升級到0.10.0.0以後再修改。
  4. 逐一從新啓動代理,新協議版本生效。
  5. 一旦全部的消費者都已經升級到0.10.0,逐一修改log.message.format.version至0.10.0和重啓代理服務器。

注意:若是你願意接受宕機,你能夠簡單地把全部的代理服務器關閉,更新代碼,而後從新啓動他們。他們將默認使用新的協議。

注:改變協議版本並從新啓動能夠在代理服務器升級以後的任什麼時候間作,沒有必要必須馬上就作。

升級到0.10.0.0帶來的潛在的性能影響

0.10.0消息格式包括一個新的時間戳字段,並對壓縮的消息使用相對偏移。磁盤上的消息格式能夠經過在server.properties文件的log.message.format.version進行配置。默認的磁盤上的消息格式爲0.10.0。若是消費者客戶端的版本是0.10.0.0以前的版本,那它只能明白0.10.0以前的消息格式。在這種狀況下,代理可以把消息從0.10.0格式轉換到一個較早的格式再發送舊版本的響應給消費者。然而,代理不能在這種狀況下使用零拷貝轉移。Kafka社區報告顯示性能的影響爲CPU利用率從20%增長至將近100%,這迫使全部客戶端的必須即時升級使性能恢復正常。爲了不這樣的消息轉換帶來的性能問題,消費者升級到0.10.0.0以前,在升級代理到0.10.0.0的過程當中設置log.message.format.version到0.8.2或0.9.0。這樣一來,代理仍然可使用零拷貝傳輸,將數據發送到老消費者。一旦消費者升級完成,消息格式更改成0.10.0,這樣代理就能夠享受新的消息格式包括新的時間戳和改進的壓縮算法。這種轉換能夠支持兼容性,對只有幾個尚未更新到最新客戶端的應用程序很是有用,但不切實際的是使用一個過分使用的集羣中去支持全部消費者的流量。所以,當代理已經升級,但大多數客戶端尚未完成升級的狀況,要儘量避免使用這種信息轉換。

對於升級到0.10.0.0客戶,沒有性能影響。

注:設置消息格式版本是一個證實,現有的全部支持的消息都在這個版本或低於該消息格式的版本。不然, 0.10.0.0以前的消費者可能不能正常工做。特別是消息格式設置爲0.10.0以後,不該該再改回先前的格式,由於它可能使得0.10.0.0以前的消費者工做異常。

注:因爲每一個消息中引入了另外的時間戳,生產者發送的消息大小比較小的時候由於額外的負載開銷也許會看到吞吐量的降低。一樣,副本的複製會讓每一個消息額外傳輸8個字節。若是你正在運行接近集羣承載能力的網絡容量,你可能會壓垮網卡,因爲超載而發生故障和性能問題。

 

注:若是您已對生產者啓用壓縮算法,您可能會注意到下降的生產者吞吐量和/或在某些狀況下代理下降的壓縮比。當接收到壓縮的消息,0.10.0代理避免再次壓縮消息,其一般下降了等待時間,並提升了吞吐量。在某些狀況下,這可能會減小生產者批量消息包的大小,這可能致使更糟糕的吞吐量。若是發生這種狀況,用戶能夠調整生產者的linger.ms和batch.size以得到更好的吞吐量。此外,用於高效壓縮消息的生產者緩衝區比代理使用的緩衝區小,這可能對磁盤的壓縮消息比率有負面的影響。咱們打算在將來的Kafka版本中可以配置這些參數。

0.10.0.0潛在的重大更改

  • 從Kafka0.10.0.0開始,Kafka消息格式的版本被表示爲Kafka版本。例如,消息格式0.9.0指經過Kafka0.9.0支持的最高消息版本。
  • 消息格式0.10.0已經推出,它是默認使用的版本。它引入了一個時間戳字段和相對偏移被用於壓縮消息。
  • ProduceRequest /Response V2已經被引入,它在默認狀況下支持消息格式0.10.0
  • FetchRequest /Response V2已經被引入,它在默認狀況下支持消息格式0.10.0
  • MessageFormatter接口從def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)變爲 def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)
  • MessageReader接口從def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]變爲 def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]
  • MessageFormatter的包從kafka.tools改變爲kafka.common
  • MessageReader的包從kafka.tools改變我kafka.common
  • MirrorMakerMessageHandler再也不公開方法handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]),由於它歷來沒有被調用。
  • 0.7 KafkaMigrationTool再也不被打包進Kafka包。若是您須要從0.7遷移到0.10.0,請先遷移到0.8,而後再按照文檔的升級過程升級0.8到0.10.0。
  • 新的消費者擁有標準化的API,接受java.util.Collection做爲方法參數序列類型。現有的代碼可能須要更新才能與0.10.0客戶端庫一塊兒工做。
  • LZ4-compressed的消息處理被改變爲使用可互操做的幀規範(LZ4f V1.5.1)。爲了保持與舊客戶端的兼容性,這一變化僅適用於消息格式0.10.0及更高版本。使用V0 / V1(消息格式0.9.0)的客戶端應該繼續使用0.9.0幀規範實現執行產生/抓取LZ4壓縮消息。使用生產/獲取協議v2或更高版本客戶端應該使用互操做LZ4f幀規範。可互操做的LZ4庫的列表,請參考http://www.lz4.org/

0.10.0.0顯著的變化

  • 從Kafka0.10.0.0開始,新的客戶端庫Kafka可用於流處理存儲在Kafka主題的數據。這個新的客戶端庫只適用於0.10.x及後面版本的代理。欲瞭解更多信息,請閱讀流文件
  • 對新的消費者,配置參數receive.buffer.bytes的默認值如今是64k。
  • 新的消費者如今公開暴露配置參數exclude.internal.topics去限制內部主題(諸如消費者偏移主題),不讓這些主題被偶然的包括在正則表達式的主題訂閱中。默認狀況下,它處於啓用狀態。
  • 老Scala生產者已被棄用。用戶要儘快遷移他們的代碼到Kafka客戶端JAR裏的Java生產者。
  • 新的消費者API已經被標記爲穩定。

升級0.8.00.8.1.X0.8.2.X0.9.0.0

0.9.0.0具備的潛在的重大更改(請在升級前檢查),還有之前的版本到如今的代理間協議的變化。這意味着升級的代理和客戶端可能不兼容舊版本。您在升級您的客戶端以前升級Kafka集羣是很重要的。若是您正在使用MirrorMaker下游集羣應該先升級爲好。

對於滾動升級:

  1. 更新全部代理上的server.properties文件,並添加如下屬性:inter.broker.protocol.version = 0.8.2.X
  2. 逐一升級的代理。能夠經過簡單地將其關閉,更新代碼,並從新啓動它實現。
  3. 一旦整個羣集升級成功,經過編輯inter.broker.protocol.version並將其設置爲0.9.0.0的協議版本。
  4. 逐一從新啓動代理使新協議版本生效

注意:若是你願意接受宕機,你能夠簡單地把全部的代理服務器關閉,更新代碼,而後從新啓動他們。他們將默認使用新的協議。

注:改變協議版本並從新啓動能夠在代理服務器升級以後的任什麼時候間作,沒有必要必須馬上就作。

0.9.0.0潛在的重大更改

  • Java 1.6再也不支持。
  • Scala 2.9再也不支持。
  • 1000以上的代理ID如今默認保留,用來作自動分配的代理ID。若是您的集羣已存在高於閾值的經紀人的ID確保相應地增長reserved.broker.max.id代理配置屬性。
  • 配置參數replica.lag.max.messages被刪除。分區Leader將再也不考慮滯後的消息數量來決定哪些副本是同步的,。
  • 配置參數replica.lag.time.max.ms如今不只指從副本提取請求所花費的時間,也標識副本最後一次同步到如今通過的時間。那些副本仍然從領導者獲取信息,但在replica.lag.time.max.ms時間內沒有從leader最新消息的副本將被認爲是不一樣步的。
  • 壓縮主題再也不接受沒有主鍵消息和遇到這種狀況生產者會拋出一個異常。在0.8.4,沒有主鍵的消息會致使日誌壓縮線程退出(並中止全部壓縮主題的處理)。
  • MirrorMaker再也不支持多種目標集羣。所以,它只能接受一個–consumer.config參數。要鏡像多個源集羣,則須要每一個源集羣至少一個MirrorMaker實例,每一個都有本身的消費者配置。
  • 在包org.apache.kafka.clients.tools.*裏的工具已移至org.apache.kafka.tools.*。全部的其中的腳本將仍然像往常同樣起做用,只是直接導入這些類的自定義代碼將受到影響。
  • 默認的KafkaJVM性能選項(KAFKA_JVM_PERFORMANCE_OPTS)已經在kafka-run-class.sh被改變。
  • 該kafka-topics.sh腳本(kafka.admin.TopicCommand)如今失敗會返回非零退出代碼。
  • 該kafka-topics.sh腳本(kafka.admin.TopicCommand)如今碰到因爲使用「.」 或「_」的主題的名稱將打印警告信息,以及在實際發生衝突的狀況下打印錯誤信息。
  • 該kafka-console-producer.sh腳本(kafka.tools.ConsoleProducer)默認將使用Java生產者而不是舊的Scala生產者,而且用戶必須指定「老生產者」使用舊版本的生產者。
  • 默認狀況下,全部命令行工具將打印全部消息記錄到stderr而不是stdout。

0.9.0.1的顯着變化

  • 新的代理ID自動生成功能能夠經過設置broker.id.generation.enable爲false禁用。
  • 配置參數log.cleaner.enable如今默認爲true。這意味主題在配置 cleanup.policy=compact下將缺省壓縮,清潔器進程經過log.cleaner.dedupe.buffer.size缺省被分配128MB堆。您能夠檢查你的配置log.cleaner.dedupe.buffer.size,並根據您的壓縮主題使用其餘log.cleaner配置值。
  • 對於新的消費者,配置參數fetch.min.bytes的默認值如今是1。

0.9.0.0棄用的功能

  • 從kafka-topics.sh腳本(kafka.admin.TopicCommand)改變主題配置已被棄用。從此,請使用kafka-configs.sh腳本(kafka.admin.ConfigCommand)。
  • 該kafka-consumer-offset-checker.sh(kafka.tools.ConsumerOffsetChecker)已被棄用。從此,請使用kafka-consumer-groups.sh(kafka.admin.ConsumerGroupCommand)。
  • 該kafka.tools.ProducerPerformance類已棄用。從此,請使用org.apache.kafka.tools.ProducerPerformance(kafka-producer-perf-test.sh也將改成使用新的類)。
  • 生產者配置block.on.buffer.full已被棄用,並將在將來的版本中刪除。目前,它的默認值已更改成false。該KafkaProducer將再也不拋出BufferExhaustedException而是將使用max.block.ms值來阻止,以後它會拋出一個TimeoutException。若是block.on.buffer.full屬性顯式的設置爲true,它將設置max.block.ms到Long.MAX_VALUE,而metadata.fetch.timeout.ms將不被承認。

0.8.1 升級到0.8.2

0.8.2與0.8.1徹底兼容。能夠經過簡單地將其關閉,更新代碼,並從新啓動逐一升級代理。

0.8.0升級到 0.8.1

0.8.1與0.8徹底兼容。能夠經過簡單地將其關閉,更新代碼,並從新啓動逐一升級代理。0.7升級

0.7版本與新版本不兼容。API,Zookeeper的數據結構和協議,能夠配置的增長副本(這是在0.7沒有的),都發生了重大變化。從0.7到更高版本的升級須要特殊的工具進行遷移。這種遷移能夠無需宕機就能夠完成。

轉載自併發編程網 – ifeve.com

相關文章
相關標籤/搜索