mafintosh/end-of-stream

https://github.com/mafintosh/end-of-streamnode

Call a callback when a readable/writable/duplex stream has completed or failed.git

 

end-of-stream

A node module that calls a callback when a readable/writable/duplex stream has completed or failed.github

就是當一個可讀/可寫。雙工流完成或失敗時能夠回調的模塊,如eos(readableStream,cb)意思就是當將這個回調函數cb與流封裝到一塊兒啦,readableStream以後能夠進行各類操做,pipe/destroy等,而後相應地觸發cb回調函數npm

npm install end-of-stream

 

Usage

Simply pass a stream and a callback to the eos. Both legacy streams, streams2 and stream3 are supported.dom

var eos = require('end-of-stream');

eos(readableStream, function(err) {
  // this will be set to the stream instance
    if (err) return console.log('stream had an error or closed early');
    console.log('stream has ended', this === readableStream);//readableStream關閉可讀是end,上下文環境this即readableStream
});

eos(writableStream, function(err) {
    if (err) return console.log('stream had an error or closed early');
    console.log('stream has finished', this === writableStream);//writableStream關閉可寫是finish
});

eos(duplexStream, function(err) {
    if (err) return console.log('stream had an error or closed early');
    console.log('stream has ended and finished', this === duplexStream);//duplexStream關閉要end和finish
});

eos(duplexStream, {readable:false}, function(err) {
    if (err) return console.log('stream had an error or closed early');
    console.log('stream has finished but might still be readable');//{readable:false}即不end,該duplexStream可能還能讀
});

eos(duplexStream, {writable:false}, function(err) {
    if (err) return console.log('stream had an error or closed early');
    console.log('stream has ended but might still be writable');//{writable:false}即不finish,該duplexStream可能還能寫
});

eos(readableStream, {error:false}, function(err) {
    // do not treat emit('error', err) as a end-of-stream
});

 

end-of-stream/index.jssocket

在這裏設置是finish後是不可寫,end後是不可讀函數

var once = require('once');

var noop = function() {};

var isRequest = function(stream) {
    return stream.setHeader && typeof stream.abort === 'function';
};

var isChildProcess = function(stream) {
    return stream.stdio && Array.isArray(stream.stdio) && stream.stdio.length === 3
};

