Exceptionsession
public class SubmitOrderConsumer : IConsumer<SubmitOrder> { public Task Consume(ConsumeContext<SubmitOrder> context) { throw new Exception("Very bad things happened"); } }
UseMessageRetryapp
var sessionFactory = CreateSessionFactory(); var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => { cfg.Host("rabbitmq://localhost/"); cfg.ReceiveEndpoint("submit-order", e => { e.UseMessageRetry(r => r.Immediate(5)); e.Consumer(() => new SubmitOrderConsumer(sessionFactory)); }); });
// 當即重試:一共連續重試10次 ep.UseMessageRetry(r => r.Immediate(10)); // 間隔重試:一共重試10次,每次間隔10秒 ep.UseMessageRetry(r => r.Interval(10, TimeSpan.FromSeconds(10))); // 多個間隔重試:5秒後第一次,5+10秒後第二次,5+10+15秒後第三次 ep.UseMessageRetry(r => r.Intervals(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(15))); // 指數級間隔重試:共10次,每次間隔:當前重試次數 * 60秒 ep.UseMessageRetry(r => r.Exponential(10, TimeSpan.FromSeconds(60), TimeSpan.FromHours(24), TimeSpan.FromSeconds(60))); // 每次疊加50秒 ep.UseMessageRetry(r => r.Incremental(10, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(50)));
e.UseMessageRetry(r => { r.Handle<ArgumentNullException>(); r.Ignore(typeof(InvalidOperationException), typeof(InvalidCastException)); r.Ignore<ArgumentException>(t => t.ParamName == "orderTotal"); });
過濾某些異常類型不進行重試async
cfg.ReceiveEndpoint("submit-order", e => { e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30))); e.UseMessageRetry(r => r.Immediate(5)); e.Consumer(() => new SubmitOrderConsumer(sessionFactory)); });
消息衝隊列移除以後,在必定時間以後從新投入消息隊列。須要配置調度模塊(scheduling)ui
cfg.ReceiveEndpoint("submit-order", e => { e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30))); e.UseMessageRetry(r => r.Immediate(5)); e.UseInMemoryOutbox(); e.Consumer(() => new SubmitOrderConsumer(sessionFactory)); });
有些消息是在 consume 方法中發送或發佈的,若是在發送以後 consume 中產生了異常,那原來發出去的消息就須要撤回,若是使用信箱以後,在 consume 中要發佈/發送的消息就會先暫存在內存中直到 consume 方法成功以後才真正發出去3d
public interface Fault<T> where T : class { Guid FaultId { get; } Guid? FaultedMessageId { get; } DateTime Timestamp { get; } ExceptionInfo[] Exceptions { get; } HostInfo Host { get; } T Message { get; } }
Fault
public class DashboardFaultConsumer : IConsumer<Fault<SubmitOrder>> { public async Task Consume(ConsumeContext<Fault<SubmitOrder>> context) { // update the dashboard } }
Fault
cfg.ReceiveEndpoint("input-queue", ec => { ec.DiscardFaultedMessages(); });
默認狀況下錯誤的消息會被投遞到了 _error 隊列,能夠配置直接拋棄錯誤信息rabbitmq
cfg.ReceiveEndpoint("input-queue", ec => { ec.DiscardSkippedMessages(); });
死信隊列:沒有消費者的消息會被移到 _skipped 隊列,但能夠配置爲不移到 _skipped 隊列隊列
本做品採用知識共享署名-非商業性使用-相同方式共享 4.0 國際許可協議進行許可。事件
歡迎轉載、使用、從新發布,但務必保留文章署名 鄭子銘 (包含連接: http://www.cnblogs.com/MingsonZheng/ ),不得用於商業目的,基於本文修改後的做品務必以相同的許可發佈。
若有任何疑問,請與我聯繫 (MingsonZheng@outlook.com) 。