【Orleans開胃菜系列2】鏈接Connect源碼簡易分析

簡要說明

//鏈接代碼。
 using (var client = await StartClientWithRetries())
                {
                   
                }

從方法看,只是一個簡單容許重試的啓動客戶端。追蹤進去會發現關於重試邏輯的實踐,Socket編程的實踐,基於內存的消息隊列的實踐,依賴注入。再看源碼的基礎上,最好能配合一些理論書籍來看。理論指導實踐,實踐反饋理論,纔是技術成長的步驟。css

這篇文章只涉及Connect所引用方法的部分說明,一步一步來加深理解。
原本我是打算把orleans研究透以後再來寫一篇,但看了一週以後,發下connect裏面調用了不少類,每一個類又有不少方法,這樣下去沒有盡頭,到最終估計什麼也寫不成。html

分析源碼原本就是循環漸進的過程,也是一個熟悉框架/原理/實踐的過程。直接跳過這個步驟,必然損失良多。因此這部分就叫開胃菜吧。在查看connect過程,會愈來愈接觸到各類知識。git

本篇暫不涉及數據持久化,主要依賴.netcore內置方法操縱內存實現。github

您會接觸到的擴展知識

擴展知識之Timer&TimerQueue
Timerweb

Timer
在設置的間隔後生成事件,並提供生成重複事件的選項

TimerQueue
時間隊列

擴展知識之信號量
SemaphoreSlim
SemaphoreSlim 實現編程

//信號量
SemaphoreSlim
表示Semaphore的輕量級替代,它限制了能夠同時訪問資源或資源池的線程數
>>Release 釋放
>> Wait 等待。

信號量有兩種類型:本地信號量和命名系統信號量。前者是應用程序的本地。後者在整個操做系統中是可見的,而且適用於進程間同步。該SemaphoreSlim是一個輕量級替代信號量不使用Windows內核中的信號類。與Semaphore類不一樣,SemaphoreSlim類不支持命名系統信號量。您只能將其用做本地信號量。所述SemaphoreSlim類爲單一的應用程序內的同步推薦的信號量。

擴展知識之BlockingCollection
BlockingCollection介紹
利用BlockingCollection實現生產者和消費者隊列json

BlockingCollection
爲實現 IProducerConsumerCollection<T> 的線程安全集合提供阻塞和限制功能。
  >> Take
  >> Add
  有這個類型,

擴展知識之Interlocked
Interlocked數組

Interlocked爲多個線程共享的變量提供原子操做。
>>Add
>>Decrement以原子操做的形式遞減指定變量的值並存儲結果。
>>Increment以原子操做的形式遞增指定變量的值並存儲結果
>>Exchange
>>CompareExchange 
>>Read
我的想法:和Redis的Increment/Decrement相似,部分狀況下能夠取代Redis的increment/decrement,提升速度。

擴展知識之SpinWait
SpinWait
兩階段提交
Monitor安全

SpinWait
爲基於旋轉的等待提供支持。
SpinWait是一種值類型,這意味着低級代碼可使用SpinWait而沒必要擔憂沒必要要的分配開銷。SpinWait一般不適用於普通應用程序。在大多數狀況下,您應該使用.NET Framework提供的同步類,例如Monitor
>> SpinOnce

擴展知識之Queue&Stack
Queue
Stackruby

Queue<T>
表示先進先出的對象集合,此類將通用隊列實現爲循環數組。存儲在隊列<T>中的對象在一端插入並從另外一端移除。
>Enqueue
>Dequeue
>Peek

Stack<T>
表示具備相同指定類型的實例的可變大小後進先出(LIFO)集合。
>Push
>Pop
>PeeK

ConcurrentQueue <T>
表示線程安全的先進先出的對象集合
ConcurrentStack <T> 
表示線程安全的後進先出(LIFO)集合

若是須要以與存儲在集合中的順序相同的順序訪問信息,請使用Queue <T>。若是須要以相反的順序訪問信息,請使用Stack <T>。使用ConcurrentQueue <T>或ConcurrentStack <T> 若是您須要同時從多個線程訪問該集合。

擴展知識之Task
TaskCompletionSource
基於Task的異步模式--全面介紹

TaskCompletionSource表示未綁定到委託的Task <TResult>的生產者端,經過Task屬性提供對使用者端的訪問。

擴展知識之線程安全的集合
System.Collections.Concurrent
ConcurrentDictionary
ConcurrentDictionary 對決 Dictionary+Locking

System.Collections.Concurrent提供了應在的地方對應的類型在使用幾個線程安全的集合類System.Collections中和System.Collections.Generic命名空間,只要多線程併發訪問的集合。
可是,經過當前集合實現的其中一個接口訪問的成員(包括擴展方法)不保證是線程安全的,而且可能須要由調用者同步。

ConcurrentDictionary:表示能夠由多個線程同時訪問的鍵/值對的線程安全集合
對於ConcurrentDictionary <TKey,TValue>類上的全部其餘操做,全部這些操做都是原子操做而且是線程安全的。惟一的例外是接受委託的方法,即AddOrUpdate和GetOrAdd。對於字典的修改和寫入操做,ConcurrentDictionary <TKey,TValue>使用細粒度鎖定來確保線程安全。(對字典的讀取操做是以無鎖方式執行的。)可是,這些方法的委託在鎖外部調用,以免在鎖定下執行未知代碼時可能出現的問題。所以,這些代理執行的代碼不受操做的原子性影響。

擴展知識之網絡編程
Socket微軟官方文檔
Socket博客園

Socket 類提供一組豐富的方法和屬性進行網絡通訊
TCP協議
>BeginConnect
>EndConnect
>BeginSend
>EndSend
>BeginReceive
>EndReceive
>BeginAccept
>EndAccept
UDP協議
>BeginSendTo
>EndSendTo
>BeginReceiveFromandEndReceiveFrom

擴展知識之線程通知:
AutoResetEvent
ManualResetEvent
ManualResetEventSlim

AutoResetEvent容許線程經過信令相互通訊。一般,當線程須要對資源的獨佔訪問時,可使用此類。
>Set釋放線程
>WaitOne等待線程

ManualResetEvent
通知一個或多個等待線程發生了事件

ManualResetEventSlim
當等待時間預期很是短,而且事件未跨越進程邊界時,您可使用此類以得到比ManualResetEvent更好的性能

擴展知識之依賴注入:
ActivatorUtilities
擴展.net-使用.netcore進行依賴注入

服務能夠經過兩種機制來解析:
IServiceProvider
ActivatorUtilities – 容許在依賴關係注入容器中建立沒有服務註冊的對象。 ActivatorUtilities 用於面向用戶的抽象,例如標記幫助器、MVC 控制器、SignalR 集線器和模型綁定器。
>ActivatorUtilities.CreateInstance
>ActivatorUtilities.GetServiceOrCreateInstance

Client鏈接代碼。

//鏈接代碼。
 using (var client = await StartClientWithRetries())
                {
                    await DoClientWork(client);
                    Console.ReadKey();
                }

重點分析StartClientWithRetries

  • UseLocalhostClustering 用來配置鏈接參數:端口/ClusterId/ServiceId等。 配置一個鏈接本地silo的客戶端,也有其餘類型的如: UseServiceProviderFactory,UseStaticClustering

  • ConfigureLogging配置日誌參數擴展閱讀

  • Build用來註冊默認服務和構建容器,擴展瞭解依賴注入知識。微軟自帶Microsoft.Extensions.DependencyInjection庫

private static async Task<IClusterClient> StartClientWithRetries()
        {
            attempt = 0;
            IClusterClient client;
            client = new ClientBuilder()
                .UseLocalhostClustering()
                .Configure<ClusterOptions>(options =>
                {
                    options.ClusterId = "dev";
                    options.ServiceId = "HelloWorldApp";
                })
                .ConfigureLogging(logging => logging.AddConsole())
                .Build();

            await client.Connect(RetryFilter);
            Console.WriteLine("Client successfully connect to silo host");
            return client;
        }

先來看下connect

這裏的LockAsync,內部用了SemaphoreSlim.Wait須要擴展瞭解下。和lock的區別。信號量本地信號量和系統信號量。
這裏用state來維護生命週期

public async Task Connect(Func<Exception, Task<bool>> retryFilter = null)
        {
            this.ThrowIfDisposedOrAlreadyInitialized();
            using (await this.initLock.LockAsync().ConfigureAwait(false))
            {
                this.ThrowIfDisposedOrAlreadyInitialized();
                if (this.state == LifecycleState.Starting)
                {
                    throw new InvalidOperationException("A prior connection attempt failed. This instance must be disposed.");
                }
                
                this.state = LifecycleState.Starting;
                if (this.runtimeClient is OutsideRuntimeClient orc) await orc.Start(retryFilter).ConfigureAwait(false);
                await this.clusterClientLifecycle.OnStart().ConfigureAwait(false);
                this.state = LifecycleState.Started;
            }
        }

看下orc.Start

 public async Task Start(Func<Exception, Task<bool>> retryFilter = null)
        {
            // Deliberately avoid capturing the current synchronization context during startup and execute on the default scheduler.
            // This helps to avoid any issues (such as deadlocks) caused by executing with the client's synchronization context/scheduler.
            await Task.Run(() => this.StartInternal(retryFilter)).ConfigureAwait(false);

            logger.Info(ErrorCode.ProxyClient_StartDone, "{0} Started OutsideRuntimeClient with Global Client ID: {1}", BARS, CurrentActivationAddress.ToString() + ", client GUID ID: " + handshakeClientId);
        }

重要的StartInternal

gateways獲取網關列表
transport用來維護客戶端消息管理。
RunClientMessagePump用來處理接收分發消息。

 private async Task StartInternal(Func<Exception, Task<bool>> retryFilter)
        {
            // Initialize the gateway list provider, since information from the cluster is required to successfully
            // initialize subsequent services.
            var initializedGatewayProvider = new[] {false};
            await ExecuteWithRetries(async () =>
                {
                    if (!initializedGatewayProvider[0])
                    {
                        await this.gatewayListProvider.InitializeGatewayListProvider();
                        initializedGatewayProvider[0] = true;
                    }

                    var gateways = await this.gatewayListProvider.GetGateways();
                    if (gateways.Count == 0)
                    {
                        var gatewayProviderType = this.gatewayListProvider.GetType().GetParseableName();
                        var err = $"Could not find any gateway in {gatewayProviderType}. Orleans client cannot initialize.";
                        logger.Error(ErrorCode.GatewayManager_NoGateways, err);
                        throw new SiloUnavailableException(err);
                    }
                },
                retryFilter);

            var generation = -SiloAddress.AllocateNewGeneration(); // Client generations are negative
            transport = ActivatorUtilities.CreateInstance<ClientMessageCenter>(this.ServiceProvider, localAddress, generation, handshakeClientId);
            transport.Start();
            CurrentActivationAddress = ActivationAddress.NewActivationAddress(transport.MyAddress, handshakeClientId);

            listeningCts = new CancellationTokenSource();
            var ct = listeningCts.Token;
            listenForMessages = true;

            // Keeping this thread handling it very simple for now. Just queue task on thread pool.
            Task.Run(
                () =>
                {
                    while (listenForMessages && !ct.IsCancellationRequested)
                    {
                        try
                        {
                            RunClientMessagePump(ct);
                        }
                        catch (Exception exc)
                        {
                            logger.Error(ErrorCode.Runtime_Error_100326, "RunClientMessagePump has thrown exception", exc);
                        }
                    }
                },
                ct).Ignore();

            await ExecuteWithRetries(
                async () => this.GrainTypeResolver = await transport.GetGrainTypeResolver(this.InternalGrainFactory),
                retryFilter);

            this.typeMapRefreshTimer = new AsyncTaskSafeTimer(
                this.logger, 
                RefreshGrainTypeResolver, 
                null,
                this.typeMapRefreshInterval,
                this.typeMapRefreshInterval);

            ClientStatistics.Start(transport, clientId);
            
            await ExecuteWithRetries(StreamingInitialize, retryFilter);

            async Task ExecuteWithRetries(Func<Task> task, Func<Exception, Task<bool>> shouldRetry)
            {
                while (true)
                {
                    try
                    {
                        await task();
                        return;
                    }
                    catch (Exception exception) when (shouldRetry != null)
                    {
                        var retry = await shouldRetry(exception);
                        if (!retry) throw;
                    }
                }
            }
        }

重點關注下StartInternal裏面ClientMessageCenter的初始化

用來處理消息分發等,也涉及網關部分調用。

 public ClientMessageCenter(
            IOptions<GatewayOptions> gatewayOptions,
            IOptions<ClientMessagingOptions> clientMessagingOptions,
            IPAddress localAddress,
            int gen,
            GrainId clientId,
            IGatewayListProvider gatewayListProvider,
            SerializationManager serializationManager,
            IRuntimeClient runtimeClient,
            MessageFactory messageFactory,
            IClusterConnectionStatusListener connectionStatusListener,
            ExecutorService executorService,
            ILoggerFactory loggerFactory,
            IOptions<NetworkingOptions> networkingOptions,
            IOptions<StatisticsOptions> statisticsOptions)
        {
            this.loggerFactory = loggerFactory;
            this.openConnectionTimeout = networkingOptions.Value.OpenConnectionTimeout;
            this.SerializationManager = serializationManager;
            this.executorService = executorService;
            lockable = new object();
            MyAddress = SiloAddress.New(new IPEndPoint(localAddress, 0), gen);
            ClientId = clientId;
            this.RuntimeClient = runtimeClient;
            this.messageFactory = messageFactory;
            this.connectionStatusListener = connectionStatusListener;
            Running = false;
            GatewayManager = new GatewayManager(gatewayOptions.Value, gatewayListProvider, loggerFactory);
            PendingInboundMessages = new BlockingCollection<Message>();
            gatewayConnections = new Dictionary<Uri, GatewayConnection>();
            numMessages = 0;
            grainBuckets = new WeakReference[clientMessagingOptions.Value.ClientSenderBuckets];
            logger = loggerFactory.CreateLogger<ClientMessageCenter>();
            if (logger.IsEnabled(LogLevel.Debug)) logger.Debug("Proxy grain client constructed");
            IntValueStatistic.FindOrCreate(
                StatisticNames.CLIENT_CONNECTED_GATEWAY_COUNT,
                () =>
                {
                    lock (gatewayConnections)
                    {
                        return gatewayConnections.Values.Count(conn => conn.IsLive);
                    }
                });
            statisticsLevel = statisticsOptions.Value.CollectionLevel;
            if (statisticsLevel.CollectQueueStats())
            {
                queueTracking = new QueueTrackingStatistic("ClientReceiver", statisticsOptions);
            }
        }

關注下StartInternal的RunClientMessagePump

WaitMessage裏面利用了BlockingCollection.Take

 private void RunClientMessagePump(CancellationToken ct)
        {
            incomingMessagesThreadTimeTracking?.OnStartExecution();

            while (listenForMessages)
            {
                var message = transport.WaitMessage(Message.Categories.Application, ct);

                if (message == null) // if wait was cancelled
                    break;

                // when we receive the first message, we update the
                // clientId for this client because it may have been modified to
                // include the cluster name
                if (!firstMessageReceived)
                {
                    firstMessageReceived = true;
                    if (!handshakeClientId.Equals(message.TargetGrain))
                    {
                        clientId = message.TargetGrain;
                        transport.UpdateClientId(clientId);
                        CurrentActivationAddress = ActivationAddress.GetAddress(transport.MyAddress, clientId, CurrentActivationAddress.Activation);
                    }
                    else
                    {
                        clientId = handshakeClientId;
                    }
                }

                switch (message.Direction)
                {
                    case Message.Directions.Response:
                        {
                            ReceiveResponse(message);
                            break;
                        }
                    case Message.Directions.OneWay:
                    case Message.Directions.Request:
                        {
                            this.localObjects.Dispatch(message);
                            break;
                        }
                    default:
                        logger.Error(ErrorCode.Runtime_Error_100327, $"Message not supported: {message}.");
                        break;
                }
            }

            incomingMessagesThreadTimeTracking?.OnStopExecution();
        }

