C#隊列學習筆記:RabbitMQ安裝及使用

原文: C#隊列學習筆記:RabbitMQ安裝及使用

    1、環境搭建

    1.一、因爲RabbitMQ是使用Erlang語言開發的,所以要安裝Erlang運行時環境,下載地址:Erlang官網下載  CSDN分享下載html

    1.二、去RabbitMQ官網下載RabbitMQ Server服務端程序,選擇合適的平臺版本下載並安裝。數組

    RabbitMQ安裝時,會自動在Windows服務中建立RabbitMQ服務,並自動啓動。瀏覽器

    1.三、開始->全部程序->RabbitMQ Server->RabbitMQ Command Prompt (sbin dir):緩存

    運行RabbitMQ Command Prompt與cmd下cd C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.3\sbin的效果是同樣的。分佈式

    1.3.一、sbin目錄下的rabbitmqctl.bat,是用來查看和控制服務端狀態的。運行rabbitmqctl status檢查RabbitMQ狀態:ide

    1.3.三、RabbitMQ Server上面也有用戶概念,使用rabbitmqctl list_users命令,能夠看到目前的用戶:學習

    能夠看到,如今只有一個名爲gues角色爲administratort的用戶,這個是RabbitMQ默認爲咱們建立的,它有RabbitMQ的全部權限。通常狀況下,咱們須要新建一個本身的用戶,並設置密碼及授予權限,同時設置爲管理員。操做方法以下:fetch

rabbitmqctl add_user hello world rabbitmqctl set_permissions hello ".*" ".*" ".*" rabbitmqctl set_user_tags hello administrator

    上面的第一命令添加了一個名爲hello的用戶並設置了密碼world;第二條命令爲用戶hello分別授予對全部消息隊列的配置、讀和寫的權限;第三條命令將用戶hello設置爲管理員。ui

    如今咱們能夠將默認的guest用戶刪掉,使用下面的命令便可:atom

rabbitmqctl delete_user guest

    若是要修改密碼,可使用下面的命令:

rabbitmqctl change_password {username} {newpassowrd}

   2、管理界面

    RabbitMQ還有一個管理界面,是以插件形式提供的,經過該界面能夠查看RabbitMQ Server當前的狀態。啓用命令以下: 

rabbitmq-plugins enable rabbitmq_management

    如今,在瀏覽器中輸入 http://server-name:15672/ 便可。

    注:server-name爲計算機名或IP地址,若是是本地的,直接用localhost便可。登陸界面,使用咱們以前建立的hello用戶登陸。

    3、開始使用

    在.NET中使用RabbitMQ須要下載RabbitMQ客戶端程序集,下載解壓後在bin下找到RabbitMQ.Client.dll,並添加引用到項目中。

    3.一、Hello World

    爲了展現RabbitMQ的基本使用,咱們發送一個HelloWorld消息,而後接收並處理。

rabbitmq hello world

    3.1.一、建立一個名爲Send的客戶端控制檯程序,用來將消息發送到RabbitMQ消息隊列中,代碼以下:

class Program { static void Main(string[] args) { #region Hello World
            //1.實例化鏈接工廠
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.創建鏈接
            using (var connection = factory.CreateConnection()) { //3.創建信道
                using (var channel = connection.CreateModel()) { //4.聲明隊列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); //5.構建byte消息數據包
                    string message = args.Length > 0 ? args[0] : "Hello World"; var body = Encoding.UTF8.GetBytes(message); //消息是以二進制數組的形式傳輸的 //6.發送數據包
                    channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine($"Send {message}"); Console.Read(); } } #endregion } }
Send.cs

    3.1.二、建立一個名爲Receive的服務端控制檯程序,用來接收RabbitMQ消息隊列中的消息,代碼以下:

class Program { static void Main(string[] args) { #region Hello World
            //1.實例化鏈接工廠
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.創建鏈接
            using (var connection = factory.CreateConnection()) { //3.建立信道
                using (var channel = connection.CreateModel()) { //4.聲明隊列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); //5.構造消費者實例
                    var consumer = new EventingBasicConsumer(channel); //6.綁定消息接收後的事件委託
                    consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"Received {message}"); }; //7.啓動消費者
                    channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer); Console.Read(); } } #endregion } }
Receive.cs

    3.1.三、先運行消息接收端,再運行消息發送端,結果以下:

    從上面的代碼中能夠看出,發送端和接收端的代碼前4步都是同樣的。主要的區別在於發送端調用channel.BasicPublish方法發送消息,而接收端須要實例化一個EventingBasicConsumer實例來進行消息處理。另一點須要注意的是:消息接收端和發送端的隊列名稱(queue)必須保持一致,這裏指定的隊列名稱爲hello。

    3.二、工做隊列

    工做隊列(work queues,又稱Task Queues)的主要思想是:爲了不當即執行一些實時性要求不高可是比較耗資源或時間的操做(如寫日誌),把任務看成消息發送到隊列中,由一個運行在後臺的工做者(worker)進程取出並處理。當有多個工做者(workers)運行時,任務會在它們之間共享。

    如今發送一些字符串來模擬耗時的任務,在字符串中加上點號(.)來表示任務的複雜程度。一個點號將會耗時1秒鐘,好比"Hello World..."就會耗時3秒鐘。

class Program { static void Main(string[] args) { #region Hello World
            //1.實例化鏈接工廠
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.創建鏈接
            using (var connection = factory.CreateConnection()) { //3.創建信道
                using (var channel = connection.CreateModel()) { //4.聲明隊列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); //5.構建byte消息數據包
                    string message = args.Length > 0 ? string.Join(" ", args) : "Hello World..."; var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2;//設置消息是否持久化 1:非持久化 2:持久化
                    var body = Encoding.UTF8.GetBytes(message);//消息是以二進制數組的形式傳輸的 //6.發送數據包
                    channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine($"Send {message}"); Console.Read(); } } #endregion } }
Send.cs
class Program { static void Main(string[] args) { #region Hello World
            //1.實例化鏈接工廠
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.創建鏈接
            using (var connection = factory.CreateConnection()) { //3.建立信道
                using (var channel = connection.CreateModel()) { //4.聲明隊列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); //5.構造消費者實例
                    var consumer = new EventingBasicConsumer(channel); //6.綁定消息接收後的事件委託
                    consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); }; //7.啓動消費者
                    channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer); Console.Read(); } } #endregion } }
Receive.cs

    3.3輪詢分發

    使用工做隊列的一個好處就是它可以並行的處理隊列。若是堆積了不少任務,咱們只須要添加更多的工做者(workers)就能夠了,擴展很簡單。

    如今,咱們先啓動兩個接收端,等待接受消息,而後啓動一個發送端開始發送消息(cmd->send.exe所在的目錄)。

    上面發了10條信息,兩個接收端各收到5條信息。

    默認狀況下,RabbitMQ會將每一個消息按照順序依次分發給下一個消費者,因此每一個消費者接收到的消息個數大體是平均的。 這種消息分發的方式稱之爲輪詢(round-robin)。

    3.四、消息響應

    當處理一個比較耗時得任務的時候,也許想知道消費者(consumers)是否運行到一半就掛掉。在當前的代碼中,當RabbitMQ將消息發送給消費者以後,立刻就會將該消息從隊列中移除。此時,若是把處理這個消息的工做者(worker)停掉,正在處理的這條消息就會丟失。同時,全部發送到這個工做者的尚未處理的消息都會丟失。

    咱們不想丟失任何任務消息,若是一個工做者掛掉了,咱們但願該消息可以從新發送給其它的工做者。

    爲了防止消息丟失,RabbitMQ提供了消息響應(acknowledgments)機制。消費者會經過一個ack(響應),告訴RabbitMQ已經收到並處理了某條消息,而後RabbitMQ纔會釋放並刪除這條消息。若是消費者掛掉了,沒有發送響應,RabbitMQ就會認爲消息沒有被徹底處理,而後從新發送給其它消費者。這樣,即便工做者偶爾的掛掉,也不會丟失消息。

    消息是沒有超時這個概念的。當工做者與它斷開連的時候,RabbitMQ會從新發送消息。這樣在處理一個耗時很是長的消息任務的時候就不會出問題了。

    消息響應默認是開啓的。在以前的例子中使用了no_Ack=true標識把它關閉。是時候移除這個標識了,當工做者完成了任務,就會發送一個響應。

    下面修改Receive.cs,主要改動的是:將 autoAck:true修改成autoAck:fasle,以及在消息處理完畢後手動調用BasicAck方法進行手動消息確認。

class Program { static void Main(string[] args) { #region Hello World
            //1.實例化鏈接工廠
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.創建鏈接
            using (var connection = factory.CreateConnection()) { //3.建立信道
                using (var channel = connection.CreateModel()) { //4.聲明隊列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); //5.構造消費者實例
                    var consumer = new EventingBasicConsumer(channel); //6.綁定消息接收後的事件委託
                    consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //7.發送消息確認信號(手動消息確認)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //8.啓動消費者(noAck: false 啓用消息響應)
                    channel.BasicConsume(queue: "hello", noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
Receive.cs

    一個很常見的錯誤就是忘掉了BasicAck這個方法,這個錯誤很常見,可是後果很嚴重。當客戶端退出時,待處理的消息就會被從新分發,可是RabitMQ會消耗愈來愈多的內存,由於這些沒有被應答的消息不可以被釋放。調試這種case,可使用rabbitmqctl打印messages_unacknowledged字段。

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.3\sbin>rabbitmqctl list_queues name messages_ready messages_unacknowledged Timeout: 60.0 seconds ... Listing queues for vhost / ... name messages_ready messages_unacknowledged hello 1 0

    3.五、消息持久化

    消息確認確保了即便消費端異常,消息也不會丟失可以被從新分發處理。可是若是RabbitMQ服務端異常,消息依然會丟失。除非咱們指定durable:true,不然當RabbitMQ退出或崩潰時,消息將依然會丟失。經過指定durable:true(隊列),並指定Persistent=true(消息),來告知RabbitMQ將消息持久化。一句話歸納:須要保證隊列和消息都是持久化的。

class Program { static void Main(string[] args) { #region Hello World
            //1.實例化鏈接工廠
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.創建鏈接
            using (var connection = factory.CreateConnection()) { //3.創建信道
                using (var channel = connection.CreateModel()) { //4.聲明隊列(指定durable:true,告知rabbitmq對消息進行持久化。)
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); //將消息標記爲持久性 - 將IBasicProperties.SetPersistent設置爲true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.構建byte消息數據包
                    string message = args.Length > 0 ? args[0] : "Hello World"; var body = Encoding.UTF8.GetBytes(message);//消息是以二進制數組的形式傳輸的 //6.發送數據包(指定basicProperties)
                    channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); Console.Read(); } } #endregion } }
Send.cs
class Program { static void Main(string[] args) { #region Hello World
            //1.實例化鏈接工廠
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.創建鏈接
            using (var connection = factory.CreateConnection()) { //3.建立信道
                using (var channel = connection.CreateModel()) { //4.聲明隊列(指定durable:true,告知rabbitmq對消息進行持久化。)
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); //將消息標記爲持久性 - 將IBasicProperties.SetPersistent設置爲true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.構造消費者實例
                    var consumer = new EventingBasicConsumer(channel); //6.綁定消息接收後的事件委託
                    consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //7.發送消息確認信號(手動消息確認)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //8.啓動消費者(noAck: false 啓用消息響應)
                    channel.BasicConsume(queue: "hello", noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
Receive.cs

    將消息標記爲持久性不能徹底保證消息不會丟失。雖然它告訴RabbitMQ將消息保存到磁盤,可是當RabbitMQ接受消息而且尚未保存時​​,仍然有一個很短的時間窗口。RabbitMQ可能只是將消息保存到了緩存中,並無將其寫入到磁盤上。持久化不是必定可以保證的,可是對於一個簡單任務隊列來講已經足夠。

    若是須要確保消息隊列的持久化,可使用publisher confirms

    3.六、公平分發

    你可能會注意到,消息的分發可能並無如咱們想要的那樣公平分配。好比,對於兩個工做者。當奇數個消息的任務比較重可是偶數個消息任務比較輕時,奇數個工做者始終處於忙碌狀態,而偶數個工做者始終處於空閒狀態,可是RabbitMQ並不知道這些,它仍然會平均依次地分發消息。

    爲了改變這一狀態,咱們可使用basicQos方法,設置perfetchCount=1 。這樣就告訴RabbitMQ 不要在同一時間給一個工做者發送多於1個的消息。換句話說,在一個工做者還在處理消息而且沒有響應消息以前,不要給它分發新的消息,而是將這條新的消息發送給下一個不那麼忙碌的工做者。

複製代碼
//Receive.cs //4.聲明隊列(指定durable:true,告知rabbitmq對消息進行持久化。) //channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare("hello", true, false, false, null); //將消息標記爲持久性 - 將IBasicProperties.SetPersistent設置爲true
var properties = channel.CreateBasicProperties(); properties.Persistent = true; //設置prefetchCount : 1來告知RabbitMQ,在未收到消費端的消息確認時,再也不分發消息。 //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
channel.BasicQos(0, 1, false);
複製代碼

    3.7完整實例

class Program { static void Main(string[] args) { #region Hello World
            //1.實例化鏈接工廠
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.創建鏈接
            using (var connection = factory.CreateConnection()) { //3.創建信道
                using (var channel = connection.CreateModel()) { //4.聲明隊列(指定durable:true,告知rabbitmq對消息進行持久化。)
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); //將消息標記爲持久性 - 將IBasicProperties.SetPersistent設置爲true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.構建byte消息數據包
                    string message = args.Length > 0 ? args[0] : "Hello World"; var body = Encoding.UTF8.GetBytes(message);//消息是以二進制數組的形式傳輸的 //6.發送數據包(指定basicProperties)
                    channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); Console.Read(); } } #endregion } }
Send.cs
class Program { static void Main(string[] args) { #region Hello World
            //1.實例化鏈接工廠
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.創建鏈接
            using (var connection = factory.CreateConnection()) { //3.建立信道
                using (var channel = connection.CreateModel()) { //4.聲明隊列(指定durable:true,告知rabbitmq對消息進行持久化。)
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); //將消息標記爲持久性 - 將IBasicProperties.SetPersistent設置爲true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //設置prefetchCount : 1來告知RabbitMQ,在未收到消費端的消息確認時,再也不分發消息。
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //5.構造消費者實例
                    var consumer = new EventingBasicConsumer(channel); //6.綁定消息接收後的事件委託
                    consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //7.發送消息確認信號(手動消息確認)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); //8.啓動消費者(noAck: false 啓用消息響應)
                    channel.BasicConsume(queue: "hello", noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
Receive.cs

    4、Exchange

    上面的示例,生產者和消費者直接是經過相同隊列名稱進行匹配銜接的。消費者訂閱某個隊列,生產者建立消息發佈到隊列中,隊列再將消息轉發到訂閱的消費者。這樣就會有一個侷限性,即消費者一次只能發送消息到某一個隊列。

    那消費者如何才能發送消息到多個消息隊列呢?

    RabbitMQ提供了Exchange,它相似於路由器的功能,對消息進行路由,將消息發送到多個隊列上。Exchange一方面從生產者接收消息,另外一方面將消息推送到隊列。可是Exchange是如何知道將消息附加到哪一個隊列或者直接忽略的呢?這些實際上是由Exchange Type來定義的。關於Exchange的圖文介紹,請看上一篇《C#隊列學習筆記:RabbitMQ基礎知識》,此處僅提供示例代碼。

    4.一、fanout

class Program { static void Main(string[] args) { #region fanout exchange type
            //1.實例化鏈接工廠
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.創建鏈接
            using (var connection = factory.CreateConnection()) { //3.創建信道
                using (var channel = connection.CreateModel()) { //4.使用fanout exchange type,指定exchange名稱。
                    channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout"); //將消息標記爲持久性 - 將IBasicProperties.SetPersistent設置爲true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.構建byte消息數據包
                    for (int i = 1; i <= 10; i++) { string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i); var body = Encoding.UTF8.GetBytes(message);//消息是以二進制數組的形式傳輸的 //6.發送數據包(指定exchange;fanout類型無需指定routingKey;指定basicProperties)
                        channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); } Console.Read(); } } #endregion } }
Send.cs
class Program { static void Main(string[] args) { #region fanout exchange type
            //1.實例化鏈接工廠
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.創建鏈接
            using (var connection = factory.CreateConnection()) { //3.建立信道
                using (var channel = connection.CreateModel()) { //4.使用fanout exchange type,指定exchange名稱。
                    channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout"); //5.聲明隊列(隨機生成隊列名稱)
                    var queueName = channel.QueueDeclare().QueueName; //綁定隊列到指定fanout類型exchange,fanout類型無需指定routingKey。
                    channel.QueueBind(queue: queueName, exchange: "fanoutEC", routingKey: ""); //將消息標記爲持久性 - 將IBasicProperties.SetPersistent設置爲true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //設置prefetchCount : 1來告知RabbitMQ,在未收到消費端的消息確認時,再也不分發消息。
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //6.構造消費者實例
                    var consumer = new EventingBasicConsumer(channel); //7.綁定消息接收後的事件委託
                    consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //8.發送消息確認信號(手動消息確認)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //9.啓動消費者(noAck: false 啓用消息響應)
                    channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
Receive.cs

    4.二、direct

class Program { static void Main(string[] args) { #region direct exchange type
            //1.實例化鏈接工廠
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.創建鏈接
            using (var connection = factory.CreateConnection()) { //3.創建信道
                using (var channel = connection.CreateModel()) { //4.使用direct exchange type,指定exchange名稱。
                    channel.ExchangeDeclare(exchange: "directEC", type: "direct"); //將消息標記爲持久性 - 將IBasicProperties.SetPersistent設置爲true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.構建byte消息數據包
                    for (int i = 1; i <= 10; i++) { string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i); var body = Encoding.UTF8.GetBytes(message);//消息是以二進制數組的形式傳輸的 //6.發送數據包(指定exchange;direct類型必須指定routingKey;指定basicProperties)
                        channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); } Console.Read(); } } #endregion } }
Send.cs
class Program { static void Main(string[] args) { #region direct exchange type
            //1.實例化鏈接工廠
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.創建鏈接
            using (var connection = factory.CreateConnection()) { //3.建立信道
                using (var channel = connection.CreateModel()) { //4.使用direct exchange type,指定exchange名稱。
                    channel.ExchangeDeclare(exchange: "directEC", type: "direct"); //5.聲明隊列(隨機生成隊列名稱)
                    var queueName = channel.QueueDeclare().QueueName; //綁定隊列到指定direct類型exchange,direct類型必須指定routingKey。
                    channel.QueueBind(queue: queueName, exchange: "directEC", routingKey: "green"); //將消息標記爲持久性 - 將IBasicProperties.SetPersistent設置爲true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //設置prefetchCount : 1來告知RabbitMQ,在未收到消費端的消息確認時,再也不分發消息。
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //6.構造消費者實例
                    var consumer = new EventingBasicConsumer(channel); //7.綁定消息接收後的事件委託
                    consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //8.發送消息確認信號(手動消息確認)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //9.啓動消費者(noAck: false 啓用消息響應)
                    channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
Receive.cs

    4.三、topic

class Program { static void Main(string[] args) { #region topic exchange type
            //1.實例化鏈接工廠
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.創建鏈接
            using (var connection = factory.CreateConnection()) { //3.創建信道
                using (var channel = connection.CreateModel()) { //4.使用topic exchange type,指定exchange名稱。
                    channel.ExchangeDeclare(exchange: "topicEC", type: "topic"); //將消息標記爲持久性 - 將IBasicProperties.SetPersistent設置爲true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.構建byte消息數據包
                    for (int i = 1; i <= 10; i++) { string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i); var body = Encoding.UTF8.GetBytes(message);//消息是以二進制數組的形式傳輸的 //6.發送數據包(指定exchange;topic類型必須指定routingKey;指定basicProperties)
                        channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); } Console.Read(); } } #endregion } }
Send.cs
class Program { static void Main(string[] args) { #region topic exchange type
            //1.實例化鏈接工廠
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.創建鏈接
            using (var connection = factory.CreateConnection()) { //3.建立信道
                using (var channel = connection.CreateModel()) { //4.使用topic exchange type,指定exchange名稱。
                    channel.ExchangeDeclare(exchange: "topicEC", type: "topic"); //5.聲明隊列(隨機生成隊列名稱)
                    var queueName = channel.QueueDeclare().QueueName; //綁定隊列到指定topic類型exchange,topic類型必須指定routingKey。
                    channel.QueueBind(queue: queueName, exchange: "topicEC", routingKey: "#.*.fast"); //將消息標記爲持久性 - 將IBasicProperties.SetPersistent設置爲true
                    var properties = channel.CreateBasicProperties(); properties.Persistent = true; //設置prefetchCount : 1來告知RabbitMQ,在未收到消費端的消息確認時,再也不分發消息。
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //6.構造消費者實例
                    var consumer = new EventingBasicConsumer(channel); //7.綁定消息接收後的事件委託
                    consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //8.發送消息確認信號(手動消息確認)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //9.啓動消費者(noAck: false 啓用消息響應)
                    channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
Receive.cs

    5、RPC

    RPC--Remote Procedure Call,遠程過程調用。RabbitMQ是如何進行遠程調用的呢?示意圖以下:

    第一步:主要是進行遠程調用的客戶端須要指定接收遠程回調的隊列,並聲明消費者監聽此隊列。

    第二步:遠程調用的服務端除了要聲明消費端接收遠程調用請求外,還要將結果發送到客戶端用來監聽結果的隊列中去。

class Program { static void Main(string[] args) { #region rpc
            //1.實例化鏈接工廠
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.創建鏈接
            using (var connection = factory.CreateConnection()) { //3.創建信道
                using (var channel = connection.CreateModel()) { //4.聲明惟一guid用來標識這次發送的遠程調用請求
                    var correlationId = Guid.NewGuid().ToString(); //5.聲明須要監聽的回調隊列
                    var replyQueue = channel.QueueDeclare().QueueName; var properties = channel.CreateBasicProperties(); properties.Persistent = true;//將消息標記爲持久性
                    properties.ReplyTo = replyQueue;//指定回調隊列
                    properties.CorrelationId = correlationId;//指定消息惟一標識 //6.構建byte消息數據包
                    string number = args.Length > 0 ? args[0] : "30"; var body = Encoding.UTF8.GetBytes(number); //7.發送數據包
                    channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body); Console.WriteLine($"Request fib({number})"); //8.建立消費者用於處理消息回調(遠程調用返回結果)
                    var callbackConsumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: replyQueue, noAck: false, consumer: callbackConsumer); callbackConsumer.Received += (model, ea) => { //僅當消息回調的ID與發送的ID一致時,說明遠程調用結果正確返回。
                        if (ea.BasicProperties.CorrelationId == correlationId) { var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}"; Console.WriteLine($"{responseMsg}"); } }; Console.Read(); } } #endregion } }
Send.cs
class Program { static void Main(string[] args) { #region rpc
            //1.實例化鏈接工廠
            var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.創建鏈接
            using (var connection = factory.CreateConnection()) { //3.建立信道
                using (var channel = connection.CreateModel()) { //4.聲明隊列接收遠程調用請求
                    channel.QueueDeclare(queue: "rpc_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); Console.WriteLine("Waiting for message."); //5.請求處理邏輯
                    consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int n = int.Parse(message); Console.WriteLine($"Receive request of Fib({n})"); int result = Fib(n); //6.從請求的參數中獲取請求的惟一標識,在消息回傳時一樣綁定。
                        var properties = ea.BasicProperties; var replyProerties = channel.CreateBasicProperties(); replyProerties.CorrelationId = properties.CorrelationId; //7.將遠程調用結果發送到客戶端監聽的隊列上
                        channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo, basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString())); //8.手動發回消息確認
                        channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"Return result: Fib({n})= {result}"); }; channel.BasicConsume(queue: "rpc_queue", noAck: false, consumer: consumer); Console.Read(); } } int Fib (int n) { if (n <= 2) return 1; else
                    return Fib(n - 1) + Fib(n - 2); } #endregion } }
Receive.cs

    6、總結

    本文介紹了RabbitMQ消息代理在Windows上的安裝以及在.NET中的使用。消息隊列在構建分佈式系統、提升系統的可擴展性及響應性方面,有着很重要的做用。

 

    參考自:

    https://www.cnblogs.com/yangecnu/p/Introduce-RabbitMQ.html#!comments

    http://www.javashuo.com/article/p-ntcqqvey-hs.html

相關文章
相關標籤/搜索