EasyNetQ的使命是爲基於RabbitMQ的消息傳遞提供最簡單的API。 核心IBus接口有意避免公開AMQP概念,如交換,綁定和隊列,而是實現基於消息類型的默認交換綁定隊列拓撲。html
對於某些場景,可以配置您本身的exchange綁定隊列拓撲是頗有用的;高級EasyNetQ API容許您這樣作。高級API對AMQP有很好的理解。前端
高級API經過IAdvancedBus接口實現。 該接口的一個實例能夠經過IBus的高級屬性進行訪問: var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced; 數組
1,聲明交換機安全
要聲明交換機,請使用IAdvancedBus的ExchangeDeclare方法:服務器
IExchange ExchangeDeclare( string name, string type, bool passive = false, bool durable = true, bool autoDelete = false, bool @internal = false, string alternateExchange = null, bool delayed = false);
name: 交換機名稱
type: 有效的交換機類型(使用靜態ExchangeType類的屬性安全地聲明交換)
passive: 不要建立交換。 若是指定的交換不存在,則拋出異常。 (默認爲false)
durable: 生存服務器從新啓動。 若是此參數爲false,則在服務器從新啓動時,交換將被刪除。 (默認爲true)
autoDelete: 最後一個隊列未被綁定時刪除此交換。 (默認爲false)
internal: 這種交換不能由發佈者直接使用,而只能由交換使用來交換綁定。 (默認爲false)
alternateExchange:若是沒法路由郵件,則將郵件路由到此交換機。
delayed:若是設置,則分配x延遲型交換以路由延遲的消息。app
①簡單案例異步
// create a direct exchange var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Direct); // create a topic exchange var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Topic); // create a fanout exchange var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Fanout);
要得到RabbitMQ默認交換,請執行如下操做:函數
var exchange = Exchange.GetDefault();
2,聲明隊列oop
要聲明隊列,請使用IAdvancedBus的QueueDeclare方法:性能
IQueue QueueDeclare( string name, bool passive = false, bool durable = true, bool exclusive = false, bool autoDelete = false, int? perQueueMessageTtl = null, int? expires = null, byte? maxPriority = null, string deadLetterExchange = null, string deadLetterRoutingKey = null, int? maxLength = null, int? maxLengthBytes = null);
name: 隊列的名稱
passive:若是隊列不存在,則不要建立該隊列,而是引起異常(默認爲false)
durable: 能夠在服務器從新啓動後繼續運行 若是這是錯誤的,則在服務器從新啓動時,隊列將被刪除。 (默認爲true)
exclusive: 只能由當前鏈接訪問,其餘鏈接上來會拋異常。 (默認爲false)
autoDelete: 全部消費者斷開鏈接後刪除隊列。 (默認爲false)
perQueueMessageTtl:丟棄以前,消息在隊列中應保留多長時間(以毫秒爲單位)。 (默認未設置)
expires: 自動刪除以前,隊列應該保持未使用狀態的時間以毫秒爲單位。 (默認未設置)
maxPriority: 肯定隊列應支持的最大消息優先級。
deadLetterExchange:肯定交換機的名稱在被服務器自動刪除以前能夠保持未使用狀態。
deadLetterRoutingKey:若是設置,將路由消息與指定的路由密鑰,若是未設置,則消息將使用與最初發布的路由密鑰相同的路由。
maxLength: 隊列中可能存在的最大可用消息數。 一旦達到限制,郵件就會從隊列的前面被刪除或死信,以便爲新郵件騰出空間。
maxLengthBytes:隊列的最大大小(以字節爲單位)。 一旦達到限制,郵件就會從隊列的前面被刪除或死信,以便爲新郵件騰出空間
請注意,若是定義了maxLength和/或maxLengthBytes屬性,則RabbitMQ的行爲可能並不如人們所指望的那樣。 人們可能會指望代理拒絕進一步的消息; 可是RabbitMQ文檔(https://www.rabbitmq.com/maxlength.html)代表,一旦達到限制,郵件將從隊列的前端丟棄或死鎖,以便爲新郵件騰出空間。
①簡單案例
// declare a durable queue var queue = advancedBus.QueueDeclare("my_queue"); // declare a queue with message TTL of 10 seconds: var queue = advancedBus.QueueDeclare("my_queue", perQueueTtl:10000);
要聲明一個'未命名的'獨佔隊列,其中RabbitMQ提供隊列名稱,請使用不帶參數的QueueDeclare重載:
var queue = advancedBus.QueueDeclare();
請注意,EasyNetQ的自動消費者從新鏈接邏輯被關閉以用於獨佔隊列。
3,綁定
你將一個隊列綁定到像這樣的交換機上:
var queue = advancedBus.QueueDeclare("my.queue"); var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic); var binding = advancedBus.Bind(exchange, queue, "A.*");
要指定隊列和交換機之間的多個綁定,只需執行多個綁定調用便可:
var queue = advancedBus.QueueDeclare("my.queue"); var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic); advancedBus.Bind(exchange, queue, "A.B"); advancedBus.Bind(exchange, queue, "A.C");
你也能夠將交換機綁定在一個鏈上:
var sourceExchange = advancedBus.ExchangeDeclare("my.exchange.1", ExchangeType.Topic); var destinationExchange = advancedBus.ExchangeDeclare("my.exchange.2", ExchangeType.Topic); var queue = advancedBus.QueueDeclare("my.queue"); advancedBus.Bind(sourceExchange, destinationExchange, "A.*"); advancedBus.Bind(destinationExchange, queue, "A.C");
4,發佈
先進的Publish方法容許您指定要發佈消息的交換機。 它還容許訪問消息的AMQP基本屬性。
建立你的消息。 高級API要求您的消息包裝在消息中:
var myMessage = new MyMessage {Text = "Hello from the publisher"}; var message = new Message<MyMessage>(myMessage);
Message類可以讓您訪問AMQP基本屬性,例如:
message.Properties.AppId = "my_app_id"; message.Properties.ReplyTo = "my_reply_queue";
最後使用發佈方法發佈您的消息。 在這裏,咱們正在向默認交流發佈:
bus.Publish(Exchange.GetDefault(), queueName, false, false, message);
發佈的重載容許您繞過EasyNetQ的消息序列化並建立本身的字節數組消息:
var properties = new MessageProperties(); var body = Encoding.UTF8.GetBytes("Hello World!"); bus.Publish(Exchange.GetDefault(), queueName, false, false, properties, body);
5,訂閱
使用IAdvancedBus的Consume方法來消費隊列中的消息。
IDisposable Consume<T>(IQueue queue, Func<IMessage<T>, MessageReceivedInfo, Task> onMessage) where T : class;
onMessage委託是您爲消息傳遞提供的處理程序。 其參數以下:
如上面發佈部分所述,IMessage使您能夠訪問消息及其MessageProperties。 MessageReceivedInfo爲您提供有關消息消耗的上下文的額外信息:
public class MessageReceivedInfo { public string ConsumerTag { get; set; } public ulong DeliverTag { get; set; } public bool Redelivered { get; set; } public string Exchange { get; set; } public string RoutingKey { get; set; } }
您返回一個容許您編寫非阻塞異步處理程序的任務。
消耗方法返回一個IDisposable。 調用其Dispose方法來取消使用者。
若是您只須要同步處理程序,則可使用同步重載:
IDisposable Consume<T>(IQueue queue, Action<IMessage<T>, MessageReceivedInfo> onMessage) where T : class;
要繞過EasyNetQ的消息序列化器,請使用提供原始字節數組消息的消耗超載:
void Consume(IQueue queue, Func<Byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);
在這個例子中,咱們使用隊列'my_queue'中的原始消息字節:
var queue = advancedBus.QueueDeclare("my_queue"); advancedBus.Consume(queue, (body, properties, info) => Task.Factory.StartNew(() => { var message = Encoding.UTF8.GetString(body); Console.WriteLine("Got message: '{0}'", message); }));
您能夠選擇使用Consume方法的這種重載向單個使用者註冊多個處理程序:
IDisposable Consume(IQueue queue, Action<IHandlerRegistration> addHandlers);
IHandlerRegistration接口以下所示:
public interface IHandlerRegistration { /// <summary> /// 添加異步處理程序 /// </summary> /// <typeparam name="T">The message type</typeparam> /// <param name="handler">The handler</param> /// <returns></returns> IHandlerRegistration Add<T>(Func<IMessage<T>, MessageReceivedInfo, Task> handler) where T : class; /// <summary> /// 添加同步處理程序 /// </summary> /// <typeparam name="T">消息類型</typeparam> /// <param name="handler">The handler</param> /// <returns></returns> IHandlerRegistration Add<T>(Action<IMessage<T>, MessageReceivedInfo> handler) where T : class; /// <summary> ///若是處理程序集合在未找到匹配的處理程序時應拋出EasyNetQException,則設置爲true;若是應返回noop處理程序,則設置爲false .Default爲true。 /// </summary> bool ThrowOnNoMatchingHandler { get; set; } }
在這個例子中,咱們註冊了兩個不一樣的處理程序,一個處理MyMessage類型的消息,另外一個處理MyOtherMessage類型的消息:
bus.Advanced.Consume(queue, x => x .Add<MyMessage>((message, info) => { Console.WriteLine("Got MyMessage {0}", message.Body.Text); countdownEvent.Signal(); }) .Add<MyOtherMessage>((message, info) => { Console.WriteLine("Got MyOtherMessage {0}", message.Body.Text); countdownEvent.Signal(); }) );
查看這篇博文了解更多信息:http://mikehadlow.blogspot.co.uk/2013/11/easynetq-multiple-handlers-per-consumer.html
6,從隊列中獲取單個消息
要從隊列中獲取單條消息,請使用IAdvancedBus.Get方法:
IBasicGetResult<T> Get<T>(IQueue queue) where T : class;
從AMQP文檔:「此方法使用同步對話提供對隊列中消息的直接訪問,該同步對話旨在用於同步功能比性能更重要的特定類型的應用程序。」 不要使用Get來輪詢消息。 在典型的應用場景中,您應該始終支持消費。
IBasicGetResult具備如下簽名:
/// <summary> ///AdvancedBus Get方法的結果 /// </summary> /// <typeparam name="T"></typeparam> public interface IBasicGetResult<T> where T : class { /// <summary> ///若是消息可用,則爲true,不然爲false。 /// </summary> bool MessageAvailable { get; } /// <summary> /// 消息從隊列中回收。 若是沒有消息可用,此屬性將引起MessageNotAvailableException。 在嘗試訪問它以前,您應該檢查MessageAvailable屬性。 /// </summary> IMessage<T> Message { get; } }
在訪問Message屬性前老是檢查MessageAvailable方法。
一個例子:
var queue = advancedBus.QueueDeclare("get_test"); advancedBus.Publish(Exchange.GetDefault(), "get_test", false, false, new Message<MyMessage>(new MyMessage{ Text = "Oh! Hello!" })); var getResult = advancedBus.Get<MyMessage>(queue); if (getResult.MessageAvailable) { Console.Out.WriteLine("Got message: {0}", getResult.Message.Body.Text); } else { Console.Out.WriteLine("Failed to get message!"); }
要訪問原始二進制消息,請使用非通用Get方法:
IBasicGetResult Get(IQueue queue);
非泛型IBasicGetResult具備如下定義:
public interface IBasicGetResult { byte[] Body { get; } MessageProperties Properties { get; } MessageReceivedInfo Info { get; } }
7,消息類型必須匹配
EasyNetQ高級API指望訂戶僅接收通用類型參數提供的類型的消息。 在上面的例子中,只有MyMessage類型的消息應該被接收。 可是,EasyNetQ不保護您不向用戶發佈錯誤類型的消息。 我能夠很容易地設置一個交換綁定隊列拓撲來發布NotMyMessage類型的消息,該消息將被上面的處理程序接收。 若是接收到錯誤類型的消息,EasyNetQ將拋出EasyNetQInvalidMessageTypeException異常:
EasyNetQ.EasyNetQInvalidMessageTypeException: Message type is incorrect. Expected 'EasyNetQ_Tests_MyMessage:EasyNetQ_Tests', but was 'EasyNetQ_Tests_MyOtherMessage:EasyNetQ_Tests' at EasyNetQ.RabbitAdvancedBus.CheckMessageType[TMessage](MessageProperties properties) in D:\Source\EasyNetQ\Source\EasyNetQ\RabbitAdvancedBus.cs:line 217 at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass1`1.<Subscribe>b__0(Byte[] body, MessageProperties properties, MessageReceivedInfo messageRecievedInfo) in D:\Source\EasyNetQ\Source\EasyNetQ\RabbitAdvancedBus.cs:line 131 at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass6.<Subscribe>b__5(String consumerTag, UInt64 deliveryTag, Boolean redelivered, String exchange, String routingKey, IBasicProperties properties, Byte[] body) in D:\Source\EasyNetQ\Source\EasyNetQ\RabbitAdvancedBus.cs:line 176 at EasyNetQ.QueueingConsumerFactory.HandleMessageDelivery(BasicDeliverEventArgs basicDeliverEventArgs) in D:\Source\EasyNetQ\Source\EasyNetQ\QueueingConsumerFactory.cs:line 85
8,事件
當經過RabbitHutch實例化一個IBus時,您能夠指定一個AdvancedBusEventHandlers。 該類包含IAdvancedBus中存在的每一個事件的事件處理程序屬性,並提供了在總線實例化以前指定事件處理程序的方法。
沒必要使用它,由於一旦建立了總線,它仍然能夠添加事件處理程序。 可是,若是您但願可以捕獲RabbitAdvancedBus的第一個Connected事件,則必須將AdvancedBusEventHandlers與Connected EventHandler一塊兒使用。 這是由於總線將在構造函數返回以前嘗試鏈接一次,若是鏈接嘗試成功,將會引起RabbitAdvancedBus.OnConnected。
var buss = RabbitHutch.CreateBus("host=localhost", new AdvancedBusEventHandlers(connected: (s, e) => { var advancedBus = (IAdvancedBus)s; Console.WriteLine(advancedBus.IsConnected); // This will print true. }));