Kafka的架構設計(目前翻譯最好的一稿)

轉自:http://www.oschina.net/translate/kafka-designjava

參與翻譯(4人):fbm, 飛翔的猴子, Khiyuan, nesteaanode

感謝這些同志們的辛勤工做,翻譯的真不錯,目前見到的最好的Kafka中文文章linux

-------------------------------web

Kafka是一個高吞吐量分佈式消息系統。linkedin開源的kafka。 Kafka就跟這個名字同樣,設計很是獨特。首先,kafka的開發者們認爲不須要在內存裏緩存什麼數據,操做系統的文件緩存已經足夠完善和強大,只要你不搞隨機寫,順序讀寫的性能是很是高效的。kafka的數據只會順序append,數據的刪除策略是累積到必定程度或者超過必定時間再刪除。Kafka另外一個獨特的地方是將消費者信息保存在客戶端而不是MQ服務器,這樣服務器就不用記錄消息的投遞過程,每一個客戶端都本身知道本身下一次應該從什麼地方什麼位置讀取消息,消息的投遞過程也是採用客戶端主動pull的模型,這樣大大減輕了服務器的負擔。Kafka還強調減小數據的序列化和拷貝開銷,它會將一些消息組織成Message Set作批量存儲和發送,而且客戶端在pull數據的時候,儘可能以zero-copy的方式傳輸,利用sendfile(對應java裏的 FileChannel.transferTo/transferFrom)這樣的高級IO函數來減小拷貝開銷。可見,kafka是一個精心設計,特定於某些應用的MQ系統,這種偏向特定領域的MQ系統我估計會愈來愈多,垂直化的產品策略值的考慮。正則表達式

 

咱們爲何要搭建該系統

 

Kafka是一個消息系統,本來開發自LinkedIn,用做LinkedIn的活動流(activity stream)和運營數據處理管道(pipeline)的基礎。如今它已爲多家不一樣類型的公司 做爲多種類型的數據管道(data pipeline)和消息系統使用。算法

活動流數據是全部站點在對其網站使用狀況作報表時要用到的數據中最常規的部分。活動數據包括頁面訪問量(page view)、被查看內容方面的信息以及搜索狀況等內容。這種數據一般的處理方式是先把各類活動以日誌的形式寫入某種文件,而後週期性地對這些文件進行統計分析。運營數據指的是服務器的性能數據(CPU、IO使用率、請求時間、服務日誌等等數據)。運營數據的統計方法種類繁多。數據庫

近年來,活動和運營數據處理已經成爲了網站軟件產品特性中一個相當重要的組成部分,這就須要一套稍微更加複雜的基礎設施對其提供支持。apache

 

 

活動流和運營數據的若干用例

  • "動態彙總(News feed)"功能。將你朋友的各類活動信息廣播給你api

  • 相關性以及排序。經過使用計數評級(count rating)、投票(votes)或者點擊率( click-through)斷定一組給定的條目中那一項是最相關的.數組

  • 安全:網站須要屏蔽行爲不端的網絡爬蟲(crawler),對API的使用進行速率限制,探測出擴散垃圾信息的企圖,並支撐其它的行爲探測和預防體系,以切斷網站的某些不正常活動。

  • 運營監控:大多數網站都須要某種形式的實時且隨機應變的方式,對網站運行效率進行監控並在有問題出現的狀況下能觸發警告。

  • 報表和批處理: 將數據裝載到數據倉庫或者Hadoop系統中進行離線分析,而後針對業務行爲作出相應的報表,這種作法很廣泛。

 

 

 

 

活動流數據的特色

 

這種由不可變(immutable)的活動數據組成的高吞吐量數據流表明了對計算能力的一種真正的挑戰,因其數據量很容易就可能會比網站中位於第二位的數據源的數據量大10到100倍。

傳統的日誌文件統計分析對報表和批處理這種離線處理的狀況來講,是一種很不錯且頗有伸縮性的方法;可是這種方法對於實時處理來講其時延太大,並且還具備較高的運營複雜度。另外一方面,現有的消息隊列系統(messaging and queuing system)卻很適合於在實時或近實時(near-real-time)的狀況下使用,但它們對很長的未被處理的消息隊列的處理很不給力,每每並不將數據持久化做爲首要的事情考慮。這樣就會形成一種狀況,就是當把大量數據傳送給Hadoop這樣的離線系統後, 這些離線系統每一個小時或天天僅能處理掉部分源數據。Kafka的目的就是要成爲一個隊列平臺,僅僅使用它就可以既支持離線又支持在線使用這兩種狀況。

Kafka支持很是通用的消息語義(messaging semantics)。儘管咱們這篇文章主要是想把它用於活動處理,但並無任何限制性條件使得它僅僅適用於此目的。

 

 

部署

 

下面的示意圖所示是在LinkedIn中部署後各系統造成的拓撲結構。

要注意的是,一個單個的Kafka集羣系統用於處理來自各類不一樣來源的全部活動數據。它同時爲在線和離線的數據使用者提供了一個單個的數據管道,在線活動和異步處理之間造成了一個緩衝區層。咱們還使用kafka,把全部數據複製(replicate)到另一個不一樣的數據中心去作離線處理。

咱們並不想讓一個單個的Kafka集羣系統跨越多個數據中心,而是想讓Kafka支持多數據中心的數據流拓撲結構。這是經過在集羣之間進行鏡像或「同步」實現的。這個功能很是簡單,鏡像集羣只是做爲源集羣的數據使用者的角色運行。這意味着,一個單個的集羣就可以未來自多個數據中心的數據集中到一個位置。下面所示是可用於支持批量裝載(batch loads)的多數據中心拓撲結構的一個例子:

請注意,在圖中上面部分的兩個集羣之間不存在通訊鏈接,二者可能大小不一樣,具備不一樣數量的節點。下面部分中的這個單個的集羣能夠鏡像任意數量的源集羣。要了解鏡像功能使用方面的更多細節,請訪問這裏.

 

 

主要的設計元素

 

Kafka之因此和其它絕大多數信息系統不一樣,是由於下面這幾個爲數很少的比較重要的設計決策:

  1. Kafka在設計之時爲就將持久化消息做爲一般的使用狀況進行了考慮。

  2. 主要的設計約束是吞吐量而不是功能。

  3. 有關哪些數據已經被使用了的狀態信息保存爲數據使用者(consumer)的一部分,而不是保存在服務器之上。

  4. Kafka是一種顯式的分佈式系統。它假設,數據生產者(producer)、代理(brokers)和數據使用者(consumer)分散於多臺機器之上。

以上這些設計決策將在下文中進行逐條詳述。

 

 

基礎知識

 

