分佈式隊列編程:從模型、實戰到優化(轉)

前言

做爲一種基礎的抽象數據結構,隊列被普遍應用在各種編程中。大數據時代對跨進程、跨機器的通信提出了更高的要求,和以往相比,分佈式隊列編程的運用幾乎已無處不在。可是,這種常見的基礎性的事物每每容易被忽視,使用者每每會忽視兩點:html

  • 使用分佈式隊列的時候,沒有意識到它是隊列。
  • 有具體需求的時候,忘記了分佈式隊列的存在。

本文分爲三部分,包括模型篇、實戰篇和優化篇,主要剖析了分佈式隊列編程模型的需求來源、定義、結構以及其變化多樣性;根據做者在新美大實際工做經驗,給出了隊列式編程在分佈式環境下的一些具體應用。node

第一部分 模型篇

首先從最基礎的需求出發,詳細剖析分佈式隊列編程模型的需求來源、定義、結構以及其變化多樣性。經過這一部分的講解,做者指望能在兩方面幫助讀者:一方面,提供一個系統性的思考方法,使讀者可以將具體需求關聯到分佈式隊列編程模型,具有進行分佈式隊列架構的能力;另外一方面,經過全方位的講解,讓讀者可以快速識別工做中碰到的各類分佈式隊列編程模型。git

 

什麼時候選擇分佈式隊列apache

通訊是人們最基本的需求,一樣也是計算機最基本的需求。對於工程師而言,在編程和技術選型的時候,更容易進入大腦的概念是RPC、RESTful、Ajax、Kafka。在這些具體的概念後面,最本質的東西是「通訊」。因此,大部分建模和架構都須要從「通訊」這個基本概念開始。當肯定系統之間有通訊需求的時候,工程師們須要作不少的決策和平衡,這直接影響工程師們是否會選擇分佈式隊列編程模型做爲架構。從這個角度出發,影響建模的因素有四個:When、Who、Where、How。編程

When:同步VS異步設計模式

 

通訊的一個基本問題是:發出去的消息何時須要被接收到?這個問題引出了兩個基礎概念:「同步通訊」和「異步通訊」。根據理論抽象模型,同步通訊和異步通訊最本質的差異來自於時鐘機制的有無。同步通訊的雙方須要一個校準的時鐘,異步通訊的雙方不須要時鐘。現實的狀況是,沒有徹底校準的時鐘,因此沒有絕對的同步通訊。一樣,絕對異步通訊意味着沒法控制一個發出去的消息被接收到的時間點,無期限的等待一個消息顯然毫無實際意義。因此,實際編程中全部的通訊既不是「同步通訊」也不是「異步通訊」;或者說,既是「同步通訊」也是「異步通訊」。特別是對於應用層的通訊,其底層架構可能既包含「同步機制」也包含「異步機制」。判斷「同步」和「異步」消息的標準問題太深,而不適合繼續展開。做者這裏給一些啓發式的建議:緩存

  • 發出去的消息是否須要確認,若是不須要確認,更像是異步通訊,這種通訊有時候也稱爲單向通訊(One-Way Communication)。
  • 若是須要確認,能夠根據須要確認的時間長短進行判斷。時間長的更像是異步通訊,時間短的更像是同步通訊。固然時間長短的概念是純粹的主觀概念,不是客觀標準。
  • 發出去的消息是否阻塞下一個指令的執行,若是阻塞,更像是同步,不然,更像是異步。

不管如何,工程師們不能生活在混沌之中,不作決定每每是最壞的決定。當分析一個通訊需求或者進行通訊構架的時候,工程師們被迫做出「同步」仍是「異步」的決定。當決策的結論是「異步通訊」的時候,分佈式隊列編程模型就是一個備選項。

Who:發送者接收者解耦

在進行通訊需求分析的時候,須要回答的另一個基本問題是:消息的發送方是否關心誰來接收消息,或者反過來,消息接收方是否關心誰來發送消息。若是工程師的結論是:消息的發送方和接收方不關心對方是誰、以及在哪裏,分佈式隊列編程模型就是一個備選項。由於在這種場景下,分佈式隊列架構所帶來的解耦能給系統架構帶來這些好處:

  • 不管是發送方仍是接收方,只須要跟消息中間件通訊,接口統一。統一意味着下降開發成本。
  • 在不影響性能的前提下,同一套消息中間件部署,能夠被不一樣業務共享。共享意味着下降運維成本。
  • 發送方或者接收方單方面的部署拓撲的變化不影響對應的另外一方。解藕意味着靈活和可擴展。

Where:消息暫存機制

在進行通訊發送方設計的時候,令工程師們苦惱的問題是:若是消息沒法被迅速處理掉而產生堆積怎麼辦、可否被直接拋棄?若是根據需求分析,確認存在消息積存,而且消息不該該被拋棄,就應該考慮分佈式隊列編程模型構架,由於隊列能夠暫存消息。

How:如何傳遞

對通訊需求進行架構,一系列的基礎挑戰會迎面而來,這包括:

  • 可用性,如何保障通訊的高可用。
  • 可靠性,如何保證消息被可靠地傳遞。
  • 持久化,如何保證消息不會丟失。
  • 吞吐量和響應時間。
  • 跨平臺兼容性。
  • 除非工程師對造輪子有足夠的興趣,而且有充足的時間,採用一個知足各項指標的分佈式隊列編程模型就是一個簡單的選擇。

分佈式隊列編程定義

很難給出分佈式隊列編程模型的精肯定義,因爲本文偏重於應用,做者並不打算徹底參照某個標準的模型。整體而言:分佈式隊列編程模型包含三類角色:發送者(Sender)、分佈式隊列(Queue)、接收者(Receiver)。發送者和接收者分別指的是生產消息和接收消息的應用程序或服務。

須要重點明確的概念是分佈式隊列,它是提供如下功能的應用程序或服務:

  1. 接收「發送者」產生的消息實體;
  2. 傳輸、暫存該實體;
  3. 爲「接收者」提供讀取該消息實體的功能。

特定的場景下,它固然能夠是Kafka、RabbitMQ等消息中間件。但它的展示形式並不限於此,例如:

  • 隊列能夠是一張數據庫的表,發送者將消息寫入表,接收者從數據表裏讀消息。
  • 若是一個程序把數據寫入Redis等內存Cache裏面,另外一個程序從Cache裏面讀取,緩存在這裏就是一種分佈式隊列。
  • 流式編程裏面的的數據流傳輸也是一種隊列。
  • 典型的MVC(Model–view–controller)設計模式裏面,若是Model的變化須要致使View的變化,也能夠經過隊列進行傳輸。這裏的分佈式隊列能夠是數據庫,也能夠是某臺服務器上的一塊內存。

