RabbitMQ.Client API (.NET)中文文檔

主要的名稱空間,接口和類

核心API中定義接口和類 RabbitMQ.Client 名稱空間: node

1
using RabbitMQ.Client;
核心API接口和類
  • IModel :表示一個AMQP 0-9-1頻道,提供了大部分 的操做(方法)協議。
  • IConnection :表示一個AMQP 0-9-1鏈接
  • ConnectionFactory :構造 IConnection 實例
  • IBasicConsumer:表明一個消費者消息
其餘有用的接口和類包括:
  • DefaultBasicConsumer:經常使用的消費者基類
RabbitMQ.Client之外的公共命名空間 包括:
  • RabbitMQ.Client.Events :客戶端庫的各類事件和事件處理程序,包括 EventingBasicConsumer , 創建在消費者實現c#事件處理程序。
  • RabbitMQ.Client.Exceptions :用戶可見的異常。

全部其餘的命名空間是留給庫的私有實現細節,雖然私有的命名空間的成員使用該庫以容許開發者實現他們在庫實現發現錯誤或設計錯誤的解決方法一般是提供給應用程序。應用程序能夠不依賴任何類,接口,成員出現私人的命名空間內的跨庫的版本保持穩定的變量等。

鏈接到代理( Connecting to a Broker

要鏈接到RabbitMQ的,有必要實例化(例示)一個鏈接工廠和其配置爲使用所需的主機,虛擬主機和證書(證書)。而後使用ConnectionFactory.CreateConnection()打開的鏈接。下面兩段代碼鏈接到主機名RabbitMQ的節點:
   
   
   
   
     
     
     
     
ConnectionFactory factory = new ConnectionFactory(); factory.UserName = user; // "gue factory.Password = pass; factory.VirtualHost = vhost; factory.HostName = hostName; IConnection conn = factory.CreateConnection();
   
   
   
   
     
     
     
     
因爲.NET客戶端使用AMQP 0-9-1 URI規格比其餘客戶的嚴格的解釋,必須當心使用URI時服用。特別是,主機部分不能被省略,而且與空名稱虛擬主機不可尋址(可尋址的)。全部出廠的屬性都有默認值。若是該屬性保持創建鏈接以前未分配將用於一個屬性的默認值: 用戶名     「guest」 密碼     「guest」 虛擬主機     「/」 主機名     「localhost」 端口     5672按期鏈接,5671鏈接使用TLS 而後IConnection接口可用於打開一個通道:
      
      
      
      
通道 如今能夠被用來發送和接收消息,如在隨後的章節中描述。
ConnectionFactory factory = new ConnectionFactory(); factory.Uri = "amqp://user:pass@hostName:port/vhost";IConnection conn = factory.CreateConnection();IModel channel= conn.CreateModel();
使用交換機(Exchanges)和隊列(Queues)

客戶端應用程序在交換機和隊列中工做,AMQP 0-9-1的高層次的積木工做。這些都必須先「申明」,而後才能使用它們。聲明任一類型的對象只是確保該名稱的一個存在,若是有必要創造它。繼續前面的例子,下面的代碼聲明一個交換機和隊列,而後綁定在一塊兒。
     
     
     
     
model.ExchangeDeclare(exchangeName, ExchangeType.Direct);model.QueueDeclare(queueName, false, false, false, null);model.QueueBind(queueName, exchangeName, routingKey, null);
這將激活一下對象:
   1: 「直連」(direct)類型的非持久性(non-durable),非自動刪除(non-autodelete)交換機(exchange)
   2:非持久,不自動刪除,非排他性(non-exclusive)的隊列

       交換機可經過使用額外的參數 定製 。上面的代碼將隊列綁定到指定路由的交換機。請注意,許多通道API(IModel)方法被重載。 ExchangeDeclare的便利縮寫形式使用合理的默認值。也有較長的形式與更多的參數,讓你 根據須要重載 這些默認值,充分控制在須要的地方。這種「短版,長版」的格局在整個API使用。

發送消息(Publishing Messages)

使用  IModel.BasicPublish 發送消息到交換機,  以下:
     
     
     
     
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");model.BasicPublish(exchangeName, routingKey, null, messageBodyBytes);
爲了更好的控制,可使用重載變量指定的強制性標誌,或指定的消息屬性:
     
     
     
     
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");IBasicProperties props = model.CreateBasicProperties();props.ContentType = "text/plain";props.DeliveryMode = 2;model.BasicPublish(exchangeName,routingKey,props,messageBodyBytes);

以持續性的交互模式發送文本消息,有關消息屬性的詳細信息請查看IBasicProperties接口的定義。git

在下面的例子中,咱們發送定義了Header(頭)的消息:
正則表達式

1
2
3
4
5
6
7
8
9
10
byte [] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes( "Hello, world!" );
   
IBasicProperties props = model.CreateBasicProperties();
props.ContentType = "text/plain" ;
props.DeliveryMode = 2;
props.Headers = new Dictionary< string , object >();
props.Headers.Add( "latitude" ,  51.5252949);
props.Headers.Add( "longitude" , -0.0905493);
   
model.BasicPublish(exchangeName, routingKey,props, messageBodyBytes);

下面的示例代碼設置消息過時時間:數據庫

1
2
3
4
5
6
7
8
byte [] messageBodyBytes=System.Text.Encoding.UTF8.GetBytes( "Hello, world!" );
  
IBasicProperties props = model.CreateBasicProperties();
props.ContentType = "text/plain" ;
props.DeliveryMode = 2;
props.Expiration = "36000000"
  
mode.BasicPublish(exchangeName,routingKey,props,messageBodyBytes);

獲取單條消息(Fetching Individual Messages ("pull API"))編程

使用IModel.BasicGet獲取單條消息,從消息的Header(屬性)和消息主體能夠獲取到BasicGetResult的實例c#

1
2
3
4
5
6
7
8
bool noAck = false ;
BasicGetResult result = channel.BasicGet(queueName, noAck);
if (result == null ) {
     // No message available at this time.
} else {
     IBasicProperties props = result.BasicProperties;
     byte [] body = result.Body;
     ...

上面的  noAck=false 你也可使用  IModel.BasicAck 來確認成功的接受並處理了消息。服務器

1
2
3
4
...
     // acknowledge receipt of the message
     channel.BasicAck(result.DeliveryTag, false );
}

注意:使用該API獲取消息是效率較低。若是你想使用RabbitMQ將郵件推送到客戶端,請參閱下一節。 
網絡


經過訂閱檢索消息(Retrieving Messages By Subscription ("push API"))併發


接收消息的另外一種方法是使用IBasicConsumer接口創建訂閱。該消息將在到達時被自動推送,而沒必要進行主動請求。實現消費者的一種方法是使用的方便(convenience)類EventingBasicConsumer,其中調度交付和其餘消費的生命週期事件爲C#事件:
app

1
2
3
4
5
6
7
8
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
                 {
                     var body = ea.Body;
                     // ... process the message
                     ch.BasicAck(ea.DeliveryTag, false );
                 };
String consumerTag=channel.BasicConsume(queueName, false ,consumer);

另外一種選擇是繼承DefaultBasicConsumer類,重寫必要的方法,或者直接實現IBasicConsumer。一般要實現核心方法IBasicConsumer.HandleBasicDeliver。更復雜的消費者將須要實施進一步的方法。特別是,HandleModelShutdown使 channel/connection關閉。消費者還能夠實現HandleBasicCancelOk通知取消的消息。 在沒有被提交給原始IModel.BasicConsume狀況下,DefaultBasicConsumer的ConsumerTag屬性可用於檢索服務器生成的消費者標籤。您可使用IModel.BasicCancel主動取消消費者:

1
channel.BasicCancel(consumerTag);

當調用API方法,你老是經過消費者標籤提交到他們本身的消費者,它能夠是客戶端或服務器生成的,詳見AMQP規範0-9-1文件中解釋。


消費者的併發性考慮(Concurrency Considerations for Consumers)


每一個IConnection實例,在當前實現中,由單個後臺線程從Socket中讀取並調度所得事件給應用程序的支持。若是啓用心跳,必須3.5.0版本,它們用.NET的定時器來實現的。一般,所以,在使用這種庫的應用程序至少須要激活兩個線程:

1:應用程序線程(the application thread

包含應用程序的邏輯,調用 IModel 的方法執行協議操做。

2:活動的 I/O 線程

經過IConnection的實例隱藏和徹底管理


在任何 回調的應用程序和庫中,線程模型對應用程序是可見的。這樣的回調包括:

一、任何IBasicConsumer方法

二、在IModel的BasicReturn事件

三、任何對IConnection了各類關閉事件,IModel等。


消費者回調和訂閱(Consumer Callbacks and Ordering)


從版本3.5.0應用回調處理程序能夠調用阻塞操做(如IModel.QueueDeclare或IModel.BasicCancel)。IBasicConsumer回調併發調用。然而,每一個通道的操做順序將予以保留。換句話說,若是消息A和B在該順序輸送在同一通道上,它們將被以該順序進行處理。若是消息A和B分別在不一樣的通道輸送,它們能夠以任何順序進行處理(或並行)。消費者回調在派往由.NET運行庫提供的默認的TaskScheduler任務調用。


使用自定義計劃任務(Using a Custom Task Scheduler)

咱們能夠經過設置ConnectionFactory.TaskScheduler使用自定義的任務調度程序:

1
2
3
4
5
6
7
public class CustomTaskScheduler : TaskScheduler
{
   // ...
}
 
var cf = new ConnectionFactory();
cf.TaskScheduler = new CustomTaskScheduler();

此處的例子,能夠用來限制與一個自定義的TaskScheduler併發程度。


線程之間共享通道(Sharing Channels Between Threads)

根據經驗,IModel實例不該由多個線程同時使用:應用程序代碼應該爲IModel實例維護一個清晰的線程全部權概念。若是多個線程須要訪問特定的IModel實例,應用程序應該實施互斥自己。 實現這一點的一種方式是對於IModel的全部用戶鎖定實例自己

   
   
   
   
IModel ch = RetrieveSomeSharedIModelInstance();lock (ch) { ch.BasicPublish(...);}
不正確序列化的 IModel操做包括但不限於,以下:

一、 在線路上發送的無效幀序列(例如,若是同時運行多於一個BasicPublish操做,則發生),和/或NotSupportedExceptions從RpcContinuationQueue類中的方法拋出,引起「禁止請求的管道」(在同時運行多個AMQP 0-9-1同步操做(如ExchangeDeclare)的狀況下)。


處理不可路由的消息(Handling Unroutable Messages

若是發佈的消息具備設置的「mandatory」標誌,但不能傳遞,代理將返回給發送客戶端(經過basic.return AMQP 0-9-1命令)。 爲了通知這樣的返回,客戶能夠訂閱IModel.BasicReturn事件。 若是沒有鏈接到事件的偵聽器,則返回的消息將被靜默刪除。

   
   
   
   
model.BasicReturn += new RabbitMQ.Client.Events.BasicReturnEventHandler(...);
例如,若是客戶端發佈了一條「強制」標誌設置爲未綁定到隊列的「direct」類型交換的消息,則BasicReturn事件將觸發。

斷開與RabbitMQ的鏈接( Disconnecting from RabbitMQ )

要斷開鏈接,只需關閉通道和鏈接:

   
   
   
   
channel.Close(200, "Goodbye");conn.Close();
注意,關閉頻道被認爲是良好的作法,但不是絕對必要的 - 它將在底層鏈接關閉時自動完成。 在某些狀況下,您可能但願鏈接在鏈接上的最後一個打開通道關閉後自動關閉。 要實現這一點,請將IConnection.AutoClose屬性設置爲true,但僅在建立第一個通道後:
    
    
    
    
IConnection conn = factory.CreateConnection(...);IModel channel = conn.CreateModel();conn.AutoClose = true;
當AutoClose爲true時,最後關閉的通道也將致使鏈接關閉。 若是在建立任何通道以前將其設置爲true,則鏈接將在此時關閉。

從網絡故障自動恢復( Automatic Recovery From Network Failures )
鏈接恢復( Connection Recovery )

客戶端和RabbitMQ節點之間的網絡鏈接可能失敗。 RabbitMQ .NET / C#客戶端支持自動恢復鏈接和拓撲(queues, exchanges, bindings, and consumers)。 許多應用程序的自動恢復過程遵循如下步驟:

     一、從新鏈接 (Reconnect)

     二、還原鏈接偵聽器( Restore connection listeners)

     三、從新打開通道(Re-open channels)

     四、還原頻道偵聽器(Restore channel listeners

     五、恢復通道basic.qos設置,發佈者確認和事務設置( Restore channel basic.qos setting, publisher confirms and transaction settings


拓撲恢復包括對每一個通道執行的如下操做:

     一、從新聲明交易(除了預約義的交易)(Re-declare exchanges (except for predefined ones)

     二、從新聲明隊列(Re-declare queues)

     三、恢復全部綁定(Recover all bindings)

     四、恢復全部消費者(Recover all consumers)

要啓用自動鏈接恢復,請將ConnectionFactory.AutomaticRecoveryEnabled設置爲true:

   
   
   
   
ConnectionFactory factory = new ConnectionFactory();factory.AutomaticRecoveryEnabled = true;// connection that will recover automaticallyIConnection conn = factory.CreateConnection();
若是恢復因爲異常(例如RabbitMQ節點仍然不可達)失敗,將在固定的時間間隔(默認爲5秒)後重試。 間隔能夠配置:
    
    
    
    
ConnectionFactory factory = new ConnectionFactory();// attempt recovery every 10 secondsfactory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);

拓撲恢復

拓撲恢復涉及恢復queues, exchanges, bindings, and consumers。 默認狀況下啓用它,但能夠禁用:

   
   
   
   
ConnectionFactory factory = new ConnectionFactory();Connection conn = factory.CreateConnection();factory.AutomaticRecoveryEnabled = true;factory.TopologyRecoveryEnabled = false;
手動確認和自動恢復
當使用手動確認時,可能與RabbitMQ節點的網絡鏈接在消息傳遞和確認之間失敗。 在鏈接恢復後,RabbitMQ將重置全部通道上的交付標籤。 這意味着使用舊的傳遞標記的basic.ack,basic.nack和basic.reject將致使通道異常。 爲了不這種狀況,RabbitMQ .NET客戶端跟蹤和更新傳遞標記,使它們在恢復之間單調增加。 IModel.BasicAck,IModel.BasicNack和IModel.BasicReject而後將調整後的交付標籤轉換爲RabbitMQ使用的標籤。 不會發送過時交貨標籤的確認。 使用手動確認和自動恢復的應用程序必須可以處理從新遞送。

使用AMQP 0-9-1的常見方法( Common ways of working with AMQP 0-9-1

當使用RabbitMQ構建分佈式系統時,會有一些不一樣的消息模式反覆出現。在本節中,咱們將介紹一些最多見的編碼模式和交互風格:


    點對點消息:遠程過程調用(RPC)和指向特定接收器的異步消息。

    事件廣播:一對多交互;隱含地指向一組感興趣的接收者的消息的傳輸,以及零個或多個可能的響應的收集。

    責任轉移:選擇網絡中的哪一個部分負責任何給定的消息。

    消息傳輸:至少一次和最多一次消息傳遞。

    在與外部資源交互時保持原子性和冪等性。


有限庫支持也可用於處理這些模式,在RabbitMQ.Client.MessagePatterns命名空間:


    訂閱提供了從服務器接收消息的高級接口。

    SimpleRpcServer構建在Subscription上以實現RPC或單向服務。

    SimpleRpcClient構建在Subscription上,與遠程服務交互。


未來的RabbitMQ .NET客戶端庫版本將包括改進對最多見的消息傳遞模式及其變體的高級支持。


點對點消息(Point-to-point Messaging


當消息的發佈者具備特定的接收應用時,例如,當經過AMQP服務器使得RPC樣式的服務可用時,或者當工做流鏈中的應用接收到消息時,發生點對點消息傳遞模式工做項,並將轉換後的工做項發送給其後繼者。

同步,客戶機 - 服務器遠程過程調用(RPC)

爲了執行請求/響應RPC,


    一些解決服務的手段必須可用

    一些接收答覆的方法必須可用

    將請求消息與回覆消息相關聯的一些裝置必須可用


尋址服務(Addressing the service

因爲AMQP消息是使用一對交換名稱和路由密鑰發佈的,所以這足以用於尋址服務。使用簡單的交換名/路由 - 密鑰組合容許多種不一樣的方式來實現服務,同時向客戶端呈現相同的接口。例如,服務能夠被實現爲從隊列消耗的單個進程和在內部的負載均衡,或者其能夠是從單個隊列消耗的多個進程,被遞送請求循環式,從而在沒有特殊編碼的狀況下進行負載均衡服務邏輯。消息也能夠尋址到服務請求隊列


    直接,使用AMQP默認交換(「」);要麼

    間接地經過使用服務特定交換,其使得路由密鑰免費用於諸如方法選擇或附加服務特定尋址信息的目的;要麼

    間接地,經過使用由多個服務共享的交換,其中服務名稱在路由密鑰中編碼。


使用默認交換以外的交換容許其餘應用程序接收每一個請求消息的副本,這對於監視,審計,日誌記錄和調試是有用的。

確保服務實例正在偵聽


AMQP 0-9-1發佈操做(IModel.BasicPublish)提供了交付標誌「強制性」,可用於確保客戶端發送請求時的服務可用性。若是不能將請求路由到隊列,則設置「mandatory」標誌會致使返回請求。返回的消息顯示爲basic.return命令,經過IModel上用於發佈消息的IModel.BasicReturn事件使其可見。


因爲已發佈的消息經過basic.return方法返回到客戶端,而basic.return是異步否認確認事件,所以不能將特定消息的basic.return做爲傳遞的確認:使用傳遞標誌只提供了提升杆的方法,而不是徹底消除故障。


另外,消息被標記爲「強制性」而且成功地入隊在一個或多個隊列上的事實不能保證其最終接收:最日常地,隊列能夠在消息被處理以前被刪除,可是其餘狀況,例如使用noAck標誌的消息消費者,也可使得「強制」提供的保證有條件。


或者,您可使用發佈商確認。經過調用IModel.ConfirmSelect將通道設置爲確認模式會致使代理在經過傳遞到就緒消費者或持久存儲到磁盤來處理每一個消息後發送Basic.Ack。一旦經過IModel.BasicAcks事件處理程序確認成功處理的消息,代理就承擔了該消息的責任,客戶端能夠考慮處理消息。注意,代理還能夠經過發送回Basic.Nack來否認確認消息。在這種狀況下,若是經過IModel.BasicNacks事件處理程序拒絕消息,客戶端應該假定消息丟失或以其餘方式沒法投遞。此外,請注意,不可路由的消息 - 發佈爲不存在隊列的強制性消息 - 都是Basic.Return和Basic.Ack'ed。


接收回復(Receiving Replies

AMQP 0-9-1內容頭(IBasicProperties)包含一個稱爲ReplyTo的字段,可用於告知服務在何處發佈對接收到的RPC請求的答覆。在當前的RabbitMQ客戶端庫中,ReplyTo頭中的字符串使用最普遍的格式是一個簡單的隊列名稱,儘管傳遞經過應用程序特定規則加入的交換名稱和路由鍵也是一個選項。服務實例將其答覆發佈到指定的目的地,而且請求客戶端應該安排接收如此尋址的消息,使用在適當綁定的隊列上的BasicGet或BasicConsume。

將接收到的應答與發送的請求相關聯

IBasicProperties包含一個名爲CorrelationId的字段,在AMQP 0-9-1中是一個非結構化字符串,可用於將請求匹配到回覆。應答消息應具備與附加到請求消息的相同的相關標識。


異步,單向消息傳遞(Asynchronous, one-way messaging

在某些狀況下,簡單的請求 - 回覆交互模式不適合您的應用程序。 在這些狀況下,感興趣的交互模式能夠從異步,單向,點對點消息構造。 若是應用程序要響應同步,RPC樣式請求和異步單向請求,它應該使用ReplyTo的值來決定請求它的交互樣式:若是ReplyTo存在而且非空, 請求能夠假定是一個RPC樣式的調用; 不然,應假定它是單向消息。 CorrelationId字段能夠用於將多個相關消息分組在一塊兒,就像對於RPC樣式的狀況同樣,可是更一般地將任意數量的消息綁定在一塊兒。


點對點的確認模式(Acknowledgment modes for point-to-point)

當從服務器接收消息時,AMQP能夠以兩種模式之一操做:自動確認模式(當BasicGet,BasicConsume或Subscription構造函數上設置noAck標誌時)或手動確認模式。選擇正確的確認模式對於您的應用程序很重要:


    自動確認模式意味着當服務器在網絡上傳輸消息時,服務器將內部將消息標記爲已成功傳遞。以自動確認模式傳送的消息一般不會從新傳送到任何其餘接收器。

    手動確認模式意味着在將消息標記爲已成功傳送以前,服務器將等待接收的確定確認。若是在服務器接收到確認以前關閉了交付的手動確認模式下的通道(IModel),則將從新排隊。


通常來講,

    若是服務處於手動確認模式,則它不該該確認請求消息,直到它回覆它;請參閱下面有關與外部資源交互的部分。

    客戶端可使用自動確認模式,這取決於請求消息的重傳的結果。


庫支持點對點消息傳遞(Library support for point-to-point messaging)

RabbitMQ .NET客戶端庫包括涉及點對點消息傳遞的常見任務的基本支持。

SimpleRpcServer

類RabbitMQ.Client.MessagePatterns.SimpleRpcServer實現同步RPC樣式的請求處理以及異步消息處理。用戶應該繼承SimpleRpcServer,覆蓋一個或多個以「Handle」開頭的方法。 SimpleRpcServer實例具備請求調度循環MainLoop,它將請求解釋爲RPC樣式的請求,若是請求的IBasicProperties的ReplyTo字段爲非空且非空,則須要回覆。具備缺乏或空的ReplyTo字段的請求被視爲單向。當處理了RPC樣式的請求時,將答覆發送到ReplyTo地址。答覆地址首先與描述上面給出的相似URI的語法的正則表達式相匹配;若是匹配,則使用相似URI的語法的組件做爲回覆地址,若是不匹配,則將整個字符串用做簡單隊列名稱,並將回覆發送到默認交換(「」)一個等於ReplyTo字符串的路由鍵。

SimpleRpcClient

類RabbitMQ.Client.MessagePatterns.SimpleRpcClient實現與SimpleRpcServers或相似的交互的代碼。 RPC風格的交互是用Call方法執行的。 (私人)訂閱設置爲從服務接收回復,而且ReplyTo字段設置爲指向訂閱。請求的CorrelationId字段被初始化爲新的GUID。異步/單向交互被簡單地傳遞到IModel.BasicPublish而不修改:它是由調用者在異步狀況下設置CorrelationId。該類目前不支持在已發佈的請求消息上設置「mandatory」標誌,也不支持處理因爲設置該標誌而可能產生的任何BasicReturn事件。從內部訂閱檢索答覆的代碼當前沒法處理多個同時未解決的RPC請求,由於它要求答覆以與發送請求相同的順序到達。在解除此限制以前,不要嘗試管理經過SimpleRpcClient的單個實例發送的請求。另請參見可覆蓋的受保護方法SimpleRpcClient.RetrieveReply。使用SimpleRpcClient的基本模式以下:

   
   
   
   
using (IConnection conn = new ConnectionFactory() .CreateConnection(args[0])) { using (IModel ch = conn.CreateModel()) { SimpleRpcClient client = new SimpleRpcClient(ch, /* ... */); // in the line above, the "..." indicates the parameters // used to specify the address to use to route messages // to the service. // The next three lines are optional: client.TimeoutMilliseconds = 5000; // defaults to infinity client.TimedOut += new EventHandler(TimedOutHandler); client.Disconnected += new EventHandler(DisconnectedHandler); byte[] replyMessageBytes = client.Call(requestMessageBytes); // other useful overloads of Call() and Cast() are // available. See the code documentation of SimpleRpcClient // for full details. }}
請注意,單個SimpleRpcClient實例能夠執行許多(順序)Call()和Cast()請求! 建議單個SimpleRpcClient重複用於多個服務請求,只要請求是嚴格順序的。


事件廣播(Event Broadcasting)

當應用程序但願在不知道每一個感興趣方的地址的狀況下向應用程序池指示狀態改變或其餘通知時,發生事件廣播模式。 對某個事件子集感興趣的應用程序使用交換和隊列綁定來配置哪些事件被路由到其本身的專用隊列。


一般,事件將經過主題交換廣播,可是直接交換雖然不太靈活,可是有時對於其有限模式匹配能力足夠的應用能夠執行得更好。


發佈事件(Publishing events

要發佈事件,首先確保交換存在,而後肯定適當的路由密鑰。 例如,對於股票,一個鍵如「stock.ibm.nyse」多是合適的; 對於其餘應用程序,其餘主題層次結構將天然出現。 主題交換經常使用。 而後發佈消息。 例如:

   
   
   
   
using (IConnection conn = new ConnectionFactory() .CreateConnection(args[0])) { using (IModel ch = conn.CreateModel()) { IBasicProperties props = ch.CreateBasicProperties(); FillInHeaders(props); // or similar byte[] body = ComputeBody(props); // or similar ch.BasicPublish("exchangeName", "chosen.routing.key", props, body); }}
請參閱RabbitMQ.Client.IModel類中的BasicPublish的各類重載的文檔。

訂閱(Subscription)
RabbitMQ.Client.MessagePatterns.Subscription類實現了大多數接收消息(包括,特別是廣播事件)的樣板,包括消費者聲明和管理,但不包括隊列和交換聲明和隊列綁定。例如,
    
    
    
    
// "IModel ch" in scope.Subscription sub = new Subscription(ch, "STOCK.IBM.#");foreach (BasicDeliverEventArgs e in sub) { // handle the message contained in e ... // ... and finally acknowledge it sub.Ack(e);}
將使用IModel.BasicConsume在隊列上啓動一個消費者。它假定隊列和任何綁定之前已經聲明。應該爲每一個接收的事件調用Subscription.Ack(),不管是否使用自動確認模式,由於Subscription內部知道是否須要確認的實際網絡消息,並以有效的方式爲您處理只要在你的代碼中老是調用Ack()。有關完整的詳細信息,請參閱Subscription類的代碼文檔。

使用自定義消費者檢索事件(Retrieving events with a custom consumer)
有時,使用Subscription的高級方法是足夠的。 然而,其餘時候,須要使用定製消費者。 這種檢索事件的方法是將隊列綁定到與適當的路由 - 密鑰模式規範相關的交換。 例如,假設咱們的應用程序想要在隊列「MyApplicationQueue」上檢索有關IBM的全部價格:         

   
   
   
   
// "IModel ch" in scope.ch.ExchangeDeclare("prices", "topic");ch.QueueDeclare("MyApplicationQueue", false, true, true, null);ch.QueueBind("MyApplicationQueue", "prices", "STOCK.IBM.#", false, null);
而後使用BasicGet或BasicConsume從「MyApplicationQueue」消耗消息。 一個更完整的例子在ApiOverview章節。


事件廣播的確認模式(Acknowledgment modes for event broadcasting

與用於點對點消息傳遞的相同的自動確認/手動確認決定可用於廣播事件的消費者,可是交互的模式引入不一樣的權衡:


    對於高容量消息傳遞,其中偶爾可接受的是不接收一個感興趣的消息,自動確認模式是有意義的

    對於其中知足咱們的訂閱的每一個消息須要被遞送的狀況,手動確認是適當的


有關詳細信息,請參閱下面的可靠郵件傳輸部分。還要注意,只要爲每一個接收的消息調用Subscription.Ack(),類Subscription就會負責確認和各類確認模式。


可靠的消息傳輸(Reliable message transfer)

消息能夠在具備不一樣服務質量(QoS)水平的端點之間傳輸。通常來講,不能徹底排除故障,但重要的是要了解各類交付故障模式,以瞭解從故障中恢復的種類,以及可能恢復的狀況。重申:不可能徹底排除故障。能夠作的最好是縮小可能發生故障的條件,而且當檢測到故障時通知系統操做員。

至少一次遞送


該QoS水平確保消息被傳遞到其最終目的地至少一次。也就是說,接收器能夠接收消息的多個副本。若是對於給定消息,反作用僅發生一次是重要的,則應該使用至多一次遞送。


要實施至少一次投放(At-least-once delivery)

    像往常同樣發佈消息,在其上具備一些相關標識符和回覆地址,使得接收方能夠確認對發送方的接收。當接收到消息時,將確認消息發送回發送者。若是消息是RPC請求,則RPC應答消息隱式地是對請求的接收的確認。

    或者,不是手動實現往返邏輯,而是客戶端可使用發佈者確認。經過在通道上啓用確認模式,客戶端請求代理確認或否認確認從該點開始在該通道上發送的全部消息。請參閱「責任轉移」中有關如何使用確認的說明。


決定郵件重發策略可能很困難。一些簡單的從新發送策略是:


    若是您的鏈接丟失或在您收到收據確認以前發生其餘崩潰,請從新發送

    若是您在幾秒鐘內沒有收到確認,則超時並從新發送。請確保每次從新發送的超時時間加倍,以幫助避免與重試相關的拒絕服務和網絡擁塞。


最多一次傳送(At-most-once delivery)

對於最多一次傳遞,只需發佈消息,一次,照常。不須要相關標識符。在使用應用程序中接收消息,注意交貨時的Redelivered標誌。 Redelivered標誌只有在服務器認爲它提供第一次消息消息時纔會清除。若是以前已進行任何交貨嘗試,則從新送達標誌將被設置。 Redelivered標誌是一個很是有限的信息,只給出最多一次的語義。


用多節點RabbitMQ集羣編碼(Coding with multi-node RabbitMQ clusters

在須要連續服務的狀況下,能夠經過一些仔細的編程和用於故障轉移的熱備份集羣的可用性來對抗服務器故障的可能性。

失敗時的主要關注點是


    公佈/認可的工做單位的原子性,以及備份服務器上已配置資源的可用性


消息生產者應注意使用事務,以便從服務器接收一組消息的確定確認,而且應該保留他們爲了執行它們的工做須要可用的交換,隊列和綁定的記錄,所以在故障切換時,能夠在重放最近的要恢復的事務以前聲明適當的資源。


消息消費者應該意識到在故障轉移時丟失或重複消息的可能性:發佈者能夠決定從新發送其結果有疑問的事務,或者發佈者認爲完成的事務可能因爲集羣節點的故障而徹底消失。

與外部資源交互

服務的常見模式是


    經由隊列接收服務請求

    更新一些外部資源,如文件或數據庫

    經過RabbitMQ進行回覆,或至少向服務器確認觸發操做的消息已完成


至少一次模式的元素一般與外部資源模式一塊兒出現 - 具體地,上面關於可靠消息傳遞的部分中討論的反作用一般對外部資源產生影響。


在交付必須被處理不超過一次而且與外部資源結合使用的狀況下,重要的是編寫可以在每一個步驟的代碼以肯定該步驟是否已經在完成整個交易的一些先前嘗試中採起,而且若是它具備,則可以在該嘗試中省略它並繼續下一步驟。例如:


    若是工做觸發請求丟失,另外一個副本將(最終)從最終請求者到達。

    若是已經執行了工做,例如更新了數據庫表,則在先前接收到有問題的工做項時,服務須要保持對於原子工做自己的原子的外部工做的完成的記錄:例如,在相同的數據庫事務中,能夠更新尊敬請求的一些日誌,或者能夠更新被修改的行以包括引發修改的請求的ID,以及修改該行的先前請求ID題。


這使得重要的是可以壓縮請求ID,以便它們不在執行的工做的日誌中佔用無界空間,而且使得咱們不須要與最終請求者引入徹底分佈式垃圾收集協議。這樣作的一種方式是選擇使用嚴格增長的請求ID,使得可使用「高水位線」。一旦知道已經執行了工做,而且已經產生了答覆(若是存在答覆),則能夠根據須要將答覆發送回請求者。請求者知道它指望的回覆,而且能夠丟棄不想要的重複。只要相同請求的重複老是收到相同的答覆消息,則複製者沒必要當心發送太多的複製副本。一旦已經將答覆發送到服務器,則能夠確認請求消息爲已接收而且與服務器服務器一塊兒處理。在沒有對請求的答覆的狀況下,確認仍然有用以確保請求不丟失。




相關文章
相關標籤/搜索