AbstractQueuedSynchronizer 隊列同步器

一、當多個線程併發執行的時候,如何完成線程的同步?java

加一把鎖,好比設置一個變量state=0;多個線程同時修改變量state=1;修改爲功的線程,表示拿到了鎖,能夠繼續執行。爲了保證多個線程同時修改時,只有一個線程能夠修改爲功,能夠用方法UNSAFE.compareAndSwapInt。node

/**
             * var1 操做的對象
             * var2 操做的對象屬性
             * var3 var2與var3比較,相等才更新
             * var4 更新值
             */
           unsafe.compareAndSwapInt(this, stateOffset, expect, update);

二、對於獲取鎖失敗的線程該如何處理呢?安全

獲取鎖失敗的線程,咱們能夠調用方法 LockSupport.park(this); 使當前線程中止,而後將線程插入到隊列的尾部。head的後繼節點始終指向隊列的第一個節點。釋放鎖,經過head獲取隊列的第一個節點,而後調用方法併發

LockSupport.unpark(thread);

喚醒該節點上的線程,而後喚醒起來的線程再次經過設置state變量爲0,獲取鎖。ide

上面就是AbstractQueuedSynchronizer(aqs) 的基本原理。測試

三、採用aqs實現一把鎖ui

定義靜態內部類Syn,該類繼承自AbstractQueuedSynchronizer ,重寫方法tryAcquire和tryRelease。AbstractQueuedSynchronizer類採用了模板方法的模式,實現了同步器的基本骨架。this

/**
 * protected boolean tryAcquire(int arg)	獨佔式獲取同步狀態,試着獲取,成功返回true,反之爲false
 * protected boolean tryRelease(int arg) 	獨佔式釋放同步狀態,等待中的其餘線程此時將有機會獲取到同步狀態
 * protected boolean isHeldExclusively() : 是否在獨佔模式下被線程佔用。
 */
public class SelfLock{
    // 靜態內部類,自定義同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 是否處於佔用狀態
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
        // 當狀態爲0的時候獲取鎖
        public boolean tryAcquire(int acquires) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // 釋放鎖,將狀態設置爲0 因爲持有鎖的線程只有一個 故不須要用cas更改狀態
        protected boolean tryRelease(int releases) {
            if (getState() == 0) throw new
                    IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }
    // 僅須要將操做代理到Sync上便可
    private final Sync sync = new Sync();
    public void lock() {
        sync.acquire(1);
    }
    public void unlock() {
        sync.release(0);
    }
}

測試線程

public class TestSelfLock {
    private static SelfLock selfLock = new SelfLock();
    private static int count;
    public static void main(String[] args) throws IOException {
        for (int i = 0; i < 100; i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        selfLock.lock();//獲取鎖
                        count++;
                        System.out.println(Thread.currentThread().getName()+"正在運行;count="+count);
                    }finally {
                        selfLock.unlock();//釋放鎖
                    }

                }
            }).start();
        }
        System.in.read();
    }

}

從運行結果能夠看出該鎖保證cout變量的線程安全。代理

 

四、AQS獲取鎖分析

SelfLock的lock方法調用sync.acquire(1),方法acquire以下:

//tryAcquire就是咱們重寫的方法。若是tryAcquire調用返回false,那麼會執行&&後面的代碼,
//將當前線程封裝成一個節點放到aqs內部的隊列裏。
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

addWaiter方法:從下面的源碼能夠看出addWaiter實際上就是將當前線程封裝成node節點,而後將該節點插入到隊列的尾部。爲了保證每個節點均可以安全的插入到隊列的尾部,採用了for循環+cas的方式。

private Node addWaiter(Node mode) {
        //將當前線程封裝成Node節點
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //若是tail不爲null
        Node pred = tail;//將pred 指向tail指向的對象。
        if (pred != null) {
            //將tail節點設置當前節點的前驅節點
            node.prev = pred;
            //將node 設置成尾部節點,用了cas操做,若是設置成功,tail指向的對象變爲node,
            //pred依然指向的原來的tail指向的對象
            if (compareAndSetTail(pred, node)) {
                //若是設置成功,那麼將pred的後繼節點設置成node
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    //同步器經過「死循環」來保證節點的正確添加,在「死循
    //環」中只有經過CAS將節點設置成爲尾節點以後,當前線程才能從該方法返回,不然,當前線
    //程不斷地嘗試設置。能夠看出,enq(final Node node)方法將併發添加節點的請求經過CAS變
    //得「串行化」了。
    private Node enq(final Node node) {
        for (;;) {//經過for循環,直到設置成功。
            Node t = tail;
            if (t == null) { // Must initialize
                //若是tail爲null 必須初始化tail和head
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {//設置尾節點 cas操做。
                    t.next = node;
                    return t;
                }
            }
        }
    }

 acquireQueued方法:若是當前線程獲取鎖失敗,那麼該線程就會被阻塞在for循環裏。當被喚醒後,就會再次嘗試獲取鎖,獲取鎖成功後,跳出循環,繼續日後執行。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //若是node的前驅節點爲head,而且tryAcquire方法返回true,則將
                //將頭結點設置成node
                if (p == head && tryAcquire(arg)) {
                    //設置node爲head
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //parkAndCheckInterrupt方法裏利用LockSupport.park(this)將當前線程掛起 ,
                //等待被前驅節點喚醒
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 

五、AQS釋放鎖分析

unlock方法:該方法會調用隊列同步器的release方法。

public void unlock() {
    sync.release(1);
}

release方法會調用咱們重寫的tryRelease方法, release方法主要的做用就是經過head找到head的後繼節點,而後調用LockSupport.unpark(thread)喚醒該節點上的線程。

public final boolean release(int arg) {
    //調用咱們從新的方法tryRelease
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//獲取head的後繼節點
                               //利用LockSupport.unpark(thread)喚醒後繼節點的線程
        return true;
    }
    return false;
}

 

最後:AQS是JUC中不少同步組件的構建基礎,好比ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier,具體能夠詳細查看這些類的內部實現。

相關文章
相關標籤/搜索