抽象模型

最基礎的分佈式隊列編程抽象模型是點對點模型,其餘抽象構架模型居於改基本模型上各角色的數量和交互變化所致使的不一樣拓撲圖。具體而言,不一樣數量的發送者、分佈式隊列以及接收者組合造成了不一樣的分佈式隊列編程模型。記住並理解典型的抽象模型結構對需求分析和建模而言相當重要,同時也會有助於學習和深刻理解開源框架以及別人的代碼。

點對點模型(Point-to-point)

基礎模型中,只有一個發送者、一個接收者和一個分佈式隊列。以下圖所示:

生產者消費者模型(Producer–consumer)

若是發送者和接收者均可以有多個部署實例,甚至不一樣的類型;可是共用同一個隊列,這就變成了標準的生產者消費者模型。在該模型,三個角色通常稱爲生產者(Producer)、分佈式隊列(Queue)、消費者(Consumer)。

發佈訂閱模型(PubSub)

若是隻有一類發送者,發送者將產生的消息實體按照不一樣的主題(Topic)分發到不一樣的邏輯隊列。每種主題隊列對應於一類接收者。這就變成了典型的發佈訂閱模型。在該模型,三個角色通常稱爲發佈者(Publisher),分佈式隊列(Queue),訂閱者(Subscriber)。

MVC模型

若是發送者和接收者存在於同一個實體中,可是共享一個分佈式隊列。這就很像經典的MVC模型。

編程模型

爲了讓讀者更好地理解分佈式隊列編程模式概念,這裏將其與一些容易混淆的概念作一些對比 。

分佈式隊列模型編程和異步編程

分佈式隊列編程模型的通信機制通常是採用異步機制,可是它並不等同於異步編程。

首先,並不是全部的異步編程都須要引入隊列的概念,例如:大部分的操做系統異步I/O操做都是經過硬件中斷( Hardware Interrupts)來實現的。

其次,異步編程並不必定須要跨進程,因此其應用場景並不必定是分佈式環境。

最後,分佈式隊列編程模型強調發送者、接收者和分佈式隊列這三個角色共同組成的架構。這三種角色與異步編程沒有太多關聯。

分佈式隊列模式編程和流式編程

隨着Spark Streaming,Apache Storm等流式框架的普遍應用,流式編程成了當前很是流行的編程模式。可是本文所闡述的分佈式隊列編程模型和流式編程並不是同一律念。

首先,本文的隊列編程模式不依賴於任何框架,而流式編程是在具體的流式框架內的編程。

其次,分佈式隊列編程模型是一個需求解決方案,關注如何根據實際需求進行分佈式隊列編程建模。流式框架裏的數據流通常都經過隊列傳遞,不過,流式編程的關注點比較聚焦,它關注如何從流式框架裏獲取消息流,進行map、reduce、 join等轉型(Transformation)操做、生成新的數據流,最終進行彙總、統計。

第二部分 實戰篇

第二部分實戰篇給出了隊列式編程在分佈式環境下的一些具體應用。這些例子的基礎模型並不是首次出如今互聯網的文檔中,可是全部的例子都是按照挑戰、構思、架構三個步驟進行講解的。

受限於保密性要求,有些細節並未給出,但這些細節並不影響講解的完整性。另外一方面,特別具體的需求容易讓人費解,爲了使講解更加順暢,做者也會採用一些更通俗易懂的例子。經過本篇的講解,但願和讀者一塊兒去實踐「如何從需求出發去構架分佈式隊列編程模型」。

須要聲明的是,這裏的解決方案並非所處場景的最優方案。可是,任何一個稍微複雜的問題,都沒有最優解決方案,更談不上惟一的解決方案。實際上,工程師天天所追尋的只是在知足必定約束條件下的可行方案。固然不一樣的約束會致使不一樣的方案,約束的鬆弛度決定了工程師的可選方案的寬廣度。

信息採集處理

信息採集處理應用普遍,例如:廣告計費、用戶行爲收集等。做者碰到的具體項目是爲廣告系統設計一套高可用的採集計費系統。

典型的廣告CPC、CPM計費原理是:收集用戶在客戶端或者網頁上的點擊和瀏覽行爲,按照點擊和瀏覽進行計費。計費業務有以下典型特徵:

  • 採集者和處理者解耦,採集發生在客戶端,而計費發生在服務端。
  • 計費與錢息息相關。
  • 重複計費意味着災難。
  • 計費是動態實時行爲,須要接受預算約束,若是消耗超過預算,則廣告投放須要中止。
  • 用戶的瀏覽和點擊量很是大。

挑戰

計費業務的典型特徵給咱們帶來了以下挑戰:

  • 高吞吐量--廣告的瀏覽和點擊量很是巨大,咱們須要設計一個高吞吐量的採集架構。
  • 高可用性--計費信息的丟失意味着直接的金錢損失。任何處理服務器的崩潰不該該致使系統不可用。
  • 高一致性要求--計費是一個實時動態處理過程,但要受到預算的約束。收集到的瀏覽和點擊行爲若是不能快速處理,可能會致使預算花超,或者點擊率預估不許確。因此採集到的信息應該在最短的時間內傳輸到計費中心進行計費。
  • 完整性約束--這包括反做弊規則,單個用戶行爲不能重複計費等。這要求計費是一個集中行爲而非分佈式行爲。
  • 持久化要求--計費信息須要持久化,避免由於機器崩潰而致使收集到的數據產生丟失。

構思

採集的高可用性意味着咱們須要多臺服務器同時採集,爲了不單IDC故障,採集服務器須要部署在多IDC裏面。

實現一個高可用、高吞吐量、高一致性的信息傳遞系統顯然是一個挑戰,爲了控制項目開發成本,採用開源的消息中間件進行消息傳輸就成了必然選擇。

完整性約束要求集中進行計費,因此計費系統發生在覈心IDC。

計費服務並不關心採集點在哪裏,採集服務也並不關心誰進行計費。

根據以上構思,咱們認爲採集計費符合典型的「生產者消費者模型」。

架構

採集計費系統架構圖以下:

  • 用戶點擊瀏覽收集服務(Click/View Collector)做爲生產者部署在多個機房裏,以提升收集服務可用性。
  • 每一個機房裏採集到的數據經過消息隊列中間件發送到核心機房IDC_Master。
  • Billing服務做爲消費者部署在覈心機房集中計費。

