高性能SPSC無鎖隊列設計之路

本文整理了Single Producer/Consumer lock free Queue step by step這篇文章裏頭關於高性能的SPSC無鎖隊列使用遵循的幾個原則:html

  • 單寫原則java

  • 使用lazySet替代volatile set數組

  • 使用位運算替代取模運算緩存

  • 避免僞共享性能優化

  • 減小緩存一致性衝突數據結構

1.Single Writer Principle(單寫原則)

若是隻有一個線程對資源進行寫操做,它其實是比你想象的更容易,這個方案是可行的,無需CPU浪費管理資源爭奪或上下文切換。固然,若是有多個線程讀取相同的數據。CPU能夠經過高速緩存一致性的子系統廣播只讀數據的拷貝到其餘核。這雖然有成本的,但它的尺度很是好。
多個線程若是同時寫同一個資源,必有爭奪,就須要用鎖或樂觀鎖等堵塞方法,而非堵塞的單線程寫比多線程寫要快,能得到高吞吐量和低延遲,特別是多核狀況,一個線程一個CPU核,大大增長其餘CPU核並行運行其餘線程的機率。多線程

Method Time (ms)
One Thread 300
One Thread with Memory Barrier 4,700
One Thread with CAS 5,700
Two Threads with CAS 18,000
One Thread with Lock 10,000
Two Threads with Lock 118,000

Disruptor分離了關注,真正實現單寫原則。(Disruptor的特色是將多線程生產者經過Ringbuffer變成單線程消費者,經過單線程消費者對共享資源進行寫操做)
目前 Node.js, Erlang, Actor 模式, SEDA 都採起了單寫解決方案,可是他們大多數使用基於隊列的下實現的,它打破單獨寫原則socket

2.使用lazySet替代volatile set

  • lazySet是使用Unsafe.putOrderedObject方法,會前置一個store-store barrier(在當前的硬件體系下或者是no-op或者很是輕),而不是store-load barrier。oop

  • store-load barrier較慢,老是用在volatile的寫操做上。在操做序列Store1; StoreStore;Store2中,Store1的數據會在Store2和後續寫操做以前對其它處理器可見。換句話說,就是保證了對其它數據可見的寫的順序。性能

  • 若是隻有一個線程寫咱們就用不着store-load barrier,lazySet和volatile set在單寫原則下面是等價的。

  • 這種性能提高是有代價的,雖然廉價,也就是寫後結果並不會被其餘線程看到,甚至是本身的線程,一般是幾納秒後被其餘線程看到,lazySet的寫在實踐上來延遲是納秒級,這個時間比較短,因此代價能夠忍受。

  • 相似Unsafe.putOrderedObject還有unsafe.putOrderedLong等方法,unsafe.putOrderedLong比使用 volatile long要快3倍左右。

3.使用位運算替代取模運算

好比這段

public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }

        final long currentTail = tail;
        final long wrapPoint = currentTail - buffer.length;
        if (head <= wrapPoint) {
            return false;
        }

        buffer[(int) (currentTail % buffer.length)] = e;
        tail = currentTail + 1;

        return true;
    }

使用位運算以後

mask = capacity - 1;
public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }

        final long currentTail = tail.get();
        final long wrapPoint = currentTail - buffer.length;
        if (head.get() <= wrapPoint) {
            return false;
        }

        buffer[(int) currentTail & mask] = e;
        tail.lazySet(currentTail + 1);

        return true;
    }

性能對比

x % 8 == x & (8 - 1) 可是位運算速度更快

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 20, time = 3, timeUnit = TimeUnit.SECONDS)
@Fork(1)
@State(Scope.Benchmark)
public class ModuloMaskTest {

    private static final int LENGTH = 16;
    int[] ints = new int[LENGTH];
    int mask = LENGTH - 1;
    int someIndex = 5;

    @Benchmark
    public int moduloLengthNoMask() {
        return someIndex % ints.length;
    }

    @Benchmark
    public int moduloLengthMask() {
        return someIndex & (ints.length - 1);
    }

    @Benchmark
    public int moduloConstantLengthNoMask() {
        return someIndex % LENGTH;
    }

    @Benchmark
    public int moduloMask() {
        return someIndex & mask;
    }

    @Benchmark
    public int consume() {
        return someIndex;
    }
    @Benchmark
    public void noop() {
    }

    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                .include(".*" +ModuloMaskTest.class.getSimpleName()+ ".*")
                .forks(1)
                .build();
        new Runner(opt).run();
    }
}

結果以下:

# Run complete. Total time: 00:07:34

Benchmark                                  Mode  Cnt  Score   Error  Units
ModuloMaskTest.consume                     avgt   20  3.099 ± 0.152  ns/op
ModuloMaskTest.moduloConstantLengthNoMask  avgt   20  3.430 ± 0.509  ns/op
ModuloMaskTest.moduloLengthMask            avgt   20  3.505 ± 0.058  ns/op
ModuloMaskTest.moduloLengthNoMask          avgt   20  6.490 ± 0.143  ns/op
ModuloMaskTest.moduloMask                  avgt   20  3.304 ± 0.159  ns/op
ModuloMaskTest.noop                        avgt   20  0.404 ± 0.010  ns/op

能夠發現%操做性能最差要6.x納秒,&操做基本在3ns左右

4.避免僞共享

L1 L2 L3 cache

當 CPU 執行運算的時候,它先去 L1 查找所需的數據,再去 L2,而後是L3,最後若是這些緩存中都沒有,所需的數據就要去主內存拿。走得越遠,運算耗費的時間就越長。因此若是你在作一些很頻繁的事,你要確保數據在 L1 緩存中。

從CPU到 大約須要的CPU週期 大約須要的時間
主存 約60-80ns
QPI 總線傳輸(between sockets, not drawn) 約20ns
L3 cache 約40-45 cycles 約15ns
L2 cache 約10 cycles 約3ns
L1 cache 約3-4 cycles 約1ns
寄存器 1 cycle

可見CPU讀取主存中的數據會比從L1中讀取慢了近2個數量級。

定義

Cache是由不少個cache line組成的。每一個cache line一般是64字節,而且它有效地引用主內存中的一起地址。一個Java的long類型變量是8字節,所以在一個緩存行中能夠存8個long類型的變量。
CPU每次從主存中拉取數據時,會把相鄰的數據也存入同一個cache line。
在訪問一個long數組的時候,若是數組中的一個值被加載到緩存中,它會自動加載另外7個。所以你能很是快的遍歷這個數組。事實上,你能夠很是快速的遍歷在連續內存塊中分配的任意數據結構。這種沒法充分使用緩存行特性的現象,稱爲僞共享。

圖片描述

當多線程修改互相獨立的變量時,若是這些變量共享同一個緩存行,就會無心中影響彼此的性能,這就是僞共享。緩存行上的寫競爭是運行在SMP系統中並行線程實現可伸縮性最重要的限制因素。有人將僞共享描述成無聲的性能殺手。

圖片描述

圖1說明了僞共享的問題。在覈心1上運行的線程想更新變量X,同時核心2上的線程想要更新變量Y。不幸的是,這兩個變量在同一個緩存行中。每一個線程都要去競爭緩存行的全部權來更新變量。若是核心1得到了全部權,緩存子系統將會使核心2中對應的緩存行失效。當核心2得到了全部權而後執行更新操做,核心1就要使本身對應的緩存行失效。這會來來回回的通過L3緩存,大大影響了性能。若是互相競爭的核心位於不一樣的插槽,就要額外橫跨插槽鏈接,問題可能更加嚴重。

解決

對於僞共享,通常的解決方案是,增大數組元素的間隔使得由不一樣線程存取的元素位於不一樣的緩存行上,以空間換時間。在jdk1.8中,有專門的註解@Contended來避免僞共享,更優雅地解決問題。

@Contended
public class VolatileLong {
    public volatile long value = 0L;
}

public class FalseSharingJdk8 implements Runnable {
    public static int NUM_THREADS = 4; // change
    public final static long ITERATIONS = 500L * 1000L * 1000L;
    private final int arrayIndex;
    private static VolatileLong[] longs;

    public FalseSharingJdk8(final int arrayIndex) {
        this.arrayIndex = arrayIndex;
    }

