I'm a .NET developer and have been building mobile games for more than eight years. True love, right?
The code is here: https://github.com/GuojieChen/netmq-patterns-sample
While settling on the technologies our project needs, we ran into a choice:
which message queue approach should we use, and how does each one perform?
So far I have found a couple of articles online:
Domestic (China): https://www.cnblogs.com/pasoraku/p/4673039.html
International: https://gist.github.com/hmartiro/85b89858d2c12ae1a0f9
The international article reports:
ZeroMQ, Pub/Sub: 481,000 msg/s, latency <1 ms
Redis Pub/Sub (async via libevent): 59,000 msg/s, latency <1 ms
Those figures are based on C++, and I have not yet set up a test to reproduce them. My own test is set up as follows:
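Producer: 10 threads that continuously generate data. The payload is the current time in ticks, DateTime.Now.Ticks.
Consumer: receives the data and computes the elapsed time.
Every 10 s, the consumer reports the per-second message count and the average latency.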
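The measurement described above can be sketched independently of any transport. This is a minimal sketch, not the code from the repository: `LatencyWindow`, `Record`, and `Flush` are hypothetical names. It assumes producer and consumer run on the same machine, so that their `DateTime.Now.Ticks` values are comparable, and it guards the sample list with a lock because samples arrive on one thread while the report is produced on another.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of the original samples): collects latency
// samples on the receiving thread and hands them out as periodic reports.
public sealed class LatencyWindow
{
    private readonly object _gate = new object();
    private List<TimeSpan> _samples = new List<TimeSpan>();

    // Called by the consumer for every message. sentTicks is the
    // DateTime.Now.Ticks value the producer put into the payload.
    public void Record(long sentTicks, DateTime receivedAt)
    {
        TimeSpan elapsed = receivedAt - new DateTime(sentTicks);
        lock (_gate)
        {
            _samples.Add(elapsed);
        }
    }

    // Swaps in an empty list and summarizes the samples gathered so far.
    // windowSeconds is the reporting interval (10 in the setup above).
    public (double PerSecond, double AverageMs) Flush(double windowSeconds)
    {
        List<TimeSpan> batch;
        lock (_gate)
        {
            batch = _samples;
            _samples = new List<TimeSpan>();
        }

        if (batch.Count == 0)
            return (0, 0);

        return (batch.Count / windowSeconds,
                batch.Average(x => x.TotalMilliseconds));
    }
}
```

A consumer would call `Record(Convert.ToInt64(payload), DateTime.Now)` from its receive handler and `Flush(10)` from the reporting loop.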
PubSub.Server: the subscriber, i.e. the consumer under test
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using NetMQ;
    using NetMQ.Sockets;

    namespace PubSub.Server
    {
        class Program
        {
            // Samples are added on the poller thread and read by the reporting
            // task, so every access to the list goes through this lock.
            private static readonly object gate = new object();
            private static List<TimeSpan> d = new List<TimeSpan>();

            static void Main(string[] args)
            {
                // ">" connects to the proxy's publishing endpoint.
                SubscriberSocket socket = new SubscriberSocket(">tcp://localhost:1012");
                socket.Subscribe("aaa");
                socket.ReceiveReady += Socket_ReceiveReady;

                NetMQPoller poller = new NetMQPoller();
                poller.Add(socket);
                poller.RunAsync();

                Task.Factory.StartNew(() =>
                {
                    while (true)
                    {
                        // Reporting interval. This listing uses 1 second; the
                        // result logs in this post are 10 seconds apart.
                        Thread.Sleep(1 * 1000);

                        List<TimeSpan> tmp;
                        lock (gate)
                        {
                            tmp = d;
                            d = new List<TimeSpan>();
                        }

                        if (tmp.Any())
                            Console.WriteLine($"{DateTime.Now}\t{tmp.Count / 1}/s\t{tmp.Average(x => x.TotalMilliseconds)}");
                        else
                            Console.WriteLine("-");
                    }
                });

                Console.Read();
            }

            private static void Socket_ReceiveReady(object sender, NetMQSocketEventArgs e)
            {
                // First frame is the topic, second frame is the sender's tick count.
                var topic = e.Socket.ReceiveFrameString();
                var dt = e.Socket.ReceiveFrameString();

                var t = DateTime.Now.Subtract(new DateTime(Convert.ToInt64(dt)));
                lock (gate)
                {
                    d.Add(t);
                }
            }
        }
    }
PubSub.Proxy: the intermediary. It simulates the proxy layer of a production environment and plays the same role as the Redis server.
    using System;
    using NetMQ;
    using NetMQ.Sockets;

    namespace PubSub.Proxy
    {
        class Program
        {
            static void Main(string[] args)
            {
                // "@" binds. Publishers connect to port 1011 (XSUB side),
                // subscribers connect to port 1012 (XPUB side).
                NetMQ.Proxy proxy = new NetMQ.Proxy(
                    new XSubscriberSocket("@tcp://*:1011"),
                    new XPublisherSocket("@tcp://*:1012"));

                Console.WriteLine("running...");
                proxy.Start(); // blocks the calling thread
            }
        }
    }
PubSub.Client: the publisher, i.e. the producer
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using NetMQ;
    using NetMQ.Sockets;

    namespace PubSub.Client
    {
        class Program
        {
            private static PublisherSocket socket = new PublisherSocket(">tcp://localhost:1011");

            // NetMQ sockets are not thread-safe, so the producer tasks only
            // enqueue; the poller thread is the only one that touches the socket.
            private static NetMQQueue<string> queue = new NetMQQueue<string>();

            static void Main(string[] args)
            {
                queue.ReceiveReady += (s, e) =>
                {
                    var msg = e.Queue.Dequeue();
                    socket.SendMoreFrame("aaa").SendFrame(msg);
                };

                NetMQPoller poller = new NetMQPoller();
                poller.Add(queue);
                poller.RunAsync();

                // The write-up describes 10 producer threads; this listing starts 100 tasks.
                for (var i = 0; i < 100; i++)
                {
                    Task.Factory.StartNew(Run, i);
                }

                Console.WriteLine("running...");
                Console.Read();
            }

            private static void Run(object obj)
            {
                while (true)
                {
                    // The payload is the send time in ticks.
                    queue.Enqueue(Convert.ToString(DateTime.Now.Ticks));
                    Thread.Sleep(1);
                }
            }
        }
    }
RedisPubSub.Server
    using StackExchange.Redis;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    namespace RedisPubSub.Server
    {
        class Program
        {
            private static ConnectionMultiplexer ConnectionMultiplexer = ConnectionMultiplexer.Connect("127.0.0.1:5379,127.0.0.1:5380,password=123456");

            // Samples are added in the subscription callback and read by the
            // reporting task, so every access to the list goes through this lock.
            private static readonly object gate = new object();
            private static List<TimeSpan> d = new List<TimeSpan>();

            static void Main(string[] args)
            {
                ConnectionMultiplexer.GetSubscriber().Subscribe("aaa", (c, m) =>
                {
                    // The message is the sender's tick count.
                    var t = DateTime.Now.Subtract(new DateTime(Convert.ToInt64(m)));
                    lock (gate)
                    {
                        d.Add(t);
                    }
                });

                Task.Factory.StartNew(() =>
                {
                    while (true)
                    {
                        // Reporting interval. This listing uses 1 second; the
                        // result logs in this post are 10 seconds apart.
                        Thread.Sleep(1 * 1000);

                        List<TimeSpan> tmp;
                        lock (gate)
                        {
                            tmp = d;
                            d = new List<TimeSpan>();
                        }

                        if (tmp.Any())
                            Console.WriteLine($"{DateTime.Now}\t{tmp.Count / 1}/s\t{tmp.Average(x => x.TotalMilliseconds)}");
                        else
                            Console.WriteLine("-");
                    }
                });

                Console.Read();
            }
        }
    }
RedisPubSub.Client
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using StackExchange.Redis;

    namespace RedisPubSub.Client
    {
        class Program
        {
            private static ConnectionMultiplexer ConnectionMultiplexer = ConnectionMultiplexer.Connect("127.0.0.1:5379,127.0.0.1:5380,password=123456");

            static void Main(string[] args)
            {
                // The write-up describes 10 producer threads; this listing starts 100 tasks.
                for (var i = 0; i < 100; i++)
                {
                    Task.Factory.StartNew(Run, i);
                }

                Console.WriteLine("running...");
                Console.Read();
            }

            private static void Run(object obj)
            {
                while (true)
                {
                    // The payload is the send time in ticks.
                    ConnectionMultiplexer.GetSubscriber().Publish("aaa", DateTime.Now.Ticks);
                    Thread.Sleep(1);
                }
            }
        }
    }
NetMQ results

    Time                 Throughput   Avg latency (ms)
    2018/9/5 21:38:07    4810/s       0.0773472247630134
    2018/9/5 21:38:17    4953/s       0.0345690630488765
    2018/9/5 21:38:27    4956/s       0.0315224333756985
    2018/9/5 21:38:37    4838/s       0.0788584731751667
    2018/9/5 21:38:47    4968/s       0.0267414818244033
    2018/9/5 21:38:57    4927/s       0.0456701723053194
    2018/9/5 21:39:07    4921/s       0.0432689302042063
    2018/9/5 21:39:17    4927/s       0.0381800129878038
    2018/9/5 21:39:27    4816/s       0.0886798555027297
    2018/9/5 21:39:37    4666/s       0.118044937753114
    2018/9/5 21:39:47    4938/s       0.0384998096665115
    2018/9/5 21:39:57    4956/s       0.029773836282561
    2018/9/5 21:40:07    4947/s       0.0333202635781134
    2018/9/5 21:40:17    4846/s       0.0800275512730572
Redis results

    Time                 Throughput   Avg latency (ms)
    2018/9/5 21:41:18    4948/s       0.0181999898967447
    2018/9/5 21:41:28    4987/s       0.0271869203472404
    2018/9/5 21:41:38    4988/s       0.0162716218925421
    2018/9/5 21:41:48    4995/s       0.0106067283246252
    2018/9/5 21:41:58    4991/s       0.00916983653191234
    2018/9/5 21:42:08    4983/s       0.0864149871578777
    2018/9/5 21:42:18    4989/s       0.0139901451076282
    2018/9/5 21:42:28    4993/s       0.0140108803620779
    2018/9/5 21:42:38    4988/s       0.0111812829507868
    2018/9/5 21:42:48    4994/s       0.00812532134705482
    2018/9/5 21:42:58    4991/s       0.0198666673345088
    2018/9/5 21:43:08    4987/s       0.025591579622687
    2018/9/5 21:43:18    4988/s       0.0872425344264263
    2018/9/5 21:43:28    4989/s       0.0124017437665357
    2018/9/5 21:43:38    4992/s       0.0293738252613868
    2018/9/5 21:43:48    4972/s       0.0869289167286693
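To compare the two runs at a glance, the per-interval lines can be averaged. The following is a minimal sketch under stated assumptions: `ReportSummary` and `Summarize` are hypothetical names, each input line is assumed to have the tab-separated shape the listings print (timestamp, `N/s`, average milliseconds), and numbers are assumed to use a decimal point, as in the logs above.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

// Hypothetical helper (not part of the original samples).
public static class ReportSummary
{
    // Returns the mean throughput (msg/s) and the mean latency (ms) over all
    // report lines. Lines without three tab-separated fields, such as the "-"
    // placeholder printed for an empty interval, are skipped.
    public static (double MeanPerSecond, double MeanLatencyMs) Summarize(IEnumerable<string> lines)
    {
        var rows = lines
            .Select(line => line.Split('\t'))
            .Where(f => f.Length == 3 && f[1].EndsWith("/s", StringComparison.Ordinal))
            .Select(f => (
                Rate: double.Parse(f[1].Substring(0, f[1].Length - 2), CultureInfo.InvariantCulture),
                Ms: double.Parse(f[2], CultureInfo.InvariantCulture)))
            .ToList();

        if (rows.Count == 0)
            return (0, 0);

        return (rows.Average(r => r.Rate), rows.Average(r => r.Ms));
    }
}
```

Feeding it the captured console output of each run, for example via `File.ReadLines`, gives one pair of numbers per transport. This is an unweighted mean of the interval averages, which is reasonable here because every interval holds roughly the same number of messages.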
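I'll leave the conclusions to you. Keep in mind that this test only covers a small volume of data. If anything above is wrong, I'd appreciate it if you pointed it out. Thanks!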