在計算機世界中常常須要與數據打交道,這也是咱們戲稱CURD工程師的緣由之一。寫了兩年代碼,接觸了很多存儲系統,Redis、MySQL、Kafka、Elasticsearch…慢慢地發現背後的一些公共的設計思想老是那麼似曾相識,再深究一下,就會發現一些隱藏在這些系統背後的數學理論。html
生活中產生的大量數據須要交由計算機來處理,根據處理方式的不一樣分爲OLTP和OLAP兩大類應用。有些數據好比登陸流水、系統日誌信息,源源不斷,先採集下來拋給消息中間件(Kafka);有些數據,好比一條描述用戶特徵的記錄,就適合存儲到MySQL,並按日期建查詢索引。也就是說:面對大量的數據,把數據存儲起來只是一小步,重要的是如何把這些數據用起來,體現到存儲系統則是:有沒有一套方便的查詢接口可以方便快速地查到咱們想要的數據。若是將數據放到Kafka上了,那要怎麼查?若是把數據放到MySQL上了,很是適合針對高cardinality列建B+樹索引查詢,而對於文本類的數據,放到ES上,則基於倒排索引這種數據結構,根據tf-idf、bm25等這些衡量文檔類似度的算法來快速地得到想要的數據。node
從這也能夠看出,不一樣的存儲系統,爲了知足"查詢",它們背後的存儲原理(所採用的數據結構)是不一樣的。而對於這些存儲系統而言,都面臨着兩個問題:高可靠性和高可用性。可靠性,在我看來,是站在存儲系統自己來看,通常是討論單個實例如何保證數據的可靠。好比,一個正在運行的MySQL實例,它根據checkpoint機制,經過redo log 文件來保證持久性,另外還有double write buffer,保證數據頁的寫入是可靠的。相似地,在Elasticsearch裏面也有translog機制,用來保證數據的可靠性。因此,想深刻了解存儲系統,不妨對比一下它們之間的各類checkpoint機制。程序員
數據爲何須要有可靠性呢?根本緣由仍是內存是一種易失性存儲,根據馮偌依曼體系結構,程序老是從內存中取數據交給CPU作運算。若是數據沒有fsync到磁盤,若是系統宕機那數據會不會丟?redis
而對於可用性,是從Client角度而言的。即我無論你背後是一個redis實例仍是一個redis 集羣,你只管正常地給我提供好讀寫服務就行了。這裏爲了不SPOF,分佈式集羣就派上用場了,一臺機器掛了,另外一臺機器頂上。在分佈式系統中,須要管理好各個存儲實例,這時就須要節點的角色劃分,好比master節點、controller節點之類的稱呼。畢竟管理是要有層級的嘛,你們角色都同樣,怎麼管理呢?在這一點上,Redis集羣與Kafka集羣或者Elasticsearch集羣有很大的不一樣,具體體如今Redis本質上是一個P2P結構的集羣,而Elasticsearch和Kafka 採用的主從模型,爲何這麼說呢?Redis雖然也有Master節點和Slave節點之分,但它的各個Master節點之間是平等的,Redis的數據分佈方式是hash16384個槽到各個master節點上,每一個master節點負責處理落在這些槽內的數據,這是從數據分佈的角度來定義的Master節點,而Kafka中的Controller節點、Elasticsearch中的master節點並非從數據分佈的角度定義的,而是從集羣元信息維護、集羣管理的角度定義的,關於它們之間的具體區別我在這篇文章中也有過一些描述。另外,MySQL做爲關係型數據庫,受數據完整性約束、事務支持的限制,在分佈式集羣能力上要弱一些。算法
最近碰到一個問題,多個業務往向一個Kafka topic發送消息,有些業務的消費量很大,有些業務的消息量很小。因Kafka還沒有較好地支持按優先級來消費消息,致使某些業務的消息消費延時的問題。一種簡單的解決方案是再增長几個Topic,面對一些系統遺留問題,增長Topic帶來的是生產者和消費者處理邏輯複雜性。一種方法是使用Kafka Standalone consumer,先使用consumer.partitionFor("TOPIC_NAME")
獲取topic下的全部分區信息,再使用consumer.assign(partitions)
顯示地爲consumer指定消費分區。另外一種方法是基於consumer group 自定義Kafka consumer的分區分配策略,那這時候就得對Kafka目前已有的分區分配策略有所瞭解,而且明白何時、什麼場景下觸發rebalance?數據庫
Kafka consumer要消費消息,哪些的分區的消息交給哪一個consumer消費呢?這是consumer的分區分配策略,默認有三個:range、round-robin、sticky。說到round-robin這個算法,真是無處不在,它常常用在一些須要負載均衡的場景。好比Elasticsearch client向ES Server發送搜索請求時,由於默認狀況下每臺ES節點均可作爲coordinator節點接收用戶的查詢請求,而在coordinator節點上須要彙總全部分片的查詢結果,這須要消耗大量的內存和CPU,所以ES Client 也是基於round-robin算法選擇將查詢請求發送到哪一個ES節點上。若是你仔細留意,會發如今Redis裏面也會有這個算法的身影。再好比說:Redis LRU Cache中關於Key的access pattern,通常服從冪指分佈(power-law distribution):具備某一特徵的一小部分的Key訪問頻率遠遠大於其餘的Key,正如這種訪問特性,LRU能達到很好的緩存效果。另外,Redis sorted set類型是基於skiplist實現,新的skipNode節點屬於哪一層呢?這也是個power-law distribution問題,其源碼註釋中:apache
Returns a random level for the new skiplist node we are going to create. The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL (both inclusive), with a powerlaw-alike distribution where higher levels are less likely to be returned.緩存
其實,我想表達的是有些思想或者說是解決方案,它是通用的,應用於各個不一樣的存儲系統中,將它們對比起來看,能更好地理解系統背後的原理。網絡
最近每次想寫一些筆記時,腦海裏老是出現一些其餘各類各樣的想法。此次原本主要是想寫kafka 中這兩個配置參數:session.timeout.ms 和 heartbeat.interval.ms的區別的,結果就先扯了一通數據存儲相關的東西。session
由於一個topic每每有多個分區,而咱們又會在一個consumer group裏面建立多個消費者消費這個topic,所以:就有了一個問題:哪些的分區的消息交給哪一個consumer消費呢?這裏涉及到三個概念:consumer group,consumer group裏面的consumer,以及每一個consumer group有一個 group coordinator。conusmer分區分配是經過組管理協議來實施的:具體以下:
consumer group裏面的各個consumer都向 group coordinator發送JoinGroup請求,這樣group coordinator就有了全部consumer的成員信息,因而它從中選出一個consumer做爲Leader consumer,並告訴Leader consumer說:你拿着這些成員信息和我給你的topic分區信息去安排一下哪些consumer負責消費哪些分區吧
接下來,Leader consumer就根據咱們配置的分配策略(由參數partition.assignment.strategy指定)爲各個consumer計算好了各自待消費的分區。因而,各個consumer向 group coordinator 發送SyncGroup請求,但只有Leader consumer的請求中有分區分配策略,group coordinator 收到leader consumer的分區分配方案後,把該方案下發給各個consumer。畫個圖,就是下面這樣的:
而在正常狀況下 ,當有consumer進出consumer group時就會觸發rebalance,所謂rebalance就是從新制訂一個分區分配方案。而制訂好了分區分配方案,就得及時告知各個consumer,這就與 heartbeat.interval.ms參數有關了。具體說來就是:每一個consumer 都會根據 heartbeat.interval.ms 參數指定的時間週期性地向group coordinator發送 hearbeat,group coordinator會給各個consumer響應,若發生了 rebalance,各個consumer收到的響應中會包含 REBALANCE_IN_PROGRESS 標識,這樣各個consumer就知道已經發生了rebalance,同時 group coordinator也知道了各個consumer的存活狀況。
那爲何要把 heartbeat.interval.ms 與 session.timeout.ms 進行對比呢?session.timeout.ms是指:group coordinator檢測consumer發生崩潰所需的時間。一個consumer group裏面的某個consumer掛掉了,最長鬚要 session.timeout.ms 秒檢測出來。舉個示例session.timeout.ms=10,heartbeat.interval.ms=3
session.timeout.ms是個"邏輯"指標,它指定了一個閾值---10秒,在這個閾值內若是coordinator未收到consumer的任何消息,那coordinator就認爲consumer掛了。而heartbeat.interval.ms是個"物理"指標,它告訴consumer要每3秒給coordinator發一個心跳包,heartbeat.interval.ms越小,發的心跳包越多,它是會影響發TCP包的數量的,產生了實際的影響,這也是我爲何將之稱爲"物理"指標的緣由。
若是group coordinator在一個heartbeat.interval.ms週期內未收到consumer的心跳,就把該consumer移出group,這有點說不過去。就好像consumer犯了一個小錯,就一棍子把它打死了。事實上,有可能網絡延時,有可能consumer出現了一次長時間GC,影響了心跳包的到達,說不定下一個heartbeat就正常了。
而heartbeat.interval.ms確定是要小於session.timeout.ms的,若是consumer group發生了rebalance,經過心跳包裏面的REBALANCE_IN_PROGRESS,consumer就能及時知道發生了rebalance,從而更新consumer可消費的分區。而若是超過了session.timeout.ms,group coordinator都認爲consumer掛了,那也固然不用把 rebalance信息告訴該consumer了。
在kafka0.10.1以後的版本中,將session.timeout.ms 和 max.poll.interval.ms 解耦了。也就是說:new KafkaConsumer對象後,在while true循環中執行consumer.poll拉取消息這個過程當中,其實背後是有2個線程的,即一個kafka consumer實例包含2個線程:一個是heartbeat 線程,另外一個是processing線程,processing線程可理解爲調用consumer.poll方法執行消息處理邏輯的線程,而heartbeat線程是一個後臺線程,對程序員是"隱藏不見"的。若是消息處理邏輯很複雜,好比說須要處理5min,那麼 max.poll.interval.ms可設置成比5min大一點的值。而heartbeat 線程則和上面提到的參數 heartbeat.interval.ms有關,heartbeat線程 每隔heartbeat.interval.ms向coordinator發送一個心跳包,證實本身還活着。只要 heartbeat線程 在 session.timeout.ms 時間內 向 coordinator發送過心跳包,那麼 group coordinator就認爲當前的kafka consumer是活着的。
在kafka0.10.1以前,發送心跳包和消息處理邏輯這2個過程是耦合在一塊兒的,試想:若是一條消息處理時長要5min,而session.timeout.ms=3000ms,那麼等 kafka consumer處理完消息,group coordinator早就將consumer 移出group了,由於只有一個線程,在消息處理過程當中就沒法向group coordinator發送心跳包,超過3000ms未發送心跳包,group coordinator就將該consumer移出group了。而將兩者分開,一個processing線程負責執行消息處理邏輯,一個heartbeat線程負責發送心跳包,那麼:就算一條消息須要處理5min,只要底heartbeat線程在session.timeout.ms向group coordinator發送了心跳包,那consumer能夠繼續處理消息,而不用擔憂被移出group了。另外一個好處是:若是consumer出了問題,那麼在 session.timeout.ms內就能檢測出來,而不用等到 max.poll.interval.ms 時長後才能檢測出來。
原文:http://www.javashuo.com/article/p-mqghhnpy-ma.html
最近碰到一些中文分詞的歸一化、分詞結果的準確度(分詞生成自定義的詞)、定製 ES Analyzer插件知足特殊符號搜索、中文行業術語搜索 需求的問題...有時間再寫一篇。