採用此架構,咱們能夠在以下方面作進一步優化:

  • 提升可擴展性,若是一個Billing部署實例在性能上沒法知足要求,能夠對採集的數據進行主題分區(Topic Partition)計費,即採用發佈訂閱模式以提升可擴展性(Scalability)。
  • 全局排重和反做弊。採用集中計費架構解決了點擊瀏覽排重的問題,另外一方面,這也給反做弊提供了全局信息。
  • 提升計費系統的可用性。採用下文單例服務優化策略,在保障計費系統集中性的同時,提升計費系統可用性。

分佈式緩存更新(Distributed Cache Replacement)

緩存是一個很是寬泛的概念,幾乎存在於系統各個層級。典型的緩存訪問流程以下:

  • 接收到請求後,先讀取緩存,若是命中則返回結果。
  • 若是緩存不命中,讀取DB或其它持久層服務,更新緩存並返回結果。

對於已經存入緩存的數據,其更新時機和更新頻率是一個經典問題,即緩存更新機制(Cache Replacement Algorithms )。典型的緩存更新機制包括:近期最少使用算法(LRU)、最不常用算法(LFU)。這兩種緩存更新機制的典型實現是:啓動一個後臺進程,按期清理最近沒有使用的,或者在一段時間內最少使用的數據。因爲存在緩存驅逐機制,當一個請求在沒有命中緩存時,業務層須要從持久層中獲取信息並更新緩存,提升一致性。

挑戰

分佈式緩存給緩存更新機制帶來了新的問題:

  • 數據一致性低。分佈式緩存中鍵值數量巨大,從而致使LRU或者LFU算法更新週期很長。在分佈式緩存中,拿LRU算法舉例,其典型作法是爲每一個Key值設置一個生存時間(TTL),生存時間到期後將該鍵值從緩存中驅逐除去。考慮到分佈式緩存中龐大的鍵值數量,生存時間每每會設置的比較長,這就致使緩存和持久層數據不一致時間很長。若是生存時間設置太短,大量請求沒法命中緩存被迫讀取持久層,系統響應時間會急劇惡化。
  • 新數據不可用。在不少場景下,因爲分佈式緩存和持久層的訪問性能相差太大,在緩存不命中的狀況下,一些應用層服務不會嘗試讀取持久層,而直接返回空結果。漫長的緩存更新週期意味着新數據的可用性就被犧牲了。從統計的角度來說,新鍵值須要等待半個更新週期纔會可用。

構思

根據上面的分析,分佈式緩存須要解決的問題是:在保證讀取性能的前提下,儘量地提升老數據的一致性和新數據的可用性。若是仍然假定最近被訪問的鍵值最有可能被再次訪問(這是LRU或者LFU成立的前提),鍵值每次被訪問後觸發一次異步更新就是提升可用性和一致性最先的時機。不管是高性能要求仍是業務解耦都要求緩存讀取和緩存更新分開,因此咱們應該構建一個單獨的集中的緩存更新服務。集中進行緩存更新的另一個好處來自於頻率控制。因爲在一段時間內,不少類型訪問鍵值的數量知足高斯分佈,短期內重複對同一個鍵值進行更新Cache並不會帶來明顯的好處,甚至形成緩存性能的降低。經過控制同一鍵值的更新頻率能夠大大緩解該問題,同時有利於提升總體數據的一致性,參見「排重優化」。

綜上所述,業務訪問方須要把請求鍵值快速傳輸給緩存更新方,它們之間不關心對方的業務。要快速、高性能地實現大量請求鍵值消息的傳輸,高性能分佈式消息中間件就是一個可選項。這三方一塊兒組成了一個典型的分佈式隊列編程模型。

架構

以下圖,全部的業務請求方做爲生產者,在返回業務代碼處理以前將請求鍵值寫入高性能隊列。Cache Updater做爲消費者從隊列中讀取請求鍵值,將持久層中數據更新到緩存中。

採用此架構,咱們能夠在以下方面作進一步優化:

  • 提升可擴展性,若是一個Cache Updater在性能上沒法知足要求,能夠對鍵值進行主題分區(Topic Partition)進行並行緩存更新,即採用發佈訂閱模式以提升可擴展性(Scalability)。
  • 更新頻率控制。緩存更新都集中處理,對於發佈訂閱模式,同一類主題(Topic)的鍵值集中處理。Cache Updater能夠控制對同一鍵值的在短時間內的更新頻率(參見下文排重優化)。

後臺任務處理

典型的後臺任務處理應用包括工單處理、火車票預訂系統、機票選座等。咱們所面對的問題是爲運營人員建立工單。一次能夠爲多個運營人員建立多個工單。這個應用場景和火車票購買很是相似。工單相對來講更加抽象,因此,下文會結合火車票購買和運營人員工單分配這兩種場景同時講解。典型的工單建立要經歷兩個階段:數據篩選階段、工單建立階段。例如,在火車票預訂場景,數據篩選階段用戶選擇特定時間、特定類型的火車,而在工單建立階段,用戶下單購買火車票。

挑戰

工單建立每每會面臨以下挑戰:

  • 數據一致性問題。以火車票預訂爲例,用戶篩選火車票和最終購買之間每每有必定的時延,意味着兩個操做之間數據是不一致的。在篩選階段,工程師們需決定是否進行車票鎖定,若是不鎖定,則沒法保證出票成功。反之,若是在篩選地時候鎖定車票,則會大大下降系統效率和出票吞吐量。
  • 約束問題。工單建立須要知足不少約束,主要包含兩種類型:動態約束,與操做者的操做行爲有關,例如購買幾張火車票的決定每每發生在篩選最後階段。隱性約束,這種約束很難經過界面進行展現,例如一個用戶購買了5張火車票,這些票應該是在同一個車箱的臨近位置。
  • 優化問題。工單建立每每是約束下的優化,這是典型的統籌優化問題,而統籌優化每每須要比較長的時間。
  • 響應時間問題。對於多任務工單,一個請求意味着多個任務產生。這些任務的建立每每須要遵循事務性原則,即All or Nothing。在數據層面,這意味着工單之間須要知足串行化需求(Serializability)。大數據量的串行化每每意味着鎖衝突延遲甚至失敗。不管是延遲機制所致使的長時延,仍是高建立失敗率,都會大大傷害用戶體驗。

構思

若是將用戶篩選的最終規則作爲消息存儲下來,併發送給工單建立系統。此時,工單建立系統將具有建立工單所需的全局信息,具有在知足各類約束的條件下進行統籌優化的能力。若是工單建立階段採用單實例部署,就能夠避免數據鎖定問題,同時也意味着沒有鎖衝突,因此也不會有死鎖或任務延遲問題。

居於以上思路,在多工單處理系統的模型中,篩選階段的規則建立系統將充當生產者角色,工單建立系統將充當消費者角色,篩選規則將做爲消息在二者之間進行傳遞。這就是典型的分佈式隊列編程架構。根據工單建立量的不一樣,能夠採用數據庫或開源的分佈式消息中間件做爲分佈式隊列。

架構

該架構流程以下圖:

  • 用戶首選進行規則建立,這個過程主要是一些搜索篩選操做。
  • 用戶點擊工單建立,TicketRule Generator將把全部的篩選性組裝成規則消息併發送到隊列裏面去。
  • Ticket Generator做爲一個消費者,實時從隊列中讀取工單建立請求,開始真正建立工單。

採用該架構,咱們在數據鎖定、運籌優化、原子性問題都能獲得比較好成果: 

  • 數據鎖定推遲到工單建立階段,能夠減小數據鎖定範圍,最大程度的下降工單建立對其餘在線操做的影響範圍。
  • 若是須要進行統籌優化,能夠將Ticket Generator以單例模式進行部署(參見單例服務優化)。這樣,Ticket Generator能夠讀取一段時間內的工單請求,進行全局優化。例如,在咱們的項目中,在某種條件下,運營人員須要知足分級公平原則,即相同級別的運營人員的工單數量應該接近,不一樣級別的運營人員工單數量應該有所區分。若是不集中進行統籌優化,實現這種優化規則將會很困難。
  • 保障了約束完整性。例如,在咱們的場景裏面,每一個運營人員天天可以處理的工單是有數量限制的,若是採用並行處理的方式,這種完整性約束將會很難實施。

第三部分 優化篇

接下來重點闡述工程師運用分佈式隊列編程構架的時候,在生產者、分佈式隊列以及消費者這三個環節的注意點以及優化建議。

肯定採用分佈式隊列編程模型以後,主體架構就算完成了,但工程師的工做還遠遠未結束。天下事必作於細,細節是一個不錯的架構向一個優秀的系統進階的關鍵因素。優化篇選取了做者以及其同事在運用分佈式隊列編程模型架構時所碰到的典型問題和解決方案。這裏些問題出現的頻率較高,若是你經驗不夠,極可能會「踩坑」。但願經過這些講解,幫助讀者下降分佈式隊列編程模型的使用門檻。本文將對分佈式隊列編程模型的三種角色:生產者(Producer),分佈式隊列(Queue),消費者(Consumer)分別進行優化討論。

生產者優化

在分佈式隊列編程中,生產者每每並不是真正的生產源頭,只是整個數據流中的一個節點,這種生產者的操做是處理-轉發(Process-Forward)模式。

這種模式給工程師們帶來的第一個問題是吞吐量問題。這種模式下運行的生產者,一邊接收上游的數據,一邊將處理完的數據發送給下游。本質上,它是一個很是經典的數學問題,其抽象模型是一些沒有蓋子的水箱,每一個水箱接收來自上一個水箱的水,進行處理以後,再將水發送到下一個水箱。工程師須要預測水源的流量、每一個環節水箱的處理能力、水龍頭的排水速度,最終目的是避免水溢出水箱,或者儘量地減少溢出事件的機率。實際上流式編程框架以及其開發者花了大量的精力去處理和優化這個問題。下文的緩存優化和批量寫入優化都是針對該問題的解決方案。

第二個須要考慮的問題是持久化。因爲各類緣由,系統老是會宕機。若是信息比較敏感,例如計費信息、火車票訂單信息等,工程師們須要考慮系統宕機所帶來的損失,找到讓損失最小化的解決方案。持久化優化重點解決這一類問題。

緩存優化

處於「處理-轉發」模式下運行的生產者每每被設計成請求驅動型的服務,即每一個請求都會觸發一個處理線程,線程處理完後將結果寫入分佈式隊列。若是因爲某種緣由隊列服務不可用,或者性能惡化,隨着新請求的到來,生產者的處理線程就會產生堆積。這可能會致使以下兩個問題:

  • 系統可用性下降。因爲每一個線程都須要必定的內存開銷,線程過多會使系統內存耗盡,甚至可能產生雪崩效應致使最終徹底不可用。
  • 信息丟失。爲了不繫統崩潰,工程師可能會給請求驅動型服務設置一個處理線程池,設置最大處理線程數量。這是一種典型的降級策略,目的是爲了系統崩潰。可是,後續的請求會由於沒有處理線程而被迫阻塞,最終可能產生信息丟失。例如:對於廣告計費採集,若是採集系統由於線程耗盡而不接收客戶端的計費行爲,這些計費行爲就會丟失。

緩解這類問題的思路來自於CAP理論,即經過下降一致性來提升可用性。生產者接收線程在收到請求以後第一時間不去處理,直接將請求緩存在內存中(犧牲一致性),而在後臺啓動多個處理線程從緩存中讀取請求、進行處理並寫入分佈式隊列。與線程所佔用的內存開銷相比,大部分的請求所佔內存幾乎能夠忽略。經過在接收請求和處理請求之間增長一層內存緩存,能夠大大提升系統的處理吞吐量和可擴展性。這個方案本質上是一個內存生產者消費者模型。

批量寫入優化

若是生產者的請求過大,寫分佈式隊列可能成爲性能瓶頸,有以下幾個因素:

  • 隊列自身性能不高。
  • 分佈式隊列編程模型每每被應用在跨機房的系統裏面,跨機房的網絡開銷每每容易成爲系統瓶頸。
  • 消息確認機制每每會大大下降隊列的吞吐量以及響應時間。

若是在處理請求和寫隊列之間添加一層緩存,消息寫入程序批量將消息寫入隊列,能夠大大提升系統的吞吐量。緣由以下:

  • 批量寫隊列能夠大大減小生產者和分佈式隊列的交互次數和消息傳輸量。特別是對於高吞吐小載荷的消息實體,批量寫能夠顯著下降網絡傳輸量。
  • 對於須要確認機制的消息,確認機制每每會大大下降隊列的吞吐量以及響應時間,某些高敏感的消息須要多個消息中間件代理同時確認,這近一步惡化性能。在生產者的應用層將多條消息批量組合成一個消息體,消息中間件就只須要對批量消息進行一次確認,這可能會數量級的提升消息傳輸性能。

持久化優化

經過添加緩存,消費者服務的吞吐量和可用性都獲得了提高。但緩存引入了一個新問題——內存數據丟失。對於敏感數據,工程師須要考慮以下兩個潛在問題:

  • 若是內存中存在未處理完的請求,而某些緣由致使生產者服務宕機,內存數據就會丟失而可能沒法恢復。
  • 若是分佈式隊列長時間不可用,隨着請求數量的不斷增長,最終系統內存可能會耗盡而崩潰,內存的消息也可能丟失。

