ClojureScript core.async 豐富的語義和示例

這篇筆記主要是基於文檔展開一下 core.async 在 ClojureScript 當中的基本用法.
具體的內容能夠看原文章, 已經比較詳細了, 不少在 API 文檔的 demo 當中.
關於基礎知識跟 cljs 跟 clj 的區別, 這篇文章就不涉及了.html

以前用到 core.async , 發現本身中間不少理解缺失了, 趁有時間趕忙看一下.
從 API 文檔能夠看到 core.async 的函數語義是比較豐富的, 幾十個函數,
我順着看了一圈, 整理下來大體是幾個功能, 大體分紅幾塊.node

  • 多對一git

    • 相似 Promise.all
    • 多個 channel 的數據進行 map
    • 經過 merge 合併多個 channel 到一個
    • 經過 mix 控制多個 channel 具體合併/解開的狀況
  • 多選一github

    • alts! 的多選一, 對應 Promise.race
    • alt! 的語法套路
  • 一拆二/過濾數組

    • split 直接拆成兩個
    • 用 pipeline 搭配 filter 進行過濾
    • 用 transducer 寫法進行過濾
  • 一對多promise

    • 經過 mult 發送給多個接收端

原本個人觸發點是想看 core.async 是都能對應到 Promise 經常使用的功能,
這樣看下來, core.async 功能是過於豐富了, 反而有些用不上.
因爲我對 Go 熟悉度有限, 很差跟 Go Channel 作對比了.async

後面逐個看一下示例. 爲了方便演示, 我增長了兩個輔助函數,函數

  • fake-task-chan, 生成一個隨機定時任務, 返回一個 channel
  • display-all, 打印一個 channel 全部返回的非 nil 數據, nil 表示結束.

多對一

相似 Promise.all

(defn demo-all []
  (go
         ; 首先 tasks 獲得向量, 內部函數多個 channel
   (let [tasks (->> (range 10)
                    (map (fn [x] (fake-task-chan (str "rand task " x) 10 x))))]
     (println "result"
        ; loop 函數逐個取 tasks 的值, 從頭取, 一次次 recur, 直到取完, 結束
        (loop [acc [], xs tasks]
          (if (empty? xs)
              acc
                               ; <! 表示從 channel 取數據, 在 go block 內阻塞邏輯
              (recur (conj acc (<! (first xs)))
                     (rest xs))))))))

因爲任務在 loop 以前已經開始了, 相似 Promise.all 的效果.
一個個等待結果, 最終就是所有的值, 耗時就是最長的那個等待的時間.oop

=>> node target/server.js
rand task 0 will take 0 secs
rand task 1 will take 1 secs
rand task 2 will take 2 secs
rand task 3 will take 3 secs
rand task 4 will take 9 secs
rand task 5 will take 6 secs
rand task 6 will take 7 secs
rand task 7 will take 1 secs
rand task 8 will take 2 secs
rand task 9 will take 9 secs
rand task 0 finished
rand task 1 finished
rand task 7 finished
rand task 2 finished
rand task 8 finished
rand task 3 finished
rand task 5 finished
rand task 6 finished
rand task 4 finished
rand task 9 finished
result [0 1 2 3 4 5 6 7 8 9]

能夠看到最終以數組形式返回了每一個 channel 返回的數據了..net

多個 channel 的數據進行 map

我其實不大清楚這個 map 用在什麼樣的場景, 就是取兩個 channel 計算獲得新的數字.

(defn demo-map []
  (let [<c1 (to-chan! (range 10))
        <c2 (to-chan! (range 100 120))
        <c3 (async/map + [<c1 <c2])]
    (display-all <c3)))

因此就是 0 + 100 1 + 101... 獲得 10 個數據

=>> node target/server.js
100
102
104
106
108
110
112
114
116
118
nil

整體上仍是多個 channel 合併成一個了.

經過 merge 合併多個 channel 到一個

merge 就是把多個 channel 的數據合併到一個, 字面意義的意思.
從獲得的新的 channel, 能夠獲取到原來 channel 的數據.

(defn demo-merge []
  (let [<c1 (chan),
        <c2 (chan),
        <c3 (async/merge [<c1 <c2])]
    (go (>! <c1 "a") (>! <c2 "b"))
    (display-all <c3)))

因此從 c3 就能拿到寫到原來的兩個 channel 的數據了,

=>> node target/server.js
a
b

經過 mix 控制多個 channel 具體合併/解開的狀況

mix 跟 merge 很類似, 區別是中間多了一個控制層, 定義成 mix-out,
經過 admix unmix 兩個函數能夠調整 mix-out 上的關係,
這個例子當中