RunClientMessagePump裏面的ReceiveResponse

這裏主要是對response作一些判斷處理。

public void ReceiveResponse(Message response)
        {
            if (logger.IsEnabled(LogLevel.Trace)) logger.Trace("Received {0}", response);

            // ignore duplicate requests
            if (response.Result == Message.ResponseTypes.Rejection && response.RejectionType == Message.RejectionTypes.DuplicateRequest)
                return;

            CallbackData callbackData;
            var found = callbacks.TryGetValue(response.Id, out callbackData);
            if (found)
            {
                // We need to import the RequestContext here as well.
                // Unfortunately, it is not enough, since CallContext.LogicalGetData will not flow "up" from task completion source into the resolved task.
                // RequestContextExtensions.Import(response.RequestContextData);
                callbackData.DoCallback(response);
            }
            else
            {
                logger.Warn(ErrorCode.Runtime_Error_100011, "No callback for response message: " + response);
            }
        }
        //DoCallBack
        public void DoCallback(Message response)
        {
            if (this.IsCompleted)
                return;
            var requestStatistics = this.shared.RequestStatistics;
            lock (this)
            {
                if (this.IsCompleted)
                    return;

                if (response.Result == Message.ResponseTypes.Rejection && response.RejectionType == Message.RejectionTypes.Transient)
                {
                    if (this.shared.ShouldResend(this.Message))
                    {
                        return;
                    }
                }

                this.IsCompleted = true;
                if (requestStatistics.CollectApplicationRequestsStats)
                {
                    this.stopwatch.Stop();
                }

                this.shared.Unregister(this.Message);
            }

            if (requestStatistics.CollectApplicationRequestsStats)
            {
                requestStatistics.OnAppRequestsEnd(this.stopwatch.Elapsed);
            }

            // do callback outside the CallbackData lock. Just not a good practice to hold a lock for this unrelated operation.
            this.shared.ResponseCallback(response, this.context);
        }

        //this.shared.Unregister(this.Message);

RunClientMessagePump裏面的消息分發Dispatch(message)

這裏面用ConcurrentDictionary<GuidId, LocalObjectData>來判斷ObserverId是否存在,不存在移除。
若是存在,利用Queue 的Enqueue將消息插入隊列。

若是啓動成功,異步調用LocalObjectMessagePumpAsync,而後利用Queue 的Dequeue來取的最新消息,
而後調用SendResponseAsync來發送消息

private async Task LocalObjectMessagePumpAsync(LocalObjectData objectData)
        {
            while (true)
            {
                try
                {
                    Message message;
                    lock (objectData.Messages)
                    {
                        if (objectData.Messages.Count == 0)
                        {
                            objectData.Running = false;
                            break;
                        }

                        message = objectData.Messages.Dequeue();
                    }

                    if (ExpireMessageIfExpired(message, MessagingStatisticsGroup.Phase.Invoke))
                        continue;

                    RequestContextExtensions.Import(message.RequestContextData);
                    var request = (InvokeMethodRequest)message.GetDeserializedBody(this.serializationManager);
                    var targetOb = (IAddressable)objectData.LocalObject.Target;
                    object resultObject = null;
                    Exception caught = null;
                    try
                    {
                        // exceptions thrown within this scope are not considered to be thrown from user code
                        // and not from runtime code.
                        var resultPromise = objectData.Invoker.Invoke(targetOb, request);
                        if (resultPromise != null) // it will be null for one way messages
                        {
                            resultObject = await resultPromise;
                        }
                    }
                    catch (Exception exc)
                    {
                        // the exception needs to be reported in the log or propagated back to the caller.
                        caught = exc;
                    }

                    if (caught != null)
                        this.ReportException(message, caught);
                    else if (message.Direction != Message.Directions.OneWay)
                        this.SendResponseAsync(message, resultObject);
                }
                catch (Exception)
                {
                    // ignore, keep looping.
                }
            }
        }

