java併發系列-lock-AQS-condition

1、背景

上一篇已經解釋了ReentrantLock,AQS基本框架html

AQS是JUC重要的同步器,全部的鎖基於這個同步器實現,先溫習下 主要由如下幾個重要部分組成java

  • Node 節點
  • head 頭節點
  • tail 尾節點
  • state 當前鎖的狀態
  • acquire(int arg) 獲取鎖
  • acquireQueued(final Node node, int arg) 獲取鎖隊列
  • addWaiter(Node mode) 加入等待隊列
  • release(int) 釋放鎖
  • unparkSuccessor(Node) 喚醒繼任節點
  • ConditionObject 條件對象,功能相似wait/notify

那麼今天就是重點介紹AQS中的ConditionObject 功能和實現。node

2、什麼是condition

在AQS中,Node節點經過nextWaiter指針串起來的就是條件隊列,中有ConditionObject對象,實現接口Condition,由具體的lock,如ReentrantLock.newCondition建立和調用。api

Condition主要接口有:緩存

public interface Condition {  
    void await() throws InterruptedException;  
    void awaitUninterruptibly();  
    long awaitNanos(long nanosTimeout) throws InterruptedException;  
    boolean await(long time, TimeUnit unit) throws InterruptedException;  
    boolean awaitUntil(Date deadline) throws InterruptedException;  
    void signal();  
    void signalAll();  
}
  • 相同點:

在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll(),傳統線程的通訊方式,Condition均可以實現。多線程

  • 不一樣點:

**好比一個對象裏面能夠有多個Condition,能夠註冊在不一樣的condition,能夠有選擇性的調度線程,很靈活。而Synchronized只有一個condition(就是對象自己),全部的線程都註冊在這個conditon身上,線程調度不靈活。 ** Condition和傳統的線程通訊沒什麼區別,Condition的強大之處在於它能夠爲多個線程間創建不一樣的Condition,下面引入API中的一段代碼(原文地址,http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html),ArrayBlockingQueue 提供了相似的功能,實習原理相似。oracle

class BoundedBuffer {  
   final Lock lock = new ReentrantLock();//鎖對象  
   final Condition notFull  = lock.newCondition();//寫線程條件   
   final Condition notEmpty = lock.newCondition();//讀線程條件   
  
   final Object[] items = new Object[100];//緩存隊列  
   int putptr/*寫索引*/, takeptr/*讀索引*/, count/*隊列中存在的數據個數*/;  
  
   public void put(Object x) throws InterruptedException {  
     lock.lock();  
     try {  
       while (count == items.length)//若是隊列滿了   
         notFull.await();//阻塞寫線程  
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;//若是寫索引寫到隊列的最後一個位置了,那麼置爲0  
       ++count;//個數++  
       notEmpty.signal();//喚醒讀線程  
     } finally {  
       lock.unlock();  
     }  
   }  
  
   public Object take() throws InterruptedException {  
     lock.lock();  
     try {  
       while (count == 0)//若是隊列爲空  
         notEmpty.await();//阻塞讀線程  
       Object x = items[takeptr];//取值   
       if (++takeptr == items.length) takeptr = 0;//若是讀索引讀到隊列的最後一個位置了,那麼置爲0  
       --count;//個數--  
       notFull.signal();//喚醒寫線程  
       return x;  
     } finally {  
       lock.unlock();  
     }  
   }   
 }

在多線程環境下,緩存區提供了兩個方法,put and take,put是存數據的,take是取數據的,items充當緩存隊列,運行場景是有多個線程同時往調用put和take;這裏的讀和寫共用的實際上是一把鎖lock,實際讀寫不能同時並行;當寫入一個數據後,便通知讀線程,寫滿後調用notFull.await()阻塞寫;當讀入一個數據後,便通知寫線程,讀空後調用notEmpty.await()阻塞讀,等待寫條件通知信號;框架

3、condition源碼實現

接下來咱們講解AQS.ConditionObject 內部源碼。 主要分析如下幾個方法:less

  • await 阻塞線程,加入條件隊列,等待條件信號釋放
  1. addConditionWaiter
  2. fullyRelease
  3. isOnSyncQueue
  • signal 釋放條件信號,加入鎖等待隊列syc,等待獲取鎖
  1. doSignal
  2. transferForSignal

這裏我先定義下兩個概念:ui

  1. 鎖等待隊列(同步隊列) 即獲取鎖後等待解鎖的隊列,AQS中的syc (FIFO)
  2. 條件等待隊列 即調用await後進入條件隊列,等待signal釋放信號,進入鎖等待隊列(FIFO),ConditionObject 中

await

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();  //添加到Condition本身維護的一個鏈表中,經過nextWaiter鏈接起來,即條件等待隊列  
            int savedState = fullyRelease(node); //釋當前節點的鎖,並返回釋放以前的鎖狀態,執行await必定是先拿到了當前鎖的
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {  //判斷該節點是否在鎖等待隊列中,當釋放鎖後就不在鎖隊列中了,等待條件隊列的線程應當被繼續阻塞,若是在鎖等待隊列中,則說明有資格獲取鎖,執行下一步,聰明的你發現是在signal時被再次加入鎖等待隊列
                LockSupport.park(this); //在條件隊列中阻塞該線程
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
        // 被喚醒後,從新開始正式競爭鎖,一樣,若是競爭不到仍是會將本身沉睡,等待喚醒從新開始競爭。
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // 存在下一個節點,從條件隊列中取消不是在CONDITION的線程節點
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

addConditionWaiter

當前waiter線程加入條件等待隊列裏,從隊尾入隊,並將節點線程狀態設置爲CONDITION

/**
         *當前waiter線程加入條件等待隊列裏,從隊尾入隊,並將節點線程狀態設置爲CONDITION
         * [@return](https://my.oschina.net/u/556800) 返回新的節點
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

fullyRelease:

釋當前節點的鎖,並返回釋放以前的鎖狀態,執行await必定是先拿到了當前鎖的

/**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * @param node the condition node for this wait
     * @return previous sync state
     */

//釋當前節點的鎖,並返回釋放以前的鎖狀態,執行await必定是先拿到了當前鎖的

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
 

isOnSyncQueue

判斷這個節點是否在鎖等待隊列中,顯然waitStatus爲CONDITION這種結點,只屬於條件隊列,不在鎖等待隊列中。

判斷這個節點是否在鎖等待隊列中,顯然waitStatus爲CONDITION這種結點,只屬於條件隊列,不在鎖等待隊列中。
   
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        return findNodeFromTail(node);
    }

signal

從條件隊列中的頭節點開始,釋放第一個節點線程,加入到鎖等待隊列

//從條件隊列中的頭節點開始,釋放第一個節點線程,加入到鎖等待隊列
        public final void signal() {
            //當前線程不是擁有鎖線程這拋出異常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;//條件隊列中第一個節點
         
            if (first != null)
                doSignal(first);
        }

doSignal

doSignal()方法是一個while循環,直到transferForSignal成功爲止, 不斷地完成這樣一個transfer(條件隊列--->鎖同步等待隊列)操做,直到有一個成功爲止。

/**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                //將第一個節點的繼任變量設置空,意義是first與next節點斷掉,便於gc
                first.nextWaiter = null;
              //  將節點從條件隊列轉移到鎖等待隊列,若是成功則返回true,while循環終止,若是轉移失敗,則從first日後找,聰明的你確定就知道,若是是doSignalAll,這從first一直日後執行transferForSignal
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

transferForSignal

將節點從條件隊列轉移到鎖等待隊列,若是成功則返回true transferForSignal作了兩件事

  1. 將結點從條件隊列移到鎖同步等待隊列中
  2. 設置同步中它的前趨結點的waitStatus爲SIGNAL,並掛起當前線程。
/**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal).
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
 //假設在條件隊列中,將node狀態設置0,若是waitStatus不能被更改,則該節點已經被取消
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
       //經過enq將節點加入到鎖等待隊列,從隊尾插入,並返回前驅節點。
        Node p = enq(node);
        int ws = p.waitStatus;
      //若是該前驅節點被取消或者更改waitStatus狀態爲SIGNAL失敗,說明前驅失效,則喚醒在鎖同步等待隊列當前節點
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

4、總結

。。。待續

相關文章
相關標籤/搜索