消費消息的請求(按序)
- org/apache/kafka/common/requests/RequestHeader
- org/apache/kafka/common/requests/ApiVersionsRequest
- org/apache/kafka/common/requests/MetadataRequest 批量查詢topic的元數據信息
- org/apache/kafka/common/requests/FindCoordinatorRequest 從拿到的topic的元數據中取出leader節點 做爲組協調者
- org/apache/kafka/common/requests/JoinGroupRequest
- org/apache/kafka/common/requests/SyncGroupRequest
- org/apache/kafka/common/requests/OffsetFetchRequest
- org/apache/kafka/common/requests/ListOffsetRequest
- org/apache/kafka/common/requests/FetchRequest
- org/apache/kafka/common/requests/HeartbeatRequest
請求接口文檔參考
響應接口文檔參考html
請求頭
java
name |
type |
defaultValue |
docString |
api_key |
INT16 |
null |
請求接口編號 |
api_version |
INT16 |
null |
api版本 |
correlation_id |
INT32 |
null |
用戶提供的一個整數id,用於響應時由響應體帶回來 |
client_id |
NULLABLE_STRING |
null |
用戶提供的client id |
ApiVersionsRequest
查詢API版本信息node
請求 version:1
僅僅有請求頭apache
響應 version:1
name |
type |
defaultValue |
docString |
error_code |
INT16 |
null |
錯誤碼 |
api_versions |
ARRAY({api_key:INT16,min_version:INT16,max_version:INT16}) |
null |
broker能支持的api各版本列表。含最低版本,最高版本. |
throttle_time_ms |
INT32 |
0 |
因爲配額衝突而阻止請求的持續時間(毫秒)(若是請求未違反任何配額,則爲零 |
雖是請求broker端,可是實際仍是用的client中的API完成的邏輯:
ApiVersionsResponse.apiVersionsResponse
根據messageFormatVersion
消息格式版本推導出各API版本狀況。
API版本 最小的是0 。寫的固定的。 最大的是 requestSchemas的length -1 即requestSchemas最大版本。
此處不只返回每一個API的最小版本與最大版本,還返回能支持的API列表。如因版本問題不能支持的API是不會返回的。
可否支持的判斷依據是,API依賴的最小消息格式版本小於當前的消息格式版本,那麼就支持。api
for (ApiKeys apiKey : ApiKeys.values()) {
if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
versionList.add(neApiVersionsResponse.ApiVersion(apiKey));
}
}
批量查詢topic的元數據信息緩存
請求 version:5
name |
type |
defaultValue |
docString |
topics |
ARRAY(STRING) |
null |
須要查元數據的topic的列表,若是不送則查全部topic的元數據 |
allow_auto_topic_creation |
BOOLEAN |
null |
在broker配置了容許自動建立topic時是否自動建立topic |
響應 version:4
name |
type |
defaultValue |
docString |
throttle_time_ms |
INT32 |
0 |
因爲配額衝突而阻止請求的持續時間(毫秒)(若是請求未違反任何配額,則爲零) |
brokers |
ARRAY({node_id:INT32,host:STRING,port:INT32,rack:NULLABLE_STRING}) |
null |
全部 活着的 broker的id ip port的信息 |
cluster_id |
NULLABLE_STRING |
null |
集羣id |
controller_id |
INT32 |
null |
controller角色的broker的id |
topic_metadata |
ARRAY({error_code:INT16,topic:STRING,is_internal:BOOLEAN,partition_metadata:ARRAY({error_code:INT16,partition:INT32,leader:INT32,replicas:ARRAY(INT32),isr:ARRAY(INT32)})}) |
null |
topic元數據,分區數、leader broker的id、副本所在broker id列表、isr broker id列表 |
broker端處理
在broker端session
- 過濾出受權的topics KafkaApis.handleTopicMetadataRequest
- 查詢出受權topics的元數據 KafkaApis.getTopicMetadata
2.1 從緩存中拿,拿到(跟topics的size相同)即返回
2.2 處理沒拿到的topic
2.2.1 容許建立topic的,就按默認副本數和默認分區數建立,不能建立的或者建立出錯的返回出錯信息。建立topic前提是協調者可用。不然COORDINATOR_NOT_AVAILABLE。
2.2.2 返回建立後的metadata
元數據信息有緩存 kafka.server.MetadataCache.cache
:topic <--> [partitionNo <--> 分區狀態封裝]
MetadataCache
中一系列getxxx方法都是用來讀取檢索的。
元數據緩存的更新參見 《MetadataCache更新》
FindCoordinatorRequest
查詢協調者fetch
請求 version:1
name |
type |
defaultValue |
docString |
coordinator_key |
STRING |
null |
組協調時是組id 事務協調時是事務id |
coordinator_type |
INT8 |
null |
協調類型(0 = group, 1 = transaction) |
響應 version:1
name |
type |
defaultValue |
docString |
throttle_time_ms |
INT32 |
0 |
Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota) |
error_code |
INT16 |
null |
Response error code |
error_message |
NULLABLE_STRING |
null |
Response error message |
coordinator |
{node_id:INT32,host:STRING,port:INT32} |
null |
協調者broker的id ip port |
組協調與事務協調都用這個請求
coordinatorKey 組協調是組id 事務協調時是事務id
分區對應的leader節點就是組協調者ui
val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
.find(_.partition == partition)
.map(_.leader) // SimonNote: leader節點做爲協調者
這個請求的響應也就是將協調者信息(node_id,host,port)返回去this
JoinGroupRequest
加入消費組的請求
請求 version:2
name |
type |
defaultValue |
docString |
group_id |
STRING |
null |
惟一的組標誌 |
session_timeout |
INT32 |
null |
會話時間,超過這個時間沒收到心跳,協調者就認爲這個消費者掛了 |
rebalance_timeout |
INT32 |
null |
協調者在從新平衡組時等待每一個成員從新加入的最長時間 |
member_id |
STRING |
null |
由組協調者分配的成員ID,若是是第一次加入,則爲空。 |
protocol_type |
STRING |
null |
組協調協議實現類的惟一名稱 |
group_protocols |
ARRAY({protocol_name:STRING,protocol_metadata:BYTES}) |
null |
組成員能支持的組協調協議列表 |
響應 version:2
name |
type |
defaultValue |
docString |
throttle_time_ms |
INT32 |
0 |
Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota) |
error_code |
INT16 |
null |
Response error code |
generation_id |
INT32 |
null |
組的年代? |
group_protocol |
STRING |
null |
協調者選中的組協議 |
leader_id |
STRING |
null |
組中的leader |
member_id |
STRING |
null |
第一次加入的時候組協調者給分的成員id |
members |
ARRAY({member_id:STRING,member_metadata:BYTES}) |
null |
組內成員? |
kafka.coordinator.group.GroupCoordinator.handleJoinGroup
一系列的check:
- 協調者是否可用
- 是不是本分區的協調者
- 消費組id是否合法(是否爲空)
- 是否協調者正在load中,
GroupMetadataManager
會管理當前partition是否在load中
- sessionTimeoutMs是否在組配置的最大最小範圍內
向groupManager
加入新建的GroupMetadata
實例(若是沒有的話,有就直接下一步了),GroupMetadata
有哪些東西,下面註釋寫了一部分,可是還包含事務消息用一些offset
/**
* Group contains the following metadata:
*
* Membership metadata:
* 1. Members registered in this group
* 2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers)
* 3. Protocol metadata associated with group members
*
* State metadata:
* 1. group state
* 2. generation id
* 3. leader id
*/
doJoinGroup
一系列的check後,根據group.currentState
作相應處理
group.currentState
GroupMetadata.scala
中有對group狀態定義及action及走向到哪的明確詳細描述,很是重要
SyncGroupRequest
請求 version:1
name |
type |
defaultValue |
docString |
group_id |
STRING |
null |
group惟一標誌 |
generation_id |
INT32 |
null |
代的標誌? |
member_id |
STRING |
null |
第一次加入的時候組協調者給分的成員id |
group_assignment |
ARRAY({member_id:STRING,member_assignment:BYTES}) |
null |
null |
響應 version:1
name |
type |
defaultValue |
docString |
throttle_time_ms |
INT32 |
0 |
Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota) |
error_code |
INT16 |
null |
Response error code |
member_assignment |
BYTES |
null |
null |
OffsetFetchRequest
請求 version:3
name |
type |
defaultValue |
docString |
group_id |
STRING |
null |
group id |
topics |
ARRAY({topic:STRING,partitions:ARRAY({partition:INT32})}) |
null |
topic列表,支持多個topic |
響應 version:3
name |
type |
defaultValue |
docString |
throttle_time_ms |
INT32 |
0 |
Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota) |
responses |
ARRAY({topic:STRING,partition_responses:ARRAY({partition:INT32,offset:INT64,metadata:NULLABLE_STRING,error_code:INT16})}) |
null |
列表:topic-[{分區號-offset,元數據信息}] |
error_code |
INT16 |
null |
Response error code |
ListOffsetRequest
請求 version:2
name |
type |
defaultValue |
docString |
replica_id |
INT32 |
null |
follower的broker的id. 正常消費用-1. |
isolation_level |
INT8 |
null |
事務消息可見性設置。 使用 READ_UNCOMMITTED (isolation_level = 0)能看到全部消息. 使用 READ_COMMITTED (isolation_level = 1), 非事務消息和已經提交的消息能被看到. 更具體一點, READ_COMMITTED 返回比當前 LSO (last stable offset)小的offset, 並容許返回已經取消的事務 |
topics |
ARRAY({topic:STRING,partitions:ARRAY({partition:INT32,timestamp:INT64})}) |
null |
列表:topic,partitions{分區號,時間戳} |
響應 version:2
name |
type |
defaultValue |
docString |
throttle_time_ms |
INT32 |
0 |
Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota) |
responses |
ARRAY({topic:STRING,partition_responses:ARRAY({partition:INT32,error_code:INT16,timestamp:INT64,offset:INT64})}) |
null |
列表:topic 分區號 錯誤碼 時間戳 offset |
FetchRequest
請求 version:6
name |
type |
defaultValue |
docString |
replica_id |
INT32 |
null |
follower的broker的id. 正常消費用-1 |
max_wait_time |
INT32 |
null |
等待響應的最大時間 單位ms. |
min_bytes |
INT32 |
null |
最小字節 |
max_bytes |
INT32 |
null |
最大字節. 單條消息若是超過這個大小也將返回 |
isolation_level |
INT8 |
null |
事務隔離級別 |
topics |
ARRAY({topic:STRING,partitions:ARRAY({partition:INT32,fetch_offset:INT64,log_start_offset:INT64,max_bytes:INT32})}) |
null |
列表: topic 分區號 取的offset log開始的 offset?? 最大字節. |
響應 version:6
name |
type |
defaultValue |
docString |
throttle_time_ms |
INT32 |
0 |
Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota) |
responses |
ARRAY({topic:STRING,partition_responses:ARRAY({partition_header:{partition:INT32,error_code:INT16,high_watermark:INT64,last_stable_offset:INT64,log_start_offset:INT64,aborted_transactions:ARRAY({producer_id:INT64,first_offset:INT64})},record_set:RECORDS})}) |
null |
列表: topic 分區頭: 分區號 高水位值 LSO(上次穩定offset), log開始offset,取消事務:生產者id 第一個offset。 消息記錄集 |
HeartbeatRequest
請求 version:1
name |
type |
defaultValue |
docString |
group_id |
STRING |
null |
group id |
generation_id |
INT32 |
null |
group的年代 |
member_id |
STRING |
null |
第一次加入的時候組協調者給分的成員id |
響應 version:1
name |
type |
defaultValue |
docString |
throttle_time_ms |
INT32 |
0 |
Duration in milliseconds for which the request was throttled due to quota violation (Zero if the request did not violate any quota) |
error_code |
INT16 |
null |
響應碼 |