C# 異步併發操做,只保留最後一次操做

在咱們業務操做時,不免會有屢次操做,咱們指望什麼結果呢?html

絕大部分狀況,應該是隻須要最後一次操做的結果,其它操做應該無效。併發

自定義等待的任務類

1. 可等待的任務類 AwaitableTask:異步

 1     /// <summary>
 2     /// 可等待的任務  3     /// </summary>
 4     public class AwaitableTask  5  {  6         /// <summary>
 7         /// 獲取任務是否爲不可執行狀態  8         /// </summary>
 9         public bool NotExecutable { get; private set; }  10 
 11         /// <summary>
 12         /// 設置任務不可執行  13         /// </summary>
 14         public void SetNotExecutable()  15  {  16             NotExecutable = true;  17  }  18 
 19         /// <summary>
 20         /// 獲取任務是否有效  21         /// 注:對無效任務,能夠不作處理。減小併發操做致使的干擾  22         /// </summary>
 23         public bool IsInvalid { get; private set; } = true;  24 
 25         /// <summary>
 26         /// 標記任務無效  27         /// </summary>
 28         public void MarkTaskValid()  29  {  30             IsInvalid = false;  31  }  32 
 33         #region Task
 34 
 35         private readonly Task _task;  36         /// <summary>
 37         /// 初始化可等待的任務。  38         /// </summary>
 39         /// <param name="task"></param>
 40         public AwaitableTask(Task task) => _task = task;  41 
 42         /// <summary>
 43         /// 獲取任務是否已完成  44         /// </summary>
 45         public bool IsCompleted => _task.IsCompleted;  46 
 47         /// <summary>
 48         /// 任務的Id  49         /// </summary>
 50         public int TaskId => _task.Id;  51 
 52         /// <summary>
 53         /// 開始任務  54         /// </summary>
 55         public void Start() => _task.Start();  56 
 57         /// <summary>
 58         /// 同步執行開始任務  59         /// </summary>
 60         public void RunSynchronously() => _task.RunSynchronously();  61 
 62         #endregion
 63 
 64         #region TaskAwaiter
 65 
 66         /// <summary>
 67         /// 獲取任務等待器  68         /// </summary>
 69         /// <returns></returns>
 70         public TaskAwaiter GetAwaiter() => new TaskAwaiter(this);  71 
 72         /// <summary>Provides an object that waits for the completion of an asynchronous task. </summary>
 73         [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]  74         public struct TaskAwaiter : INotifyCompletion  75  {  76             private readonly AwaitableTask _task;  77 
 78             /// <summary>
 79             /// 任務等待器  80             /// </summary>
 81             /// <param name="awaitableTask"></param>
 82             public TaskAwaiter(AwaitableTask awaitableTask) => _task = awaitableTask;  83 
 84             /// <summary>
 85             /// 任務是否完成.  86             /// </summary>
 87             public bool IsCompleted => _task._task.IsCompleted;  88 
 89             /// <inheritdoc />
 90             public void OnCompleted(Action continuation)  91  {  92                 var This = this;  93                 _task._task.ContinueWith(t =>
 94  {  95                     if (!This._task.NotExecutable) continuation?.Invoke();  96  });  97  }  98             /// <summary>
 99             /// 獲取任務結果 100             /// </summary>
101             public void GetResult() => _task._task.Wait(); 102  } 103 
104         #endregion
105 
106     }
View Code

無效的操做能夠分爲如下倆種:async

  • 已經進行中的操做,後續結果應標記爲無效
  • 還沒開始的操做,後續不執行

自定義任務類型 AwaitableTask中,添加倆個字段NotExecutable、IsInvalid:ide

1     /// <summary>
2     /// 獲取任務是否爲不可執行狀態 3     /// </summary>
4     public bool NotExecutable { get; private set; } 5     /// <summary>
6     /// 獲取任務是否有效 7     /// 注:對無效任務,能夠不作處理。減小併發操做致使的干擾 8     /// </summary>
9     public bool IsInvalid { get; private set; } = true;

2. 有返回結果的可等待任務類 AwaitableTask<TResult>:測試

 1     /// <summary>
 2     /// 可等待的任務  3     /// </summary>
 4     /// <typeparam name="TResult"></typeparam>
 5     public class AwaitableTask<TResult> : AwaitableTask  6  {  7         private readonly Task<TResult> _task;  8         /// <summary>
 9         /// 初始化可等待的任務 10         /// </summary>
11         /// <param name="task">須要執行的任務</param>
12         public AwaitableTask(Task<TResult> task) : base(task) => _task = task; 13 
14         #region TaskAwaiter
15 
16         /// <summary>
17         /// 獲取任務等待器 18         /// </summary>
19         /// <returns></returns>
20         public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this); 21 
22         /// <summary>
23         /// 任務等待器 24         /// </summary>
25         [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)] 26         public new struct TaskAwaiter : INotifyCompletion 27  { 28             private readonly AwaitableTask<TResult> _task; 29 
30             /// <summary>
31             /// 初始化任務等待器 32             /// </summary>
33             /// <param name="awaitableTask"></param>
34             public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask; 35 
36             /// <summary>
37             /// 任務是否已完成。 38             /// </summary>
39             public bool IsCompleted => _task._task.IsCompleted; 40 
41             /// <inheritdoc />
42             public void OnCompleted(Action continuation) 43  { 44                 var This = this; 45                 _task._task.ContinueWith(t =>
46  { 47                     if (!This._task.NotExecutable) continuation?.Invoke(); 48  }); 49  } 50 
51             /// <summary>
52             /// 獲取任務結果。 53             /// </summary>
54             /// <returns></returns>
55             public TResult GetResult() => _task._task.Result; 56  } 57 
58         #endregion
59     }
View Code

添加任務等待器,同步等待結果返回:this

 1    /// <summary>
 2     /// 獲取任務等待器  3     /// </summary>
 4     /// <returns></returns>
 5     public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this);  6 
 7     /// <summary>
 8     /// 任務等待器  9     /// </summary>
10     [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)] 11     public new struct TaskAwaiter : INotifyCompletion 12  { 13         private readonly AwaitableTask<TResult> _task; 14 
15         /// <summary>
16         /// 初始化任務等待器 17         /// </summary>
18         /// <param name="awaitableTask"></param>
19         public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask; 20 
21         /// <summary>
22         /// 任務是否已完成。 23         /// </summary>
24         public bool IsCompleted => _task._task.IsCompleted; 25 
26         /// <inheritdoc />
27         public void OnCompleted(Action continuation) 28  { 29             var This = this; 30             _task._task.ContinueWith(t =>
31  { 32                 if (!This._task.NotExecutable) continuation?.Invoke(); 33  }); 34  } 35 
36         /// <summary>
37         /// 獲取任務結果。 38         /// </summary>
39         /// <returns></returns>
40         public TResult GetResult() => _task._task.Result; 41     }

