c#開源消息隊列中間件EQueue 教程

1、簡介html

EQueue是一個參照RocketMQ實現的開源消息隊列中間件,兼容Mono,具體能夠參看做者的文章《分享一個c#寫的開源分佈式消息隊列equeue》。項目開源地址:https://github.com/tangxuehua/equeue,項目中包含了隊列的所有源代碼以及如何使用的示例。git

2、安裝EQueuegithub

Producer、Consumer、Broker支持分佈式部署,安裝EQueue須要.NET 4, Visual Studio 2010/2012/2013. 目前EQueue是個類庫,須要本身實現Broker的宿主,能夠參照QuickStart,建立一個QuickStart.BrokerServer項目,經過Visual Studio的Nuget 查找equeuec#

image

using System;
using System.Text;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.JsonNet;
using ECommon.Log4Net;
using EQueue.Broker;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.BrokerServer
{
    class Program
    {
        static void Main(string[] args)
        {
            InitializeEQueue();
            var setting = new BrokerSetting();
            setting.NotifyWhenMessageArrived = false;
            setting.DeleteMessageInterval = 1000;
            new BrokerController(setting).Initialize().Start();
            Console.ReadLine();
        }

        static void InitializeEQueue()
        {
            Configuration
                .Create()
                .UseAutofac()
                .RegisterCommonComponents()
                .UseLog4Net()
                .UseJsonNet()
                .RegisterEQueueComponents();
        }
    }
}

InitializeEQueue方法初始化EQueue的環境,使用了Autofac做爲IOC容器,使用log4Net記錄日誌, 咱們看一下RegisterEQueueComponents方法:服務器

   public static class ConfigurationExtensions
    {
        public static Configuration RegisterEQueueComponents(this Configuration configuration)
        {
            configuration.SetDefault<IAllocateMessageQueueStrategy, AverageAllocateMessageQueueStrategy>();
            configuration.SetDefault<IQueueSelector, QueueHashSelector>();
            configuration.SetDefault<ILocalOffsetStore, DefaultLocalOffsetStore>();
            configuration.SetDefault<IMessageStore, InMemoryMessageStore>();
            configuration.SetDefault<IMessageService, MessageService>();
            configuration.SetDefault<IOffsetManager, InMemoryOffsetManager>();
            return configuration;
        }
    }

代碼中涉及到6個組件:分佈式

  • IAllocateMessageQueueStrategy
  • IQueueSelector
  • ILocalOffsetStore
  • IMessageStore
  • IMessageService
  • IOffsetManager

DeleteMessageInterval 這個屬性是用來設置equeue的定時刪除間隔,單位爲毫秒,默認值是一個小時。另外還有ProducerSocketSetting 和 ConsumerSocketSetting 分別用於設置Producer鏈接Broker和Consumer鏈接Broker的IP和端口,默認端口是5000和5001。學習

 public class BrokerSetting
    {
        public SocketSetting ProducerSocketSetting { get; set; }
        public SocketSetting ConsumerSocketSetting { get; set; }
        public bool NotifyWhenMessageArrived { get; set; }
        public int DeleteMessageInterval { get; set; }

        public BrokerSetting()
        {
            ProducerSocketSetting = new SocketSetting { Address = SocketUtils.GetLocalIPV4().ToString(), Port = 5000, Backlog = 5000 };
            ConsumerSocketSetting = new SocketSetting { Address = SocketUtils.GetLocalIPV4().ToString(), Port = 5001, Backlog = 5000 };
            NotifyWhenMessageArrived = true;
            DeleteMessageInterval = 1000 * 60 * 60;
        }
    }

運行項目,若是顯示下面相似內容,說明Broker啓動成功:測試

2014-03-23 20:10:30,255  INFO BrokerController - Broker started, producer:[169.254.80.80:5000], consumer:[169.254.80.80:5001]ui

3、在Visual Studio中開發測試this

1.建立一個VS項目 QuickStart.ProducerClient,經過Nuget引用EQueue,編寫下面Producer代碼

 using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Clients.Producers;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.ProducerClient
{
    class Program
    {
        static void Main(string[] args)
        {
            InitializeEQueue();

            var scheduleService = ObjectContainer.Resolve<IScheduleService>();
            var producer = new Producer().Start();
            var total = 1000;
            var parallelCount = 10;
            var finished = 0;
            var messageIndex = 0;
            var watch = Stopwatch.StartNew();

            var action = new Action(() =>
            {
                for (var index = 1; index <= total; index++)
                {
                    var message = "message" + Interlocked.Increment(ref messageIndex);
                    producer.SendAsync(new Message("SampleTopic", Encoding.UTF8.GetBytes(message)), index.ToString()).ContinueWith(sendTask =>
                    {
                        var finishedCount = Interlocked.Increment(ref finished);
                        if (finishedCount % 1000 == 0)
                        {
                            Console.WriteLine(string.Format("Sent {0} messages, time spent:{1}", finishedCount, watch.ElapsedMilliseconds));
                        }
                    });
                }
            });

            var actions = new List<Action>();
            for (var index = 0; index < parallelCount; index++)
            {
                actions.Add(action);
            }

            Parallel.Invoke(actions.ToArray());

            Console.ReadLine();
        }

        static void InitializeEQueue()
        {
            Configuration
                .Create()
                .UseAutofac()
                .RegisterCommonComponents()
                .UseLog4Net()
                .UseJsonNet()
                .RegisterEQueueComponents();
        }
    }
}

