Java併發(五)任務間使用管道進行通訊

經過I/O在線程間進行通訊一般頗有用。提供線程功能的類庫以「管道」的形式對線程間的 I/O 提供了支持。它們在Java I/O 類庫中的對應物就是PipedWriter(容許任務向管道寫)和PipedReader(容許不一樣的任務從同一個管道中讀取)。這個模型能夠看作是「生產者-消費者」問題的變體,這裏的管道就是一個封裝好的解決方案。管道基本上是一個阻塞隊列, 存在於多個引入BlockingQueue以前的Java版本中。java

下面是一個簡單的例子,兩個任務使用一個管道進行通訊:c++

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 發送端
 */
class Sender implements Runnable {
    private Random rand = new Random(47);
    private PipedWriter writer = new PipedWriter();
    public PipedWriter getWriter() { return writer; }
    @Override
    public void run() {
        try {
            while(true) {
                for (char c = 'A'; c < 'z'; c++) {
                    writer.write(c);
                    TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
                }
            }
        } catch (IOException e) {
            System.out.println(e + " Sender write Exception");
        } catch (InterruptedException e) {
            System.out.println(e + " Sender sleep Interrupted");
        }
    }
}

/**
 * 接收端
 */
class Receiver implements Runnable {
    private PipedReader reader;
    public Receiver(Sender sender) throws IOException {
        reader = new PipedReader(sender.getWriter());
    }
    @Override
    public void run() {
        int count = 0;
        try {
            while(true) {
                //在讀取到內容以前,會一直阻塞
                char s = (char)reader.read();
                System.out.print("Read: " + s + ", ");
                if (++count % 5 == 0) {
                    System.out.println();
                }
            }
        } catch (IOException e) {
            System.out.println(e + " Receiver read Exception.");
        }
    }
}

public class PipedIO {
    public static void main(String[] args) throws Exception {
        Sender sender = new Sender();
        Receiver receiver = new Receiver(sender);
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(sender);
        exec.execute(receiver);
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
    }
}

執行結果(可能的結果):dom

Read: A, Read: B, Read: C, Read: D, Read: E, 
Read: F, Read: G, Read: H, Read: I, Read: J, 
Read: K, Read: L, Read: M, Read: N, Read: O, 
Read: P, Read: Q, Read: R, Read: S, Read: T, 
Read: U, java.io.InterruptedIOException Receiver read Exception.
java.lang.InterruptedException: sleep interrupted Sender sleep Interrupted

Sender和Receiver表明了須要互相通訊的兩個任務。Sender建立了一個PipedWriter,它是一個單獨的對象;可是對於Receiver,PipedReader的創建必須在構造器中與一個PipedWriter相關聯。就是說,PipedReader與PipedWriter的構造能夠經過以下兩種方式:ide

//方式一:先構造PipedReader,再經過它構造PipedWriter。
PipedReader reader = new PipedReader();
PipedWriter writer = new PipedWriter(reader);

//方式二:先構造PipedWriter,再經過它構造PipedReader。
PipedWriter writer2 = new PipedWriter();
PipedReader reader2 = new PipedReader(writer2);

Sender把數據放進Writer,而後休眠一段時間(隨機數)。然而,Receiver沒有sleep()和wait。但當它調用read()時,若是沒有更多的數據,管道將自動阻塞線程

注意Sender和Receiver是在main()中啓動的,即對象構造完全完畢以後。若是你啓動了一個沒有構造完畢的對象,在不一樣的平臺上管道可能會產生不一致的行爲(注意,BlockingQueue使用起來更加健壯而容易)。code

在shutdownNow()被調用時,能夠看到PipedReader與普通I/O之間最重要的差別——PipedReader是能夠中斷的。若是你將reader.read()替換爲System.in.read(),那麼interrupt()將不能打斷read()調用。對象

相關文章
相關標籤/搜索