斜體爲抽象類,下橫線爲接口
java
聚合關係總結:node
鎖實現總結:多線程
性質:性能
Lock接口定義了鎖的行爲ui
public interface Lock {
//上鎖(不響應Thread.interrupt()直到獲取鎖)
void lock();
//上鎖(響應Thread.interrupt())
void lockInterruptibly() throws InterruptedException;
//嘗試獲取鎖(以nonFair方式獲取鎖)
boolean tryLock();
//在指定時間內嘗試獲取鎖(響應Thread.interrupt(),支持公平/二階段非公平)
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//解鎖
void unlock();
//獲取Condition
Condition newCondition();
}
複製代碼
//鎖具體實現
private final Sync sync;
//根據傳入參數選擇FairSync或NonfairSync實現
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
#java.util.concurrent.locks.ReentrantLock.Sync
abstract void lock();
複製代碼
加入同步隊列(當同步隊列爲空時會直接得到鎖),等待鎖this
#java.util.concurrent.locks.ReentrantLock.FairSync
final void lock() {
acquire(1);
}
#java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
acquire()流程:spa
tryAcquire():模板方法,獲取鎖線程
#java.util.concurrent.locks.ReentrantLock.FairSync
protected final boolean tryAcquire(int acquires) {
//獲取當前線程
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//當前鎖沒被佔用
if (!hasQueuedPredecessors() &&//1.判斷同步隊列中是否有節點在等待
compareAndSetState(0, acquires)) {//2.若是上面!1成立,修改state值(代表當前鎖已被佔用)
setExclusiveOwnerThread(current);//3.若是2成立,修改當前佔用鎖的線程爲當前線程
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//佔用鎖線程==當前線程(重入)
int nextc = c + acquires;//
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);//修改status
return true;
}
return false;//直接獲取鎖失敗
}
複製代碼
acquireQueued(addWaiter(Node.EXCLUSIVE), arg):加入同步隊列指針
#java.util.concurrent.locks.AbstractQueuedSynchronizer
//1
private Node addWaiter(Node mode) {
//生成node
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
//將node加到隊列尾部
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//若是加入失敗(多線程競爭或者tail指針爲null)
enq(node);
return node;
}
//1.1
private Node enq(final Node node) {
//死循環加入節點(cas會失敗)
for (;;) {
Node t = tail;
if (t == null) { //tail爲null,同步隊列初始化
//設置head指針
if (compareAndSetHead(new Node()))//注意這裏是個空節點!!
tail = head;//將tail也指向head
} else {
node.prev = t;//將當前node加到隊尾
if (compareAndSetTail(t, node)) {
t.next = node;
return t;//注意這裏才返回
}
}
}
}
//2
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//表示是否被打斷
boolean interrupted = false;
for (;;) {
//獲取node.pre節點
final Node p = node.predecessor();
if (p == head //當前節點是不是同步隊列中的第二個節點
&& tryAcquire(arg)) {//獲取鎖,head指向當前節點
setHead(node);//head=head.next
p.next = null;//置空
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //是否空轉(由於空轉喚醒是個耗時操做,進入空轉前判斷pre節點狀態.若是pre節點即將釋放鎖,則不進入空轉)
parkAndCheckInterrupt())//利用unsafe.park()進行空轉(阻塞)
interrupted = true;//若是Thread.interrupt()被調用,(不會真的被打斷,會繼續循環空轉直到獲取到鎖)
}
} finally {
if (failed)//tryAcquire()過程出現異常致使獲取鎖失敗,則移除當前節點
cancelAcquire(node);
}
}
複製代碼
過程總結:code
注意:這裏有兩次tryAcquire()過程.第一次,爲了不同步隊列爲空時還插入隊列產生的性能耗費(cas空轉).第二次,就是正常的流程.先插入隊尾,而後等待喚醒,再獲取鎖
selfInterrupt(): 喚醒當前線程
static void selfInterrupt() {//在獲取鎖以後 響應intterpt()請求
Thread.currentThread().interrupt();
}
複製代碼
一階段
#java.util.concurrent.locks.ReentrantLock.NonfairSync
final void lock() {
//在acquire()以前先嚐試獲取鎖
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
複製代碼
二階段 acquire()流程與公平鎖如出一轍,惟一區別在於tryAcquire()實現中
#java.util.concurrent.locks.ReentrantLock.NonfairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
#java.util.concurrent.locks.ReentrantLock.Sync
final boolean nonfairTryAcquire(int acquires) {//這個過程其實和FairSync.tryAcquire()基本一致
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//惟一區別: 這裏不會去判斷隊列中是否爲空
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
複製代碼
區別點 | lock()過程(一階段) | tryAcquire()過程(二階段) |
---|---|---|
FairSync | 直接acquire() | 當前若無線程持有鎖,若是同步隊列爲空,獲取鎖 |
NonFairSync | 先嚐試獲取鎖,再acquire() | 當前若無線程持有鎖,獲取鎖 |
#java.util.concurrent.locks.ReentrantLock
public void unlock() {
sync.release(1);
}
#java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release(int arg) {
if (tryRelease(arg)) {//釋放鎖
Node h = head;
if (h != null &&//head節點爲空(非公平鎖直接獲取鎖)
h.waitStatus != 0)
unparkSuccessor(h);//喚醒同步隊列中離head最近的一個waitStatus<=0的節點
return true;
}
return false;
}
#java.util.concurrent.locks.ReentrantLock
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//持有鎖的線程==當前線程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//重入鎖所有釋放
free = true;
//置空持有鎖線程
setExclusiveOwnerThread(null);
}
//state==0(此時持有鎖,不用cas)
setState(c);
return free;
}
複製代碼
lockInterruptibly()與lock()過程基本相同,區別在於Thread.intterpt()的應對措施不一樣
//lock()
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//表示是否被打斷
boolean interrupted = false;
for (;;) {
//獲取node.pre節點
final Node p = node.predecessor();
if (p == head //當前節點是不是同步隊列中的第二個節點
&& tryAcquire(arg)) {//獲取鎖,當前head指向當前節點
setHead(node);//head=head.next
p.next = null;//置空
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //是否空轉(由於空轉喚醒是個耗時操做,進入空轉前判斷pre節點狀態.若是pre節點即將釋放鎖,則不進入空轉)
parkAndCheckInterrupt())//利用unsafe.park()進行空轉(阻塞)
interrupted = true;//若是Thread.interrupt()被調用,(不會真的被打斷,會繼續循環空轉直到獲取到鎖)
}
} finally {
if (failed)//tryAcquire()過程出現異常致使獲取鎖失敗,則移除當前節點
cancelAcquire(node);
}
}
// lockInterruptibly()
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//惟一區別當Thread.intterpt()打斷時,直接拋出異常
throw new InterruptedException();
}
} finally {
if (failed)//而後移除當前節點
cancelAcquire(node);
}
}
複製代碼
#java.util.concurrent.locks.ReentrantLock
public boolean tryLock() {
//嘗試獲取非公平鎖
return sync.nonfairTryAcquire(1);
}
複製代碼
#java.util.concurrent.locks.ReentrantLock
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
#java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||//獲取鎖(公平/非公平)
doAcquireNanos(arg, nanosTimeout);//在指定時間內等待鎖(空轉)
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
...
final long deadline = System.nanoTime() + nanosTimeout;
//加入隊尾
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return true;
}
//上面與acquireQueued()相同,重點看這裏
//計算剩餘時間
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
//利用parkNanos()指定空轉時間
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())//若是被Thread.interrupt(),則拋異常
throw new InterruptedException();
}
} finally {
if (failed)//移除節點
cancelAcquire(node);
}
}
複製代碼
public Condition newCondition() {
return sync.newCondition();
}
#java.util.concurrent.locks.ReentrantLock.Sync
final ConditionObject newCondition() {
return new ConditionObject();
}
複製代碼