netmq VS redis 訂閱發佈性能研究

簡單介本身紹下:

.NET開發,從事手遊開發有8年多,真愛呀 有木有html

代碼前往這裏 https://github.com/GuojieChen/netmq-patterns-samplegit

目前項目在肯定業務須要的技術過程當中遇到了一個選擇
消息隊列使用哪一種方式好,性能又是如何github

目前網上找到幾篇文章
國內:https://www.cnblogs.com/pasoraku/p/4673039.html
國外:https://gist.github.com/hmartiro/85b89858d2c12ae1a0f9redis

國外這篇文章中提到socket

ZeroMQ, Pub/Sub: 481,000 msg/s, latency <1 ms
Redis Pub/Sub (async via libevent): 59,000 msg/s, latency <1 msasync

對於這個結論(基於C++)這個暫時沒有模擬去測試tcp

用例介紹

生產者:10個線程,不斷產生數據,這裏用時間的記時週期數DateTime.Now.Ticks
消費者:接收數據,計算消耗的時間
每10s統計一次每秒的平均值性能

netmq

PubSub.Server 訂閱方,消費者測試

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

namespace PubSub.Server
{
    class Program
    {

        static void Main(string[] args)
        {
            SubscriberSocket socket = new SubscriberSocket(">tcp://localhost:1012");
            socket.Subscribe("aaa");
            socket.ReceiveReady += Socket_ReceiveReady;
            NetMQ.NetMQPoller poller = new NetMQ.NetMQPoller();
            poller.Add(socket);
            poller.RunAsync();

            Task.Factory.StartNew(() => 
            {
                while (true)
                {
                    Thread.Sleep(1*1000);

                    var tmp = d;
                    d = new List<TimeSpan>();

                    if (tmp.Any())
                        Console.WriteLine($"{DateTime.Now}\t{tmp.Count / 1}/s\t{tmp.Average(x => x.TotalMilliseconds)}");
                    else
                        Console.WriteLine("-");
                }
            });

            Console.Read();
        }

        private static List<TimeSpan> d = new List<TimeSpan>();
        
        private static void Socket_ReceiveReady(object sender, NetMQ.NetMQSocketEventArgs e)
        {
            var topic = e.Socket.ReceiveFrameString();
            var dt = e.Socket.ReceiveFrameString();
            //Console.WriteLine(dt);
            var t = DateTime.Now.Subtract(new DateTime(Convert.ToInt64(dt)));
            d.Add(t);
        }
    }
}

PubSub.Proxy 中間代理 模擬正式環境中的Proxy層,也等同redis serverspa

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

namespace PubSub.Proxy
{
    class Program
    {
        static void Main(string[] args)
        {
            NetMQ.Proxy proxy = new NetMQ.Proxy(new XSubscriberSocket("@tcp://*:1011"),new XPublisherSocket("@tcp://*:1012"));
            Console.WriteLine("running...");
            proxy.Start();
        }
    }
}

PubSub.Client 消費方,生產者

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;

namespace PubSub.Client
{
    class Program
    {
        private static PublisherSocket socket = new PublisherSocket(">tcp://localhost:1011");
        private static NetMQQueue<string> queue = new NetMQQueue<string>();
        static void Main(string[] args)
        {
            queue.ReceiveReady += (s, e) =>
            {
                var msg = e.Queue.Dequeue();
                socket.SendMoreFrame("aaa").SendFrame(msg);
            };

            NetMQPoller poller = new NetMQPoller();
            poller.Add(queue);
            poller.RunAsync();

            for (var i = 0; i < 100; i++)
            {
                Task.Factory.StartNew(Run, i);
            }

            Console.WriteLine("running...");
            Console.Read();
        }

        private static void Run(object obj)
        {
            while (true)
            {
                queue.Enqueue(Convert.ToString(DateTime.Now.Ticks));

                Thread.Sleep(1);
            }
        }
    }
}

redis測試模型

RedisPubSub.Server

