公平鎖是指多個線程按照申請鎖的順序來獲取鎖。java
非公平鎖是指多個線程獲取鎖的順序並非按照申請鎖的順序,有可能後申請的線程比先申請的線程優先獲取鎖。有可能,會形成優先級反轉或者飢餓現象。node
對於 Java ReentrantLock
而言,經過構造函數指定該鎖是不是公平鎖,默認是非公平鎖。非公平鎖的優勢在於吞吐量比公平鎖大。git
對於Synchronized
而言,也是一種非公平鎖。因爲其並不像ReentrantLock
是經過 AQS 的來實現線程調度,因此並無任何辦法使其變成公平鎖。github
可重入鎖又名遞歸鎖,是指在同一個線程在外層方法獲取鎖的時候,在進入內層方法會自動獲取鎖。算法
說的有點抽象,下面會有一個代碼的示例。對於 Java ReentrantLock
而言, 他的名字就能夠看出是一個可重入鎖,其名字是Re entrant Lock
從新進入鎖。對於Synchronized
而言,也是一個可重入鎖。可重入鎖的一個好處是可必定程度避免死鎖。編程
synchronized void setA() throws Exception{ Thread.sleep(1000); setB(); } synchronized void setB() throws Exception{ Thread.sleep(1000); }
上面的代碼就是一個可重入鎖的一個特色,若是不是可重入鎖的話,setB 可能不會被當前線程執行,可能形成死鎖。數組
獨享鎖是指該鎖一次只能被一個線程所持有。多線程
共享鎖是指該鎖可被多個線程所持有。併發
對於 Java ReentrantLock
而言,其是獨享鎖。可是對於 Lock 的另外一個實現類ReadWriteLock
,其讀鎖是共享鎖,其寫鎖是獨享鎖。讀鎖的共享鎖可保證併發讀是很是高效的,讀寫,寫讀 ,寫寫的過程是互斥的。獨享鎖與共享鎖也是經過 AQS 來實現的,經過實現不一樣的方法,來實現獨享或者共享。對於Synchronized
而言,固然是獨享鎖。框架
上面講的獨享鎖/共享鎖就是一種廣義的說法,互斥鎖/讀寫鎖就是具體的實現。互斥鎖在 Java 中的具體實現就是ReentrantLock
讀寫鎖在 Java 中的具體實現就是ReadWriteLock
樂觀鎖與悲觀鎖不是指具體的什麼類型的鎖,而是指看待併發同步的角度。悲觀鎖認爲對於同一個數據的併發操做,必定是會發生修改的,哪怕沒有修改,也會認爲修改。所以對於同一個數據的併發操做,悲觀鎖採起加鎖的形式。悲觀的認爲,不加鎖的併發操做必定會出問題。樂觀鎖則認爲對於同一個數據的併發操做,是不會發生修改的。在更新數據的時候,會採用嘗試更新,不斷從新的方式更新數據。樂觀的認爲,不加鎖的併發操做是沒有事情的。
從上面的描述咱們能夠看出,悲觀鎖適合寫操做很是多的場景,樂觀鎖適合讀操做很是多的場景,不加鎖會帶來大量的性能提高。悲觀鎖在 Java 中的使用,就是利用各類鎖。樂觀鎖在 Java 中的使用,是無鎖編程,經常採用的是 CAS 算法,典型的例子就是原子類,經過 CAS 自旋實現原子操做的更新。
分段鎖實際上是一種鎖的設計,並非具體的一種鎖,對於ConcurrentHashMap
而言,其併發的實現就是經過分段鎖的形式來實現高效的併發操做。咱們以ConcurrentHashMap
來講一下分段鎖的含義以及設計思想,ConcurrentHashMap
中的分段鎖稱爲 Segment,它即相似於 HashMap(JDK7 與 JDK8 中 HashMap 的實現)的結構,即內部擁有一個 Entry 數組,數組中的每一個元素既是一個鏈表;同時又是一個 ReentrantLock(Segment 繼承了 ReentrantLock)。當須要 put 元素的時候,並非對整個 hashmap 進行加鎖,而是先經過 hashcode 來知道他要放在那一個分段中,而後對這個分段進行加鎖,因此當多線程 put 的時候,只要不是放在一個分段中,就實現了真正的並行的插入。可是,在統計 size 的時候,可就是獲取 hashmap 全局信息的時候,就須要獲取全部的分段鎖才能統計。分段鎖的設計目的是細化鎖的粒度,當操做不須要更新整個數組的時候,就僅僅針對數組中的一項進行加鎖操做。
這三種鎖是指鎖的狀態,而且是針對Synchronized
。在 Java 5 經過引入鎖升級的機制來實現高效Synchronized
。
這三種鎖的狀態是經過對象監視器在對象頭中的字段來代表的。
偏向鎖是指一段同步代碼一直被一個線程所訪問,那麼該線程會自動獲取鎖。下降獲取鎖的代價。
輕量級鎖是指當鎖是偏向鎖的時候,被另外一個線程所訪問,偏向鎖就會升級爲輕量級鎖,其餘線程會經過自旋的形式嘗試獲取鎖,不會阻塞,提升性能。
重量級鎖是指當鎖爲輕量級鎖的時候,另外一個線程雖然是自旋,但自旋不會一直持續下去,當自旋必定次數的時候,尚未獲取到鎖,就會進入阻塞,該鎖膨脹爲重量級鎖。重量級鎖會讓其餘申請的線程進入阻塞,性能下降。
在 Java 中,自旋鎖是指嘗試獲取鎖的線程不會當即阻塞,而是採用循環的方式去嘗試獲取鎖,這樣的好處是減小線程上下文切換的消耗,缺點是循環會消耗 CPU。
synchronized 的缺陷
Lock、ReadWriteLock 相較於 synchronized,解決了以上的缺陷:
若是採用 Lock,必須主動去釋放鎖,而且在發生異常時,不會自動釋放鎖。所以通常來講,使用 Lock 必須在 try catch 塊中進行,而且將釋放鎖的操做放在 finally 塊中進行,以保證鎖必定被被釋放,防止死鎖的發生。
lock()
方法的做用是獲取鎖。若是鎖已被其餘線程獲取,則進行等待。
tryLock()
方法的做用是嘗試獲取鎖,若是成功,則返回 true;若是失敗(即鎖已被其餘線程獲取),則返回 false。也就是說,這個方法不管如何都會當即返回,獲取不到鎖時不會一直等待。
tryLock(long time, TimeUnit unit)
方法和 tryLock()
方法是相似的,區別僅在於這個方法在獲取不到鎖時會等待必定的時間,在時間期限以內若是還獲取不到鎖,就返回 false。若是若是一開始拿到鎖或者在等待期間內拿到了鎖,則返回 true。
lockInterruptibly()
方法比較特殊,當經過這個方法去獲取鎖時,若是線程正在等待獲取鎖,則這個線程可以響應中斷,即中斷線程的等待狀態。也就使說,當兩個線程同時經過 lock.lockInterruptibly()
想獲取某個鎖時,倘若此時線程 A 獲取到了鎖,而線程 B 只有在等待,那麼對線程 B 調用 threadB.interrupt()
方法可以中斷線程 B 的等待過程。因爲 lockInterruptibly()
的聲明中拋出了異常,因此 lock.lockInterruptibly()
必須放在 try 塊中或者在調用 lockInterruptibly()
的方法外聲明拋出 InterruptedException
。
注意:當一個線程獲取了鎖以後,是不會被 interrupt() 方法中斷的。由於自己在前面的文章中講過單獨調用 interrupt() 方法不能中斷正在運行過程當中的線程,只能中斷阻塞過程當中的線程。所以當經過 lockInterruptibly() 方法獲取某個鎖時,若是不能獲取到,只有進行等待的狀況下,是能夠響應中斷的。
unlock()
方法的做用是釋放鎖。
ReentrantLock 是惟一實現了 Lock 接口的類。
ReentrantLock 字面意爲可重入鎖。
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
ReentrantLock 的核心方法固然是 Lock 中的方法(具體實現徹底基於 Sync
類中提供的方法)。
此外,ReentrantLock 有兩個構造方法,功能參考下面源碼片斷中的註釋。
// 同步機制徹底依賴於此 private final Sync sync; // 默認初始化 sync 的實例爲非公平鎖(NonfairSync) public ReentrantLock() {} // 根據 boolean 值選擇初始化 sync 的實例爲公平的鎖(FairSync)或不公平鎖(NonfairSync) public ReentrantLock(boolean fair) {}
Sync
類是 ReentrantLock
的內部類,也是一個抽象類。ReentrantLock
的同步機制幾乎徹底依賴於Sync
。使用 AQS 狀態來表示鎖的保留數(詳細介紹參見 AQS)。Sync
是一個抽象類,有兩個子類:
FairSync
- 公平鎖版本。NonfairSync
- 非公平鎖版本。public class ReentrantLockDemo { private ArrayList<Integer> arrayList = new ArrayList<Integer>(); private Lock lock = new ReentrantLock(); public static void main(String[] args) { final ReentrantLockDemo demo = new ReentrantLockDemo(); new Thread(() -> demo.insert(Thread.currentThread())).start(); new Thread(() -> demo.insert(Thread.currentThread())).start(); } private void insert(Thread thread) { lock.lock(); try { System.out.println(thread.getName() + "獲得了鎖"); for (int i = 0; i < 5; i++) { arrayList.add(i); } } catch (Exception e) { e.printStackTrace(); } finally { System.out.println(thread.getName() + "釋放了鎖"); lock.unlock(); } } }
👉 更多示例
對於特定的資源,ReadWriteLock 容許多個線程同時對其執行讀操做,可是隻容許一個線程對其執行寫操做。
ReadWriteLock 維護一對相關的鎖。一個是讀鎖;一個是寫鎖。將讀寫鎖分開,有利於提升併發效率。
ReentrantReadWriteLock 實現了 ReadWriteLock 接口,因此它是一個讀寫鎖。
「讀-讀」線程之間不存在互斥關係。
「讀-寫」線程、「寫-寫」線程之間存在互斥關係。
public interface ReadWriteLock { /** * 返回用於讀操做的鎖 */ Lock readLock(); /** * 返回用於寫操做的鎖 */ Lock writeLock(); }
public class ReentrantReadWriteLockDemo { private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public static void main(String[] args) { final ReentrantReadWriteLockDemo demo = new ReentrantReadWriteLockDemo(); new Thread(() -> demo.get(Thread.currentThread())).start(); new Thread(() -> demo.get(Thread.currentThread())).start(); } public synchronized void get(Thread thread) { rwl.readLock().lock(); try { long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start <= 1) { System.out.println(thread.getName() + "正在進行讀操做"); } System.out.println(thread.getName() + "讀操做完畢"); } finally { rwl.readLock().unlock(); } } }
AQS 做爲構建鎖或者其餘同步組件的基礎框架,有必要好好了解一下其原理。
做用:AQS,AbstractQueuedSynchronizer,即隊列同步器。它是構建鎖或者其餘同步組件的基礎框架(如 ReentrantLock、ReentrantReadWriteLock、Semaphore 等)。
場景:在 LOCK 包中的相關鎖(經常使用的有 ReentrantLock、 ReadWriteLock)都是基於 AQS 來構建。然而這些鎖都沒有直接來繼承 AQS,而是定義了一個 Sync 類去繼承 AQS。那麼爲何要這樣呢?because:鎖面向的是使用用戶,而同步器面向的則是線程控制,那麼在鎖的實現中聚合同步器而不是直接繼承 AQS 就能夠很好的隔離兩者所關注的事情。
原理:AQS 在內部定義了一個 int 變量 state,用來表示同步狀態。AQS 經過一個雙向的 FIFO 同步隊列來完成同步狀態的管理,當有線程獲取鎖失敗後,就被添加到隊列末尾。
AbstractQueuedSynchronizer 繼承自 AbstractOwnableSynchronize。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { /** 等待隊列的隊頭,懶加載。只能經過 setHead 方法修改。 */ private transient volatile Node head; /** 等待隊列的隊尾,懶加載。只能經過 enq 方法添加新的等待節點。*/ private transient volatile Node tail; /** 同步狀態 */ private volatile int state; }
AQS 維護了一個 Node 類型雙鏈表,經過 head 和 tail 指針進行訪問。
Node
static final class Node { /** 該等待同步的節點處於共享模式 */ static final Node SHARED = new Node(); /** 該等待同步的節點處於獨佔模式 */ static final Node EXCLUSIVE = null; /** 等待狀態,這個和 state 是不同的:有 1,0,-1,-2,-3 五個值 */ volatile int waitStatus; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; /** 前驅節點 */ volatile Node prev; /** 後繼節點 */ volatile Node next; /** 等待鎖的線程 */ volatile Thread thread; }
很顯然,Node 是一個雙鏈表結構。
waitStatus 5 個狀態值的含義:
acquire
/** * 先調用 tryAcquire 查看同步狀態。 * 若是成功獲取同步狀態,則結束方法,直接返回; * 反之,則先調用 addWaiter,再調用 acquireQueued。 */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
addWaiter
addWaiter
方法的做用是將當前線程插入等待同步隊列的隊尾。
private Node addWaiter(Node mode) { // 1. 將當前線程構建成 Node 類型 Node node = new Node(Thread.currentThread(), mode); // 2. 判斷尾指針是否爲 null Node pred = tail; if (pred != null) { // 2.2 將當前節點插入隊列尾部 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 2.1. 尾指針爲 null,說明當前節點是第一個加入隊列的節點 enq(node); return node; }
enq
enq
方法的做用是經過自旋(死循環),不斷嘗試利用 CAS 操做將節點插入隊列尾部,直到成功爲止。
private Node enq(final Node node) { // 設置死循環,是爲了避免斷嘗試 CAS 操做,直到成功爲止 for (;;) { Node t = tail; if (t == null) { // 1. 構造頭結點(必須初始化,須要領會雙鏈表的精髓) if (compareAndSetHead(new Node())) tail = head; } else { // 2. 經過 CAS 操做將節點插入隊列尾部 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued
acquireQueued
方法的做用是經過自旋(死循環),不斷嘗試爲等待隊列中線程獲取獨佔鎖。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 1. 得到當前節點的上一個節點 final Node p = node.predecessor(); // 2. 當前節點可否獲取獨佔式鎖 // 2.1 若是當前節點是隊列中第一個節點,而且成功獲取同步狀態,便可以得到獨佔式鎖 // 說明:當前節點的上一個節點是頭指針,即意味着當前節點是隊列中第一個節點。 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 2.2 獲取鎖失敗,線程進入等待狀態等待獲取獨佔式鎖 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
acquireQueued Before
setHead
方法
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
將當前節點經過 setHead 方法設置爲隊列的頭結點,而後將以前的頭結點的 next 域設置爲 null,而且 pre 域也爲 null,即與隊列斷開,無任何引用方便 GC 時可以將內存進行回收。
shouldParkAfterFailedAcquire
shouldParkAfterFailedAcquire
方法的做用是使用 compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
將節點狀態由 INITIAL 設置成 SIGNAL,表示當前線程阻塞。
當 compareAndSetWaitStatus 設置失敗,則說明 shouldParkAfterFailedAcquire 方法返回 false,從新進入外部方法 acquireQueued。因爲 acquireQueued 方法中是死循環,會再一次執行 shouldParkAfterFailedAcquire,直至 compareAndSetWaitStatus 設置節點狀態位爲 SIGNAL。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt
parkAndCheckInterrupt
方法的做用是調用 LookSupport.park
方法,該方法是用來阻塞當前線程的。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
acquire 流程
綜上所述,就是 acquire 的完整流程。能夠以一幅圖來講明:
release
release 方法以獨佔模式發佈。若是 tryRelease 返回 true,則經過解鎖一個或多個線程來實現。這個方法能夠用來實現 Lock.unlock 方法。
public final boolean release(int arg) { // 判斷同步狀態釋放是否成功 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
unparkSuccessor
unparkSuccessor 方法做用是喚醒 node 的下一個節點。
頭指針的後繼節點
private void unparkSuccessor(Node node) { /* * 若是狀態爲負值(便可能須要信號),請嘗試清除信號。 * 若是失敗或狀態因爲等待線程而改變也是正常的。 */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /** * 釋放後繼節點的線程。 * 若是狀態爲 CANCELLED 放或節點明顯爲空, * 則從尾部向後遍歷以找到狀態不是 CANCELLED 的後繼節點。 */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 後繼節點不爲 null 時喚醒該線程 if (s != null) LockSupport.unpark(s.thread); }
總結
acquireInterruptibly
Lock 能響應中斷,這是相較於 synchronized 的一個顯著優勢。
那麼 Lock 響應中斷的特性是如何實現的?答案就在 acquireInterruptibly 方法中。
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) // 線程獲取鎖失敗 doAcquireInterruptibly(arg); }
doAcquireInterruptibly
獲取同步狀態失敗後就會調用 doAcquireInterruptibly 方法
private void doAcquireInterruptibly(int arg) throws InterruptedException { // 將節點插入到同步隊列中 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); // 獲取鎖出隊 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 線程中斷拋異常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
與 acquire 方法邏輯幾乎一致,惟一的區別是當 parkAndCheckInterrupt 返回 true 時(即線程阻塞時該線程被中斷),代碼拋出被中斷異常。
tryAcquireNanos
經過調用 lock.tryLock(timeout,TimeUnit) 方式達到超時等待獲取鎖的效果,該方法會在三種狀況下才會返回:
咱們仍然經過採起閱讀源碼的方式來學習底層具體是怎麼實現的,該方法會調用 AQS 的方法 tryAcquireNanos
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || // 實現超時等待的效果 doAcquireNanos(arg, nanosTimeout); }
doAcquireNanos
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 1. 根據超時時間和當前時間計算出截止時間 final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); // 2. 當前線程得到鎖出隊列 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } // 3.1 從新計算超時時間 nanosTimeout = deadline - System.nanoTime(); // 3.2 超時返回 false if (nanosTimeout <= 0L) return false; // 3.3 線程阻塞等待 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 3.4 線程被中斷拋出被中斷異常 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
acquireShared
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
嘗試獲取共享鎖失敗,調用 doAcquireShared
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { // 當該節點的前驅節點是頭結點且成功獲取同步狀態 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
以上代碼和 acquireQueued 的代碼邏輯十分類似,區別僅在於自旋的條件以及節點出隊的操做有所不一樣。
releaseShared
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
doReleaseShared
當成功釋放同步狀態以後即 tryReleaseShared 會繼續執行 doReleaseShared 方法
發送後繼信號並確保傳播。 (注意:對於獨佔模式,若是須要信號,釋放就至關於調用頭的 unparkSuccessor。)
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 若是 CAS 失敗,繼續自旋 continue; } // 若是頭指針變化,break if (h == head) break; } }
acquireSharedInterruptibly 方法與 acquireInterruptibly 幾乎一致,再也不贅述。
tryAcquireSharedNanos 方法與 tryAcquireNanos 幾乎一致,再也不贅述。
免費Java資料須要本身領取,涵蓋了Java、Redis、MongoDB、MySQL、Zookeeper、Spring Cloud、Dubbo高併發分佈式等教程。