Semaphore原理分析

      計數信號量用來控制同時訪問某個特定資源的操做數量,或者同時執行某個指定操做的數量。計數信號量還能夠用來實現某種資源池,或者對容器施加邊界,如數據庫鏈接池,固定大小的數據集。
      Semaphore 管理一組虛擬的許可,在執行操做的時能夠先得到許可,並在使用之後釋放許可。若是沒有許可,那麼 acquire 將阻塞直到有許可,release 方法返回一個可用信號量。Semaphore 內部是經過 AQS 來實現的。

Semaphore 主要實現方法:java

  1. Semaphore 構造函數,傳入信號數目,Semaphore 將信號數目設置到 AQS 的 state 上;
  2. acquire 獲取信號,若是信號數目大於0,經過 CAS 將信號數減1, 獲取成功;不然被阻塞,並將線程記錄到 AQS 的等待鎖隊列中;
  3. release 釋放信號,經過 CAS 將信號數加1,並喚醒等待隊列上隊首線程從新嘗試獲取信號。


Semapore 結構:node

public class Semaphore implements Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    private final Semaphore.Sync sync;

    public Semaphore(int var1) {
        this.sync = new Semaphore.NonfairSync(var1);
    }

    public Semaphore(int var1, boolean var2) {
        this.sync = (Semaphore.Sync)(var2?new Semaphore.FairSync(var1):new Semaphore.NonfairSync(var1));
    }

    public void acquire() throws InterruptedException {
        this.sync.acquireSharedInterruptibly(1);
    }

    public void acquireUninterruptibly() {
        this.sync.acquireShared(1);
    }

    public boolean tryAcquire() {
        return this.sync.nonfairTryAcquireShared(1) >= 0;
    }

    public boolean tryAcquire(long var1, TimeUnit var3) throws InterruptedException {
        return this.sync.tryAcquireSharedNanos(1, var3.toNanos(var1));
    }

    public void release() {
        this.sync.releaseShared(1);
    }

    public void acquire(int var1) throws InterruptedException {
        if(var1 < 0) {
            throw new IllegalArgumentException();
        } else {
            this.sync.acquireSharedInterruptibly(var1);
        }
    }

    public void acquireUninterruptibly(int var1) {
        if(var1 < 0) {
            throw new IllegalArgumentException();
        } else {
            this.sync.acquireShared(var1);
        }
    }

    public boolean tryAcquire(int var1) {
        if(var1 < 0) {
            throw new IllegalArgumentException();
        } else {
            return this.sync.nonfairTryAcquireShared(var1) >= 0;
        }
    }

    public boolean tryAcquire(int var1, long var2, TimeUnit var4) throws InterruptedException {
        if(var1 < 0) {
            throw new IllegalArgumentException();
        } else {
            return this.sync.tryAcquireSharedNanos(var1, var4.toNanos(var2));
        }
    }

    public void release(int var1) {
        if(var1 < 0) {
            throw new IllegalArgumentException();
        } else {
            this.sync.releaseShared(var1);
        }
    }   
 }

從 Semaphore 中咱們能夠看到 Semaphore 的鎖是經過 Sync 這個類完成的,Sync 則繼承自 AQS ,AQS 是獨佔鎖和共享鎖的父類,經過繼承 AQS 實現共享鎖。
Sync 有兩個子類是 FairSync 和 NonfairSync,分別表明公平鎖和非公平鎖。Semaphore 默認是非公平鎖。爲何使用非公平鎖,這個是性能上的考慮,若是每次都去喚醒線程去獲取信號,這是很是消耗資源的,非公平鎖的性能和吞吐量也明顯優於公平鎖。

FairSync 結構:數據庫

static final class FairSync extends Semaphore.Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int var1) {
            super(var1);
        }

        protected int tryAcquireShared(int var1) {
            int var2;
            int var3;
            do {
                if(this.hasQueuedPredecessors()) {
                    return -1;
                }

                var2 = this.getState();
                var3 = var2 - var1;
            } while(var3 >= 0 && !this.compareAndSetState(var2, var3));

            return var3;
        }
    }

FairSync 判斷是否有線程等待,若是沒有則嘗試獲取信號,若是有則加入到等待隊列。

NonfariSync 結構:函數

static final class NonfairSync extends Semaphore.Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int var1) {
            super(var1);
        }

        protected int tryAcquireShared(int var1) {
            return this.nonfairTryAcquireShared(var1);
        }
    }

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int var1) {
            this.setState(var1);
        }

        final int getPermits() {
            return this.getState();
        }

        final int nonfairTryAcquireShared(int var1) {
            int var2;
            int var3;
            do {
                var2 = this.getState();
                var3 = var2 - var1;
            } while(var3 >= 0 && !this.compareAndSetState(var2, var3));

            return var3;
        }

        protected final boolean tryReleaseShared(int var1) {
            int var2;
            int var3;
            do {
                var2 = this.getState();
                var3 = var2 + var1;
                if(var3 < var2) {
                    throw new Error("Maximum permit count exceeded");
                }
            } while(!this.compareAndSetState(var2, var3));

            return true;
        }

        final void reducePermits(int var1) {
            int var2;
            int var3;
            do {
                var2 = this.getState();
                var3 = var2 - var1;
                if(var3 > var2) {
                    throw new Error("Permit count underflow");
                }
            } while(!this.compareAndSetState(var2, var3));

        }

        final int drainPermits() {
            int var1;
            do {
                var1 = this.getState();
            } while(var1 != 0 && !this.compareAndSetState(var1, 0));

            return var1;
        }
    }

NonfairSync 則先嚐試獲取鎖,若是獲取失敗則再加入等待隊列。

AQS 結構:oop

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements Serializable {
    private static final long serialVersionUID = 7373984972572414691L;
    private transient volatile AbstractQueuedSynchronizer.Node head;
    private transient volatile AbstractQueuedSynchronizer.Node tail;
    private volatile int state;
    static final long spinForTimeoutThreshold = 1000L;
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    protected AbstractQueuedSynchronizer() {
    }
	
   public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    // 獲取失敗會被阻塞
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 設置等待線程頭部,並傳播喚醒下一個頭部等待線程
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // 喚醒等待線程隊首線程
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

    private void doReleaseShared() {
        while(true) {
            AbstractQueuedSynchronizer.Node var1 = this.head;
            if(var1 != null && var1 != this.tail) {
                int var2 = var1.waitStatus;
                if(var2 == -1) {
                    if(!compareAndSetWaitStatus(var1, -1, 0)) {
                        continue;
                    }

                    this.unparkSuccessor(var1);
                } else if(var2 == 0 && !compareAndSetWaitStatus(var1, 0, -3)) {
                    continue;
                }
            }

            if(var1 == this.head) {
                return;
            }
        }
    }
}

Semaphore 調用 acquire方法時,調用 AQS 的 acquireSharedInterruptibly 方法,AQS 則調用子類的 tryAcquireShared 方法,若是獲取成功,則直接返回;若是獲取失敗,則調用調用 AQS 的方法 doAcquireSharedInterruptibly 阻塞並加入到等待隊列等待喚醒。
Semaphore 調用 release 方法時,調用 AQS 的 releaseShared 方法,AQS 則調用子類的 tryReleaseShared 方法,若是釋放成功,則調用 doReleaseShared 方法喚醒等待隊列隊首線程,線程啓動後,若是 tryAcquireShared 返回值大於等於 0,則經過 setHeadAndPropagate 方法進行傳播,喚醒下一個線程。性能

相關文章
相關標籤/搜索