SendResponseAsync通過序列化,DeepCopy,賦值各類請求參數等各類操做之後,來到最關鍵的部分
transport.SendMessage

第一步先獲取活動的網關(silo),如沒有則創建GatewayConnection
第二步啓動Connection

Connect--調用socket建立鏈接
Start--GatewayClientReceiver間接調用Socket來接收消息,

 public void SendMessage(Message msg)
        {
            GatewayConnection gatewayConnection = null;
            bool startRequired = false;

            if (!Running)
            {
                this.logger.Error(ErrorCode.ProxyClient_MsgCtrNotRunning, $"Ignoring {msg} because the Client message center is not running");
                return;
            }

            // If there's a specific gateway specified, use it
            if (msg.TargetSilo != null && GatewayManager.GetLiveGateways().Contains(msg.TargetSilo.ToGatewayUri()))
            {
                Uri addr = msg.TargetSilo.ToGatewayUri();
                lock (lockable)
                {
                    if (!gatewayConnections.TryGetValue(addr, out gatewayConnection) || !gatewayConnection.IsLive)
                    {
                        gatewayConnection = new GatewayConnection(addr, this, this.messageFactory, executorService, this.loggerFactory, this.openConnectionTimeout);
                        gatewayConnections[addr] = gatewayConnection;
                        if (logger.IsEnabled(LogLevel.Debug)) logger.Debug("Creating gateway to {0} for pre-addressed message", addr);
                        startRequired = true;
                    }
                }
            }
            // For untargeted messages to system targets, and for unordered messages, pick a next connection in round robin fashion.
            else if (msg.TargetGrain.IsSystemTarget || msg.IsUnordered)
            {
                // Get the cached list of live gateways.
                // Pick a next gateway name in a round robin fashion.
                // See if we have a live connection to it.
                // If Yes, use it.
                // If not, create a new GatewayConnection and start it.
                // If start fails, we will mark this connection as dead and remove it from the GetCachedLiveGatewayNames.
                lock (lockable)
                {
                    int msgNumber = numMessages;
                    numMessages = unchecked(numMessages + 1);
                    IList<Uri> gatewayNames = GatewayManager.GetLiveGateways();
                    int numGateways = gatewayNames.Count;
                    if (numGateways == 0)
                    {
                        RejectMessage(msg, "No gateways available");
                        logger.Warn(ErrorCode.ProxyClient_CannotSend, "Unable to send message {0}; gateway manager state is {1}", msg, GatewayManager);
                        return;
                    }
                    Uri addr = gatewayNames[msgNumber % numGateways];
                    if (!gatewayConnections.TryGetValue(addr, out gatewayConnection) || !gatewayConnection.IsLive)
                    {
                        gatewayConnection = new GatewayConnection(addr, this, this.messageFactory, this.executorService, this.loggerFactory, this.openConnectionTimeout);
                        gatewayConnections[addr] = gatewayConnection;
                        if (logger.IsEnabled(LogLevel.Debug)) logger.Debug(ErrorCode.ProxyClient_CreatedGatewayUnordered, "Creating gateway to {0} for unordered message to grain {1}", addr, msg.TargetGrain);
                        startRequired = true;
                    }
                    // else - Fast path - we've got a live gatewayConnection to use
                }
            }
            // Otherwise, use the buckets to ensure ordering.
            else
            {
                var index = msg.TargetGrain.GetHashCode_Modulo((uint)grainBuckets.Length);
                lock (lockable)
                {
                    // Repeated from above, at the declaration of the grainBuckets array:
                    // Requests are bucketed by GrainID, so that all requests to a grain get routed through the same bucket.
                    // Each bucket holds a (possibly null) weak reference to a GatewayConnection object. That connection instance is used
                    // if the WeakReference is non-null, is alive, and points to a live gateway connection. If any of these conditions is
                    // false, then a new gateway is selected using the gateway manager, and a new connection established if necessary.
                    var weakRef = grainBuckets[index];
                    if ((weakRef != null) && weakRef.IsAlive)
                    {
                        gatewayConnection = weakRef.Target as GatewayConnection;
                    }
                    if ((gatewayConnection == null) || !gatewayConnection.IsLive)
                    {
                        var addr = GatewayManager.GetLiveGateway();
                        if (addr == null)
                        {
                            RejectMessage(msg, "No gateways available");
                            logger.Warn(ErrorCode.ProxyClient_CannotSend_NoGateway, "Unable to send message {0}; gateway manager state is {1}", msg, GatewayManager);
                            return;
                        }
                        if (logger.IsEnabled(LogLevel.Trace)) logger.Trace(ErrorCode.ProxyClient_NewBucketIndex, "Starting new bucket index {0} for ordered messages to grain {1}", index, msg.TargetGrain);
                        if (!gatewayConnections.TryGetValue(addr, out gatewayConnection) || !gatewayConnection.IsLive)
                        {
                            gatewayConnection = new GatewayConnection(addr, this, this.messageFactory, this.executorService, this.loggerFactory, this.openConnectionTimeout);
                            gatewayConnections[addr] = gatewayConnection;
                            if (logger.IsEnabled(LogLevel.Debug)) logger.Debug(ErrorCode.ProxyClient_CreatedGatewayToGrain, "Creating gateway to {0} for message to grain {1}, bucket {2}, grain id hash code {3}X", addr, msg.TargetGrain, index,
                                               msg.TargetGrain.GetHashCode().ToString("x"));
                            startRequired = true;
                        }
                        grainBuckets[index] = new WeakReference(gatewayConnection);
                    }
                }
            }

            if (startRequired)
            {
                gatewayConnection.Start();

                if (!gatewayConnection.IsLive)
                {
                    // if failed to start Gateway connection (failed to connect), try sending this msg to another Gateway.
                    RejectOrResend(msg);
                    return;
                }
            }

            try
            {
                gatewayConnection.QueueRequest(msg);
                if (logger.IsEnabled(LogLevel.Trace)) logger.Trace(ErrorCode.ProxyClient_QueueRequest, "Sending message {0} via gateway {1}", msg, gatewayConnection.Address);
            }
            catch (InvalidOperationException)
            {
                // This exception can be thrown if the gateway connection we selected was closed since we checked (i.e., we lost the race)
                // If this happens, we reject if the message is targeted to a specific silo, or try again if not
                RejectOrResend(msg);
            }
        }
 public void Connect()
        {
            if (!MsgCenter.Running)
            {
                if (Log.IsEnabled(LogLevel.Debug)) Log.Debug(ErrorCode.ProxyClient_MsgCtrNotRunning, "Ignoring connection attempt to gateway {0} because the proxy message center is not running", Address);
                return;
            }

            // Yes, we take the lock around a Sleep. The point is to ensure that no more than one thread can try this at a time.
            // There's still a minor problem as written -- if the sending thread and receiving thread both get here, the first one
            // will try to reconnect. eventually do so, and then the other will try to reconnect even though it doesn't have to...
            // Hopefully the initial "if" statement will prevent that.
            lock (Lockable)
            {
                if (!IsLive)
                {
                    if (Log.IsEnabled(LogLevel.Debug)) Log.Debug(ErrorCode.ProxyClient_DeadGateway, "Ignoring connection attempt to gateway {0} because this gateway connection is already marked as non live", Address);
                    return; // if the connection is already marked as dead, don't try to reconnect. It has been doomed.
                }

                for (var i = 0; i < ClientMessageCenter.CONNECT_RETRY_COUNT; i++)
                {
                    try
                    {
                        if (Socket != null)
                        {
                            if (Socket.Connected)
                                return;

                            MarkAsDisconnected(Socket); // clean up the socket before reconnecting.
                        }
                        if (lastConnect != new DateTime())
                        {
                            // We already tried at least once in the past to connect to this GW.
                            // If we are no longer connected to this GW and it is no longer in the list returned
                            // from the GatewayProvider, consider directly this connection dead.
                            if (!MsgCenter.GatewayManager.GetLiveGateways().Contains(Address))
                                break;

                            // Wait at least ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY before reconnection tries
                            var millisecondsSinceLastAttempt = DateTime.UtcNow - lastConnect;
                            if (millisecondsSinceLastAttempt < ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY)
                            {
                                var wait = ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY - millisecondsSinceLastAttempt;
                                if (Log.IsEnabled(LogLevel.Debug)) Log.Debug(ErrorCode.ProxyClient_PauseBeforeRetry, "Pausing for {0} before trying to connect to gateway {1} on trial {2}", wait, Address, i);
                                Thread.Sleep(wait);
                            }
                        }
                        lastConnect = DateTime.UtcNow;
                        Socket = new Socket(Silo.Endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                        Socket.EnableFastpath();
                        SocketManager.Connect(Socket, Silo.Endpoint, this.openConnectionTimeout);
                        NetworkingStatisticsGroup.OnOpenedGatewayDuplexSocket();
                        MsgCenter.OnGatewayConnectionOpen();
                        SocketManager.WriteConnectionPreamble(Socket, MsgCenter.ClientId);  // Identifies this client
                        Log.Info(ErrorCode.ProxyClient_Connected, "Connected to gateway at address {0} on trial {1}.", Address, i);
                        return;
                    }
                    catch (Exception ex)
                    {
                        Log.Warn(ErrorCode.ProxyClient_CannotConnect, $"Unable to connect to gateway at address {Address} on trial {i} (Exception: {ex.Message})");
                        MarkAsDisconnected(Socket);
                    }
                }
                // Failed too many times -- give up
                MarkAsDead();
            }
        }

GatewayConnection的Start會調用到GatewayClientReceiver的Run方法,利用BlockingCollection 的Add方法添加到PendingInboundMessages,而以前的RunClientMessagePump裏面transport.WaitMessage方法正式經過PendingInboundMessages.Take()來獲取消息,至此造成了閉環。

 protected override void Run()
        {
            try
            {
                while (!Cts.IsCancellationRequested)
                {
                    int bytesRead = FillBuffer(buffer.BuildReceiveBuffer());
                    if (bytesRead == 0)
                    {
                        continue;
                    }

                    buffer.UpdateReceivedData(bytesRead);

                    Message msg;
                    while (buffer.TryDecodeMessage(out msg))
                    {
                        gatewayConnection.MsgCenter.QueueIncomingMessage(msg);
                        if (Log.IsEnabled(LogLevel.Trace)) Log.Trace("Received a message from gateway {0}: {1}", gatewayConnection.Address, msg);
                    }
                }
            }
            catch (Exception ex)
            {
                buffer.Reset();
                Log.Warn(ErrorCode.ProxyClientUnhandledExceptionWhileReceiving, $"Unexpected/unhandled exception while receiving: {ex}. Restarting gateway receiver for {gatewayConnection.Address}.", ex);
                throw;
            }
        }

關注SafeTimerBase類

Orleans用於處理定時或延時回調做業。

總結

建立一個簡單的connect,裏面有這麼多溝溝渠渠,但本質上來講,最底層是利用Socket套接字機制來實施機制。在Socket的基礎之上,又封裝維護了一層GatewayConnection和GatewayClientReceiver來實現網關(Silo)的操做,好比重試/監控/熔斷等,再結合Timer,Queue ,BlockingCollection,Task,ConcurrentDictionary,Interlocked等知識,構建一個可用的通訊框架。
說來容易幾句話,實現起來都是淚。

若是您徹底熟悉異步編程,並行編程,Socket網絡編程。又對分佈式/微服務理論有較深的理解,那麼orleans實現機制,對您來講多是相對容易。

本期結束,下期更精彩!

相關文章
相關標籤/搜索