Java中的鎖---隊列同步器

Lock接口

鎖是用來控制多個線程訪問共享資源的方式,通常來講,一個鎖可以防止多個線程同時訪問共享資源。Lock接口在使用時須要顯式地獲取和釋放鎖,擁有了鎖獲取與釋放的可操做性、可中斷的獲取以及超時獲取鎖等多種Synchronized關鍵字所不具有的同步特性。java

Lock lock = new ReentrantLock();
lock.lock();
try{}finally{
    lock.unlock();
}

在finally塊中釋放鎖,目的是保證在獲取鎖以後,最終可以被釋放。 不要將獲取鎖的過程寫在try塊中,由於若是早獲取鎖時發生了了異常,異常拋出的同時,也會致使鎖無端釋放。node

隊列同步器

隊列同步器是用來構建鎖或者其餘同步組件的基礎框架,使用了一個int成員變量表示同步狀態,經過內置的FIFO隊列來完成資源獲取線程的排隊工做。安全

同步器的主要使用方式是繼承,子類經過集成同步器並實現它的抽象方法來管理同步狀態,在抽象方法的實現過程當中免不了要對同步狀態進行更改,用到同步器提供的三個方法(getState(),setState()和compartAndSetState(int expect,int update))來進行操做,由於他們可以保證狀態的改變是安全的。子類推薦被定義爲自定義同步組件的靜態內部類,同步器自身沒有實現任何同步接口,它僅僅是定義了若干同步狀態獲取和釋放的方法來供自定義同步組件使用,同步器既能夠支持獨佔式地獲取同步狀態,也能夠支持共享式地獲取同步狀態。 鎖與同步器之間的關係:鎖是面向使用者的,定義了使用者與鎖交互的接口,隱藏了實現細節;同步器面向的是鎖的實現者,它簡化了鎖的實現方式,屏蔽了同步狀態管理、線程的排隊、等待與喚醒等底層操做。併發

同步器提供的模版方法分爲三類:獨佔式獲取與釋放同步狀態、共享式獲取與釋放同步狀態 和 查詢同步隊列中的等待線程狀況。框架

package com.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Created by cxx on 2018/1/16.
 * 
 * 獨佔鎖示例
 */
public class Mutex implements Lock {

    //靜態內部類,自定義同步器
    private static class Sync extends AbstractQueuedSynchronizer{

        //是否處於佔用狀態
        protected boolean isHeldExclusively(){
            return getState() == 1;
        }

