PipedReader繼承自Reader類,是字符管道輸入流,它的功能與管道字節輸出流PipedInputStream極爲類似,經過綁定一個管道輸出流PipedWriter實現了相似管道的功能,實現線程間通訊,一個線程在字符管道輸出流中寫入數據,基於管道特性這些數據實際會傳送到(其實保存可能更恰當)它鏈接的字符管道輸入流的內置字符緩衝區buffer中,其餘線程從管道字符輸入流的字符緩衝區中讀取數據如此實現了多線程環境下的線程間通訊。java
package java.io; public class PipedReader extends Reader { //鏈接的管道輸出流的關閉狀態 boolean closedByWriter = false; //鏈接的管道輸入流的關閉狀態 boolean closedByReader = false; //當前字符管道輸入流的鏈接狀態 boolean connected = false; //從管道中讀取數據的線程 Thread readSide; //向管道中寫入數據的線程 Thread writeSide; /** * 內置緩衝區的默認大小 */ private static final int DEFAULT_PIPE_SIZE = 1024; /** * 存儲字符數據的內部緩衝區 */ char buffer[]; /** * 緩衝區的位置,標識緩衝區數組存儲下一次從管道輸出流PipedWriter讀取的字符的位置,若in<0標識緩衝區目前是空的, * 還未放入數據,若in==full則表示緩衝區是滿的 */ int in = -1; /** * 緩衝區位置,用於標識管道輸入流PipedReader下一次從緩衝區讀取字符的位置 */ int out = 0; }
/** * 構造函數,指定了鏈接的字符管道輸出流,並使用默認緩衝區容量初始化內部緩衝區數組 */ public PipedReader(PipedWriter src) throws IOException { this(src, DEFAULT_PIPE_SIZE); } /** * 構造函數,指定了鏈接的字符管道輸出流PipedWrite和內部字符緩衝區buffer的容量,基於參數與字符管道輸出流創建鏈接 * 造成管道,並初始化內部緩衝區 */ public PipedReader(PipedWriter src, int pipeSize) throws IOException { initPipe(pipeSize); connect(src); } /** * 無參構造函數,建立一個字符管道輸入流,但還未與PipedWriter創建鏈接,在使用以前必須與指定字符管道輸出流 * PipedWriter創建鏈接 */ public PipedReader() { initPipe(DEFAULT_PIPE_SIZE); }
咱們選擇第二個構造函數PipedReader(PipedWriter src, int pipeSize)分析下在調用構造函數實例化PipedWriter對象的時候作了什麼。在該構造函數內部首先調用了initPipe方法咱們首先進入該方法源碼進行分析:數組
private void initPipe(int pipeSize) { if (pipeSize <= 0) { throw new IllegalArgumentException("Pipe size <= 0"); } buffer = new char[pipeSize]; }
沒作啥事無非是對指定的緩衝區容量參數pipeSize作了合法性校驗,在經過校驗後爲內部緩衝區分配了一個大小爲pipeSize的緩衝區。咱們接着往下分析,接着方法構造函數調用了connect方法,進入該方法數據結構
public void connect(PipedWriter src) throws IOException { src.connect(this); }
該方法調用了PipedWriter的src方法,進入該方法源碼多線程
public synchronized void connect(PipedReader snk) throws IOException { if (snk == null) { throw new NullPointerException(); } else if (sink != null || snk.connected) { throw new IOException("Already connected"); } else if (snk.closedByReader || closed) { throw new IOException("Pipe closed"); } sink = snk; snk.in = -1; snk.out = 0; snk.connected = true; }
這裏方法主要作了如下事情:判斷方法傳入的字符管道輸入流對象是否爲空若是爲空拋出空指針異常、判斷字符輸入流對象是否已經創建鏈接若是已創建鏈接拋出IO異常提示「已經創建鏈接「、判斷管道兩端的字符流是否有任意一端已經關閉,若是有則拋出IO異常提示「管道已關閉「,若經過上述校驗則經過指定當前字符管道輸入流鏈接的字符管道輸出流對象內部成員變量sink(鏈接的管道輸入流),初始化字符管道輸入流的讀寫位置和鏈接狀態,完成管道的鏈接。ide
到這裏咱們能夠總結PipedReader的構造函數主要作了兩件事:1.初始化內部字符緩衝區;2基於方法傳入的管道字符輸出流對象PipedWriter創建管道鏈接函數
/** * 接收一個字符,將其插入到字符緩衝區數組,若是當前沒有可用的輸入,方法會阻塞 * 該方法一般由鏈接的字符管道輸出流寫入方法調用將字符數據寫入到當前緩衝區 */ synchronized void receive(int c) throws IOException { //檢查當前管道的鏈接狀態 if (!connected) { throw new IOException("Pipe not connected"); //判斷管道任意一端是否已關閉 } else if (closedByWriter || closedByReader) { throw new IOException("Pipe closed"); //判斷當前讀取字符數據線程即便用字符管道輸出流讀取數據的線程狀態 } else if (readSide != null && !readSide.isAlive()) { throw new IOException("Read end dead"); } //獲取調用方即管道寫入端(字符管道輸出流)線程 writeSide = Thread.currentThread(); //若是寫入管道的數據已經讀取完 while (in == out) { //若管道讀取端線程不活躍,拋出IO異常提示管道已損壞 if ((readSide != null) && !readSide.isAlive()) { throw new IOException("Pipe broken"); } 喚醒其餘線程繼續讀取管道字符數據 notifyAll(); try { //等待釋放鎖 wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } //若in<0重置字符寫入位置in和讀取位置out if (in < 0) { in = 0; out = 0; } //在緩衝區對應位置插入字符c buffer[in++] = (char) c; //若插入字符後寫入位置超出字符緩衝區的限制,則會重置寫入位置in到緩衝區數組開始位置,覆蓋緩衝區原有的數據 if (in >= buffer.length) { in = 0; } }
receive方法是字符管道輸入流PipedReader用於接收字符管道輸出流PipedWriter寫入的數據,也是PipedReader與PipedWrite組合使用實現線程間通訊的核心方法。管道寫入端線程經過輸出流PipedWriter對象調用本方法往PipedReader的緩衝區寫入字符數據,其餘管道讀取端線程經過讀取該緩衝區數據以此實現線程間的數據傳遞和通訊。能夠參照本方法分析本類的void receive(char c[], int off, int len)方法源碼分析
read()方法用於從當前管道讀取下一個字符,若是當前管道沒有字符數據可讀取即已經到達字符管道輸入流的末尾,那麼返回-1,在輸入數據能夠,到達流末尾或者拋出異常前,該方法會一直阻塞this
public synchronized int read() throws IOException { //檢查鏈接狀態 if (!connected) { throw new IOException("Pipe not connected"); } //檢查管道鏈接的字符輸入流流狀態 else if (closedByReader) { throw new IOException("Pipe closed"); } //檢查管道寫入端狀態(包括使用字符輸出流線程和字符輸出流狀態) else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } //獲取當前管道使用字符輸入流的線程 readSide = Thread.currentThread(); int trials = 2; //當前緩衝區無數據 while (in < 0) { //若鏈接的字符管道輸出流已經關閉,返回-1讀取結束 if (closedByWriter) { /* closed by writer, return EOF */ return -1; } //寫入端線程未處於活躍狀態,管道損壞 if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { throw new IOException("Pipe broken"); } /** 喚醒管道等待寫入的線程 **/ notifyAll(); try { //阻塞等待 wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } //當前緩衝區有數據 //從緩衝區獲取單個字符,並更新讀取位置out指向下一次讀取位置 int ret = buffer[out++]; //若下一次讀取位置超出緩衝區限制則重置到緩衝區開頭,參照接收字符數據超出緩衝區的處理 if (out >= buffer.length) { out = 0; } //當前緩衝區數據已所有讀取完 if (in == out) { /* now empty */ in = -1; } return ret; }
read()方法的邏輯較爲簡單,方法開始首先基於管道、管道鏈接的字符輸入流、管道鏈接的字符輸出流三者的狀態信息判斷,若是狀態不符結束拋出IO異常,經過上述檢測以後若讀取的緩衝區存儲的字符數據爲空,那麼判斷是否是管道寫入端使用PipedWrite寫入字符數據所在的線程關閉了或者線程未處於活躍狀態,若是排除了上述兩種狀況那麼當前讀取線程阻塞等待,喚醒其餘等待寫入的線程往緩衝區寫入字符數據。接着阻塞直到當緩衝區存在可讀字符數據時,從緩衝區讀取單個字符並返回。spa
/** * 將字符數組c從下標off開始最多len個字符讀入當前字符管道輸入流的內部緩衝區。該方法將一直阻塞直到字符數據所有寫入 * 緩衝區,或者發生異常。 */ synchronized void receive(char c[], int off, int len) throws IOException { while (--len >= 0) { receive(c[off++]); } } /** * 通知全部等待線程最後一個字符已接收,一般用於管道字符輸出流關閉通知寫入結束 */ synchronized void receivedLast() { closedByWriter = true; notifyAll(); } /** * 將最多len個字符從管道輸入流讀取到方法指定字符數組cbuf,從off下標出開始填入讀取字符 * 若是提早到達輸入流末尾(即緩衝區暫時沒有那麼多字符數據可讀取)那麼讀取的字符數可能小於len * 在檢測到達流末尾、拋出異常或者至少有一個字符可讀以前該方法將一直阻塞 */ public synchronized int read(char cbuf[], int off, int len) throws IOException { //檢測鏈接狀態 if (!connected) { throw new IOException("Pipe not connected"); } //管道字符輸入流是否關閉 else if (closedByReader) { throw new IOException("Pipe closed"); } //管道輸出流以及使用它進行寫入的線程狀態判斷 else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } //方法寫入位置參數off的範圍合法性校驗 if ((off < 0) || (off > cbuf.length) || (len < 0) || ((off + len) > cbuf.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } /* 讀取單個字符可能會阻塞 */ int c = read(); if (c < 0) { return -1; } cbuf[off] = (char)c; int rlen = 1; //循環讀取len個字符,直到流末尾或者讀取結束 while ((in >= 0) && (--len > 0)) { cbuf[off + rlen] = buffer[out++]; rlen++; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } } return rlen; } /** * 返回當前字符管道輸入流是否已準備好被讀取。當內部緩衝區字符數據不爲空則該字符管道輸入流已準備好被讀取 * 當管道未鏈接、損壞、關閉方法將拋出IO異常 */ public synchronized boolean ready() throws IOException { //檢測管道鏈接狀態 if (!connected) { throw new IOException("Pipe not connected"); } //當前管道字符輸出流關閉 else if (closedByReader) { throw new IOException("Pipe closed"); } //管道字符輸出流已關閉或所在線程不活躍 else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } //寫入緩衝區數據爲空 if (in < 0) { return false; } else { return true; } } /** * 關閉當前管道字符輸入流,釋放相關係統資源,其實就是更新輸入流狀態和管道寫入位置 */ public void close() throws IOException { in = -1; closedByReader = true; }