近期因爲須要進行分佈式鏈路跟蹤系統的技術選型,因此一直在研究鏈路跟蹤相關的框架。做爲能在.Net Core中使用的APM,SkyWalking天然成爲了首選。SkyAPM-dotnet是SkyWalking在.Net Core端的探針實現,其主要的收集日誌的手段就是基於DiagnosticSource來進行診斷跟蹤的。不得不說SkyAPM-dotnet的設計仍是很是優秀的,它自己定義了一套很是規範的標準,並且提供了很是良好的擴展性,雖然框架自己可支持的採集端有限,可是基於這套標準擴展起來仍是很是方便的。html
關於DiagnosticSource它自己是一個基於發佈訂閱模式的工做模式,因爲它自己的實現方式是異步的,因此不只僅能夠把它用到日誌上,還能夠用它實現異步操做,或者用它簡化實現發佈訂閱的功能。DiagnosticSource自己是一個抽象類,咱們最經常使用到的是它的子類DiagnosticListener,經過DiagnosticSource的Write方法實現發佈一條有具體名稱的消息,而後經過IObserver去訂閱消息。DiagnosticListener能夠實現不一樣的實例,每一個實例能夠有本身的名稱,每一個實例還能夠發佈不一樣名稱的消息,比如一個在寫代碼的時候咱們能夠定義多個程序集,一個程序集下面能夠包含多個命名空間。git
上面咱們大體的介紹了關於DiagnosticSource相關的概念,相信你們已經有了初步的瞭解,接下來咱們就來看一下在代碼中如何使用DiagnosticSource,還說到了它一個重要的子類DiagnosticListener,基本上關於DiagnosticSource的工做方式都是圍繞着DiagnosticListener實現的,首先咱們來看一下如何發佈一條消息github
//聲明DiagnosticListener並命名爲MyTest DiagnosticSource diagnosticSource = new DiagnosticListener("MyTest"); string pubName = "MyTest.Log"; //判斷是否存在MyTest.Log的訂閱者 if (diagnosticSource.IsEnabled(pubName)) { //發送名爲MyTest.Log的消息,包含Name,Address兩個屬性 diagnosticSource.Write(pubName, new { Name = "old王", Address="隔壁" }); }
經過這種方式,咱們就能夠完成針對消息的發佈,其中用到了IsEnabled方法,這個方法是在實際使用DiagnosticSource過程當中比較經常使用的方法,用於判斷是夠存在對應名稱的消費者,這樣能夠有效的避免發送消息浪費。
發送相對仍是比較簡單的,接下來咱們看一下如何訂閱發佈的消息。上面咱們提到了訂閱消息是經過IObserver接口實現的,IObserver表明了訂閱者。雖然咱們經過DiagnosticSource去發佈消息,可是真正描述發佈者身份的是IObservable接口,IObservable的惟一方法Subscribe是用來註冊訂閱者IObserver,可是默認系統並無爲咱們提供一個具體的實現類,因此咱們須要定義一個IObserver訂閱者的實現類網絡
public class MyObserver<T>:IObserver<T> { private Action<T> _next; public MyObserver(Action<T> next) { _next = next; } public void OnCompleted() { } public void OnError(Exception error) { } public void OnNext(T value) => _next(value); }
有了具體的訂閱者實現類,咱們就能夠爲發佈者註冊訂閱者了,一樣是使用DiagnosticListener,我的認爲雖然操做都是經過DiagnosticSource來完成的,但它只是一個外觀類,可是並不能直接描述發佈者和訂閱者自己。接下來咱們看一下具體實現app
//AllListeners獲取全部發布者,Subscribe爲發佈者註冊訂閱者MyObserver DiagnosticListener.AllListeners.Subscribe(new MyObserver<DiagnosticListener>(listener => { //判斷髮布者的名字 if (listener.Name == "MyTest") { //獲取訂閱信息 listener.Subscribe(new MyObserver<KeyValuePair<string, object>>(listenerData => { System.Console.WriteLine($"監聽名稱:{listenerData.Key}"); dynamic data = listenerData.Value; //打印發布的消息 System.Console.WriteLine($"獲取的信息爲:{data.Name}的地址是{data.Address}"); })); listener.SubscribeWithAdapter(new MyDiagnosticListener()); } }));
具體實現可總結爲兩步,首先爲發佈者註冊訂閱者,而後獲取訂閱者獲取發佈的消息。這種寫法仍是比較複雜的,首先須要實現訂閱者類,而後經過一系列複雜的操做,才能完成消息訂閱,而後還要本身獲取發佈的消息,解析具體的消息值,總之操做流程很是繁瑣。微軟彷佛也意識到了這個問題,因而乎給我提供了一個關於實現訂閱者的便利方法,編輯項目文件引入DiagnosticAdapter包框架
<PackageReference Include="Microsoft.Extensions.DiagnosticAdapter" Version="3.1.7" />
或者經過包管理器直接搜索安裝,道路千萬條都是通羅馬。經過這個包解決了咱們兩個痛點,首先是關於訂閱者的註冊難問題,其次解決了關於發佈消息解析難的痛點。咱們能夠直接訂閱一個適配類來充當訂閱者的載體,其次咱們能夠定義方法模擬訂閱去訂閱消息,而這個方法的參數就是咱們發佈的消息內容。說了這麼多,不如直接上代碼異步
public class MyDiagnosticListener { //發佈的消息主題名稱 [DiagnosticName("MyTest.Log")] //發佈的消息參數名稱和發佈的屬性名稱要一致 public void MyLog(string name,string address) { System.Console.WriteLine($"監聽名稱:MyTest.Log"); System.Console.WriteLine($"獲取的信息爲:{name}的地址是{address}"); } }
咱們能夠隨便定義一個類來充當訂閱者載體,類裏面能夠自定義方法來實現獲取解析消息的實現。想要讓方法能夠訂閱消息,須要在方法上聲明DiagnosticName,而後名稱就是你要訂閱消息的名稱,而方法的參數就是你發佈消息的字段屬性名稱,這裏須要注意的是訂閱的參數名稱須要和發佈聲明屬性名稱一致。
而後咱們直接能夠經過這個類去接收訂閱消息async
DiagnosticListener.AllListeners.Subscribe(new MyObserver<DiagnosticListener>(listener => { if (listener.Name == "MyTest") { //適配訂閱 listener.SubscribeWithAdapter(new MyDiagnosticListener()); } }));
可能你以爲這樣仍是不夠好,由於仍是沒有脫離須要自定義訂閱者,這裏還有更簡潔的實現方式。細心的你可能已經發現了SubscribeWithAdapter是DiagnosticListener的擴展方法,而咱們聲明DiagnosticSource就是使用的DiagnosticListener實例,因此上面的代碼能夠簡化爲一下方式分佈式
DiagnosticListener diagnosticListener = new DiagnosticListener("MyTest"); DiagnosticSource diagnosticSource = diagnosticListener; //直接去適配訂閱者 diagnosticListener.SubscribeWithAdapter(new MyDiagnosticListener()); string pubName = "MyTest.Log"; if (diagnosticSource.IsEnabled(pubName)) { diagnosticSource.Write(pubName, new { Name = "old王", Address="隔壁" }); }
這種方式也是咱們比較推薦的使用方式,極大的節省了工做的方式,並且代碼很是的簡潔。可是存在惟一的不足,這種寫法只能針對特定的DiagnosticListener進行訂閱處理,若是你須要監聽全部發布者,就須要使用DiagnosticListener.AllListeners.Subscribe的方式。ide
在.Net Core的源碼中,微軟默認在涉及到網絡請求或處理請求等許多重要的節點都使用了DiagnosticListener來發布攔截的消息,接下來就羅列一些我知道的比較常見的埋點,經過這些操做咱們就能夠看出,診斷日誌仍是很便利的,並且微軟在.Net Core中也很是重視它的使用。
當咱們經過ConfigureWebHostDefaults配置Web主機的時候,程序就已經默認給咱們注入了診斷名稱爲Microsoft.AspNetCore的DiagnosticListener和DiagnosticSource,這樣咱們就能夠很方便的在程序中直接獲取DiagnosticListener實例去發佈消息或者監聽發佈的內部消息,具體注入邏輯位於能夠去GenericWebHostBuilder類中查看[點擊查看源碼👈]
var listener = new DiagnosticListener("Microsoft.AspNetCore"); services.TryAddSingleton<DiagnosticListener>(listener); services.TryAddSingleton<DiagnosticSource>(listener);
而後在Server啓動的時候傳遞了DiagnosticListener實例[點擊查看源碼👈]
var httpApplication = new HostingApplication(application, Logger, DiagnosticListener, HttpContextFactory); await Server.StartAsync(httpApplication, cancellationToken);
這樣在Server運行期間咱們能夠經過DiagnosticListener診斷跟蹤請求相關的信息,咱們能夠看下在處理請求的過程當中DiagnosticListener都發布了哪些消息,咱們找到發送診斷跟蹤的位置位於HostingApplicationDiagnostics中[點擊查看源碼👈],這事集中處理請求相關的診斷跟蹤,接下來咱們就大體查看一下它發佈了哪些事件消息,首先找到定義發佈名稱的屬性
private const string ActivityName = "Microsoft.AspNetCore.Hosting.HttpRequestIn"; private const string ActivityStartKey = ActivityName + ".Start"; private const string ActivityStopKey = ActivityName + ".Stop"; private const string DeprecatedDiagnosticsBeginRequestKey = "Microsoft.AspNetCore.Hosting.BeginRequest"; private const string DeprecatedDiagnosticsEndRequestKey = "Microsoft.AspNetCore.Hosting.EndRequest"; private const string DiagnosticsUnhandledExceptionKey = "Microsoft.AspNetCore.Hosting.UnhandledException";
經過這些發佈消息的名稱咱們就能夠看出,在請求開始、請求進入、請求結束、請求中止、請求異常等都發布了診斷消息,咱們以BeginRequest爲例查看一下具體發送的消息
if (_diagnosticListener.IsEnabled(DeprecatedDiagnosticsBeginRequestKey)) { startTimestamp = Stopwatch.GetTimestamp(); RecordBeginRequestDiagnostics(httpContext, startTimestamp); }
找到RecordBeginRequestDiagnostics方法的實現
[MethodImpl(MethodImplOptions.NoInlining)] private void RecordBeginRequestDiagnostics(HttpContext httpContext, long startTimestamp) { _diagnosticListener.Write( DeprecatedDiagnosticsBeginRequestKey, new { httpContext = httpContext, timestamp = startTimestamp }); }
從這裏咱們能夠看出在BeginRequest中診斷日誌發出的消息中包含了HttpContext和開始時間戳信息,而後再來看一下請求結束髮布的診斷消息
[MethodImpl(MethodImplOptions.NoInlining)] private void RecordEndRequestDiagnostics(HttpContext httpContext, long currentTimestamp) { _diagnosticListener.Write( DeprecatedDiagnosticsEndRequestKey, new { httpContext = httpContext, timestamp = currentTimestamp }); }
經過發佈的這些跟蹤日誌咱們能夠獲取請求信息,請求時間而且能獲得輸出信息和結束時間,有了這些關鍵信息,咱們就能夠監聽請Asp.Net Core處理請求的狀況,咱們上面提到過SkyAPM-dotnet正是經過這些發出診斷跟蹤日誌,來實現對程序無入侵的方式來處理應用系統監控的,具體咱們能夠查看相關實現,咱們找到訂閱這些消息的地方
[點擊查看源碼👈],拿出來類的結構,大體以下
public class HostingTracingDiagnosticProcessor : ITracingDiagnosticProcessor { public string ListenerName { get; } = "Microsoft.AspNetCore"; [DiagnosticName("Microsoft.AspNetCore.Hosting.BeginRequest")] public void BeginRequest([Property] HttpContext httpContext) { } [DiagnosticName("Microsoft.AspNetCore.Hosting.EndRequest")] public void EndRequest([Property] HttpContext httpContext) { } [DiagnosticName("Microsoft.AspNetCore.Diagnostics.UnhandledException")] public void DiagnosticUnhandledException([Property] HttpContext httpContext, [Property] Exception exception) { } [DiagnosticName("Microsoft.AspNetCore.Hosting.UnhandledException")] public void HostingUnhandledException([Property] HttpContext httpContext, [Property] Exception exception) { } //[DiagnosticName("Microsoft.AspNetCore.Mvc.BeforeAction")] public void BeforeAction([Property] ActionDescriptor actionDescriptor, [Property] HttpContext httpContext) { } //[DiagnosticName("Microsoft.AspNetCore.Mvc.AfterAction")] public void AfterAction([Property] ActionDescriptor actionDescriptor, [Property] HttpContext httpContext) { } }
不得不認可SkyAPM-dotnet很是巧妙的利用了系統內部發出的診斷跟蹤日誌,實現了對請求的處理跟蹤,真的是很是優秀。
上面咱們看到的是AspNetCore處理請求的診斷日誌埋點,在發出請求的HttpClient中,微軟也作了埋點處理。咱們在以前的文章.NET Core HttpClient源碼探究中提到過HttpClient經過HttpClientHandler發送請求的,在HttpClientHandler SendAsync方法中咱們能夠看到以下實現
protected internal override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) { return DiagnosticsHandler.IsEnabled() && _diagnosticsHandler != null ? _diagnosticsHandler.SendAsync(request, cancellationToken) : _underlyingHandler.SendAsync(request, cancellationToken); }
也就是說若是知足DiagnosticsHandler.IsEnabled()而且_diagnosticsHandler不爲空的狀況下將會使用DiagnosticsHandler發送請求,關於DiagnosticsHandler.IsEnabled()的大體實現邏輯以下
if (AppContext.TryGetSwitch("System.Net.Http.EnableActivityPropagation", out bool enableActivityPropagation)) { return enableActivityPropagation; } string? envVar = Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_HTTP_ENABLEACTIVITYPROPAGATION"); if (envVar != null && (envVar.Equals("false", StringComparison.OrdinalIgnoreCase) || envVar.Equals("0"))) { return false; } return true;
經過這個邏輯能夠看出,默認狀況下咱們不作特殊處理返回的就是true,也就是說發送請求會經過DiagnosticsHandler,咱們找到DiagnosticsHandler的實現[點擊查看源碼👈],抽離出來SendAsyncCore方法中關於診斷跟蹤的核心實現邏輯,大體以下
DiagnosticListener diagnosticListener = new DiagnosticListener("HttpHandlerDiagnosticListener"); if (diagnosticListener.IsEnabled("System.Net.Http.Request")) { long timestamp = Stopwatch.GetTimestamp(); loggingRequestId = Guid.NewGuid(); //請求開始以前發送診斷日誌 diagnosticListener.Write("System.Net.Http.Request",new RequestData(request, loggingRequestId, timestamp)); } HttpResponseMessage? response = null; TaskStatus taskStatus = TaskStatus.RanToCompletion; try { response = async ? await base.SendAsync(request, cancellationToken).ConfigureAwait(false) : base.Send(request, cancellationToken); return response; } catch (OperationCanceledException) { taskStatus = TaskStatus.Canceled; throw; } catch (Exception ex) { taskStatus = TaskStatus.Faulted; if (diagnosticListener.IsEnabled("System.Net.Http.Exception")) { //若是請求出現異常發出異常消息診斷日誌 diagnosticListener.Write("System.Net.Http.Exception", new ExceptionData(ex, request)); } throw; } finally { if (activity != null) { diagnosticListener.StopActivity(activity, new ActivityStopData(response,request,taskStatus)); } if (diagnosticListener.IsEnabled("System.Net.Http.Response")) { long timestamp = Stopwatch.GetTimestamp(); //獲得輸出結果後發送診斷日誌 diagnosticListener.Write("System.Net.Http.Response",new ResponseData(response,loggingRequestId,timestamp,taskStatus)); } }
一樣的思路HttpClient會在發送請求以前發出請求信息相關的診斷跟蹤,會在獲得相應以後發送響應相關診斷跟蹤,經過這些信息咱們能夠捕獲到由程序發出的Http請求相關的信息,從而監控請求相關的數據,咱們來看一下SkyAPM-dotnet訂閱Http請求相關的實現,在HttpClientTracingDiagnosticProcessor類中[點擊查看源碼👈],抽離實現的框架大體以下
public class HttpClientTracingDiagnosticProcessor : ITracingDiagnosticProcessor { public string ListenerName { get; } = "HttpHandlerDiagnosticListener"; [DiagnosticName("System.Net.Http.Request")] public void HttpRequest([Property(Name = "Request")] HttpRequestMessage request) { } [DiagnosticName("System.Net.Http.Response")] public void HttpResponse([Property(Name = "Response")] HttpResponseMessage response) { } [DiagnosticName("System.Net.Http.Exception")] public void HttpException([Property(Name = "Request")] HttpRequestMessage request, [Property(Name = "Exception")] Exception exception) { } }
這裏正是監聽的HttpClient發出的診斷日誌。假如存在系統A和系統B,系統A經過HttpClient發送請求調用Asp.Net Core系統B,經過訂閱他們發出的診斷跟蹤日誌,而這些數據正是實現系統監控和鏈路跟蹤重要依據。
在.Net Core相關的源碼中還有許多其餘關於DiagnosticListener的埋點信息好比請求執行到Action的時候或者出現全局異常的時候都有相似的處理。一樣在EFCore中也存在這些埋點信息,有興趣的能夠自行查閱相關源碼和SkyAPM-dotnet源碼,瞭解DiagnosticSource工做方式,以及如何經過這些信息實現APM系統。雖然SkyAPM-dotnet自己實現的框架個數有限,可是它給咱們實現了良好的擴展性,咱們能夠經過DiagnosticSource和DiagnosticListener自行實現SkyAPM-dotnet的擴展,好比你能夠擴展Redis MongoDb等其它中間件。好比SkyApm.Diagnostics.CAP是CAP歸入SkyAPM中程序包,正是楊總參與了相關代碼的實現。
DiagnosticSource診斷跟蹤涉及到的概念雖然不是不少,可是在.Net Core相關的框架中使用的仍是很是普遍的,經過這些信息咱們能夠拿到框架執行過程當中關鍵節點獲得的信息,爲咱們提供了很大的便利。加上SkyAPM-dotnet巧妙的使用了這一特色使得DiagnosticSource更變得強大並且通用。上面咱們講述的只是冰山一角,還有更多更深的應用,好比Azure監控.Net Core應用程序也是利用了這些。有興趣的能夠查看相關源碼,也能夠學習一下SkyAPM-dotnet相關源碼,體會一下DiagnosticSource精髓所在。