C#使用RabbitMQ windows配置Erlang環境

1. 說明 

  在企業應用系統領域,會面對不一樣系統之間的通訊、集成與整合,尤爲當面臨異構系統時,這種分佈式的調用與通訊變得愈加重要。其次,系統中通常會有不少對實時性要求不高的可是執行起來比較較耗時的地方,好比發送短信,郵件提醒,更新文章閱讀計數,記錄用戶操做日誌等等,若是實時處理的話,在用戶訪問量比較大的狀況下,對系統壓力比較大。html

面對這些問題,咱們通常會將這些請求,放在消息隊列MQ中處理;異構系統之間使用消息進行通信。node

    MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過 隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。web

  MQ是消費-生產者模型的一個典型的表明,一端往消息隊列中不斷寫入消息,而另外一端則能夠讀取或者訂閱隊列中的消息。數據庫

   RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。 windows

  消息傳遞相較文件傳遞與遠程過程調用(RPC)而言,彷佛更勝一籌,由於它具備更好的平臺無關性,並可以很好地支持併發與異步調用。因此若是系統中出現了以下狀況:數組

  • 對操做的實時性要求不高,而須要執行的任務極爲耗時;
  • 存在異構系統間的整合;

  通常的能夠考慮引入消息隊列。對於第一種狀況,經常會選擇消息隊列來處理執行時間較長的任務。引入的消息隊列就成了消息處理的緩衝區。消息隊列引入的異步通訊機制,使得發送方和接收方都不用等待對方返回成功消息,就能夠繼續執行下面的代碼,從而提升了數據處理的能力。尤爲是當訪問量和數據流量較大的狀況下,就能夠結合消息隊列與後臺任務,經過避開高峯期對大數據進行處理,就能夠有效下降數據庫處理數據的負荷。 瀏覽器

  本文簡單介紹在RabbitMQ這一消息代理工具,以及在.NET中如何使用RabbitMQ.緩存

2. 搭建環境

  2.1 安裝Erlang語言運行環境服務器

  因爲RabbitMQ使用Erlang語言編寫,因此先安裝Erlang語言運行環境。具體移步博客:windows配置Erlang環境cookie

  2.2 安裝RabbitMQ服務端

  地址 http://www.rabbitmq.com/

  下載安裝。

  使RabbitMQ以Windows Service的方式在後臺運行:打開cmd切換到sbin目錄下執行

rabbitmq-service install
rabbitmq-service enable
rabbitmq-service start

   如今RabbitMQ的服務端已經啓動起來了。

  要查看和控制RabbitMQ服務端的狀態,能夠用rabbitmqctl這個腳本。

  好比查看狀態:

rabbitmqctl status

  

  假如顯示node沒有鏈接上,須要到C:\Windows目錄下,將.erlang.cookie文件,拷貝到用戶目錄下 C:\Users\{用戶名},這是Erlang的Cookie文件,容許與Erlang進行交互。

   使用命令查看用戶:

rabbitmqctl list_users

  RabbitMQ會爲咱們建立默認的用戶名guest和密碼guest,guest默認擁有RabbitMQ的全部權限。

  通常的,咱們須要新建一個咱們本身的用戶,設置密碼,並授予權限,並將其設置爲管理員,可使用下面的命令來執行這一操做:

rabbitmqctl  add_user  JC JayChou   //建立用戶JC密碼爲JayChou
rabbitmqctl  set_permissions  JC ".*"  ".*"  ".*"    //賦予JC讀寫全部消息隊列的權限
rabbitmqctl  set_user_tags JC administrator    //分配用戶組

  修改JC密碼爲123:

rabbitmqctl change_password JC  123

  刪除用戶JC:

rabbitmqctl delete_user  JC

  也能夠開啓rabbitmq_management插件,在web界面查看和管理RabbitMQ服務

rabbitmq-plugins enable rabbitmq_management  

 

  2.3下載RabbitMQ的Client端dll

  下載地址:http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/

  本人下載了這個 rabbitmq-dotnet-client-3.6.6-dotnet-4.5.zip

   解壓,咱們須要的是這個文件,之後會引用到vs的項目中:

