原文:chenmingyu.top/concurrent-…java
鎖是用來控制多個線程訪問共享資源的方式,java
中可使用synchronized
和Lock
實現鎖的功能node
synchronized
是java中的關鍵字,隱藏獲取和釋放鎖的過程,Lock
是java中的接口,須要主動的獲取鎖和釋放鎖,synchronized
是排他鎖,而Lock
支持可中斷獲取鎖,超時獲取鎖算法
Lock
提供的接口編程
public interface Lock {
/** * 獲取鎖,調用該方法後當前線程獲取鎖,獲取到鎖以後從該方法返回 */
void lock();
/** * 可中斷的獲取鎖,在獲取鎖的過程當中能夠中斷當前線程 */
void lockInterruptibly() throws InterruptedException;
/** * 嘗試非阻塞的獲取鎖,調用方法後當即返回,獲取到鎖則返回true,不然返回false */
boolean tryLock();
/** * 超時獲取鎖,在超時時間內獲取到鎖,在超時時間被中斷,超時時間內爲獲取到鎖,三種狀況下會從該方法返回 */
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/** * 釋放鎖 */
void unlock();
/** * 獲取等待通知組件,只有當前線程獲取到鎖以後才能夠調用該組件的wait()方法,釋放鎖 */
Condition newCondition();
}
複製代碼
隊列同步器AbstractQueuedSynchronizer
(AQS
簡稱同步器)是用來構建鎖或者其餘同步組件的基礎框架安全
java
中鎖的實現基本都是經過聚合了一個同步器的子類完成線程訪問控制的,同步器是實現鎖的關鍵,能夠這麼理解,鎖面向編程者,隱藏了實現細節,同步器面向鎖的實現,簡化了鎖的實現方式,屏蔽了同步狀態管理,線程排隊,等待與喚醒等底層操做,經過AbstractQueuedSynchronizer
咱們能夠很方便的實現一個鎖併發
同步器的設計基於模板方法模式,提供的模板方法主要包括:獨佔鎖獲取鎖與釋放同步狀態,共享式獲取與釋放同步狀態,獲取同步隊列中等待線程狀況框架
獨佔式操做ide
想要實現一個獨佔式鎖須要重寫如下方法函數
方法名 | 描述 |
---|---|
void acquire(int arg) | 獨佔式獲取同步狀態,同一時刻只能有一個線程能夠獲取到同步狀態,獲取失敗進入同步隊列等待 |
void acquireInterruptibly(int arg) | 獨佔式獲取同步狀態,響應中斷操做,被中斷時會拋異常並返回 |
boolean tryAcquireNanos(int arg, long nanosTimeout) | 獨佔式獲取同步狀態,響應中斷操做,而且增長了超時限制,若是規定時間沒有得到同步狀態就返回false,不然返回true |
boolean release(int arg) | 獨佔式釋放同步狀態,在釋放同步狀態以後,將同步隊列中的第一個節點包含的線程喚醒 |
共享式操做源碼分析
想要實現一個共享鎖須要重寫如下方法
方法名 | 描述 |
---|---|
void acquireShared(int arg) | 共享式獲取同步狀態,同一時刻能夠有多個線程獲取到同步狀態 |
void acquireSharedInterruptibly(int arg) | 共享式獲取同步狀態,響應中斷操做 |
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) | 共享式獲取同步狀態,響應中斷操做,而且增長了超時限制,若是規定時間沒有得到同步狀態就返回false,不然返回true |
boolean releaseShared(int arg) | 共享式釋放同步狀態 |
方法名 | 描述 |
---|---|
Collection getQueuedThreads() | 獲取同步隊列上的線程集合 |
在這些模板方法中,屢次提到了同步隊列,咱們看一下AQS
是如何實現同步隊列的
首先看下AbstractQueuedSynchronizer
的類圖
Node
類是AbstractQueuedSynchronizer
類的內部類,同步器依靠內部的一個同步隊列來完成同步狀態的管理,當前線程獲取同步狀態失敗的時候,同步器會將當前線程及等待信息構形成一個Node
節點加入到同步隊列中
屬性 | 描述 |
---|---|
waitStatus | 該線程等待狀態,包含以下: CANCELLED 值爲1,表示須要從同步隊列中取消等待 SIGNAL值爲-1,表示後繼節點處於等待狀態,若是當前節點釋放同步狀態會通知後繼節點,使得後繼節點的線程可以運行 CONDITION值爲-2,表示節點在等待隊列中 PROPAGATE值爲-3,表示下一次共享式同步狀態獲取將會無條件傳播下去 INITIAL值爲0,表示初始狀態 |
prev:Node | 前驅節點 |
next:Node | 後繼節點 |
thread:Thread | 當前線程 |
nextWaiter:Node | 下一個等待節點 |
能夠看到AQS中
的節點信息包含前驅和後繼節點,因此咱們知道了AQS的同步隊列是雙向鏈表結構的
AQS
中的幾個重要屬性
屬性 | 描述 |
---|---|
state:int | 同步狀態:若是等於0,鎖屬於空閒狀態,若是等於1,標識鎖被佔用,若是大於1,則表示鎖被當前持有的線程屢次加鎖,即重入狀態 |
head:Node | 隊列的頭節點 |
tail:Node | 隊列的尾節點 |
unsafe:Unsafe | AQS中的cas算法實現 |
AQS
中提供了三個方法對同步狀態進行操做
getState()
獲取到同步狀態setState(int newState)
設置同步狀態compareAndSetState(int expect, int update)
使用CAS
設置當前狀態,該方法可以保證設置的原子性AQS
的基本結構以下圖所示
在同步器中head
和tail
的節點的引用指向同步隊列的頭,尾節點,這樣在後面操做節點入列和出列的時候只須要操做同步器中的head
和tail
節點就能夠
ReentrantLock
重入鎖,內部AQS的實現是基於獨佔式獲取/釋放同步狀態的。咱們學習一下ReentrantLock
的實現原理來進一步加深對AQS
的理解
重進入是指任意線程在獲取到鎖以後可以再次獲取該鎖而不會被鎖阻塞,它表示一個線程能夠對資源重複加鎖,同時支持獲取鎖時使用公平鎖仍是非公平鎖
例:
/** * @author: chenmingyu * @date: 2019/4/12 15:09 * @description: ReentrantLock */
public class ReentrantLockTest {
private static Lock LOCK = new ReentrantLock();
public static void main(String[] args) {
Runnable r1 = new TestThread();
new Thread(r1,"r1").start();
Runnable r2 = new TestThread();
new Thread(r2,"r2").start();
}
public static class TestThread implements Runnable{
@Override
public void run() {
LOCK.lock();
try {
System.out.println(Thread.currentThread().getName()+":獲取到鎖 "+LocalTime.now());
TimeUnit.SECONDS.sleep(3L);
}catch (Exception e){
e.printStackTrace();
}finally {
LOCK.unlock();
}
}
}
}
複製代碼
輸出
只有在r1
線程釋放鎖以後r2
線程才獲取到鎖去執行代碼打印數據
建立的實例,默認使用非公平鎖,若是須要公平鎖,須要調用有參的構造函數
/** * 非公平鎖 * 建立ReentrantLock實例,默認使用非公平鎖 */
public ReentrantLock() {
sync = new NonfairSync();
}
/** * 公平鎖 * 建立ReentrantLock實例,fair爲true使用公平鎖 */
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
複製代碼
NonfairSync
與FairSync
都是ReentrantLock
類的內部類,繼承自ReentrantLock
類的內部類Sync
,Sync
類繼承了AbstractQueuedSynchronizer
類圖以下
非公平鎖的實現
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
複製代碼
非公平鎖會在調用lock()
方法的時候首先調用compareAndSetState(0, 1)
方法嘗試獲取鎖,若是沒有獲取到鎖則調用acquire(1)
方法
compareAndSetState(0, 1)
方法是一個CAS
操做,如過設置成功,則爲獲取到同步狀態,並調用setExclusiveOwnerThread(Thread.currentThread());
方法將當前線程設置爲獨佔模式同步狀態的全部者
咱們所說的獲取同步狀態其實指的就是獲取鎖的狀態,獲取同步狀態成功則加鎖成功
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
複製代碼
acquire(1)
方法是提供的模板方法,調用tryAcquire(arg)
和acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
tryAcquire(arg)
方法調用的是子類的實現,NonfairSync
的tryAcquire
方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
複製代碼
nonfairTryAcquire(acquires)
方法
/** * 非公平嘗試獲取同步狀態 */
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
/** * 首先根據`getState()`方法獲取同步狀態,若是等於0嘗試調用`compareAndSetState(0, * acquires)`方法獲取同步狀態,若是設置成功則獲取同步狀態成功,設置當前線程爲獨佔模式同步狀態的 * 全部者 */
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
複製代碼
getState()
方法獲取同步狀態,若是等於0嘗試調用compareAndSetState(0, acquires)
方法獲取同步狀態,若是設置成功則獲取同步狀態成功,設置當前線程爲獨佔模式同步狀態的全部者state
+1,表示當前線程屢次加鎖若是tryAcquire(arg)
返回false,表示沒有獲取到同步狀態,即沒有拿到鎖,因此須要調用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
方法將當前線程加入到同步隊列中,而且以死循環的方式獲取同步狀態,若是獲取不到則阻塞節點中的線程,而被阻塞的線程只能經過前驅節點的出隊,或者阻塞線程被中斷來實現喚醒
addWaiter(Node.EXCLUSIVE)
方法的做用就是構造同步隊列的節點信息,而後加入到同步隊列尾部
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
複製代碼
首先調用Node
類的構造方法建立一個實例,tail
是AQS
中隊列的尾節點
若是tail
節點不爲空,將實例的前驅節點置爲tail
指向的節點,而後調用compareAndSetTail(pred, node)
方法,compareAndSetTail(pred, node)
方法調用unsafe.compareAndSwapObject(this, tailOffset, expect, update)
,此方法是一個CAS
操做,不可中斷,用來保證節點可以被線程安全的添加,設置成功後,將節點tail
的後繼節點指向當前實例,以此來實現將當前實例加入到同步隊列尾部
若是tail
節點等於空或者compareAndSetTail(pred, node)
設置失敗,則會調用enq(node)
方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
複製代碼
在這個方法中利用for
循環構造了一個死循環,若是當前AQS
的tail
節點爲空,則證實當前同步隊列中沒有等待的線程,也就是沒有節點,調用compareAndSetHead(new Node())
方法構造了一個頭節點,而後循環調用compareAndSetTail(t, node)
將當前實例加入到隊列的尾部,若是失敗就一直調用,直到成功爲止
在調用addWaiter(Node mode)
方法後會調用acquireQueued(final Node node, int arg)
方法,做用是在每一個節點進入到同步隊列中後就進入了一個自旋的狀態,經過校驗本身的前驅節點是不是頭節點,而且是否獲取到同步狀態爲條件進行判斷,若是知足條件則從自旋中退出,負責一直自旋
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
方法內也是一個for
的死循環,經過node.predecessor()
方法獲取傳入的Node
實例的前驅節點並與AQS
的head
節點進行比較,若是相等,則嘗試獲取同步狀態獲取鎖,若是獲取成功就調用setHead(node);
方法將當前Node
實例節點設置爲head
節點,將原來head
節點的後繼節點置爲null,有助於GC回收
setHead(node);
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
複製代碼
若是傳入的Node
實例的前驅節點與AQS
的head
節點不相等或者獲取同步狀態失敗,則調用shouldParkAfterFailedAcquire(p, node)
和parkAndCheckInterrupt()
方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/* * This node has already set status asking a release * to signal it, so it can safely park. */
return true;
if (ws > 0) {
/* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
複製代碼
經過CAS
操做,設置節點的前驅節點等待狀態爲Node.SIGNAL
,若是設置失敗,返回false,由於外層是死循環,會重複當前方法直到設置成功
parkAndCheckInterrupt()
方法調用LookSupport.park()
阻塞線程,而後清除掉中斷標識
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
複製代碼
從acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
方法返回後,調用selfInterrupt()
,將線程中斷
公平鎖的實現
在瞭解acquire(1);
方法的做用以後,在理解公平鎖的實現就容易了
final void lock() {
acquire(1);
}
複製代碼
對比非公平鎖的實現少了一步上來就獲取同步狀態的操做,其他操做跟非公平鎖的實現同樣
公平鎖與非公平鎖總結:
獨佔式獲取鎖的流程
ReentrantLock
的unlock()
方法實際調用的AQS
的release(int arg)
方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製代碼
首先調用tryRelease(arg)
釋放同步狀態
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
複製代碼
獲取同步狀態,並減1,若是此時c==0則釋放鎖,將當前獨佔式鎖的擁有線程置爲null,而後設置state
爲0
而後調用unparkSuccessor(Node node)
方法喚醒後繼節點的線程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
/* * 喚醒後繼節點的線程 */
LockSupport.unpark(s.thread);
}
複製代碼
總結一下獨佔式獲取鎖和釋放鎖的過程:
AQS
維護的同步隊列的尾部,而且開始自旋,跳出自旋的條件就是前驅節點爲AQS
的頭節點而且獲取到了同步狀態,此時將節點移除同步隊列在瞭解了ReentrantLock
的實現原理以後,咱們就能夠仿照着本身去實現一個自定義獨佔式鎖了
步驟
LockTest
類,實現Lock
接口,重寫必要的接口LockTest
類裏建立一個內部類Sync
,繼承AQS
,由於要實現獨佔式鎖,因此重寫tryAcquire(int arg)
和tryRelease(int arg)
方法就能夠了LockTest
代碼
/** * @author: chenmingyu * @date: 2019/4/11 15:11 * @description: 自定義獨佔式鎖 */
public class LockTest implements Lock{
private final Sync SYNC = new Sync();
public static class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if(getState()<1){
throw new IllegalMonitorStateException("釋放同步狀態不可小於1");
}
int c = getState() - arg;
if (c == 0) {
setExclusiveOwnerThread(null);
}
setState(c);
return true;
}
}
@Override
public void lock() {
SYNC.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
SYNC.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return SYNC.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
SYNC.release(1);
}
@Override
public Condition newCondition() {
return null;
}
}
複製代碼
/** * @author: chenmingyu * @date: 2019/4/12 15:09 * @description: LockTest */
public class ReentrantLockTest {
private static Lock LOCKTEST = new LockTest();
public static void main(String[] args) {
Runnable r1 = new TestThread();
new Thread(r1,"LockTest 1").start();
Runnable r2 = new TestThread();
new Thread(r2,"LockTest 2").start();
}
public static class TestThread implements Runnable{
@Override
public void run() {
LOCKTEST.lock();
try {
System.out.println(Thread.currentThread().getName()+":獲取到鎖 "+LocalTime.now());
TimeUnit.SECONDS.sleep(3L);
}catch (Exception e){
e.printStackTrace();
}finally {
LOCKTEST.unlock();
}
}
}
}
複製代碼
輸出
ReentrantReadWriteLock
是讀寫鎖的實現,實現ReadWriteLock
接口
ReentrantReadWriteLock
內部一樣維護這一個Sync
內部類,實現了AQS
,經過重寫對應方法實現讀鎖和寫鎖
如今已經知道了同步狀態是由AQS
維護的一個整型變量state
,獨佔式鎖獲取到鎖時會對其進行加1,支持重入,而讀寫鎖ReentrantReadWriteLock
在設計的時候也是經過一個整型變量進行讀鎖的同步狀態和寫鎖的同步狀態維護,在一個變量上維護兩種狀態就須要對整型變量進行按位分割,一個int類型的變量包含4個字符,一個字符8個bit,就是32bit,在ReentrantReadWriteLock
中,高16位表示讀,低16位表示寫
寫鎖的獲取
讀寫鎖中的寫鎖,支持重進入的排它鎖
重寫ReentrantReadWriteLock
的內部類Sync
中的tryAcquire(int acquires)
方法
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
/* * 1,若是同步狀態c不等於0,表明着有讀鎖或者寫鎖 */
if (c != 0) {
// 2,若是c不等於0,w寫鎖的同步狀態爲0,切當前線程不是持有鎖的線程,返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
複製代碼
解讀
若是存在讀鎖,寫鎖不能被獲取,必需要等到其餘讀線程釋放讀鎖,才能夠獲取到寫鎖,這麼作的緣由是要確保寫鎖作的操做對讀鎖可見,若是寫鎖被獲取,則其餘讀寫線程的後續訪問均會被阻塞
寫鎖的釋放
讀寫鎖中的讀鎖,支持重進入的共享鎖
寫鎖的釋放與獨佔式鎖釋放過程類似,每次都是減小寫鎖的同步狀態,直到爲0時,表示寫鎖已被釋放
讀鎖的獲取與釋放
讀鎖是一個支持重入的共享鎖,重寫ReentrantReadWriteLock
的內部類Sync
中的tryAcquireShared(int unused)
方法
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
複製代碼
若是其餘線程獲取了寫鎖,則當前線程獲取讀鎖狀態失敗進入等待狀態,若是當前線程獲取了寫鎖或者寫鎖未被獲取,則當前線程獲取同步狀態成功,獲取到讀鎖
釋放讀鎖的時候就是每次釋放都會對同步狀態進行-1,直到爲0時,表示讀鎖已被釋放
鎖降級
鎖降級是指將寫鎖降級爲讀鎖,這個過程就是當前線程已經獲取到寫鎖的時候,在獲取到讀鎖,隨後釋放寫鎖的過程,這麼作的目的爲的就是保證數據的可見性
當前線程A獲取到寫鎖後,對數據進行修改,以後在獲取到讀鎖,而後釋放寫鎖,完成鎖降級,這時候線程A還沒釋放讀鎖,別的線程就沒法獲取到寫鎖,就沒法對數進行修改,以此來保證數據的可見性
參考:java併發編程的藝術
推薦: