許多業務流程都要求將事件安排在將來的某個日期。 例如,在與客戶進行初次銷售聯繫以後,咱們可能但願在未來某個時間安排後續電話。 EasyNetQ能夠經過其將來發布功能幫助您實現此功能。 例如,咱們在這裏使用FuturePublish擴展方法來安排將來一個月的後續銷售電話。 請注意,FuturePublish使用UTC時間。git
var followUpCallMessage = new FollowUpCallMessage( .. ); bus.FuturePublish(DateTime.UtcNow.AddMonths(3), followUpCallMessage);
從如今開始三個月後,EasyNetQ將發佈該消息,而且FollowUpCallMessage的任何訂戶都將收到原始消息的副本。github
FuturePublish要求EasyNetQ.Scheduler服務正在運行。sql
1,它是如何工做的?數據庫
當您調用bus.FuturePublish(publishDate,message)時,EasyNetQ將您的消息包裝在系統消息「ScheduleMe」中並將其發佈到RabbitMQ。 調度程序服務訂閱此消息。 當它收到一個ScheduleMe消息時,它將其存儲在其本地數據庫中。 調度程序服務在其數據庫中查找調度日期到期的消息,當它發現任何到期的消息時,它將原始消息從ScheduleMe消息解開併發布到總線。服務器
2,安裝調度程序服務併發
在SQL Server中,建立一個新的數據庫EasyNetQ.Scheduler編輯器
獲取EasyNetQ的源代碼性能
git clone git@github.com:mikehadlow/EasyNetQ.gitspa
在Visual Studio中打開EasyNetQ.2012解決方案。 在文件夾DatabaseScripts - > EasyNetQ.Scheduler中,您能夠找到許多SQL腳本。 在EasyNetQ.Scheduler數據庫中打開並運行它們。 您須要首先運行CreateWorkTables.sql,其餘則是存儲過程腳本,而且能夠按任意順序運行。插件
構建解決方案。
找到\ Source \ EasyNetQ.Scheduler \ bin \ Debug並將內容複製到您選擇的部署文件夾。
在文本編輯器中打開EasyNetQ.Scheduler.exe.config,並將「rabbit」和「scheduleDb」鏈接字符串分別指向您的RabbitMQ代理和SQL Server實例。
打開控制檯窗口並將路徑更改成部署EasyNetQ.Scheduler的文件夾。
運行如下命令將EasyNetQ.Scheduler安裝爲Windows服務:
EasyNetQ.Scheduler.exe install
Configuration Result: [Success] Name EasyNetQ.Scheduler [Success] ServiceName EasyNetQ.Scheduler Topshelf v3.1.106.0, .NET Framework v4.0.30319.18051
Running a transacted installation.
Beginning the Install phase of the installation. Installing EasyNetQ.Scheduler service Installing service EasyNetQ.Scheduler... Service EasyNetQ.Scheduler has been successfully installed. Creating EventLog source EasyNetQ.Scheduler in log Application...
The Install phase completed successfully, and the Commit phase is beginning.
The Commit phase completed successfully.
The transacted install has completed.
3,支持延遲消息插件
RabbitMQ延遲消息插件將新的交換類型添加到RabbitMQ,從而容許延遲消息傳遞。
EasyNetQ經過定義新的調度程序類型來提供對使用該交換的支持:DelayedExchangeScheduler。
這容許您像之前同樣使用相同的Future Publish接口,但取消將來的消息除外。 因爲延遲消息插件不支持消息取消,所以不管什麼時候調用FuturePublish指定cancelKey,或者調用CancelFuturePublish時,調度程序都會拋出NotImplementedException異常。
如下示例顯示瞭如何發佈將在三個月後交付的消息:
bus = RabbitHutch.CreateBus("host=localhost", x => x.Register<IScheduler, DelayedExchangeScheduler>()); var followUpCallMessage = new FollowUpCallMessage( .. ); bus.FuturePublish(DateTime.UtcNow.AddMonths(3), followUpCallMessage);
第一行指示EasyNetQ使用支持延遲消息交換的新調度程序。 接下來,該消息將建立併發布,交付時間設置爲三個月。 請注意,FuturePublish使用UTC時間。
DelayedExchangeScheduler須要安裝Delayed Message Plugin。
①插件安裝
延遲消息插件能夠在社區插件頁面上找到。 爲您的RabbitMQ安裝下載相應的.ez文件,將其複製到RabbitMQ的插件文件夾中,而後經過運行如下命令啓用它:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
該插件須要RabbitMQ 3.4或更新版本。
②怎麼運行的
當您調用bus.FuturePublish(...)時,EasyNetQ會自動建立新的x延遲消息交換以及正常交換並將它們綁定在一塊兒。 該消息在延遲交換中發佈,該交換將存儲該消息,直到交付它爲止。 此時,該消息被路由到正常交換並從那裏到綁定隊列。
當您調用Publish(...)方法時,消息將發佈到正常交換,從而防止與使用新的x延遲消息交換相關的任何性能降低。
延遲交換持續使用Mnesia的消息。 這能夠防止在服務器停機時丟失信息。 一旦服務器恢復,全部符合條件的消息都將按計劃交付。