時序數據流通過 Kafka 隊列時可能產生的亂序緣由和解決方法

Kafka 做爲一個流行的消息隊列,以分佈式高性能,高可靠性等特色已經在多種場景下普遍使用。在工業互聯網、物聯網時序數據存儲的解決方案中也有大量用到。java

但在實際部署過程當中,可能會由於配置緣由致使通過 Kafka 的數據在接收方產生亂序,給後續處理環節帶來排序等工做,形成沒必要要的處理開銷,下降系統的處理性能和額外排序的工做。git

其實能夠經過合理的規劃設計 Kafka 的配置和方法來避免消息在經過 Kafka 後亂序的產生,只須要遵循如下原則便可:對於須要確保順序的一條消息流,發送到同一個 partition 上去github

Kafka 能夠在一個 topic 下設置多個 partition 來實現分佈式和負載均衡,由同一 consumer group 下的不一樣 consumer 去消費;這樣的機制可以支持多線程分佈式的處理,帶來高性能,但也帶來了同一消息流走了不一樣路徑的可能性,若是沒有針對性的規劃,從架構上就沒法保證消息的順序。以下圖所示,對於同一個 topic 的一條消息流,寫入不一樣的 partition,就會產生多條路徑。web

爲了確保一條消息流的數據可以嚴格按照時間順序被消費,則必須遵循一條路徑的原則,這樣才能實現 FIFO(First In First Out)。

根據 Kafka 的文檔描述,把哪條記錄發到哪一個 partition,是由 producer 負責:算法

Producers數據庫

Producers publish data to the topics of their choice. The producer is responsible for choosing which record to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the record). More on the use of partitioning in a second!緩存

可見,Kafka 已考慮到了確保消息順序的需求,提供了接口來實現根據指定的 key 值發送到同一 partition 的方法。 能夠看看 Kafka 相關源碼:多線程

1class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
2  private val random = new java.util.Random
3  def partition(key: Any, numPartitions: Int): Int = {
4    Utils.abs(key.hashCode) % numPartitions
5  }
6}
複製代碼

從源碼上來看,Kafka 支持經過 Key 的 hash 值對 partition 的數量求餘來實現基於 Key 的分配 partition 方法。所以咱們只要對不一樣時序消息流,找到他們不一樣的 key,而且這個 key 是不會發生變化的,那麼就能在發送到 Kafka 的時候,確保每一條消息流發送到同一個 partition,走惟一的路徑。所以咱們能夠經過指定 Key 的方式,來實現這種嚴格的時序關係。架構

具體實現方法

在 TDengine 的應用場景下,咱們一般會把某一類設備(超級表)劃分爲一個 topic。對於每一個設備,會單獨建表,一個設備產生的數據,會只放到一張表裏。對於設備產生的原始數據,就須要在這個數據中找一個可以表明這個數據的 ID,並且不會發生變化的字段,做爲 Key 值,在發送給 Kafka 時,帶上這個 Key 值。這樣就能確保該設備的全部數據流通過 Kafka 時,走惟一的路徑。這個 ID 或 key 每每是設備具備惟一性的設備編碼,這個編碼不只能夠做爲 Kafka 的 Key, 也能夠做爲 TDengine 裏的表名。app

具體實現很是簡單,在 producer 發送數據時,選擇一個 key,經過 KeyedMessage 方法生成消息,而後 send。以 Java 爲例,其餘語言能夠從 Kafka 文檔中找到相同功能的接口:

1 producer.send(new KeyedMessage<String, String>(topic,key,record))
複製代碼

這個接口,可讓使用者很是方便無需增長代碼的狀況下來實現指定每一個消息流綁定一個 partition 的結果。用戶也能夠經過本身實現一個 partition 的算法,來實現更精準的 partition 分配控制。具體實現能夠參考"Kafka 指定 partition 生產,消費"中的介紹。

關於 TDengine

TDengine是濤思數據擁有自主知識產權的高性能、可伸縮、高可靠、零管理的物聯網大數據平臺軟件,能夠將數據庫、緩存、消息隊列、流式計算等功能徹底融合在一塊兒。因爲針對物聯網大數據特色作了各類優化,TDengine的數據插入、查詢的性能比通用的大數據平臺好10倍以上,存儲空間也大爲節省,採用SQL接口,與第三方軟件能無縫集成,大幅簡化了物聯網平臺的系統架構,大幅減小了研發和運維的複雜度與成本。TDengine可普遍運用於物聯網、車聯網、工業大數據等領域。2019年7月12日,TDengine開源,在GitHub全球趨勢排行榜上連續幾天排名第一。

目前在GitHub上,TDengine的Star數已超10,000,GitHub地址:github.com/taosdata/TD… ,歡迎來GitHub上Star咱們!

相關文章
相關標籤/搜索