首先來看一些基本的術語和概念。

消息指的是通訊的基本單位。由消息生產者(producer)發佈關於某話題(topic)的消息,這句話的意思是,消息以一種物理方式被髮送給了做爲代理(broker)的服務器(多是另一臺機器)。若干的消息使用者(consumer)訂閱(subscribe)某個話題,而後生產者所發佈的每條消息都會被髮送給全部的使用者。

Kafka是一個顯式的分佈式系統 —— 生產者、使用者和代理均可以運行在做爲一個邏輯單位的、進行相互協做的集羣中不一樣的機器上。對於代理和生產者,這麼作很是天然,但使用者卻須要一些特殊的支持。每一個使用者進程都屬於一個使用者小組(consumer group) 。準確地講,每條消息都只會發送給每一個使用者小組中的一個進程。所以,使用者小組使得許多進程或多臺機器在邏輯上做爲一個單個的使用者出現。使用者小組這個概念很是強大,能夠用來支持JMS中隊列(queue)或者話題(topic)這兩種語義。爲了支持隊列 語義,咱們能夠將全部的使用者組成一個單個的使用者小組,在這種狀況下,每條消息都會發送給一個單個的使用者。爲了支持話題語義,能夠將每一個使用者分到它本身的使用者小組中,隨後全部的使用者將接收到每一條消息。在咱們的使用當中,一種更常見的狀況是,咱們按照邏輯劃分出多個使用者小組,每一個小組都是有做爲一個邏輯總體的多臺使用者計算機組成的集羣。在大數據的狀況下,Kafka有個額外的優勢,對於一個話題而言,不管有多少使用者訂閱了它,一條條消息都只會存儲一次。

 

 

消息持久化(Message Persistence)及其緩存

不要懼怕文件系統!

 

在對消息進行存儲和緩存時,Kafka嚴重地依賴於文件系統。 你們廣泛認爲「磁盤很慢」,於是人們都對持久化結(persistent structure)構可以提供說得過去的性能抱有懷疑態度。實際上,同人們的指望值相比,磁盤能夠說是既很慢又很快,這取決於磁盤的使用方式。設計的很好的磁盤結構每每能夠和網絡同樣快。

磁盤性能方面最關鍵的一個事實是,在過去的十幾年中,硬盤的吞吐量正在變得和磁盤尋道時間嚴重不一致了。結果,在一個由6個7200rpm的SATA硬盤組成的RAID-5磁盤陣列上,線性寫入(linear write)的速度大約是300MB/秒,但隨即寫入卻只有50k/秒,其中的差異接近10000倍。線性讀取和寫入是全部使用模式中最具可預計性的一種方式,於是操做系統採用預讀(read-ahead)和後寫(write-behind)技術對磁盤讀寫進行探測並優化後效果也不錯。預讀就是提早將一個比較大的磁盤塊中內容讀入內存,後寫是將一些較小的邏輯寫入操做合併起來組成比較大的物理寫入操做。關於這個問題更深刻的討論請參考這篇文章ACM Queue article;實際上他們發現,在某些狀況下,順序磁盤訪問可以比隨即內存訪問還要快!

 

 

爲了抵消這種性能上的波動,現代操做系變得愈來愈積極地將主內存用做磁盤緩存。全部現代的操做系統都會樂於將全部空閒內存轉作磁盤緩存,即時在須要回收這些內存的狀況下會付出一些性能方面的代價。全部的磁盤讀寫操做都須要通過這個統一的緩存。想要捨棄這個特性都不太容易,除非使用直接I/O。所以,對於一個進程而言,即便它在進程內的緩存中保存了一份數據,這份數據也可能在OS的頁面緩存(pagecache)中有重複的一份,結構就成了一份數據保存了兩次。

更進一步講,咱們是在JVM的基礎之上開發的系統,只要是瞭解過一些Java中內存使用方法的人都知道這兩點:

  1. Java對象的內存開銷(overhead)很是大,每每是對象中存儲的數據所佔內存的兩倍(或更糟)。

  2. Java中的內存垃圾回收會隨着堆內數據不斷增加而變得愈來愈不明確,回收所花費的代價也會愈來愈大。

  3.  

 

因爲這些因素,使用文件系統並依賴於頁面緩存要優於本身在內存中維護一個緩存或者什麼別的結構 —— 經過對全部空閒內存自動擁有訪問權,咱們至少將可用的緩存大小翻了一倍,而後經過保存壓縮後的字節結構而非單個對象,緩存可用大小接着可能又翻了一倍。這麼作下來,在GC性能不受損失的狀況下,咱們可在一臺擁有32G內存的機器上得到高達28到30G的緩存。並且,這種緩存即便在服務重啓以後會仍然保持有效,而不象進程內緩存,進程重啓後還須要在內存中進行緩存重建(10G的緩存重建時間可能須要10分鐘),不然就須要以一個全空的緩存開始運行(這麼作它的初始性能會很是糟糕)。這還大大簡化了代碼,由於對緩存和文件系統之間的一致性進行維護的全部邏輯如今都是在OS中實現的,這事OS作起來要比咱們在進程中作那種一次性的緩存更加高效,準確性也更高。若是你使用磁盤的方式更傾向於線性讀取操做,那麼隨着每次磁盤讀取操做,預讀就能很是高效使用隨後準能用得着的數據填充緩存。

 

 

這就讓人聯想到一個很是簡單的設計方案:不是要在內存中保存儘量多的數據並在須要時將這些數據刷新(flush)到文件系統,而是咱們要作徹底相反的事情。全部數據都要當即寫入文件系統中持久化的日誌中但不進行刷新數據的任何調用。實際中這麼作意味着,數據被傳輸到OS內核的頁面緩存中了,OS隨後會將這些數據刷新到磁盤的。此外咱們添加了一條基於配置的刷新策略,容許用戶對把數據刷新到物理磁盤的頻率進行控制(每當接收到N條消息或者每過M秒),從而能夠爲系統硬件崩潰時「處於危險之中」的數據在量上加個上限。

這種以頁面緩存爲中心的設計風格在一篇講解Varnish的設計思想的文章中有詳細的描述(文風略帶有助於身心健康的傲氣)。

 

 

常量時長足矣

 

