初識Apache Kafka 核心概念

                初識Apache Kafka 核心概念html

                                                    做者:尹正傑java

版權聲明:原創做品,謝絕轉載!不然將追究法律責任。正則表達式

 

 

一.kafka概要設計數據庫

  kafka在設計初衷就是爲了解決互聯網公司的超級大量級數據的實時傳輸。爲了實現這個目標,kafka在設計之初就須要考慮如下四個方面:
    (1)吞吐量/延遲
    (2)消息持久化 
    (3)負載均衡和故障轉移
    (4)伸縮性

1>.吞吐量/延遲windows

一.吞吐量/延時介紹
  咱們先打個比方:若kafka處理一條消息須要花費2ms,那麼計算獲得的吞吐量不會超過500條消息每秒(1000ms/2ms=500條/s)。可是若咱們採用批處理(batching)的思想,假設在發送前咱們首先會等待一段時間(假設是8ms),那麼此時消息發送的延遲變成了10ms(2ms+8ms),即延遲增長了4倍,但假設在這8ms中咱們總共積累了1000條消息,那麼整個系統的吞吐量就變成了100000 條/s。
  此時你會發現吞吐量提高了200倍!看到micor-batch的威力了吧?這就是目前諸如Storm Trident 和 Spark Streaming等消息處理平臺所採用的批處理思想。 

二.Kafka如何作到高吞吐量,低延遲的呢? 
  首先,kafka的寫入操做是很快的,這主要得益於它對磁盤的使用方法的不一樣。雖然kafka會持久化全部數據到磁盤,但本質上每次寫入操做其實都只是把數據寫入到操做系統的頁緩存(page cache)中,而後由操做系統自行決定何時把頁緩存中的數據寫入磁盤上。這樣的設計由三個主要的優點:
    (1)操做系統頁緩存是內存中分配的,因此消息寫入的速度很是快;
    (2)kafka沒必要直接與底層的文件系統打交道。因此煩瑣的I/O操做都交由操做系統來處理;
    (3)kafka寫入操做採用追加寫入(append)方式,避免了磁盤隨機寫操做(據資料統計,順序磁盤I/O速度是絕不遜色於隨機讀寫內存I/O速度。感興趣的小夥伴可使用相關工具測試一下。); 

三.Kafka的高吞吐量,低延遲的設計目標
  (1)大量使用操做系統頁緩存,內存操做速度快且命中率高; 
  (2)Kafka不直接參與物理I/O操做,而是交由最擅長此時的操做系統來完成; 
  (3)採用追加寫入方式,摒棄了緩慢的磁盤隨機讀/寫操做;
  (4)使用sendfile爲表明的零拷貝技術增強網絡間的數據傳輸效率;

2>.消息持久化的優勢後端

  (1)解耦消息發送和消息消費
      本質上來講,kakfa最核心的功能就是提供了生產者-消費者模式的完整解決方案。經過將消息持久化使得生產者方再也不須要直接和消費者方耦合,它只是簡單的把消息生產出來並交由kafka服務器保存便可,所以提高了總體的吞吐量。 
  (2)實現靈活的消息處理
      不少kafka的下游子系統(接受kafka消息的系統)都有這樣的需求:對於已經處理過的消息可能在將來的某個時間點從新處理一次,即所謂的消息消息重演(message replay)。消息持久化即可以很方便地實現這樣的需求。 

3>.負載均衡和故障轉移centos

  做爲一個功能完備的分佈式系統,kafka若是隻提供了最基本的消息引擎功能確定不足以幫助它脫穎而出。一套完整的消息引擎解決方案中必然要提供負載均衡(load balancing)和故障轉移(fail-over)功能。
  何爲負載均衡?顧名思義就是讓系統的負載根據必定的規則均衡地分配在全部參數工做的服務器上,從而最大限度的提高總體的運行效率。kafka實現負載均衡其實是經過智能化的分區領導者選舉(partition leader election)來實現的。 
  除了負載均衡,完備的分佈式系統還支持故障轉移,所謂故障轉移,是指當服務器意外終止時,整個集羣能夠快速的檢測到該失效(failure),並當即將該服務器上應用或服務自動轉移到其餘服務器上。故障轉移一般是「心跳」和「會話「的機制來實現的。kafka服務器支持故障轉移的方式就是使用會話機制。每臺kafka服務器啓動後會以會話的形式把本身註冊到zookeeper服務器上。一旦該服務運轉出現問題,與zookeeper的會話變不能維持從而超時失效,此時kafka集羣會選舉出另一臺服務器來徹底代替這臺服務器繼續提供服務。