3.使用

  3.1在使用RabitMQ以前,先對幾個概念作一下說明

  

  RabbitMQ是一個消息代理。他從消息生產者(producers)那裏接收消息,而後把消息送給消息消費者(consumer)在發送和接受之間,他可以根據設置的規則進行路由,緩存和持久化。

  通常提到RabbitMQ和消息,都用到一些專有名詞。

  • 生產(Producing)意思就是發送。發送消息的程序就是一個生產者(producer)。咱們通常用"P"來表示:

       producer

  • 隊列(queue)就是郵箱的名稱。消息經過你的應用程序和RabbitMQ進行傳輸,它們只能存儲在隊列(queue)中。 隊列(queue)容量沒有限制,你要存儲多少消息均可以——基本上是一個無限的緩衝區。多個生產者(producers)可以把消息發送給同一個隊列,一樣,多個消費者(consumers)也能從同一個隊列(queue)中獲取數據。隊列能夠畫成這樣(圖上是隊列的名稱):

     queue

  • 消費(Consuming)和獲取消息是同樣的意思。一個消費者(consumer)就是一個等待獲取消息的程序。咱們把它畫做"C":

     consumer

  一般,消息生產者,消息消費者和消息代理不在同一臺機器上。

3.2 Hello Word

  下面來展現簡單的RabbitMQ的使用:


      rabbitmq hello world

 3.2.1 首先建立名爲ProjectSend的控制檯項目,須要引用RabbitMQ.Client.dll。這個程序做爲Producer生產者,用來發送數據:

複製代碼
static void Main(string[] args)
    {
        var factory = new ConnectionFactory();
        factory.HostName = "localhost";//RabbitMQ服務在本地運行
        factory.UserName = "guest";//用戶名
        factory.Password = "guest";//密碼

        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare("hello", false, false, false, null);//建立一個名稱爲hello的消息隊列
                string message = "Hello World"; //傳遞的消息內容
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish("", "hello", null, body); //開始傳遞
                Console.WriteLine("已發送: {0}", message);
          Console.ReadLine(); } } }
複製代碼

  

  首先,須要建立一個ConnectionFactory,設置目標,因爲是在本機,因此設置爲localhost,若是RabbitMQ不在本機,只須要設置目標機器的IP地址或者機器名稱便可,而後設置前面建立的用戶名和密碼。

  緊接着要建立一個Channel,若是要發送消息,須要建立一個隊列,而後將消息發佈到這個隊列中。在建立隊列的時候,只有RabbitMQ上該隊列不存在,纔會去建立。消息是以二進制數組的形式傳輸的,因此若是消息是實體對象的話,須要序列化和而後轉化爲二進制數組。

  如今客戶端發送代碼已經寫好了,運行以後,消息會發布到RabbitMQ的消息隊列中,如今須要編寫服務端的代碼鏈接到RabbitMQ上去獲取這些消息。

3.2.2建立名爲ProjectReceive的控制檯項目,引用RabbitMQ.Client.dll。做爲Consumer消費者,用來接收數據:

複製代碼
static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "guest";
            factory.Password = "guest";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare("hello", false, false, false, null);

                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicConsume("hello", false, consumer);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body); 
                        Console.WriteLine("已接收: {0}", message);   
                    };
                    Console.ReadLine(); 
                }
            }
        }
複製代碼

   和發送同樣,首先須要定義鏈接,而後聲明消息隊列。要接收消息,須要定義一個Consume,而後在接收消息的事件中處理數據。

 3.2.3 如今發送和接收的客戶端都寫好了,讓咱們編譯執行起來

  發送消息:

  如今,名爲hello的消息隊列中,發送了一條消息。這條消息存儲到了RabbitMQ的服務器上了。使用rabbitmqctl 的list_queues能夠查看全部的消息隊列,以及裏面的消息個數,能夠看到,目前Rabbitmq上只有一個消息隊列,裏面只有一條消息:

  也能夠在web管理界面查看此queue的相關信息:

 

 

  接收消息:

   既然消息已經被接收了,那咱們再來看queue的內容:

  可見,消息中的內容在接收以後已被刪除了。

3.3 工做隊列

  前面的例子展現瞭如何在指定的消息隊列發送和接收消息。

  如今咱們建立一個工做隊列(work queue)來將一些耗時的任務分發給多個工做者(workers):

   rabbitmq-work queue

  工做隊列(work queues, 又稱任務隊列Task Queues)的主要思想是爲了不當即執行並等待一些佔用大量資源、時間的操做完成。而是把任務(Task)看成消息發送到隊列中,稍後處理。一個運行在後臺的工做者(worker)進程就會取出任務而後處理。當運行多個工做者(workers)時,任務會在它們之間共享。

  這個在網絡應用中很是有用,它能夠在短暫的HTTP請求中處理一些複雜的任務。在一些實時性要求不過高的地方,咱們能夠處理完主要操做以後,以消息的方式來處理其餘的不緊要的操做,好比寫日誌等等。

