做爲一種基礎的抽象數據結構,隊列被普遍應用在各種編程中。大數據時代對跨進程、跨機器的通信提出了更高的要求,和以往相比,分佈式隊列編程的運用幾乎已無處不在。可是,這種常見的基礎性的事物每每容易被忽視,使用者每每會忽視兩點:html
本文分爲三部分,包括模型篇、實戰篇和優化篇,主要剖析了分佈式隊列編程模型的需求來源、定義、結構以及其變化多樣性;根據做者在新美大實際工做經驗,給出了隊列式編程在分佈式環境下的一些具體應用。node
首先從最基礎的需求出發,詳細剖析分佈式隊列編程模型的需求來源、定義、結構以及其變化多樣性。經過這一部分的講解,做者指望能在兩方面幫助讀者:一方面,提供一個系統性的思考方法,使讀者可以將具體需求關聯到分佈式隊列編程模型,具有進行分佈式隊列架構的能力;另外一方面,經過全方位的講解,讓讀者可以快速識別工做中碰到的各類分佈式隊列編程模型。git
什麼時候選擇分佈式隊列apache
通訊是人們最基本的需求,一樣也是計算機最基本的需求。對於工程師而言,在編程和技術選型的時候,更容易進入大腦的概念是RPC、RESTful、Ajax、Kafka。在這些具體的概念後面,最本質的東西是「通訊」。因此,大部分建模和架構都須要從「通訊」這個基本概念開始。當肯定系統之間有通訊需求的時候,工程師們須要作不少的決策和平衡,這直接影響工程師們是否會選擇分佈式隊列編程模型做爲架構。從這個角度出發,影響建模的因素有四個:When、Who、Where、How。編程
When:同步VS異步設計模式
通訊的一個基本問題是:發出去的消息何時須要被接收到?這個問題引出了兩個基礎概念:「同步通訊」和「異步通訊」。根據理論抽象模型,同步通訊和異步通訊最本質的差異來自於時鐘機制的有無。同步通訊的雙方須要一個校準的時鐘,異步通訊的雙方不須要時鐘。現實的狀況是,沒有徹底校準的時鐘,因此沒有絕對的同步通訊。一樣,絕對異步通訊意味着沒法控制一個發出去的消息被接收到的時間點,無期限的等待一個消息顯然毫無實際意義。因此,實際編程中全部的通訊既不是「同步通訊」也不是「異步通訊」;或者說,既是「同步通訊」也是「異步通訊」。特別是對於應用層的通訊,其底層架構可能既包含「同步機制」也包含「異步機制」。判斷「同步」和「異步」消息的標準問題太深,而不適合繼續展開。做者這裏給一些啓發式的建議:緩存
不管如何,工程師們不能生活在混沌之中,不作決定每每是最壞的決定。當分析一個通訊需求或者進行通訊構架的時候,工程師們被迫做出「同步」仍是「異步」的決定。當決策的結論是「異步通訊」的時候,分佈式隊列編程模型就是一個備選項。
Who:發送者接收者解耦
在進行通訊需求分析的時候,須要回答的另一個基本問題是:消息的發送方是否關心誰來接收消息,或者反過來,消息接收方是否關心誰來發送消息。若是工程師的結論是:消息的發送方和接收方不關心對方是誰、以及在哪裏,分佈式隊列編程模型就是一個備選項。由於在這種場景下,分佈式隊列架構所帶來的解耦能給系統架構帶來這些好處:
Where:消息暫存機制
在進行通訊發送方設計的時候,令工程師們苦惱的問題是:若是消息沒法被迅速處理掉而產生堆積怎麼辦、可否被直接拋棄?若是根據需求分析,確認存在消息積存,而且消息不該該被拋棄,就應該考慮分佈式隊列編程模型構架,由於隊列能夠暫存消息。
How:如何傳遞
對通訊需求進行架構,一系列的基礎挑戰會迎面而來,這包括:
分佈式隊列編程定義
很難給出分佈式隊列編程模型的精肯定義,因爲本文偏重於應用,做者並不打算徹底參照某個標準的模型。整體而言:分佈式隊列編程模型包含三類角色:發送者(Sender)、分佈式隊列(Queue)、接收者(Receiver)。發送者和接收者分別指的是生產消息和接收消息的應用程序或服務。
須要重點明確的概念是分佈式隊列,它是提供如下功能的應用程序或服務:
特定的場景下,它固然能夠是Kafka、RabbitMQ等消息中間件。但它的展示形式並不限於此,例如:
最基礎的分佈式隊列編程抽象模型是點對點模型,其餘抽象構架模型居於改基本模型上各角色的數量和交互變化所致使的不一樣拓撲圖。具體而言,不一樣數量的發送者、分佈式隊列以及接收者組合造成了不一樣的分佈式隊列編程模型。記住並理解典型的抽象模型結構對需求分析和建模而言相當重要,同時也會有助於學習和深刻理解開源框架以及別人的代碼。
點對點模型(Point-to-point)
基礎模型中,只有一個發送者、一個接收者和一個分佈式隊列。以下圖所示:
生產者消費者模型(Producer–consumer)
若是發送者和接收者均可以有多個部署實例,甚至不一樣的類型;可是共用同一個隊列,這就變成了標準的生產者消費者模型。在該模型,三個角色通常稱爲生產者(Producer)、分佈式隊列(Queue)、消費者(Consumer)。
發佈訂閱模型(PubSub)
若是隻有一類發送者,發送者將產生的消息實體按照不一樣的主題(Topic)分發到不一樣的邏輯隊列。每種主題隊列對應於一類接收者。這就變成了典型的發佈訂閱模型。在該模型,三個角色通常稱爲發佈者(Publisher),分佈式隊列(Queue),訂閱者(Subscriber)。
MVC模型
若是發送者和接收者存在於同一個實體中,可是共享一個分佈式隊列。這就很像經典的MVC模型。
編程模型
爲了讓讀者更好地理解分佈式隊列編程模式概念,這裏將其與一些容易混淆的概念作一些對比 。
分佈式隊列模型編程和異步編程
分佈式隊列編程模型的通信機制通常是採用異步機制,可是它並不等同於異步編程。
首先,並不是全部的異步編程都須要引入隊列的概念,例如:大部分的操做系統異步I/O操做都是經過硬件中斷( Hardware Interrupts)來實現的。
其次,異步編程並不必定須要跨進程,因此其應用場景並不必定是分佈式環境。
最後,分佈式隊列編程模型強調發送者、接收者和分佈式隊列這三個角色共同組成的架構。這三種角色與異步編程沒有太多關聯。
分佈式隊列模式編程和流式編程
隨着Spark Streaming,Apache Storm等流式框架的普遍應用,流式編程成了當前很是流行的編程模式。可是本文所闡述的分佈式隊列編程模型和流式編程並不是同一律念。
首先,本文的隊列編程模式不依賴於任何框架,而流式編程是在具體的流式框架內的編程。
其次,分佈式隊列編程模型是一個需求解決方案,關注如何根據實際需求進行分佈式隊列編程建模。流式框架裏的數據流通常都經過隊列傳遞,不過,流式編程的關注點比較聚焦,它關注如何從流式框架裏獲取消息流,進行map、reduce、 join等轉型(Transformation)操做、生成新的數據流,最終進行彙總、統計。
第二部分實戰篇給出了隊列式編程在分佈式環境下的一些具體應用。這些例子的基礎模型並不是首次出如今互聯網的文檔中,可是全部的例子都是按照挑戰、構思、架構三個步驟進行講解的。
受限於保密性要求,有些細節並未給出,但這些細節並不影響講解的完整性。另外一方面,特別具體的需求容易讓人費解,爲了使講解更加順暢,做者也會採用一些更通俗易懂的例子。經過本篇的講解,但願和讀者一塊兒去實踐「如何從需求出發去構架分佈式隊列編程模型」。
須要聲明的是,這裏的解決方案並非所處場景的最優方案。可是,任何一個稍微複雜的問題,都沒有最優解決方案,更談不上惟一的解決方案。實際上,工程師天天所追尋的只是在知足必定約束條件下的可行方案。固然不一樣的約束會致使不一樣的方案,約束的鬆弛度決定了工程師的可選方案的寬廣度。
信息採集處理應用普遍,例如:廣告計費、用戶行爲收集等。做者碰到的具體項目是爲廣告系統設計一套高可用的採集計費系統。
典型的廣告CPC、CPM計費原理是:收集用戶在客戶端或者網頁上的點擊和瀏覽行爲,按照點擊和瀏覽進行計費。計費業務有以下典型特徵:
挑戰
計費業務的典型特徵給咱們帶來了以下挑戰:
構思
採集的高可用性意味着咱們須要多臺服務器同時採集,爲了不單IDC故障,採集服務器須要部署在多IDC裏面。
實現一個高可用、高吞吐量、高一致性的信息傳遞系統顯然是一個挑戰,爲了控制項目開發成本,採用開源的消息中間件進行消息傳輸就成了必然選擇。
完整性約束要求集中進行計費,因此計費系統發生在覈心IDC。
計費服務並不關心採集點在哪裏,採集服務也並不關心誰進行計費。
根據以上構思,咱們認爲採集計費符合典型的「生產者消費者模型」。
架構
採集計費系統架構圖以下:
採用此架構,咱們能夠在以下方面作進一步優化:
緩存是一個很是寬泛的概念,幾乎存在於系統各個層級。典型的緩存訪問流程以下:
對於已經存入緩存的數據,其更新時機和更新頻率是一個經典問題,即緩存更新機制(Cache Replacement Algorithms )。典型的緩存更新機制包括:近期最少使用算法(LRU)、最不常用算法(LFU)。這兩種緩存更新機制的典型實現是:啓動一個後臺進程,按期清理最近沒有使用的,或者在一段時間內最少使用的數據。因爲存在緩存驅逐機制,當一個請求在沒有命中緩存時,業務層須要從持久層中獲取信息並更新緩存,提升一致性。
挑戰
分佈式緩存給緩存更新機制帶來了新的問題:
構思
根據上面的分析,分佈式緩存須要解決的問題是:在保證讀取性能的前提下,儘量地提升老數據的一致性和新數據的可用性。若是仍然假定最近被訪問的鍵值最有可能被再次訪問(這是LRU或者LFU成立的前提),鍵值每次被訪問後觸發一次異步更新就是提升可用性和一致性最先的時機。不管是高性能要求仍是業務解耦都要求緩存讀取和緩存更新分開,因此咱們應該構建一個單獨的集中的緩存更新服務。集中進行緩存更新的另一個好處來自於頻率控制。因爲在一段時間內,不少類型訪問鍵值的數量知足高斯分佈,短期內重複對同一個鍵值進行更新Cache並不會帶來明顯的好處,甚至形成緩存性能的降低。經過控制同一鍵值的更新頻率能夠大大緩解該問題,同時有利於提升總體數據的一致性,參見「排重優化」。
綜上所述,業務訪問方須要把請求鍵值快速傳輸給緩存更新方,它們之間不關心對方的業務。要快速、高性能地實現大量請求鍵值消息的傳輸,高性能分佈式消息中間件就是一個可選項。這三方一塊兒組成了一個典型的分佈式隊列編程模型。
架構
以下圖,全部的業務請求方做爲生產者,在返回業務代碼處理以前將請求鍵值寫入高性能隊列。Cache Updater做爲消費者從隊列中讀取請求鍵值,將持久層中數據更新到緩存中。
採用此架構,咱們能夠在以下方面作進一步優化:
典型的後臺任務處理應用包括工單處理、火車票預訂系統、機票選座等。咱們所面對的問題是爲運營人員建立工單。一次能夠爲多個運營人員建立多個工單。這個應用場景和火車票購買很是相似。工單相對來講更加抽象,因此,下文會結合火車票購買和運營人員工單分配這兩種場景同時講解。典型的工單建立要經歷兩個階段:數據篩選階段、工單建立階段。例如,在火車票預訂場景,數據篩選階段用戶選擇特定時間、特定類型的火車,而在工單建立階段,用戶下單購買火車票。
挑戰
工單建立每每會面臨以下挑戰:
構思
若是將用戶篩選的最終規則作爲消息存儲下來,併發送給工單建立系統。此時,工單建立系統將具有建立工單所需的全局信息,具有在知足各類約束的條件下進行統籌優化的能力。若是工單建立階段採用單實例部署,就能夠避免數據鎖定問題,同時也意味着沒有鎖衝突,因此也不會有死鎖或任務延遲問題。
居於以上思路,在多工單處理系統的模型中,篩選階段的規則建立系統將充當生產者角色,工單建立系統將充當消費者角色,篩選規則將做爲消息在二者之間進行傳遞。這就是典型的分佈式隊列編程架構。根據工單建立量的不一樣,能夠採用數據庫或開源的分佈式消息中間件做爲分佈式隊列。
架構
該架構流程以下圖:
採用該架構,咱們在數據鎖定、運籌優化、原子性問題都能獲得比較好成果:
接下來重點闡述工程師運用分佈式隊列編程構架的時候,在生產者、分佈式隊列以及消費者這三個環節的注意點以及優化建議。
肯定採用分佈式隊列編程模型以後,主體架構就算完成了,但工程師的工做還遠遠未結束。天下事必作於細,細節是一個不錯的架構向一個優秀的系統進階的關鍵因素。優化篇選取了做者以及其同事在運用分佈式隊列編程模型架構時所碰到的典型問題和解決方案。這裏些問題出現的頻率較高,若是你經驗不夠,極可能會「踩坑」。但願經過這些講解,幫助讀者下降分佈式隊列編程模型的使用門檻。本文將對分佈式隊列編程模型的三種角色:生產者(Producer),分佈式隊列(Queue),消費者(Consumer)分別進行優化討論。
在分佈式隊列編程中,生產者每每並不是真正的生產源頭,只是整個數據流中的一個節點,這種生產者的操做是處理-轉發(Process-Forward)模式。
這種模式給工程師們帶來的第一個問題是吞吐量問題。這種模式下運行的生產者,一邊接收上游的數據,一邊將處理完的數據發送給下游。本質上,它是一個很是經典的數學問題,其抽象模型是一些沒有蓋子的水箱,每一個水箱接收來自上一個水箱的水,進行處理以後,再將水發送到下一個水箱。工程師須要預測水源的流量、每一個環節水箱的處理能力、水龍頭的排水速度,最終目的是避免水溢出水箱,或者儘量地減少溢出事件的機率。實際上流式編程框架以及其開發者花了大量的精力去處理和優化這個問題。下文的緩存優化和批量寫入優化都是針對該問題的解決方案。
第二個須要考慮的問題是持久化。因爲各類緣由,系統老是會宕機。若是信息比較敏感,例如計費信息、火車票訂單信息等,工程師們須要考慮系統宕機所帶來的損失,找到讓損失最小化的解決方案。持久化優化重點解決這一類問題。
緩存優化
處於「處理-轉發」模式下運行的生產者每每被設計成請求驅動型的服務,即每一個請求都會觸發一個處理線程,線程處理完後將結果寫入分佈式隊列。若是因爲某種緣由隊列服務不可用,或者性能惡化,隨着新請求的到來,生產者的處理線程就會產生堆積。這可能會致使以下兩個問題:
緩解這類問題的思路來自於CAP理論,即經過下降一致性來提升可用性。生產者接收線程在收到請求以後第一時間不去處理,直接將請求緩存在內存中(犧牲一致性),而在後臺啓動多個處理線程從緩存中讀取請求、進行處理並寫入分佈式隊列。與線程所佔用的內存開銷相比,大部分的請求所佔內存幾乎能夠忽略。經過在接收請求和處理請求之間增長一層內存緩存,能夠大大提升系統的處理吞吐量和可擴展性。這個方案本質上是一個內存生產者消費者模型。
批量寫入優化
若是生產者的請求過大,寫分佈式隊列可能成爲性能瓶頸,有以下幾個因素:
若是在處理請求和寫隊列之間添加一層緩存,消息寫入程序批量將消息寫入隊列,能夠大大提升系統的吞吐量。緣由以下:
持久化優化
經過添加緩存,消費者服務的吞吐量和可用性都獲得了提高。但緩存引入了一個新問題——內存數據丟失。對於敏感數據,工程師須要考慮以下兩個潛在問題:
因此緩存中的數據須要按期被持久化到磁盤等持久層設備中,典型的持久化觸發策略主要有兩種:
分佈式隊列不等同於各類開源的或者收費的消息中間件,甚至在一些場景下徹底不須要使用消息中間件。可是,消息中間件產生的目的就是解決消息傳遞問題,這爲分佈式隊列編程架構提供了不少的便利。在實際工做中,工程師們應該將成熟的消息中間件做爲隊列的首要備選方案。
本節對消息中間件的功能、模型進行闡述,並給出一些消息中間件選型、部署的具體建議。
中間件的功能
明白一個系統的每一個具體功能是設計和架構一個系統的基礎。典型的消息中間件主要包含以下幾個功能:
概念模型
抽象的消息中間件模型包含以下幾個角色:
概念模型以下圖:
爲了提升分發性能,不少消息中間件把消息代理服務器的拓撲圖發送到發送者和接收者客戶端(Sender/Receiver Client),如此一來,發送源能夠直接進行消息分發。
選型標準
要完整的描述消息中間件各個方面很是困難,大部分良好的消息中間件都有完善的文檔,這些文檔的長度遠遠超過本文的總長度。但以下幾個標準是工程師們在進行消息中間件選型時常常須要考慮和權衡的。
性能主要有兩個方面須要考慮:吞吐量(Throughput)和響應時間(Latency)。
不一樣的消息隊列中間件的吞吐量和響應時間相差甚遠,在選型時能夠去網上查看一些性能對比報告。
對於同一種中間件,不一樣的配置方式也會影響性能。主要有以下幾方面的配置:
可靠性
可靠性主要包含:可用性、持久化、確認機制等。
高可用性的消息中間件應該具有以下特徵:
高可靠的消息中間件應該確保從發送者接收到的消息不會丟失。中間件代理服務器的宕機並非小几率事件,因此保存在內存中的消息很容易發生丟失。大部分的消息中間件都依賴於消息的持久化去下降消息丟失損失,即將接收到的消息寫入磁盤。即便提供持久化,仍有兩個問題須要考慮:
確認機制本質上是通信的握手機制(Handshaking)。若是沒有該機制,消息在傳輸過程當中丟失將不會被發現。高敏感的消息要求選取具有確認機制的消息中間件。固然若是沒有接收到消息中間件確認完成的指令,應用程序須要決定如何處理。典型的作法有兩個:
客戶端接口所支持語言
採用現存消息中間件就意味着避免重複造輪子。若是某個消息中間件未能提供對應語言的客戶端接口,則意味着極大的成本和兼容性問題。
投遞策略指的是一個消息會被髮送幾回。主要包含三種策略:最多一次(At most Once )、最少一次(At least Once)、僅有一次(Exactly Once)。
在實際應用中,只考慮消息中間件的投遞策略並不能保證業務的投遞策略,由於接收者在確認收到消息和處理完消息並持久化之間存在一個時間窗口。例如,即便消息中間件保證僅有一次(Exactly Once),若是接收者先確認消息,在持久化以前宕機,則該消息並未被處理。從應用的角度,這就是最多一次(At most Once)。反之,接收者先處理消息並完成持久化,但在確認以前宕機,消息就要被再次發送,這就是最少一次(At least Once)。 若是消息投遞策略很是重要,應用程序自身也須要仔細設計。
消費者是分佈式隊列編程中真正的數據處理方,數據處理方最多見的挑戰包括:有序性、串行化(Serializability)、頻次控制、完整性和一致性等。
有序性
在不少場景下,如何保證隊列信息的有序處理是一個棘手的問題。以下圖,假定分佈式隊列保證請求嚴格有序,請求ri2和ri1都是針對同一數據記錄的不一樣狀態,ri2的狀態比ri1的狀態新。T一、T二、T3和T4表明各個操做發生的時間,而且 T1 < T2 < T3 < T4("<"表明早於)。
採用多消費者架構,這兩條記錄被兩個消費者(Consumer1和Consumer2)處理後更新到數據庫裏面。Consumer1雖然先讀取ri1可是卻後寫入數據庫,這就致使,新的狀態被老的狀態覆蓋,因此多消費者不保證數據的有序性。
串行化
不少場景下,串行化是數據處理的一個基本需求,這是保證數據完整性、可恢復性、事務原子性等的基礎。爲了在並行計算系統裏實現串行化,一系列的相關理論和實踐算法被提出。對於分佈式隊列編程架構,要在在多臺消費者實現串行化很是複雜,無異於重複造輪子。
頻次控制
有時候,消費者的消費頻次須要被控制,可能的緣由包括:
完整性和一致性
完整性和一致性是全部多線程和多進程的代碼都面臨的問題。在多線程或者多進程的系統中考慮完整性和一致性每每會大大地增長代碼的複雜度和系統出錯的機率。
幾乎全部串行化理論真正解決的問題只有一個:性能。 因此,在性能容許的前提下,對於消費者角色,建議採用單實例部署。經過單實例部署,有序性、串行化、完整性和一致性問題自動得到了解決。另外,單實例部署的消費者擁有所有所需信息,它能夠在頻次控制上採起不少優化策略。
天下沒有免費的午飯。一樣,單實例部署並不是沒有代價,它意味着系統可用性的下降,不少時候,這是沒法接受的。解決可用性問題的最直接的思路就是冗餘(Redundancy)。最經常使用的冗餘方案是Master-slave架構,不過大部分的Master-slave架構都是Active/active模式,即主從服務器都提供服務。例如,數據庫的Master-slave架構就是主從服務器都提供讀服務,只有主服務器提供寫服務。大部分基於負載均衡設計的Master-slave集羣中,主服務器和從服務器同時提供相同的服務。這顯然不知足單例服務優化需求。有序性和串行化須要Active/passive架構,即在某一時刻只有主實例提供服務,其餘的從服務等待主實例失效。這是典型的領導人選舉架構,即只有得到領導權的實例才能充當實際消費者,其餘實例都在等待下一次選舉。採用領導人選舉的Active/passive架構能夠大大緩解純粹的單實例部署所帶來的可用性問題。
使人遺憾的是,除非工程師們本身在消費者實例裏面實現Paxos等算法,並在每次消息處理以前都執行領導人選舉。不然,理論上講,沒有方法能夠保障在同一個時刻只有一個領導者。而對每一個消息都執行一次領導人選舉,顯然性能不可行。實際工做中,最容易出現的問題時機發生在領導人交接過程當中,即前任領導人實例變成輔助實例,新部署實例開始承擔領導人角色。爲了平穩過渡,這二者之間須要有必定的通信機制,可是,不管是網絡分區(Network partition)仍是原領導人服務崩潰都會使這種通信機制變的不可能。
對於完整性和一致性要求很高的系統,咱們須要在選舉制度和交接制度這兩塊進行優化。
領導人選舉架構
典型的領導人選舉算法有Paxos、ZAB( ZooKeeper Atomic Broadcast protocol)。爲了不重複造輪子,建議採用ZooKeeper的分佈式鎖來實現領導人選舉。典型的ZooKeeper實現算法以下(摘自參考資料[10]):
Let ELECTION be a path of choice of the application. To volunteer to be a leader: 1.Create znode z with path "ELECTION/guid-n_" with both SEQUENCE and EPHEMERAL flags; 2.Let C be the children of "ELECTION", and i be the sequence number of z; 3.Watch for changes on "ELECTION/guid-n_j", where j is the largest sequence number such that j < i and n_j is a znode in C; Upon receiving a notification of znode deletion: 1.Let C be the new set of children of ELECTION; 2.If z is the smallest node in C, then execute leader procedure; 3.Otherwise, watch for changes on "ELECTION/guid-n_j", where j is the largest sequence number such that j < i and n_j is a znode in C;
領導人交接架構
領導人選舉的整個過程發生在ZooKeeper集羣中,各個消費者實例在這場選舉中只充當被告知者角色(Learner)。領導人選舉算法,只能保證最終只有一個Leader被選舉出來,並不保障被告知者對Leader的理解是徹底一致的。本質上,上文的架構裏,選舉的結果是做爲令牌(Token)傳遞給消費者實例,消費者將自身的ID與令牌進行對比,若是相等,則開始執行消費操做。因此當發生領導人換屆的狀況,不一樣的Learner獲知新Leader的時間並不一樣。例如,前任Leader若是由於網絡問題與ZooKeeper集羣斷開,前任Leader只能在超時後才能判斷本身是否再也不承擔Leader角色了,而新的Leader可能在這以前已經產生。另外一方面,即便前任Leader和新Leader同時接收到新Leader選舉結果,某些業務的完整性要求迫使前任Leader仍然完成當前未完成的工做。以上的講解很是抽象,生活中卻給了一些更加具體的例子。衆所周知,美國總統候選人在選舉結束後並不直接擔任美國總統,從選舉到最終承擔總統角色須要一個過渡期。對於新當選Leader的候選人而言,過渡期間稱之爲加冕階段(Inauguration)。對於即將卸任的Leader,過渡期稱爲交接階段(HandOver)。因此一個基於領導人選舉的消費者從加冕到卸任經歷三個階段:Inauguration、Execution、HandOver。在加冕階段,新領導須要進行一些初始化操做。Execution階段是真正的隊列消息處理階段。在交接階段,前任領導須要進行一些清理操做。
相似的,爲了解決領導人交接問題,全部的消費者從代碼實現的角度都須要實現相似ILeaderCareer接口。這個接口包含三個方發inaugurate(),handOver()和execute()。某個部署實例(Learner)在得知本身承擔領導人角色後,須要調用inaugurate()方法,進行加冕。主要的消費邏輯經過不停的執行execute()實現,當確認本身再也不承擔領導人以後,執行handOver()進行交接。
public interface ILeaderCareer { public void inaugurate(); public void handOver(); public boolean execute(); }
若是承擔領導人角色的消費者,在執行execute()階段得知本身將要下臺,根據消息處理的原子性,該領導人能夠決定是否提早終止操做。若是整個消息處理是一個原子性事務,直接終止該操做能夠快速實現領導人換屆。不然,前任領導必須完成當前消息處理後,才進入交接階段。這意味着新的領導人,在inaugurate()階段須要進行必定時間的等待。
排重優化
頻次控制是一個經典問題。對於分佈式隊列編程架構,相同請求重複出如今隊列的狀況並很多見。若是相同請求在隊列中重複太多,排重優化就顯得很必要。分佈式緩存更新是一個典型例子,全部請求都被髮送到隊列中用於緩存更新。若是請求符合典型的高斯分佈,在一段時間內會出現大量重複的請求,而同時多線程更新同一請求緩存顯然沒有太大的意義。
排重優化是一個算法,其本質是基於狀態機的編程,整個講解經過模型、構思和實施三個步驟完成。
模型
進行排重優化的前提是大量重複的請求。在模型這一小節,咱們首先闡述重複度模型、以及不一樣重複度所致使的消費模型,最後基於這兩個模型去講解排重狀態機。
重複度模型
首先咱們給出最小重複長度的概念。同一請求最小重複長度:同一請求在隊列中的重複出現的最小間距。例如,請求ri第一次出如今位置3,第二次出如今10,最小重複長度等於7。
是否須要進行排重優化取決於隊列中請求的重複度。因爲不一樣請求之間並不存在重複的問題,不失通常性,這裏的模型只考了單個請求的重複度,重複度分爲三個類:無重複、稀疏重複、高重複。
對於不一樣的重複度,會有不一樣的消費模型。
無重複消費模型
在整個隊列處理過程當中,全部的請求都不相同,以下圖:
稀疏重複消費模型
當同一請求最小重複長度大於消費者隊列長度,以下圖。假定有3個消費者,Consumer1將會處理r1,Consumer2將會處理r2,Consumer3將會處理r3,若是每一個請求處理的時間嚴格相等,Consumer1在處理完r1以後,接着處理r4,Consumer2將會處理r2以後會處理r1。雖然r1被再次處理,可是任什麼時候刻,只有這一個消費者在處理r1,不會出現多個消費者同時處理同一請求的場景。
高重複消費模型
以下圖,仍然假定有3個消費者,隊列中前面4個請求都是r1,它會同時被3個消費者線程處理:
顯然,對於無重複和稀疏重複的分佈式隊列,排重優化並不會帶來額外的好處。排重優化所針對的對象是高重複消費模型,特別是對於並行處理消費者比較多的狀況,重複處理同一請求,資源消耗極大。
排重狀態機
排重優化的主要對象是高重複的隊列,多個消費者線程或進程同時處理同一個冪等請求只會浪費計算資源並延遲其餘待請求處理。因此,排重狀態機的一個目標是處理惟一性,即:同一時刻,同一個請求只有一個消費者處理。若是消費者獲取一條請求消息,但發現其餘消費者正在處理該消息,則當前消費者應該處於等待狀態。若是對同一請求,有一個消費者在處理,一個消費者在等待,而同一請求再次被消費者讀取,再次等待則沒有意義。因此,狀態機的第二個目標是等待惟一性,即:同一時刻,同一個請求最多隻有一個消費者處於等待狀態。總上述,狀態機的目標是:處理惟一性和等待惟一性。咱們把正在處理的請求稱爲頭部請求,正在等待的請求稱爲尾部請求。
因爲狀態機的處理單元是請求,因此須要針對每個請求創建一個排重狀態機。基於以上要求,咱們設計的排重狀態機包含4個狀態Init,Process,Block,Decline。各個狀態之間轉化過程以下圖:
構思
狀態機描述的是針對單個請求操做所引發狀態變化,排重優化須要解決隊列中全部請求的排重問題,須要對全部請求的狀態機進行管理。這裏只考慮單虛擬機內部對全部請求狀態機的管理,對於跨虛擬機的管理能夠採用相似的方法。對於多狀態機管理主要包含三個方面:一致性問題、完整性問題和請求緩存驅逐問題。
一致性問題
一致性在這裏要求同一請求的不一樣消費者只會操做一個狀態機。因爲每一個請求都產生一個狀態機,系統將會包含大量的狀態機。爲了兼顧性能和一致性,咱們採用ConcurrentHashMap保存全部的狀態機。用ConcurrentHashMap而不是對整個狀態機隊列進行加鎖,能夠提升並行處理能力,使得系統能夠同時操做不一樣狀態機。爲了不處理同一請求的多消費者線程同時對ConcurrentHashMap進行插入所致使狀態機不一致問題,咱們利用了ConcurrentHashMap的putIfAbsent()方法。代碼方案以下,key2Status用於存儲全部的狀態機。消費者在處理請求以前,從狀態機隊列中讀取排重狀態機TrafficAutomate。若是沒有找到,則建立一個新的狀態機,並經過putIfAbsent()方法插入到狀態機隊列中。
private ConcurrentHashMap<T, TrafficAutomate> key2Status = new ConcurrentHashMap(); TrafficAutomate trafficAutomate = key2Status.get(key); if(trafficAutomate == null) { trafficAutomate = new TrafficAutomate(); TrafficAutomate oldAutomate = key2Status. putIfAbsent(key, trafficAutomate); if(oldAutomate != null) { trafficAutomate = oldAutomate; } }
完整性問題
完整性要求保障狀態機Init,Process,Block,Decline四種狀態正確、狀態之間的轉換也正確。因爲狀態機的操做很是輕量級,兼顧完整性和下降代碼複雜度,咱們對狀態機的全部方法進行加鎖。
請求緩存驅逐問題(Cache Eviction)
若是不一樣請求的數量太多,內存永久保存全部請求的狀態機的內存開銷太大。因此,某些狀態機須要在恰當的時候被驅逐出內存。這裏有兩個思路:
標識問題
每一個請求對應於一個狀態機,不一樣的狀態機採用不一樣的請求進行識別。
對於同一狀態機的不一樣消費者,在單虛擬機方案中,咱們採用線程id進行標識。
實施
排重優化的主要功能都是經過排重狀態機(TrafficAutomate)和狀態機隊列(QueueCoordinator)來實施的。排重狀態機描述的是針對單個請求的排重問題,狀態機隊列解決全部請求狀態機的排重問題。
狀態機實施(TrafficAutomate)
根據狀態機模型,其主要操做爲enQueue和deQueue,其狀態由頭部請求和尾部請求的狀態共同決定,因此須要定義兩個變量爲head和tail,用於表示頭部請求和尾部請求。爲了確保多線程操做下狀態機的完整性(Integraty),全部的操做都將加上鎖。
enQueue操做
當一個消費者執行enQueue操做時:若是此時尾部請求不爲空,根據等待惟一性要求,返回DECLINE,當前消費者應該拋棄該請求;若是頭部請求爲空,返回ACCPET,當前消費者應該馬上處理該消息;不然,返回BLOCK,該消費者應該等待,並不停的查看狀態機的狀態,一直到頭部請求處理完成。enQueue代碼以下:
synchronized ActionEnum enQueue(long id) { if(tail != INIT_QUEUE_ID) { return DECLINE; } if(head == INIT_QUEUE_ID) { head = id; return ACCEPT; } else { tail = id; return BLOCK; } }
deQueue操做
對於deQueue操做,首先將尾部請求賦值給頭部請求,並將尾部請求置爲無效。deQueue代碼以下:
synchronized boolean deQueue(long id) { head = tail; tail = INIT_QUEUE_ID; return true; }
狀態機隊列實施(QueueCoordinator)接口定義
狀態機隊列集中管理全部請求的排重狀態機,因此其操做和單個狀態機同樣,即enQueue和deQueuqe接口。這兩個接口的實現須要識別特定請求的狀態機,因此它們的入參應該是請求。爲了兼容不一樣類型的請求消息,咱們採用了Java泛型編程。接口定義以下:
public interface QueueCoordinator { public boolean enQueue(T key); public void deQueue(T key); }
enQueue操做
enQueue操做過程以下:
首先,根據傳入的請求key值,獲取狀態機, 若是不存在則建立一個新的狀態機,並保存在ConcurrentHashMap中。
接下來,獲取線程id做爲該消費者的惟一標識,並對對應狀態機進行enQueue操做。
若是狀態機返回值爲ACCEPT或者DECLINE,返回業務層處理代碼,ACCEPT意味着業務層須要處理該消息,DECLINE表示業務層能夠拋棄當前消息。若是狀態機返回值爲Block,則該線程保持等待狀態。
在某些狀況下,頭部請求線程可能因爲異常,未能對狀態機進行deQueue操做(做爲組件提供方,不能假定全部的規範被使用方實施)。爲了不處於阻塞狀態的消費者無期限地等待,建議對狀態機設置安全超時時限。超過了必定時間後,狀態機強制清空頭部請求,返回到業務層,業務層開始處理該請求。
代碼以下:
public boolean enQueue(T key) { _loggingStastic(); TrafficAutomate trafficAutomate = key2Status.get(key); if(trafficAutomate == null) { trafficAutomate = new TrafficAutomate(); TrafficAutomate oldAutomate = key2Status.putIfAbsent(key, trafficAutomate); if(oldAutomate != null) { trafficAutomate = oldAutomate; } } long threadId = Thread.currentThread().getId(); ActionEnum action = trafficAutomate.enQueue(threadId); if(action == DECLINE) { return false; } else if (action == ACCEPT) { return true; } //Blocking status means some other thread are working on this key, so just wait till timeout long start = System.currentTimeMillis(); long span = 0; do { _nonExceptionSleep(NAP_TIME_IN_MILL); if(trafficAutomate.isHead(threadId)) { return true; } span = System.currentTimeMillis() - start; }while(span <= timeout); //remove head so that it won't block the queue for too long trafficAutomate.evictHeadByForce(threadId); return true; }
deQueue操做
deQueue操做首先從ConcurrentHashMap獲取改請求所對應的狀態機,接着獲取該線程的線程id,對狀態機進行deQueue操做。
enQueue代碼以下:
public void deQueue(T key) { TrafficAutomate trafficAutomate = key2Status.get(key); if(trafficAutomate == null) { logger.error("key {} doesn't exist ", key); return; } long threadId = Thread.currentThread().getId(); trafficAutomate.deQueue(threadId); }
源代碼
完整源代碼能夠在QueueCoordinator獲取。
轉載地址:http://www.infoq.com/cn/articles/distributed-queue-programme-model-actual-combat-optimization