[譯]介紹 `core.async` 核心的一些概念

源文檔是 core.async 倉庫的一個代碼文件, 包含大量的教程性質的註釋
https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj
中間不肯定的兩句留了原文, 有讀懂的同窗請回復幫我糾正git


這份攻略介紹 core.async 核心的一些概念github

clojure.core.async namespace 包含了公開的 API.異步

(require '[clojure.core.async :as async :refer :all])

Channel

數據經過相似隊列的 Channel 來傳輸, Channel 默認不進行 buffer(長度爲 0)
須要生產者和消費者進行約定從而在 Channel 當中傳送數據socket

chan 能夠建立一個不進行 buffer 的 Channel:async

(chan)

傳一個數字以建立限定了 buffer 大小的 Channel:ui

(chan 10)

close! 用來關閉 Channel 終結接受消息傳入, 已存在的數據依然能夠取出
取盡的 Channel 在取值時返回 nil, nil 是不能直接經過 Channel 發送的!spa

(let [c (chan)]
  (close! c))

通常的 Thread

對在通常的 Thread 中, 使用 >!!(阻塞的 put) 和 <!!(阻塞的 take)
與 Channel 進行通訊線程

(let [c (chan 10)]
  (>!! c "hello")
  (assert (= "hello" (<!! c)))
  (close! c))

因爲是這些調用是阻塞的, 若是嘗試把數據放進沒有 buffer 的 Channel, 那麼整個 Thread 都會被卡住.
因此須要 thread(比如 future) 在線程池當中執行代碼主體, 而且經過 Channel 傳回數據
例子中啓動了一個後臺任務把 "hello" 放進 Channel, 而後在主線程讀取數據code

(let [c (chan)]
  (thread (>!! c "hello"))
  (assert (= "hello" (<!! c)))
  (close! c))

go 代碼塊和反轉控制(IoC) thread

go 是一個宏, 能把它的 body 在特殊的線程池裏異步執行
不一樣的是原本會阻塞的 Channel 操做會暫停, 不會有線程被阻塞
這套機制封裝了事件/回調系統當中須要外部代碼的反轉控制
go block 內部, 咱們使用 >!(put) 和 <!(take)orm

這裏把前面 Channel 的例子轉化成 go block:

(let [c (chan)]
  (go (>! c "hello"))
  (assert (= "hello" (<!! (go (<! c)))))
  (close! c))

這裏使用了 go block 來模擬生產者, 而不是直接用 Thread 和阻塞調用
消費者用 go block 進行獲取, 返回 Channel 做爲結果, 對這個 Channel 作阻塞的讀取
(原文: The consumer uses a go block to take, then returns a result channel, from which we do a blocking take.)

選擇性(alts)

Channel 對比隊列一個啥手機應用是可以同時等待多個 Channel(像是 socket select)
經過 alts!!(通常 thread)或者 alts!(用於 go block)

能夠經過 alts 建立後臺線程講兩個任意的 Channel 結合到一塊兒
alts!! 獲取集合中某個操做的來執行
或者是能夠 take 的 Channel, 或者是能夠 put [channel value] 的 Channel
並返回包含具體的值(對於 put 返回 nil)以及獲取成功的 Channel:
(原文: alts!! takes either a set of operations to perform either a channel to take from a [channel value] to put and returns the value (nil for put) and channel that succeeded:)

(let [c1 (chan)
      c2 (chan)]
  (thread (while true
            (let [[v ch] (alts!! [c1 c2])]
              (println "Read" v "from" ch))))
  (>!! c1 "hi")
  (>!! c2 "there"))

打印內容(在 stdout, 可能你的 REPL 當中看不到):
#<ManyToManyChannel ...> 讀取 hi
#<ManyToManyChannel ...> 讀取 there

使用 alts! 來作和 go block 同樣的事情:

(let [c1 (chan)
      c2 (chan)]
  (go (while true
        (let [[v ch] (alts! [c1 c2])]
          (println "Read" v "from" ch))))
  (go (>! c1 "hi"))
  (go (>! c2 "there")))

由於 go block 是輕量級的進程而而不是限於 thread, 能夠同時有大量的實例
這裏建立 1000 個 go block 在 1000 個 Channel 裏同時發送 hi
它們穩當時用 alts!! 來讀取

(let [n 1000
      cs (repeatedly n chan)
      begin (System/currentTimeMillis)]
  (doseq [c cs] (go (>! c "hi")))
  (dotimes [i n]
    (let [[v c] (alts!! cs)]
      (assert (= "hi" v))))
  (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))

timeout 建立 Channel 並等待設定的毫秒時間, 而後關閉:

(let [t (timeout 100)
      begin (System/currentTimeMillis)]
  (<!! t)
  (println "Waited" (- (System/currentTimeMillis) begin)))

能夠結合 timeoutalts 來作有時限的 Channel 等待
這裏是花 100ms 等待數據到達 Channel, 沒有的話放棄:

(let [c (chan)
      begin (System/currentTimeMillis)]
  (alts!! [c (timeout 100)])
  (println "Gave up after" (- (System/currentTimeMillis) begin)))

ALT

todo

其餘 Buffer

Channel 能夠定製不一樣的策略來處理 Buffer 填滿的狀況
這套 API 中提供了兩個實用的例子.

使用 dropping-buffer 控制當 buffer 填滿時丟棄最新鮮的值:

(chan (dropping-buffer 10))

使用 sliding-buffer 控制當 buffer 填滿時丟棄最久遠的值:

(chan (sliding-buffer 10))
相關文章
相關標籤/搜索