ReentrantLock詳解

RenntrantLock

依賴關係

斜體爲抽象類,下橫線爲接口java

聚合關係總結:node

  1. ReentrantLock實現了Lock,Serializable接口
  2. ReentrantLock.Sync(內部類)繼承了AQS
  3. ReentrantLock.NonfairSync和ReentrantLock.FairSync繼承了ReentrantLock.Sync
  4. ReentrantLock持有ReentrantLock.Sync對象(實現鎖功能)

鎖實現總結:多線程

  1. 由Node節點組成一條同步隊列(有head,tail兩個指針,而且head初始化時指向空節點)
  2. int state標記鎖使用數量(獨佔鎖時,一般爲1,發生重入時>1)
  3. lock()時加到隊列尾部
  4. unlock()時,釋放head節點,並指向下一個節點head=head.next,而後喚醒當前head節點

性質:性能

  1. 獨佔鎖(排它鎖):只能有一個線程獲取鎖
  2. 重入鎖:一個線程能夠屢次lock()
  3. 公平/非公平鎖:只針對上鎖過程
    1. 非公平鎖:嘗試獲取鎖,若成功馬上返回,失敗則加入同步隊列
    2. 公平鎖:直接加入同步隊列

Lock

Lock接口定義了鎖的行爲ui

public interface Lock {
	//上鎖(不響應Thread.interrupt()直到獲取鎖)
    void lock();
	//上鎖(響應Thread.interrupt())
    void lockInterruptibly() throws InterruptedException;
	//嘗試獲取鎖(以nonFair方式獲取鎖)
    boolean tryLock();
  	//在指定時間內嘗試獲取鎖(響應Thread.interrupt(),支持公平/二階段非公平)
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
	//解鎖
    void unlock();
	//獲取Condition
    Condition newCondition();
}

複製代碼

lock()過程

//鎖具體實現
private final Sync sync;
//根據傳入參數選擇FairSync或NonfairSync實現
public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
	sync.lock();
}
#java.util.concurrent.locks.ReentrantLock.Sync
abstract void lock();
複製代碼

公平鎖

加入同步隊列(當同步隊列爲空時會直接得到鎖),等待鎖this

#java.util.concurrent.locks.ReentrantLock.FairSync
final void lock() {
	acquire(1);
}
#java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
	if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		selfInterrupt();
}
複製代碼

acquire()流程:spa

  1. tryAcquire():模板方法,獲取鎖線程

    #java.util.concurrent.locks.ReentrantLock.FairSync
     protected final boolean tryAcquire(int acquires) {
     	//獲取當前線程
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {//當前鎖沒被佔用
     	   if (!hasQueuedPredecessors() &&//1.判斷同步隊列中是否有節點在等待
     		   compareAndSetState(0, acquires)) {//2.若是上面!1成立,修改state值(代表當前鎖已被佔用)
     		   setExclusiveOwnerThread(current);//3.若是2成立,修改當前佔用鎖的線程爲當前線程
     		   return true;
     	   }
        }
        else if (current == getExclusiveOwnerThread()) {//佔用鎖線程==當前線程(重入)
     	   int nextc = c + acquires;//
     	   if (nextc < 0)
     		   throw new Error("Maximum lock count exceeded");
     	   setState(nextc);//修改status
     	   return true;
        }
        return false;//直接獲取鎖失敗
    }
    複製代碼
  2. acquireQueued(addWaiter(Node.EXCLUSIVE), arg):加入同步隊列指針

    #java.util.concurrent.locks.AbstractQueuedSynchronizer
    //1
    private Node addWaiter(Node mode) {
     //生成node
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
     	//將node加到隊列尾部
     	   node.prev = pred;
     	   if (compareAndSetTail(pred, node)) {
     		   pred.next = node;
     		   return node;
     	   }
        }
        //若是加入失敗(多線程競爭或者tail指針爲null)
        enq(node);
        return node;
    }
    //1.1  
    private Node enq(final Node node) {
     //死循環加入節點(cas會失敗)
        for (;;) {
     	   Node t = tail;
     	   if (t == null) { //tail爲null,同步隊列初始化
     		//設置head指針
     		   if (compareAndSetHead(new Node()))//注意這裏是個空節點!!
     			   tail = head;//將tail也指向head
     	   } else {
     		   node.prev = t;//將當前node加到隊尾
     		   if (compareAndSetTail(t, node)) {
     			   t.next = node;
     			   return t;//注意這裏才返回
     		   }
     	   }
        }
    }
    //2
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
     	//表示是否被打斷
     	   boolean interrupted = false;
     	   for (;;) {
     		//獲取node.pre節點
     		   final Node p = node.predecessor();
     		   if (p == head //當前節點是不是同步隊列中的第二個節點
     		   && tryAcquire(arg)) {//獲取鎖,head指向當前節點
     			   setHead(node);//head=head.next
     			   p.next = null;//置空 
     			   failed = false;
     			   return interrupted;
     		   }
    
     		   if (shouldParkAfterFailedAcquire(p, node) && //是否空轉(由於空轉喚醒是個耗時操做,進入空轉前判斷pre節點狀態.若是pre節點即將釋放鎖,則不進入空轉)
     			   parkAndCheckInterrupt())//利用unsafe.park()進行空轉(阻塞)
     			   interrupted = true;//若是Thread.interrupt()被調用,(不會真的被打斷,會繼續循環空轉直到獲取到鎖)
     	   }
        } finally {
     	   if (failed)//tryAcquire()過程出現異常致使獲取鎖失敗,則移除當前節點
     		   cancelAcquire(node);
        }
    }
    複製代碼

    過程總結:code

    1. 空轉(若是當前節點是同步隊列中的第二個節點,則直接得到鎖返回)
    2. 得到鎖

    注意:這裏有兩次tryAcquire()過程.第一次,爲了不同步隊列爲空時還插入隊列產生的性能耗費(cas空轉).第二次,就是正常的流程.先插入隊尾,而後等待喚醒,再獲取鎖

  3. selfInterrupt(): 喚醒當前線程

    static void selfInterrupt() {//在獲取鎖以後 響應intterpt()請求
    	Thread.currentThread().interrupt();
    }
    複製代碼

非公平鎖

一階段

#java.util.concurrent.locks.ReentrantLock.NonfairSync
final void lock() {
	//在acquire()以前先嚐試獲取鎖
	if (compareAndSetState(0, 1))
		setExclusiveOwnerThread(Thread.currentThread());
	else
		acquire(1);
}
複製代碼

二階段 acquire()流程與公平鎖如出一轍,惟一區別在於tryAcquire()實現中

#java.util.concurrent.locks.ReentrantLock.NonfairSync
protected final boolean tryAcquire(int acquires) {
 	return nonfairTryAcquire(acquires);
 }
 
#java.util.concurrent.locks.ReentrantLock.Sync
 final boolean nonfairTryAcquire(int acquires) {//這個過程其實和FairSync.tryAcquire()基本一致
	final Thread current = Thread.currentThread();
	int c = getState();
	if (c == 0) {
		//惟一區別: 這裏不會去判斷隊列中是否爲空
		if (compareAndSetState(0, acquires)) {
			setExclusiveOwnerThread(current);
			return true;
		}
	}
	else if (current == getExclusiveOwnerThread()) {
		int nextc = c + acquires;
		if (nextc < 0) // overflow
			throw new Error("Maximum lock count exceeded");
		setState(nextc);
		return true;
	}
	return false;
}
複製代碼
區別點 lock()過程(一階段) tryAcquire()過程(二階段)
FairSync 直接acquire() 當前若無線程持有鎖,若是同步隊列爲空,獲取鎖
NonFairSync 先嚐試獲取鎖,再acquire() 當前若無線程持有鎖,獲取鎖

unlock()過程

#java.util.concurrent.locks.ReentrantLock
public void unlock() {
	sync.release(1);
}
#java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release(int arg) {
if (tryRelease(arg)) {//釋放鎖
	Node h = head;
	if (h != null &&//head節點爲空(非公平鎖直接獲取鎖)
	h.waitStatus != 0)
		unparkSuccessor(h);//喚醒同步隊列中離head最近的一個waitStatus<=0的節點
	return true;
}
return false;
}
#java.util.concurrent.locks.ReentrantLock
protected final boolean tryRelease(int releases) {
	int c = getState() - releases;
	//持有鎖的線程==當前線程
	if (Thread.currentThread() != getExclusiveOwnerThread())
		throw new IllegalMonitorStateException();
	boolean free = false;
	if (c == 0) {//重入鎖所有釋放
		free = true;
		//置空持有鎖線程
		setExclusiveOwnerThread(null);
	}
	//state==0(此時持有鎖,不用cas)
	setState(c);
	return free;
}
複製代碼

