node.js中的流是一種數據傳輸手段,流是有順序的。流不關心總體流程,只管取出數據,獲取數據後的操做。
流有四種基本的類型
Readable - 可讀的流 (例如 fs.createReadStream()).
Writable - 可寫的流 (例如 fs.createWriteStream()).
Duplex - 可讀寫的流 (例如 net.Socket).
Transform - 在讀寫過程當中能夠修改和變換數據的 Duplex 流 (例如 zlib.createDeflate()).
可讀流有兩種模式
flowing 流動模式
paused 暫停模式
流動模式:flowing 沒有緩存區。讀一點數據,發射一點數據。當數據所有讀完了觸發一個end事件。例如:pipe(),resume()方法不走緩存.
data事件,當你一旦開始監聽data事件的時候,流就能夠讀文件的內容而且發射data 。默認請況下,當你監聽data事件以後,會不停的讀數據,而後觸發data事件,觸發完data事件後再次讀數據。
html
let rs=fs.createReadStream('./11.txt',{
highWaterMark:3
});
rs.setEncoding('utf8');rs.on('data',function(data){
//data獲取到的是個buffer,要想獲取字符須要設置編碼
console.log(data);
});
rs.on('end',function(){
console.log('文件讀完了');
});複製代碼
pipe是可讀流 的方法node
ReadStream.prototype.pipe = function (dest) {
this.on('data', (data)=>{
let flag = dest.write(data);//寫入數據,返回true,說明緩存區沒滿還能夠繼續寫。返回
false暫停一下。監聽drain事件,等到觸發drain事件說明數據消化完了,再繼續讀取數據
if(!flag){
this.pause();
}
});
dest.on('drain', ()=>{
this.resume();
});
this.on('end', ()=>{
dest.end();
});
}
ReadStream.prototype.pause = function(){
this.flowing = false;
}
ReadStream.prototype.resume = function(){
this.flowing = true;
this.read();
}
複製代碼
dest 數據寫入目標
能夠在單個可讀流上綁定多個可寫流。
複製代碼
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);複製代碼
暫停模式:paused (初始化模式) 內部設置一個緩存區,緩存區默認大小64kb.實際大小以highWaterMark的值爲準。當你監聽 readable事件的時候,會進入暫停模式。讀取highWaterMark的值放入緩存區,觸發readable事件。
api
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
highWaterMark:3
});
rs.on('readable',()=>{
onsole.log(rs._readableState.length);//3
let ch = rs.read(1);
//當你讀了一個字節後,發現只剩下2個字節,不夠highWaterMark,會再次讀取highWaterMark個字節並填到
緩存區內
console.log(rs._readableState.length);//2
let ch = rs.read(3);
setTimeout(()=>{
console.log(rs._readableState.length);//5
},200)
});
複製代碼
可寫流就是往裏面寫數據
當你往可寫流裏寫數據的時候,不是會馬上寫入文件的,而是會先寫入緩存區,緩存區的大小就是highWaterMark的值,默認是16k。 而後等緩存區滿了以後再次真正的寫入文件裏。
緩存
let fs=require('fs');
let ws=fs.createWriteStream('22.txt',{
flags:'w',
mode:0o666,
start:0,
highWaterMark:3
});
//若是緩存區已滿,返回false.若是緩存區沒滿返回true.
//若是能接着寫,返回true.若是不能寫返回false.
//按理說若是返回了false,就不能再往裏面寫了。可是若是繼續往裏面寫,也不會丟失,會緩存在內存裏。
等緩存區清空以後再從內存裏讀出來let flag=ws.write('1');
console.log(flag);//true
flag=ws.write('2');
console.log(flag);//true
flag=ws.write('3');
console.log(flag);//false複製代碼
自定義流
let {Writable,Readable,Duplex,Transform} = require('stream');複製代碼
自定義可讀流
爲了實現 可讀流,引用Readable接口並用它構造新對象。
- 咱們能夠直接把供使用的數據push出去。
- 當push一個null對象就意味着咱們想發出信號——這個流沒有更多數據了。
var stream = require('stream');
var util = require('util');
util.inherits(Counter, stream.Readable);
function Counter(options) {
stream.Readable.call(this, options);
this._index = 0;
}
Counter.prototype._read = function() {
if(this._index++<3){
this.push(this._index+'');
}else{
this.push(null);
}
};
var counter = new Counter();
counter.on('data', function(data){
console.log("讀到數據: " + data.toString());//no maybe
});
counter.on('end', function(data){
console.log("讀完了");
});
複製代碼
自定義可寫流
爲了實現可寫流,咱們須要使用流模塊中的Writable構造函數。 咱們只需給Writable構造函數傳遞一些選項並建立一個對象。惟一須要的選項是write函數,該函數揭露數據塊要往哪裏寫。bash
- chunk一般是一個buffer,除非咱們配置不一樣的流。
- encoding是在特定狀況下須要的參數,一般咱們能夠忽略它。
- callback是在完成處理數據塊後須要調用的函數。這是寫數據成功與否的標誌。若要發出故障信號,請用錯誤對象調用回調函數
var stream = require('stream');
var util = require('util');
util.inherits(Writer, stream.Writable);
let stock = [];
function Writer(opt) {
stream.Writable.call(this, opt);
}
Writer.prototype._write = function(chunk, encoding, callback) {
setTimeout(()=>{
stock.push(chunk.toString('utf8'));
console.log("增長: " + chunk);
callback();
},500)
};
var w = new Writer();
for (var i=1; i<=5; i++){
w.write("項目:" + i, 'utf8');
}
w.end("結束寫入",function(){
console.log(stock);
});
複製代碼
雙工流
雙工流(可讀可寫流)是 可讀流和 可寫流的實現。例如:net.Socket
let {Duplex} = require('stream');
let index = 0;
let s = Duplex({
read(){
if(index++<3)
this.push('a');
else
this.push(null);
},
write(chunk,encoding,cb){
console.log(chunk.toString().toUpperCase());
cb();
}
});
//process.stdin 標準輸入流
//proces.stdout標準輸出流
process.stdin.pipe(s).pipe(process.stdout);複製代碼
Transform轉換流
轉換流是實現數據轉換的,( 可讀流 和 可寫流)只能實現一種。
let {Transform} = require('stream');
let t = Transform({
transform(chunk,encoding,cb){
this.push(chunk.toString().toUpperCase());
cb();
}
});
process.stdin.pipe(t).pipe(process.stdout);
複製代碼