轉載自併發編程網 – ifeve.com
http://ifeve.com/%e9%ab%98%e5...
借用Java併發編程實踐中的話」編寫正確的程序並不容易,而編寫正常的併發程序就更難了」,相比於順序執行的狀況,多線程的線程安全問題是微妙並且出乎意料的,由於在沒有進行適當同步的狀況下多線程中各個操做的順序是不可預期的,本文算是對多線程狀況下同步策略的一個簡單介紹。node
線程安全問題是指當多個線程同時讀寫一個狀態變量,而且沒有任何同步措施時候,致使髒數據或者其餘不可預見的結果的問題。Java中首要的同步策略是使用Synchronized關鍵字,它提供了可重入的獨佔鎖。算法
要談可見性首先須要介紹下多線程處理共享變量時候的Java中內存模型。數據庫
Java內存模型規定了全部的變量都存放在主內存中,當線程使用變量時候都是把主內存裏面的變量拷貝到了本身的工做空間或者叫作工做內存。編程
當線程操做一個共享變量時候操做流程爲:*segmentfault
那麼假如線程A和B同時去處理一個共享變量,會出現什麼狀況那?緩存
首先他們都會去走上面的三個流程,假如線程A拷貝共享變量到了工做內存,而且已經對數據進行了更新可是尚未更新會主內存(結果可能目前存放在當前cpu的寄存器或者高速緩存),這時候線程B拷貝共享變量到了本身的工做內存進行處理,處理後,線程A才把本身的處理結果更更新到主內存或者緩存,可知 線程B處理的並非線程A處理後的結果,也就是說線程A處理後的變量值對線程B不可見,這就是共享變量的不可見性問題。安全
構成共享變量內存不可見緣由是由於三步流程不是原子性操做,下面知道使用恰當同步就能夠解決這個問題。多線程
咱們知道ArrayList是線程不安全的,由於他的讀寫方法沒有同步策略,會致使髒數據和不可預期的結果,下面咱們就一一講解如何解決。架構
這是線程不安全的 public class ArrayList<E> { public E get(int index) { rangeCheck(index); return elementData(index); } public E set(int index, E element) { rangeCheck(index); E oldValue = elementData(index); elementData[index] = element; return oldValue; } }
假設線程A執行操做Ao和線程B執行操做Bo ,那麼從A看,當B線程執行Bo操做時候,那麼Bo操做所有執行,要麼所有不執行,咱們稱Ao和Bo操做互爲原子性操做,在設計計數器時候通常都是先讀取當前值,而後+1,而後更新會變量,是讀-改-寫的過程,這個過程必須是原子性的操做。併發
public class ThreadNotSafeCount { private Long value; public Long getCount() { return value; } public void inc() { ++value; } }
如上代碼是線程不安全的,由於不能保證++value是原子性操做。方法一是使用Synchronized進行同步以下:
public class ThreadSafeCount { private Long value; public synchronized Long getCount() { return value; } public synchronized void inc() { ++value; } }
注意: 這裏不能簡單的使用volatile修飾value進行同步,由於變量值依賴了當前值
使用Synchronized確實能夠實現線程安全,即實現可見性和同步,可是Synchronized是獨佔鎖,沒有獲取內部鎖的線程會被阻塞掉,那麼有沒有恰好的實現那?答案是確定的。
原子變量類比鎖更輕巧,好比AtomicLong表明了一個Long值,並提供了get,set方法,get,set方法語義和volatile相同,由於AtomicLong內部就是使用了volatile修飾的真正的Long變量。另外提供了原子性的自增自減操做,因此計數器能夠改下爲:
public class ThreadSafeCount { private AtomicLong value = new AtomicLong(0L); public Long getCount() { return value.get(); } public void inc() { value.incrementAndGet(); } }
那麼相比使用synchronized的好處在於原子類操做不會致使線程的掛起和從新調度,由於他內部使用的是cas的非阻塞算法。
經常使用的原子類變量爲:
CAS 即CompareAndSet,也就是比較並設置,CAS有三個操做數分別爲:內存位置,舊的預期值,新的值,操做含義是當內存位置的變量值爲舊的預期值時候使用新的值替換舊的值。通俗的說就是看內存位置的變量值是否是我給的舊的預期值,若是是則使用我給的新的值替換他,若是不是返回給我舊值。這個是處理器提供的一個原子性指令。上面介紹的AtomicLong的自增就是使用這種方式實現:
public final long incrementAndGet() { for (;;) { long current = get();(1) long next = current + 1;(2) if (compareAndSet(current, next))(3) return next; } } public final boolean compareAndSet(long expect, long update) { return unsafe.compareAndSwapLong(this, valueOffset, expect, update); }
假如當前值爲1,那麼線程A和檢查B同時執行到了(3)時候各自的next都是2,current=1,假如線程A先執行了3,那麼這個是原子性操做,會把檔期值更新爲2而且返回1,if判斷true因此incrementAndGet返回2.這時候線程B執行3,由於current=1而當前變量實際值爲2,因此if判斷爲false,繼續循環,若是沒有其餘線程去自增變量的話,此次線程B就會更新變量爲3而後退出。
這裏使用了無限循環使用CAS進行輪詢檢查,雖然必定程度浪費了cpu資源,可是相比鎖來講避免的線程上下文切換和調度。
當一個線程要獲取一個被其餘線程佔用的鎖時候,該線程會被阻塞,那麼當一個線程再次獲取它本身已經獲取的鎖時候是否會被阻塞那?若是不須要阻塞那麼咱們說該鎖是可重入鎖,也就是說只要該線程獲取了該鎖,那麼能夠無限制次數進入被該鎖鎖住的代碼。
先看一個例子若是鎖不是可重入的,看看會出現什麼問題。
public class Hello{ public Synchronized void helloA(){ System.out.println("hello"); } public Synchronized void helloB(){ System.out.println("hello B"); helloA(); } }
如上面代碼當調用helloB函數前會先獲取內置鎖,而後打印輸出,而後調用helloA方法,調用前會先去獲取內置鎖,若是內置鎖不是可重入的那麼該調用就會致使死鎖了,由於線程持有並等待了鎖。
實際上內部鎖是可重入鎖,例如synchronized關鍵字管理的方法,可重入鎖的原理是在鎖內部維護了一個線程標示,標示該鎖目前被那個線程佔用,而後關聯一個計數器,一開始計數器值爲0,說明該鎖沒有被任何線程佔用,當一個線程獲取了該鎖,計數器會變成1,其餘線程在獲取該鎖時候發現鎖的全部者不是本身因此被阻塞,可是當獲取該鎖的線程再次獲取鎖時候發現鎖擁有者是本身會把計數器值+1, 當釋放鎖後計數器會-1,當計數器爲0時候,鎖裏面的線程標示重置爲null,這時候阻塞的線程會獲取被喚醒來獲取該鎖。
synchronized塊是Java提供的一種強制性內置鎖,每一個Java對象均可以隱式的充當一個用於同步的鎖的功能,這些內置的鎖被稱爲內部鎖或者叫監視器鎖,執行代碼在進入synchronized代碼塊前會自動獲取內部鎖,這時候其餘線程訪問該同步代碼塊時候會阻塞掉。拿到內部鎖的線程會在正常退出同步代碼塊或者異常拋出後釋放內部鎖,這時候阻塞掉的線程才能獲取內部鎖進入同步代碼塊。
內部鎖是一種互斥鎖,具體說是同時只有一個線程能夠拿到該鎖,當一個線程拿到該鎖而且沒有釋放的狀況下,其餘線程只能等待。
對於上面說的ArrayList可使用synchronized進行同步來處理可見性問題。
使用synchronized對方法進行同步 public class ArrayList<E> { public synchronized E get(int index) { rangeCheck(index); return elementData(index); } public synchronized E set(int index, E element) { rangeCheck(index); E oldValue = elementData(index); elementData[index] = element; return oldValue; } }
如圖當線程A獲取內部鎖進入同步代碼塊後,線程B也準備要進入同步塊,可是因爲A還沒釋放鎖,因此B如今進入等待,使用同步能夠保證線程A獲取鎖到釋放鎖期間的變量值對B獲取鎖後均可見。也就是說當B開始執行A執行的代碼同步塊時候能夠看到A操做的全部變量值,這裏具體說是當線程B獲取b的值時候可以保證獲取的值是2。這時由於線程A進入同步塊修改變量值後,會在退出同步塊前把值刷新到主內存,而線程B在進入同步塊前會首先清空本地內存內容,從主內存從新獲取變量值,因此實現了可見性。可是要注意一點全部線程使用的是同一個鎖。
注意: Synchronized關鍵字會引發線程上下文切換和線程調度。
使用synchronized能夠實現同步,可是缺點是同時只有一個線程能夠訪問共享變量,可是正常狀況下,對於多個讀操做操做共享變量時候是不須要同步的,synchronized時候沒法實現多個讀線程同時執行,而大部分狀況下讀操做次數多於寫操做,因此這大大下降了併發性,因此出現了ReentrantReadWriteLock,它能夠實現讀寫分離,多個線程同時進行讀取,可是最多一個寫線程存在。
對於上面的方法如今能夠修改成:
public class ArrayList<E> { private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); public E get(int index) { Lock readLock = readWriteLock.readLock(); readLock.lock(); try { return list.get(index); } finally { readLock.unlock(); } } public E set(int index, E element) { Lock wirteLock = readWriteLock.writeLock(); wirteLock.lock(); try { return list.set(index, element); } finally { wirteLock.unlock(); } } }
如代碼在get方法時候經過 readWriteLock.readLock()獲取了讀鎖,多個線程能夠同時獲取這讀鎖,set方法經過readWriteLock.writeLock()獲取了寫鎖,同時只有一個線程能夠獲取寫鎖,其餘線程在獲取寫鎖時候會阻塞直到寫鎖被釋放。假如一個線程已經獲取了讀鎖,這時候若是一個線程要獲取寫鎖時候要等待直到釋放了讀鎖,若是一個線程獲取了寫鎖,那麼全部獲取讀鎖的線程須要等待直到寫鎖被釋放。因此相比synchronized來講運行多個讀者同時存在,因此提升了併發量。
注意: 須要使用者顯示調用Lock與unlock操做
對於避免不可見性問題,Java還提供了一種弱形式的同步,即便用了volatile關鍵字。該關鍵字確保了對一個變量的更新對其餘線程可見。當一個變量被聲明爲volatile時候,線程寫入時候不會把值緩存在寄存器或者或者在其餘地方,當線程讀取的時候會從主內存從新獲取最新值,而不是使用當前線程的拷貝內存變量值。
volatile雖然提供了可見性保證,可是不能使用他來構建複合的原子性操做,也就是說當一個變量依賴其餘變量或者更新變量值時候新值依賴當前老值時候不在適用。
與synchronized類似之處在於如圖
如圖線程A修改了volatile變量b的值,而後線程B讀取了改變量值,那麼全部A線程在寫入變量b值前可見的變量值,在B讀取volatile變量b後對線程B都是可見的,圖中線程B對A操做的變量a,b的值均可見的。volatile的內存語義和synchronized有相似之處,具體說是說當線程寫入了volatile變量值就等價於線程退出synchronized同步塊(會把寫入到本地內存的變量值同步到主內存),讀取volatile變量值就至關於進入同步塊(會先清空本地內存變量值,從主內存獲取最新值)。
下面的Integer也是線程不安全的,由於沒有進行同步措施:
public class ThreadNotSafeInteger { private int value; public int get() { return value; } public void set(int value) { this.value = value; } }
使用synchronized關鍵字進行同步以下:
public class ThreadSafeInteger { private int value; public synchronized int get() { return value; } public synchronized void set(int value) { this.value = value; } }
等價於使用volatile進行同步以下:
public class ThreadSafeInteger { private volatile int value; public int get() { return value; } public void set(int value) { this.value = value; } }
這裏使用synchronized和使用volatile是等價的,可是並非全部狀況下都是等價,通常只有知足下面全部條件才能使用volatile
另外 加鎖能夠同時保證可見性和原子性,而volatile只保證變量值的可見性。
注意: volatile關鍵字不會引發線程上下文切換和線程調度。另外volatile還用來解決重排序問題,後面會講到。
悲觀鎖,指數據被外界修改持保守態度(悲觀),在整個數據處理過程當中,將數據處於鎖定狀態。 悲觀鎖的實現,每每依靠數據庫提供的鎖機制 。數據庫中實現是對數據記錄進行操做前,先給記錄加排它鎖,若是獲取鎖失敗,則說明數據正在被其餘線程修改,則等待或者拋出異常。若是加鎖成功,則獲取記錄,對其修改,而後事務提交後釋放排它鎖。
一個例子:select * from 表 where .. for update;
悲觀鎖是先加鎖再訪問策略,處理加鎖會讓數據庫產生額外的開銷,還有增長產生死鎖的機會,另外在多個線程只讀狀況下不會產生數據不一致行問題,不必使用鎖,只會增長系統負載,下降併發性,由於當一個事務鎖定了該條記錄,其餘讀該記錄的事務只能等待。
樂觀鎖是相對悲觀鎖來講的,它認爲數據通常狀況下不會形成衝突,因此在訪問記錄前不會加排他鎖,而是在數據進行提交更新的時候,纔會正式對數據的衝突與否進行檢測,具體說根據update返回的行數讓用戶決定如何去作。樂觀鎖並不會使用數據庫提供的鎖機制,通常在表添加version字段或者使用業務狀態來作。
樂觀鎖直到提交的時候纔去鎖定,因此不會產生任何鎖和死鎖。
根據鎖可以被單個線程仍是多個線程共同持有,鎖又分爲獨佔鎖和共享鎖。獨佔鎖保證任什麼時候候都只有一個線程能讀寫權限,ReentrantLock就是以獨佔方式實現的互斥鎖。共享鎖則能夠同時有多個讀線程,但最多隻能有一個寫線程,讀和寫是互斥的,例如ReadWriteLock讀寫鎖,它容許一個資源能夠被多線程同時進行讀操做,或者被一個線程 寫操做,但二者不能同時進行。
獨佔鎖是一種悲觀鎖,每次訪問資源都先加上互斥鎖,這限制了併發性,由於讀操做並不會影響數據一致性,而獨佔鎖只容許同時一個線程讀取數據,其餘線程必須等待當前線程釋放鎖才能進行讀取。
共享鎖則是一種樂觀鎖,它放寬了加鎖的條件,容許多個線程同時進行讀操做。
根據線程獲取鎖的搶佔機制鎖能夠分爲公平鎖和非公平鎖,公平鎖表示線程獲取鎖的順序是按照線程加鎖的時間多少來決定的,也就是最先加鎖的線程將最先獲取鎖,也就是先來先得的FIFO順序。而非公平鎖則運行闖入,也就是先來不必定先得。
ReentrantLock提供了公平和非公平鎖的實現:
若是構造函數不傳遞參數,則默認是非公平鎖。
在沒有公平性需求的前提下儘可能使用非公平鎖,由於公平鎖會帶來性能開銷。
假設線程A已經持有了鎖,這時候線程B請求該鎖將會被掛起,當線程A釋放鎖後,假如當前有線程C也須要獲取該鎖,若是採用非公平鎖方式,則根據線程調度策略線程B和C二者之一可能獲取鎖,這時候不須要任何其餘干涉,若是使用公平鎖則須要把C掛起,讓B獲取當前鎖。
AbstractQueuedSynchronizer提供了一個隊列,大多數開發者可能歷來不會直接用到AQS,AQS有個變量用來存放狀態信息 state,能夠經過protected的getState,setState,compareAndSetState函數進行調用。對於ReentrantLock來講,state能夠用來表示該線程獲可重入鎖的次數,semaphore來講state用來表示當前可用信號的個數,FutuerTask用來表示任務狀態(例如還沒開始,運行,完成,取消)。
public class Test { private static final int ThreadNum = 10; public static void main(String[] args) { //建立一個CountDownLatch實例,管理計數爲ThreadNum CountDownLatch countDownLatch = new CountDownLatch(ThreadNum); //建立一個固定大小的線程池 ExecutorService executor = Executors.newFixedThreadPool(ThreadNum); //添加線程到線程池 for(int i =0;i<ThreadNum;++i){ executor.execute(new Person(countDownLatch, i+1)); } System.out.println("開始等待全員簽到..."); try { //等待全部線程執行完畢 countDownLatch.await(); System.out.println("簽到完畢,開始吃飯"); } catch (InterruptedException e) { e.printStackTrace(); }finally { executor.shutdown(); } } static class Person implements Runnable{ private CountDownLatch countDownLatch; private int index; public Person(CountDownLatch cdl,int index){ this.countDownLatch = cdl; this.index = index; } @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("person " + index +"簽到"); //線程執行完畢,計數器減一 countDownLatch.countDown(); } } }
如上代碼,建立一個線程池和CountDownLatch實例,每一個線程經過構造函數傳入CountDownLatch的實例,主線程經過await等待線程池裏面線程任務所有執行完畢,子線程則執行完畢後調用countDown計數器減一,等全部子線程執行完畢後,主線程的await纔會返回。
先看下類圖:
可知CountDownLatch內部仍是使用AQS實現的。
首先經過構造函數初始化AQS的狀態值
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } Sync(int count) { setState(count); }
而後看下await方法:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //若是線程被中斷則拋異常 if (Thread.interrupted()) throw new InterruptedException(); //嘗試看當前是否計數值爲0,爲0則直接返回,否者進入隊列等待 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
若是tryAcquireShared返回-1則 進入doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //加入隊列狀態爲共享節點 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { //若是多個線程調用了await被放入隊列則一個個返回。 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //shouldParkAfterFailedAcquire會把當前節點狀態變爲SIGNAL類型,而後調用park方法把當先線程掛起, if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
調用await後,當前線程會被阻塞,直到全部子線程調用了countdown方法,並在計數爲0時候調用該線程unpark方法激活線程,而後該線程從新tryAcquireShared會返回1。
而後看下 countDown方法:
委託給sync public void countDown() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
首先看下tryReleaseShared
protected boolean tryReleaseShared(int releases) { //循環進行cas,直到當前線程成功完成cas使計數值(狀態值state)減一更新到state for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
該函數一直返回false直到當前計數器爲0時候才返回true。
返回true後會調用doReleaseShared,該函數主要做用是調用uppark方法激活調用await的線程,代碼以下:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //節點類型爲SIGNAL,把類型在經過cas設置回去,而後調用unpark激活調用await的線程 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
激活主線程後,主線程會在調用tryAcquireShared獲取鎖。
可知ReentrantLock最終仍是使用AQS來實現,而且根據參數決定內部是公平仍是非公平鎖,默認是非公平鎖
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
加鎖代碼:
public void lock() { sync.lock(); }
lock方法最終調用FairSync重寫的tryAcquire方法
protected final boolean tryAcquire(int acquires) { //獲取當前線程和狀態值 final Thread current = Thread.currentThread(); int c = getState(); //狀態爲0說明該鎖未被任何線程持有 if (c == 0) { //爲了實現公平,首先看隊列裏面是否有節點,有的話再看節點所屬線程是否是當前線程,是的話hasQueuedPredecessors返回false,而後使用原子操做compareAndSetState保證一個線程更新狀態爲1,設置排他鎖歸屬爲當前線程。其餘線程經過cass則返回false. if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //狀態不爲0說明該鎖已經被線程持有,則看是不是當前線程持有,是則重入鎖次數+1. else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
公平性保證代碼:
public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
再看看unLock方法,最終調用了Sync的tryRelease方法:
protected final boolean tryRelease(int releases) { //若是不是鎖持有者調用UNlock則拋出異常。 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //若是當前可重入次數爲0,則清空鎖持有線程 if (c == 0) { free = true; setExclusiveOwnerThread(null); } //設置可重入次數爲原始值-1 setState(c); return free; }
final void lock() { //若是當前鎖空閒0,則設置狀態爲1,而且設置當前線程爲鎖持有者 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1);//調用重寫的tryAcquire方法->nonfairTryAcquire方法 }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {//狀態爲0說明沒有線程持有該鎖 if (compareAndSetState(0, acquires)) {//cas原子性操做,保證只有一個線程能夠設置狀態 setExclusiveOwnerThread(current);//設置鎖全部者 return true; } }//若是當前線程是鎖持有者則可重入鎖計數+1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
可知公平與非公平都是先執行tryAcquire嘗試獲取鎖,若是成功則直接獲取鎖,若是不成功則把當前線程放入隊列。對於放入隊列裏面的第一個線程A在unpark後會進行自旋調用tryAcquire嘗試獲取鎖,假如這時候有一個線程B執行了lock操做,那麼也會調用tryAcquire方法嘗試獲取鎖,可是線程B並不在隊列裏面,可是線程B有可能比線程A優先獲取到鎖,也就是說雖然線程A先請求的鎖,可是卻有可能沒有B先獲取鎖,這是非公平鎖實現。而公平鎖要保證線程A要比線程B先獲取鎖。因此公平鎖相比非公平鎖在tryAcquire裏面添加了hasQueuedPredecessors方法用來保證公平性。
如圖讀寫鎖內部維護了一個ReadLock和WriteLock,而且也提供了公平和非公平的實現,下面只介紹下非公平的讀寫鎖實現。咱們知道AQS裏面只維護了一個state狀態,而ReentrantReadWriteLock則須要維護讀狀態和寫狀態,一個state是沒法表示寫和讀狀態的。因此ReentrantReadWriteLock使用state的高16位表示讀狀態也就是讀線程的個數,低16位表示寫鎖可重入量。
static final int SHARED_SHIFT = 16; 共享鎖(讀鎖)狀態單位值65536 static final int SHARED_UNIT = (1 << SHARED_SHIFT); 共享鎖線程最大個數65535 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; 排它鎖(寫鎖)掩碼 二進制 15個1 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** 返回讀鎖線程數 */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** 返回寫鎖可重入個數 */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
lock 獲取鎖
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); //c!=0說明讀鎖或者寫鎖已經被某線程獲取 if (c != 0) { //w=0說明已經有線程獲取了讀鎖或者w!=0而且當前線程不是寫鎖擁有者,則返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; //說明某線程獲取了寫鎖,判斷可重入個數 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 設置可重入數量(1) setState(c + acquires); return true; } //寫線程獲取寫鎖 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
unlock 釋放鎖:
protected final boolean tryRelease(int releases) { // 看是不是寫鎖擁有者調用的unlock if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //獲取可重入值,這裏沒有考慮高16位,由於寫鎖時候讀鎖狀態值確定爲0 int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; //若是寫鎖可重入值爲0則釋放鎖,否者只是簡單更新狀態值。 if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
對應讀鎖只須要分析下Sync的tryAcquireShared和tryReleaseShared
lock 獲取鎖:
protected final int tryAcquireShared(int unused) { //獲取當前狀態值 Thread current = Thread.currentThread(); int c = getState(); //若是寫鎖計數不爲0說明已經有線程獲取了寫鎖,而後看是否是當前線程獲取的寫鎖。 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //獲取讀鎖計數 int r = sharedCount(c); //嘗試獲取鎖,多個讀線程只有一個會成功,不成功的進入下面fullTryAcquireShared進行重試 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
unlock 釋放鎖:
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //循環直到本身的讀計數-1 cas更新成功 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }
Java內存模型中,容許編譯器和處理器對指令進行重排序,可是重排序能夠保證最終執行的結果是與程序順序執行的結果一致,而且只會對不存在數據依賴性的指令進行重排序,這個重排序在單線程下對最終執行結果是沒有影響的,可是在多線程下就會存在問題。
int a = 1;(1) int b = 2;(2) int c= a + b;(3)
如上c的值依賴a和b的值,因此重排序後可以保證(3)的操做在(2)(1)以後,可是(1)(2)誰先執行就不必定了,這在單線程下不會存在問題,由於並不影響最終結果。
public static class ReadThread extends Thread { public void run() { while(!Thread.currentThread().isInterrupted()){ if(ready){(1) System.out.println(num+num);(2) } System.out.println("read thread...."); } } } public static class Writethread extends Thread { public void run() { num = 2;(3) ready = true;(4) System.out.println("writeThread set over..."); } } private static int num =0; private static boolean ready = false; public static void main(String[] args) throws InterruptedException { ReadThread rt = new ReadThread(); rt.start(); Writethread wt = new Writethread(); wt.start(); Thread.sleep(10); rt.interrupt(); System.out.println("main exit"); }
如代碼因爲(1)(2)(3)(4) 之間不存在依賴,因此寫線程(3)(4)可能被重排序爲先執行(4)在執行(3),那麼執行(4)後,讀線程可能已經執行了(1)操做,而且在(3)執行前開始執行(2)操做,這時候打印結果爲0而不是4.
解決:使用volatile 修飾ready能夠避免重排序。
Java中斷機制是一種線程間協做模式,經過中斷並不能直接終止另外一個線程,而是須要被中斷的線程根據中斷狀態自行處理。
例如當線程A運行時,線程B能夠調用A的 interrupt()方法來設置中斷標誌爲true,並當即返回。設置標誌僅僅是設置標誌,線程A並無實際被中斷,會繼續往下執行的,而後線程A能夠調用isInterrupted方法來看本身是否是被中斷了,返回true說明本身被別的線程中斷了,而後根據狀態來決定是否終止本身活或者幹些其餘事情。
Interrupted經典使用代碼
public void run(){ try{ .... //線程退出條件 while(!Thread.currentThread().isInterrupted()&& more work to do){ // do more work; } }catch(InterruptedException e){ // thread was interrupted during sleep or wait } finally{ // cleanup, if required } }
故意調用interrupt()設置中斷標誌,做爲線程退出條件:
public static class MyThread extends Thread { public void run() { while (!Thread.currentThread().isInterrupted()) { System.out.println("do Someing...."); } } } public static void main(String[] args) throws InterruptedException { MyThread t = new MyThread(); t.start(); Thread.sleep(1000); t.interrupt(); }
當線程中爲了等待一些特定條件的到來時候,通常會調用Thread.sleep(),wait,join方法在阻塞當前線程,好比sleep(3000);那麼到3s後纔會從阻塞下變爲激活狀態,可是有可能在在3s內條件已經知足了,這時候能夠調用該線程的interrupt方法,sleep方法會拋出InterruptedException異常,線程恢復激活狀態:
public static class SleepInterrupt extends Object implements Runnable{ public void run(){ try{ System.out.println("thread-sleep for 2000 seconds"); Thread.sleep(2000000); System.out.println("thread -waked up"); }catch(InterruptedException e){ System.out.println("thread-interrupted while sleeping"); return; } System.out.println("thread-leaving normally"); } } public static void main(String[] args) throws InterruptedException { SleepInterrupt si = new SleepInterrupt(); Thread t = new Thread(si); t.start(); //主線程休眠2秒,從而確保剛纔啓動的線程有機會執行一段時間 try { Thread.sleep(2000); }catch(InterruptedException e){ e.printStackTrace(); } System.out.println("main() - interrupting other thread"); //中斷線程t t.interrupt(); System.out.println("main() - leaving"); }
InterruptedException的處理
若是拋出 InterruptedException那麼就意味着拋出異常的方法是阻塞方法,好比Thread.sleep,wait,join。
那麼接受到異常後如何處理的,醉簡單的是直接catch掉,不作任何處理,可是中斷髮生通常是爲了取消任務或者退出線程來使用的,因此若是直接catch掉那麼就會失去作這些處理的時機,出發你能肯定不須要根據中斷條件作其餘事情。
static class Task implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("子線程在進行計算"); Thread.sleep(1000); int sum = 0; for (int i = 0; i < 100; i++) sum += i; return sum; } } public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); FutureTask<Integer> futureTask = new FutureTask<Integer>(task); executor.submit(futureTask); System.out.println("主線程在執行任務"); try { System.out.println("task運行結果" + futureTask.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("全部任務執行完畢"); executor.shutdown(); }
如上代碼主線程會在futureTask.get()出阻塞直到task任務執行完畢,而且會返回結果。
FutureTask 內部有一個state用來展現任務的狀態,而且是volatile修飾的:
/** Possible state transitions: * NEW -> COMPLETING -> NORMAL 正常的狀態轉移 * NEW -> COMPLETING -> EXCEPTIONAL 異常 * NEW -> CANCELLED 取消 * NEW -> INTERRUPTING -> INTERRUPTED 中斷 */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
其中構造FutureTask實例時候狀態爲new
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; }
把FutureTask提交到線程池或者線程執行start時候會調用run方法:
public void run() { //若是當前不是new狀態,或者當前cas設置當前線程失敗則返回,只有一個線程能夠成功。 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { //當前狀態爲new 則調用任務的call方法執行任務 Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex);完成NEW -> COMPLETING -> EXCEPTIONAL 狀態轉移 } //執行任務成功則保存結果更新狀態,unpark全部等待線程。 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } protected void set(V v) { //狀態從new->COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; //狀態從COMPLETING-》NORMAL UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state //unpark全部等待線程。 finishCompletion(); } }
任務提交後,會調用 get方法獲取結果,這個get方法是阻塞的。
public V get() throws InterruptedException, ExecutionException { int s = state; //若是當前狀態是new或者COMPLETING則等待,由於位normal或者exceptional時候才說明數據計算完成了。 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { //若是被中斷,則拋異常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } //組建單列表 int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); //超時則返回 if (nanos <= 0L) { removeWaiter(q); return state; } //否者設置park超時時間 LockSupport.parkNanos(this, nanos); } else //直接掛起當前線程 LockSupport.park(this); } }
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
在submit任務後還能夠調用futuretask的cancel來取消任務:
public boolean cancel(boolean mayInterruptIfRunning) { //只有任務是new的才能取消 if (state != NEW) return false; //運行時容許中斷 if (mayInterruptIfRunning) { //完成new->INTERRUPTING if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; if (t != null) t.interrupt(); //完成INTERRUPTING->INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } //不容許中斷則直接new->CANCELLED else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); return true; }
翻看ConcurrentHashMap的源碼知道ConcurrentHashMap使用分離鎖,整個map分段segment,每一個segments是繼承了ReentrantLock,使用ReentrantLock的獨佔鎖用來控制同一個段只能有一個線程進行寫,可是不一樣段能夠多個線程同時寫。另外不管是段內仍是段外多個線程均可以同時讀取,由於他使用了volatile語義的讀,並沒加鎖。而且當前段有寫線程時候,該段也容許多個讀線程存在。
put的大概邏輯,首先計算key的hash值,而後根據必定算法(位移和與操做)計算出該元素應該放到那個segment,而後調用segment.put方法,該方法裏面使用ReentrantLock進行寫控制,第一個線程tryLock獲取鎖進行寫入,其餘寫線程則自旋調用tryLock 循環嘗試。
get的大概邏輯,使用UNSAFE.getObjectVolatile 在不加鎖狀況下獲取volatile語義的值。
關注公衆號《架構文摘》,天天一篇架構領域重磅好文,涉及一線互聯網公司應用架構(高可用、高性能、高穩定)、大數據、機器學習、Java架構等各個熱門領域。