var eos = function(stream, opts, callback) {
    if (typeof opts === 'function') return eos(stream, null, opts);
    if (!opts) opts = {};

    callback = once(callback || noop);

    var ws = stream._writableState;
    var rs = stream._readableState;
    var readable = opts.readable || (opts.readable !== false && stream.readable);
    var writable = opts.writable || (opts.writable !== false && stream.writable);

    var onlegacyfinish = function() {
        if (!stream.writable) onfinish();
    };

    var onfinish = function() {
        writable = false;
        if (!readable) callback.call(stream);
    };

    var onend = function() {
        readable = false;
        if (!writable) callback.call(stream);
    };

    var onexit = function(exitCode) {
        callback.call(stream, exitCode ? new Error('exited with error code: ' + exitCode) : null);
    };

    var onerror = function(err) {
        callback.call(stream, err);
    };

    var onclose = function() {
        if (readable && !(rs && rs.ended)) return callback.call(stream, new Error('premature close'));
        if (writable && !(ws && ws.ended)) return callback.call(stream, new Error('premature close'));
    };

    var onrequest = function() {
        stream.req.on('finish', onfinish);
    };

    if (isRequest(stream)) {
        stream.on('complete', onfinish);
        stream.on('abort', onclose);
        if (stream.req) onrequest();
        else stream.on('request', onrequest);
    } else if (writable && !ws) { // legacy streams
        stream.on('end', onlegacyfinish);
        stream.on('close', onlegacyfinish);
    }

    if (isChildProcess(stream)) stream.on('exit', onexit);
    
//三類事件的監聽 stream.on(
'end', onend); stream.on('finish', onfinish); if (opts.error !== false) stream.on('error', onerror); stream.on('close', onclose); return function() { stream.removeListener('complete', onfinish);//移除改事件並調用onfinish函數 stream.removeListener('abort', onclose); stream.removeListener('request', onrequest); if (stream.req) stream.req.removeListener('finish', onfinish); stream.removeListener('end', onlegacyfinish); stream.removeListener('close', onlegacyfinish); stream.removeListener('finish', onfinish); stream.removeListener('exit', onexit); stream.removeListener('end', onend); stream.removeListener('error', onerror); stream.removeListener('close', onclose); }; }; module.exports = eos;

因此當調用eos(...)時,基本上就是監聽了各式各樣的事件,而後返回一個函數。這個函數裏面是各種的removeListener,就是將你以前的監聽所有移除,並觸發相應的事件,運行返回的函數的方法就是eos(...)(),以下面的第四個例子oop

end-of-stream/test.js
var assert = require('assert');
var eos = require('./index');

var expected = 10;//下面有十個例子
var fs = require('fs');
var cp = require('child_process');//nodejs子進程,看博客
var net = require('net');
var http = require('http');

var ws = fs.createWriteStream('/dev/null');
eos(ws, function(err) {
    expected--;
    assert(!!err);
    assert(this === ws);
    if (!expected) process.exit(0);
});
ws.destroy();//將會觸發error和close事件

var rs1 = fs.createReadStream('/dev/urandom');
eos(rs1, function(err) {
    expected--;
    assert(!!err);
    assert(this === rs1);
    if (!expected) process.exit(0);//說明運行成功
});
rs1.destroy();//將會觸發error和close事件

var rs2 = fs.createReadStream(__filename);
eos(rs2, function(err) {
    expected--;
    assert.ifError(err);
    assert(this === rs2);
    if (!expected) process.exit(0);
});
rs2.pipe(fs.createWriteStream('/dev/null'));

var rs3 = fs.createReadStream(__filename);
eos(rs3, function(err) {
    assert.ifError(err);
    assert(this === rs);
    throw new Error('no go');
})();//運行eos的返回函數,即各個removeListener
rs3.pipe(fs.createWriteStream('/dev/null'));

var exec = cp.exec('echo hello world');
eos(exec, function(err) {
    expected--;
    assert.ifError(err);
    assert(this === exec);
    if (!expected) process.exit(0);
});

var spawn = cp.spawn('echo', ['hello world']);
eos(spawn, function(err) {
    expected--;
    assert.ifError(err);
    assert(this === spawn);
    if (!expected) process.exit(0);
});

var socket = net.connect(50000);
eos(socket, function(err) {
    expected--;
    assert(!!err);
    assert(this === socket);
    if (!expected) process.exit(0);
});

var server = net.createServer(function(socket) {
    eos(socket, function(err) {
        expected--;
        assert(!!err);
        assert(this === socket);
        if (!expected) process.exit(0);
    });
    socket.destroy();
}).listen(30000, function() {
    var socket = net.connect(30000);
    eos(socket, function(err) {
        expected--;
        assert.ifError(err);
        assert(this === socket);
        if (!expected) process.exit(0);
    });
});

var server2 = http.createServer(function(req, res) {
    eos(res, function(err) {
        expected--;
        assert.ifError(err);
    });
    res.end();
}).listen(function() {
    var port = server2.address().port;
    http.get('http://localhost:' + port, function(res) {
        eos(res, function(err) {
            expected--;
            assert.ifError(err);
            server2.close();
        });
        res.resume();
    });
});

setTimeout(function() {
    assert(expected === 0);
    process.exit(0);
}, 1000);
    • process.exit(0)表示成功完成,回調函數中,err將爲null;ui

    • process.exit(非0)表示執行失敗,回調函數中,err不爲null,err.code就是咱們傳給exit的數字。this

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息