1.非中心的架構模型安全
2.基於TCP的一套Kafka通訊協議session
3.消息中間件&存儲系統架構
4.存儲邏輯層的高併發保證併發
5.isr機制下降了保證分佈式一致性的代價socket
咱們知道,在分佈式系統的架構類型裏,既有主從式的架構,也有非中心式的架構,像hadoop和hbase都採用了主從式的架構模型,主從式的架構優勢有不少,可是主從式下爲了不單點故障而採起的各類策略使得主從式架構的優勢並不那麼理想,kafka做爲一個分佈式的消息系統,它採用了非中心式的架構模型,每一個節點都做爲獨立的Server向Client提供服務,在集羣環境下,多個節點依賴zookeeper維護client在讀寫訪問中的分佈式一致性。tcp
在早期0.8.2以前的kafka版本中,kafka對zookeeper的依賴很是大,producer、server、consumer都很是依賴zookeeper,雖然zookeeper做爲一個輕量級的文件系統(已經成爲分佈式服務的基礎服務,用以提供分佈式環境下的一致性),可是大量的與其交互仍然會致使一些性能問題和不穩定的方面,在0.8.2以後的改進中,經過將一些狀態保持在kafka自身中,減小與zookeeper的大量交互,爲讀寫提供了更穩定的實現。分佈式
kafka的通訊協議至關的簡單,只有六類核心的客戶端請求APIs。ide
Metadata:描述當前可用的brokers的host和port信息,並給出每一個broker上分配了哪些partitions;高併發
Send:發送messages到broker;工具
Fetch:從broker中獲取messages,包括獲取data、獲取集羣的元數據信息以及獲取某個topic的offset信息;
Offsets:獲取某個給定topic partition的可用offsets信息;
Offset Commit:提交consumer group的offsets信息;
Offset Fetch:獲取某個consumer group的offsets信息集合。
這些都會在下面詳細描述。另外,0.9版本的kafka對consumers和kafka connect支持通常的group management。這部分的Client Api包括五種requests:
GroupCoordinator:定位當前consumer group的coordinator;
JoinGroup:加入一個consumer group,若是沒有就建立一個;
SyncGroup:同步同一個group下的全部consumer狀態(partition分配到consumer的分佈狀況);
Heartbeat:用來檢測group中的成員的存活狀態;
LeaveGroup:直接離開一個group。
還有一些用於監控/管理 kafka集羣的administrativeAPIs
DescribeGroups:用來檢測當前的groups;
ListGroups:列出broker中管理的groups。
RequestMessage => ApiKey ApiVersionCorrelationId ClientId RequestMessage
ApiKey => int16
ApiVersion => int16
CorrelationId => int32
ClientId => string
RequestMessage => MetadataRequest | ProduceRequest | FetchRequest |OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
Response => CorrelationIdResponseMessage
CorrelationId => int32
ResponseMessage => MetadataResponse |ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse |OffsetFetchResponse
ApiKey |
這個int數值是用來代表是哪種請求,KafkaApis根據這個值來調用相應的處理邏輯 |
ApiVersion |
因爲不一樣的Kafka版本支持的ApiVersion不一樣,所以要根據KafkaServer支持的ApiVersion來發送對應格式的Request |
CorrelationId |
客戶端提供的一個整型值,在response中會原封不動的返回,它的做用主要是用來匹配client和server之間的request和response。 |
下面的列表是ApiKey的整型值對應的Request類:
API name |
ApiKey Value |
ProduceRequest |
0 |
FetchRequest |
1 |
OffsetRequest |
2 |
MetadataRequest |
3 |
Non-user facing control APIs |
4-7 |
OffsetCommitRequest |
8 |
OffsetFetchRequest |
9 |
GroupCoordinatorRequest |
10 |
JoinGroupRequest |
11 |
HeartbeatRequest |
12 |
LeaveGroupRequest |
13 |
SyncGroupRequest |
14 |
DescribeGroupsRequest |
15 |
ListGroupsRequest |
16 |
Error |
Code |
重試 |
描述 |
NoError |
0 |
沒有錯誤,執行成功! |
|
Unknown |
-1 |
未知的server error |
|
OffsetOutRange |
1 |
請求的offset值超出了server端維護的對應topic/partition的offset值(能夠大於也能夠小於) |
|
InvalidMessage/CorruptMessage |
2 |
YES |
消息內容不能經過CRC校驗 |
UnknownTopicOrPartition |
3 |
YES |
請求的topic或partition再也不發往的broker上 |
InvalidMessageSize |
4 |
消息的大小爲負值 |
|
LeaderNotAvailable |
5 |
YES |
請求發生在leader選舉過程當中時拋出這個異常,此時請求的partition沒有leader沒法讀寫 |
NotLeaderForPartition |
6 |
YES |
在客戶端嘗試向不是leader的replica寫入信息時拋出,意味着客戶端的元數據信息過時了 |
RequestTimedOut |
7 |
YES |
request超過了用戶指定的時間,通常是值socket超時 |
BrokerNotAvailable |
8 |
這個錯誤不是client遇到的,每每發生在工具類的請求中 |
|
ReplicaNotAvailable |
9 |
broker上沒有指望的replica(能夠被安全的忽視) |
|
MessageSizeTooLarge |
10 |
server有一個最大消息的配置,當client向server端寫入超過配置大小的message時拋出 |
|
StaleControllerEpochCode |
11 |
在broker和broker通訊時發生的內部錯誤 |
|
OffsetMetadataTooLargeCode |
12 |
若是指定了一個大於配置的offset metadata大小的string |
|
GroupLoadInProgressCode |
14 |
YES |
當topic partition的leader發生變化後,新的leader在load offsets時,offset fetch request請求時拋出,或者在group membership(例如heartbeat)的response中返回當coordinator在load group metadata時 |
GroupCoordinatorNotAvailableCode |
15 |
YES |
offsets topic還沒建立或者group coordinator沒有active |
NotCoordinatorForGroupCode |
16 |
YES |
offset fetch或commit request的請求發往一個不是coordinator的節點 |
InvalidTopicCode |
17 |
訪問一個不可用的topic或者嘗試對內部topic(__consumer_offset)進行寫入操做時 |
|
RecordListTooLargeCode |
18 |
若是produce的message batch超過了配置的segment size |
|
NotEnoughReplicasCode |
19 |
YES |
處於in-sync的replicas數量小於配置的produce要求的最小replicas和requiredAcks=-1 |
NotEnoughReplicasAfterAppendCode |
20 |
YES |
當message被寫入到log後,可是in-sync的replicas數小於須要的 |
InvalidRequiredAcksCode |
21 |
請求的requiredAcks是不可得到的 |
|
IllegalGenerationCode |
22 |
server端的generation id和request中的generation id不一致 |
|
InconsistentGroupProtocolCode |
23 |
當前group可以接受的protocol type中不包含join group時給出的protocol type |
|
InvalidGroupIdCode |
24 |
當join group時groupId爲空或者null |
|
UnknownMemberIdCode |
25 |
當前generation裏group中不包含請求的memberId |
|
InvalidSessionTimeoutCode |
26 |
join group時超出了配置的request session timeout |
|
RebalanceInProgressCode |
27 |
當請求發起時coodinator正在對group進行rebalance,此時client要從新join group |
|
InvalidCommitOffsetSizeCode |
28 |
當offset commit超過metadata的最大限制被拒絕時 |
|
TopicAuthorizationFailedCode |
29 |
client沒有訪問請求的topic的權限時 |
|
GroupAuthorizationFailedCode |
30 |
||
ClusterAuthorizationFailedCode |
31 |
kafka實現了基於tcp的一種通訊協議,只要符合通訊協議的規範,便可與kafka server進行通訊,於是kafka client是跨語言的
kafka既能夠被認爲是消息中間件,也能夠做爲存儲系統使用
因爲kafka能夠將producer發送的消息保存起來供consumer消費,所以既能夠做爲消息中間件使用,也能夠做爲存儲系統來保存數據。
kafka在存儲邏輯層的設計爲高吞吐量提供了可能
kafka存儲數據的邏輯單元是partition,producer和consumer的處理單元也是基於partition的,針對某個topic,能夠有多個partition,而多個partition又能夠分佈在不一樣的節點上,從而在存儲層保證了I/O的併發,爲高吞吐量提供了可能。
kafka的isr同步機制使得保證分佈式一致性的代價大大下降
kafka的isr機制,容許isr中的replica和主副本以前有必定的差距,這樣作保證了響應的及時性,另外一方面,因爲在isr層面沒有考慮嚴格的分佈式一致性,沒有使用paxos的leader選舉策略,使得kafka的leader選舉更加容易,沒有嚴格的節點數要求的限制,只要有一個節點是isr中的,就不會丟數據。