4>.伸縮性 緩存

  所謂伸縮性,英文名是scalability。伸縮性表示想分佈式系統中增長額外的計算資源(好比CPU,內存,存儲或帶寬)時吞吐量提高的能力。阻礙線性擴容的一個很常見的因素就是狀態的保存。咱們知道,不管是哪類分佈式系統,集羣的每臺服務器必定會維護不少內部狀態。若是由服務器本身來保存這些狀態信息,則必須處理一致性的問題。相反,若是服務器是無狀態的,狀態的保存和管理交與專門的協調服務來作(好比zookeeper)。那麼整個集羣的服務武器之間就無需繁重的狀態共享,這極大的下降了維護複雜度。假若要擴容集羣節點,只須要簡單的啓動新的節點集羣和進行自動負載均衡就能夠了。 
  Kafka正式採用了這樣的思想:每臺kafka服務器上的狀態統一交友zookeeper保管。擴展kafka集羣也只須要一步:啓動新的kafka服務器便可。固然這裏須要言明的是,在kafka服務器上並非全部的狀態信息都不保存,它只保存了很輕量級的內部狀態(好比從kakka 0.10.x版本以後,它將每一個topic的消費者的偏移量本身維護了,把這些偏移量存放到了一個叫作「__consumer_offsets」的的topic進行維護)。

 

二.Kafka簡介安全

1>.什麼是JMS服務器

  在Java中有一個角消息系統的東西,咱們叫他Java Message Service,簡稱JMS。好比各類MQ。咱們舉個簡單的例子,在java中進程之間通訊須要socket,「路人甲」要向「路人乙」發送數據,須要「路人乙」開啓服務,暴露端口。這樣面臨的問題就是若是「路人乙」不在線,「路人甲」就不能發送數據給「路人乙」。

  爲了解決該問題就須要在「路人甲」和「路人乙」之間引入消息中間件,進行解耦。以下圖所示:

2>.JMS的兩種工做模式

  第一種模式:點到點(point to point,簡稱P2P),典型的一對一模式(一我的發送數據的同時只有一我的接收數據),也有人稱之爲端到端(peer to peer)或者隊列模式(queue)。

  第二種模式:發佈訂閱模式(publish subscribe,簡稱P-S),典型的一對多模式(一我的發送數據的同時能夠有多我的接收數據),也有人稱爲主題模式(在生產者和消費者之間加入了topic(主題),主題至關於公告欄,生產者發送消息到主題後,全部消費者均可以看到,功能相似於我們平時接觸的微信公衆號)。

3>.Kafka的工做模式

    Kafka的工做模式能夠把JMS的兩種模式結合在一塊兒,咱們稱之爲消費者組模式。

4>.什麼是Kafka

Kafka最初是由LinkedIn公司開發,並於 2011年初開源。在流式計算中,Kafka通常用來緩存數據,Storm,Spark,Fink等經過消費Kafka的數據進行計算。Apache Kafka是一個開源消息系統,由Scala寫成。是由Apache軟件基金會開發的一個開源消息系統項目。

做爲一個流系統,Apache Kafka有三種關鍵能力:   (
1)發佈和訂閱記錄流。在這個方面其等於一個消息隊列或企業級消息中間件,也是咱們最熟悉的用法   (2)以容錯的方式存儲記錄流。這是Kafka和不少僅基於內存的消息隊列的重要區別   (3)對流中的記錄進行處理。這是Kafka 0.10後新增的能力(Kafka Streams) 爲進一步學習其機制,你須要瞭解:   (1)Kafka以集羣形式運行在一到多臺服務器上,每一個Kafka工做進程稱爲broker;   (2)Kafka集羣對記錄的流進行分類存儲,這種分類稱爲主題(topic);   (3)每條記錄由鍵(key:消息鍵,對消息作partition時使用,即決定消息被保存在某topic下的哪一個partition)、值(value:消息體,保存實際的消息數據)、時間戳(timestamp:消息發送時間戳, 用於流式處理及其餘依賴時間的處理語義。若是不指定,則取當前時間)等,感興趣的能夠深刻了解一下Kafka的消息格式,網上有不少相關資料,我這裏就不當搬運工了.

5>.Kafka版本

  Kafka是一種高吞吐,分佈式,基於發佈訂閱的流系統。最初由LinkedIn公司開發,後成爲Apache頂級項目。目前(20190710)社區版本最新版本爲2.2.x,從0.10.0版本開始核心設計沒有大的變化,API也基本穩定。

  以下圖所示,咱們能夠對比 Cloudera Distribution of Apache Kafka(簡稱CDK)和 Apache Kafka的對應關係,原連接爲:https://www.cloudera.com/documentation/kafka/latest/topics/kafka_packaging.html#concept_fzg_phl_br。

6>.Kafka核心API

以下圖所示,Kafka有四類核心API: 
  (1)Producer API
      容許應用程序將記錄流發佈到一個或多個topic中.
  (2)Consumer API
      容許應用程序從一個或多個topic中訂閱記錄流.
  (3)Streams API
      容許應用程序做爲流處理器,從一個或多個topic中消費輸入流,並向一個或多個topic寫出輸出流,在輸入流和輸出流中作轉換.
  (4)Connector API
      容許建立和運行可重用的生產者或消費者,其將topic鏈接到現有應用程序或數據系統,例如一個到關係型數據庫的connector可將表的變更捕捉到topic中.

 

7>.kafka特色

  第一:能夠處理大量數據,TB級別;

  第二:高吞吐量,支持每秒種百萬消息,傳輸速度可達到300MB/s;

  第三:分佈式,支持在多個Server之間進行消息分區;

  第四:多客戶端支持,和多種語言進行協同;

  第五:它是一個集羣,擴容起來也至關方便;

 

三.kafka消息隊列

1>.kafka消息隊列內部實現原理

點對點模式(一對一,消費者主動拉取數據,消息收到後消息清除,pull)
    點對點模型一般是一個基於拉取或者輪詢的消息傳送模型,這種模型從隊列中請求信息,而不是將消息推送到客戶端。這個模型的特色是發送到隊列的消息被一個且只有一個接收者接收處理,即便有多個消息監聽者也是如此。


發佈/訂閱模式(一對多,數據生產後,推送給全部訂閱者,push)
    發佈訂閱模型則是一個基於推送的消息傳送模型。發佈訂閱模型能夠有多種不一樣的訂閱者,臨時訂閱者只在主動監聽主題時才接收消息,而持久訂閱者則監聽主題的全部消息,即便當前訂閱者不可用,處於離線狀態。

2>.爲何須要消息隊列

  (1)解耦:
      容許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。
  (2)冗餘:
      消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除以前,須要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
  (3)擴展性:
      由於消息隊列解耦了你的處理過程,因此增大消息入隊和處理的頻率是很容易的,只要另外增長處理過程便可。
  (4)靈活性 & 峯值處理能力:
      在訪問量劇增的狀況下,應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見。若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列可以使關鍵組件頂住突發的訪問壓力,而不會由於突發的超負荷的請求而徹底崩潰。
  (5)可恢復性:
      系統的一部分組件失效時,不會影響到整個系統。消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。
  (6)順序保證:
      在大多使用場景下,數據處理的順序都很重要。大部分消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。(Kafka保證一個Partition內的消息的有序性)
  (7)緩衝:
      有助於控制和優化數據流通過系統的速度,解決生產消息和消費消息的處理速度不一致的狀況。
  (8)異步通訊:
      不少時候,用戶不想也不須要當即處理消息。消息隊列提供了異步處理機制,容許用戶把一個消息放入隊列,但並不當即處理它。想向隊列中放入多少消息就放多少,而後在須要的時候再去處理它們。

3>. Kafka架構圖簡介

 

Kafka關鍵名詞介紹:
一.Producer :
    消息生產者,就是向kafka broker發消息的客戶端。

二.Consumer :
    消息消費者,向kafka broker取消息的客戶端

三.Topic :
    能夠理解爲一個隊列,它是Kafka管理消息的實例。

四.Consumer Group (咱們這裏簡稱CG,即消費者組):
    這是kafka用來實現一個topic消息的廣播(發給全部的consumer)和單播(發給任意一個consumer)的手段。一個topic能夠有多個CG。topic的消息會複製-給consumer。若是須要實現廣播,只要每一個consumer有一個獨立的CG就能夠了。要實現單播只要全部的consumer在同一個CG。用CG還能夠將consumer進行自由的分組而不須要屢次發送消息到不一樣的topic。
    關於Consumer Group咱們要注意如下幾點:
        1>.在同一個CG的中,同時只能有一個consumer對topic進行消費;
        2>.在同一個CG中,全部的consumer是不會重複消費數據的,也就是說,同一個topic中的某個Partition中的數據被當前CG的一個consumer消費後,是不會再被這個GC中的其它consumer再次進行消費啦;
        3>.在同一個CG中,每個consumer消費單元是都以Partition爲消費單元的,換句話說,在同一個CG中,只要consumer和topic中的Partition創建RPC鏈接後,那麼這個Partition中的全部數據只會被這個consumer消費。

五.Broker :
    一臺kafka服務器就是一個broker。一個集羣由多個broker組成。一個broker能夠容納多個topic。

六.Partition:
    Kafka集羣中,Partition是生產者和消費者操做的最小單元。爲了實現擴展性,一個很是大的topic能夠分佈到多個broker(即服務器)上,一個topic能夠分爲多個partition,每一個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的總體(多個partition間)的順序。

七.Offset:
    kafka的存儲文件都是按照offset.kafka來命名,用offset作名字的好處是方便查找。例如你想找位於2049的位置,只要找到2048.kafka的文件便可。固然the first offset就是00000000000.kafka

八.Zookeeper集羣
    Zookeeper保存的東西有兩個:
        1>.Kafka集羣節點的狀態信息(便於管理leader和follower角色);
        2>.消費者當前正在消費消息的狀態信息(好比保存消費者消費的偏移量(Offset))。
        舒適提示:它並無保存生產者的一些元數據信息。

九.Replica
    Kafka存儲數據的分區分爲主從,即Leader和Follower。也就是說,kafka本身就有冗餘機制,它將數據寫入Leader的Partition以後,會將這份數據拷貝到其它broker的follower的Partition之中。舒適提示:這個存儲在follower的Partition數據不會直接和消息生成者溝通,更不會跟消息消費者進行溝通,它僅僅是起到一個數據備份的做用!當Kafka的leader節點掛掉時,follower的各個節點會重寫選舉出新的leader,並由新的leader向外提供服務。             

十.Event
    Kafka集羣保存消息是以Partition去保存的,每個Partition是按照隊列去保存的,消息是以Event來包裝消息的,一個消息就是一個event。所以每一個Partition的消息是有序的,多個Partition之間的消息是無序的!

 

四.Kafka核心概念

1>.topic和partition

  在概念上來講,topic只是一個邏輯概念,表明了一類消息,也能夠認爲是消息被髮送到的地方。一般咱們可使用topic來區分實際業務,好比業務A使用一個topic,業務B使用另外一個topic。從本質上說,每一個Kafka topic都由若干個partition組成,而Kafka的partition是不可修改的有序消息序列,也就是說是有序的消息日誌。每一個partition有本身專屬的partition號,一般是從0開始的。用戶堆partition我惟一能作的操做就是在消息序列的尾部追加寫入消息。

  partition上的每條消息都會被分配一個惟一的序列號,按照Kafka的術語來說,該序列號被稱爲位移(offset)。該位移值是從0開始順序遞增的證書。位移信息能夠惟必定義到某partition下的一條消息。值得一提的是,Kafka的partition實際上並無太多的業務含義,它的引入就是單純的爲了提高系統的吞吐量,所以在建立Kafka topic的時候能夠根據集羣實際配置設置具體的partition數,實現總體性能的最大化。

2>.offset

  上面說過,topic partition下的每條消息都被分配了一個位移值。實際上,Kafka消費者端也有位移(offset)的概念,但必定要注意這兩個offset屬於不一樣的概念。

  顯然,每條消息在某個partition的位移是固定的,但消費該partition的消費者的位移是會隨着消費進度不斷遷移,但終究不可能超過該分區最新一條消息的位移。綜合以前說的topic,partition和offset,咱們能夠斷言Kafka中的一條消息其實就是一個
<topic,partition,offset>三元組(tuple),經過該元組值咱們能夠在Kafka集羣中找到位移對應的那條消息。

3>.Replica

  既然咱們已知partition是有序的消息日誌,那麼必定不能只保存者一份日誌,不然一旦保存在partition的Kafka服務器掛掉了,其上保存的消息也就都丟失了。分佈式系統必然要實現高可靠性,而目前實現的主要途徑仍是依靠冗餘機制。換句話說,就是備份多份日誌。這些分貝日誌在Kafka中被稱爲副本(replica),它們存在的惟一目的就是防止數據丟失,這一點必定要記住!

4>.leader和follower

  副本(replia)分爲兩類:領導者副本(leader replia)和追隨者副本(follower replia)。follower replica是不能提供服務給客戶端的,也就是說不負責響應客戶端發來的消息寫入和消息消費請求。它只是被動地向領導者副本(leader replia)獲取數據,而一旦leader replica 所在的broker宕機,Kafka會從剩餘的replica中選舉出新的leader繼續提供服務。

  Kafka保證同一個partition的多個replica必定不會分配在同一臺broker上。畢竟若是同一個broker上有同一個partition的多個replica,那麼將沒法實現備份冗餘的效果。

5>.producer

  生產者將數據發佈到它們指定的topics。生產者負責選擇將記錄分配到topic中的哪一個分區。能夠以round-robin方式分配以簡單地負載均衡,或能夠按能夠按某個分區函數(基於記錄的鍵來計算)來分配。 

6>.consumer

  消費者擁有一個消費者組(consumer group)名,topic的每條記錄會被 傳輸給其消費者組中的惟一一個消費者。消費者實例能夠是單獨的線程,也能夠位於單獨的機器上。 

  若是全部的消費者實例有相同的消費者組名,記錄會在這些消費者實例間 有效地負載均衡。若是全部的消費者實例都屬於不一樣的消費者組,則每條 記錄會被廣播到全部的消費者線程上。

  Kafka中消費的實現,是將log中的分區劃分到消費者實例上,使得任什麼時候 刻每一個實例都是分區「公平劃分」後的排他的消費者。這個管理組內成員 的過程是由Kafka協議動態處理的。若是新實例加入消費者組,它們會從 其餘組內成員接管部分分區;若是一個實例掛掉,其分區會被分配給其餘 實例。 Kafka只提供分區內記錄的全局有序,而不是某topic內不一樣分區間的全局 有序。分區有序加上按鍵分區數據的能力對大多數應用來講足夠。然而, 若是必定須要記錄的全局有序,能夠經過指定topic只有一個分區實現, 但這意味着每一個消費者組只能有一個消費線程。 

  下圖爲2節點的Kafka集羣維持4個分區(P0-P3),有2個消費者組。消費 者組A有2個消費者,消費者組B有4個消費者。 

7>.broker

  Kafka是一個分佈式消息隊列。Kafka對消息保存是根據Topic進行歸類,發送消息者稱爲Producer,消息接受者稱爲Consumer,此外kafka集羣有多個kafka實例組成,每一個實例(server)成爲broker。

  不管是kafka集羣,仍是producer和consumer都依賴於zookeeper集羣保存一些meta信息,來保證系統可用性。所以Zookeeper在生產環境中最少建議部署3臺,若是集羣較大(100個節點以上)推薦配置5臺。

8>.ISR

  ISR的全稱是in-sync replica,翻譯過來就是與leader replica保持同步的replica集合。這是一個特別重要的概念。前面講了不少關於Kafka的副本機制,好比一個partition能夠配置N個replica,那麼這是否就意味着對副本因子爲N的topic,可容忍最多N-1個服務器失效,不會丟失任何已提交到broker的記錄呢?答案是:「否」!(網上有資料說是能夠保證數據不丟失,若是在min.insync.replicas使用默認值爲1且unclean.leader.election.enable的值爲true時,可能會致使數據丟失,而Producer的acks設置的爲1時,也就是當leader的parition寫入成功就認爲數據是寫入成功的,咱們知道follower是有可能和leader節點不一致的!所以當leader節點掛掉的話,此時的數據就丟失啦!)
  
  副本數對Kafka的吞吐率是有必定的影響,但極大的加強了可用性。默認狀況下Kafka的replica數量爲1,即每一個partition都有一個惟一的leader,爲了確保消息的可靠性,一般應用中將其值(由broker的參數offsets.topic.replication.factor指定)大小設置爲大於1,好比3。

  全部的副本(replicas)統稱爲Assigned Replicas,即AR。ISR是AR中的一個子集,由leader維護ISR列表,follower從leader同步數據有一些延遲(包括延遲時間replica.lag.time.max.ms和延遲條數replica.lag.max.messages兩個維度, 當前最新的版本0.10.x中只支持replica.lag.time.max.ms這個維度),任意一個超過閾值都會把follower踢出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也會先存放在OSR中。AR=ISR+OSR。相反的,當這些replicas從新「追上」了leader的進度時,那麼Kafka會將他們加回到ISR中。這一切都是自動維護的,不須要用戶進行人爲干預,於是在保證了消息交付語義的同時,還簡化了用戶的操做成本。

9>.高水位線

High watermark(高水位線)如下簡稱HW,表示消息被commit的部分,leader(和followers,看狀況)確認寫入本地log並返回ack後這些數據才被認爲是committed(kafka的參數min.insync.replicas能夠控制多少個副本ack後纔算commit,默認爲1,即只須要leader寫成功該段log就committed,此時ISR可能最小隻有1,若是設爲n,則表示leader和n-1個followers都確認寫入纔算committed),也就是能夠被消費,因此在HW位置如下的消息均可以被消費。(在min.insync.replicas=1時,若是此時ISR只有1個replica,leader又掛掉,又設置了unclean.leader.election.enable=true,則會有一個非ISR內的follower成爲新的leader,此時HW會縮小,所以consumer的lag值可能出現負數!這些配置在Apache Kafka 0.10都是默認值,所以極可能出現)

Log end offset(日誌結束位置)如下簡稱LEO,表示消息的最後位置。LEO>=HW,通常會有沒提交的部分(除了ISR中最慢的follower,事實上HW=min(ISR中各副本的LEO))。uncommitted部分的消息,須要等待如何向producer進行ack,對應的請求會停留在一塊稱爲purgatory(煉獄)的區域中。具體怎麼ack,取決於producer的acks設置:若是設置爲0,則producer發出消息後就不等待ack,是at most once的一致性;若是設爲1(默認值,注意!),則只要leader的log成功寫入請求就返回了,但若是在ack發送給producer前leader就掛了(或丟包了),producer仍然收不到ack,會對請求進行重發,此時數據就有可能重,是at least once的一致性。但在leader掛掉時,leader有但未被followers同步的數據又沒法同步,會丟一部分數據;若是設置爲all或-1,則須要全部ISR中的followers確認寫入後請求才返回(注意,若是容許ISR最少只有1個成員,則和acks=1又沒有實質區別了!),也是at least once的一致性,且基本不存在丟數據的可能。

所以,對於producer來講,保證數據不丟(producer送出的消息均一直能被消費到)須要知足如下條件:min.insync.replicas至少設爲二、unclean.leader.election.enable設爲false、producer的acks設爲all。但這種設置會對kafka的吞吐率、故障恢復時間形成巨大影響,究竟要不要保證這麼強的一致性,就須要你評估了。

以上討論的都與producer相關。對於consumer一側,狀況則比較明朗,根據同步仍是異步消費、commit調用的時機決定了at least once仍是atmost once,你們可參考文檔或圖書進行理解。

10>.reblance掃盲

一.rebalance簡介
  consumer group的rebalance本質上是一組協議,它規定了一個consumer group 是如何達成一致來分配訂閱topic的全部分區的。假設某個組下有20個consumer實例,該組訂閱一個有着100個分區的topic。正常狀況下,Kafka會爲每一個consumer平均分配5個分區。這個分配過程就被稱爲rebalance。
  當consumer成功執行rebalance後,組訂閱topic的每一個分區只會分配給組內一個consumer實例。換句話說,同一個消費者組的消費者不能同時對同一個topic的同一個分區進行消費。
  和舊版本consumer依託於zookeeper進行rebalance不一樣,新版本consumer使用了Kafka內置的一個全新的協議(group coordination protocol)。對於每一個組而言,Kafka的某個broker會被選舉爲組協調者(group coordinator)。coordinator負責對組對狀態進行管理,他的主要責任就是當新成員到達時促成組內全部成員達成新對分區分配方案,即coordinator負責對組執行rebalance操做。

二.rebalance觸發條件
  組rebalance觸發對條件有如下3個:
    第一:組成員發生變動,好比新consumer加入組,或已有consumer主動離開組,再或是已有consumer崩潰時則觸發rebalance;
    第二:組訂閱topic數發生變動,好比使用基於正則表達式對訂閱,當匹配正則表達式對新topic被建立時則會觸發rebalance;
    第三:組訂閱topic時分區發生變動,好比使用命令行腳本增長了訂閱topic的分區數;
  真實應用場景引起rebalance最多見的緣由就是違背了第一個條件(好比flume的kafka source相對於broker集羣來講就是consumer對象),特別是consumer崩潰的狀況。這裏的崩潰不必定就是指consumer進程「掛掉」或consumer進程所在的機器宕機。當consumer沒法在指定的時間內完成消息處理,那麼coordinator就認爲該consumer已經崩潰,從而引起新一輪rebalance。
  我在生產環境中也使用flume消費kafka的數據到hdfs集羣上,也遇到過這種rebalance的狀況,最終分析緣由是:該group下的consumer處理消息的邏輯太重,並且事件處理時間波動很大,很是不穩定,從而致使coordinator會常常行的認爲某個consumer已經掛掉,引起rebalance。鑑於目前一次rebalance操做的開銷很大,生產環境中用戶必定要結合自身業務特色仔細調優consumer參數:「request.timeout.ms」,「max.poll.records」和「max.poll.interval.ms」這幾個參數,以免沒必要要的rebalance出現。

三.rebalance協議
  前面咱們提到過rebalance本質上是一組協議。group於coordinator共同使用這組協議完成group的rebalance。最新版本的Kafka中提供了下面5個協議來處理rebalance相關事宜。
    第一:JoinGroup請求:consumer請求加入組;
    第二:SyncGroup請求:group leader 把分配方案同步更新到組內全部成員中;
    第三:Heartbeat請求:consumer按期向coordinator彙報心跳代表本身依然存活;
    第四:LeaveGroup請求:consumer主動通知coordinator該consumer即將離組;
    第五:DescribeGroup請求:查看組的全部信息,包括成員信息,協議信息,分配方案以及訂閱信息等。該請求類型主要供管理員使用。coordinator不使用該請求執行rebalance。
  在rebalance過程當中,coordinator主要處理consumer發過來的joinGroup和SyncGroup請求。當consumer主動離組時會發送LeaveGroup請求給coordinator。
  在成功rebalance過程當中,組內全部consumer都須要按期向coordinator發送Hearbeat請求。而每一個consumer也是根據Heartbeat請求的響應中是否包含REBALANCE_IN_PROGRESS來判斷當前group是否開啓來新一輪rebalance。
  
  好啦~關於rebalance我們瞭解到這裏基本上就夠用來,感興趣的小夥伴能夠查看rebalance genneration,rebalance流程,rebalance監聽器等技術,咱們這裏就不用深刻探討啦~

 

五.Kafka的使用場景

  Kafka以消息引擎聞名,所以它特別適合處理生產環境中的那些流式數據。如下就是Kafka在實際應用中一些典型的使用場景。

1>.消息傳輸

  Kafka很是適合替代傳統的消息總線(message bus)或消息代理(message broker)。傳統的這類系統擅長於解耦生產者和消費者以及批量處理消息,而這些特色Kafka都具有。除此以外,Kafka還具備更好的吞吐量特性,其內置的分區機制和副本機制既實現了高性能的消息傳輸,同時還達到了高性能的高容錯性。一次Kafka特別適合用於實現一個超大量級消息處理應用。

2>.網站行爲日誌追蹤

  Kafka最先就是用於重建用戶行爲數據追蹤系統的。不少網站上的用戶操做都會以消息的形式發送到Kafka的某個對應的topic上。這些點擊流蘊含了巨大的商業價值,事實上,目前就有不少創業公司使用機器學習或其餘實時處理框架來幫助收集並分析用戶的點擊流數據。鑑於這種點擊流數據量是很大的,Kafka超強的吞吐量特性此時就有了用武之地。

3>.審計數據收集

  不少企業和組織都須要對關鍵的操做和運維進行監控和審計。這就須要從各個方面運維應用程序處實時彙總操做步驟信息進行集中式管理。在這種使用場景下,你會發現Kafka是很是適合的解決方案,它能夠便捷的對多路消息進行實時收集,同時因爲其持久化的特性,是的後續離線審計稱爲可能。

4>.日誌收集

  這多是Kafka最多見的使用方式了(日誌收集彙總解決方案),每一個企業都會產生大量的服務日誌,這些日誌分散在不一樣的機器上。咱們可使用Kafka對他們進行全量收集,並集中往下游的分佈式存儲中(好比HDFS等)。比起其餘主流的日誌抽取框架(好比Apache Flume),Kafka有更好的性能,並且提供了完備的可靠性解決方案,同時還保持 了低延遲的特色。

5>.Event Sourcing

  Event Sourcing其實是領域驅動設計(Domain-Driven Design,簡稱DDD)的名次,它使用事件序列來表示狀態變動,這種思想和Kafka的設計特性不謀而合。還記得吧,Kafka也是用不可變動的消息序列來抽象化表示業務信息的,所以Kafka特別適合做爲這種應用的後端存儲。

6>.流式處理

    不少用戶接觸到Kafka都是由於它的消息存儲引擎。自0.10.0.0版本開始,Kafka社區推出了一個全新的流式組件 Kafka Streams。這標誌着Kafka正式進入流式處理框架俱樂部。相比老牌流式處理框架Apache Storm,Apache Samza,或是最近風頭正勁的Spark Streaming,抑或是Apache Flink,Kafka Streams的競爭力如何?讓咱們拭目以待吧!

 

六.集羣環境規劃 

1>.操做系統的選型

  咱們知道Kafka依賴於Java環境,所以咱們只要能在操做系統上安裝jdk理論上就能夠部署kafka環境了。沒錯,事實上kafka的確能夠運行在主流的操做系統上,好比windows,Linux,mac OS等等。可是這麼多操做系統咱們究竟應該選擇哪一個操做系統去安裝呢?爲何你們部署kafka集羣都選擇的是Linux環境呢?其實我們是能夠分析緣由的:
    第一:Kafka新版本clients在設計底層網絡庫時採用了Java的Selecor機制(NIO),然後者在Linux實現機制就是epoll;可是在window平臺上,Java NIO的Selector底層是使用select模型而非IOCP實現的,只有Java NIO2纔是使用IOCP實現的。所以這一點上,在Linux部署Kafka要在比Windows上部署可以獲得高效的I/O處理能力;
    第二:對於數據網絡傳輸效率而言,Linux也更具備優點。具體來講,Kafka這種應用必然須要大量的經過網絡於磁盤進行數據傳輸,而大部分這樣的操做都是經過Java的FileChannel.transferTo方法實現的,在Linux平臺上該方法底層會調用sendfile系統調用,即採用了Linux提供的零拷貝(Zero Copy)技術。
  
  Kafka能夠在ext4或xfs上很好的工做。有一些掛載優化選項可用,例如noatime,data=writeback等,但除noatime強烈建議使用外,其他選項對效率的提高有限,能夠參考官網文件進行配置。
  
  另外一個問題是要不要調整 文件刷寫行爲。Kafka的寫操做當即將數據寫到文件系統層面,但文件系統是否立刻將數據刷寫的到磁盤上,則有一些選項能夠控制。例如,Kafka的默認行爲是應用內不調用fsync,而是由OS的後臺線程進行刷寫;也能夠週期性地調用fsync確保數據已經刷寫到磁盤。良好的事件是,不要改變默認的行爲。在文件系統層面也不須要作額外的調整。

2>.磁盤規劃

  事實上,根據公開的資料顯示,LinkedIn公司的Kafka集羣就是使用RAID 10做爲底層存儲的。除了默認提供的數據冗餘以外,RAID 10 還能夠將數據自動的負載分佈到多個磁盤上。因而可知,RAID做爲Kafka的底層存儲其實主要的優點有兩個:
    第一:提供冗餘的數據存儲空間;
    第二:自然提供負載均衡;
  以上兩個優點對於任何系統而言都是很好的特性。不過對於Kafka而言,Kafka在框架層面其實已經提供了這兩個特性:經過副本機制提供冗餘和高可靠性,以及經過分散到各個節點的領導者選舉機制來實現負載均衡,因此從這方面來看,RAID的優點就顯得不是那麼明顯了。事實上,LinkedIn公司目前正在計劃將整個Kafka集羣從RAID
