Node.js的流就是爲了在有限的內存中實現咱們操做"海量"數據的目標。node
流是一組有序的,有起點和終點的字節數據傳輸手段,它是一個抽象的接口,被 Node 中的不少對象所實現。node裏不少內容都應用到流,好比HTTP 服務器request和response對象都是流。緩存
它不關心文件的總體內容,只關注是否從文件中讀到了數據,以及讀到數據以後的處理。bash
Node.js中Stream 有四種流類型。服務器
可讀流分爲:流動模式(flowing mode)
和暫停模式(paused mode)
異步
可讀流在建立時都是暫停模式。暫停模式和流動模式能夠互相轉換。函數
1) 流動模式(flowing mode)流動模式下,數據會源源不斷地生產出來,造成「流動」現象。監聽流的data事件即可進入該模式。ui
2) 暫停模式(paused mode)暫停模式下,須要顯示地調用read(),觸發data事件。this
在初始狀態下,監聽data事件,會使流進入流動模式。但若是在暫停模式下,監聽data事件並不會使它進入流動模式。爲了消耗流,須要顯示調用read()方法。編碼
3)相互轉化
若是不存在管道目標,調用readable.resume()可以使流進入流動模式spa
若是存在管道目標,調用 stream.unpipe()並取消'data'事件監聽
var rs = fs.createReadStream(path,[options]);複製代碼
path讀取文件的路徑
options
flags 打開文件要作的操做,默認爲'r'讀取
encoding 默認爲null,表明buffer。若是指定utf8編碼highWaterMark要大於3個字節
start 開始讀取的索引位置
end 結束讀取的索引位置(包括結束位置)
highWaterMark 讀取緩存區默認的大小64kb
autoClose 讀取完畢後是否自動關閉
let fs=require('fs');
let path=require('path');
let rs=fs.createReadStream(path.join(__dirname,'1.txt'),{ //這裏的參數通常不會寫
flags:'r',//文件的操做是讀取操做
encoding:'utf8', // 默認是null null表明的是buffer
autoClose:true, // 讀取完畢後自動關閉
highWaterMark:3,// 默認是64k 64*1024b
start:0, //讀取的起始位置
end:3 // 讀取的結束位置,包前又包後,至關於閉區間
})
//默認狀況下 不會將文件中的內容輸出
//內部會先建立一個buffer先讀取3b
//至關於有蓋子的水管,不會流出來,存儲在管中
//有兩種模式 非流動模式/暫停模式
//由於建立時第二個參數通常不會寫,讀出來的類型是buffer,這個方法能夠指定編碼
rs.setEncoding('utf8');
//打開文件
rs.on('open',function(data){
console.log(data)
})
//關閉文件
rs.on('close',function(data){
console.log(data)
})
//有錯誤就會報錯誤
rs.on('err',function(data){
console.log(data)
})
//暫停模式->流動模式
//流動模式只要監聽了會瘋狂的觸發data事件,直到讀取完畢
rs.on('data',function(data){
console.log(data);
//一打開水龍頭就嘩嘩出水,有個方法可讓它暫停
rs.pause(); //暫停方法,表示暫停讀取,暫停data事件觸發
})
setInterval(function(){
rs.resume(); //恢復data事件的觸發,變爲流動模式繼續讀取
},3000)
rs.on('end',function(data){ //先end再close關閉
console.log(data)
})
複製代碼
let fs=require('fs');
let path=require('path');
let rs=fs.createReadStream(path.join(__dirname,'1.txt'));
rs.setEncoding('utf8');
// 當我只要建立一個流,就會先把緩存區填滿,等待着你本身消費
// 若是當前緩存區被清空後會再次觸發readable事件
// 當你消費小於最高水位線時,會自動添加highWater這麼多數據
rs.on('readable', () => {
let d = rs.read(1)
console.log(d)
})複製代碼
let EventEmitter = require('events');
let fs = require('fs');
class ReadStream extends EventEmitter {
constructor(path,options){
super();
this.path = path;
this.flags = options.flags || 'r';
this.autoClose = options.autoClose || true;
this.highWaterMark = options.highWaterMark|| 64*1024;
this.start = options.start||0;
this.end = options.end;
this.encoding = options.encoding || null
this.open();//打開文件 fd
this.flowing = null; // null就是暫停模式
// 看是否監聽了data事件,若是監聽了 就要變成流動模式
// 要創建一個buffer 這個buffer就是要一次讀多少
this.buffer = Buffer.alloc(this.highWaterMark);
this.pos = this.start; // pos 讀取的位置 可變 start不變的
this.on('newListener',(eventName,callback)=>{
if(eventName === 'data'){
// 至關於用戶監聽了data事件
this.flowing = true;
// 監聽了 就去讀
this.read(); // 去讀內容了
}
})
}
read(){
// 此時文件還沒打開呢
if(typeof this.fd !== 'number'){
// 當文件真正打開的時候 會觸發open事件,觸發事件後再執行read,此時fd確定有了
return this.once('open',()=>this.read())
}
// 此時有fd了
let howMuchToRead = this.end?Math.min(this.highWaterMark,this.end-this.pos+1):this.highWaterMark;
fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytesRead)=>{
// 讀到了多少個 累加
if(bytesRead>0){
this.pos+= bytesRead;
let data = this.encoding?this.buffer.slice(0,bytesRead).toString(this.encoding):this.buffer.slice(0,bytesRead);
this.emit('data',data);
// 當讀取的位置 大於了末尾 就是讀取完畢了
if(this.pos > this.end){
this.emit('end');
this.destroy();
}
if(this.flowing) { // 流動模式繼續觸發
this.read();
}
}else{
this.emit('end');
this.destroy();
}
});
}
resume(){
this.flowing = true;
this.read();
}
pause(){
this.flowing = false;
}
destroy(){
// 先判斷有沒有fd 有關閉文件 觸發close事件
if(typeof this.fd ==='number'){
fs.close(this.fd,()=>{
this.emit('close');
});
return;
}
this.emit('close'); // 銷燬
};
open(){
// copy 先打開文件
fs.open(this.path,this.flags,(err,fd)=>{
if(err){
this.emit('error',err);
if(this.autoClose){ // 是否自動關閉
this.destroy();
}
return;
}
this.fd = fd; // 保存文件描述符
this.emit('open'); // 文件打開了
});
}
}
module.exports = ReadStream;複製代碼
.pipe()
函數是接受一個源頭src
並將數據輸出到一個可寫的流dst
中簡單來講,邊讀邊寫東西,讀太快,來不及寫,就先暫停讀,等寫完了再繼續讀。
let fs = require('fs');
let path = require('path');
let ReadStream = require('./ReadStream');
let WriteStream = require('./WriteStream');
let rs = new ReadStream(path.join(__dirname,'./1.txt'),{
highWaterMark:4
});
let ws = new WriteStream(path.join(__dirname,'./2.txt'),{
highWaterMark:1
});
// 讀四個,寫一個
rs.pipe(ws); // pipe就是讀一點寫一點複製代碼
pipe原理實現,寫在ReadStream的方法中
pipe(ws){
this.on('data',(chunk)=>{
let flag = ws.write(chunk);
if(!flag){
this.pause();
}
});
ws.on('drain',()=>{
this.resume();
})
}複製代碼
let fs = require('fs');
let EventEmitter = require('events');
//當讀取內容大於緩存區,從新計算讀取數量n的大小的方法
function computeNewHighWaterMark(n) {
n--;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
n++;
return n;
}
class ReadStream extends EventEmitter {
constructor(path, options) {
super();
this.path = path;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.autoClose = options.autoClose || true;
this.start = 0;
this.end = options.end;
this.flags = options.flags || 'r';
this.buffers = []; // 緩存區
this.pos = this.start;
this.length = 0; // 緩存區大小
this.emittedReadable = false;
this.reading = false; // 不是正在讀取的
this.open();
this.on('newListener', (eventName) => {
if (eventName === 'readable') {
this.read();
}
})
}
read(n) { // 想取1個
if(n>this.length){
// 更改緩存區大小 讀取五個就找 2的幾回放最近的
this.highWaterMark = computeNewHighWaterMark(n)
this.emittedReadable = true;
this._read();
}
// 若是n>0 去緩存區中取吧
let buffer=null;
let index = 0; // 維護buffer的索引的
let flag = true;
if (n > 0 && n <= this.length) { // 讀的內容 緩存區中有這麼多
// 在緩存區中取 [[2,3],[4,5,6]]
buffer = Buffer.alloc(n); // 這是要返回的buffer
let buf;
while (flag&&(buf = this.buffers.shift())) {
for (let i = 0; i < buf.length; i++) {
buffer[index++] = buf[i];
if(index === n){ // 拷貝夠了 不須要拷貝了
flag = false;
this.length -= n;
let bufferArr = buf.slice(i+1); // 取出留下的部分
// 若是有剩下的內容 在放入到緩存中
if(bufferArr.length > 0){
this.buffers.unshift(bufferArr);
}
break;
}
}
}
}
// 當前緩存區 小於highWaterMark時在去讀取
if (this.length == 0) {
this.emittedReadable = true;
}
if (this.length < this.highWaterMark) {
if(!this.reading){
this.reading = true;
this._read(); // 異步的
}
}
return buffer
}
// 封裝的讀取的方法
_read() {
// 當文件打開後在去讀取
if (typeof this.fd !== 'number') {
return this.once('open', () => this._read());
}
// 上來我要喝水 先倒三升水 []
let buffer = Buffer.alloc(this.highWaterMark);
fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {
if (bytesRead > 0) {
// 默認讀取的內容放到緩存區中
this.buffers.push(buffer.slice(0, bytesRead));
this.pos += bytesRead; // 維護讀取的索引
this.length += bytesRead;// 維護緩存區的大小
this.reading = false;
// 是否須要觸發readable事件
if (this.emittedReadable) {
this.emittedReadable = false; // 下次默認不觸發
this.emit('readable');
}
} else {
this.emit('end');
this.destroy();
}
})
}
destroy() {
if (typeof this.fd !== 'number') {
return this.emit('close')
}
fs.close(this.fd, () => {
this.emit('close')
})
}
open() {
fs.open(this.path, this.flags, (err, fd) => {
if (err) {
this.emit('error', err);
if (this.autoClose) {
this.destroy();
}
return
}
this.fd = fd;
this.emit('open');
});
}
}
module.exports = ReadStream;複製代碼
var ws = fs.createWriteStream(path,[options]);
path寫入的文件路徑
options
flags打開文件要作的操做,默認爲'w'
encoding默認爲utf8
highWaterMark寫入緩存區的默認大小16kb
let fs=require('fs');
let path=require('path');
//寫的時候文件不存在,會建立文件
let ws = fs.createWriteStream('./1.txt',{
flags:'w',
mode:0o666,
autoClose:true,
highWaterMark:3, // 默認寫是16k
encoding:'utf8',
start:0
});
//第一個參數寫入的數據必須是字符串或者Buffer
//第二個參數寫入以什麼編碼寫進去
//第三個參數callback
//有返回值,表明是否能繼續寫,寫的時候,有個緩存區的概念。可是返回false,也不會丟失,就是會把內容放到內存中
let flag=ws.write(1+'','utf8',()=>{})//這是異步的方法
//傳入的參數,寫完後也會寫入文件內
ws.end('ok'); //當寫完後,就不能再繼續寫了
//抽乾方法,當寫入完後會觸發drain方法
//緩存區必須滿了,滿了清空後纔會觸發drain
//若是調用end後,再調用這個方法沒有意義了
ws.on('drain',function(){
console.log('drain')
})
複製代碼
let EventEmitter = require('events');
let fs = require('fs');
class WriteStream extends EventEmitter{
constructor(path,options){
super();
this.path = path;
this.highWaterMark = options.highWaterMark||16*1024;
this.autoClose = options.autoClose||true;
this.mode = options.mode;
this.start = options.start||0;
this.flags = options.flags||'w';
this.encoding = options.encoding || 'utf8';
// 可寫流 要有一個緩存區,當正在寫入文件是,內容要寫入到緩存區中
// 在源碼中是一個鏈表 => []
this.buffers = [];
// 標識 是否正在寫入
this.writing = false;
// 是否知足觸發drain事件
this.needDrain = false;
// 記錄寫入的位置
this.pos = 0;
// 記錄緩存區的大小
this.length = 0;
this.open();
}
destroy(){
if(typeof this.fd !=='number'){
return this.emit('close');
}
fs.close(this.fd,()=>{
this.emit('close')
})
}
open(){
fs.open(this.path,this.flags,this.mode,(err,fd)=>{
if(err){
this.emit('error',err);
if(this.autoClose){
this.destroy();
}
return
}
this.fd = fd;
this.emit('open');
})
}
write(chunk,encoding=this.encoding,callback=()=>{}){
chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);
// write 返回一個boolean類型
this.length+=chunk.length;
let ret = this.length<this.highWaterMark; // 比較是否達到了緩存區的大小
this.needDrain = !ret; // 是否須要觸發needDrain
// 判斷是否正在寫入 若是是正在寫入 就寫入到緩存區中
if(this.writing){
this.buffers.push({
encoding,
chunk,
callback
}); // []
}else{
// 專門用來將內容 寫入到文件內
this.writing = true;
this._write(chunk,encoding,()=>{
callback();
this.clearBuffer();
}); // 8
}
return ret;
}
clearBuffer(){
let buffer = this.buffers.shift();
if(buffer){
this._write(buffer.chunk,buffer.encoding,()=>{
buffer.callback();
this.clearBuffer()
});
}else{
this.writing = false;
if(this.needDrain){ // 是否須要觸發drain 須要就發射drain事件
this.needDrain = false;
this.emit('drain');
}
}
}
_write(chunk,encoding,callback){
if(typeof this.fd !== 'number'){
return this.once('open',()=>this._write(chunk,encoding,callback));
}
fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
this.length -= byteWritten;
this.pos += byteWritten;
callback(); // 清空緩存區的內容
});
}
}
module.exports = WriteStream;複製代碼
啊~~文章彷佛太長太囉嗦了,看來怎麼把給本身看的筆記整理成一個好的文章也是一門學問!