上篇講到.net core web app是如何啓動並接受請求的,下面接着探索kestrel server是如何完成此任務的。web
FrameFactory建立的frame實例最終會交給libuv的loop回調接收請求。可是在這過程當中仍是有不少的初始化工做須要作的。後面咱們就管中窺豹來看一看。算法
public void Start<TContext>(IHttpApplication<TContext> application) { var engine = new KestrelEngine(new ServiceContext { FrameFactory = context => { return new Frame<TContext>(application, context); }, AppLifetime = _applicationLifetime, Log = trace, ThreadPool = new LoggingThreadPool(trace), DateHeaderValueManager = dateHeaderValueManager, ServerOptions = Options }); //啓動引擎。完成libuv的配置和啓動 engine.Start(threadCount); //針對綁定的多個地址建立server來接收請求。也就是針對ip:port來啓動tcp監聽 foreach (var address in _serverAddresses.Addresses.ToArray()) { engine.CreateServer(ipv4Address); } }
啓動綁定的端口*最大處理線程的thread。並初始化libuv組件。
每個線程初始化libuv,註冊loop回調等,並啓動libuv。c#
public void Start(int count) { for (var index = 0; index < count; index++) { Threads.Add(new KestrelThread(this)); } foreach (var thread in Threads) { thread.StartAsync().Wait(); } } private void ThreadStart(object parameter) { lock (_startSync) { var tcs = (TaskCompletionSource<int>) parameter; try { //初始化loop _loop.Init(_engine.Libuv); //註冊loop回調 //EnqueueCloseHandle:持有的資源釋放後的回調方法,回調往queue內增長一個item,事件循環該queue完成資源的最終釋放 _post.Init(_loop, OnPost, EnqueueCloseHandle); //註冊心跳定時器 _heartbeatTimer.Init(_loop, EnqueueCloseHandle); //啓動心跳定時器 _heartbeatTimer.Start(OnHeartbeat, timeout: HeartbeatMilliseconds, repeat: HeartbeatMilliseconds); _initCompleted = true; tcs.SetResult(0); } catch (Exception ex) { tcs.SetException(ex); return; } } try { //當前線程執行到Run()這裏會掛起 _loop.Run(); //應用程序stop,shutdown之類的狀況,libuv喚醒當前線程,完成資源清理 if (_stopImmediate) { // thread-abort form of exit, resources will be leaked //線程停止形式的退出,資源會被泄露。 return; } // run the loop one more time to delete the open handles //再次運行循環以刪除打開的句柄 _post.Reference(); _post.Dispose(); _heartbeatTimer.Dispose(); // Ensure the Dispose operations complete in the event loop. //確保事件循環中的Dispose操做完成。 _loop.Run(); _loop.Dispose(); } catch (Exception ex) { _closeError = ExceptionDispatchInfo.Capture(ex); // Request shutdown so we can rethrow this exception // in Stop which should be observable. //請求關閉,以便咱們能夠從新拋出此異常在中止應該是可觀察的。 _appLifetime.StopApplication(); } finally { _threadTcs.SetResult(null); } }
回到1的kestrel的start中。接着執行engine.CreateServer(ipv4Address);,這裏和.net 裏面的tcplistener不太同樣。.net裏面就是listener bind,start,accept就行了。而libuv涉及到一個多路io複用的概念,這也是爲何使用他能高併發的緣由。服務器
public IDisposable CreateServer(ServerAddress address) { var usingPipes = address.IsUnixPipe; var pipeName = (Libuv.IsWindows ? @"\\.\pipe\kestrel_" : "/tmp/kestrel_") + Guid.NewGuid().ToString("n"); var single = Threads.Count == 1; var first = true; foreach (var thread in Threads) { if(single){}//single就不考慮,這種狀況真是環境是不會這樣玩的 else if (first) { //根據當前平臺建立tcp listener var listener = usingPipes ? (ListenerPrimary)new PipeListenerPrimary(ServiceContext) : new TcpListenerPrimary(ServiceContext); listener.StartAsync(pipeName, address, thread).Wait(); } else { //若是是屢次對同一個ip:port作監聽 var listener = usingPipes ? (ListenerSecondary)new PipeListenerSecondary(ServiceContext) : new TcpListenerSecondary(ServiceContext); listener.StartAsync(pipeName, address, thread).Wait(); } first = false; } }
首先說明一下TcpListenerPrimary這個類的繼承關係:TcpListenerPrimary -->ListenerPrimary -->Listener。這樣纔有助於後續代碼的理解。
後續代碼處處都能看到thread.post/postaysnc的代碼。這玩意的意思是把傳入的action放到libuv loop中,並激活異步完成回調。libuv另外一個重要的概念各類回調。
1.接着上面的代碼,咱們進入TcpListenerPrimary.StartAsync()方法。方法在ListenerPrimary中。併發
public async Task StartAsync(string pipeName, ServerAddress address, KestrelThread thread) { _pipeName = pipeName; await StartAsync(address, thread).ConfigureAwait(false); await Thread.PostAsync(state => ((ListenerPrimary)state).PostCallback(), this).ConfigureAwait(false); }
2.接着上面的代碼進入StartAsync(address, thread)。他是父類Listener的方法。app
public Task StartAsync(ServerAddress address, KestrelThread thread) { ServerAddress = address; Thread = thread; var tcs = new TaskCompletionSource<int>(this); Thread.Post(state => { var tcs2 = (TaskCompletionSource<int>)state; var listener = ((Listener)tcs2.Task.AsyncState); //建立socket listener.ListenSocket = listener.CreateListenSocket(); ////socket監聽,libu註冊監聽並設置回調函數,最大隊列。 ListenSocket.Listen(Constants.ListenBacklog, ConnectionCallback, this); tcs2.SetResult(0); }, tcs); return tcs.Task; } protected override UvStreamHandle CreateListenSocket() { //初始化socket並bind到address var socket = new UvTcpHandle(Log); socket.Init(Thread.Loop, Thread.QueueCloseHandle); //是否使用Nagle's algorithm算法。 socket.NoDelay(ServerOptions.NoDelay); socket.Bind(ServerAddress); // If requested port was "0", replace with assigned dynamic port. ServerAddress.Port = socket.GetSockIPEndPoint().Port; return socket; }
在接着上面的代碼ListenSocket.Listen成功以後,libuv回調ConnectionCallback函數。異步
step1:listen成功libuv回調ConnectionCallback方法。
step2:初始化接收請求socket,並將之關聯到監聽socket
step3:適配接收請求socket,若是是第一次適配的話則建立connection
step4:建立connection並啓動
step5:new connection 關聯 Frame
step6:啓動frame
step7:由Connection類調用一次以開始RequestProcessingAsync循環。
step8:循環接收請求,接收請求到以後交給上層程序處理
private static void ConnectionCallback(UvStreamHandle stream, int status, Exception error, object state) { var listener = (Listener)state; listener.OnConnection(stream, status);//step 1 } protected override void OnConnection(UvStreamHandle listenSocket, int status)//step 2 { var acceptSocket = new UvTcpHandle(Log); acceptSocket.Init(Thread.Loop, Thread.QueueCloseHandle); acceptSocket.NoDelay(ServerOptions.NoDelay); listenSocket.Accept(acceptSocket); DispatchConnection(acceptSocket); } protected override void DispatchConnection(UvStreamHandle socket)// step 3 { var index = _dispatchIndex++ % (_dispatchPipes.Count + 1); if (index == _dispatchPipes.Count) { base.DispatchConnection(socket); } else { DetachFromIOCP(socket); var dispatchPipe = _dispatchPipes[index]; var write = new UvWriteReq(Log); write.Init(Thread.Loop); write.Write2(dispatchPipe, _dummyMessage, socket, (write2, status, error, state) => { write2.Dispose(); ((UvStreamHandle)state).Dispose(); }, socket); } } protected virtual void DispatchConnection(UvStreamHandle socket)//step 4 { var connection = new Connection(this, socket); connection.Start(); } private Func<ConnectionContext, Frame> FrameFactory => ListenerContext.ServiceContext.FrameFactory; public Connection(ListenerContext context, UvStreamHandle socket) : base(context)//step 5 { SocketInput = new SocketInput(Thread.Memory, ThreadPool, _bufferSizeControl); SocketOutput = new SocketOutput(Thread, _socket, this, ConnectionId, Log, ThreadPool); //重點代碼在這裏,FrameFactory是一個委託,是KestrelServer.Start中註冊的action _frame = FrameFactory(this); } public void Start()//step 6 { Log.ConnectionStart(ConnectionId); // Start socket prior to applying the ConnectionFilter _socket.ReadStart(_allocCallback, _readCallback, this); _frame.Start(); } /// <summary> /// Called once by Connection class to begin the RequestProcessingAsync loop. /// </summary> public void Start()//step 7 { Reset(); _requestProcessingTask = Task.Factory.StartNew( (o) => ((Frame)o).RequestProcessingAsync(), this, default(CancellationToken), TaskCreationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap(); } /// <summary> /// 主循環消耗套接字輸入,將其解析爲協議幀,並調用應用程序委託,只要套接字打算保持打開。 /// 今後循環獲得的任務將保留在服務器須要時使用的字段中以排除和關閉全部當前活動的鏈接。 /// </summary> public override async Task RequestProcessingAsync() { while (!_requestProcessingStopping) { InitializeHeaders(); var context = _application.CreateContext(this); await _application.ProcessRequestAsync(context).ConfigureAwait(false); } }