kafka 0.9 重寫消費者設計

Rebalance是將一個group訂閱的topic的一組分區分配給該group組內的consumer實例,使每一個實例都擁有一個獨立的互斥的分區。即:一個topic的分區只能被group的一個consumer實例消費,再rebalance以前不能再被group的其餘consumer消費。
在一個消費group Reblance成功以後,全部被訂閱主題的每個分區在其Group內都有且只有一個consumer實例去消費。
Rebalance的工做方式以下
每一個broker都當選爲消費組(consumer group)中某個consumer的協調者(co-ordinator);當一個group中的consumer成員個數變化或者其訂閱的topics的分區變化時,做爲這個group協調者的broker負責編排Reblance運算。他還負責在參與Reblance運算的全部consumer和分區之間通訊。ios

消費者(Consumer)
一、當消費者啓動或者協調者故障時,consumer會發送一個ConsumerMetadataRequest給 bootstrap.brokers配置的列表中的任何一個broker。
二、而後消費者鏈接協調者而且發送一個心跳請求(HeartbeatRequest),若是返回的HeartbeatResponse中錯誤碼爲IllegalGeneration。這表示協調者啓動了Reblance。而後消費者中止獲取數據,提交offsets而且發送一個JoinGroupRequest 請求給協調者所在broker,在返回的JoinGroupResponse中,它收到它有全消費的topic的分區列表,和所在group的新的Id。此時,消費者開始讀取數據而且提交分區的offsets。redis

三、若是在HeartbeatResponse中沒有錯誤,消費者就繼續從它最後擁有權限的分區中獲取數據,而不會被中斷。bootstrap

協調者(Co-ordinator)session

一、在穩定狀態下,協調者經過它的故障檢測協議跟蹤每一個group中每一個消費者的健康情況。
二、在選舉結束或者啓動時,協調者會從zookeeper中讀取它管理的group列表和它們的成員信息。若是一個group以前沒有任何成員信息,它不作任何事情,直到某個組的第一個consumer使用它進行註冊。socket

三、在協調者沒有加載完它負責的全部group的成員信息以前,當consumer發送HearbeatRequests, OffsetCommitRequests 和JoinGroupRequests請求時,會返回CoordinatorStartupNotComplete錯誤碼,而後consumer將會在以後進行從新請求。ide

四、選舉工做結束後或啓動時,協調者啓動了group中的全部消費者故障檢測。被標記爲死亡的消費者會被從group中移除,而且會觸發Rebalance操做。fetch

五、平衡是經過在HeartbeatResponse返回IllegalGeneration錯誤代碼觸發。一旦全部活着的消費者經過協調者發送JoinGroupRequests請求要求從新註冊,那麼它在JoinGroupResponse中告訴每一個consumer他們擁有的新的分區來完成Rebalance操做ui

六、消費者還監聽全部有consumer消費的topics的分區變化。若是檢測到任何一個topic有新的分區,都會出發Reblance操做。this

Failure detection protocolatom

消費者在向協調者請求加入消費group時能夠在JoinGroupRequest中指定session的超市時間。當一個消費者成功加入group,故障檢測程序在消費者和協調者上啓動。消費者會按期的發起心跳請求(HeartbeatRequest),每session.timeout.ms/heartbeat.frequency發送一次心跳並等待返回。若是協調者在session.timeout.ms期間沒有收到來自消費者的心跳請求,就會標記這個消費者死亡。一樣的,若是消費在沒有在session.timeout.ms時間內收到HeartbeatResponse迴應,它就會假設協調者已經掛了,而且開始co-ordinator rediscovery過程。heartbeat.frequency是配置在consumer端的。heartbeat.frequency要設置比較高的值,特別是若是session.timeout.ms也設置的比高大的狀況下。可是,應注意heartbeat.frequency不要設置的過高。

接收ConsumerMetadataResponse或JoinGroupResponse以後,消費者按期發送一個HeartbeatRequest到協調者(everysession.timeout.ms/heartbeat.frequency毫秒)。
收到請求的心跳後,協調者檢查生成編號,消費者ID和消費group。若是消費者指定了無效或失效代Id,在HeartbeatResponse中會返回IllegalGeneration錯誤代碼。
若是協調者未在session.timeout.ms內接收到任何一次來自消費者心跳請求,它標誌着消費者已死並觸發組的Rebalance。
若是消費者沒有在 session.timeout.ms實踐內收到協調者的HeartbeatResponse,或發現socket被關閉,它認爲協調者已經掛掉了並觸發從新發現協調者的過程。

注意,在協調者故障轉移時,消費者可能會發現新的協調者,這個新的協調者可能尚未完成故障轉或者已經完成故障轉移,包括從ZK中服務消費羣(consumer group )的元數據。在後一種狀況下,協調者只能接受正常的心跳請求,前一種狀況協調者可能會拒絕請求,致使消費者會從新執行發現過程而且從新鏈接,這比較正常。然而,若是消費者太晚去鏈接一個新的協調者,協調者會編輯這個消費者死亡,而且觸發Rebalance操做。
狀態圖State diagram

消費者

Here is a description of the states -
Down - The consumer process is down

在啓動了和尋找協調者狀態,消費者爲所在消費組尋找協調者。一旦它發現了協調了消費者就會發送JoinGroupRequest(沒有消費ID)。若是有消費者在同組指定的分區分配策略有衝突,那麼JoinGroupRequest請求就會收到InconsistentPartitioningStrategy錯誤代碼。若是在JoinGroupRequest請求中指定的分區分配策略(PartitionAssignmentStrategy )Broker沒法識別,則會返回UnknownPartitioningStrategy錯誤代碼。在這些狀況下,消費者沒法加入消費組。

在已經加入組這個狀態下,若是消費者收到的JoinGroupResponse沒有錯誤代碼,而有consumer id和消費組的generation id,那麼消費者就會成爲組的一個成員。在這種狀態下,消費者發送心跳請求,根據收到的錯誤代碼,消費者將保持這種狀態,或者被協調者標記死亡中止消費,或者發起從新發現協調者過程。

在從新尋找協調者狀態。消費者不會中止消費,可是會試圖經過發送ConsumerMetadataRequest 請求去發起重尋協調者的過程,而且會一直等待一個沒有錯誤代碼的響應。

中止消費狀態。消費者中止消費而且提交offsets,直到它再次從新加入消費組。

圖片描述

協調者

掛機(Down)--協調者死亡或者降級。
Catch up--協調者被選舉可是還沒準備好接受請求。
Ready--新當選的協調者完成加載它負責的全部消費組的元數據。
Prepare for rebalance--對於消費組內的全部消費者的心跳請求,協調者在心跳響應(HeartbeatResponse )中發送IllegalGeneration錯誤代碼,而且等待消費者發送一個JoinGroupRequest請求。
Rebalancing--協調者從消費者那裏收到一個JoinGroupRequest請求,從新生成一個組generation id,指定消費ID( consumer ids)而且分配消費分區。
穩定(Steady)--協調者從每個消費組的全部消費者那裏接收OffsetCommitRequests 請求和心跳請求。

圖片描述

消費者ID分配

啓動後,消費者在從協調者那裏收到的第一個JoinGroupResponse中獲得它的Consumer Id。從這時起,消費者每次在發送HeartbeatRequest 和OffsetCommitRequest時請求中必須包含consumer id。若是協調者收到HeartbeatRequest或者OffsetCommitRequest中的Consumer Id和消費組中的任何一個都不同,則會在該響應中發送一個UnknownConsumer錯誤代碼。

協調者在Rebalance成功時會分配一個consumer id給消費者,並在JoinGroupResponse中返回。消費者能夠選擇在之後每次JoinGroupRequest中包含這個consumer id直到關機或死亡,這樣作的好處是在進行rebalance操做時會有更低的延遲。當rebalance被觸發,協調者會等待前一輪全部的消費者發送JoinGroupRequest,肯定消費者是經過consumer id的方式。若是消費者選擇發送JoinGroupRequest不包含consumer id,協調者會徹底在配置的session.timeout.ms的時間中等待,纔會進行從新rebalance操做。它這麼作的緣由是沒法印證一個不存在consumer id的JoinGroupRequest的消費者來源,這使得rebalance有個高(視session.timeout.ms)的延遲。在另外一方面,若是消費者在以後每次JoinGroupRequests中都包含了consumer id,協調者能當即識別消費者,而且一旦全部已知的消費者都發送了JoinGroupRequest就會馬上發起rebalance操做。

在收到消費組中全部存在的消費者的JoinGroupRequest請求以後,協調者開始分配consumer id。假設,消費者是新啓動的或者是選擇不發送以前非配的consumer ID,在這個時候,它給在每一個在JoinGroupRequest中沒有consumer id的消費者指定一個新的Id<group>-<uuid>。

若是在JoinGroupRequest 中指定的consumer id和組內成員全部的id都不匹配。協調者會在JoinGroupResponse 響應中發送UnknownConsumer 錯誤代碼,而且拒絕消費者加入消費組。這不會致使組內其餘消費者的rebalance操做,可是也不會容許這樣的消費者加入現有組。

請求格式

對於每個消費組,協調者存儲一下信息:
1)對於每一個消費組,元數據包括:

組訂閱的topic列表
      組的配置,包括session timeout等。

組中每個消費者的元數據,包括hostname和consumer id。
每個topic 分區消費的當前的offsets。
分區全部權元數據,包括消費者-分配-分區( consumer-assigned-partitions )映射。
2)對於每一個主題,當前訂閱它的消費組的列表。

It is assumed that all the following requests and responses have the common header. So the fields mentioned exclude the ones in the header.
ConsumerMetadataRequest

{
  GroupId                => String
}

ConsumerMetadataResponse

{
  ErrorCode              => int16
  Coordinator            => Broker
}

JoinGroupRequest

{
  GroupId                     => String
  SessionTimeout              => int32
  Topics                      => [String]
  ConsumerId                  => String
  PartitionAssignmentStrategy => String
 }

JoinGroupResponse

{
  ErrorCode              => int16
  GroupGenerationId      => int32
  ConsumerId             => String
  PartitionsToOwn        => [TopicName [Partition]]
}
TopicName => String
Partition => int32

HeartbeatRequest

{
  GroupId                => String
  GroupGenerationId      => int32
  ConsumerId             => String
}

HeartbeatResponse

{
  ErrorCode              => int16
}

OffsetCommitRequest (v1)

OffsetCommitRequest => ConsumerGroup GroupGenerationId ConsumerId [TopicName [Partition Offset TimeStamp Metadata]]
  ConsumerGroup => string
  GroupGenerationId => int32
  ConsumerId => String
  TopicName => string
  Partition => int32
  Offset => int64

TimeStamp => int64
Metadata => string

配置

服務端配置

This list is still in progress
max.session.timeout - 任何組的session timeout不該高於此值以減小對broker的開銷。若是高於此值, the broker將會在JoinGroupResponse中返回SessionTimeoutTooHigh錯誤碼。
partition.assignment.strategies - 都好分割的分區策略,用戶可自定義,也可使用kafka提供的默認策略。當消費者在JoinGroupRequest指定了分區策略。它必須使用該配置列表中存在的策略,不然將收到一個UnknownPartitionAssignmentStrategyException異常。

通配符訂閱

基於通配符訂閱(好比,白名單和黑名單),消費者有責任經過topic元數據請求檢測匹配的topics。也就是說,它的topic metadata request會包含一個空的topic列表,它的響應會返回全部主題的分區信息,它會過濾topics選出和通配符匹配的主題,而後使用subscribe()接口更新訂閱列表。一樣,若是訂閱列表改變了,它將會觸發Reblance
考慮下有趣的場景Interesting scenarios to consider

協調者故障或者協調者斷開了鏈接

協調者故障,控制器自動爲因爲協調者故障而影響的消費組選取新的leader。協調者從zookeeper讀取它負責的消費組的元數據。包括consumer ids,the generation id和訂閱的主題列表。知道協調者從zookeeper中讀取了全部元數據,它纔會在HeartbeatResponse中返回CoordinatorStartupNotComplete錯誤碼。在這段時間內消費者的JoinGroupRequest都是非法的。

假如在broker經過UpdateMetadataRequest請求從控制器(zk?)更新消費組元數據以前,一個消費者向broker發送一個ConsumerMetadataRequest請求。ConsumerMetadataResponse將會返回一個過期的協調者信息,消費者在發送心跳,或者提交offset時將會受到一個NotCoordinatorForGroup錯誤代碼。在收到NotCoordinatorForGroup錯誤代碼後,消費者回滾而且從新發送ConsumerMetadataRequest請求。

在協調者故障或者重尋期間,消費者不會中止讀取數據。

訂閱的主題分區變化

組的協調者發現它訂閱的主題的分區發生了變化。
協調者標記組準備Rebalance並在HeartbeatResponse中發送IllegalGeneration錯誤代碼,而後消費者中止獲取數據,提交offsets而且向協調者發送一個JoinGroupRequest。

協調者等待組內全部的消費者發送JoinGroupRequest。一旦收到全部預期的JoinGroupRequests,它在ZK中的組generation id增長。計算新分區分配,而且在JoinGroupResponse中返回更新後的分區分配和一個新的generation id。請注意:即便組內消費者成員沒有改變generation id也會遞增。

在收到JoinGroupResponse後,消費者在本地存儲新的 generation id和consumer id而且開始從返回的分區列表中獲取數據。在後面的JoinGroupResponse請求中將會使用新的generation id和consumer id。

rebalance期間的Offset提交

若是消費者受到IllegalGeneration錯誤代碼,就中止獲取數據而且在發送JoinGroupRequest以前提交offsets。

協調者會檢查OffsetCommitRequest中的generation id,若是比協調者中的generation id高的時候會拒絕請求。這意味着在消費者代碼中有bug。

協調者也不容許在提交offset請求時候的generation id比當前組在ZK中存儲的generation id(身份認證)舊。在rebalance期間這個約束不是一個問題,從第一個消費者發送JoinGroupRequest到最後一個,協調者不會對組的generation id進行自增,今後時直到協調者返回JoinGroupResponse,它都不會從當前generation的組中的任何消費者那裏接受OffsetCommitRequests請求。因此消費者發送的OffsetCommitRequest中的generation id應該和當前協調者中的匹配。

另一種值得一說的狀況是當消費者在rebalance期間有一個軟錯誤,好比長時間的GC。若是一個消費者停頓的時間超過了session.timeout.ms配置,協調者就不會在session.timeout.ms時間內收到消費者的JoinGroupRequest請求。協調者標記這個消費者死亡,而且根據在新一輪(new generation)發送JoinGroupRequest的消費者完成rebalance操做。

Heartbeats during rebalance

Consumer periodically sends a HeartbeatRequest to the coordinator every session.timeout.ms/hearbeat.frequency milliseconds. If the consumer receives the IllegalGeneration error code in the HeartbeatResponse, it stops fetching, commits offsets and sends a JoinGroupRequest to the co-ordinator. Until the consumer receives a JoinGroupResponse, it does not send any more HearbeatRequests to the co-ordinator.
A higher heartbeat.frequency ensures lower latency on a rebalance operation since the co-ordinator notifies a consumer of the need to rebalance only on a HeartbeatResponse.
The co-ordinator pauses failure detection for a consumer that has sent a JoinGroupRequest until a JoinGroupResponse is sent out. It restarts the hearbeat timer once the JoinGroupResponse is sent out and marks a consumer dead if it does not receive a HeartbeatRequest from that time for another session.timeout.ms milliseconds.The reason the co-ordinator stops depending on heartbeats to detect failures during a rebalance is due to a design decision on the broker's socket server - Kafka only allows the broker to read and process one outstanding request per client. This is done to make it simpler to reason about ordering. This prevents the consumer and broker from processing a heartbeat request at the same time as a join group request for the same client. Marking failures based on JoinGroupRequest prevents the co-ordinator from marking a consumer dead during a rebalance operation. Note that this does not prevent the rebalance operation from finishing if a consumer goes through a soft failure during a rebalance operation. If the consumer pauses before it sends a JoinGroupRequest, the co-ordinator will mark it dead during the rebalance and complete the rebalance operation by including the rest of the consumers in the new generation. If a consumer pauses after it sends a JoinGroupRequest, the co-ordinator will send it the JoinGroupResponse assuming the rebalance completed successfully and will restart it's heartbeat timer. If the consumer resumes before session.timeout.ms, consumption starts normally. If the consumer pauses for session.timeout.ms after that, then it is marked dead by the co-ordinator and it will trigger a rebalance operation.
The co-ordinator returns the new generation id and consumer id only in the JoinGroupResponse. Once the consumer receives a JoinGroupResponse, it sends the next HeartbeatRequest with the new generation id and consumer id.
Co-ordinator failure during rebalance

一個rebalance操做要通過幾個階段
一、協調者接收rebalance的通知。不管是ZK監聽到一個主題/分區改變,或者註冊一個新的消費者又或是一個消費者死亡。
二、協調者在HeartbeatResponse中返回IllegalGeneration錯誤碼來初始化rebalance操做。
三、消費者發送JoinGroupRequest。
四、協調者在ZK中自增組的generation id,而且告訴ZK分區的新全部權。
五、協調者返回JoinGroupResponse。

協調者可能在上面任何一步失敗:
If the co-ordinator fails at step #1 after receiving a notification but not getting a chance to act on it, the new co-ordinator has to be able to detect the need for a rebalance operation on completing the failover. As part of failover, the co-ordinator reads a group's metadata from zookeeper, including the list of topics the group has subscribed to and the previous partition ownership decision. If the # of topics or # of partitions for the subscribed topics are different from the ones in the previous partition ownership decision, the new co-ordinator detects the need for a rebalance and initiates one for the group. Similarly if the consumers that connect to the new co-ordinator are different from the ones in the group's generation metadata in zookeeper, it initiates a rebalance for the group.
If the co-ordinator fails at step #2, it might send a HeartbeatResponse with the error code to some consumers but not all. Similar to failure #1 above, the co-ordinator will detect the need for rebalance after failover and initiate a rebalance again. If a rebalance was initiated due to a consumer failure and the consumer recovers before the co-ordinator failover completes, the co-ordinator will not initiate a rebalance. However, if any consumer (with an empty or unknown consumer id) sends it a JoinGroupRequest, it will initiate a rebalance for the entire group.
If a co-ordinator fails at step #3, it might receive JoinGroupRequests from only a subset of consumers in the group. After failover, the co-ordinator might receive a HeartbeatRequest from all alive consumers OR JoinGroupRequests from some. Similar to #1, it will trigger a rebalance for the group.
If a co-ordinator fails at step #4, it might fail after writing the new generation id and group membership in zookeeper. The generation id and membership information is written in one atomic zookeeper write operation. After failover, the consumer will send HeartbeatRequests to the new co-ordinator with an older generation id. The co-ordinator triggers a rebalance by returning an IllegalGeneration error code in the response that causes the consumer to send it a JoinGroupRequest. Note that this is the reason why it is worth sending both the generation id as well as the consumer id in the HeartbeatRequest and OffsetCommitRequest
If a co-ordinator fails at step #5, it might send the JoinGroupResponse to only a subset of the consumers in a group. A consumer that received a JoinGroupResponse will detect the failed co-ordinator while sending a heartbeat or committing offsets. At this point, it will discover the new co-ordinator and send it a heartbeat with the new generation id. The co-ordinator will send it a HeartbeatResponse with no error code at this point. A consumer that did not receive a JoinGroupResponse will discover the new co-ordinator and send it a JoinGroupRequest. This will cause the co-ordinator to trigger a rebalance for the group.

慢消費者Slow consumers

若是沒有在session.timeout.ms時間內收到心跳請求,協調者能夠將慢消費者從組中移除。一般,若是消息處理比session.timeout.ms慢,就會成爲慢消費者。致使兩次poll()方法的調用間隔比session.timeout.ms時間長。因爲心跳只在 poll()調用時纔會發送,這就會致使協調者標記慢消費者死亡。下面是協調者如何處理一個慢消費者: 若是沒有在session.timeout.ms時間內收到心跳請求,協調者標記消費者死亡而且斷開和它的鏈接。同時,經過向組內其餘消費者的HeartbeatResponse中發送IllegalGeneration 錯誤代碼 觸發rebalance操做。若是慢消費者在協調者收到組內其餘任何消費者的HeartbeatRequest以前發送了心跳請求,它會取消Rebalance操做的意圖,並在HeartbeatResponses 中不會發送錯誤碼。若是不是,協調者依然會進行Rebalance操做。而且會給慢消費者也返回IllegalGeneration 錯誤碼。因爲協調者只能從活着的消費者那裏等待JoinGroupRequest請求,一旦受到其餘消費者的加入請求就會完成rebalance操做。若是慢消費者剛好也發送了JoinGroupRequest,協調者就會在當前一輪包含它,反之就不會包含這個慢消費者。若是協調者已經返回了JoinGroupResponse,它會在當前Rebalance完成以後,再觸發新的Rebalance操做。若是本輪須要很長時間,慢消費者的接收JoinGroupResponse超時,它就會發起重尋協調者,而且給協調者從新發送JoinGroupRequest。

相關文章
相關標籤/搜索