爲何會須要消息隊列(MQ)?

爲何會須要消息隊列(MQ)?java

##########################################################################################
mysql

主要緣由是因爲在高併發環境下,因爲來不及同步處理,請求每每會發生堵塞,好比說,大量的insert,update之類的請求同時到達MySQL,直接致使無數的行鎖表鎖,甚至最後請求會堆積過多,從而觸發too many connections錯誤。經過使用消息隊列,咱們能夠異步處理請求,從而緩解系統的壓力。linux

##########################################################################################
redis

美國計算機科學家,LaTex的做者Leslie Lamport說:「分佈式系統就是這樣一個系統,系統中一個你甚至都不知道的計算機出了故障,卻可能致使你本身的計算機不可用。」一語道破了開發分佈式系統的玄機,那就是它的複雜與不可控。因此Martin Fowler強調:分佈式調用的第一原則就是不要分佈式。這句話看似頗具哲理,然而就企業應用系統而言,只要整個系統在不停地演化,並有多個子系統共同存在時,這條原則就會被迫打破。蓋由於在當今的企業應用系統中,很難尋找到徹底不須要分佈式調用的場景。Martin Fowler提出的這條原則,一方面是但願設計者可以審慎地對待分佈式調用,另外一方面卻也是分佈式系統自身存在的缺陷所致。不管是CORBA,仍是EJB 2;不管是RPC平臺,仍是Web Service,都由於駐留在不一樣進程空間的分佈式組件,而引入額外的複雜度,並可能對系統的效率、可靠性、可預測性等諸多方面帶來負面的影響。算法

然而,不能否認的是在企業應用系統領域,咱們老是會面對不一樣系統之間的通訊、集成與整合,尤爲當面臨異構系統時,這種分佈式的調用與通訊變得越重要,它在架構設計中就更加凸顯其價值。而且,從業務分析與架構質量的角度來說,咱們也但願在系統架構中儘量地造成對服務的重用,經過獨立運行在進程中服務的形式,完全解除客戶端與服務端的耦合。這經常是架構演化的必然道路。在個人同事陳金洲發表在InfoQ上的文章《架構腐化之謎》中,就認爲能夠經過「將獨立的模塊放入獨立的進程」來解決架構由於代碼規模變大而腐化的問題。sql

隨着網絡基礎設施的逐步成熟,從RPC進化到Web Service,並在業界開始廣泛推行SOA,再到後來的RESTful平臺以及雲計算中的PaaS與SaaS概念的推廣,分佈式架構在企業應用中開始呈現出不一樣的風貌,然而異曲同工,這些分佈式架構的目標仍然是但願回到建造巴別塔的時代,系統之間的交流再也不爲不一樣語言與平臺的隔閡而產生障礙。正如Martin Fowler在《企業集成模式》一書的序中寫道:「集成之因此重要是由於相互獨立的應用是沒有生命力的。咱們須要一種技術能將在設計時並未考慮互操做的應用集成起來,打破它們之間的隔閡,得到比單個應用更多的效益」。這或許是分佈式架構存在的主要意義。數據庫

一、集成模式中的消息模式

歸根結底,企業應用系統就是對數據的處理,而對於一個擁有多個子系統的企業應用系統而言,它的基礎支撐無疑就是對消息的處理。與對象不一樣,消息本質上是一種數據結構(固然,對象也能夠看作是一種特殊的消息),它包含消費者與服務雙方都能識別的數據,這些數據須要在不一樣的進程(機器)之間進行傳遞,並可能會被多個徹底不一樣的客戶端消費。在衆多分佈式技術中,消息傳遞相較文件傳遞與遠程過程調用(RPC)而言,彷佛更勝一籌,由於它具備更好的平臺無關性,並可以很好地支持併發與異步調用。對於Web Service與RESTful而言,則能夠看作是消息傳遞技術的一種衍生或封裝。在《面向模式的軟件架構(卷四)》一書中,將關於消息傳遞的模式劃歸爲分佈式基礎設施的範疇,這是由於諸多消息中間件產品的出現,使得原來須要開發人員本身實現的功能,已經能夠直接重用。這極大地下降了包括設計成本、實現成本在內的開發成本。所以,對於架構師的要求也就從原來的設計實現,轉變爲對業務場景和功能需求的判斷,從而可以正確地進行架構決策、技術選型與模式運用。apache

經常使用的消息模式編程

在我參與過的全部企業應用系統中,無一例外地都採用(或在某些子系統與模塊中部分採用)了基於消息的分佈式架構。可是不一樣之處在於,讓咱們作出架構決策的證據卻迥然而異,這也直接影響咱們所要應用的消息模式。json

消息通道(Message Channel)模式

