[.NET] RabbitMQ 的行爲藝術

RabbitMQ 的行爲藝術

 

  好像,今天已是 2 月 28 號了。html

  據說,2九、30、31 號放假。java

  聽說,有圖,有真相。數組

 

目錄

  • 簡介服務器

  • 環境搭建異步

  • 示例一:簡單的 Hello Worldide

  • 示例二:發佈/訂閱模式性能

  • 嘗試發現 - 新物種 EasyNetQui

 

簡介

  RabbitMQ:一個消息系統,基於 AMQP 系統協議,由 Erlang 語言開發。url

  優勢:健壯、使用簡單、開源和支持各類流行的語言(如 Python、java、.NET)等。spa

 

  MQ(Message Queue):消息隊列的簡稱,是一種應用程序之間的通訊機制。

  做用:將部分無需當即回調獲取結果,而且耗時的操做,使用異步處理的方式提升服務器的吞吐量及性能。如:日誌記錄。

 

圖:簡單的通訊方式,及加入 MQ 後的變化

 
     A 端:生產者將消息寫(插)入隊列;
     MQ(隊列) :中間件,消息的載體;
     B 端:消費者從隊列讀(取)出消息。
 
   MQ 特色:消費者 - 生產者模型的一種表現形式。
 

環境搭建

  1.官網下載安裝包:http://www.rabbitmq.com/ ;

  2.安裝時會提示你下載 Erlang 語言環境;

  3.啓動安裝完的服務:RabbitMQ;

 

  4.在 cmd 中指向 sbin 目錄,並輸入如下命令,才能打開 WEB 管理界面:

rabbitmq-plugins enable rabbitmq_management

 

  5.默認 url:http://localhost:15672/#/

 

示例一:簡單的 Hello World

 

  P(Producer):生產者,意味着發送;

  Queue:隊列,本質上是一個無限的緩衝區,能夠儲存儘量多的信息;

  C(Consumer):消費者,等待並接收消息。

  【備註】生產者和消費者不須要駐留在同一臺服務器上。

 

  Producer.cs

 1     public class Producer  
 2     {
 3         public static void Send()
 4         {
 5             var factory = new ConnectionFactory { HostName = "localhost" };
 6 
 7             //建立鏈接對象,基於 Socket
 8             using (var connection = factory.CreateConnection())
 9             {
10                 //建立新的渠道、會話
11                 using (var channel = connection.CreateModel())
12                 {
13                     //聲明隊列
14                     channel.QueueDeclare(queue: "hello",    //隊列名
15                         durable: false,     //持久性
16                         exclusive: false,   //排他性
17                         autoDelete: false,  //自動刪除
18                         arguments: null);
19 
20                     const string message = "Hello World!";
21                     var body = Encoding.UTF8.GetBytes(message);
22 
23                     channel.BasicPublish(exchange: "",  //交換機名
24                         routingKey: "hello",    //路由鍵
25                         basicProperties: null,
26                         body: body);
27                 }
28             }
29         }
30     }

  【備註】隊列名若是已存在,將不會重複建立。假設隊列已存在,修改 channel.QueueDeclare() 方法內的參數後啓動會出現異常。

  【備註】消息內容是一個字節數組。

 
  Consumer.cs
 1     class Consumer
 2     {
 3         public static void Receive()
 4         {
 5             var factory = new ConnectionFactory() { HostName = "localhost" };
 6 
 7             using (var connection = factory.CreateConnection())
 8             {
 9                 using (var channel = connection.CreateModel())
10                 {
11                     channel.QueueDeclare(queue: "hello",
12                                          durable: false,
13                                          exclusive: false,
14                                          autoDelete: false,
15                                          arguments: null);
16 
17                     //建立基於該隊列的消費者,綁定事件
18                     var consumer = new EventingBasicConsumer(channel);
19                     consumer.Received += (model, ea) =>
20                     {
21                         var body = ea.Body;     //消息主體
22                         var message = Encoding.UTF8.GetString(body);
23                         Console.WriteLine(" [x] Received {0}", message);
24                     };
25 
26                     //啓動消費者
27                     channel.BasicConsume(queue: "hello",    //隊列名
28                                          noAck: true,   //false:手動應答;true:自動應答
29                                          consumer: consumer);
30 
31                     Console.Read();
32                 }
33             }
34         }
35     }

  【疑問】在消費者的類裏面爲何會再次聲明隊列(channel.QueueDeclare())呢?-- 由於接收方可能會在發送方啓動前啓動,這是出於保險起見。

 

示例二:發佈/訂閱模式

 
 

  1.Exchange 交換機和 Exchange Type 交換類型  

  RabbitMQ 消息傳遞模型的核心思想是,生產者不會直接將消息發給隊列。

  這裏咱們將引入新的名詞 Exchange(交換機)。交換機傳遞消息的類型也有不少種:direct, topic, headers(不經常使用) 和 fanout,咱們稱之爲交換類型。

圖:Direct

 

圖:Fanout 

 

圖:Topic

 --上述 3 張圖來源:http://m.blog.csdn.net/article/details?id=52262850

  

  這裏,建立一個名爲 「logs」 的交換機,它的類型爲廣播類型(fanout:能夠將收到的全部消息,廣播給全部已知的隊列)。

channel.ExchangeDeclare(exchange: "logs",   //交換機名
                        type: "fanout");    //交換類型

  

  2.臨時隊列

  做爲消費者,咱們有時候只須要一些新的(或者空的)隊列,此時,更好的方式就是讓它自動生成一個隨機名字的隊列;其次,當隊列鏈接中斷時會選擇自動刪除對應的消費者。

  建立一個非持久,有排他性和自動刪除特性的隊列(無參時)。