lockInterruptibly()過程

lockInterruptibly()與lock()過程基本相同,區別在於Thread.intterpt()的應對措施不一樣

//lock()
final boolean acquireQueued(final Node node, int arg) {
	boolean failed = true;
	try {
		//表示是否被打斷
		boolean interrupted = false;
		for (;;) {
			//獲取node.pre節點
			final Node p = node.predecessor();
			if (p == head //當前節點是不是同步隊列中的第二個節點
			&& tryAcquire(arg)) {//獲取鎖,當前head指向當前節點
				setHead(node);//head=head.next
				p.next = null;//置空 
				failed = false;
				return interrupted;
			}

			if (shouldParkAfterFailedAcquire(p, node) && //是否空轉(由於空轉喚醒是個耗時操做,進入空轉前判斷pre節點狀態.若是pre節點即將釋放鎖,則不進入空轉)
				parkAndCheckInterrupt())//利用unsafe.park()進行空轉(阻塞)
				interrupted = true;//若是Thread.interrupt()被調用,(不會真的被打斷,會繼續循環空轉直到獲取到鎖)
		}
	} finally {
		if (failed)//tryAcquire()過程出現異常致使獲取鎖失敗,則移除當前節點
			cancelAcquire(node);
	}
}
// lockInterruptibly()
private void doAcquireInterruptibly(int arg) throws InterruptedException {
	final Node node = addWaiter(Node.EXCLUSIVE);
	boolean failed = true;
	try {
		for (;;) {
			final Node p = node.predecessor();
			if (p == head && tryAcquire(arg)) {
				setHead(node);
				p.next = null; 
				failed = false;
				return;
			}
			if (shouldParkAfterFailedAcquire(p, node) &&
				parkAndCheckInterrupt())//惟一區別當Thread.intterpt()打斷時,直接拋出異常
				throw new InterruptedException();
		}
	} finally {
		if (failed)//而後移除當前節點
			cancelAcquire(node);
	}
}
複製代碼

tryLock()

#java.util.concurrent.locks.ReentrantLock
public boolean tryLock() {
	//嘗試獲取非公平鎖
	return sync.nonfairTryAcquire(1);
}
複製代碼

tryLock(long timeout, TimeUnit unit)

#java.util.concurrent.locks.ReentrantLock
public boolean tryLock(long timeout, TimeUnit unit)
		throws InterruptedException {
	return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
#java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
		throws InterruptedException {
	if (Thread.interrupted())
		throw new InterruptedException();
	return tryAcquire(arg) ||//獲取鎖(公平/非公平)
		doAcquireNanos(arg, nanosTimeout);//在指定時間內等待鎖(空轉)
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
		throws InterruptedException {
	...
	final long deadline = System.nanoTime() + nanosTimeout;
	//加入隊尾
	final Node node = addWaiter(Node.EXCLUSIVE);
	boolean failed = true;
	try {
		for (;;) {
			final Node p = node.predecessor();
			if (p == head && tryAcquire(arg)) {
				setHead(node);
				p.next = null; 
				failed = false;
				return true;
			}
		  //上面與acquireQueued()相同,重點看這裏
		  //計算剩餘時間
			nanosTimeout = deadline - System.nanoTime();
			if (nanosTimeout <= 0L)
				return false;
			if (shouldParkAfterFailedAcquire(p, node) &&
				nanosTimeout > spinForTimeoutThreshold)
				//利用parkNanos()指定空轉時間
				LockSupport.parkNanos(this, nanosTimeout);
			if (Thread.interrupted())//若是被Thread.interrupt(),則拋異常
				throw new InterruptedException();
		}
	} finally {
		if (failed)//移除節點
			cancelAcquire(node);
	}
}
複製代碼

newCondition()

public Condition newCondition() {
	return sync.newCondition();
}
#java.util.concurrent.locks.ReentrantLock.Sync
final ConditionObject newCondition() {
	return new ConditionObject();
}
複製代碼
相關文章
相關標籤/搜索