kafka集羣原理介紹

kafka集羣原理介紹

@(博客文章)[kafka|大數據]java

本系統文章共三篇,分別爲node

一、kafka集羣原理介紹瞭如下幾個方面的內容:linux

(1)kafka基礎理論算法

(2)參數配置apache

(3)錯誤處理編程

(4)kafka集羣在zookeeper集羣中的內容數組

二、kafka集羣操做介紹了kafka集羣的安裝與操做緩存

(1)單機版安裝性能優化

(2)集羣安裝

(3)集羣啓停操做

(4)topic相關操做

(5)某個broker掛掉,重啓本機器

(6)某個broker掛掉且沒法重啓,使用其它機器代替

(7)擴容

(8)數據遷移

(9)機器下線

(10)增長副本數量

(11)平衡leader

三、kafka集羣編程介紹了...

(一)基礎理論

一、相關資料
官方資料,很是詳細:
http://kafka.apache.org/documentation.html#quickstart

如下部份內容來源於此文檔。

二、kafka是什麼?
(1)Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.
Kafka是一個 分佈式的、可分區的、可複製的消息系統。它提供了普通消息系統的功能,但具備本身獨特的設計。
(2)能夠簡單的理解爲:kafka是一個日誌集羣,各類各樣的服務器將它們自身的日誌發送到集羣中進行統一彙總和存儲,而後其它機器從集羣中拉取消息進行分析處理,如ELT、數據挖掘等。
(3)kafka使用scala語言實現,提供了JAVA API,同時對多種語言都提供了支持。

三、幾個關鍵術語
topic: Kafka將消息以topic爲單位進行概括。
producer: 將向Kafka topic發佈消息的程序稱爲producers.
consumer: 將預訂topics並消費消息的程序稱爲consumer.
broker: Kafka以集羣的方式運行,能夠由一個或多個服務組成,每一個服務叫作一個broker.

四、分區與副本
(1)一個topic是對一組消息的概括。對每一個topic,Kafka 對它的日誌進行了分區。
(2)通常而言,一個topic會有多個分區,每一個分區會有多個副本。
分區是分了將一個topic分到多個地方存儲,提升並行處理的能力。副本是爲了容錯,保證數據不丟失。
(3)對於每個分區,都會選取一個leader,這個分區的全部讀取都在這個leader中進行,而其它副本會同步leader中的數據,且只作備份。
即leader只是針對一個分區而言,而非整個集羣。一個服務器對於某個分區是leader,對於其它分區多是follower。
(4) Producer將消息發佈到它指定的topic中,並負責決定發佈到哪一個分區。一般簡單的由負載均衡機制隨機選擇分區,但也能夠經過特定的分區函數選擇分區。
(5)發佈消息一般有兩種模式:隊列模式(queuing)和發佈-訂閱模式(publish-subscribe)。隊列模式中,consumers能夠同時從服務端讀取消息,每一個消息只被其中一個consumer讀到;發佈-訂閱模式中消息被廣播到全部的consumer中。
Consumers能夠加入一個consumer 組,共同競爭一個topic,topic中的消息將被分發到組中的一個成員中。同一組中的consumer能夠在不一樣的程序中,也能夠在不一樣的機器上。若是全部的consumer都在一個組中,這就成爲了傳統的隊列模式,在各consumer中實現負載均衡。
若是全部的consumer都不在不一樣的組中,這就成爲了發佈-訂閱模式,全部的消息都被分發到全部的consumer中。
更常見的是,每一個topic都有若干數量的consumer組,每一個組都是一個邏輯上的「訂閱者」,爲了容錯和更好的穩定性,每一個組由若干consumer組成。這其實就是一個發佈-訂閱模式,只不過訂閱者是個組而不是單個consumer。
(6)有序性

相比傳統的消息系統,Kafka能夠很好的保證有序性。
傳統的隊列在服務器上保存有序的消息,若是多個consumers同時從這個服務器消費消息,服務器就會以消息存儲的順序向consumer分 發消息。雖然服務器按順序發佈消息,可是消息是被異步的分發到各consumer上,因此當消息到達時可能已經失去了原來的順序,這意味着併發消費將致使 順序錯亂。爲了不故障,這樣的消息系統一般使用「專用consumer」的概念,其實就是隻容許一個消費者消費消息,固然這就意味着失去了併發性。

在這方面Kafka作的更好,經過分區的概念,Kafka能夠在多個consumer組併發的狀況下提供較好的有序性和負載均衡。將每一個分區分 只分發給一個consumer組,這樣一個分區就只被這個組的一個consumer消費,就能夠順序的消費這個分區的消息。由於有多個分區,依然能夠在多 個consumer組之間進行負載均衡。注意consumer組的數量不能多於分區的數量,也就是有多少分區就容許多少併發消費。

Kafka只能保證一個分區以內消息的有序性,在不一樣的分區之間是不能夠的,這已經能夠知足大部分應用的需求。若是須要topic中全部消息的有序性,那就只能讓這個topic只有一個分區,固然也就只有一個consumer組消費它。

五、數據持久化(本部份內容直接翻譯自官方文檔)

不要畏懼文件系統!

Kafka大量依賴文件系統去存儲和緩存消息。對於硬盤有個傳統的觀念是硬盤老是很慢,這使不少人懷疑基於文件系統的架構可否提供優異的性能。實際上硬盤的快慢徹底取決於使用它的方式。設計良好的硬盤架構能夠和內存同樣快。

在6塊7200轉的SATA RAID-5磁盤陣列的線性寫速度差很少是600MB/s,可是隨即寫的速度倒是100k/s,差了差很少6000倍。現代的操做系統都對次作了大量的優化,使用了 read-ahead 和 write-behind的技巧,讀取的時候成塊的預讀取數據,寫的時候將各類微小瑣碎的邏輯寫入組織合併成一次較大的物理寫入。對此的深刻討論能夠查看這裏,它們發現線性的訪問磁盤,不少時候比隨機的內存訪問快得多。

爲了提升性能,現代操做系統每每使用內存做爲磁盤的緩存,現代操做系統樂於把全部空閒內存用做磁盤緩存,雖然這可能在緩存回收和從新分配時犧牲一些性能。全部的磁盤讀寫操做都會通過這個緩存,這不太可能被繞開除非直接使用I/O。因此雖然每一個程序都在本身的線程裏只緩存了一份數據,但在操做系統的緩存裏還有一份,這等於存了兩份數據。

另外再來討論一下JVM,如下兩個事實是衆所周知的:

•Java對象佔用空間是很是大的,差很少是要存儲的數據的兩倍甚至更高。

•隨着堆中數據量的增長,垃圾回收回變的愈來愈困難。

基於以上分析,若是把數據緩存在內存裏,由於須要存儲兩份,不得不使用兩倍的內存空間,Kafka基於JVM,又不得不將空間再次加倍,再加上要避免GC帶來的性能影響,在一個32G內存的機器上,不得不使用到28-30G的內存空間。而且當系統重啓的時候,又必需要將數據刷到內存中( 10GB 內存差很少要用10分鐘),就算使用冷刷新(不是一次性刷進內存,而是在使用數據的時候沒有就刷到內存)也會致使最初的時候新能很是慢。可是使用文件系統,即便系統重啓了,也不須要刷新數據。使用文件系統也簡化了維護數據一致性的邏輯。

因此與傳統的將數據緩存在內存中而後刷到硬盤的設計不一樣,Kafka直接將數據寫到了文件系統的日誌中。

常量時間的操做效率

在大多數的消息系統中,數據持久化的機制每每是爲每一個cosumer提供一個B樹或者其餘的隨機讀寫的數據結構。B樹固然是很棒的,可是也帶了一些代價:好比B樹的複雜度是O(log N),O(log N)一般被認爲就是常量複雜度了,但對於硬盤操做來講並不是如此。磁盤進行一次搜索須要10ms,每一個硬盤在同一時間只能進行一次搜索,這樣併發處理就成了問題。雖然存儲系統使用緩存進行了大量優化,可是對於樹結構的性能的觀察結果卻代表,它的性能每每隨着數據的增加而線性降低,數據增加一倍,速度就會下降一倍。

直觀的講,對於主要用於日誌處理的消息系統,數據的持久化能夠簡單的經過將數據追加到文件中實現,讀的時候從文件中讀就行了。這樣作的好處是讀和寫都是 O(1) 的,而且讀操做不會阻塞寫操做和其餘操做。這樣帶來的性能優點是很明顯的,由於性能和數據的大小沒有關係了。

既然可使用幾乎沒有容量限制(相對於內存來講)的硬盤空間創建消息系統,就能夠在沒有性能損失的狀況下提供一些通常消息系統不具有的特性。好比,通常的消息系統都是在消息被消費後當即刪除,Kafka卻能夠將消息保存一段時間(好比一星期),這給consumer提供了很好的機動性和靈活性。

六、事務性

以前討論了consumer和producer是怎麼工做的,如今來討論一下數據傳輸方面。數據傳輸的事務定義一般有如下三種級別:

最多一次: 消息不會被重複發送,最多被傳輸一次,但也有可能一次不傳輸。

最少一次: 消息不會被漏發送,最少被傳輸一次,但也有可能被重複傳輸.

精確的一次(Exactly once): 不會漏傳輸也不會重複傳輸,每一個消息都傳輸被一次並且僅僅被傳輸一次,這是你們所指望的。

大多數消息系統聲稱能夠作到「精確的一次」,可是仔細閱讀它們的的文檔能夠看到裏面存在誤導,好比沒有說明當consumer或producer失敗時怎麼樣,或者當有多個consumer並行時怎麼樣,或寫入硬盤的數據丟失時又會怎麼樣。kafka的作法要更先進一些。當發佈消息時,Kafka有一個「committed」的概念,一旦消息被提交了,只要消息被寫入的分區的所在的副本broker是活動的,數據就不會丟失。關於副本的活動的概念,下節文檔會討論。如今假設broker是不會down的。

若是producer發佈消息時發生了網絡錯誤,但又不肯定實在提交以前發生的仍是提交以後發生的,這種狀況雖然不常見,可是必須考慮進去,如今Kafka版本尚未解決這個問題,未來的版本正在努力嘗試解決。

並非全部的狀況都須要「精確的一次」這樣高的級別,Kafka容許producer靈活的指定級別。好比producer能夠指定必須等待消息被提交的通知,或者徹底的異步發送消息而不等待任何通知,或者僅僅等待leader聲明它拿到了消息(followers沒有必要)。

如今從consumer的方面考慮這個問題,全部的副本都有相同的日誌文件和相同的offset,consumer維護本身消費的消息的offset,若是consumer不會崩潰固然能夠在內存中保存這個值,固然誰也不能保證這點。若是consumer崩潰了,會有另一個consumer接着消費消息,它須要從一個合適的offset繼續處理。這種狀況下能夠有如下選擇:

consumer能夠先讀取消息,而後將offset寫入日誌文件中,而後再處理消息。這存在一種可能就是在存儲offset後還沒處理消息就crash了,新的consumer繼續從這個offset處理,那麼就會有些消息永遠不會被處理,這就是上面說的「最多一次」。