因此緩存中的數據須要按期被持久化到磁盤等持久層設備中,典型的持久化觸發策略主要有兩種:

  • 按期觸發,即每隔一段時間進行一次持久化。
  • 定量觸發,即每當緩存中的請求數量達到必定閾值後進行持久化。
  • 是否須要持久化優化,以及持久化策略應該由請求數據的敏感度、請求量、持久化性能等因素共同決定。

中間件選型

分佈式隊列不等同於各類開源的或者收費的消息中間件,甚至在一些場景下徹底不須要使用消息中間件。可是,消息中間件產生的目的就是解決消息傳遞問題,這爲分佈式隊列編程架構提供了不少的便利。在實際工做中,工程師們應該將成熟的消息中間件做爲隊列的首要備選方案。

本節對消息中間件的功能、模型進行闡述,並給出一些消息中間件選型、部署的具體建議。

中間件的功能

明白一個系統的每一個具體功能是設計和架構一個系統的基礎。典型的消息中間件主要包含以下幾個功能:

  • 消息接收
  • 消息分發
  • 消息存儲
  • 消息讀取

概念模型

抽象的消息中間件模型包含以下幾個角色:

  • 發送者和接收者客戶端(Sender/Receiver Client),在具體實施過程當中,它們通常以庫的形式嵌入到應用程序代碼中。
  • 代理服務器(Broker Server),它們是與客戶端代碼直接交互的服務端代碼。
  • 消息交換機(Exchanger),接收到的消息通常須要經過消息交換機(Exchanger)分發到具體的消息隊列中。
  • 消息隊列,通常是一塊內存數據結構或持久化數據。

概念模型以下圖:

爲了提升分發性能,不少消息中間件把消息代理服務器的拓撲圖發送到發送者和接收者客戶端(Sender/Receiver Client),如此一來,發送源能夠直接進行消息分發。

選型標準

要完整的描述消息中間件各個方面很是困難,大部分良好的消息中間件都有完善的文檔,這些文檔的長度遠遠超過本文的總長度。但以下幾個標準是工程師們在進行消息中間件選型時常常須要考慮和權衡的。

性能

性能主要有兩個方面須要考慮:吞吐量(Throughput)和響應時間(Latency)。

不一樣的消息隊列中間件的吞吐量和響應時間相差甚遠,在選型時能夠去網上查看一些性能對比報告。

對於同一種中間件,不一樣的配置方式也會影響性能。主要有以下幾方面的配置:

  • 是否須要確認機制,即寫入隊列後,或從隊列讀取後,是否須要進行確認。確認機制對響應時間的影響每每很大。
  • 可否批處理,即消息可否批量讀取或者寫入。批量操做能夠大大減小應用程序與消息中間件的交互次數和消息傳遞量,大大提升吞吐量。
  • 可否進行分區(Partition)。將某一主題消息隊列進行分區,同一主題消息能夠有多臺機器並行處理。這不只僅能影響消息中間件的吞吐量,還決定着消息中間件是否具有良好的可伸縮性(Scalability)。
  • 是否須要進行持久化。將消息進行持久化每每會同時影響吞吐量和響應時間。

可靠性

可靠性主要包含:可用性、持久化、確認機制等。

高可用性的消息中間件應該具有以下特徵:

  • 消息中間件代理服務器(Broker)具備主從備份。即當一臺代理服務宕機以後,備用服務器能接管相關的服務。
  • 消息中間件中緩存的消息是否有備份、並持久化。
  • 根據CAP理論,高可用、高一致性以及網絡分裂不可兼得。根據做者的觀察,大部分的消息中間件在面臨網絡分裂的狀況下下,都很難保證數據的一致性以及可用性。 不少消息中間件都會提供一些可配置策略,讓使用者在可用性和一致性之間作權衡。

高可靠的消息中間件應該確保從發送者接收到的消息不會丟失。中間件代理服務器的宕機並非小几率事件,因此保存在內存中的消息很容易發生丟失。大部分的消息中間件都依賴於消息的持久化去下降消息丟失損失,即將接收到的消息寫入磁盤。即便提供持久化,仍有兩個問題須要考慮:

  • 磁盤損壞問題。長時間來看,磁盤出問題的機率仍然存在。
  • 性能問題。與操做內存相比,磁盤I/O的操做性能要慢幾個數量級。頻繁持久化不只會增長響應時間,也會下降吞吐量。
  • 解決這兩個問題的一個解決方案就是:多機確認,按期持久化。即消息被緩存在多臺機器的內存中,只有每臺機器都確認收到消息,纔跟發送者確認(不少消息中間件都會提供相應的配置選項,讓用戶設置最少須要多少臺機器接收到消息)。因爲多臺獨立機器同時出故障的機率遵循乘法法則,指數級下降,這會大大提升消息中間件的可靠性。

確認機制本質上是通信的握手機制(Handshaking)。若是沒有該機制,消息在傳輸過程當中丟失將不會被發現。高敏感的消息要求選取具有確認機制的消息中間件。固然若是沒有接收到消息中間件確認完成的指令,應用程序須要決定如何處理。典型的作法有兩個:

  • 屢次重試。
  • 暫存到本地磁盤或其它持久化媒介。

客戶端接口所支持語言

採用現存消息中間件就意味着避免重複造輪子。若是某個消息中間件未能提供對應語言的客戶端接口,則意味着極大的成本和兼容性問題。

投遞策略(Delivery policies)

投遞策略指的是一個消息會被髮送幾回。主要包含三種策略:最多一次(At most Once )、最少一次(At least Once)、僅有一次(Exactly Once)。

在實際應用中,只考慮消息中間件的投遞策略並不能保證業務的投遞策略,由於接收者在確認收到消息和處理完消息並持久化之間存在一個時間窗口。例如,即便消息中間件保證僅有一次(Exactly Once),若是接收者先確認消息,在持久化以前宕機,則該消息並未被處理。從應用的角度,這就是最多一次(At most Once)。反之,接收者先處理消息並完成持久化,但在確認以前宕機,消息就要被再次發送,這就是最少一次(At least Once)。 若是消息投遞策略很是重要,應用程序自身也須要仔細設計。

消費者優化

消費者是分佈式隊列編程中真正的數據處理方,數據處理方最多見的挑戰包括:有序性、串行化(Serializability)、頻次控制、完整性和一致性等。

挑戰

有序性

在不少場景下,如何保證隊列信息的有序處理是一個棘手的問題。以下圖,假定分佈式隊列保證請求嚴格有序,請求ri2和ri1都是針對同一數據記錄的不一樣狀態,ri2的狀態比ri1的狀態新。T一、T二、T3和T4表明各個操做發生的時間,而且 T1 < T2 < T3 < T4("<"表明早於)。

