thread_Exchanger數據交換

 Exchanger 是一個同步輔助類,用於兩個併發線程之間在一個同步點進行數據交換。
 容許兩個線程在某一個點進行數據交換。
 能夠視做雙向的同步隊列;
 可應用於基因算法、流水線設計等場景算法

Exchanger提供了 一個同步點 , 在這個同步點,兩個線程能夠交換數據 。每一個線程經過exchange()方法的入口提供數據給另外的線程,並接收其它線程提供的數據,並返回。 安全

public class Exchanger1Test {

    // 場景描述:一對一的 生產者和消費者,生產者每次生產5個商品,而後消費者把空的商品容器和生產者交換。
    // 生產者線程必定要先生產數據,再交換數據,消費者線程必定要先交換數據,再消費數據,不然會出現少消費數據的現象
    // 容許原子性的交換兩個(多個)對象,但同時只有一對纔會成功
    // exchange方法真的幫一對線程交換了數據;
    // exchange方法真的會阻塞調用方線程直至另外一方線程參與交易。
    public static void main(String[] args) {

        // Exchanger能夠在兩個線程之間交換數據,只能是2個線程,他不支持更多的線程之間互換數據。
        // 兩個線程必須使用同一個Exchanger對象,且只能是兩個線程間的數據交換
        // 當線程A調用Exchange對象的exchange()方法後,他會陷入阻塞狀態,直到線程B也調用了exchange()方法,而後以線程安全的方式交換數據,以後線程A和B繼續運行
        Exchanger<ArrayList<String>> exchanger = new Exchanger<ArrayList<String>>();
        ExecutorService exec = Executors.newCachedThreadPool();
 
        exec.execute(new Producer(exchanger));
        exec.execute(new Consumer(exchanger));
        exec.shutdown();
        try {
            exec.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // log.error(e, e);
        }    
    }
}

// 生產者
class Producer implements Runnable {
    private ArrayList<String> goods = new ArrayList<String>(); // 商品容器
    private Exchanger<ArrayList<String>> exchanger = new Exchanger<ArrayList<String>>();
    //控制交易雙方線程的退出
    private static AtomicBoolean isDone = new AtomicBoolean(true);
    public Producer(Exchanger<ArrayList<String>> exchanger) {
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        while (!Thread.interrupted() &&  isDone.get()) {//
            
            for (int i = 0; i < 3; i++) { // 生產3次
                System.out.println("------------------------生產者生產第 " + i + "次");
                for (int j = 0; j < 3; j++) { // 每次生產3個商品
                    String e = (long) (Math.random() * 1000) + "";
                    goods.add(e);
                    System.out.println("生產了商品:" + e);
                }
                try {
                    // 生產者線程必定要先生產數據,再交換數據,消費者線程必定要先交換數據,再消費數據
                    // exchanger.exchange(v)的時候,當前線程會被阻塞,直到另外一個線程執行該方法,同時完成數據的交換
                    goods = exchanger.exchange(goods); // 交換數據
                    System.out.println("生產者:數據交換完畢:得到交換的商品容器大小:" + goods.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

// 消費者
class Consumer implements Runnable {
    private ArrayList<String> goods = new ArrayList<String>(); // 商品容器
    private Exchanger<ArrayList<String>> exchanger = new Exchanger<ArrayList<String>>();
    private static AtomicBoolean isDone = new AtomicBoolean(true);
    public Consumer(Exchanger<ArrayList<String>> exchanger) {
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        while (!Thread.interrupted() && isDone.get()) {//&& !isDone
            for (int i = 0; i < 3; i++) { // 消費3次
                try {
                    // 生產者線程必定要先生產數據,再交換數據,消費者線程必定要先交換數據,再消費數據
                    // exchanger.exchange(v)的時候,當前線程會被阻塞,直到另外一個線程執行該方法,同時完成數據的交換
                    goods = exchanger.exchange(goods); // 交換數據
                    System.out.println("消費者:數據交換完畢:得到交換的商品容器大小:" + goods.size());
                    
                    // 消費商品
                    Iterator<String> it = goods.iterator();
                    if (goods.size() > 0) {
                        System.out.println("*********************消費者消費第 " + i + "次");
                        while (it.hasNext()) {
                            String next = it.next();
                            System.out.println("消費了商品:" + next);
                            it.remove(); // 移除消費了的商品
                        }
                    }
                    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            
        }
    }
}
相關文章
相關標籤/搜索