環形緩衝區:https://zh.wikipedia.org/wiki/環形緩衝區html
維基百科的解釋是:它是一種用於表示一個固定尺寸、頭尾相連的緩衝區的數據結構,適合緩存數據流。java
底層數據結構很是簡單,一個固定長度的數組加一個寫指針和一個讀指針。git
只要像這張圖同樣,把這個數組辦彎,它就成了一個 RingBuffer。github
那它到底有什麼精妙的地方呢?數據庫
我最近作的項目正好要用到相似的設計思路,因此翻出了之前在點評寫的 Puma 系統,看了看之前本身寫的代碼。順便寫個文章總結一下。編程
Puma 是一個 MySQL 數據庫 Binlog 訂閱消費系統。相似於阿里的 Canel。設計模式
Puma 會假裝成一個 MySQL Slave,而後消費 Binlog 數據,並緩存在本地。當有客戶端鏈接上來的時候,就會從本地讀取數據給客戶端消費。數組
若是用很通常的設計,一個單獨的線程會從 MySQL 消費數據,存到本地文件中。而後每一個客戶端的鏈接都會有一個線程,從本地文件中讀取數據。緩存
沒錯,初版就是這麼簡單粗暴,固然,它是有效的。數據結構
不用作性能測試就知道,系統壓力大了之後這裏一定會是一個瓶頸。讀寫數據會有一點延時,若是多個客戶端同時讀取同一份數據又會形成不少的浪費。而後數據的編碼解碼還會有很多損耗。
因此這裏固然要加一個緩存了。
一個線程寫數據到磁盤,而後它能夠同時把數據傳遞給各個客戶端。
聽起來像發佈者訂閱者模式?也有點像生產者消費者模式?
可是,它並不只僅是發佈者訂閱者模式,由於這裏的「發佈者」和「訂閱者」是徹底異步的,並且每一個「訂閱者」的消費速度是不同的。
它也不只僅是生產者消費者模式,由於這裏的「消費者」是同時消費全部數據,而不是把數據分發給各個「消費者」。
它們的消費速度不同,還會出現緩存內的數據過新,「消費者」不得不去磁盤讀取。
感受這裏的需求是兩種設計模式的結合。
不只如此,這是一套高併發的系統,怎麼保證性能,怎麼保證數據一致性?
因此,這個緩存看上去簡單,其實它不簡單。
目標明確後,看看 Java 的併發集合中有什麼能知足需求的吧。
第一個想到的就是BlockingQueue
,爲每個客戶端建立一個BlockingQueue
,BlockingQueue
內部是經過加鎖來實現的,雖然鎖衝突不會不少,但高併發的狀況下,最好仍是能作到無鎖。
當時正好看到了一系列介紹 Disruptor 的文章:傳送門
因此就在想能不能把 RingBuffer 用來解決咱們的問題呢?
看完了 RingBuffer 的基本原理後,就要開始用它來適應咱們的系統了。這裏遇到了幾個問題:
如何支持多個消費者
如何判斷當前有無新數據,如何判斷當前數據是否已經被新數據覆蓋
如何保證數據一致性
這個問題簡單,原始的 RingBuffer 只有一個寫指針和一個讀指針。
要支持多個消費者的話,只要爲每一個消費者建立一個讀指針便可。
RingBuffer 的一個精髓就是,寫指針和讀指針的大小是會超過數組長度的,寫入和讀取數據的時候,是採用writeIndex % CACHED_SIZE
這樣的形式來讀取的。
爲何要這麼作?這就是爲了解決判斷有無新數據和數據是否已被覆蓋的問題。
假設我內部2個指針,分別叫nextWriteIndex
,nextReadIndex
。
那麼判斷有無新數據的邏輯就是if (nextReadIndex >= nextWriteIndex)
,返回true
的話就是沒有新數據了。
而判斷數據是否被覆蓋的邏輯就是if (nextReadIndex < nextWriteIndex - CACHED_SIZE)
,返回true
的話就是數據已經被覆蓋了。
拿實際數據舉個例子:
一個長度爲10的 RingBuffer,內部是一個長度爲10的數組。
此時nextWriteIndex
=12,意味着它下一次寫入的數據會在 12%10=2 上。此時,可讀的有效範圍是 2~11,對應的數組內的索引就是 2, 3, 4, 5, 6, 7, 8, 9, 0, 1。
因此,當nextReadIndex
=12 的時候,會讀到最老的數據2,這是老數據,不是新數據,此時表示沒有行數據了。
當nextReadIndex
=1 的時候,是新數據,而不是想要的老數據,老數據已經被覆蓋掉了,此時它沒辦法從緩存裏讀數據了。
最棘手的第三個問題來了,這個系統是要支持高併發的,若是是同步的操做,上面的代碼沒有任何問題。或者說,若是是同步的代碼,幹嗎還要用 RingBuffer 呢?
上面寫入和讀取,都有兩步操做,更改數據和更改索引,按照邏輯上來說,它們應該是強一致性的。只能加鎖了?若是要加鎖,爲什麼不直接用BlockingQueue
?
因此,是否能夠經過什麼方法,高併發和最終一致性呢?
直接貼代碼吧,根據代碼一步步分析:
public class CachedDataStorage { private static final int CACHED_SIZE = 5000; private final ChangedEventWithSequence[] data = new ChangedEventWithSequence[CACHED_SIZE]; private volatile long nextWriteIndex = 0; public void append(Object dataValue) { data[(int) (nextWriteIndex % CACHED_SIZE)] = dataValue; nextWriteIndex++; } public Reader createReader() { return new Reader(); } public class Reader { private Reader() { } private volatile long nextReadIndex = 0; public Object next() throws IOException { if (nextReadIndex >= nextWriteIndex) { return null; } if (nextReadIndex <= nextWriteIndex - CACHED_SIZE) { throw new IOException("data outdated"); } Object dataValue = data[(int) (nextReadIndex % CACHED_SIZE)]; if (nextReadIndex <= nextWriteIndex - CACHED_SIZE) { throw new IOException("data outdated"); } else { nextReadIndex++; return dataValue; } } } }
咱們來一步步分析,先看內部的Reader
和createReader()
方法,每來一個客戶端就會建立一個Reader
,每一個Reader
會維護一個nextReadIndex
。
而後看append()
方法,能夠說沒有任何邏輯,直接寫入數據,修改索引就結束了。可是,別小看了這兩個步驟的操做順序。
好了,到了最複雜的next()
方法了,這裏可就大有講究了。
一進來馬上執行if (nextReadIndex >= nextWriteIndex)
,用來判斷當前是否還有更新的數據。
由於寫入的時候是先寫數據再改索引,因此可能會出現明明有數據,可是這裏認爲沒數據的狀況。
可是並無關係,咱們更關注最終一致性,由於咱們要的是確保這裏必定不會讀錯數據,而不必定要確保這裏有新數據就要馬上處理。就算這一輪沒讀到,下一輪也必定會讀取到了。
下一步是這一行if (nextReadIndex <= nextWriteIndex - CACHED_SIZE)
,判斷想要讀取的數據有沒有被新數據覆蓋。等一下,這裏爲何和上面介紹的不同?
上面寫的是<
,而這裏倒是<=
。上面提到,同步操做的狀況下,用<
是沒有問題的,可是這裏的異步的。
寫入數據的時候,可能會出現數據已被覆蓋,而索引未被更新的問題,因此這樣子判斷能夠保證不會讀錯數據。
既然上下邊界都檢查過了,那麼就讀取數據吧!就當這裏準備讀數據的時候,寫數據的線程居然又寫入了好多數據,致使讀出來的數據已經被覆蓋了!
因此,必定要在讀完數據後,再次檢查數據是否被覆蓋。
最終,整個過程實現了無鎖,高併發和最終一致性。
在 Puma 系統中,啓用緩存和關閉緩存,一寫五讀的狀況下,性能整整提升了一倍。測試仍是在我 SSD 上進行的,若是是傳統硬盤,提高會更明顯。
代碼實現完,就能夠和BlockingQueue
對比一下了。
首先,RingBuffer 徹底是無鎖的,沒有任何鎖衝突。而利用BlockingQueue
的話它內部會加鎖,雖然鎖衝突不會不少,可是沒鎖確定比有鎖好。並且,當 Writer 往多個BlockingQueue
中順序寫入數據的時候,會有相互影響。而利用 RingBuffer 實現的話,不管有多少 Reader,都不會影響寫入性能。
而後是內存上的優點,每次多一個 Reader 僅僅是多一個對象,Reader
對象內部也只有一個變量,佔用內存很是很是小。而使用BlockingQueue
的話,須要建立的就不只僅是一個對象了,會有一系列的東西。
目前看來,該實現能知足咱們的需求且無明顯缺點,並且已經在系統中平穩運行了將近一年了。在我離職以前的最後一個項目,就包括爲點評訂單系統接入 Puma,訂單系統在活動期間的寫入 QPS 很是高。但對 Puma 來講也是毫無壓力的,最終的瓶頸都不是在 Puma 上,而是在目標數據庫的寫入性能上。
首先,這部分的代碼能夠在這裏找到:傳送門
完整的代碼還包含了老數據被覆蓋無數據可讀時的數據源切換邏輯,還有當無消費者時關閉 RingBuffer 的邏輯。上面的代碼已經被簡化了不少,想看完整代碼的話能夠在上面的連接中看到。
之後有空還會再介紹更多 Puma 中遇到的問題和解決的思路。
而後談談高併發系統的設計。
Java 併發編程的第一重境界是善用各類鎖,儘可能減小鎖衝突,不能有死鎖。
第二重境界就是善用 Java 的各類併發包,Java 的併發包裏有的是無鎖的,例如AtomicLong
中用了CAS
;有的是用了各類手段減小鎖衝突,例如ConcurrentHashMap
中就用了鎖分段技術。總體效率都很是高,能熟練應用後也能寫出很高效的程序。
再下一個境界就很是搞腦子了,每每是放棄了強一致性,而去追求最終一致性。其中會用到AtomicLong
等無鎖,或鎖分段技術,而且經常會把它們結合起來用。就像上面那部分代碼,看似簡單,但實際上卻要把各類邊界條件思考地很全面,由於是最終一致性,因此中間的狀態很是多。