(defn demo-mix []
  (let [<c0 (chan)
        <c1 (async/to-chan! (range 40))
        <c2 (async/to-chan! (range 100 140))
        mix-out (async/mix <c0)]
    ; mix 過來兩個 channel
    (async/admix mix-out <c1)
    (async/admix mix-out <c2)
    (go
               ; 先取 20 個數據打印
     (doseq [x (range 20)] (println "loop1" (<! <c0)))
     (println "removing c2")
     ; 去掉那個數字特別大的 channel
     (async/unmix mix-out <c2)
               ; 再取 20 個數據打印
     (doseq [x (range 20)] (println "loop2" (<! <c0))))))

獲得結果,

=>> node target/server.js
loop1 0
loop1 100
loop1 1
loop1 101
loop1 2
loop1 102
loop1 3
loop1 103
loop1 104
loop1 4
loop1 105
loop1 5
loop1 106
loop1 6
loop1 107
loop1 108
loop1 109
loop1 110
loop1 7
loop1 8
removing c2
loop2 111
loop2 9
loop2 10
loop2 11
loop2 12
loop2 13
loop2 14
loop2 15
loop2 16
loop2 17
loop2 18
loop2 19
loop2 20
loop2 21
loop2 22
loop2 23
loop2 24
loop2 25
loop2 26
loop2 27

能夠看到剛開始的時候, 從返回的 channel 能夠獲取到兩個來源 channel 的數據,
進行一次 unmix 以後, 大數的來源不見了, 後面基本上是小的數字.

這個順序看上去是有一些隨機性的, 甚至 unmix 還有一次大數的打印, 後面穩定了.
注意 mix-out 只是用於控制, 獲取數據在代碼裏仍是要經過 c0 獲取的.

多選一

alts! 的多選一, 對應 Promise.race

這個比較清晰的

(defn demo-alts []
  (go
   (let [<search (fake-task-chan "searching" 20 "searched x")
         <cache (fake-task-chan "looking cache" 15 "cached y")
         <wait (fake-task-chan "timeout" 15 nil)
                       ; 數組裏邊三個可選的 channel
         [v ch] (alts! [<cache <search <wait])]
     (if (= ch <wait ) (println "final: timeout")
                       (println "get result:" v)))))

就是隨機的時間, 取返回最快的結果. 我多跑幾回

=>> node target/server.js
searching will take 3 secs
looking cache will take 14 secs
timeout will take 9 secs
searching finished
get result: searched x
^C
=>> node target/server.js
searching will take 10 secs
looking cache will take 1 secs
timeout will take 4 secs
looking cache finished
get result: cached y
timeout finished
searching finished
^C
=>> node target/server.js
searching will take 19 secs
looking cache will take 4 secs
timeout will take 1 secs
timeout finished
final: timeout
looking cache finished
^C
=>> node target/server.js
searching will take 0 secs
looking cache will take 6 secs
timeout will take 1 secs
searching finished
get result: searched x
timeout finished
looking cache finished
^C

能夠看到打印的結果都是最短期結束的任務對應的返回值.
timeout 是這種狀況當中比較經常使用的一個定時器, 控制超時.

alt! 的語法套路

alt! 跟 alts! 就是相似了, 主要是語法比較豐富一點,

(defn demo-alt-syntax []
  (let [<search1 (fake-task-chan "search1" 10 "search1 found x1")
        <search2 (fake-task-chan "search2" 10 "search2 found x2")
        <log (chan)
        <wait (fake-task-chan "timeout" 10 nil)]
    (go
     (loop []
       (let [t (rand-int 10)]
         (println "read log waits" t)
         (<! (timeout (* 1000 t)))
         (println "got log" (<! <log))
         (recur))))
    (go
     (println "result"
       (async/alt!
         ; 匹配單個 channel 的寫法
         <wait :timeout
         ; 這個是往 channel 發送消息的寫法, 發送也是等待對方讀取, 也受時間影響
         ;  這個兩層數組是挺邪乎的寫法...
         [[<log :message]] :sent-log
         ; 這個匹配向量包裹的多個 channel, 後面經過 ch 能夠區分命中的 channel
         [<search1 <search2] ([v ch] (do (println "got" v "from" ch)
                                         :hit-search)))))))

直接多跑幾回了, 效果跟上邊一個差很少的,