consumer能夠先讀取消息,處理消息,最後記錄offset,固然若是在記錄offset以前就crash了,新的consumer會重複的消費一些消息,這就是上面說的「最少一次」。

「精確一次」能夠經過將提交分爲兩個階段來解決:保存了offset後提交一次,消息處理成功以後再提交一次。可是還有個更簡單的作法:將消息的offset和消息被處理後的結果保存在一塊兒。好比用Hadoop ETL處理消息時,將處理後的結果和offset同時保存在HDFS中,這樣就能保證消息和offser同時被處理了

七、關於性能優化

Kafka在提升效率方面作了很大努力。Kafka的一個主要使用場景是處理網站活動日誌,吞吐量是很是大的,每一個頁面都會產生好屢次寫操做。讀方面,假設每一個消息只被消費一次,讀的量的也是很大的,Kafka也儘可能使讀的操做更輕量化。

咱們以前討論了磁盤的性能問題,線性讀寫的狀況下影響磁盤性能問題大約有兩個方面:太多的瑣碎的I/O操做和太多的字節拷貝。I/O問題發生在客戶端和服務端之間,也發生在服務端內部的持久化的操做中。
消息集(message set)
爲了不這些問題,Kafka創建了「消息集(message set)」的概念,將消息組織到一塊兒,做爲處理的單位。以消息集爲單位處理消息,比以單個的消息爲單位處理,會提高很多性能。Producer把消息集一塊發送給服務端,而不是一條條的發送;服務端把消息集一次性的追加到日誌文件中,這樣減小了瑣碎的I/O操做。consumer也能夠一次性的請求一個消息集。
另一個性能優化是在字節拷貝方面。在低負載的狀況下這不是問題,可是在高負載的狀況下它的影響仍是很大的。爲了不這個問題,Kafka使用了標準的二進制消息格式,這個格式能夠在producer,broker和producer之間共享而無需作任何改動。
zero copy
Broker維護的消息日誌僅僅是一些目錄文件,消息集以固定隊的格式寫入到日誌文件中,這個格式producer和consumer是共享的,這使得Kafka能夠一個很重要的點進行優化:消息在網絡上的傳遞。現代的unix操做系統提供了高性能的將數據從頁面緩存發送到socket的系統函數,在linux中,這個函數是sendfile.
爲了更好的理解sendfile的好處,咱們先來看下通常將數據從文件發送到socket的數據流向:

操做系統把數據從文件拷貝內核中的頁緩存中
應用程序從頁緩存從把數據拷貝本身的內存緩存中
應用程序將數據寫入到內核中socket緩存中
操做系統把數據從socket緩存中拷貝到網卡接口緩存,從這裏發送到網絡上。

這顯然是低效率的,有4次拷貝和2次系統調用。Sendfile經過直接將數據從頁面緩存發送網卡接口緩存,避免了重複拷貝,大大的優化了性能。
在一個多consumers的場景裏,數據僅僅被拷貝到頁面緩存一次而不是每次消費消息的時候都重複的進行拷貝。這使得消息以近乎網絡帶寬的速率發送出去。這樣在磁盤層面你幾乎看不到任何的讀操做,由於數據都是從頁面緩存中直接發送到網絡上去了。

八、數據壓縮
不少時候,性能的瓶頸並不是CPU或者硬盤而是網絡帶寬,對於須要在數據中心之間傳送大量數據的應用更是如此。固然用戶能夠在沒有Kafka支持的狀況下各自壓縮本身的消息,可是這將致使較低的壓縮率,由於相比於將消息單獨壓縮,將大量文件壓縮在一塊兒才能起到最好的壓縮效果。
Kafka採用了端到端的壓縮:由於有「消息集」的概念,客戶端的消息能夠一塊兒被壓縮後送到服務端,並以壓縮後的格式寫入日誌文件,以壓縮的格式發送到consumer,消息從producer發出到consumer拿到都被是壓縮的,只有在consumer使用的時候才被解壓縮,因此叫作「端到端的壓縮」。
Kafka支持GZIP和Snappy壓縮協議。

九、producer和consumer

Kafka Producer

消息發送

producer直接將數據發送到broker的leader(主節點),不須要在多個節點進行分發。爲了幫助producer作到這點,全部的Kafka節點均可以及時的告知:哪些節點是活動的,目標topic目標分區的leader在哪。這樣producer就能夠直接將消息發送到目的地了。

客戶端控制消息將被分發到哪一個分區。能夠經過負載均衡隨機的選擇,或者使用分區函數。Kafka容許用戶實現分區函數,指定分區的key,將消息hash到不一樣的分區上(固然有須要的話,也能夠覆蓋這個分區函數本身實現邏輯).好比若是你指定的key是user id,那麼同一個用戶發送的消息都被髮送到同一個分區上。通過分區以後,consumer就能夠有目的的消費某個分區的消息。

異步發送

批量發送能夠頗有效的提升發送效率。Kafka producer的異步發送模式容許進行批量發送,先將消息緩存在內存中,而後一次請求批量發送出去。這個策略能夠配置的,好比能夠指定緩存的消息達到某個量的時候就發出去,或者緩存了固定的時間後就發送出去(好比100條消息就發送,或者每5秒發送一次)。這種策略將大大減小服務端的I/O次數。

既然緩存是在producer端進行的,那麼當producer崩潰時,這些消息就會丟失。Kafka0.8.1的異步發送模式還不支持回調,就不能在發送出錯時進行處理。Kafka 0.9可能會增長這樣的回調函數。見Proposed Producer API.

Kafka Consumer

Kafa consumer消費消息時,向broker發出"fetch"請求去消費特定分區的消息。consumer指定消息在日誌中的偏移量(offset),就能夠消費從這個位置開始的消息。customer擁有了offset的控制權,能夠向後回滾去從新消費以前的消息,這是頗有意義的。

十、推仍是拉?

Kafka最初考慮的問題是,customer應該從brokes拉取消息仍是brokers將消息推送到consumer,也就是pull還push。在這方面,Kafka遵循了一種大部分消息系統共同的傳統的設計:producer將消息推送到broker,consumer從broker拉取消息。
一些消息系統好比Scribe和Apache Flume採用了push模式,將消息推送到下游的consumer。這樣作有好處也有壞處:由broker決定消息推送的速率,對於不一樣消費速率的consumer就不太好處理了。消息系統都致力於讓consumer以最大的速率最快速的消費消息,但不幸的是,push模式下,當broker推送的速率遠大於consumer消費的速率時,consumer恐怕就要崩潰了。最終Kafka仍是選取了傳統的pull模式。
Pull模式的另一個好處是consumer能夠自主決定是否批量的從broker拉取數據。Push模式必須在不知道下游consumer消費能力和消費策略的狀況下決定是當即推送每條消息仍是緩存以後批量推送。若是爲了不consumer崩潰而採用較低的推送速率,將可能致使一次只推送較少的消息而形成浪費。Pull模式下,consumer就能夠根據本身的消費能力去決定這些策略。
Pull有個缺點是,若是broker沒有可供消費的消息,將致使consumer不斷在循環中輪詢,直到新消息到t達。爲了不這點,Kafka有個參數可讓consumer阻塞知道新消息到達(固然也能夠阻塞知道消息的數量達到某個特定的量這樣就能夠批量發送)。

十一、消費狀態跟蹤

對消費消息狀態的記錄也是很重要的。
大部分消息系統在broker端的維護消息被消費的記錄:一個消息被分發到consumer後broker就立刻進行標記或者等待customer的通知後進行標記。這樣也能夠在消息在消費後立馬就刪除以減小空間佔用。
可是這樣會不會有什麼問題呢?若是一條消息發送出去以後就當即被標記爲消費過的,一旦consumer處理消息時失敗了(好比程序崩潰)消息就丟失了。爲了解決這個問題,不少消息系統提供了另一個個功能:當消息被髮送出去以後僅僅被標記爲已發送狀態,當接到consumer已經消費成功的通知後才標記爲已被消費的狀態。這雖然解決了消息丟失的問題,但產生了新問題,首先若是consumer處理消息成功了可是向broker發送響應時失敗了,這條消息將被消費兩次。第二個問題時,broker必須維護每條消息的狀態,而且每次都要先鎖住消息而後更改狀態而後釋放鎖。這樣麻煩又來了,且不說要維護大量的狀態數據,好比若是消息發送出去但沒有收到消費成功的通知,這條消息將一直處於被鎖定的狀態,
Kafka採用了不一樣的策略。Topic被分紅了若干分區,每一個分區在同一時間只被一個consumer消費。這意味着每一個分區被消費的消息在日誌中的位置僅僅是一個簡單的整數:offset。這樣就很容易標記每一個分區消費狀態就很容易了,僅僅須要一個整數而已。這樣消費狀態的跟蹤就很簡單了。
這帶來了另一個好處:consumer能夠把offset調成一個較老的值,去從新消費老的消息。這對傳統的消息系統來講看起來有些難以想象,但確實是很是有用的,誰規定了一條消息只能被消費一次呢?consumer發現解析數據的程序有bug,在修改bug後再來解析一次消息,看起來是很合理的額呀!

十二、離線處理消息

高級的數據持久化容許consumer每一個隔一段時間批量的將數據加載到線下系統中好比Hadoop或者數據倉庫。這種狀況下,Hadoop能夠將加載任務分拆,拆成每一個broker或每一個topic或每一個分區一個加載任務。Hadoop具備任務管理功能,當一個任務失敗了就能夠重啓而不用擔憂數據被從新加載,只要從上次加載的位置繼續加載消息就能夠了。

1三、副本與主從關係(本部分直接翻譯自官方文檔)

Kafka容許topic的分區擁有若干副本,這個數量是能夠配置的,你能夠爲每一個topci配置副本的數量。Kafka會自動在每一個個副本上備份數據,因此當一個節點down掉時數據依然是可用的。

Kafka的副本功能不是必須的,你能夠配置只有一個副本,這樣其實就至關於只有一份數據。

建立副本的單位是topic的分區,每一個分區都有一個leader和零或多個followers.全部的讀寫操做都由leader處理,通常分區的數量都比broker的數量多的多,各分區的leader均勻的分佈在brokers中。全部的followers都複製leader的日誌,日誌中的消息和順序都和leader中的一致。flowers向普通的consumer那樣從leader那裏拉取消息並保存在本身的日誌文件中。
許多分佈式的消息系統自動的處理失敗的請求,它們對一個節點是否
着(alive)」有着清晰的定義。Kafka判斷一個節點是否活着有兩個條件:

節點必須能夠維護和ZooKeeper的鏈接,Zookeeper經過心跳機制檢查每一個節點的鏈接。
若是節點是個follower,他必須能及時的同步leader的寫操做,延時不能過久。
符合以上條件的節點準確的說應該是「同步中的(in sync)」,而不是模糊的說是「活着的」或是「失敗的」。Leader會追蹤全部「同步中」的節點,一旦一個down掉了,或是卡住了,或是延時過久,leader就會把它移除。至於延時多久算是「過久」,是由參數replica.lag.max.messages決定的,怎樣算是卡住了,怎是由參數replica.lag.time.max.ms決定的。
只有當消息被全部的副本加入到日誌中時,纔算是「committed」,只有committed的消息纔會發送給consumer,這樣就不用擔憂一旦leader down掉了消息會丟失。Producer也能夠選擇是否等待消息被提交的通知,這個是由參數request.required.acks決定的。

Kafka保證只要有一個「同步中」的節點,「committed」的消息就不會丟失。

1四、Leader的選擇

Kafka的核心是日誌文件,日誌文件在集羣中的同步是分佈式數據系統最基礎的要素。

若是leaders永遠不會down的話咱們就不須要followers了!一旦leader down掉了,須要在followers中選擇一個新的leader.可是followers自己有可能延時過久或者crash,因此必須選擇高質量的follower做爲leader.必須保證,一旦一個消息被提交了,可是leader down掉了,新選出的leader必須能夠提供這條消息。大部分的分佈式系統採用了多數投票法則選擇新的leader,對於多數投票法則,就是根據全部副本節點的情況動態的選擇最適合的做爲leader.Kafka並非使用這種方法。

Kafaka動態維護了一個同步狀態的副本的集合(a set of in-sync replicas),簡稱ISR,在這個集合中的節點都是和leader保持高度一致的,任何一條消息必須被這個集合中的每一個節點讀取並追加到日誌中了,纔回通知外部這個消息已經被提交了。所以這個集合中的任何一個節點隨時均可以被選爲leader.ISR在ZooKeeper中維護。ISR中有f+1個節點,就能夠容許在f個節點down掉的狀況下不會丟失消息並正常提供服。ISR的成員是動態的,若是一個節點被淘汰了,當它從新達到「同步中」的狀態時,他能夠從新加入ISR.這種leader的選擇方式是很是快速的,適合kafka的應用場景。

一個邪惡的想法:若是全部節點都down掉了怎麼辦?Kafka對於數據不會丟失的保證,是基於至少一個節點是存活的,一旦全部節點都down了,這個就不能保證了。
實際應用中,當全部的副本都down掉時,必須及時做出反應。能夠有如下兩種選擇:

等待ISR中的任何一個節點恢復並擔任leader。
選擇全部節點中(不僅是ISR)第一個恢復的節點做爲leader.
這是一個在可用性和連續性之間的權衡。若是等待ISR中的節點恢復,一旦ISR中的節點起不起來或者數據都是了,那集羣就永遠恢復不了了。若是等待ISR意外的節點恢復,這個節點的數據就會被做爲線上數據,有可能和真實的數據有所出入,由於有些數據它可能還沒同步到。Kafka目前選擇了第二種策略,在將來的版本中將使這個策略的選擇可配置,能夠根據場景靈活的選擇。

這種窘境不僅Kafka會遇到,幾乎全部的分佈式數據系統都會遇到。

1五、副本管理

以上僅僅以一個topic一個分區爲例子進行了討論,但實際上一個Kafka將會管理成千上萬的topic分區.Kafka儘可能的使全部分區均勻的分佈到集羣全部的節點上而不是集中在某些節點上,另外主從關係也儘可能均衡這樣每一個幾點都會擔任必定比例的分區的leader.

優化leader的選擇過程也是很重要的,它決定了系統發生故障時的空窗期有多久。Kafka選擇一個節點做爲「controller」,當發現有節點down掉的時候它負責在游泳分區的全部節點中選擇新的leader,這使得Kafka能夠批量的高效的管理全部分區節點的主從關係。若是controller down掉了,活着的節點中的一個會備切換爲新的controller.

1六、消息格式

(1)消息格式

消息由一個固定長度的頭部和可變長度的字節數組組成。頭部包含了一個版本號和CRC32校驗碼。

/**

* 具備N個字節的消息的格式以下

*

* 若是版本號是0

*

* 1. 1個字節的 "magic" 標記

*

* 2. 4個字節的CRC32校驗碼

*

* 3. N - 5個字節的具體信息

*

* 若是版本號是1

*

* 1. 1個字節的 "magic" 標記

*

* 2.1個字節的參數容許標註一些附加的信息好比是否壓縮了,解碼類型等

*

* 3.4個字節的CRC32校驗碼

*

* 4. N - 6 個字節的具體信息

*

*/

(2)日誌

一個叫作「my_topic」且有兩個分區的的topic,它的日誌有兩個文件夾組成,my_topic_0和my_topic_1,每一個文件夾裏放着具體的數據文件,每一個數據文件都是一系列的日誌實體,每一個日誌實體有一個4個字節的整數N標註消息的長度,後邊跟着N個字節的消息。每一個消息均可以由一個64位的整數offset標註,offset標註了這條消息在發送到這個分區的消息流中的起始位置。每一個日誌文件的名稱都是這個文件第一條日誌的offset.因此第一個日誌文件的名字就是00000000000.kafka.因此每相鄰的兩個文件名字的差就是一個數字S,S差很少就是配置文件中指定的日誌文件的最大容量。

消息的格式都由一個統一的接口維護,因此消息能夠在producer,broker和consumer之間無縫的傳遞。存儲在硬盤上的消息格式以下所示:

消息長度: 4 bytes (value: 1+4+n)

版本號: 1 byte

CRC校驗碼: 4 bytes

具體的消息: n bytes

(3)寫操做

消息被不斷的追加到最後一個日誌的末尾,當日志的大小達到一個指定的值時就會產生一個新的文件。對於寫操做有兩個參數,一個規定了消息的數量達到這個值時必須將數據刷新到硬盤上,另一個規定了刷新到硬盤的時間間隔,這對數據的持久性是個保證,在系統崩潰的時候只會丟失必定數量的消息或者一個時間段的消息。

