import java.util.concurrent.locks.* private V item; private int arrivalCount; private V doExchange(V x, boolean timed, long nanos) throws InterruptedException, TimeoutException { lock.lock(); try { V other; // If arrival count already at two, we must wait for // a previous pair to finish and reset the count; while (arrivalCount == 2) { if (!timed) taken.await(); else if (nanos > 0) nanos = taken.awaitNanos(nanos); else throw new TimeoutException(); } int count = ++arrivalCount; // If item is already waiting, replace it and signal other thread if (count == 2) { other = item; item = x; taken.signal(); return other; } // Otherwise, set item and wait for another thread to // replace it and signal us. item = x; InterruptedException interrupted = null; try { while (arrivalCount != 2) { if (!timed) taken.await(); else if (nanos > 0) nanos = taken.awaitNanos(nanos); else break; // timed out } } catch (InterruptedException ie) { interrupted = ie; } // Get and reset item and count after the wait. // (We need to do this even if wait was aborted.) other = item; item = null; count = arrivalCount; arrivalCount = 0; taken.signal(); // If the other thread replaced item, then we must // continue even if cancelled. if (count == 2) { if (interrupted != null) Thread.currentThread().interrupt(); return other; } // If no one is waiting for us, we can back out if (interrupted != null) throw interrupted; else // must be timeout throw new TimeoutException(); } finally { lock.unlock(); } } public Exchanger() { } public V exchange(V x) throws InterruptedException { try { return doExchange(x, false, 0); } catch (TimeoutException cannotHappen) { throw new Error(cannotHappen); } } public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { return doExchange(x, true, unit.toNanos(timeout)); } }
以上爲源碼:java
此類提供對外的操做是同步的;算法
用於成對出現的線程之間交換數據;apache
能夠視做雙向的同步隊列;app
可應用於基因算法、流水線設計等場景。ide
從官方的javadoc能夠知道,當一個線程到達exchange調用點時,若是它的夥伴線程此前已經調用了此方法,那麼它的夥伴會被調度喚醒並與之進行對象交換,而後各自返回。若是它的夥伴還沒到達交換點,那麼當前線程將會被掛起,直至夥伴線程到達——完成交換正常返回;或者當前線程被中斷——拋出中斷異常;又或者是等候超時——拋出超時異常。
this
一個簡單的例子
spa
import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; /** * @Title: ExchangerTest * @Description: Test class for Exchanger * @Company: CSAIR * @Author: lixuanbin * @Creation: 2014年12月14日 * @Version:1.0 */ public class ExchangerTest { protected static final Logger log = Logger.getLogger(ExchangerTest.class); private static volatile boolean isDone = false; static class ExchangerProducer implements Runnable { private Exchanger<Integer> exchanger; private static int data = 1; ExchangerProducer(Exchanger<Integer> exchanger) { this.exchanger = exchanger; } @Override public void run() { while (!Thread.interrupted() && !isDone) { for (int i = 1; i <= 3; i++) { try { TimeUnit.SECONDS.sleep(1); data = i; System.out.println("producer before: " + data); data = exchanger.exchange(data); System.out.println("producer after: " + data); } catch (InterruptedException e) { log.error(e, e); } } isDone = true; } } } static class ExchangerConsumer implements Runnable { private Exchanger<Integer> exchanger; private static int data = 0; ExchangerConsumer(Exchanger<Integer> exchanger) { this.exchanger = exchanger; } @Override public void run() { while (!Thread.interrupted() && !isDone) { data = 0; System.out.println("consumer before : " + data); try { TimeUnit.SECONDS.sleep(1); data = exchanger.exchange(data); } catch (InterruptedException e) { log.error(e, e); } System.out.println("consumer after : " + data); } } } /** * @param args */ public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); Exchanger<Integer> exchanger = new Exchanger<Integer>(); ExchangerProducer producer = new ExchangerProducer(exchanger); ExchangerConsumer consumer = new ExchangerConsumer(exchanger); exec.execute(producer); exec.execute(consumer); exec.shutdown(); try { exec.awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error(e, e); } } }