=>> node target/server.js
search1 will take 3 secs
search2 will take 7 secs
timeout will take 3 secs
read log waits 8
search1 finished
timeout finished
got search1 found x1 from #object[cljs.core.async.impl.channels.ManyToManyChannel]
result :hit-search
search2 finished
^C
=>> node target/server.js
search1 will take 2 secs
search2 will take 0 secs
timeout will take 4 secs
read log waits 2
search2 finished
got search2 found x2 from #object[cljs.core.async.impl.channels.ManyToManyChannel]
result :hit-search
search1 finished
timeout finished
^C
=>> node target/server.js
search1 will take 9 secs
search2 will take 0 secs
timeout will take 9 secs
read log waits 6
search2 finished
got search2 found x2 from #object[cljs.core.async.impl.channels.ManyToManyChannel]
result :hit-search
search1 finished
timeout finished
^C
=>> node target/server.js
search1 will take 2 secs
search2 will take 2 secs
timeout will take 2 secs
read log waits 9
search1 finished
search2 finished
timeout finished
got search1 found x1 from #object[cljs.core.async.impl.channels.ManyToManyChannel]
result :hit-search
^C
=>> node target/server.js
search1 will take 6 secs
search2 will take 3 secs
timeout will take 1 secs
read log waits 6
timeout finished
result :timeout
search2 finished

一拆二/過濾

split 直接拆成兩個

看文檔好像就是直接這樣拆成兩個的, 對應 true/false,

(defn demo-split []
  (let [<c0 (to-chan! (range 20))]
    (let [[<c1 <c2] (async/split odd? <c0)]
      (go (display-all <c2 "from c2"))
      (go (display-all <c1 "from c1")))))

而後獲得數據就是分別從不一樣的 channel 才能獲得了, 奇書和偶數,

=>> node target/server.js
from c2 0
from c1 1
from c1 3
from c2 2
from c2 4
from c2 6
from c1 5
from c1 7
from c1 9
from c2 8
from c2 10
from c2 12
from c1 11
from c1 13
from c1 15
from c2 14
from c2 16
from c2 18
from c1 17
from c1 19
from c1 nil
from c2 nil

用 pipeline 搭配 filter 進行過濾

這個 pipeline 就是中間插入一個函數, 例子裏是 filter, 直接進行過濾.

(defn demo-pipeline-filter []
  (let [<c1 (to-chan! (range 20)),
        <c2 (chan)]
    (async/pipeline 1 <c2 (filter even?) <c1)
    (display-all <c2)))

效果就是從 c2 取數據時, 只剩下偶數的值了,

=>> node target/server.js
0
2
4
6
8
10
12
14
16
18
nil

用 transducer 寫法進行過濾

transducer 比較高級一點, 用到高階函數跟比較複雜的抽象,
可是簡單的功能寫出來, 主要發揮做用的函數那個 (filter even?),
柯理化的用法, 返回函數, 而後被 comp 拿去組合,

(defn demo-transduce-filter []
  (let [<c1 (to-chan! (range 20)),
        <c2 (chan 1 (comp (filter even?)))]
    (async/pipe <c1 <c2)
    (display-all <c2)))

獲得的結果跟上邊是同樣的, 都是過濾出偶數,

=>> node target/server.js
0
2
4
6
8
10
12
14
16
18
nil

一對多

經過 mult 發送給多個接收端

就是把一個數據變成多份, 供多個 channel 過來取數據,

(defn demo-mult []
  (let [<c0 (async/to-chan! (range 10)),
        <c1 (chan),
        <c2 (chan),
        ; mult-in 也是一個控制, 而不是一個 channel
        mult-in (async/mult <c0)]
    (async/tap mult-in <c1)
    (async/tap mult-in <c2)
    (display-all <c1 "from c1")
    (comment "need to take from c2, otherwise c0 is blocked")
    (display-all <c2 "from c2")))

能夠看到運行之後就是 c1 c2 分別拿到一份同樣的數據了,

=>> node target/server.js
from c1 0
from c2 0
from c1 1
from c1 2
from c2 1
from c2 2
from c1 3
from c1 4
from c2 3
from c2 4
from c1 5
from c1 6
from c2 5
from c2 6
from c1 7
from c1 8
from c2 7
from c2 8
from c1 9
from c1 nil
from c2 9
from c2 nil

大概的場景應該就是一個數據發佈到多個 channel 去吧.
不過這個跟監聽還有點不同, 監聽廣播時發送者是非阻塞的, 這邊是 channel 是阻塞的.

結尾

代碼後續會同步到 github 去 https://github.com/worktools/... .

這邊主要仍是 API 的用法, 業務場景當中使用 core.async 會複雜一些,
好比 debounce 的邏輯用 timeout 搭配 loop 處理就比較順,
具體代碼參考, https://zhuanlan.zhihu.com/p/...
但通常都是會攪尾遞歸在裏邊, 進行邏輯控制和狀態的傳遞.
網上別人的例子加上業務邏輯還會複雜不少不少...

但總的說, Clojure 提供的 API, 還有抽象能力, 應該是能夠用來應對不少場景的.

相關文章
相關標籤/搜索