node.js中的流

node.js中的流是一種數據傳輸手段,流是有順序的。流不關心總體流程,只管取出數據,獲取數據後的操做。
流有四種基本的類型
Readable - 可讀的流 (例如 fs.createReadStream()). 
Writable - 可寫的流 (例如 fs.createWriteStream()).
Duplex - 可讀寫的流 (例如 net.Socket). 
Transform - 在讀寫過程當中能夠修改和變換數據的 Duplex 流 (例如 zlib.createDeflate()).
可讀流有兩種模式
flowing 流動模式
paused 暫停模式

流動模式:flowing 沒有緩存區。讀一點數據,發射一點數據。當數據所有讀完了觸發一個end事件。例如:pipe(),resume()方法不走緩存.
data事件,當你一旦開始監聽data事件的時候,流就能夠讀文件的內容而且發射data 。默認請況下,當你監聽data事件以後,會不停的讀數據,而後觸發data事件,觸發完data事件後再次讀數據。
html

let rs=fs.createReadStream('./11.txt',{
    highWaterMark:3
});
rs.setEncoding('utf8');rs.on('data',function(data){
    //data獲取到的是個buffer,要想獲取字符須要設置編碼
    console.log(data);
});
rs.on('end',function(){
    console.log('文件讀完了');
});複製代碼

pipe是可讀流 的方法node

ReadStream.prototype.pipe = function (dest) {
    this.on('data', (data)=>{
        let flag = dest.write(data);//寫入數據,返回true,說明緩存區沒滿還能夠繼續寫。返回
        false暫停一下。監聽drain事件,等到觸發drain事件說明數據消化完了,再繼續讀取數據

        if(!flag){
            this.pause();
        }
    });
    dest.on('drain', ()=>{
        this.resume();
    });
    this.on('end', ()=>{
        dest.end();
    });
}
ReadStream.prototype.pause = function(){
    this.flowing = false;

}
ReadStream.prototype.resume = function(){
    this.flowing = true;
    this.read();
}
複製代碼

dest 數據寫入目標
能夠在單個可讀流上綁定多個可寫流。
複製代碼

const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);複製代碼

暫停模式:paused (初始化模式) 內部設置一個緩存區,緩存區默認大小64kb.實際大小以highWaterMark的值爲準。當你監聽 readable事件的時候,會進入暫停模式。讀取highWaterMark的值放入緩存區,觸發readable事件。
api

let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
    highWaterMark:3
});
rs.on('readable',()=>{
   onsole.log(rs._readableState.length);//3
   let ch = rs.read(1);
   //當你讀了一個字節後,發現只剩下2個字節,不夠highWaterMark,會再次讀取highWaterMark個字節並填到
緩存區內
       console.log(rs._readableState.length);//2
       let ch = rs.read(3);
       setTimeout(()=>{
           console.log(rs._readableState.length);//5
       },200)
});
複製代碼


可寫流就是往裏面寫數據

當你往可寫流裏寫數據的時候,不是會馬上寫入文件的,而是會先寫入緩存區,緩存區的大小就是highWaterMark的值,默認是16k。 而後等緩存區滿了以後再次真正的寫入文件裏。
緩存

let fs=require('fs');
let ws=fs.createWriteStream('22.txt',{
    flags:'w',
    mode:0o666,
    start:0,
    highWaterMark:3
});
//若是緩存區已滿,返回false.若是緩存區沒滿返回true.
//若是能接着寫,返回true.若是不能寫返回false.
//按理說若是返回了false,就不能再往裏面寫了。可是若是繼續往裏面寫,也不會丟失,會緩存在內存裏。
等緩存區清空以後再從內存裏讀出來let flag=ws.write('1');
console.log(flag);//true

flag=ws.write('2');
console.log(flag);//true

flag=ws.write('3');
console.log(flag);//false複製代碼
自定義流
let {Writable,Readable,Duplex,Transform} = require('stream');複製代碼
自定義可讀流
爲了實現 可讀流,引用Readable接口並用它構造新對象。
  • 咱們能夠直接把供使用的數據push出去。
  • 當push一個null對象就意味着咱們想發出信號——這個流沒有更多數據了。


var stream = require('stream');
var util = require('util');
util.inherits(Counter, stream.Readable);
function Counter(options) {
    stream.Readable.call(this, options);
    this._index = 0;
}
Counter.prototype._read = function() {
    if(this._index++<3){
        this.push(this._index+'');
    }else{
        this.push(null);
    }
};
var counter = new Counter();

counter.on('data', function(data){
    console.log("讀到數據: " + data.toString());//no maybe
});
counter.on('end', function(data){
    console.log("讀完了");
});
複製代碼

自定義可寫流

爲了實現可寫流,咱們須要使用流模塊中的Writable構造函數。 咱們只需給Writable構造函數傳遞一些選項並建立一個對象。惟一須要的選項是write函數,該函數揭露數據塊要往哪裏寫。bash

  • chunk一般是一個buffer,除非咱們配置不一樣的流。
  • encoding是在特定狀況下須要的參數,一般咱們能夠忽略它。
  • callback是在完成處理數據塊後須要調用的函數。這是寫數據成功與否的標誌。若要發出故障信號,請用錯誤對象調用回調函數

var stream = require('stream');
var util = require('util');
util.inherits(Writer, stream.Writable);
let stock = [];
function Writer(opt) {
    stream.Writable.call(this, opt);
}
Writer.prototype._write = function(chunk, encoding, callback) {
    setTimeout(()=>{
        stock.push(chunk.toString('utf8'));
        console.log("增長: " + chunk);
        callback();
    },500)
};
var w = new Writer();
for (var i=1; i<=5; i++){
    w.write("項目:" + i, 'utf8');
}
w.end("結束寫入",function(){
    console.log(stock);
});
複製代碼

雙工流
雙工流(可讀可寫流)是 可讀流可寫流的實現。例如:net.Socket

let {Duplex} = require('stream');
let index = 0;
let s = Duplex({
    read(){
        if(index++<3)
          this.push('a'); 
          else 
       this.push(null);   
    },
    write(chunk,encoding,cb){
       console.log(chunk.toString().toUpperCase());
       cb();
    }
});
//process.stdin 標準輸入流
//proces.stdout標準輸出流
process.stdin.pipe(s).pipe(process.stdout);複製代碼

Transform轉換流
轉換流是實現數據轉換的,( 可讀流 可寫流)只能實現一種。

let {Transform}  = require('stream');
let t = Transform({
    transform(chunk,encoding,cb){
        this.push(chunk.toString().toUpperCase());
        cb();
    }
});
process.stdin.pipe(t).pipe(process.stdout);
複製代碼
相關文章
相關標籤/搜索