Nodejs 實時監控文件內容的變化及按行讀取文本文件

需求問題

在使用nodejs開發項目的過程當中,有一個需求須要實時監控指定文件的變動並按行讀取最新的文件內容,在nodejs中有相應地API麼?具體怎麼使用呢~~~node

fs.watchFile-實時監控文件變化

nodejs的fs模塊提供了watchFile方法,在文件內容每次發生變化時就觸發相應回調函數。回調函數提供2個參數-當前的文件狀態對象fs.state和文件發生變化時的文件狀態fs.state,比較2者的最後修改時間就能知道文件內容是否發生過變化。unwatchfile取消對指定文件的監控數據庫

fs.read/fs.createReadStream-讀取文件內容

在nodejs中讀取文件內容最基本的API爲fs.read,也能夠經過建立一個文件流,監聽data/end事件讀取文件內容緩存

  • fs.read方法能夠自由的控制要讀取多少內容從那個位置讀取,對於一個大文件來講須要反覆調用才能獲取所有內容。app

    fs.read(fd, buffer, offset, length, position, callback)
    • fd爲打開的文件描述符,也就數文件對象函數

    • buffer指定這次read操做讀取的數據寫入的緩存區對象ui

    • offset讀取的數據從緩衝區對象的哪一個位置開始寫入this

    • length這次read操做想要讀取的數據長度code

    • position指定從fd的哪一個位置開始讀取數據,指定爲null,那麼數據此文件的當前位置開始讀取對象

    • callback 函數提供3個參數err、bytesRead,buffer-是否發生了錯誤,本地讀到的字節長度,保存讀到的數據的緩衝區接口

  • fs.createReadStream 文件流的方式則用經過事件監聽的方式順序的讀取文件內容,沒法從指定的位置開始讀取文件內容

    var fileReadStream=fs.createReadStream(filename,{
        encoding:'utf8'
    });
    var fileContent="";
    fileReadStream.on('data',functon(data){
        fileContent+=data;
    });
    fileReadStream.on('end',function(){
        console.log('file read ');
    });

Readline模塊-按行讀取文件內容

nodejs提供readline模塊按行讀取文件流數據內容的方法。經過建立一個readline的interface對象,監聽readline事件就能夠獲取input輸入流的行數據。當輸入流讀取完畢後將觸發close事件

var readline = require('readline');
var rl = readline.createInterface({
  input: fs.createReadStream(filename,{
      enconding:'utf8'
  }),
  output: null
});

rl.on('readline',function(line){
    console.log('got line content:%s',line);
});

解決方案

文件內容變化的監控用fs.watchFile方法實現,最新內容的讀取就只有fs.read方法可使用了。
由於不管是使用readline模塊仍是使用fs.createReadStream文件流接口都沒法從特定的位置(也就是上次文件內容的最後更新位置)開始讀取數據,只能從頭開始讀取文件內容,這個對於-文件監控並讀取最新的內容-這個需求來講是不合適也是徹底沒有效率的。
使用fs.read方法須要提供一種機制可以按行解析出數據而且可以觸發消息通知監聽者,有行數據讀取完成。而且在讀取到的數據長度不夠獲取沒有檢測到換行標識符(\r\n)時,保留這些數據。

最終的方案是這樣的:

  1. fs.watchFile檢查到文件內容發生了變化

  2. 調用fs.read方法讀取指定的字節數據到到buffer緩衝對象中

  3. 將獲取的字節數據和上次read行解析後遺留的內容合併成一個新的buffer對象

  4. 將buffer對象經過換行符解析出行數據

  5. 每解析出一條行數據,就觸發一個line事件,通知監聽者已讀到新的行數據

  6. 保留行解析後遺留數據

  7. 若是本次read讀取到的實際數據長度小於buffer緩衝區長度,說明已經到達文件的末尾,沒有更多地數據可以讀取到了。回到1 等到內容變化的通知

  8. 不然回到2,並從上次讀取的最後位置開始讀取

  9. 若是讀取行數據內容爲===END===或者行解析後的遺留內容爲===END===,那麼將調用fs.unwatchFile中止文件內容的監控並再也不調用fs.read

代碼

var fs=require('fs');
var EM = require("events").EventEmitter;
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var newlines = [
    //13, // \r
    10  // \n
];
function createLineReader(fileName) {
    if (!(this instanceof createLineReader)) return new createLineReader(fileName,monitorFlag);
    var self=this;
    var currentFileUpdateFlag=0;
    var fileOPFlag="a+";
    fs.open(fileName,fileOPFlag,function(error,fd){
        var buffer;
        var remainder = null;
           fs.watchFile(fileName,{
           persistent: true,
           interval: 1000
        },function(curr, prev){
           //console.log('the current mtime is: ' + curr.mtime);
           //console.log('the previous mtime was: ' + prev.mtime);
           if(curr.mtime>prev.mtime){
               //文件內容有變化,那麼通知相應的進程能夠執行相關操做。例如讀物文件寫入數據庫等
               continueReadData();
           }else{
               //console.log('curr.mtime<=prev.mtime');
           }

           });
            
        //先讀取原來文件中內容
        continueReadData();

        function continueReadData(){
            //var fileUpdateFlag=fileUpdateFlagIn;
            buffer=new Buffer(2048);
            var start = 0,i=0,tmp;
            fs.read(fd,buffer,0,buffer.length,null,function(err, bytesRead, buffer){

                var data=buffer.slice(0,bytesRead)
                if(remainder != null){//append newly received data chunk
                    //console.log("remainder length:"+remainder.length);
                    tmp = new Buffer(remainder.length+bytesRead);
                    remainder.copy(tmp);
                    //data=buffer.slice(0,bytesRead);
                    data.copy(tmp,remainder.length)
                    data = tmp;
                }
                //console.log("data length:"+data.length);
                for(i=0; i<data.length; i++){
                    if(newlines.indexOf(data[i]) >=0){ //\r \n new line
                        var line = data.slice(start,i);
                        self.emit("line", line);
                        start = i+1;
                    }
                }

                if(start<data.length){
                    remainder = data.slice(start);
                    if(remainder.toString()==='===END==='){
                        self.emit("end");
                        stopWatch();
                        return;
                    }
                }else{
                    remainder = null;
                }

                if(bytesRead<buffer.length){
                    return;
                }else{
                    //console.log('~~continue~~');
                    continueReadData();
                }
            });


        }

        function stopWatch(){
            fs.unwatchFile(fileName);
        }
    });

}

util.inherits(createLineReader, EventEmitter);

module.exports=createLineReader;
相關文章
相關標籤/搜索