在沒有Lock以前,咱們使用synchronized來控制同步,配合Object的wait()、notify()系列方法能夠實現等待/通知模式。在Java SE5後,Java提供了Lock接口,相對於Synchronized而言,Lock提供了條件Condition,對線程的等待、喚醒操做更加詳細和靈活。java
Condition的做用是對鎖進行更精確的控制。Condition中的await()方法至關於Object的wait()方法,Condition中的signal()方法至關於Object的notify()方法,Condition中的signalAll()至關於Object的notifyAll()方法。不一樣的是,Object中的wait(),notify(),notifyAll()方法是和"同步鎖"(synchronized關鍵字)捆綁使用的;而Condition是須要與"互斥鎖"/"共享鎖"捆綁使用的。node
void await() // 形成當前線程在接到信號、被中斷或到達指定等待時間以前一直處於等待狀態。 boolean await(long time, TimeUnit unit) // 形成當前線程在接到信號、被中斷或到達指定等待時間以前一直處於等待狀態。 long awaitNanos(long nanosTimeout) // 形成當前線程在接到信號以前一直處於等待狀態。【注意:該方法對中斷不敏感】 void awaitUninterruptibly() // 形成當前線程在接到信號、被中斷或到達指定最後期限以前一直處於等待狀態。 boolean awaitUntil(Date deadline) // 喚醒一個等待線程。 void signal() // 喚醒全部等待線程。 void signalAll()
獲取一個Condition必需要經過Lock的newCondition()方法。該方法定義在接口Lock下面,返回的結果是綁定到此 Lock 實例的新 Condition 實例。Condition爲一個接口,其下僅有一個實現類ConditionObject,因爲Condition的操做須要獲取相關的鎖,而AQS則是同步鎖的實現基礎,因此ConditionObject則定義爲AQS的內部類。定義以下:安全
public abstract class AbstractQueuedLongSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { public class ConditionObject implements Condition, java.io.Serializable {} }
每一個Condition對象都包含着一個FIFO隊列,該隊列是Condition對象通知/等待功能的關鍵。在隊列中每個節點都包含着一個線程引用,該線程就是在該Condition對象上等待的線程。咱們看Condition的定義就明白了:多線程
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; //頭節點 private transient Node firstWaiter; //尾節點 private transient Node lastWaiter; public ConditionObject() { } /** 省略方法 **/ }
從上面代碼能夠看出Condition擁有首節點(firstWaiter),尾節點(lastWaiter)。當前線程調用await()方法,將會以當前線程構形成一個節點(Node),並將節點加入到該隊列的尾部。結構以下:函數
Node裏面包含了當前線程的引用。Node定義與AQS的CLH同步隊列的節點使用的都是同一個類(AbstractQueuedSynchronized.Node靜態內部類)。ui
Condition的隊列結構比CLH同步隊列的結構簡單些,新增過程較爲簡單隻須要將原尾節點的nextWaiter指向新增節點,而後更新lastWaiter便可。this
調用Condition的await()方法會使當前線程進入等待狀態,同時會加入到Condition等待隊列同時釋放鎖。當從await()方法返回時,當前線程必定是獲取了Condition相關連的鎖。spa
public final void await() throws InterruptedException { // 當前線程中斷 if (Thread.interrupted()) throw new InterruptedException(); //當前線程加入等待隊列 Node node = addConditionWaiter(); //釋放鎖 long savedState = fullyRelease(node); int interruptMode = 0; /** * 檢測此節點的線程是否在同步隊上,若是不在,則說明該線程還不具有競爭鎖的資格,則繼續等待 * 直到檢測到此節點在同步隊列上 */ while (!isOnSyncQueue(node)) { //線程掛起 LockSupport.park(this); //若是已經中斷了,則退出 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //競爭同步狀態 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; //清理下條件隊列中的不是在等待條件的節點 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
此段代碼的邏輯是:首先將當前線程新建一個節點同時加入到條件隊列中,而後釋放當前線程持有的同步狀態。而後則是不斷檢測該節點表明的線程釋放出如今CLH同步隊列中(收到signal信號以後就會在AQS隊列中檢測到),若是不存在則一直掛起,不然參與競爭同步狀態。線程
加入條件隊列(addConditionWaiter())源碼以下:code
private Node addConditionWaiter() { Node t = lastWaiter; //尾節點 //Node的節點狀態若是不爲CONDITION,則表示該節點不處於等待狀態,須要清除節點 if (t != null && t.waitStatus != Node.CONDITION) { //清除條件隊列中全部狀態不爲Condition的節點 unlinkCancelledWaiters(); t = lastWaiter; } //當前線程新建節點,狀態CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); /** * 將該節點加入到條件隊列中最後一個位置 */ if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
該方法主要是將當前線程加入到Condition條件隊列中。固然在加入到尾節點以前會清楚全部狀態不爲Condition的節點。
fullyRelease(Node node),負責釋放該線程持有的鎖。
final long fullyRelease(Node node) { boolean failed = true; try { //節點狀態--其實就是持有鎖的數量 long savedState = getState(); //釋放鎖 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
isOnSyncQueue(Node node):若是一個節點剛開始在條件隊列上,如今在同步隊列上獲取鎖則返回true
final boolean isOnSyncQueue(Node node) { //狀態爲Condition,獲取前驅節點爲null,返回false if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //後繼節點不爲null,確定在CLH同步隊列中 if (node.next != null) return true; return findNodeFromTail(node); }
unlinkCancelledWaiters():負責將條件隊列中狀態不爲Condition的節點刪除
private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
調用Condition的signal()方法,將會喚醒在等待隊列中等待最長時間的節點(條件隊列裏的首節點),在喚醒節點前,會將節點移到CLH同步隊列中。
public final void signal() { //檢測當前線程是否爲擁有鎖的獨 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //頭節點,喚醒條件隊列中的第一個節點 Node first = firstWaiter; if (first != null) doSignal(first); //喚醒 }
該方法首先會判斷當前線程是否已經得到了鎖,這是前置條件。而後喚醒條件隊列中的頭節點。
doSignal(Node first):喚醒頭節點
private void doSignal(Node first) { do { //修改頭結點,完成舊頭結點的移出工做 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
doSignal(Node first)主要是作兩件事:1.修改頭節點,2.調用transferForSignal(Node first) 方法將節點移動到CLH同步隊列中。transferForSignal(Node first)源碼以下:
final boolean transferForSignal(Node node) { //將該節點從狀態CONDITION改變爲初始狀態0, if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //將節點加入到syn隊列中去,返回的是syn隊列中node節點前面的一個節點 Node p = enq(node); int ws = p.waitStatus; //若是結點p的狀態爲cancel 或者修改waitStatus失敗,則直接喚醒 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
整個通知的流程以下:
一個線程獲取鎖後,經過調用Condition的await()方法,會將當前線程先加入到條件隊列中,而後釋放鎖,最後經過isOnSyncQueue(Node node)方法不斷自檢看節點是否已經在CLH同步隊列了,若是是則嘗試獲取鎖,不然一直掛起。當線程調用signal()方法後,程序首先檢查當前線程是否獲取了鎖,而後經過doSignal(Node first)方法喚醒CLH同步隊列的首節點。被喚醒的線程,將從await()方法中的while循環中退出來,而後調用acquireQueued()方法競爭同步狀態。
synchronized原理在java中,每個對象有且僅有一個同步鎖。這也意味着,同步鎖是依賴於對象而存在。
當咱們調用某對象的synchronized方法時,就獲取了該對象的同步鎖。例如,synchronized(obj)就獲取了「obj這個對象」的同步鎖。
不一樣線程對同步鎖的訪問是互斥的。也就是說,某時間點,對象的同步鎖只能被一個線程獲取到!經過同步鎖,咱們就能在多線程中,實現對「對象/方法」的互斥訪問。 例如,如今有兩個線程A和線程B,它們都會訪問「對象obj的同步鎖」。假設,在某一時刻,線程A獲取到「obj的同步鎖」並在執行一些操做;而此時,線程B也企圖獲取「obj的同步鎖」 —— 線程B會獲取失敗,它必須等待,直到線程A釋放了「該對象的同步鎖」以後線程B才能獲取到「obj的同步鎖」從而才能夠運行。
synchronized基本規則咱們將synchronized的基本規則總結爲下面3條,並經過實例對它們進行說明。
第一條: 當一個線程訪問「某對象」的「synchronized方法」或者「synchronized代碼塊」時,其餘線程對「該對象」的該「synchronized方法」或者「synchronized代碼塊」的訪問將被阻塞。
第二條: 當一個線程訪問「某對象」的「synchronized方法」或者「synchronized代碼塊」時,其餘線程仍然能夠訪問「該對象」的非同步代碼塊。
第三條: 當一個線程訪問「某對象」的「synchronized方法」或者「synchronized代碼塊」時,其餘線程對「該對象」的其餘的「synchronized方法」或者「synchronized代碼塊」的訪問將被阻塞。
實例鎖 -- 鎖在某一個實例對象上。若是該類是單例,那麼該鎖也具備全局鎖的概念。
實例鎖對應的就是synchronized關鍵字。
全局鎖 -- 該鎖針對的是類,不管實例多少個對象,那麼線程都共享該鎖。
全局鎖對應的就是static synchronized(或者是鎖在該類的class或者classloader對象上)。
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[5]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); //獲取鎖 try { // 若是「緩衝已滿」,則等待;直到「緩衝」不是滿的,纔將x添加到緩衝中。 while (count == items.length) notFull.await(); // 將x添加到緩衝中 items[putptr] = x; // 將「put統計數putptr+1」;若是「緩衝已滿」,則設putptr爲0。 if (++putptr == items.length) putptr = 0; // 將「緩衝」數量+1 ++count; // 喚醒take線程,由於take線程經過notEmpty.await()等待 notEmpty.signal(); // 打印寫入的數據 System.out.println(Thread.currentThread().getName() + " put "+ (Integer)x); } finally { lock.unlock(); // 釋放鎖 } } public Object take() throws InterruptedException { lock.lock(); //獲取鎖 try { // 若是「緩衝爲空」,則等待;直到「緩衝」不爲空,纔將x從緩衝中取出。 while (count == 0) notEmpty.await(); // 將x從緩衝中取出 Object x = items[takeptr]; // 將「take統計數takeptr+1」;若是「緩衝爲空」,則設takeptr爲0。 if (++takeptr == items.length) takeptr = 0; // 將「緩衝」數量-1 --count; // 喚醒put線程,由於put線程經過notFull.await()等待 notFull.signal(); // 打印取出的數據 System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x); return x; } finally { lock.unlock(); // 釋放鎖 } } } public class ConditionTest2 { private static BoundedBuffer bb = new BoundedBuffer(); public static void main(String[] args) { // 啓動10個「寫線程」,向BoundedBuffer中不斷的寫數據(寫入0-9); // 啓動10個「讀線程」,從BoundedBuffer中不斷的讀數據。 for (int i=0; i<10; i++) { new PutThread("p"+i, i).start(); new TakeThread("t"+i).start(); } } static class PutThread extends Thread { private int num; public PutThread(String name, int num) { super(name); this.num = num; } public void run() { try { Thread.sleep(1); // 線程休眠1ms bb.put(num); // 向BoundedBuffer中寫入數據 } catch (InterruptedException e) { } } } static class TakeThread extends Thread { public TakeThread(String name) { super(name); } public void run() { try { Thread.sleep(10); // 線程休眠1ms Integer num = (Integer)bb.take(); // 從BoundedBuffer中取出數據 } catch (InterruptedException e) { } } } }
p1 put 1 p4 put 4 p5 put 5 p0 put 0 p2 put 2 t0 take 1 p3 put 3 t1 take 4 p6 put 6 t2 take 5 p7 put 7 t3 take 0 p8 put 8 t4 take 2 p9 put 9 t5 take 3 t6 take 6 t7 take 7 t8 take 8 t9 take 9
(01) BoundedBuffer 是容量爲5的緩衝,緩衝中存儲的是Object對象,支持多線程的讀/寫緩衝。多個線程操做「一個BoundedBuffer對象」時,它們經過互斥鎖lock對緩衝區items進行互斥訪問;並且同一個BoundedBuffer對象下的所有線程共用「notFull」和「notEmpty」這兩個Condition。 notFull用於控制寫緩衝,notEmpty用於控制讀緩衝。當緩衝已滿的時候,調用put的線程會執行notFull.await()進行等待;當緩衝區不是滿的狀態時,就將對象添加到緩衝區並將緩衝區的容量count+1,最後,調用notEmpty.signal()緩衝notEmpty上的等待線程(調用notEmpty.await的線程)。 簡言之,notFull控制「緩衝區的寫入」,當往緩衝區寫入數據以後會喚醒notEmpty上的等待線程。 同理,notEmpty控制「緩衝區的讀取」,當讀取了緩衝區數據以後會喚醒notFull上的等待線程。(02) 在ConditionTest2的main函數中,啓動10個「寫線程」,向BoundedBuffer中不斷的寫數據(寫入0-9);同時,也啓動10個「讀線程」,從BoundedBuffer中不斷的讀數據。