KIP-32 Add timestamps to Kafka message

經過KIP32,Kafka的每條消息都加進了時間戳,這個KIP在0.10.0.0被加入。算法

說到「時間」,先貼張圖,娛樂一下(若是對星球大戰系列電影不熟的話,請自動略過……)apache


 這個KIP的文檔在服務器

KIP-32 - Add timestamps to Kafka message

下面貼一下這個KIP的關鍵部分,俺的註解部分用灰色的字標識。app

Motivation 動機

This KIP tries to address the following issues in Kafka.ide

  1. Log retention might not be honored: Log retention is currently at the log segment level, and is driven off the last modification time of a log segment. This approach does not quite work when a replica reassignment happens because the newly created log segment will effectively have its modification time reset to now.
  2. Log rolling might break for a newly created replica as well because of the same reason as (1).
  3. Some use cases such as streaming processing needs a timestamp in messages.

To solve the above issues, we propose to add a timestamp to Kafka messages.性能

 

前兩個緣由都和replica的從新分配有關,replica從新分配就是把某個分區的副本遷移到另外一臺機器上,一般是爲了調節機器的負載,增長副本數,或者移除機器。在進行replica遷移的時候,Kafka會在遷移的目的地新建一個replica,而且從當前的leader處抓取消息,直到新的副本和leader同步,而後停掉再也不保留的replica。相關的文檔在這裏。 那麼,這裏就存在一個問題,就是新的replic在得到數據時,從leader的哪一個offset開始拉取數據呢?若是直接從最新的數據開始拉,那麼這個replica的數據就不足以承擔它做爲「副本」的任務,所以,確定是從最舊的offset開始拉。這個操做,在源碼裏的調用過程挺複雜的……大概會通過LeaderAndIsrRequest 處理 -> becomeFollower -> createReplica -> ReplicaFetcherThreader.handleOffsetOutOfRange,而後新的replica就會從leader的最先的offset開始拉取消息,而且寫成文件。這裏就遷扯到了Log的rolling和retention的問題。retention的本意是爲了消除再也不須要的消息或者限制Kafka在本地存儲的大小。當爲了第一個目的時,應該刪除已經保存了好久的消息,這個「好久」是指消息進入到Kafka的時間(或者消息產生的時間)距離當前時間過了好久。roll的本意是爲了保持單個文件不要太大,過大的文件不利於retention。可是,因爲Kafka的消息中沒有時間戳,因此新的replica是不知道消息真正進入Kafka的時間(或產生的時間)的,因此roll和rotention機制就沒法正確地工做。「沒法正確的工做」表如今log retention是依據於log segment(對應於一個log文件)的最後修改時間,而log rolling是依據於依據於log segment的建立時間。當replica reassign發生時,新的replica裏最初的這些log segment的建立時間和修改時間都不能反應這個log segment裏邊消息的產生或處理時間。flex

第三個增長timestamp的緣由,是因爲流處理系統的須要,好比上邊那幅圖,是StephanEwan在講Apache Flink的時間舉的例子。ui

Public Interfaces 與外部協議相關的改變

This KIP has the following public interface changes:this

  1. Add a new timestamp field to the message format.
  2. Use the forth least significant bit to indicate the timestamp type. (0 for CreateTime, 1 for LogAppendTime)
  3. Add the following two configurations to the broker
    1. message.timestamp.type - This topic level configuration defines the type of timestamp in the messages of a topic. The valid values are CreateTime or LogAppendTime.
    2. max.message.time.difference.ms - This configuration only works when message.timestamp.type=CreateTime. The broker will only accept messages whose timestamp differs no more than max.message.time.difference.ms from the broker local time.
  4. Add a timestamp field to ProducerRecord and ConsumerRecord. A producer will be able to set a timestamp for a ProducerRecord. A consumer will see the message timestamp when it sees the messages.
  5. Add ProduceRequest/ProduceResponse V2 which uses the new message format.
  6. Add a timestamp in ProduceResponse V2 for each partition. The timestamp will either be LogAppendTime if the topic is configured to use it or it will be NoTimestamp if create time is used.
  7. Add FetchRequest/FetchResponse V2 which uses the new message format.
  8. Add a timestamp variable to RecordMetadata. The timestamp is the timestamp of messages appended to partition log.

最重要的有三點:spa

1. 由用戶來指定這個時間戳的確切含義,能夠指定兩種含義裏一個:1. 建立時間,2. 消息append到log的時間。當用戶指定時間戳的含義是create time時,broker會拒絕消息的create time與它進入到broker的時間相差過大的消息。實際上,對於用戶而言,這是個艱難的選擇。例如,選擇create time的話,這個timestamp是由用戶指定的,因此就可能在錯誤或者誤差,從而影響到rolling和retention的正常工做(好比可能根本不retention,從而寫滿磁盤)。對這種狀況,能夠經過max.message.time.difference.ms來避免。可是,還有些其它的狀況比這種要複雜,在這個KIP的文檔裏討論了各類選擇的優缺點。

2. 用戶能夠經過producer指定這個時間以人爲戳,consumer能夠得到這個時間戳。可是時間戳的含義仍是由第一點來肯定。

3. 須要更改Kafka協議,因此會致使與舊版本的兼容性問題。在KIP的文檔裏描述了升級的方案,因此也沒必要過去擔憂。


 CreateTime和LogAppendTime的優劣(簡要)

There are three options proposed before this proposal. The details of option 1, option 2 and Option 3 are in the Rejected Alternatives section.

The key decision we made in this KIP is whether use LogAppendTime(Broker Time) or CreateTime(Application Time)

The good things about LogAppendTime are:

  1. Broker is more robust.
  2. Monotonically increasing.
  3. Deterministic behavior for log rolling and retention.
  4. If CreateTime is required, it can always be put into the message payload.

LogAppendTime的好處是:

  1. Broker更加健壯(與create time相比,timestamp徹底在broker的掌握之中,因此broker的行爲更肯定)
  2. 單調增加
  3. log rolling和retention的行爲是肯定的
  4. 若是須要CreateTime,老是能夠把它放在消息的負載中

The good things about CreateTime are:

  1. More intuitive to users.
  2. User may want to have log retention based on when the message is created instead of when the message enters the pipeline.
  3. Immutable after entering the pipeline.

CreateTime的好處在於:

  1. 對於用戶更直觀
  2. 用戶可能但願log retention基於消息的建立時間而不是消息進入流水線(指消息的處理流程)的時間。
  3. 進入流水線之後就再也不改變。

Because both LogAppendTime and CreateTime have their own use cases, the proposed change provides users with the flexibility to choose which one they want to use.

For more detail discussion please refer to the mail thread as well as the Rejected Alternatives section.

This KIP is closely related to KIP-33. Some of the contents listed in this KIP are actually part of KIP-33. They are put here because they are important for the design decision. More specifically, KIP-33 will implement the following changes:

  1. Build a time index for each log segment using the timestamps in messages.
  2. Enforce log retention and log rolling use time based index.

因爲LogAppendTime和CreateTime各自有它們的使用場景,這個KIP的提議是由用戶本身選擇。

關於更詳盡地討論請參照郵件組的討論,以及下面的Rejected Alternatives一節。

這個KIP與KIP-33緊密相關,KIP-33將會作下面的變化:

1. 基於消息中的時間戳,爲每一個log segment建立基於時間的索引。

2. 使用基於時間的索引來增強log retention和log rolling。


 

