RingBuffer 在 Puma 中的應用

什麼是 RingBuffer

環形緩衝區:https://zh.wikipedia.org/wiki/環形緩衝區html

維基百科的解釋是:它是一種用於表示一個固定尺寸、頭尾相連的緩衝區的數據結構,適合緩存數據流。java

底層數據結構很是簡單,一個固定長度的數組加一個寫指針和一個讀指針。git

RingBuffer

只要像這張圖同樣,把這個數組辦彎,它就成了一個 RingBuffer。github

那它到底有什麼精妙的地方呢?數據庫

我最近作的項目正好要用到相似的設計思路,因此翻出了之前在點評寫的 Puma 系統,看了看之前本身寫的代碼。順便寫個文章總結一下。編程

Puma 是什麼,爲何要用 RingBuffer

Puma 簡介

Puma 是一個 MySQL 數據庫 Binlog 訂閱消費系統。相似於阿里的 Canel設計模式

Puma 會假裝成一個 MySQL Slave,而後消費 Binlog 數據,並緩存在本地。當有客戶端鏈接上來的時候,就會從本地讀取數據給客戶端消費。數組

若是用很通常的設計,一個單獨的線程會從 MySQL 消費數據,存到本地文件中。而後每一個客戶端的鏈接都會有一個線程,從本地文件中讀取數據。緩存

沒錯,初版就是這麼簡單粗暴,固然,它是有效的。數據結構

利用緩存優化性能

不用作性能測試就知道,系統壓力大了之後這裏一定會是一個瓶頸。讀寫數據會有一點延時,若是多個客戶端同時讀取同一份數據又會形成不少的浪費。而後數據的編碼解碼還會有很多損耗。

因此這裏固然要加一個緩存了。

一個線程寫數據到磁盤,而後它能夠同時把數據傳遞給各個客戶端。

聽起來像發佈者訂閱者模式?也有點像生產者消費者模式?

可是,它並不只僅是發佈者訂閱者模式,由於這裏的「發佈者」和「訂閱者」是徹底異步的,並且每一個「訂閱者」的消費速度是不同的。

它也不只僅是生產者消費者模式,由於這裏的「消費者」是同時消費全部數據,而不是把數據分發給各個「消費者」。

它們的消費速度不同,還會出現緩存內的數據過新,「消費者」不得不去磁盤讀取。

感受這裏的需求是兩種設計模式的結合。

不只如此,這是一套高併發的系統,怎麼保證性能,怎麼保證數據一致性?

因此,這個緩存看上去簡單,其實它不簡單。

常規解決思路

目標明確後,看看 Java 的併發集合中有什麼能知足需求的吧。

第一個想到的就是BlockingQueue,爲每個客戶端建立一個BlockingQueueBlockingQueue內部是經過加鎖來實現的,雖然鎖衝突不會不少,但高併發的狀況下,最好仍是能作到無鎖。

Disruptor

當時正好看到了一系列介紹 Disruptor 的文章:傳送門

因此就在想能不能把 RingBuffer 用來解決咱們的問題呢?

對 RingBuffer 進行改進

看完了 RingBuffer 的基本原理後,就要開始用它來適應咱們的系統了。這裏遇到了幾個問題:

  1. 如何支持多個消費者

  2. 如何判斷當前有無新數據,如何判斷當前數據是否已經被新數據覆蓋

  3. 如何保證數據一致性

第一個問題

這個問題簡單,原始的 RingBuffer 只有一個寫指針和一個讀指針。

要支持多個消費者的話,只要爲每一個消費者建立一個讀指針便可。

第二個問題

RingBuffer 的一個精髓就是,寫指針和讀指針的大小是會超過數組長度的,寫入和讀取數據的時候,是採用writeIndex % CACHED_SIZE這樣的形式來讀取的。

爲何要這麼作?這就是爲了解決判斷有無新數據和數據是否已被覆蓋的問題。

假設我內部2個指針,分別叫nextWriteIndexnextReadIndex

那麼判斷有無新數據的邏輯就是if (nextReadIndex >= nextWriteIndex),返回true的話就是沒有新數據了。

而判斷數據是否被覆蓋的邏輯就是if (nextReadIndex < nextWriteIndex - CACHED_SIZE),返回true的話就是數據已經被覆蓋了。

拿實際數據舉個例子:

一個長度爲10的 RingBuffer,內部是一個長度爲10的數組。

此時nextWriteIndex=12,意味着它下一次寫入的數據會在 12%10=2 上。此時,可讀的有效範圍是 2~11,對應的數組內的索引就是 2, 3, 4, 5, 6, 7, 8, 9, 0, 1。

因此,當nextReadIndex=12 的時候,會讀到最老的數據2,這是老數據,不是新數據,此時表示沒有行數據了。

nextReadIndex=1 的時候,是新數據,而不是想要的老數據,老數據已經被覆蓋掉了,此時它沒辦法從緩存裏讀數據了。

第三個問題

最棘手的第三個問題來了,這個系統是要支持高併發的,若是是同步的操做,上面的代碼沒有任何問題。或者說,若是是同步的代碼,幹嗎還要用 RingBuffer 呢?

上面寫入和讀取,都有兩步操做,更改數據和更改索引,按照邏輯上來說,它們應該是強一致性的。只能加鎖了?若是要加鎖,爲什麼不直接用BlockingQueue

因此,是否能夠經過什麼方法,高併發和最終一致性呢?

直接貼代碼吧,根據代碼一步步分析:

public class CachedDataStorage {

    private static final int CACHED_SIZE = 5000;

    private final ChangedEventWithSequence[] data = new ChangedEventWithSequence[CACHED_SIZE];

    private volatile long nextWriteIndex = 0;

    public void append(Object dataValue) {
        data[(int) (nextWriteIndex % CACHED_SIZE)] = dataValue;
        nextWriteIndex++;
    }

    public Reader createReader() {
        return new Reader();
    }

    public class Reader {

        private Reader() {
        }

        private volatile long nextReadIndex = 0;

        public Object next() throws IOException {
            if (nextReadIndex >= nextWriteIndex) {
                return null;
            }

            if (nextReadIndex <= nextWriteIndex - CACHED_SIZE) {
                throw new IOException("data outdated");
            }

            Object dataValue = data[(int) (nextReadIndex % CACHED_SIZE)];

            if (nextReadIndex <= nextWriteIndex - CACHED_SIZE) {
                throw new IOException("data outdated");
            } else {
                nextReadIndex++;
                return dataValue;
            }
        }
    }
}

咱們來一步步分析,先看內部的ReadercreateReader()方法,每來一個客戶端就會建立一個Reader,每一個Reader會維護一個nextReadIndex

而後看append()方法,能夠說沒有任何邏輯,直接寫入數據,修改索引就結束了。可是,別小看了這兩個步驟的操做順序。

好了,到了最複雜的next()方法了,這裏可就大有講究了。

一進來馬上執行if (nextReadIndex >= nextWriteIndex),用來判斷當前是否還有更新的數據。

由於寫入的時候是先寫數據再改索引,因此可能會出現明明有數據,可是這裏認爲沒數據的狀況。

可是並無關係,咱們更關注最終一致性,由於咱們要的是確保這裏必定不會讀錯數據,而不必定要確保這裏有新數據就要馬上處理。就算這一輪沒讀到,下一輪也必定會讀取到了。

下一步是這一行if (nextReadIndex <= nextWriteIndex - CACHED_SIZE),判斷想要讀取的數據有沒有被新數據覆蓋。等一下,這裏爲何和上面介紹的不同?

上面寫的是<,而這裏倒是<=。上面提到,同步操做的狀況下,用<是沒有問題的,可是這裏的異步的。

寫入數據的時候,可能會出現數據已被覆蓋,而索引未被更新的問題,因此這樣子判斷能夠保證不會讀錯數據。

既然上下邊界都檢查過了,那麼就讀取數據吧!就當這裏準備讀數據的時候,寫數據的線程居然又寫入了好多數據,致使讀出來的數據已經被覆蓋了!

因此,必定要在讀完數據後,再次檢查數據是否被覆蓋。

最終,整個過程實現了無鎖,高併發和最終一致性。

在 Puma 系統中,啓用緩存和關閉緩存,一寫五讀的狀況下,性能整整提升了一倍。測試仍是在我 SSD 上進行的,若是是傳統硬盤,提高會更明顯。

利用 RingBuffer 實現後的優勢

代碼實現完,就能夠和BlockingQueue對比一下了。

首先,RingBuffer 徹底是無鎖的,沒有任何鎖衝突。而利用BlockingQueue的話它內部會加鎖,雖然鎖衝突不會不少,可是沒鎖確定比有鎖好。並且,當 Writer 往多個BlockingQueue中順序寫入數據的時候,會有相互影響。而利用 RingBuffer 實現的話,不管有多少 Reader,都不會影響寫入性能。

而後是內存上的優點,每次多一個 Reader 僅僅是多一個對象,Reader對象內部也只有一個變量,佔用內存很是很是小。而使用BlockingQueue的話,須要建立的就不只僅是一個對象了,會有一系列的東西。

目前看來,該實現能知足咱們的需求且無明顯缺點,並且已經在系統中平穩運行了將近一年了。在我離職以前的最後一個項目,就包括爲點評訂單系統接入 Puma,訂單系統在活動期間的寫入 QPS 很是高。但對 Puma 來講也是毫無壓力的,最終的瓶頸都不是在 Puma 上,而是在目標數據庫的寫入性能上。

高併發系統的設計思路

首先,這部分的代碼能夠在這裏找到:傳送門

完整的代碼還包含了老數據被覆蓋無數據可讀時的數據源切換邏輯,還有當無消費者時關閉 RingBuffer 的邏輯。上面的代碼已經被簡化了不少,想看完整代碼的話能夠在上面的連接中看到。

之後有空還會再介紹更多 Puma 中遇到的問題和解決的思路。

而後談談高併發系統的設計。

Java 併發編程的第一重境界是善用各類鎖,儘可能減小鎖衝突,不能有死鎖。

第二重境界就是善用 Java 的各類併發包,Java 的併發包裏有的是無鎖的,例如AtomicLong中用了CAS;有的是用了各類手段減小鎖衝突,例如ConcurrentHashMap中就用了鎖分段技術。總體效率都很是高,能熟練應用後也能寫出很高效的程序。

再下一個境界就很是搞腦子了,每每是放棄了強一致性,而去追求最終一致性。其中會用到AtomicLong等無鎖,或鎖分段技術,而且經常會把它們結合起來用。就像上面那部分代碼,看似簡單,但實際上卻要把各類邊界條件思考地很全面,由於是最終一致性,因此中間的狀態很是多。

源地址:http://www.dozer.cc/2016/09/ringbuffer-in-puma.html

相關文章
相關標籤/搜索