在ASP.NET Core上利用MassTransit來集成使用RabbitMQ真的很簡單,代碼也很簡潔。近期由於項目須要,我便在這基礎上再次進行了封裝,抽成了公共方法,使得使用RabbitMQ的調用變得更方便簡潔。那麼,就讓我們來瞧瞧其魅力所在吧。html
MassTransitgit
先看看MassTransit是個什麼寶貝(MassTransit官網的簡介):github
MassTransit是一個免費的開源輕量級消息總線,用於使用.NET框架建立分佈式應用程序。MassTransit在現有的頂級消息傳輸上提供了一系列普遍的功能,從而以開發人員友好的方式使用基於消息的會話模式異步鏈接服務。基於消息的通訊是實現面向服務的體系結構的可靠且可擴展的方式。json
通俗描述:app
MassTransit就是一套基於消息服務的高級封裝類庫,下游可聯接RabbitMQ、Redis、MongoDb等服務。框架
github官網:https://github.com/MassTransit/MassTransit異步
RabbitMQasync
RabbitMQ是成熟的MQ隊列服務,是由 Erlang 語言開發的 AMQP 的開源實現。關於介紹RabbitMQ的中文資料也不少,有須要能夠自行查找。我這裏貼出其官網與下載安裝的連接,以下:分佈式
下載與安裝:http://www.rabbitmq.com/download.html
實現代碼
經過上面的介紹,我們已對MassTransit與RabbitMQ有了初步瞭解,那麼如今來看看如何在ASP.NET Core上優雅的使用RabbitMQ吧。
一、建立一個名爲「RabbitMQHelp.cs」公共類,用於封裝操做RabbitMQ的公共方法,並經過Nuget來管理並引用「MassTransit」與「MassTransit.RabbitMQ」類庫。
二、「RabbitMQHelp.cs」公共類主要對外封裝兩個靜態方法,其代碼以下:
1 using MassTransit; 2 using MassTransit.RabbitMqTransport; 3 using System; 4 using System.Collections.Generic; 5 using System.Text; 6 using System.Threading.Tasks; 7 8 namespace Lezhima.Comm 9 { 10 /// <summary> 11 /// RabbitMQ公共操做類,基於MassTransit庫 12 /// </summary> 13 public class RabbitMQHelp 14 { 15 #region 交換器 16 17 /// <summary> 18 /// 操做日誌交換器 19 /// 同時需在RabbitMQ的管理後臺建立同名交換器 20 /// </summary> 21 public static readonly string actionLogExchange = "Lezhima.ActionLogExchange"; 22 23 24 #endregion 25 26 27 #region 聲明變量 28 29 /// <summary> 30 /// MQ聯接地址,建議放到配置文件 31 /// </summary> 32 private static readonly string mqUrl = "rabbitmq://192.168.1.181/"; 33 34 /// <summary> 35 /// MQ聯接帳號,建議放到配置文件 36 /// </summary> 37 private static readonly string mqUser = "admin"; 38 39 /// <summary> 40 /// MQ聯接密碼,建議放到配置文件 41 /// </summary> 42 private static readonly string mqPwd = "admin"; 43 44 #endregion 45 46 /// <summary> 47 /// 建立鏈接對象 48 /// 不對外公開 49 /// </summary> 50 private static IBusControl CreateBus(Action<IRabbitMqBusFactoryConfigurator, IRabbitMqHost> registrationAction = null) 51 { 52 //經過MassTransit建立MQ聯接工廠 53 return Bus.Factory.CreateUsingRabbitMq(cfg => 54 { 55 var host = cfg.Host(new Uri(mqUrl), hst => 56 { 57 hst.Username(mqUser); 58 hst.Password(mqPwd); 59 }); 60 registrationAction?.Invoke(cfg, host); 61 }); 62 } 63 64 65 /// <summary> 66 /// MQ生產者 67 /// 這裏使用fanout的交換類型 68 /// </summary> 69 /// <param name="obj"></param> 70 public async static Task PushMessage(string exchange, object obj) 71 { 72 var bus = CreateBus(); 73 var sendToUri = new Uri($"{mqUrl}{exchange}"); 74 var endPoint = await bus.GetSendEndpoint(sendToUri); 75 await endPoint.Send(obj); 76 } 77 78 /// <summary> 79 /// MQ消費者 80 /// 這裏使用fanout的交換類型 81 /// consumer必需是實現IConsumer接口的類實例 82 /// </summary> 83 /// <param name="obj"></param> 84 public static void ReceiveMessage(string exchange, object consumer) 85 { 86 var bus = CreateBus((cfg, host) => 87 { 88 //從指定的消息隊列獲取消息 經過consumer來實現消息接收 89 cfg.ReceiveEndpoint(host, exchange, e => 90 { 91 e.Instance(consumer); 92 }); 93 }); 94 bus.Start(); 95 } 96 } 97 } 98
三、「RabbitMQHelp.cs」公共類已經有了MQ「生產者」與「消費者」兩個對外的靜態公共方法,其中「生產者」方法能夠在業務代碼中直接調用,可傳遞JSON、對象等類型的參數向指定的交換器發送數據。而「消費者」方法是從指定交換器中進行接收綁定,但接收到的數據處理功能則交給了「consumer」類(由於在實際項目中,不一樣的數據有不一樣的業務處理邏輯,因此這裏咱們直接就經過IConsumer接口交給具體的實現類去作了)。那麼,下面咱們再來看看消費者裏傳遞進來的「consumer」類的代碼吧:
1 using MassTransit; 2 using System; 3 using System.Collections.Generic; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace Lezhima.Storage.Consumer 8 { 9 /// <summary> 10 /// 從MQ接收並處理數據 11 /// 實現MassTransit的IConsumer接口 12 /// </summary> 13 public class LogConsumer : IConsumer<ActionLog> 14 { 15 /// <summary> 16 /// 實現Consume方法 17 /// 接收並處理數據 18 /// </summary> 19 /// <param name="context"></param> 20 /// <returns></returns> 21 public Task Consume(ConsumeContext<ActionLog> context) 22 { 23 return Task.Run(async () => 24 { 25 //獲取接收到的對象 26 var amsg = context.Message; 27 Console.WriteLine($"Recevied By Consumer:{amsg}"); 28 Console.WriteLine($"Recevied By Consumer:{amsg.ActionLogId}"); 29 }); 30 } 31 } 32 } 33
調用代碼
一、生產者調用代碼以下:
1 /// <summary> 2 /// 測試MQ生產者 3 /// </summary> 4 /// <returns></returns> 5 [HttpGet] 6 public async Task<MobiResult> AddMessageTest() 7 { 8 //聲明一個實體對象 9 var model = new ActionLog(); 10 model.ActionLogId = Guid.NewGuid(); 11 model.CreateTime = DateTime.Now; 12 model.UpdateTime = DateTime.Now; 13 //調用MQ 14 await RabbitMQHelp.PushMessage(RabbitMQHelp.actionLogExchange, model); 15 16 return new MobiResult(1000, "操做成功"); 17 }
二、消費者調用代碼以下:
1 using Lezhima.Storage.Consumer; 2 using Microsoft.Extensions.Configuration; 3 using System; 4 using System.IO; 5 6 namespace Lezhima.Storage 7 { 8 class Program 9 { 10 static void Main(string[] args) 11 { 12 var conf = new ConfigurationBuilder() 13 .SetBasePath(Directory.GetCurrentDirectory()) 14 .AddJsonFile("appsettings.json", true, true) 15 .Build(); 16 17 //調用接收者 18 RabbitMQHelp.ReceiveMessage(RabbitMQHelp.actionLogExchange, 19 new LogConsumer() 20 ); 21 22 Console.ReadLine(); 23 } 24 } 25 } 26
總結
一、基於MassTransit庫使得咱們使用RabbitMQ變得更簡潔、方便。而基於再次封裝後,生產者與消費者將不須要關注具體的業務,也跟業務代碼解耦了,更能適應項目的須要。
二、RabbitMQ的交換器需在其管理後臺自行建立,而這裏使用的fanout類型是由於其發送速度最快,且能知足個人項目須要,各位可視自身狀況選用不一樣的類型。fanout類型不會存儲消息,必須要消費者綁定交換器後纔會發送給消費者。
聲明
本文爲做者原創,轉載請備註出處與保留原文地址,謝謝。如文章能給您帶來幫助,請點下推薦或關注,感謝您的支持!