簡要理解鎖、同步器之間的關係

自定義獨佔鎖

package com.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Created by cxx on 2018/1/16.
 *
 * 獨佔鎖示例
 */
public class Mutex implements Lock {

    //靜態內部類,自定義同步器
    private static class Sync extends AbstractQueuedSynchronizer{

        //是否處於佔用狀態
        protected boolean isHeldExclusively(){
            return getState() == 1;
        }

        //當狀態爲0的時候,獲取鎖
        public boolean tryAcquire(int acquires){
            if (compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        //釋放鎖,將狀態設置爲0
        public boolean tryRelease(int releases){
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        //返回一個condition,每一個condition包含了一個condition隊列
        Condition newCondition(){
            return new ConditionObject();
        }
    }

    /***
     * 將操做代理到Sync上便可
     */
    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);

    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    public boolean isLocked(){
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads(){
        return sync.hasQueuedThreads();
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);

    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

如代碼所示,獨佔鎖實現了在同一時刻只能用一個線程獲取到鎖,而其餘獲取鎖的線程只能處於同步等待隊列中等待,只有獲取鎖的線程釋放了鎖,後繼的線程纔可以獲取鎖。java

#鎖、同步器、使用者node

  • 鎖是面向使用者的,定義了使用者與鎖交互的接口,隱藏了實現細節。
  • 同步器面向的是鎖的實現着,簡化了鎖的實現方式,屏蔽了同步狀態管理、線程的隊列、等待與喚醒等底層操做。
  • 鎖和同步器很好的隔離了使用者和實現着所須要關注的領域。

如上的mutex鎖同樣,具體的實現代理到sync,mutex只須要向用戶定義交互方式便可。安全

AQS的方法

公有方法:

獨佔鎖的獲取與釋放(包括了對同步隊列的操做)ide

  • acquire(int arg):獨佔式獲取同步狀態,若是當前線程獲取同步狀態成功,則由該方法返回,不然,將會進入同步隊列等待,該方法將會調用可重寫的tryAcquire(int arg)方法;工具

  • release(int arg):獨佔式釋放同步狀態,該方法會在釋放同步狀態以後,將同步隊列中第一個節點包含的線程喚醒;ui

共享鎖的獲取與釋放(包括了對同步隊列的操做)this

  • acquireShared(int arg):共享式獲取同步狀態,若是當前線程未獲取到同步狀態,將會進入同步隊列等待,與獨佔式的主要區別是在同一時刻能夠有多個線程獲取到同步狀態;
  • releaseShared(int arg):共享式釋放同步狀態;

須要由子類實現的保護方法

獨佔鎖的獲取與釋放的具體實現(沒有對同步隊列的操做,功能單一)線程

  • tryAcquire(int arg):獨佔式獲取同步狀態,獲取同步狀態成功後,其餘線程須要等待該線程釋放同步狀態才能獲取同步狀態;
  • tryRelease(int arg):獨佔式釋放同步狀態;

共享鎖的獲取與釋放(沒有對同步隊列的操做)設計

  • tryAcquireShared(int arg):共享式獲取同步狀態,返回值大於等於0則表示獲取成功,不然獲取失敗;
  • tryReleaseShared(int arg):共享式釋放同步狀態;

操做隊列同步器的狀態代理

  • getState():返回同步狀態的當前值;
  • setState(int newState):設置當前同步狀態;
  • compareAndSetState(int expect, int update):使用CAS設置當前狀態,該方法可以保證狀態設置的原子性;

其餘的方法

  • isHeldExclusively():當前同步器是否在獨佔式模式下被線程佔用,通常該方法表示是否被當前線程所獨佔;
  • acquireInterruptibly(int arg):與acquire(int arg)相同,可是該方法響應中斷,當前線程爲獲取到同步狀態而進入到同步隊列中,若是當前線程被中斷,則該方法會拋出InterruptedException異常並返回;
  • tryAcquireNanos(int arg,long nanos):超時獲取同步狀態,若是當前線程在nanos時間內沒有獲取到同步狀態,那麼將會返回false,已經獲取則返回true;
  • acquireSharedInterruptibly(int arg):共享式獲取同步狀態,響應中斷;
  • tryAcquireSharedNanos(int arg, long nanosTimeout):共享式獲取同步狀態,增長超時限制;

AQS的所有方法如圖所示

輸入圖片說明

輸入圖片說明

輸入圖片說明

AQS小結

  • 在基於AQS構建的同步器中,只能在一個時刻發生阻塞,從而下降上下文切換的開銷,提升了吞吐量。同時在設計AQS時充分考慮了可伸縮行,所以J.U.C中全部基於AQS構建的同步器都可以得到這個優點。

  • AQS的主要使用方式是繼承,子類經過繼承同步器並實現它的抽象方法來管理同步狀態。

  • AQS使用一個int類型的成員變量state來表示同步狀態,當state>0時表示已經獲取了鎖,當state = 0時表示釋放了鎖。它提供了三個方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態state進行操做,固然AQS能夠確保對state的操做是安全的。

  • AQS經過內置的FIFO同步隊列來完成資源獲取線程的排隊工做,若是當前線程獲取同步狀態失敗時,AQS則會將當前線程以及等待狀態等信息構形成一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程

  • 當同步狀態釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。

CLH 同步隊列的實現

在CLH同步隊列中,一個節點表示一個線程,它保存着線程的引用(thread)、狀態(waitStatus)、前驅節點(prev)、後繼節點(next),其定義以下:

static final class Node {
    /** 共享 */
    static final Node SHARED = new Node();

    /** 獨佔 */
    static final Node EXCLUSIVE = null;

    /**
     * 由於超時或者中斷,節點會被設置爲取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變爲其餘狀態;
     */
    static final int CANCELLED =  1;

    /**
     * 後繼節點的線程處於等待狀態,而當前節點的線程若是釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的線程得以運行
     */
    static final int SIGNAL    = -1;

    /**
     * 節點在等待隊列中,節點線程等待在Condition上,當其餘線程對Condition調用了signal()後,改節點將會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中
     */
    static final int CONDITION = -2;

    /**
     * 表示下一次共享式同步狀態獲取將會無條件地傳播下去
     */
    static final int PROPAGATE = -3;

    /** 等待狀態 */
    volatile int waitStatus;

    /** 前驅節點 */
    volatile Node prev;

    /** 後繼節點 */
    volatile Node next;

    /** 獲取同步狀態的線程 */
    volatile Thread thread;

    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {
    }

    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

狀態(waitStatus)

  • CANCELLED :值爲1,因爲在同步隊列中等待的線程等待超時或者被中斷,須要從同步隊列中取消等待,節點進入該狀態將不會發生變化。
  • SIGNAL:值爲 -1,後繼節點的線程處於等待狀態,而當前節點的線程若是釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的線程得以運行。
  • CONDITION: 值爲-1,節點在等待隊列中,節點線程等待在Condition上,當其餘線程對Condition調用了signal()方法後,該節點將會從等待隊列中轉移到同步隊列中,加入到對同步狀態的獲取中。
  • Propagate:值爲 -3,表示下一次共享式同步狀態獲取將會無條件地被傳播下去。
  • Initial:值爲0,初始狀態。

入隊

輸入圖片說明

  • compareAndSetTail(Node expect,Node update)方法來確保節點可以被線程安全添加。
  • enq(final Node node),同步器經過 「死循環」來保證節點的正確添加,在「死循環」中只有經過CAS將節點設置成爲尾節點以後,當前線程纔可以從該方法返回,不然,當前線程不斷地嘗試設置。
  • acquireQueued ,進入一個自旋的過程,每一個節點都在自省地觀察,當條件知足,獲取到同步狀態,就能夠從這個自旋過程當中退出。

第一,頭結點是成功獲取到同步狀態的節點,而頭結點的線程釋放了同步狀態以後,將會喚醒其後繼節點,後繼節點的線程被喚醒後須要檢查本身的前驅節點是不是頭結點。

第二,維護同步隊列的FIFO原則。

AQS:阻塞和喚醒線程

在線程獲取同步狀態時若是獲取失敗,則加入CLH同步隊列,經過經過自旋的方式不斷獲取同步狀態,可是在自旋的過程當中則須要判斷當前線程是否須要阻塞,其主要方法在acquireQueued():

if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;

經過這段代碼咱們能夠看到,在獲取同步狀態失敗後,線程並非立馬進行阻塞,須要檢查該線程的狀態,檢查狀態的方法爲 shouldParkAfterFailedAcquire(Node pred, Node node) 方法,該方法主要靠前驅節點判斷當前線程是否應該被阻塞,代碼以下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //前驅節點
        int ws = pred.waitStatus;
        //狀態爲signal,表示當前線程處於等待狀態,直接放回true
        if (ws == Node.SIGNAL)
            return true;
        //前驅節點狀態 > 0 ,則爲Cancelled,代表該節點已經超時或者被中斷了,須要從同步隊列中取消
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } 
        //前驅節點狀態爲Condition、propagate
        else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

這段代碼主要檢查當前線程是否須要被阻塞,具體規則以下:

  • 若是當前線程的前驅節點狀態爲SINNAL,則代表當前線程須要被阻塞,調用unpark()方法喚醒,直接返回true,當前線程阻塞

  • 若是當前線程的前驅節點狀態爲CANCELLED(ws > 0),則代表該線程的前驅節點已經等待超時或者被中斷了,則須要從CLH隊列中將該前驅節點刪除掉,直到回溯到前驅節點狀態 <= 0 ,返回false

  • 若是前驅節點非SINNAL,非CANCELLED,則經過CAS的方式將其前驅節點設置爲SINNAL,返回false

若是 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,則調用parkAndCheckInterrupt()方法阻塞當前線程:

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

parkAndCheckInterrupt() 方法主要是把當前線程掛起,從而阻塞住線程的調用棧,同時返回當前線程的中斷狀態。其內部則是調用LockSupport工具類的park()方法來阻塞該方法。

當線程釋放同步狀態後,則須要喚醒該線程的後繼節點:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
				//喚醒後繼節點
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

調用unparkSuccessor(Node node)喚醒後繼節點:

private void unparkSuccessor(Node node) {
        //當前節點狀態
        int ws = node.waitStatus;
        //當前狀態 < 0 則設置爲 0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        //當前節點的後繼節點
        Node s = node.next;
        //後繼節點爲null或者其狀態 > 0 (超時或者被中斷了)
        if (s == null || s.waitStatus > 0) {
            s = null;
            //從tail節點來找可用節點
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //喚醒後繼節點
        if (s != null)
            LockSupport.unpark(s.thread);
    }

可能會存在當前線程的後繼節點爲null,超時、被中斷的狀況,若是遇到這種狀況了,則須要跳過該節點,可是爲什麼是從tail尾節點開始,而不是從node.next開始呢?緣由在於node.next仍然可能會存在null或者取消了,因此採用tail回溯辦法找第一個可用的線程。最後調用LockSupport的unpark(Thread thread)方法喚醒該線程。

連接

AQS:阻塞和喚醒線程

相關文章
相關標籤/搜索