10 遷移到JBOD上,只不過在整個過程當中JBOD方案須要解決當前的Kafka一些固有缺陷,好比:     第一:任意磁盤損壞都會致使broker宕機,普通磁盤損壞的機率是很大的,所以這個缺陷從某種程度上來講是致命的。不過社區正在改進這個問題,將來版本中只要爲broker配置的多塊磁盤中還有良好的磁盤,broker就不會掛掉。     第二:JBOD的管理須要更加細粒度化,目前Kafka沒有提供腳本或其餘工具用於在不一樣磁盤間進行手動分配,但這是使用JBOD方案中必要的功能。     第三:JBOD也應該提供相似於負載均衡的功能,目前只是間的依賴輪訓的方式爲副本數據選擇磁盤,後續須要提供更加豐富的策略。
  結合JBOD和RAID之間的優劣對比以及LinkIn公司的實際案例,我們能夠給硬盤規劃的結論性總結以下:     第一:追求性價比的公司能夠考慮使用JBOD;     第二:使用機械硬盤徹底能夠知足Kafka集羣的使用,SSD更好(儘可能不要使用NAS(Network Attached Storage)這樣的網絡存儲設備。);

3>. 磁盤容量規劃

對於磁盤容量的規劃和如下結果因素有關:
    第一:新增消息數;
    第二:消息留存時間;
    第四:平均消息大小;
    第五:副本數;
    第六:是否啓用壓縮;

根據實際狀況而定,好比咱們線上的服務器磁盤使用率始終達不到50,儘管我默認保留了168小時,即數據保留了7天喲~就會存在資源浪費的狀況,雖說你能夠爲了使用這些存儲空間,能夠在該服務器上搭建其餘網絡存儲服務,但你沒法肯定搭建的這個服務是否在高峯期時會影響到kafka集羣的性能!

[root@kafka116 ~]# df -h
Filesystem                         Size  Used Avail Use% Mounted on
/dev/mapper/centos_localhost-root   50G   27G   24G  53% /
devtmpfs                            16G     0   16G   0% /dev
tmpfs                               16G     0   16G   0% /dev/shm
tmpfs                               16G  1.6G   14G  11% /run
tmpfs                               16G     0   16G   0% /sys/fs/cgroup
/dev/sda1                         1014M  143M  872M  15% /boot
/dev/mapper/centos_localhost-home   80T   26T   55T  32% /home
tmpfs                              3.2G     0  3.2G   0% /run/user/0
tmpfs                              3.2G     0  3.2G   0% /run/user/1003
[root@kafka116 ~]# 

4>.內存規劃

  Kafka對於內存對使用可稱做其設計亮點之一。雖然在前面咱們強調了Kafka大量依靠和磁盤來保存消息,但其實它還會對消息進行緩存,而這個消息換粗你得地方就是內存,具體來講是操做系統對頁緩存(page cache)。Kafka雖然會持久化每條消息,但其實這個工做都是底層對文件系統來完成。Kafka僅僅將消息寫入page cache而已,以後將消息「flush」到磁盤對任務徹底交由操做系統來完成。
  
  通常狀況下,broker所需的堆內存都不會超過6GB。因此對於一臺16GB內存的機器而言,文件系統page cache的大小甚至能夠達到10~14GB!總之對於內存規劃的建議以下:     第一:儘可能分配更多的內存給操做系統的page cache;     第二:不要爲broker設置過大的堆內存,最好不超過6GB;     第三:page大小至少要大於一個日誌段的大小;

5>.CPU規劃

  比起磁盤和內存,CPU於kafka而言並無那麼重要,嚴格來講,kafka不屬於計算密集型(CPU-bound)的系統,所以對於CPU須要記住一點就能夠了:追求多核而非高時鐘頻率。我們對CPU資源規劃以下:
    第一:使用多核系統,CPU核數最好大於8;
    第二:若是使用Kafka 0.10.0.0以前的版本或clients端消息版本不一致(若無顯式配置,這種狀況多半由clients和broker版本不一致形成),則考慮多配置一些資源以防止消息解壓操做消耗過多CPU)。

6>.帶寬規劃

    第一:儘可能使用高速網絡;
    第二:根據自身網絡條件和帶寬來評估Kafka集羣機器數量;
    第三:避免使用跨機房網絡;

7>.關於JVM

 須要使用1.8以上的JDK。推薦使用G1GC,其次可選擇ParNew+CMS的組合(可是作的相應的調整也比較多)。設置充足的堆大小。如下是一個示範例子:
  -Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80

8>.典型線上環境配置

下面給出一份典型的線上環境配置,用戶能夠參考這份配置以及結合本身的是實際狀況進行二次調整:
    CPU 24核心;
    內存 32GB;
    磁盤 1TB 7200轉SAS盤2快;
    帶寬:1Gb/s;
    ulimit -n 1000000;
    Socket Buffer 至少64KB,適合於跨機房網絡傳輸;

 

 七.博主推薦閱讀Kafka相關書籍

《Kafka權威指南》

《Apache Kafka實戰》
相關文章
相關標籤/搜索