咱們經常運用的消息模式是Message Channel(消息通道)模式,如圖1所示。

圖1 Message Channel模式(圖片來自eaipatterns

消息通道做爲在客戶端(消費者,Consumer)與服務(生產者,Producer)之間引入的間接層,能夠有效地解除兩者之間的耦合。只要實現規定雙方須要通訊的消息格式,以及處理消息的機制與時機,就能夠作到消費者對生產者的「無知」。事實上,該模式能夠支持多個生產者與消費者。例如,咱們可讓多個生產者向消息通道發送消息,由於消費者對生產者的無知性,它沒必要考慮到底是哪一個生產者發來的消息。

雖然消息通道解除了生產者與消費者之間的耦合,使得咱們能夠任意地對生產者與消費者進行擴展,但它又同時引入了各自對消息通道的依賴,由於它們必須知道通道資源的位置。要解除這種對通道的依賴,能夠考慮引入Lookup服務來查找該通道資源。例如,在JMS中就能夠經過JNDI來獲取消息通道Queue。若要作到充分的靈活性,能夠將與通道相關的信息存儲到配置文件中,Lookup服務首先經過讀取配置文件來得到通道。

消息通道一般以隊列的形式存在,這種先進先出的數據結構無疑最爲適合這種處理消息的場景。微軟的MSMQ、IBM MQ、JBoss MQ以及開源的RabbitMQApache ActiveMQ都經過隊列實現了Message Channel模式。所以,在選擇運用Message Channel模式時,更多地是要從質量屬性的層面對各類實現了該模式的產品進行全方位的分析與權衡。例如,消息通道對併發的支持以及在性能上的表現;消息通道是否充分地考慮了錯誤處理;對消息安全的支持;以及關於消息持久化、災備(fail over)與集羣等方面的支持。由於通道傳遞的消息每每是一些重要的業務數據,一旦通道成爲故障點或安全性的突破點,對系統就會形成災難性的影響。在本文的第二部分,我將給出一個實際案例來闡釋在進行架構決策時應該考慮的架構因素,並由此作出正確地決策。

發佈者-訂閱者(Publisher-Subscriber)模式

一旦消息通道須要支持多個消費者時,就可能面臨兩種模型的選擇:拉模型與推模型。拉模型是由消息的消費者發起的,主動權把握在消費者手中,它會根據本身的狀況對生產者發起調用。如圖2所示:

圖2 拉模型

拉模型的另外一種體現則由生產者在狀態發生變動時,通知消費者其狀態發生了改變。但獲得通知的消費者卻會以回調方式,經過調用傳遞過來的消費者對象獲取更多細節消息。

在基於消息的分佈式系統中,拉模型的消費者一般以Batch Job的形式,根據事先設定的時間間隔,按期偵聽通道的狀況。一旦發現有消息傳遞進來,就會轉而將消息傳遞給真正的處理器(也能夠看作是消費者)處理消息,執行相關的業務。在本文第二部分介紹的醫療衛生系統,正是經過引入Quartz.NET實現了Batch Job,完成對消息通道中消息的處理。

推模型的主動權經常掌握在生產者手中,消費者被動地等待生產者發出的通知,這就要求生產者必須瞭解消費者的相關信息。如圖3所示:

圖3 推模型

對於推模型而言,消費者無需瞭解生產者。在生產者通知消費者時,傳遞的每每是消息(或事件),而非生產者自身。同時,生產者還能夠根據不一樣的狀況,註冊不一樣的消費者,又或者在封裝的通知邏輯中,根據不一樣的狀態變化,通知不一樣的消費者。

兩種模型各有優點。拉模型的好處在於能夠進一步解除消費者對通道的依賴,經過後臺任務去按期訪問消息通道。壞處是須要引入一個單獨的服務進程,以Schedule形式執行。而對於推模型而言,消息通道事實上會做爲消費者觀察的主體,一旦發現消息進入,就會通知消費者執行對消息的處理。不管推模型,拉模型,對於消息對象而言,均可能採用相似Observer模式的機制,實現消費者對生產者的訂閱,所以這種機制一般又被稱爲Publisher-Subscriber模式,如圖4所示:

圖4 Publisher-Subscriber模式(圖片來自eaipatterns )

一般狀況下,發佈者和訂閱者都會被註冊到用於傳播變動的基礎設施(即消息通道)上。發佈者會主動地瞭解消息通道,使其可以將消息發送到通道中;消息通道一旦接收到消息,會主動地調用註冊在通道中的訂閱者,進而完成對消息內容的消費。

對於訂閱者而言,有兩種處理消息的方式。一種是廣播機制,這時消息通道中的消息在出列的同時,還須要複製消息對象,將消息傳遞給多個訂閱者。例如,有多個子系統都須要獲取從CRM系統傳來的客戶信息,並根據傳遞過來的客戶信息,進行相應的處理。此時的消息通道又被稱爲Propagation通道。另外一種方式則屬於搶佔機制,它遵循同步方式,在同一時間只能有一個訂閱者可以處理該消息。實現Publisher-Subscriber模式的消息通道會選擇當前空閒的惟一訂閱者,並將消息出列,並傳遞給訂閱者的消息處理方法。

目前,有許多消息中間件都可以很好地支持Publisher-Subscriber模式,例如JMS接口規約中對於Topic對象提供的MessagePublisher與MessageSubscriber接口。RabbitMQ也提供了本身對該模式的實現。微軟的MSMQ雖然引入了事件機制,能夠在隊列收到消息時觸發事件,通知訂閱者。但它並不是嚴格意義上的Publisher-Subscriber模式實現。由微軟MVP Udi Dahan做爲主要貢獻者的NServiceBus,則對MSMQ以及WCF作了進一層包裝,並可以很好地實現這一模式。

消息路由(Message Router)模式

不管是Message Channel模式,仍是Publisher-Subscriber模式,隊列在其中都扮演了舉足輕重的角色。然而,在企業應用系統中,當系統變得愈來愈複雜時,對性能的要求也會愈來愈高,此時對於系統而言,可能就須要支持同時部署多個隊列,並可能要求分佈式部署不一樣的隊列。這些隊列能夠根據定義接收不一樣的消息,例如訂單處理的消息,日誌信息,查詢任務消息等。這時,對於消息的生產者和消費者而言,並不適宜承擔決定消息傳遞路徑的職責。事實上,根據S單一職責原則,這種職責分配也是不合理的,它既不利於業務邏輯的重用,也會形成生產者、消費者與消息隊列之間的耦合,從而影響系統的擴展。

既然這三種對象(組件)都不宜承擔這樣的職責,就有必要引入一個新的對象專門負責傳遞路徑選擇的功能,這就是所謂的Message Router模式,如圖5所示:

圖5 Message Router模式(圖片來自eaipatterns )

經過消息路由,咱們能夠配置路由規則指定消息傳遞的路徑,以及指定具體的消費者消費對應的生產者。例如指定路由的關鍵字,並由它來綁定具體的隊列與指定的生產者(或消費者)。路由的支持提供了消息傳遞與處理的靈活性,也有利於提升整個系統的消息處理能力。同時,路由對象有效地封裝了尋找與匹配消息路徑的邏輯,就好似一個調停者(Meditator),負責協調消息、隊列與路徑尋址之間關係。

除了以上的模式以外,Messaging模式提供了一個通訊基礎架構,使得咱們能夠將獨立開發的服務整合到一個完整的系統中。 Message Translator模式則完成對消息的解析,使得不一樣的消息通道可以接收和識別不一樣格式的消息。並且經過引入這樣的對象,也可以很好地避免出現盤根錯節,彼此依賴的多個服務。Message Bus模式能夠爲企業提供一個面向服務的體系架構。它能夠完成對消息的傳遞,對服務的適配與協調管理,並要求這些服務以統一的方式完成協做。

二、消息模式的應用場景

基於消息的分佈式架構老是圍繞着消息來作文章。例如能夠將消息封裝爲對象,或者指定消息的規範例如SOAP,或者對實體對象的序列化與反序列化。這些方式的目的只有一個,就是將消息設計爲生產者和消費者都可以明白的格式,並能經過消息通道進行傳遞。

場景一:基於消息的統一服務架構

在製造工業的CIMS系統中,咱們嘗試將各類業務以服務的形式公開給客戶端的調用者,例如定義這樣的接口:

public interface IService {
    IMessage Execute(IMessage aMessage);
    void SendRequest(IMessage aMessage);
}

之因此可以設計這樣的服務,緣由在於咱們對業務信息進行了高度的抽象,以消息的形式在服務之間傳遞。此時的消息實際上是生產者與消費者之間的契約或接口,只要遵循這樣的契約,按照規定的格式對消息進行轉換與抽取,就能很好地支持系統的分佈式處理。

在這個CIMS系統中,咱們將消息劃分爲ID,Name和Body,經過定義以下的接口方法,能夠得到消息主體的相關屬性:

public interface IMessage:ICloneable
{
     string MessageID { get; set; }
     string MessageName() { get; set; }
     IMessageItemSequence CreateMessageBody();
     IMessageItemSequence GetMessageBody();
}

消息主體類Message實現了IMessage接口。在該類中,消息體Body爲IMessageItemSequence類型。這個類型用於獲取和設置消息的內容:Value和Item:

public interface IItemValueSetting {
     string getSubValue(string name);
     void setSubValue(string name, string value);  
}
public interface IMessageItemSequence:IItemValueSetting, ICloneable
{      
     IMessageItem GetMessageItem(string aName);
     IMessageItem CreateMessageItem(string aName);       
}

Value爲字符串類型,它利用了HashTable存儲Key和Value的鍵值對。Item則爲IMessageItem類型,在IMessageItemSequence的實現類中,一樣利用了HashTable存儲Key和Item的鍵值對。

IMessageItem支持消息體的嵌套。它包含了兩部分:SubValue和SubItem。實現的方式和IMessageItemSequence類似。經過定義這樣的嵌套結構,使得消息的擴展成爲可能。通常的消息結構以下所示:

       IMessage——Name
                     ——ID
                     ——Body(IMessageItemSequence)
                            ——Value
                            ——Item(IMessageItem)
                                   ——SubValue
                                   ——SubItem(IMessageItem)
                                          ——……

各個消息對象之間的關係如圖6所示:

圖6 消息對象之間的關係

在實現服務進程通訊以前,咱們必須定義好各個服務或各個業務的消息格式。經過消息體的方法在服務的一端設置消息的值,而後發送,並在服務的另外一端得到這些值。例如發送消息端定義以下的消息體:

IMessageFactory factory = new MessageFactory();
IMessage message = factory.CreateMessage();
message.SetMessageName("service1");

IMessageItemSequence body = message.CreateMessageBody();
body.SetSubValue("subname1","subvalue1");
body.SetSubValue("subname2","subvalue2");

IMessageItem item1 = body.CreateMessageItem(」item1」);
item1.SetSubValue("subsubname11","subsubvalue11");
item1.SetSubValue("subsubname12","subsubvalue12");

//Send Request Message
MyServiceClient service = new MyServiceClient("Client");
service.SendRequest(message);

咱們在客戶端引入了一個ServiceLocator對象,它經過MessageQueueListener對消息隊列進行偵聽,一旦接收到消息,就獲取該消息中的name去定位它所對應的服務,而後調用服務的Execute(aMessage)方法,執行相關的業務。

ServiceLocator承擔的定位職責實際上是對存儲在ServiceContainer容器中的服務進行查詢。ServiceContainer容器能夠讀取配置文件,在啓動服務的時候初始化全部的分佈式服務(注意,這些服務都是無狀態的),並對這些服務進行管理。它封裝了服務的基本信息,諸如服務所在的位置,服務的部署方式等,從而避免服務的調用者直接依賴於服務的細節,既減輕了調用者的負擔,還可以較好地實現服務的擴展與遷移。

在這個系統中,咱們主要引入了Messaging模式,經過定義的IMessage接口,使得咱們更好地對服務進行抽象,並以一種扁平的格式存儲數據信息,從而解除服務之間的耦合。只要各個服務就共用的消息格式達成一致,請求者就能夠不依賴於接收者的具體接口。經過引入的Message對象,咱們就能夠創建一種在行業中通用的消息模型與分佈式服務模型。事實上,基於這樣的一個框架與平臺,在對製造行業的業務進行開發時,開發人員最主要的活動是與領域專家就各類業務的消息格式進行討論,這樣一種面向領域的消息語言,很好地掃清了技術人員與業務人員的溝通障礙;同時在各個子系統之間,咱們也只須要維護服務間相互傳遞的消息接口表。每一個服務的實現都是徹底隔離的,有效地作到了對業務知識與基礎設施的合理封裝與隔離。

對於消息的格式和內容,咱們考慮引入了Message Translator模式,負責對前面定義的消息結構進行翻譯和解析。爲了進一步減輕開發人員的負擔,咱們還能夠基於該平臺搭建一個消息-對象-關係的映射框架,引入實體引擎(Entity Engine)將消息轉換爲領域實體,使得服務的開發者可以以徹底面向對象的思想開發各個服務組件,並經過調用持久層實現消息數據的持久化。同時,利用消息總線(此時的消息總線能夠看作是各個服務組件的鏈接器)鏈接不一樣的服務,並容許異步地傳遞消息,對消息進行編碼。這樣一個基於消息的分佈式架構如圖7所示:

圖7 基於Message Bus的CIMS分佈式架構

場景二:消息中間件的架構決策

在一個醫療衛生系統中,咱們面臨了客戶對系統性能/可用性的非功能需求。在咱們最初啓動該項目時,客戶就表達了對性能與可用性的特別關注。客戶但願最終用戶在進行復雜的替換刪除操做時,可以具備很好的用戶體驗,簡言之,就是但願可以快速地獲得操做的響應。問題在於這樣的替換刪除操做須要處理比較複雜的業務邏輯,同時牽涉到的關聯數據量很是大,整個操做若需完成,最壞狀況下可能須要幾分鐘的時間。咱們能夠經過引入緩存、索引、分頁等多種方式對數據庫操做進行性能調優,但整個操做的耗時始終沒法達到客戶的要求。因爲該系統是在一個遺留系統的基礎上開發,若是要引入Map-Reduce來處理這些操做,以知足質量需求,則對架構的影響太大,且不能很好地重用以前系統的某些組件。顯然,付出的成本與收益並不成正比。

經過對需求進行分析,咱們注意到最終客戶並不須要實時得到結果,只要可以保證最終結果的一致性和完整性便可。關鍵在於就用戶體驗而言,他們不但願經歷漫長的等待,而後再通知他們操做到底是成功仍是失敗。這是一個典型須要經過後臺任務進行異步處理的場景。

在企業應用系統中,咱們經常會遭遇這樣的場景。咱們曾經在一個金融系統中嘗試經過本身編寫任務的方式來控制後臺線程的併發訪問,並完成對任務的調度。事實證實,這樣的設計並不是行之有效。對於這種典型的異步處理來講,基於消息傳遞的架構模式纔是解決這一問題的最佳辦法。

由於消息中間件的逐步成熟,對於這一問題的架構設計,已經由原來對設計實現的關注轉爲如何進行產品選型和技術決策。例如,在.NET平臺下,架構師須要重點考慮的是應該選擇哪一種消息中間件來處理此等問題?這就須要咱們必須結合具體的業務場景,來識別這種異步處理方式的風險,而後再根據這些風險去比較各類技術,以求尋找到最適合的方案。

經過分析業務場景以及客戶性質,咱們發現該業務場景具備以下特徵:

  • 在一些特定情形下,可能會集中發生批量的替換刪除操做,使得操做的併發量達到高峯;例如FDA要求召回一些違規藥品時,就須要刪除藥品庫中該藥品的信息;
  • 操做結果不要求實時性,但須要保證操做的可靠性,不能由於異常失敗而致使某些操做沒法進行;
  • 自動操做的過程是不可逆轉的,所以須要記錄操做歷史;
  • 基於性能考慮,大多數操做須要調用數據庫的存儲過程;
  • 操做的數據須要具有必定的安全性,避免被非法用戶對數據形成破壞;
  • 與操做相關的功能以組件形式封裝,保證組件的可重用性、可擴展性與可測試性;
  • 數據量可能隨着最終用戶的增多而逐漸增大;

針對如上的業務需求,咱們決定從如下幾個方面對各類技術方案進行橫向的比較與考量。

  • 併發:選擇的消息隊列必定要很好地支持用戶訪問的併發性;
  • 安全:消息隊列是否提供了足夠的安全機制;
  • 性能伸縮:不能讓消息隊列成爲整個系統的單一性能瓶頸;
  • 部署:儘量讓消息隊列的部署更爲容易;
  • 災備:不能由於意外的錯誤、故障或其餘因素致使處理數據的丟失;
  • API易用性:處理消息的API必須足夠簡單、並可以很好地支持測試與擴展;

咱們前後考察了MSMQ、Resque、ActiveMQ和RabbitMQ,經過查詢相關資料,以及編寫Spike代碼驗證相關質量,咱們最終選擇了RabbitMQ。

咱們選擇放棄MSMQ,是由於它嚴重依賴Windows操做系統;它雖然提供了易用的GUI方便管理人員對其進行安裝和部署,但若要編寫自動化部署腳本,卻很是困難。同時,MSMQ的隊列容量不能查過4M字節,這也是咱們沒法接收的。Resque的問題是目前僅支持Ruby的客戶端調用,不能很好地與.NET平臺集成。此外,Resque對消息持久化的處理方式是寫入到Redis中,於是須要在已有RDBMS的前提下,引入新的Storage。咱們比較傾心於ActiveMQ與RabbitMQ,但經過編寫測試代碼,採用循環發送大數據消息以驗證消息中間件的性能與穩定性時,咱們發現ActiveMQ的表現並不太讓人滿意。至少,在咱們的詢證調研過程當中,ActiveMQ會由於頻繁發送大數據消息而偶爾出現崩潰的狀況。相對而言,RabbitMQ在各個方面都比較適合咱們的架構要求。

例如在災備與穩定性方面,RabbitMQ提供了可持久化的隊列,可以在隊列服務崩潰的時候,將未處理的消息持久化到磁盤上。爲了不由於發送消息到寫入消息之間的延遲致使信息丟失,RabbitMQ引入了Publisher Confirm機制以確保消息被真正地寫入到磁盤中。它對Cluster的支持提供了Active/Passive與Active/Active兩種模式。例如,在Active/Passive模式下,一旦一個節點失敗,Passive節點就會立刻被激活,並迅速替代失敗的Active節點,承擔起消息傳遞的職責。如圖8所示:

圖8 Active/Passive Cluster(圖片來自RabbitMQ官方網站)

在併發處理方面,RabbitMQ自己是基於erlang編寫的消息中間件,做爲一門面向併發處理的編程語言,erlang對併發處理的天生優點使得咱們對RabbitMQ的併發特性抱有信心。RabbitMQ能夠很是容易地部署到Windows、Linux等操做系統下,同時,它也能夠很好地部署到服務器集羣中。它的隊列容量是沒有限制的(取決於安裝RabbitMQ的磁盤容量),發送與接收信息的性能表現也很是好。RabbitMQ提供了Java、.NET、Erlang以及C語言的客戶端API,調用很是簡單,而且不會給整個系統引入太多第三方庫的依賴。 例如.NET客戶端只須要依賴一個程序集。

即便咱們選擇了RabbitMQ,但仍有必要對系統與具體的消息中間件進行解耦,這就要求咱們對消息的生產者與消費者進行抽象,例如定義以下的接口:

    public interface IQueueSubscriber
    {
        void ListenTo<T>(string queueName, Action<T> action);
        void ListenTo<T>(string queueName, Predicate<T> messageProcessedSuccessfully);
        void ListenTo<T>(string queueName, Predicate<T> messageProcessedSuccessfully, bool requeueFailedMessages);
    }

    public interface IQueueProvider
    {
        T Pop<T>(string queueName);
        T PopAndAwaitAcknowledgement<T>(string queueName, Predicate<T> messageProcessedSuccessfully);
        T PopAndAwaitAcknowledgement<T>(string queueName, Predicate<T> messageProcessedSuccessfully, bool requeueFailedMessages);
        void Push(FunctionalArea functionalArea, string routingKey, object payload);
    }

在這兩個接口的實現類中,咱們封裝了RabbitMQ的調用類,例如:

    public class RabbitMQSubscriber : IQueueSubscriber
    {
        public void ListenTo<T>(string queueName, Action<T> action)
        {
            using (IConnection connection = _factory.OpenConnection())
            using (IModel channel = connection.CreateModel())
            {
                var consumer = new QueueingBasicConsumer(channel);
                string consumerTag = channel.BasicConsume(queueName, AcknowledgeImmediately, consumer);

                var response = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
                var serializer = new JavaScriptSerializer();
                string json = Encoding.UTF8.GetString(response.Body);
                var message = serializer.Deserialize<T>(json);

                action(message);
            }
        }       
    }
    public class RabbitMQProvider : IQueueProvider
    {

        public T Pop<T>(string queueName)
        {
            var returnVal = default(T);
            const bool acknowledgeImmediately = true;

            using (var connection = _factory.OpenConnection())
            using (var channel = connection.CreateModel())
            {
                var response = channel.BasicGet(queueName, acknowledgeImmediately);

                if (response != null)
                {
                    var serializer = new JavaScriptSerializer();
                    var json = Encoding.UTF8.GetString(response.Body);
                    returnVal = serializer.Deserialize<T>(json);
                }
            }

            return returnVal;
        }
    }

咱們用Quartz.Net來實現Batch Job。經過定義一個實現了IStatefulJob接口的Job類,在Execute()方法中完成對隊列的偵聽。Job中RabbitMQSubscriber類的ListenTo()方法會調用Queue的Dequeue()方法,當接收的消息到達隊列時,Job會偵聽到消息達到的事件,而後以同步的方式使得消息彈出隊列,並將消息做爲參數傳遞給Action委託。所以,在Batch Job的Execute()方法中,能夠定義消息處理的方法,並調用RabbitMQSubscriber類的ListenTo()方法,以下所示(注意,這裏傳遞的消息事實上是Job的Id):

        public void Execute(JobExecutionContext context)
        {
            string queueName = queueConfigurer.GetQueueProviders().Queue.Name;
            try
            {
                queueSubscriber.ListenTo<MyJob>(
queueName,
                    job => request.MakeRequest(job.Id.ToString()));
            }
            catch(Exception err)
            {
                Log.WarnFormat("Unexpected exception while processing queue '{0}', Details: {1}", queueName, err);
            }
        }

        

隊列的相關信息例如隊列名都存儲在配置文件中。Execute()方法調用了request對象的MakeRequest()方法,並將得到的消息(即JobId)傳遞給該方法。它會根據JobId到數據庫中查詢該Job對應的信息,並執行真正的業務處理。

在對基於消息處理的架構進行決策時,除了前面提到的考慮因素外,還須要就許多設計細節進行多方位的判斷與權衡。例如針對Job的執行以及隊列的管理,就須要考慮以下因素:

  • 對Queue中Job狀態的監控與查詢;
  • 對Job優先級的管理;
  • 可否取消或終止執行時間過長的Job;
  • 是否可以設定Job的執行時間;
  • 是否可以設定Poll的間隔時間;
  • 可否跨機器分佈式的放入Job;
  • 對失敗Job的處理;
  • 可否支持多個隊列,命名隊列;
  • 可否容許執行Job的工做進程對應特定的隊列;
  • 對Dead Message的支持。

三、選擇的時機

究竟在何時,咱們應該選擇基於消息處理的分佈式架構?根據我參與的多個企業應用系統的經驗,竊覺得須要知足以下幾個條件:

  • 對操做的實時性要求不高,而須要執行的任務極爲耗時;
  • 存在企業內部的異構系統間的整合;
  • 服務器資源須要合理分配與利用;

對於第一種狀況,咱們經常會選擇消息隊列來處理執行時間較長的任務。此時引入的消息隊列就成了消息處理的緩衝區。消息隊列引入的異步通訊機制,使得發送方和接收方都不用等待對方返回成功消息,就能夠繼續執行下面的代碼,從而提升了數據處理的能力。尤爲是當訪問量和數據流量較大的狀況下,就能夠結合消息隊列與後臺任務,經過避開高峯期對大數據進行處理,就能夠有效下降數據庫處理數據的負荷。前面提到的醫療衛生系統正是這樣一種適用場景。

對於不一樣系統乃至於異構系統的整合,偏偏是消息模式善於處理的場景。只要規定了消息的格式與傳遞方式,就能夠有效地實現不一樣系統之間的通訊。在爲某汽車製造商開發一個大型系統時,分銷商做爲.NET客戶端,須要將數據傳遞到管理中心。這些數據將被Oracle的EBS(E-Business Suite)使用。分銷商管理系統(Dealer Management System,DMS)採用了C/S結構,數據庫爲SQL Server,汽車製造商管理中心的EBS數據庫爲Oracle 10g。咱們須要解決兩種不一樣數據庫間數據的傳遞。解決方案就是利用MSMQ,將數據轉換爲與數據庫無關的消息數據,並在兩端部署MSMQ服務器,創建消息隊列以便於存儲消息數據。實現架構如圖9所示。

圖10 利用MSMQ實現的分佈式處理架構

首先,分銷商的數據經過MSMQ傳遞到MSMQ Server,再將數據插入到SQL Server數據庫的同時,利用FTP將數據傳送到專門的文件服務器上。EBS App Server會將文件服務器中的文件,基於接口規範寫入到Oracle數據庫,從而實現.NET系統與Oracle系統之間的整合。

分佈式系統一般可以緩解單個服務器的壓力,經過將不一樣的業務操做與數據處理以不一樣的服務形式部署並運行在不一樣的服務器上,就能夠有效地分配與利用服務器資源。在這種狀況下,部署在不一樣服務器上的服務,既可能做爲服務端,用以處理客戶端調用的請求,也可能做爲客戶端,在處理完本身的業務後,將其他業務請求委派給其餘服務。在早期的CORBA系統中,經過創建統一的Naming Service,用以管理和分派服務,並經過Event Service實現事件的分發與處理。但CORBA系統採用的是RPC的方式,須要將服務設計和部署爲遠程對象,並創建代理。若是經過消息通道的方式,則既能夠解除這種對遠程對象的依賴,又能夠很好地支持異步調用模型。在前面提到的CIMS系統,就是經過消息總線提供消息傳遞的基礎設施,並創建統一的消息處理服務模型,解除服務見的依賴,使得各個服務可以獨立地部署到不一樣服務器上。

四、面臨的困難

因爲消息模式自身的特殊性,咱們在運用消息模式創建基於消息的分佈式架構時,經常會面臨許多困難。

首先是系統集成的問題。因爲系統之間的通訊靠消息進行傳遞,就必須保證消息的一致性,同時,還須要維護系統之間(主要是服務之間)接口的穩定性。一旦接口發生變化,就可能影響到該接口的全部調用者。即便服務經過接口進行了抽象,因爲消息持有雙方服務規定的業務數據,在必定程度上違背了封裝的要義。換言之,生產與消費消息的雙方都緊耦合於消息。消息的變化會直接影響到各個服務接口的實現類。然而,爲了儘量保證接口的抽象性,咱們所要處理的消息都不是強類型的,這就使得咱們在編譯期間很難發現由於消息內容發生變動產生的錯誤。在我以前提到的汽車零售商管理系統就存在這樣的問題。當時我負責的CRM模塊須要同時與多個子系統進行通訊,而每一個子系統又是由不一樣的團隊進行開發。團隊之間由於溝通緣由,經常未能及時地同步接口表。雖然各個子系統的單元測試和功能測試都已經過,但直到對CRM進行集成測試,才發現存在大量消息不匹配的集成問題,這些問題的原由都是由於消息的變動。

解決的方案是引入充分的集成測試,甚至是迴歸測試,並須要及時運行這些測試,以快速地得到反饋。咱們能夠將集成測試做爲提交代碼的驗證們,要求每次提交代碼都必須運行集成測試與指定的迴歸測試 。這正是持續集成的體現。經過在本地構建與遠程構建運行集成測試與迴歸測試,有效地保證本地版本與集成後的版本不會由於消息的改變使得功能遭受破壞。一旦遭受破壞,也可以及時得到反饋,發現問題,即刻解決這些問題,而不是等到項目後期集中進行集成測試。

另外一個問題是後臺任務的非實時性帶來的測試困難。因爲後臺任務是按期對消息隊列中的消息進行處理,於是觸發的時機是不可預測的 。對於這種狀況,咱們一般會同時運用兩種方案,雙管其下地解決問題。首先,咱們會爲系統引入一個同步實現功能的版本,並經過在配置文件中引入toggle的開關機制,隨時能夠在同步功能與異步功能之間進行切換。若是咱們可以保證消息隊列處理與後臺任務執行的正確性,就能夠設置爲同步功能,這樣就能快速而準確地對該任務所表明的功能進行測試,並及時收穫反饋。同時,咱們能夠在持續集成服務器上創建一個專門的管道(pipeline),用以運行基於消息處理的異步版本。這個管道對應的任務能夠經過手動執行,也能夠對管道設置定時器,在指定時間執行(例如在凌晨兩點執行一次,這樣在次日開始工做以前能夠得到反饋)。咱們須要爲該管道準備特定的執行環境,並將後臺任務的偵聽與執行時間修改成能夠接受的值。這樣既可以及時瞭解功能是否正確,又能保證基於消息的系統是工做正常的。

固然,分佈式系統還存在解析消息、網絡傳遞的性能損耗。對於這些問題,須要架構師審慎地分析業務場景,正確地選擇架構方案與架構模式。相比較本地系統而言,分佈式系統的維護難度可能成倍遞增。這既須要咱們在進行架構決策與設計時,充分考慮系統架構的穩定性,同時還須要引入系統日誌處理。更好的作法是爲日誌處理增長錯誤通知的功能,只要發生消息處理的錯誤信息,就經過郵件、短信等方式通知系統管理員,及時地處理錯誤。由於只有在發生錯誤的當時查詢錯誤日誌,纔可以更好對問題進行定位。同時,還能夠爲系統引入Error Message Queue以及Dead Message Queue,以便於處理錯誤和異常狀況。

對於分佈式系統而言,還須要考慮服務執行結果的一致性,尤爲是當某個業務須要多個服務參與到一個會話中時,一旦某個服務發生故障,就可能致使應用出現狀態不一致的狀況,由於只有全部參與者都成功執行了任務,才能視爲徹底成功。這就牽涉到分佈式事務的問題,此時任務的執行就變成了事務型的:即任務必須是原子的,結果狀態必須保持一致。在任務處理過程當中,狀態修改是彼此隔離的,成功的狀態修改在整個事務執行過程當中是持久的。這就是事務的ACID(Atomic,Consistent,Isolated與Durable)屬性。

一種方案是引入分佈式事務協調器,即DTC(Distributed Transaction Coordinator),將事務分爲兩段式甚至三段式提交,要求整個事務的全部參與者以投票形式決定事務是徹底成功仍是失敗。另外一種方案是下降對結果一致性的要求。根據eBay的最佳實踐,考慮到分佈式事務的成本,得到分佈式資源即時的一致性是沒必要要的,也是不現實的。在Randy Shoup的文章《可伸縮性最佳實踐:來自eBay的經驗》中提到了Eric Brewer的CAP公理:分佈式系統的三項重要指標——一致性(Consistency)、可用性(Availability)和 分區耐受性(Partition-tolerance)——在任意時刻,只有兩項能同時成立。咱們應該根據不一樣的應用場景,權衡這三個要素。在沒必要要保證即時的一致性前提下,咱們能夠考慮合理地劃分服務,儘可能將可能做用在同一個事務範圍的業務操做部署在同一個進程中,以免分佈式部署。若是確實須要多個分佈式服務之間保持執行結果的一致,能夠考慮引入數據覈對,異步恢復事件或集中決算等手段。



 
0
0
相關文章
相關標籤/搜索