採用多消費者架構,這兩條記錄被兩個消費者(Consumer1和Consumer2)處理後更新到數據庫裏面。Consumer1雖然先讀取ri1可是卻後寫入數據庫,這就致使,新的狀態被老的狀態覆蓋,因此多消費者不保證數據的有序性。

串行化

不少場景下,串行化是數據處理的一個基本需求,這是保證數據完整性、可恢復性、事務原子性等的基礎。爲了在並行計算系統裏實現串行化,一系列的相關理論和實踐算法被提出。對於分佈式隊列編程架構,要在在多臺消費者實現串行化很是複雜,無異於重複造輪子。

頻次控制

有時候,消費者的消費頻次須要被控制,可能的緣由包括:

  • 費用問題。若是每次消費所引發的操做都須要收費,而同一個請求消息在隊列中保存多份,不進行頻次控制,就會致使無謂的浪費。
  • 性能問題。每次消費可能會引發對其餘服務的調用,被調用服務但願對調用量有所控制,對同一個請求消息的屢次訪問就須要有所控制。

完整性和一致性

完整性和一致性是全部多線程和多進程的代碼都面臨的問題。在多線程或者多進程的系統中考慮完整性和一致性每每會大大地增長代碼的複雜度和系統出錯的機率。

單例服務優化

幾乎全部串行化理論真正解決的問題只有一個:性能。 因此,在性能容許的前提下,對於消費者角色,建議採用單實例部署。經過單實例部署,有序性、串行化、完整性和一致性問題自動得到了解決。另外,單實例部署的消費者擁有所有所需信息,它能夠在頻次控制上採起不少優化策略。

天下沒有免費的午飯。一樣,單實例部署並不是沒有代價,它意味着系統可用性的下降,不少時候,這是沒法接受的。解決可用性問題的最直接的思路就是冗餘(Redundancy)。最經常使用的冗餘方案是Master-slave架構,不過大部分的Master-slave架構都是Active/active模式,即主從服務器都提供服務。例如,數據庫的Master-slave架構就是主從服務器都提供讀服務,只有主服務器提供寫服務。大部分基於負載均衡設計的Master-slave集羣中,主服務器和從服務器同時提供相同的服務。這顯然不知足單例服務優化需求。有序性和串行化須要Active/passive架構,即在某一時刻只有主實例提供服務,其餘的從服務等待主實例失效。這是典型的領導人選舉架構,即只有得到領導權的實例才能充當實際消費者,其餘實例都在等待下一次選舉。採用領導人選舉的Active/passive架構能夠大大緩解純粹的單實例部署所帶來的可用性問題。

使人遺憾的是,除非工程師們本身在消費者實例裏面實現Paxos等算法,並在每次消息處理以前都執行領導人選舉。不然,理論上講,沒有方法能夠保障在同一個時刻只有一個領導者。而對每一個消息都執行一次領導人選舉,顯然性能不可行。實際工做中,最容易出現的問題時機發生在領導人交接過程當中,即前任領導人實例變成輔助實例,新部署實例開始承擔領導人角色。爲了平穩過渡,這二者之間須要有必定的通信機制,可是,不管是網絡分區(Network partition)仍是原領導人服務崩潰都會使這種通信機制變的不可能。

對於完整性和一致性要求很高的系統,咱們須要在選舉制度和交接制度這兩塊進行優化。

領導人選舉架構

典型的領導人選舉算法有Paxos、ZAB( ZooKeeper Atomic Broadcast protocol)。爲了不重複造輪子,建議採用ZooKeeper的分佈式鎖來實現領導人選舉。典型的ZooKeeper實現算法以下(摘自參考資料[10]):

Let ELECTION be a path of choice of the application. To volunteer to be a leader:
1.Create znode z with path "ELECTION/guid-n_" with both SEQUENCE and EPHEMERAL flags;
2.Let C be the children of "ELECTION", and i be the sequence number of z;
3.Watch for changes on "ELECTION/guid-n_j", where j is the largest 
sequence number such that j < i and n_j is a znode in C;

Upon receiving a notification of znode deletion:
1.Let C be the new set of children of ELECTION;
2.If z is the smallest node in C, then execute leader procedure;
3.Otherwise, watch for changes on "ELECTION/guid-n_j", where j is 
the largest sequence number such that j < i and n_j is a znode in C;

領導人交接架構

領導人選舉的整個過程發生在ZooKeeper集羣中,各個消費者實例在這場選舉中只充當被告知者角色(Learner)。領導人選舉算法,只能保證最終只有一個Leader被選舉出來,並不保障被告知者對Leader的理解是徹底一致的。本質上,上文的架構裏,選舉的結果是做爲令牌(Token)傳遞給消費者實例,消費者將自身的ID與令牌進行對比,若是相等,則開始執行消費操做。因此當發生領導人換屆的狀況,不一樣的Learner獲知新Leader的時間並不一樣。例如,前任Leader若是由於網絡問題與ZooKeeper集羣斷開,前任Leader只能在超時後才能判斷本身是否再也不承擔Leader角色了,而新的Leader可能在這以前已經產生。另外一方面,即便前任Leader和新Leader同時接收到新Leader選舉結果,某些業務的完整性要求迫使前任Leader仍然完成當前未完成的工做。以上的講解很是抽象,生活中卻給了一些更加具體的例子。衆所周知,美國總統候選人在選舉結束後並不直接擔任美國總統,從選舉到最終承擔總統角色須要一個過渡期。對於新當選Leader的候選人而言,過渡期間稱之爲加冕階段(Inauguration)。對於即將卸任的Leader,過渡期稱爲交接階段(HandOver)。因此一個基於領導人選舉的消費者從加冕到卸任經歷三個階段:Inauguration、Execution、HandOver。在加冕階段,新領導須要進行一些初始化操做。Execution階段是真正的隊列消息處理階段。在交接階段,前任領導須要進行一些清理操做。

相似的,爲了解決領導人交接問題,全部的消費者從代碼實現的角度都須要實現相似ILeaderCareer接口。這個接口包含三個方發inaugurate(),handOver()和execute()。某個部署實例(Learner)在得知本身承擔領導人角色後,須要調用inaugurate()方法,進行加冕。主要的消費邏輯經過不停的執行execute()實現,當確認本身再也不承擔領導人以後,執行handOver()進行交接。

public interface ILeaderCareer
 {     
      public void inaugurate();     
      public void handOver();     
      public boolean execute(); 
 }

若是承擔領導人角色的消費者,在執行execute()階段得知本身將要下臺,根據消息處理的原子性,該領導人能夠決定是否提早終止操做。若是整個消息處理是一個原子性事務,直接終止該操做能夠快速實現領導人換屆。不然,前任領導必須完成當前消息處理後,才進入交接階段。這意味着新的領導人,在inaugurate()階段須要進行必定時間的等待。

排重優化

頻次控制是一個經典問題。對於分佈式隊列編程架構,相同請求重複出如今隊列的狀況並很多見。若是相同請求在隊列中重複太多,排重優化就顯得很必要。分佈式緩存更新是一個典型例子,全部請求都被髮送到隊列中用於緩存更新。若是請求符合典型的高斯分佈,在一段時間內會出現大量重複的請求,而同時多線程更新同一請求緩存顯然沒有太大的意義。

排重優化是一個算法,其本質是基於狀態機的編程,整個講解經過模型、構思和實施三個步驟完成。

模型

進行排重優化的前提是大量重複的請求。在模型這一小節,咱們首先闡述重複度模型、以及不一樣重複度所致使的消費模型,最後基於這兩個模型去講解排重狀態機。

重複度模型

首先咱們給出最小重複長度的概念。同一請求最小重複長度:同一請求在隊列中的重複出現的最小間距。例如,請求ri第一次出如今位置3,第二次出如今10,最小重複長度等於7。

是否須要進行排重優化取決於隊列中請求的重複度。因爲不一樣請求之間並不存在重複的問題,不失通常性,這裏的模型只考了單個請求的重複度,重複度分爲三個類:無重複、稀疏重複、高重複。

  • 無重複:在整個請求過程,沒有任何一個請求出現一次以上。
  • 稀疏重複:主要的請求最小重複長度大於消費隊列長度。
  • 高重複:大量請求最小重複長度小於消費隊列長度。

對於不一樣的重複度,會有不一樣的消費模型。

無重複消費模型

在整個隊列處理過程當中,全部的請求都不相同,以下圖:

稀疏重複消費模型

當同一請求最小重複長度大於消費者隊列長度,以下圖。假定有3個消費者,Consumer1將會處理r1,Consumer2將會處理r2,Consumer3將會處理r3,若是每一個請求處理的時間嚴格相等,Consumer1在處理完r1以後,接着處理r4,Consumer2將會處理r2以後會處理r1。雖然r1被再次處理,可是任什麼時候刻,只有這一個消費者在處理r1,不會出現多個消費者同時處理同一請求的場景。

高重複消費模型

以下圖,仍然假定有3個消費者,隊列中前面4個請求都是r1,它會同時被3個消費者線程處理:

顯然,對於無重複和稀疏重複的分佈式隊列,排重優化並不會帶來額外的好處。排重優化所針對的對象是高重複消費模型,特別是對於並行處理消費者比較多的狀況,重複處理同一請求,資源消耗極大。

排重狀態機

排重優化的主要對象是高重複的隊列,多個消費者線程或進程同時處理同一個冪等請求只會浪費計算資源並延遲其餘待請求處理。因此,排重狀態機的一個目標是處理惟一性,即:同一時刻,同一個請求只有一個消費者處理。若是消費者獲取一條請求消息,但發現其餘消費者正在處理該消息,則當前消費者應該處於等待狀態。若是對同一請求,有一個消費者在處理,一個消費者在等待,而同一請求再次被消費者讀取,再次等待則沒有意義。因此,狀態機的第二個目標是等待惟一性,即:同一時刻,同一個請求最多隻有一個消費者處於等待狀態。總上述,狀態機的目標是:處理惟一性和等待惟一性。咱們把正在處理的請求稱爲頭部請求,正在等待的請求稱爲尾部請求。

因爲狀態機的處理單元是請求,因此須要針對每個請求創建一個排重狀態機。基於以上要求,咱們設計的排重狀態機包含4個狀態Init,Process,Block,Decline。各個狀態之間轉化過程以下圖:

  1. 狀態機建立時處於Init狀態。
  2. 對Init狀態進行Enqueue操做,即接收一個請求,開始處理(稱爲頭部請求),狀態機進入Process狀態。
  3. 狀態機處於Process狀態,代表當前有消費者正在處理頭部請求。此時,若是進行Dequeue操做,即頭部請求處理完成,返回Init狀態。若是進行Enqueue操做,即另外一個消費者準備處理同一個請求,狀態機進入Block狀態(該請求稱爲尾部請求)。
  4. 狀態機處於Block狀態,代表頭部請求正在處理,尾部請求處於阻塞狀態。此時,進行Dequeue操做,即頭部請求處理完成,返回Process狀態,而且尾部請求變成頭部請求,原尾部請求消費者結束阻塞狀態,開始處理。進行Enqueue操做,代表一個新的消費者準備處理同一個請求,狀態機進入Decline狀態。
  5. 狀態機進入Decline狀態,根據等待惟一性目標,處理最新請求的消費者將被拋棄該消息,狀態機自動轉換回Block狀態。

構思

狀態機描述的是針對單個請求操做所引發狀態變化,排重優化須要解決隊列中全部請求的排重問題,須要對全部請求的狀態機進行管理。這裏只考慮單虛擬機內部對全部請求狀態機的管理,對於跨虛擬機的管理能夠採用相似的方法。對於多狀態機管理主要包含三個方面:一致性問題、完整性問題和請求緩存驅逐問題。

一致性問題

一致性在這裏要求同一請求的不一樣消費者只會操做一個狀態機。因爲每一個請求都產生一個狀態機,系統將會包含大量的狀態機。爲了兼顧性能和一致性,咱們採用ConcurrentHashMap保存全部的狀態機。用ConcurrentHashMap而不是對整個狀態機隊列進行加鎖,能夠提升並行處理能力,使得系統能夠同時操做不一樣狀態機。爲了不處理同一請求的多消費者線程同時對ConcurrentHashMap進行插入所致使狀態機不一致問題,咱們利用了ConcurrentHashMap的putIfAbsent()方法。代碼方案以下,key2Status用於存儲全部的狀態機。消費者在處理請求以前,從狀態機隊列中讀取排重狀態機TrafficAutomate。若是沒有找到,則建立一個新的狀態機,並經過putIfAbsent()方法插入到狀態機隊列中。

private ConcurrentHashMap<T, TrafficAutomate>
 key2Status = new ConcurrentHashMap(); 
TrafficAutomate trafficAutomate = key2Status.get(key);
if(trafficAutomate == null) 
{     
    trafficAutomate = new TrafficAutomate();     
    TrafficAutomate oldAutomate = key2Status.
	putIfAbsent(key, trafficAutomate);     
    if(oldAutomate != null)    
    {         
       trafficAutomate = oldAutomate; 
    } 
 } 

