初探kafka

    平常中工做中我並無對kafka接觸不少,但瞭解到不少的框架都和kafka有着緊密的關係。好比rockmetmq是參考了kafka的設計,neflix的緩存組件ehcache是用kafka作數據的同步。同時kafka在大數據方面一般和spark,hadoop,storm一塊兒使用,因此我對kafka也產生了一些興趣,抽了些時間去研究了一下這個框架。由於尚未深刻的研究和使用,因此只能算是初探~。html

    kafka架構

         

 

 

 左邊是kafka,右邊是rocketmq。kafka的架構如上所示,與rocketmq很類似。不一樣的是rocketmq用的是namesrv,而kafka用的是zookeeper。zookeeper在kafka的做用是起到一個動態註冊發現與負載均衡的做用。linux

  zookeeper與kafka

    一、broker註冊

    kafka使用了全局惟一的數字來指代每一個Broker服務器,不一樣的Broker必須使用不一樣的Broker ID進行註冊,建立完節點後,每一個Broker就會將本身的IP地址和端口信息記錄到該節點中去。其中,Broker建立的節點類型是臨時節點,一旦Broker宕機,則對應的臨時節點也會被自動刪除。好比我本地起了三個Broker,查看zookeepr /brokers/ids的目錄就看到如下內容git

ls /brokers/ids

[0, 1, 2]

get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.66.51.58:9093"],"jmx_port":-1,"host":"10.66.51.58","timestamp":"1532936089303","port":9093,"version":4}
cZxid = 0x14b
ctime = Mon Jul 30 15:34:49 CST 2018
mZxid = 0x14b
mtime = Mon Jul 30 15:34:49 CST 2018
pZxid = 0x14b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x164ea1258570003
dataLength = 192
numChildren = 0

  二、topic註冊

  在kafka中,同一個Topic的消息會被分紅多個分區並將其分佈在多個Broker上,這些分區信息及與Broker的對應關係也都是由Zookeeper在維護,由專門的節點/broker/topics來記錄。Broker服務器啓動後,會到對應Topic節點(/brokers/topics)上註冊本身的Broker ID並寫入針對該Topic的分區信息。好比Topic "test3"建立時指定了三個分區。查看zookeeper內容則知道"2"分區放在Broker-0,"1"分區放在Broker-2,"0"分區放在Broker-1github

ls /brokers/topics
[connect-test, test3, test, my-replicated-topic, __consumer_offsets]

