如何使用RabbitMQ實現事件總線

1 前置閱讀

在閱讀本文章以前,你能夠先閱讀:json

  • RabbitMQ入門
  • 什麼是觀察者模式
  • 什麼是事件總線

2 實現

首先,事件源與事件處理的映射字典。服務器

private static Dictionary<string, List<object>> eventHandlers = new Dictionary<string, List<object>>();

而後,初始化RabbitMQ,建立到服務器的鏈接,建立一個通道等this

public RabbitMQEventBus(IConnectionFactory connectionFactory,
    string exchangeName,
    string exchangeType = ExchangeType.Fanout,
    string queueName = null,
    bool autoAck = false)
{
    this.connectionFactory = connectionFactory;
    this.connection = this.connectionFactory.CreateConnection();
    this.channel = this.connection.CreateModel();
    this.exchangeType = exchangeType;
    this.exchangeName = exchangeName;
    this.autoAck = autoAck;

    this.channel.ExchangeDeclare(this.exchangeName, this.exchangeType);

    this.queueName = this.InitializeEventConsumer(queueName);
}

接着,實現訂閱,往字典表中添加事件處理實例,並綁定隊列spa

public void Subscribe<TEvent>(IEventHandler<TEvent> eventHandler) where TEvent : IEvent
{
    var eventTypeName = typeof(TEvent).FullName;
    if (eventHandlers.ContainsKey(eventTypeName))
    {
        var handlers = eventHandlers[eventTypeName];
        handlers.Add(eventHandler);
    }
    else
    {
        eventHandlers.Add(eventTypeName, new List<object> { eventHandler });
    }
    this.channel.QueueBind(this.queueName, this.exchangeName, typeof(TEvent).FullName);
}

接着,實現取消訂閱,從字典表中刪除事件處理實例,並取消綁定隊列code

public void Unsubscribe<TEvent>(IEventHandler<TEvent> eventHandler) where TEvent : IEvent
{

    var eventType = typeof(TEvent).FullName;
    if (eventHandlers.ContainsKey(eventType))
    {
        var handlers = eventHandlers[eventType];
        if (handlers != null && handlers.Exists(s => s.GetType() == eventHandler.GetType()))
        {
            var handlerToRemove = handlers.First(s => s.GetType() == eventHandler.GetType());
            handlers.Remove(handlerToRemove);

            this.channel.QueueUnbind(this.queueName, this.exchangeName, typeof(TEvent).FullName);
        }
    }
}

接着,實現發佈,往隊列發佈事件隊列

public void Publish<TEvent>(TEvent @event) where TEvent : IEvent
{
    var json = JsonConvert.SerializeObject(@event, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
    var eventBody = Encoding.UTF8.GetBytes(json);
    channel.BasicPublish(this.exchangeName,
        @event.GetType().FullName,
        null,
        eventBody);
}

接着,在EventingBasicConsumer.Received事件處理中,經過事件源找到對應的事件處理類,並執行它事件

private string InitializeEventConsumer(string queue)
{
    var localQueueName = queue;
    if (string.IsNullOrEmpty(localQueueName))
    {
        localQueueName = this.channel.QueueDeclare().QueueName;
    }
    else
    {
        this.channel.QueueDeclare(localQueueName, true, false, false, null);
    }

    var consumer = new EventingBasicConsumer(this.channel);
    consumer.Received += (model, eventArgument) =>
    {
        var eventBody = eventArgument.Body.ToArray();
        var json = Encoding.UTF8.GetString(eventBody);
        var @event = (IEvent)JsonConvert.DeserializeObject(json, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
        var eventTypeName = eventArgument.RoutingKey;

        if (eventHandlers.ContainsKey(eventTypeName))
        {
            var handlers = eventHandlers[eventTypeName];
            try
            {
                foreach (var handler in handlers)
                {
                    MethodInfo meth = handler.GetType().GetMethod("Handle");
                    meth.Invoke(handler, new Object[] { @event });
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        if (!autoAck)
        {
            channel.BasicAck(eventArgument.DeliveryTag, false);
        }
    };

    this.channel.BasicConsume(localQueueName, autoAck: this.autoAck, consumer: consumer);

    return localQueueName;
}

最後,建立客戶端類,具體事件源類,具體事件處理類。get

using Example.EventBus;
using RabbitMQ.Client;
using System;

namespace Eaxmple.EventBus.RabbitMQ.ConsoleApp01
{
    public class SendedEvent : IEvent
    {
        public string Name { get; private set; }
        public SendedEvent(string name)
        {
            Name = name;
        }
    }

    public class CustomerASendedEventHandler : IEventHandler<SendedEvent>
    {
        public void Handle(SendedEvent @event)
        {
            Console.WriteLine($"顧客A收到{@event.Name}通知!");
        }
    }

    public class CustomerBSendedEventHandler : IEventHandler<SendedEvent>
    {
        public void Handle(SendedEvent @event)
        {
            Console.WriteLine($"顧客B收到{@event.Name}通知!");
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var connectionFactory = new ConnectionFactory { HostName = "localhost" };
            var eventBus = new RabbitMQEventBus(connectionFactory, "Eaxmple.EventBus.RabbitMQ.ConsoleApp01.Exchange", queueName: "Eaxmple.EventBus.RabbitMQ.ConsoleApp01.Queue");

            var sendedEvent = new SendedEvent("優惠");

            var customerASendedEventHandler = new CustomerASendedEventHandler();
            eventBus.Subscribe<SendedEvent>(customerASendedEventHandler);
            var customerBSendedEventHandler = new CustomerBSendedEventHandler();
            eventBus.Subscribe<SendedEvent>(customerBSendedEventHandler);
            Console.WriteLine($"商店發了{sendedEvent.Name}通知!");
            eventBus.Publish<SendedEvent>(sendedEvent);

            Console.ReadKey();
        }
    }
}

讓咱們來看看輸出結果:string

商店發佈優惠通知!
顧客A收到優惠通知。
顧客B收到優惠通知。
相關文章
相關標籤/搜索