消息系統元數據的持久化數據結構每每採用BTree。 BTree是目前最通用的數據結構,在消息系統中它能夠用來普遍支持多種不一樣的事務性或非事務性語義。 它的確也帶來了一個很是高的處理開銷,Btree運算的時間複雜度爲O(log N)。通常O(log N)被認爲基本上等於常量時長,但對於磁盤操做來說,狀況就不一樣了。磁盤尋道時間一次要花10ms的時間,並且每一個磁盤同時只能進行一個尋道操做,於是其並行程度頗有限。所以,即便少許的磁盤尋道操做也會形成很是大的時間開銷。由於存儲系統混合了高速緩存操做和真正的物理磁盤操做,因此樹型結構(tree structure)可觀察到的性能每每是超線性的(superlinear)。更進一步講,BTrees須要一種很是複雜的頁面級或行級鎖定機制才能避免在每次操做時鎖定一整顆樹。實現這種機制就要爲行級鎖定付出很是高昂的代價,不然就必須對全部的讀取操做進行串行化(serialize)。由於對磁盤尋道操做的高度依賴,就不太可能高效地從驅動器密度(drive density)的提升中得到改善,於是就不得不使用容量較小(< 100GB)轉速較高的SAS驅動去,以維持一種比較合理的數據與尋道容量之比。

 

直覺上講,持久化隊列能夠按照一般的日誌解決方案的樣子構建,只是簡單的文件讀取和簡單地向文件中添加內容。雖然這種結果必然沒法支持BTree實現中的豐富語義,但有個優點之處在於其全部的操做的複雜度都是O(1),讀取操做並不須要阻止寫入操做,並且反之亦然。這樣作顯然有性能優點,由於性能徹底同數據大小之間脫離了關係 —— 一個服務器如今就能利用大量的廉價、低轉速、容量超過1TB的SATA驅動器。雖然這些驅動器尋道操做的性能很低,但這些驅動器在大量數據讀寫的狀況下性能還湊和,而只需1/3的價格就能得到3倍的容量。 可以存取到幾乎無限大的磁盤空間而無須付出性能代價意味着,咱們能夠提供一些消息系統中並不常見的功能。例如,在Kafka中,消息在使用完後並無當即刪除,而是會將這些消息保存至關長的一段時間(比方說一週)。

 

 

效率最大化

 

咱們的假設是,系統裏消息的量很是之大,實際消息量是網站頁面瀏覽總數的數倍之多(由於每一個頁面瀏覽就是咱們要處理的其中一個活動)。並且咱們假設發佈的每條消息都會被至少讀取一次(每每是屢次),於是咱們要爲消息使用而不是消息的產生進行系統優化,

致使低效率的緣由常見的有兩個:過多的網絡請求和大量的字節拷貝操做。

爲了提升效率,API是圍繞這「消息集」(message set)抽象機制進行設計的,消息集將消息進行天然分組。這麼作能讓網絡請求把消息合成一個小組,分攤網絡往返(roundtrip)所帶來的開銷,而不是每次僅僅發送一個單個消息。

 

 

MessageSet實現(implementation)自己是對字節數組或文件進行一次包裝後造成的一薄層API。於是,裏面並不存在消息處理所需的單獨的序列化(serialization)或逆序列化(deserialization)的步驟。消息中的字段(field)是按需進行逆序列化的(或者說,在不須要時就不進行逆序列化)。

由代理維護的消息日誌自己不過是那些已寫入磁盤的消息集的目錄。按此進行抽象處理後,就可讓代理和消息使用者共用一個單個字節的格式(從某種程度上說,消息生產者也能夠用它,消息生產者的消息要求其校驗和(checksum)並在驗證後纔會添加到日誌中)

使用共通的格式後就能對最重要的操做進行優化了:持久化後日志塊(chuck)的網絡傳輸。爲了將數據從頁面緩存直接傳送給socket,現代的Unix操做系統提供了一個高度優化的代碼路徑(code path)。在Linux中這是經過sendfile這個系統調用實現的。經過Java中的API,FileChannel.transferTo,由它來簡潔的調用上述的系統調用。

 

 

爲了理解sendfile所帶來的效果,重要的是要理解將數據從文件傳輸到socket的數據路徑:

  1. 操做系統將數據從磁盤中讀取到內核空間裏的頁面緩存

  2. 應用程序將數據從內核空間讀入到用戶空間的緩衝區

  3. 應用程序將讀到的數據寫回內核空間並放入socke的緩衝區

  4. 操做系統將數據從socket的緩衝區拷貝到NIC(網絡藉口卡,即網卡)的緩衝區,自此數據才能經過網絡發送出去

這樣效率顯然很低,由於裏面涉及4次拷貝,2次系統調用。使用sendfile就能夠避免這些重複的拷貝操做,讓OS直接將數據從頁面緩存發送到網絡中,其中只需最後一步中的將數據拷貝到NIC的緩衝區。

咱們預期的一種常見的用例是一個話題擁有多個消息使用者。採用前文所述的零拷貝優化方案,數據只需拷貝到頁面緩存中一次,而後每次發送給使用者時都對它進行重複使用便可,而無須先保存到內存中,而後在閱讀該消息時每次都須要將其拷貝到內核空間中。如此一來,消息使用的速度就能接近網絡鏈接的極限。

要獲得Java中對send'file和零拷貝的支持方面的更多背景知識,請參考IBM developerworks上的這篇文章

 

 

端到端的批量壓縮

 

多數狀況下系統的瓶頸是網絡而不是CPU。 這一點對於須要將消息在個數據中心間進行傳輸的數據管道來講,尤爲如此。固然,無需來自Kafka的支持,用戶老是能夠自行將消息壓縮後進行傳輸,但這麼作的壓縮率會很是低,由於不一樣的消息裏都有不少重複性的內容(好比JSON裏的字段名、web日誌中的用戶代理或者經常使用的字符串)。高效壓縮須要將多條消息一塊兒進行壓縮而不是分別壓縮每條消息。理想狀況下,以端到端的方式這麼作是行得通的 —— 也即,數據在消息生產者發送以前先壓縮一下,而後在服務器上一直保存壓縮狀態,只有到最終的消息使用者那裏才須要將其解壓縮。

經過運行遞歸消息集,Kafka對這種壓縮方式提供了支持。 一批消息能夠打包到一塊兒進行壓縮,而後以這種形式發送給服務器。這批消息都會被髮送給同一個消息使用者,並會在到達使用者那裏以前一直保持爲被壓縮的形式。

Kafka支持GZIP和Snappy壓縮協議。關於壓縮的更多更詳細的信息,請參見這裏

 

 

客戶狀態

 

追蹤(客戶)消費了什麼是一個消息系統必須提供的一個關鍵功能之一。它並不直觀,可是記錄這個狀態是該系統的關鍵性能之一。狀態追蹤要求(不斷)更新一個有持久性的實體的和一些潛在會發生的隨機訪問。所以它更可能受到存儲系統的查詢時間的制約而不是帶寬(正如上面所描述的)。

大部分消息系統保留着關於代理者使用(消費)的消息的元數據。也就是說,當消息被交到客戶手上時,代理者本身記錄了整個過程。這是一個至關直觀的選擇,並且確實對於一個單機服務器來講,它(數據)能去(放在)哪裏是不清晰的。又因爲許多消息系統存儲使用的數據結構規模小,因此這也是個實用的選擇--由於代理者知道什麼被消費了使得它能夠馬上刪除它(數據),保持數據大小不過大。

 

也許不顯然的是,讓代理和使用者這二者對消息的使用狀況作到一致表述毫不是一件垂手可得的事情。若是代理每次都是在將消息發送到網絡中後就將該消息記錄爲已使用的話,一旦使用者沒能真正處理到該消息(比方說,由於它宕機或這請求超時了抑或別的什麼緣由),就會出現消息丟失的狀況。爲了解決此問題,許多消息系新加了一個確認功能,當消息發出後僅把它標示爲已發送而不是已使用,而後代理須要等到來自使用者的特定的確認信息後纔將消息記錄爲已使用。這種策略的確解決了丟失消息的問題,但由此產生了新問題。首先,若是使用者已經處理了該消息但卻未能發送出確認信息,那麼就會讓這一條消息被處理兩次。第二個問題是關於性能的,這種策略中的代理必須爲每條單個的消息維護多個狀態(首先爲了防止重複發送就要將消息鎖定,而後,而後還要將消息標示爲已使用後才能刪除該消息)。另外還有一些棘手的問題須要處理,好比,對於那些以發出卻未獲得確認的消息該如何處理?

 

 

消息傳遞語義(Message delivery semantics)

 

系統能夠提供的幾種可能的消息傳遞保障以下所示:

  • 最多一次—這種用於處理前段文字所述的第一種狀況。消息在發出後當即標示爲已使用,所以消息不會被髮出去兩次,但這在許多故障中都會致使消息丟失。

  • 至少一次—這種用於處理前文所述的第二種狀況,系統保證每條消息至少會發送一次,但在有故障的狀況下可能會致使重複發送。

  • 僅僅一次—這種是人們實際想要的,每條消息只會並且僅會發送一次。

這個問題已獲得普遍的研究,屬於「事務提交」問題的一個變種。提供僅僅一次語義的算法已經有了,兩階段或者三階段提交法以及Paxos算法的一些變種就是其中的一些例子,但它們都有與生俱來的的缺陷。這些算法每每須要多個網絡往返(round trip),可能也沒法很好的保證其活性(liveness)(它們可能會致使無限期停機)。FLP結果給出了這些算法的一些基本的侷限。

Kafka對元數據作了兩件很不尋常的事情。一件是,代理將數據流劃分爲一組互相獨立的分區。這些分區的語義由生產者定義,由生產者來指定每條消息屬於哪一個分區。一個分區內的消息以到達代理的時間爲準進行排序,未來按此順序將消息發送給使用者。這麼一來,就用不着爲每一天消息保存一條元數據(好比說,將消息標示爲已使用)了,咱們只需爲使用者、話題和分區的每種組合記錄一個「最高水位標記」(high water mark)便可。所以,標示使用者狀態所需的元數據總量實際上特別小。在Kafka中,咱們將該最高水位標記稱爲「偏移量」(offset),這麼叫的緣由將在實現細節部分講解。

 

 

使用者的狀態

 

在Kafka中,由使用者負責維護反映哪些消息已被使用的狀態信息(偏移量)。典型狀況下,Kafka使用者的library會把狀態數據保存到Zookeeper之中。然而,讓使用者將狀態信息保存到保存它們的消息處理結果的那個數據存儲(datastore)中也許會更佳。例如,使用者也許就是要把一些統計值存儲到集中式事物OLTP數據庫中,在這種狀況下,使用者能夠在進行那個數據庫數據更改的同一個事務中將消息使用狀態信息存儲起來。這樣就消除了分佈式的部分,從而解決了分佈式中的一致性問題!這在非事務性系統中也有相似的技巧可用。搜索系統可用將使用者狀態信息同它的索引段(index segment)存儲到一塊兒。儘管這麼作可能沒法保證數據的持久性(durability),但卻可用讓索引同使用者狀態信息保存同步:若是因爲宕機形成有一些沒有刷新到磁盤的索引段信息丟了,咱們老是可用從上次創建檢查點(checkpoint)的偏移量處繼續對索引進行處理。與此相似,Hadoop的加載做業(load job)從Kafka中並行加載,也有相同的技巧可用。每一個Mapper在map任務結束前,將它使用的最後一個消息的偏移量存入HDFS。

這個決策還帶來一個額外的好處。使用者可用故意回退(rewind)到之前的偏移量處,再次使用一遍之前使用過的數據。雖然這麼作違背了隊列的通常協約(contract),但對不少使用者來說倒是個很基本的功能。舉個例子,若是使用者的代碼裏有個Bug,並且是在它處理完一些消息以後才被發現的,那麼當把Bug改正後,使用者還有機會從新處理一遍那些消息。

 

 

Push和Pull

 

相關問題還有一個,就是究竟是應該讓使用者從代理那裏吧數據Pull(拉)回來仍是應該讓代理把數據Push(推)給使用者。和大部分消息系統同樣,Kafka在這方面遵循了一種更加傳統的設計思路:由生產者將數據Push給代理,而後由使用者將數據代理那裏Pull回來。近來有些系統,好比scribe和flume,更着重於日誌統計功能,遵循了一種很是不一樣的基於Push的設計思路,其中每一個節點均可以做爲代理,數據一直都是向下遊Push的。上述兩種方法都各有優缺點。然而,由於基於Push的系統中代理控制着數據的傳輸速率,所以它難以應付大量不一樣種類的使用者。咱們的設計目標是,讓使用者能以它最大的速率使用數據。不幸的是,在Push系統中當數據的使用速率低於產生的速率時,使用者每每會處於超載狀態(這實際上就是一種拒絕服務攻擊)。基於Pull的系統在使用者的處理速度稍稍落後的狀況下會表現更佳,並且還可讓使用者在有能力的時候每每前趕趕。讓使用者採用某種退避協議(backoff protocol)向代理代表本身處於超載狀態,能夠解決部分問題,可是,將傳輸速率調整到正好能夠徹底利用(但從不能過分利用)使用者的處理能力可比初看上去難多了。之前咱們嘗試過屢次,想按這種方式構建系統,獲得的經驗教訓使得咱們選擇了更加常規的Pull模型。

 

 

分發

 

Kafka一般狀況下是運行在集羣中的服務器上。沒有中央的「主」節點。代理彼此之間是對等的,不須要任何手動配置便可可隨時添加和刪除。一樣,生產者和消費者能夠在任什麼時候候開啓。 每一個代理均可以在Zookeeper(分佈式協調系統)中註冊的一些元數據(例如,可用的主題)。生產者和消費者可使用Zookeeper發現主題和相互協調。關於生產者和消費者的細節將在下面描述。

 

 

生產者

生產者自動負載均衡

 

對於生產者,Kafka支持客戶端負載均衡,也可使用一個專用的負載均衡器對TCP鏈接進行負載均衡調整。專用的第四層負載均衡器在Kafka代理之上對TCP鏈接進行負載均衡。在這種配置的狀況,一個給定的生產者所發送的消息都會發送給一個單個的代理。使用第四層負載均衡器的好處是,每一個生產者僅需一個單個的TCP鏈接而無須同Zookeeper創建任何鏈接。很差的地方在於全部均衡工做都是在TCP鏈接的層次完成的,於是均衡效果可能並不佳(若是有些生產者產生的消息遠多於其它生產者,按每一個代理對TCP鏈接進行平均分配可能會致使每一個代理接收到的消息總數並不平均)。

 

 

採用客戶端基於zookeeper的負載均衡能夠解決部分問題。若是這麼作就能讓生產者動態地發現新的代理,並按請求數量進行負載均衡。相似的,它還能讓生產者按照某些鍵值(key)對數據進行分區(partition)而不是隨機亂分,於是能夠保存同使用者的關聯關係(例如,按照用戶id對數據使用進行分區)。這種分法叫作「語義分區」(semantic partitioning),下文再討論其細節。

下面講解基於zookeeper的負載均衡的工做原理。在發生下列事件時要對zookeeper的監視器(watcher)進行註冊:

  • 加入了新的代理

  • 有一個代理下線了

  • 註冊了新的話題

  • 代理註冊了已有話題。

生產者在其內部爲每個代理維護了一個彈性的鏈接(同代理創建的鏈接)池。經過使用zookeeper監視器的回調函數(callback),該鏈接池在創建/保持同全部在線代理的鏈接時都要進行更新。當生產者要求進入某特定話題時,由分區者(partitioner)選擇一個代理分區(參加語義分區小結)。從鏈接池中找出可用的生產者鏈接,並經過它將數據發送到剛纔所選的代理分區。

 

 

異步發送

 

對於可伸縮的消息系統而言,異步非阻塞式操做是不可或缺的。在Kafka中,生產者有個選項(producer.type=async)可用指定使用異步分發出產請求(produce request)。這樣就容許用一個內存隊列(in-memory queue)把生產請求放入緩衝區,而後再以某個時間間隔或者事先配置好的批量大小將數據批量發送出去。由於通常來講數據會從一組以不一樣的數據速度生產數據的異構的機器中發佈出,因此對於代理而言,這種異步緩衝的方式有助於產生均勻一致的流量,於是會有更佳的網絡利用率和更高的吞吐量。

 

 

語義分區

 

下面看看一個想要爲每一個成員統計一個我的空間訪客總數的程序該怎麼作。應該把一個成員的全部我的空間訪問事件發送給某特定分區,所以就能夠把對一個成員的全部更新都放在同一個使用者線程中的同一個事件流中。生產者具備從語義上將消息映射到有效的Kafka節點和分區之上的能力。這樣就能夠用一個語義分區函數將消息流按照消息中的某個鍵值進行分區,並將不一樣分區發送給各自相應的代理。經過實現kafak.producer.Partitioner接口,能夠對分區函數進行定製。在缺省狀況下使用的是隨即分區函數。上例中,那個鍵值應該是member_id,分區函數能夠是hash(member_id)%num_partitions。

 

 

對Hadoop以及其它批量數據裝載的支持

 

具備伸縮性的持久化方案使得Kafka可支持批量數據裝載,可以週期性將快照數據載入進行批量處理的離線系統。咱們利用這個功能將數據載入咱們的數據倉庫(data warehouse)和Hadoop集羣。

批量處理始於數據載入階段,而後進入非循環圖(acyclic graph)處理過程以及輸出階段(支持狀況在這裏)。支持這種處理模型的一個重要特性是,要有從新裝載從某個時間點開始的數據的能力(以防處理中有任何錯誤發生)。

對於Hadoop,咱們經過在單個的map任務之上分割裝載任務對數據的裝載進行了並行化處理,分割時,全部節點/話題/分區的每種組合都要分出一個來。Hadoop提供了任務管理,失敗的任務能夠重頭再來,不存在數據被重複的危險。

 

 

實施細則

 

下面給出了一些在上一節所描述的低層相關的實現系統的某些部分的細節的簡要說明。

API 設計

生產者 APIs

生產者 API 是給兩個底層生產者的再封裝 -kafka.producer.SyncProducerandkafka.producer.async.AsyncProducer.

[java] view plaincopy

 

  1. class Producer {  

  2.       

  3.   /* Sends the data, partitioned by key to the topic using either the */  

  4.   /* synchronous or the asynchronous producer */  

  5.   public void send(kafka.javaapi.producer.ProducerData producerData);  

  6.   

  7.   /* Sends a list of data, partitioned by key to the topic using either */  

  8.   /* the synchronous or the asynchronous producer */  

  9.   public void send(java.util.List< kafka.javaapi.producer.ProducerData> producerData);  

  10.   

  11.   /* Closes the producer and cleans up */     

  12.   public void close();  

  13.   

  14. }   

 

該API的目的是將生產者的全部功能經過一個單個的API公開給其使用者(client)。新建的生產者能夠:

  • 對多個生產者請求進行排隊/緩衝並異步發送批量數據 —— kafka.producer.Producer提供了在將多個生產請求序列化併發送給適當的Kafka代理分區以前,對這些生產請求進行批量處理的能力(producer.type=async)。批量的大小能夠經過一些配置參數進行控制。當事件進入隊列時會先放入隊列進行緩衝,直到時間到了queue.time或者批量大小到達batch.size爲止,後臺線程(kafka.producer.async.ProducerSendThread)會將這批數據從隊列中取出,交給kafka.producer.EventHandler進行序列化併發送給適當的kafka代理分區。經過event.handler這個配置參數,能夠在系統中插入一個自定義的事件處理器。在該生產者隊列管道中的各個不一樣階段,爲了插入自定義的日誌/跟蹤代碼或者自定義的監視邏輯,如能注入回調函數會很是有用。經過實現kafka.producer.asyn.CallbackHandler接口並將配置參數callback.handler設置爲實現類就可以實現注入。

  • 使用用戶指定的Encoder處理數據的序列化(serialization)

    1 interface Encoder<T> {
    2   public Message toMessage(T data);
    3 }

    Encoder的缺省值是一個什麼活都不幹的kafka.serializer.DefaultEncoder。

  • 提供基於zookeeper的代理自動發現功能 —— 經過使用zk.connect配置參數指定zookeeper的鏈接url,就可以使用基於zookeeper的代理髮現和負載均衡功能。在有些應用場合,可能不太適合於依賴zookeeper。在這種狀況下,生產者能夠從broker.list這個配置參數中得到一個代理的靜態列表,每一個生產請求會被隨即的分配給各代理分區。若是相應的代理宕機,那麼生產請求就會失敗。

  • 經過使用一個可選性的、由用戶指定的Partitioner,提供由軟件實現的負載均衡功能 —— 數據發送路徑選擇決策受kafka.producer.Partitioner的影響。

    1 interface Partitioner<T> {
    2    int partition(T key, int numPartitions);
    3 }

    分區API根據相關的鍵值以及系統中具備的代理分區的數量返回一個分區id。將該id用做索引,在broker_id和partition組成的通過排序的列表中爲相應的生產者請求找出一個代理分區。缺省的分區策略是hash(key)%numPartitions。若是key爲null,那就進行隨機選擇。使用partitioner.class這個配置參數也能夠插入自定義的分區策略。

 

 

