kafka-3-部分原理

kafka 部分設計及原理

topic & partition & segment

  1. 定義

    topic :主題名,用於對消息進行分類,是一個邏輯上的概念html

    partition :是物理上的一個概念,一個topic能夠對應多個partition,消息實際存儲在partition正則表達式

    partition 是一個有序,不可變的記錄序列。partition中的每一條消息都有一個序列號,稱之爲offset,offset 在一個partition內惟一,用於區別消息shell

    segment :partition被分爲多個segment文件進行存儲apache

  2. partition 的物理存儲結構編程

    建立topic:test.show.log, 副本數2,分區數3,segment文件大小512bytebootstrap

    # 建立topic
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 3 --topic test.show.log --config segment.bytes=512
    # 查看topic 狀態
    bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test.show.log
    
     Topic:test.show.log     PartitionCount:3        ReplicationFactor:2     Configs:segment.bytes=512
            Topic: test.show.log    Partition: 0    Leader: 1       Replicas: 1,0   Isr: 1,0
            Topic: test.show.log    Partition: 1    Leader: 0       Replicas: 0,2   Isr: 0,2
            Topic: test.show.log    Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1

    能夠看到broker.0 服務持有partition 0,1的副本,而且爲1 partition的leader數組

    打開其日誌保存目錄能夠看到緩存

    test.show.log-0
    test.show.log-1

    broker 對於topic 的每個 partition 使用單獨的目錄保存,每一個目錄下初始有服務器

    00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint

    .log文件: segment 日誌文件網絡

    .index文件: segment offset索引文件

    .timeindex文件:segment timestamp索引文件

    leader-epoch-checkpoint:用於副本備份機制

    每個分區的日誌文件被分爲多個segment文件,segment文件的命名規則:

    • 第一個segment文件名都是0
    • <u>後續segment文件名是上一個segment文件最後一條消息的offset值</u>

segment 文件記錄的信息

.index 文件:偏移量->消息在.log文件中的物理位置

.timeindex 文件:時間戳T->.index文件中的偏移量offset,表示表示比T晚的全部消息偏移量都比offset大

根據offset查找文件的步驟

.index 使用 稀疏索引,kafka會在內存中維護一份索引,經過二分查找定位到消息所在的.log文件,以及在.log文件中的位置。

  1. 分區的意義

    • 有利於水平擴展
    • 負載均衡
  2. 分段的意義

    • 快速定位消息
    • 日誌快速清除
  3. timeindex 的意義

    • 根據時間戳來定位消息
    • <u>基於時間戳的日誌切分策略</u>
    • <u>基於時間戳的日誌清除策略</u>

消息格式

消息格式通過了幾個版本的變動

具體見:

https://www.cnblogs.com/qwang...

https://kafka.apache.org/docu...

挑幾個字段來講明

key: 每一個消息均可以指定一個key。能夠經過給多個消息指定同一個key分發到同一個partition中,從而保證消息的有序性。

headers:可變長消息頭,能夠不須要解析payload而拿到一些消息的屬性信息。

timestamp: 時間戳。時間戳有兩種類型:CreateTime,LogAppendTime。

能夠經過 broker 配置 log.message.timestamp.type來指定全局topic時間戳類型;

也能夠經過命令行建立topic時單獨指定該topic的時間戳類型。

生產者發送消息時,能夠指定消息的事件戳,若是未指定,則使用生產者客戶端當前時間。

leader & 集羣發現機制

kafka client 對 partition的讀寫都是直接訪問leader。那麼客戶端是如何找到leader的?

主要是經過發送一個稱之爲TopicMetadataRequest的請求來獲取。

TopicMetadataRequest => [TopicName]
  TopicName => string
Field Type Description
TopicName []string 想要獲取的topic的元數組,若是爲空,則下發全部topic的元數據
MetadataResponse => [Broker][TopicMetadata]
  Broker => NodeId Host Port  (any number of brokers may be returned)
    NodeId => int32
    Host => string
    Port => int32
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
    TopicErrorCode => int16
  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
    PartitionErrorCode => int16
    PartitionId => int32
    Leader => int32
    Replicas => [int32]
    Isr => [int32]
