一個client發送消息給orleans, 就只須要掉用Grain的函數就好了. 可是有時候Grain須要發送消息給client, 在orleans裏面, 就只能經過Observer
來實現.服務器
1 public interface IChat : IGrainObserver 2 { 3 void ReceiveMessage(string message); 4 } 5 6 public class Chat : IChat 7 { 8 public void ReceiveMessage(string message) 9 { 10 Console.WriteLine(message); 11 } 12 } 13 14 class HelloGrain : Grain, IHello 15 { 16 private ObserverSubscriptionManager<IChat> _subsManager; 17 public override async Task OnActivateAsync() 18 { 19 _subsManager = new ObserverSubscriptionManager<IChat>(); 20 await base.OnActivateAsync(); 21 } 22 public async Task Subscribe(IChat observer) 23 { 24 _subsManager.Subscribe(observer); 25 } 26 public async Task UnSubscribe(IChat observer) 27 { 28 _SubsManager.Unsubscribe(observer); 29 } 30 } 31 32 public Task SendUpdateMessage(string message) 33 { 34 _SubsManager.Notify(s => s.ReceiveMessage(message)); 35 return TaskDone.Done; 36 } 37 38 //下面就是Grain發送消息給Client的代碼 39 var friend = GrainClient.GrainFactory.GetGrain<IHello>(0); 40 Chat c = new Chat(); 41 42 var obj = await GrainClient.GrainFactory.CreateObjectReference<IChat>(c); 43 await friend.Subscribe(obj);
有了上面的代碼, 咱們就能夠按照本身的需求造一個廣播出來.async
1 enum DestType 2 { 3 DestType_All = 1, 4 DestType_Server = 2, 5 DestType_Player = 3, 6 } 7 8 //這是咱們的觀察者 9 public interface IGatewayObserver : IGrainObserver 10 { 11 void SendMessage(int destType, long dest, int msgid, byte[] buffer); 12 } 13 14 public interface IAllGatewayGrain : IGrainWithIntegerKey 15 { 16 //註冊網關 17 Task RegisterGateway(string key); 18 Task UnRegisterGateway(string key); 19 20 //發送消息 21 Task SendMessage(int destType, long dest, int msgid, byte[] buffer); 22 23 //註冊觀察者 24 Task RegisterObserver(string gateway, IGatewayObserver); 25 } 26 27 public interface IGatewayGrain : IGrainWithStringKey 28 { 29 Task SendMessage(int destType, long dest, int msgid, byte[] buffer); 30 31 Task RegisterObserver(string gateway, IGatewayObserver); 32 }
上面是接口的設計, 而後只須要在Client啓動的時候, 把本身註冊到兩個Grain
裏面去, 而後其餘的Grain就能夠經過兩個Grain來發送針對全部人
, 服務器
, 和我的
的消息了.ide