using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RedisPubSub.Server
{
    class Program
    {
        private static ConnectionMultiplexer ConnectionMultiplexer = ConnectionMultiplexer.Connect("127.0.0.1:5379,127.0.0.1:5380,password=123456");


        private static List<TimeSpan> d = new List<TimeSpan>();
        
        static void Main(string[] args)
        {
            var date = DateTime.Now;

            ConnectionMultiplexer.GetSubscriber().Subscribe("aaa", (c, m) =>
            { 
                var t = DateTime.Now.Subtract(new DateTime(Convert.ToInt64(m)));

                d.Add(t);
            });


            Task.Factory.StartNew(() => 
            {
                while (true)
                {
                    Thread.Sleep(1 * 1000);

                    var tmp = d;
                    d = new List<TimeSpan>();

                    if (tmp.Any())
                        Console.WriteLine($"{DateTime.Now}\t{tmp.Count / 1}/s\t{tmp.Average(x => x.TotalMilliseconds)}");
                    else
                        Console.WriteLine("-");
                }
            });
            Console.Read();
        }
    }
}

RedisPubSub.Client

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;

namespace RedisPubSub.Client
{
    class Program
    {
        private static ConnectionMultiplexer ConnectionMultiplexer = ConnectionMultiplexer.Connect("127.0.0.1:5379,127.0.0.1:5380,password=123456");

        static void Main(string[] args)
        {
            for (var i = 0; i < 100; i++)
            {
                Task.Factory.StartNew(Run,i);
            }

            Console.WriteLine("running...");
            Console.Read();
        }

        private static void Run(object obj)
        {
            while (true)
            {
                ConnectionMultiplexer.GetSubscriber().Publish("aaa", DateTime.Now.Ticks);
                Thread.Sleep(1);
            }
        }
    }
}

netmq 運行結果

2018/9/5 21:38:07       4810/s  0.0773472247630134
2018/9/5 21:38:17       4953/s  0.0345690630488765
2018/9/5 21:38:27       4956/s  0.0315224333756985
2018/9/5 21:38:37       4838/s  0.0788584731751667
2018/9/5 21:38:47       4968/s  0.0267414818244033
2018/9/5 21:38:57       4927/s  0.0456701723053194
2018/9/5 21:39:07       4921/s  0.0432689302042063
2018/9/5 21:39:17       4927/s  0.0381800129878038
2018/9/5 21:39:27       4816/s  0.0886798555027297
2018/9/5 21:39:37       4666/s  0.118044937753114
2018/9/5 21:39:47       4938/s  0.0384998096665115
2018/9/5 21:39:57       4956/s  0.029773836282561
2018/9/5 21:40:07       4947/s  0.0333202635781134
2018/9/5 21:40:17       4846/s  0.0800275512730572

redis 測試結果

2018/9/5 21:41:18       4948/s  0.0181999898967447
2018/9/5 21:41:28       4987/s  0.0271869203472404
2018/9/5 21:41:38       4988/s  0.0162716218925421
2018/9/5 21:41:48       4995/s  0.0106067283246252
2018/9/5 21:41:58       4991/s  0.00916983653191234
2018/9/5 21:42:08       4983/s  0.0864149871578777
2018/9/5 21:42:18       4989/s  0.0139901451076282
2018/9/5 21:42:28       4993/s  0.0140108803620779
2018/9/5 21:42:38       4988/s  0.0111812829507868
2018/9/5 21:42:48       4994/s  0.00812532134705482
2018/9/5 21:42:58       4991/s  0.0198666673345088
2018/9/5 21:43:08       4987/s  0.025591579622687
2018/9/5 21:43:18       4988/s  0.0872425344264263
2018/9/5 21:43:28       4989/s  0.0124017437665357
2018/9/5 21:43:38       4992/s  0.0293738252613868
2018/9/5 21:43:48       4972/s  0.0869289167286693

結論就你們本身總結拉,並且目前是小量數據的測試 以上有不妥的地方還但願各位大佬指出,感謝

相關文章
相關標籤/搜索