kafka 基礎知識梳理-kafka是一種高吞吐量的分佈式發佈訂閱消息系統

1、kafka 簡介html

今社會各類應用系統諸如商業、社交、搜索、瀏覽等像信息工廠同樣不斷的生產出各類信息,在大數據時代,咱們面臨以下幾個挑戰:java

  1. 如何收集這些巨大的信息
  2. 如何分析它
  3. 如何及時作到如上兩點

以上幾個挑戰造成了一個業務需求模型,即生產者生產(produce)各類信息,消費者消費(consume)(處理分析)這些信息,而在生產者與消費者之間,須要一個溝通二者的橋樑-消息系統。從一個微觀層面來講,這種需求也可理解爲不一樣的系統之間如何傳遞消息。       linux

kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。這種動做(網頁瀏覽,搜索和其餘用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。這些數據一般是因爲吞吐量的要求而經過處理日誌和日誌聚合來解決。 web

Apache kafka 它具有快速、可擴展、可持久化的特色。它如今是Apache旗下的一個開源系統,做爲hadoop生態系統的一部分,被各類商業公司普遍應用。它的最大的特性就是能夠實時的處理大量數據以知足各類需求場景:好比基於hadoop的批處理系統、低延遲的實時系統、storm/spark流式處理引擎。算法

     1.1 Kafka的特性

  • 高吞吐量、低延遲:kafka每秒能夠處理幾十萬條消息,它的延遲最低只有幾毫秒
  • 可擴展性:kafka集羣支持熱擴展
  • 持久性、可靠性:消息被持久化到本地磁盤,而且支持數據備份防止數據丟失
  • 容錯性:容許集羣中節點失敗(若副本數量爲n,則容許n-1個節點失敗)
  • 高併發:支持數千個客戶端同時讀寫

2.2 Kafka一些重要設計思想

下面介紹先大致介紹一下Kafka的主要設計思想,可讓相關人員在短期內瞭解到kafka相關特性,若是想深刻研究,後面會對其中每個特性都作詳細介紹。apache

  • Consumergroup:各個consumer能夠組成一個組,每一個消息只能被組中的一個consumer消費,若是一個消息能夠被多個consumer消費的話,那麼這些consumer必須在不一樣的組。
  • 消息狀態:在Kafka中,消息的狀態被保存在consumer中,broker不會關心哪一個消息被消費了被誰消費了,只記錄一個offset值(指向partition中下一個要被消費的消息位置),這就意味着若是consumer處理很差的話,broker上的一個消息可能會被消費屢次。
  • 消息持久化:Kafka中會把消息持久化到本地文件系統中,而且保持極高的效率。
  • 消息有效期:Kafka會長久保留其中的消息,以便consumer能夠屢次消費,固然其中不少細節是可配置的。
  • 批量發送:Kafka支持以消息集合爲單位進行批量發送,以提升push效率。
  • push-and-pull : Kafka中的Producer和consumer採用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管從broker pull消息,二者對消息的生產和消費是異步的。
  • Kafka集羣中broker之間的關係:不是主從關係,各個broker在集羣中地位同樣,咱們能夠隨意的增長或刪除任何一個broker節點。
  • 負載均衡方面: Kafka提供了一個 metadata API來管理broker之間的負載(對Kafka0.8.x而言,對於0.7.x主要靠zookeeper來實現負載均衡)。
  • 同步異步:Producer採用異步push方式,極大提升Kafka系統的吞吐率(能夠經過參數控制是採用同步仍是異步方式)。
  • 分區機制partition:Kafka的broker端支持消息分區,Producer能夠決定把消息發到哪一個分區,在一個分區中消息的順序就是Producer發送消息的順序,一個主題中能夠有多個分區,具體分區的數量是可配置的。分區的意義很重大,後面的內容會逐漸體現。
  • 離線數據裝載:Kafka因爲對可拓展的數據持久化的支持,它也很是適合向Hadoop或者數據倉庫中進行數據裝載。
  • 插件支持:如今很多活躍的社區已經開發出很多插件來拓展Kafka的功能,如用來配合Storm、Hadoop、flume相關的插件。

2.3 kafka 應用場景

  • 日誌收集:一個公司能夠用Kafka能夠收集各類服務的log,經過kafka以統一接口服務的方式開放給各類consumer,例如hadoop、Hbase、Solr等。
  • 消息系統:解耦和生產者和消費者、緩存消息等。
  • 用戶活動跟蹤:Kafka常常被用來記錄web用戶或者app用戶的各類活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發佈到kafka的topic中,而後訂閱者經過訂閱這些topic來作實時的監控分析,或者裝載到hadoop、數據倉庫中作離線分析和挖掘。
  • 運營指標:Kafka也常常用來記錄運營監控數據。包括收集各類分佈式應用的數據,生產各類操做的集中反饋,好比報警和報告。
  • 流式處理:好比spark streaming和storm
  • 事件源

2.4 Kafka架構組件

客戶端和服務端經過TCP協議通訊。Kafka提供了Java客戶端,而且對多種語言都提供了支持。api

Kafka中發佈訂閱的對象是topic。咱們能夠爲每類數據建立一個topic,把向topic發佈消息的客戶端稱做producer,從topic訂閱消息的客戶端稱做consumer。Producers和consumers能夠同時從多個topic讀寫數據。一個kafka集羣由一個或多個broker服務器組成,它負責持久化和備份具體的kafka消息。producers經過網絡將消息發送到Kafka集羣,集羣向消費者提供消息 數組

  • topic:消息存放的目錄即主題
  • Producer:生產消息到topic的一方
  • Consumer:訂閱topic消費消息的一方
  • Broker:Kafka的服務實例就是一個broker

2.5 Kafka Topic&Partition

消息發送時都被髮送到一個topic,其本質就是一個目錄,而topic由是由一些Partition Logs(分區日誌)組成,其組織結構以下圖所示:緩存

咱們能夠看到,每一個Partition中的消息都是有序的,生產的消息被不斷追加到Partition log上,其中的每個消息都被賦予了一個惟一的offset值。 
Kafka集羣會保存全部的消息,無論消息有沒有被消費;咱們能夠設定消息的過時時間,只有過時的數據纔會被自動清除以釋放磁盤空間。好比咱們設置消息過時時間爲2天,那麼這2天內的全部消息都會被保存到集羣中,數據只有超過了兩天才會被清除。 
Kafka須要維持的元數據只有一個–消費消息在Partition中的offset值,Consumer每消費一個消息,offset就會加1。其實消息的狀態徹底是由Consumer控制的,Consumer能夠跟蹤和重設這個offset值,這樣的話Consumer就能夠讀取任意位置的消息。 
把消息日誌以Partition的形式存放有多重考慮,第一,方便在集羣中擴展,每一個Partition能夠經過調整以適應它所在的機器,而一個topic又能夠有多個Partition組成,所以整個集羣就能夠適應任意大小的數據了;第二就是能夠提升併發,由於能夠以Partition爲單位讀寫了。服務器

1.1 kafka名詞解釋

  • producer:生產者。
  • consumer:消費者。
  • topic: 消息以topic爲類別記錄,Kafka將消息種子(Feed)分門別類,每一類的消息稱之爲一個主題(Topic)。
  • broker:以集羣的方式運行,能夠由一個或多個服務組成,每一個服務叫作一個broker;消費者能夠訂閱一個或多個主題(topic),並從Broker拉數據,從而消費這些已發佈的消息。

      每一個消息(也叫做record記錄,也被稱爲消息)是由一個key,一個value和時間戳構成。

1.2 kafka有四個核心API介紹

  • 應用程序使用producer API發佈消息到1個或多個topic中。
  • 應用程序使用consumer API來訂閱一個或多個topic,並處理產生的消息。
  • 應用程序使用streams API充當一個流處理器,從1個或多個topic消費輸入流,併產生一個輸出流到1個或多個topic,有效地將輸入流轉換到輸出流。
  • connector API容許構建或運行可重複使用的生產者或消費者,將topic連接到現有的應用程序或數據系統。 

1.3 kafka基基原理

        一般來說,消息模型能夠分爲兩種:隊列和發佈-訂閱式。隊列的處理方式是一組消費者從服務器讀取消息,一條消息只有其中的一個消費者來處理。在發佈-訂閱模型中,消息被廣播給全部的消費者,接收到消息的消費者均可以處理此消息。Kafka爲這兩種模型提供了單一的消費者抽象模型: 消費者組(consumer group)。消費者用一個消費者組名標記本身。

       一個發佈在Topic上消息被分發給此消費者組中的一個消費者。假如全部的消費者都在一個組中,那麼這就變成了queue模型。假如全部的消費者都在不一樣的組中,那麼就徹底變成了發佈-訂閱模型。更通用的, 咱們能夠建立一些消費者組做爲邏輯上的訂閱者。每一個組包含數目不等的消費者,一個組內多個消費者能夠用來擴展性能和容錯。       

       而且,kafka可以保證生產者發送到一個特定的Topic的分區上,消息將會按照它們發送的順序依次加入,也就是說,若是一個消息M1和M2使用相同的producer發送,M1先發送,那麼M1將比M2的offset低,而且優先的出如今日誌中。消費者收到的消息也是此順序。若是一個Topic配置了複製因子(replication facto)爲N,那麼能夠容許N-1服務器宕機而不丟失任何已經提交(committed)的消息。此特性說明kafka有比傳統的消息系統更強的順序保證。可是,相同的消費者組中不能有比分區更多的消費者,不然多出的消費者一直處於空等待,不會收到消息。

1.5 主題和日誌 (Topic和Log)

先來看一下Kafka提供的一個抽象概念:topic.
一個topic是對一組消息的概括。對每一個topic,Kafka 對它的日誌進行了分區

每個分區(partition)都是一個順序的、不可變的消息隊列,而且能夠持續的添加。分區中的消息都被分了一個序列號,稱之爲偏移量(offset),在每一個分區中此偏移量都是惟一的。Kafka集羣保持全部的消息,直到它們過時,不管消息是否被消費了。

在一個可配置的時間段內,Kafka集羣保留全部發布的消息,無論這些消息有沒有被消費。好比,若是消息的保存策略被設置爲2天,那麼在一個消息被髮布的兩天時間內,它都是能夠被消費的。以後它將被丟棄以釋放空間。Kafka的性能是和數據量無關的常量級的,因此保留太多的數據並非問題。

實際上消費者所持有的僅有的元數據就是這個偏移量,也就是消費者在這個log中的位置。 這個偏移量由消費者控制:正常狀況當消費者消費消息的時候,偏移量也線性的的增長。可是實際偏移量由消費者控制,消費者能夠將偏移量重置爲更老的一個偏移量,從新讀取消息。

實際上每一個consumer惟一須要維護的數據是消息在日誌中的位置,也就是offset.這個offset有consumer來維護:通常狀況下隨着consumer不斷的讀取消息,這offset的值不斷增長,但其實consumer能夠以任意的順序讀取消息,好比它能夠將offset設置成爲一箇舊的值來重讀以前的消息。

能夠看到這種設計對消費者來講操做自如, 一個消費者的操做不會影響其它消費者對此log的處理。 再說說分區。Kafka中採用分區的設計有幾個目的。一是能夠處理更多的消息,不受單臺服務器的限制。Topic擁有多個分區意味着它能夠不受限的處理更多的數據。第二,分區能夠做爲並行處理的單元,稍後會談到這一點。

1.6 分佈式(Distribution)

  每一個分區在Kafka集羣的若干服務中都有副本,這樣這些持有副本的服務能夠共同處理數據和請求,副本數量是能夠配置的。副本使Kafka具有了容錯能力。
  每一個分區都由一個服務器做爲「leader」,零或若干服務器做爲「followers」,leader負責處理消息的讀和寫,followers則去複製leader.若是leader down了,followers中的一臺則會自動成爲leader。集羣中的每一個服務都會同時扮演兩個角色:做爲它所持有的一部分分區的leader,同時做爲其餘分區的followers,這樣集羣就會據有較好的負載均衡。     

 Log的分區被分佈到集羣中的多個服務器上。每一個服務器處理它分到的分區。根據配置每一個分區還能夠複製到其它服務器做爲備份容錯。 每一個分區有一個leader,零或多個follower。Leader處理此分區的全部的讀寫請求,而follower被動的複製數據。若是leader宕機,其它的一個follower會被推舉爲新的leader。 一臺服務器可能同時是一個分區的leader,另外一個分區的follower。 這樣能夠平衡負載,避免全部的請求都只讓一臺或者某幾臺服務器處理。

 

Producer將消息發佈到它指定的topic中,並負責決定發佈到哪一個分區。一般簡單的由負載均衡機制隨機選擇分區,但也能夠經過特定的分區函數選擇分區。使用的更多的是第二種。

 

發佈消息一般有兩種模式:隊列模式(queuing)和發佈-訂閱模式(publish-subscribe)。隊列模式中,consumers能夠同時從服務端讀取消息,每一個消息只被其中一個consumer讀到;發佈-訂閱模式中消息被廣播到全部的consumer中。Consumers能夠加入一個consumer 組,共同競爭一個topic,topic中的消息將被分發到組中的一個成員中。同一組中的consumer能夠在不一樣的程序中,也能夠在不一樣的機器上。若是全部的consumer都在一個組中,這就成爲了傳統的隊列模式,在各consumer中實現負載均衡。若是全部的consumer都不在不一樣的組中,這就成爲了發佈-訂閱模式,全部的消息都被分發到全部的consumer中。更常見的是,每一個topic都有若干數量的consumer組,每一個組都是一個邏輯上的「訂閱者」,爲了容錯和更好的穩定性,每一個組由若干consumer組成。這其實就是一個發佈-訂閱模式,只不過訂閱者是個組而不是單個consumer。

 

由兩個機器組成的集羣擁有4個分區 (P0-P3) 2個consumer組. A組有兩個consumerB組有4個

相比傳統的消息系統,Kafka能夠很好的保證有序性。
傳統的隊列在服務器上保存有序的消息,若是多個consumers同時從這個服務器消費消息,服務器就會以消息存儲的順序向consumer分發消息。雖然服務器按順序發佈消息,可是消息是被異步的分發到各consumer上,因此當消息到達時可能已經失去了原來的順序,這意味着併發消費將致使順序錯亂。爲了不故障,這樣的消息系統一般使用「專用consumer」的概念,其實就是隻容許一個消費者消費消息,固然這就意味着失去了併發性。

在這方面Kafka作的更好,經過分區的概念,Kafka能夠在多個consumer組併發的狀況下提供較好的有序性和負載均衡。將每一個分區分只分發給一個consumer組,這樣一個分區就只被這個組的一個consumer消費,就能夠順序的消費這個分區的消息。由於有多個分區,依然能夠在多個consumer組之間進行負載均衡。注意consumer組的數量不能多於分區的數量,也就是有多少分區就容許多少併發消費。

Kafka只能保證一個分區以內消息的有序性,在不一樣的分區之間是不能夠的,這已經能夠知足大部分應用的需求。若是須要topic中全部消息的有序性,那就只能讓這個topic只有一個分區,固然也就只有一個consumer組消費它。

在深刻學習Kafka以前,須要先了解topicsbrokersproducersconsumers等幾個主要術語。 下面說明了主要術語的詳細描述和組件。

 

在上圖中,主題(topic)被配置爲三個分區。 分區1(Partition 1)具備兩個偏移因子01。分區2(Partition 2)具備四個偏移因子0,1,23,分區3(Partition 3)具備一個偏移因子0replica 的id與託管它的服務器的id相同。

假設,若是該主題的複製因子設置爲3,則Kafka將爲每一個分區建立3個相同的副本,並將它們放入羣集中以使其可用於其全部操做。 爲了平衡集羣中的負載,每一個代理存儲一個或多個這些分區。 多個生產者和消費者能夠同時發佈和檢索消息。

    • Topics - 屬於特定類別的消息流被稱爲主題(Topics),數據存儲在主題中。主題分爲多個分區。 對於每一個主題,Kafka都保留一個分區的最小範圍。 每一個這樣的分區都以不可變的有序順序包含消息。 分區被實現爲一組相同大小的段文件。
    • Partition - 主題可能有不少分區,因此它能夠處理任意數量的數據。
    • Partition offset - 每一個分區消息都有一個稱爲偏移量的惟一序列標識。
    • Replicas of partition - 副本只是分區的備份。 副本從不讀取或寫入數據。 它們用於防止數據丟失。
    • Brokers

      • 經紀人(Brokers)是簡單的系統,負責維護公佈的數據。 每一個代理可能每一個主題有零個或多個分區。 假設,若是一個主題和N個代理中有N個分區,則每一個代理將有一個分區。
      • 假設某個主題中有N個分區而且N個代理(n + m)多於N個,則第一個N代理將擁有一個分區,下一個M代理將不會擁有該特定主題的任何分區。
      • 假設某個主題中有N個分區且N個代理(n-m)少於N個代理,則每一個代理將擁有一個或多個分區共享。 因爲經紀人之間的負載分配不均衡,不推薦這種狀況。
    • Kafka Cluster - Kafka擁有多個經紀人稱爲Kafka集羣。 Kafka集羣能夠在無需停機的狀況下進行擴展。 這些集羣用於管理消息數據的持久性和複製。

    • Producers - 生產者(Producer)是一個或多個Kafka主題的發佈者。 生產者向Kafka經紀人發送數據。 每當生產者向經紀人發佈消息時,經紀人只需將消息附加到最後一個段文件。 實際上,該消息將被附加到分區。 生產者也能夠將消息發送到他們選擇的分區。
    • Consumers - 消費者從經紀人那裏讀取數據。 消費者經過從經紀人處獲取數據來訂閱一個或多個主題並消費發佈的消息。
    • Leader - Leader是負責全部分區讀寫的節點。 每一個分區都有一臺服務器充當領導者。
    • Follower - 遵循領導者(Leader)指示的節點稱爲追隨者(Follower)。 若是領導失敗,其中一個追隨者將自動成爲新領導。 追隨者扮演正常的消費者角色,拉動消息並更新本身的數據存儲。

3、Kafka 核心組件

3.1 Replications、Partitions 和Leaders

經過上面介紹的咱們能夠知道,kafka中的數據是持久化的而且可以容錯的。Kafka容許用戶爲每一個topic設置副本數量,副本數量決定了有幾個broker來存放寫入的數據。若是你的副本數量設置爲3,那麼一份數據就會被存放在3臺不一樣的機器上,那麼就容許有2個機器失敗。通常推薦副本數量至少爲2,這樣就能夠保證增減、重啓機器時不會影響到數據消費。若是對數據持久化有更高的要求,能夠把副本數量設置爲3或者更多。 
Kafka中的topic是以partition的形式存放的,每個topic均可以設置它的partition數量,Partition的數量決定了組成topic的log的數量。Producer在生產數據時,會按照必定規則(這個規則是能夠自定義的)把消息發佈到topic的各個partition中。上面將的副本都是以partition爲單位的,不過只有一個partition的副本會被選舉成leader做爲讀寫用。 
關於如何設置partition值須要考慮的因素。一個partition只能被一個消費者消費(一個消費者能夠同時消費多個partition),所以,若是設置的partition的數量小於consumer的數量,就會有消費者消費不到數據。因此,推薦partition的數量必定要大於同時運行的consumer的數量。另一方面,建議partition的數量大於集羣broker的數量,這樣leader partition就能夠均勻的分佈在各個broker中,最終使得集羣負載均衡。在Cloudera,每一個topic都有上百個partition。須要注意的是,kafka須要爲每一個partition分配一些內存來緩存消息數據,若是partition數量越大,就要爲kafka分配更大的heap space。

3.2 Producers

Producers直接發送消息到broker上的leader partition,不須要通過任何中介一系列的路由轉發。爲了實現這個特性,kafka集羣中的每一個broker均可以響應producer的請求,並返回topic的一些元信息,這些元信息包括哪些機器是存活的,topic的leader partition都在哪,現階段哪些leader partition是能夠直接被訪問的。 
Producer客戶端本身控制着消息被推送到哪些partition。實現的方式能夠是隨機分配、實現一類隨機負載均衡算法,或者指定一些分區算法。Kafka提供了接口供用戶實現自定義的分區,用戶能夠爲每一個消息指定一個partitionKey,經過這個key來實現一些hash分區算法。好比,把userid做爲partitionkey的話,相同userid的消息將會被推送到同一個分區。 
以Batch的方式推送數據能夠極大的提升處理效率,kafka Producer 能夠將消息在內存中累計到必定數量後做爲一個batch發送請求。Batch的數量大小能夠經過Producer的參數控制,參數值能夠設置爲累計的消息的數量(如500條)、累計的時間間隔(如100ms)或者累計的數據大小(64KB)。經過增長batch的大小,能夠減小網絡請求和磁盤IO的次數,固然具體參數設置須要在效率和時效性方面作一個權衡。 
Producers能夠異步的並行的向kafka發送消息,可是一般producer在發送完消息以後會獲得一個future響應,返回的是offset值或者發送過程當中遇到的錯誤。這其中有個很是重要的參數「acks」,這個參數決定了producer要求leader partition 收到確認的副本個數,若是acks設置數量爲0,表示producer不會等待broker的響應,因此,producer沒法知道消息是否發送成功,這樣有可能會致使數據丟失,但同時,acks值爲0會獲得最大的系統吞吐量。 
若acks設置爲1,表示producer會在leader partition收到消息時獲得broker的一個確認,這樣會有更好的可靠性,由於客戶端會等待直到broker確認收到消息。若設置爲-1,producer會在全部備份的partition收到消息時獲得broker的確認,這個設置能夠獲得最高的可靠性保證。 
Kafka 消息有一個定長的header和變長的字節數組組成。由於kafka消息支持字節數組,也就使得kafka能夠支持任何用戶自定義的序列號格式或者其它已有的格式如Apache Avro、protobuf等。Kafka沒有限定單個消息的大小,但咱們推薦消息大小不要超過1MB,一般通常消息大小都在1~10kB以前。 

4、Kafka核心特性

4.1 壓縮

咱們上面已經知道了Kafka支持以集合(batch)爲單位發送消息,在此基礎上,Kafka還支持對消息集合進行壓縮,Producer端能夠經過GZIP或Snappy格式對消息集合進行壓縮。Producer端進行壓縮以後,在Consumer端需進行解壓。壓縮的好處就是減小傳輸的數據量,減輕對網絡傳輸的壓力,在對大數據處理上,瓶頸每每體如今網絡上而不是CPU(壓縮和解壓會耗掉部分CPU資源)。 
那麼如何區分消息是壓縮的仍是未壓縮的呢,Kafka在消息頭部添加了一個描述壓縮屬性字節,這個字節的後兩位表示消息的壓縮採用的編碼,若是後兩位爲0,則表示消息未被壓縮。

4.2消息可靠性

在消息系統中,保證消息在生產和消費過程當中的可靠性是十分重要的,在實際消息傳遞過程當中,可能會出現以下三中狀況:

  • 一個消息發送失敗
  • 一個消息被髮送屢次
  • 最理想的狀況:exactly-once ,一個消息發送成功且僅發送了一次

有許多系統聲稱它們實現了exactly-once,可是它們其實忽略了生產者或消費者在生產和消費過程當中有可能失敗的狀況。好比雖然一個Producer成功發送一個消息,可是消息在發送途中丟失,或者成功發送到broker,也被consumer成功取走,可是這個consumer在處理取過來的消息時失敗了。 
從Producer端看:Kafka是這麼處理的,當一個消息被髮送後,Producer會等待broker成功接收到消息的反饋(可經過參數控制等待時間),若是消息在途中丟失或是其中一個broker掛掉,Producer會從新發送(咱們知道Kafka有備份機制,能夠經過參數控制是否等待全部備份節點都收到消息)。 
從Consumer端看:前面講到過partition,broker端記錄了partition中的一個offset值,這個值指向Consumer下一個即將消費message。當Consumer收到了消息,但卻在處理過程當中掛掉,此時Consumer能夠經過這個offset值從新找到上一個消息再進行處理。Consumer還有權限控制這個offset值,對持久化到broker端的消息作任意處理。

4.3 備份機制

備份機制是Kafka0.8版本的新特性,備份機制的出現大大提升了Kafka集羣的可靠性、穩定性。有了備份機制後,Kafka容許集羣中的節點掛掉後而不影響整個集羣工做。一個備份數量爲n的集羣容許n-1個節點失敗。在全部備份節點中,有一個節點做爲lead節點,這個節點保存了其它備份節點列表,並維持各個備份間的狀體同步。下面這幅圖解釋了Kafka的備份機制:

4.4 Kafka高效性相關設計

4.4.1 消息的持久化

Kafka高度依賴文件系統來存儲和緩存消息,通常的人認爲磁盤是緩慢的,這致使人們對持久化結構具備競爭性持懷疑態度。其實,磁盤遠比你想象的要快或者慢,這決定於咱們如何使用磁盤。 
一個和磁盤性能有關的關鍵事實是:磁盤驅動器的吞吐量跟尋到延遲是相背離的,也就是所,線性寫的速度遠遠大於隨機寫。好比:在一個6 7200rpm SATA RAID-5 的磁盤陣列上線性寫的速度大概是600M/秒,可是隨機寫的速度只有100K/秒,二者相差將近6000倍。線性讀寫在大多數應用場景下是能夠預測的,所以,操做系統利用read-ahead和write-behind技術來從大的數據塊中預取數據,或者將多個邏輯上的寫操做組合成一個大寫物理寫操做中。更多的討論能夠在ACMQueueArtical中找到,他們發現,對磁盤的線性讀在有些狀況下能夠比內存的隨機訪問要快一些。 
爲了補償這個性能上的分歧,現代操做系統都會把空閒的內存用做磁盤緩存,儘管在內存回收的時候會有一點性能上的代價。全部的磁盤讀寫操做會在這個統一的緩存上進行。 
此外,若是咱們是在JVM的基礎上構建的,熟悉java內存應用管理的人應該清楚如下兩件事情:

  1. 一個對象的內存消耗是很是高的,常常是所存數據的兩倍或者更多。
  2. 隨着堆內數據的增多,Java的垃圾回收會變得很是昂貴。

基於這些事實,利用文件系統而且依靠頁緩存比維護一個內存緩存或者其餘結構要好——咱們至少要使得可用的緩存加倍,經過自動訪問可用內存,而且經過存儲更緊湊的字節結構而不是一個對象,這將有可能再次加倍。這麼作的結果就是在一臺32GB的機器上,若是不考慮GC懲罰,將最多有28-30GB的緩存。此外,這些緩存將會一直存在即便服務重啓,然而進程內緩存須要在內存中重構(10GB緩存須要花費10分鐘)或者它須要一個徹底冷緩存啓動(很是差的初始化性能)。它同時也簡化了代碼,由於如今全部的維護緩存和文件系統之間內聚的邏輯都在操做系統內部了,這使得這樣作比one-off in-process attempts更加高效與準確。若是你的磁盤應用更加傾向於順序讀取,那麼read-ahead在每次磁盤讀取中實際上獲取到這人緩存中的有用數據。 
以上這些建議了一個簡單的設計:不一樣於維護儘量多的內存緩存而且在須要的時候刷新到文件系統中,咱們換一種思路。全部的數據不須要調用刷新程序,而是馬上將它寫到一個持久化的日誌中。事實上,這僅僅意味着,數據將被傳輸到內核頁緩存中並稍後被刷新。咱們能夠增長一個配置項以讓系統的用戶來控制數據在何時被刷新到物理硬盤上。

4.4.2 常數時間性能保證

消息系統中持久化數據結構的設計一般是維護者一個和消費隊列有關的B樹或者其它可以隨機存取結構的元數據信息。B樹是一個很好的結構,能夠用在事務型與非事務型的語義中。可是它須要一個很高的花費,儘管B樹的操做須要O(logN)。一般狀況下,這被認爲與常數時間等價,但這對磁盤操做來講是不對的。磁盤尋道一次須要10ms,而且一次只能尋一個,所以並行化是受限的。 
直覺上來說,一個持久化的隊列能夠構建在對一個文件的讀和追加上,就像通常狀況下的日誌解決方案。儘管和B樹相比,這種結構不能支持豐富的語義,可是它有一個優勢,全部的操做都是常數時間,而且讀寫之間不會相互阻塞。這種設計具備極大的性能優點:最終系統性能和數據大小徹底無關,服務器能夠充分利用廉價的硬盤來提供高效的消息服務。 
事實上還有一點,磁盤空間的無限增大而不影響性能這點,意味着咱們能夠提供通常消息系統沒法提供的特性。好比說,消息被消費後不是立馬被刪除,咱們能夠將這些消息保留一段相對比較長的時間(好比一個星期)。

4.4.3 進一步提升效率

咱們已經爲效率作了很是多的努力。可是有一種很是主要的應用場景是:處理Web活動數據,它的特色是數據量很是大,每一次的網頁瀏覽都會產生大量的寫操做。更進一步,咱們假設每個被髮布的消息都會被至少一個consumer消費,所以咱們更要怒路讓消費變得更廉價。 
經過上面的介紹,咱們已經解決了磁盤方面的效率問題,除此以外,在此類系統中還有兩類比較低效的場景:

  • 太多小的I/O操做
  • 過多的字節拷貝

爲了減小大量小I/O操做的問題,kafka的協議是圍繞消息集合構建的。Producer一次網絡請求能夠發送一個消息集合,而不是每一次只發一條消息。在server端是以消息塊的形式追加消息到log中的,consumer在查詢的時候也是一次查詢大量的線性數據塊。消息集合即MessageSet,實現自己是一個很是簡單的API,它將一個字節數組或者文件進行打包。因此對消息的處理,這裏沒有分開的序列化和反序列化的上步驟,消息的字段能夠按需反序列化(若是沒有須要,能夠不用反序列化)。 
另外一個影響效率的問題就是字節拷貝。爲了解決字節拷貝的問題,kafka設計了一種「標準字節消息」,Producer、Broker、Consumer共享這一種消息格式。Kakfa的message log在broker端就是一些目錄文件,這些日誌文件都是MessageSet按照這種「標準字節消息」格式寫入到磁盤的。 
維持這種通用的格式對這些操做的優化尤其重要:持久化log 塊的網絡傳輸。流行的unix操做系統提供了一種很是高效的途徑來實現頁面緩存和socket之間的數據傳遞。在Linux操做系統中,這種方式被稱做:sendfile system call(Java提供了訪問這個系統調用的方法:FileChannel.transferTo api)。

爲了理解sendfile的影響,須要理解通常的將數據從文件傳到socket的路徑:

  1. 操做系統將數據從磁盤讀到內核空間的頁緩存中
  2. 應用將數據從內核空間讀到用戶空間的緩存中
  3. 應用將數據寫回內核空間的socket緩存中
  4. 操做系統將數據從socket緩存寫到網卡緩存中,以便將數據經網絡發出

這種操做方式明顯是很是低效的,這裏有四次拷貝,兩次系統調用。若是使用sendfile,就能夠避免兩次拷貝:操做系統將數據直接從頁緩存發送到網絡上。因此在這個優化的路徑中,只有最後一步將數據拷貝到網卡緩存中是須要的。 
咱們指望一個主題上有多個消費者是一種常見的應用場景。利用上述的zero-copy,數據只被拷貝到頁緩存一次,而後就能夠在每次消費時被重得利用,而不須要將數據存在內存中,而後在每次讀的時候拷貝到內核空間中。這使得消息消費速度能夠達到網絡鏈接的速度。這樣以來,經過頁面緩存和sendfile的結合使用,整個kafka集羣幾乎都已以緩存的方式提供服務,並且即便下游的consumer不少,也不會對整個集羣服務形成壓力。 
關於sendfile和zero-copy,請參考:zero-copy

5、Kafka集羣部署

5.1 集羣部署

爲了提升性能,推薦採用專用的服務器來部署kafka集羣,儘可能與hadoop集羣分開,由於kafka依賴磁盤讀寫和大的頁面緩存,若是和hadoop共享節點的話會影響其使用頁面緩存的性能。 
Kafka集羣的大小須要根據硬件的配置、生產者消費者的併發數量、數據的副本個數、數據的保存時長綜合肯定。 
磁盤的吞吐量尤其重要,由於一般kafka的瓶頸就在磁盤上。 
Kafka依賴於zookeeper,建議採用專用服務器來部署zookeeper集羣,zookeeper集羣的節點採用偶數個,通常建議用三、五、7個。注意zookeeper集羣越大其讀寫性能越慢,由於zookeeper須要在節點之間同步數據。一個3節點的zookeeper集羣容許一個節點失敗,一個5節點集羣容許2個幾點失敗。

5.2 集羣大小

有不少因素決定着kafka集羣須要具有存儲能力的大小,最準確的衡量辦法就是模擬負載來測算一下,Kafka自己也提供了負載測試的工具。 
若是不想經過模擬實驗來評估集羣大小,最好的辦法就是根據硬盤的空間需求來推算。下面我就根據網絡和磁盤吞吐量需求來作一下估算。 
咱們作以下假設:

  • W:每秒寫多少MB
  • R :副本數
  • C :Consumer的數量

通常的來講,kafka集羣瓶頸在於網絡和磁盤吞吐量,因此咱們先評估一下集羣的網絡和磁盤需求。 
對於每條消息,每一個副本都要寫一遍,因此總體寫的速度是W*R。讀數據的部分主要是集羣內部各個副本從leader同步消息讀和集羣外部的consumer讀,因此集羣內部讀的速率是(R-1)*W,同時,外部consumer讀的速度是C*W,所以:

  • Write:W*R
  • Read:(R-1)*W+C*W

須要注意的是,咱們能夠在讀的時候緩存部分數據來減小IO操做,若是一個集羣有M MB內存,寫的速度是W MB/sec,則容許M/(W*R) 秒的寫能夠被緩存。若是集羣有32GB內存,寫的速度是50MB/s的話,則能夠至少緩存10分鐘的數據。  

 2、kafka 安裝

2.1 jdk安裝

#以oracle jdk爲例,下載地址http://java.sun.com/javase/downloads/index.jsp 

yum -y install jdk-8u141-linux-x64.rpm

 

2.2 安裝zookeeper  

wget http://apache.forsale.plus/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
tar zxf zookeeper-3.4.9.tar.gz
mv zookeeper-3.4.9 /data/zk

 

修改配置文件內容以下所示: 

[root@localhost ~]# cat /data/zk/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zk/data/zookeeper
dataLogDir=/data/zk/data/logs
clientPort=2181
maxClientCnxns=60
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
 
server.1=zk01:2888:3888
server.2=zk02:2888:3888
server.3=zk03:2888:3888

 

參數說明:

server.id=host:port:port:表示了不一樣的zookeeper服務器的自身標識,做爲集羣的一部分,每一臺服務器應該知道其餘服務器的信息。用戶能夠從「server.id=host:port:port」 中讀取到相關信息。在服務器的data(dataDir參數所指定的目錄)下建立一個文件名爲myid的文件,這個

文件的內容只有一行,指定的是自身的id值。好比,服務器「1」應該在myid文件中寫入「1」。這個id必須在集羣環境中服務器標識中是惟一的,且大小在1~255之間。這同樣配置中,zoo1表明第一臺服務器的IP地址。第一個端口號(port)是從follower鏈接到leader機器的
端口,第二個端口是用來進行leader選舉時所用的端口。因此,在集羣配置過程當中有三個很是重要的端口:clientPort:218一、port:288八、port:3888。
關於zoo.cfg配置文件說明,參考鏈接https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_configuration;

若是想更換日誌輸出位置,除了在zoo.cfg加入"dataLogDir=/data/zk/data/logs"外,還須要修改zkServer.sh文件,大概修改方式地方在125行左右,內容以下: 

125 ZOO_LOG_DIR="$($GREP "^[[:space:]]*dataLogDir" "$ZOOCFG" | sed -e 's/.*=//')"
126 if [ ! -w "$ZOO_LOG_DIR" ] ; then
127 mkdir -p "$ZOO_LOG_DIR"
128 fi

 

在啓動服務以前,還須要分別在zookeeper建立myid,方式以下: 

echo 1 >  /data/zk/data/zookeeper/myid

 

啓動服務

/data/zk/bin/zkServer.sh start

 

驗證服務 

### 查看相關端口號[root@localhost ~]# ss -lnpt|grep java
LISTEN     0      50          :::34442                   :::*                   users:(("java",pid=2984,fd=18))
LISTEN     0      50       ::ffff:192.168.15.133:3888                    :::*                   users:(("java",pid=2984,fd=26))
LISTEN     0      50          :::2181                    :::*                   users:(("java",pid=2984,fd=25))###查看zookeeper服務狀態
   
root@localhost ~]# /data/zk/bin/zkServer.sh status
    ZooKeeper JMX enabled by default
Using config: /data/zk/bin/../conf/zoo.cfgMode: follower

 

zookeeper相關命令說明,參考https://zookeeper.apache.org/doc/r3.4.10/zookeeperStarted.html (文末有說明);

2.3 安裝kafka 

tar zxf kafka_2.11-0.11.0.0.tgz
mv kafka_2.11-0.11.0.0 /data/kafka

 修改配置 

[root@localhost ~]# grep -Ev "^#|^$" /data/kafka/config/server.properties
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://192.168.15.131:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.15.131:2181,192.168.15.132:2181,192.168.15.133:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

 

 

提示:其餘主機將該機器的kafka目錄拷貝便可,而後須要修改broker.id、listeners地址。有關kafka配置文件參數,參考:http://orchome.com/12;

啓動服務 

/data/kafka/bin/kafka-server-start.sh /data/kafka/config/server.properties

 

 

驗證服務 

### 隨便在其中一臺主機執行
/data/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.15.131:2181,192.168.15.132:2181,192.168.15.133:2181 --replication-factor 1 --partitions 1 --topic test
 
###在其餘主機查看
/data/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.15.131:2181,192.168.15.132:2181,192.168.15.133:2181

 

參考:Kafka基本原理

參考:Kafka 設計與原理詳解

相關文章
相關標籤/搜索