上一篇隨筆記錄到RabbitMQ的安裝,安裝完成,咱們就開始使用吧。html
RabbitMQ簡介算法
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。安全
AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。服務器
RabbitMQ提供了可靠的消息機制、跟蹤機制和靈活的消息路由,支持消息集羣和分佈式部署。適用於排隊算法、秒殺活動、消息分發、異步處理、數據同步、處理耗時任務、CQRS等應用場景。異步
DotNet Core使用RabbitMQ分佈式
經過nuget安裝:https://www.nuget.org/packages/RabbitMQ.Client/ui
定義生產者:spa
//建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "guest",//用戶名 Password = "guest",//密碼 HostName = "127.0.0.1"//rabbitmq ip }; //建立鏈接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //聲明一個隊列 channel.QueueDeclare("hello", false, false, false, null); Console.WriteLine("\nRabbitMQ鏈接成功,請輸入消息,輸入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //發佈消息 channel.BasicPublish("", "hello", null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close();
定義消費者:設計
//建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "guest",//用戶名 Password = "guest",//密碼 HostName = "127.0.0.1"//rabbitmq ip }; //建立鏈接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //事件基本消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"收到消息: {message}"); //確認該消息已被消費 channel.BasicAck(ea.DeliveryTag, false); }; //啓動消費者 設置爲手動應答消息 channel.BasicConsume("hello", false, consumer); Console.WriteLine("消費者已啓動"); Console.ReadKey(); channel.Dispose(); connection.Close();
演示以下:code
啓動了一個生產者,兩個消費者,能夠看見兩個消費者都能接收到消息,消息投遞到哪一個消費者是由RabbitMQ決定的。
RabbitMQ消費失敗的處理
RabbitMQ採用消息應答機制,即消費者收到一個消息以後,須要發送一個應答,而後RabbitMQ纔會將這個消息從隊列中刪除,若是消費者在消費過程當中出現異常,斷開鏈接切沒有發送應答,那麼RabbitMQ會將這個消息從新投遞。
咱們來修改一下消費者的代碼:
//接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"收到消息: {message}"); Console.WriteLine($"收到該消息[{ea.DeliveryTag}] 延遲10s發送回執"); Thread.Sleep(10000); //確認該消息已被消費 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"已發送回執[{ea.DeliveryTag}]"); };
演示以下:
從圖中能夠看出,設置了消息應答延遲10s,若是在這10s中,該消費者斷開了鏈接,那麼消息會被RabbitMQ從新投遞。
使用RabbitMQ的Exchange
前面的例子,咱們能夠看到生產者將消息投遞到Queue中,實際上這種方式在RabbitMQ中永遠都不會發生的。實際的狀況是,生產者將消息發送到Exchange(交換器),下圖中的X,由Exchange(交換器)將消息路由到一個或多個Queue中(或者丟棄)。
AMQP協議中的核心思想就是生產者和消費者隔離,生產者從不直接將消息發送給隊列。生產者一般不知道是否一個消息會被髮送到隊列中,只是將消息發送到一個交換機。先由Exchange來接收,而後Exchange按照特定的策略轉發到Queue進行存儲。同理,消費者也是如此。Exchange 就相似於一個交換機,轉發各個消息分發到相應的隊列中。
Exchange Types(交換器類型)
RabbitMQ經常使用的Exchange Type有Fanout、Direct、Topic、Headers這四種
一、Fanout:
這種類型的Exchange路由規則很是簡單,它會把全部發送到該Exchange的消息路由到全部與它綁定的Queue中,這時Routing key不起做用
Fanout Exchange 不須要處理RouteKey 。只須要簡單的將隊列綁定到exchange 上。這樣發送到exchange的消息都會被轉發到與該交換機綁定的全部隊列上。相似子網廣播,每臺子網內的主機都得到了一份複製的消息。
因此,Fanout Exchange 轉發消息是最快的。
爲了演示效果,定義了兩個隊列,分別爲hello1,hello2,每一個隊列都擁有一個消費者。
static void Main(string[] args) { string exchangeName = "TestFanoutChange"; string queueName1 = "hello1"; string queueName2 = "hello2"; string routeKey = ""; //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "guest",//用戶名 Password = "guest",//密碼 HostName = "127.0.0.1"//rabbitmq ip }; //建立鏈接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //定義一個Direct類型交換機 channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null); //定義隊列1 channel.QueueDeclare(queueName1, false, false, false, null); //定義隊列2 channel.QueueDeclare(queueName2, false, false, false, null); //將隊列綁定到交換機 channel.QueueBind(queueName1, exchangeName, routeKey, null); channel.QueueBind(queueName2, exchangeName, routeKey, null); //生成兩個隊列的消費者 ConsumerGenerator(queueName1); ConsumerGenerator(queueName2); Console.WriteLine($"\nRabbitMQ鏈接成功,\n\n請輸入消息,輸入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //發佈消息 channel.BasicPublish(exchangeName, routeKey, null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close(); }
/// <summary> /// 根據隊列名稱生成消費者 /// </summary> /// <param name="queueName"></param> static void ConsumerGenerator(string queueName) { //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "guest",//用戶名 Password = "guest",//密碼 HostName = "127.0.0.1"//rabbitmq ip }; //建立鏈接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //事件基本消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"Queue:{queueName}收到消息: {message}"); //確認該消息已被消費 channel.BasicAck(ea.DeliveryTag, false); }; //啓動消費者 設置爲手動應答消息 channel.BasicConsume(queueName, false, consumer); Console.WriteLine($"Queue:{queueName},消費者已啓動"); }
運行效果以下:
二、Direct
這種類型的Exchange路由規則也很簡單,它會把消息路由到哪些binding key與routingkey徹底匹配的Queue中。
Direct模式,可使用rabbitMQ自帶的Exchange:default Exchange 。因此不須要將Exchange進行任何綁定(binding)操做 。消息傳遞時,RouteKey必須徹底匹配,纔會被隊列接收,不然該消息會被拋棄。
static void Main(string[] args) { string exchangeName = "TestChange"; string queueName = "hello"; string routeKey = "helloRouteKey"; //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "guest",//用戶名 Password = "guest",//密碼 HostName = "127.0.0.1"//rabbitmq ip }; //建立鏈接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //定義一個Direct類型交換機 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null); //定義一個隊列 channel.QueueDeclare(queueName, false, false, false, null); //將隊列綁定到交換機 channel.QueueBind(queueName, exchangeName, routeKey, null); Console.WriteLine($"\nRabbitMQ鏈接成功,Exchange:{exchangeName},Queue:{queueName},Route:{routeKey},\n\n請輸入消息,輸入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //發佈消息 channel.BasicPublish(exchangeName, routeKey, null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close();
運行效果以下:
三、Topic
這種類型的Exchange的路由規則支持 binding key 和 routing key 的模糊匹配,會把消息路由到知足條件的Queue。 binding key 中能夠存在兩種特殊字符 *與 #,用於作模糊匹配,其中 * 用於匹配一個單詞,# 用於匹配0個或多個單詞,單詞以符號「.」爲分隔符。
以上圖中的配置爲例,routingKey=」quick.orange.rabbit」的消息會同時路由到Q1與Q2,routingKey=」lazy.orange.fox」的消息會路由到Q1與Q2,routingKey=」lazy.brown.fox」的消息會路由到Q2,routingKey=」lazy.pink.rabbit」的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=」quick.brown.fox」、routingKey=」orange」、routingKey=」quick.orange.male.rabbit」的消息將會被丟棄,由於它們沒有匹配任何bindingKey。
static void Main(string[] args) { string exchangeName = "TestTopicChange"; string queueName = "hello"; string routeKey = "TestRouteKey.*"; //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "guest",//用戶名 Password = "guest",//密碼 HostName = "127.0.0.1"//rabbitmq ip }; //建立鏈接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //定義一個Direct類型交換機 channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null); //定義隊列1 channel.QueueDeclare(queueName, false, false, false, null); //將隊列綁定到交換機 channel.QueueBind(queueName, exchangeName, routeKey, null); Console.WriteLine($"\nRabbitMQ鏈接成功,\n\n請輸入消息,輸入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //發佈消息 channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close(); }
運行效果以下:
四、Headers
這種類型的Exchange不依賴於 routing key 與 binding key 的匹配規則來路由消息,而是根據發送的消息內容中的 headers 屬性進行匹配。
參考:
官網:https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html