Elixir: Syn,一個分佈式進程註冊表和進程組管理器

什麼是進程註冊表?

全局進程註冊表, 用單個 Key 在集羣中的全部節點上註冊一個進程. 其做用相似於 DNS 服務器的做用: 經過域名獲取一個IP地址, 這裏的進程註冊表的做用是: 經過 Key 獲取進程, 簡單的說就是經過索引Key 去進程 Hash 表中查詢對應的 Pid.node

經典用例:git

  • 在系統上註冊一個處理物理設備鏈接的進程(使用其序列號)github

  • 聊天客戶端後端

什麼是進程組?

進程組是一個命名組, 包含多個最終運行在多個不一樣節點上的進程. 經過組名稱, 能夠獲取運行在集羣中任意節點上的全部進程, 或者發佈一條消息給進程組中的全部進程. 該機制能夠實現 發佈/訂閱 模式.服務器

經典用例:網絡

  • 聊天室架構

  • 發佈/訂閱模式的應用場景app

什麼是 Syn ?

  • 一個全局進程註冊表分佈式

  • 一個進程組管理器函數

  • 任何項式能夠用做鍵和名稱

  • 消息能夠發送給進程組中的全部成員進程(PubSub機制)

  • 快速寫

  • 自動處理衝突決策(網絡分區)

  • 可配置回調

  • 自動監控進程, 死亡時自動移除

在任何分佈式系統中都要面對一致性的問題, 一般用一個主節點執行全部鞋操做, (經過領導者選舉機制) 或經過原子事務).

Syn 誕生於物聯網領域的應用. 在這個場景下, 用於標識一個進程的Key 通常是物理對象的惟一標識符(例如, 序列號,或Mac地址), 所以, 這些鍵已經保證了惟一性.

寫速度在Syn架構中是一個決定性因素, 所以, 可用性大於一致性, Syn 選擇了最終一致性

安裝

Rebar3

若是使用 [rebar3](), 在 rebar.config 配置文件添加 syn 做爲依賴

{syn, {git, "git://github.com/ostinelli/syn.git", {tag, "1.3.1"}}}

或者, 若是使用 Hex.pm 做爲包管理器(和 rebar3_hex)插件

{syn, "1.3.1"}

而後, 編譯

$ rebar3 compile

Rebar2

若是使用 [rebar](), 在 rebar.config 配置文件中添加 syn 依賴:

{syn, ".*", {git, "git://github.com/ostinelli/syn.git", {tag, "1.3.1"}}}

而後, 獲取依賴並編譯:

$ rebar get-deps
$ rebar compile

使用

設置

確保Syn已經在你的應用程序中啓動, 能夠添加到 .app 文件中讓它隨你的應用自動啓動, 也能夠經過 syn:start(). 手工啓動.

你的應用程序在集羣中運行, 一旦節點相互鏈接, 確保初始化 Sync (這回設置底層的 mnesia 後端)

syn:init().

請保證在一個節點上只初始化一次Syn, 即便該節點從集羣中斷開, 並重連到集羣, 不要從新初始化它. 這回讓 Syn 不能正確地自動決斷衝突

推薦的啓動位置爲: 主應用模塊的 start/2 函數.

-module(myapp_app).
-behaviour(application).

-export([start/2, stop/1]).

start(_StartType, _StartArgs) ->
    %% connect to nodes
    connect_nodes(),
    %% init syn
    syn:init(),
    %% start sup
    myapp_sup:start_link().

connect_nodes() ->
    %% list of nodes contained in ENV variable `nodes`
    Nodes = application:get_env(nodes),
    %% connect to nodes
    [net_kernel:connect_node(Node) || Node <- Nodes].

進程註冊和查詢

註冊一個進程:

syn:register(Key, Pid) -> ok | {error, Error}.

Types:
    Key = any()
    Pid = pid()
    Error = taken | pid_already_registered

註冊一個進程並附加元數據:

syn:register(Key, Pid, Meta) -> ok | {error, Error}.

Types:
    Key = any()
    Pid = pid()
    Meta = any()
    Error = taken | pid_already_registered