get /brokers/topics/test3
{"version":1,"partitions":{"2":[0],"1":[2],"0":[1]}}
cZxid = 0x154
ctime = Mon Jul 30 16:11:30 CST 2018
mZxid = 0x154
mtime = Mon Jul 30 16:11:30 CST 2018
pZxid = 0x158
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 52
numChildren = 1

 

   三、生產者負載均衡

   因爲同一個Topic消息會被分區並將其分佈在多個Broker上,所以,生產者須要將消息合理地發送到這些分佈式的Broker上,那麼如何實現生產者的負載均衡,Kafka支持傳統的四層負載均衡,也支持Zookeeper方式實現負載均衡。apache

   (1) 四層負載均衡,根據生產者的IP地址和端口來爲其肯定一個相關聯的Broker。一般,一個生產者只會對應單個Broker,而後該生產者產生的消息都發往該Broker。這種方式邏輯簡單,每一個生產者不須要同其餘系統創建額外的TCP鏈接,只須要和Broker維護單個TCP鏈接便可。可是,其沒法作到真正的負載均衡,由於實際系統中的每一個生產者產生的消息量及每一個Broker的消息存儲量都是不同的,若是有些生產者產生的消息遠多於其餘生產者的話,那麼會致使不一樣的Broker接收到的消息總數差別巨大,同時,生產者也沒法實時感知到Broker的新增和刪除。緩存

   (2) 使用Zookeeper進行負載均衡,因爲每一個Broker啓動時,都會完成Broker註冊過程,生產者會經過該節點的變化來動態地感知到Broker服務器列表的變動,這樣就能夠實現動態的負載均衡機制。服務器

   四、消費者負載均衡

   與生產者相似,Kafka中的消費者一樣須要進行負載均衡來實現多個消費者合理地從對應的Broker服務器上接收消息,每一個消費者分組包含若干消費者,每條消息都只會發送給分組中的一個消費者,不一樣的消費者分組消費本身特定的Topic下面的消息,互不干擾。網絡

   五、消費者offset的存儲

   消費者的offset我以前瞭解到是存在zookeeper上面,但下載最新版的沒在上面找到,後面瞭解到是存到了broker上面~。數據結構


    消息的持久化

     Kafka 對消息的存儲和緩存嚴重依賴於文件系統,但kafka的性能卻遠超出了人們對磁盤IO的性能預估。主要緣由有:架構

      一、順序磁盤IO

      關於磁盤性能的關鍵事實是,磁盤的吞吐量和過去十年裏磁盤的尋址延遲不一樣。所以,使用6個7200rpm、SATA接口、RAID-5的磁盤陣列在JBOD配置下的順序寫 入的性能約爲600MB/秒,但隨機寫入的性能僅約爲100k/秒,相差6000倍以上。由於線性的讀取和寫入是磁盤使用模式中最有規律的,而且由操做系統進行了大量的優化。現代操做系統提供了 read-ahead 和 write-behind 技術,read-ahead 是以大的 data block 爲單位預先讀取數據,而 write-behind 是將多個小型的邏輯寫  合併成一次大型的物理磁盤寫入。關於該問題的進一步討論能夠參考 ACM Queue article,他們發現實際上順序磁盤訪問在某些狀況下比隨機內存訪問還要快

      二、pageCache而不是in-memory cache

      爲了提升性能,現代操做系統在愈來愈注重使用內存對磁盤進行 cache。現代操做系統主動將全部空閒內存用做 disk caching,代價是在內存回收時性能會有所下降。全部對磁盤的讀寫操做都會經過這個統一的 cache。若是不使用直接I/O,該功能不能輕易關閉。所以即便進程維護了 in-process cache,該數據也可能會被複制到操做系統的 pagecache 中,事實上全部內容都被存儲了兩份。

  此外,Kafka 創建在 JVM 之上,瞭解 Java 內存使用的人都知道兩點:

  1. 對象的內存開銷很是高,一般是所存儲的數據的兩倍(甚至更多)。
  2. 隨着堆中數據的增長,Java 的垃圾回收變得愈來愈複雜和緩慢。

        受這些因素影響,相比於維護 in-memory cache 或者其餘結構,使用文件系統和 pagecache 顯得更有優點--咱們能夠經過自動訪問全部空閒內存將可用緩存的容量至少翻倍,而且經過存儲緊湊的字節結構而不是獨立的對象,有望將緩存容量再翻一番。 這樣使得32GB的機器緩存容量能夠達到28-30GB,而且不會產生額外的 GC 負擔。此外,即便服務從新啓動,緩存依舊可用,而 in-process cache 則須要在內存中重建(重建一個10GB的緩存可能須要10分鐘),不然進程就要從 cold cache 的狀態開始(這意味着進程最初的性能表現十分糟糕)。 這同時也極大的簡化了代碼,由於全部保持 cache 和文件系統之間一致性的邏輯如今都被放到了 OS 中,這樣作比一次性的進程內緩存更準確、更高效。若是你的磁盤使用更傾向於順序讀取,那麼 read-ahead 能夠有效的使用每次從磁盤中讀取到的有用數據預先填充 cache。

      因此kafka給出了一個簡單的設計:相比於維護儘量多的 in-memory cache,而且在空間不足的時候匆忙將數據 flush 到文件系統,咱們把這個過程倒過來。全部數據一開始就被寫入到文件系統的持久化日誌中,而不用在 cache 空間不足的時候 flush 到磁盤。實際上,這代表數據被轉移到了內核的 pagecache 中。

     三、隊列存儲數據

      消息系統使用的持久化數據結構,BTree 是最通用的數據結構,能夠在消息系統可以支持各類事務性和非事務性語義。 雖然 BTree 的操做複雜度是 O(log N),但成本也至關高。一般咱們認爲 O(log N) 基本等同於常數時間,但這條在磁盤操做中不成立。磁盤尋址是每10ms一跳,而且每一個磁盤同時只能執行一次尋址,所以並行性受到了限制。 所以即便是少許的磁盤尋址也會很高的開銷。因爲存儲系統將很是快的cache操做和很是慢的物理磁盤操做混合在一塊兒,當數據隨着 fixed cache 增長時,能夠看到樹的性能一般是非線性的——好比數據翻倍時性能降低不僅兩倍。

      因此直觀來看,持久化隊列能夠創建在簡單的讀取和向文件後追加兩種操做之上,這和日誌解決方案相同。這種架構的優勢在於全部的操做複雜度都是O(1),並且讀操做不會阻塞寫操做,讀操做之間也不會互相影響。這有着明顯的性能優點,因爲性能和數據大小徹底分離開來——服務器如今能夠充分利用大量廉價、低轉速的1+TB SATA硬盤。 雖然這些硬盤的尋址性能不好,但他們在大規模讀寫方面的性能是能夠接受的,並且價格是原來的三分之1、容量是原來的三倍。

      在不產生任何性能損失的狀況下可以訪問幾乎無限的硬盤空間,這意味着咱們能夠提供一些其它消息系統不常見的特性。例如:在 Kafka 中,咱們可讓消息保留相對較長的一段時間(好比一週),而不是試圖在被消費後當即刪除。正如咱們後面將要提到的,這給消費者帶來了很大的靈活性。

   

   消息的傳輸

      解決了數據持久化的問題,還須要解決數據的發送和消費等相關傳輸問題。

      一、批量操做而不是屢次小IO

        一旦消除了磁盤訪問模式不佳的狀況,系統性能低下的主要緣由就剩下了兩個:大量的小型 I/O 操做,以及過多的字節拷貝。小型的 I/O 操做發生在客戶端和服務端之間以及服務端自身的持久化操做中。爲了不這種狀況,kafka的通信協議是創建在一個 「消息塊」 的抽象基礎上,合理將消息分組。 這使得網絡請求將多個消息打包成一組,而不是每次發送一條消息,從而使整組消息分擔網絡中往返的開銷。Consumer 每次獲取多個大型有序的消息塊,並由服務端 依次將消息塊一次加載到它的日誌中。

         這個簡單的優化對速度有着數量級的提高。批處理容許更大的網絡數據包,更大的順序讀寫磁盤操做,連續的內存塊等等,全部這些都使 KafKa 將隨機流消息順序寫入到磁盤, 再由 consumers 進行消費。

      二、sendfile避免過多的字節拷貝

          broker 維護的消息日誌自己就是一個文件目錄,每一個文件都由一系列以相同格式寫入到磁盤的消息集合組成,這種寫入格式被 producer 和 consumer 共用。保持這種通用格式能夠對一些很重要的操做進行優化: 持久化日誌塊的網絡傳輸。 現代的unix 操做系統提供了一個高度優化的編碼方式,用於將數據從 pagecache 轉移到 socket 網絡鏈接中;在 Linux 中系統調用 sendfile 作到這一點。

       爲了理解 sendfile 的意義,瞭解數據從文件到套接字的常見數據傳輸路徑就很是重要:

  1. 操做系統從磁盤讀取數據到內核空間的 pagecache
  2. 應用程序讀取內核空間的數據到用戶空間的緩衝區
  3. 應用程序將數據(用戶空間的緩衝區)寫回內核空間到套接字緩衝區(內核空間)
  4. 操做系統將數據從套接字緩衝區(內核空間)複製到經過網絡發送的 NIC 緩衝區

      這顯然是低效的,有四次 copy 操做和兩次系統調用。使用 sendfile 方法,能夠容許操做系統將數據從 pagecache 直接發送到網絡,這樣避免從新複製數據。因此這種優化方式,只須要最後一步的copy操做,將數據複製到 NIC 緩衝區。

咱們指望一個廣泛的應用場景,一個 topic 被多消費者消費。使用上面提交的 zero-copy(零拷貝)優化,數據在使用時只會被複制到 pagecache 中一次,節省了每次拷貝到用戶空間內存中,再從用戶空間進行讀取的消耗。這使得消息可以以接近網絡鏈接速度的 上限進行消費。

       pagecache 和 sendfile 的組合使用意味着,在一個kafka集羣中,大多數 consumer 消費時,您將看不到磁盤上的讀取活動,由於數據將徹底由緩存提供。

      三、壓縮數據

      在某些狀況下,數據傳輸的瓶頸不是 CPU ,也不是磁盤,而是網絡帶寬。對於須要經過廣域網在數據中心之間發送消息的數據管道尤爲如此。固然,用戶能夠在不須要 Kakfa 支持下一次一個的壓縮消息。可是這樣會形成很是差的壓縮比和消息重複類型的冗餘,好比 JSON 中的字段名稱或者是或 Web 日誌中的用戶代理或公共字符串值。高性能的壓縮是一次壓縮多個消息,而不是壓縮單個消息。

      Kafka 以高效的批處理格式支持一批消息能夠壓縮在一塊兒發送到服務器。這批消息將以壓縮格式寫入,而且在日誌中保持壓縮,只會在 consumer 消費時解壓縮。

      Kafka 支持 GZIP,Snappy 和 LZ4 壓縮協議       

     

     消息是推仍是拉?

        Kafka 在這方面採起了一種較爲傳統的設計方式,也是大多數的消息系統所共享的方式:即 producer 把數據 push 到 broker,而後 consumer 從 broker 中 pull 數據。 但也有一些 系統,好比 Scribe 和 Apache Flume,沿着一條徹底不一樣的 push-based 的路徑,將數據 push 到下游節點。這兩種方法都有優缺點。然而,因爲 broker 控制着數據傳輸速率, 因此 push-based 系統很難處理不一樣的 consumer。讓 broker 控制數據傳輸速率主要是爲了讓 consumer 可以以可能的最大速率消費;不幸的是,這致使着在 push-based 的系統中,當消費速率低於生產速率時,consumer 每每會不堪重負(本質上相似於拒絕服務攻擊)。pull-based 系統有一個很好的特性, 那就是當 consumer 速率落後於 producer 時,能夠在適當的時間遇上來。還能夠經過使用某種 backoff 協議來減小這種現象:即 consumer 能夠經過 backoff 表示它已經不堪重負了,然而經過得到負載狀況來充分使用 consumer(但永遠不超載)這一方式實現起來比它看起來更棘手。前面以這種方式構建系統的嘗試,引導着 Kafka 走向了更傳統的 pull 模型。

       另外一個 pull-based 系統的優勢在於:它能夠大批量生產要發送給 consumer 的數據。而 push-based 系統必須選擇當即發送請求或者積累更多的數據,而後在不知道下游的 consumer 可否當即處理它的狀況下發送這些數據。若是系統調整爲低延遲狀態,這就會致使一次只發送一條消息,以致於傳輸的數據再也不被緩衝,這種方式是極度浪費的。 而 pull-based 的設計修復了該問題,由於 consumer 老是將全部可用的(或者達到配置的最大長度)消息 pull 到 log 當前位置的後面,從而使得數據可以獲得最佳的處理而不會引入沒必要要的延遲。

     簡單的 pull-based 系統的不足之處在於:若是 broker 中沒有數據,consumer 可能會在一個緊密的循環中結束輪詢,實際上 busy-waiting 直到數據到來。爲了不 busy-waiting,咱們在 pull 請求中加入參數,使得 consumer 在一個「long pull」中阻塞等待,直到數據到來(還能夠選擇等待給定字節長度的數據來確保傳輸長度)。

 

    消息的offset

    大多數消息系統都在 broker 上保存被消費消息的元數據。也就是說,當消息被傳遞給 consumer,broker 要麼當即在本地記錄該事件,要麼等待 consumer 的確認後再記錄。這是一種至關直接的選擇,並且事實上對於單機服務器來講,也沒與其它地方可以存儲這些狀態信息。 因爲大多數消息系統用於存儲的數據結構規模都很小,因此這也是一個很實用的選擇——由於只要 broker 知道哪些消息被消費了,就能夠在本地當即進行刪除,一直保持較小的數據量。

    但要讓 broker 和 consumer 就被消費的數據保持一致性也不是一個小問題。若是 broker 在每條消息被髮送到網絡的時候,當即將其標記爲 consumed,那麼一旦 consumer 沒法處理該消息(可能由 consumer 崩潰或者請求超時或者其餘緣由致使),該消息就會丟失。 爲了解決消息丟失的問題,許多消息系統增長了確認機制:即當消息被髮送出去的時候,消息僅被標記爲sent 而不是 consumed;而後 broker 會等待一個來自 consumer 的特定確認,再將消息標記爲consumed。這個策略修復了消息丟失的問題,但也產生了新問題。 首先,若是 consumer 處理了消息但在發送確認以前出錯了,那麼該消息就會被消費兩次。第二個是關於性能的,如今 broker 必須爲每條消息保存多個狀態(首先對其加鎖,確保該消息只被發送一次,而後將其永久的標記爲 consumed,以便將其移除)。 還有更棘手的問題要處理,好比如何處理已經發送但一直得不到確認的消息。

      Kafka 使用徹底不一樣的方式解決消息丟失問題。Kafka的 topic 被分割成了一組徹底有序的 partition,其中每個 partition 在任意給定的時間內只能被每一個訂閱了這個 topic 的 consumer 組中的一個 consumer 消費。這意味着 partition 中 每個 consumer 的位置僅僅是一個數字,即下一條要消費的消息的offset。這使得被消費的消息的狀態信息至關少,每一個 partition 只須要一個數字。這個狀態信息還能夠做爲週期性的 checkpoint。這以很是低的代價實現了和消息確認機制等同的效果。

    這種方式還有一個附加的好處。consumer 能夠回退到以前的 offset 來再次消費以前的數據,這個操做違反了隊列的基本原則,但事實證實對大多數 consumer 來講這是一個必不可少的特性。 例如,若是 consumer 的代碼有 bug,而且在 bug 被發現前已經有一部分數據被消費了, 那麼 consumer 能夠在 bug 修復後經過回退到以前的 offset 來再次消費這些數據。

   

    以上就是我整理的一些關於kafka的資料,主要仍是集中在概念設計這一塊,不知道你們看了有沒有有所收穫呢。

相關文章
相關標籤/搜索