Java併發新構件之Exchanger

    Exchanger是在兩個任務之間交換對象的柵欄。當兩個任務進入柵欄時,它們各自擁有一個對象,當它們離開時,它們都擁有對方的對象。Exchanger的典型應用場景是:一個任務在建立對象,而這些對象的生產代價很高,另外一個任務在消費這些對象。經過這種方式,能夠有更多的對象在被建立的同時被消費。
java

    爲了演示Exchanger類,咱們將建立生產者和消費者任務。ExchangerProducer和ExchangerConsumer使用一個List<Fat>做爲要求交換的對象,它們都包含一個用於這個List<Fat>的Exchanger。當你調用Exchanger.exchange()方法時,它將阻塞直至對方任務調用它本身的exchange()方法,那時,這兩個exchange()方法將同時完成,而List<Fat>被交換:
ide

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ExchangerProducer implements Runnable {
    private List<Fat> holder;
    private Exchanger<List<Fat>> exchanger;
    public ExchangerProducer(Exchanger<List<Fat>> exchanger, List<Fat> holder) {
        this.exchanger = exchanger;
        this.holder = holder;
    }
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                //填充列表
                for (int i = 0;i < ExchangerDemo.size; i++) {
                    holder.add(new Fat());
                }
                //等待交換
                holder = exchanger.exchange(holder);
            }
        } catch (InterruptedException e) {
        }
        System.out.println("Producer stopped.");
    }
}

class ExchangerConsumer implements Runnable {
    private List<Fat> holder;
    private Exchanger<List<Fat>> exchanger;
    private volatile Fat value;
    private static int num = 0;
    public ExchangerConsumer(Exchanger<List<Fat>> exchanger, List<Fat> holder) {
        this.exchanger = exchanger;
        this.holder = holder;
    }
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                //等待交換
                holder = exchanger.exchange(holder);
                //讀取列表並移除元素
                for (Fat x : holder) {
                    num++;
                    value = x;
                    //在循環內刪除元素,這對於CopyOnWriteArrayList是沒有問題的
                    holder.remove(x);
                }
                if (num % 10000 == 0) {
                    System.out.println("Exchanged count=" + num);
                }
            }
        } catch (InterruptedException e) {
            
        }
        System.out.println("Consumer stopped. Final value: " + value);
    }
}

public class ExchangerDemo {
    static int size = 10;
    static int delay = 5; //秒
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        List<Fat> producerList = new CopyOnWriteArrayList<>();
        List<Fat> consumerList = new CopyOnWriteArrayList<>();
        Exchanger<List<Fat>> exchanger = new Exchanger<>();
        exec.execute(new ExchangerProducer(exchanger, producerList));
        exec.execute(new ExchangerConsumer(exchanger, consumerList));
        TimeUnit.SECONDS.sleep(delay);
        exec.shutdownNow();
    }
}

class Fat {
    private volatile double d;
    private static int counter = 1;
    private final int id = counter++;
    public Fat() {
        //執行一段耗時的操做
        for (int i = 1; i<10000; i++) {
            d += (Math.PI + Math.E) / (double)i;
        }
    }
    public void print() {System.out.println(this);}
    public String toString() {return "Fat id=" + id;}
}

執行結果(可能的結果):this

Exchanged count=10000
Exchanged count=20000
Exchanged count=30000
Exchanged count=40000
Exchanged count=50000
Exchanged count=60000
Exchanged count=70000
Exchanged count=80000
Consumer stopped. Final value: Fat id=88300
Producer stopped.

    在main()中,建立了用於兩個任務的單一的Exchanger,以及兩個用於互換的CopyOnWriteArrayList。這個特定的List變體容許列表在被遍歷的時候調用remove()方法,而不會拋出ConcurrentModifiedException異常。ExchangerProducer將填充這個List,而後將這個滿列表跟ExchangerConsumer的空列表交換。交換以後,ExchangerProducer能夠繼續的生產Fat對象,而ExchangerConsumer則開始使用滿列表中的對象。由於有了Exchanger,填充一個列表和消費另外一個列表便同時發生了。
code

相關文章
相關標籤/搜索