    /**
     * -XX:-RestrictContended
     * –XX:+PrintFieldLayout  --- 只是在調試版jdk有效
     * @param args
     * @throws Exception
     */
    public static void main(final String[] args) throws Exception {
        Thread.sleep(10000);
        System.out.println("starting....");
        if (args.length == 1) {
            NUM_THREADS = Integer.parseInt(args[0]);
        }

        longs = new VolatileLong[NUM_THREADS];
        for (int i = 0; i < longs.length; i++) {
            longs[i] = new VolatileLong();
        }
        final long start = System.nanoTime();
        runTest();
        System.out.println("duration = " + (System.nanoTime() - start));
    }

    private static void runTest() throws InterruptedException {
        Thread[] threads = new Thread[NUM_THREADS];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new FalseSharingJdk8(i));
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
    }

    public void run() {
        long i = ITERATIONS + 1;
        while (0 != --i) {
            longs[arrayIndex].value = i;
        }
    }
}

沒有使用註解的話,須要本身去填充

public final static class ValuePadding {
        protected long p1, p2, p3, p4, p5, p6, p7;
        protected volatile long value = 0L;
        protected long p9, p10, p11, p12, p13, p14;
        protected long p15;
    }

5.減小緩存一致性衝突

只要系統只有一個CPU核在工做,一切都沒問題。若是有多個核,每一個核又都有本身的緩存,那麼咱們就遇到問題了:若是某個CPU緩存段中對應的內存內容被另一個CPU偷偷改了,會發生什麼?
緩存一致性協議就是爲了解決這個問題而設計的,使多組緩存的內容保持一致,即便用多組緩存,但使它們的行爲看起來就像只有一組緩存那樣。

private final AtomicLong tail = new AtomicLong(0);
    private final AtomicLong head = new AtomicLong(0);

    public static class PaddedLong {
        public long value = 0, p1, p2, p3, p4, p5, p6;
    }

    private final PaddedLong tailCache = new PaddedLong();
    private final PaddedLong headCache = new PaddedLong();

    public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }

        final long currentTail = tail.get();
        final long wrapPoint = currentTail - capacity;
        if (headCache.value <= wrapPoint) {
            headCache.value = head.get();
            if (headCache.value <= wrapPoint) {
                return false;
            }
        }

        buffer[(int) currentTail & mask] = e;
        tail.lazySet(currentTail + 1);

        return true;
    }

    public E poll() {
        final long currentHead = head.get();
        if (currentHead >= tailCache.value) {
            tailCache.value = tail.get();
            if (currentHead >= tailCache.value) {
                return null;
            }
        }

        final int index = (int) currentHead & mask;
        final E e = buffer[index];
        buffer[index] = null;
        head.lazySet(currentHead + 1);

        return e;
    }

對比沒有cache的版本

private final AtomicLong tail = new AtomicLong(0);
private final AtomicLong head = new AtomicLong(0);

public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }

        final long currentTail = tail.get();
        final long wrapPoint = currentTail - buffer.length;
        if (head.get() <= wrapPoint) {
            return false;
        }

        buffer[(int) currentTail & mask] = e;
        tail.lazySet(currentTail + 1);

        return true;
    }

    public E poll() {
        final long currentHead = head.get();
        if (currentHead >= tail.get()) {
            return null;
        }

        final int index = (int) currentHead & mask;
        final E e = buffer[index];
        buffer[index] = null;
        head.lazySet(currentHead + 1);

        return e;
    }

對比數據

0 - ops/sec=56,689,539 - OneToOneConcurrentArrayQueue2 result=777
1 - ops/sec=33,578,974 - OneToOneConcurrentArrayQueue2 result=777
2 - ops/sec=54,105,692 - OneToOneConcurrentArrayQueue2 result=777
3 - ops/sec=84,290,815 - OneToOneConcurrentArrayQueue2 result=777
4 - ops/sec=79,851,727 - OneToOneConcurrentArrayQueue2 result=777
-----
0 - ops/sec=110,506,679 - OneToOneConcurrentArrayQueue3 result=777
1 - ops/sec=117,252,276 - OneToOneConcurrentArrayQueue3 result=777
2 - ops/sec=115,639,936 - OneToOneConcurrentArrayQueue3 result=777
3 - ops/sec=116,555,884 - OneToOneConcurrentArrayQueue3 result=777
4 - ops/sec=115,712,336 - OneToOneConcurrentArrayQueue3 result=777

總體上有必定的提高。

doc

相關文章
相關標籤/搜索