記一次導出 CSV 產生的研究

引入

工做中作一個將數據庫數據導出爲 CSV 格式的功能,開發環境下一切很是正常,而後滿意地上到測試環境進行測試,結果測試同窗反饋導出大量數據時 Chrome 直接報錯網絡錯誤,我就很納悶,本身開發的時候也用了大數據量來測試,爲何沒有出問題,一句 It works on my computer 就要脫口而出了,本着嚴謹的原則仍是本身跑測試環境測試了一下,發現只要數據量達到100萬條左右時幾乎每一次導出都會報錯。javascript

一開始先想到是否是網絡超時,但這個下載過程實際上是一直都有數據傳輸的,不該該觸發超時設定纔對,否則其餘下載文件的過程也應該超時了,所以須要另尋思路查一查這個問題。java

尋找線索

Stack Overflow 上面搜索一番,主要說法是如下兩種:數據庫

  1. ChromePredict network actions to improve page load performance 選項
  2. 殺毒軟件不兼容

兩種解決方式嘗試無果,並且一看就不像正確答案/捂臉express

因而用了一個偏門的方法,把這個導出接口的響應頭部設置爲瀏覽器

Content-Disposition: inline;緩存

即把接口傳輸出來的 csv 數據直接當作網頁顯示在瀏覽器裏面,也可以藉助 Chrome DevTools 看看詳細的報錯信息。服務器

爲了模擬耗時較長的下載過程,將 Chrome DevToolsNetwork 面板的 Throttle 選項打開,設置成 Slow 3G ,這樣能夠模擬測試環境外網的網絡速度。網絡

測試了四次,有三次在 7 分鐘左右報錯,一次在 10 分鐘左右停下來,Chrome 的下載結果只顯示一個 失敗-網絡錯誤,但打開控制檯就發現線索了,每次的報錯都是同樣的:數據結構

net::ERR_INCOMPLETED_CHUNK_ENCODING 200異步

藉着這個報錯信息再搜索 Stack Overflow ,彷佛發現了問題所在:

This is typically caused by a server not sending us the terminal 0-length chunk. We sit around waiting for more data with the request hung until the server closes the socket. At that point, we have no way to know whether we've received the entire file or not. This seems to be working as intended. The server needs to be fixed.

附上連接 Bug Chrome

大概意思是服務器沒有發送表明結束的最後一個 0-length chunk ,至關於響應的數據是不完整的,這個與報錯信息中的 INCOMPLETED 一致。

分析緣由

線索有了,來看一下業務代碼:

// res 指的是 Express 的 Response 對象
const stream = Model.find({}).stream();

stream.on("data", data => {
    // do some stuff
    res.write(data);
});

stream.on("error", err => {
   	// process and log the error 
});

stream.on("end", () => {
    res.end();
});
複製代碼

這裏須要介紹一下相關的概念,首先 stream 是一個 mongoose 的可讀流,resExpressResponse 對象,stream 是一個可讀流,res 是一個可寫流。咱們知道一般讀取數據的速度都要快於寫入數據的速度,所以爲了維持讀寫速度的平衡,從可讀流讀取出來的數據會在緩衝區堆積,等待可寫流空閒時再繼續寫入操做。

express 流式寫入 response 對象在 http 協議層面使用的是 Transfer-Encoding 的首部:

Transfer-Encoding: Chunked

對於這個首部,http 協議有如下規定:

The terminating chunk is a regular chunk, with the exception that its length is zero.

在最終的分塊傳輸完成以後,須要再傳輸一個零長度的塊,告知客戶端流數據已經傳輸完畢。

以上代碼的問題是,沒有考慮到讀寫速度的平衡,只考慮了數據是否被讀取出來,實際上從可讀流讀取出來的數據可能大量積壓,此時可讀流 stream 觸發 end 事件時,數據並不必定徹底寫入到了 res 流中,TCP Socket 在把緩衝區的數據經過網絡發送出去以後並不會當即把緩衝區的數據刪除,而是須要等待對端的 ACK 報文到達纔會真正把數據刪除,若是網絡狀況很差,則對端的 ACK 報文可能須要很長時間才能到達,此時緩衝區很快就滿了,而且沒法再寫入,在測試環境使用外網時,網絡狀況較差,express 寫入到 TCP Socket 寫緩衝區的數據都被阻塞,此時調用 res.end() 方法會致使實際上寫入的數據並不完整,而且最後的零長度分塊也沒寫入,客戶端天然就斷定爲數據接收不完整,就報錯了,這也是爲何開發時內網環境沒有出現這種狀況的緣由。

Backpressure的簡單介紹

這裏涉及到一個概念: Backpressure

一般在數據處理的時候咱們會遇到一個廣泛的問題:背壓,意思是在數據傳輸過程當中有一大堆數據在緩存以後積壓着。每次當數據到達結尾又遇到複雜的運算,又或者不管什麼緣由它比預期的慢,這樣累積下來,從源頭來的數據就會變得很龐大,像一個塞子同樣堵塞住。

咱們須要一個合理的機制來處理這種狀況,實際上 Node.js 已經提供了該類問題的解決方案。

  1. 在調用可寫流的 write 方法時,該方法會根據緩衝區的數據堆積狀況來肯定返回結果,若能夠繼續寫入則返回 true,若暫時沒法繼續寫入則返回 false(緣由多是寫隊列繁忙或者讀取出來的數據塊太大)。在該方法返回 false 時,不該該繼續調用 write 方法,而是應該等待 drain 事件發出以後再繼續。
  2. 使用流的 pipe 方法,將讀寫平衡的控制交給 Node.js 本身來完成。

可是實際上 pipe 方法使用的也是第一種方案,只不過 Node.js 已經幫咱們作了這些處理了,在 ReadableStream 的源碼中有這麼一段

src.on('data', ondata);
function ondata(chunk) {
    debug('ondata');
    const ret = dest.write(chunk);
    debug('dest.write', ret);
    if (ret === false) {
        // If the user unpiped during `dest.write()`, it is possible
        // to get stuck in a permanently paused state if that write
        // also returned false.
        // => Check whether `dest` is still a piping destination.
        if (!cleanedUp) {
            if (state.pipes.length === 1 && state.pipes[0] === dest) {
                debug('false write response, pause', 0);
                state.awaitDrainWriters = dest;
                state.multiAwaitDrain = false;
            } else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
                debug('false write response, pause', state.awaitDrainWriters.size);
                state.awaitDrainWriters.add(dest);
            }
            src.pause();
        }
        if (!ondrain) {
            // When the dest drains, it reduces the awaitDrain counter
            // on the source. This would be more elegant with a .once()
            // handler in flow(), but adding and removing repeatedly is
            // too slow.
            ondrain = pipeOnDrain(src, dest);
            dest.on('drain', ondrain);
        }
    }
}
複製代碼

src 流也就是管道的源頭髮起 data 事件時,執行 ondata 回調,調用 dest.write 方法寫入數據到目標流也就是管道的另外一端,判斷返回值若爲 falsecleanedupfalse 表示仍然有事件監聽器在監聽 src 上的事件),表示此時寫入的目標流沒法繼續寫入,因此須要暫停數據的寫入過程,同時保存須要等待 drain 事件的可寫流對象,併爲該可寫流添加 drain 事件監聽器,它的具體實現是這樣的:

function pipeOnDrain(src, dest) {
  return function pipeOnDrainFunctionResult() {
    const state = src._readableState;

    // `ondrain` will call directly,
    // `this` maybe not a reference to dest,
    // so we use the real dest here.
    if (state.awaitDrainWriters === dest) {
      debug('pipeOnDrain', 1);
      state.awaitDrainWriters = null;
    } else if (state.multiAwaitDrain) {
      debug('pipeOnDrain', state.awaitDrainWriters.size);
      state.awaitDrainWriters.delete(dest);
    }

    if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
      EE.listenerCount(src, 'data')) {
      state.flowing = true;
      flow(src);
    }
  };
}
複製代碼

監聽到 drain 事件以後就恢復源也就是可讀流的流動狀態,繼續讀取數據,注意這裏若是是一個可讀流寫入多個可寫流,則必須等待全部可寫流都監聽到 drain 事件後才能恢復流動。

結合實際談談用法

結合我本身的在這一塊的工做內容,對於以上兩種解決方案,說說我本身的見解。首先說說第二種,平時工做上使用流導出 CSV 的方式一般是生成一個 MongoDB 的可讀流,而後在 data 事件的回調中作數據結構的轉換和處理,再寫入 res 流中,若是使用 pipe 方法,則須要本身寫一個 Transform 轉換流,而後這樣調用:

const { Transform } = require('stream');

class MyTransform extends Transform {
    ...
}

// export.js
const stream = Model.find({}).stream();
const transform = new MyTransform();

stream.pipe(transform).pipe(res);

// 發生錯誤時斷開管道
stream.on('error', err => {
    ...
});

transform.on('error', err => {
    ...
});
複製代碼

由於導出時常常須要根據第一次查到的數據去其餘表或庫查另外的數據,因此須要在處理數據時發起異步請求,若是須要在 Transform 流中作異步操做,則須要本身寫不一樣的 Transform 流的子類,很是麻煩,因此一般在不須要異步查其餘數據時使用這種解決方式。

那麼須要異步查詢其餘數據庫或表時怎麼作呢,一般用第一種解決方式,比較靈活,具體寫法以下:

const stream = Model.find({}).stream();

stream.on('data', data => {
    asyncOperation(data).then(result => {
        const finalData = doSomething(result);
        let writable = res.write(finalData);
        if (!writable) {
            stream.pause();
        }
    });
});


// 緩衝區空閒時流對象會發起 drain 事件,表明此時能夠繼續寫入了
res.on('drain', () => {
    stream.resume();
});
複製代碼

結語

按照上述方案修改以後,這個困擾了我一個多月的問題成功解決了,順帶收穫了一波知識點,舒坦。 萌新第一次發掘金,以上有寫錯的地方,歡迎大佬指正。

相關文章
相關標籤/搜索