clojure 中使用 actor

背景

Actor 模型 是讓 Earlang 聲名卓著的關鍵特性。它是 Erlang 平臺實現分佈式編程的關鍵內容,在 Clojure 語言設計時, Rich Hickey 考慮過在 Clojure 語言中是否實現 Actor,他最終認爲:這僅僅是適合於分佈式編程的一種特徵,若是成爲語言的本質,將限制 Clojure 成爲一種服務器領域語言,所以他決定使用其餘更簡單直接的異步通訊模型。但他也沒有排除將來在 Clojure 中引入 Actor 的可能。編程

簡單實現

隨着 core.async 庫的推出和成熟,咱們實際上已經能夠用 core.async 來實現一個最簡單的 Actor 模型:服務器

(require '[clojure.core.async :as a])

(defn actor [f]
  (let [mail-box (a/chan (a/dropping-buffer 32))]
    (a/go-loop [f f]
      (when-let [v (a/<! mail-box)]
        (recur (f v))))
    mail-box))

(def ! a/put!) ;erlang 的操做符

能夠看到,上面的 actor 函數能夠將一個 core.async 通道封裝成一個主動單元,其中擁有本身的事件循環,不斷地在 mail-box 上等待新的消息。數據結構

下面咱們定義一個簡單的調試 actor:異步

(def debug-actor (actor (fn debug[x] (prn x) debug)))

(! debug-actor "Hello, world!")
;;輸出 Hello, world

這最簡單的 actor 固然尚未支持分佈式編程,但使用這個模型,咱們將程序的組件變成了主動的單元,從而在通道的基礎上提供了另外一種抽象。async

與 Transducers 的關係

Clojure 1.7 開始引入的 transducer 被普遍使用,它能夠使用在任何連續的數據結構上,例如序列,以及 core.async 的通道,若是咱們已經定義好了一個 transducer 或者用它定義的操做 (xf),是否能夠用於 Actor 呢?分佈式

(defn actor-xf [xf out-actor]
  (let [f-out (fn
                ([b] b)
                ([b itm] (a/put! b itm) b))]
    (fn f-actor
      ([v] (f-actor out-actor v))
      ([acc v]
       (let [acc ((xf f-out) acc v)]
         (partial f-actor acc))))))

這個函數能夠用定義好的 xf 生成一個 actor,它將對消息處理後將轉化後的消息送往 out-actor。例如:函數

(def inc-actor (actor (actor-xf (map inc) debug-actor)))
(! inc-actor 6)
;;輸出7
(def complex-actor (actor (actor-xf (mapcat #(repeatedly % vector)) debug-actor)))
(! complex-actor 2)
;;[]
;;[]

其實 core.async 庫中已經實現了相似的功能,就是 chan 函數自己!咱們不過須要用 pipe 函數將內外兩個通道連起來就能夠了:oop

(defn actor-xf
  [xf out-ch]
  (let [ch (a/chan (a/dropping-buffer 32) xf)]
    (a/pipe ch out-ch)
    ch))
相關文章
相關標籤/搜索