.NET Core RabbitMQ探索(1)

  RabbitMQ能夠被比做一個郵局,當你向郵局寄一封信時,郵局會保證將這封信送達你寫的收件人,而RabbitMQ與郵局最主要的區別是,RabbitMQ並不真的處理信件,它處理的是二進制的數據塊,它除了接收「信件」,他還要負責處理數據(好比保存數據)、轉發數據(實際上送信是郵遞員的職責)等。docker

  在RabbitMQ中有如下幾個概念:異步

  • 生產者

  一個生產者僅僅負責向RabbitMQ中發送消息,除此以外它不會完成其餘工做。函數

  • 隊列

  隊列由RabbitMQ自身提供,它的做用就像是郵箱和郵局的關係同樣。雖然看似RabbitMQ和你的應用直接通訊,可是實際上消息是被存儲到隊列中的,實際上,隊列就是一個消息緩衝區,受宿主環境內存和磁盤空間的限制。多個生產者能夠將消息發送到一個隊列,而多個消費者能夠從一個隊列中接收消息。測試

  • 消費者

  一個消費者僅僅負責接收來自生產者的消息。spa

  這裏須要注意如下兩點:.net

  1. 生產者、消費者和RabbitMQ不必定必須在相同的宿主機上,在實際的生產環境中,它們經常處於不一樣的宿主機;
  2. 某個應用既能夠同時是一個生產者和消費者。

  咱們須要創建兩個.net core項目,分別是發佈端、接收端,它們由消息隊列進行鏈接,消息隊列會保存消息,直到消息被髮送給接收端,如圖所示。3d

  下面開始建立項目: 

  建立項目後,兩個項目均需安裝RabbitMQ.Client的Nuget包,並restore。rest

  在正式開始編寫發送端代碼以前,咱們須要安裝RabbitMQ,這裏我在Docker上安裝RabbitMQ,所用系統爲CentOS7。
  首先創新新的文件夾目錄/docker/rabbitmq/data,用做RabbitMQ容器的數據卷掛載。code

  而後運行下面的語句,若是本地沒有鏡像,Docker會自動去Dockerhub上拉取鏡像:blog

docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq --volume /rabbitmq/data:/var/lib/rabbitmq rabbitmq:management

  鏡像下載完畢後能夠看到容器已經運行,其中,15671用於外網訪問,5672用於內網鏈接。

  訪問http://ip:15671,不出意外你能夠進入到RabbitMQ的可視化管理界面。

  • 生產者端

  下面能夠開始編寫生產者端的代碼,以下所示。

using System;
using System.Text;
using RabbitMQ.Client;

namespace Send
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "148.70.210.208" };
            //鏈接RabbitMQ
            using (var connection = factory.CreateConnection())
            {
                //建立一個信道
                using (var channel = connection.CreateModel())
                {
                    //聲明一個Hello隊列,信道就與Hello隊列鏈接起來了
                    channel.QueueDeclare(queue: "hello",
                        durable: false,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);

                    string message = "Hello World!";
                    //消息體
                    var body = Encoding.UTF8.GetBytes(message);

                    //經過信道,將消息發送到隊列
                    //須要指明隊列的名稱,才能轉發到特定隊列
                    channel.BasicPublish(exchange: "",
                        routingKey: "hello",
                        basicProperties: null,
                        body: body);

                    Console.WriteLine(" [x] Sent [0]", message);
                }

                Console.WriteLine("Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

  程序首先經過Socket與RabbitMQ進行了鏈接,鏈接成功後,建立了一個信道,並經過這個信道聲明瞭一個名叫"hello"的隊列,這樣,隊列就和這個信道綁定起來,在發送數據時,只須要在routingKey中指明所用的隊列名,消息就會經過direct exchange的模式被髮送到指明名稱的隊列中,這就好像你在網上買了商品,而後你告訴賣家,必定要發某個快遞同樣,這樣,你的商品就會經過這個快遞公司郵寄給你。

  direct exchange模式如圖所示。

  整個流程能夠以下進行歸納,首先創建發佈端和RabbitMQ進行Socket鏈接,而且聲明Socket鏈接中的信道,經過這個信道聲明特定名稱的隊列,這樣,數據就能夠經過這個信道發送到RabbitMQ的隊列中,等待被轉發到消費端,這與官網給出的流程圖一模一樣。

  

  

  • 消費者端 

  下面咱們就能夠開始編寫消費端的程序,對於消費者端來講,它一直在進行監聽,咱們須要一直運行咱們的監聽程序,一旦收到消息,就把消息經過控制檯進行打印。
  打開咱們的Recieve項目,代碼以下所示。

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Receive
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "your ip address" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "hello",
                        durable: false,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);

                    var consumer = new EventingBasicConsumer(channel);

                    consumer.Received += (model, ea) =>
                     {
                         var body = ea.Body;
                         var message = Encoding.UTF8.GetString(body);
                         Console.WriteLine($"Received message:{message}");
                     };

                    channel.BasicConsume(queue: "hello",
                    autoAck: true,
                    consumer: consumer);

                    Console.WriteLine("Press [enter] to exit");
                    Console.ReadLine();
                }
            }
        }
    }
}

  一開始的代碼與發送端一致,創建鏈接、建立信道、聲明隊列,須要注意這裏聲明的隊列名須要與發送端綁定的隊列名一致。這裏之因此從新聲明隊列,是爲了保證當接收端程序比發送端先啓動時,隊列依然是存在的,可是須要保證名稱同樣。

  以上準備工做完成後,接收端能夠開始異步的接收隊列中的數據了。咱們使用EventBasicConsumer.Received事件來綁定接收完成的回調函數。即:當消費端成功從"hello"隊列中接收到數據後,就會調用這個匿名的lambda表達式,從消息體中獲取數據,將數據轉碼成字符串格式而且打印到控制檯中,至此,一條消息的處理就完畢了。
  須要注意,在消費端收到信息後,須要給客戶端確認,即返回autoAck,若是沒有返回,客戶端會默認消費者沒有收到,消息會存儲在隊列中,直到被消費。這裏設置autoAck:true,則表示消費者一旦收到消息,則馬上自動的向客戶端迴應,這樣,消息就會從隊列中刪除。

 

  • 測試代碼

  運行代碼能夠看到,不管先運行消費端仍是生產者端的代碼,消費端都能收到消息:

  若是先運行生產者端程序,消息會存儲在消息隊列中,當運行消費者端程序後,保存在消息隊列中的消息將會被消費,進而從隊列中刪除。
  例如,咱們先不運行消費者端程序,只運行生產者端,這是咱們刷新RabbitMQ的可視化界面。

  能夠看到有一條消息處於Ready狀態,可是還未被消費,因此被存儲,咱們關閉生產者端程序,再次運行,能夠看到隊列中未被消費的消息變爲兩條。

  這時,咱們運行消費者端程序,運行成功後能夠看到,消費者端一次收到了兩條消息,而且隊列中的消息被清除。

  

  因而可知,對於RabbitMQ的消息隊列,生產者和消費者端是徹底分離的。  下一講咱們將繼續探索RabbitMQ的工做隊列。

相關文章
相關標籤/搜索