異步任務隊列

 1     /// <summary>
 2     /// 異步任務隊列  3     /// </summary>
 4     public class AsyncTaskQueue : IDisposable  5  {  6         /// <summary>
 7         /// 異步任務隊列  8         /// </summary>
 9         public AsyncTaskQueue()  10  {  11             _autoResetEvent = new AutoResetEvent(false);  12             _thread = new Thread(InternalRunning) { IsBackground = true };  13  _thread.Start();  14  }  15 
 16         #region 執行
 17 
 18         /// <summary>
 19         /// 執行異步操做  20         /// </summary>
 21         /// <typeparam name="T">返回結果類型</typeparam>
 22         /// <param name="func">異步操做</param>
 23         /// <returns>isInvalid:異步操做是否有效;result:異步操做結果</returns>
 24         public async Task<(bool isInvalid, T reslut)> ExecuteAsync<T>(Func<Task<T>> func)  25  {  26             var task = GetExecutableTask(func);  27             var result = await await task;  28             if (!task.IsInvalid)  29  {  30                 result = default(T);  31  }  32             return (task.IsInvalid, result);  33  }  34 
 35         /// <summary>
 36         /// 執行異步操做  37         /// </summary>
 38         /// <typeparam name="T"></typeparam>
 39         /// <param name="func"></param>
 40         /// <returns></returns>
 41         public async Task<bool> ExecuteAsync<T>(Func<Task> func)  42  {  43             var task = GetExecutableTask(func);  44             await await task;  45             return task.IsInvalid;  46  }  47 
 48         #endregion
 49 
 50         #region 添加任務
 51 
 52         /// <summary>
 53         /// 獲取待執行任務  54         /// </summary>
 55         /// <param name="action"></param>
 56         /// <returns></returns>
 57         private AwaitableTask GetExecutableTask(Action action)  58  {  59             var awaitableTask = new AwaitableTask(new Task(action));  60  AddPenddingTaskToQueue(awaitableTask);  61             return awaitableTask;  62  }  63 
 64         /// <summary>
 65         /// 獲取待執行任務  66         /// </summary>
 67         /// <typeparam name="TResult"></typeparam>
 68         /// <param name="function"></param>
 69         /// <returns></returns>
 70         private AwaitableTask<TResult> GetExecutableTask<TResult>(Func<TResult> function)  71  {  72             var awaitableTask = new AwaitableTask<TResult>(new Task<TResult>(function));  73  AddPenddingTaskToQueue(awaitableTask);  74             return awaitableTask;  75  }  76 
 77         /// <summary>
 78         /// 添加待執行任務到隊列  79         /// </summary>
 80         /// <param name="task"></param>
 81         /// <returns></returns>
 82         private void AddPenddingTaskToQueue(AwaitableTask task)  83  {  84             //添加隊列,加鎖。
 85             lock (_queue)  86  {  87  _queue.Enqueue(task);  88                 //開始執行任務
 89  _autoResetEvent.Set();  90  }  91  }  92 
 93         #endregion
 94 
 95         #region 內部運行
 96 
 97         private void InternalRunning()  98  {  99             while (!_isDisposed) 100  { 101                 if (_queue.Count == 0) 102  { 103                     //等待後續任務
104  _autoResetEvent.WaitOne(); 105  } 106                 while (TryGetNextTask(out var task)) 107  { 108                     //如已從隊列中刪除
109                     if (task.NotExecutable) continue; 110 
111                     if (UseSingleThread) 112  { 113  task.RunSynchronously(); 114  } 115                     else
116  { 117  task.Start(); 118  } 119  } 120  } 121  } 122         /// <summary>
123         /// 上一次異步操做 124         /// </summary>
125         private AwaitableTask _lastDoingTask; 126         private bool TryGetNextTask(out AwaitableTask task) 127  { 128             task = null; 129             while (_queue.Count > 0) 130  { 131                 //獲取並從隊列中移除任務
132                 if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0)) 133  { 134                     //設置進行中的異步操做無效
135                     _lastDoingTask?.MarkTaskValid(); 136                     _lastDoingTask = task; 137                     return true; 138  } 139                 //併發操做,設置任務不可執行
140  task.SetNotExecutable(); 141  } 142             return false; 143  } 144 
145         #endregion
146 
147         #region dispose
148 
149         /// <inheritdoc />
150         public void Dispose() 151  { 152             Dispose(true); 153             GC.SuppressFinalize(this); 154  } 155 
156         /// <summary>
157         /// 析構任務隊列 158         /// </summary>
159         ~AsyncTaskQueue() => Dispose(false); 160 
161         private void Dispose(bool disposing) 162  { 163             if (_isDisposed) return; 164             if (disposing) 165  { 166  _autoResetEvent.Dispose(); 167  } 168             _thread = null; 169             _autoResetEvent = null; 170             _isDisposed = true; 171  } 172 
173         #endregion
174 
175         #region 屬性及字段
176 
177         /// <summary>
178         /// 是否使用單線程完成任務. 179         /// </summary>
180         public bool UseSingleThread { get; set; } = true; 181 
182         /// <summary>
183         /// 自動取消之前的任務。 184         /// </summary>
185         public bool AutoCancelPreviousTask { get; set; } = false; 186 
187         private bool _isDisposed; 188         private readonly ConcurrentQueue<AwaitableTask> _queue = new ConcurrentQueue<AwaitableTask>(); 189         private Thread _thread; 190         private AutoResetEvent _autoResetEvent; 191 
192         #endregion
View Code

添加異步任務隊列類,用於任務的管理,如添加、執行、篩選等:spa

1. 自動取消以前的任務 AutoCancelPreviousTask線程

內部使用線程,循環獲取當前任務列表,若是當前任務被標記NotExecutable不可執行,則跳過。code

NotExecutable是什麼時候標記的?

獲取任務時,標記全部獲取的任務爲NotExecutable。直到任務列表中爲空,那麼只執行最後獲取的一個任務。

2. 標記已經進行的任務無效 MarkTaskValid

當前進行的任務,沒法停止,那麼標記爲無效便可。

 1     /// <summary>
 2     /// 上一次異步操做  3     /// </summary>
 4     private AwaitableTask _lastDoingTask;  5     private bool TryGetNextTask(out AwaitableTask task)  6  {  7         task = null;  8         while (_queue.Count > 0)  9  { 10             //獲取並從隊列中移除任務
11             if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0)) 12  { 13                 //設置進行中的異步操做無效
14                 _lastDoingTask?.MarkTaskValid(); 15                 _lastDoingTask = task; 16                 return true; 17  } 18             //併發操做,設置任務不可執行
19  task.SetNotExecutable(); 20  } 21         return false; 22     }

後續執行完後,根據此標記,設置操做結果爲空。

 1     /// <summary>
 2     /// 執行異步操做  3     /// </summary>
 4     /// <typeparam name="T">返回結果類型</typeparam>
 5     /// <param name="func">異步操做</param>
 6     /// <returns>isInvalid:異步操做是否有效;result:異步操做結果</returns>
 7     public async Task<(bool isInvalid, T reslut)> ExecuteAsync<T>(Func<Task<T>> func)  8  {  9         var task = GetExecutableTask(func); 10         var result = await await task; 11         if (!task.IsInvalid) 12  { 13             result = default(T); 14  } 15         return (task.IsInvalid, result); 16     }

實踐測試

啓動10個併發任務,測試實際的任務隊列併發操做管理:

 1     public MainWindow()  2  {  3  InitializeComponent();  4         _asyncTaskQueue = new AsyncTaskQueue  5  {  6             AutoCancelPreviousTask = true,  7             UseSingleThread = true
 8  };  9  } 10     private AsyncTaskQueue _asyncTaskQueue; 11     private void ButtonBase_OnClick(object sender, RoutedEventArgs e) 12  { 13         // 快速啓動9個任務
14         for (var i = 1; i < 10; i++) 15  { 16  Test(_asyncTaskQueue, i); 17  } 18  } 19     public static async void Test(AsyncTaskQueue taskQueue, int num) 20  { 21         var result = await taskQueue.ExecuteAsync(async () =>
22  { 23             Debug.WriteLine("輸入:" + num); 24             // 長時間耗時任務
25             await Task.Delay(TimeSpan.FromSeconds(5)); 26             return num * 100; 27  }); 28         Debug.WriteLine($"{num}輸出的:" + result); 29     }

測試結果以下:

一共9次操做,只有最後一次操做結果,纔是有效的。其它8次操做,一次是無效的,7次操做被取消不執行。

原文出處:https://www.cnblogs.com/kybs0/p/11988554.html

相關文章
相關標籤/搜索