1、簡介
EQueue是一個參照RocketMQ實現的開源消息隊列中間件,兼容Mono,具體可以參看作者的文章《分享一個c#寫的開源分佈式消息隊列equeue》。項目開源地址:https://github.com/tangxuehua/equeue,項目中包含了隊列的所有源代碼以及如何使用的示例。
2、安裝EQueue
Producer、Consumer、Broker支持分佈式部署,安裝EQueue需要.NET 4和Visual Studio 2010/2012/2013。目前EQueue是個類庫,需要自己實現Broker的宿主,可以參照QuickStart,創建一個QuickStart.BrokerServer項目,通過Visual Studio的NuGet查找並引用equeue:
using System;
using System.Text;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.JsonNet;
using ECommon.Log4Net;
using EQueue.Broker;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.BrokerServer
{
    /// <summary>
    /// Console host for an EQueue broker: configures the EQueue environment,
    /// starts a <c>BrokerController</c>, and keeps the process alive until a
    /// line is read from standard input.
    /// </summary>
    class Program
    {
        /// <summary>Entry point of the broker host.</summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            InitializeEQueue();

            // Override two of the BrokerSetting defaults for this quick-start host.
            var brokerSetting = new BrokerSetting
            {
                NotifyWhenMessageArrived = false,
                DeleteMessageInterval = 1000
            };

            var broker = new BrokerController(brokerSetting);
            broker.Initialize().Start();

            // Block the main thread so the broker keeps running.
            Console.ReadLine();
        }

        /// <summary>
        /// Builds the EQueue configuration: Autofac, the common components,
        /// log4net, Json.NET and finally the EQueue components, in that order.
        /// </summary>
        static void InitializeEQueue()
        {
            Configuration
                .Create()
                .UseAutofac()
                .RegisterCommonComponents()
                .UseLog4Net()
                .UseJsonNet()
                .RegisterEQueueComponents();
        }
    }
}
InitializeEQueue方法初始化EQueue的環境,使用了Autofac作為IoC容器,使用log4net記錄日誌,我們看一下RegisterEQueueComponents方法:
/// <summary>
/// Extension methods that register EQueue's default components on a
/// <c>Configuration</c>.
/// </summary>
public static class ConfigurationExtensions
{
    /// <summary>
    /// Registers a default implementation for each of the six EQueue component
    /// interfaces listed below.
    /// </summary>
    /// <param name="configuration">The configuration to register the defaults on.</param>
    /// <returns>The same <paramref name="configuration"/> instance that was passed in.</returns>
    public static Configuration RegisterEQueueComponents(this Configuration configuration)
    {
        configuration.SetDefault<IAllocateMessageQueueStrategy, AverageAllocateMessageQueueStrategy>();
        configuration.SetDefault<IQueueSelector, QueueHashSelector>();
        configuration.SetDefault<ILocalOffsetStore, DefaultLocalOffsetStore>();
        configuration.SetDefault<IMessageStore, InMemoryMessageStore>();
        configuration.SetDefault<IMessageService, MessageService>();
        configuration.SetDefault<IOffsetManager, InMemoryOffsetManager>();

        return configuration;
    }
}
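代碼中涉及到6個組件:IAllocateMessageQueueStrategy、IQueueSelector、ILocalOffsetStore、IMessageStore、IMessageService、IOffsetManager。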
DeleteMessageInterval 這個屬性是用來設置equeue的定時刪除間隔,單位為毫秒,默認值是一個小時。另外還有ProducerSocketSetting 和 ConsumerSocketSetting,分別用於設置Producer連接Broker和Consumer連接Broker的IP和端口,默認端口是5000和5001。
/// <summary>
/// Settings passed to the broker. The constructor fills in the defaults;
/// every property can be overwritten afterwards.
/// </summary>
public class BrokerSetting
{
    /// <summary>Socket the broker exposes for producers (default port 5000).</summary>
    public SocketSetting ProducerSocketSetting { get; set; }

    /// <summary>Socket the broker exposes for consumers (default port 5001).</summary>
    public SocketSetting ConsumerSocketSetting { get; set; }

    /// <summary>Defaults to <c>true</c>.</summary>
    public bool NotifyWhenMessageArrived { get; set; }

    /// <summary>
    /// Interval of the periodic message deletion; the default is
    /// 1000 * 60 * 60 (one hour if the unit is milliseconds, as the
    /// accompanying article states).
    /// </summary>
    public int DeleteMessageInterval { get; set; }

    /// <summary>Creates a setting object populated with the default values.</summary>
    public BrokerSetting()
    {
        ProducerSocketSetting = CreateLocalSocketSetting(5000);
        ConsumerSocketSetting = CreateLocalSocketSetting(5001);
        NotifyWhenMessageArrived = true;
        DeleteMessageInterval = 1000 * 60 * 60;
    }

    /// <summary>
    /// Builds a socket setting bound to the local IPv4 address on the given
    /// port, with a backlog of 5000.
    /// </summary>
    private static SocketSetting CreateLocalSocketSetting(int port)
    {
        return new SocketSetting
        {
            Address = SocketUtils.GetLocalIPV4().ToString(),
            Port = port,
            Backlog = 5000
        };
    }
}
運行項目,如果顯示下面類似內容,說明Broker啟動成功:
2014-03-23 20:10:30,255 INFO BrokerController - Broker started, producer:[169.254.80.80:5000], consumer:[169.254.80.80:5001]
3、在Visual Studio中開發測試
1.創建一個VS項目 QuickStart.ProducerClient,通過NuGet引用EQueue,編寫下面的Producer代碼:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Clients.Producers;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.ProducerClient
{
    /// <summary>
    /// Sample producer: starts a single <c>Producer</c> and sends
    /// <c>total * parallelCount</c> messages to "SampleTopic" from several
    /// parallel workers, printing progress every 1000 successful sends.
    /// </summary>
    class Program
    {
        /// <summary>Entry point of the producer sample.</summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            InitializeEQueue();

            // The producer is started exactly once and shared by all workers.
            var producer = new Producer().Start();
            var total = 1000;
            var parallelCount = 10;
            var finished = 0;
            var messageIndex = 0;
            var watch = Stopwatch.StartNew();

            var action = new Action(() =>
            {
                for (var index = 1; index <= total; index++)
                {
                    // messageIndex is shared by all workers, so it is bumped atomically.
                    var message = "message" + Interlocked.Increment(ref messageIndex);
                    producer.SendAsync(new Message("SampleTopic", Encoding.UTF8.GetBytes(message)), index.ToString()).ContinueWith(sendTask =>
                    {
                        // Reading sendTask.Exception marks the fault as observed; a failed
                        // or cancelled send must not be counted as a sent message.
                        if (sendTask.IsFaulted)
                        {
                            Console.WriteLine("Send message failed: {0}", sendTask.Exception.GetBaseException().Message);
                            return;
                        }
                        if (sendTask.IsCanceled)
                        {
                            Console.WriteLine("Send message was canceled.");
                            return;
                        }

                        var finishedCount = Interlocked.Increment(ref finished);
                        if (finishedCount % 1000 == 0)
                        {
                            Console.WriteLine(string.Format("Sent {0} messages, time spent:{1}", finishedCount, watch.ElapsedMilliseconds));
                        }
                    });
                }
            });

            // Run the same send loop parallelCount times concurrently.
            var actions = new Action[parallelCount];
            for (var index = 0; index < parallelCount; index++)
            {
                actions[index] = action;
            }

            Parallel.Invoke(actions);

            // Keep the process alive while the asynchronous sends complete.
            Console.ReadLine();
        }

        /// <summary>
        /// Builds the EQueue configuration: Autofac, the common components,
        /// log4net, Json.NET and finally the EQueue components, in that order.
        /// </summary>
        static void InitializeEQueue()
        {
            Configuration
                .Create()
                .UseAutofac()
                .RegisterCommonComponents()
                .UseLog4Net()
                .UseJsonNet()
                .RegisterEQueueComponents();
        }
    }
}
Producer對象在使用之前必須調用Start初始化,初始化一次即可。注意:切記不可以在每次發送消息時都調用Start方法。Producer默認連接本機的5000端口,可以通過ProducerSetting進行設置,可以參看下面的代碼:
/// <summary>
/// Connection and timing settings for a producer. The constructor fills in
/// the defaults; every property can be overwritten afterwards.
/// </summary>
public class ProducerSetting
{
    /// <summary>Broker address; defaults to the local IPv4 address.</summary>
    public string BrokerAddress { get; set; }

    /// <summary>Broker port; defaults to 5000.</summary>
    public int BrokerPort { get; set; }

    /// <summary>Send timeout in milliseconds; defaults to 1000 * 10.</summary>
    public int SendMessageTimeoutMilliseconds { get; set; }

    /// <summary>
    /// Defaults to 1000 * 5 (presumably milliseconds, i.e. five seconds —
    /// the unit is not shown here).
    /// </summary>
    public int UpdateTopicQueueCountInterval { get; set; }

    /// <summary>Creates a setting object populated with the default values.</summary>
    public ProducerSetting()
    {
        BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
        BrokerPort = 5000;
        SendMessageTimeoutMilliseconds = 1000 * 10;
        UpdateTopicQueueCountInterval = 1000 * 5;
    }
}
2.創建一個VS項目 QuickStart.ConsumerClient,通過NuGet引用EQueue,編寫下面的Consumer代碼:
using System;
using System.Linq;
using System.Text;
using System.Threading;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Broker;
using EQueue.Clients.Consumers;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.ConsumerClient
{
    /// <summary>
    /// Sample consumer host: starts four consumers in group "group1" on
    /// "SampleTopic", waits until each of them has exactly one queue
    /// allocated, prints the allocation, and then keeps running.
    /// </summary>
    class Program
    {
        /// <summary>Entry point of the consumer sample.</summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            InitializeEQueue();

            // All four consumers share one handler, so its counter is a global total.
            var messageHandler = new MessageHandler();
            var consumer1 = new Consumer("Consumer1", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer2 = new Consumer("Consumer2", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer3 = new Consumer("Consumer3", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer4 = new Consumer("Consumer4", "group1").Subscribe("SampleTopic").Start(messageHandler);

            Console.WriteLine("Start consumer load balance, please wait for a moment.");

            var scheduleService = ObjectContainer.Resolve<IScheduleService>();
            var waitHandle = new ManualResetEvent(false);

            // Poll the allocation (scheduled with arguments 1000, 1000) until it is balanced.
            var taskId = scheduleService.ScheduleTask(() =>
            {
                // Materialize each snapshot once: the count check and the printed
                // list must describe the same allocation, and a lazy Select would
                // re-read the consumer's queues on every enumeration.
                var c1AllocatedQueueIds = consumer1.GetCurrentQueues().Select(x => x.QueueId).ToList();
                var c2AllocatedQueueIds = consumer2.GetCurrentQueues().Select(x => x.QueueId).ToList();
                var c3AllocatedQueueIds = consumer3.GetCurrentQueues().Select(x => x.QueueId).ToList();
                var c4AllocatedQueueIds = consumer4.GetCurrentQueues().Select(x => x.QueueId).ToList();

                if (c1AllocatedQueueIds.Count == 1
                    && c2AllocatedQueueIds.Count == 1
                    && c3AllocatedQueueIds.Count == 1
                    && c4AllocatedQueueIds.Count == 1)
                {
                    Console.WriteLine(string.Format(
                        "Consumer load balance finished. Queue allocation result: c1:{0}, c2:{1}, c3:{2}, c4:{3}",
                        string.Join(",", c1AllocatedQueueIds),
                        string.Join(",", c2AllocatedQueueIds),
                        string.Join(",", c3AllocatedQueueIds),
                        string.Join(",", c4AllocatedQueueIds)));
                    waitHandle.Set();
                }
            }, 1000, 1000);

            // Block until the scheduled check reports a balanced allocation, then stop polling.
            waitHandle.WaitOne();
            scheduleService.ShutdownTask(taskId);

            // Keep the process alive so the consumers continue to handle messages.
            Console.ReadLine();
        }

        /// <summary>
        /// Builds the EQueue configuration: Autofac, the common components,
        /// log4net, Json.NET and finally the EQueue components, in that order.
        /// </summary>
        static void InitializeEQueue()
        {
            Configuration
                .Create()
                .UseAutofac()
                .RegisterCommonComponents()
                .UseLog4Net()
                .UseJsonNet()
                .RegisterEQueueComponents();
        }
    }

    /// <summary>
    /// Message handler that counts handled messages and prints the running
    /// total every 1000 messages.
    /// </summary>
    class MessageHandler : IMessageHandler
    {
        private int _handledCount;

        /// <summary>
        /// Counts the message and reports it as handled through
        /// <paramref name="context"/>.
        /// </summary>
        /// <param name="message">The message to handle.</param>
        /// <param name="context">Context used to report that the message was handled.</param>
        public void Handle(QueueMessage message, IMessageContext context)
        {
            // The counter is incremented atomically because the handler is shared by several consumers.
            var count = Interlocked.Increment(ref _handledCount);
            if (count % 1000 == 0)
            {
                Console.WriteLine("Total handled {0} messages.", count);
            }

            context.OnMessageHandled(message);
        }
    }
}
這種使用方式給用戶的感覺是消息從EQueue服務器推到了應用客戶端。
但是實際上Consumer內部是使用長輪詢Pull方式從EQueue服務器拉消息,然後再回調用戶的Listener方法。Consumer默認連接本機的5001端口,可以通過ConsumerSetting進行設置,可以參看下面的代碼:
/// <summary>
/// Connection, timing and mode settings for a consumer. The constructor
/// fills in the defaults; every property can be overwritten afterwards.
/// </summary>
public class ConsumerSetting
{
    /// <summary>Broker address; defaults to the local IPv4 address.</summary>
    public string BrokerAddress { get; set; }

    /// <summary>Broker port; defaults to 5001.</summary>
    public int BrokerPort { get; set; }

    // The four intervals below all default to 1000 * 5
    // (presumably milliseconds, i.e. five seconds — the unit is not shown here).

    /// <summary>Defaults to 1000 * 5.</summary>
    public int RebalanceInterval { get; set; }

    /// <summary>Defaults to 1000 * 5.</summary>
    public int UpdateTopicQueueCountInterval { get; set; }

    /// <summary>Defaults to 1000 * 5.</summary>
    public int HeartbeatBrokerInterval { get; set; }

    /// <summary>Defaults to 1000 * 5.</summary>
    public int PersistConsumerOffsetInterval { get; set; }

    /// <summary>Defaults to a new <c>PullRequestSetting</c> instance.</summary>
    public PullRequestSetting PullRequestSetting { get; set; }

    /// <summary>Defaults to <c>MessageModel.Clustering</c>.</summary>
    public MessageModel MessageModel { get; set; }

    /// <summary>Defaults to <c>MessageHandleMode.Parallel</c>.</summary>
    public MessageHandleMode MessageHandleMode { get; set; }

    /// <summary>Creates a setting object populated with the default values.</summary>
    public ConsumerSetting()
    {
        BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
        BrokerPort = 5001;
        RebalanceInterval = 1000 * 5;
        HeartbeatBrokerInterval = 1000 * 5;
        UpdateTopicQueueCountInterval = 1000 * 5;
        PersistConsumerOffsetInterval = 1000 * 5;
        PullRequestSetting = new PullRequestSetting();
        MessageModel = MessageModel.Clustering;
        MessageHandleMode = MessageHandleMode.Parallel;
    }
}
EQueue兼容Linux/Mono,下面是CentOS 6.4/Mono 3.2.3 環境下的運行結果: