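Since JDK 1.5, the java.util.concurrent package has provided many utility classes for multithreaded programming. This article introduces Semaphore, a utility class that controls how many threads may access a resource at the same time, then builds a parking-lot example on top of it, and finally offers a few points of comparison between Semaphore and CountDownLatch.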
/**
 * A counting semaphore. Conceptually, a semaphore maintains a set of
 * permits. Each {@link #acquire} blocks if necessary until a permit is
 * available, and then takes it. Each {@link #release} adds a permit,
 * potentially releasing a blocking acquirer.
 * However, no actual permit objects are used; the <tt>Semaphore</tt> just
 * keeps a count of the number available and acts accordingly.
 *
 * <p>Semaphores are often used to restrict the number of threads than can
 * access some (physical or logical) resource.
 */
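The Javadoc comment on Semaphore makes the following points:
Conceptually, a semaphore maintains a set of permits. Each acquire() blocks, if necessary, until a permit is available, and then takes it. Each release() adds a permit, potentially releasing a blocked acquirer.
Semaphore does not use actual permit objects; it only keeps a count of the available permits and acts accordingly.
Semaphore is typically used to limit the number of threads that can access some (physical or logical) resource.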
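The permit counting described above can be observed directly with availablePermits(). The following is a minimal sketch, not part of the original article; the class name PermitCountingDemo and the method observeCounts are made up for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

// Hypothetical demo class: records the permit count after each step.
public class PermitCountingDemo {

    /** Returns the number of available permits observed after each operation. */
    static int[] observeCounts() throws InterruptedException {
        Semaphore semaphore = new Semaphore(2);   // two permits, nonfair by default
        int[] counts = new int[4];

        counts[0] = semaphore.availablePermits(); // nothing taken yet
        semaphore.acquire();                      // takes one permit without blocking
        counts[1] = semaphore.availablePermits();
        semaphore.acquire();                      // takes the last permit
        counts[2] = semaphore.availablePermits(); // a further acquire() would now block
        semaphore.release();                      // adds one permit back
        counts[3] = semaphore.availablePermits();
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(observeCounts())); // prints [2, 1, 0, 1]
    }
}
```

Only a single thread is involved here, so nothing ever blocks; the point is just that acquire() and release() move a counter down and up.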
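Semaphore defines an inner class, Sync, which extends AbstractQueuedSynchronizer.
As the source code shows, almost every Semaphore method is implemented by delegating to a method of Sync. Semaphore also offers two modes: fair and non-fair.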
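The two modes are chosen through the constructor. The sketch below is only an illustration (the class name FairnessDemo and its helper methods are hypothetical); it also shows that the untimed tryAcquire() takes a permit immediately even on a fair semaphore, as the Javadoc states.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class showing how the fairness mode is selected.
public class FairnessDemo {

    /** The one-argument constructor creates a non-fair semaphore. */
    static boolean defaultIsFair() {
        return new Semaphore(3).isFair();
    }

    /** Passing true as the second argument requests FIFO granting of permits. */
    static boolean explicitFairIsFair() {
        return new Semaphore(3, true).isFair();
    }

    /** Untimed tryAcquire() ignores the fairness setting and grabs a free permit. */
    static int permitsLeftAfterTryAcquire() {
        Semaphore fair = new Semaphore(3, true);
        boolean acquired = fair.tryAcquire();
        return acquired ? fair.availablePermits() : -1;
    }

    public static void main(String[] args) {
        System.out.println(defaultIsFair());              // prints false
        System.out.println(explicitFairIsFair());         // prints true
        System.out.println(permitsLeftAfterTryAcquire()); // prints 2
    }
}
```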
/* * @(#)Semaphore.java 1.8 04/07/12 * * Copyright 2004 Sun Microsystems, Inc. All rights reserved. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */ package java.util.concurrent; import java.util.*; import java.util.concurrent.locks.*; import java.util.concurrent.atomic.*; /** * A counting semaphore. Conceptually, a semaphore maintains a set of * permits. Each {@link #acquire} blocks if necessary until a permit is * available, and then takes it. Each {@link #release} adds a permit, * potentially releasing a blocking acquirer. * However, no actual permit objects are used; the <tt>Semaphore</tt> just * keeps a count of the number available and acts accordingly. * * <p>Semaphores are often used to restrict the number of threads than can * access some (physical or logical) resource. For example, here is * a class that uses a semaphore to control access to a pool of items: * <pre> * class Pool { * private static final MAX_AVAILABLE = 100; * private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); * * public Object getItem() throws InterruptedException { * available.acquire(); * return getNextAvailableItem(); * } * * public void putItem(Object x) { * if (markAsUnused(x)) * available.release(); * } * * // Not a particularly efficient data structure; just for demo * * protected Object[] items = ... 
whatever kinds of items being managed * protected boolean[] used = new boolean[MAX_AVAILABLE]; * * protected synchronized Object getNextAvailableItem() { * for (int i = 0; i < MAX_AVAILABLE; ++i) { * if (!used[i]) { * used[i] = true; * return items[i]; * } * } * return null; // not reached * } * * protected synchronized boolean markAsUnused(Object item) { * for (int i = 0; i < MAX_AVAILABLE; ++i) { * if (item == items[i]) { * if (used[i]) { * used[i] = false; * return true; * } else * return false; * } * } * return false; * } * * } * </pre> * * <p>Before obtaining an item each thread must acquire a permit from * the semaphore, guaranteeing that an item is available for use. When * the thread has finished with the item it is returned back to the * pool and a permit is returned to the semaphore, allowing another * thread to acquire that item. Note that no synchronization lock is * held when {@link #acquire} is called as that would prevent an item * from being returned to the pool. The semaphore encapsulates the * synchronization needed to restrict access to the pool, separately * from any synchronization needed to maintain the consistency of the * pool itself. * * <p>A semaphore initialized to one, and which is used such that it * only has at most one permit available, can serve as a mutual * exclusion lock. This is more commonly known as a [i]binary * semaphore[/i], because it only has two states: one permit * available, or zero permits available. When used in this way, the * binary semaphore has the property (unlike many {@link Lock} * implementations), that the "lock" can be released by a * thread other than the owner (as semaphores have no notion of * ownership). This can be useful in some specialized contexts, such * as deadlock recovery. * * <p> The constructor for this class optionally accepts a * [i]fairness[/i] parameter. When set false, this class makes no * guarantees about the order in which threads acquire permits. 
In * particular, [i]barging[/i] is permitted, that is, a thread * invoking {@link #acquire} can be allocated a permit ahead of a * thread that has been waiting - logically the new thread places itself at * the head of the queue of waiting threads. When fairness is set true, the * semaphore guarantees that threads invoking any of the {@link * #acquire() acquire} methods are selected to obtain permits in the order in * which their invocation of those methods was processed * (first-in-first-out; FIFO). Note that FIFO ordering necessarily * applies to specific internal points of execution within these * methods. So, it is possible for one thread to invoke * <tt>acquire</tt> before another, but reach the ordering point after * the other, and similarly upon return from the method. * Also note that the untimed {@link #tryAcquire() tryAcquire} methods do not * honor the fairness setting, but will take any permits that are * available. * * <p>Generally, semaphores used to control resource access should be * initialized as fair, to ensure that no thread is starved out from * accessing a resource. When using semaphores for other kinds of * synchronization control, the throughput advantages of non-fair * ordering often outweigh fairness considerations. * * <p>This class also provides convenience methods to {@link * #acquire(int) acquire} and {@link #release(int) release} multiple * permits at a time. Beware of the increased risk of indefinite * postponement when these methods are used without fairness set true. * * @since 1.5 * @author Doug Lea * */ public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210L; /** All mechanics via AbstractQueuedSynchronizer subclass */ private final Sync sync; /** * Synchronization implementation for semaphore. Uses AQS state * to represent permits. Subclassed into fair and nonfair * versions. 
*/ abstract static class Sync extends AbstractQueuedSynchronizer { Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared(int releases) { for (;;) { int p = getState(); if (compareAndSetState(p, p + releases)) return true; } } final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (compareAndSetState(current, next)) return; } } final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } } /** * NonFair version */ final static class NonfairSync extends Sync { NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } /** * Fair version */ final static class FairSync extends Sync { FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { Thread current = Thread.currentThread(); for (;;) { Thread first = getFirstQueuedThread(); if (first != null && first != current) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } /** * Creates a <tt>Semaphore</tt> with the given number of * permits and nonfair fairness setting. * @param permits the initial number of permits available. This * value may be negative, in which case releases must * occur before any acquires will be granted. */ public Semaphore(int permits) { sync = new NonfairSync(permits); } /** * Creates a <tt>Semaphore</tt> with the given number of * permits and the given fairness setting. * @param permits the initial number of permits available. 
This * value may be negative, in which case releases must * occur before any acquires will be granted. * @param fair true if this semaphore will guarantee first-in * first-out granting of permits under contention, else false. */ public Semaphore(int permits, boolean fair) { sync = (fair)? new FairSync(permits) : new NonfairSync(permits); } /** * Acquires a permit from this semaphore, blocking until one is * available, or the thread is {@link Thread#interrupt interrupted}. * * <p>Acquires a permit, if one is available and returns immediately, * reducing the number of available permits by one. * <p>If no permit is available then the current thread becomes * disabled for thread scheduling purposes and lies dormant until * one of two things happens: * [list] * <li>Some other thread invokes the {@link #release} method for this * semaphore and the current thread is next to be assigned a permit; or * <li>Some other thread {@link Thread#interrupt interrupts} the current * thread. * [/list] * * <p>If the current thread: * [list] * <li>has its interrupted status set on entry to this method; or * <li>is {@link Thread#interrupt interrupted} while waiting * for a permit, * [/list] * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * @throws InterruptedException if the current thread is interrupted * * @see Thread#interrupt */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** * Acquires a permit from this semaphore, blocking until one is * available. * * <p>Acquires a permit, if one is available and returns immediately, * reducing the number of available permits by one. * <p>If no permit is available then the current thread becomes * disabled for thread scheduling purposes and lies dormant until * some other thread invokes the {@link #release} method for this * semaphore and the current thread is next to be assigned a permit. 
* * <p>If the current thread * is {@link Thread#interrupt interrupted} while waiting * for a permit then it will continue to wait, but the time at which * the thread is assigned a permit may change compared to the time it * would have received the permit had no interruption occurred. When the * thread does return from this method its interrupt status will be set. * */ public void acquireUninterruptibly() { sync.acquireShared(1); } /** * Acquires a permit from this semaphore, only if one is available at the * time of invocation. * <p>Acquires a permit, if one is available and returns immediately, * with the value <tt>true</tt>, * reducing the number of available permits by one. * * <p>If no permit is available then this method will return * immediately with the value <tt>false</tt>. * * <p>Even when this semaphore has been set to use a * fair ordering policy, a call to <tt>tryAcquire()</tt> [i]will[/i] * immediately acquire a permit if one is available, whether or not * other threads are currently waiting. * This "barging" behavior can be useful in certain * circumstances, even though it breaks fairness. If you want to honor * the fairness setting, then use * {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS) } * which is almost equivalent (it also detects interruption). * * @return <tt>true</tt> if a permit was acquired and <tt>false</tt> * otherwise. */ public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } /** * Acquires a permit from this semaphore, if one becomes available * within the given waiting time and the * current thread has not been {@link Thread#interrupt interrupted}. * <p>Acquires a permit, if one is available and returns immediately, * with the value <tt>true</tt>, * reducing the number of available permits by one. 
* <p>If no permit is available then * the current thread becomes disabled for thread scheduling * purposes and lies dormant until one of three things happens: * [list] * <li>Some other thread invokes the {@link #release} method for this * semaphore and the current thread is next to be assigned a permit; or * <li>Some other thread {@link Thread#interrupt interrupts} the current * thread; or * <li>The specified waiting time elapses. * [/list] * <p>If a permit is acquired then the value <tt>true</tt> is returned. * <p>If the current thread: * [list] * <li>has its interrupted status set on entry to this method; or * <li>is {@link Thread#interrupt interrupted} while waiting to acquire * a permit, * [/list] * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * <p>If the specified waiting time elapses then the value <tt>false</tt> * is returned. * If the time is less than or equal to zero, the method will not wait * at all. * * @param timeout the maximum time to wait for a permit * @param unit the time unit of the <tt>timeout</tt> argument. * @return <tt>true</tt> if a permit was acquired and <tt>false</tt> * if the waiting time elapsed before a permit was acquired. * * @throws InterruptedException if the current thread is interrupted * * @see Thread#interrupt * */ public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /** * Releases a permit, returning it to the semaphore. * <p>Releases a permit, increasing the number of available permits * by one. * If any threads are trying to acquire a permit, then one * is selected and given the permit that was just released. * That thread is (re)enabled for thread scheduling purposes. * <p>There is no requirement that a thread that releases a permit must * have acquired that permit by calling {@link #acquire}. 
* Correct usage of a semaphore is established by programming convention * in the application. */ public void release() { sync.releaseShared(1); } /** * Acquires the given number of permits from this semaphore, * blocking until all are available, * or the thread is {@link Thread#interrupt interrupted}. * * <p>Acquires the given number of permits, if they are available, * and returns immediately, * reducing the number of available permits by the given amount. * * <p>If insufficient permits are available then the current thread becomes * disabled for thread scheduling purposes and lies dormant until * one of two things happens: * [list] * <li>Some other thread invokes one of the {@link #release() release} * methods for this semaphore, the current thread is next to be assigned * permits and the number of available permits satisfies this request; or * <li>Some other thread {@link Thread#interrupt interrupts} the current * thread. * [/list] * * <p>If the current thread: * [list] * <li>has its interrupted status set on entry to this method; or * <li>is {@link Thread#interrupt interrupted} while waiting * for a permit, * [/list] * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * Any permits that were to be assigned to this thread are instead * assigned to other threads trying to acquire permits, as if * permits had been made available by a call to {@link #release()}. * * @param permits the number of permits to acquire * * @throws InterruptedException if the current thread is interrupted * @throws IllegalArgumentException if permits less than zero. * * @see Thread#interrupt */ public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } /** * Acquires the given number of permits from this semaphore, * blocking until all are available. 
* * <p>Acquires the given number of permits, if they are available, * and returns immediately, * reducing the number of available permits by the given amount. * * <p>If insufficient permits are available then the current thread becomes * disabled for thread scheduling purposes and lies dormant until * some other thread invokes one of the {@link #release() release} * methods for this semaphore, the current thread is next to be assigned * permits and the number of available permits satisfies this request. * * <p>If the current thread * is {@link Thread#interrupt interrupted} while waiting * for permits then it will continue to wait and its position in the * queue is not affected. When the * thread does return from this method its interrupt status will be set. * * @param permits the number of permits to acquire * @throws IllegalArgumentException if permits less than zero. * */ public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); } /** * Acquires the given number of permits from this semaphore, only * if all are available at the time of invocation. * * <p>Acquires the given number of permits, if they are available, and * returns immediately, with the value <tt>true</tt>, * reducing the number of available permits by the given amount. * * <p>If insufficient permits are available then this method will return * immediately with the value <tt>false</tt> and the number of available * permits is unchanged. * * <p>Even when this semaphore has been set to use a fair ordering * policy, a call to <tt>tryAcquire</tt> [i]will[/i] * immediately acquire a permit if one is available, whether or * not other threads are currently waiting. This * "barging" behavior can be useful in certain * circumstances, even though it breaks fairness. 
If you want to * honor the fairness setting, then use {@link #tryAcquire(int, * long, TimeUnit) tryAcquire(permits, 0, TimeUnit.SECONDS) } * which is almost equivalent (it also detects interruption). * * @param permits the number of permits to acquire * * @return <tt>true</tt> if the permits were acquired and <tt>false</tt> * otherwise. * @throws IllegalArgumentException if permits less than zero. */ public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0; } /** * Acquires the given number of permits from this semaphore, if all * become available within the given waiting time and the * current thread has not been {@link Thread#interrupt interrupted}. * <p>Acquires the given number of permits, if they are available and * returns immediately, with the value <tt>true</tt>, * reducing the number of available permits by the given amount. * <p>If insufficient permits are available then * the current thread becomes disabled for thread scheduling * purposes and lies dormant until one of three things happens: * [list] * <li>Some other thread invokes one of the {@link #release() release} * methods for this semaphore, the current thread is next to be assigned * permits and the number of available permits satisfies this request; or * <li>Some other thread {@link Thread#interrupt interrupts} the current * thread; or * <li>The specified waiting time elapses. * [/list] * <p>If the permits are acquired then the value <tt>true</tt> is returned. * <p>If the current thread: * [list] * <li>has its interrupted status set on entry to this method; or * <li>is {@link Thread#interrupt interrupted} while waiting to acquire * the permits, * [/list] * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. 
* Any permits that were to be assigned to this thread, are instead * assigned to other threads trying to acquire permits, as if * the permits had been made available by a call to {@link #release()}. * * <p>If the specified waiting time elapses then the value <tt>false</tt> * is returned. * If the time is * less than or equal to zero, the method will not wait at all. * Any permits that were to be assigned to this thread, are instead * assigned to other threads trying to acquire permits, as if * the permits had been made available by a call to {@link #release()}. * * @param permits the number of permits to acquire * @param timeout the maximum time to wait for the permits * @param unit the time unit of the <tt>timeout</tt> argument. * @return <tt>true</tt> if all permits were acquired and <tt>false</tt> * if the waiting time elapsed before all permits were acquired. * * @throws InterruptedException if the current thread is interrupted * @throws IllegalArgumentException if permits less than zero. * * @see Thread#interrupt * */ public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); } /** * Releases the given number of permits, returning them to the semaphore. * <p>Releases the given number of permits, increasing the number of * available permits by that amount. * If any threads are trying to acquire permits, then one * is selected and given the permits that were just released. * If the number of available permits satisfies that thread's request * then that thread is (re)enabled for thread scheduling purposes; * otherwise the thread will wait until sufficient permits are available. * If there are still permits available * after this thread's request has been satisfied, then those permits * are assigned in turn to other threads trying to acquire permits. 
* * <p>There is no requirement that a thread that releases a permit must * have acquired that permit by calling {@link Semaphore#acquire acquire}. * Correct usage of a semaphore is established by programming convention * in the application. * * @param permits the number of permits to release * @throws IllegalArgumentException if permits less than zero. */ public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } /** * Returns the current number of permits available in this semaphore. * <p>This method is typically used for debugging and testing purposes. * @return the number of permits available in this semaphore. */ public int availablePermits() { return sync.getPermits(); } /** * Acquire and return all permits that are immediately available. * @return the number of permits */ public int drainPermits() { return sync.drainPermits(); } /** * Shrinks the number of available permits by the indicated * reduction. This method can be useful in subclasses that use * semaphores to track resources that become unavailable. This * method differs from <tt>acquire</tt> in that it does not block * waiting for permits to become available. * @param reduction the number of permits to remove * @throws IllegalArgumentException if reduction is negative */ protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); } /** * Returns true if this semaphore has fairness set true. * @return true if this semaphore has fairness set true. */ public boolean isFair() { return sync instanceof FairSync; } /** * Queries whether any threads are waiting to acquire. Note that * because cancellations may occur at any time, a <tt>true</tt> * return does not guarantee that any other thread will ever * acquire. This method is designed primarily for use in * monitoring of the system state. * * @return true if there may be other threads waiting to acquire * the lock. 
*/ public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } /** * Returns an estimate of the number of threads waiting to * acquire. The value is only an estimate because the number of * threads may change dynamically while this method traverses * internal data structures. This method is designed for use in * monitoring of the system state, not for synchronization * control. * @return the estimated number of threads waiting for this lock */ public final int getQueueLength() { return sync.getQueueLength(); } /** * Returns a collection containing threads that may be waiting to * acquire. Because the actual set of threads may change * dynamically while constructing this result, the returned * collection is only a best-effort estimate. The elements of the * returned collection are in no particular order. This method is * designed to facilitate construction of subclasses that provide * more extensive monitoring facilities. * @return the collection of threads */ protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } /** * Returns a string identifying this semaphore, as well as its state. * The state, in brackets, includes the String * "Permits =" followed by the number of permits. * @return a string identifying this semaphore, as well as its * state */ public String toString() { return super.toString() + "[Permits = " + sync.getPermits() + "]"; } }
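To make the delegation pattern in the source above easier to see, here is a stripped-down sketch of a non-fair counting semaphore built on AbstractQueuedSynchronizer. TinySemaphore is a hypothetical class written for this illustration, not the JDK implementation; it keeps only the compare-and-set loops that Sync uses and leaves out fairness, multi-permit methods, and argument checks.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical, simplified semaphore: the AQS state holds the number of permits.
public class TinySemaphore {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);
        }

        int permits() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // A negative result tells AQS to queue the caller;
                // otherwise the CAS must succeed before we report success.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases)) {
                    return true; // lets AQS wake up queued acquirers
                }
            }
        }
    }

    private final Sync sync;

    public TinySemaphore(int permits) {
        sync = new Sync(permits);
    }

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean tryAcquire() {
        return sync.tryAcquireShared(1) >= 0;
    }

    public void release() {
        sync.releaseShared(1);
    }

    public int availablePermits() {
        return sync.permits();
    }

    public static void main(String[] args) {
        TinySemaphore s = new TinySemaphore(1);
        System.out.println(s.tryAcquire());       // prints true
        System.out.println(s.tryAcquire());       // prints false
        s.release();
        System.out.println(s.availablePermits()); // prints 1
    }
}
```

The queueing, parking, and waking of threads is inherited from AbstractQueuedSynchronizer; the subclass only decides whether the state transition is allowed.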
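This article uses Semaphore to simulate a scenario in which 30 cars try to park.
There are 10 parking spaces. Once they are all taken, a car has to leave before another one can enter.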
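import java.util.concurrent.Semaphore;

/**
 * A car that occupies one parking space for a short time.
 *
 * @author wangmengjun
 */
public class Car implements Runnable {

    private final Semaphore parkingSlot;

    private final int carNo;

    /**
     * @param parkingSlot semaphore whose permits represent the free parking spaces
     * @param carNo number identifying this car
     */
    public Car(Semaphore parkingSlot, int carNo) {
        this.parkingSlot = parkingSlot;
        this.carNo = carNo;
    }

    public void run() {
        try {
            parkingSlot.acquire();
        } catch (InterruptedException e) {
            // Interrupted while waiting: no permit is held, so restore the flag and give up.
            Thread.currentThread().interrupt();
            return;
        }
        try {
            parking();
            sleep(300);
            leaving();
        } finally {
            // Always give the space back, even if something above throws.
            parkingSlot.release();
        }
    }

    private void parking() {
        System.out.println(String.format("Car %d parks", carNo));
    }

    private void leaving() {
        System.out.println(String.format("Car %d leaves its space", carNo));
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Starts 30 cars competing for 10 parking spaces.
 *
 * @author wangmengjun
 */
public class ParkingCars {

    public static void main(String[] args) {
        // 10 permits = 10 parking spaces; fair mode grants them in FIFO order.
        Semaphore parkingSlot = new Semaphore(10, true);
        ExecutorService service = Executors.newCachedThreadPool();
        for (int carNo = 1; carNo <= 30; carNo++) {
            service.execute(new Car(parkingSlot, carNo));
        }
        // Stop accepting new tasks; the cars already submitted still run to completion.
        service.shutdown();
    }
}

Output of one run:

Car 1 parks
Car 7 parks
Car 9 parks
Car 5 parks
Car 10 parks
Car 6 parks
Car 4 parks
Car 3 parks
Car 8 parks
Car 2 parks
Car 8 leaves its space
Car 11 parks
Car 3 leaves its space
Car 13 parks
Car 4 leaves its space
Car 6 leaves its space
Car 12 parks
Car 14 parks
Car 10 leaves its space
Car 15 parks
Car 5 leaves its space
Car 9 leaves its space
Car 16 parks
Car 17 parks
Car 7 leaves its space
Car 18 parks
Car 1 leaves its space
Car 19 parks
Car 2 leaves its space
Car 20 parks
Car 12 leaves its space
Car 13 leaves its space
Car 11 leaves its space
Car 14 leaves its space
Car 22 parks
Car 21 parks
Car 23 parks
Car 24 parks
Car 15 leaves its space
Car 16 leaves its space
Car 17 leaves its space
Car 27 parks
Car 25 parks
Car 19 leaves its space
Car 26 parks
Car 20 leaves its space
Car 28 parks
Car 29 parks
Car 18 leaves its space
Car 30 parks
Car 24 leaves its space
Car 21 leaves its space
Car 23 leaves its space
Car 22 leaves its space
Car 30 leaves its space
Car 29 leaves its space
Car 26 leaves its space
Car 28 leaves its space
Car 25 leaves its space
Car 27 leaves its space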
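Reading the interleaved output by eye is error-prone, so it can help to check the invariant in code: at no moment should more than 10 cars be parked. The sketch below is an assumption-laden variant of the example rather than the article's own code. ParkingInvariantCheck and maxParkedAtOnce are hypothetical names, the sleep is shortened to 50 ms, and lambdas are used, so it needs Java 8 or later.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical check: tracks the highest number of cars parked at the same time.
public class ParkingInvariantCheck {

    static int maxParkedAtOnce(int spaces, int cars) throws InterruptedException {
        Semaphore parkingSlot = new Semaphore(spaces, true);
        AtomicInteger parked = new AtomicInteger();    // cars currently parked
        AtomicInteger maxParked = new AtomicInteger(); // highest value seen so far
        ExecutorService service = Executors.newCachedThreadPool();

        for (int carNo = 1; carNo <= cars; carNo++) {
            service.execute(() -> {
                try {
                    parkingSlot.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // no permit held, nothing to release
                }
                try {
                    int now = parked.incrementAndGet();
                    maxParked.accumulateAndGet(now, Math::max);
                    Thread.sleep(50); // stay parked for a moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    parked.decrementAndGet();
                    parkingSlot.release();
                }
            });
        }

        service.shutdown();
        service.awaitTermination(30, TimeUnit.SECONDS);
        return maxParked.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // The printed value depends on scheduling but should never exceed 10.
        System.out.println(maxParkedAtOnce(10, 30));
    }
}
```

The counter is incremented only after acquire() returns and decremented before release(), so its value can never exceed the number of permits.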
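Both are utility classes for thread synchronization, and both implement their functionality through an inner class, Sync, that extends AbstractQueuedSynchronizer.
CountDownLatch is concerned with one or more threads that must wait until a group of other threads has finished its work before going on to do something else, for example during service startup. Semaphore is concerned with how many threads at most may access a given resource at the same time.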
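For contrast with the parking example, here is a minimal sketch of the CountDownLatch usage described above: one thread waits until a group of worker threads has finished. The class name StartupLatchDemo and the idea of "services" that merely bump a counter are assumptions made for illustration, and the lambdas require Java 8 or later.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: the caller proceeds only after every worker has counted down.
public class StartupLatchDemo {

    /** Starts the given number of worker threads and waits for all of them. */
    static int startServices(int count) throws InterruptedException {
        CountDownLatch ready = new CountDownLatch(count);
        AtomicInteger started = new AtomicInteger();

        for (int i = 0; i < count; i++) {
            new Thread(() -> {
                started.incrementAndGet(); // stands in for real startup work
                ready.countDown();         // signal that this worker is done
            }).start();
        }

        ready.await();        // blocks until the count reaches zero
        return started.get(); // all increments are visible after await() returns
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(startServices(3)); // prints 3
    }
}
```

A latch counts down once and cannot be reset, whereas a semaphore's permits go up and down for as long as it is in use.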
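Because my knowledge is limited, the example above and the comparison between CountDownLatch and Semaphore may have shortcomings. Please point out any problems you find. I would also welcome other differences between the two, so that we can learn and share together.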