//鏈接代碼。 using (var client = await StartClientWithRetries()) { }
從方法看,只是一個簡單容許重試的啓動客戶端。追蹤進去會發現關於重試邏輯的實踐,Socket編程的實踐,基於內存的消息隊列的實踐,依賴注入。再看源碼的基礎上,最好能配合一些理論書籍來看。理論指導實踐,實踐反饋理論,纔是技術成長的步驟。css
這篇文章只涉及Connect所引用方法的部分說明,一步一步來加深理解。
原本我是打算把orleans研究透以後再來寫一篇,但看了一週以後,發下connect裏面調用了不少類,每一個類又有不少方法,這樣下去沒有盡頭,到最終估計什麼也寫不成。html
分析源碼原本就是循環漸進的過程,也是一個熟悉框架/原理/實踐的過程。直接跳過這個步驟,必然損失良多。因此這部分就叫開胃菜吧。在查看connect過程,會愈來愈接觸到各類知識。git
本篇暫不涉及數據持久化,主要依賴.netcore內置方法操縱內存實現。github
擴展知識之Timer&TimerQueue
Timerweb
Timer 在設置的間隔後生成事件,並提供生成重複事件的選項 TimerQueue 時間隊列
擴展知識之信號量
SemaphoreSlim
SemaphoreSlim 實現編程
//信號量 SemaphoreSlim 表示Semaphore的輕量級替代,它限制了能夠同時訪問資源或資源池的線程數 >>Release 釋放 >> Wait 等待。 信號量有兩種類型:本地信號量和命名系統信號量。前者是應用程序的本地。後者在整個操做系統中是可見的,而且適用於進程間同步。該SemaphoreSlim是一個輕量級替代信號量不使用Windows內核中的信號類。與Semaphore類不一樣,SemaphoreSlim類不支持命名系統信號量。您只能將其用做本地信號量。所述SemaphoreSlim類爲單一的應用程序內的同步推薦的信號量。
擴展知識之BlockingCollection
BlockingCollection介紹
利用BlockingCollection實現生產者和消費者隊列json
BlockingCollection 爲實現 IProducerConsumerCollection<T> 的線程安全集合提供阻塞和限制功能。 >> Take >> Add 有這個類型,
擴展知識之Interlocked
Interlocked數組
Interlocked爲多個線程共享的變量提供原子操做。 >>Add >>Decrement以原子操做的形式遞減指定變量的值並存儲結果。 >>Increment以原子操做的形式遞增指定變量的值並存儲結果 >>Exchange >>CompareExchange >>Read 我的想法:和Redis的Increment/Decrement相似,部分狀況下能夠取代Redis的increment/decrement,提升速度。
擴展知識之SpinWait
SpinWait
兩階段提交
Monitor安全
SpinWait 爲基於旋轉的等待提供支持。 SpinWait是一種值類型,這意味着低級代碼可使用SpinWait而沒必要擔憂沒必要要的分配開銷。SpinWait一般不適用於普通應用程序。在大多數狀況下,您應該使用.NET Framework提供的同步類,例如Monitor >> SpinOnce
擴展知識之Queue&Stack
Queue
Stackruby
Queue<T> 表示先進先出的對象集合,此類將通用隊列實現爲循環數組。存儲在隊列<T>中的對象在一端插入並從另外一端移除。 >Enqueue >Dequeue >Peek Stack<T> 表示具備相同指定類型的實例的可變大小後進先出(LIFO)集合。 >Push >Pop >PeeK ConcurrentQueue <T> 表示線程安全的先進先出的對象集合 ConcurrentStack <T> 表示線程安全的後進先出(LIFO)集合 若是須要以與存儲在集合中的順序相同的順序訪問信息,請使用Queue <T>。若是須要以相反的順序訪問信息,請使用Stack <T>。使用ConcurrentQueue <T>或ConcurrentStack <T> 若是您須要同時從多個線程訪問該集合。
擴展知識之Task
TaskCompletionSource
基於Task的異步模式--全面介紹
TaskCompletionSource表示未綁定到委託的Task <TResult>的生產者端,經過Task屬性提供對使用者端的訪問。
擴展知識之線程安全的集合
System.Collections.Concurrent
ConcurrentDictionary
ConcurrentDictionary 對決 Dictionary+Locking
System.Collections.Concurrent提供了應在的地方對應的類型在使用幾個線程安全的集合類System.Collections中和System.Collections.Generic命名空間,只要多線程併發訪問的集合。 可是,經過當前集合實現的其中一個接口訪問的成員(包括擴展方法)不保證是線程安全的,而且可能須要由調用者同步。 ConcurrentDictionary:表示能夠由多個線程同時訪問的鍵/值對的線程安全集合 對於ConcurrentDictionary <TKey,TValue>類上的全部其餘操做,全部這些操做都是原子操做而且是線程安全的。惟一的例外是接受委託的方法,即AddOrUpdate和GetOrAdd。對於字典的修改和寫入操做,ConcurrentDictionary <TKey,TValue>使用細粒度鎖定來確保線程安全。(對字典的讀取操做是以無鎖方式執行的。)可是,這些方法的委託在鎖外部調用,以免在鎖定下執行未知代碼時可能出現的問題。所以,這些代理執行的代碼不受操做的原子性影響。
擴展知識之網絡編程
Socket微軟官方文檔
Socket博客園
Socket 類提供一組豐富的方法和屬性進行網絡通訊 TCP協議 >BeginConnect >EndConnect >BeginSend >EndSend >BeginReceive >EndReceive >BeginAccept >EndAccept UDP協議 >BeginSendTo >EndSendTo >BeginReceiveFromandEndReceiveFrom
擴展知識之線程通知:
AutoResetEvent
ManualResetEvent
ManualResetEventSlim
AutoResetEvent容許線程經過信令相互通訊。一般,當線程須要對資源的獨佔訪問時,可使用此類。 >Set釋放線程 >WaitOne等待線程 ManualResetEvent 通知一個或多個等待線程發生了事件 ManualResetEventSlim 當等待時間預期很是短,而且事件未跨越進程邊界時,您可使用此類以得到比ManualResetEvent更好的性能
擴展知識之依賴注入:
ActivatorUtilities
擴展.net-使用.netcore進行依賴注入
服務能夠經過兩種機制來解析: IServiceProvider ActivatorUtilities – 容許在依賴關係注入容器中建立沒有服務註冊的對象。 ActivatorUtilities 用於面向用戶的抽象,例如標記幫助器、MVC 控制器、SignalR 集線器和模型綁定器。 >ActivatorUtilities.CreateInstance >ActivatorUtilities.GetServiceOrCreateInstance
//鏈接代碼。 using (var client = await StartClientWithRetries()) { await DoClientWork(client); Console.ReadKey(); }
UseLocalhostClustering 用來配置鏈接參數:端口/ClusterId/ServiceId等。 配置一個鏈接本地silo的客戶端,也有其餘類型的如: UseServiceProviderFactory,UseStaticClustering
ConfigureLogging配置日誌參數擴展閱讀
Build用來註冊默認服務和構建容器,擴展瞭解依賴注入知識。微軟自帶Microsoft.Extensions.DependencyInjection庫
private static async Task<IClusterClient> StartClientWithRetries() { attempt = 0; IClusterClient client; client = new ClientBuilder() .UseLocalhostClustering() .Configure<ClusterOptions>(options => { options.ClusterId = "dev"; options.ServiceId = "HelloWorldApp"; }) .ConfigureLogging(logging => logging.AddConsole()) .Build(); await client.Connect(RetryFilter); Console.WriteLine("Client successfully connect to silo host"); return client; }
這裏的LockAsync,內部用了SemaphoreSlim.Wait須要擴展瞭解下。和lock的區別。信號量本地信號量和系統信號量。
這裏用state來維護生命週期
public async Task Connect(Func<Exception, Task<bool>> retryFilter = null) { this.ThrowIfDisposedOrAlreadyInitialized(); using (await this.initLock.LockAsync().ConfigureAwait(false)) { this.ThrowIfDisposedOrAlreadyInitialized(); if (this.state == LifecycleState.Starting) { throw new InvalidOperationException("A prior connection attempt failed. This instance must be disposed."); } this.state = LifecycleState.Starting; if (this.runtimeClient is OutsideRuntimeClient orc) await orc.Start(retryFilter).ConfigureAwait(false); await this.clusterClientLifecycle.OnStart().ConfigureAwait(false); this.state = LifecycleState.Started; } }
public async Task Start(Func<Exception, Task<bool>> retryFilter = null) { // Deliberately avoid capturing the current synchronization context during startup and execute on the default scheduler. // This helps to avoid any issues (such as deadlocks) caused by executing with the client's synchronization context/scheduler. await Task.Run(() => this.StartInternal(retryFilter)).ConfigureAwait(false); logger.Info(ErrorCode.ProxyClient_StartDone, "{0} Started OutsideRuntimeClient with Global Client ID: {1}", BARS, CurrentActivationAddress.ToString() + ", client GUID ID: " + handshakeClientId); }
gateways獲取網關列表
transport用來維護客戶端消息管理。
RunClientMessagePump用來處理接收分發消息。
private async Task StartInternal(Func<Exception, Task<bool>> retryFilter) { // Initialize the gateway list provider, since information from the cluster is required to successfully // initialize subsequent services. var initializedGatewayProvider = new[] {false}; await ExecuteWithRetries(async () => { if (!initializedGatewayProvider[0]) { await this.gatewayListProvider.InitializeGatewayListProvider(); initializedGatewayProvider[0] = true; } var gateways = await this.gatewayListProvider.GetGateways(); if (gateways.Count == 0) { var gatewayProviderType = this.gatewayListProvider.GetType().GetParseableName(); var err = $"Could not find any gateway in {gatewayProviderType}. Orleans client cannot initialize."; logger.Error(ErrorCode.GatewayManager_NoGateways, err); throw new SiloUnavailableException(err); } }, retryFilter); var generation = -SiloAddress.AllocateNewGeneration(); // Client generations are negative transport = ActivatorUtilities.CreateInstance<ClientMessageCenter>(this.ServiceProvider, localAddress, generation, handshakeClientId); transport.Start(); CurrentActivationAddress = ActivationAddress.NewActivationAddress(transport.MyAddress, handshakeClientId); listeningCts = new CancellationTokenSource(); var ct = listeningCts.Token; listenForMessages = true; // Keeping this thread handling it very simple for now. Just queue task on thread pool. Task.Run( () => { while (listenForMessages && !ct.IsCancellationRequested) { try { RunClientMessagePump(ct); } catch (Exception exc) { logger.Error(ErrorCode.Runtime_Error_100326, "RunClientMessagePump has thrown exception", exc); } } }, ct).Ignore(); await ExecuteWithRetries( async () => this.GrainTypeResolver = await transport.GetGrainTypeResolver(this.InternalGrainFactory), retryFilter); this.typeMapRefreshTimer = new AsyncTaskSafeTimer( this.logger, RefreshGrainTypeResolver, null, this.typeMapRefreshInterval, this.typeMapRefreshInterval); ClientStatistics.Start(transport, clientId); await ExecuteWithRetries(StreamingInitialize, retryFilter); async Task ExecuteWithRetries(Func<Task> task, Func<Exception, Task<bool>> shouldRetry) { while (true) { try { await task(); return; } catch (Exception exception) when (shouldRetry != null) { var retry = await shouldRetry(exception); if (!retry) throw; } } } }
用來處理消息分發等,也涉及網關部分調用。
public ClientMessageCenter( IOptions<GatewayOptions> gatewayOptions, IOptions<ClientMessagingOptions> clientMessagingOptions, IPAddress localAddress, int gen, GrainId clientId, IGatewayListProvider gatewayListProvider, SerializationManager serializationManager, IRuntimeClient runtimeClient, MessageFactory messageFactory, IClusterConnectionStatusListener connectionStatusListener, ExecutorService executorService, ILoggerFactory loggerFactory, IOptions<NetworkingOptions> networkingOptions, IOptions<StatisticsOptions> statisticsOptions) { this.loggerFactory = loggerFactory; this.openConnectionTimeout = networkingOptions.Value.OpenConnectionTimeout; this.SerializationManager = serializationManager; this.executorService = executorService; lockable = new object(); MyAddress = SiloAddress.New(new IPEndPoint(localAddress, 0), gen); ClientId = clientId; this.RuntimeClient = runtimeClient; this.messageFactory = messageFactory; this.connectionStatusListener = connectionStatusListener; Running = false; GatewayManager = new GatewayManager(gatewayOptions.Value, gatewayListProvider, loggerFactory); PendingInboundMessages = new BlockingCollection<Message>(); gatewayConnections = new Dictionary<Uri, GatewayConnection>(); numMessages = 0; grainBuckets = new WeakReference[clientMessagingOptions.Value.ClientSenderBuckets]; logger = loggerFactory.CreateLogger<ClientMessageCenter>(); if (logger.IsEnabled(LogLevel.Debug)) logger.Debug("Proxy grain client constructed"); IntValueStatistic.FindOrCreate( StatisticNames.CLIENT_CONNECTED_GATEWAY_COUNT, () => { lock (gatewayConnections) { return gatewayConnections.Values.Count(conn => conn.IsLive); } }); statisticsLevel = statisticsOptions.Value.CollectionLevel; if (statisticsLevel.CollectQueueStats()) { queueTracking = new QueueTrackingStatistic("ClientReceiver", statisticsOptions); } }
WaitMessage裏面利用了BlockingCollection.Take
private void RunClientMessagePump(CancellationToken ct) { incomingMessagesThreadTimeTracking?.OnStartExecution(); while (listenForMessages) { var message = transport.WaitMessage(Message.Categories.Application, ct); if (message == null) // if wait was cancelled break; // when we receive the first message, we update the // clientId for this client because it may have been modified to // include the cluster name if (!firstMessageReceived) { firstMessageReceived = true; if (!handshakeClientId.Equals(message.TargetGrain)) { clientId = message.TargetGrain; transport.UpdateClientId(clientId); CurrentActivationAddress = ActivationAddress.GetAddress(transport.MyAddress, clientId, CurrentActivationAddress.Activation); } else { clientId = handshakeClientId; } } switch (message.Direction) { case Message.Directions.Response: { ReceiveResponse(message); break; } case Message.Directions.OneWay: case Message.Directions.Request: { this.localObjects.Dispatch(message); break; } default: logger.Error(ErrorCode.Runtime_Error_100327, $"Message not supported: {message}."); break; } } incomingMessagesThreadTimeTracking?.OnStopExecution(); }
這裏主要是對response作一些判斷處理。
public void ReceiveResponse(Message response) { if (logger.IsEnabled(LogLevel.Trace)) logger.Trace("Received {0}", response); // ignore duplicate requests if (response.Result == Message.ResponseTypes.Rejection && response.RejectionType == Message.RejectionTypes.DuplicateRequest) return; CallbackData callbackData; var found = callbacks.TryGetValue(response.Id, out callbackData); if (found) { // We need to import the RequestContext here as well. // Unfortunately, it is not enough, since CallContext.LogicalGetData will not flow "up" from task completion source into the resolved task. // RequestContextExtensions.Import(response.RequestContextData); callbackData.DoCallback(response); } else { logger.Warn(ErrorCode.Runtime_Error_100011, "No callback for response message: " + response); } } //DoCallBack public void DoCallback(Message response) { if (this.IsCompleted) return; var requestStatistics = this.shared.RequestStatistics; lock (this) { if (this.IsCompleted) return; if (response.Result == Message.ResponseTypes.Rejection && response.RejectionType == Message.RejectionTypes.Transient) { if (this.shared.ShouldResend(this.Message)) { return; } } this.IsCompleted = true; if (requestStatistics.CollectApplicationRequestsStats) { this.stopwatch.Stop(); } this.shared.Unregister(this.Message); } if (requestStatistics.CollectApplicationRequestsStats) { requestStatistics.OnAppRequestsEnd(this.stopwatch.Elapsed); } // do callback outside the CallbackData lock. Just not a good practice to hold a lock for this unrelated operation. this.shared.ResponseCallback(response, this.context); } //this.shared.Unregister(this.Message);
這裏面用ConcurrentDictionary<GuidId, LocalObjectData>來判斷ObserverId是否存在,不存在移除。
若是存在,利用Queue的Enqueue將消息插入隊列。
若是啓動成功,異步調用LocalObjectMessagePumpAsync,而後利用Queue
的Dequeue來取的最新消息,
而後調用SendResponseAsync來發送消息
private async Task LocalObjectMessagePumpAsync(LocalObjectData objectData) { while (true) { try { Message message; lock (objectData.Messages) { if (objectData.Messages.Count == 0) { objectData.Running = false; break; } message = objectData.Messages.Dequeue(); } if (ExpireMessageIfExpired(message, MessagingStatisticsGroup.Phase.Invoke)) continue; RequestContextExtensions.Import(message.RequestContextData); var request = (InvokeMethodRequest)message.GetDeserializedBody(this.serializationManager); var targetOb = (IAddressable)objectData.LocalObject.Target; object resultObject = null; Exception caught = null; try { // exceptions thrown within this scope are not considered to be thrown from user code // and not from runtime code. var resultPromise = objectData.Invoker.Invoke(targetOb, request); if (resultPromise != null) // it will be null for one way messages { resultObject = await resultPromise; } } catch (Exception exc) { // the exception needs to be reported in the log or propagated back to the caller. caught = exc; } if (caught != null) this.ReportException(message, caught); else if (message.Direction != Message.Directions.OneWay) this.SendResponseAsync(message, resultObject); } catch (Exception) { // ignore, keep looping. } } }
SendResponseAsync通過序列化,DeepCopy,賦值各類請求參數等各類操做之後,來到最關鍵的部分
transport.SendMessage
第一步先獲取活動的網關(silo),如沒有則創建GatewayConnection
第二步啓動ConnectionConnect--調用socket建立鏈接
Start--GatewayClientReceiver間接調用Socket來接收消息,
public void SendMessage(Message msg) { GatewayConnection gatewayConnection = null; bool startRequired = false; if (!Running) { this.logger.Error(ErrorCode.ProxyClient_MsgCtrNotRunning, $"Ignoring {msg} because the Client message center is not running"); return; } // If there's a specific gateway specified, use it if (msg.TargetSilo != null && GatewayManager.GetLiveGateways().Contains(msg.TargetSilo.ToGatewayUri())) { Uri addr = msg.TargetSilo.ToGatewayUri(); lock (lockable) { if (!gatewayConnections.TryGetValue(addr, out gatewayConnection) || !gatewayConnection.IsLive) { gatewayConnection = new GatewayConnection(addr, this, this.messageFactory, executorService, this.loggerFactory, this.openConnectionTimeout); gatewayConnections[addr] = gatewayConnection; if (logger.IsEnabled(LogLevel.Debug)) logger.Debug("Creating gateway to {0} for pre-addressed message", addr); startRequired = true; } } } // For untargeted messages to system targets, and for unordered messages, pick a next connection in round robin fashion. else if (msg.TargetGrain.IsSystemTarget || msg.IsUnordered) { // Get the cached list of live gateways. // Pick a next gateway name in a round robin fashion. // See if we have a live connection to it. // If Yes, use it. // If not, create a new GatewayConnection and start it. // If start fails, we will mark this connection as dead and remove it from the GetCachedLiveGatewayNames. lock (lockable) { int msgNumber = numMessages; numMessages = unchecked(numMessages + 1); IList<Uri> gatewayNames = GatewayManager.GetLiveGateways(); int numGateways = gatewayNames.Count; if (numGateways == 0) { RejectMessage(msg, "No gateways available"); logger.Warn(ErrorCode.ProxyClient_CannotSend, "Unable to send message {0}; gateway manager state is {1}", msg, GatewayManager); return; } Uri addr = gatewayNames[msgNumber % numGateways]; if (!gatewayConnections.TryGetValue(addr, out gatewayConnection) || !gatewayConnection.IsLive) { gatewayConnection = new GatewayConnection(addr, this, this.messageFactory, this.executorService, this.loggerFactory, this.openConnectionTimeout); gatewayConnections[addr] = gatewayConnection; if (logger.IsEnabled(LogLevel.Debug)) logger.Debug(ErrorCode.ProxyClient_CreatedGatewayUnordered, "Creating gateway to {0} for unordered message to grain {1}", addr, msg.TargetGrain); startRequired = true; } // else - Fast path - we've got a live gatewayConnection to use } } // Otherwise, use the buckets to ensure ordering. else { var index = msg.TargetGrain.GetHashCode_Modulo((uint)grainBuckets.Length); lock (lockable) { // Repeated from above, at the declaration of the grainBuckets array: // Requests are bucketed by GrainID, so that all requests to a grain get routed through the same bucket. // Each bucket holds a (possibly null) weak reference to a GatewayConnection object. That connection instance is used // if the WeakReference is non-null, is alive, and points to a live gateway connection. If any of these conditions is // false, then a new gateway is selected using the gateway manager, and a new connection established if necessary. var weakRef = grainBuckets[index]; if ((weakRef != null) && weakRef.IsAlive) { gatewayConnection = weakRef.Target as GatewayConnection; } if ((gatewayConnection == null) || !gatewayConnection.IsLive) { var addr = GatewayManager.GetLiveGateway(); if (addr == null) { RejectMessage(msg, "No gateways available"); logger.Warn(ErrorCode.ProxyClient_CannotSend_NoGateway, "Unable to send message {0}; gateway manager state is {1}", msg, GatewayManager); return; } if (logger.IsEnabled(LogLevel.Trace)) logger.Trace(ErrorCode.ProxyClient_NewBucketIndex, "Starting new bucket index {0} for ordered messages to grain {1}", index, msg.TargetGrain); if (!gatewayConnections.TryGetValue(addr, out gatewayConnection) || !gatewayConnection.IsLive) { gatewayConnection = new GatewayConnection(addr, this, this.messageFactory, this.executorService, this.loggerFactory, this.openConnectionTimeout); gatewayConnections[addr] = gatewayConnection; if (logger.IsEnabled(LogLevel.Debug)) logger.Debug(ErrorCode.ProxyClient_CreatedGatewayToGrain, "Creating gateway to {0} for message to grain {1}, bucket {2}, grain id hash code {3}X", addr, msg.TargetGrain, index, msg.TargetGrain.GetHashCode().ToString("x")); startRequired = true; } grainBuckets[index] = new WeakReference(gatewayConnection); } } } if (startRequired) { gatewayConnection.Start(); if (!gatewayConnection.IsLive) { // if failed to start Gateway connection (failed to connect), try sending this msg to another Gateway. RejectOrResend(msg); return; } } try { gatewayConnection.QueueRequest(msg); if (logger.IsEnabled(LogLevel.Trace)) logger.Trace(ErrorCode.ProxyClient_QueueRequest, "Sending message {0} via gateway {1}", msg, gatewayConnection.Address); } catch (InvalidOperationException) { // This exception can be thrown if the gateway connection we selected was closed since we checked (i.e., we lost the race) // If this happens, we reject if the message is targeted to a specific silo, or try again if not RejectOrResend(msg); } }
public void Connect() { if (!MsgCenter.Running) { if (Log.IsEnabled(LogLevel.Debug)) Log.Debug(ErrorCode.ProxyClient_MsgCtrNotRunning, "Ignoring connection attempt to gateway {0} because the proxy message center is not running", Address); return; } // Yes, we take the lock around a Sleep. The point is to ensure that no more than one thread can try this at a time. // There's still a minor problem as written -- if the sending thread and receiving thread both get here, the first one // will try to reconnect. eventually do so, and then the other will try to reconnect even though it doesn't have to... // Hopefully the initial "if" statement will prevent that. lock (Lockable) { if (!IsLive) { if (Log.IsEnabled(LogLevel.Debug)) Log.Debug(ErrorCode.ProxyClient_DeadGateway, "Ignoring connection attempt to gateway {0} because this gateway connection is already marked as non live", Address); return; // if the connection is already marked as dead, don't try to reconnect. It has been doomed. } for (var i = 0; i < ClientMessageCenter.CONNECT_RETRY_COUNT; i++) { try { if (Socket != null) { if (Socket.Connected) return; MarkAsDisconnected(Socket); // clean up the socket before reconnecting. } if (lastConnect != new DateTime()) { // We already tried at least once in the past to connect to this GW. // If we are no longer connected to this GW and it is no longer in the list returned // from the GatewayProvider, consider directly this connection dead. if (!MsgCenter.GatewayManager.GetLiveGateways().Contains(Address)) break; // Wait at least ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY before reconnection tries var millisecondsSinceLastAttempt = DateTime.UtcNow - lastConnect; if (millisecondsSinceLastAttempt < ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY) { var wait = ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY - millisecondsSinceLastAttempt; if (Log.IsEnabled(LogLevel.Debug)) Log.Debug(ErrorCode.ProxyClient_PauseBeforeRetry, "Pausing for {0} before trying to connect to gateway {1} on trial {2}", wait, Address, i); Thread.Sleep(wait); } } lastConnect = DateTime.UtcNow; Socket = new Socket(Silo.Endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); Socket.EnableFastpath(); SocketManager.Connect(Socket, Silo.Endpoint, this.openConnectionTimeout); NetworkingStatisticsGroup.OnOpenedGatewayDuplexSocket(); MsgCenter.OnGatewayConnectionOpen(); SocketManager.WriteConnectionPreamble(Socket, MsgCenter.ClientId); // Identifies this client Log.Info(ErrorCode.ProxyClient_Connected, "Connected to gateway at address {0} on trial {1}.", Address, i); return; } catch (Exception ex) { Log.Warn(ErrorCode.ProxyClient_CannotConnect, $"Unable to connect to gateway at address {Address} on trial {i} (Exception: {ex.Message})"); MarkAsDisconnected(Socket); } } // Failed too many times -- give up MarkAsDead(); } }
GatewayConnection的Start會調用到GatewayClientReceiver的Run方法,利用BlockingCollection
protected override void Run() { try { while (!Cts.IsCancellationRequested) { int bytesRead = FillBuffer(buffer.BuildReceiveBuffer()); if (bytesRead == 0) { continue; } buffer.UpdateReceivedData(bytesRead); Message msg; while (buffer.TryDecodeMessage(out msg)) { gatewayConnection.MsgCenter.QueueIncomingMessage(msg); if (Log.IsEnabled(LogLevel.Trace)) Log.Trace("Received a message from gateway {0}: {1}", gatewayConnection.Address, msg); } } } catch (Exception ex) { buffer.Reset(); Log.Warn(ErrorCode.ProxyClientUnhandledExceptionWhileReceiving, $"Unexpected/unhandled exception while receiving: {ex}. Restarting gateway receiver for {gatewayConnection.Address}.", ex); throw; } }
Orleans用於處理定時或延時回調做業。
建立一個簡單的connect,裏面有這麼多溝溝渠渠,但本質上來講,最底層是利用Socket套接字機制來實施機制。在Socket的基礎之上,又封裝維護了一層GatewayConnection和GatewayClientReceiver來實現網關(Silo)的操做,好比重試/監控/熔斷等,再結合Timer,Queue
,BlockingCollection,Task,ConcurrentDictionary,Interlocked等知識,構建一個可用的通訊框架。
說來容易幾句話,實現起來都是淚。
若是您徹底熟悉異步編程,並行編程,Socket網絡編程。又對分佈式/微服務理論有較深的理解,那麼orleans實現機制,對您來講多是相對容易。
本期結束,下期更精彩!