js-csp 異步編程的一些簡單的例子

發現一個月沒刷技術文章了, 有點慌, 整理一篇短的 CSP 用法出來,
只包含最基本的用法, 在 Go 裏邊最清晰, 不過我是在 Clojure 寫的 CSP.
js 版本的 CSP 實現包含異步, 用法會更繁瑣一些, 可是也值得看看.
我相信 async/await 普及以前, js-csp 仍是一個頗有意思的選擇.git

個人代碼寫的是 CoffeeScript, 能夠自動腦補圓括號花括號上去...
注意包含 yield 的函數自動被轉成 function*() {}, 因此注意腦補.
腦補不出來的只好貼在這邊編譯下了 http://coffeescript.org/github

使用 timeout

首先是最基本的 CSP 的例子, 也就是用同步的代碼寫異步的邏輯,
CSP 當中最核心的概念是 Channel, 最簡單的 csp.timeout(1000) 建立 channel.數組

csp = require 'js-csp'

# 用 csp.go 啓動一個 yield 函數
csp.go ->
  # 有 csp.take 從這個管道取出數據, yield 來模擬阻塞的效果
  yield csp.take csp.timeout(1000)
  console.log 'Gone 1s'

運行一下:緩存

=>> coffee async.coffee
Gone 1s

我注意到對於 timeout 來講, 省掉 csp.take 也是可以正常運行的:dom

csp = require 'js-csp'

csp.go -> # 腦補 yield 函數

  yield csp.timeout 1000
  console.log 'Gone 1s'
  yield csp.timeout 2000
  console.log 'Gone 2s'
  yield csp.timeout 3000
  console.log 'Gone 3s. End'

運行一下:異步

=>> coffee async.coffee
Gone 1s
Gone 2s
Gone 3s. End

使用 put 和 take

csp.timeout 比較特殊, 默認就會產生數據, 只要進行 csp.take 就行了.
通常的 Channel 的話, 須要手動建立出來, 而後手動推數據,
好比下面的代碼建立了一個數據, 用 csp.go 啓動另外一個"進程"往 Channel 推數據,
這裏的"進程"的說法並非真正的進程, 只是模擬進程的行爲:async

csp = require 'js-csp'

talk = (ch) ->
  yield csp.timeout 3000
  console.log 'Done 3s timeout'
  # 等待 3s 而後往 Channel 當中寫入數據, yield 會產生等待
  yield csp.put ch, 'some result'

csp.go ->
  ch = csp.chan()

  # 啓動另外一個"進程"
  csp.go talk, [ch] # 數組裏是傳給 talk 函數的參數

  # 使用 yield.take 從 Channel 取出數據, 使用 yield 模擬等待
  result = yield csp.take ch
  console.log 'Result:', JSON.stringify(result)

運行一下:函數

=>> coffee async.coffee
Done 3s timeout
Result: "some result"

僞裝有兩個進程

一樣是上邊的代碼, 只是調整一下寫法, 看上去像是分別啓動了兩個"進程",
雖然它們的運行時獨立的, 可是能夠經過管道進行通訊,
並且在對應的 csp.takecsp.put 操做過程當中, 會經過 yield 進行等待:oop

csp = require 'js-csp'

talk = (ch) ->
  yield csp.timeout 3000
  console.log 'Done 3s timeout'
  yield csp.put ch, 'some result'

listen = (ch) ->
  result = yield csp.take ch
  console.log 'Result:', JSON.stringify(result)

# 建立 Channel, 啓動兩個"進程"
theCh = csp.chan()
# csp.go 後面第一個是 yield 函數, 第二個是參數的數組, 雖然比較難看
csp.go talk, [theCh]
csp.go listen, [theCh]

運行一下:ui

=>> coffee async.coffee
Done 3s timeout
Result: "some result"

封裝異步事件

實際使用當中, 會須要把 js 環境的異步代碼封裝成管道的形式,
不封裝成管道, 就不能借助 csp.go 來封裝同步代碼了,
因爲 js 不像 Go 那樣整個語言層面作了處理, 實際上會有奇怪的寫法,
因此 js-csp 提供了 csp.putAsynccsp.takeAsync:

csp = require 'js-csp'

talk = (ch) ->
  setTimeout ->
    csp.putAsync ch, 'some result'
    console.log 'Finished 3s of async'
  , 3000

listen = (ch) ->
  result = yield csp.take ch
  console.log 'Result:', JSON.stringify(result)

theCh = csp.chan()
talk theCh
csp.go listen, [theCh]

運行一下:

=>> coffee async.coffee
Finished 3s of async
Result: "some result"

處理超時

一個操做是否超時的問題, 能夠同時啓動一個定時的"進程",
而後觀察兩個"進程"哪個先執行完成, 從而判斷是否超時,
這就用到了 csp.alts 函數, 這個奇怪的命名是用 Clojure 帶過來的:

csp = require 'js-csp'

talk = (ch) ->
  time = Math.random() * 4 * 1000
  setTimeout ->
    console.log "Get result after #{time}ms"
    csp.putAsync ch, 'some result'
  , time

listen = (ch) ->
  hurry = csp.timeout 2000
  # 經過 csp.alts 同時等待多個 Channel 返回數據
  result = yield csp.alts [ch, hurry]
  # result.channel 能夠用於判斷數據的來源, result.value 纔是真正的數據
  if result.channel is hurry
    console.log 'Too slow, got no result'
    # close 只是設置 Channel 的狀態, 其實還須要手工處理一些邏輯
    hurry.close()
  else
    console.log 'Fast enough, got', JSON.stringify(result.value)

theCh = csp.chan()
talk theCh
csp.go listen, [theCh]

用了隨機數, 運行屢次試一下, 能夠看到根據不一樣的時間, 結果是不同的:

=>> coffee async.coffee
Too slow, got no result
Get result after 3503.6168682995008ms

=>> coffee async.coffee
Too slow, got no result
Get result after 3095.264637685924ms

=>> coffee async.coffee
Get result after 703.6501633183257ms
Fast enough, got "some result"

=>> coffee async.coffee
Too slow, got no result
Get result after 3729.5125755664317ms

=>> coffee async.coffee
Get result after 101.51519531067788ms
Fast enough, got "some result"

循環任務

yield 用法相似, 若是有循環的代碼, 也能夠用 CSP 寫出來,
這個的話不用怎麼想應該能明白了, loop 只是 while true 的語法糖:

csp = require 'js-csp'

chatter = (ch) ->
  counter = 0
  loop
    yield csp.timeout 1000
    counter += 1
    yield csp.put ch, counter

repeat = (ch) ->
  loop
    something = yield csp.take ch
    console.log 'Hear something:', something

theCh = csp.chan()
csp.go chatter, [theCh]
csp.go repeat, [theCh]

運行一下:

=>> coffee async.coffee
Hear something: 1
Hear something: 2
Hear something: 3
Hear something: 4
^C

多個數據的消費者

實際場景當中會遇到多個消費者從單個生產者讀取數據的需求,
這是一個用 Channel 比較合適的場景, 啓動兩個"進程"讀取一個 Channel 就行了,
下面我模擬的是不一樣的處理時間 300ms 和 800ms 讀取 100ms 頻率的數據,
由於 CSP 自動處理了等待, 整個代碼看上去挺簡單的:

csp = require 'js-csp'

chatter = (ch) ->
  counter = 0
  loop
    yield csp.timeout 100
    counter += 1
    yield csp.put ch, counter

repeat = (ch) ->
  loop
    yield csp.timeout 800
    something = yield csp.take ch
    console.log 'Hear at 1:', something

repeat2 = (ch) ->
  loop
    yield csp.timeout 300
    something = yield csp.take ch
    console.log 'Hear at 2:', something

theCh = csp.chan()
csp.go chatter, [theCh]
csp.go repeat, [theCh]
csp.go repeat2, [theCh]

運行一下:

=>> coffee async.coffee
Hear at 2: 1
Hear at 2: 2
Hear at 1: 3
Hear at 2: 4
Hear at 2: 5
Hear at 2: 6
Hear at 1: 7
Hear at 2: 8
Hear at 2: 9
Hear at 1: 10
Hear at 2: 11
Hear at 2: 12
Hear at 2: 13
Hear at 1: 14
Hear at 2: 15
Hear at 2: 16
Hear at 1: 17
Hear at 2: 18
Hear at 2: 19
Hear at 2: 20
Hear at 1: 21
Hear at 2: 22
Hear at 2: 23
Hear at 1: 24
^C

使用 buffer

默認狀況下管道是阻塞的, csp.put csp.take 成對進行,
也就是說, 只有一個就緒的話, 它會等待另外一個開始, 而後一塊兒執行,
可是用 buffer 的話, 管道就會先在必定範圍內進行緩存,
這樣 csp.put 就能夠先運行下去了, 這個是不難理解的...
管道實際上有 3 種策略, fixed, dropping, sliding:

  • fixed, 緩存放滿之後就會開始造成阻塞了

  • dropping, 緩存滿了之後, 新的數據就會丟棄

  • sliding, 緩存滿之後, 會丟棄掉舊的數據讓新數據能放進緩存

隨便演示一個丟棄數據的例子:

csp = require 'js-csp'

chatter = (ch) ->
  counter = 0
  loop
    yield csp.timeout 200
    counter += 1
    console.log 'Write data:', counter
    yield csp.put ch, counter

repeat = (ch) ->
  loop
    yield csp.timeout 300
    something = yield csp.take ch
    console.log 'Hear:', something

theCh = csp.chan(csp.buffers.dropping(3))
csp.go chatter, [theCh]
csp.go repeat, [theCh]

運行一下, 能夠看到 "Hear" 部分丟失了一些數據, 但前三個數據不會丟:

=>> coffee async.coffee
Write data: 1
Hear: 1
Write data: 2
Hear: 2
Write data: 3
Write data: 4
Hear: 3
Write data: 5
Hear: 4
Write data: 6
Write data: 7
Hear: 5
Write data: 8
Hear: 6
Write data: 9
Write data: 10
Hear: 7
Write data: 11
Hear: 8
Write data: 12
Write data: 13
Hear: 9
Write data: 14
Hear: 11
Write data: 15
Write data: 16
Hear: 12
Write data: 17
Hear: 14
^C

小結

因爲 CSP 是在 Go 語言發明的, 完整的用法仍是看 Go 的教程比較好,
到了 Clojure 和 js 當中不免會增長一些坑, 特別是 js 當中...
上面提到的 API 在 js-csp 的文檔上有描述, 例子也有, 可是挺少的:

另外還有一些高級一點的用法, 好比數據的 transform 和 pipe 之類的,其實就是 Stream 的用法在 Channel 上的改版, 某種程度上 Channel 也是 Stream,對於我我的來講, Channel 的抽象比起 Stream 的抽象舒服多了.

相關文章
相關標籤/搜索