java中的隊列同步器AQS -- AbstractQueuedSynchronizer

1)原理:java

1)使用一個int成員變量表示同步狀態(private volatile int state;),經過同步隊列(一個FIFO雙向隊列)來完成同步狀態的管理(即:線程的排隊)。
2)當前線程獲取同步狀態失敗時,同步器會將當前線程以及等待狀態等信息構形成爲一個節點(Node)並將其加入到同步隊列的尾部,而且當前線程(因進行自旋操做)被阻塞。
3)首節點是成功獲取到同步狀態的節點,首節點的線程在釋放同步狀態時,將會喚醒後繼節點,然後繼節點將會在獲取同步狀態成功時將本身設置爲首節點。
	
同步隊列中的節點(Node):用來保存獲取同步狀態失敗的線程引用、等待狀態以及前驅和後繼節點。

2)說明:node

1)AQS是用來構建鎖或者其它同步工具的基礎框架,鎖和同步工具都採用封裝一個AQS的子類(靜態內部類)的方式來實現其功能。
2)同步器的主要使用方式是繼承,子類經過繼承同步器並實現它的抽象方法來管理同步狀態,
3)經過getState()、setState(int newState)和compareAndSetState(int expect,int update)來安全地修改狀態。

3)同步器中的方法:編程

1)同步器中能夠被子類重寫的方法:
	protected boolean tryAcquire(int arg) // 獨佔式獲取同步狀態
	protected boolean tryRelease(int arg) // 獨佔式釋放同步狀態
	protected int tryAcquireShared(int arg) // 共享式獲取同步狀態,當返回值大於等於0時,表示獲取成功。
	protected boolean tryReleaseShared(int arg) // 共享式釋放同步狀態
	protected boolean isHeldExclusively() // 當前同步器是否在獨佔模式下被佔用。
		
2)模板方法:

	1>獨佔式操做同步狀態的方法:
		public final void acquire(int arg) 
			獨佔式獲取同步狀態,若是當前線程成功獲取同步狀態,則該方法返回;不然,當前線程會進入同步隊列等待。
			該方法將會調用子類重寫的tryAcquire(int arg)方法;若是後續對線程進行中斷操做,線程不會從同步隊列中移出。
		
		public final void acquireInterruptibly(int arg) throws InterruptedException {
			在acquire(int arg)的基礎上增長了中斷響應。
			若當前線程未獲取到同步狀態,進入同步隊列等待時,若是當前線程被中斷,則該方法拋出終端異常並返回。
			
		public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException
			在acquireInterruptibly(int arg)的基礎上增長了超時限制。
			在指定的時間內,獲取到同步狀態返回true,不然返回false。
			
		public final boolean release(int arg)
			獨佔式釋放同步狀態,該方法會在釋放同步狀態以後,會喚醒其後繼節點(即:將同步隊列中第一個節點包含的線程喚醒)。
			該方法將會調用子類重寫的tryRelease(int arg)方法;

	2>共享式操做同步狀態的方法:
		public final void acquireShared(int arg)
			共享式的獲取同步狀態,若是當前線程成功獲取同步狀態,則該方法返回;不然,當前線程會進入同步隊列等待。
			該方法將會調用子類重寫的tryAcquireShared(int arg)方法。tryAcquireShared(int arg)方法返回值爲int類型,當返回值大於等於0時,表示可以獲取到同步狀態。
			所以,在共享式獲取的自旋過程當中,成功獲取到同步狀態並退出自旋的條件就是tryAcquireShared(int arg)方法返回值大於等於0。
			
			與獨佔式獲取的區別:在同一個時刻,能夠有多個線程同時獲取到同步狀態。
		
		public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {	
			在acquireShared(int arg)的基礎上增長了超時限制。
			
		public final boolean releaseShared(int arg)
			共享式釋放同步狀態
			該方法將會調用子類重寫的tryReleaseShared(int arg)方法;
			tryReleaseShared(int arg)方法必須確保同步狀態的安全釋放(通常是經過循環和CAS來保證同步狀態的安全釋放)。
			
3)方法源碼分析:
	
獨佔式:
	1)獲取同步狀態的方法:
		public final void acquire(int arg) {
			if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
				selfInterrupt();
		}
		
		final boolean acquireQueued(final Node node, int arg) {
			boolean failed = true;
			try {
				boolean interrupted = false;
				for (;;) { // 自旋
					final Node p = node.predecessor();
					// 1移出隊列(或中止自旋)的條件是前驅節點爲頭節點且成功獲取了同步狀態。
					// 2當前驅節點是頭節點時,纔會去嘗試獲取同步狀態
					if (p == head && tryAcquire(arg)) { 
						setHead(node);
						p.next = null; // help GC
						failed = false;
						return interrupted;
					}
					if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 進入等待狀態並檢查時是否被中斷
						interrupted = true;
				}
			} finally {
				if (failed)
					cancelAcquire(node);
			}
		}
		
		1>調用自定義同步器實現的tryAcquire(int arg)方法,該方法保證線程安全的獲取同步狀態。
		2>若是同步狀態獲取失敗,則構造獨佔式(Node.EXCLUSIVE)同步節點並經過addWaiter(Node node)方法將該節點加入到同步隊列的尾部。
		3>最後調用acquireQueued(Node node,int arg)方法,使得該節點以自旋的方式獲取同步狀態。
		4>若是獲取不到同步狀態,則阻塞節點中的線程。
		5>被阻塞線程的喚醒主要依靠前驅節點的出隊或阻塞線程被中斷來實現。
		
	2)釋放同步狀態的方法:
		public final boolean release(int arg) {
			if (tryRelease(arg)) {
				Node h = head;
				if (h != null && h.waitStatus != 0)
					unparkSuccessor(h);
				return true;
			}
			return false;
		}
		
		
共享式:
	1)獲取同步狀態的方法:
		public final void acquireShared(int arg) {
			if (tryAcquireShared(arg) < 0)
				doAcquireShared(arg);
		}
		
		private void doAcquireShared(int arg) {
			final Node node = addWaiter(Node.SHARED);	// 則構造共享式(Node.SHARED)同步節點,並將該節點加入到同步隊列的尾部
			boolean failed = true;
			try {
				boolean interrupted = false;
				for (;;) { // 自旋
					final Node p = node.predecessor();
					if (p == head) { 					// 當前驅節點是頭節點時,纔會去嘗試獲取同步狀態
						int r = tryAcquireShared(arg);	
						if (r >= 0) {					// 當返回值大於等於0時,表示獲取同步狀態成功並從退出自旋
							setHeadAndPropagate(node, r);
							p.next = null; // help GC
							if (interrupted) selfInterrupt();
							failed = false;
							return;
						}
					}
					if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
						interrupted = true;
				}
			} finally {
				if (failed)
					cancelAcquire(node);
			}
		}
	
	2)釋放同步狀態的方法:
		public final boolean releaseShared(int arg) {
			if (tryReleaseShared(arg)) {
				doReleaseShared();
				return true;
			}
			return false;
		}

		private void doReleaseShared() {
			for (;;) {	// 經過循環和CAS來保證同步狀態的安全釋放
				Node h = head;
				if (h != null && h != tail) {
					int ws = h.waitStatus;
					if (ws == Node.SIGNAL) {
						if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
							continue;            // loop to recheck cases
						unparkSuccessor(h);
					}
					else if (ws == 0 &&
							 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
						continue;                // loop on failed CAS
				}
				if (h == head)                   // loop if head changed
					break;
			}
		}

參考書籍:<java併發編程的藝術>安全

相關文章
相關標籤/搜索