走進 Akka.NET

官方文檔:https://getakka.net/index.htmlhtml

官網:https://petabridge.com/node

 

1、Akka.NET 是什麼?mysql

Akka 是一個構建高併發、分佈式和彈性消息驅動的工具包。Akka.NET 是 Akka 的一個 .NET 的移植庫。git

Akka.NET 內部都是 Actor 構成的,Actor 是一個狀態、行爲、郵箱、子節點和監視者策略構成的容器。github

 

2、Akka.NET 的一些基礎模塊sql

Akka - 核心 Actor 庫
Akka.Remote - 跨節點 Actor 部署/通訊
Akka.Cluster - 彈性 Actor 網絡(HA)
Akka.Persistence - 事件源, 持久化 Actor 狀態 & 恢復
Akka.Streams - 流式工做流
Akka.Cluster.Tools - 集羣單例, 分佈式發佈/訂閱
Akka.Cluster.Sharding - 持久化狀態分區
Akka.DData - 最終一直的數據複製數據庫

 

3、Akka.NET 的架構api

 

如圖所示,一個 akka 系統有一個跟節點 root,而後 root 有2個子節點,user 和 system ,你定義的 actor 都在 user 下,system 下都是系統定義的緩存

 

4、Akka.NET 發送消息的規則安全

  • 最多一次交付

  • 每一個配對的發送者、接收者對消息保持排序。

 

5、基礎 Actor 類型

  • ReceiverActor:若是使用這個類型的 Actor 爲基類,使用 Receive<T>(msg=>{}); 來接收消息
  • UntypedActor:若是使用這個類型的 Actor 爲基類,須要重寫 OnReceive(object message) 方法,而後區分 message 類型

 

6、使用 Akka.NET

  • 普通模式

由於 Helloworld 的例子太多了,官網也有,這裏直接略過。

代碼在這裏  

代碼裏有些特殊狀況的處理沒寫,這裏簡單說一下

好比 sender 發送給了 receiver 一個消息,可是 receiver 還有些事情沒有準備好,好比發送來一個消息,可是可能須要先登陸,那就先緩存下來,等辦完事再處理

流程是:先 Stash 消息,開啓 Scheduler ,發送 GetToken 消息,token 獲取成功返回 GetTokenSuccess 消息,取消 Scheduler ,變成 Redy 狀態,正常處理消息

public ReceiveActor()
{
        private ICancelable _cancel;

        public ReceiveActor()
        {
            Receive<TestMsg>(msg =>
            {
                Stash.Stash();
            });

            Receive<GetToken>(msg =>
            {
                GetTokenProcess();
            }

            Receive<GetTokenSuccess>(msg =>
            {
                _cancel?.Cancel();
                _cancel = null;

                Become(Ready);
            });

            if (_cancel == null)
            {
                _cancel = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimeSpan.Zero, TimeSpan.FromSeconds(10), Self, new GetToken(), Self);
            }
        }

        private void Ready()
        {
            Receive<TestMsg>(msg =>
            {
                //Todo
            });

            Stash.UnstashAll();
        }

        private void GetTokenProcess()
        {
            var tokenActor = Context.ActorOf(Props.Create(() => new TokenActor("url")));
            tokenActor.Ask<GetTokenSuccess>(new GetToken
            {
                Token = "",
            }).ContinueWith(tr =>
            {
                if (tr.IsCanceled || tr.IsFaulted)
                {
                    return new GetTokenSuccess();
                }

                return tr.Result;
            }).PipeTo(Self);
        }

        public class GetToken
        {
            public string Token { get; set; }
        }
}

 

 

  • Router

Actor 內部實際上是單線程的處理消息的,可是用了 Router 能夠把消息分出去給多個節點處理,這裏分爲2種模式,group 和 pool

先來看 group 配置

akka {
   actor{
      provider = cluster
      deployment {
        /api/myClusterGroupRouter {
          router = broadcast-group # routing strategy
          routees.paths = ["/user/api"] # path of routee on each node
          nr-of-instances = 3 # max number of total routees
          cluster {
             enabled = on
             allow-local-routees = on
             use-role = crawler
          }
        }
      }
   }
}

 

 如圖看到在 89節點上註冊了 90 和 91 節點的 Routees

 

pool 模式

akka {
   actor{
      provider = cluster
      deployment {
        /api/myClusterPoolRouter {
          router = round-robin-pool # routing strategy
          nr-of-instances = 10 # max number of total routees
          cluster {
             enabled = on
             allow-local-routees = on
             use-role = crawler
             max-nr-of-instances-per-node = 1
          }
        }
      }
   }
}

這種模式實際上是在 90 和 91 的節點上分別註冊了 89 的 routee,89上是akka.tcp://Sys@localhost:90/remote/akka/tcp/localhost:89/$a 和 akka.tcp://Sys@localhost:91/remote/akka/tcp/localhost:89/$a

 具體代碼在這裏 

  • Shard

用邏輯 id 聯繫,不關心集羣中的物理位置或管理他們的建立,能夠 rebalance (消耗資源比較大,只有差別比較大的時候纔會,默認 rebalance-threshold = 10,shard 數量設置爲集羣節點最大數量的10倍,shard 數量太大消耗大)

整個 ActorSystem 中只能有一個Codinator,在最老的節點上,由於最老的節點被認爲是安全的。

如圖所示整個系統分爲了 Coordinator ,Shard region(typeName), Shard,Entity 部分,節點的路徑是  /user/sharding/<typeName>/<shardId>/<entityId> 和普通的路徑 /user/<actorName> 不同

這裏使用了 Persistent ,每種數據庫的配置徹底不一樣,我是 Redis 的配置,建議使用即時更新的數據庫,不更新的極可能是棄用的,好比 mysql。

具體代碼在這裏

這裏須要注意的是:

  1. 只能有一個 Lighthourse —— 否在爆 ID 13 error
  2. 建立 ActorSystem 的時候後邊加上 _config.WithFallback(ClusterClientReceptionist.DefaultConfig()).WithFallback(DistributedPubSub.DefaultConfig()); —— 否在爆 ID 9 error
  • 區別

Shard 看着好像也能處理 Router 的問題,但這裏又有些區別,此處摘自官方文檔,用的自動翻譯,請自行理解下:

在一致的散列場景中,整個密鑰空間被拼接在咱們在路由器配置中定義的參與者數量之間。這究竟意味着什麼?單個參與者實例負責處理整個範圍的密鑰。例如:假設在一個一致的哈希路由器後面,咱們有一個數字1-100的密鑰空間,在5個參與者之間平均共享。這意味着,第一個參與者將負責處理ID爲1-20、第二個參與者21-40、第三個參與者41-60等的全部消息。若是您的目標是每一個標識符有一個惟一的參與者,那麼對於您的狀況來講,這不是一個有效的場景。在集羣分片場景的另外一邊,每一個實體(這就是咱們如何引用分片參與者)由對(ShardId,EntityId)標識。它是1-1關係,所以用同一對標識的消息保證老是路由到相同的單個實體。 另外一個區別是靈活性。一致的哈希路由器幾乎是靜態的,在您想要調整集羣大小的狀況下,它們不能很好地工做。你記得之前提到過的拼接鍵空間的概念嗎?想象一下,一旦咱們改變分配給它的參與者/節點的實際數量,它將如何工做。固然,這些關鍵範圍也必須改變。所以,在集羣大小調整以後,之前處理ID爲1-20的消息的演員如今能夠處理不一樣的範圍(即,1-15)。這稱爲「分區切換」。當使用集羣切分時,咱們的參與者將可以隨着集羣大小的改變遷移到其餘節點,即便在集羣大小改變時,也可以保持消息id和分割實體之間的關係。

 

7、總結

除了 stream 沒研究,其餘的多少都看了一點,由於我沒有使用 stream 的場景,其餘發現 Helloworld 的入門真的沒什麼用,只是讓你理解 Actor 的開發的思想和模式,實際使用須要多看文檔,一點點研究,還有就是官網的博文,再就是一些大神寫的例子,

這裏放幾個網址供你們學習

GitHub:

https://github.com/cgstevens/FileProcessor

https://github.com/Horusiath/AkkaDemos

https://github.com/petabridge/akkadotnet-code-samples

https://github.com/Lutando/Akkatecture

 

文檔:

Streams:https://petabridge.com/blog/akkadotnet-11-cluster-streams/

Distributed Pub-Sub:https://petabridge.com/blog/distributed-pub-sub-intro-akkadotnet/

Shard:https://petabridge.com/blog/introduction-to-cluster-sharding-akkadotnet/

Shard:https://petabridge.com/blog/cluster-sharding-technical-overview-akkadotnet/

Petabridge.Cmd:https://petabridge.com/blog/petabridgecmd-release/

Moonitoring:https://github.com/petabridge/akka-monitoring

你是否是感受很奇怪,沒有 Dashboard 呢,實際上是有,可是收費,在這裏:https://phobos.petabridge.com/

相關文章
相關標籤/搜索