.NET Core微服務之基於EasyNetQ使用RabbitMQ消息隊列

Tip: 此篇已加入.NET Core微服務基礎系列文章索引html

1、消息隊列與RabbitMQ

1.1 消息隊列

  「消息」是在兩臺計算機間傳送的數據單位。消息能夠很是簡單,例如只包含文本字符串;也能夠更復雜,可能包含嵌入對象。消息被髮送到隊列中,「消息隊列」是在消息的傳輸過程當中保存消息的容器git

  消息隊列(Message Queue),是分佈式系統中重要的組件,其通用的使用場景能夠簡單地描述爲:github

當不須要當即得到結果,可是併發量又須要進行控制的時候,差很少就是須要使用消息隊列的時候。  web

  消息隊列主要解決了應用耦合、異步處理、流量削鋒等問題。當前使用較多的消息隊列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,而部分數據庫如Redis、Mysql以及phxsql也可實現消息隊列的功能。更多詳細內容請參考:《消息隊列及其應用場景介紹sql

  我也在前幾年寫過一篇基於Redis作消息隊列的文章,對消息隊列的一個應用場景作了介紹,沒有了解過的童鞋能夠看看。數據庫

1.2 RabbitMQ

  

  RabbitMQ是一款基於AMQP(高級消息隊列協議),由Erlang開發的開源消息隊列組件。是一款優秀的消息隊列組件,他由兩部分組成:服務端和客戶端,客戶端支持多種語言的驅動,如:.Net、JAVA、Erlang等。json

  網上有不少性能比較的文章,例如在1百萬條1k的消息下,每秒種的收發狀況以下圖所示:api

  性能比較

  這裏不過多介紹RabbitMQ,有關RabbitMQ的一些須要瞭解的概念你能夠經過下面的文章瞭解:瀏覽器

  顏聖傑,《RabbitMQ知多少安全

  若是你想了解RabbitMQ與Kafka的對比,能夠閱讀這篇文章:《開源軟件成熟度評測報告-分佈式消息中間件

  而EasyNetQ呢,它是一款基於RabbitMQ.Client封裝的API庫,正如其名,使用起來比較Easy,它把原RabbitMQ.Client中的不少操做都進行了再次封裝,讓開發人員減小了不少工做量。

2、RabbitMQ的安裝

2.1 Linux下的安裝

  這裏不演示如何在Linux下安裝,但推薦生產環境使用Linux,下面是一些參考資料:

  mcgrady,《Linux下RabbitMQ的安裝

  曉晨Master,《.NET Core使用RabbitMQ

  牛頭人,《Linux安裝RabbitMQ

  一隻豬兒蟲,《RabbitMQ Linux安裝

2.2 Windows下的安裝

  開發環境下,我通常使用Windows Server虛擬機,因此這裏說明下如何在Windows下安裝:

  (1)下載ErlangRabbitMQ (這裏我選則的並不是最新版本,而是etp20.3和rabbitmq3.7.5)

  

  (2)首先安裝Erlang,而後添加環境變量(若是添加了,則skip這一步)並加到PATH中

  

  (3)其次安裝RabbitMQ,一路Next,安裝完成後也爲其添加環境變量並添加到PATH中

  

  

  (4)檢查是否安裝成功:rabbitmqctl status

  這裏我碰到了以下的錯誤:

  

  解決方法:

  更正erlang.cookie文件,詳情請參考:https://blog.csdn.net/u012637358/article/details/80078610

  最終狀態:

  

  檢查Windows服務,發現已經自動註冊了一個服務:

  

  (5)激活Web管理插件,而後檢查是否可見(http://127.0.0.1:15672)

  

  

2.3 一些必要的配置

  (1)使用默認帳號:guest/guest登陸進去,添加一個新用戶(Administrator權限),並設置其Permission

  

  (2)添加新的虛擬機(默認爲/,這裏我添加一個名爲EDCVHOST的虛擬機)

  

  (3)綁定新添加的用戶到新的虛擬機上,接下來在咱們的程序中就主要使用admin這個用戶和EDCVHOST這個虛擬機

  

  *.固然,爲了安全考慮,你也能夠把guest用戶remove掉

3、Quick Start:第一個消息隊列

3.1 項目準備

  這裏爲了快速的演示如何使用EasyNetQ,咱們來一個QuickStart,準備三個項目:兩個Console程序和一個Class Library。

  

  其中,對Publisher和Subscriber項目安裝EasyNetQ:

NuGet>Install-Package EasyNetQ  

  針對Messages類庫,新增一個class以下:

    public class TextMessage
    {
        public string Text { get; set; }
    }

3.2 我是Publisher

  添加如下代碼:

複製代碼
    public class Program
    {
        public static void Main(string[] args)
        {
            var connStr = "host=192.168.80.71;virtualHost=EDCVHOST;username=admin;password=edison";

            using (var bus = RabbitHutch.CreateBus(connStr))
            {
                var input = "";
                Console.WriteLine("Please enter a message. 'Quit' to quit.");
                while ((input = Console.ReadLine()) != "Quit")
                {
                    bus.Publish(new TextMessage
                    {
                        Text = input
                    });
                }
            }
        }
    }
複製代碼

  能夠看到,咱們在其中使用EasyNetQ高度封裝的接口建立了一個IBus接口的實例,經過這個IBus實例咱們能夠經過一個超級Easy的Publish接口進行發佈消息。這裏主要是讀取用戶在控制檯中輸入的消息字符串進行發送。實際中,發送的通常都是一個或多個複雜的實體對象。

3.3 我是Subscriber

  添加以下所示代碼:

複製代碼
    public class Program
    {
        public static void Main(string[] args)
        {
            var connStr = "host=192.168.80.71;virtualHost=EDCVHOST;username=admin;password=edison";

            using (var bus = RabbitHutch.CreateBus(connStr))
            {
                bus.Subscribe<TextMessage>("my_test_subscriptionid", HandleTextMessage);

                Console.WriteLine("Listening for messages. Hit <return> to quit.");
                Console.ReadLine();
            }
        }

        public static void HandleTextMessage(TextMessage textMessage)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine("Got message: {0}", textMessage.Text);
            Console.ResetColor();
        }
    }
複製代碼

  這裏主要是經過IBus實例去訂閱消息(這裏是除非用戶關閉程序不然一直處於監聽狀態),當發佈者發佈了指定類型的消息以後,這裏就把它打印出來(紅色字體顯示)。

3.4 簡單測試 

  經過控制檯信息查看結果:

  

  經過RabbitMQ管理界面查看:

  (1)經過Connections Tab能夠發現咱們的兩個客戶端都在Running中

  

  (2)經過Queues Tab查看目前已有的隊列=>能夠看到目前咱們只註冊了一個隊列

  

4、在ASP.NET Core中的使用

4.1 案例結構與說明

  這裏假設有這樣一個場景,客戶經過瀏覽器提交了一個保單,這個保單中包含一些客戶信息,ClientService將這些信息處理後發送一個消息到RabbitMQ中,NoticeService和ZAPEngineService訂閱了這個消息。NoticeService會將客戶信息取出來並獲取一些更多信息爲客戶發送Email,而ZAPEngineService則會根據客戶的一些關鍵信息(好比:年齡,是否吸菸,學歷,年收入等等)去數據庫讀取一些規則來生成一份Question List並存入數據庫。

4.2 項目準備工做

  建立上面提到的這幾個項目,這裏我選擇ASP.NET Core WebAPI類型。

  分別爲這幾個項目經過NuGet安裝EasyNetQ組件,而且經過如下代碼注入統一的IBus實例對象:

複製代碼
    public IServiceProvider ConfigureServices(IServiceCollection services)
    {
      // IoC - EventBus
      services.AddSingleton(RabbitHutch.CreateBus(Configuration["MQ:Dev"]));
      ......
    }
複製代碼

  這裏我將鏈接字符串寫到了配置文件中,請參考上面的QuickStart中的內容。

  下面是這個demo用到的一個消息對象實體:經過標籤聲明隊列名稱。

複製代碼
    [Queue("Qka.Client", ExchangeName = "Qka.Client")]
    public class ClientMessage
    {
        public int ClientId { get; set; }
        public string ClientName { get; set; }
        public string Sex { get; set; }
        public int Age { get; set; }
        // N: Non-Smoker, S: Smoker
        public string SmokerCode { get; set; }
        // Bachelor, Master, Doctor
        public string Education { get; set; }
        public decimal YearIncome { get; set; }
    }
複製代碼

  此外,爲了充分簡化代碼量,EasyNetQ提供了一個AutoSubscriber的方式,能夠經過接口和標籤快速地讓一個類成爲Consumer。詳細內容參考:https://github.com/EasyNetQ/EasyNetQ/wiki/Auto-Subscriber

  這裏爲了快速的在項目中使用Subscriber,添加一個擴展方法,它會從注入的服務中取出IBus實例對象,並自動幫咱們進行Subscriber(那些實現了IConsume接口的類)的註冊。具體用法見後面的介紹。

複製代碼
    public static class AppBuilderExtension
    {
        public static IApplicationBuilder UseSubscribe(this IApplicationBuilder appBuilder, string subscriptionIdPrefix, Assembly assembly)
        {
            var services = appBuilder.ApplicationServices.CreateScope().ServiceProvider;

            var lifeTime = services.GetService<IApplicationLifetime>();
            var bus = services.GetService<IBus>();
            lifeTime.ApplicationStarted.Register(() =>
            {
                var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix);
                subscriber.Subscribe(assembly);
                subscriber.SubscribeAsync(assembly);
            });

            lifeTime.ApplicationStopped.Register(() => bus.Dispose());

            return appBuilder;
        }
    }
