在上一篇筆記中講述了Java IO中的文件(File)以及如何用文件流來對文件進行讀寫操作,本篇則要講述的是Java IO中的管道流。
Java IO中的管道流可以使同一進程中的不同線程進行通訊;如果不明白進程和線程的區別,可以去網上搜搜資料。可以把它看作是在同一個JVM內提供線程間通訊的能力。在Java IO中,管道的創建需要通過PipedInputStream和PipedOutputStream兩個類,可以通過兩者的構造方法進行互相關聯,也可以通過其中的connect方法進行關聯。
package pipedIO;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Date;

/**
 * Demonstrates two threads of one JVM talking to each other through a pipe:
 * one thread writes the current time into a {@link PipedOutputStream} once a
 * second, the other reads whatever has arrived from the connected
 * {@link PipedInputStream} and prints it.
 */
public class PipedIO {

    private final PipedInputStream pis = new PipedInputStream();
    private final PipedOutputStream pos = new PipedOutputStream();

    public static void main(String[] args) throws IOException {
        PipedIO pipedIO = new PipedIO();
        pipedIO.initThread();
    }

    /**
     * Connects the two pipe ends and starts the writer and the reader thread.
     *
     * @throws IOException if the pipe ends are already connected
     */
    private void initThread() throws IOException {
        pos.connect(pis);
        // "input" feeds data into the pipe, "output" takes data out of it.
        Thread input = new Thread(new Runnable() {
            @Override
            public void run() {
                writeTimestamps();
            }
        });
        Thread output = new Thread(new Runnable() {
            @Override
            public void run() {
                readTimestamps();
            }
        });
        input.start();
        output.start();
    }

    /**
     * Writer loop: pushes the current time into the pipe once per second.
     * The write end is closed when the loop terminates for any reason, so the
     * reader observes a regular end-of-stream instead of a broken pipe.
     */
    private void writeTimestamps() {
        // Alias the field so try-with-resources closes the write end on exit.
        try (PipedOutputStream sink = pos) {
            while (true) {
                String time = new Date().toString();
                // Explicit charset: do not depend on the platform default.
                sink.write(time.getBytes(StandardCharsets.UTF_8));
                System.out.println("輸出流完成一次數據寫出,數據爲" + time);
                Thread.sleep(1000);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Reader loop: prints every chunk of bytes that arrives through the pipe
     * until end-of-stream. The read end is closed when the loop terminates,
     * so a writer that is still running fails fast with "Pipe closed" instead
     * of blocking on a buffer nobody drains.
     */
    private void readTimestamps() {
        byte[] temp = new byte[1024];
        try (PipedInputStream source = pis) {
            int len;
            while ((len = source.read(temp)) != -1) {
                System.out.println("輸入流完成一次數據寫入,數據爲:"
                        + new String(temp, 0, len, StandardCharsets.UTF_8));
                Thread.sleep(1000);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
package java.io; public class PipedInputStream extends InputStream { boolean closedByWriter = false; volatile boolean closedByReader = false; boolean connected = false; /* REMIND: identification of the read and write sides needs to be more sophisticated. Either using thread groups (but what about pipes within a thread?) or using finalization (but it may be a long time until the next GC). */ Thread readSide; Thread writeSide; private static final int DEFAULT_PIPE_SIZE = 1024; protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE; protected byte buffer[]; protected int in = -1; protected int out = 0; public PipedInputStream(PipedOutputStream src) throws IOException { this(src, DEFAULT_PIPE_SIZE); } public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException { initPipe(pipeSize); connect(src); } public PipedInputStream() { initPipe(DEFAULT_PIPE_SIZE); } public PipedInputStream(int pipeSize) { initPipe(pipeSize); } private void initPipe(int pipeSize) { if (pipeSize <= 0) { throw new IllegalArgumentException("Pipe Size <= 0"); } buffer = new byte[pipeSize]; } public void connect(PipedOutputStream src) throws IOException { src.connect(this); } protected synchronized void receive(int b) throws IOException { checkStateForReceive(); writeSide = Thread.currentThread(); if (in == out) awaitSpace(); if (in < 0) { in = 0; out = 0; } buffer[in++] = (byte)(b & 0xFF); if (in >= buffer.length) { in = 0; } } synchronized void receive(byte b[], int off, int len) throws IOException { checkStateForReceive(); writeSide = Thread.currentThread(); int bytesToTransfer = len; while (bytesToTransfer > 0) { if (in == out) awaitSpace(); int nextTransferAmount = 0; if (out < in) { nextTransferAmount = buffer.length - in; } else if (in < out) { if (in == -1) { in = out = 0; nextTransferAmount = buffer.length - in; } else { nextTransferAmount = out - in; } } if (nextTransferAmount > bytesToTransfer) nextTransferAmount = bytesToTransfer; assert(nextTransferAmount > 0); 
System.arraycopy(b, off, buffer, in, nextTransferAmount); bytesToTransfer -= nextTransferAmount; off += nextTransferAmount; in += nextTransferAmount; if (in >= buffer.length) { in = 0; } } } private void checkStateForReceive() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByWriter || closedByReader) { throw new IOException("Pipe closed"); } else if (readSide != null && !readSide.isAlive()) { throw new IOException("Read end dead"); } } private void awaitSpace() throws IOException { while (in == out) { checkStateForReceive(); /* full: kick any waiting readers */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } } synchronized void receivedLast() { closedByWriter = true; notifyAll(); } public synchronized int read() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } readSide = Thread.currentThread(); int trials = 2; while (in < 0) { if (closedByWriter) { /* closed by writer, return EOF */ return -1; } if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { throw new IOException("Pipe broken"); } /* might be a writer waiting */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } int ret = buffer[out++] & 0xFF; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } return ret; } public synchronized int read(byte b[], int off, int len) throws IOException { if (b == null) { throw new NullPointerException(); } else if (off < 0 || len < 0 || len > b.length - off) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } /* possibly wait on the first character */ int c = read(); if (c < 0) { return -1; } 
b[off] = (byte) c; int rlen = 1; while ((in >= 0) && (len > 1)) { int available; if (in > out) { available = Math.min((buffer.length - out), (in - out)); } else { available = buffer.length - out; } // A byte is read beforehand outside the loop if (available > (len - 1)) { available = len - 1; } System.arraycopy(buffer, out, b, off + rlen, available); out += available; rlen += available; len -= available; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } } return rlen; } public synchronized int available() throws IOException { if(in < 0) return 0; else if(in == out) return buffer.length; else if (in > out) return in - out; else return in + buffer.length - out; } public void close() throws IOException { closedByReader = true; synchronized (this) { in = -1; } } }
package java.io; import java.io.*; public class PipedOutputStream extends OutputStream { private PipedInputStream sink; public PipedOutputStream(PipedInputStream snk) throws IOException { connect(snk); } public PipedOutputStream() { } public synchronized void connect(PipedInputStream snk) throws IOException { if (snk == null) { throw new NullPointerException(); } else if (sink != null || snk.connected) { throw new IOException("Already connected"); } sink = snk; snk.in = -1; snk.out = 0; snk.connected = true; } public void write(int b) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); } sink.receive(b); } public void write(byte b[], int off, int len) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); } else if (b == null) { throw new NullPointerException(); } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return; } sink.receive(b, off, len); } public synchronized void flush() throws IOException { if (sink != null) { synchronized (sink) { sink.notifyAll(); } } } public void close() throws IOException { if (sink != null) { sink.receivedLast(); } } }
package pipedIO;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

/**
 * Demonstrates what happens when both ends of a pipe are driven by the same
 * thread: the thread alternates between writing to the
 * {@link PipedOutputStream} and reading from the connected
 * {@link PipedInputStream}, writing more per round than it reads.
 */
public class PipedIO1 {

    private PipedInputStream pis = new PipedInputStream();
    private PipedOutputStream pos = new PipedOutputStream();

    public static void main(String[] args) throws IOException {
        new PipedIO1().initThread();
    }

    /**
     * Connects the two pipe ends and starts the single worker thread.
     *
     * @throws IOException if the pipe ends are already connected
     */
    private void initThread() throws IOException {
        pos.connect(pis);
        Runnable singleThreadedTraffic = new Runnable() {
            @Override
            public void run() {
                writeAndReadOnOneThread();
            }
        };
        new Thread(singleThreadedTraffic).start();
    }

    /**
     * Writes 500 bytes and reads them back, then keeps writing 1000 bytes and
     * reading at most 500 bytes per round, pausing one second between rounds.
     */
    private void writeAndReadOnOneThread() {
        try {
            byte[] smallChunk = new byte[500];
            byte[] largeChunk = new byte[1000];

            writeChunk(smallChunk);
            int count = readChunk(largeChunk);
            while (count != -1) {
                writeChunk(largeChunk);
                count = readChunk(smallChunk);
                Thread.sleep(1000);
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
            System.out.println("there are some mistakes");
        }
    }

    /** Writes the whole array to the pipe and reports the completed write. */
    private void writeChunk(byte[] data) throws IOException {
        pos.write(data);
        System.out.println("輸出流完成一次數據寫出");
    }

    /**
     * Reads into the array, reports the completed read and returns the
     * number of bytes read (-1 at end of stream).
     */
    private int readChunk(byte[] target) throws IOException {
        int count = pis.read(target);
        System.out.println("輸入流完成一次數據讀出");
        return count;
    }
}
上面的代碼模擬了PipedInputStream和PipedOutputStream在同一線程中同時工作的情況:除首次只寫入500字節外,PipedOutputStream每次向buffer中寫入1000字節的數據,而PipedInputStream每次只從buffer中讀取500字節的數據,於是每執行一次這樣的讀寫操作,buffer中都會剩餘500字節的數據未讀取。由於這裏使用的是PipedInputStream的無參構造方法,沒有通過void initPipe(int pipeSize)方法指定其他大小,所以buffer的大小爲默認的1024字節。當執行完第二次讀寫操作時,buffer中仍有500字節未被讀取,只剩餘524字節的空間,並不足夠再一次寫入1000字節的數據了,所以此時PipedOutputStream的write操作就會阻塞,等待有人從buffer中讀取數據以騰出空間。然而由於讀寫在同一線程中,PipedInputStream的read操作又要等到PipedOutputStream的write操作完成後才會執行,從而造成類似死鎖的情況;從控制檯可以看出,程序卡在了第三次讀寫時。要解決這個問題也很簡單,只要讀寫操作不在同一個線程中就可以了,可以參考第一個例子。