如何優雅的使用RabbitMQ

目錄git

RabbitMQ無疑是目前最流行的消息隊列之一,對各類語言環境的支持也很豐富,做爲一個.NET developer有必要學習和了解這一工具。消息隊列的使用場景大概有3種:github

一、系統集成,分佈式系統的設計。各類子系統經過消息來對接,這種解決方案也逐步發展成一種架構風格,即「經過消息傳遞的架構」。數據庫

二、當系統中的同步處理方式嚴重影響了吞吐量,好比日誌記錄。假如須要記錄系統中全部的用戶行爲日誌,若是經過同步的方式記錄日誌勢必會影響系統的響應速度,當咱們將日誌消息發送到消息隊列,記錄日誌的子系統就會經過異步的方式去消費日誌消息。服務器

三、系統的高可用性,好比電商的秒殺場景。當某一時刻應用服務器或數據庫服務器收到大量請求,將會出現系統宕機。若是可以將請求轉發到消息隊列,再由服務器去消費這些消息將會使得請求變得平穩,提升系統的可用性。架構

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。異步

1、開始使用RabbitMQasync

RabbitMQ官網提供了詳細的安裝步驟,另外官網還提供了RabbitMQ在六種場景的使用教程。其中教程一、三、6將覆蓋99%的使用場景,因此正常來講只須要搞清楚這3個教程便可快速上手。分佈式

2、簡單分析函數

咱們以官方提供的教程1作個簡單梳理:該教程展現了Producer如何向一個消息隊列(message queue)發送一個消息(message),消息消費者(Consumer)收到該消息後消費該消息。微服務

一、producer端:

 

var factory = new ConnectionFactory() { HostName = "localhost" };
 using (var connection = factory.CreateConnection())
 {
 while (Console.ReadLine() != null)
 {
 using (var channel = connection.CreateModel())
 {
 //建立一個名叫"hello"的消息隊列
 channel.QueueDeclare(queue: "hello",
 durable: false,
 exclusive: false,
 autoDelete: false,
 arguments: null);
 
 var message = "Hello World!";
 var body = Encoding.UTF8.GetBytes(message);
 
 //向該消息隊列發送消息message
 channel.BasicPublish(exchange: "",
 routingKey: "hello",
 basicProperties: null,
 body: body);
 Console.WriteLine(" [x] Sent {0}", message);
 }
 }
 }

該段代碼很是簡單,幾乎到了沒法精簡的地步:建立了一個信道(channel)->建立一個隊列->向該隊列發送消息。

二、Consumer端

 

var factory = new ConnectionFactory() { HostName = "localhost" };
 using (var connection = factory.CreateConnection())
 {
 using (var channel = connection.CreateModel())
 {
 //建立一個名爲"hello"的隊列,防止producer端沒有建立該隊列
 channel.QueueDeclare(queue: "hello",
 durable: false,
 exclusive: false,
 autoDelete: false,
 arguments: null);
 
 //回調,當consumer收到消息後會執行該函數
 var consumer = new EventingBasicConsumer(channel);
 consumer.Received += (model, ea) =>
 {
 var body = ea.Body;
 var message = Encoding.UTF8.GetString(body);
 Console.WriteLine(" [x] Received {0}", message);
 };
 
 //消費隊列"hello"中的消息
 channel.BasicConsume(queue: "hello",
 noAck: true,
 consumer: consumer);
 
 Console.WriteLine(" Press [enter] to exit.");
 Console.ReadLine();
 }
 }

該段代碼能夠理解爲:建立信道->建立隊列->定義回調函數->消費消息。

該實例描述了Send/Receive模式,能夠簡單理解爲1(producer) VS 1(consumer)的場景;

如何優雅的使用RabbitMQ

 

實例3則描述了Publish/Subscriber模式,即1(producer) VS 多個(consumer);

如何優雅的使用RabbitMQ

 

在以上兩個示例中,producer只須要發送消息便可,並不關心consumer的返回結果。實例6則描述了一個RPC調用場景,producer發送消息後還要接收consumer的返回結果,這一場景看起來跟使用消息隊列的目的有點相悖。由於使用消息隊列的目的之一就是要異步,可是這一場景彷佛又將異步變成了同步,不過這一場景也頗有用,好比一個用戶操做產生了一個消息,應用服務收到該消息後執行了一些邏輯並使得數據庫發生了變化,UI會一直等待應用服務的返回結果才刷新頁面。

如何優雅的使用RabbitMQ

 

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。

3、 發現抽象

我桌子上放着一本RabbitMQ in Action,另外官網提供的文檔也很詳細,我感受在一個月內我就能精通RabbitMQ,到時候簡歷上又能夠寫上「精通…」,感受有點小得意呢... ,可是我知道這並非使用RabbitMQ的最佳方式。

咱們知道合理的抽象能夠幫咱們隱藏掉一些技術細節,讓咱們將重心放在覈心業務上,好比一我的問你:「大雁塔如何走?」你的回答多是「小寨往東,一直走兩站,右手邊」,若是你回答:「右轉45度,向前走100米,再轉90度…」,對方就會迷失在這些細節中。

消息隊列的使用過程當中實際隱藏着一種抽象——服務總線(Service Bus)。

咱們在回頭看第一個例子,這個例子隱含的業務是:ClientA發送一個指令,ClientB收到該指令後作出反應。若是是這樣,咱們爲何要關心如何建立channel,如何建立一個queue? 我僅僅是要發送一個消息而已。另外這個例子寫的其實不夠健壯:

沒有重試機制:若是ClientB第一次沒有執行成功如何對該消息處理?

沒有錯誤處理機制:若是ClientB在重試了N次以後仍是異常如何處理該消息?

沒有熔斷機制;

如何對ClientA作一個schedule(計劃安排),好比定時發送等;

沒有消息審計機制;

沒法對消息的各個狀態作追蹤;

事物處理等。

服務總線正是這種場景的抽象,而且爲咱們提供了這些機制,讓咱們趕快來看個究竟吧。

4、初識MassTransit

MassTransit是.NET平臺下的一款開源免費的ESB產品,官網:http://masstransit-project.com/,GitHub 700 star,500 Fork,相似的產品還有NServiceBus,之因此要選用MassTransit是由於他要比NServiceBus輕量級,另外在MassTransit開發之初就選用了RabbitMQ做爲消息傳輸組建;同時我想拿他跟NServiceBus作個比較,看看他們到底有哪些側重點。

一、新建控制檯應用程序:Masstransit.RabbitMQ.GreetingClient

使用MassTransit能夠從Nuget中安裝:

Install-Package MassTransit.RabbitMQ

二、建立服務總線,發送一個命令

static void Main(string[] args)
{
 Console.WriteLine("Press 'Enter' to send a message.To exit, Ctrl + C");
 
 var bus = BusCreator.CreateBus();
 var sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}{RabbitMqConstants.GreetingQueue}");
 
 while (Console.ReadLine()!=null)
 {
 Task.Run(() => SendCommand(bus, sendToUri)).Wait();
 }
 
 Console.ReadLine();
}
 
private static async void SendCommand(IBusControl bus,Uri sendToUri)
{
 var endPoint =await bus.GetSendEndpoint(sendToUri);
 var command = new GreetingCommand()
 {
 Id = Guid.NewGuid(),
 DateTime = DateTime.Now
 };
 
 await endPoint.Send(command);
 
 Console.WriteLine($"send command:id={command.Id},{command.DateTime}"); 
}

這一段代碼隱藏了衆多關於消息隊列的細節,將咱們的注意力集中在發送消息上,同時ServiceBus提供的API也更接近業務,咱們雖然發送的是一個消息,可是在這種場景下體現出來是一個命令,Send(command)這一API描述了咱們的意圖。

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。

三、服務端接收這一命令

新建一個命令臺控制程序:Masstransit.RabbitMQ.GreetingServer

 

var bus = BusCreator.CreateBus((cfg, host) =>
{
 cfg.ReceiveEndpoint(host, RabbitMqConstants.GreetingQueue, e =>
 {
 e.Consumer<GreetingConsumer>();
 
 });
});

這一代碼能夠理解爲服務端在監聽消息,咱們在服務端註冊了一個名爲「GreetingConsumer」的消費者,GreetingConsumer的定義:

 

public class GreetingConsumer :IConsumer<GreetingCommand>
{
 public async Task Consume(ConsumeContext<GreetingCommand> context)
 {
 
 await Console.Out.WriteLineAsync($"receive greeting commmand: {context.Message.Id},{context.Message.DateTime}");
 }
}

該consumer能夠消費類型爲GreetingCommand的消息。這一實例幾乎隱藏了有關RabbitMQ的技術細節,將代碼中心放在了業務中,將這兩個控制檯應用跑起來試試:

如何優雅的使用RabbitMQ

 

5、實現Publish/Subscribe模式

發佈/訂閱模式使得基於消息傳遞的軟件架構成爲可能,這一能力表現爲ClientA發送消息X,ClientB和ClientC均可以訂閱消息X。

一、咱們在上面的例子中改造一下,當GreetingConsumer收到GreetingCommand後發送一個GreetingEvent:

 

var greetingEvent = new GreetingEvent()
 {
 Id = context.Message.Id,
 DateTime = DateTime.Now
 };
 
 await context.Publish(greetingEvent);

二、新建控制檯程序Masstransit.RabbitMQ.GreetingEvent.SubscriberA用來訂閱GreetingEvent消息:

 

var bus = BusCreator.CreateBus((cfg, host) =>
 {
 cfg.ReceiveEndpoint(host, RabbitMqConstants.GreetingEventSubscriberAQueue, e =>
 {
 e.Consumer<GreetingEventConsumer>();
 });
 });
 
 bus.Start();

定義GreetingEventConsumer:

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。

public class GreetingEventConsumer:IConsumer<Greeting.Message.GreetingEvent>
 {
 public async Task Consume(ConsumeContext<Greeting.Message.GreetingEvent> context)
 {
 await Console.Out.WriteLineAsync($"receive greeting event: id {context.Message.Id}");
 }
 }

這一代碼跟Masstransit.RabbitMQ.GreetingServer接受一個命令幾乎如出一轍,惟一的區別在於:

在Send/Receive模式中Client首先要得到對方(Server)的終結點(endpoint),直接向該終結點發送命令。Server方監聽本身的終結點並消費命令。

而Publish/Subscribe模式中Client publish一個事件,SubscriberA在本身的終結點(endpointA)監聽事件,SubscriberB在本身的終結點(endpointB)監聽事件。

三、根據上面的分析再定義一個Masstransit.RabbitMQ.GreetingEvent.SubscriberB

四、將4個控制檯應用程序跑起來看看

如何優雅的使用RabbitMQ

 

6、實現RPC模式

這一模式在Masstransit中被稱做Request/Response模式,經過IRequestClient<IRequest, IResponse> 接口來實現相關操做。一個相關的例子在官方的github。

結束語:本篇文章分析瞭如何使用Masstransit來抽象業務,避免直接使用具體的消息隊列,固然本文提到的衆多服務總線機制,如「重試、熔斷等」並無在該文中出現,須要你們進一步去了解該項目。

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。

經過對Masstransit的一些試用和NServiceBus的對比,Masstransit在實際項目中很容易上手而且免費,各類API定義的也很是清晰,可是官方的文檔有點過於簡單,實際使用中還須要去作深刻的研究。做爲.NET平臺下爲數很少的ESB開源產品,其關注程度仍是不夠,期待你們爲開源項目作出貢獻。

相關文章
相關標籤/搜索