如何學習kafka?

  本文是我學習kafka的一個思路和總結,但願對剛接觸kafka的你有所幫助。在學習kafka以前,最好能對kafka有一個簡單的瞭解,能夠提出一些問題,帶着問題去學習,就會容易一些。java

0 什麼是kakfa1 kafka的版本2 kakfa中的術語3 Kafka消息模型4 kafka的結構5 使用kafka建立demo6 kakfa客戶端請求是如何被處理的7 kafka中的組件coordinatorcontroller8 位移提交與分區管理9 重平衡10 kakfa的參數(整理項,選讀)brokertopicproducerconsumer端bootstrap

0 什麼是kakfa

  kakfa是一個開源的消息引擎系統,提供了一組規範,企業能夠利用這組規範在不一樣的系統中,傳遞語義準確的消息。(通俗的來說,就是系統A發消息給消息引擎,系統B從消息引擎中讀取系統A發送的消息)。api

1 kafka的版本

  咱們學習開源框架,必定要有版本意識。至少要了解每一個版本有哪些大的改動,若是沒有版本意識,就很容易出現學習後,因爲使用的版本不一樣,致使各類錯誤,甚至是調用的api不能經過編譯。
  接下來我就對kafka版本的改動點作一個總結:
  0.7: 只提供了最基礎的消息隊列的功能
  0.8: 引入了副本的機制,保證了系統的可靠性,此時kafka使用的仍是老版本的客戶端api,須要指定zookeeper地址,0.8.2引入了新版本的producer,可是bug較多。
  0.9: 引入了kakfa connect組件,使用了java語言重寫了新版 consumer api,從這個版本開始,新版producer api相對穩定。
  0.10: 引入了kafka stream 使得kafka正式升級成爲了分佈式流處理平臺,這個版本的consumer api開始相對穩定,0.10.22修復了可能會下降producer性能的bug,若是你使用的是這個版本的kafka,建議升級爲0.10.22
  0.11: 引入了冪等性Producer 以及事務性,這個版本的kafka消息進行了重構,由v1升級成了v2,提升了壓縮的程度和效率。
  1.0/2.0:主要是對kafka Stream進行改進和優化 緩存

2 kakfa中的術語

  消息(record)、主題(topic)、分區(Partition)、位移(offset)、副本(replica)、生產者(producer)、消費者(consumer)、消費者組(consumer group)、重平衡(rebalance) 服務器

3 Kafka消息模型

  點對點模型:也叫作消息隊列模型,即系統A發送的消息只能系統B去接收,相似於打電話。
  發佈/訂閱模型:這裏有一個主題的概念,能夠理解成消息的邏輯容器。消息的發佈者向主題發佈消息,訂閱者從它訂閱的主題中獲取消息。在這個過程當中發佈者和訂閱者均可能是多個,主題也能夠是多個,相似於訂報紙。網絡

4 kafka的結構


  注:topic只是一個邏輯容器,經常使用來區分不一樣的業務數據。

 

5 使用kafka建立demo

  kafka集羣的建立過程以及使用java調用kafka客戶端api的demo,請參考本博客前2篇文章,有很是詳細的教程。session

6 kakfa客戶端請求是如何被處理的

  kafka broker採用的React模型處理請求,React模式是事件驅動架構的一種實現方式,特被適用於處理多個客戶端併發向服務器端發送請求的場景。咱們經過一幅圖來對React模式有一個初步的瞭解。架構


  客戶端的請求發送到Reactor,Reactor中有一個dispatch線程,負責分發請求,它將請求分發到工做線程中,由工做線程進行處理。映射到kafka中, SocketServer就上述中的Reactor, acceptor線程就是上述中的dispatch線程,kafka工做線程也有一個專屬的名字,叫 網絡線程池。可是kafka在此基礎上,進行了進一步的細化,接收用戶請求的線程(網絡線程池中的線程)不對用戶請求進行處理,而是將請求放到一個 共享請求隊列中,由 IO線程池中的線程進行處理。

  上圖中的purgatory組件用於緩存延時請求,好比咱們配置了acks = all,當ISR中的其餘副本尚未寫入結果的時候,這個響應就會緩存在purgatory中,直到條件知足,IO線程纔會將結果返回到網絡響應隊列中。
  咱們經過上圖能夠知道,若是想提升kafka處理消息的能力,能夠提升網絡線程數和io線程數,這兩個線程數分別對應着 num.network.threadsnum.io.threads兩個參數。

 

7 kafka中的組件

coordinator

  管理消費者組,與位移提交有關。
  coordinator是每一個broker都有的組件,那麼如何肯定coordinator呢?
  肯定位移主題是由哪一個分區保存的:Math.abs(group.id % offsetTopicParitionCount);
  找出該分區leader對應的broker。併發

controller

  主題、分區管理、集羣管理、數據緩存app

8 位移提交與分區管理

  consumer會按期向kakfa提交本身的消費位移(offset),kafkaConsumer api提供了多種提交位移的方式,就用戶而言,位移提交分爲手動提交和自動提交(auto.commit.offset.enable),就Consumer而言,提交位移分爲同步提交(commitSync)和異步提交(CommitAsync),consumer會將位移信息提交到__consumer_offset這個主題中。
  kafka中的副本(replica),是在分區(Partition)層面上進行的,能夠實現數據的冗餘(replica.factor)。在同一個分區中,數據是有序的,kafka支持對消息設置,同一個鍵的消息會被髮送到一個分區中,一個分區只能由一個Consumer進行消費。
  在kafka中只有leader 副本對客戶端提供服務,follower副本只是異步同步數據。不提供服務。

9 重平衡

  重平衡能夠說是kafka中最重要的概念了,重平衡的本質是一種協議,它規定了一個consumer group下的全部consumer如何達成一致,對訂閱topic的partition進行分配
  那麼爲什咱們要避免重平衡呢?重平衡的效率不高,在重平衡的過程當中,當前consumer group的全部consumer會中止消費(能夠類比java中的full gc),重平衡的影響範圍較廣,consumer group下的全部comsumer都會受到影響。
  重平衡的實現,須要藉助Coordinator組件,消費者端的重平衡能夠分爲兩步:
  1.Consumer入組
  2.等待Leader consumer分配方案,分別對應着joinGroup和syncGroup兩類請求。
  首先當consumer加入consumer group的時候,會向coordinator發送join Group請求,這樣coordinator就知道了全部訂閱主題的消費者信息。一旦收集了全部consumer的信息,協調者就會選出一個comsumer leader。
  領導者消費者的任務就是收集全部成員的訂閱信息,而後根據信息,制定分區方案。
  最後領導者消費者會將分配方案發送給協調者,其餘消費者也會發送請求給協調者,不過請求中沒有實際的內容。協調者會以響應的方式將方案返回給全部消費者,這樣消費者組內的成員就能夠知道本身消費的分區了。

10 kakfa的參數(整理項,選讀)

  總結的都是我的認爲比較重要的參數,可是篇幅有限,很難展開說明,其目的就是讓你有一個印象,下次見到這些參數的時候,要重點記憶

broker

  log.dirs:指定了broker使用的若干文件路徑。
  listeners:監聽器,告訴外部鏈接者經過什麼協議訪問指定名稱和端口的kafka服務
  advertised.listeners:和listeners功能一致,不一樣的是它是對外部發布的
  auto.create.topics.enable:是否容許自動建立topic
  unclean.leader.election.enable:是否容許unclean的leader進行選舉
  auto.leader.rebalance.enabl:是否容許按期舉行leader選舉
  log.retention.{hours|minutes|ms}:消息被保存的時間
  message.max.bytes:broker最大能接收消息的大小
  replica.lag.time.max.ms:follower副本能夠落後leader副本的最長時間間隔。

topic

  retention.ms:該topic中消息被保存的時間
  max.message.bytes:該topic最大能接收的消息大小
  replication.factor:消息冗餘份數

producer

  bootstrap.server:用於與kafka集羣中的broker建立鏈接,只須要配置部分broker便可
  key.serializer/value.serializer:鍵/值的序列化方式,必填
  acks:很是重要,用於控制producer產生消息的持久性,kafka只對「已提交」的消息作有有限度的持久化保證。而這個參數,就是定義「已提交」
  acks = 0:producer不用理睬broker端的處理結果,只要消息發送後,就認爲是「已提交」
  acks = all或-1,不只leader broker要將消息寫入本地日誌,還要ISR集合中的全部副本都成功寫入各自的本地日誌後,才發送響應消息給producer,認爲消息「已提交」
  acks = 1折中方案,當leader broker將消息寫入本地日誌,就返回響應給producer,認爲消息「已提交」
  min.insync.replica:消息至少要被寫入多少個副本,纔算寫入成功,這個參數和acks功能相似,不過它強調的是,acks = all時,強調的是全部副本。(好比ISR中只有一個replica,那麼配置acks = 1即寫入一個broker便可,min.sync.replica = 2,即須要寫入2個broker,這條消息會寫入失敗)
  buffer.memory:指定了producer端用於緩存消息的緩衝區大小。kafka發送消息採用的是異步架構的方式,消息寫入緩衝區,由一個專屬線程從緩衝區中獲取消息,執行真正的發送
  compression.type:producer端壓縮方式,壓縮能夠節省網絡傳輸中的帶寬,犧牲CPU使用率
  retries:失敗後的重試次數,很是重要的參數,用於實現kafka的處理語義
  retries = 0,即失敗後不會進行重試,實現至多一次的處理語義
  retries = n,即失敗後會重試n次,實現至少一次的處理語義
  (kafka 0.11後推出了精確一次的處理語義,即冪等性producer以及事務,相關參數:enable.idempotence = true)
  bacth.size:producer會將發往同一分區的消息,打成一個batch,當batch滿了後,producer會一次發送batch中的全部消息,這個參數控制者batch的大小。
  linger.ms:上面提到的batch,在batch沒滿的時候,也會進行發送,這實際上是一種權衡,權衡的是吞吐量消息延時,linger.ms控制的就是消息的延時行爲,默認值是0,表示消息會被當即發送,無論batch是否裝滿,咱們能夠改變這個參數,來修改發送消息的時間,即一條消息是否會被髮送,取決於一、batch是否裝滿;二、有沒有達到linger.ms規定的時間。
  max.request.size:控制producer端最大能夠發送消息的大小。
  request.timeout.ms:當producer發送消息給broker後,broker須要在指定時間內返回響應,不然producer就會認爲該請求超時,並顯示拋出TimeoutException。

consumer端

  bootstrap.server:用於與kafka集羣中的broker建立鏈接,只須要配置部分broker便可
  key.deserializer/value.deserializer:鍵/值的反序列化方式,必填
  group.id:consumer group的名字,必填
  session.time.out:coordinator檢測到consumer失活的時間,這個值設置的較小有利於coordinator更快的檢測consumer失活。
  max.poll.interval.ms:consumer處理邏輯的最大時間,若是一次poll()超過了這個時間,則coordinator會認爲該consumer已經不可用,會將其踢出消費者組並進行重平衡。
  auto.offset.reset:制定了無位移或位移越界時,kafka的對應策略。取值:earliest:從最先位移進行消費、laster:從最新位移開始消費、none:拋出異常。
  auto.commit.enable:指定consumer是否自定義提交位移
  fetch.max.bytes:consumer端單次獲取數據的最大字節數
  max.poll.records:單次poll()返回的最大消息數
  heartbeat.interval.ms:心跳請求頻率
  connections.max.idle.ms:按期關閉空閒的tcp鏈接。

  最後,期待您的訂閱和點贊,專欄每週都會更新,但願能夠和您一塊兒進步,同時也期待您的批評與指正!

imageimage
相關文章
相關標籤/搜索