ASP.NET Core2利用MassTransit集成RabbitMQ

在ASP.NET Core上利用MassTransit來集成使用RabbitMQ真的很簡單,代碼也很簡潔。近期由於項目須要,我便在這基礎上再次進行了封裝,抽成了公共方法,使得使用RabbitMQ的調用變得更方便簡潔。那麼,就讓我們來瞧瞧其魅力所在吧。html

 

MassTransitgit

先看看MassTransit是個什麼寶貝(MassTransit官網的簡介):github

MassTransit是一個免費的開源輕量級消息總線,用於使用.NET框架建立分佈式應用程序。MassTransit在現有的頂級消息傳輸上提供了一系列普遍的功能,從而以開發人員友好的方式使用基於消息的會話模式異步鏈接服務。基於消息的通訊是實現面向服務的體系結構的可靠且可擴展的方式。json

通俗描述:app

MassTransit就是一套基於消息服務的高級封裝類庫,下游可聯接RabbitMQ、Redis、MongoDb等服務。框架

github官網:https://github.com/MassTransit/MassTransit異步

 

RabbitMQasync

RabbitMQ是成熟的MQ隊列服務,是由 Erlang 語言開發的 AMQP 的開源實現。關於介紹RabbitMQ的中文資料也不少,有須要能夠自行查找。我這裏貼出其官網與下載安裝的連接,以下:分佈式

官網:http://www.rabbitmq.com測試

下載與安裝:http://www.rabbitmq.com/download.html

 

實現代碼

經過上面的介紹,我們已對MassTransit與RabbitMQ有了初步瞭解,那麼如今來看看如何在ASP.NET Core上優雅的使用RabbitMQ吧。

一、建立一個名爲「RabbitMQHelp.cs」公共類,用於封裝操做RabbitMQ的公共方法,並經過Nuget來管理並引用「MassTransit」與「MassTransit.RabbitMQ」類庫。

二、「RabbitMQHelp.cs」公共類主要對外封裝兩個靜態方法,其代碼以下:

  1 using MassTransit;
  2 using MassTransit.RabbitMqTransport;
  3 using System;
  4 using System.Collections.Generic;
  5 using System.Text;
  6 using System.Threading.Tasks;
  7 
  8 namespace Lezhima.Comm
  9 {
 10     /// <summary>
 11     /// RabbitMQ公共操做類,基於MassTransit庫
 12     /// </summary>
 13     public class RabbitMQHelp
 14     {
 15         #region 交換器
 16 
 17         /// <summary>
 18         /// 操做日誌交換器
 19         /// 同時需在RabbitMQ的管理後臺建立同名交換器
 20         /// </summary>
 21         public static readonly string actionLogExchange = "Lezhima.ActionLogExchange";
 22 
 23 
 24         #endregion
 25 
 26 
 27         #region 聲明變量
 28 
 29         /// <summary>
 30         /// MQ聯接地址,建議放到配置文件
 31         /// </summary>
 32         private static readonly string mqUrl = "rabbitmq://192.168.1.181/";
 33 
 34         /// <summary>
 35         /// MQ聯接帳號,建議放到配置文件
 36         /// </summary>
 37         private static readonly string mqUser = "admin";
 38 
 39         /// <summary>
 40         /// MQ聯接密碼,建議放到配置文件
 41         /// </summary>
 42         private static readonly string mqPwd = "admin";
 43 
 44         #endregion
 45 
 46         /// <summary>
 47         /// 建立鏈接對象
 48         /// 不對外公開
 49         /// </summary>
 50         private static IBusControl CreateBus(Action<IRabbitMqBusFactoryConfigurator, IRabbitMqHost> registrationAction = null)
 51         {
 52             //經過MassTransit建立MQ聯接工廠
 53             return Bus.Factory.CreateUsingRabbitMq(cfg =>
 54             {
 55                 var host = cfg.Host(new Uri(mqUrl), hst =>
 56                 {
 57                     hst.Username(mqUser);
 58                     hst.Password(mqPwd);
 59                 });
 60                 registrationAction?.Invoke(cfg, host);
 61             });
 62         }
 63 
 64 
 65         /// <summary>
 66         /// MQ生產者
 67         /// 這裏使用fanout的交換類型
 68         /// </summary>
 69         /// <param name="obj"></param>
 70         public async static Task PushMessage(string exchange, object obj)
 71         {
 72             var bus = CreateBus();
 73             var sendToUri = new Uri($"{mqUrl}{exchange}");
 74             var endPoint = await bus.GetSendEndpoint(sendToUri);
 75             await endPoint.Send(obj);
 76         }
 77 
 78         /// <summary>
 79         /// MQ消費者
 80         /// 這裏使用fanout的交換類型
 81         /// consumer必需是實現IConsumer接口的類實例
 82         /// </summary>
 83         /// <param name="obj"></param>
 84         public static void ReceiveMessage(string exchange, object consumer)
 85         {
 86             var bus = CreateBus((cfg, host) =>
 87             {
 88                 //從指定的消息隊列獲取消息 經過consumer來實現消息接收
 89                 cfg.ReceiveEndpoint(host, exchange, e =>
 90                 {
 91                     e.Instance(consumer);
 92                 });
 93             });
 94             bus.Start();
 95         }
 96     }
 97 }
 98 

 

三、「RabbitMQHelp.cs」公共類已經有了MQ「生產者」與「消費者」兩個對外的靜態公共方法,其中「生產者」方法能夠在業務代碼中直接調用,可傳遞JSON、對象等類型的參數向指定的交換器發送數據。而「消費者」方法是從指定交換器中進行接收綁定,但接收到的數據處理功能則交給了「consumer」類(由於在實際項目中,不一樣的數據有不一樣的業務處理邏輯,因此這裏咱們直接就經過IConsumer接口交給具體的實現類去作了)。那麼,下面咱們再來看看消費者裏傳遞進來的「consumer」類的代碼吧:

  1 using MassTransit;
  2 using System;
  3 using System.Collections.Generic;
  4 using System.Text;
  5 using System.Threading.Tasks;
  6 
  7 namespace Lezhima.Storage.Consumer
  8 {
  9     /// <summary>
 10     /// 從MQ接收並處理數據
 11     /// 實現MassTransit的IConsumer接口
 12     /// </summary>
 13     public class LogConsumer : IConsumer<ActionLog>
 14     {
 15         /// <summary>
 16         /// 實現Consume方法
 17         /// 接收並處理數據
 18         /// </summary>
 19         /// <param name="context"></param>
 20         /// <returns></returns>
 21         public Task Consume(ConsumeContext<ActionLog> context)
 22         {
 23             return Task.Run(async () =>
 24             {
 25                 //獲取接收到的對象
 26                 var amsg = context.Message;
 27                 Console.WriteLine($"Recevied By Consumer:{amsg}");
 28                 Console.WriteLine($"Recevied By Consumer:{amsg.ActionLogId}");
 29             });
 30         }
 31     }
 32 }
 33 

 

調用代碼

一、生產者調用代碼以下:

  1 /// <summary>
  2 /// 測試MQ生產者
  3 /// </summary>
  4 /// <returns></returns>
  5 [HttpGet]
  6 public async Task<MobiResult> AddMessageTest()
  7 {
  8     //聲明一個實體對象
  9     var model = new ActionLog();
 10     model.ActionLogId = Guid.NewGuid();
 11     model.CreateTime = DateTime.Now;
 12     model.UpdateTime = DateTime.Now;
 13     //調用MQ
 14     await RabbitMQHelp.PushMessage(RabbitMQHelp.actionLogExchange, model);
 15 
 16     return new MobiResult(1000, "操做成功");
 17 }

 

二、消費者調用代碼以下:

  1 using Lezhima.Storage.Consumer;
  2 using Microsoft.Extensions.Configuration;
  3 using System;
  4 using System.IO;
  5 
  6 namespace Lezhima.Storage
  7 {
  8     class Program
  9     {
 10         static void Main(string[] args)
 11         {
 12             var conf = new ConfigurationBuilder()
 13               .SetBasePath(Directory.GetCurrentDirectory())
 14               .AddJsonFile("appsettings.json", true, true)
 15               .Build();
 16 
 17             //調用接收者
 18             RabbitMQHelp.ReceiveMessage(RabbitMQHelp.actionLogExchange,
 19              new LogConsumer()
 20             );
 21 
 22             Console.ReadLine();
 23         }
 24     }
 25 }
 26 

 

總結

一、基於MassTransit庫使得咱們使用RabbitMQ變得更簡潔、方便。而基於再次封裝後,生產者與消費者將不須要關注具體的業務,也跟業務代碼解耦了,更能適應項目的須要。

二、RabbitMQ的交換器需在其管理後臺自行建立,而這裏使用的fanout類型是由於其發送速度最快,且能知足個人項目須要,各位可視自身狀況選用不一樣的類型。fanout類型不會存儲消息,必須要消費者綁定交換器後纔會發送給消費者。

 

聲明

本文爲做者原創,轉載請備註出處與保留原文地址,謝謝。如文章能給您帶來幫助,請點下推薦或關注,感謝您的支持!

相關文章
相關標籤/搜索