exchange

import java.util.concurrent.locks.*
    // Hand-off slot: the first arriver stores its offer here, the second
    // arriver swaps its own offer in, and the first arriver clears it to null
    // once it has collected the partner's value.
    private V item;    
   
    // Number of threads that have arrived for the exchange in progress
    // (incremented on arrival, compared against 2, reset to 0 by the first
    // arriver when it leaves).
    private int arrivalCount;   
    private V doExchange(V x, boolean timed, long nanos) throws InterruptedException, TimeoutException {
        lock.lock();        try {
            V other;            // If arrival count already at two, we must wait for
            // a previous pair to finish and reset the count;
            while (arrivalCount == 2) {                if (!timed)
                    taken.await();                else if (nanos > 0) 
                    nanos = taken.awaitNanos(nanos);                else 
                    throw new TimeoutException();
            }            int count = ++arrivalCount;            // If item is already waiting, replace it and signal other thread
            if (count == 2) { 
                other = item;
                item = x;
                taken.signal();                return other;
            }            // Otherwise, set item and wait for another thread to
            // replace it and signal us.

            item = x;
            InterruptedException interrupted = null;            try { 
                while (arrivalCount != 2) {                    if (!timed)
                        taken.await();                    else if (nanos > 0) 
                        nanos = taken.awaitNanos(nanos);                    else 
                        break; // timed out
                }
            } catch (InterruptedException ie) {
                interrupted = ie;
            }            // Get and reset item and count after the wait.
            // (We need to do this even if wait was aborted.)
            other = item;
            item = null;
            count = arrivalCount;
            arrivalCount = 0; 
            taken.signal();            
            // If the other thread replaced item, then we must
            // continue even if cancelled.
            if (count == 2) {                if (interrupted != null)
                    Thread.currentThread().interrupt();                return other;
            }            // If no one is waiting for us, we can back out
            if (interrupted != null) 
                throw interrupted;            else  // must be timeout
                throw new TimeoutException();
        } finally {
            lock.unlock();
        }
    }  
    public Exchanger() {
    }   
    public V exchange(V x) throws InterruptedException {        try {            return doExchange(x, false, 0);
        } catch (TimeoutException cannotHappen) { 
            throw new Error(cannotHappen);
        }
    }  
   
    public V exchange(V x, long timeout, TimeUnit unit) 
        throws InterruptedException, TimeoutException {        return doExchange(x, true, unit.toNanos(timeout));
    }

}
  • 以上爲源碼:

  • 此類對外提供的操作是同步的;

  • 用於成對出現的線程之間交換數據;

  • 可以視作雙向的同步隊列;

  • 可應用於基因算法、流水線設計等場景。

     從官方的javadoc可以知道,當一個線程到達exchange調用點時,如果它的夥伴線程此前已經調用了此方法,那麼它的夥伴會被調度喚醒並與之進行對象交換,然後各自返回。如果它的夥伴還沒到達交換點,那麼當前線程將會被掛起,直至夥伴線程到達——完成交換正常返回;或者當前線程被中斷——拋出中斷異常;又或者是等候超時——拋出超時異常。

一個簡單的例子:

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

/**
 * @Title: ExchangerTest
 * @Description: Test class for Exchanger
 * @Company: CSAIR
 * @Author: lixuanbin
 * @Creation: 2014年12月14日
 * @Version:1.0
 */
public class ExchangerTest {
    protected static final Logger log = Logger.getLogger(ExchangerTest.class);
    private static volatile boolean isDone = false;

    static class ExchangerProducer implements Runnable {
        private Exchanger<Integer> exchanger;
        private static int data = 1;
        ExchangerProducer(Exchanger<Integer> exchanger) {
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            while (!Thread.interrupted() && !isDone) {
                for (int i = 1; i <= 3; i++) {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        data = i;
                        System.out.println("producer before: " + data);
                        data = exchanger.exchange(data);
                        System.out.println("producer after: " + data);
                    } catch (InterruptedException e) {
                        log.error(e, e);
                    }
                }
                isDone = true;
            }
        }
    }

    static class ExchangerConsumer implements Runnable {
        private Exchanger<Integer> exchanger;
        private static int data = 0;
        ExchangerConsumer(Exchanger<Integer> exchanger) {
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            while (!Thread.interrupted() && !isDone) {
                data = 0;
                System.out.println("consumer before : " + data);
                try {
                    TimeUnit.SECONDS.sleep(1);
                    data = exchanger.exchange(data);
                } catch (InterruptedException e) {
                    log.error(e, e);
                }
                System.out.println("consumer after : " + data);
            }
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        Exchanger<Integer> exchanger = new Exchanger<Integer>();
        ExchangerProducer producer = new ExchangerProducer(exchanger);
        ExchangerConsumer consumer = new ExchangerConsumer(exchanger);
        exec.execute(producer);
        exec.execute(consumer);
        exec.shutdown();
        try {
            exec.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error(e, e);
        }
    }
}
相關文章
相關標籤/搜索