ERROR DESC
taken 鍵已經被其餘進程佔用
pid_already_registered Pid已經被其餘鍵註冊

你能夠從新註冊一個進程屢次, 好比更新元數據的時候, 當進程被註冊時候, Syn 會自動監控它. 所以須要的時候你能夠處理進程的'DOWN'事件

經過Key獲取進程的Pid:

syn:find_by_key(Key) -> Pid | undefined.

Types:
    Key = any()
    Pid = pid()

獲取一個進程的Pid, 及其元數據:

syn:find_by_key(Key, with_meta) -> {Pid, Meta} | undefined.

Types:
    Key = any()
    Pid = pid()
    Meta = any()

經過Pid查找Key:

syn:find_by_pid(Pid) -> Key | undefined.

Types:
    Pid = pid()
    Key = any()

經過Pid及其元數據查找Key:

syn:find_by_pid(Pid, with_meta) -> {Key, Meta} | undefined.

Types:
    Pid = pid()
    Key = any()
    Meta = any()

註銷一個進程:

syn:unregister(Key) -> ok | {error, Error}.

Types:
    Key = any()
    Error = undefined

你不須要註銷死亡進程的Key, 所以它被Syn自動監控, 而且被自動移除. 若是你在進程死亡前手動註銷一個進程(看下面), 進程的退出回調函數可能不會調用.

獲取集羣中測註冊進程數:

syn:registry_count() -> non_neg_integer().

獲取指定節點上的註冊進程數:

syn:registry_count(Node) -> non_neg_integer().

Types:
    Node = atom()

進程組的加入和離開

不須要手動建立/刪除進程組, Syn 將會幫你管理這些.

添加一個進程到組:

syn:join(Name, Pid) -> ok | {error, Error}.

Types:
    Name = any()
    Pid = pid()
    Error = pid_already_in_group

一個進程能夠加入多個組, 當一個進程加入一個組, Syn會自動地監控它.

從組中刪除一個進程:

syn:leave(Name, Pid) -> ok | {error, Error}.

Types:
    Name = any()
    Pid = pid()
    Error = undefined | pid_not_in_group

你不須要刪除已經死亡的進程, 所以其被Syn監控, 而且在死亡的時候自動從組中刪除.

獲取進程組中的成員進程列表:

syn:get_members(Name) -> [pid()].

Types:
    Name = any()

無論你是從哪一個節點上調用 syn:get_members(Name), Syn保證了其結果順序是同樣的. 可是並無保證這個結果的順序和進程加入的順序同樣.

查詢一個進程是否在一個組中:

syn:member(Pid, Name) -> boolean().

Types:
    Pid = pid()
    Name = any()

發佈一條消息給全部成員:

syn:publish(Name, Message) -> {ok, RecipientCount}.

Types:
    Name = any()
    Message = any()
    RecipientCount = non_neg_integer()

調用全部的組成員, 並獲取其響應:

syn:multi_call(Name, Message) -> {Replies, BadPids}.

等同於 syn:multi_call(Name, Message, 5000).

syn:multi_call(Name, Message, Timeout) -> {Replies, BadPids}.

Types:
    Name = any()
    Message = any()
    Timeout = non_neg_integer()
    Replies = [{MemberPid, Reply}]
    BadPids = [MemberPid]
      MemberPid = pid()
      Reply = any()

接收全部成員的響應的過程可能會超時, 超時值可經過 Timeout() 參數設定, 在 Timeout() 超時後, 全部沒有及時得到迴應的成員進程或者崩潰的進程會被移到 BadPids 列表中.

響應的格式:

{syn_multi_call, CallerPid, Message}

Types:
    CallerPid = pid()
    Message = any()

To reply, every member must use the method:

syn:multi_call_reply(CallerPid, Reply) -> ok.

Types:
    CallerPid = pid()
    Reply = any()

選項

選項能夠設置在環境變量 syn 中, 可能最好的是使用應用程序配置文件(在發佈包的 sys.config 配置文件中). 全部可用的選項包括:

{syn, [
    %% define callback function on process exit
    {registry_process_exit_callback, [module1, function1]},

    %% define callback function on conflicting process (instead of kill)
    {registry_conflicting_process_callback, [module2, function2]}
]}

