在閱讀本文章以前,你可以先閱讀:
首先,定義事件源與事件處理器的映射字典。
/// <summary>
/// Registry of subscribed handlers, keyed by the full type name of the event
/// (<c>typeof(TEvent).FullName</c>). Each value is the list of handler
/// instances registered for that event type.
/// </summary>
/// <remarks>
/// The field is static, so every bus instance in the process shares the same
/// registrations. <see cref="Dictionary{TKey,TValue}"/> performs no
/// synchronization of its own.
/// NOTE(review): the consumer callback reads this dictionary while
/// Subscribe/Unsubscribe mutate it; confirm whether those can run concurrently.
/// </remarks>
private static readonly Dictionary<string, List<object>> eventHandlers = new Dictionary<string, List<object>>();
然後,初始化RabbitMQ:建立到服務器的連接,創建一個通道,聲明交換機,並初始化事件消費者。
/// <summary>
/// Creates the event bus: opens a connection and a channel, declares the
/// exchange, then declares the queue and starts consuming from it.
/// </summary>
/// <param name="connectionFactory">Factory used to open the broker connection.</param>
/// <param name="exchangeName">Name of the exchange to declare and publish to.</param>
/// <param name="exchangeType">Exchange type passed to <c>ExchangeDeclare</c>; defaults to fanout.</param>
/// <param name="queueName">
/// Queue to consume from. When null or empty, the queue name returned by the
/// parameterless <c>QueueDeclare()</c> call is used instead.
/// </param>
/// <param name="autoAck">
/// Passed to <c>BasicConsume</c>. When false, each delivery is acknowledged
/// explicitly after its handlers have run.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="connectionFactory"/> or <paramref name="exchangeName"/> is null.
/// </exception>
public RabbitMQEventBus(IConnectionFactory connectionFactory, string exchangeName, string exchangeType = ExchangeType.Fanout, string queueName = null, bool autoAck = false)
{
    // Fail fast with a descriptive exception instead of a NullReferenceException
    // (or a broker-side error) further down.
    if (connectionFactory == null)
    {
        throw new ArgumentNullException(nameof(connectionFactory));
    }

    if (exchangeName == null)
    {
        throw new ArgumentNullException(nameof(exchangeName));
    }

    this.connectionFactory = connectionFactory;
    this.exchangeName = exchangeName;
    this.exchangeType = exchangeType;
    this.autoAck = autoAck;

    this.connection = this.connectionFactory.CreateConnection();
    try
    {
        this.channel = this.connection.CreateModel();
        this.channel.ExchangeDeclare(this.exchangeName, this.exchangeType);

        // Reads this.channel and this.autoAck, so it must run after both are assigned.
        this.queueName = this.InitializeEventConsumer(queueName);
    }
    catch
    {
        // A half-constructed bus is unreachable by the caller, so release the
        // broker resources here; otherwise the connection would leak.
        this.channel?.Dispose();
        this.connection.Dispose();
        throw;
    }
}
接着,實現訂閱,往字典表中添加事件處理實例,並綁定隊列。
/// <summary>
/// Registers <paramref name="eventHandler"/> for events of type
/// <typeparamref name="TEvent"/> and binds the queue to the exchange, using
/// the event type's full name as the routing key.
/// </summary>
/// <typeparam name="TEvent">Event type the handler wants to receive.</typeparam>
/// <param name="eventHandler">Handler instance to register.</param>
/// <exception cref="ArgumentNullException"><paramref name="eventHandler"/> is null.</exception>
public void Subscribe<TEvent>(IEventHandler<TEvent> eventHandler) where TEvent : IEvent
{
    // A null entry would only surface later, as a NullReferenceException in
    // the consumer callback, far away from the faulty call.
    if (eventHandler == null)
    {
        throw new ArgumentNullException(nameof(eventHandler));
    }

    var eventTypeName = typeof(TEvent).FullName;

    // Single lookup; also repairs a key that maps to a null list.
    List<object> handlers;
    if (!eventHandlers.TryGetValue(eventTypeName, out handlers) || handlers == null)
    {
        handlers = new List<object>();
        eventHandlers[eventTypeName] = handlers;
    }

    handlers.Add(eventHandler);

    this.channel.QueueBind(this.queueName, this.exchangeName, eventTypeName);
}
接着,實現取消訂閱,從字典表中刪除事件處理實例,並取消綁定隊列。
/// <summary>
/// Removes one handler registered for <typeparamref name="TEvent"/>. The
/// queue binding is removed only once no handler for that event type remains.
/// </summary>
/// <remarks>
/// The exact instance passed in is removed when it is registered. Otherwise
/// the first registered handler of the same concrete type is removed, so
/// callers may unsubscribe with a fresh instance of the handler class.
/// Does nothing when no matching handler is registered.
/// </remarks>
/// <typeparam name="TEvent">Event type the handler was subscribed to.</typeparam>
/// <param name="eventHandler">Handler instance (or an instance of the same type) to remove.</param>
/// <exception cref="ArgumentNullException"><paramref name="eventHandler"/> is null.</exception>
public void Unsubscribe<TEvent>(IEventHandler<TEvent> eventHandler) where TEvent : IEvent
{
    if (eventHandler == null)
    {
        throw new ArgumentNullException(nameof(eventHandler));
    }

    var eventTypeName = typeof(TEvent).FullName;

    List<object> handlers;
    if (!eventHandlers.TryGetValue(eventTypeName, out handlers) || handlers == null)
    {
        return;
    }

    // Prefer the exact instance; fall back to matching by concrete type.
    var index = handlers.FindIndex(h => ReferenceEquals(h, eventHandler));
    if (index < 0)
    {
        var handlerType = eventHandler.GetType();
        index = handlers.FindIndex(h => h != null && h.GetType() == handlerType);
    }

    if (index < 0)
    {
        return;
    }

    handlers.RemoveAt(index);

    // Unbinding while other handlers are still registered would stop them
    // from receiving this event, so unbind only after the last one is gone.
    if (handlers.Count == 0)
    {
        eventHandlers.Remove(eventTypeName);
        this.channel.QueueUnbind(this.queueName, this.exchangeName, eventTypeName);
    }
}
接着,實現發佈,將事件序列化後發佈到交換機。
/// <summary>
/// Serializes <paramref name="event"/> to UTF-8 encoded JSON and publishes it
/// to the exchange, using the event's runtime type full name as the routing key.
/// </summary>
/// <typeparam name="TEvent">Compile-time type of the event.</typeparam>
/// <param name="event">Event to publish.</param>
/// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
public void Publish<TEvent>(TEvent @event) where TEvent : IEvent
{
    // Without this guard a null event is serialized first and then fails
    // with a NullReferenceException at GetType().
    if (@event == null)
    {
        throw new ArgumentNullException(nameof(@event));
    }

    // TypeNameHandling.All writes CLR type names into the payload so the
    // consumer can rebuild the concrete event type.
    // NOTE(review): deserializing such payloads from an untrusted source is
    // unsafe; confirm that only trusted publishers can reach this exchange.
    var settings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };
    var json = JsonConvert.SerializeObject(@event, settings);
    var eventBody = Encoding.UTF8.GetBytes(json);

    // The runtime type name (not typeof(TEvent)) is the routing key; the
    // consumer uses the routing key to look up the registered handlers.
    var routingKey = @event.GetType().FullName;

    this.channel.BasicPublish(this.exchangeName, routingKey, null, eventBody);
}
接着,在EventingBasicConsumer.Received事件處理中,通過事件源找到對應的事件處理類,並執行它。
/// <summary>
/// Declares the queue, attaches a consumer that dispatches each delivery to
/// the handlers registered for its routing key, and starts consuming.
/// </summary>
/// <param name="queue">
/// Queue name to declare. When null or empty, the parameterless
/// <c>QueueDeclare()</c> is called and the name it returns is used.
/// Otherwise the named queue is declared with durable = true,
/// exclusive = false, autoDelete = false and no extra arguments.
/// </param>
/// <returns>The name of the queue being consumed.</returns>
private string InitializeEventConsumer(string queue)
{
    var localQueueName = queue;
    if (string.IsNullOrEmpty(localQueueName))
    {
        localQueueName = this.channel.QueueDeclare().QueueName;
    }
    else
    {
        this.channel.QueueDeclare(localQueueName, true, false, false, null);
    }

    var consumer = new EventingBasicConsumer(this.channel);
    consumer.Received += (model, eventArgument) =>
    {
        var eventBody = eventArgument.Body.ToArray();
        var json = Encoding.UTF8.GetString(eventBody);

        // NOTE(review): TypeNameHandling.All lets the payload choose which CLR
        // type gets instantiated; this is unsafe if untrusted parties can
        // publish to the queue. Confirm the trust boundary.
        var settings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All };
        var @event = (IEvent)JsonConvert.DeserializeObject(json, settings);

        // Publish uses the event's full type name as the routing key, which
        // is also the key of the handler registry.
        var eventTypeName = eventArgument.RoutingKey;

        List<object> handlers;
        if (eventHandlers.TryGetValue(eventTypeName, out handlers) && handlers != null)
        {
            // Iterate over a snapshot: a handler that subscribes or
            // unsubscribes while being invoked would otherwise invalidate
            // the enumerator.
            foreach (var handler in handlers.ToArray())
            {
                var handlerType = handler.GetType();

                // Select the overload by argument type first, so a handler
                // class with several Handle overloads does not raise
                // AmbiguousMatchException. Fall back to the name-only lookup.
                MethodInfo handleMethod = null;
                if (@event != null)
                {
                    handleMethod = handlerType.GetMethod("Handle", new[] { @event.GetType() });
                }

                if (handleMethod == null)
                {
                    handleMethod = handlerType.GetMethod("Handle");
                }

                if (handleMethod == null)
                {
                    throw new InvalidOperationException(
                        "Handler type '" + handlerType.FullName + "' has no public Handle method.");
                }

                // No try/catch here: the previous `catch (Exception ex) { throw ex; }`
                // only reset the stack trace. A failing handler still
                // propagates, and the delivery is then not acknowledged below.
                handleMethod.Invoke(handler, new object[] { @event });
            }
        }

        // With autoAck the broker has already treated the delivery as acknowledged.
        if (!autoAck)
        {
            channel.BasicAck(eventArgument.DeliveryTag, false);
        }
    };

    this.channel.BasicConsume(localQueueName, autoAck: this.autoAck, consumer: consumer);
    return localQueueName;
}
最後,創建客戶端類、具體事件源類和具體事件處理類。
using Example.EventBus;
using RabbitMQ.Client;
using System;

