源文檔是
core.async
倉庫的一個代碼文件, 包含大量的教程性質的註釋
https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj
中間不肯定的兩句留了原文, 有讀懂的同窗請回復幫我糾正git
這份攻略介紹 core.async
核心的一些概念github
clojure.core.async
namespace 包含了公開的 API.異步
(require '[clojure.core.async :as async :refer :all])
數據經過相似隊列的 Channel 來傳輸, Channel 默認不進行 buffer(長度爲 0)
須要生產者和消費者進行約定從而在 Channel 當中傳送數據socket
用 chan
能夠建立一個不進行 buffer 的 Channel:async
(chan)
傳一個數字以建立限定了 buffer 大小的 Channel:ui
(chan 10)
close!
用來關閉 Channel 終結接受消息傳入, 已存在的數據依然能夠取出
取盡的 Channel 在取值時返回 nil
, nil
是不能直接經過 Channel 發送的!spa
(let [c (chan)] (close! c))
對在通常的 Thread 中, 使用 >!!
(阻塞的 put) 和 <!!
(阻塞的 take)
與 Channel 進行通訊線程
(let [c (chan 10)] (>!! c "hello") (assert (= "hello" (<!! c))) (close! c))
因爲是這些調用是阻塞的, 若是嘗試把數據放進沒有 buffer 的 Channel, 那麼整個 Thread 都會被卡住.
因此須要 thread
(比如 future
) 在線程池當中執行代碼主體, 而且經過 Channel 傳回數據
例子中啓動了一個後臺任務把 "hello"
放進 Channel, 而後在主線程讀取數據code
(let [c (chan)] (thread (>!! c "hello")) (assert (= "hello" (<!! c))) (close! c))
go
代碼塊和反轉控制(IoC) threadgo
是一個宏, 能把它的 body 在特殊的線程池裏異步執行
不一樣的是原本會阻塞的 Channel 操做會暫停, 不會有線程被阻塞
這套機制封裝了事件/回調系統當中須要外部代碼的反轉控制
在 go
block 內部, 咱們使用 >!
(put
) 和 <!
(take
)orm
這裏把前面 Channel 的例子轉化成 go
block:
(let [c (chan)] (go (>! c "hello")) (assert (= "hello" (<!! (go (<! c))))) (close! c))
這裏使用了 go
block 來模擬生產者, 而不是直接用 Thread 和阻塞調用
消費者用 go
block 進行獲取, 返回 Channel 做爲結果, 對這個 Channel 作阻塞的讀取
(原文: The consumer uses a go block to take, then returns a result channel, from which we do a blocking take.)
alts
)Channel 對比隊列一個啥手機應用是可以同時等待多個 Channel(像是 socket select)
經過 alts!!
(通常 thread)或者 alts!
(用於 go
block)
能夠經過 alts
建立後臺線程講兩個任意的 Channel 結合到一塊兒alts!!
獲取集合中某個操做的來執行
或者是能夠 take
的 Channel, 或者是能夠 put
[channel value]
的 Channel
並返回包含具體的值(對於 put
返回 nil
)以及獲取成功的 Channel:
(原文: alts!!
takes either a set of operations to perform either a channel to take from a [channel value] to put and returns the value (nil for put) and channel that succeeded:)
(let [c1 (chan) c2 (chan)] (thread (while true (let [[v ch] (alts!! [c1 c2])] (println "Read" v "from" ch)))) (>!! c1 "hi") (>!! c2 "there"))
打印內容(在 stdout, 可能你的 REPL 當中看不到):
從 #<ManyToManyChannel ...>
讀取 hi
從 #<ManyToManyChannel ...>
讀取 there
使用 alts!
來作和 go
block 同樣的事情:
(let [c1 (chan) c2 (chan)] (go (while true (let [[v ch] (alts! [c1 c2])] (println "Read" v "from" ch)))) (go (>! c1 "hi")) (go (>! c2 "there")))
由於 go
block 是輕量級的進程而而不是限於 thread, 能夠同時有大量的實例
這裏建立 1000 個 go
block 在 1000 個 Channel 裏同時發送 hi
它們穩當時用 alts!!
來讀取
(let [n 1000 cs (repeatedly n chan) begin (System/currentTimeMillis)] (doseq [c cs] (go (>! c "hi"))) (dotimes [i n] (let [[v c] (alts!! cs)] (assert (= "hi" v)))) (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))
timeout
建立 Channel 並等待設定的毫秒時間, 而後關閉:
(let [t (timeout 100) begin (System/currentTimeMillis)] (<!! t) (println "Waited" (- (System/currentTimeMillis) begin)))
能夠結合 timeout
和 alts
來作有時限的 Channel 等待
這裏是花 100ms 等待數據到達 Channel, 沒有的話放棄:
(let [c (chan) begin (System/currentTimeMillis)] (alts!! [c (timeout 100)]) (println "Gave up after" (- (System/currentTimeMillis) begin)))
Channel 能夠定製不一樣的策略來處理 Buffer 填滿的狀況
這套 API 中提供了兩個實用的例子.
使用 dropping-buffer
控制當 buffer 填滿時丟棄最新鮮的值:
(chan (dropping-buffer 10))
使用 sliding-buffer
控制當 buffer 填滿時丟棄最久遠的值:
(chan (sliding-buffer 10))