1、kafka基礎介紹html
0. kakfa概述java
Kafka是最初由Linkedin公司開發,是一個分佈式、支持分區的(partition)、多副本的(replica)開源消息系統,由Scala寫成,是由Apache軟件基金會開發的一個開源消息系統項目,該項目的目標是爲處理實時數據提供一個統1、高通量、低等待的平臺。kafka基於zookeeper協調的分佈式消息系統,它的最大的特性就是能夠實時的處理大量數據以知足各類需求場景:好比基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日誌、訪問日誌,消息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會併成爲頂級開源 項目。node
kafka是一個分佈式消息隊列:生產者、消費者的功能。它提供了相似於JMS的特性,可是在設計實現上徹底不一樣,此外它並非JMS規範的實現。Kafka對消息保存時根據Topic進行歸類,發送消息者稱爲Producer,消息接受者稱爲Consumer,此外kafka集羣有多個kafka實例組成,每一個實例(server)成爲broker。不管是kafka集羣,仍是producer和consumer都依賴於zookeeper集羣保存一些meta信息,來保證系統可用性linux
kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。這種動做(網頁瀏覽,搜索和其餘用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。這些數據一般是因爲吞吐量的要求而經過處理日誌和日誌聚合來解決。nginx
消息隊列的性能好壞,其文件存儲機制設計是衡量一個消息隊列服務技術水平和最關鍵指標之一,Kafka能夠實現高效文件存儲,實際應用效果極好。git
1. kafka名詞解釋(架構的四個部分)github
Producer客戶端負責消息的分發web
kafka集羣中的任何一個broker均可以向producer提供metadata信息,這些metadata中包含"集羣中存活的servers列表"、"partitions leader列表"等信息;算法
當producer獲取到metadata信息以後, producer將會和Topic下全部partition leader保持socket鏈接;數據庫
消息由producer直接經過socket發送到broker,中間不會通過任何"路由層"。事實上,消息被路由到哪一個partition上由producer客戶端決定,好比能夠採用"random""key-hash""輪詢"等。
若是一個topic中有多個partitions,那麼在producer端實現"消息均衡分發"是必要的。
在producer端的配置文件中,開發者能夠指定partition路由的方式。
Producer消息發送的應答機制
設置發送數據是否須要服務端的反饋,有三個值0,1,-1
0: producer不會等待broker發送ack
1: 當leader接收到消息以後發送ack
-1: 當全部的follower都同步消息成功後發送ack
request.required.acks=0
kafka只支持Topic
每一個group中能夠有多個consumer,每一個consumer屬於一個consumer group;一般狀況下,一個group中會包含多個consumer,這樣不只能夠提升topic中消息的併發消費能力,並且還能提升"故障容錯"性,若是group中的某個consumer失效那麼其消費的partitions將會有其餘consumer自動接管。
對於Topic中的一條特定的消息,只會被訂閱此Topic的每一個group中的其中一個consumer消費,此消息不會發送給一個group的多個consumer;那麼一個group中全部的consumer將會交錯的消費整個Topic,每一個group中consumer消息消費互相獨立,咱們能夠認爲一個group是一個"訂閱"者。
在kafka中,一個partition中的消息只會被group中的一個consumer消費(同一時刻);
一個Topic中的每一個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer能夠同時消費多個partitions中的消息。
kafka的設計原理決定,對於一個topic,同一個group中不能有多於partitions個數的consumer同時消費,不然將意味着某些consumer將沒法獲得消息。
kafka只能保證一個partition中的消息被某個consumer消費時是順序的;事實上,從Topic角度來講,當有多個partitions時,消息仍不是全局有序的。
注意:分佈式系統中,對於同一個消費者建議加分佈式鎖,避免重複消費
2. Kafka的特性(優點)
- 高性能:kafka每秒能夠處理幾十萬條消息,它的延遲最低只有幾毫秒,每一個topic能夠分多個partition, consumer group 對partition進行consume操做。Kafka在數據發佈和訂閱過程當中都能保證數據的高吞吐量。即使在TB級數據存儲的狀況下,仍然能保證穩定的性能。即高吞吐量,低延遲!
- 可擴展性:kafka集羣支持熱擴展
- 持久性、可靠性:消息被持久化到本地磁盤,而且支持數據備份防止數據丟失。Kafka是一個具備分區機制、副本機制和容錯機制的分佈式消息系統
- 容錯性:容許集羣中節點失敗(若副本數量爲n,則容許n-1個節點失敗)
- 高併發:支持數千個客戶端同時讀寫
3. kafka有四個核心API
- 應用程序使用producer API發佈消息到1個或多個topic中。
- 應用程序使用consumer API來訂閱一個或多個topic,並處理產生的消息。
- 應用程序使用streams API充當一個流處理器,從1個或多個topic消費輸入流,併產生一個輸出流到1個或多個topic,有效地將輸入流轉換到輸出流。
- connector API容許構建或運行可重複使用的生產者或消費者,將topic連接到現有的應用程序或數據系統。
4. kafka基本原理
一般來說,消息模型能夠分爲兩種:隊列和發佈-訂閱式。隊列的處理方式是一組消費者從服務器讀取消息,一條消息只有其中的一個消費者來處理。在發佈-訂閱模型中,消息被廣播給全部的消費者,接收到消息的消費者均可以處理此消息。Kafka爲這兩種模型提供了單一的消費者抽象模型: 消費者組(consumer group)。消費者用一個消費者組名標記本身。
一個發佈在Topic上消息被分發給此消費者組中的一個消費者。假如全部的消費者都在一個組中,那麼這就變成了queue模型。假如全部的消費者都在不一樣的組中,那麼就徹底變成了發佈-訂閱模型。更通用的, 咱們能夠建立一些消費者組做爲邏輯上的訂閱者。每一個組包含數目不等的消費者,一個組內多個消費者能夠用來擴展性能和容錯。
而且,kafka可以保證生產者發送到一個特定的Topic的分區上,消息將會按照它們發送的順序依次加入,也就是說,若是一個消息M1和M2使用相同的producer發送,M1先發送,那麼M1將比M2的offset低,而且優先的出如今日誌中。消費者收到的消息也是此順序。若是一個Topic配置了複製因子(replication facto)爲N,那麼能夠容許N-1服務器宕機而不丟失任何已經提交(committed)的消息。此特性說明kafka有比傳統的消息系統更強的順序保證。可是,相同的消費者組中不能有比分區更多的消費者,不然多出的消費者一直處於空等待,不會收到消息。
5. kafka應用場景
- 日誌收集:一個公司能夠用Kafka能夠收集各類服務的log,經過kafka以統一接口服務的方式開放給各類consumer,例如hadoop、Hbase、Solr等。
- 消息系統:解耦和生產者和消費者、緩存消息等。
- 用戶活動跟蹤:Kafka常常被用來記錄web用戶或者app用戶的各類活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發佈到kafka的topic中,而後訂閱者經過訂閱這些topic來作實時的監控分析,或者裝載到hadoop、數據倉庫中作離線分析和挖掘。
- 運營指標:Kafka也常常用來記錄運營監控數據。包括收集各類分佈式應用的數據,生產各類操做的集中反饋,好比報警和報告。
- 流式處理:好比spark streaming和storm
- 事件源
- 構建實時的流數據管道,可靠地獲取系統和應用程序之間的數據。
- 構建實時流的應用程序,對數據流進行轉換或反應。
6. 主題和日誌 (Topic和Log)
每個分區(partition)都是一個順序的、不可變的消息隊列,而且能夠持續的添加。分區中的消息都被分了一個序列號,稱之爲偏移量(offset),在每一個分區中此偏移量都是惟一的。Kafka集羣保持全部的消息,直到它們過時,不管消息是否被消費了。實際上消費者所持有的僅有的元數據就是這個偏移量,也就是消費者在這個log中的位置。 這個偏移量由消費者控制:正常狀況當消費者消費消息的時候,偏移量也線性的的增長。可是實際偏移量由消費者控制,消費者能夠將偏移量重置爲更老的一個偏移量,從新讀取消息。 能夠看到這種設計對消費者來講操做自如, 一個消費者的操做不會影響其它消費者對此log的處理。 再說說分區。Kafka中採用分區的設計有幾個目的。一是能夠處理更多的消息,不受單臺服務器的限制。Topic擁有多個分區意味着它能夠不受限的處理更多的數據。第二,分區能夠做爲並行處理的單元,稍後會談到這一點。
7. 分佈式(Distribution)
Log的分區被分佈到集羣中的多個服務器上。每一個服務器處理它分到的分區。根據配置每一個分區還能夠複製到其它服務器做爲備份容錯。 每一個分區有一個leader,零或多個follower。Leader處理此分區的全部的讀寫請求,而follower被動的複製數據。若是leader宕機,其它的一個follower會被推舉爲新的leader。 一臺服務器可能同時是一個分區的leader,另外一個分區的follower。 這樣能夠平衡負載,避免全部的請求都只讓一臺或者某幾臺服務器處理。
8. Kakfa Broker Leader的選舉
Kakfa Broker集羣受Zookeeper管理。全部的Kafka Broker節點一塊兒去Zookeeper上註冊一個臨時節點,由於只有一個Kafka Broker會註冊成功,其餘的都會失敗,因此這個成功在Zookeeper上註冊臨時節點的這個Kafka Broker會成爲Kafka Broker Controller,其餘的Kafka broker叫Kafka Broker follower。(這個過程叫Controller在ZooKeeper註冊Watch)。這個Controller會監聽其餘的Kafka Broker的全部信息,若是這個kafka broker controller宕機了,在zookeeper上面的那個臨時節點就會消失,此時全部的kafka broker又會一塊兒去Zookeeper上註冊一個臨時節點,由於只有一個Kafka Broker會註冊成功,其餘的都會失敗,因此這個成功在Zookeeper上註冊臨時節點的這個Kafka Broker會成爲Kafka Broker Controller,其餘的Kafka broker叫Kafka Broker follower。例如:一旦有一個broker宕機了,這個kafka broker controller會讀取該宕機broker上全部的partition在zookeeper上的狀態,並選取ISR列表中的一個replica做爲partition leader(若是ISR列表中的replica全掛,選一個倖存的replica做爲leader; 若是該partition的全部的replica都宕機了,則將新的leader設置爲-1,等待恢復,等待ISR中的任一個Replica"活"過來,而且選它做爲Leader;或選擇第一個"活"過來的Replica(不必定是ISR中的)做爲Leader),這個broker宕機的事情,kafka controller也會通知zookeeper,zookeeper就會通知其餘的kafka broker。
順便說下曾經發生過的一個bug:TalkingData使用Kafka0.8.1的時候,kafka controller在Zookeeper上註冊成功後,它和Zookeeper通訊的timeout時間是6s,也就是若是kafka controller若是有6s中沒有和Zookeeper作心跳,那麼Zookeeper就認爲這個kafka controller已經死了,就會在Zookeeper上把這個臨時節點刪掉,那麼其餘Kafka就會認爲controller已經沒了,就會再次搶着註冊臨時節點,註冊成功的那個kafka broker成爲controller,而後,以前的那個kafka controller就須要各類shut down去關閉各類節點和事件的監聽。可是當kafka的讀寫流量都很是巨大的時候,TalkingData的一個bug是,因爲網絡等緣由,kafka controller和Zookeeper有6s中沒有通訊,因而從新選舉出了一個新的kafka controller,可是原來的controller在shut down的時候老是不成功,這個時候producer進來的message因爲Kafka集羣中存在兩個kafka controller而沒法落地。致使數據淤積。
這裏曾經還有一個bug,TalkingData使用Kafka0.8.1的時候,當ack=0的時候,表示producer發送出去message,只要對應的kafka broker topic partition leader接收到的這條message,producer就返回成功,無論partition leader 是否真的成功把message真正存到kafka。當ack=1的時候,表示producer發送出去message,同步的把message存到對應topic的partition的leader上,而後producer就返回成功,partition leader異步的把message同步到其餘partition replica上。當ack=all或-1,表示producer發送出去message,同步的把message存到對應topic的partition的leader和對應的replica上以後,才返回成功。可是若是某個kafka controller 切換的時候,會致使partition leader的切換(老的 kafka controller上面的partition leader會選舉到其餘的kafka broker上),可是這樣就會致使丟數據。
9. Topic & Partition
Topic至關於傳統消息系統MQ中的一個隊列queue,producer端發送的message必須指定是發送到哪一個topic,可是不須要指定topic下的哪一個partition,由於kafka會把收到的message進行load balance,均勻的分佈在這個topic下的不一樣的partition上( hash(message) % [broker數量] )。物理上存儲上,這個topic會分紅一個或多個partition,每一個partiton至關因而一個子queue。在物理結構上,每一個partition對應一個物理的目錄(文件夾),文件夾命名是[topicname]_[partition]_[序號],一個topic能夠有無數多的partition,根據業務需求和數據量來設置。在kafka配置文件中可隨時更高num.partitions參數來配置更改topic的partition數量,在建立Topic時經過參數指定parittion數量。Topic建立以後經過Kafka提供的工具也能夠修改partiton數量。
通常來講,a)一個Topic的Partition數量大於等於Broker的數量,能夠提升吞吐率;b)同一個Partition的Replica儘可能分散到不一樣的機器,高可用。
當add a new partition的時候,partition裏面的message不會從新進行分配,原來的partition裏面的message數據不會變,新加的這個partition剛開始是空的,隨後進入這個topic的message就會從新參與全部partition的load balance
10. Partition Replica
每一個partition能夠在其餘的kafka broker節點上存副本,以便某個kafka broker節點宕機不會影響這個kafka集羣。存replica副本的方式是按照kafka broker的順序存。例若有5個kafka broker節點,某個topic有3個partition,每一個partition存2個副本,那麼partition1存broker1,broker2,partition2存broker2,broker3。。。以此類推(replica副本數目不能大於kafka broker節點的數目,不然報錯。這裏的replica數其實就是partition的副本總數,其中包括一個leader,其餘的就是copy副本)。這樣若是某個broker宕機,其實整個kafka內數據依然是完整的。可是,replica副本數越高,系統雖然越穩定,可是回來帶資源和性能上的降低;replica副本少的話,也會形成系統丟數據的風險。
a)怎樣傳送消息:producer先把message發送到partition leader,再由leader發送給其餘partition follower。(若是讓producer發送給每一個replica那就太慢了)
b) 在向Producer發送ACK前須要保證有多少個Replica已經收到該消息:根據ack配的個數而定
c) 怎樣處理某個Replica不工做的狀況:若是這個部工做的partition replica不在ack列表中,就是producer在發送消息到partition leader上,partition leader向partition follower發送message沒有響應而已,這個不會影響整個系統,也不會有什麼問題。若是這個不工做的partition replica在ack列表中的話,producer發送的message的時候會等待這個不工做的partition replca寫message成功,可是會等到time out,而後返回失敗由於某個ack列表中的partition replica沒有響應,此時kafka會自動的把這個部工做的partition replica從ack列表中移除,之後的producer發送message的時候就不會有這個ack列表下的這個部工做的partition replica了。
d)怎樣處理Failed Replica恢復回來的狀況:若是這個partition replica以前不在ack列表中,那麼啓動後從新受Zookeeper管理便可,以後producer發送message的時候,partition leader會繼續發送message到這個partition follower上。若是這個partition replica以前在ack列表中,此時重啓後,須要把這個partition replica再手動加到ack列表中。(ack列表是手動添加的,出現某個部工做的partition replica的時候自動從ack列表中移除的)
11. Partition leader與follower
partition也有leader和follower之分。leader是主partition,producer寫kafka的時候先寫partition leader,再由partition leader push給其餘的partition follower。partition leader與follower的信息受Zookeeper控制,一旦partition leader所在的broker節點宕機,zookeeper會衝其餘的broker的partition follower上選擇follower變爲parition leader。
12. 消息投遞可靠性
一個消息如何算投遞成功,Kafka提供了三種模式:
- 第一種是啥都無論,發送出去就看成成功,這種狀況固然不能保證消息成功投遞到broker;
- 第二種是Master-Slave模型,只有當Master和全部Slave都接收到消息時,纔算投遞成功,這種模型提供了最高的投遞可靠性,可是損傷了性能;
- 第三種模型,即只要Master確認收到消息就算投遞成功;實際使用時,根據應用特性選擇,絕大多數狀況下都會中和可靠性和性能選擇第三種模型
消息在broker上的可靠性,由於消息會持久化到磁盤上,因此若是正常stop一個broker,其上的數據不會丟失;可是若是不正常stop,可能會使存在頁面緩存來不及寫入磁盤的消息丟失,這能夠經過配置flush頁面緩存的週期、閾值緩解,可是一樣會頻繁的寫磁盤會影響性能,又是一個選擇題,根據實際狀況配置。
消息消費的可靠性,Kafka提供的是「At least once」模型,由於消息的讀取進度由offset提供,offset能夠由消費者本身維護也能夠維護在zookeeper裏,可是當消息消費後consumer掛掉,offset沒有即時寫回,就有可能發生重複讀的狀況,這種狀況一樣能夠經過調整commit offset週期、閾值緩解,甚至消費者本身把消費和commit offset作成一個事務解決,可是若是你的應用不在意重複消費,那就乾脆不要解決,以換取最大的性能
13. kafka相關特性說明
- message狀態:在Kafka中,消息的狀態被保存在consumer中,broker不會關心哪一個消息被消費了被誰消費了,只記錄一個offset值(指向partition中下一個要被消費的消息位置),這就意味着若是consumer處理很差的話,broker上的一個消息可能會被消費屢次。
- message持久化:Kafka中會把消息持久化到本地文件系統中,而且保持o(1)極高的效率。咱們衆所周知IO讀取是很是耗資源的性能也是最慢的,這就是爲了數據庫的瓶頸常常在IO上,須要換SSD硬盤的緣由。可是Kafka做爲吞吐量極高的MQ,卻能夠很是高效的message持久化到文件。這是由於Kafka是順序寫入o(1)的時間複雜度,速度很是快。也是高吞吐量的緣由。因爲message的寫入持久化是順序寫入的,所以message在被消費的時候也是按順序被消費的,保證partition的message是順序消費的。通常的機器,單機每秒100k條數據。
- message有效期:Kafka會長久保留其中的消息,以便consumer能夠屢次消費,固然其中不少細節是可配置的。
- Produer : Producer向Topic發送message,不須要指定partition,直接發送就行了。kafka經過partition ack來控制是否發送成功並把信息返回給producer,producer能夠有任意多的thread,這些kafka服務器端是不care的。Producer端的delivery guarantee默認是At least once的。也能夠設置Producer異步發送實現At most once。Producer能夠用主鍵冪等性實現Exactly once
- Kafka高吞吐量: Kafka的高吞吐量體如今讀寫上,分佈式併發的讀和寫都很是快,寫的性能體如今以o(1)的時間複雜度進行順序寫入。讀的性能體如今以o(1)的時間複雜度進行順序讀取, 對topic進行partition分區,consume group中的consume線程能夠以很高能性能進行順序讀。
- Kafka delivery guarantee(message傳送保證):(1)At most once消息可能會丟,絕對不會重複傳輸;(2)At least once 消息絕對不會丟,可是可能會重複傳輸;(3)Exactly once每條信息確定會被傳輸一次且僅傳輸一次,這是用戶想要的。
- 批量發送:Kafka支持以消息集合爲單位進行批量發送,以提升push效率。
- push-and-pull : Kafka中的Producer和consumer採用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管從broker pull消息,二者對消息的生產和消費是異步的。
- Kafka集羣中broker之間的關係:不是主從關係,各個broker在集羣中地位同樣,咱們能夠隨意的增長或刪除任何一個broker節點。
- 負載均衡方面: Kafka提供了一個 metadata API來管理broker之間的負載(對Kafka0.8.x而言,對於0.7.x主要靠zookeeper來實現負載均衡)。
- 同步異步:Producer採用異步push方式,極大提升Kafka系統的吞吐率(能夠經過參數控制是採用同步仍是異步方式)。
- 分區機制partition:Kafka的broker端支持消息分區partition,Producer能夠決定把消息發到哪一個partition,在一個partition 中message的順序就是Producer發送消息的順序,一個topic中能夠有多個partition,具體partition的數量是可配置的。partition的概念使得kafka做爲MQ能夠橫向擴展,吞吐量巨大。partition能夠設置replica副本,replica副本存在不一樣的kafka broker節點上,第一個partition是leader,其餘的是follower,message先寫到partition leader上,再由partition leader push到parition follower上。因此說kafka能夠水平擴展,也就是擴展partition。
- 離線數據裝載:Kafka因爲對可拓展的數據持久化的支持,它也很是適合向Hadoop或者數據倉庫中進行數據裝載。
- 實時數據與離線數據:kafka既支持離線數據也支持實時數據,由於kafka的message持久化到文件,並能夠設置有效期,所以能夠把kafka做爲一個高效的存儲來使用,能夠做爲離線數據供後面的分析。固然做爲分佈式實時消息系統,大多數狀況下仍是用於實時的數據處理的,可是當cosumer消費能力降低的時候能夠經過message的持久化在淤積數據在kafka。
- 插件支持:如今很多活躍的社區已經開發出很多插件來拓展Kafka的功能,如用來配合Storm、Hadoop、flume相關的插件。
- 解耦: 至關於一個MQ,使得Producer和Consumer之間異步的操做,系統之間解耦。
- 冗餘: replica有多個副本,保證一個broker node宕機後不會影響整個服務。
- 擴展性: broker節點能夠水平擴展,partition也能夠水平增長,partition replica也能夠水平增長。
- 峯值: 在訪問量劇增的狀況下,kafka水平擴展, 應用仍然須要繼續發揮做用。
- 可恢復性: 系統的一部分組件失效時,因爲有partition的replica副本,不會影響到整個系統。
- 順序保證性:因爲kafka的producer的寫message與consumer去讀message都是順序的讀寫,保證了高效的性能。
- 緩衝:因爲producer那面可能業務很簡單,然後端consumer業務會很複雜並有數據庫的操做,所以確定是producer會比consumer處理速度快,若是沒有kafka,producer直接調用consumer,那麼就會形成整個系統的處理速度慢,加一層kafka做爲MQ,能夠起到緩衝的做用。
- 異步通訊:做爲MQ,Producer與Consumer異步通訊。
kafka部分名稱解釋
Kafka中發佈訂閱的對象是topic。咱們能夠爲每類數據建立一個topic,把向topic發佈消息的客戶端稱做producer,從topic訂閱消息的客戶端稱做consumer。Producers和consumers能夠同時從多個topic讀寫數據。一個kafka集羣由一個或多個broker服務器組成,它負責持久化和備份具體的kafka消息。
- Broker:Kafka節點,一個Kafka節點就是一個broker,多個broker能夠組成一個Kafka集羣。
- Topic:一類消息,消息存放的目錄即主題,例如page view日誌、click日誌等均可以以topic的形式存在,Kafka集羣可以同時負責多個topic的分發。
- Partition:topic物理上的分組,一個topic能夠分爲多個partition,每一個partition是一個有序的隊列。
- Segment:partition物理上由多個segment組成,每一個Segment存着message信息。
- Producer : 生產message發送到topic。
- Consumer : 訂閱topic消費message, consumer做爲一個線程來消費。
- Consumer Group:一個Consumer Group包含多個consumer, 這個是預先在配置文件中配置好的。各個consumer(consumer 線程)能夠組成一個組(Consumer group ),partition中的每一個message只能被組(Consumer group ) 中的一個consumer(consumer 線程 )消費,若是一個message能夠被多個consumer(consumer 線程 ) 消費的話,那麼這些consumer必須在不一樣的組。Kafka不支持一個partition中的message由兩個或兩個以上的consumer thread來處理,即使是來自不一樣的consumer group的也不行。它不能像AMQ那樣能夠多個BET做爲consumer去處理message,這是由於多個BET去消費一個Queue中的數據的時候,因爲要保證不能多個線程拿同一條message,因此就須要行級別悲觀所(for update),這就致使了consume的性能降低,吞吐量不夠。而kafka爲了保證吞吐量,只容許一個consumer線程去訪問一個partition。若是以爲效率不高的時候,能夠加partition的數量來橫向擴展,那麼再加新的consumer thread去消費。這樣沒有鎖競爭,充分發揮了橫向的擴展性,吞吐量極高。這也就造成了分佈式消費的概念。
14. kafka一些原理概念
持久化
kafka使用文件存儲消息(append only log),這就直接決定kafka在性能上嚴重依賴文件系統的自己特性.且不管任何OS下,對文件系統自己的優化是很是艱難的.文件緩存/直接內存映射等是經常使用的手段.由於kafka是對日誌文件進行append操做,所以磁盤檢索的開支是較小的;同時爲了減小磁盤寫入的次數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)達到必定閥值時,再flush到磁盤,這樣減小了磁盤IO調用的次數.對於kafka而言,較高性能的磁盤,將會帶來更加直接的性能提高.
性能
除磁盤IO以外,咱們還須要考慮網絡IO,這直接關係到kafka的吞吐量問題.kafka並無提供太多高超的技巧;對於producer端,能夠將消息buffer起來,當消息的條數達到必定閥值時,批量發送給broker;對於consumer端也是同樣,批量fetch多條消息.不過消息量的大小能夠經過配置文件來指定.對於kafka broker端,彷佛有個sendfile系統調用能夠潛在的提高網絡IO的性能:將文件的數據映射到系統內存中,socket直接讀取相應的內存區域便可,而無需進程再次copy和交換(這裏涉及到"磁盤IO數據"/"內核內存"/"進程內存"/"網絡緩衝區",多者之間的數據copy).
其實對於producer/consumer/broker三者而言,CPU的開支應該都不大,所以啓用消息壓縮機制是一個良好的策略;壓縮須要消耗少許的CPU資源,不過對於kafka而言,網絡IO更應該須要考慮.能夠將任何在網絡上傳輸的消息都通過壓縮.kafka支持gzip/snappy等多種壓縮方式.
負載均衡
kafka集羣中的任何一個broker,均可以向producer提供metadata信息,這些metadata中包含"集羣中存活的servers列表"/"partitions leader列表"等信息(請參看zookeeper中的節點信息). 當producer獲取到metadata信息以後, producer將會和Topic下全部partition leader保持socket鏈接;消息由producer直接經過socket發送到broker,中間不會通過任何"路由層".
異步發送,將多條消息暫且在客戶端buffer起來,並將他們批量發送到broker;小數據IO太多,會拖慢總體的網絡延遲,批量延遲發送事實上提高了網絡效率;不過這也有必定的隱患,好比當producer失效時,那些還沒有發送的消息將會丟失。
Topic模型
其餘JMS實現,消息消費的位置是有prodiver保留,以便避免重複發送消息或者將沒有消費成功的消息重發等,同時還要控制消息的狀態.這就要求JMS broker須要太多額外的工做.在kafka中,partition中的消息只有一個consumer在消費,且不存在消息狀態的控制,也沒有複雜的消息確認機制,可見kafka broker端是至關輕量級的.當消息被consumer接收以後,consumer能夠在本地保存最後消息的offset,並間歇性的向zookeeper註冊offset.因而可知,consumer客戶端也很輕量級。
kafka中consumer負責維護消息的消費記錄,而broker則不關心這些,這種設計不只提升了consumer端的靈活性,也適度的減輕了broker端設計的複雜度;這是和衆多JMS prodiver的區別.此外,kafka中消息ACK的設計也和JMS有很大不一樣,kafka中的消息是批量(一般以消息的條數或者chunk的尺寸爲單位)發送給consumer,當消息消費成功後,向zookeeper提交消息的offset,而不會向broker交付ACK.或許你已經意識到,這種"寬鬆"的設計,將會有"丟失"消息/"消息重發"的危險.
消息傳輸一致
Kafka提供3種消息傳輸一致性語義:最多1次,最少1次,剛好1次。
最少1次:可能會重傳數據,有可能出現數據被重複處理的狀況;
最多1次:可能會出現數據丟失狀況;
剛好1次:並非指真正只傳輸1次,只不過有一個機制。確保不會出現「數據被重複處理」和「數據丟失」的狀況。
at most once: 消費者fetch消息,而後保存offset,而後處理消息;當client保存offset以後,可是在消息處理過程當中consumer進程失效(crash),致使部分消息未能繼續處理.那麼此後可能其餘consumer會接管,可是由於offset已經提早保存,那麼新的consumer將不能fetch到offset以前的消息(儘管它們尚沒有被處理),這就是"at most once".
at least once: 消費者fetch消息,而後處理消息,而後保存offset.若是消息處理成功以後,可是在保存offset階段zookeeper異常或者consumer失效,致使保存offset操做未能執行成功,這就致使接下來再次fetch時可能得到上次已經處理過的消息,這就是"at least once".
"Kafka Cluster"到消費者的場景中能夠採起如下方案來獲得「剛好1次」的一致性語義:
最少1次+消費者的輸出中額外增長已處理消息最大編號:因爲已處理消息最大編號的存在,不會出現重複處理消息的狀況。
副本
kafka中,replication策略是基於partition,而不是topic;kafka將每一個partition數據複製到多個server上,任何一個partition有一個leader和多個follower(能夠沒有);備份的個數能夠經過broker配置文件來設定。leader處理全部的read-write請求,follower須要和leader保持同步.Follower就像一個"consumer",消費消息並保存在本地日誌中;leader負責跟蹤全部的follower狀態,若是follower"落後"太多或者失效,leader將會把它從replicas同步列表中刪除.當全部的follower都將一條消息保存成功,此消息才被認爲是"committed",那麼此時consumer才能消費它,這種同步策略,就要求follower和leader之間必須具備良好的網絡環境.即便只有一個replicas實例存活,仍然能夠保證消息的正常發送和接收,只要zookeeper集羣存活便可.
選擇follower時須要兼顧一個問題,就是新leader server上所已經承載的partition leader的個數,若是一個server上有過多的partition leader,意味着此server將承受着更多的IO壓力.在選舉新leader,須要考慮到"負載均衡",partition leader較少的broker將會更有可能成爲新的leader.
log
每一個log entry格式爲"4個字節的數字N表示消息的長度" + "N個字節的消息內容";每一個日誌都有一個offset來惟一的標記一條消息,offset的值爲8個字節的數字,表示此消息在此partition中所處的起始位置..每一個partition在物理存儲層面,有多個log file組成(稱爲segment).segment file的命名爲"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.
獲取消息時,須要指定offset和最大chunk尺寸,offset用來表示消息的起始位置,chunk size用來表示最大獲取消息的總長度(間接的表示消息的條數).根據offset,能夠找到此消息所在segment文件,而後根據segment的最小offset取差值,獲得它在file中的相對位置,直接讀取輸出便可.
分佈式
kafka使用zookeeper來存儲一些meta信息,並使用了zookeeper watch機制來發現meta信息的變動並做出相應的動做(好比consumer失效,觸發負載均衡等)
Broker node registry: 當一個kafka broker啓動後,首先會向zookeeper註冊本身的節點信息(臨時znode),同時當broker和zookeeper斷開鏈接時,此znode也會被刪除.
Broker Topic Registry: 當一個broker啓動時,會向zookeeper註冊本身持有的topic和partitions信息,仍然是一個臨時znode.
Consumer and Consumer group: 每一個consumer客戶端被建立時,會向zookeeper註冊本身的信息;此做用主要是爲了"負載均衡".一個group中的多個consumer能夠交錯的消費一個topic的全部partitions;簡而言之,保證此topic的全部partitions都能被此group所消費,且消費時爲了性能考慮,讓partition相對均衡的分散到每一個consumer上.
Consumer id Registry: 每一個consumer都有一個惟一的ID(host:uuid,能夠經過配置文件指定,也能夠由系統生成),此id用來標記消費者信息.
Consumer offset Tracking: 用來跟蹤每一個consumer目前所消費的partition中最大的offset.此znode爲持久節點,能夠看出offset跟group_id有關,以代表當group中一個消費者失效,其餘consumer能夠繼續消費.
Partition Owner registry: 用來標記partition正在被哪一個consumer消費.臨時znode。此節點表達了"一個partition"只能被group下一個consumer消費,同時當group下某個consumer失效,那麼將會觸發負載均衡(即:讓partitions在多個consumer間均衡消費,接管那些"遊離"的partitions)
當consumer啓動時,所觸發的操做:
A) 首先進行"Consumer id Registry";
B) 而後在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其餘consumer的"leave"和"join";只要此znode path下節點列表變動,都會觸發此group下consumer的負載均衡.(好比一個consumer失效,那麼其餘consumer接管partitions).
C) 在"Broker id registry"節點下,註冊一個watch用來監聽broker的存活狀況;若是broker列表變動,將會觸發全部的groups下的consumer從新balance.
總結:
1) Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每一個partition leader創建socket鏈接併發送消息.
2) Broker端使用zookeeper用來註冊broker信息,已經監測partition leader存活性.
3) Consumer端使用zookeeper用來註冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader創建socket鏈接,並獲取消息。
Leader的選擇
Kafka的核心是日誌文件,日誌文件在集羣中的同步是分佈式數據系統最基礎的要素。
若是leaders永遠不會down的話咱們就不須要followers了!一旦leader down掉了,須要在followers中選擇一個新的leader.可是followers自己有可能延時過久或者crash,因此必須選擇高質量的follower做爲leader.必須保證,一旦一個消息被提交了,可是leader down掉了,新選出的leader必須能夠提供這條消息。大部分的分佈式系統採用了多數投票法則選擇新的leader,對於多數投票法則,就是根據全部副本節點的情況動態的選擇最適合的做爲leader.Kafka並非使用這種方法。
Kafka動態維護了一個同步狀態的副本的集合(a set of in-sync replicas),簡稱ISR,在這個集合中的節點都是和leader保持高度一致的,任何一條消息必須被這個集合中的每一個節點讀取並追加到日誌中了,纔回通知外部這個消息已經被提交了。所以這個集合中的任何一個節點隨時均可以被選爲leader.ISR在ZooKeeper中維護。ISR中有f+1個節點,就能夠容許在f個節點down掉的狀況下不會丟失消息並正常提供服。ISR的成員是動態的,若是一個節點被淘汰了,當它從新達到「同步中」的狀態時,他能夠從新加入ISR.這種leader的選擇方式是很是快速的,適合kafka的應用場景。
一個邪惡的想法:若是全部節點都down掉了怎麼辦?Kafka對於數據不會丟失的保證,是基於至少一個節點是存活的,一旦全部節點都down了,這個就不能保證了。
實際應用中,當全部的副本都down掉時,必須及時做出反應。能夠有如下兩種選擇:
1. 等待ISR中的任何一個節點恢復並擔任leader。
2. 選擇全部節點中(不僅是ISR)第一個恢復的節點做爲leader.
這是一個在可用性和連續性之間的權衡。若是等待ISR中的節點恢復,一旦ISR中的節點起不起來或者數據都是了,那集羣就永遠恢復不了了。若是等待ISR意外的節點恢復,這個節點的數據就會被做爲線上數據,有可能和真實的數據有所出入,由於有些數據它可能還沒同步到。Kafka目前選擇了第二種策略,在將來的版本中將使這個策略的選擇可配置,能夠根據場景靈活的選擇。
這種窘境不僅Kafka會遇到,幾乎全部的分佈式數據系統都會遇到。
副本管理
以上僅僅以一個topic一個分區爲例子進行了討論,但實際上一個Kafka將會管理成千上萬的topic分區.Kafka儘可能的使全部分區均勻的分佈到集羣全部的節點上而不是集中在某些節點上,另外主從關係也儘可能均衡這樣每一個幾點都會擔任必定比例的分區的leader.
優化leader的選擇過程也是很重要的,它決定了系統發生故障時的空窗期有多久。Kafka選擇一個節點做爲「controller」,當發現有節點down掉的時候它負責在游泳分區的全部節點中選擇新的leader,這使得Kafka能夠批量的高效的管理全部分區節點的主從關係。若是controller down掉了,活着的節點中的一個會備切換爲新的controller.
Leader與副本同步
對於某個分區來講,保存正分區的"broker"爲該分區的"leader",保存備份分區的"broker"爲該分區的"follower"。備份分區會徹底複製正分區的消息,包括消息的編號等附加屬性值。爲了保持正分區和備份分區的內容一致,Kafka採起的方案是在保存備份分區的"broker"上開啓一個消費者進程進行消費,從而使得正分區的內容與備份分區的內容保持一致。通常狀況下,一個分區有一個「正分區」和零到多個「備份分區」。能夠配置「正分區+備份分區」的總數量,關於這個配置,不一樣主題能夠有不一樣的配置值。注意,生產者,消費者只與保存正分區的"leader"進行通訊。
Kafka容許topic的分區擁有若干副本,這個數量是能夠配置的,你能夠爲每一個topic配置副本的數量。Kafka會自動在每一個副本上備份數據,因此當一個節點down掉時數據依然是可用的。
Kafka的副本功能不是必須的,你能夠配置只有一個副本,這樣其實就至關於只有一份數據。
建立副本的單位是topic的分區,每一個分區都有一個leader和零或多個followers.全部的讀寫操做都由leader處理,通常分區的數量都比broker的數量多的多,各分區的leader均勻的分佈在brokers中。全部的followers都複製leader的日誌,日誌中的消息和順序都和leader中的一致。followers向普通的consumer那樣從leader那裏拉取消息並保存在本身的日誌文件中。
許多分佈式的消息系統自動的處理失敗的請求,它們對一個節點是否着(alive)」有着清晰的定義。Kafka判斷一個節點是否活着有兩個條件:
1. 節點必須能夠維護和ZooKeeper的鏈接,Zookeeper經過心跳機制檢查每一個節點的鏈接。
2. 若是節點是個follower,他必須能及時的同步leader的寫操做,延時不能過久。
符合以上條件的節點準確的說應該是「同步中的(in sync)」,而不是模糊的說是「活着的」或是「失敗的」。Leader會追蹤全部「同步中」的節點,一旦一個down掉了,或是卡住了,或是延時過久,leader就會把它移除。至於延時多久算是「過久」,是由參數replica.lag.max.messages決定的,怎樣算是卡住了,怎是由參數replica.lag.time.max.ms決定的。
只有當消息被全部的副本加入到日誌中時,纔算是「committed」,只有committed的消息纔會發送給consumer,這樣就不用擔憂一旦leader down掉了消息會丟失。Producer也能夠選擇是否等待消息被提交的通知,這個是由參數acks決定的。
Kafka保證只要有一個「同步中」的節點,「committed」的消息就不會丟失。
===========================================================================
2、kafka集羣環境部署記錄
1)服務器信息
ip地址 主機名 安裝軟件 192.168.10.202 kafka01 zookeeper、kafka 192.168.10.203 kafka02 zookeeper、kafka 192.168.10.205 kafka03 zookeeper、kafka 192.168.10.206 kafka-manager kafka-manager 4臺機器關閉iptables和selinux [root@kafka01 ~]# /etc/init.d/iptables stop [root@kafka01 ~]# vim /etc/sysconfig/selinux ...... SELINUX=disabled [root@kafka01 ~]# setenforce 0 [root@kafka01 ~]# getenforce Permissive 4臺機器作hosts綁定 [root@kafka01 ~]# vim /etc/hosts ...... 192.168.10.202 kafka01 192.168.10.203 kafka02 192.168.10.205 kafka03 192.168.10.206 kafka-manager
2)jdk安裝(四臺機器都要操做,安裝1.7以上版本)
將jdk-8u131-linux-x64.rpm下載到/opt目錄下 下載地址:https://pan.baidu.com/s/1pLaAjPp 提取密碼:x27s [root@kafka01 ~]# cd /usr/local/src/ [root@kafka01 src]# ll jdk-8u131-linux-x64.rpm -rw-r--r--. 1 root root 169983496 Sep 28 2017 jdk-8u131-linux-x64.rpm [root@kafka01 src]# rpm -ivh jdk-8u131-linux-x64.rpm [root@kafka01 src]# vim /etc/profile ...... JAVA_HOME=/usr/java/jdk1.8.0_131 JAVA_BIN=/usr/java/jdk1.8.0_131/bin PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin:/bin:/sbin/ CLASSPATH=.:/lib/dt.jar:/lib/tools.jar export JAVA_HOME JAVA_BIN PATH CLASSPATH [root@kafka01 src]# source /etc/profile [root@kafka01 src]# java -version java version "1.8.0_131" Java(TM) SE Runtime Environment (build 1.8.0_131-b11) Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
3)安裝及配置kafka(192.168.10.20二、192.168.10.20三、192.168.10.205三臺機器以下一樣操做)
1)安裝三個節點的zookeeper(zookeeper集羣部署能夠參考:http://www.cnblogs.com/kevingrace/p/7879390.html) [root@kafka01 ~]# cd /usr/local/src/ [root@kafka01 src]# wget http://apache.forsale.plus/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz [root@kafka01 src]# tar -zvxf zookeeper-3.4.10.tar.gz [root@kafka01 src]# mkdir /data [root@kafka01 src]# mv zookeeper-3.4.10 /data/zk 修改三個節點的zookeeper的配置文件,內容以下所示: [root@kafka01 src]# mkdir -p /data/zk/data [root@kafka01 src]# cp /data/zk/conf/zoo_sample.cfg /data/zk/conf/zoo_sample.cfg.bak [root@kafka01 src]# cp /data/zk/conf/zoo_sample.cfg /data/zk/conf/zoo.cfg [root@kafka01 src]# vim /data/zk/conf/zoo.cfg #清空以前的內容,配置成下面內容 tickTime=2000 initLimit=10 syncLimit=5 dataDir=/data/zk/data/zookeeper dataLogDir=/data/zk/data/logs clientPort=2181 maxClientCnxns=60 autopurge.snapRetainCount=3 autopurge.purgeInterval=1 server.1=192.168.10.202:2888:3888 server.2=192.168.10.203:2888:3888 server.3=192.168.10.205:2888:3888 =============== 配置參數說明: server.id=host:port:port:表示了不一樣的zookeeper服務器的自身標識,做爲集羣的一部分,每一臺服務器應該知道其餘服務器的信息。 用戶能夠從"server.id=host:port:port" 中讀取到相關信息。 在服務器的data(dataDir參數所指定的目錄)下建立一個文件名爲myid的文件,這個文件的內容只有一行,指定的是自身的id值。 好比,服務器"1"應該在myid文件中寫入"1"。這個id必須在集羣環境中服務器標識中是惟一的,且大小在1~255之間。 這同樣配置中,zoo1表明第一臺服務器的IP地址。第一個端口號(port)是從follower鏈接到leader機器的端口,第二個端口是用來進行leader選舉時所用的端口。 因此,在集羣配置過程當中有三個很是重要的端口:clientPort=218一、port:288八、port:3888。 =============== 注意:若是想更換日誌輸出位置,除了在zoo.cfg加入"dataLogDir=/data/zk/data/logs"外,還須要修改zkServer.sh文件,大概修改方式地方在 125行左右,內容以下: [root@kafka01 src]# cp /data/zk/bin/zkServer.sh /data/zk/bin/zkServer.sh.bak [root@kafka01 src]# vim /data/zk/bin/zkServer.sh ....... 125 ZOO_LOG_DIR="$($GREP "^[[:space:]]*dataLogDir" "$ZOOCFG" | sed -e 's/.*=//')" #添加這一行 126 if [ ! -w "$ZOO_LOG_DIR" ] ; then 127 mkdir -p "$ZOO_LOG_DIR" 128 fi [root@kafka01 src]# diff /data/zk/bin/zkServer.sh /data/zk/bin/zkServer.sh.bak 125d124 < ZOO_LOG_DIR="$($GREP "^[[:space:]]*dataLogDir" "$ZOOCFG" | sed -e 's/.*=//')" 在啓動zookeeper服務以前,還須要分別在三個zookeeper節點機器上建立myid,方式以下: [root@kafka01 src]# mkdir /data/zk/data/zookeeper/ [root@kafka01 src]# echo 1 > /data/zk/data/zookeeper/myid ================================================================= 另外兩個節點的myid分別爲二、3(注意這三個節點機器的myid決不能同樣,配置文件等其餘都是同樣配置) [root@kafka02 src]# mkdir /data/zk/data/zookeeper [root@kafka02 src]# echo 2 > /data/zk/data/zookeeper/myid [root@kafka03 src]# mkdir /data/zk/data/zookeeper [root@kafka03 src]# echo 3 > /data/zk/data/zookeeper/myid ================================================================= 啓動三個節點的zookeeper服務 [root@kafka01 src]# /data/zk/bin/zkServer.sh start ZooKeeper JMX enabled by default Using config: /data/zk/bin/../conf/zoo.cfg Starting zookeeper ... STARTED [root@kafka01 src]# ps -ef|grep zookeeper root 25512 1 0 11:49 pts/0 00:00:00 /usr/java/jdk1.8.0_131/bin/java -Dzookeeper.log.dir=/data/zk/data/logs -Dzookeeper.root.logger=INFO,CONSOLE -cp /data/zk/bin/../build/classes:/data/zk/bin/../build/lib/*.jar:/data/zk/bin/../lib/slf4j-log4j12-1.6.1.jar:/data/zk/bin/../lib/slf4j-api-1.6.1.jar:/data/zk/bin/../lib/netty-3.10.5.Final.jar:/data/zk/bin/../lib/log4j-1.2.16.jar:/data/zk/bin/../lib/jline-0.9.94.jar:/data/zk/bin/../zookeeper-3.4.10.jar:/data/zk/bin/../src/java/lib/*.jar:/data/zk/bin/../conf:.:/lib/dt.jar:/lib/tools.jar -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain /data/zk/bin/../conf/zoo.cfg root 25555 24445 0 11:51 pts/0 00:00:00 grep zookeeper [root@kafka01 src]# lsof -i:2181 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 25512 root 23u IPv6 8293793 0t0 TCP *:eforward (LISTEN) 查看三個節點的zookeeper角色 [root@kafka01 src]# /data/zk/bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /data/zk/bin/../conf/zoo.cfg Mode: follower [root@kafka02 src]# /data/zk/bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /data/zk/bin/../conf/zoo.cfg Mode: leader [root@kafka03 src]# /data/zk/bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /data/zk/bin/../conf/zoo.cfg Mode: follower ————————————————————————————————————————————————————————————————————————————————————————————— 2)安裝kafka(三個節點一樣操做) 下載地址:http://kafka.apache.org/downloads.html [root@kafka01 ~]# cd /usr/local/src/ [root@kafka01 src]# wget http://mirrors.shu.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz [root@kafka01 src]# tar -zvxf kafka_2.11-1.1.0.tgz [root@kafka01 src]# mv kafka_2.11-1.1.0 /data/kafka 進入kafka下面的config目錄,修改配置文件server.properties: [root@kafka01 src]# cp /data/kafka/config/server.properties /data/kafka/config/server.properties.bak [root@kafka01 src]# vim /data/kafka/config/server.properties broker.id=0 delete.topic.enable=true listeners=PLAINTEXT://192.168.10.202:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/data/kafka/data num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.flush.interval.messages=10000 log.flush.interval.ms=1000 log.retention.hours=168 log.retention.bytes=1073741824 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=192.168.10.202:2181,192.168.10.203:2181,192.168.10.205:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 其餘兩個節點的server.properties只須要修改下面兩行,其餘配置都同樣 [root@kafka02 src]# vim /data/kafka/config/server.properties [root@kafka02 src]# cat /data/kafka/config/server.properties broker.id=1 ...... listeners=PLAINTEXT://192.168.10.203:9092 ....... [root@kafka03 src]# vim /data/kafka/config/server.properties [root@kafka03 src]# cat /data/kafka/config/server.properties broker.id=2 ...... listeners=PLAINTEXT://192.168.10.205:9092 ...... 啓動三個節點的kafka服務 [root@kafka01 src]# nohup /data/kafka/bin/kafka-server-start.sh /data/kafka/config/server.properties >/dev/null 2>&1 & [root@kafka01 src]# lsof -i:9092 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 26114 root 97u IPv6 8298666 0t0 TCP kafka01:XmlIpcRegSvc (LISTEN) java 26114 root 113u IPv6 8298672 0t0 TCP kafka01:53112->kafka01:XmlIpcRegSvc (ESTABLISHED) java 26114 root 114u IPv6 8298673 0t0 TCP kafka01:XmlIpcRegSvc->kafka01:53112 (ESTABLISHED) 驗證服務 隨便在其中一臺節點主機執行 [root@kafka01 src]# /data/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.10.202:2181,192.168.10.203:2181,192.168.10.205:2181 --replication-factor 1 --partitions 1 --topic test 出現下面信息說明建立成功 Created topic "test". 而後再在其餘主機查看上面建立的topic [root@kafka02 src]# /data/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.10.202:2181,192.168.10.203:2181,192.168.10.205:2181 test 到此,kafka集羣環境已部署完成!
4)安裝kafka集羣管理工具kafka-manager
爲了簡化開發者和服務工程師維護Kafka集羣的工做,yahoo構建了一個叫作Kafka管理器的基於Web工具,叫作 Kafka Manager。kafka-manager 項目地址:https://github.com/yahoo/kafka-manager。這個管理工具能夠很容易地發現分佈在集羣中的哪些topic分佈不均勻,或者是分區在整個集羣分佈不均勻的的狀況。它支持管理多個集羣、選擇副本、副本從新分配以及建立Topic。同時,這個管理工具也是一個很是好的能夠快速瀏覽這個集羣的工具,kafka-manager有以下功能:
- 管理多個kafka集羣
- 便捷的檢查kafka集羣狀態(topics,brokers,備份分佈狀況,分區分佈狀況)
- 選擇你要運行的副本
- 基於當前分區情況進行
- 能夠選擇topic配置並建立topic(0.8.1.1和0.8.2的配置不一樣)
- 刪除topic(只支持0.8.2以上的版本而且要在broker配置中設置delete.topic.enable=true)
- Topic list會指明哪些topic被刪除(在0.8.2以上版本適用)
- 爲已存在的topic增長分區
- 爲已存在的topic更新配置
- 在多個topic上批量重分區
- 在多個topic上批量重分區(可選partition broker位置)
kafka-manager安裝過程以下
下載安裝 kafka-manager 想要查看和管理Kafka,徹底使用命令並不方便,咱們可使用雅虎開源的Kafka-manager,GitHub地址以下: https://github.com/yahoo/kafka-manager 也可使用Git或者直接從Releases中下載,此處從下面的地址下載 1.3.3.7 版本: https://github.com/yahoo/kafka-manager/releases 須要注意: 上面下載的是源碼,下載後須要按照後面步驟進行編譯。若是以爲麻煩,能夠直接下載編譯好的kafka-manager-1.3.3.7.zip。 下載地址:https://pan.baidu.com/s/12j2DEt94WsWRY6dD9aR6BQ 提取密碼:8x57 [root@kafka-manager src]# ls kafka-manager-1.3.3.7.zip kafka-manager-1.3.3.7.zip [root@kafka-manager src]# unzip kafka-manager-1.3.3.7.zip [root@kafka-manager src]# mv kafka-manager-1.3.3.7 /data/ [root@kafka-manager src]# cd /data/kafka-manager-1.3.3.7 [root@kafka-manager kafka-manager-1.3.3.7]# cd conf/ [root@kafka-manager conf]# cp application.conf application.conf.bak [root@kafka-manager conf]# vim application.conf ...... #kafka-manager.zkhosts="localhost:2181" #註釋這一行,下面添加一行 kafka-manager.zkhosts="192.168.10.202:2181,192.168.10.203:2181,192.168.10.205:2181" 啓動kafka-manager [root@kafka-manager conf]# nohup /data/kafka-manager-1.3.3.7/bin/kafka-manager >/dev/null 2>&1 & ---------------------------------------------------------------------------------------------------- 須要注意: kafka-manager 默認的端口是9000,可經過 -Dhttp.port,指定端口; -Dconfig.file=conf/application.conf指定配置文件: [root@kafka-manager conf]# nohup bin/kafka-manager -Dconfig.file=/data/kafka-manager-1.3.3.7/conf/application.conf -Dhttp.port=8080 & ---------------------------------------------------------------------------------------------------- 啓動完畢後能夠查看端口是否啓動,因爲啓動過程須要一段時間,端口起來的時間可能會延後。 [root@kafka-manager conf]# lsof -i:9000 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 27218 root 114u IPv6 3766984 0t0 TCP *:cslistener (LISTEN) 最後就可使用http://192.168.10.206:9000訪問了
kafka-mamager測試
新建 Cluster1
點擊【Cluster】>【Add Cluster】打開以下添加集羣的配置界面:
輸入集羣的名字(如Kafka-Cluster-test)和 Zookeeper 服務器地址(如localhost:2181),選擇最接近的Kafka版本(如0.10.1.1)
-------------------------------------------------------------------
注意:若是沒有在 Kafka 中配置過 JMX_PORT,千萬不要選擇第一個複選框。
Enable JMX Polling
若是選擇了該複選框,Kafka-manager 可能會沒法啓動。
-------------------------------------------------------------------
其餘broker的配置能夠根據本身須要進行配置,默認狀況下,點擊【保存】時,會提示幾個默認值爲1的配置錯誤,須要配置爲>=2的值。提示以下。
新建完成後,運行界面以下:
查看TOPIC 信息
查看broker信息
管理 kafka-mamager
新建主題
點擊【Topic】>【Create】能夠方便的建立並配置主題。以下顯示。
因爲集羣只有三個節點,故replication factor最多隻能設置爲3
================================================
針對上面Topic->Create新建主題的配置,下面根據一張圖講解
在上圖一個Kafka集羣中,有兩個服務器,每一個服務器上都有2個分區。P0,P3可能屬於同一個主題,也多是兩個不一樣的主題。若是設置的Partitons和Replication Factor都是2,這種狀況下該主題的分步就和上圖中Kafka集羣顯示的相同,此時P0,P3是同一個主題的兩個分區。P1,P2也是同一個主題的兩個分區,Server1和Server2其中一個會做爲Leader進行讀寫操做,另外一個經過複製進行同步。若是設置的Partitons和Replication Factor都是1,這時只會根據算法在某個Server上建立一個分區,能夠是P0~4中的某一個(分區都是新建的,不是先存在4個而後從中取1個)。