使用者API

 

咱們有兩個層次的使用者API。底層比較簡單的API維護了一個同單個代理創建的鏈接,徹底同發送給服務器的網絡請求相吻合。該API徹底是無狀態的,每一個請求都帶有一個偏移量做爲參數,從而容許用戶以本身選擇的任意方式維護該元數據。

高層API對使用者隱藏了代理的具體細節,讓使用者可運行於集羣中的機器之上而無需關心底層的拓撲結構。它還維護着數據使用的狀態。高層API還提供了訂閱同一個過濾表達式(例如,白名單或黑名單的正則表達式)相匹配的多個話題的能力。

 

 

底層API

class SimpleConsumer {
	
  /* Send fetch request to a broker and get back a set of messages. */ 
  public ByteBufferMessageSet fetch(FetchRequest request);

  /* Send a list of fetch requests to a broker and get back a response set. */ 
  public MultiFetchResponse multifetch(List<FetchRequest> fetches);

  /**
   * Get a list of valid offsets (up to maxSize) before the given time.
   * The result is a list of offsets, in descending order.
   * @param time: time in millisecs,
   *              if set to OffsetRequest$.MODULE$.LATIEST_TIME(), get from the latest offset available.
   *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
   */
  public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}

 

底層API不但用於實現高層API,並且還直接用於咱們的離線使用者(好比Hadoop這個使用者),這些使用者還對狀態的維護有比較特定的需求。

高層API

/* create a connection to the cluster */ 
ConsumerConnector connector = Consumer.create(consumerConfig);

interface ConsumerConnector {
	
  /**
   * This method is used to get a list of KafkaStreams, which are iterators over
   * MessageAndMetadata objects from which you can obtain messages and their
   * associated metadata (currently only topic).
   *  Input: a map of <topic, #streams>
   *  Output: a map of <topic, list of message streams>
   */
  public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap); 

  /**
   * You can also obtain a list of KafkaStreams, that iterate over messages
   * from topics that match a TopicFilter. (A TopicFilter encapsulates a
   * whitelist or a blacklist which is a standard Java regex.)
   */
  public List<KafkaStream> createMessageStreamsByFilter(
      TopicFilter topicFilter, int numStreams);

  /* Commit the offsets of all messages consumed so far. */
  public commitOffsets()
  
  /* Shut down the connector */
  public shutdown()
}

該API的中心是一個由KafkaStream這個類實現的迭代器(iterator)。每一個KafkaStream都表明着一個從一個或多個分區到一個或多個服務器的消息流。每一個流都是使用單個線程進行處理的,因此,該API的使用者在該API的建立調用中能夠提供所需的任意個數的流。這樣,一個流可能會表明多個服務器分區的合併(同處理線程的數目相同),但每一個分區只會把數據發送給一個流中。

createMessageStreams方法爲使用者註冊到相應的話題之上,這將致使須要對使用者/代理的分配狀況進行從新平衡。爲了將從新平衡操做減小到最小。該API鼓勵在一次調用中就建立多個話題流。createMessageStreamsByFilter方法爲發現同其過濾條件想匹配的話題(額外地)註冊了多個監視器(watchers)。應該注意,createMessageStreamsByFilter方法所返回的每一個流均可能會對多個話題進行迭代(好比,在知足過濾條件的話題有多個的狀況下)。

 

網絡層

網絡層就是一個特別直截了當的NIO服務器,在此就不進行過於細緻的討論了。sendfile是經過給MessageSet接口添加了一個writeTo方法實現的。這樣就可讓基於文件的消息更加高效地利用transferTo實現,而不是使用線程內緩衝區讀寫方式。線程模型用的是一個單個的接收器(acceptor)線程和每一個能夠處理固定數量網絡鏈接的N個處理器線程。這種設計方案在別處已經通過了很是完全的檢驗,發現其實現起來簡單、運行起來很快。其中使用的協議一直都很是簡單,未來還能夠用其它語言實現其客戶端。

 

 

消息

 

消息由一個固定大小的消息頭和一個變長不透明字節數字的有效載荷構成(opaque byte array payload)。消息頭包含格式的版本信息和一個用於探測出壞數據和不完整數據的CRC32校驗。讓有效載荷保持不透明是個很是正確的決策:在用於序列化的代碼庫方面如今正在取得很是大的進展,任何特定的選擇都不可能適用於全部的使用狀況。都不用說,在Kafka的某特定應用中頗有可能在它的使用中須要採用某種特殊的序列化類型。MessageSet接口就是一個使用特殊的方法對NIOChannel進行大宗數據讀寫(bulk reading and writing to an NIOChannel)的消息迭代器。

 

 

消息的格式

	/** 
	 * A message. The format of an N byte message is the following: 
	 * 
	 * If magic byte is 0 
	 * 
	 * 1. 1 byte "magic" identifier to allow format changes 
	 * 
	 * 2. 4 byte CRC32 of the payload 
	 * 
	 * 3. N - 5 byte payload 
	 * 
	 * If magic byte is 1 
	 * 
	 * 1. 1 byte "magic" identifier to allow format changes 
	 * 
	 * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used) 
	 * 
	 * 3. 4 byte CRC32 of the payload 
	 * 
	 * 4. N - 6 byte payload 
	 * 
	 */

日誌

 