Field Description
Broker kafka broker的信息,包括broker id, hostname, port
Isr 與leader 保持同步的broker id
Leader leader的broker id,若是當前不存在leader(leader正在選舉中),則爲-1
Replicas 全部當前存活的follower

服務器MetaData的存儲後續研究

kafka 協議文檔地址:https://cwiki.apache.org/conf...

consumer & consumer group & offset commit

  1. pull vs push

    push :broker 控制傳輸速度,若是消費者處理速度較慢,會積壓大量消息,最終致使消費者拒絕提供服務

    pull :適用於向消費者批量發送大量數據

    pull 方式的不足:

    消費者可能須要輪訓等待消息的到達,爲避免這種狀況,拉取請求中能夠指示等待給定數量的數據到達。

  2. consumer 默認從哪一個位置開始消費

        auto.offset.reset

  1. consumer group && offset commit

    consumer 做爲組進行消費時,須要記錄每一個分區消費的offset,以便於進行重平衡後,新的消費者能夠從上一個位置繼續消費。

    這個offset是由consumer主動提交的,broker會記錄在稱之爲__consumer_offsets的topic中,其對應的partition爲 hash(group.id)%mode

    提交方式

    1. 自動提交

      enable.auto.commit=true

    2. 手動提交

    一個partition,同一時間只會被同一group內的consumer消費,若是consumer數量大於partition數量,則多餘的consumer一直空閒

  2. rebalance

    consumer 的離開加入,partition數量的變化,以及訂閱topic數發生變動(能夠經過正則表達式訂閱多個topic)都會致使rebalance,rebalance 是由一個稱之爲coordinator的broker來負責的。

    coordinator 的選取:

    1. 看offset保存在哪一個partition中
    2. 該partition的leader做爲該group的coordinator

    rebalance 大體流程:

    1. coordinator 隨機選取一個consumer做爲leader,並將角色信息發給consumers,還會把follower的信息發送給leader。
    2. consumer leader 根據獲得的信息分配partition
    3. consumers 發送同步請求至coordinator,consumer leader 發送的請求中包含分配狀況
    4. coordinator 將分配狀況告訴全部consumer

ack & replica & leader election

  1. replica

    kafka 默認使用副本機制,將不須要備份的topic看作副本數爲1。kafka 備份的單元是topic partition。

    follower 做爲 consumer 從 leader 拉取數據,而後應用到本身的log中。拉取方式可使follower 批量寫入本身的log。

    Isr (in sync)狀態

    需同時知足如下兩個條件

    1. 節點的session必須在zookeeper中(必須和zk保持鏈接,經過zookeeper的心跳機制)
    2. 若是是一個follwer,必須同步leader的寫入,而且不能太落後

    leader 跟蹤 Isr 節點集,若是follower 死亡,卡住,或落後,leader 就會將他從 Isr 列表中刪除。卡住或滯後副本的肯定由replica.lag.time.max.ms配置項控制。

    replica.lag.time.max.ms

    If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time, the leader will remove the follower from isr

    long

    default 10000

  2. message committed 狀態

    定義:Isr集合內的全部節點將其寫入本身的log中

    只有確認commited的消息纔會被consumer 消費,消費者沒必要擔憂會獲得一條會丟失的消息,也就是說只要消費者獲得了一條消息,那麼即便leader掛掉,該消息也不會丟失(由於已同步至Isr集合)。

  3. acks

    對於producer來講,能夠在發送請求的即時性和消息的持久化之間進行權衡,來選擇是否等待消息commited。

    這個選項能夠經過acks配置。

    acks=-1|all 與min.insync.replicas 配合能夠最大程度保證消息的可靠性。

    acks=-1|all 表示消息被追加到Isr集合內全部節點後才返回,若是當前Isr只有一個leader(follower因爲某些緣由掉線),那麼也會返回成功,隨後leader掛掉,其餘follower被選舉爲leader,那麼該條消息就會丟失。min.insync.replicas 能夠指定,若是當前Isr內節點數量小於min.insync.replicas指定數量,則producer直接拋出異常。

  4. leader election

    kafka controller.brokers中的一個節點會擔當controller的角色(聽說是經過zk建立節點,建立成功則爲controller,失敗則監聽該節點,若是controller掛掉,則再次競爭)

    controller 負責管理整個集羣中分區和副本的狀態。

    leader選舉

    若是Isr中有至少一個replica倖存,則選擇其中一個爲leader。

    不然選擇該partition中的任意一個倖存的replica爲leader。

    replica 都不工做

    1. 等待ISR中的任一個replica 活過來,而且選他做爲leader。
    2. 選擇第一個活過來的replica做爲leader。

    第一種等待的時間可能會比較長,或者不可用。第二種不保證包含了全部已commited的消息。

    須要在可用性和一致性當中作出折中。

    unclean.leader.election.enable 配置指定使用哪一種方案,默認是true,使用第2種。

  5. HW(高水位)

    數據的一致性

    待研究

log compaction

  1. 日誌刪除策略

    kafka對於過時日誌的刪除有兩種策略

    • delete

      直接刪除過時的segment文件
    • compact

      經過建立新的segment文件將相同key的最新一條消息保留下來(縮容,合併)

compact 效果圖:

  1. 應用場景

    這個特性能夠保證日誌包含每一個key的最終值的完整快照,消費者就能夠從這個topic中恢復本身的狀態,而不須要保留全部更改的完整日誌。

    • 數據容災,提升數據的可用性。定時將內存中的數據備份到topic中,當崩潰恢復後再從topic中讀回來
  2. 源碼解析

    待研究

  3. 啓用 log compaction

    首先確保 broker 配置項 log.cleaner.enable 爲 true.

    能夠經過設置broker 配置項 log.cleanup.policy 爲 compact,默認是 delete.

    能夠經過設置topic 配置項c leanup.policy 爲 compact,默認是delete.

    topic 配置項能夠覆蓋broker全局配置.

    log.cleanup.policy 和 cleanup.policy 取值能夠是compact或delete或compact,delete

  4. 參考

    https://kafka.apache.org/documentation/#compaction

    https://blog.csdn.net/u013256...

網絡層

  1. NIO
  2. 批量發送數據集時使用sendfile。延申:零拷貝技術,寫時拷貝技術
  3. 線程模型

    一個線程做爲acceptor,接受tcp鏈接,n個處理線程,每一個處理線程處理固定數量的鏈接。
  4. 協議

    定長編碼

    int8, int16, int32, int64。大端序

    變長編碼

    length+content

    字符串 length 用int16表示,二進制數組length用int32表示。

    content爲空用length=-1表示

    數組

    sizeof(array)+array[0]+array[1]....+array[n]

    數組大小使用int32表示

集羣擴展

增長broker,增長partition,增長replication

待研究

事務冪等

  1. producer請求的冪等性

    冪等:

    在編程中一個冪等操做的特色是其任意屢次執行所產生的影響均與一次執行的影響相同

    producer 引入冪等性的意義:

    防止生產者重複生產消息。生產者進行retry時重複產生消息,有了冪等性以後,在進行retry重試時,只會有一條消息commited。

    實現方式:

    1. PID。每一個producer在初始化時會被分配一個惟一的PID,改PID對用戶不可見
    2. Sequence Number。producer 在向每一個paritition發送的每條數據都會攜帶一個seq。其從0遞增
    3. broker 會緩存PID及其seq。若是收到的消息seq比緩存seq大1(highwater mark)則接受,不然丟棄。(tcp ack)

    非冪等

    冪等

    PID 的生成:

    1. producer 從任意一個broker獲取事務協調者(Transaction Coordinator)的信息
    2. producer 向 Transaction Coordinator 請求PID

    Transaction Coordinator 的選擇

    PID的生成規則及保存失效規則

    注意事項:

    若是使用kafka的冪等性,則必須開啓 topic 配置 enable.idempotence

    When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retriesto be greater than 0 and acks must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, a ConfigExceptionwill be thrown.

    前提條件 acks必須是all

  2. 事務

    待研究

dashboard

待研究

相關文章
相關標籤/搜索