一樣是修改數據,一個採用加鎖的方式保證原子性,一個採用CAS的方式保證原子性。緩存
都是可以達到目的的,可是經常使用的鎖(例如顯式的Lock和隱式的synchonized),都會把獲取不到鎖的線程掛起,相對於CAS的不掛起,多了掛起和喚醒的開銷。ide
題外話:CAS與鎖的關係工具
CAS只是在這個場景下,比使用鎖來得更純粹,由於只作數據更新,因此開銷更少。可是業務上爲了保證一系列操做的原子性,仍是要使用鎖的。並且鎖的底層實現,也依賴於相似於CAS這樣的原子性操做。this
別的帖子都說RingBuffer中不維護尾指針,尾指針由消費者維護(所謂維護指針,就是修改、移動指針)其實這一句話有點誤導性,若是RingBuffer不知道尾部在哪裏,那它的數據存儲確定就會出問題,例如把還沒消費過的數據給覆蓋了。spa
確實,消費者會自行維護本身的消費指針,RingBuffer也不會去幹涉消費者指針的維護,可是它會引用全部消費者的指針,讀取他們的值,以此做爲「尾部」的判斷依據。實際上就是最慢的那個消費者爲準。線程
注:消費者指針是消費者消費過的最後一條數據的序號指針
咱們直接來看代碼,這個是RingBuffer的publishEvent方法,咱們看到,它首先取得一個可用的序列號,而後再將數據放入該序列號的對應位置中。咱們來看看這個序列號是如何取得的。code
@Override public void publishEvent(EventTranslator<E> translator) { final long sequence = sequencer.next(); translateAndPublish(translator, sequence); }
咱們先看Sequencer的SingleProducerSequencer實現。這裏就是判斷若是生產者新指針的位置是否會超過尾部,若是超過尾部就掛起等待。注意這裏的等待方式也是自旋方式,只不過,每次失敗後都會自行掛起片刻。blog
這裏附上幾個圖可能更好理解:(畫圖工具不太好,沒法經過單元格上色的方式體現空閒狀況)隊列
狀況1:隊列已滿,生產者嘗試使用新序號14,但因爲(14 - 8 = 6),因爲最慢的消費者目前消費的最後一條數據的序號是5,5號以後的數據還沒被消費,6 > 5,因此序號14還不能用。生產者線程掛起,下次再次嘗試。
狀況2:消費者1消費了序號6的數據。(14 - 8 = 6) 不大於 6,這時序號14可用,生產者獲得可用的序號。
@Override public long next() { return next(1); } /** * @see Sequencer#next(int) */ @Override public long next(int n) { if (n < 1 || n > bufferSize) { throw new IllegalArgumentException("n must be > 0 and < bufferSize"); } long nextValue = this.nextValue; //當前RingBuffer的遊標,即生產者的位置指針 long nextSequence = nextValue + n; long wrapPoint = nextSequence - bufferSize; //減掉一圈 long cachedGatingSequence = this.cachedValue; //上一次緩存的最小的消費者指針 //條件1:生產者指針的位置超過當前消費最小的指針 //條件2:爲特殊狀況,這裏先不考慮,詳見: if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { cursor.setVolatile(nextValue); // StoreLoad fence long minSequence; //再次遍歷全部消費者的指針,確認是否超過 //若是超過,則等待 while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? } this.cachedValue = minSequence; } this.nextValue = nextSequence; return nextSequence; }
另外對於多生產者的狀況,在不會越界的狀況下,須要經過CAS來保證獲取序號的原子性。具體能夠查看MultiProducerSequencer的next方法。
RingBuffer如何知道有哪些消費者?哪些gatingSequense是從哪裏來的?
在構建RingBuffer註冊處理類的時候,就將消費者Sequense註冊到RingBuffer中了。
看代碼的話,定位到gatingSequences在AbastractSequencer,對應的有個addGatingSequenses方法用於注入gatingSequence
public abstract class AbstractSequencer implements Sequencer { //... protected volatile Sequence[] gatingSequences = new Sequence[0]; @Override public final void addGatingSequences(Sequence... gatingSequences) { SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences); } //... }
再查看addGatingSequences被調用的地方,即經過RingBuffer的方法,設置到Sequencer中,這個Sequence是生產者使用的序號管理器
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> { //... protected final Sequencer sequencer; public void addGatingSequences(Sequence... gatingSequences) { sequencer.addGatingSequences(gatingSequences); } //... }
而RingBuffer的addGatingSequence則在Disruptor配置處理器的時候被調用
public class Disruptor<T> { //... private final RingBuffer<T> ringBuffer; private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>(); public EventHandlerGroup<T> handleEventsWith(final EventProcessor... processors) { for (final EventProcessor processor : processors) { consumerRepository.add(processor); } final Sequence[] sequences = new Sequence[processors.length]; for (int i = 0; i < processors.length; i++) { sequences[i] = processors[i].getSequence(); } ringBuffer.addGatingSequences(sequences); return new EventHandlerGroup<>(this, consumerRepository, Util.getSequencesFor(processors)); } //... }
緩存的意義是什麼?
咱們看到在SiingleProducerSequencer的next方法中,會緩存上一次的消費者最小序列號,這有什麼用呢?
用途就是不須要每次都讀取各消費者的序號,只要沒超過上一次的最小值的地方均可以直接分配,若是超過了,則進行再次判斷
爲啥讀取最小值不須要保證原子性?
看了這個獲取最小消費序號的,可能會奇怪,爲啥這個操做不須要上鎖,這個不是會獲取到舊值嗎?
確實,這個最小值獲取到的時候,實際上數值已經變動。可是因爲咱們的目的是爲了防止指針越位,因此用舊值是沒有問題的。(舊值<=實際上的最小值)
public static long getMinimumSequence(final Sequence[] sequences, long minimum) { for (int i = 0, n = sequences.length; i < n; i++) { long value = sequences[i].get(); minimum = Math.min(minimum, value); } return minimum; }