var queueName = channel.QueueDeclare().QueueName;

 

  3.Binding 綁定

  【疑問】有了 Exchange 和 channel,這時,還須要什麼東西呢?-- 咱們要建立 Exchange 和 channel 關係的橋樑,這個橋樑稱之爲 Binding(綁定)。

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");

 

 
 1     class Producer
 2     {
 3         public static void Send()
 4         {
 5             var factory = new ConnectionFactory()
 6             {
 7                 HostName = "localhost",
 8                 Port = 5672,
 9                 UserName = "guest",
10                 Password = "guest"
11             };
12 
13             using (var connection = factory.CreateConnection())
14             {
15                 using (var channel = connection.CreateModel())
16                 {
17                     channel.ExchangeDeclare(exchange: "logs",   //交換機名
18                         type: "fanout");    //交換類型
19 
20                     // Guid
21                     var message = Guid.NewGuid().ToString();
22                     var body = Encoding.UTF8.GetBytes(message);
23                     channel.BasicPublish(exchange: "logs",
24                                          routingKey: "",
25                                          basicProperties: null,
26                                          body: body);
27 
28                     Console.WriteLine(" [x] Sent {0}", message);
29                 }
30 
31                 Console.WriteLine(" Press [enter] to exit.");
32                 Console.ReadLine();
33             }
34         }
35     }
Producer.cs //生產者
 1     class Reciver
 2     {
 3         public static void Recive()
 4         {
 5             var factory = new ConnectionFactory()
 6             {
 7                 HostName = "localhost",
 8                 Port = 5672,
 9                 UserName = "guest",
10                 Password = "guest"
11             };
12 
13             using (var connection = factory.CreateConnection())
14             using (var channel = connection.CreateModel())
15             {
16                 channel.ExchangeDeclare(exchange: "wen_logs",   //交換機名
17                     type: "fanout");    //交換類型
18 
19                 //建立隊列
20                 var queueName = channel.QueueDeclare().QueueName;
21                 channel.QueueBind(queue: queueName,
22                                   exchange: "wen_logs",
23                                   routingKey: "");
24 
25                 Console.WriteLine(" [*] Waiting for logs.");
26 
27                 var consumer = new EventingBasicConsumer(channel);
28                 consumer.Received += (model, ea) =>
29                 {
30                     var body = ea.Body;
31                     var message = Encoding.UTF8.GetString(body);
32                     Console.WriteLine(" [x] {0}", message);
33                 };
34                 channel.BasicConsume(queue: queueName,
35                                      noAck: true,
36                                      consumer: consumer);
37 
38                 Console.WriteLine(" Press [enter] to exit.");
39                 Console.ReadLine();
40             }
41         }
42     }
Reciver.cs //接收者

 

嘗試發現 - 新物種 EasyNetQ

  這都不是事!EasyNetQ,看名字就知道,搞定 MQ,So easy!

 

  鏈接 RabbitMQ 代理:

var bus = RabbitHutch.CreateBus("host=localhost");

  發佈:

bus.Publish(message);

  訂閱:

bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));

 

  下面咱們經過 Demo 來感覺一下 Easy 的程度吧,建立項目(效果圖以下,附 Demo 下載):

  Wen.EasyNetQDemo.Model:類庫

  Wen.EasyNetQDemo.Publisher,Wen.EasyNetQDemo.Subscriber:控制檯應用程序,都使用 Nuget 直接安裝 EasyNetQ 包,都引用類庫 Model。

 
  Demo.cs
    public class Demo
    {
        public string Message { get; set; }
    }

 

  Publisher

 1 using System;
 2 using EasyNetQ;
 3 using Wen.EasyNetQDemo.Model;
 4 
 5 namespace Wen.EasyNetQDemo.Publisher
 6 {
 7     internal class Program
 8     {
 9         private static void Main(string[] args)
10         {
11             using (var bus = RabbitHutch.CreateBus("host=localhost"))
12             {
13                 string input;
14                 Console.WriteLine("請輸入信息。 若是是「esc」 將退出當前窗口。");
15 
16                 while ((input = Console.ReadLine()) != "esc")
17                 {
18                     bus.Publish(new Demo
19                     {
20                         Message = input
21                     });
22                 }
23 
24             }
25         }
26     }
27 }

  【備註】RabbitHutch.CreateBus() 方法能夠建立一個簡單的發佈/訂閱和包含請求/響應 API 的消息總線。

 

  Subscriber

 1 using System;
 2 using EasyNetQ;
 3 using Wen.EasyNetQDemo.Model;
 4 
 5 namespace Wen.EasyNetQDemo.Subscriber
 6 {
 7     internal class Program
 8     {
 9         private static void Main(string[] args)
10         {
11             using (var bus = RabbitHutch.CreateBus("host=localhost"))
12             {
13                 bus.Subscribe<Demo>("test", HandleDemo);
14 
15                 Console.WriteLine("監聽信息中...輸入「return」將退出當前窗口!");
16                 Console.ReadLine();
17             }
18         }
19 
20         private static void HandleDemo(Demo demo)
21         {
22             Console.ForegroundColor = ConsoleColor.Green;
23             Console.WriteLine($"Got message: {demo.Message}");
24             Console.ResetColor();
25         }
26     }
27 }

 圖:效果圖

 

 

「世事洞明皆學問 人情練達即文章」

【博主】反骨仔
相關文章
相關標籤/搜索