java IO筆記(PipedInputStream/PipedOutputStream)

在上一篇筆記中講述了java io 中的文件(file)以及如何用文件流來對文件進行讀寫操做,本篇則要講述的是java IO中的管道流。java

java IO中的管道流可使得同一進程中的不一樣線程進行通訊,若是不明白進程和線程的區別的話,能夠去網上搜搜資料,能夠看作提供同一jvm的通訊能力。在java IO中管道的建立須要經過PipedInputStream和PipedOutputStream兩個類,能夠經過二者的構造方法進行互相關聯也能夠經過其中的connect方法進行關聯。數組

下面將用一個最簡單的例子來代表其功能。緩存

 

package pipedIO;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Date;

public class PipedIO {

	private PipedInputStream pis = new PipedInputStream();
	private PipedOutputStream pos = new PipedOutputStream();

	public static void main(String[] args) throws IOException {
		PipedIO pipedIO = new PipedIO();
		pipedIO.initThread();
	}

	private void initThread() throws IOException {
		pos.connect(pis);
		Thread input = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					while (true) {
						String time = new Date().toString();
						pos.write(time.getBytes());
						System.out.println("輸出流完成一次數據寫出,數據爲"+time);
						Thread.sleep(1000);
					}
				} catch (IOException | InterruptedException e) {
					e.printStackTrace();
				}
			}
		});

		Thread output = new Thread(new Runnable() {
			@Override
			public void run() {
				byte[] temp = new byte[1024];
				try {
					int len;
					while ((len = pis.read(temp)) != -1) {
						System.out.println("輸入流完成一次數據寫入,數據爲:"+new String(temp, 0, len));
						Thread.sleep(1000);
					}
				} catch (IOException | InterruptedException e) {
					e.printStackTrace();
				}
			}
		});

		input.start();
		output.start();
	}

}

執行上述代碼,能夠看到以下打印:jvm

 

從控制檯輸出能夠看出,兩個線程之間完成了通訊,看上去十分簡單。但管道流真正的使用時,還須要注意一些事項。下面說說管道流的工做原理吧。ide

PipedInputStream.java工具

 

package java.io;

public class PipedInputStream extends InputStream {
    boolean closedByWriter = false;
    volatile boolean closedByReader = false;
    boolean connected = false;

        /* REMIND: identification of the read and write sides needs to be
           more sophisticated.  Either using thread groups (but what about
           pipes within a thread?) or using finalization (but it may be a
           long time until the next GC). */
    Thread readSide;
    Thread writeSide;

    private static final int DEFAULT_PIPE_SIZE = 1024;

    protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;

    protected byte buffer[];

    protected int in = -1;

    protected int out = 0;

    public PipedInputStream(PipedOutputStream src) throws IOException {
        this(src, DEFAULT_PIPE_SIZE);
    }

    public PipedInputStream(PipedOutputStream src, int pipeSize)
            throws IOException {
         initPipe(pipeSize);
         connect(src);
    }

    
    public PipedInputStream() {
        initPipe(DEFAULT_PIPE_SIZE);
    }

    public PipedInputStream(int pipeSize) {
        initPipe(pipeSize);
    }

    private void initPipe(int pipeSize) {
         if (pipeSize <= 0) {
            throw new IllegalArgumentException("Pipe Size <= 0");
         }
         buffer = new byte[pipeSize];
    }

    public void connect(PipedOutputStream src) throws IOException {
        src.connect(this);
    }

    protected synchronized void receive(int b) throws IOException {
        checkStateForReceive();
        writeSide = Thread.currentThread();
        if (in == out)
            awaitSpace();
        if (in < 0) {
            in = 0;
            out = 0;
        }
        buffer[in++] = (byte)(b & 0xFF);
        if (in >= buffer.length) {
            in = 0;
        }
    }

    synchronized void receive(byte b[], int off, int len)  throws IOException {
        checkStateForReceive();
        writeSide = Thread.currentThread();
        int bytesToTransfer = len;
        while (bytesToTransfer > 0) {
            if (in == out)
                awaitSpace();
            int nextTransferAmount = 0;
            if (out < in) {
                nextTransferAmount = buffer.length - in;
            } else if (in < out) {
                if (in == -1) {
                    in = out = 0;
                    nextTransferAmount = buffer.length - in;
                } else {
                    nextTransferAmount = out - in;
                }
            }
            if (nextTransferAmount > bytesToTransfer)
                nextTransferAmount = bytesToTransfer;
            assert(nextTransferAmount > 0);
            System.arraycopy(b, off, buffer, in, nextTransferAmount);
            bytesToTransfer -= nextTransferAmount;
            off += nextTransferAmount;
            in += nextTransferAmount;
            if (in >= buffer.length) {
                in = 0;
            }
        }
    }

    private void checkStateForReceive() throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByWriter || closedByReader) {
            throw new IOException("Pipe closed");
        } else if (readSide != null && !readSide.isAlive()) {
            throw new IOException("Read end dead");
        }
    }

    private void awaitSpace() throws IOException {
        while (in == out) {
            checkStateForReceive();

            /* full: kick any waiting readers */
            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
    }

    synchronized void receivedLast() {
        closedByWriter = true;
        notifyAll();
    }

   
    public synchronized int read()  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
            throw new IOException("Pipe closed");
        } else if (writeSide != null && !writeSide.isAlive()
                   && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        readSide = Thread.currentThread();
        int trials = 2;
        while (in < 0) {
            if (closedByWriter) {
                /* closed by writer, return EOF */
                return -1;
            }
            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
                throw new IOException("Pipe broken");
            }
            /* might be a writer waiting */
            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
        int ret = buffer[out++] & 0xFF;
        if (out >= buffer.length) {
            out = 0;
        }
        if (in == out) {
            /* now empty */
            in = -1;
        }

        return ret;
    }

   
    public synchronized int read(byte b[], int off, int len)  throws IOException {
        if (b == null) {
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        /* possibly wait on the first character */
        int c = read();
        if (c < 0) {
            return -1;
        }
        b[off] = (byte) c;
        int rlen = 1;
        while ((in >= 0) && (len > 1)) {

            int available;

            if (in > out) {
                available = Math.min((buffer.length - out), (in - out));
            } else {
                available = buffer.length - out;
            }

            // A byte is read beforehand outside the loop
            if (available > (len - 1)) {
                available = len - 1;
            }
            System.arraycopy(buffer, out, b, off + rlen, available);
            out += available;
            rlen += available;
            len -= available;

            if (out >= buffer.length) {
                out = 0;
            }
            if (in == out) {
                /* now empty */
                in = -1;
            }
        }
        return rlen;
    }

  
    public synchronized int available() throws IOException {
        if(in < 0)
            return 0;
        else if(in == out)
            return buffer.length;
        else if (in > out)
            return in - out;
        else
            return in + buffer.length - out;
    }

    public void close()  throws IOException {
        closedByReader = true;
        synchronized (this) {
            in = -1;
        }
    }
}

PipedOutputStream.javaoop

 

package java.io;

import java.io.*;


public class PipedOutputStream extends OutputStream {

    private PipedInputStream sink;

    public PipedOutputStream(PipedInputStream snk)  throws IOException {
        connect(snk);
    }

   
    public PipedOutputStream() {
    }

    public synchronized void connect(PipedInputStream snk) throws IOException {
        if (snk == null) {
            throw new NullPointerException();
        } else if (sink != null || snk.connected) {
            throw new IOException("Already connected");
        }
        sink = snk;
        snk.in = -1;
        snk.out = 0;
        snk.connected = true;
    }

    public void write(int b)  throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        }
        sink.receive(b);
    }

    public void write(byte b[], int off, int len) throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        } else if (b == null) {
            throw new NullPointerException();
        } else if ((off < 0) || (off > b.length) || (len < 0) ||
                   ((off + len) > b.length) || ((off + len) < 0)) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return;
        }
        sink.receive(b, off, len);
    }

    public synchronized void flush() throws IOException {
        if (sink != null) {
            synchronized (sink) {
                sink.notifyAll();
            }
        }
    }

    
    public void close()  throws IOException {
        if (sink != null) {
            sink.receivedLast();
        }
    }
}

 

上面貼出了PipedInputStream和PipedOutputStream的源碼,從源碼中不難看出,在PipedOutputStream中封裝了一個PipedInputStream,當調用兩個類中的的connect方法時,最終都回到了PipedOutputStream中的connect方法,使得兩個流進行鏈接關係。優化

咱們能夠看出不管是PipedInputStream的read方法,仍是PipedOutputStream中的write方法,數據都是存放在PipedInputStream中的一個byte數組的緩存中的,該緩存默認大小爲1024字節。那麼這就有一個問題了,當PipedInputStream中的緩存區已經裝滿的時候,必需要等到讀取PipedInputStream數據緩存並清除相對應的數據時,才能繼續往緩存中寫入,若是一直等不到,將再次產生相似死鎖的狀況。this

所以向PipedOutputStream中寫入數據的線程不該是負責從對應PipedInputStream中讀取數據的惟一線程,下面舉例說明:線程

 

package pipedIO;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipedIO1 {

	private PipedInputStream pis = new PipedInputStream();
	private PipedOutputStream pos = new PipedOutputStream();

	public static void main(String[] args) throws IOException {
		PipedIO1 pipedIO1 = new PipedIO1();
		pipedIO1.initThread();
	}

	private void initThread() throws IOException {
		pos.connect(pis);
		Thread input = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					byte[] inBytes = new byte[500];
					byte[] outBytes = new byte[1000];
					pos.write(inBytes);
					System.out.println("輸出流完成一次數據寫出");
					int len = pis.read(outBytes);
					System.out.println("輸入流完成一次數據讀出");
					while (len != -1) {
						pos.write(outBytes);
						System.out.println("輸出流完成一次數據寫出");
						len = pis.read(inBytes);
						System.out.println("輸入流完成一次數據讀出");
						Thread.sleep(1000);
					}
				} catch (IOException | InterruptedException e) {
					e.printStackTrace();
					System.out.println("there are some mistakes");
				}
			}
		});
		input.start();
	}

}

 

執行上述代碼後,控制檯能夠看到以下打印:

上面的代碼模擬了PipedInputStream和PipedOutputStream在同一線程中同時工做的狀況,PipedOutputStream每次向buffer中寫入1000字節的數據,PipedInputStream每次向buffer中度卻500字節的數據,沒執行一次讀寫操做,buffer中都會剩餘500字節的數據未讀取,由於沒有調用PipedInputStream中的void initPipe(int pipeSize)方法,因此buffer默認的大小爲1024字節,當執行完第二次讀寫操做時,緩衝區只剩餘24字節的空間,並不足夠再一次寫入1000字節的數據了,因此此時PipedInputStream的write操做就會阻塞等待有人從buffer中讀取並清空緩存區。然而由於讀寫在同一線程中,PipedInputStream的read操做又在等待PipedOutputStream的write操做完成後,再執行read,從而形成相似死鎖的狀況,從控制檯能夠看出,程序卡在了第三次讀寫時。要解決這個狀況也很簡單,只要讀寫操做不在一個線程中就能夠了,能夠參考第一個例子。

除了上面避免進入死鎖的狀況還要注意,在進行數據傳輸的時候,讀寫線程是否一直保存存活(isAlive),不管是任何一個線程不在活躍,此時進行讀寫操做就頗有可能拋出IOEception。

固然你也能夠本身寫一個工具類,優化一下,好比使用ByteArrayOutputStream的自動擴充緩存的特色來避免管道流由於緩存區空間不夠而形成的死鎖狀況。只有最適合本身需求的纔是最棒的,不是嗎。

除了管道以外,java中不一樣線程之間還有不少的通訊方式,若是須要在線程之間傳遞字節數據,管道流就是要一個不錯的選擇,儘管大部分時候通訊之間可能直接傳遞的是對象而不是簡單的字節數據。

相關文章
相關標籤/搜索