TIJ -- 任務間使用管道進行輸入/輸出

  1. 經過輸入/輸出在線程間進行通訊一般頗有用。提供線程功能的類庫以「管道」的形式對線程間的輸入/輸出提供了支持。它們在Java輸入/輸出類庫中的對應物就是PipedWriter類(容許任務向管道寫)和PipedReader類(容許不一樣任務從同一個管道中讀取)。這個模型能夠當作是「生產者 - 消費者」問題的變體,這裏的管道就是一個封裝好的解決方案。管道基本上是一個阻塞隊列,存在於多個引入BlockingQueue以前的Java版本中。html

  2. 下面是一個簡單例子,兩個任務使用一個管道進行通訊:java

    Class : c++

package lime.thinkingInJava._021._005._005;

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @Author : Lime
 * @Description :
 * @Remark :
 */
class Sender implements Runnable{
    private Random rand = new Random(47);
    private PipedWriter out = new PipedWriter();
    public PipedWriter getPipedWriter(){
        return out;
    }
    public void run(){
        try{
            while (true){
                for(char c = 'A'; c <= 'z';c++){
                    out.write(c);
                    TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
                }
            }
        } catch (InterruptedException e) {
            System.out.println(e + " Sender sleep Interrupted");
        } catch (IOException e) {
            System.out.println(e + " Sender write exception");
        }
    }
}
class Receiver implements Runnable{
    private PipedReader in;
    public Receiver(Sender sender) throws IOException {
        in = new PipedReader(sender.getPipedWriter());
    }
    public void run(){
        try{
            while (true){
                //Blocks until characters are there;
                System.out.println("Read : " + (char)in.read());
            }
        } catch (IOException e) {
            System.out.println(e + " Receiver read exception");
        }
    }
}
public class PipedIO {
    public static void main(String[] args) throws IOException, InterruptedException {
        Sender sender = new Sender();
        Receiver receiver = new Receiver(sender);
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(sender);
        exec.execute(receiver);
        TimeUnit.SECONDS.sleep(4);
        exec.shutdownNow();
    }
}

  3. Console : dom

Read : A
Read : B
Read : C
Read : D
Read : E
Read : F
Read : G
Read : H
Read : I
Read : J
Read : K
Read : L
Read : M
Read : N
Read : O
Read : P
Read : Q
java.lang.InterruptedException: sleep interrupted Sender sleep Interrupted
java.io.InterruptedIOException Receiver read exception

  4. Sender和Receiver表明了須要互相通訊兩個任務。Sender建立了一個PipedWriter,它是一個單獨的對象;可是對於Receiver,PipedReader的創建必須在構造器中與一個PipedWriter相關聯。Sender把數據放進Writer,而後休眠一段時間(隨機數)。然而,Receiver沒有Sleep()和wait()。但當它調用read()時,若是沒有更多的數據,管道將自動阻塞。ide

  注意sender和receiver是在main()中啓動的,即對象構造完全完畢之後。若是你啓動了一個沒有構造完畢的對象,在不一樣的平臺上管道可能會產生不一致的行爲(注意,BlockingQueue使用起來更加健壯而容易)。源碼分析

  在shutdownNow()被調用時,能夠看到PipedReader與普通I/O之間最重要的差別 ------ PipedReader是可中斷的。若是你將in.read()調用修改成System.in.read(),那麼interrupt()將不能打斷read()調用。this

  5. PipedWriter的wirte() 源碼解析spa

    /**
     * Writes the specified <code>char</code> to the piped output stream.
     * If a thread was reading data characters from the connected piped input
     * stream, but the thread is no longer alive, then an
     * <code>IOException</code> is thrown.
     * <p>
     * Implements the <code>write</code> method of <code>Writer</code>.
     *
     * @param      c   the <code>char</code> to be written.
     * @exception  IOException  if the pipe is
     *          <a href=PipedOutputStream.html#BROKEN> <code>broken</code></a>,
     *          {@link #connect(java.io.PipedReader) unconnected}, closed
     *          or an I/O error occurs.
     */
    public void write(int c)  throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        }
        //調用PipedReader的receive(c)方法,將c放入PipedReader的char buffer[]中
        sink.receive(c);
    }
    /**
     * Receives a char of data. This method will block if no input is
     * available.
     */
    synchronized void receive(int c) throws IOException {
        if (!connected) {
            //判斷兩個I/O流鏈接狀態
            throw new IOException("Pipe not connected");
        } else if (closedByWriter || closedByReader) {
            //判斷兩個I/O流開啓狀態
            throw new IOException("Pipe closed");
        } else if (readSide != null && !readSide.isAlive()) {
            //判斷輸入流線程是否存活
            throw new IOException("Read end dead");
        }

        //獲取輸出流線程
        writeSide = Thread.currentThread();
        while (in == out) {
            //判斷char buffer[] 是否已滿
            if ((readSide != null) && !readSide.isAlive()) {
                //判斷輸入流狀態是否存活
                throw new IOException("Pipe broken");
            }
            /* full: kick any waiting readers */
            // 間隔1000毫秒喚醒寫線程 -- start
            notifyAll();
            try {
                //阻塞1000毫秒
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
            // 間隔1000毫秒喚醒寫線程 -- end
        }
        if (in < 0) {
            //判斷char buffer[] 爲空
            in = 0;
            out = 0;
        }
        buffer[in++] = (char) c;
        if (in >= buffer.length) {
            in = 0;
        }
    }

  6. PipedRead的read() 源碼解析.net

    /**
     * Reads the next character of data from this piped stream.
     * If no character is available because the end of the stream
     * has been reached, the value <code>-1</code> is returned.
     * This method blocks until input data is available, the end of
     * the stream is detected, or an exception is thrown.
     *
     * @return     the next character of data, or <code>-1</code> if the end of the
     *             stream is reached.
     * @exception  IOException  if the pipe is
     *          <a href=PipedInputStream.html#BROKEN> <code>broken</code></a>,
     *          {@link #connect(java.io.PipedWriter) unconnected}, closed,
     *          or an I/O error occurs.
     */
    public synchronized int read()  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
            throw new IOException("Pipe closed");
        } else if (writeSide != null && !writeSide.isAlive()
                && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        readSide = Thread.currentThread();
        //設定超時重連次數
        int trials = 2;
        while (in < 0) {
            //判斷char buffer[] 爲空
            if (closedByWriter) {
                /* closed by writer, return EOF */
                return -1;
            }
            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
                throw new IOException("Pipe broken");
            }
            /* might be a writer waiting */
            // 間隔1000毫秒喚醒讀線程 -- start
            notifyAll();
            try {
                //阻塞1000毫秒
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
            // 間隔1000毫秒喚醒讀線程 -- start
        }
        int ret = buffer[out++];
        if (out >= buffer.length) {
            out = 0;
        }
        if (in == out) {
            /* now empty */
            in = -1;
        }
        return ret;
    }

  7. 鳴謝線程

    PipedWriter PipedReader 源碼分析

 

 

 

 

 

啦啦啦

相關文章
相關標籤/搜索