12,EasyNetQ-自動訂閱

EasyNetQ自v0.7.1.30附帶一個簡單的AutoSubscriber。 您可使用它輕鬆掃描實現接口IConsume <T>或IConsumeAsync <T>的類的特定程序集,而後讓自動訂戶將這些使用者訂閱到您的總線。 IConsume <T>的實現將使用總線Subscribe方法,而IConsumeAsync <T>的實現將使用總線SubscribeAsync方法,請參閱Subscribe以瞭解詳細信息。 你固然可讓你的消費者處理多個消息。 咱們來看看一些樣品。服務器

注意:從版本0.13.0開始,全部AutoSubscriber類都位於EasyNetQ.AutoSubscribe命名空間中,所以請添加如下using語句: using EasyNetQ.AutoSubscribe; app

讓咱們定義一個簡單的消費者,處理三條消息:MessageA, MessageB and MessageC.函數

public class MyConsumer : IConsume<MessageA>, IConsume<MessageB>, IConsumeAsync<MessageC>
{
    public void Consume(MessageA message) {...}

    public void Consume(MessageB message) {...}

    public Task Consume(MessageC message) {...}
}

首先建立AutoSubscriber的新實例,將IBus實例和subscriptionId前綴傳遞給構造函數。 subscriptionId前綴的前綴是全部自動生成的subscriptionIds,但不是自定義subscriptionIds(請參見下文)。fetch

爲了註冊這個以及同一個Assembly中的全部其餘使用者,咱們只須要將包含您的使用者的程序集傳遞給:AutoSubscriber.Subscribe(assembly)。 注意! 這是你應該只作一次,最好在應用程序啓動時。this

var subscriber = new AutoSubscriber(bus, "my_applications_subscriptionId_prefix");
subscriber.Subscribe(Assembly.GetExecutingAssembly());

1,經過主題訂閱(s)spa

默認狀況下,AutoSubscriber將綁定無主題。 在下面的示例中,MessageA註冊了兩個主題。插件

注意! 若是你運行沒有ForTopic屬性的代碼,它將有一個「#」的路由鍵,它將接收任何消息類型的訂閱。 假設安裝了默認端口和管理插件,只需訪問http:// localhost:15672 /#/隊列並根據須要解除綁定便可。code

public class MyConsumer : IConsume<MessageA>, IConsume<MessageB>, IConsumeAsync<MessageC>
{
    [ForTopic("Topic.Foo")]
    [ForTopic("Topic.Bar")]
    public void Consume(MessageA message) {...}

    public void Consume(MessageB message) {...}

    public Task Consume(MessageC message) {...}
}

//To publish by topic
var bus = RabbitHutch.CreateBus("host=localhost");

var msg1 = new MessageA(msg1, "Topic.Foo");   //picked up
var msg2 = new MessageA(msg2, "Topic.Bar");   //picked up
var msg3 = new MessageA(msg3);                //not picked up

2,指定一個特定的SubscriptionIdblog

默認狀況下,AutoSubscriber將爲每一個消息/消費者組合生成惟一的SubscriptionId。 這意味着您將啓動同一個使用者的多個實例,而且它們將以循環方式(工做模式)從相同的隊列中讀取。接口

若是您但願修訂訂閱ID,則可使用AutoSubscriberConsumerAttribute修飾Consume方法。 爲何你會修正它,你能夠在這裏閱讀。

比方說,上面的消費者應該有一個固定的SubscriptionId用於MessageB的消費者方法。 剛剛裝飾它併爲SubscriptionId定義一個值。

[AutoSubscriberConsumer(SubscriptionId = "MyExplicitId")]
public void Consume(MessageB message) { }

3,控制SubscriptionId生成

您固然也能夠控制實際的SubscriptionId生成。 只需替換AutoSubscriber.GenerateSubscriptionId:Func <ConsumerInfo,string>。

var subscriber = new AutoSubscriber(bus)
{
    CreateConsumer = t => objectResolver.Resolve(t),
    GenerateSubscriptionId = c => AppDomain.CurrentDomain.FriendlyName + c.ConcreteType.Name
};
subscriber.Subscribe(Assembly.GetExecutingAssembly());

注意! 只是一個示例實現。 確保您已閱讀並理解SubscriptionId值的重要性。

4,控制消費者配置設置

使用自動訂閱服務器訂閱隊列時,能夠設置ISubscriptionConfiguration值,例如AutoDelete,Priority等。

經過在建立AutoSubscriber時設置操做。

var subscriber = new AutoSubscriber(bus)
{    
    ConfigureSubscriptionConfiguration = c => c.WithAutoDelete()
                                               .WithPriority(10)
};
subscriber.Subscribe(Assembly.GetExecutingAssembly());

或者,您能夠將一個屬性應用於使用方法,該方法優先於ConfigureSubscriptionConfiguration操做設置的任何配置值。

public class MyConsumer : IConsume<MessageA>
{
    [SubscriptionConfiguration(CancelOnHaFailover = true, PrefetchCount = 10)]
    public void Consume(MessageA message) {...}
}

5,在AutoSubscriber中使用IoC容器

AutoSubscriber有一個屬性MessageDispatcher,它容許您插入本身的消息分派代碼。 這使您能夠從IoC容器中解析消費者或執行其餘自定義調度時間任務。

咱們編寫一個定製的IAutoSubscriberMessageDispatcher來解析Windsor IoC容器中的消費者

public class WindsorMessageDispatcher : IAutoSubscriberMessageDispatcher
{
    private readonly IWindsorContainer container;

    public WindsorMessageDispatcher(IWindsorContainer container)
    {
        this.container = container;
    }

    public void Dispatch<TMessage, TConsumer>(TMessage message) where TMessage : class where TConsumer : IConsume<TMessage>
    {
        var consumer = container.Resolve<TConsumer>();
        try
        {
            consumer.Consume(message);
        }
        finally
        {
            container.Release(consumer);
        }
    }

    public Task DispatchAsync<TMessage, TConsumer>(TMessage message) where TMessage: class where TConsumer: IConsumeAsync<TMessage>
    {
        var consumer = _container.Resolve<TConsumer>();           
        return consumer.Consume(message).ContinueWith(t=>_container.Release(consumer));            
    }
}

如今咱們須要在咱們的IoC容器中註冊咱們的客戶:

var container = new WindsorContainer();
container.Register(
    Component.For<MyConsumer>().ImplementedBy<MyConsumer>()
    );

接下來使用咱們的自定義IMessageDispatcher設置AutoSubscriber:

var bus = RabbitHutch.CreateBus("host=localhost");

var autoSubscriber = new AutoSubscriber(bus, "My_subscription_id_prefix")
{
    MessageDispatcher = new WindsorMessageDispatcher(container)
};
autoSubscriber.Subscribe(GetType().Assembly);
autoSubscriber.SubscribeAsync(GetType().Assembly);

如今,每次收到消息時,咱們的消費者的新實例都將從咱們的容器中解析出來。

相關文章
相關標籤/搜索