準備

  在第一部分,發送了一個包含「Hello World!」的字符串消息。如今發送一些字符串,把這些字符串看成複雜的任務。這裏使用time.sleep()函數來模擬耗時的任務。在字符串中加上點號(.)來表示任務的複雜程度,一個點(.)將會耗時1秒鐘。好比"Hello..."就會耗時3秒鐘。

對以前示例的send.cs作些簡單的調整,以即可以發送隨意的消息。這個程序會按照計劃發送任務到咱們的工做隊列中。

複製代碼
static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare("hello", false, false, false, null);
            string message = GetMessage(args);
            var properties = channel.CreateBasicProperties();
            properties.DeliveryMode = 2;

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("", "hello", properties, body);
            Console.WriteLine(" set {0}", message);
        }
    }

    Console.ReadKey();
}

private static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
複製代碼

 

接着咱們修改接收端,讓他根據消息中的逗點的個數來Sleep對應的秒數:

複製代碼
static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare("hello", false, false, false, null);

            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume("hello", true, consumer);

            while (true)
            {
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);

                int dots = message.Split('.').Length - 1;
                Thread.Sleep(dots * 1000);
                        
                Console.WriteLine("Received {0}", message);
                Console.WriteLine("Done");
            }
        }
    }
}
複製代碼

 

輪詢分發

  使用工做隊列的一個好處就是它可以並行的處理隊列。若是堆積了不少任務,咱們只須要添加更多的工做者(workers)就能夠了,擴展很簡單。

如今,咱們先啓動兩個接收端,等待接受消息,而後啓動一個發送端開始發送消息。

Send message queue 

  在cmd條件下,發送了5條消息,每條消息後面的逗點表示該消息須要執行的時長,來模擬耗時的操做。

  而後能夠看到,兩個接收端依次接收到了發出的消息:

receive message queue 

默認,RabbitMQ會將每一個消息按照順序依次分發給下一個消費者。因此每一個消費者接收到的消息個數大體是平均的。 這種消息分發的方式稱之爲輪詢(round-robin)。

3.4 消息響應

當處理一個比較耗時得任務的時候,也許想知道消費者(consumers)是否運行到一半就掛掉。在當前的代碼中,當RabbitMQ將消息發送給消費者(consumers)以後,立刻就會將該消息從隊列中移除。此時,若是把處理這個消息的工做者(worker)停掉,正在處理的這條消息就會丟失。同時,全部發送到這個工做者的尚未處理的消息都會丟失。

咱們不想丟失任何任務消息。若是一個工做者(worker)掛掉了,咱們但願該消息會從新發送給其餘的工做者(worker)。

爲了防止消息丟失,RabbitMQ提供了消息響應(acknowledgments)機制。消費者會經過一個ack(響應),告訴RabbitMQ已經收到並處理了某條消息,而後RabbitMQ纔會釋放並刪除這條消息。

若是消費者(consumer)掛掉了,沒有發送響應,RabbitMQ就會認爲消息沒有被徹底處理,而後從新發送給其餘消費者(consumer)。這樣,即便工做者(workers)偶爾的掛掉,也不會丟失消息。

消息是沒有超時這個概念的;當工做者與它斷開連的時候,RabbitMQ會從新發送消息。這樣在處理一個耗時很是長的消息任務的時候就不會出問題了。

消息響應默認是開啓的。在以前的例子中使用了no_ack=True標識把它關閉。是時候移除這個標識了,當工做者(worker)完成了任務,就發送一個響應。

複製代碼
channel.BasicConsume("hello", false, consumer);

while (true)
{
    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);

    int dots = message.Split('.').Length - 1;
    Thread.Sleep(dots * 1000);

    Console.WriteLine("Received {0}", message);
    Console.WriteLine("Done");

    channel.BasicAck(ea.DeliveryTag, false);
}
複製代碼

 

如今,能夠保證,即便正在處理消息的工做者被停掉,這些消息也不會丟失,全部沒有被應答的消息會被從新發送給其餘工做者.

一個很常見的錯誤就是忘掉了BasicAck這個方法,這個錯誤很常見,可是後果很嚴重. 當客戶端退出時,待處理的消息就會被從新分發,可是RabitMQ會消耗愈來愈多的內存,由於這些沒有被應答的消息不可以被釋放。調試這種case,可使用rabbitmqct打印messages_unacknoledged字段。

rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

 

3.5 消息持久化

前面已經搞定了即便消費者down掉,任務也不會丟失,可是,若是RabbitMQ Server停掉了,那麼這些消息仍是會丟失。

