理解nodejs中的stream(流)

閱讀目錄html

一:nodeJS中的stream(流)的概念及做用?node

什麼是流呢?平常生活中有水流,咱們很容易想獲得的就是水龍頭,那麼水龍頭流出的水是有序且有方向的(從高處往低處流)。咱們在nodejs中的流也是同樣的,他們也是有序且有方向的。nodejs中的流是可讀的、或可寫的、或可讀可寫的。
而且流繼承了EventEmitter。所以全部的流都是EventEmitter的實列。api

Node.js中有四種基本的流類型,以下:服務器

1. Readable--可讀的流(好比 fs.createReadStream()).
2. Writable--可寫的流(好比 fs.createWriteStream()).
3. Duplex--可讀寫的流
4. Transform---在讀寫過程當中能夠修改和變換數據的Duplex流。app

nodeJS中的流最大的做用是:讀取大文件的過程當中,不會一次性的讀入到內存中。每次只會讀取數據源的一個數據塊。
而後後續過程當中能夠當即處理該數據塊(數據處理完成後會進入垃圾回收機制)。而不用等待全部的數據。koa

咱們先來看一個簡單的流的實列來理解下:async

1. 首先咱們來建立一個大文件,以下代碼:測試

const fs = require('fs');
const file = fs.createWriteStream('./big.txt');
// 循環500萬次
for (let i = 0; i <= 5000000; i++) {
  file.write('我是空智,我來測試一個大文件, 你看看我會有多大?');
}

file.end();

我在我項目文件裏面新建一個app.js文件,而後把上面的代碼放入到 app.js 裏面去,能夠看到循環了500萬次後,寫入500萬次數據到 big.txt中去,所以會在文件目錄下生成一個 big.txt文件,以下:
ui

該文件在我磁盤中顯示345兆。編碼

readFile讀取該文件:

下面咱們使用 readFile 來讀取該文件看看(readFile會一次性讀入到內存中)。

咱們把app.js代碼改爲以下:

const fs = require('fs');
const Koa = require('koa');

const app = new Koa();

app.use(async(ctx, next) => {
  const res = ctx.res;
  fs.readFile('./big.txt', (err, data) => {
    if (err) {
      throw err;
    } else {
      res.end(data);
    }
  })
});

app.listen(3001, () => {
  console.log('listening on 3001');
});

當咱們運行node app.js 後,咱們查看下該代碼佔用的內存(12MB)以下:

可是當咱們運行 http://localhost:3001/ 後,發現佔用的內存(有338MB了)以下:

readFile 它會把 big.txt的文件內容整個的讀進以Buffer格式存入到內存中,而後再寫進返回對象,那麼這樣的效率很是低的,而且若是該文件若是是1G或2G以上的文件,那麼內存會直接被卡死掉的。或者服務器直接會奔潰掉。

下面咱們使用 Node中的createReadStream方法就能夠避免佔用內存多的狀況發生。咱們把app.js 代碼改爲以下所示:

const fs = require('fs');
const Koa = require('koa');

const app = new Koa();

app.use(async(ctx, next) => {
  const res = ctx.res;
  const file = fs.createReadStream('./big.txt');
  file.pipe(res);
});

app.listen(3001, () => {
  console.log('listening on 3001');
});

而後咱們繼續查看內存的使用狀況,以下所示:

能夠看到咱們的佔用的內存只有12.8兆。也就是說:createReadStream 在讀取大文件的過程當中,不會一次性的讀入到內存中。
每次只會讀取數據源的一個數據塊。這就是流的優勢。下面咱們來分別看下流吧。

二:fs.createReadStream() 可讀流

其基本使用方法以下:

const fs = require('fs');
const rs = fs.createReadStream('./big.txt', {
  flags: 'r', // 文件的操做方式,同readFile中的配置同樣,這裏默認是可讀的是 r
  encoding: 'utf-8', // 編碼格式
  autoClose: true, // 是否關閉讀取文件操做系統內部使用的文件描述符
  start: 0, // 開始讀取的位置
  end: 5, // 結束讀取的位置
  highWaterMark: 1 // 每次讀取的個數
});

fs.createReadStream有如下監聽事件:
具體有哪些事件能夠查看官網(http://nodejs.cn/api/stream.html#stream_class_stream_readable) 這邊先截圖出來簡單看看,以下所示:

有了上面這些監聽方法,咱們能夠先看一個完整的實列,以下代碼:

const fs = require('fs');
const file = fs.createReadStream('./msg.txt', {
  flags: 'r', // 文件的操做方式,同readFile中的配置同樣,這裏默認是可讀的是 r
  encoding: 'utf-8', // 編碼格式
  autoClose: true, // 是否關閉讀取文件操做系統內部使用的文件描述符
  start: 0, // 開始讀取的位置
  end: 5, // 結束讀取的位置
  highWaterMark: 1 // 每次讀取的個數
});

file.on('open', () => {
  console.log('開始讀取文件');
});

file.on('data', (data) => {
  console.log('讀取到的數據:');
  console.log(data);
});

file.on('end', () => {
  console.log('文件所有讀取完畢');
});

file.on('close', () => {
  console.log('文件被關閉');
});

file.on('error', (err) => {
  console.log('讀取文件失敗');
});

執行以下圖所示:

從上圖咱們能夠看到,先打開文件,執行open事件,而後就是不斷的觸發data事件,等data事情讀取結束後會觸發end事件,而後會將文件關閉,觸發close事件。

注意:msg.txt文件內容以下:hello world; 可是上面爲何只讀了 hello了,那是由於咱們上面限制了從開始讀取位置讀取,而後到結束位置結束(5). 而且限定了 highWaterMark: 1,每次讀取的個數爲1。固然若是咱們改爲每次讀取的個數爲2的話,那麼每次會讀2個字符。

pause() 方法:

若是咱們在讀取的過程當中,想暫停事件的讀取,咱們可使用 ReadStream對象的pause方法暫停data事件的觸發。 以下代碼:

file.on('data', (data) => {
  console.log('讀取到的數據:');
  console.log(data);
  file.pause();
});

而後以下圖所示:

上面暫停了使用 pause()方法,若是咱們如今想從新讀取,須要使用 resume()方法,以下所示:

setTimeout(() => {
  file.resume();
}, 100);

執行結果以下:

其餘的一些事件,好比 readable事件等,能夠看官方文檔 (http://nodejs.cn/api/stream.html#stream_event_readable). 這裏就很少分析了。

三:fs.createWriteStream() 可寫流

 以下代碼演示:

const fs = require('fs');
const file = fs.createWriteStream('./1.txt', {
  flags: 'w', // 文件的操做方式,同writeFile中的配置同樣,這裏默認是可讀的是 w
  encoding: 'utf-8', // 編碼格式
  autoClose: true, // 是否關閉讀取文件操做系統內部使用的文件描述符
  start: 0, // 開始讀取的位置
  highWaterMark: 1 // 每次寫入的個數
});

let f1 = file.write('1', 'utf-8', () => {
  console.log('寫入成功1111');
});

f1 = file.write('2', 'utf-8', () => {
  console.log('寫入成功2222');
});

f1 = file.write('3', 'utf-8', () => {
  console.log('寫入成功3333');
});

// 標記文件末尾
file.end();

// 處理事件
file.on('finish', () => {
  console.log('寫入完成');
});

file.on('error', (err) => {
  console.log(err);
});

在我項目的根目錄下會生成一個 1.txt文件,裏面有123內容。

詳細請看官網(http://nodejs.cn/api/fs.html#fs_fs_writefile_file_data_options_callback

管道流(pipe)

咱們須要把咱們上面可讀流讀到的數據須要放到可寫流中去寫入到文件裏面去。咱們能夠以下操做代碼:

const fs = require('fs');

// 讀取msg.txt中的字符串 hello world
const msg = fs.createReadStream('./msg.txt', {
  highWaterMark: 5
});

// 寫入到1.txt中
const f1 = fs.createWriteStream('./1.txt', {
  encoding: 'utf-8',
  highWaterMark: 1
});

// 監聽讀取的數據過程,把讀取的數據寫入到咱們的1.txt文件裏面去
msg.on('data', (chunk) => {
  f1.write(chunk, 'utf-8', () => {
    console.log('寫入成功');
  });
});

可是實現如上的機制,咱們可使用管道機制,管道提供了一個輸出流到輸入流的機制。一般咱們用於從一個流中獲取數據並將數據傳遞到另一個流中。以下圖所示:

如上代碼,咱們能夠改爲以下代碼:

const fs = require('fs');

// 讀取msg.txt中的字符串 hello world
const msg = fs.createReadStream('./msg.txt', {
  highWaterMark: 5
});

// 寫入到1.txt中
const f1 = fs.createWriteStream('./1.txt', {
  encoding: 'utf-8',
  highWaterMark: 1
});

const res = msg.pipe(f1);
console.log(res);

如上打印 res後,咱們在命令行中查看下基本信息以下:

相關文章
相關標籤/搜索