在java多線程通訊中管道通訊是一種重要的通訊方式,在java中咱們經過配套使用管道輸出流PipedOutputStream和管道輸入流PipedInputStream完成線程間通訊。多線程管道通訊的主要流程是在一個線程中向PipedOutputStream寫入數據,這些數據會自動傳送到對應的管道輸入流PipedInputStream中,其餘線程經過讀取PipeInputStream中緩衝的數據實現多線程間通訊。java
PipeInputStream是管道輸入流,繼承自InputStream,鏈接到一個管道輸出流PipedOutputStream。能夠緩存鏈接的管道輸出流PipedOutputStream寫入的字節數據。一般在一個線程使用PipedInputStream讀取數據,在其餘線程使用PipedOutputStream寫入字節數據。不推薦在一個線程中使用PipedInputStream和PipedOutputStream可能會在線程中形成死鎖,管道輸入流包含一個緩衝區buff用於讀操做和寫操做。數組
1)成員變量緩存
package java.io; public class PipedInputStream extends InputStream { //管道輸出流是否關閉標記 boolean closedByWriter = false; //管道輸入流是否標記 volatile boolean closedByReader = false; //管道輸入流與管道輸出流是否創建鏈接 boolean connected = false; //讀取「管道」數據即PipedInputStream線程 Thread readSide; //向管道寫入數據即PipedOutputStream線程 Thread writeSide; //管道默認大小 private static final int DEFAULT_PIPE_SIZE = 1024; protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE; // 緩衝區 protected byte buffer[]; //下一個寫入字節的位置 protected int in = -1; //下一個讀取字節的位置。若out==in說明管道輸出流寫入的數據所有被讀取 protected int out = 0; }
2)構造方法數據結構
public PipedInputStream(PipedOutputStream src) throws IOException { this(src, DEFAULT_PIPE_SIZE); } public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException { initPipe(pipeSize); connect(src); } public PipedInputStream() { initPipe(DEFAULT_PIPE_SIZE); }
從源碼咱們能夠知道管道輸入流PipedInputStream構造方法作了兩件事,按照指定大小pipeSize初始化緩衝區,若是還指定了關聯的管道輸出流PipedOutputStream,那麼調用connect方法鏈接它。若是指定的pipeSize小於等於0那麼拋出IllegalArgumentException異常,若是當前的管道輸入流已經和指定的管道輸出流創建鏈接那麼拋出IOException異常提示鏈接已經創建。多線程
3)其餘成員方法ide
//根據指定大小pipeSize初始化緩衝區 private void initPipe(int pipeSize) { if (pipeSize <= 0) { throw new IllegalArgumentException("Pipe Size <= 0"); } buffer = new byte[pipeSize]; } //綁定管道輸入流與管道輸出流 public void connect(PipedOutputStream src) throws IOException { src.connect(this); } //接收綁定的管道輸出流PipedOutputStream的write(int b)方法寫入的int類型數據 protected synchronized void receive(int b) throws IOException { //檢查管道狀態 checkStateForReceive(); //獲取「寫入管道「即PipedOutputStream的線程 writeSide = Thread.currentThread(); //若管道輸出流寫入的數據所有被讀取則等待 if (in == out) awaitSpace(); if (in < 0) { in = 0; out = 0; } //將讀取的字節b保存到緩衝區 buffer[in++] = (byte)(b & 0xFF); //若是當前寫入的字節數大於緩衝區大小那麼從頭覆蓋以前寫入的字節數據 if (in >= buffer.length) { in = 0; } } //接收管道輸出流的write(byte b[],int off, int len)方法調用寫入的字節數組b synchronized void receive(byte b[], int off, int len) throws IOException { //檢查管道狀態 checkStateForReceive(); //獲取」寫入管道「線程 writeSide = Thread.currentThread(); //獲取寫入字節長度 int bytesToTransfer = len; //循環將字節數組b寫入管道輸入流內部緩衝數組buffer while (bytesToTransfer > 0) { //若寫入管道的字節長度in等於讀取字節長度長度out,則等待 if (in == out) awaitSpace(); int nextTransferAmount = 0; //若管道中讀取的字節數out小於寫入的字節數in,nextTransferAmount等於buffer.length-in if (out < in) { nextTransferAmount = buffer.length - in; } //若管道中寫入的字節數小於讀取的字節數, else if (in < out) { if (in == -1) { in = out = 0; nextTransferAmount = buffer.length - in; } else { nextTransferAmount = out - in; } } if (nextTransferAmount > bytesToTransfer) nextTransferAmount = bytesToTransfer; assert(nextTransferAmount > 0); //將數據複製到緩衝區buffer中 System.arraycopy(b, off, buffer, in, nextTransferAmount); bytesToTransfer -= nextTransferAmount; off += nextTransferAmount; in += nextTransferAmount; //緩衝區溢出繼續寫入則覆蓋原有字節數據 if (in >= buffer.length) { in = 0; } } } //檢查管道狀態 private void checkStateForReceive() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByWriter || closedByReader) { throw new IOException("Pipe closed"); } else if (readSide != null && !readSide.isAlive()) { throw new IOException("Read end dead"); } } //等待。若」寫入管道「的數據被所有讀取完,則喚醒」讀取管道「線程繼續讀取字節數據以讓緩衝區空出空間繼續寫入數據,等待 //1000ms private void awaitSpace() throws IOException { while (in == out) { checkStateForReceive(); /* full: kick any waiting readers */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } } //鏈接的管道輸出流關閉時被調用用於更新管道輸入流關閉狀態,並喚醒讀取管道線程 synchronized void receivedLast() { closedByWriter = true; notifyAll(); } //從管道輸入流更確切的說緩衝區buffer中讀取一個字節並轉化爲int值 public synchronized int read() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } readSide = Thread.currentThread(); int trials = 2; //若是寫入字節數小於0,且非寫入管道線程異常終止或者管道輸出流寫入結束正常關閉,那麼喚醒寫入管道等待1s while (in < 0) { if (closedByWriter) { /* closed by writer, return EOF */ return -1; } if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { throw new IOException("Pipe broken"); } /* might be a writer waiting */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } int ret = buffer[out++] & 0xFF; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } return ret; } //從管道輸入流緩衝數組buffer中讀取字節數據並填入數組b中邏輯與read()相似 public synchronized int read(byte b[], int off, int len) throws IOException { if (b == null) { throw new NullPointerException(); } else if (off < 0 || len < 0 || len > b.length - off) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } /* possibly wait on the first character */ int c = read(); if (c < 0) { return -1; } b[off] = (byte) c; int rlen = 1; while ((in >= 0) && (len > 1)) { int available; if (in > out) { available = Math.min((buffer.length - out), (in - out)); } else { available = buffer.length - out; } // A byte is read beforehand outside the loop if (available > (len - 1)) { available = len - 1; } System.arraycopy(buffer, out, b, off + rlen, available); out += available; rlen += available; len -= available; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } } return rlen; } //返回可不受阻塞地今後輸入流中讀取的字節數,注意available返回是緩衝數組中的可讀字節長度 public synchronized int available() throws IOException { if(in < 0) return 0; else if(in == out) return buffer.length; else if (in > out) return in - out; else return in + buffer.length - out; } //關閉輸出流 public void close() throws IOException { closedByReader = true; synchronized (this) { in = -1; } }
PipedOutputStream是管道輸出流,繼承自OutputStream。經過與一個管道輸入流PipedInputStream創建鏈接往管道輸入流中寫入數據,其實質是往綁定的管道輸入流的內部緩存區中填入數據。PipedInputStream和PipedOutputStream主要用於線程間通訊,不建議在單線程中使用PipedInputStream和PipedOutputStream可能形成死鎖。函數
public class PipedOutputStream extends OutputStream { //綁定的管道輸入流對象 private PipedInputStream sink; }
1)構造函數oop
public PipedOutputStream(PipedInputStream snk) throws IOException { connect(snk); } public PipedOutputStream() { } public synchronized void connect(PipedInputStream snk) throws IOException { if (snk == null) { throw new NullPointerException(); } else if (sink != null || snk.connected) { throw new IOException("Already connected"); } sink = snk; snk.in = -1; snk.out = 0; snk.connected = true; }
PipedOutputStream類內部定義了兩個構造函數,一個無參構造函數沒作啥,咱們分析下入參是管道輸入流對象引用的構造函數。由源碼可知與PipedInputStream構造函數作的同樣其實PipedInputStream帶參構造函數方法內部調用的也是本方法,參數判空,判斷是否此前是否已創建鏈接,內部管道輸入流對象引用指向指定對象,初始化對象的讀取寫入位置,設置鏈接狀態爲已鏈接。源碼分析
2)void write(byte b[], int off, int len)寫入字節數據this
public void write(byte b[], int off, int len) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); } else if (b == null) { throw new NullPointerException(); } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return; } sink.receive(b, off, len); }
這個方法首先對當前綁定的管道輸入流對象進行判空,爲空拋出一個IO異常提示未創建管道鏈接,對寫入的字節數組b判空爲空拋出NullException異常,接着對待寫入字節數組b寫入起點off,寫入長度len進行範圍合法性校驗,校驗失敗拋出IndexOutOfBoundsException數組越界異常,若是寫入長度爲0直接返回,下面就是調用綁定的管道輸入流對象sink.receive(b, off, len)方法,源碼解析在前面關於PipedInputStream源碼的解析部分。從這邊咱們知道PipedOutputStream寫入數據其實就是調用綁定輸入流的receive方法往內部緩衝數組buffer中填入字節數據。
3)其餘成員方法
//寫入單個字節,實質就是往綁定的管道輸入流內部緩衝數組填入一個字節數據 public void write(int b) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); } sink.receive(b); } //釋放輸入流對象鎖,繼續讀取緩衝區數據 public synchronized void flush() throws IOException { if (sink != null) { synchronized (sink) { sink.notifyAll(); } } } //調用管道輸入流的receiveLast關閉通道 public void close() throws IOException { if (sink != null) { sink.receivedLast(); } } }