發現一個月沒刷技術文章了, 有點慌, 整理一篇短的 CSP 用法出來,
只包含最基本的用法, 在 Go 裏邊最清晰, 不過我是在 Clojure 寫的 CSP.
js 版本的 CSP 實現包含異步, 用法會更繁瑣一些, 可是也值得看看.
我相信 async/await 普及以前, js-csp 仍是一個頗有意思的選擇.git
個人代碼寫的是 CoffeeScript, 能夠自動腦補圓括號花括號上去...
注意包含 yield
的函數自動被轉成 function*() {}
, 因此注意腦補.
腦補不出來的只好貼在這邊編譯下了 http://coffeescript.org/github
首先是最基本的 CSP 的例子, 也就是用同步的代碼寫異步的邏輯,
CSP 當中最核心的概念是 Channel, 最簡單的 csp.timeout(1000)
建立 channel.數組
csp = require 'js-csp' # 用 csp.go 啓動一個 yield 函數 csp.go -> # 有 csp.take 從這個管道取出數據, yield 來模擬阻塞的效果 yield csp.take csp.timeout(1000) console.log 'Gone 1s'
運行一下:緩存
=>> coffee async.coffee Gone 1s
我注意到對於 timeout
來講, 省掉 csp.take
也是可以正常運行的:dom
csp = require 'js-csp' csp.go -> # 腦補 yield 函數 yield csp.timeout 1000 console.log 'Gone 1s' yield csp.timeout 2000 console.log 'Gone 2s' yield csp.timeout 3000 console.log 'Gone 3s. End'
運行一下:異步
=>> coffee async.coffee Gone 1s Gone 2s Gone 3s. End
csp.timeout
比較特殊, 默認就會產生數據, 只要進行 csp.take
就行了.
通常的 Channel 的話, 須要手動建立出來, 而後手動推數據,
好比下面的代碼建立了一個數據, 用 csp.go
啓動另外一個"進程"往 Channel 推數據,
這裏的"進程"的說法並非真正的進程, 只是模擬進程的行爲:async
csp = require 'js-csp' talk = (ch) -> yield csp.timeout 3000 console.log 'Done 3s timeout' # 等待 3s 而後往 Channel 當中寫入數據, yield 會產生等待 yield csp.put ch, 'some result' csp.go -> ch = csp.chan() # 啓動另外一個"進程" csp.go talk, [ch] # 數組裏是傳給 talk 函數的參數 # 使用 yield.take 從 Channel 取出數據, 使用 yield 模擬等待 result = yield csp.take ch console.log 'Result:', JSON.stringify(result)
運行一下:函數
=>> coffee async.coffee Done 3s timeout Result: "some result"
一樣是上邊的代碼, 只是調整一下寫法, 看上去像是分別啓動了兩個"進程",
雖然它們的運行時獨立的, 可是能夠經過管道進行通訊,
並且在對應的 csp.take
和 csp.put
操做過程當中, 會經過 yield 進行等待:oop
csp = require 'js-csp' talk = (ch) -> yield csp.timeout 3000 console.log 'Done 3s timeout' yield csp.put ch, 'some result' listen = (ch) -> result = yield csp.take ch console.log 'Result:', JSON.stringify(result) # 建立 Channel, 啓動兩個"進程" theCh = csp.chan() # csp.go 後面第一個是 yield 函數, 第二個是參數的數組, 雖然比較難看 csp.go talk, [theCh] csp.go listen, [theCh]
運行一下:ui
=>> coffee async.coffee Done 3s timeout Result: "some result"
實際使用當中, 會須要把 js 環境的異步代碼封裝成管道的形式,
不封裝成管道, 就不能借助 csp.go
來封裝同步代碼了,
因爲 js 不像 Go 那樣整個語言層面作了處理, 實際上會有奇怪的寫法,
因此 js-csp 提供了 csp.putAsync
和 csp.takeAsync
:
csp = require 'js-csp' talk = (ch) -> setTimeout -> csp.putAsync ch, 'some result' console.log 'Finished 3s of async' , 3000 listen = (ch) -> result = yield csp.take ch console.log 'Result:', JSON.stringify(result) theCh = csp.chan() talk theCh csp.go listen, [theCh]
運行一下:
=>> coffee async.coffee Finished 3s of async Result: "some result"
一個操做是否超時的問題, 能夠同時啓動一個定時的"進程",
而後觀察兩個"進程"哪個先執行完成, 從而判斷是否超時,
這就用到了 csp.alts
函數, 這個奇怪的命名是用 Clojure 帶過來的:
csp = require 'js-csp' talk = (ch) -> time = Math.random() * 4 * 1000 setTimeout -> console.log "Get result after #{time}ms" csp.putAsync ch, 'some result' , time listen = (ch) -> hurry = csp.timeout 2000 # 經過 csp.alts 同時等待多個 Channel 返回數據 result = yield csp.alts [ch, hurry] # result.channel 能夠用於判斷數據的來源, result.value 纔是真正的數據 if result.channel is hurry console.log 'Too slow, got no result' # close 只是設置 Channel 的狀態, 其實還須要手工處理一些邏輯 hurry.close() else console.log 'Fast enough, got', JSON.stringify(result.value) theCh = csp.chan() talk theCh csp.go listen, [theCh]
用了隨機數, 運行屢次試一下, 能夠看到根據不一樣的時間, 結果是不同的:
=>> coffee async.coffee Too slow, got no result Get result after 3503.6168682995008ms =>> coffee async.coffee Too slow, got no result Get result after 3095.264637685924ms =>> coffee async.coffee Get result after 703.6501633183257ms Fast enough, got "some result" =>> coffee async.coffee Too slow, got no result Get result after 3729.5125755664317ms =>> coffee async.coffee Get result after 101.51519531067788ms Fast enough, got "some result"
跟 yield
用法相似, 若是有循環的代碼, 也能夠用 CSP 寫出來,
這個的話不用怎麼想應該能明白了, loop
只是 while true
的語法糖:
csp = require 'js-csp' chatter = (ch) -> counter = 0 loop yield csp.timeout 1000 counter += 1 yield csp.put ch, counter repeat = (ch) -> loop something = yield csp.take ch console.log 'Hear something:', something theCh = csp.chan() csp.go chatter, [theCh] csp.go repeat, [theCh]
運行一下:
=>> coffee async.coffee Hear something: 1 Hear something: 2 Hear something: 3 Hear something: 4 ^C
實際場景當中會遇到多個消費者從單個生產者讀取數據的需求,
這是一個用 Channel 比較合適的場景, 啓動兩個"進程"讀取一個 Channel 就行了,
下面我模擬的是不一樣的處理時間 300ms 和 800ms 讀取 100ms 頻率的數據,
由於 CSP 自動處理了等待, 整個代碼看上去挺簡單的:
csp = require 'js-csp' chatter = (ch) -> counter = 0 loop yield csp.timeout 100 counter += 1 yield csp.put ch, counter repeat = (ch) -> loop yield csp.timeout 800 something = yield csp.take ch console.log 'Hear at 1:', something repeat2 = (ch) -> loop yield csp.timeout 300 something = yield csp.take ch console.log 'Hear at 2:', something theCh = csp.chan() csp.go chatter, [theCh] csp.go repeat, [theCh] csp.go repeat2, [theCh]
運行一下:
=>> coffee async.coffee Hear at 2: 1 Hear at 2: 2 Hear at 1: 3 Hear at 2: 4 Hear at 2: 5 Hear at 2: 6 Hear at 1: 7 Hear at 2: 8 Hear at 2: 9 Hear at 1: 10 Hear at 2: 11 Hear at 2: 12 Hear at 2: 13 Hear at 1: 14 Hear at 2: 15 Hear at 2: 16 Hear at 1: 17 Hear at 2: 18 Hear at 2: 19 Hear at 2: 20 Hear at 1: 21 Hear at 2: 22 Hear at 2: 23 Hear at 1: 24 ^C
默認狀況下管道是阻塞的, csp.put
csp.take
成對進行,
也就是說, 只有一個就緒的話, 它會等待另外一個開始, 而後一塊兒執行,
可是用 buffer 的話, 管道就會先在必定範圍內進行緩存,
這樣 csp.put
就能夠先運行下去了, 這個是不難理解的...
管道實際上有 3 種策略, fixed, dropping, sliding:
fixed, 緩存放滿之後就會開始造成阻塞了
dropping, 緩存滿了之後, 新的數據就會丟棄
sliding, 緩存滿之後, 會丟棄掉舊的數據讓新數據能放進緩存
隨便演示一個丟棄數據的例子:
csp = require 'js-csp' chatter = (ch) -> counter = 0 loop yield csp.timeout 200 counter += 1 console.log 'Write data:', counter yield csp.put ch, counter repeat = (ch) -> loop yield csp.timeout 300 something = yield csp.take ch console.log 'Hear:', something theCh = csp.chan(csp.buffers.dropping(3)) csp.go chatter, [theCh] csp.go repeat, [theCh]
運行一下, 能夠看到 "Hear" 部分丟失了一些數據, 但前三個數據不會丟:
=>> coffee async.coffee Write data: 1 Hear: 1 Write data: 2 Hear: 2 Write data: 3 Write data: 4 Hear: 3 Write data: 5 Hear: 4 Write data: 6 Write data: 7 Hear: 5 Write data: 8 Hear: 6 Write data: 9 Write data: 10 Hear: 7 Write data: 11 Hear: 8 Write data: 12 Write data: 13 Hear: 9 Write data: 14 Hear: 11 Write data: 15 Write data: 16 Hear: 12 Write data: 17 Hear: 14 ^C
因爲 CSP 是在 Go 語言發明的, 完整的用法仍是看 Go 的教程比較好,
到了 Clojure 和 js 當中不免會增長一些坑, 特別是 js 當中...
上面提到的 API 在 js-csp 的文檔上有描述, 例子也有, 可是挺少的:
另外還有一些高級一點的用法, 好比數據的 transform 和 pipe 之類的,其實就是 Stream 的用法在 Channel 上的改版, 某種程度上 Channel 也是 Stream,對於我我的來講, Channel 的抽象比起 Stream 的抽象舒服多了.