具備兩個分區的、名稱爲"my_topic"的話題的日誌由兩個目錄組成(即:my_topic_0和my_topic_1),目錄中存儲的是內容爲該話題的消息的數據文件。日誌的文件格式是一系列的「日誌項」;每條日誌項包含一個表示消息長度的4字節整數N,其後接着保存的是N字節的消息。每條消息用一個64位的整數偏移量進行惟一性標示,該偏移量表示了該消息在那個分區中的那個話題下發送的全部消息組成的消息流中所處的字節位置。每條消息在磁盤上的格式以下文所示。每一個日誌文件的以它所包含的第一條消息的偏移量來命名。所以,第一個建立出來的文件的名字將爲00000000000.kafka,隨後每一個後加的文件的名字將是前一個文件的文件名大約再加S個字節所得的整數,其中,S是配置文件中指定的最大日誌文件的大小。

 

 

消息的確切的二進制格式都有版本,它保持爲一個標準的接口,讓消息集能夠根據須要在生產者、代理、和使用者直接進行自由傳輸而無須從新拷貝或轉換。其格式以下所示:

On-disk format of a message

message length : 4 bytes (value: 1+4+n) 
"magic" value  : 1 byte
crc            : 4 bytes
payload        : n bytes

將消息的偏移量做爲消息的可不常見。咱們原先的想法是使用由生產者產生的GUID做爲消息id,而後在每一個代理上做一個從GUID到偏移量的映射。可是,既然使用者必須爲每一個服務器維護一個ID,那麼GUID所具備的全局惟一性就失去了價值。更有甚者,維護將從一個隨機數到偏移量的映射關係帶來的複雜性,使得咱們必須使用一種重量級的索引結構,並且這種結構還必須與磁盤保持同步,這樣咱們還就必須使用一種徹底持久化的、需隨機訪問的數據結構。如此一來,爲了簡化查詢結構,咱們就決定使用一個簡單的依分區的原子計數器(atomic counter),這個計數器能夠同分區id以及節點id結合起來惟一的指定一條消息;這種方法使得查詢結構簡化很多,儘管每次在處理使用者請求時仍有可能會涉及屢次磁盤尋道操做。然而,一旦咱們決定使用計數器,跳向直接使用偏移量做爲id就很是天然了,畢竟二者都是分區內具備惟一性的、單調增長的整數。既然偏移量是在使用者API中並不會體現出來,因此這個決策最終仍是屬於一個實現細節,進而咱們就選擇了這種更加高效的方式。

 

 

寫操做

 

日誌能夠順序添加,添加的內容老是保存到最後一個文件。當大小超過配置中指定的大小(好比說1G)後,該文件就會換成另一個新文件。有關日誌的配置參數有兩個,一個是M,用於指出寫入多少條消息以後就要強制OS將文件刷新到磁盤;另外一個是S,用來指定過多少秒就要強制進行一次刷新。這樣就能夠保證一旦發生系統崩潰,最多會有M條消息丟失,或者最長會有S秒的數據丟失,

 

 

讀操做

 

能夠經過給出消息的64位邏輯偏移量和S字節的數據塊最大的字節數對日誌文件進行讀取。讀取操做返回的是這S個字節中包含的消息的迭代器。S應該要比最長的單條消息的字節數大,但在出現特別長的消息狀況下,能夠重複進行屢次讀取,每次的緩衝區大小都加倍,直到能成功讀取出這樣長的一條消息。也能夠指定一個最大的消息和緩衝區大小並讓服務器拒絕接收比這個大小大一些的消息,這樣也能給客戶端一個可以讀取一條完整消息所需緩衝區的大小的上限。頗有可能會出現讀取緩衝區以一個不完整的消息結尾的狀況,這個狀況用大小界定(size delimiting)很容易就能探知。

 

 

從某偏移量開始進行日誌讀取的實際過程須要先找出存儲所需數據的日誌段文件,從全局偏移量計算出文件內偏移量,而後再從該文件偏移量處開始讀取。搜索過程經過對每一個文件保存在內存中的範圍值進行一種變化後的二分查找完成。

日誌提供了獲取最新寫入的消息的功能,從而容許從「當下」開始消息訂閱。這個功能在使用者在SLA規定的天數內沒能正常使用數據的狀況下也頗有用。當使用者企圖從一個並不存在的偏移量開始使用數據時就會出現這種狀況,此時使用者會獲得一個OutOfRangeException異常,它能夠根據具體的使用狀況對本身進行重啓或者僅僅失敗而退出。

 

 

如下是發送給數據使用者(consumer)的結果的格式。

MessageSetSend (fetch result)

total length     : 4 bytes
error code       : 2 bytes
message 1        : x bytes
...
message n        : x bytes
MultiMessageSetSend (multiFetch result)

total length       : 4 bytes
error code         : 2 bytes
messageSetSend 1
...
messageSetSend n

刪除

一次只能刪除一個日誌段的數據。 日誌管理器容許經過可加載的刪除策略設定刪除的文件。 當前策略刪除修改事件超過N 天以上的文件,也能夠選擇保留最後 N GB 的數據。 爲了不刪除時的讀取鎖定衝突,咱們可使用副本寫入模式,以便在進行刪除的同時對日誌段的一個不變的靜態快照進行二進制搜索。

 

 

數據正確性保證

 

日誌功能裏有一個配置參數M,可對在強制進行磁盤刷新以前可寫入的消息的最大條目數進行控制。在系統啓動時會運行一個日誌恢復過程,對最新的日誌段內全部消息進行迭代,以對每條消息項的有效性進行驗證。一條消息項是合法的,僅當其大小加偏移量小於文件的大小而且該消息中有效載荷的CRC32值同該消息中存儲的CRC值相等。在探測出有數據損壞的狀況下,就要將文件按照最後一個有效的偏移量進行截斷。

要注意,這裏有兩種必需處理的數據損壞狀況:因爲系統崩潰形成的未被正常寫入的數據塊(block)於是須要截斷的狀況以及因爲文件中被加入了毫無心義的數據塊而形成的數據損壞狀況。形成數據損壞的緣由是,通常來講OS並不能保證文件索引節點(inode)和實際數據塊這二者的寫入順序,所以,除了可能會丟失未刷新的已寫入數據以外,在索引節點已經用新的文件大小更新了但在將數據塊寫入磁盤塊以前發生了系統崩潰的狀況下,文件就可能會得到一些毫無心義的數據。CRC值就是用於這種極端狀況,避免由此形成整個日誌文件的損壞(儘管未獲得保存的消息固然是真的找不回來了)。

 

 

分發

Zookeeper目錄

 

接下來討論zookeeper用於在使用者和代理直接進行協調的結構和算法。

記法

當一個路徑中的元素是用[xyz]這種形式表示的時,其意思是, xyz的值並不固定並且實際上xyz的每種可能的值都有一個zookpeer z節點(znode)。例如,/topics/[topic]表示了一個名爲/topics的目錄,其中包含的子目錄同話題對應,一個話題一個目錄而且目錄名即爲話題的名稱。也能夠給出數字範圍,例如[0...5],表示的是子目錄0、一、二、三、4。箭頭->用於給出z節點的內容。例如/hello -> world表示的是一個名稱爲/hello的z節點,包含的值爲"world"。

 

 

代理節點的註冊

/brokers/ids/[0...N] --> host:port (ephemeral node)

 

上面是全部出現的代理節點的列表,列表中每一項都提供了一個具備惟一性的邏輯代理id,用於讓使用者可以識別代理的身份(這個必須在配置中給出)。在啓動時,代理節點就要用/brokers/ids下列出的邏輯代理id建立一個z節點,並在本身註冊到系統中。使用邏輯代理id的目的是,可讓咱們在不影響數據使用者的狀況下就能把一個代理搬到另外一臺不一樣的物理機器上。試圖用已在使用中的代理id(好比說,兩個服務器配置成了同一個代理id)進行註冊會致使發生錯誤。

由於代理是以非長久性z節點的方式註冊的,因此這個註冊過程是動態的,當代理關閉或宕機後註冊信息就會消失(至此要數據使用者,該代理再也不有效)。

代理話題的註冊

/brokers/topics/[topic]/[0...N] --> nPartions (ephemeral node)

每一個代理會都要註冊在某話題之下,註冊後它會維護並保存該話題的分區總數。

 

 

使用者和使用者小組

爲了對數據的使用進行負載均衡並記錄使用者使用的每一個代理上的每一個分區上的偏移量,全部話題的使用者都要在Zookeeper中進行註冊。

多個使用者能夠組成一個小組共同使用一個單個的話題。同一小組內的每一個使用者共享同一個給定的group_id。好比說,若是某個使用者負責用三臺機器進行某某處理過程,你就能夠爲這組使用者分配一個叫作「某某」的id。這個小組id是在使用者的配置文件中指定的,而且這就是你告訴使用者它到底屬於哪一個組的方法。

小組內的使用者要儘可能公正地劃分出分區,每一個分區僅爲小組內的一個使用者所使用。

 

 

使用者ID的註冊

 

除了小組內的全部使用者都要共享一個group_id以外,每一個使用者爲了要同其它使用者區別開來,還要有一個非永久性的、具備惟一性的consumer_id(採用hostname:uuid的形式)。 consumer_id要在如下的目錄中進行註冊。

/consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)

小組內的每一個使用者都要在它所屬的小組中進行註冊並採用consumer_id建立一個z節點。z節點的值包含了一個<topic, #streams>的map。 consumer_id只是用來識別小組內活躍的每一個使用者。使用者創建的z節點是個臨時性的節點,所以若是這個使用者進程終止了,註冊信息也將隨之消失。

 

 

數據使用者偏移追蹤

 

數據使用者跟蹤他們在每一個分區中耗用的最大偏移量。這個值被存儲在一個Zookeeper(分佈式協調系統)目錄中。

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value ((persistent node)

分區擁有者註冊表

每一個代理分區都被分配給了指定使用者小組中的單個數據使用者。數據使用者必須在耗用給定分區前確立對其的全部權。要確立其全部權,數據使用者須要將其 id 寫入到特定代理分區中的一個臨時節點(ephemeral node)中。

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)

 

 

代理節點的註冊

 

代理節點之間基本上都是相互獨立的,所以它們只須要發佈它們擁有的信息。當有新的代理加入進來時,它會將本身註冊到代理節點註冊目錄中,寫下它的主機名和端口。代理還要將已有話題的列表和它們的邏輯分區註冊到代理話題註冊表中。在代理上生成新話題時,須要動態的對話題進行註冊。

使用者註冊算法

當使用者啓動時,它要作如下這些事情:

  1. 將本身註冊到它屬小組下的使用者id註冊表。

  2. 註冊一個監視使用者id列的表變化狀況(有新的使用者加入或者任何現有使用者的離開)的變化監視器。(每一個變化都會觸發一次對發生變化的使用者所屬的小組內的全部使用者進行負載均衡。)

  3. 主次一個監視代理id註冊表的變化狀況(有新的代理加入或者任何現有的代理的離開)的變化監視器。(每一個變化都會觸發一次對全部小組內的全部使用者負載均衡。)

  4. 若是使用者使用某話題過濾器建立了一個消息流,它還要註冊一個監視代理話題變化狀況(添加了新話題)的變化監視器。(每一個變化都會觸發一次對全部可用話題的評估,以找出話題過濾器過濾出哪些話題。新過濾出來的話題將觸發一次對該使用者所在的小組內全部的使用者負載均衡。)

  5. 迫使本身在小組內進行從新負載均衡。

 

 

使用者從新負載均衡的算法

 

使用者從新複雜均衡的算法可用讓小組內的全部使用者對哪一個使用者使用哪些分區達成一致意見。使用者從新負載均衡的動做每次添加或移除代理以及同一小組內的使用者時被觸發。對於一個給定的話題和一個給定的使用者小組,代理分區是在小組內的全部使用者中進行平均劃分的。一個分區老是由一個單個的使用者使用。這種設計方案簡化了實施過程。假設咱們運行多個使用者以併發的方式同時使用同一個分區,那麼在該分區上就會造成爭用(contention)的狀況,這樣一來就須要某種形式的鎖定機制。若是使用者的個數比分區多,就會出現有寫使用者根本得不到數據的狀況。在從新進行負載均衡的過程當中,咱們按照儘可能減小每一個使用者須要鏈接的代理的個數的方式,嚐嚐試着將分區分配給使用者。

每一個使用者在從新進行負載均衡時須要作下列的事情:

   1. 針對Ci所訂閱的每一個話題T
   2.   將PT設爲生產話題T的全部分區
   3.   將CG設爲小組內同Ci 同樣使用話題T的全部使用者
   4.   對PT進行排序(讓同一個代理上的各分區挨在一塊兒)
   5.   對CG進行排序 
   6.   將i設爲Ci在CG中的索引值並讓N = size(PT)/size(CG)
   7.   將從i*N到(i+1)*N - 1的分區分配給使用者Ci 
   8.   將Ci當前所擁有的分區從分區擁有者註冊表中刪除
   9.   將新分配的分區加入到分區擁有者註冊表中
        (咱們可能須要屢次嘗試才能讓原先的分區擁有者釋放其擁有權)

在觸發了一個使用者要從新進行負載均衡時,同一小組內的其它使用者也會幾乎在同時被觸發從新進行負載均衡。

相關文章
相關標籤/搜索