複製代碼

4.3 Publisher:ClientService

  ClientService做爲發佈者,這裏假設咱們在API中處理完業務代碼後,將message發佈給RabbitMQ:

複製代碼
    [Produces("application/json")]
    [Route("api/Client")]
    public class ClientController : Controller
    {
        private readonly IClientService clientService;
        private readonly IBus bus;

        public ClientController(IClientService _clientService, IBus _bus)
        {
            clientService = _clientService;
            bus = _bus;
        }

        ......

        [HttpPost]
        public async Task<string> Post([FromBody]ClientDTO clientDto)
        {
            // Business Logic here...
            // eg.Add new client to your service databases via EF
            // Sample Publish
            ClientMessage message = new ClientMessage
            {
                ClientId = clientDto.Id.Value,
                ClientName = clientDto.Name,
                Sex = clientDto.Sex,
                Age = 29,
                SmokerCode = "N",
                Education = "Master",
                YearIncome = 100000
            };
            await bus.PublishAsync(message);

            return "Add Client Success! You will receive some letter later.";
        }
    }
複製代碼

  固然,你可使用同步方法:bus.Publish(message);

4.4 Subscriber: NoticeService & ZAPEngineService

  (1)NoticeService:新增一個實現IConsume接口的Consumer類

複製代碼
    public class ClientMessageConsumer: IConsumeAsync<ClientMessage>
    {
        [AutoSubscriberConsumer(SubscriptionId = "ClientMessageService.Notice")]
        public Task ConsumeAsync(ClientMessage message)
        {
            // Your business logic code here
            // eg.Build one email to client via SMTP service
            // Sample console code
            System.Console.ForegroundColor = System.ConsoleColor.Red;
            System.Console.WriteLine("Consume one message from RabbitMQ : {0}, I will send one email to client.", message.ClientName);
            System.Console.ResetColor();

            return Task.CompletedTask;
        }
    }
複製代碼

  這裏爲了演示效果,增長了一些輸出信息的代碼,下面的ZAPEngineService也是同樣,再也不贅述。

  (2)ZAPEngineService:新增一個實現IConsume接口的Consumer類

複製代碼
    public class ClientMessageConsumer : IConsumeAsync<ClientMessage>
    {
        [AutoSubscriberConsumer(SubscriptionId = "ClientMessageService.ZapQuestion")]
        public Task ConsumeAsync(ClientMessage message)
        {
            // Your business logic code here
            // eg.Generate one ZAP question records into database and send to client
            // Sample console code
            System.Console.ForegroundColor = System.ConsoleColor.Red;
            System.Console.WriteLine("Consume one message from RabbitMQ : {0}, I will generate one ZAP question list to client", message.ClientName);
            System.Console.ResetColor();

            return Task.CompletedTask;
        }
    }
複製代碼

  注意兩個Consumer的SubscriptionId不能同樣,不然沒法接受到消息。

  (3)爲兩個Consumer使用擴展方法:UseSubscribe

複製代碼
    public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        ......

        // easyNetQ
        app.UseSubscribe("ClientMessageService", Assembly.GetExecutingAssembly());
    }
複製代碼

4.5 簡單測試

  (1)藉助Postman向ClientService發起Post請求

  (2)查看NoticeService的日誌信息

  (3)查看ZAPEngineService的日誌信息

  (4)查看RabbitMQ的管理控制檯:

5、小結

  本篇超級簡單地介紹了一下消息隊列與RabbitMQ,經過使用EasyNetQ這個基於RabbitMQ.Client的客戶端作了一個QuickStart演示了在.NET Core環境下如何進行消息的發佈與訂閱,並經過一個微服務的小案例演示瞭如何在ASP.NET Core環境下如何基於EasyNetQ完成消息的發佈與訂閱,看起來就像一個相似於簡單的事件總線。固然,本篇的內容都十分基礎,若是要應用好RabbitMQ,還得把那些基礎概念(如:Channel,Exchange等)弄清楚,而後去理解一下事件總線的概念,實際中還得考慮數據一致性等等,路途漫漫,繼續加油吧!

示例代碼

  Click Here => 點我下載

參考資料

EasyNetQ官方文檔:https://github.com/EasyNetQ/EasyNetQ/wiki/Introduction

focus-lei,《.net core使用EasyNetQ作EventBus

常山造紙農,《RabbitMQ安裝配置和基於EasyNetQ驅動的基礎使用

相關文章
相關標籤/搜索