Java併發工具類(四):線程間交換數據的Exchanger

簡介

Exchanger(交換者)是一個用於線程間協做的工具類。Exchanger用於進行線程間的數據交換。它提供一個同步點,在這個同步點兩個線程能夠交換彼此的數據。這兩個線程經過exchange方法交換數據, 若是第一個線程先執行exchange方法,它會一直等待第二個線程也執行exchange,當兩個線程都到達同步點時,這兩個線程就能夠交換數據,將本線程生產出來的數據傳遞給對方。java

Exchanger的應用場景

一、Exchanger能夠用於遺傳算法,遺傳算法裏須要選出兩我的做爲交配對象,這時候會交換兩人的數據,並使用交叉規則得出2個交配結果。
二、Exchanger也能夠用於校對工做。好比咱們須要將紙製銀流經過人工的方式錄入成電子銀行流水,爲了不錯誤,採用AB崗兩人進行錄入,錄入到Excel以後,系統須要加載這兩個Excel,並對這兩個Excel數據進行校對,看看是否錄入的一致。代碼以下:算法

 1 public class ExchangerTest {
 2 
 3     private static final Exchanger<String> exgr = new Exchanger<String>();
 4 
 5     private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
 6 
 7     public static void main(String[] args) {
 8 
 9         threadPool.execute(new Runnable() {
10             @Override
11             public void run() {
12                 try {
13                     String A = "銀行流水A";// A錄入銀行流水數據
14                     exgr.exchange(A);
15                 } catch (InterruptedException e) {
16                 }
17             }
18         });
19 
20         threadPool.execute(new Runnable() {
21             @Override
22             public void run() {
23                 try {
24                     String B = "銀行流水B";// B錄入銀行流水數據
25                     String A = exgr.exchange("B");
26                     System.out.println("A和B數據是否一致:" + A.equals(B) + ",A錄入的是:"
27                             + A + ",B錄入是:" + B);
28                 } catch (InterruptedException e) {
29                 }
30             }
31         });
32 
33         threadPool.shutdown();
34 
35     }
36 }

三、這個類在遇到相似生產者和消費者問題時,是很是有用的。來一個很是經典的併發問題:你有相同的數據buffer,一個或多個數據生產者,和一個或多個數據消費者。只是Exchange類只能同步2個線程,因此你只能在你的生產者和消費者問題中只有一個生產者和一個消費者時使用這個類。編程

在這個指南,你將學習如何使用 Exchanger 類來解決只有一個生產者和一個消費者的生產者和消費者問題。併發

按照這些步驟來實現下面的例子:ide

 1 package tool;
 2 import java.util.List;
 3 import java.util.concurrent.Exchanger;
 4 
 5 //1. 首先,從實現producer開始吧。建立一個類名爲Producer並必定實現 Runnable 接口。
 6 public class Producer implements Runnable {
 7 
 8 // 2. 聲明 List<String>對象,名爲 buffer。這是等等要被相互交換的數據類型。
 9 private List<String> buffer;
10 
11 // 3. 聲明 Exchanger<List<String>>; 對象,名爲exchanger。這個 exchanger 對象是用來同步producer和consumer的。
12 private final Exchanger<List<String>> exchanger;
13 
14 // 4. 實現類的構造函數,初始化這2個屬性。
15 public Producer(List<String> buffer, Exchanger<List<String>> exchanger) {
16 this.buffer = buffer;
17 this.exchanger = exchanger;
18 }
19 
20 // 5. 實現 run() 方法. 在方法內,實現10次交換。
21 @Override
22 public void run() {
23 int cycle = 1;
24 for (int i = 0; i < 10; i++) {            System.out.printf("Producer: Cycle %d\n", cycle);
25 
26 // 6. 在每次循環中,加10個字符串到buffer。
27 for (int j = 0; j <10; j++) {
28 String message = "Event " + ((i * 10) + j);
29 System.out.printf("Producer: %s\n", message);
30 buffer.add(message);
31 }
32 
33 // 7. 調用 exchange() 方法來與consumer交換數據。此方法可能會拋出InterruptedException 異常, 加上處理代碼。
34 try {
35 buffer = exchanger.exchange(buffer);
36 } catch (InterruptedException e) {
37 e.printStackTrace();
38 }
39 System.out.println("Producer: " + buffer.size());
40 cycle++;
41 }
42 }
43 }
 1 //8. 如今, 來實現consumer。建立一個類名爲Consumer並必定實現 Runnable 接口。
 2 package tool;
 3 import java.util.List;
 4 import java.util.concurrent.Exchanger;
 5 public class Consumer implements Runnable {
 6 
 7 // 9. 聲明名爲buffer的 List<String>對象。這個對象類型是用來相互交換的。
 8 private List<String> buffer;
 9 
10 // 10. 聲明一個名爲exchanger的 Exchanger<List<String>> 對象。用來同步 producer和consumer。
11 private final Exchanger<List<String>> exchanger;
12 
13 // 11. 實現類的構造函數,並初始化2個屬性。
14 public Consumer(List<String>buffer, Exchanger<List<String>> exchanger) {
15 this.buffer = buffer;
16 this.exchanger = exchanger;
17 }
18 
19 // 12. 實現 run() 方法。在方法內,實現10次交換。
20 @Override
21 public void run() {
22 int cycle = 1;
23 for (int i = 0; i < 10; i++) {
24 System.out.printf("Consumer: Cycle %d\n", cycle);
25 
26 // 13. 在每次循環,首先調用exchange()方法來與producer同步。Consumer須要消耗數據。此方法可能會拋出InterruptedException異常, 加上處理代碼。
27 try {
28 buffer = exchanger.exchange(buffer);
29 } catch (InterruptedException e) {                e.printStackTrace();
30 }
31 
32 // 14. 把producer發來的在buffer裏的10字符串寫到操控臺並從buffer內刪除,留空。System.out.println("Consumer: " + buffer.size());
33 for (int j = 0; j <10; j++) {
34 String message = buffer.get(0);
35 System.out.println("Consumer: " + message);
36 buffer.remove(0);
37 }
38 cycle++;
39 }
 1 //15.如今,實現例子的主類經過建立一個類,名爲Core並加入 main() 方法。
 2 package tool;
 3 import java.util.ArrayList;
 4 mport java.util.List;
 5 import java.util.concurrent.Exchanger;
 6 
 7 public class Core {
 8 public static void main(String[] args) {
 9 
10 // 16. 建立2個buffers。分別給producer和consumer使用.
11 List<String> buffer1 = new ArrayList<String>();
12 List<String> buffer2 = new ArrayList<String>();
13 
14 // 17. 建立Exchanger對象,用來同步producer和consumer。
15 Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
16 
17 // 18. 建立Producer對象和Consumer對象。
18 Producer producer = new Producer(buffer1, exchanger);
19 Consumer consumer = new Consumer(buffer2, exchanger);
20 
21 // 19. 建立線程來執行producer和consumer並開始線程。
22 Thread threadProducer = new Thread(producer);
23 Thread threadConsumer = new Thread(consumer); threadProducer.start();
24 threadConsumer.start();
25 }

消費者開始時是空白的buffer,而後調用Exchanger來與生產者同步。由於它須要數據來消耗。生產者也是從空白的buffer開始,而後建立10個字符串,保存到buffer,並使用exchanger與消費者同步。函數

在這兒,2個線程(生產者和消費者線程)都是在Exchanger裏並交換了數據類型,因此當消費者從exchange() 方法返回時,它有10個字符串在buffer內。當生產者從 exchange() 方法返回時,它有空白的buffer來從新寫入。這樣的操做會重複10遍。工具

如你執行例子,你會發現生產者和消費者是如何併發的執行任務和在每一個步驟它們是如何交換buffers的。與其餘同步工具同樣會發生這種狀況,第一個調用 exchange()方法會進入休眠直到其餘線程的達到。學習

其餘方法

若是兩個線程有一個沒有到達exchange方法,則會一直等待,若是擔憂有特殊狀況發生,避免一直等待,可使用exchange(V data, long time, TimeUnit unit)設置最大等待時長。this

--- V是聲明Phaser的參數種類(例子裏是 List)。 此線程會休眠直到另外一個線程到達並中斷它,或者特定的時間過去了。TimeUnit類有多種常量:DAYS, HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS, 和 SECONDS。spa

 

參考:《Java併發編程的藝術》

相關文章
相關標籤/搜索