預覽。javascript
先給出一個基礎類代碼。java
const EventEmitter = require('events') const debug = require('debug')('transform') class Transform extends EventEmitter { constructor (options) { super() this.concurrency = 1 Object.assign(this, options) this.pending = [] this.working = [] this.finished = [] this.failed = [] this.ins = [] this.outs = [] } push (x) { this.pending.push(x) this.schedule() } pull () { let xs = this.finished this.finished = [] this.schedule() return xs } isBlocked () { return !!this.failed.length || // blocked by failed !!this.finished.length || // blocked by output buffer (lazy) this.outs.some(t => t.isBlocked()) // blocked by outputs transform } isStopped () { return !this.working.length && this.outs.every(t => t.isStopped()) } root () { return this.ins.length === 0 ? this : this.ins[0].root() } pipe (next) { this.outs.push(next) next.ins.push(this) return next } print () { debug(this.name, this.pending.map(x => x.name), this.working.map(x => x.name), this.finished.map(x => x.name), this.failed.map(x => x.name), this.isStopped()) this.outs.forEach(t => t.print()) } schedule () { // stop working if blocked if (this.isBlocked()) return this.pending = this.ins.reduce((acc, t) => [...acc, ...t.pull()], this.pending) while (this.working.length < this.concurrency && this.pending.length) { let x = this.pending.shift() this.working.push(x) this.transform(x, (err, y) => { this.working.splice(this.working.indexOf(x), 1) if (err) { x.error = err this.failed.push(x) } else { if (this.outs.length) { this.outs.forEach(t => t.push(y)) } else { if (this.root().listenerCount('data')) { this.root().emit('data', y) } else { this.finished.push(y) } } } this.schedule() this.root().emit('step', this.name, x.name) }) } } } module.exports = Transform
這段代碼目前仍是雛形。node
Transform
類的設計相似node裏的stream.Transform
,可是它的設計目的不是buffering或流性能,而是做爲併發編程的基礎模塊。程序員
若是你熟悉流式編程,Transform
的設計就很容易理解;在內部,Transform
維護四個隊列:編程
pending
是input bufferapi
working
是當前正在執行的任務數組
finished
是output buffer,它的目的不是爲了buffer輸出,而是在沒有其餘輸出辦法的時候做一下buffer。restful
failed
是失敗的任務併發
Transform
能夠組合成DAG(Directed Acyclic Graph)使用,ins
和outs
用來存儲前置和後置Transform
的引用,pipe
方法負責設置這種雙向連接;最多見的狀況是雙向鏈表,即ins
和outs
都只有一個對象。但把他們設計成數組就能夠容許fan-in, fan-out的結構。異步
push
和pull
是write和read的等價物。
schedule
是核心函數,它的任務是填充working
隊列。在構造函數的參數裏應該提供一個名字爲transform
的異步函數,schedule
使用這個函數運行任務,在運行結束後,根據結果把任務推到failed
隊列、推到下一個Transformer
、用root節點的emit輸出、或者推到本身的finished
隊列裏。
Transform
設計的核心思想,就是把併發任務的狀態,不使用對象屬性來編碼,只使用隊列位置來編碼;任何一個子任務,在任什麼時候刻,僅存在於一個Transform
對象的某個隊列中。換句話說,它等於把併發任務用資源來建模。若是你熟悉restful api對過程或狀態的建模方式就很容易理解這一點。
在
Transform
中,任何transform
異步函數的返回,都是一個step
;step
是用Transform
實現併發組合的最重要概念;
每一次transform
函數返回,都會發生改變本身的隊列或向後續的Transform
對象push
任務的動做,這個push
動做會觸發後續Transform
的schedule
方法;step
結束時本身的schedule
方法也會被調用,它會從新填充任務。在這些動做結束後,全部Transform
的隊列變化,就是整個組合任務狀態機的下一個狀態。
這個狀態是顯式的,能夠打印出來看,對debug很是有幫助;雖然異步i/o會讓這種狀態具備不肯定性,但至少這裏堅持了組合狀態機模型在處理併發問題時的同步原則,每一個step
結束時總體作一次狀態遷移,這個狀態遷移能夠良好定義和觀察,這是Event模型下併發編程和Thread模型的重要區別。後者遇到併發邏輯引發的微妙錯誤時,很難捕捉現場分析,由於每個Thread是黑盒。
從transform
返回開始到emit(step)
之間的一連串連鎖動做都是中間過程,最終實現一次完整的狀態遷移,這個過程必須是同步的。不該在這裏出現異步、setImmediate或者process.nextTick等調用,這會帶來額外的不肯定因素和極難發現和修復的bug。
在前面很長一段時間的併發編程實踐中,我指出過Promise的race/settle和錯誤處理邏輯在一些場景下的困難。Promise的過程邏輯不完備。我也花了不少力氣試圖在Process代數層面上把error, success, finish, race, settle, abort, pause, resume, 和他們的組合邏輯定義出來,但最終發現這很困難,由於實際編程中各類處理狀況太多了。
因此在Transform
的設計中,這些邏輯所有被拋棄了,由於事實上它們都不是真正的基礎併發邏輯。
Transform
試圖實現組合的基礎併發邏輯只有一個:stopped
。stopped
的定義很是簡單:在一次step
結束時,全部的Transform
的working
隊列爲空,就是(總體的)stopped
。這裏要再次強調前述的step
結束時同步方法的必要性,若是你在schedule
裏使用了異步方法調用,那麼這個stopped
的判斷就多是錯的,由於schedule
可能會在event loop裏放置了一個立刻就會產生新的working
任務的動做,而isStopped()
的判斷就錯了。
stopped
時,總體組合狀態多是success, error, paused, 等等,都不難判斷,但目前代碼還沒有穩定,我不打算加入語法糖。
在blocking i/o和同步的編程模式下,因果鏈和代碼書寫形式是一致的,可是在異步編程下,因果是異步和併發的,你只能去改變因,而後去觀察果,這是不少程序員不適應異步編程的根本緣由,由於它要改變思惟的習慣。
使用Transform
來處理併發編程,仍然是在試圖重建這個因果鏈,即便他們是併發的,可是咱們要有一個辦法把他們串起來;
前面說到的isStopped()
是觀察到的果,可以影響它的因,是isBlocked()
函數,這個函數在schedule
中被調用,若是估值爲true
,就會阻止schedule
繼續向working
隊列調度任務。
這裏寫的isBlocked()
的代碼實現只是一個例子;能夠阻止schedule
的緣由可能有不少,好比出現錯誤,或者輸出buffer滿了,這些能夠由實現者本身去定義。他們是policy,isBlocked()
自己是mechanism。這個策略的粒度是每一個Transform
對象均可以有本身的策略。好比一個刪除臨時文件的操做,結果是無關痛癢的,那麼它不應由於error就block。
isBlocked()
邏輯能夠象示例代碼裏那樣向下chain起來,即只要有後續任務block了,前置任務就該停下來;這在絕大多數狀況下都是合理的邏輯。由於雖然咱們寫的是流式處理辦法,可是咱們不是在處理octet-stream,追求性能的buffering和flow control都沒什麼意義,若是前面任務在copy文件後面的任務要移動到目標文件夾,若是目標文件夾出了問題前面快速移動了大量文件最終也沒法成功。
若是組合狀態機中止了,向其中的任何一個Transform
對象執行push或者pull操做均可以讓整個狀態機繼續動起來。從root節點push
是常見狀況,從leaf節點pull
也是,向中間節點push
也是可能的;
資源建模的一個好處是你能夠把狀態呈現給用戶,若是一個複製文件的任務由於文件名衝突而fail,你還可讓用戶選擇處理策略,例如覆蓋或者重命名,在用戶選擇了操做以後,代碼會從某個Transform
對象的failed
隊列中取走一個對象,修改策略參數後從新push進去,那麼這個狀態機能夠繼續執行下去;這種可處理的錯誤不應成爲block整個狀態機工做(複製其餘文件和文件夾)的緣由,除非他們積累到可觀的數量,在Transform
模式下這些都很是容易實現,開發者能夠很簡單的編寫isBlocked()
的策略;
和node的stream同樣,Transform
是lazy的,純粹的push machine可能會在中間節點buffer大量的任務,這對把任務做爲流處理來講是不合適的;同時,Lazy對於停下來的組合狀態機能繼續run起來很重要,pull
方法就是這個設計目的,它的schedule
邏輯和push
同樣,只是方向相反;若是設置了Leaf節點會由於輸出緩衝而block,它就能夠block整個狀態機(或者其中的一部分),這在某些狀況下也是有用的功能,若是整個狀態機的輸出由於某種緣由暫時沒法被馬上消費掉。
abort
邏輯沒有在代碼中實現,但它很容易,能夠遍歷全部的Transform
,若是working
隊列中的對象有abort
方法,就調用它;這不是個當即的停止,該對象仍然要經過callback返回才能stop。若是要全局的block,能夠把全部的Leaf Node都pipe到一個sink節點去,把這個sink節點強制設置成isBlocked,能夠block所有。pause
和resume
也是很是相似的邏輯。
固然你可能會遇到相似finally的邏輯是必須去執行的,即便在發生錯誤的時候,它意味着這個Transform
要向前傳遞isBlocked
信息,可是它的Schedule方法沒必要中止工做。它能夠一直運行到把全部隊列任務都處理完爲止。
重載schedule
方法也是可能的;例如你的任務之間有先後依賴的邏輯,你就能夠重載schedule
方法實現本身的調度方式。另外這裏的schedule
代碼只基於transform函數,很顯然若是transform自己是一個Transform
對象它也應該工做,實現組合過程,包括Sequencer,Parallel等等,這些都是須要實現的。
總而言之,isBlocked
和schedule
是分開的邏輯,它們有各自不一樣的設計目的和使命,你能夠重載它們得到本身想要的結果。因此寫在這裏的代碼,重要的不是他們的實現,而是其機制設計和界面設計,以及接口承諾;全部邏輯都是足夠原子化的,每一個函數只作一件事,isBlocked
是因,能夠根據須要選擇策略,isStopped
是果,經過step觀察和實現後續邏輯。應該避免經過向基類添加新方法來擴展能力,由於Transform
使用隊列和任務描述狀態,這個描述是完備的,機制也是完善的。
就像我在另外一篇介紹JavaScript語言的文章裏寫的同樣,若是針對問題的模型具備完備性,即便抽象,也能夠經過組合基本操做和概念得到更多的特性,而不是在模型上增長概念,除非你認爲模型不夠完備。
軟件工程中不是什麼地方都要上狀態機(automaton)這麼嚴格的模型工具,項目軟件裏寫到bug數量足夠低就能夠了,可是若是你要寫系統軟件或者對正確性有苛刻要求的東西,若是你沒有用狀態機建模,那麼實際上你沒有完備設計。
固然有了完備設計也不意味着軟件沒bug了,但一個好的設計可讓你對問題的理解、遇到問題時找到緣由,有極大的幫助。
在複雜系統中,上述的同步方法狀態機組合,和Hierarchical的狀態機組合,是咱們目前已知的兩種具備完備性的模型方法。可是二者不一樣。雖然Transform
的組合看起來是一個Hierarchy,可是它就像你在紙上畫一棵樹,它仍然是二維的,每一個step
的總體狀態聯動的遷移只是在populate一次狀態遷移的範圍,並非幾何級數的增長狀態組合;因此咱們仍然能夠構築一個線性的因果鏈,每一個step
因果因果這樣的繼續下去,和沒有併發的狀態機是同樣。
本質上這是數學概括法:若是咱們能證實若是n正確,那麼n+1是正確的,這就能夠證實chain下去的狀態組合即便是無窮也是正確的。
第二段代碼是使用的一個示例,這個class沒有必要,是爲了保證和老代碼接口兼容,由於有一些項目內其餘代碼的依賴性就不解釋了,很容易看明白大概邏輯;列在這裏只是展現一下Transform
使用時pipe過程的代碼樣子。
const Promise = require('bluebird') const path = require('path') const fs = Promise.promisifyAll(require('fs')) const EventEmitter = require('events') const debug = require('debug')('dircopy') const rimraf = require('rimraf') const Transform = require('../lib/transform') const { forceXstat } = require('../lib/xstat') const fileCopy = require('./filecopy') class DirCopy extends EventEmitter { constructor (src, tmp, files, getDirPath) { super() let dst = getDirPath() let pipe = new Transform({ name: 'copy', concurrency: 4, transform: (x, callback) => (x.abort = fileCopy(path.join(src, x.name), path.join(tmp, x.name), (err, fingerprint) => { delete x.abort if (err) { callback(err) } else { callback(null, (x.fingerprint = fingerprint, x)) } })) }).pipe(new Transform({ name: 'stamp', transform: (x, callback) => forceXstat(path.join(tmp, x.name), { hash: x.fingerprint }, (err, xstat) => err ? callback(err) : callback(null, (x.uuid = xstat.uuid, x))) })).pipe(new Transform({ name: 'move', transform: (x, callback) => fs.link(path.join(tmp, x.name), path.join(dst, x.name), err => err ? callback(err) : callback(null, x)) })).pipe(new Transform({ name: 'remove', transform: (x, callback) => rimraf(path.join(tmp, x.name), () => callback(null)) })).root() let count = 0 // drain data pipe.on('data', data => this.emit('data', data)) pipe.on('step', (tname, xname) => { debug('------------------------------------------') debug(`step ${count++}`, tname, xname) pipe.print() if (pipe.isStopped()) this.emit('stopped') }) files.forEach(name => pipe.push({ name })) pipe.print() this.pipe = pipe } } module.exports = DirCopy