Condition及阻塞隊列(六)

在前面篇幅中講 synchronized 的時候,有講到 wait/notify 的基本使用,結合 synchronized 能夠實現對線程的通訊,既然 J.U.C 裏面提供了鎖的實現機制,那 J.U.C 裏面應該也有提供相似的線程通訊的工具;這個工具類就是 Condition 工具類。Condition 是一個多線程協調通訊的工具類,可讓某些線程一塊兒等待某個條件(condition),只有知足條件時,線程纔會被喚醒

一.Condition 的基本使用

和之前同樣,開篇先用一個例子來演示,咱們就拿之前寫的wait/notify的例子結合Lock進行改造下java

public class Producer implements Runnable{

    private Queue<String> msg;

    private int maxSize;

    Lock lock;
    Condition condition;

    public Producer(Queue<String> msg, int maxSize, Lock lock, Condition condition) {
        this.msg = msg;
        this.maxSize = maxSize;
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        int i=0;
        while(true){
            i++;
            lock.lock();
                while(msg.size()==maxSize){
                    System.out.println("生產者隊列滿了,先等待");
                    try {
                        condition.await(); //阻塞線程並釋放鎖
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("生產消息:"+i);
                msg.add("生產者的消息內容"+i);
                condition.signal(); //喚醒阻塞狀態下的線程
            lock.unlock();
        }
    }
}

  

public class Consumer implements Runnable{
    private Queue<String> msg;

    private int maxSize;

    Lock lock;
    Condition condition;

    public Consumer(Queue<String> msg, int maxSize, Lock lock, Condition condition) {
        this.msg = msg;
        this.maxSize = maxSize;
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        int i=0;
        while(true){
            i++;
            lock.lock(); //synchronized
            while(msg.isEmpty()){
                System.out.println("消費者隊列空了,先等待");
                try {
                    condition.await(); //阻塞線程並釋放鎖   wait
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("消費消息:"+msg.remove());
            condition.signal(); //喚醒阻塞狀態下的線程
            lock.unlock();
        }
    }
}

  

 

 

 有了之前篇幅的介紹,咱們如今應該很清楚lock.lock();作了啥事情,下面咱們就condition.await();作了啥事情分析下,進入 condition.await();方法;node

  public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
//有Node我第一個想到的節點,這段代碼其實就是構造一個Condition的隊列,咱們能夠點擊進去分析下 Node node = addConditionWaiter();
//解決重入問題,跟進代碼看下 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

  

 private Node addConditionWaiter() {
//lastWaiter表示最後一個等待節點,咱們看在ConeitionObject類能夠發現其實他定義了一個首節點和一個尾節點分別是firstWaiter和lastWaiter; Node t = lastWaiter;
//剛剛初始化時t必定是空的 // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) {
//判斷是不是偏鎖狀態 unlinkCancelledWaiters(); t = lastWaiter; }
//構造一個Node節點,傳的是當前線程,當前線程表示的是得到鎖的線程,CONDITION是一個條件隊列;這裏面New Node()和咱們上一篇幅講的同樣會初始一個waitStatus值,初始值是0 Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }

  

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
//獲得state值 int savedState = getState();
//進入判斷的relelase方法會發現這是一個釋放鎖的過程;釋放完成後他會叫醒阻梗的線程 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }

  走到這一步阻塞的隊列就會喚醒,新的線程會從新走await方法數組

 public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //有Node我第一個想到的節點,這段代碼其實就是構造一個Condition的隊列,咱們能夠點擊進去分析下
            Node node = addConditionWaiter();
            //解決重入問題,跟進代碼看下
            int savedState = fullyRelease(node);
            int interruptMode = 0;
//判斷節點在不在AQS同步隊列中 while (!isOnSyncQueue(node)) {
//若是不在同步隊列中就能夠掛起了 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

  上面的阻塞過程講解完了下面說下喚醒機制是怎麼作的,咱們進入condition.signal();緩存

      

 public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
//獲取頭部節點,此時頭部節點是在Condition隊列中等待被喚醒的頭部節點,喚醒後會移到AQS隊列中去搶佔鎖(由於condition隊列是等待隊列,AQS隊列是搶佔鎖隊列) Node first = firstWaiter; if (first != null)
//頭節點不會爲空因此進入下面方法 doSignal(first); }

  

 private void doSignal(Node first) {
            do {
//條件成立說明當前的Condition隊列中沒有阻塞的線程了 if ( (firstWaiter = first.nextWaiter) == null)
//節點清空 lastWaiter = null;
//斷掉喚醒的節點和等待喚醒的節點間的指向引用 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }

 

  final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
//若是是偏鎖狀態說明這個節點是有問題的,不必進行搶佔鎖,直接斷掉引用進行回收 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */
//將當前節點移到AQS隊列中去,從condition等待隊列中移除 Node p = enq(node); int ws = p.waitStatus;
//若是ws>0表示是偏鎖狀態,若是是偏鎖狀態就喚醒 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }

  

 二.阻塞隊列的成員

        

 

 

下面分別簡單介紹一下:多線程

  • ArrayBlockingQueue:是一個用數組實現的有界阻塞隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。支持公平鎖和非公平鎖。【注:每個線程在獲取鎖的時候可能都會排隊等待,若是在等待時間上,先獲取鎖的線程的請求必定先被知足,那麼這個鎖就是公平的。反之,這個鎖就是不公平的。公平的獲取鎖,也就是當前等待時間最長的線程先獲取鎖】併發

  • LinkedBlockingQueue:一個由鏈表結構組成的有界隊列,此隊列的長度爲Integer.MAX_VALUE。此隊列按照先進先出的順序進行排序。less

  • PriorityBlockingQueue: 一個支持線程優先級排序的無界隊列,默認天然序進行排序,也能夠自定義實現compareTo()方法來指定元素排序規則,不能保證同優先級元素的順序。ide

  • DelayQueue: 一個實現PriorityBlockingQueue實現延遲獲取的無界隊列,在建立元素時,能夠指定多久才能從隊列中獲取當前元素。只有延時期滿後才能從隊列中獲取元素。(DelayQueue能夠運用在如下應用場景:1.緩存系統的設計:能夠用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。2.定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從好比TimerQueue就是使用DelayQueue實現的。)工具

  • SynchronousQueue: 一個不存儲元素的阻塞隊列,每個put操做必須等待take操做,不然不能添加元素。支持公平鎖和非公平鎖。SynchronousQueue的一個使用場景是在線程池裏。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據須要(新任務到來時)建立新的線程,若是有空閒線程則會重複使用,線程空閒了60秒後會被回收。ui

  • LinkedTransferQueue: 一個由鏈表結構組成的無界阻塞隊列,至關於其它隊列,LinkedTransferQueue隊列多了transfer和tryTransfer方法。

  • LinkedBlockingDeque: 一個由鏈表結構組成的雙向阻塞隊列。隊列頭部和尾部均可以添加和移除元素,多線程併發時,能夠將鎖的競爭最多降到一半。

相關文章
相關標籤/搜索