這些選項在下面解釋.

註冊表選項

運行設置的進程註冊表選項包括:

  • registry_process_exit_callback

  • registry_conflicting_process_callback

進程退出回調

registry_process_exit_callback 選項容許你指定 modulefunction做爲註冊進程退出的回調函數. 該回調僅在進程運行的節點上調用.

回調函數定義爲:

CallbackFun = fun(Key, Pid, Meta, Reason) -> any().

Types:
    Key = any()
    Pid = pid()
    Meta = any()
    Reason = any()

KeyPid 爲進程退出時的 KeyPid, Reason 爲該進程退出的緣由.

例如, 當進程退出時, 若是你想要打印日誌, 定義以下回調函數

-define(my_callback).
-export([callback_on_process_exit /4]).

callback_on_process_exit() ->
    error_logger:info_msg(
        "Process with key ~p, Pid ~p and Meta ~p exited with reason ~p~n",
        [Key, Pid, Meta, Reason]
    ).

並設置選項:

{syn, [
    %% define callback function
    {registry_process_exit_callback, [my_callback, callback_on_process_exit]}
]}

若是你不設置該選項, 不會觸發回調.

若是進程由於衝突而死亡, 進程退出回調函數任然會被調用, 這時 KeyMeta 的值爲 undefined.

經過回調來決斷衝突

在有競態條件的狀況下, 或者網絡斷開, 一個 Key 可能在兩個不一樣的節點上同時註冊, 當網絡恢復或者競態條件消失的時候, 進程註冊表會致使一個名稱衝突.

當這種狀況發生時, Syn 會在衝突的進程中選擇一個, 殺死另外一個(互斥, 只能存在一個)來解決進程註冊表衝突. Syn 將會拋棄運行在衝突決斷的節點上的進程, 而且默認將會發送一個 kill 信號(exit(Pid, kill)來殺死它.

若是這不是你指望的, 你能夠設置 registry_conflicting_process_callback 選項來讓 Syn 觸發一個回調, 這樣你能夠指向一些自定義的操做(好比 graceful shutdown). 在這種場景下, 進程不會被 Syn 殺掉, 你必須決定要作什麼事情. 該回調僅在進程運行的節點上被調用.

注: 實際處理應該避免自動處理衝突. 應該重啓節點, 讓它去同步 Mnesia 集羣的數據.

該回調函數定義爲:

CallbackFun = fun(Key, Pid, Meta) -> any().

Types:
    Key = any()
    Pid = pid()
    Meta = any()

Key, PidMeta 屬於被拋棄的進程的. 例如, 若是你想發送一個 shutdown 時間給被拋棄的進程:

-module(my_callback).
-export([callback_on_conflicting_process/3]).

callback_on_conflicting_process(_Key, Pid, _Meta) ->
    Pid ! shutdown

選項設置:

{syn, [
    %% define callback function
    {registry_conflicting_process_callback, [my_callback, callback_on_conflicting_process]}
]}

重要事項: 衝突決斷方法應該在全部集羣的節點上以相同的的方式定義. 不一樣節點上若是存在不一樣的衝突決斷方式, 將會致使意外的結果.

進程組選項

當前沒有進程組選項

內部機制

在底層, Syn 採用髒讀/寫到分佈式的基於內存的Mnesia表中, 跨集羣的多個節點進行復制.

要自動決斷衝突, Syn 實現了一個unsplit機制的一個簡化版本.

Syn 的壓力測試結果

1 Node 2 Nodes 3 Nodes 4 Nodes
Reg / second 106,324 52,792 60,958 40,929
Retrieve registered Key (ms) 0 0 0 56
Unreg / second 105,506 50,591 67,042 42,896
Retrieve unregistered Key (ms) 0 0 0 0
Re-Reg / second 106,424 51,322 77,258 47,125
Retrieve re-registered Key (ms) 0 0 0 0
Retrieve Key of killed Pid (ms) 719 995 1,577 1,825

項目主頁

https://github.com/ostinelli/syn

相關文章
相關標籤/搜索