stream 類型 Readable, Writable, Duplex, Transform
Readable 能夠看作消息隊列的生產者。Readable 有兩種模式,流動模式和非流動模式。node
流動模式promise
流動模式用戶監聽 data 事件,能夠自行決定暫停和者恢復。讀如數據先寫入 buffer, 若是 buffer 值高於 highWaterMark
中止讀取。source => buffer => customer,舉個栗子,將一個大水缸 A 的水,引入另外一個水缸 B 。咱們在水缸 A 接一根管子,管子另外一頭接一水桶(緩存池),水桶另外一端接流出水管,流向 B, B 有一個開關。一旦水桶達到highWaterMark,水缸 A 將暫停向水桶中注水。緩存
非流動模式async
非流動模式下用戶須要自行讀取 buffer 的數據,讀取大小自行決定,故而也再也不須要 pause 和 resume, readable 事件觸發後便可自行調用 read(size) 讀取數據. readable 事件觸發的條件是有數據到達緩存池,因此須要在消費者自行加條件鎖,以避免消費不及時形成問題。post
原理和 Readable Stream 相似, 再也不贅述。ui
Stream 都自帶管子(pipeline),因此流均可以經過管子相互鏈接。你能夠將一個 Readable Stream 直接連到一個 Writable Stream 上。prototype
const readable = fs.createReadableStream('/tmp/input.txt'); const writable = fs.createWritableStream('/tmp/output.txt'); readable.pipe(writable);
Readable Stream 是生產者, Writable Stream 是消費者,不能本末倒置寫成writable.pipe(readable)
code
Duplex 讀寫流捆綁在一個主體上,即該類型 Stream 即有 Readable Stream 又有Writable Stream, 可是兩種流相互獨立互不關聯,至關於把兩個對象繼承在一個對象上了。orm
Transform 這個流頗有用,有時候你獲得一個Writable Stream,立刻要把Writable Stream轉換成 Readable Stream pipe 到另外一管道中去(上面說到了,Writable Stream 不能做爲生產者)。對象
Transform 須要實踐 Transform. prototype._transform 方法
const { Transform } = require('stream'); class TransformStream extends Transform { constructor(options) { super(options); } _transform (data, encoding, callback) { callback(null, data); } }
舉個栗子:從 upyun dump 圖片下來再上傳,下載獲得的是 WritableStream, 上傳須要的是 ReadableStream。若是不使用 transform 得先將 getFile 得結果保存,在建立一個 readable stream, 再有 readable stream pipe. 若是使用 Transform stream 將很簡單.
async function foo() { try { const transform = new TransformStream(); // 不能在這裏 await Transform 只是轉換 stream 從 writable 到 readable, 並無 pipe 數據 const pass = client.getFile(path, transform); // 這裏不能 await, 若是使用 await upyun getFile 返回錯誤將沒法處理 const pipe = transform.pipe(request({ uri: someuri, method: 'post', headers: { 'Content-Type': 'image/jpeg' }, encoding: null, }); const through = await pass; // hanlde getFile promise 見 upyun node sdk 源碼。 if (!through) { // 可能 getFile 不存在或者 upyun 服務限制上傳 return false; } const reponse = await pipe; // handle pipe request } catch(err) { console.log(err); } }