AbstractQueuedSynchronizer AQS鎖原理及ReentrantLock非公平鎖的實現

AbstractQueuedSynchronizer AQS鎖原理及ReentrantLock非公平鎖的實現

AbstractQueuedSynchronizer是基於一個FIFO雙向鏈隊列 ==CLH隊列==,用於構建鎖或者同步裝置的類,也稱爲Java同步器,ReentrantLock的公平鎖與非公平鎖就是由該同步器構成,鏈隊列結構圖以下。node

你能夠理解爲銀行ATM機取錢,一我的先去取,獲取到了鎖,在這個時間內其餘線程處於阻塞狀態,只有等他取完錢了,他走了,釋放了鎖,排在它後面的人才能夠獲取到釋放的鎖並進行取錢。多線程

img

該同步器利用一個int值表示狀態,實現方式是==使用內部類繼承該同步器的方式==實現它的tryRelease、tryAcquire等方法管理狀態,管理狀態使用如下三個方法:ide

  • getState() 獲取狀態
  • setState() 基本設置狀態
  • compareAndSetSate(int,int) 基於CAS實現的原子性設置狀態

AQS節點

節點包含的狀態有:測試

  1. CANCELLED,值爲1,表示當前的線程被取消;
  2. SIGNAL,值爲-1,表示當前節點的後繼節點包含的線程須要運行,也就是unpark;
  3. CONDITION,值爲-2,表示當前節點在等待condition,也就是在condition隊列中;
  4. PROPAGATE,值爲-3,表示當前場景下後續的acquireShared可以得以執行;
  5. 值爲0,表示當前節點在sync隊列中,等待着獲取鎖。

節點其餘信息:ui

Node prev
前驅節點
Node next
後繼節點
Node nextWaiter 存儲condition隊列中的後繼節點
Thread thread
入隊列時的當前線程

獨佔鎖

鎖在一個時間點只能被一個線程鎖佔有,AQS實現的ReentrantLock,又分爲公平鎖和非公平鎖this

  • 公平鎖

保障了多線程下各線程獲取鎖的順序,先到的線程優先獲取鎖spa

  • 非公平鎖

加鎖時不考慮排隊等待問題,直接嘗試獲取鎖,獲取不到自動到隊尾等待線程

共享鎖

鎖在一個時間點能夠被多個線程同時獲取,AQS實現的CountDownLatch、ReadWriteLockcode

1、AQS實現ReentrantLock非公平鎖

ReentrantLock非公平鎖獲取鎖

一、lock()

final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }複製代碼

二、acquire()

根據指定狀態獲取,能獲取到 執行compareAndSetState方法設置新狀態cdn

public final void acquire(int arg) {
//tryAcquire成功的話 acquire結束;
        if (!tryAcquire(arg) &&
            //AcquireQueued方法進行阻塞等待,直到獲取鎖爲止
            //addWaiter把當前線程添加到隊列尾部
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //tryAcquire失敗而且acquiredQueued成功的話把當前線程中斷
            selfInterrupt();
    }複製代碼

  1. 嘗試獲取鎖;
  2. 若是獲取不到,將當前線程構形成節點Node並加入隊列;
    addWaiter方法把節點加入隊列,每一個線程都是一個節點Node,從而造成了一個雙向隊列,相似CLH隊列。
  3. 再次嘗試獲取,若是沒有獲取到那麼將當前線程從線程調度器上摘下,進入等待狀態。

三、tryAcquire()

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
    //獲取狀態
            int c = getState();
    //若是該鎖未被任何線程佔有,該鎖能被當前線程獲取
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
    //若被獲取到 查看是否被當前線程
            else if (current == getExclusiveOwnerThread()) {
                //是當前線程的話再次獲取,計數+1
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }複製代碼

在tryAcquire方法中使用了同步器提供的對state操做的方法,利用CAS原理保證只有一個線程可以對狀態進行成功修改,而沒有成功修改的線程將進入隊列排隊。

四、acquireQueued()

AcquireQueued方法進行阻塞等待,直到獲取鎖爲止

//node爲null,排他方式阻塞等待
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //p爲當前節點的前一個節點
                final Node p = node.predecessor();
                //若是p爲頭結點而且獲取成功就把當前線節點設置爲頭結點
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //若是線程須要被阻塞 則interrputr爲true
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }複製代碼

  1. 獲取當前節點的前驅節點;
  2. 當前驅節點是頭結點而且可以獲取狀態,表明該當前節點佔有鎖;
  3. 不然進入等待狀態。

五、shouldParkAfterFailedAcquire()

判斷線程是否須要阻塞

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //線程須要運行
        if (ws == Node.SIGNAL)
            return true;
       //ws>0 處於CANCEL狀態的線程
        if (ws > 0) {
        //把這些線程從隊列中清除
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
         //把等待的設置爲運行狀態
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }複製代碼

  1. 先判斷線程是否處於運行狀態
  2. 若是處於CANCEL狀態則把他們都從隊列中清除
  3. 把當前節點下一個節點等待狀態設置爲準備運行

六、parkAndCheckInterrupt()

// park方法讓其餘線程處於等待狀態
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }複製代碼

  1. 調用阻塞線程本地方法park
  2. 返回線程是否被阻塞

ReentrantLock非公平鎖釋放鎖

一、unlock()

調用release釋放一個鎖

public void unlock() {
    //釋放一個鎖
        sync.release(1);
    }複製代碼

二、release()

釋放鎖

public final boolean release(int arg) {
//調用tryRelease嘗試釋放一個鎖 
        if (tryRelease(arg)) {
            Node h = head;
            //釋放成功後
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }複製代碼

  1. 調用tryRelease方法嘗試釋放鎖,失敗返回false
  2. 釋放成功判斷頭結點不爲null而且狀態不爲等待獲取鎖狀態
  3. 知足條件則喚醒線程並返回true

三、tryRelease()

嘗試釋放當前鎖

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }複製代碼

  1. 釋放鎖必需是當前線程,若是不是當前線程拋出異常
  2. 若是獲取鎖的線程爲當前線程則判斷釋放後的狀態是否爲0,即釋放前爲CANCEL狀態,若是是,則設置獨佔模式線程爲null,並設置狀態爲0,返回true。

四、unparkSuccessor()

鎖釋放後喚醒線程,一同競爭CPU資源

private void unparkSuccessor(Node node) {
        //獲取當前節點狀態
        int ws = node.waitStatus;
        //把狀態設置爲等待獲取鎖狀態
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        //若是沒有下一個節點或者下一個節點狀態爲CANCEL,則把它們清除
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //不然調用LockSupport中unpark方法,喚醒後一個節點
        if (s != null)
            LockSupport.unpark(s.thread);
    }複製代碼

  1. 獲取當前節點狀態
  2. 判斷若是狀態<0,即不爲等待獲取鎖狀態,則把狀態設置爲等待獲取鎖狀態 <="" li="">
  3. 若是沒有下一個節點或者下一個節點狀態爲CANCEL,則把它們清除
  4. 噹噹前節點下一個節點不爲null時,調用LockSupport中unpark方法,喚醒後一個節點

五、unpark()

調用UNSAFE的本地方法unpark喚醒線程

public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }複製代碼

2、AQS實現ReentrantLock公平鎖

AQS實現ReentrantLock公平鎖與非公平鎖最大的區別在下面這段代碼:

1563697815749

源碼以下:

public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }複製代碼

==判斷"當前線程"是否是在CLH隊列的隊首,來實現公平性==。

3、AQS實現的自定義鎖

public class MyAQSLock implements Lock {
    private final Sync sync;
    public MyAQSLock() {
        sync = new Sync();
    }
    @Override
    public void lock() {
        sync.acquire(1);
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }
    @Override
    public void unlock() {
        sync.release(1);
    }
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
    /**
     * 把lock、unlock實現使用AQS構建爲內部類
     */
    private class Sync extends AbstractQueuedSynchronizer{
        Condition newCondition(){
            return new ConditionObject();
        }
        @Override
        protected boolean tryAcquire(int arg) {
            //第一個線程進來拿到鎖
            int state = getState();
            //用於重入鎖判斷
            Thread current = Thread.currentThread();
            if(state==0){
                if(compareAndSetState(0,arg)){
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
            }
            //重入鎖判斷 當前線程和獨佔鎖線程相同,則再次獲取
            else if(current==getExclusiveOwnerThread()){
                int next = state+arg;
                if(next<0){
                    throw new RuntimeException();
                }
                setState(next);
                return true;
            }
            return false;
        }
        /**
         * 可重入釋放鎖
         * @param arg
         * @return
         */
        @Override
        protected boolean tryRelease(int arg) {
            if(Thread.currentThread()!=getExclusiveOwnerThread()){
                throw new RuntimeException();
            }
            int state = getState()-arg;
            if(state==0){
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
            setState(0);
            return false;
        }

    }
}複製代碼

測試(可重入鎖)

public class TestAQSLock2 {
    MyAQSLock myLock = new MyAQSLock();
    private int value;
    private int value2;
    public int a(){
        myLock.lock();
        try {
            b();
            return value++;
        }finally {
            myLock.unlock();
        }
    }
    public void b(){
        myLock.lock();
        try {
            System.out.println(++value2);
        }finally {
            myLock.unlock();
        }
    }
    public static void main(String[] args) {
        TestAQSLock2 myLock = new TestAQSLock2();
        for(int i=0;i<50;i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + ":" + myLock.a());
            }).start();
        }
    }
}複製代碼
相關文章
相關標籤/搜索