線程安全的無鎖RingBuffer的實現

這裏的線程安全,是指一個讀線程和一個寫線程,讀寫兩個線程是安全的,而不是說多個讀線程和多個寫線程是安全的。。java

 

 

在程序設計中,咱們有時會遇到這樣的狀況,一個線程將數據寫到一個buffer中,另一個線程從中讀數據。因此這裏就有多線程競爭的問題。一般的解決辦法是對競爭資源加鎖。可是,通常加鎖的損耗較高。其實,對於這樣的一個線程寫,一個線程讀的特殊狀況,能夠以一種簡單的無鎖RingBuffer來實現。這樣代碼的運行效率很高。git

本文借鑑了Disruptor項目代碼。github

代碼我在github上放了一份,須要的同窗能夠去下載(RingBuffer.java)。本文最後也會附上一份。安全

代碼的基本原理以下。多線程

如圖所示,假定buffer的長度是bufferSize. 咱們設置兩個指針。head指向的是下一次讀的位置,而tail指向的是下一次寫的位置。因爲這裏是環形buffer (ring buffer),這裏就有一個問題,怎樣判斷buffer是滿或者空。這裏採用的規則是,buffer的最後一個單元不存儲數據。因此,若是head == tail,那麼說明buffer爲空。若是 head == tail + 1 (mod bufferSize),那麼說明buffer滿了。post

接下來就是最重要的內容了:怎樣以無鎖的方式進行線程安全的buffer的讀寫操做。基本原理是這樣的。在進行讀操做的時候,咱們只修改head的值,而在寫操做的時候咱們只修改tail的值。在寫操做時,咱們在寫入內容到buffer以後才修改tail的值;而在進行讀操做的時候,咱們會讀取tail的值並將其賦值給copyTail。賦值操做是原子操做。因此在讀到copyTail以後,從head到copyTail之間必定是有數據能夠讀的,不會出現數據沒有寫入就進行讀操做的狀況。一樣的,讀操做完成以後,纔會修改head的數值;而在寫操做以前會讀取head的值判斷是否有空間能夠用來寫數據。因此,這時候tail到head - 1之間必定是有空間能夠寫數據的,而不會出現一個位置的數據尚未讀出就被寫操做覆蓋的狀況。這樣就保證了RingBuffer的線程安全性。google

最後附上代碼供參考。歡迎批評指正,也歡迎各類討論!spa

複製代碼
 1 public class RingBuffer {
 2 
 3     private final static int bufferSize = 1024;
 4     private String[] buffer = new String[bufferSize];
 5     private int head = 0;
 6     private int tail = 0;
 7     
 8     private Boolean empty() {
 9         return head == tail;
10     }
11     private Boolean full() {
12         return (tail + 1) % bufferSize == head;
13     }
14     public Boolean put(String v) {
15         if (full()) {
16             return false;
17         }
18         buffer[tail] = v;
19         tail = (tail + 1) % bufferSize;
20         return true;
21     }
22     public String get() {
23         if (empty()) {
24             return null;
25         }
26         String result = buffer[head];
27         head = (head + 1) % bufferSize;
28         return result;
29     }
30     public String[] getAll() {
31         if (empty()) {
32             return new String[0];
33         }
34         int copyTail = tail;
35         int cnt = head < copyTail ? copyTail - head : bufferSize - head + copyTail;
36         String[] result = new String[cnt];
37         if (head < copyTail) {
38             for (int i = head; i < copyTail; i++) {
39                 result[i - head] = buffer[i];
40             }
41         } else {
42             for (int i = head; i < bufferSize; i++) {
43                 result[i - head] = buffer[i];
44             }
45             for (int i = 0; i < copyTail; i++) {
46                 result[bufferSize - head + i] = buffer[i];
47             }
48         }
49         head = copyTail;
50         return result;
51     }
52 }
複製代碼

 

 
 
標籤:  ring buffer多線程無鎖
相關文章
相關標籤/搜索