.NET 環境中使用RabbitMQ RabbitMQ與Redis隊列對比 RabbitMQ入門與使用篇

.NET 環境中使用RabbitMQ

 

在企業應用系統領域,會面對不一樣系統之間的通訊、集成與整合,尤爲當面臨異構系統時,這種分佈式的調用與通訊變得愈加重要。其次,系統中通常會有不少對實時性要求不高的可是執行起來比較較耗時的地方,好比發送短信,郵件提醒,更新文章閱讀計數,記錄用戶操做日誌等等,若是實時處理的話,在用戶訪問量比較大的狀況下,對系統壓力比較大。html

面對這些問題,咱們通常會將這些請求,放在消息隊列中處理;異構系統之間使用消息進行通信。消息傳遞相較文件傳遞與遠程過程調用(RPC)而言,彷佛更勝一籌,由於它具備更好的平臺無關性,並可以很好地支持併發與異步調用。因此若是系統中出現了以下狀況:node

  • 對操做的實時性要求不高,而須要執行的任務極爲耗時;
  • 存在異構系統間的整合;

通常的能夠考慮引入消息隊列。對於第一種狀況,經常會選擇消息隊列來處理執行時間較長的任務。引入的消息隊列就成了消息處理的緩衝區。消息隊列引入的異步通訊機制,使得發送方和接收方都不用等待對方返回成功消息,就能夠繼續執行下面的代碼,從而提升了數據處理的能力。尤爲是當訪問量和數據流量較大的狀況下,就能夠結合消息隊列與後臺任務,經過避開高峯期對大數據進行處理,就能夠有效下降數據庫處理數據的負荷。git

在前面的一篇講解CQRS模式的文章中,全部的對系統的狀態的更改都是經過事件來完成,通常的將事件存儲到消息隊列中,而後進行統一的處理。github

本文簡單介紹在RabbitMQ這一消息代理工具,以及在.NET中如何使用RabbitMQ.數據庫

一 環境搭建

首先,因爲RabbitMQ使用Erlang編寫的,須要運行在Erlang運行時環境上,因此在安裝RabbitMQ Server以前須要安裝Erlang 運行時環境,能夠到Erlang官網下載對應平臺的安裝文件。若是沒有安裝運行時環境,安裝RabbitMQ Server的時候,會提示須要先安裝Erlang環境。 安裝完成以後,確保已經將Erlang的安裝路徑註冊到系統的環境變量中。安裝完Erlang以後,這個環境會自動設置,若是沒有,在administrator環境下在控制檯下面輸入,也能夠設置:數組

Setx  ERLANG_HOME 「D:\Program Files (x86)\erl6.3″

Erlang Enviroment Path

而後,去RabbitMQ官網下載RabbitMQ Server服務端程序,選擇合適的平臺版本下載。安裝完成以後,就能夠開始使用了。瀏覽器

如今就能夠對RabbitMQ Server進行配置了。緩存

首先,切換到RabbitMQ Server的安裝目錄:bash

RabbitMQ Install folder

在sbin下面有不少batch文件,用來控制RabbitMQ Server,固然您也能夠直接在安裝開始菜單中來執行相應的操做:服務器

RabitMQ Menu

最簡單的方式是使RabbitMQ以Windows Service的方式在後臺運行,因此咱們須要以管理員權限打開cmd,而後切換到sbin目錄下,執行這三條命令便可:

rabbitmq-service install
rabbitmq-service enable
rabbitmq-service start

Start RabbitMQ Service

如今RabbitMQ的服務端已經啓動起來了。

下面可使用sbin目錄下面的rabbitmqctl.bat這個腳原本查看和控制服務端狀態的,在cmd中直接運行rabbitmqctl status。若是看到如下結果:

Unable to connect node

顯示node沒有鏈接上,須要到C:\Windows目錄下,將.erlang.cookie文件,拷貝到用戶目錄下 C:\Users\{用戶名},這是Erlang的Cookie文件,容許與Erlang進行交互,如今重複運行剛纔的命令就會獲得以下信息:

rabbit mq status

RabbitMQ Server上面也有用戶概念,安裝好以後,使用rabbitmqctl list_users命令,能夠看到上面目前的用戶:

RabbitMQ users

能夠看到,如今只有一個角色爲administrator的名爲guest的用戶,這個是RabbitMQ默認爲咱們建立的,他有RabbitMQ的全部權限,通常的,咱們須要新建一個咱們本身的用戶,設置密碼,並授予權限,並將其設置爲管理員,可使用下面的命令來執行這一操做:

rabbitmqctl  add_user  yy  hello!
rabbitmqctl  set_permissions  yy  ".*"  ".*"  ".*"
rabbitmqctl  set_user_tags yy administrator

Create RabbitMQ user

上面的一條命令添加了一個名爲yy的用戶,並設置了密碼hello!,下面的命令爲用戶yy分別授予對全部消息隊列的配置、讀和寫的權限。

如今咱們能夠將默認的guest用戶刪掉,使用下面的命令便可:

rabbitmqctl delete_user guest

若是要修改密碼,可使用下面的命令:

rabbitmqctl change_password {username}  {newpassowrd}

二 開始使用

在.NET中使用RabbitMQ須要下載RabbitMQ的客戶端程序集,能夠到官網下載,下載解壓後就能夠獲得RabbitMQ.Client.dll,這就是RabbitMQ的客戶端。

在使用RabitMQ以前,須要對下面的幾個基本概念說明一下:

RabbitMQ是一個消息代理。他從消息生產者(producers)那裏接收消息,而後把消息送給消息消費者(consumer)在發送和接受之間,他可以根據設置的規則進行路由,緩存和持久化。

通常提到RabbitMQ和消息,都用到一些專有名詞。

  • 生產(Producing)意思就是發送。發送消息的程序就是一個生產者(producer)。咱們通常用"P"來表示:

producer

  • 隊列(queue)就是郵箱的名稱。消息經過你的應用程序和RabbitMQ進行傳輸,它們只能存儲在隊列(queue)中。 隊列(queue)容量沒有限制,你要存儲多少消息均可以——基本上是一個無限的緩衝區。多個生產者(producers)可以把消息發送給同一個隊列,一樣,多個消費者(consumers)也能從同一個隊列(queue)中獲取數據。隊列能夠畫成這樣(圖上是隊列的名稱):

queue

  • 消費(Consuming)和獲取消息是同樣的意思。一個消費者(consumer)就是一個等待獲取消息的程序。咱們把它畫做"C":

consumer

一般,消息生產者,消息消費者和消息代理不在同一臺機器上。

2.1 Hello World

爲了展現RabbitMQ的基本使用,咱們發送一個HelloWorld消息,而後接收並處理。

rabbitmq hello world

首先建立一個控制檯程序,用來將消息發送到RabbitMQ的消息隊列中,代碼以下:

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory();
        factory.HostName = "localhost";
        factory.UserName = "yy";
        factory.Password = "hello!";

        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare("hello", false, false, false, null);
                string message = "Hello World";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish("", "hello", null, body);
                Console.WriteLine(" set {0}", message);
            }
        }
    }

首先,須要建立一個ConnectionFactory,設置目標,因爲是在本機,因此設置爲localhost,若是RabbitMQ不在本機,只須要設置目標機器的IP地址或者機器名稱便可,而後設置前面建立的用戶名yy和密碼hello!。

緊接着要建立一個Channel,若是要發送消息,須要建立一個隊列,而後將消息發佈到這個隊列中。在建立隊列的時候,只有RabbitMQ上該隊列不存在,纔會去建立。消息是以二進制數組的形式傳輸的,因此若是消息是實體對象的話,須要序列化和而後轉化爲二進制數組。

如今客戶端發送代碼已經寫好了,運行以後,消息會發布到RabbitMQ的消息隊列中,如今須要編寫服務端的代碼鏈接到RabbitMQ上去獲取這些消息。

一樣,建立一個名爲Receive的服務端控制檯應用程序,服務端代碼以下:

static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare("hello", false, false, false, null);

            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume("hello", true, consumer);

            Console.WriteLine(" waiting for message.");
            while (true)
            {
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("Received {0}", message);

            }
        }
    }
}

和發送同樣,首先須要定義鏈接,而後聲明消息隊列。要接收消息,須要定義一個Consume,而後從消息隊列中不斷Dequeue消息,而後處理。

如今發送端和接收端的代碼都寫好了,運行發送端,發送消息:

SendHelloword

如今,名爲hello的消息隊列中,發送了一條消息。這條消息存儲到了RabbitMQ的服務器上了。使用rabbitmqctl 的list_queues能夠查看全部的消息隊列,以及裏面的消息個數,能夠看到,目前Rabbitmq上只有一個消息隊列,裏面只有一條消息:

D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.2\sbin>rabbitmqctl list_queues
Listing queues ...
hello   1

如今運行接收端程序,以下:

ReceiveHelloWorld

能夠看到,已經接受到了客戶端發送的Hello World,如今再來看RabitMQ上的消息隊列信息:

D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.2\sbin>rabbitmqctl list_queues
Listing queues ...
hello   0

能夠看到,hello這個隊列中的消息隊列個數爲0,這表示,當接收端,接收到消息以後,RabbitMQ上就把這個消息刪掉了。

2.2 工做隊列

前面的例子展現瞭如何往一個指定的消息隊列中發送和收取消息。如今咱們建立一個工做隊列(work queue)來將一些耗時的任務分發給多個工做者(workers):

rabbitmq-work queue

工做隊列(work queues, 又稱任務隊列Task Queues)的主要思想是爲了不當即執行並等待一些佔用大量資源、時間的操做完成。而是把任務(Task)看成消息發送到隊列中,稍後處理。一個運行在後臺的工做者(worker)進程就會取出任務而後處理。當運行多個工做者(workers)時,任務會在它們之間共享。

這個在網絡應用中很是有用,它能夠在短暫的HTTP請求中處理一些複雜的任務。在一些實時性要求不過高的地方,咱們能夠處理完主要操做以後,以消息的方式來處理其餘的不緊要的操做,好比寫日誌等等。

準備

在第一部分,發送了一個包含「Hello World!」的字符串消息。如今發送一些字符串,把這些字符串看成複雜的任務。這裏使用time.sleep()函數來模擬耗時的任務。在字符串中加上點號(.)來表示任務的複雜程度,一個點(.)將會耗時1秒鐘。好比"Hello..."就會耗時3秒鐘。

對以前示例的send.cs作些簡單的調整,以即可以發送隨意的消息。這個程序會按照計劃發送任務到咱們的工做隊列中。

static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare("hello", false, false, false, null);
            string message = GetMessage(args);
            var properties = channel.CreateBasicProperties();
            properties.DeliveryMode = 2;

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("", "hello", properties, body);
            Console.WriteLine(" set {0}", message);
        }
    }

    Console.ReadKey();
}

private static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

加粗部分是通過修改過了的。

接着咱們修改接收端,讓他根據消息中的逗點的個數來Sleep對應的秒數:

static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare("hello", false, false, false, null);

            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume("hello", true, consumer);

            while (true)
            {
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);

                int dots = message.Split('.').Length - 1;
                Thread.Sleep(dots * 1000);
                        
                Console.WriteLine("Received {0}", message);
                Console.WriteLine("Done");
            }
        }
    }
}

輪詢分發

使用工做隊列的一個好處就是它可以並行的處理隊列。若是堆積了不少任務,咱們只須要添加更多的工做者(workers)就能夠了,擴展很簡單。

如今,咱們先啓動兩個接收端,等待接受消息,而後啓動一個發送端開始發送消息。

Send message queue 

在cmd條件下,發送了5條消息,每條消息後面的逗點表示該消息須要執行的時長,來模擬耗時的操做。

而後能夠看到,兩個接收端依次接收到了發出的消息:

receive message queue 

默認,RabbitMQ會將每一個消息按照順序依次分發給下一個消費者。因此每一個消費者接收到的消息個數大體是平均的。 這種消息分發的方式稱之爲輪詢(round-robin)。

2.3 消息響應

當處理一個比較耗時得任務的時候,也許想知道消費者(consumers)是否運行到一半就掛掉。在當前的代碼中,當RabbitMQ將消息發送給消費者(consumers)以後,立刻就會將該消息從隊列中移除。此時,若是把處理這個消息的工做者(worker)停掉,正在處理的這條消息就會丟失。同時,全部發送到這個工做者的尚未處理的消息都會丟失。

咱們不想丟失任何任務消息。若是一個工做者(worker)掛掉了,咱們但願該消息會從新發送給其餘的工做者(worker)。

爲了防止消息丟失,RabbitMQ提供了消息響應(acknowledgments)機制。消費者會經過一個ack(響應),告訴RabbitMQ已經收到並處理了某條消息,而後RabbitMQ纔會釋放並刪除這條消息。

若是消費者(consumer)掛掉了,沒有發送響應,RabbitMQ就會認爲消息沒有被徹底處理,而後從新發送給其餘消費者(consumer)。這樣,即便工做者(workers)偶爾的掛掉,也不會丟失消息。

消息是沒有超時這個概念的;當工做者與它斷開連的時候,RabbitMQ會從新發送消息。這樣在處理一個耗時很是長的消息任務的時候就不會出問題了。

消息響應默認是開啓的。在以前的例子中使用了no_ack=True標識把它關閉。是時候移除這個標識了,當工做者(worker)完成了任務,就發送一個響應。

channel.BasicConsume("hello", false, consumer);

while (true)
{
    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);

    int dots = message.Split('.').Length - 1;
    Thread.Sleep(dots * 1000);

    Console.WriteLine("Received {0}", message);
    Console.WriteLine("Done");

    channel.BasicAck(ea.DeliveryTag, false);
}

如今,能夠保證,即便正在處理消息的工做者被停掉,這些消息也不會丟失,全部沒有被應答的消息會被從新發送給其餘工做者.

一個很常見的錯誤就是忘掉了BasicAck這個方法,這個錯誤很常見,可是後果很嚴重. 當客戶端退出時,待處理的消息就會被從新分發,可是RabitMQ會消耗愈來愈多的內存,由於這些沒有被應答的消息不可以被釋放。調試這種case,可使用rabbitmqct打印messages_unacknoledged字段。

rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

2.4 消息持久化

前面已經搞定了即便消費者down掉,任務也不會丟失,可是,若是RabbitMQ Server停掉了,那麼這些消息仍是會丟失。

當RabbitMQ Server 關閉或者崩潰,那麼裏面存儲的隊列和消息默認是不會保存下來的。若是要讓RabbitMQ保存住消息,須要在兩個地方同時設置:須要保證隊列和消息都是持久化的。

首先,要保證RabbitMQ不會丟失隊列,因此要作以下設置:

bool durable = true;
channel.QueueDeclare("hello", durable, false, false, null);

雖然在語法上是正確的,可是在目前階段是不正確的,由於咱們以前已經定義了一個非持久化的hello隊列。RabbitMQ不容許咱們使用不一樣的參數從新定義一個已經存在的同名隊列,若是這樣作就會報錯。如今,定義另一個不一樣名稱的隊列:

bool durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

queueDeclare 這個改動須要在發送端和接收端同時設置。

如今保證了task_queue這個消息隊列即便在RabbitMQ Server重啓以後,隊列也不會丟失。 而後須要保證消息也是持久化的, 這能夠經過設置IBasicProperties.SetPersistent 爲true來實現:

var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);

須要注意的是,將消息設置爲持久化並不能徹底保證消息不丟失。雖然他告訴RabbitMQ將消息保存到磁盤上,可是在RabbitMQ接收到消息和將其保存到磁盤上這之間仍然有一個小的時間窗口。 RabbitMQ 可能只是將消息保存到了緩存中,並無將其寫入到磁盤上。持久化是不可以必定保證的,可是對於一個簡單任務隊列來講已經足夠。若是須要消息隊列持久化的強保證,可使用publisher confirms

2.5 公平分發

你可能會注意到,消息的分發可能並無如咱們想要的那樣公平分配。好比,對於兩個工做者。當奇數個消息的任務比較重,可是偶數個消息任務比較輕時,奇數個工做者始終處理忙碌狀態,而偶數個工做者始終處理空閒狀態。可是RabbitMQ並不知道這些,他仍然會平均依次的分發消息。

爲了改變這一狀態,咱們可使用basicQos方法,設置perfetchCount=1 。這樣就告訴RabbitMQ 不要在同一時間給一個工做者發送多於1個的消息,或者換句話說。在一個工做者還在處理消息,而且沒有響應消息以前,不要給他分發新的消息。相反,將這條新的消息發送給下一個不那麼忙碌的工做者。

channel.BasicQos(0, 1, false); 

2.6 完整實例

如今將全部這些放在一塊兒:

發送端代碼以下:

static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
                   
            bool durable = true;
            channel.QueueDeclare("task_queue", durable, false, false, null);
                    
            string message = GetMessage(args);
            var properties = channel.CreateBasicProperties();
            properties.SetPersistent(true);
                  

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("", "task_queue", properties, body);
            Console.WriteLine(" set {0}", message);
        }
    }

    Console.ReadKey();
}

private static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

接收端代碼以下:

static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            bool durable = true;
            channel.QueueDeclare("task_queue", durable, false, false, null);
            channel.BasicQos(0, 1, false);

            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume("task_queue", false, consumer);

            while (true)
            {
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);

                int dots = message.Split('.').Length - 1;
                Thread.Sleep(dots * 1000);

                Console.WriteLine("Received {0}", message);
                Console.WriteLine("Done");

                channel.BasicAck(ea.DeliveryTag, false);
            }
        }
    }
}

三 管理界面

RabbitMQ還有一個管理界面,經過該界面能夠查看RabbitMQ Server 當前的狀態,該界面是以插件形式提供的,而且在安裝RabbitMQ的時候已經自帶了該插件。須要作的是在RabbitMQ控制檯界面中啓用該插件,命令以下:

rabbitmq-plugins enable rabbitmq_management

rabbitmq management

如今,在瀏覽器中輸入 http://server-name:15672/ server-name換成機器地址或者域名,若是是本地的,直接用localhost(RabbitMQ 3.0以前版本端口號爲55672)在輸入以後,彈出登陸界面,使用咱們以前建立的用戶登陸。

RabbitMQ Web management.

在該界面上能夠看到當前RabbitMQServer的全部狀態。

四 總結

本文簡單介紹了消息隊列的相關概念,並介紹了RabbitMQ消息代理的基本原理以及在Windows 上如何安裝RabbitMQ和在.NET中如何使用RabbitMQ。消息隊列在構建分佈式系統和提升系統的可擴展性和響應性方面有着很重要的做用,但願本文對您瞭解消息隊列以及如何使用RabbitMQ有所幫助。

五 參考文獻

  1. http://www.infoq.com/cn/articles/message-based-distributed-architecture
  2. http://www.rabbitmq.com/getstarted.html
  3. http://www.codethinked.com/using-rabbitmq-with-c-and-net
  4. http://www.infoq.com/cn/articles/AMQP-RabbitMQ
  5. http://www.infoq.com/cn/articles/ebay-scalability-best-practices
做者:  yangecnuyangecnu's Blog on 博客園) 
出處: http://www.cnblogs.com/yangecnu/ 
做品由yangecnu 創做,採用知識共享署名-非商業性使用-禁止演繹 2.5 中國大陸許可協議進行許可。 歡迎轉載,但任何轉載必須保留完整文章,在顯要地方顯示署名以及原文連接。如您有任何疑問或者受權方面的協商,請 給我留言
 
 
 

RabbitMQ與Redis隊列對比

2017-11-09 15:46 閱讀 786 views 次 評論 0 條

本文僅針對RabbitMQ與Redis作隊列應用時的狀況進行對比,
具體採用什麼方式實現,還須要取決於系統的實際需求

 

簡要介紹

 

 

RabbitMQ

RabbitMQ是實現AMQP(高級消息隊列協議)的消息中間件的一種,最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。

 

Redis

是一個Key-Value的NoSQL數據庫,開發維護很活躍,雖然它是一個Key-Value數據庫存儲系統,但它自己支持MQ功能,因此徹底能夠當作一個輕量級的隊列服務來使用。

 

具體對比

 

 

可靠消費

Redis:沒有相應的機制保證消息的消費,當消費者消費失敗的時候,消息體丟失,須要手動處理
RabbitMQ:具備消息消費確認,即便消費者消費失敗,也會自動使消息體返回原隊列,同時可全程持久化,保證消息體被正確消費

 

可靠發佈

Reids:不提供,需自行實現
RabbitMQ:具備發佈確認功能,保證消息被髮布到服務器

 

高可用

Redis:採用主從模式,讀寫分離,可是故障轉移尚未很是完善的官方解決方案
RabbitMQ:集羣採用磁盤、內存節點,任意單點故障都不會影響整個隊列的操做

 

持久化

Redis:將整個Redis實例持久化到磁盤
RabbitMQ:隊列,消息,均可以選擇是否持久化

 

消費者負載均衡

Redis:不提供,需自行實現
RabbitMQ:根據消費者狀況,進行消息的均衡分發

 

隊列監控

Redis:不提供,需自行實現
RabbitMQ:後臺能夠監控某個隊列的全部信息,(內存,磁盤,消費者,生產者,速率等)

 

流量控制

Redis:不提供,需自行實現
RabbitMQ:服務器過載的狀況,對生產者速率會進行限制,保證服務可靠性

 

出入隊性能

對於RabbitMQ和Redis的入隊和出隊操做,各執行100萬次,每10萬次記錄一次執行時間。
測試數據分爲128Bytes、512Bytes、1K和10K四個不一樣大小的數據。

注:此數據來源於互聯網,部分數據有誤,已修正


 

應用場景分析

Redis:輕量級,高併發,延遲敏感
即時數據分析、秒殺計數器、緩存等

RabbitMQ:重量級,高併發,異步
批量數據異步處理、並行任務串行化,高負載任務的負載均衡等

 

 

版權聲明:本文版權由 木秀林網全部,轉載請保留連接: RabbitMQ與Redis隊列對比
 
 

RabbitMQ入門與使用篇

 

介紹

RabbitMQ是一個由erlang開發的基於AMQP(Advanced Message Queue)協議的開源實現。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面都很是的優秀。是當前最主流的消息中間件之一。

RabbitMQ的官方

image

  • 概念:
    • Brocker:消息隊列服務器實體。
    • Exchange:消息交換機,指定消息按什麼規則,路由到哪一個隊列。
    • Queue:消息隊列,每一個消息都會被投入到一個或者多個隊列裏。
    • Binding:綁定,它的做用是把exchange和queue按照路由規則binding起來。
    • Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
    • Vhost:虛擬主機,一個broker裏能夠開設多個vhost,用做不用用戶的權限分離。
    • Producer:消息生產者,就是投遞消息的程序。
    • Consumer:消息消費者,就是接受消息的程序。
    • Channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。
  • 消息隊列的使用過程大概以下:
    • 消息接收
      • 客戶端鏈接到消息隊列服務器,打開一個channel。
      • 客戶端聲明一個exchange,並設置相關屬性。
      • 客戶端聲明一個queue,並設置相關屬性。
      • 客戶端使用routing key,在exchange和queue之間創建好綁定關係。
    • 消息發佈
      • 客戶端投遞消息到exchange。
      • exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列裏。
  • AMQP 裏主要要說兩個組件:
    • Exchange 和 Queue
    • 綠色的 X 就是 Exchange ,紅色的是 Queue ,這二者都在 Server 端,又稱做 Broker
    • 這部分是 RabbitMQ 實現的,而藍色的則是客戶端,一般有 Producer 和 Consumer 兩種類型。
  • Exchange一般分爲四種:
    • fanout:該類型路由規則很是簡單,會把全部發送到該Exchange的消息路由到全部與它綁定的Queue中,至關於廣播功能
    • direct:該類型路由規則會將消息路由到binding key與routing key徹底匹配的Queue中
    • topic:與direct類型類似,只是規則沒有那麼嚴格,能夠模糊匹配和多條件匹配
    • headers:該類型不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配
  • 使用場景

下載與安裝

  • 下載
  • 安裝
    • 先安裝erlang
    • 而後再安裝rabbitmq

管理工具

操做起來很簡單,只須要在DOS下面,進入安裝目錄(安裝路徑\RabbitMQ Server\rabbitmq_server-3.2.2\sbin)執行以下命令就能夠成功安裝。

rabbitmq-plugins enable rabbitmq_management 

能夠經過訪問:http://localhost:15672進行測試,默認的登錄帳號爲:guest,密碼爲:guest。

圖片

其餘配置

1. 安裝完之後erlang須要手動設置ERLANG_HOME 的系統變量。

set ERLANG_HOME=F:\Program Files\erl9.0 #環境變量`path`里加入:%ERLANG_HOME%\bin #環境變量`path`里加入: 安裝路徑\RabbitMQ Server\rabbitmq_server-3.6.10\sbin 

2.激活Rabbit MQ’s Management Plugin

使用Rabbit MQ 管理插件,能夠更好的可視化方式查看Rabbit MQ 服務器實例的狀態,你能夠在命令行中使用下面的命令激活。

rabbitmq-plugins.bat  enable rabbitmq_management 

3.建立管理用戶

rabbitmqctl.bat add_user sa 123456

4. 設置管理員

rabbitmqctl.bat set_user_tags sa administrator

5.設置權限

rabbitmqctl.bat set_permissions -p / sa ".*" ".*" ".*" 

6. 其餘命令

#查詢用戶: rabbitmqctl.bat list_users #查詢vhosts: rabbitmqctl.bat list_vhosts #啓動RabbitMQ服務: net stop RabbitMQ && net start RabbitMQ 

以上這些,帳號、vhost、權限、做用域等基本就設置完了。


基於.net使用

RabbitMQ.Client 是RabbiMQ 官方提供的的客戶端 
EasyNetQ 是基於RabbitMQ.Client 基礎上封裝的開源客戶端,使用很是方便

如下操做RabbitMQ的代碼例子,都是基於EasyNetQ的使用和再封裝,在文章底部有demo例子的源碼下載地址


建立 IBus

/// <summary> /// 消息服務器鏈接器 /// </summary> public class BusBuilder { public static IBus CreateMessageBus() { // 消息服務器鏈接字符串 // var connectionString = ConfigurationManager.ConnectionStrings["RabbitMQ"]; string connString = "host=127.0.0.1:5672;virtualHost=TestQueue;username=sa;password=123456"; if (connString == null || connString == string.Empty) throw new Exception("messageserver connection string is missing or empty"); return RabbitHutch.CreateBus(connString); } } 

Fanout Exchange

image

全部發送到Fanout Exchange的消息都會被轉發到與該Exchange 綁定(Binding)的全部Queue上。 
Fanout Exchange 不須要處理RouteKey 。只須要簡單的將隊列綁定到exchange 上。這樣發送到exchange的消息都會被轉發到與該交換機綁定的全部隊列上。相似子網廣播,每臺子網內的主機都得到了一份複製的消息。 因此,Fanout Exchange 轉發消息是最快的。


/// <summary> /// 消息消耗(fanout) /// </summary> /// <typeparam name="T">消息類型</typeparam> /// <param name="handler">回調</param> /// <param name="exChangeName">交換器名</param> /// <param name="queueName">隊列名</param> /// <param name="routingKey">路由名</param> public static void FanoutConsume<T>(Action<T> handler, string exChangeName = "fanout_mq", string queueName = "fanout_queue_default", string routingKey = "") where T : class { var bus = BusBuilder.CreateMessageBus(); var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Fanout); var queue = CreateQueue(adbus, queueName); adbus.Bind(exchange, queue, routingKey); adbus.Consume(queue, registration => { registration.Add<T>((message, info) => { handler(message.Body); }); }); } /// <summary> /// 消息上報(fanout) /// </summary> /// <typeparam name="T">消息類型</typeparam> /// <param name="topic">主題名</param> /// <param name="t">消息命名</param> /// <param name="msg">錯誤信息</param> /// <returns></returns> public static bool FanoutPush<T>(T t, out string msg, string exChangeName = "fanout_mq", string routingKey = "") where T : class { msg = string.Empty; try { using (var bus = BusBuilder.CreateMessageBus()) { var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Fanout); adbus.Publish(exchange, routingKey, false, new Message<T>(t)); return true; } } catch (Exception ex) { msg = ex.ToString(); return false; } } 

