Kafka筆記4(消費者)

消費者和消費羣組:apache

  Kafka消費者從屬於消費者羣組,一個羣組裏的消費者訂閱的是同一個主題,每一個消費者接收主題的一部分分區消息服務器

  消費者的數量不要超過主題分區的數量,多餘的消費者只會被閒置session

  一個主題能夠被多個消費羣組使用,消費者羣組之間互不影響異步

  

  當一個消費者加入羣組時,他讀取的數據是本來由其餘消費者讀取的信息函數

  分區的全部權從一個消費者轉移至另外一個消費者的行爲稱爲「再均衡」fetch

  再均衡期間,消費者當前的讀取狀態會丟失,消費者沒法讀取信息,形成集羣一小段時間的不可用,在恢復狀態以前會拖慢應用程序線程

  消費者經過向羣組協調器broker發送心跳維持他們和羣組的從屬關係以及他們對分區的全部權關係,若是broker認爲消費者死亡會觸發再均衡行爲blog

 分配分區過程:接口

  當消費者加入羣組時,他會向羣組協調器發送一個JoinGroup請求,第一個加入羣組的消費者稱爲羣主,羣主從協調器那裏得到羣組的成員列表,並負責給每個消費者分配分區。他使用一個實現PartitionAssignor接口的類來決定哪些分區應該被分配給消費者,分配完畢以後,羣主把分配狀況列表發送給broker,broker再把這些信息發送給全部消費者,每一個消費者只能看到本身的分配信息,只有羣主知道羣組的全部消費者的分配信息字符串

  消息輪詢是消費者API核心,經過從一個簡單的輪詢向服務器請求數據,一旦消費者訂閱了主題,輪詢就會處理全部細節,包括羣組協調/分區再均衡/發送心跳/獲取數據

  一個消費者使用一個線程

  

消費者重要的屬性參數配置:

  fetch.min.bytes

    指定了消費者從服務器獲取記錄的最小字節數,若是broker收到消費者請求,但數據可用量小於fetch.min.bytes,就會等到有足夠的可用數據才把它返回給消費者

  fetch.max.wait.ms

    指定broker等待時間,默認500ms  

  max.partition.fetch.bytes

    指定服務器從每一個分區裏返回給消費者的最大字節數,默認1MB    max.partition.fetch.size的值必須比broker能接收的最大消息字節數(max.message.size)大

  session.timeout.ms

    指定消費者在被認爲死亡以前能夠與服務器斷開鏈接的時間,默認3S

    heartbeat.interval.ms =  session.timeout.ms / 3

  auto.offset.reset

    指定消費者在讀取一個沒有偏移量的分區或者偏移量無效的狀況下該如何處理

    =latest  消費者從最新的記錄開始讀取數據

    =earliest  消費者從起始位置讀取分區記錄

  enable.auto.commit

    指定消費者是否自動提交偏移量,默認true

      auto.commit.interval.ms 控制提交頻率

  partition.assignment.strategy

    =org.apache.kafka.clients.consumer.RangeAssignor 把主題的若干連續分區分配給消費者

    =org.apache.kafka.clients.consumer.RoundRobinAssignor  把主題的全部分區逐個分配給消費者

  client.id  

    任意字符串,broker用來標識從客戶端發送來的消息

  max.poll.records

    用於控制單次調用call() 方法返回的記錄數量,能夠幫你控制在輪詢裏須要處理的數據量

  receive.buffer.bytes 和 send.buffer.bytes

    默認-1   

 

更新分區當前位置的操做叫提交

  消費者會向一個叫作 _consumer_offset 的特殊主題發送消息,消息裏包含了每一個分區的偏移量

  

 

    

  Kafka能夠設置消費者自動提交偏移量,設置enable.auto.commit=true,提交時間間隔auto.commit.interval.ms=5s

  自動提交是在輪詢裏進行的,消費者每次輪詢時會檢查是否該提交偏移量了,是則提交上一次輪詢返回的偏移量

  提交當前偏移量,使用API函數 commitSync()

  異步提交偏移量,使用API函數commitAsync()

    可使用一個單調遞增的序列號來維護異步提交順序

相關文章
相關標籤/搜索