(4)讀操做

讀操做須要兩個參數:一個64位的offset和一個S字節的最大讀取量。S一般比單個消息的大小要大,但在一些個別消息比較大的狀況下,S會小於單個消息的大小。這種狀況下讀操做會不斷重試,每次重試都會將讀取量加倍,直到讀取到一個完整的消息。能夠配置單個消息的最大值,這樣服務器就會拒絕大小超過這個值的消息。也能夠給客戶端指定一個嘗試讀取的最大上限,避免爲了讀到一個完整的消息而無限次的重試。

在實際執行讀取操縱時,首先須要定位數據所在的日誌文件,而後根據offset計算出在這個日誌中的offset(前面的的offset是整個分區的offset),而後在這個offset的位置進行讀取。定位操做是由二分查找法完成的,Kafka在內存中爲每一個文件維護了offset的範圍。

下面是發送給consumer的結果的格式:

MessageSetSend (fetch result)

total length     : 4 bytes

error code       : 2 bytes

message 1        : x bytes

...

message n        : x bytes

MultiMessageSetSend (multiFetch result)



total length       : 4 bytes

error code         : 2 bytes

messageSetSend 1

...

messageSetSend n

(5)刪除

日誌管理器容許定製刪除策略。目前的策略是刪除修改時間在N天以前的日誌(按時間刪除),也可使用另一個策略:保留最後的N GB數據的策略(按大小刪除)。爲了不在刪除時阻塞讀操做,採用了copy-on-write形式的實現,刪除操做進行時,讀取操做的二分查找功能實際是在一個靜態的快照副本上進行的,這相似於Java的CopyOnWriteArrayList。

(6)可靠性保證

日誌文件有一個可配置的參數M,緩存超過這個數量的消息將被強行刷新到硬盤。一個日誌矯正線程將循環檢查最新的日誌文件中的消息確認每一個消息都是合法的。合法的標準爲:全部文件的大小的和最大的offset小於日誌文件的大小,而且消息的CRC32校驗碼與存儲在消息實體中的校驗碼一致。若是在某個offset發現不合法的消息,從這個offset到下一個合法的offset之間的內容將被移除。

有兩種狀況必須考慮:1,當發生崩潰時有些數據塊未能寫入。2,寫入了一些空白數據塊。第二種狀況的緣由是,對於每一個文件,操做系統都有一個inode(inode是指在許多「類Unix文件系統」中的一種數據結構。每一個inode保存了文件系統中的一個文件系統對象,包括文件、目錄、大小、設備文件、socket、管道, 等等),但沒法保證更新inode和寫入數據的順序,當inode保存的大小信息被更新了,但寫入數據時發生了崩潰,就產生了空白數據塊。CRC校驗碼能夠檢查這些塊並移除,固然由於崩潰而未寫入的數據塊也就丟失了

2、配置文件

(一)java調優

特別說明一下JVM配置 在bin/kafka-server-start.sh中添加如下內容:

export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"

官方的推薦使用G1GC,但感受還不穩定,仍是先用CMS算了。如下爲官方推薦內容

-Xms4g -Xmx4g -XX:PermSize=48m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35

For reference, here are the stats on one of LinkedIn's busiest clusters (at peak): - 15 brokers - 15.5k partitions (replication factor 2) - 400k messages/sec in - 70 MB/sec inbound, 400 MB/sec+ outbound The tuning looks fairly aggressive, but all of the brokers in that cluster have a 90% GC pause time of about 21ms, and they're doing less than 1 young GC per second.

(二)參數說明

kafka中有不少的配置參數,大體能夠分爲如下4類:

Broker Configs
 Consumer Configs
 Producer Configs
 New Producer Configs

如下僅對部分重要參數說明並不斷完善,所有的參數說明請參考http://kafka.apache.org/documentation.html#consumerconfigs

broker中的配置只有3個參數是必須提供的:broker.id,log,dir, zookeeper.connect.

一、broker.id=0 用於區分broker,確保每臺機器不一樣,要求是正數。當該服務器的IP地址發生改變時,broker.id沒有變化,則不會影響consumers的消息狀況

二、log.dirs=/home/data/kafka kafka用於放置消息的目錄,默認爲/tmp/kafka-logs。它能夠是以逗號分隔的多個目錄,建立新分區時,默認會選擇存在最少分區的目錄。

三、zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka zk用於放置kafka信息的地方。注意通常狀況下,直接使用192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181便可,此時kafka的相關信息會放在zk的根目錄下,但若是這個zk集羣同時爲多個kafka集羣,或者其它集羣服務,則信息會很混亂,甚至有衝突。所以通常會建一個目錄用於放置kafka集羣信息的目錄,此處的目錄爲/kafka。注意,這個目錄必須手工建立,kafka不會自動建立這個目錄。此外,在conusmer中也必須使用192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka來讀取topic內容。

四、num.partitions=1 建立topic時,默認的分區數

五、num.network.threads=10 broker用於處理網絡請求的線程數,如不配置默認爲3

六、zookeeper.connection.timeout.ms=6000

七、message.max.bytes=1000000000

replica.fetch.max.bytes=1073741824

一條消息的最大字節數,說明以下:

kafka中出現如下異常:

[2015-06-09 17:03:05,094] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 616 from client kafka-client on partition [test3,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 2211366 bytes which exceeds the maximum configured message size of 1000012.

緣由是集羣默認每次只能接受約1M的消息,若是客戶端一次發送的消息大於這個數值則會致使異常。
在server.properties中添加如下參數

message.max.bytes=1000000000
replica.fetch.max.bytes=1073741824

同時在consumer.properties中添加如下參數:

fetch.message.max.bytes=1073741824

而後重啓kafka進程便可,如今每次最大可接收100M的消息。

八、delete.topic.enable=true 默認爲false,即delete topic時只是marked for deletion,但並不會真正刪除topic。

九、關於日誌的保存時間或量:
(1)log.retention.hours=24 消息被刪除前保存多少小時,默認1周168小時
(2)log.retention.bytes 默認爲-1,即不限制大小。注意此外的大小是指一個topic的一個分區的最大字節數。
當超出上述2個限制的任何一個時,日誌均會被刪除。

也能夠在topic級別定義這個參數:

retention.bytes=3298534883328   #3T
retention.bytes與retention.ms

十、同步發送仍是異步發送,異步吞吐量較大,但可能引入錯誤,默認爲sync
producer.type=sync|async
This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values are (1) async for asynchronous send and (2) sync for synchronous send. By setting the producer to async we allow batching together of requests (which is great for throughput) but open the possibility of a failure of the client machine dropping unsent data.

十一、batch.size 默認值爲16384
在async模式下,producer緩存多少個消息後再一塊兒發送

十二、compression.type 默認值爲none,可選gzip snappy
The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are none, gzip, or snappy. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).

1三、default.replication.factor 消息副本的數量,默認爲1,即沒有副本

還有一些須要關注的配置項:
Replication configurations
用於follower從leader複製消息的線程數,默認爲1
num.replica.fetchers=4
follower每次從leader複製消息的字節數,默認爲1M,即1024*1024
replica.fetch.max.bytes=1048576
當follow向leader發送數據請求後,最大的等待時長,默認爲500ms replica.fetch.wait.max.ms=500
每隔多久,follower會將其複製的highwater寫到磁盤中,以便出錯時恢復。 replica.high.watermark.checkpoint.interval.ms=5000
follower與leader之間的time out時長,默認爲30秒 replica.socket.timeout.ms=30000
socket每次的buffer字節數 replica.socket.receive.buffer.bytes=65536
若是一個follower在這段時長內都沒有向leader發出複製請求,則leader會認爲其已經down掉,並從ISR中去掉。
replica.lag.time.max.ms=10000
若是一個follower比leader落後超過這個數據的消息數,則leader會將其從isr中去掉。 replica.lag.max.messages=4000 partition management controller 與replica之間的超時時長 controller.socket.timeout.ms=30000
The buffer size for controller-to-broker-channels
controller.message.queue.size=10
Log configuration
若是在建立topic時沒有指定分區大小,默認的分區大小以下 num.partitions=8
kafka集羣能夠接收的最大消息字節數,默認爲1M.注意,若是增大了這個數值,在consumer中也必須增大這個數值,不然consumer將沒法消費這個消息。
message.max.bytes=1000000
當向一個不存在的topic發送消息時,是否容許自動建立topic auto.create.topics.enable=true
kafka保存多久的數據,單位是小時
log.retention.hours=72
The number of messages written to a log partition before we force an fsync on the log. Setting this lower will sync data to disk more
often but will have a major impact on performance. We generally recommend that people make use of replication for durability rather
than depending on single-server fsync, however this setting can be used to be extra certain.下面2個值默認都是Long.MaxValue。
log.flush.interval.ms=10000 log.flush.interval.messages=20000 log.flush.scheduler.interval.ms=2000 log.roll.hours=168 log.retention.check.interval.ms=300000 log.segment.bytes=1073741824 # ZK configuration zookeeper.connection.timeout.ms=6000 zookeeper.sync.time.ms=2000 # Socket server configuration
執行請求的線程數,至少與你的磁盤數量相同。 num.io.threads=8
服務器用於處理網絡請求的線程數,通常不須要更改,默認爲3. num.network.threads=8
服務器容許最大的請求大小。它能夠預防out of memory,並且應該小於java 堆大小。 socket.request.max.bytes=104857600 socket.receive.buffer.bytes=1048576 socket.send.buffer.bytes=1048576 queued.max.requests=16 fetch.purgatory.purge.interval.requests=100 producer.purgatory.purge.interval.requests=100

3、錯誤處理

一、配置kafka時,若是使用zookeeper create /kafka建立了節點,kafka與storm集成時new ZkHosts(zks) 須要改爲 new ZkHosts(zks,」/kafka/brokers」),否則會報

java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/my-replicated-topic5/partitions。

storm-kafka插件默認kafka的 zk_path以下:

public class ZkHosts implements BrokerHosts {
private static final String DEFAULT_ZK_PATH = 「/brokers」;

二、若是出現如下問題,表明偏移量出錯,建議從新開一個topic

ERROR [KafkaApi-3] Error when processing fetch request for partition [xxxxx,1] offset 112394 from consumer with correlation id 0 (kafka.server.KafkaApis)
kafka.common.OffsetOutOfRangeException: Request for offset 112394 but we only have log segments in the range 0 to 665.

三、當沒有某個topic,或者是某個topic的node放置不在默認位置時,會有如下異常:

java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /kafka/brokers/topics/mytest/partitions at storm.kafka.Dynam

四、kafka中出現如下異常:
[2015-06-09 17:03:05,094] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 616 from client kafka-client on partition [test3,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 2211366 bytes which exceeds the maximum configured message size of 1000012.
緣由是集羣默認每次只能接受約1M的消息,若是客戶端一次發送的消息大於這個數值則會致使異常。
在server.properties中添加如下參數

message.max.bytes=1000000000
replica.fetch.max.bytes=1073741824

同時在consumer.properties中添加如下參數:

fetch.message.max.bytes=1073741824
``
而後重啓kafka進程便可,如今每次最大可接收100M的消息。
 
五、open too many files
kafka出現異常,日誌提示open too many file
查找文件打開數量
lsof -p 30353 | wc
若是在1000以上,通常都是不正常,走過65535就會出錯。
緣由打開了太多producer,沒關閉,調用producer.close()便可。
 

#4、zookeeper中的內容
默認狀況,kafka在zk的/brokers目錄下記錄topic相關的信息,但若是在建立topic時,指定了路徑,則放置到固定的路徑中,如:

bin/kafka-topics.sh --create --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --replication-factor 3 --partitions 5 --topic test_topic

建立的topic,其相關信息會放置到/kafka/brokers中,這個目錄中主要包括2個子目錄:ids 和 topics
一、ids:記錄這個kafka集羣中有多少個broker
如:
 
ls /kafka/brokers/ids/
3   2   5   4
 
這個集羣有4個節點,節點id分別爲2,3,4,5。 咱們看一下內容

[zk: localhost:2181(CONNECTED) 27] get /kafka/brokers/ids/2
{"jmx_port":-1,"timestamp":"1435833841290","host":"kafka02-log.i.nease.net","version":1,"port":9092}
cZxid = 0x1000e8a68
ctime = Thu Jul 02 18:44:01 HKT 2015
mZxid = 0x1000e8a68
mtime = Thu Jul 02 18:44:01 HKT 2015
pZxid = 0x1000e8a68
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x44e440d0bdf06eb
dataLength = 104
numChildren = 0

記錄着這個節點的一些基本狀況。

 
二、topics
先看一下有哪些內容:

[zk: localhost:2181(CONNECTED) 29] ls /kafka/brokers/topics/test30/partitions
[3, 2, 1, 0, 4]
[zk: localhost:2181(CONNECTED) 30] ls /kafka/brokers/topics/test30/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 1] get /kafka/brokers/topics/test30/partitions/0/state
{"controller_epoch":4,"leader":5,"version":1,"leader_epoch":2,"isr":[5]}
cZxid = 0x100017c5e
ctime = Wed Jul 01 14:54:24 HKT 2015
mZxid = 0x1000e8a84
mtime = Thu Jul 02 18:44:01 HKT 2015
pZxid = 0x100017c5e
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0

能夠看某個分區的leader是哪一個,從而讀取kafka消息時,能夠從這個leader中讀取數據。


如下內容來自官方文檔:

下面給出了zk中用於保存consumber與brokers相關信息的目錄結構與算法介紹。

關於目錄結構的前提說明:默認狀況下,kafka相關的信息放在zk根目錄下的某個路徑中,但也能夠設置爲單獨的路徑,設置方法見配置選項部分。在咱們的集羣中,咱們創建了一個目錄/kafka做爲全部kafka相關信息的保存位置。所以咱們在這裏所列的/kafka/xyz,對於默認狀況應該是/xyz。

broker節點的註冊

[zk: localhost:2181(CONNECTED) 140] get /kafka/brokers/ids/2
{"jmx_port":-1,"timestamp":"1437460315901","host":"gdc-kafka02-log.i.nease.net","version":1,"port":9092}

在zk中,有一個broker節點的列表,列表中的每一項表示一個邏輯broker。在啓動時,broker節點會在zk中的/kafka/broker/ids/目錄下建立一個znode,名稱爲配置文件中定義的broker id,如上面所示的/kafka/brokers/ids/2。創建邏輯broker id的目的是容許一個broker節點遷移到另外一臺機器上,而不會影響到consumer的消費。若是想註冊一個已經存在的broker id會引發錯誤(好比說有2個broker的配置文件都寫了同一個broker id)。

因爲broker在zk中註冊的是一個ephemeral znodes,所以當這個broker關機或者掛掉的時候,這個註冊信息會自動刪除,從而會通知consumer這個節點已經不可用。

Topic註冊

ls /kafka/brokers/topics/testtopic/partitions/

3 2 1 0 4

get /kafka/brokers/topics/testtopic/partitions/0/state

{"controller_epoch":9,"leader":5,"version":1,"leader_epoch":26,"isr":[5]}

每一個topic都會在zk中註冊,如上面的testtopic有5個分區。

consumer與consumer組
爲了彼此協調以及平衡數據的消費,consumer也會在zk中註冊信息。經過設置offsets.storage=zookeeper,能夠將consumer的offset保存在zk中,不過這種作法會被逐步淘汰。如今推薦使用kafka做爲offset的保存。

一個組內的consumer能夠共同消費一個topic,它們擁有同一個group_id。組內的consumer會盡量公平的將topic的分區切分。

consumer id註冊
每個consumer都會在zk註冊信息,如:

get /kafka/consumers/console-consumer-30094/ids/console-consumer-30094_gdc-kafka03-log.i.nease.net-1437029151314-d7cdc855
{"version":1,"subscription":{"streaming_ma30_sdc":1},"pattern":"white_list","timestamp":"1437459282749"}

consumer offset
conusumer會根據它已經消費的最大的offset,默唸會存儲在zk的目錄下(也能夠設置爲kafka)。

get /kafka/consumers/testtopic/offsets/testtopic/0
1413950858

注意這是一個永久節點,所以當consumer掛掉重啓時能夠繼續讀取。

分區owner註冊
每個broker分區會官能一個consumer組裏的一個consumer消費,這個consumer必須創建它對這個分區的佔有(ownership),再開始消費。爲了創建這個佔有關係,consumer會在zk中創建相關的信息。

/kafka/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)

```

相關文章
相關標籤/搜索