image 
全部發送到Direct Exchange的消息被轉發到RouteKey中指定的Queue。 
Direct模式,可使用RabbitMQ自帶的Exchange:default Exchange 。因此不須要將Exchange進行任何綁定(binding)操做 。消息傳遞時,RouteKey必須徹底匹配,纔會被隊列接收,不然該消息會被拋棄。


/// <summary> /// 消息發送(direct) /// </summary> /// <typeparam name="T">消息類型</typeparam> /// <param name="queue">發送到的隊列</param> /// <param name="message">發送內容</param> public static void DirectSend<T>(string queue, T message) where T : class { using (var bus = BusBuilder.CreateMessageBus()) { bus.Send(queue, message); } } /// <summary> /// 消息接收(direct) /// </summary> /// <typeparam name="T">消息類型</typeparam> /// <param name="queue">接收的隊列</param> /// <param name="callback">回調操做</param> /// <param name="msg">錯誤信息</param> /// <returns></returns> public static bool DirectReceive<T>(string queue, Action<T> callback, out string msg) where T : class { msg = string.Empty; try { var bus = BusBuilder.CreateMessageBus(); bus.Receive<T>(queue, callback); } catch (Exception ex) { msg = ex.ToString(); return false; } return true; } /// <summary> /// 消息發送 /// <![CDATA[(direct EasyNetQ高級API)]]> /// </summary> /// <typeparam name="T"></typeparam> /// <param name="t"></param> /// <param name="msg"></param> /// <param name="exChangeName"></param> /// <param name="routingKey"></param> /// <returns></returns> public static bool DirectPush<T>(T t, out string msg, string exChangeName = "direct_mq", string routingKey = "direct_rout_default") where T : class { msg = string.Empty; try { using (var bus = BusBuilder.CreateMessageBus()) { var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Direct); adbus.Publish(exchange, routingKey, false, new Message<T>(t)); return true; } } catch (Exception ex) { msg = ex.ToString(); return false; } } /// <summary> /// 消息接收 /// <![CDATA[(direct EasyNetQ高級API)]]> /// </summary> /// <typeparam name="T">消息類型</typeparam> /// <param name="handler">回調</param> /// <param name="exChangeName">交換器名</param> /// <param name="queueName">隊列名</param> /// <param name="routingKey">路由名</param> public static bool DirectConsume<T>(Action<T> handler, out string msg, string exChangeName = "direct_mq", string queueName = "direct_queue_default", string routingKey = "direct_rout_default") where T : class { msg = string.Empty; try { var bus = BusBuilder.CreateMessageBus(); var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Direct); var queue = CreateQueue(adbus, queueName); adbus.Bind(exchange, queue, routingKey); adbus.Consume(queue, registration => { registration.Add<T>((message, info) => { handler(message.Body); }); }); } catch (Exception ex) { msg = ex.ToString(); return false; } return true; } 

Topic Exchange

image

  • 消息發佈(Publish)

要使用主題發佈,只需使用帶有主題的重載的Publish方法:

var bus = RabbitHutch.CreateBus(...); bus.Publish(message, "X.A"); 

訂閱者能夠經過指定要匹配的主題來過濾郵件。

  • 這些能夠包括通配符:
    • *=>匹配一個字。
    • #=>匹配到零個或多個單詞。

因此發佈的主題爲「X.A.2」的消息將匹配「#」,「X.#」,「* .A.*」,而不是「X.B. *」或「A」。

警告,Publish只顧發送消息到隊列,可是無論有沒有消費端訂閱,因此,發佈以後,若是沒有消費者,該消息將不會被消費甚至丟失。

  • 消息訂閱(Subscribe)

EasyNetQ提供了消息訂閱,當調用Subscribe方法時候,EasyNetQ會建立一個用於接收消息的隊列,不過與消息發佈不一樣的是,消息訂閱增長了一個參數,subscribe_id.代碼以下:

bus.Subscribe("my_id", handler, x => x.WithTopic("X.*")); 

警告: 具備相同訂閱者但不一樣主題字符串的兩個單獨訂閱可能不會產生您指望的效果。 subscriberId有效地標識個體AMQP隊列。 具備相同subscriptionId的兩個訂閱者將鏈接到相同的隊列,而且二者都將添加本身的主題綁定。 因此,例如,若是你這樣作:

bus.Subscribe("my_id", handlerOfXDotStar, x => x.WithTopic("X.*")); bus.Subscribe("my_id", handlerOfStarDotB, x => x.WithTopic("*.B")); 

匹配「x.」或「 .B」的全部消息將被傳遞到「XXX_my_id」隊列。 而後,RabbitMQ將向兩個消費者傳遞消息,其中handlerOfXDotStar和handlerOfStarDotB輪流獲取每條消息。

如今,若是你想要匹配多個主題(「X. 」OR「 .B」),你可使用另外一個重載的訂閱方法,它採用多個主題,以下所示:

bus.Subscribe("my_id", handler, x => x.WithTopic("X.*").WithTopic("*.B")); 

/// <summary> /// 獲取主題 /// </summary> /// <typeparam name="T">主題內容類型</typeparam> /// <param name="subscriptionId">訂閱者ID</param> /// <param name="callback">消息接收響應回調</param> /// <param name="topics">訂閱主題集合</param> public static void TopicSubscribe<T>(string subscriptionId, Action<T> callback, params string[] topics) where T : class { var bus = BusBuilder.CreateMessageBus(); bus.Subscribe(subscriptionId, callback, (config) => { foreach (var item in topics) config.WithTopic(item); }); } /// <summary> /// 發佈主題 /// </summary> /// <typeparam name="T">主題內容類型</typeparam> /// <param name="topic">主題名稱</param> /// <param name="message">主題內容</param> /// <param name="msg">錯誤信息</param> /// <returns></returns> public static bool TopicPublish<T>(string topic, T message, out string msg) where T : class { msg = string.Empty; try { using (var bus = BusBuilder.CreateMessageBus()) { bus.Publish(message, topic); return true; } } catch (Exception ex) { msg = ex.ToString(); return false; } } /// <summary> /// 發佈主題 /// </summary> /// <![CDATA[(topic EasyNetQ高級API)]]> /// <typeparam name="T">消息類型</typeparam> /// <param name="t">消息內容</param> /// <param name="topic">主題名</param> /// <param name="msg">錯誤信息</param> /// <param name="exChangeName">交換器名</param> /// <returns></returns> public static bool TopicSub<T>(T t, string topic, out string msg, string exChangeName = "topic_mq") where T : class { msg = string.Empty; try { if (string.IsNullOrWhiteSpace(topic)) throw new Exception("推送主題不能爲空"); using (var bus = BusBuilder.CreateMessageBus()) { var adbus = bus.Advanced; //var queue = adbus.QueueDeclare("user.notice.zhangsan"); var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Topic); adbus.Publish(exchange, topic, false, new Message<T>(t)); return true; } } catch (Exception ex) { msg = ex.ToString(); return false; } } /// <summary> /// 獲取主題 /// </summary> /// <![CDATA[(topic EasyNetQ高級API)]]> /// <typeparam name="T">消息類型</typeparam> /// <param name="subscriptionId">訂閱者ID</param> /// <param name="callback">回調</param> /// <param name="exChangeName">交換器名</param> /// <param name="topics">主題名</param> public static void TopicConsume<T>(Action<T> callback, string exChangeName = "topic_mq",string subscriptionId = "topic_subid", params string[] topics) where T : class { var bus = BusBuilder.CreateMessageBus(); var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Topic); var queue = adbus.QueueDeclare(subscriptionId); foreach (var item in topics) adbus.Bind(exchange, queue, item); adbus.Consume(queue, registration => { registration.Add<T>((message, info) => { callback(message.Body); }); }); } 

具體發佈/訂閱消息的Demo和相關測試看源碼Demo

image

注意

當在建立訂閱者去消費隊列的時候

/// <summary> /// 獲取主題 /// </summary> /// <param name="topic"></param> public static void GetSub<T>(T topic, Action<T> callback) where T : class { using (var bus = BusBuilder.CreateMessageBus()) { bus.Subscribe<T>(topic.ToString(), callback, x => x.WithTopic(topic.ToString())); } } 

using裏的對象在執行完成後被回收了,致使剛鏈接上去就又斷開了(剛開始寫的時候,習慣性加using,排查了很久才發現,欲哭無淚)

源碼項目運行前的準備與確認:

到RabbitMQ管理後臺添加TestQueueVHost,而且分配用戶權限,而後到RabbitMQHelper.BusBuilder類裏配置RabbitMQ鏈接服務的相關信息 host=127.0.0.1:5672;virtualHost=TestQueue;username=sa;password=123456,(根據配置的內容和用戶修改)

image


參考資料(鳴謝):


附:Demo源碼GitHub地址 


 

歡迎到原文地址關注和交流

相關文章
相關標籤/搜索