Producer對象在使用以前必需要調用Start初始化,初始化一次便可, 注意:切記不能夠在每次發送消息時,都調用Start方法。Producer 默認鏈接本機的5000端口,能夠經過ProducerSetting 進行設置,能夠參看下面的代碼:

 public class ProducerSetting
    {
        public string BrokerAddress { get; set; }
        public int BrokerPort { get; set; }
        public int SendMessageTimeoutMilliseconds { get; set; }
        public int UpdateTopicQueueCountInterval { get; set; }

        public ProducerSetting()
        {
            BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
            BrokerPort = 5000;
            SendMessageTimeoutMilliseconds = 1000 * 10;
            UpdateTopicQueueCountInterval = 1000 * 5;
        }

二、建立一個VS項目 QuickStart.ConsumerClient,經過Nuget引用EQueue,編寫下面Consumer代碼

using System;
using System.Linq;
using System.Text;
using System.Threading;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Broker;
using EQueue.Clients.Consumers;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.ConsumerClient
{
    class Program
    {
        static void Main(string[] args)
        {
            InitializeEQueue();

            var messageHandler = new MessageHandler();
            var consumer1 = new Consumer("Consumer1", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer2 = new Consumer("Consumer2", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer3 = new Consumer("Consumer3", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer4 = new Consumer("Consumer4", "group1").Subscribe("SampleTopic").Start(messageHandler);

            Console.WriteLine("Start consumer load balance, please wait for a moment.");
            var scheduleService = ObjectContainer.Resolve<IScheduleService>();
            var waitHandle = new ManualResetEvent(false);
            var taskId = scheduleService.ScheduleTask(() =>
            {
                var c1AllocatedQueueIds = consumer1.GetCurrentQueues().Select(x => x.QueueId);
                var c2AllocatedQueueIds = consumer2.GetCurrentQueues().Select(x => x.QueueId);
                var c3AllocatedQueueIds = consumer3.GetCurrentQueues().Select(x => x.QueueId);
                var c4AllocatedQueueIds = consumer4.GetCurrentQueues().Select(x => x.QueueId);
                if (c1AllocatedQueueIds.Count() == 1 && c2AllocatedQueueIds.Count() == 1 && c3AllocatedQueueIds.Count() == 1 && c4AllocatedQueueIds.Count() == 1)
                {
                    Console.WriteLine(string.Format("Consumer load balance finished. Queue allocation result: c1:{0}, c2:{1}, c3:{2}, c4:{3}",
                        string.Join(",", c1AllocatedQueueIds),
                        string.Join(",", c2AllocatedQueueIds),
                        string.Join(",", c3AllocatedQueueIds),
                        string.Join(",", c4AllocatedQueueIds)));
                    waitHandle.Set();
                }
            }, 1000, 1000);

            waitHandle.WaitOne();
            scheduleService.ShutdownTask(taskId);

            Console.ReadLine();
        }

        static void InitializeEQueue()
        {
            Configuration
                .Create()
                .UseAutofac()
                .RegisterCommonComponents()
                .UseLog4Net()
                .UseJsonNet()
                .RegisterEQueueComponents();
        }
    }

    class MessageHandler : IMessageHandler
    {
        private int _handledCount;

        public void Handle(QueueMessage message, IMessageContext context)
        {
            var count = Interlocked.Increment(ref _handledCount);
            if (count % 1000 == 0)
            {
                Console.WriteLine("Total handled {0} messages.", count);
            }
            context.OnMessageHandled(message);
        }
    }
}

使用方式給用戶感受是消息從EQueue服務器推到了應用客戶端。 可是實際Consumer內部是使用長輪詢Pull方式從EQueue服務器拉消息,而後再回調用戶Listener方法。Consumer默認鏈接本機的5001端口,能夠經過ConsumerSetting 進行設置,能夠參看下面的代碼:

    public class ConsumerSetting
    {
        public string BrokerAddress { get; set; }
        public int BrokerPort { get; set; }
        public int RebalanceInterval { get; set; }
        public int UpdateTopicQueueCountInterval { get; set; }
        public int HeartbeatBrokerInterval { get; set; }
        public int PersistConsumerOffsetInterval { get; set; }
        public PullRequestSetting PullRequestSetting { get; set; }
        public MessageModel MessageModel { get; set; }
        public MessageHandleMode MessageHandleMode { get; set; }

        public ConsumerSetting()
        {
            BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
            BrokerPort = 5001;
            RebalanceInterval = 1000 * 5;
            HeartbeatBrokerInterval = 1000 * 5;
            UpdateTopicQueueCountInterval = 1000 * 5;
            PersistConsumerOffsetInterval = 1000 * 5;
            PullRequestSetting = new PullRequestSetting();
            MessageModel = MessageModel.Clustering;
            MessageHandleMode = MessageHandleMode.Parallel;
        }

EQueue兼容Linux/Mono,下面是CentOS 6.4/Mono 3.2.3 環境下的運行結果:

image

 

Kafka/Metaq設計思想學習筆記

EQueue - 詳細談一下消息持久化以及消息堆積的設計

相關文章
相關標籤/搜索