Rx.js使用之結合node的讀寫流進行數據處理

前幾天接到任務要使用第三方API處理幾千張圖片,獲得結果集。個人作法就是使用Rx.js結合node的讀寫流來完成數據讀入、接口請求、數據處理、數據寫入這些操做。本篇就來分享這個代碼和其邏輯。node

Rx.js是什麼

Rx.js是一個響應式編程庫,能簡化事件/異步處理邏輯代碼。其視全部的事件/數據爲__流__,提供各類流處理的operators,將輸入與輸出平滑的連接起來。能夠類比爲linux上的pipe操做符: ls | grep a*b | lesslinux

Node的讀寫流

  • readline模塊提供readline.createInterface來建立行讀取流,即讀取文件的每一行做爲持續的輸入數據編程

  • fs模塊提供fs.createWriteStream來建立寫入流, 其返回的writerwriteend兩個方法,來完成流式的寫入與結束寫入。api

第三方接口的使用狀況

  • 併發數有限制,3個是出現其出現併發錯誤機率最低的最大併發數服務器

  • 接口請求過於頻繁,會較大機率出現連續的併發錯誤, 大概延遲400秒效果尚可併發

  • 提供給第三方的圖片是連接,其須要服務器本身下載,會出現操做超時或者長時間不返回的狀況。app

任務列表

  1. 從文件讀取圖片文件名less

  2. 拼接url異步

  3. 發送3個併發請求post

  4. 請求出現超時問題重試3次,最後若是失敗則放棄

  5. 出現非超時錯誤(如併發錯誤等)則一直重試,直到成功

  6. 請求成功後延遲400秒繼續發起下一個請求

  7. 處理返回的數據

  8. 寫入文件

代碼分析

引入依賴,建立讀取與寫入流

const https = require('https');
const querystring = require('querystring');
const Rx = require('rxjs');
const readline = require('readline');
const fs = require('fs');

const imgStream = readline.createInterface({  // 建立行讀取流
    input: fs.createReadStream('filelist.txt')
});

const writeStream = fs.createWriteStream('output.txt');  // 建立寫入流

使用Rx處理讀取並反饋結果給寫入

Rx.Observable.fromEvent(imgStream, 'line')  // 將行讀取流轉化爲Rx的事件流
.takeUntil(Rx.Observable.fromEvent(imgStream, 'close'))  // 讀取流截止時終止Rx流
.map(img => generateData(img))  // 將文件名處理成post的數據
 // 發起請求,併發3個,請求返回後延遲400ms後再進行下一步處理併發起下一個請求
.mergeMap(data => requestAPI(data).delay(400), (o, i) => i, 3) 
.subscribe(data => {
    // 處理數據並寫入文件
    let str = data.url;
    if (data.status === 200 && data.data.xxx.length) {
        zzz = data.data.xxx.map(x => x.zzz);
        str += `    ${JSON.stringify(zzz)}`;
    }
    writeStream.write(`${str}\n`);
}, (err) => {
    console.log(err);
    console.log('!!!!!!!!!!!ERROR!!!!!!!!!');
}, () => {
    console.log('=====complete======');
    writeStream.end();
});

其中的須要關注的點在.mergeMap(data => requestAPI(data).delay(400), (o, i) => i, 3) ,這裏內部requestAPI返回一個封裝了http異步請求並延遲400ms的Rx流,當請求完成並延遲完成後將數據返回上一層繼續進行處理(能夠類比爲Promisethen)

使用Rx的自定義流封裝一個帶錯誤重試機制的http請求

const requestFacepp = dataStr => {
    const options = {
        hostname: 'api.xxx.com',
        port: 443,
        path: '/xxx',
        method: 'POST',
        headers: {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Content-Length': Buffer.byteLength(dataStr)
        }
    };
    const reqData = querystring.parse(dataStr);
    const retry$ = new Rx.Subject();  // 觸發重試的流,當其發出數據時會使`retryWhen`觸發重試錯誤流
    let retryTimes = 3;  // 設置非正常失敗(超時)重試的上限

    // 使用Rx的自定義流封裝一個帶錯誤重試機制的http請求,能夠類比爲new Promise
    // 但要注意的是Rx是流,即數據是能夠持續的,而Promise則只有一個結果和狀態
    return Rx.Observable.create(observer => {
        const req = https.request(options, res => {
            let data = '';
            res.setEncoding('utf8');
            res.on('data', chunk => {
                data += chunk;
            });
            res.on('end', () => {
                if (res.statusCode === 200) {
                    // 請求正常返回,向流內推送結果並結束
                    observer.next({
                        status: res.statusCode,
                        url: reqData.image_url,
                        data: JSON.parse(data)
                    });
                    observer.complete();
                } else {
                    // 請求正常返回,但不是正常結果,拋出錯誤並重試
                    console.log(`retring: ${reqData.image_url}`);
                    observer.error({
                        status: res.statusCode,
                        url: reqData.image_url
                    });
                    retry$.next(true);
                }
            });
        });

        req.setTimeout(4000, () => {
            // 設置請求4s超時,超時後終止,引起請求拋錯
            req.abort();
        });

        req.on('error', err => {
            console.log(`retring(${retryTimes}): ${reqData.image_url}`);
            // 請求拋錯時重試,超出次數則終止本次請求
            observer.error(`error: ${err.message}`);
            if (retryTimes > 0) {
                retryTimes--;
                retry$.next(true);
            } else {
                retry$.complete();
            }
        });

        req.write(dataStr);

        req.end();
        return () => { req.abort() };  // 返回終止流的處理回調
    })
    .retryWhen(errs => errs.switchMap(err => {
        // 未超過次數返回重試流,超出則返回錯誤數據並終止本次Rx流
        return retryTimes > 0 ? retry$ : Rx.Observable.of({
            status: 500,
            url: reqData.image_url
        });
    }));
};

收尾

到此就搬磚完畢,開個車讓他慢慢跑就能夠了。
本篇展現了Rx在流數據處理與異步處理上的方式,邏輯與代碼都挺清晰、扁平。在處理交雜的邏輯時也不錯(重試部分)。若是喜歡或者有幫助的話能夠後面在發一篇Rx在複雜DOM事件處理上的應用。;-)


本文始發於本人的公衆號:楓之葉。公衆號二維碼
640?wx_fmt=jpeg&wxfrom=5&wx_lazy=1

相關文章
相關標籤/搜索