當RabbitMQ Server 關閉或者崩潰,那麼裏面存儲的隊列和消息默認是不會保存下來的。若是要讓RabbitMQ保存住消息,須要在兩個地方同時設置:須要保證隊列和消息都是持久化的。

首先,要保證RabbitMQ不會丟失隊列,因此要作以下設置:

bool durable = true;
channel.QueueDeclare("hello", durable, false, false, null);

 

雖然在語法上是正確的,可是在目前階段是不正確的,由於咱們以前已經定義了一個非持久化的hello隊列。RabbitMQ不容許咱們使用不一樣的參數從新定義一個已經存在的同名隊列,若是這樣作就會報錯。如今,定義另一個不一樣名稱的隊列:

bool durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

 

queueDeclare 這個改動須要在發送端和接收端同時設置。

如今保證了task_queue這個消息隊列即便在RabbitMQ Server重啓以後,隊列也不會丟失。 而後須要保證消息也是持久化的, 這能夠經過設置IBasicProperties.SetPersistent 爲true來實現:

var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);

 

須要注意的是,將消息設置爲持久化並不能徹底保證消息不丟失。雖然他告訴RabbitMQ將消息保存到磁盤上,可是在RabbitMQ接收到消息和將其保存到磁盤上這之間仍然有一個小的時間窗口。 RabbitMQ 可能只是將消息保存到了緩存中,並無將其寫入到磁盤上。持久化是不可以必定保證的,可是對於一個簡單任務隊列來講已經足夠。若是須要消息隊列持久化的強保證,可使用publisher confirms

3.6 公平分發

你可能會注意到,消息的分發可能並無如咱們想要的那樣公平分配。好比,對於兩個工做者。當奇數個消息的任務比較重,可是偶數個消息任務比較輕時,奇數個工做者始終處理忙碌狀態,而偶數個工做者始終處理空閒狀態。可是RabbitMQ並不知道這些,他仍然會平均依次的分發消息。

爲了改變這一狀態,咱們可使用basicQos方法,設置perfetchCount=1 。這樣就告訴RabbitMQ 不要在同一時間給一個工做者發送多於1個的消息,或者換句話說。在一個工做者還在處理消息,而且沒有響應消息以前,不要給他分發新的消息。相反,將這條新的消息發送給下一個不那麼忙碌的工做者。

channel.BasicQos(0, 1, false); 

 

3.7 完整實例

如今將全部這些放在一塊兒:

發送端代碼以下:

複製代碼
static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
                   
            bool durable = true;
            channel.QueueDeclare("task_queue", durable, false, false, null);
                    
            string message = GetMessage(args);
            var properties = channel.CreateBasicProperties();
            properties.SetPersistent(true);
                  

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("", "task_queue", properties, body);
            Console.WriteLine(" set {0}", message);
        }
    }

    Console.ReadKey();
}

private static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
複製代碼

 

接收端代碼以下:

複製代碼
static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            bool durable = true;
            channel.QueueDeclare("task_queue", durable, false, false, null);
            channel.BasicQos(0, 1, false);

            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume("task_queue", false, consumer);

            while (true)
            {
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);

                int dots = message.Split('.').Length - 1;
                Thread.Sleep(dots * 1000);

                Console.WriteLine("Received {0}", message);
                Console.WriteLine("Done");

                channel.BasicAck(ea.DeliveryTag, false);
            }
        }
    }
}
複製代碼

 

4 管理界面

RabbitMQ管理界面,經過該界面能夠查看RabbitMQ Server 當前的狀態,該界面是以插件形式提供的,而且在安裝RabbitMQ的時候已經自帶了該插件。須要作的是在RabbitMQ控制檯界面中啓用該插件,命令以下:

rabbitmq-plugins enable rabbitmq_management

rabbitmq management

如今,在瀏覽器中輸入 http://server-name:15672/ server-name換成機器地址或者域名,若是是本地的,直接用localhost(RabbitMQ 3.0以前版本端口號爲55672)在輸入以後,彈出登陸界面,使用咱們以前建立的用戶登陸。

RabbitMQ Web management .

在該界面上能夠看到當前RabbitMQServer的全部狀態。

5 總結

本文簡單介紹了消息隊列的相關概念,並介紹了RabbitMQ消息代理的基本原理以及在Windows 上如何安裝RabbitMQ和在.NET中如何使用RabbitMQ。消息隊列在構建分佈式系統和提升系統的可擴展性和響應性方面有着很重要的做用,但願本文對您瞭解消息隊列以及如何使用RabbitMQ有所幫助。

相關文章
相關標籤/搜索