圖片來自pexelsjava
最近在閱讀《多處理器編程藝術》一書,學習了不少Java多線程的底層知識,如今就作一下書中鏈表-鎖的做用一章的總結。 node
粗粒度同步
所謂粗粒度同步其實很簡單,就是在List的 add
, remove
, contains
函數的開始就直接使用Lock加鎖,而後在函數結尾釋放。 git
add
函數的代碼以下所示,函數的主體就是鏈表的遍歷添加邏輯,只不過在開始和結束進行了鎖的獲取和釋放。程序員
private Node head;private Lock lock = new ReentrantLock();public boolean add(T item) { Node pred, curr; int key = item.hashCode(); lock.lock(); try { pred = head; curr = pred.next; while(curr.key < key) { pred = curr; curr = pred.next; } if (key == curr.key) { return false; } else { Node node = new Node(item); node.next = curr; pred.next = node; return true; } } finally { lock.unlock(); }}
github
算法
編程
微信
多線程
併發
你們看到這裏就會想到,這不就是相似於 Hashtable
的實現方式嗎?把可能出現多線程問題的函數都用一個重入鎖鎖住。
可是這個方法的缺點很明顯,若是競爭激烈的話,對鏈表的操做效率會很低,由於 add
, remove
, contains
三個函數都須要獲取鎖,也都須要等待鎖的釋放。至於如何優化,咱們能夠一步一步往下看
細粒度同步
咱們能夠經過鎖定單個節點而不是整個鏈表來提升併發。給每一個節點增長一個Lock變量以及相關的lock()和unlock()函數,當線程遍歷鏈表的時候,若它是第一個訪問節點的線程,則鎖住被訪問的節點,在隨後的某個時刻釋放鎖。這種細粒度的鎖機制容許併發線程以流水線的方式遍歷鏈表。
使用這種方式來遍歷鏈表,必須同時獲取兩個相鄰節點的鎖,經過「交叉手」的方式來獲取鎖:除了初始的head哨兵節點外,只有在已經獲取pred的鎖時,才能獲取curr的鎖。
//每一個Node對象中都有一個Lock對象,能夠進行lock()和unlock()操做public boolean add(T item) { int key = item.hashCode(); head.lock(); Node pred = head; try { Node curr = pred.next; curr.lock(); try { while (curr.key < key) { // 釋放前一個節點的鎖 pred.unlock(); pred = curr; curr = pred.next; // 獲取當前節點的鎖 curr.lock(); } if (curr.key == key) { return false; } Node newNode = new Node(item); newNode.next = curr; pred.next = newNode; return true; } finally { curr.unlock(); } } finally { pred.unlock(); }}
樂觀同步
雖然細粒度鎖是對單一粒度鎖的一種改進,但它仍然出現很長的獲取鎖和釋放鎖的序列。並且,訪問鏈表中不一樣部分的線程仍然可能相互阻塞。例如,一個正在刪除鏈表中第二個元素的線程將會阻塞全部試圖查找後繼節點的線程。
減小同步代價的一種方式就是樂觀:不須要獲取鎖就能夠查找,對找到的節點進行加鎖,而後確認鎖住的節點是正確的;若是一個同步衝突致使節點被錯誤的鎖定,則釋放這些鎖從新開始。
public boolean add(T item) { int key = item.hashCode(); while (true) { //若是不成功,就進行重試 Node pred = head; Node curr = pred.next; while (curr.key < key) { pred = curr; curr = pred.next; } //找到目標相關的pred和curr以後再將兩者鎖住 pred.lock(); curr.lock(); try { //鎖住兩者以後再進行判斷,是否存在併發衝突 if (validate(pred, curr)) { //若是不存在,那麼就直接進行正常操做 if (curr.key == key) { return false; } else { Node node = new Node(item); node.next = curr; pred.next = node; } } } finally { pred.unlock(); curr.unlock(); } }}public boolean validate(Node pred, Node curr) { //從隊列頭開始查找pred和curr,判斷是否存在併發衝突 Node node = head; while (node.key <= pred.key) { if (node == pred) { return pred.next == curr; } node = node.next; } return false;}
因爲再也不使用能保護併發修改的鎖,因此每一個方法調用均可能遍歷那些已經被刪除的節點,因此在進行添加,刪除獲取判斷是否存在的以前必須再次進行驗證。
惰性同步
當不用鎖遍歷兩次鏈表的代價比使用鎖遍歷一次鏈表的代價小不少時,樂觀同步的實現效果很是好。可是這種算法的缺點之一就是contains()方法在遍歷時須要鎖,這一點並不使人滿意,其緣由在於對contains()的調用要比其餘方法的調用頻繁得多。
使用惰性同步的方法,使得contains()調用是無等待的,同時add()和remove()方法即便在被阻塞的狀況下也只須要遍歷一次鏈表。
對每一個節點增長一個布爾類型的marked域,用於說明該節點是否在節點集合中。如今,遍歷再也不須要鎖定目標結點,也沒有必須經過從新遍歷整個鏈表來驗證結點是否可達。全部未被標記的節點必然是可達的。
//add方法和樂觀同步的方法一致,只有檢驗方法作了修改。//只須要檢測節點的marked變量就能夠,而且查看pred的next是否仍是指向curr,須要注意的是marked變量必定是voliate的。private boolean validate(Node pred, Node curr) { return !pred.marked && !curr.marked && pred.next == curr;}
惰性同步的優勢之一就是可以將相似於設置一個flag這樣的邏輯操做與相似於刪除結點的連接這種對結構的物理改變分開。
一般狀況下,延遲操做能夠是批量處理方式進行,且在某個方便的時候再懶惰地進行處理,從而下降了對結構進行物理修改的總體破裂性。惰性同步的主要缺點是add()和remove()調用是阻塞的,若是一個線程延遲,那麼其餘線程也將延遲。
非阻塞同步
使用惰性同步的思惟是很是有益處的。咱們能夠進一步將add(),remove()和contains()這三個方法都變成非阻塞的。
前兩個方法是無鎖的,最後一個方法是無等待的。咱們沒法直接使用compareAndSet方法來改變next域來實現,由於這樣會出現問題。可是咱們能夠將結點的next域和marked域看做是單個的原子單位,當marked域爲true時,對next域的任何修改都將失敗。
咱們可使用AtomicMarkableReference
public Window find(Node head, int key) { Node pred = null, curr = null, succ = null; boolean[] marked = {false}; boolean snip; retry: while(true) { pred = head; curr = curr.next.get(marked); while(true) { succ = curr.next.get(marked); //獲取succ,而且查看是被被標記 while (marked[0]) {//若是被標記了,說明curr被邏輯刪除了,須要繼續物理刪除 snip = pred.next.compareAndSet(curr, succ, false, false);// if (!snip) continue retry; curr = succ; succ = curr.next.get(marked); } //當不須要刪除後,才繼續遍歷 if (curr.key >= key) { return new Window(pred, curr); } pred = curr; curr = succ; } }}public boolean add(T item) { int key = item.hashCode(); while(true) { Window window = find(head, key); Node pred = window.pred, curr = window.curr; if (curr.key == key) { return false; } else { Node node = new Node(item); node.next = new AtomicMarkableReference<>(curr, false); if (pred.next.compareAndSet(curr, node, false, false)) { return true; } } }}public boolean remove(T item) { int key = item.hashCode(); boolean sinp; while(true) { Window window = find(head, key); Node pred = window.pred, curr = window.curr; if (curr.key != key) { return false; } else { Node succ = curr.next.getReference(); //要進行刪除了,那麼就直接將curr.next設置爲false,而後在進行真正的物理刪除。 sinp = curr.next.compareAndSet(curr, succ, false, true); if (!sinp) { continue; } pred.next.compareAndSet(curr, succ, false, false); return true; } }}class Node { AtomicMarkableReference<Node> next;}
後記
文中的代碼在個人github的這個repo中均可以找到。
本文分享自微信公衆號 - 程序員歷小冰(gh_a1d0b50d8f0a)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。