做爲單體程序,依賴的第三方服務雖很少,可是2C的程序仍是有很多內容可講; 做爲一個常規互聯網系統,無外乎就是接受請求、處理請求,輸出響應。html
因爲業務漸漸增加,數據處理的過程會愈來愈複雜和冗長,【連貫高效的處理數據】 愈來愈被看重, .Net 提供了TPL Dataflow組件使咱們更高效的實現基於數據流和 流水線操做的代碼。redis
下圖是單體程序中 數據處理的用例圖。編程
程序中用到的TPL Dataflow 組件,Dataflow是微軟前幾年給出的數據處理庫, 是由不一樣的處理塊組成,可將這些塊組裝成一個處理管道,"塊"對應處理管道中的"階段", 可類比AspNetCore 中Middleware 和pipeline.。api
TPL Dataflow庫爲消息傳遞和並行化CPU密集型和I / O密集型應用程序提供了編程基礎,這些應用程序具備高吞吐量和低延遲。它還可讓您明確控制數據的緩衝方式並在系統中移動。數組
傳統的編程模型一般使用回調和同步對象(如鎖)來協調任務和訪問共享數據, 從宏觀看傳統模型: 任務是一步步緊接着完成的。緩存
經過使用數據流編程模型,您能夠建立在從磁盤讀取圖像時處理圖像的數據流對象。在數據流模型下,您能夠聲明數據在可用時的處理方式以及數據之間的依賴關係。因爲運行時管理數據之間的依賴關係,所以一般能夠避免同步訪問共享數據的要求。此外,因爲運行時調度基於數據的異步到達而工做,所以數據流能夠經過有效地管理底層線程來提升響應性和吞吐量。 也就是說: 你定義的是任務內容和任務之間的依賴,不關注數據何時流到這個任務 。併發
TPL Dataflow 內置的Block覆蓋了常見的應用場景,固然若是內置塊不能知足你的要求,你也能夠自定「塊」。app
Block能夠劃分爲下面3類:異步
Buffering Only 【Buffer不是緩存Cache的概念, 而是一個緩衝區的概念】async
Execution
Grouping
使用以上塊混搭處理管道, 大多數的塊都會執行一個操做,有些時候須要將消息分發到不一樣Block,這時可以使用特殊類型的緩衝塊給管道「」分叉」。
輸入、輸出消息的緩衝區(通常稱爲Input,Output隊列)
在消息上執行動做的委託
消息在輸入和輸出時可以被緩衝:當Func委託的運行速度比輸入的消息速度慢時,後續消息將在到達時進行緩衝;當下一個塊的輸入緩衝區中沒有容量時,將在輸出時緩衝。
每一個塊咱們能夠配置:
緩衝區的總容量, 默認無上限
執行操做委託的併發度, 默認狀況下塊按照順序處理消息,一次一個。
咱們將塊連接在一塊兒造成一個處理管道,生產者將消息推向管道。
TPL Dataflow有一個基於pull的機制(使用Receive和TryReceive方法),但咱們將在管道中使用塊鏈接和推送機制。
TransformBlock(Execution category)-- 由輸入輸出緩衝區和一個Func<TInput, TOutput>委託組成,消費的每一個消息,都會輸出另一個,你可使用這個Block去執行輸入消息的轉換,或者轉發輸出的消息到另一個Block。
TransformManyBlock (Execution category) -- 由輸入輸出緩衝區和一個Func<TInput, IEnumerable<TOutput>>委託組成, 它爲輸入的每一個消息輸出一個 IEnumerable<TOutput>
BroadcastBlock (Buffering category)-- 由只容納1個消息的緩衝區和Func<T, T>委託組成。緩衝區被每一個新傳入的消息所覆蓋,委託僅僅爲了讓你控制怎樣克隆這個消息,不作消息轉換。
該塊能夠連接到多個塊(管道的分叉),雖然它一次只緩衝一條消息,但它必定會在該消息被覆蓋以前將該消息轉發到連接塊(連接塊還有緩衝區)。
ActionBlock (Execution category)-- 由緩衝區和Action<T>委託組成,他們通常是管道的結尾,他們再也不給其餘塊轉發消息,他們只會處理輸入的消息。
BatchBlock (Grouping category)-- 告訴它你想要的每一個批處理的大小,它將累積消息,直到它達到那個大小,而後將它做爲一組消息轉發到下一個塊。
還有一下其餘的Block類型:BufferBlock、WriteOnceBlock、JoinBlock、BatchedJoinBlock,咱們暫時不會深刻。
當輸入緩衝區達到上限容量,爲其供貨的上游塊的輸出緩衝區將開始填充,當輸出緩衝區已滿時,該塊必須暫停處理,直到緩衝區有空間,這意味着一個Block的處理瓶頸可能致使全部前面的塊的緩衝區被填滿。
可是不是全部的塊變滿時,都會暫停,BroadcastBlock 有容許1個消息的緩衝區,每一個消息都會被覆蓋, 所以若是這個廣播塊不能將消息轉發到下游,則在下個消息到達的時候消息將丟失,這在某種意義上是一種限流(比較生硬).
將按照上圖實現TPL Dataflow
public EqidPairHandler(IHttpClientFactory httpClientFactory, RedisDatabase redisCache, IConfiguration con, LogConfig logConfig, ILoggerFactory loggerFactory) { _httpClient = httpClientFactory.CreateClient("bce-request"); _redisDB0 = redisCache[0]; _redisDB = redisCache; _logger = loggerFactory.CreateLogger(nameof(EqidPairHandler)); var option = new DataflowLinkOptions { PropagateCompletion = true }; publisher = _redisDB.RedisConnection.GetSubscriber(); _eqid2ModelTransformBlock = new TransformBlock<EqidPair, EqidModel> ( // redis piublih 沒有作在TransformBlock fun裏面, 由於publih失敗可能影響後續的block傳遞 eqidPair => EqidResolverAsync(eqidPair), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = con.GetValue<int>("MaxDegreeOfParallelism") } ); // https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-dataflow-pipeline _logBatchBlock = new LogBatchBlock<EqidModel>(logConfig, loggerFactory); _logPublishBlock = new ActionBlock<EqidModel>(x => PublishAsync(x) ); _broadcastBlock = new BroadcastBlock<EqidModel>(x => x); // 由只容納一個消息的緩存區和拷貝函數組成 _broadcastBlock.LinkTo(_logBatchBlock.InputBlock, option); _broadcastBlock.LinkTo(_logPublishBlock, option); _eqid2ModelTransformBlock.LinkTo(_broadcastBlock, option); }
public class LogBatchBlock<T> : ILogDestination<T> where T : IModelBase { private readonly string _dirPath; private readonly Timer _triggerBatchTimer; private readonly Timer _openFileTimer; private DateTime? _nextCheckpoint; private TextWriter _currentWriter; private readonly LogHead _logHead; private readonly object _syncRoot = new object(); private readonly ILogger _logger; private readonly BatchBlock<T> _packer; private readonly ActionBlock<T[]> batchWriterBlock; private readonly TimeSpan _logFileIntervalTimeSpan; /// <summary> /// Generate request log file. /// </summary> public LogBatchBlock(LogConfig logConfig, ILoggerFactory loggerFactory) { _logger = loggerFactory.CreateLogger<LogBatchBlock<T>>(); _dirPath = logConfig.DirPath; if (!Directory.Exists(_dirPath)) { Directory.CreateDirectory(_dirPath); } _logHead = logConfig.LogHead; _packer = new BatchBlock<T>(logConfig.BatchSize); batchWriterBlock = new ActionBlock<T[]>(models => WriteToFile(models)); _packer.LinkTo(batchWriterBlock, new DataflowLinkOptions { PropagateCompletion = true }); _triggerBatchTimer = new Timer(state => { _packer.TriggerBatch(); }, null, TimeSpan.Zero, TimeSpan.FromSeconds(logConfig.Period)); _logFileIntervalTimeSpan = TimeSpan.Parse(logConfig.LogFileInterval); _openFileTimer = new Timer(state => { AlignCurrentFileTo(DateTime.Now); }, null, TimeSpan.Zero, _logFileIntervalTimeSpan); } public ITargetBlock<T> InputBlock => _packer; private void AlignCurrentFileTo(DateTime dt) { if (!_nextCheckpoint.HasValue) { OpenFile(dt); } if (dt >= _nextCheckpoint.Value) { CloseFile(); OpenFile(dt); } } private void OpenFile(DateTime now, string fileSuffix = null) { string filePath = null; try { var currentHour = now.Date.AddHours(now.Hour); _nextCheckpoint = currentHour.Add(_logFileIntervalTimeSpan); int hourConfiguration = _logFileIntervalTimeSpan.Hours; int minuteConfiguration = _logFileIntervalTimeSpan.Minutes; filePath = $"{_dirPath}/u_ex{now.ToString("yyMMddHH")}{fileSuffix}.log"; var appendHead = !File.Exists(filePath); if (filePath != null) { var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write); var sw = new StreamWriter(stream, Encoding.Default); if (appendHead) { sw.Write(GenerateHead()); } _currentWriter = sw; _logger.LogDebug($"{currentHour} TextWriter has been created."); } } catch (UnauthorizedAccessException ex) { _logger.LogWarning("I/O error or specific type of scecurity error,{0}", ex); throw; } catch (Exception e) { if (fileSuffix == null) { _logger.LogWarning($"OpenFile failed:{e.StackTrace.ToString()}:{e.Message}.", e.StackTrace); OpenFile(now, $"-{Guid.NewGuid()}"); } else { _logger.LogError($"OpenFile failed after retry: {filePath}", e); throw; } } } private void CloseFile() { if (_currentWriter != null) { _currentWriter.Flush(); _currentWriter.Dispose(); _currentWriter = null; _logger.LogDebug($"{DateTime.Now} TextWriter has been disposed."); } _nextCheckpoint = null; } private string GenerateHead() { StringBuilder head = new StringBuilder(); head.AppendLine("#Software: " + _logHead.Software) .AppendLine("#Version: " + _logHead.Version) .AppendLine($"#Date: {DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss")}") .AppendLine("#Fields: " + _logHead.Fields); return head.ToString(); } private void WriteToFile(T[] models) { try { lock (_syncRoot) { var flag = false; foreach (var model in models) { if (model == null) continue; flag = true; AlignCurrentFileTo(model.ServerLocalTime); _currentWriter.WriteLine(model.ToString()); } if (flag) _currentWriter.Flush(); } } catch (Exception ex) { _logger.LogError("WriteToFile Error : {0}", ex.Message); } } public bool AcceptLogModel(T model) { return _packer.Post(model); } public string GetDirPath() { return _dirPath; } public async Task CompleteAsync() { _triggerBatchTimer.Dispose(); _openFileTimer.Dispose(); _packer.TriggerBatch(); _packer.Complete(); await InputBlock.Completion; lock (_syncRoot) { CloseFile(); } } }
② 異常處理
上述程序在部署時就遇到相關的坑位,在測試環境_eqid2ModelTransformBlock 內Func委託穩定執行,程序並未出現異樣;
部署到生產以後, 該Pipeline會運行一段時間就中止工做,一直很困惑, 後來經過監測_eqid2ModelTransformBlock.Completion 屬性,該塊提早進入「完成態」 : 程序在執行某次Func委託時報錯,Block提早進入完成態
TransfomrBlock.Completion 一個Task對象,當TPL Dataflow再也不處理消息而且能保證再也不處理消息的時候,就被定義爲完成態, Task對象的TaskStatus枚舉值將標記此Block進入完成態的真實緣由
- TaskStatus.RanToCompletion 根據Block定義的任務成功完成
- TaskStatus.Fault 由於未處理的異常 致使"過早的完成"
- TaskStatus.Cancled 由於取消操做 致使 "過早的完成"
咱們須要當心處理異常, 通常狀況下咱們使用try、catch包含全部的執行代碼以確保全部的異常都被處理。
可將TPL Dataflow 作爲進程內消息隊列,本文只是一個入門參考,更多複雜用法仍是看官網, 你須要記住的是, 這是一個.Net 進程內數據流組件, 能讓你專一於流程。
感謝您的認真閱讀,若有問題請大膽斧正;以爲有用,請下方或加關注。
本文歡迎轉載,但請保留此段聲明,且在文章頁面明顯位置註明本文的做者及原文連接。