完整性問題

完整性要求保障狀態機Init,Process,Block,Decline四種狀態正確、狀態之間的轉換也正確。因爲狀態機的操做很是輕量級,兼顧完整性和下降代碼複雜度,咱們對狀態機的全部方法進行加鎖。

請求緩存驅逐問題(Cache Eviction)

若是不一樣請求的數量太多,內存永久保存全部請求的狀態機的內存開銷太大。因此,某些狀態機須要在恰當的時候被驅逐出內存。這裏有兩個思路:

  • 當狀態機返回Init狀態時,清除出隊列。
  • 啓動一個後臺線程,定時掃描狀態機隊列,採用LRU等標準緩存清除機制。

標識問題

每一個請求對應於一個狀態機,不一樣的狀態機採用不一樣的請求進行識別。

對於同一狀態機的不一樣消費者,在單虛擬機方案中,咱們採用線程id進行標識。

實施

排重優化的主要功能都是經過排重狀態機(TrafficAutomate)和狀態機隊列(QueueCoordinator)來實施的。排重狀態機描述的是針對單個請求的排重問題,狀態機隊列解決全部請求狀態機的排重問題。

狀態機實施(TrafficAutomate)

根據狀態機模型,其主要操做爲enQueue和deQueue,其狀態由頭部請求和尾部請求的狀態共同決定,因此須要定義兩個變量爲head和tail,用於表示頭部請求和尾部請求。爲了確保多線程操做下狀態機的完整性(Integraty),全部的操做都將加上鎖。

enQueue操做

當一個消費者執行enQueue操做時:若是此時尾部請求不爲空,根據等待惟一性要求,返回DECLINE,當前消費者應該拋棄該請求;若是頭部請求爲空,返回ACCPET,當前消費者應該馬上處理該消息;不然,返回BLOCK,該消費者應該等待,並不停的查看狀態機的狀態,一直到頭部請求處理完成。enQueue代碼以下:

synchronized ActionEnum enQueue(long id)
{      
    if(tail != INIT_QUEUE_ID)     
    { 
            return DECLINE;     
    } 
         if(head == INIT_QUEUE_ID)
              { 
                      head = id;
                       return ACCEPT;
               }
          else
               {
                      tail = id; 
                      return BLOCK;
                } 
} 

deQueue操做

對於deQueue操做,首先將尾部請求賦值給頭部請求,並將尾部請求置爲無效。deQueue代碼以下:

synchronized boolean deQueue(long id)
{ 
        head = tail; 
        tail = INIT_QUEUE_ID;         
        return true; 
} 

狀態機隊列實施(QueueCoordinator)接口定義

狀態機隊列集中管理全部請求的排重狀態機,因此其操做和單個狀態機同樣,即enQueue和deQueuqe接口。這兩個接口的實現須要識別特定請求的狀態機,因此它們的入參應該是請求。爲了兼容不一樣類型的請求消息,咱們採用了Java泛型編程。接口定義以下:

public interface QueueCoordinator
{      
       public boolean enQueue(T key);
        public void deQueue(T key);  
} 

enQueue操做

enQueue操做過程以下:

首先,根據傳入的請求key值,獲取狀態機, 若是不存在則建立一個新的狀態機,並保存在ConcurrentHashMap中。

接下來,獲取線程id做爲該消費者的惟一標識,並對對應狀態機進行enQueue操做。

若是狀態機返回值爲ACCEPT或者DECLINE,返回業務層處理代碼,ACCEPT意味着業務層須要處理該消息,DECLINE表示業務層能夠拋棄當前消息。若是狀態機返回值爲Block,則該線程保持等待狀態。

在某些狀況下,頭部請求線程可能因爲異常,未能對狀態機進行deQueue操做(做爲組件提供方,不能假定全部的規範被使用方實施)。爲了不處於阻塞狀態的消費者無期限地等待,建議對狀態機設置安全超時時限。超過了必定時間後,狀態機強制清空頭部請求,返回到業務層,業務層開始處理該請求。

代碼以下:

public boolean enQueue(T key) {     
     _loggingStastic();      
     TrafficAutomate trafficAutomate = key2Status.get(key); 
	 if(trafficAutomate == null)     
     {         
           trafficAutomate = new TrafficAutomate();        
           TrafficAutomate oldAutomate = key2Status.putIfAbsent(key, trafficAutomate);         
           if(oldAutomate != null)         
           {             
                 trafficAutomate = oldAutomate;         
            }     
       }     
       long threadId = Thread.currentThread().getId();      
       ActionEnum action = trafficAutomate.enQueue(threadId);       
	   if(action == DECLINE)     
       {         
             return false;     
        }     
        else if (action == ACCEPT)     
        {         
             return true;     
         }     
         //Blocking status means some other thread are working on this key, 
		 so just wait till timeout     
         long start = System.currentTimeMillis();     
         long span = 0;     
         do {         
               _nonExceptionSleep(NAP_TIME_IN_MILL);          
               if(trafficAutomate.isHead(threadId))         
                    {             
                         return true;         
                     }          
                span = System.currentTimeMillis() - start;              
				}while(span <= timeout);      
          
          //remove head so that it won't block the queue for too long 
		  trafficAutomate.evictHeadByForce(threadId); 
               
          return true; 
} 

deQueue操做

deQueue操做首先從ConcurrentHashMap獲取改請求所對應的狀態機,接着獲取該線程的線程id,對狀態機進行deQueue操做。

enQueue代碼以下:

public void deQueue(T key) {     
    TrafficAutomate trafficAutomate = key2Status.get(key);   
	if(trafficAutomate == null)     
         {         
             logger.error("key {} doesn't exist ", key);                   
			 return;     
          }      
      long threadId = Thread.currentThread().getId();      
      trafficAutomate.deQueue(threadId); 
}

源代碼

完整源代碼能夠在QueueCoordinator獲取。

參考資料

  1. RabbitMQ, Highly Available Queues.
  2. IBM Knowledge Center, Introduction to message queuing.
  3. Wikipedia, Serializability.
  4. Hadoop, ZooKeeper Recipes and Solutions.
  5. Apache Kafka.
  6. Lamport L, Paxos Made Simple.
  7. Rabbit MQ, Highly Available Queues.
  8. IBM Knowledge Center, Introduction to message queuing.
  9. Wikipedia, Serializability.
  10. Hadoop, ZooKeeper Recipes and Solutions.
  11. Apache Kafka.
  12. Lamport L, Paxos Made Simple.

轉載地址:http://www.infoq.com/cn/articles/distributed-queue-programme-model-actual-combat-optimization

相關文章
相關標籤/搜索