何時使用Streamsjavascript
不管咱們使用fs.readFileSync仍是fs.readFile進行文件的讀取, 咱們都要將文件所有讀取進內存中. 若是文件特別的大, 那麼解決方案應該是對大數據一塊一塊的讀取, 即讀取完一塊數據後, 要求繼續讀取下一塊數據.java
讀取單個文件算法
假設咱們使用fs.readFile進行單文件的讀取:編程
var fs = require('fs'); fs.readFile(__filename, (err, chunk) => { if (err) return console.error(err); if (chunk) { console.log(chunk.toString()); } });
若是文件過大, 甚至文件大於0x3FFFFFFF(Node.js最大的緩存大小)怎麼辦? 這時候咱們應該使用流來處理數據.json
var fs = require('fs'); fs.createReadStream(__filename).pipe(process.stdout);
錯誤處理緩存
由於stream是繼承於EventEmitter, 因此它一樣監聽error事件, 用於錯誤處理.服務器
var fs = require('fs'); var stream = fs.createReadStream('not-found'); stream.on('error', (err) => { console.trace(); console.error('Stack:', err.stack); console.error('The error raised was:', err); });
正確的繼承stream base classes網絡
Readable: 輸入流app
Writable: 輸出流異步
Transform: 解析數據時候改變數據
Duplex: 輸入輸出流
PassThrough: 測試, 分析, 檢查數據
實現一個readable stream
咱們能夠經過繼承於stream.Readable而且實現_read(size)方法, 來實現一個readable.stream.
在_read中, 須要執行push將數據讀取出, 終止讀取則push(null).
var fs = require('fs'); var ReadStream = require('stream').Readable; class MyRead extends ReadStream { constructor(options) { super(options); } _read(size) { this.push('hello\n'); this.push(null); } } var myRead = new MyRead(); myRead.pipe(process.stdout);
實現一個writable stream
經過繼承stream.Writable並實現_write方法, 來實現一個輸出流.
var fs = require('fs'); var WriteStream = require('stream').Writable; class MyWrite extends WriteStream { constructor(options) { super(options); } _write(chunk, encoding, cb) { process.stdout.write(chunk); cb(); } } var myWrite = new MyWrite(); process.stdin.pipe(myWrite);
實現一個duplex streams
繼承stream.Duplex而且實現_read/_write方法.
var fs = require('fs'); var Duplex = require('stream').Duplex; class MyDuplex extends Duplex { constructor(options) { super(options); this.waiting = false; } _read(size) { if (!this.waiting) { this.push('Feed me data! >'); this.waiting = true; } } _write(chunk, encoding, cb) { this.waiting = false; this.push(chunk); cb(); } } var myDuplex = new MyDuplex(); process.stdin.pipe(myDuplex).pipe(process.stdout);
實現一個transform streams
繼承stream.Transform並實現_transform方法.
var fs = require('fs'); var Transform = require('stream').Transform; class MyTransform extends Transform { constructor(options) { super(options); } _transform(chunk, encoding, cb) { this.push(chunk + ':::\n'); cb(); } } var myTransform = new MyTransform(); process.stdin.pipe(myTransform).pipe(process.stdout);
fs其實是POSIX的包裝模塊:
POSIX方法提供了一些底層的操做, 例如:
var fs = require('fs'); var assert = require('assert'); var fd = fs.openSync('./file.txt', 'w+'); var writeBuf = Buffer.from('some data to write'); fs.writeSync(fd, writeBuf, 0, writeBuf.length, 0); var readBuf = Buffer.alloc(writeBuf.length); fs.readSync(fd, readBuf, 0, writeBuf.length, 0); assert.equal(writeBuf.toString(), readBuf.toString()); fs.closeSync(fd);
Streaming
fs模塊提供fs.createReadStream/fs.createWriteStream功能模塊. 它們分別可建立一個輸入輸出流, 例如可用於pipe.
var fs = require('fs'); var read = fs.createReadStream('./file.txt'); var write = fs.createWriteStream('./out.txt'); read.pipe(write);
Bulk file I/O
fs模塊也提供了fs.readFile/fs.writeFile/fs.appendFile, 用於將文件所有讀取.
var fs = require('fs'); fs.readFile('./file.txt', function(err, buf) { console.log(buf.toString()); });
File watching
fs模塊提供了fs.watch/fa.watchFile, 來觀察文件是否被改變.
同步異步操做文件
考慮存在config.json文件, 咱們分別使用同步和異步讀取文件:
var fs = require('fs'); // 異步 fs.readFile('./config.json', function(err, buf) { if (err) throw err; var config = JSON.parse(buf.toString()); doThisThing(config); }); // 同步 try { var config = JSON.parse(fs.readFileSync('./config.json').toString()); doThisThing(config); } catch (err) { console.error(err); }
備註: try/catch不可用於異步編程.
文件描述符的使用
任何一個文件操做, 均可經過文件描述符來操做, 默認狀況下, 標準輸入, 輸出和錯誤分別對應0,1,2.
var fs = require('fs'); var fd = fs.openSync('./file.txt', 'r'); var buf = fs.readFileSync(fd); console.log(buf.toString()); fs.closeSync(fd);
文件鎖的操做
Node.js並未原生支持文件鎖的操做. 通常咱們能夠經過如下兩種方法達到相似鎖的操做: 使用exclusive標誌來建立鎖文件, 使用mkdir來建立鎖文件.
var fs = require('fs'); fs.open('file.txt', 'wx', function(err) { if (err) return console.error(err); });
存在'x'標誌狀況下, 若是文件存在, 則拋出異常.
同理, 咱們能夠建立一個以前不存在的新目錄, 在新目錄下進行文件的讀寫, 也能夠達到相似的鎖功能.
文件遞歸操做
可查看fs的API, 經過readdir來讀取文件夾, 經過fs.stat來判斷當前路徑爲文件仍是目錄.
var fs = require('fs'); function readFileName(filename, dir_path) { fs.stat(dir_path + '/' + filename, (err, stats) => { if (err) return console.error(err); if (stats.isFile()) console.log(dir_path + '/' + filename); else if (stats.isDirectory()) { fs.readdir(dir_path + '/' + filename, (err, files) => { if (err) return console.error(err); for (let i = 0; i < files.length; i++) { readFileName(files[i], dir_path + '/' + filename); } }); } }); } readFileName('bin', '/usr');
觀察文件/文件夾的變更
可以使用fs.watch/fs.watchFile來觀察文件/文件夾是否變更.
var fs = require('fs'); fs.watch('./watchdir', console.log); fs.watchFile('./watchdir', console.log);
打開兩個窗口, 一個運行上例代碼, 一個在watchdir目錄內執行touch/mv等操做, 則可看到效果.
基本網絡層次
TCP/IP
在IP協議中, 一個host由一個IP地址標識. 在Node.js中由net模塊產生TCP鏈接.
但IP協議不能保證數據傳輸的完整性, 因此須要加入TCP傳輸協議.
UDP
數據包是UDP中基本的單元. UDP是不能保證數據傳輸的完整性.
在Node.js中, 建立服務端很簡單, 使用net模塊, 建立服務器, 監聽端口便可:
var net = require('net'); var clients = 0; var server = net.createServer((client) => { clients++; var clientId = clients; console.log('Client connected:', clientId); client.on('end', () => { console.log('Client disconnected:', clientId); }); client.write('Welcome client:' + clientId + '\r\n'); client.pipe(client); }); server.listen(8000, () => { console.log('Server started on port 8000'); });
這裏簡單介紹一下可能難理解的一段代碼:
client.pipe(client);
由於TCP/IP是一個duplex, 即既可輸入, 又可輸出的stream.
leicj@leicj:~/test$ telnet localhost 8000 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. Welcome client:1 hello world hello world i love this wrold i love this wrold
若是咱們想要編寫一個低時延的實時應用, 則咱們須要socket.setNoDelay()去開啓TCP_NODELAY.
因爲Nagle's算法的存在, 小數據包會累積成大的數據包, 而後發送出去.
var net = require('net'); var server = net.createServer((c) => { // 設置非延時 c.setNoDelay(true); c.write('377375042377373001', 'binary'); console.log('server connected'); c.on('end', () => { console.log('server disconnected'); // 當沒有客戶端鏈接時候, 自動關閉 server.unref(); }); c.on('data', (data) => { process.stdout.write(data.toString()); c.write(data.toString()); }); }); server.listen(8000, () => { console.log('Server started on port 8000'); });
咱們嘗試使用UDP進行文件的傳輸. 咱們能夠使用dgram模塊來建立數據包, 經過socket.send來發送數據.
var dgram = require('dgram'), fs = require('fs'), port = 41230, defaultSize = 16; function Client(remoteIP) { var inStream = fs.createReadStream(__filename), socket = dgram.createSocket('udp4'); inStream.on('readable', () => { sendData(); }); function sendData() { var msg = inStream.read(defaultSize); if (!msg) return socket.unref(); socket.send(msg, 0, msg.length, port, remoteIP, (err, bytes) => { sendData(); }); } } function Server() { var socket = dgram.createSocket('udp4'); socket.on('message', (msg, rinfo) => { process.stdout.write(msg.toString()); }); socket.on('listening', () => { console.log('Server ready:', socket.address()); }); socket.bind(port); } if (process.argv[2] === 'client') { new Client(process.argv[3]); } else { new Server(); }
對上述代碼進行簡單的解釋: