前幾天接到任務要使用第三方API處理幾千張圖片,獲得結果集。個人作法就是使用Rx.js結合node的讀寫流來完成數據讀入、接口請求、數據處理、數據寫入這些操做。本篇就來分享這個代碼和其邏輯。node
Rx.js是一個響應式編程庫,能簡化事件/異步處理邏輯代碼。其視全部的事件/數據爲__流__,提供各類流處理的operators
,將輸入與輸出平滑的連接起來。能夠類比爲linux
上的pipe
操做符: ls | grep a*b | less
。linux
readline
模塊提供readline.createInterface
來建立行讀取流,即讀取文件的每一行做爲持續的輸入數據編程
fs
模塊提供fs.createWriteStream
來建立寫入流, 其返回的writer
有write
和end
兩個方法,來完成流式的寫入與結束寫入。api
併發數有限制,3個是出現其出現併發錯誤
機率最低的最大併發數服務器
接口請求過於頻繁,會較大機率出現連續的併發錯誤
, 大概延遲400秒效果尚可併發
提供給第三方的圖片是連接,其須要服務器本身下載,會出現操做超時或者長時間不返回的狀況。app
從文件讀取圖片文件名less
拼接url異步
發送3個併發請求post
請求出現超時問題重試3次,最後若是失敗則放棄
出現非超時錯誤(如併發錯誤等)則一直重試,直到成功
請求成功後延遲400秒繼續發起下一個請求
處理返回的數據
寫入文件
const https = require('https'); const querystring = require('querystring'); const Rx = require('rxjs'); const readline = require('readline'); const fs = require('fs'); const imgStream = readline.createInterface({ // 建立行讀取流 input: fs.createReadStream('filelist.txt') }); const writeStream = fs.createWriteStream('output.txt'); // 建立寫入流
Rx.Observable.fromEvent(imgStream, 'line') // 將行讀取流轉化爲Rx的事件流 .takeUntil(Rx.Observable.fromEvent(imgStream, 'close')) // 讀取流截止時終止Rx流 .map(img => generateData(img)) // 將文件名處理成post的數據 // 發起請求,併發3個,請求返回後延遲400ms後再進行下一步處理併發起下一個請求 .mergeMap(data => requestAPI(data).delay(400), (o, i) => i, 3) .subscribe(data => { // 處理數據並寫入文件 let str = data.url; if (data.status === 200 && data.data.xxx.length) { zzz = data.data.xxx.map(x => x.zzz); str += ` ${JSON.stringify(zzz)}`; } writeStream.write(`${str}\n`); }, (err) => { console.log(err); console.log('!!!!!!!!!!!ERROR!!!!!!!!!'); }, () => { console.log('=====complete======'); writeStream.end(); });
其中的須要關注的點在.mergeMap(data => requestAPI(data).delay(400), (o, i) => i, 3)
,這裏內部requestAPI
返回一個封裝了http異步請求並延遲400ms的Rx流,當請求完成並延遲完成後將數據返回上一層繼續進行處理(能夠類比爲Promise
的then
)
const requestFacepp = dataStr => { const options = { hostname: 'api.xxx.com', port: 443, path: '/xxx', method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded', 'Content-Length': Buffer.byteLength(dataStr) } }; const reqData = querystring.parse(dataStr); const retry$ = new Rx.Subject(); // 觸發重試的流,當其發出數據時會使`retryWhen`觸發重試錯誤流 let retryTimes = 3; // 設置非正常失敗(超時)重試的上限 // 使用Rx的自定義流封裝一個帶錯誤重試機制的http請求,能夠類比爲new Promise // 但要注意的是Rx是流,即數據是能夠持續的,而Promise則只有一個結果和狀態 return Rx.Observable.create(observer => { const req = https.request(options, res => { let data = ''; res.setEncoding('utf8'); res.on('data', chunk => { data += chunk; }); res.on('end', () => { if (res.statusCode === 200) { // 請求正常返回,向流內推送結果並結束 observer.next({ status: res.statusCode, url: reqData.image_url, data: JSON.parse(data) }); observer.complete(); } else { // 請求正常返回,但不是正常結果,拋出錯誤並重試 console.log(`retring: ${reqData.image_url}`); observer.error({ status: res.statusCode, url: reqData.image_url }); retry$.next(true); } }); }); req.setTimeout(4000, () => { // 設置請求4s超時,超時後終止,引起請求拋錯 req.abort(); }); req.on('error', err => { console.log(`retring(${retryTimes}): ${reqData.image_url}`); // 請求拋錯時重試,超出次數則終止本次請求 observer.error(`error: ${err.message}`); if (retryTimes > 0) { retryTimes--; retry$.next(true); } else { retry$.complete(); } }); req.write(dataStr); req.end(); return () => { req.abort() }; // 返回終止流的處理回調 }) .retryWhen(errs => errs.switchMap(err => { // 未超過次數返回重試流,超出則返回錯誤數據並終止本次Rx流 return retryTimes > 0 ? retry$ : Rx.Observable.of({ status: 500, url: reqData.image_url }); })); };
到此就搬磚完畢,開個車讓他慢慢跑就能夠了。
本篇展現了Rx在流數據處理與異步處理上的方式,邏輯與代碼都挺清晰、扁平。在處理交雜的邏輯時也不錯(重試部分)。若是喜歡或者有幫助的話能夠後面在發一篇Rx在複雜DOM事件處理上的應用。;-)
本文始發於本人的公衆號:楓之葉。公衆號二維碼