Thread.AQS

AQS

  • AbstractQueuedSynchronizer隊列同步器,Lock接口實現的核心,可自定義同步器。
*      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
  • CLH鎖實現圖,將每一個線程構形成Node節點,加入鏈表,新加入線程在列表隊列最後,每次頭結點獲取到所,後續節點繼續獲取鎖。
  • AQS暴露操做方法,隱藏實現細節,集成AQS,重寫獲取鎖方法,可自定義同步鎖。
  • 內部經過voliate修飾的int變量,unsafe方法提供的compareAndSet方法提供同步,逐步包裝成同步隊列,同步鎖。
  • 原理總結,用AQS的volatile的同步特性,設置int鎖數量,unsafe對象的CAS同步設置方法,構造同步隊列,最後unsafe的park,unpark直接操做線程掛起釋放。
static final class Node {
       
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        static final int CANCELLED =  1;
 
        static final int SIGNAL    = -1;
     
        static final int CONDITION = -2;
  
        static final int PROPAGATE = -3;
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
    
        Node nextWaiter;
 
    }
  • 以Lock的實現Reentranlock爲例
public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    private final Sync sync;
    //實現AQS重寫鎖
    //Lock的全部操做,經過代理內部類調用AQS,先看非公平鎖
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
        abstract void lock();
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    }
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        //2. sync的非公平實現,快速調用父類AQS的CAS設置state是,若是失敗,進入acquire構造Node節點,加入CLK
        //
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
        
        //3. AQS的acruird,tryAcquire交給子類自定實現,addWaiter加入CLK節點,acquireQueued,全部節點自旋,獲取同步狀態
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    //1. 外部調用Lock
    public void lock() {
        sync.lock();
    }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    public Condition newCondition() {
        return sync.newCondition();
    }
}

    //4. AQS中的實現,tryAcquire交給子類
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
      //5. ReentranLock裏Sync非公平鎖實現
      final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            //獲取AQS中state
            int c = getState();
            if (c == 0) {
                //AQS的CAS同步設置state,並設置持有線程
                if (compareAndSetState(0, acquires)) {
                    //設置持有線程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //可重入關鍵,同一線程可重複獲取鎖
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    //6. 加入CLK鏈表尾部,經過AQS,CAS設置tail節點,構造鏈表見自定容器Stack
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //快速設置tail,若是失敗進入enq
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    //死循環設置尾節點,直到成功,以死循環的方式,同步設置tail節點
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    //7. 最後,每一個構造的節點,都以自旋的方式,獲取前一節點,若是前一節點是頭節點,而且獲取到鎖,把當前本身設置成頭結點,獲的鎖。
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //parkAndCheckInterrupt 調用LockSupport掛起線程,進入阻塞狀態
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    //unlock,調用AQS釋放
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
            //LockSupport的unparking,釋放線程掛起狀態    
            unparkSuccessor(h);
            return true;
        }
        return false;
    }
     //1. 交給子類實現
     protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息