從前面文章能夠看出,消息總線是EDA(事件驅動架構)與微服務架構的核心部件,沒有消息總線,就沒法很好的實現微服務之間的解耦與通信。一般咱們能夠利用現有成熟的消息代理產品或雲平臺提供的消息服務來構建本身的消息總線;也能夠本身徹底寫一個消息代理產品,而後基於它構建本身的消息總線。一般咱們不用重複造輪子(除非公司有特殊的要求,好比一些大型互聯網公司考慮到自主可控的白盒子),能夠利用好比像RabbitMq這樣成熟的消息代理產品做爲消息總線的底層支持。json
RabbitMq核心組件解釋:服務器
Connection:消息的發送方或訂閱方經過它鏈接到RabbitMq服務器。微信
Channel:消息的發送方或訂閱方經過Connection鏈接到RabbitMq服務器後,經過Channel創建會話通道。架構
Exchange:消息的發送方向Exchange發送消息,經過RabbitMq服務器中Exchange與Queue的綁定關係,Exchange會將消息路由到匹配的Queue中。async
Queue:消息的承載者,消息的發送者的消息最終經過Exchange路由到匹配的Queue,消息的接收者從Queue接收消息並進行處理。ide
Exchange模式:在消息發送到Exchange時,須要路由到匹配的Queue中,至於如何路由,則是由Exchange模式決定的。函數
1.Direct模式:特定的路由鍵(消息類型)轉發到該Exchange的指定Queue中。微服務
2.Fanout模式:發送到該Exchange的消息,被同時發送到Exchange下綁定的全部Queue中。性能
3.Topic模式:具備某種特徵的消息轉發到該Exchange的指定Queue中。this
咱們最多見的使用是Direct模式,若是消息要被多個消費者消費,則可使用Fanout模式。
實現基於RabbitMq的消息總線:
咱們首先須要安裝Erlang與RabbitMq到服務器上,而後就能夠進行基於RabbitMq的消息總線的開發了,開發的整體思路與步驟以下:
1.首先創建一個項目做爲消息總線,而後引入Rabbitmq.Client 這個nuget包,這樣就有了RabbitMq開發的支持。
2.前面實現了基本的消息總線,全部基於RabbitMq的消息總線是從它繼承下來的,並須要傳入特定的參數到消息總線的構造函數中:
public RabbitMqEB(IConnectionFactory connectionFactory,IEventHandlerExecutionContext context, string exchangeName,string exchangeType,string queueName,int publisherorconsumer, bool autoAck = true) : base(context) { this.connectionFactory = connectionFactory; this.connection = this.connectionFactory.CreateConnection(); this.exchangeName = exchangeName; this.exchangeType = exchangeType; this.autoAck = autoAck; this.queueName = queueName; if (publisherorconsumer == 2) { this.channel = CreateComsumerChannel(); } }
connectionFactory:RabbitMq.Client中的類型,用於與RabbitMq服務器創建鏈接時須要使用的對象。
context:消息與消息處理器之間的關聯關係的對象。
exchangeName:生產者或消費者須要鏈接到的Exchange的名字。
exchangeType:前面所描述的Exchange模式。
queueName:生產者或消費者發送或接收消息時的Queue的名字。
publisherorconsumer:指定鏈接到消息總線的組件是消息總線的生產者仍是消費者,消費者和生產者會有不一樣,消費者(publisherorconsumer==2)會構建一個消費通道,用於從Queue接收消息並調用父類的ieventHandlerExecutionContext的HandleAsync方法來處理消息。
3.創建到RabbitMq的鏈接:
//判斷是否已經創建了鏈接 public bool IsConnected { get { return this.connection != null && this.connection.IsOpen; } }
public bool TryConnect() { //出現鏈接異常時的重試策略,一般經過第三方nuget包實現重試功能,這裏出現鏈接異常時,每一個1秒重試一次,共重試5次 var policy = RetryPolicy.Handle<SocketException>().Or<BrokerUnreachableException>() .WaitAndRetry(5, p => TimeSpan.FromSeconds(1),(ex,time)=> { //記錄錯誤日誌 }); policy.Execute(() => { //創建RabbitMq Server的鏈接 this.connection = this.connectionFactory.CreateConnection(); }); if (IsConnected) { return true; } return false; }
4.建立消費者通道:
private IModel CreateComsumerChannel() { if (!IsConnected) { TryConnect(); } var channel = this.connection.CreateModel(); channel.ExchangeDeclare(exchange: exchangeName, type: exchangeType, durable: true); channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); //消費者接收到消息的處理 consumer.Received += async (model, ea) => { var eventbody = ea.Body; var json = Encoding.UTF8.GetString(eventbody); var @event = (IEvent)JsonConvert.DeserializeObject(json); //調用關聯對象中消息對應的處理器的處理方法 await this.eventHandlerExecutionContext.HandleAsync(@event); //向會話通道確認此消息已被處理 channel.BasicAck(ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: this.queueName, autoAck: false, consumer: consumer); channel.CallbackException += (sender, ea) => { this.channel.Dispose(); this.channel = CreateComsumerChannel(); }; return channel; }
5.對生產者發佈消息到交換機隊列的支持:
public override void Publish<TEvent>(TEvent @event) { if (!IsConnected) { TryConnect(); } using(var channel = this.connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: exchangeType, durable: true); var message = JsonConvert.SerializeObject(@event); var body = Encoding.UTF8.GetBytes(message); //發佈到交換機,根據交換機與隊列的綁定以及交換機模式,最終發佈到指定的隊列中 channel.BasicPublish(this.exchangeName, @event.GetType().FullName,null, body); } }
6.對訂閱者從交換機隊列中訂閱消息的支持:
public override void Subscribe<TEvent, TEventHandler>() { //註冊接收到的消息類型到訂閱方的處理器之間的關係 if (!this.eventHandlerExecutionContext.IsRegisterEventHandler < TEvent,TEventHandler>()){ this.eventHandlerExecutionContext.RegisterEventHandler<TEvent, TEventHandler>(); //消費者進行隊列綁定 this.channel.QueueBind(this.queueName, this.exchangeName, typeof(TEvent).FullName); } }
從上面的6個步驟,咱們基本上就完成了基於RabbitMq消息總線的基本功能,這裏須要說明的是,上述代碼只是演示,在實際生產環境中,不能直接使用以上代碼,還須要當心的重構此代碼以保證可靠性與性能。
QQ討論羣:309287205
微服務實戰視頻請關注微信公衆號: