隊列對於任何語言來講都是重要的,io 的串行,請求的並行等等。在 JavaScript 中,又因爲單線程的緣由,異步編程又是很是重要的。昨天由一道面試題的啓發,我去實現 JS 中的異步隊列的時候,借鑑了 express 中間件思想,併發散到 co 實現 與 generator,以及 asyncToGenerator。javascript
本次用例代碼都在此,能夠 clone 下來試一下java
不少面試的時候會問一個問題,就是怎麼讓異步函數能夠順序執行。方法有不少,callback,promise,觀察者,generator,async/await,這些 JS 中處理異步編程的,均可以作到這種串行的需求。可是很麻煩的是,處理起來是挺麻煩的,你要不停的手動在上一個任務調用下一個任務。好比 promise,像這樣:git
a.then(() => b.then(() => c.then(...)))
代碼嵌套的問題,有點嚴重。因此要是有一個隊列就行了,往隊列裏添加異步任務,執行的時候讓隊列開始 run 就行了。先制定一下 API,咱們有一個 queue,隊列都在內部維護,經過 queue.add 添加異步任務,queue.run 執行隊列,能夠先想一想。es6
參照以前 express 中間件的實現,給異步任務 async-fun 傳入一個 next 方法,只有調用 next,隊列纔會繼續往下走。那這個 next 就相當重要了,它會控制隊列日後移一位,執行下一個 async-fun。咱們須要一個隊列,來保存 async-fun,也須要一個遊標,來控制順序。github
如下是個人簡單實現:面試
const queue = () => { const list = []; // 隊列 let index = 0; // 遊標 // next 方法 const next = () => { if (index >= list.length - 1) return; // 遊標 + 1 const cur = list[++index]; cur(next); } // 添加任務 const add = (...fn) => { list.push(...fn); } // 執行 const run = (...args) => { const cur = list[index]; typeof cur === 'function' && cur(next); } // 返回一個對象 return { add, run, } } // 生成異步任務 const async = (x) => { return (next) => {// 傳入 next 函數 setTimeout(() => { console.log(x); next(); // 異步任務完成調用 }, 1000); } } const q = queue(); const funs = '123456'.split('').map(x => async(x)); q.add(...funs); q.run();// 1, 2, 3, 4, 5, 6 隔一秒一個。
我這裏沒去構造一個 class,而是經過閉包的特性去處理的。queue 方法返回一個包含 add,run 的對象,add 即爲像隊列中添加異步方法,run 就是開始執行。在 queue 內部,咱們定義了幾個變量,list 用來保存隊列,index 就是遊標,表示隊列如今走到哪一個函數了,另外,最重要的是 next 方法,它是控制遊標向後移動的。算法
run 函數一旦執行,隊列即開始 run。一開始執行隊列裏的第一個 async 函數,咱們把 next 函數傳給了它,而後由 async 函數決定何時執行 next,即開始執行下一個任務。咱們沒有並不知道異步任務何時纔算完成,只能經過打成某種共識,來告知 queue 某個任務完成。就是傳給任務的 next 函數。其實 async 返回的這個函數,有一個名字,叫 Thunk,後面咱們會簡單介紹。express
thunk 實際上是爲了解決 「傳名調用」 的。就是我傳給函數 A 一個表達式做參數 x + 1,可是我不肯定這個 x + 1 何時會用到,以及會不會用到,若是在傳入就執行,這個求值是沒有必要的。因此就出現了一個臨時函數 Thunk,來保存這個表達式,傳入函數 A 中,待須要時再調用。編程
const thunk = () => { return x + 1; }; const A = thunk => { return thunk() * 2; }
嗯... 其實就是一個回調函數...數組
其實只要某個任務,不繼續調用 next,隊列就已經不會繼續往下走了。好比咱們 async 任務里加一個判斷(一般是異步 io,請求的容錯處理):
// queue 函數不變, // async 加限制條件 const async = (x) => { return (next) => { setTimeout(() => { if(x > 3) { console.log(x); q.run(); //重試 return; } console.log(x); next(); }, 1000); } } const q = queue(); const funs = '123456'.split('').map(x => async(x)); q.add(...funs); q.run(); //打印結果: 1, 2, 3, 4, 4,4, 4,4 一直是 4
當執行到第四個任務的時候,x 是 4 的時候,再也不繼續,就能夠直接 return,再也不調用 next。也有多是出現錯誤,咱們須要再重試,那就再調用 q.run 就能夠了,由於遊標保存的就是當前的 async 任務的索引。
另外,還有一種方式,就是添加 stop 方法。雖然感受上面的方法就 OK 了,可是 stop 的好處在於,你能夠主動的中止隊列,而不是在 async 任務里加限制條件。固然,有暫停就有繼續了,兩種方式,一個是 retry,就是從新執行上一次暫停的那個;另外一個就是 goOn,無論上次最後一個如何,繼續下一個。上代碼:
const queue = () => { const list = []; let index = 0; let isStop = false; const next = () => { // 加限制 if (index >= list.length - 1 || isStop) return; const cur = list[++index]; cur(next); } const add = (...fn) => { list.push(...fn); } const run = (...args) => { const cur = list[index]; typeof cur === 'function' && cur(next); } const stop = () => { isStop = true; } const retry = () => { isStop = false; run(); } const jump = () => { isStop = false; next(); } return { add, run, stop, retry, goOn, } } const async = (x) => { return (next) => { setTimeout(() => { console.log(x); next(); }, 1000); } } const q = queue(); const funs = '123456'.split('').map(x => async(x)); q.add(...funs); q.run(); setTimeout(() => { q.stop(); }, 3000) setTimeout(() => { q.goOn(); }, 5000)
其實仍是加攔截... 只不過從 async 函數中,換到了 next 函數裏面,利用 isStop 這個變量切換 true/false,開關暫停。我加了兩個定時器,一個是 3 秒後暫停,一個是 5 秒後繼續,(請忽略定時器的偏差),按道理應該是隊列到三秒的時候,也就是第三個任務執行完暫停,而後再隔 2 秒,繼續。結果打印到 3 的時候,停住,兩秒以後繼續 4,5,6.
兩種思路,請結合場景思考問題。
上面的都是在作串行,假如 run 的時候我要並行呢... 也很簡單,把隊列一次性跑完就能夠了。
// 爲了代碼短一些,把 retry,goOn 先去掉了。 const queue = () => { const list = []; let index = 0; let isStop = false; let isParallel = false; const next = () => { if (index >= list.length - 1 || isStop || isParallel) return; const cur = list[++index]; cur(next); } const add = (...fn) => { list.push(...fn); } const run = (...args) => { const cur = list[index]; typeof cur === 'function' && cur(next); } const parallelRun = () => { isParallel = true; for(const fn of list) { fn(next); } } const stop = () => { isStop = true; } return { add, run, stop, parallelRun, } } const async = (x) => { return (next) => { setTimeout(() => { console.log(x); next(); }, 1000); } } const q = queue(); const funs = '123456'.split('').map(x => async(x)); q.add(...funs); q.parallelRun(); // 一秒後所有輸出 1, 2, 3, 4, 5, 6
我添加了一個 parallelRun 方法,用於並行,我以爲仍是不要放到 run 函數裏面了,抽象單元儘可能細化仍是。而後還加了一個 isParallel 的變量,默認是 false,考慮到 next 函數有可能會被調用,因此須要加一個攔截,保證不會處亂。
以上就是利用僅用 thunk 函數,結合 next 實現的異步隊列控制器,queue,跟你能夠把 es6 代碼都改爲 es5,保證兼容,固然是足夠簡單的,不適用於負責的場景 ?,僅提供思路。
爲何要介紹 generator,首先它也是用來解決異步回調的,另外它的使用方式也是調用 next 函數,generator 纔會往下執行,默認是暫停狀態。yield 就至關於上面的 q.add,往隊列中添加任務。因此我也打算一塊兒介紹,來更好的拓寬思路。發散思惟,類似的知識點作好概括,而後某一天你就會忽然有一種:原來是這麼回事,原來 xxx 是借鑑子 yyy,而後你又去研究 yyy - -。
簡單介紹回顧一下,由於有同窗不常常用,確定會有遺忘。
// 一個簡單的栗子,介紹它的用法 function* gen(x) { const y = yield x + 1; console.log(y, 'here'); // 12 return y; } const g = gen(1); const value = g.next().value; // {value: 2, done: false} console.log(value); // 2 console.log(g.next(value + 10)); // {value: 12, done: true}
首先生成器其實就是一個經過函數體內部定義迭代算法,而後返回一個 iterator 對象。關於iterator,能夠看我另外一篇文章。
gen 執行返回一個對象 g,而不是返回結果。g 跟其餘 iterator 同樣,經過調用 next 方法,保證遊標 + 1,而且返回一個對象,包含了 value(yield 語句的結果),和 done(迭代器是否完成)。另外,yield 語句的值,好比上面代碼中的 y,是下一次調用 next 傳入的參數,也就是 value + 10,因此是 12.這樣設計是有好處的,由於這樣你就能夠在 generator 內部,定義迭代算法的時候,拿到上次的結果(或者是處理後的結果)了。
可是 generator 有一個弊端就是不會自動執行,TJ 大神寫了一個 co,來自動執行 generator,也就是自動調用 next。它要求 yield 後面的函數/語句,必須是 thunk 函數或者是 promise 對象,由於只有這樣纔會串聯執行完,這跟咱們最開始實現 queue 的思路是同樣的。co 的實現有兩種思想,一個是 thunk,一個是 promise,咱們都來試一下。
還記得最開始的 queue 怎麼實現的嗎,內部定義 next 函數,來保證遊標的前進,async 函數會接收 next,去執行 next。到這裏是同樣的,咱們只要在 co 函數內部定義一個一樣的 next 函數,來保證繼續執行,那麼 generator 是沒有提供索引的,不過它提供了 g.next 函數啊,因此咱們只須要給 async 函數傳 g.next 不就行了,async 就是 yield 後面的語句啊,也就是 g.value。可是並不能直接傳 g.next,爲何?由於下一次的 thunk 函數,要經過 g.next 的返回值 value 取到啊,木有 value,下一個 thunk 函數不就沒了... 因此咱們仍是須要定義一個 next 函數去包裝一下的。
上代碼:
const coThunk = function(gen, ...params) { const g = gen(...params); const next = (...args) => { // args 用於接收參數 const ret = g.next(...args); // args 傳給 g.next,即賦值給上一個 yield 的值。 if(!ret.done) { // 去判斷是否完成 ret.value(next); // ret.value 就是下一個 thunk 函數 } } next(); // 先調用一波 } // 返回 thunk 函數的 asyncFn const asyncFn = (x) => { return (next) => { // 接收 next const data = x + 1; setTimeout(() => { next && next(data); }, 1000) } } const gen = function* (x) { const a = yield asyncFn(x); console.log(a); const b = yield asyncFn(a); console.log(b); const c = yield asyncFn(b); console.log(c); const d = yield asyncFn(c); console.log(d); console.log('done'); } coThunk(gen, 1); // 2, 3, 4, 5, done
這裏定義的 gen,功能很簡單,就是傳入參數 1,而後每一個 asyncFn 異步累加,即多個異步操做串行,而且下一個依賴上一個的返回值。
其實思路都是同樣的,只不過調用 next,換到了 co 內部。由於 yield 後面的語句是 promise 對象的話,咱們能夠在 co 內部拿到了,而後在 g.next().value
的 then 語句執行 next 就行了。
// 定義 co const coPromise = function(gen) { // 爲了執行後的結果能夠繼續 then return new Promise((resolve, reject) => { const g = gen(); const next = (data) => { // 用於傳遞,只是換個名字 const ret = g.next(data); if(ret.done) { // done 後去執行 resolve,即co().then(resolve) resolve(data); // 最好把最後一次的結果給它 return; } ret.value.then((data) => { // then 中的第一個參數就是 promise 對象中的 resolve,data 用於接受並傳遞。 next(data); //調用下一次 next }) } next(); }) } const asyncPromise = (x) => { return new Promise((resolve) => { setTimeout(() => { resolve(x + 1); }, 1000) }) } const genP = function* () { const data1 = yield asyncPromise(1); console.log(data1); const data2 = yield asyncPromise(data1); console.log(data2); const data3 = yield asyncPromise(data2); console.log(data3); } coPromise(genP).then((data) => { setTimeout(() => { console.log(data + 1); // 5 }, 1000) }); // 同樣的 2, 3, 4, 5
其實 co 的源碼就是經過這兩種思路實現的,只不過它作了更多的 catch 錯誤的處理,並且支持你 yield 一個數組,對象,經過 promise.all 去實現。另外 yield thunk 函數的時候,它統一轉成 promise 去處理了。感興趣的能夠去看一下 co,相信如今必定很明朗了。
如今 JS 中用的最經常使用的異步解決方案了,不過 async 也是基於 generator 的實現,只不過是作了封裝。若是把 async/await 轉化成 generate/yield,只須要把 await 語法換成 yield,再扔到一個 generate 函數中,async 的執行換成 coPromise(gennerate) 就行了。
const asyncPromise = (x) => { return new Promise((resolve) => { setTimeout(() => { resolve(x + 1); }, 1000) }) } async function fn () { const data = await asyncPromise(1); console.log(data); } fn(); // 那轉化成 generator 可能就是這樣了。 coPromise 就是上面的實現 function* gen() { const data = yield asyncPromise(1); console.log(data); } coPromise(gen);
asyncToGenerator 就是這樣的原理,事實上 babel 也是這樣轉化的。
我首先是經過 express 的中間件思想,實現了一個 JS 中需求常見的 queue (異步隊列解決方案),而後再接着去實現一個簡單的 coThunk,最後把 thunk 換成 promise。由於異步解決方案在 JS 中是很重要的,去使用現成的解決方案的時候,若是能去深刻思考一下實現的原理,我相信是有助於咱們學習進步的。
歡迎 star 我的 blog:https://github.com/sunyongjia... ?