        //當狀態爲0的時候,獲取鎖
        public boolean tryAcquire(int acquires){
            if (compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        //釋放鎖,將狀態設置爲0
        public boolean tryRelease(int releases){
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        //返回一個condition,每一個condition包含了一個condition隊列
        Condition newCondition(){
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);

    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    public boolean isLocked(){
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads(){
        return sync.hasQueuedThreads();
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);

    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

獨佔鎖Mutex 是一個自定義同步組件,它在同一時刻只容許一個線程佔用鎖。Mutex中定義了一個靜態內部類,該內部類繼承了同步器並實現了獨佔式獲取和釋放同步狀態。ide

隊列同步器的實現分析

同步隊列

同步器依賴內部的同步隊列(一個FIFO雙向隊列)來完成同步狀態的管理,當前線程獲取同步狀態失敗時,同步器會將當前線程以及等待新狀態等信息構形成爲一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把首節點中的線程喚醒,使其再次嘗試獲取同步狀態。工具

同步器擁有首節點(head)和尾節點(tail),沒有成功獲取同步狀態的線程將會成爲節點加入該隊列的尾部。當一個線程狀態成功地獲取了同步狀態(或鎖),其餘線程將沒法獲取到同步狀態,轉而被構形成爲節點並加入到同步隊列中,而這個加入隊列的過程必須保證線程安全,所以同步器提供了一個基於CAS的設置尾節點的方法:compareAndSetTail(Node expect。Node update),它須要傳遞當前線程「認爲的尾節點」和當前節點,只有設置成功後,當前節點才正式與以前的尾節點創建關聯。oop

2.獨佔式同步狀態獲取與釋放ui

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

主要邏輯是:首先調用自定義同步器實現的 tryAcquire(int arg)方法,該方法保證線程安全的獲取同步狀態,若是同步狀態獲取失敗,則構造同步節點(獨佔式 Node.EXCLUSIVE,同一時刻只能用一個線程成功獲取同步狀態),並經過 addWaiter方法將該節點加入到同步隊列的尾部,最後調用 acquireQueued方法,使得該節點以」死循環「的方式獲取同步狀態。若是獲取不到阻塞節點中的線程,而被阻塞線程的喚醒主要依靠前驅節點的出隊或阻塞線程被中斷來實現。this

//添加到同步隊列中
 private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

  /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
        節點插入到尾節點:將併發添加節點的請求經過CAS變得」串行化「了。
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

在 acquireQueued 方法中,當前線程在」死循環「中嘗試獲取同步狀態,而只有前驅節點是頭結點纔可以嘗試獲取同步狀態。緣由以下:

  1. 頭節點是成功獲取到同步狀態的節點,而頭結點的線程線程釋放了同步狀態後,將會喚醒其後繼節點,後繼節點的線程在被喚醒後須要檢查本身的前驅節點是不是頭結點。
  2. 維護同步隊列的FIFO原則。該方法中,節點自旋獲取同步狀態。

自旋的實現:p == head && tryAcquire(arg)

/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

在獲取同步狀態時,同步器維護一個同步隊列,獲取狀態失敗的線程都會被加入到隊列中並在隊列中進行自旋;移出隊列(或中止自旋)的條件是前驅節點爲頭結點且成功獲取了同步狀態。在釋放同步狀態時,同步器調用 tryRelease(int arg)方法釋放同步狀態,而後喚醒頭結點的後繼結點。

共享式同步狀態獲取與釋放

共享式獲取與獨佔式獲取最主要的區別在於同一時刻可否有多個線程同時進入同步狀態。經過調用同步器的acquireShared方法能夠共享式地獲取同步狀態。

/**
     * Acquires in shared mode, ignoring interrupts.  Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

在 acquireShared 方法中,同步器調用 tryAcquireShared 嘗試獲取同步狀態。doAcquireShared 方法的自旋過程當中,若是當前節點的前驅爲頭結點時,嘗試獲取同步狀態,若是返回值大於等於0,表示該次獲取同步狀態成功並從自旋過程當中退出。

/**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

releaseShared 在釋放同步狀態以後,將會喚醒後續處於等待狀態的節點。對於可以支持多個線程同時訪問的併發組件,和獨佔式主要區別在於 tryReleaseShared 方法必須確保同步狀態線程安全釋放,通常是經過循環和CAS來保證的,由於釋放同步狀態的操做會同時來自多個線程。

獨佔式超時獲取同步狀態

經過調用同步器的 doAcquireNanos 方法能夠超時獲取同步狀態,既在指定時間段內獲取同步狀態,若是獲取同步狀態則返回 true,不然返回false。

/**
     * Acquires in exclusive timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

若是當前線程獲取同步狀態失敗,則判斷是否超時,若是沒有超時,從新計算超時間隔,而後使當線程等待。若是已經超時,則中斷當前線程,而後獲取同步狀態。

自定義同步組件--TwinsLock

設計一個同步工具:

第一步:肯定訪問模式,共享仍是獨佔。

第二步:定義資源數,

第三部:組合自定義同步器。

TwinsLock 實現了Lock接口,提供了面向使用者的接口,使用者調用Lock() 方法獲取鎖,隨後調用unlock方法釋放鎖。TwinsLock 同時包含了一個自定義同步器 Sync,而該同步器面向線程訪問和同步狀態控制。

相關文章
相關標籤/搜索