Node.js in Practice總結2

Streams

Streams概述

何時使用Streamsjavascript

不管咱們使用fs.readFileSync仍是fs.readFile進行文件的讀取, 咱們都要將文件所有讀取進內存中. 若是文件特別的大, 那麼解決方案應該是對大數據一塊一塊的讀取, 即讀取完一塊數據後, 要求繼續讀取下一塊數據.java

屏幕快照 2016-11-20 下午1.52.07

內建Streams

讀取單個文件算法

假設咱們使用fs.readFile進行單文件的讀取:編程

var fs = require('fs');

fs.readFile(__filename, (err, chunk) => {
  if (err) return console.error(err);
  if (chunk) {
    console.log(chunk.toString());
  }
});

若是文件過大, 甚至文件大於0x3FFFFFFF(Node.js最大的緩存大小)怎麼辦? 這時候咱們應該使用流來處理數據.json

var fs = require('fs');

fs.createReadStream(__filename).pipe(process.stdout);

錯誤處理緩存

由於stream是繼承於EventEmitter, 因此它一樣監聽error事件, 用於錯誤處理.服務器

var fs = require('fs');

var stream = fs.createReadStream('not-found');

stream.on('error', (err) => {
  console.trace();
  console.error('Stack:', err.stack);
  console.error('The error raised was:', err);
});

使用Stream base classes

正確的繼承stream base classes網絡

Readable: 輸入流app

Writable: 輸出流異步

Transform: 解析數據時候改變數據

Duplex: 輸入輸出流

PassThrough: 測試, 分析, 檢查數據

實現一個readable stream

咱們能夠經過繼承於stream.Readable而且實現_read(size)方法, 來實現一個readable.stream.

在_read中, 須要執行push將數據讀取出, 終止讀取則push(null).

var fs = require('fs');
var ReadStream = require('stream').Readable;

class MyRead extends ReadStream {
  constructor(options) {
    super(options);
  }
  _read(size) {
    this.push('hello\n');
    this.push(null);
  }
}

var myRead = new MyRead();
myRead.pipe(process.stdout);

 

實現一個writable stream

經過繼承stream.Writable並實現_write方法, 來實現一個輸出流.

var fs = require('fs');
var WriteStream = require('stream').Writable;

class MyWrite extends WriteStream {
  constructor(options) {
    super(options);
  }
  _write(chunk, encoding, cb) {
    process.stdout.write(chunk);
    cb();
  }
}

var myWrite = new MyWrite();
process.stdin.pipe(myWrite);

實現一個duplex streams

繼承stream.Duplex而且實現_read/_write方法.

var fs = require('fs');
var Duplex = require('stream').Duplex;

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.waiting = false;
  }
  _read(size) {
    if (!this.waiting) {
      this.push('Feed me data! >');
      this.waiting = true;
    }
  }
  _write(chunk, encoding, cb) {
    this.waiting = false;
    this.push(chunk);
    cb();
  }
}

var myDuplex = new MyDuplex();

process.stdin.pipe(myDuplex).pipe(process.stdout);

實現一個transform streams

繼承stream.Transform並實現_transform方法.

var fs = require('fs');
var Transform = require('stream').Transform;

class MyTransform extends Transform {
  constructor(options) {
    super(options);
  }
  _transform(chunk, encoding, cb) {
    this.push(chunk + ':::\n');
    cb();
  }
}

var myTransform = new MyTransform();

process.stdin.pipe(myTransform).pipe(process.stdout);

文件系統: 同步和異步操做文件

fs模塊概述

fs其實是POSIX的包裝模塊:

屏幕快照 2016-11-20 下午7.15.38

POSIX方法提供了一些底層的操做, 例如:

var fs = require('fs');
var assert = require('assert');

var fd = fs.openSync('./file.txt', 'w+');
var writeBuf = Buffer.from('some data to write');
fs.writeSync(fd, writeBuf, 0, writeBuf.length, 0);

var readBuf = Buffer.alloc(writeBuf.length);
fs.readSync(fd, readBuf, 0, writeBuf.length, 0);
assert.equal(writeBuf.toString(), readBuf.toString());

fs.closeSync(fd);

Streaming

fs模塊提供fs.createReadStream/fs.createWriteStream功能模塊. 它們分別可建立一個輸入輸出流, 例如可用於pipe.

var fs = require('fs');

var read = fs.createReadStream('./file.txt');
var write = fs.createWriteStream('./out.txt');
read.pipe(write);

Bulk file I/O

fs模塊也提供了fs.readFile/fs.writeFile/fs.appendFile, 用於將文件所有讀取.

var fs = require('fs');
fs.readFile('./file.txt', function(err, buf) {
  console.log(buf.toString());
});

File watching

fs模塊提供了fs.watch/fa.watchFile, 來觀察文件是否被改變.

文件操做

同步異步操做文件

考慮存在config.json文件, 咱們分別使用同步和異步讀取文件:

var fs = require('fs');
// 異步
fs.readFile('./config.json', function(err, buf) {
  if (err) throw err;
  var config = JSON.parse(buf.toString());
  doThisThing(config);
});

// 同步
try {
  var config = JSON.parse(fs.readFileSync('./config.json').toString());
  doThisThing(config);
} catch (err) {
  console.error(err);
}

備註: try/catch不可用於異步編程.

文件描述符的使用

任何一個文件操做, 均可經過文件描述符來操做, 默認狀況下, 標準輸入, 輸出和錯誤分別對應0,1,2.

var fs = require('fs');
var fd = fs.openSync('./file.txt', 'r');
var buf = fs.readFileSync(fd);
console.log(buf.toString());
fs.closeSync(fd);

文件鎖的操做

Node.js並未原生支持文件鎖的操做. 通常咱們能夠經過如下兩種方法達到相似鎖的操做: 使用exclusive標誌來建立鎖文件, 使用mkdir來建立鎖文件.

var fs = require('fs');
fs.open('file.txt', 'wx', function(err) {
  if (err) return console.error(err);
});

存在'x'標誌狀況下, 若是文件存在, 則拋出異常.

同理, 咱們能夠建立一個以前不存在的新目錄, 在新目錄下進行文件的讀寫, 也能夠達到相似的鎖功能.

文件遞歸操做

可查看fs的API, 經過readdir來讀取文件夾, 經過fs.stat來判斷當前路徑爲文件仍是目錄.

var fs = require('fs');
function readFileName(filename, dir_path) {
  fs.stat(dir_path + '/' + filename, (err, stats) => {
    if (err) return console.error(err);
    if (stats.isFile()) console.log(dir_path + '/' + filename);
    else if (stats.isDirectory()) {
      fs.readdir(dir_path + '/' + filename, (err, files) => {
        if (err) return console.error(err);
        for (let i = 0; i < files.length; i++) {
          readFileName(files[i], dir_path + '/' + filename);
        }
      });
    }
  });
}

readFileName('bin', '/usr');

觀察文件/文件夾的變更

可以使用fs.watch/fs.watchFile來觀察文件/文件夾是否變更.

var fs = require('fs');

fs.watch('./watchdir', console.log);
fs.watchFile('./watchdir', console.log);

打開兩個窗口, 一個運行上例代碼, 一個在watchdir目錄內執行touch/mv等操做, 則可看到效果.

網絡

網絡概述

基本網絡層次

屏幕快照 2016-11-25 下午7.14.31

TCP/IP

在IP協議中, 一個host由一個IP地址標識. 在Node.js中由net模塊產生TCP鏈接.

但IP協議不能保證數據傳輸的完整性, 因此須要加入TCP傳輸協議.

UDP

數據包是UDP中基本的單元. UDP是不能保證數據傳輸的完整性.

TCP客戶端和服務端

在Node.js中, 建立服務端很簡單, 使用net模塊, 建立服務器, 監聽端口便可:

var net = require('net');
var clients = 0;

var server = net.createServer((client) => {
  clients++;
  var clientId = clients;
  console.log('Client connected:', clientId);

  client.on('end', () => {
    console.log('Client disconnected:', clientId);
  });

  client.write('Welcome client:' + clientId + '\r\n');
  client.pipe(client);
});

server.listen(8000, () => {
  console.log('Server started on port 8000');
});

這裏簡單介紹一下可能難理解的一段代碼:

client.pipe(client);

由於TCP/IP是一個duplex, 即既可輸入, 又可輸出的stream.

leicj@leicj:~/test$ telnet localhost 8000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Welcome client:1
hello world
hello world
i love this wrold
i love this wrold

若是咱們想要編寫一個低時延的實時應用, 則咱們須要socket.setNoDelay()去開啓TCP_NODELAY.

因爲Nagle's算法的存在, 小數據包會累積成大的數據包, 而後發送出去.

var net = require('net');

var server = net.createServer((c) => {
  // 設置非延時
  c.setNoDelay(true);
  c.write('377375042377373001', 'binary');
  console.log('server connected');
  c.on('end', () => {
    console.log('server disconnected');
    // 當沒有客戶端鏈接時候, 自動關閉
    server.unref();
  });
  c.on('data', (data) => {
    process.stdout.write(data.toString());
    c.write(data.toString());
  });
});

server.listen(8000, () => {
  console.log('Server started on port 8000');
});

UDP客戶端和服務端

咱們嘗試使用UDP進行文件的傳輸. 咱們能夠使用dgram模塊來建立數據包, 經過socket.send來發送數據.

var dgram = require('dgram'),
  fs = require('fs'),
  port = 41230,
  defaultSize = 16;

function Client(remoteIP) {
  var inStream = fs.createReadStream(__filename),
    socket = dgram.createSocket('udp4');

  inStream.on('readable', () => {
    sendData();
  });

  function sendData() {
    var msg = inStream.read(defaultSize);
    if (!msg) return socket.unref();
    socket.send(msg, 0, msg.length, port, remoteIP, (err, bytes) => {
      sendData();
    });
  }
}

function Server() {
  var socket = dgram.createSocket('udp4');

  socket.on('message', (msg, rinfo) => {
    process.stdout.write(msg.toString());
  });

  socket.on('listening', () => {
    console.log('Server ready:', socket.address());
  });

  socket.bind(port);
}

if (process.argv[2] === 'client') {
  new Client(process.argv[3]);
} else {
  new Server();
}

對上述代碼進行簡單的解釋:

  1. 對於UDP來講, 使用dgram.createSocket('udp4')建立一個socket對象, 使用socket.sendData進行數據的傳輸.
  2. 做爲服務器的socket, 則不單單須要綁定端口(socket.bind(port)), 還會監聽兩個信息: message爲接收數據時候的事件, 而listening爲服務器作好接收數據.
相關文章
相關標籤/搜索