Exchanger 是一個同步輔助類,用於兩個併發線程之間在一個同步點進行數據交換。
容許兩個線程在某一個點進行數據交換。
能夠視做雙向的同步隊列;
可應用於基因算法、流水線設計等場景算法
Exchanger提供了 一個同步點 , 在這個同步點,兩個線程能夠交換數據 。每一個線程經過exchange()方法的入口提供數據給另外的線程,並接收其它線程提供的數據,並返回。 安全
public class Exchanger1Test { // 場景描述:一對一的 生產者和消費者,生產者每次生產5個商品,而後消費者把空的商品容器和生產者交換。 // 生產者線程必定要先生產數據,再交換數據,消費者線程必定要先交換數據,再消費數據,不然會出現少消費數據的現象 // 容許原子性的交換兩個(多個)對象,但同時只有一對纔會成功 // exchange方法真的幫一對線程交換了數據; // exchange方法真的會阻塞調用方線程直至另外一方線程參與交易。 public static void main(String[] args) { // Exchanger能夠在兩個線程之間交換數據,只能是2個線程,他不支持更多的線程之間互換數據。 // 兩個線程必須使用同一個Exchanger對象,且只能是兩個線程間的數據交換 // 當線程A調用Exchange對象的exchange()方法後,他會陷入阻塞狀態,直到線程B也調用了exchange()方法,而後以線程安全的方式交換數據,以後線程A和B繼續運行 Exchanger<ArrayList<String>> exchanger = new Exchanger<ArrayList<String>>(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new Producer(exchanger)); exec.execute(new Consumer(exchanger)); exec.shutdown(); try { exec.awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { // log.error(e, e); } } } // 生產者 class Producer implements Runnable { private ArrayList<String> goods = new ArrayList<String>(); // 商品容器 private Exchanger<ArrayList<String>> exchanger = new Exchanger<ArrayList<String>>(); //控制交易雙方線程的退出 private static AtomicBoolean isDone = new AtomicBoolean(true); public Producer(Exchanger<ArrayList<String>> exchanger) { this.exchanger = exchanger; } @Override public void run() { while (!Thread.interrupted() && isDone.get()) {// for (int i = 0; i < 3; i++) { // 生產3次 System.out.println("------------------------生產者生產第 " + i + "次"); for (int j = 0; j < 3; j++) { // 每次生產3個商品 String e = (long) (Math.random() * 1000) + ""; goods.add(e); System.out.println("生產了商品:" + e); } try { // 生產者線程必定要先生產數據,再交換數據,消費者線程必定要先交換數據,再消費數據 // exchanger.exchange(v)的時候,當前線程會被阻塞,直到另外一個線程執行該方法,同時完成數據的交換 goods = exchanger.exchange(goods); // 交換數據 System.out.println("生產者:數據交換完畢:得到交換的商品容器大小:" + goods.size()); } catch (InterruptedException e) { e.printStackTrace(); } } } } } // 消費者 class Consumer implements Runnable { private ArrayList<String> goods = new ArrayList<String>(); // 商品容器 private Exchanger<ArrayList<String>> exchanger = new Exchanger<ArrayList<String>>(); private static AtomicBoolean isDone = new AtomicBoolean(true); public Consumer(Exchanger<ArrayList<String>> exchanger) { this.exchanger = exchanger; } @Override public void run() { while (!Thread.interrupted() && isDone.get()) {//&& !isDone for (int i = 0; i < 3; i++) { // 消費3次 try { // 生產者線程必定要先生產數據,再交換數據,消費者線程必定要先交換數據,再消費數據 // exchanger.exchange(v)的時候,當前線程會被阻塞,直到另外一個線程執行該方法,同時完成數據的交換 goods = exchanger.exchange(goods); // 交換數據 System.out.println("消費者:數據交換完畢:得到交換的商品容器大小:" + goods.size()); // 消費商品 Iterator<String> it = goods.iterator(); if (goods.size() > 0) { System.out.println("*********************消費者消費第 " + i + "次"); while (it.hasNext()) { String next = it.next(); System.out.println("消費了商品:" + next); it.remove(); // 移除消費了的商品 } } } catch (InterruptedException e) { e.printStackTrace(); } } } } }