namespace Eaxmple.EventBus.RabbitMQ.ConsoleApp01
{
    /// <summary>
    /// Event carrying the name of what the shop has sent out.
    /// </summary>
    public class SendedEvent : IEvent
    {
        /// <summary>Creates the event with the given name.</summary>
        /// <param name="name">Value exposed through <see cref="Name"/>.</param>
        public SendedEvent(string name)
        {
            Name = name;
        }

        /// <summary>Name supplied at construction.</summary>
        public string Name { get; private set; }
    }

    /// <summary>
    /// Handler for customer A: writes a line to the console for each <see cref="SendedEvent"/>.
    /// </summary>
    public class CustomerASendedEventHandler : IEventHandler<SendedEvent>
    {
        /// <summary>Writes customer A's notification line for <paramref name="event"/>.</summary>
        public void Handle(SendedEvent @event) => Console.WriteLine($"顧客A收到{@event.Name}通知!");
    }

    /// <summary>
    /// Handler for customer B: writes a line to the console for each <see cref="SendedEvent"/>.
    /// </summary>
    public class CustomerBSendedEventHandler : IEventHandler<SendedEvent>
    {
        /// <summary>Writes customer B's notification line for <paramref name="event"/>.</summary>
        public void Handle(SendedEvent @event) => Console.WriteLine($"顧客B收到{@event.Name}通知!");
    }

    /// <summary>
    /// Console demo: subscribes two handlers, publishes one event, then waits for a key press.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            // Bus backed by a broker on localhost, with a named exchange and queue.
            var factory = new ConnectionFactory { HostName = "localhost" };
            var bus = new RabbitMQEventBus(
                factory,
                "Eaxmple.EventBus.RabbitMQ.ConsoleApp01.Exchange",
                queueName: "Eaxmple.EventBus.RabbitMQ.ConsoleApp01.Queue");

            // Register both customers before anything is published.
            var handlerForCustomerA = new CustomerASendedEventHandler();
            bus.Subscribe(handlerForCustomerA);

            var handlerForCustomerB = new CustomerBSendedEventHandler();
            bus.Subscribe(handlerForCustomerB);

            var sendedEvent = new SendedEvent("優惠");
            Console.WriteLine($"商店發了{sendedEvent.Name}通知!");
            bus.Publish(sendedEvent);

            // Keep the process alive so the consumer callback can run.
            Console.ReadKey();
        }
    }
}
讓我們來看看輸出結果:
商店發了優惠通知! 顧客A收到優惠通知! 顧客B收到優惠通知!