流在Node.js中是處理流數據的抽象接口。
stream
模塊提供了基礎的API。使用這些API能夠很容易地來構建實現流接口的對象。 Node.js裏提供了不少流對象。例如http.IncomingMessage
類、fs.createReadStream
類等等。都繼承流的私有屬性和公有方法。 因此學習流,有助於學習Node的其餘模塊。javascript
const EE = require('events');
const util = require('util');
function Stream() {
EE.call(this);
}
util.inherits(Stream, EE);
複製代碼
Stream
繼承EventEmitter
。流能夠是可讀的、可寫的,或是可讀寫的。Stream
分爲Readable
(可讀流)、Writable
(可寫流)、Duplex
(可讀寫流)、Transform
(讀寫過程當中能夠修改和變換數據的 Duplex 流)。Stream.prototype.pipe = function(dest, options){
var source = this;
source.on('data', ondata);
dest.on('drain', ondrain);
if (!dest._isStdio && (!options || options.end !== false)) {
source.on('end', onend);
source.on('close', onclose);
}
source.on('error', onerror);
dest.on('error', onerror);
source.on('end', cleanup);
source.on('close', cleanup);
dest.on('close', cleanup);
dest.emit('pipe', source);
return dest;
};
複製代碼
Stream
公有方法pipe
Readable.pipe(Writable)
。function ondata(chunk) {
if (dest.Writable) {
if (false === dest.write(chunk) && source.pause) {
source.pause();
}
}
}
複製代碼
dest.Writable
當寫完時會賦值爲false。dest.write(chunk)
返回false。source.pause
暫停寫入。function ondrain() {
if (source.Readable && source.resume) {
source.resume();
}
}
複製代碼
source.Readable
在讀到末尾時會賦值爲false。source.resume()
表示會從新觸發Writable的data事件。source.resume()
,觸發Writable的data事件。function cleanup() {
source.removeListener('data', ondata);
dest.removeListener('drain', ondrain);
source.removeListener('end', onend);
source.removeListener('close', onclose);
source.removeListener('error', onerror);
dest.removeListener('error', onerror);
source.removeListener('end', cleanup);
source.removeListener('close', cleanup);
dest.removeListener('close', cleanup);
}
複製代碼
function onerror(er) {
cleanup();
if (EE.listenerCount(this, 'error') === 0) {
throw er; // Unhandled stream error in pipe.
}
}
複製代碼
if (!dest._isStdio && (!options || options.end !== false)) {
source.on('end', onend);
source.on('close', onclose);
}
複製代碼
dest._isStdio
暫時沒理解。var didOnEnd = false;
function onend() {
if (didOnEnd) return;
didOnEnd = true;
dest.end();
}
複製代碼
dest.end()
,中止寫入。function onclose() {
if (didOnEnd) return;
didOnEnd = true;
if (typeof dest.destroy === 'function') dest.destroy();
}
複製代碼
dest.destroy()
摧毀這個流,併發出傳過來的錯誤。當這個函數被調用後,這個寫入流就結束了。