Is Disruptor.NET, built on lock-free CAS, really slower than BlockingCollection?

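Someone on StackOverflow reported that their Disruptor.NET code was 2x slower than BlockingCollection and posted the complete code. Several people replied with a lot of discussion, but nobody got to the bottom of it, and the thread even concluded that maybe the scenario just wasn't a good fit for Disruptor. I was skeptical. BlockingCollection's implementation is fairly simple and blunt: it takes a lock when necessary, and a consumer waits on a semaphore until an add operation has completed. Disruptor is not a conventional implementation: it is specifically optimized around how CPU caches behave, uses no locks internally (only CAS atomic operations), and even takes false sharing into account. In theory, then, Disruptor should not be slower than BlockingCollection.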
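To make "no locks, only CAS, and padding against false sharing" concrete, here is a minimal sketch of the two ideas in isolation. PaddedCounter and CasSequencer are hypothetical names; this is not Disruptor.NET's own Sequence/Sequencer code, and it deliberately omits the part where a real ring buffer also makes producers wait for the slowest consumer.

```csharp
using System.Runtime.InteropServices;
using System.Threading;

// Hypothetical, simplified sketch (not Disruptor.NET's Sequence type).
// The explicit 128-byte layout places the hot counter 64 bytes into the
// struct, so data written by other cores is unlikely to share its cache line.
[StructLayout(LayoutKind.Explicit, Size = 128)]
public struct PaddedCounter
{
    [FieldOffset(64)]
    public long Value;
}

// Hypothetical multi-producer sequence claimer using a CAS retry loop.
public sealed class CasSequencer
{
    private PaddedCounter _cursor; // last claimed sequence

    public CasSequencer()
    {
        _cursor.Value = -1; // nothing claimed yet
    }

    // Claim the next sequence without a lock: read, compute, compare-and-swap.
    // If another producer won the race, CompareExchange returns a value other
    // than the one we read, and we simply retry with the fresh value.
    public long Next()
    {
        while (true)
        {
            long current = Volatile.Read(ref _cursor.Value);
            long next = current + 1;
            if (Interlocked.CompareExchange(ref _cursor.Value, next, current) == current)
                return next;
        }
    }

    public long Current => Volatile.Read(ref _cursor.Value);
}
```

Under contention the losers of a CAS just loop again instead of being put to sleep on a lock, which is where the design expects to save time compared with a lock-plus-semaphore queue.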

 

But since a problem showed up in practice, the cause is worth analyzing.

I pulled down his code and took a look, and it had quite a few problems.

In the Disruptor EventHandler he calls Console.WriteLine here and there, while the BlockingCollection handler only records the data. Console.WriteLine takes a lock internally and is expensive to call, so how could the comparison possibly be fair?
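One way to keep both handlers equal is to make each of them do the same cheap thing on the hot path (store a value) and defer all printing until the Stopwatch has stopped. DeferredLog below is a hypothetical helper written for illustration; it is not part of Disruptor.NET or of the original benchmark.

```csharp
using System;
using System.IO;

// Hypothetical helper: the hot path only stores a value into a preallocated
// array; formatting and console output happen after timing has finished.
public sealed class DeferredLog
{
    private readonly long[] _elapsedTicks;

    public DeferredLog(int capacity) => _elapsedTicks = new long[capacity];

    // Hot path: a plain array store, no lock and no I/O.
    public void Record(long sequence, long elapsedTicks) =>
        _elapsedTicks[sequence] = elapsedTicks;

    public long this[long sequence] => _elapsedTicks[sequence];

    // Cold path: call only after the Stopwatch has been stopped.
    public void Dump(TextWriter writer, int maxLines)
    {
        int n = Math.Min(maxLines, _elapsedTicks.Length);
        for (int i = 0; i < n; i++)
            writer.WriteLine($"event {i}: {_elapsedTicks[i]} ticks");
    }
}
```

The idea is that both the Disruptor OnEvent handler and the BlockingCollection consumer call Record instead of Console.WriteLine, and Dump(Console.Out, …) runs once after sw.Stop(), so console locking never lands inside the measured interval.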

In addition, the RingBuffer size is far too small, only 64, which severely hurts Disruptor's performance. Based on my own comparison tests, it should be 1024 or larger.
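Why buffer size matters can be seen from how a ring buffer maps sequences to slots. The sketch below (RingMath is a hypothetical name, not Disruptor.NET API) assumes a power-of-two size, as Disruptor requires, so the slot index is a bit mask; a producer that wants sequence s has to wait until the slowest consumer has moved past s - size, so with only 64 slots ten producers hit that wait condition far more often than with 1024.

```csharp
// Hypothetical helper illustrating ring-buffer index math; not Disruptor.NET API.
public static class RingMath
{
    // Disruptor-style buffers require a power-of-two size so that
    // "sequence mod size" can be computed with a single AND.
    public static bool IsPowerOfTwo(int n) => n > 0 && (n & (n - 1)) == 0;

    // Slot index for a sequence number; assumes size is a power of two.
    public static int IndexOf(long sequence, int size) => (int)(sequence & (size - 1));

    // The slot for `sequence` was last used by `sequence - size`. If the
    // slowest consumer has not processed that one yet, writing now would
    // overwrite unread data, so the producer must wait.
    public static bool MustWait(long sequence, int size, long slowestConsumed) =>
        sequence - size > slowestConsumed;
}
```

With a small buffer the gap between producers and the consumer is capped at a few dozen slots, so MustWait becomes true almost immediately under load and producers spend their time waiting rather than publishing.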

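The while (!dataItems.IsCompleted) loop on the BlockingCollection side is also flawed. Even though the producer keeps adding in its loop, the BlockingCollection is not continuously in an "adding" state internally, so the consumer's timing loop can finish before the add loop has actually completed, and the time measured for BlockingCollection comes out lower than the real time.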

Task.Factory.StartNew(() => {
    while (!dataItems.IsCompleted)
    {
        ValueEntry ve = null;
        try
        {
            ve = dataItems.Take();
            long microseconds = sw[ve.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
            results[ve.Value] = microseconds;

            //Console.WriteLine("elapsed microseconds = " + microseconds);
            //Console.WriteLine("Event handled: Value = {0} (processed event {1}", ve.Value, ve.Value);
        }
        catch (InvalidOperationException) { }
    }
}, TaskCreationOptions.LongRunning);


for (int i = 0; i < length; i++)
{
    var valueToSet = i;

    ValueEntry entry = new ValueEntry();
    entry.Value = valueToSet;

    sw[i].Restart();
    dataItems.Add(entry);

    //Console.WriteLine("Published entry {0}, value {1}", valueToSet, entry.Value);
    //Thread.Sleep(1000);
}
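For comparison with the consumer loop above, the sketch below shows the standard BlockingCollection shutdown pattern: the producer calls CompleteAdding(), the consumer iterates GetConsumingEnumerable(), and the clock stops only after the consumer has drained everything. BlockingCollectionTiming is a hypothetical name, and this single-producer version is an illustration of the pattern, not the author's benchmark.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical timing harness showing a consumer that cannot stop early.
public static class BlockingCollectionTiming
{
    public static (TimeSpan Elapsed, long Consumed) Run(int items, int capacity)
    {
        using var queue = new BlockingCollection<int>(capacity);
        long consumed = 0;
        var sw = Stopwatch.StartNew();

        var consumer = Task.Factory.StartNew(() =>
        {
            // GetConsumingEnumerable blocks while the collection is empty and
            // only ends after CompleteAdding() has been called AND every item
            // has been taken, so nothing is left unprocessed.
            foreach (var item in queue.GetConsumingEnumerable())
                consumed++;
        }, TaskCreationOptions.LongRunning);

        for (int i = 0; i < items; i++)
            queue.Add(i);          // blocks when `capacity` items are pending
        queue.CompleteAdding();    // signal "no more items"

        consumer.Wait();           // stop the clock only after the consumer drained everything
        sw.Stop();
        return (sw.Elapsed, consumed);
    }
}
```

Because the consumer's exit condition depends on an explicit "adding is complete" signal rather than on a snapshot of the collection's state, the measured time always covers both producing and consuming every item.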

 

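I then reworked his code, and in my measurements Disruptor ran 10x faster than BlockingCollection. (An aside: the .NET version of Disruptor is faster than the Java version across the board, in many scenarios as much as 10x faster. The .NET version was ported from Java and its implementation stays consistent with the Java one, so which language features cause such a large performance difference?)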

Disruptor is 10x faster than BlockingCollection with multiple producers (10 parallel producers), and 2x faster than BlockingCollection with a single producer:

 

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;
using NUnit.Framework;

namespace DisruptorTest.Ds
{
    public sealed class ValueEntry
    {
        internal int Id { get; set; }
    }

    class MyHandler : IEventHandler<ValueEntry>
    {
        public void OnEvent(ValueEntry data, long sequence, bool endOfBatch)
        {
        }
    }

    [TestFixture]
    public class DisruptorPerformanceTest
    {
        private volatile bool collectionAddEnded;

        private int producerCount = 10;
        private int runCount = 1000000;
        private int RingBufferAndCapacitySize = 1024;

        [TestCase()]
        public async Task TestBoth()
        {
            for (int i = 0; i < 1; i++)
            {
                foreach (var rs in new int[] {64, 512, 1024, 2048 /*,4096,4096*2*/})
                {
                    Console.WriteLine($"RingBufferAndCapacitySize:{rs}, producerCount:{producerCount}, runCount:{runCount} of {i}");
                    RingBufferAndCapacitySize = rs;
                    await DisruptorTest();
                    await BlockingCollectionTest();
                }
            }
        }

        [TestCase()]
        public async Task BlockingCollectionTest()
        {
            var sw = new Stopwatch();
            BlockingCollection<ValueEntry> dataItems = new BlockingCollection<ValueEntry>(RingBufferAndCapacitySize);

            sw.Start();

            collectionAddEnded = false;

            // A simple spinning consumer (TryTake in a loop) with no cancellation.
            var task = Task.Factory.StartNew(() =>
            {
                // Keep draining until every producer has finished and the collection is empty.
                while (!collectionAddEnded || dataItems.Count > 0)
                {
                    //if (!dataItems.IsCompleted && dataItems.TryTake(out var ve))
                    if (dataItems.TryTake(out var ve))
                    {
                    }
                }
            }, TaskCreationOptions.LongRunning);


            var tasks = new Task[producerCount];
            for (int t = 0; t < producerCount; t++)
            {
                tasks[t] = Task.Run(() =>
                {
                    for (int i = 0; i < runCount; i++)
                    {
                        ValueEntry entry = new ValueEntry();
                        entry.Id = i;
                        dataItems.Add(entry);
                    }
                });
            }

            await Task.WhenAll(tasks);

            collectionAddEnded = true;
            await task;

            sw.Stop();

            Console.WriteLine($"BlockingCollectionTest Time:{sw.ElapsedMilliseconds/1000d}");
        }


        [TestCase()]
        public async Task DisruptorTest()
        {
            var disruptor =
                new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), RingBufferAndCapacitySize, TaskScheduler.Default,
                    producerCount > 1 ? ProducerType.Multi : ProducerType.Single, new BlockingWaitStrategy());
            disruptor.HandleEventsWith(new MyHandler());

            var _ringBuffer = disruptor.Start();

            Stopwatch sw = Stopwatch.StartNew();


            var tasks = new Task[producerCount];
            for (int t = 0; t < producerCount; t++)
            {
                tasks[t] = Task.Run(() =>
                {
                    for (int i = 0; i < runCount; i++)
                    {
                        long sequenceNo = _ringBuffer.Next();
                        _ringBuffer[sequenceNo].Id = 0;
                        _ringBuffer.Publish(sequenceNo);
                    }
                });
            }


            await Task.WhenAll(tasks);


            disruptor.Shutdown();

            sw.Stop();
            Console.WriteLine($"DisruptorTest Time:{sw.ElapsedMilliseconds/1000d}s");
        }
    }
}
 

Results when BlockingCollectionTest uses a single shared ValueEntry instance (no new ValueEntry() in the for loop); all times are in seconds:

  • RingBufferAndCapacitySize:64, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:16.962s

    BlockingCollectionTest Time:18.399

  • RingBufferAndCapacitySize:512, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:6.101s

    BlockingCollectionTest Time:19.526

  • RingBufferAndCapacitySize:1024, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:2.928s

    BlockingCollectionTest Time:20.25

  • RingBufferAndCapacitySize:2048, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:2.448s

    BlockingCollectionTest Time:20.649

Results when BlockingCollectionTest creates a new ValueEntry() in the for loop (as in the code above); all times are in seconds:

  • RingBufferAndCapacitySize:64, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:27.374s

    BlockingCollectionTest Time:21.955

  • RingBufferAndCapacitySize:512, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:5.011s

    BlockingCollectionTest Time:20.127

  • RingBufferAndCapacitySize:1024, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:2.877s

    BlockingCollectionTest Time:22.656

  • RingBufferAndCapacitySize:2048, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:2.384s

    BlockingCollectionTest Time:23.567
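To see how strongly the ring buffer size drives the outcome, the speedup can be computed directly from the times listed above. SpeedupTable is a hypothetical helper; the numbers are copied verbatim from the two result lists, and a speedup above 1 means Disruptor finished first.

```csharp
using System;

// Hypothetical helper: speedup = BlockingCollection time / Disruptor time,
// using the measured times (seconds) listed in the two result sets above.
public static class SpeedupTable
{
    public static readonly (int Size, double Disruptor, double BlockingCollection)[] SharedEntryRun =
    {
        (64, 16.962, 18.399),
        (512, 6.101, 19.526),
        (1024, 2.928, 20.25),
        (2048, 2.448, 20.649),
    };

    public static readonly (int Size, double Disruptor, double BlockingCollection)[] NewEntryRun =
    {
        (64, 27.374, 21.955),
        (512, 5.011, 20.127),
        (1024, 2.877, 22.656),
        (2048, 2.384, 23.567),
    };

    // Values above 1 mean Disruptor was faster.
    public static double Speedup(double disruptorSeconds, double blockingSeconds) =>
        blockingSeconds / disruptorSeconds;

    public static void Print((int Size, double Disruptor, double BlockingCollection)[] run)
    {
        foreach (var (size, d, b) in run)
            Console.WriteLine($"{size,5}: {Speedup(d, b):F2}x");
    }
}
```

For the shared-instance run, the ratio climbs from about 1.1x at 64 slots to about 8.4x at 2048, which matches the earlier point that a 64-slot RingBuffer hides most of Disruptor's advantage.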
