j.u.c系列(06)---之鎖條件:Condition

  

寫在前面

  在沒有Lock以前,咱們使用synchronized來控制同步,配合Object的wait()、notify()系列方法能夠實現等待/通知模式。在Java SE5後,Java提供了Lock接口,相對於Synchronized而言,Lock提供了條件Condition,對線程的等待、喚醒操做更加詳細和靈活。java

Condition簡介

  Condition的做用是對鎖進行更精確的控制。Condition中的await()方法至關於Object的wait()方法,Condition中的signal()方法至關於Object的notify()方法,Condition中的signalAll()至關於Object的notifyAll()方法。不一樣的是,Object中的wait(),notify(),notifyAll()方法是和"同步鎖"(synchronized關鍵字)捆綁使用的;而Condition是須要與"互斥鎖"/"共享鎖"捆綁使用的。node

void await()
// 形成當前線程在接到信號、被中斷或到達指定等待時間以前一直處於等待狀態。
boolean await(long time, TimeUnit unit)
// 形成當前線程在接到信號、被中斷或到達指定等待時間以前一直處於等待狀態。
long awaitNanos(long nanosTimeout)
// 形成當前線程在接到信號以前一直處於等待狀態。【注意:該方法對中斷不敏感】
void awaitUninterruptibly()
// 形成當前線程在接到信號、被中斷或到達指定最後期限以前一直處於等待狀態。
boolean awaitUntil(Date deadline)
// 喚醒一個等待線程。
void signal()
// 喚醒全部等待線程。
void signalAll()

 Condtion的實現

  獲取一個Condition必需要經過Lock的newCondition()方法。該方法定義在接口Lock下面,返回的結果是綁定到此 Lock 實例的新 Condition 實例。Condition爲一個接口,其下僅有一個實現類ConditionObject,因爲Condition的操做須要獲取相關的鎖,而AQS則是同步鎖的實現基礎,因此ConditionObject則定義爲AQS的內部類。定義以下:安全

public abstract class AbstractQueuedLongSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    public class ConditionObject implements Condition, java.io.Serializable {}
}

等待隊列

  每一個Condition對象都包含着一個FIFO隊列,該隊列是Condition對象通知/等待功能的關鍵。在隊列中每個節點都包含着一個線程引用,該線程就是在該Condition對象上等待的線程。咱們看Condition的定義就明白了:多線程

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    
    //頭節點
    private transient Node firstWaiter;
    //尾節點
    private transient Node lastWaiter;

    public ConditionObject() {
    }
    
    /** 省略方法 **/
}

  從上面代碼能夠看出Condition擁有首節點(firstWaiter),尾節點(lastWaiter)。當前線程調用await()方法,將會以當前線程構形成一個節點(Node),並將節點加入到該隊列的尾部。結構以下:函數

 

  Node裏面包含了當前線程的引用。Node定義與AQS的CLH同步隊列的節點使用的都是同一個類(AbstractQueuedSynchronized.Node靜態內部類)。ui

  Condition的隊列結構比CLH同步隊列的結構簡單些,新增過程較爲簡單隻須要將原尾節點的nextWaiter指向新增節點,而後更新lastWaiter便可。this

等待(await)

  調用Condition的await()方法會使當前線程進入等待狀態,同時會加入到Condition等待隊列同時釋放鎖。當從await()方法返回時,當前線程必定是獲取了Condition相關連的鎖。spa

    public final void await() throws InterruptedException {
        // 當前線程中斷
        if (Thread.interrupted())
            throw new InterruptedException();
        //當前線程加入等待隊列
        Node node = addConditionWaiter();
        //釋放鎖
        long savedState = fullyRelease(node);
        int interruptMode = 0;
        /**
         * 檢測此節點的線程是否在同步隊上,若是不在,則說明該線程還不具有競爭鎖的資格,則繼續等待
         * 直到檢測到此節點在同步隊列上
         */
        while (!isOnSyncQueue(node)) {
            //線程掛起
            LockSupport.park(this);
            //若是已經中斷了,則退出
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //競爭同步狀態
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        //清理下條件隊列中的不是在等待條件的節點
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

  此段代碼的邏輯是:首先將當前線程新建一個節點同時加入到條件隊列中,而後釋放當前線程持有的同步狀態。而後則是不斷檢測該節點表明的線程釋放出如今CLH同步隊列中(收到signal信號以後就會在AQS隊列中檢測到),若是不存在則一直掛起,不然參與競爭同步狀態。線程

  加入條件隊列(addConditionWaiter())源碼以下:code

    private Node addConditionWaiter() {
        Node t = lastWaiter;    //尾節點
        //Node的節點狀態若是不爲CONDITION,則表示該節點不處於等待狀態,須要清除節點
        if (t != null && t.waitStatus != Node.CONDITION) {
            //清除條件隊列中全部狀態不爲Condition的節點
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        //當前線程新建節點,狀態CONDITION
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        /**
         * 將該節點加入到條件隊列中最後一個位置
         */
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

  該方法主要是將當前線程加入到Condition條件隊列中。固然在加入到尾節點以前會清楚全部狀態不爲Condition的節點。

  fullyRelease(Node node),負責釋放該線程持有的鎖。

    final long fullyRelease(Node node) {
        boolean failed = true;
        try {
            //節點狀態--其實就是持有鎖的數量
            long savedState = getState();
            //釋放鎖
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

  isOnSyncQueue(Node node):若是一個節點剛開始在條件隊列上,如今在同步隊列上獲取鎖則返回true

    final boolean isOnSyncQueue(Node node) {
        //狀態爲Condition,獲取前驅節點爲null,返回false
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        //後繼節點不爲null,確定在CLH同步隊列中
        if (node.next != null)
            return true;

        return findNodeFromTail(node);
    }

  unlinkCancelledWaiters():負責將條件隊列中狀態不爲Condition的節點刪除

        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

通知(signal)

  調用Condition的signal()方法,將會喚醒在等待隊列中等待最長時間的節點(條件隊列裏的首節點),在喚醒節點前,會將節點移到CLH同步隊列中。

    public final void signal() {
        //檢測當前線程是否爲擁有鎖的獨
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        //頭節點,喚醒條件隊列中的第一個節點
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);    //喚醒
    }

  該方法首先會判斷當前線程是否已經得到了鎖,這是前置條件。而後喚醒條件隊列中的頭節點。

  doSignal(Node first):喚醒頭節點

    private void doSignal(Node first) {
        do {
            //修改頭結點,完成舊頭結點的移出工做
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
    }

  doSignal(Node first)主要是作兩件事:1.修改頭節點,2.調用transferForSignal(Node first) 方法將節點移動到CLH同步隊列中。transferForSignal(Node first)源碼以下:

     final boolean transferForSignal(Node node) {
        //將該節點從狀態CONDITION改變爲初始狀態0,
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        //將節點加入到syn隊列中去,返回的是syn隊列中node節點前面的一個節點
        Node p = enq(node);
        int ws = p.waitStatus;
        //若是結點p的狀態爲cancel 或者修改waitStatus失敗,則直接喚醒
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

整個通知的流程以下:

  1. 判斷當前線程是否已經獲取了鎖,若是沒有獲取則直接拋出異常,由於獲取鎖爲通知的前置條件。
  2. 若是線程已經獲取了鎖,則將喚醒條件隊列的首節點
  3. 喚醒首節點是先將條件隊列中的頭節點移出,而後調用AQS的enq(Node node)方法將其安全地移到CLH同步隊列中
  4. 最後判斷若是該節點的同步狀態是否爲Cancel,或者修改狀態爲Signal失敗時,則直接調用LockSupport喚醒該節點的線程。

總結

  一個線程獲取鎖後,經過調用Condition的await()方法,會將當前線程先加入到條件隊列中,而後釋放鎖,最後經過isOnSyncQueue(Node node)方法不斷自檢看節點是否已經在CLH同步隊列了,若是是則嘗試獲取鎖,不然一直掛起。當線程調用signal()方法後,程序首先檢查當前線程是否獲取了鎖,而後經過doSignal(Node first)方法喚醒CLH同步隊列的首節點。被喚醒的線程,將從await()方法中的while循環中退出來,而後調用acquireQueued()方法競爭同步狀態。

synchronized原理

  synchronized原理在java中,每個對象有且僅有一個同步鎖。這也意味着,同步鎖是依賴於對象而存在。
當咱們調用某對象的synchronized方法時,就獲取了該對象的同步鎖。例如,synchronized(obj)就獲取了「obj這個對象」的同步鎖。
不一樣線程對同步鎖的訪問是互斥的。也就是說,某時間點,對象的同步鎖只能被一個線程獲取到!經過同步鎖,咱們就能在多線程中,實現對「對象/方法」的互斥訪問。 例如,如今有兩個線程A和線程B,它們都會訪問「對象obj的同步鎖」。假設,在某一時刻,線程A獲取到「obj的同步鎖」並在執行一些操做;而此時,線程B也企圖獲取「obj的同步鎖」 —— 線程B會獲取失敗,它必須等待,直到線程A釋放了「該對象的同步鎖」以後線程B才能獲取到「obj的同步鎖」從而才能夠運行。

synchronized基本規則

  synchronized基本規則咱們將synchronized的基本規則總結爲下面3條,並經過實例對它們進行說明。
  第一條: 當一個線程訪問「某對象」的「synchronized方法」或者「synchronized代碼塊」時,其餘線程對「該對象」的該「synchronized方法」或者「synchronized代碼塊」的訪問將被阻塞。
  第二條: 當一個線程訪問「某對象」的「synchronized方法」或者「synchronized代碼塊」時,其餘線程仍然能夠訪問「該對象」的非同步代碼塊。
  第三條: 當一個線程訪問「某對象」的「synchronized方法」或者「synchronized代碼塊」時,其餘線程對「該對象」的其餘的「synchronized方法」或者「synchronized代碼塊」的訪問將被阻塞。

 實例鎖 -- 鎖在某一個實例對象上。若是該類是單例,那麼該鎖也具備全局鎖的概念。
               實例鎖對應的就是synchronized關鍵字。

全局鎖 -- 該鎖針對的是類,不管實例多少個對象,那麼線程都共享該鎖。
               全局鎖對應的就是static synchronized(或者是鎖在該類的class或者classloader對象上)。

Condtion的實現

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition(); 
    final Condition notEmpty = lock.newCondition(); 

    final Object[] items = new Object[5];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();    //獲取鎖
        try {
            // 若是「緩衝已滿」,則等待;直到「緩衝」不是滿的,纔將x添加到緩衝中。
            while (count == items.length)
                notFull.await();
            // 將x添加到緩衝中
            items[putptr] = x; 
            // 將「put統計數putptr+1」;若是「緩衝已滿」,則設putptr爲0。
            if (++putptr == items.length) putptr = 0;
            // 將「緩衝」數量+1
            ++count;
            // 喚醒take線程,由於take線程經過notEmpty.await()等待
            notEmpty.signal();

            // 打印寫入的數據
            System.out.println(Thread.currentThread().getName() + " put  "+ (Integer)x);
        } finally {
            lock.unlock();    // 釋放鎖
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();    //獲取鎖
        try {
            // 若是「緩衝爲空」,則等待;直到「緩衝」不爲空,纔將x從緩衝中取出。
            while (count == 0) 
                notEmpty.await();
            // 將x從緩衝中取出
            Object x = items[takeptr]; 
            // 將「take統計數takeptr+1」;若是「緩衝爲空」,則設takeptr爲0。
            if (++takeptr == items.length) takeptr = 0;
            // 將「緩衝」數量-1
            --count;
            // 喚醒put線程,由於put線程經過notFull.await()等待
            notFull.signal();

            // 打印取出的數據
            System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x);
            return x;
        } finally {
            lock.unlock();    // 釋放鎖
        }
    } 
}

public class ConditionTest2 {
    private static BoundedBuffer bb = new BoundedBuffer();

    public static void main(String[] args) {
        // 啓動10個「寫線程」,向BoundedBuffer中不斷的寫數據(寫入0-9);
        // 啓動10個「讀線程」,從BoundedBuffer中不斷的讀數據。
        for (int i=0; i<10; i++) {
            new PutThread("p"+i, i).start();
            new TakeThread("t"+i).start();
        }
    }

    static class PutThread extends Thread {
        private int num;
        public PutThread(String name, int num) {
            super(name);
            this.num = num;
        }
        public void run() {
            try {
                Thread.sleep(1);    // 線程休眠1ms
                bb.put(num);        // 向BoundedBuffer中寫入數據
            } catch (InterruptedException e) {
            }
        }
    }

    static class TakeThread extends Thread {
        public TakeThread(String name) {
            super(name);
        }
        public void run() {
            try {
                Thread.sleep(10);                    // 線程休眠1ms
                Integer num = (Integer)bb.take();    // 從BoundedBuffer中取出數據
            } catch (InterruptedException e) {
            }
        }
    }
}

 

p1 put  1
p4 put  4
p5 put  5
p0 put  0
p2 put  2
t0 take 1
p3 put  3
t1 take 4
p6 put  6
t2 take 5
p7 put  7
t3 take 0
p8 put  8
t4 take 2
p9 put  9
t5 take 3
t6 take 6
t7 take 7
t8 take 8
t9 take 9

(01) BoundedBuffer 是容量爲5的緩衝,緩衝中存儲的是Object對象,支持多線程的讀/寫緩衝。多個線程操做「一個BoundedBuffer對象」時,它們經過互斥鎖lock對緩衝區items進行互斥訪問;並且同一個BoundedBuffer對象下的所有線程共用「notFull」和「notEmpty」這兩個Condition。       notFull用於控制寫緩衝,notEmpty用於控制讀緩衝。當緩衝已滿的時候,調用put的線程會執行notFull.await()進行等待;當緩衝區不是滿的狀態時,就將對象添加到緩衝區並將緩衝區的容量count+1,最後,調用notEmpty.signal()緩衝notEmpty上的等待線程(調用notEmpty.await的線程)。 簡言之,notFull控制「緩衝區的寫入」,當往緩衝區寫入數據以後會喚醒notEmpty上的等待線程。       同理,notEmpty控制「緩衝區的讀取」,當讀取了緩衝區數據以後會喚醒notFull上的等待線程。(02) 在ConditionTest2的main函數中,啓動10個「寫線程」,向BoundedBuffer中不斷的寫數據(寫入0-9);同時,也啓動10個「讀線程」,從BoundedBuffer中不斷的讀數據。

相關文章
相關標籤/搜索