加了時間戳之後,Kafka的工做細節

  1. 容許用戶在生產消息時候加入時間戳
  2. 當一個做爲leader的broker收到消息時
    • 若是message.timestamp.type=LogAppendTime, broker會用本身的本地時間覆蓋消息的時間戳,而且把消息追加到log
      • 若是收到的是壓縮的消息,那麼包裝後的消息的TS(timestamp)將會用當前的服務器時間覆蓋。Broker將會把包裝後消息的timestamp type位 置爲1。Broker會忽略內部消息的時間戳。在使用LogAppendTime時,之因此不修改每一個內部消息的TS,是爲了不從新壓縮帶來的性能損失。
      • 若是消息沒有壓縮,那麼消息的TS將會被覆蓋爲服務器的本地時間
    • 若是message.timestamp.type=CreateTime
      • 若是時間差在max.message.time.difference.ms以內,那麼broker將會接收這個消息而且把它追加到log。對於壓縮後的消息,broker將會把壓縮後消息的TS更新爲內部消息的最大的TS。
      • 若是時間差超過了max.message.time.difference.ms, broker將會以TimestampExceededThresholdException的形式拒絕整批消息。
  3. 當一個follwer broker收到一個消息時
    • 若是這個消息是壓縮後的消息,follower broker會使用壓縮後消息的TS來構建索引。也就是說,一個壓縮後消息的TS老是它的全部內部消息的TS裏最大的一個(譯註:之因此這麼作,是爲了構建索引,這與按TS索引的算法有關)。
    • 若是這個消息是一個沒有壓縮的消息,那麼這個消息的TS將會被用來構建索引。
  4. 當一個consumer收到消息時
    • 若是這個消息是一個壓縮後的消息
      • 若是包裝後消息的timestamp屬性位是0(CreateTime),那麼將會使用內部消息的時間戳
      • 若是包裝後消息的timestamp屬性位是1,那麼包裝消息的TS將會被用做內部消息的TS
    • 若是消息是一個沒有壓縮的消息,那麼這個消息的TS將會被使用。 
  5. message.timestamp.type和max.message.time.difference.ms將會是能夠按topic配置的
  6. 在ProduceResponseV2中,每一個partition都會返回一個TS
    • 若是topic使用LogAppendTime,那麼返回的TS將會是這個message set的LogAppendTime.
    • 若是topic使用CreateTime,那麼返回的TS將會是NoTimestamp
    • 若是producer爲每一個消息啓用了callback,那麼若是produce response不是NoTimestamp,它就會使用produce response中的TS,不然就使用producer記錄的的TS。
    • 在這種狀況下,producer將沒法分辨TS是LogAppendTime仍是CreateTime。
  7. 基於使用的索引有如下的保證(請注意基於時間的索引將會在KIP-33中被實現,而不是這個KIP。之因此在這裏討論索引的問題,是由於它和這個KIP的設計緊密相關)
    • 若是用戶索引一個時間戳:
      • 因此在這個時間戳以後的消息都將會被消息
      • 用戶可能會看到更早的消息
    • log retention將會按照時間索引文件的最後一個條目。由於最後一個條目將會是整個log segment裏將新的timestamp。若是這個entry過時了,那麼整個log segment將會被刪除。
    • log rolling將會依據全部見到過的消息的最大的timestamp。If no message is appended in log.roll.ms after largest appended timestamp, a new log segment will be rolled out.(????這個不合邏輯呀,明顯要用最先的timestamp)
  8. 這個提議的很差的方面包括:
    • 若是message.timestamp.typ=CreateTime的話,timestamp可能不會是遞增的
    • log retention可能不是肯定的。也就是說,一個應該被刪除的消息如今依賴於同一個log segment的其它消息。而且,這些由用戶提供的時間戳依賴於被配置的時間差的閥值(即,max.message.time.differnece.ms)。
  9. 儘管這個提議有這些缺點,可是它給了用戶使有時間戳的靈活性。
    • 若是message.timestamp.type=CreateTime
      • 若是時間差閥值被設爲Long.MaxValue,那麼消息裏的時間戳就等於CreateTime
      • 若是時間差閥值在0和Long.MaxValue之間,就能保證消息的時間戳總能在一個肯定的範圍以內
    • 若是message.timestamp.type=LogAppendTime,那麼時間戳就會是log append time.      

總結:

關於時間戳的類型的選擇:CreateTime仍是LogAppendTime,仍是得依據於具體的使用場景。好比,若是強烈須要使用event time來進行後續的處理,那就只能選create time。重要的是在選擇好一種類型之後,瞭解它對於Kafka的各類行爲的影響。 

相關文章
相關標籤/搜索