在咱們業務操做時,不免會有屢次操做,咱們指望什麼結果呢?html
絕大部分狀況,應該是隻須要最後一次操做的結果,其它操做應該無效。併發
自定義等待的任務類
1. 可等待的任務類 AwaitableTask:異步
1 /// <summary> 2 /// 可等待的任務 3 /// </summary> 4 public class AwaitableTask 5 { 6 /// <summary> 7 /// 獲取任務是否爲不可執行狀態 8 /// </summary> 9 public bool NotExecutable { get; private set; } 10 11 /// <summary> 12 /// 設置任務不可執行 13 /// </summary> 14 public void SetNotExecutable() 15 { 16 NotExecutable = true; 17 } 18 19 /// <summary> 20 /// 獲取任務是否有效 21 /// 注:對無效任務,能夠不作處理。減小併發操做致使的干擾 22 /// </summary> 23 public bool IsInvalid { get; private set; } = true; 24 25 /// <summary> 26 /// 標記任務無效 27 /// </summary> 28 public void MarkTaskValid() 29 { 30 IsInvalid = false; 31 } 32 33 #region Task 34 35 private readonly Task _task; 36 /// <summary> 37 /// 初始化可等待的任務。 38 /// </summary> 39 /// <param name="task"></param> 40 public AwaitableTask(Task task) => _task = task; 41 42 /// <summary> 43 /// 獲取任務是否已完成 44 /// </summary> 45 public bool IsCompleted => _task.IsCompleted; 46 47 /// <summary> 48 /// 任務的Id 49 /// </summary> 50 public int TaskId => _task.Id; 51 52 /// <summary> 53 /// 開始任務 54 /// </summary> 55 public void Start() => _task.Start(); 56 57 /// <summary> 58 /// 同步執行開始任務 59 /// </summary> 60 public void RunSynchronously() => _task.RunSynchronously(); 61 62 #endregion 63 64 #region TaskAwaiter 65 66 /// <summary> 67 /// 獲取任務等待器 68 /// </summary> 69 /// <returns></returns> 70 public TaskAwaiter GetAwaiter() => new TaskAwaiter(this); 71 72 /// <summary>Provides an object that waits for the completion of an asynchronous task. </summary> 73 [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)] 74 public struct TaskAwaiter : INotifyCompletion 75 { 76 private readonly AwaitableTask _task; 77 78 /// <summary> 79 /// 任務等待器 80 /// </summary> 81 /// <param name="awaitableTask"></param> 82 public TaskAwaiter(AwaitableTask awaitableTask) => _task = awaitableTask; 83 84 /// <summary> 85 /// 任務是否完成. 86 /// </summary> 87 public bool IsCompleted => _task._task.IsCompleted; 88 89 /// <inheritdoc /> 90 public void OnCompleted(Action continuation) 91 { 92 var This = this; 93 _task._task.ContinueWith(t => 94 { 95 if (!This._task.NotExecutable) continuation?.Invoke(); 96 }); 97 } 98 /// <summary> 99 /// 獲取任務結果 100 /// </summary> 101 public void GetResult() => _task._task.Wait(); 102 } 103 104 #endregion 105 106 }
無效的操做能夠分爲如下倆種:async
- 已經進行中的操做,後續結果應標記爲無效
- 還沒開始的操做,後續不執行
自定義任務類型 AwaitableTask中,添加倆個字段NotExecutable、IsInvalid:ide
1 /// <summary> 2 /// 獲取任務是否爲不可執行狀態 3 /// </summary> 4 public bool NotExecutable { get; private set; } 5 /// <summary> 6 /// 獲取任務是否有效 7 /// 注:對無效任務,能夠不作處理。減小併發操做致使的干擾 8 /// </summary> 9 public bool IsInvalid { get; private set; } = true;
2. 有返回結果的可等待任務類 AwaitableTask<TResult>:測試
1 /// <summary> 2 /// 可等待的任務 3 /// </summary> 4 /// <typeparam name="TResult"></typeparam> 5 public class AwaitableTask<TResult> : AwaitableTask 6 { 7 private readonly Task<TResult> _task; 8 /// <summary> 9 /// 初始化可等待的任務 10 /// </summary> 11 /// <param name="task">須要執行的任務</param> 12 public AwaitableTask(Task<TResult> task) : base(task) => _task = task; 13 14 #region TaskAwaiter 15 16 /// <summary> 17 /// 獲取任務等待器 18 /// </summary> 19 /// <returns></returns> 20 public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this); 21 22 /// <summary> 23 /// 任務等待器 24 /// </summary> 25 [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)] 26 public new struct TaskAwaiter : INotifyCompletion 27 { 28 private readonly AwaitableTask<TResult> _task; 29 30 /// <summary> 31 /// 初始化任務等待器 32 /// </summary> 33 /// <param name="awaitableTask"></param> 34 public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask; 35 36 /// <summary> 37 /// 任務是否已完成。 38 /// </summary> 39 public bool IsCompleted => _task._task.IsCompleted; 40 41 /// <inheritdoc /> 42 public void OnCompleted(Action continuation) 43 { 44 var This = this; 45 _task._task.ContinueWith(t => 46 { 47 if (!This._task.NotExecutable) continuation?.Invoke(); 48 }); 49 } 50 51 /// <summary> 52 /// 獲取任務結果。 53 /// </summary> 54 /// <returns></returns> 55 public TResult GetResult() => _task._task.Result; 56 } 57 58 #endregion 59 }
添加任務等待器,同步等待結果返回:this
1 /// <summary> 2 /// 獲取任務等待器 3 /// </summary> 4 /// <returns></returns> 5 public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this); 6 7 /// <summary> 8 /// 任務等待器 9 /// </summary> 10 [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)] 11 public new struct TaskAwaiter : INotifyCompletion 12 { 13 private readonly AwaitableTask<TResult> _task; 14 15 /// <summary> 16 /// 初始化任務等待器 17 /// </summary> 18 /// <param name="awaitableTask"></param> 19 public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask; 20 21 /// <summary> 22 /// 任務是否已完成。 23 /// </summary> 24 public bool IsCompleted => _task._task.IsCompleted; 25 26 /// <inheritdoc /> 27 public void OnCompleted(Action continuation) 28 { 29 var This = this; 30 _task._task.ContinueWith(t => 31 { 32 if (!This._task.NotExecutable) continuation?.Invoke(); 33 }); 34 } 35 36 /// <summary> 37 /// 獲取任務結果。 38 /// </summary> 39 /// <returns></returns> 40 public TResult GetResult() => _task._task.Result; 41 }
異步任務隊列
1 /// <summary> 2 /// 異步任務隊列 3 /// </summary> 4 public class AsyncTaskQueue : IDisposable 5 { 6 /// <summary> 7 /// 異步任務隊列 8 /// </summary> 9 public AsyncTaskQueue() 10 { 11 _autoResetEvent = new AutoResetEvent(false); 12 _thread = new Thread(InternalRunning) { IsBackground = true }; 13 _thread.Start(); 14 } 15 16 #region 執行 17 18 /// <summary> 19 /// 執行異步操做 20 /// </summary> 21 /// <typeparam name="T">返回結果類型</typeparam> 22 /// <param name="func">異步操做</param> 23 /// <returns>isInvalid:異步操做是否有效;result:異步操做結果</returns> 24 public async Task<(bool isInvalid, T reslut)> ExecuteAsync<T>(Func<Task<T>> func) 25 { 26 var task = GetExecutableTask(func); 27 var result = await await task; 28 if (!task.IsInvalid) 29 { 30 result = default(T); 31 } 32 return (task.IsInvalid, result); 33 } 34 35 /// <summary> 36 /// 執行異步操做 37 /// </summary> 38 /// <typeparam name="T"></typeparam> 39 /// <param name="func"></param> 40 /// <returns></returns> 41 public async Task<bool> ExecuteAsync<T>(Func<Task> func) 42 { 43 var task = GetExecutableTask(func); 44 await await task; 45 return task.IsInvalid; 46 } 47 48 #endregion 49 50 #region 添加任務 51 52 /// <summary> 53 /// 獲取待執行任務 54 /// </summary> 55 /// <param name="action"></param> 56 /// <returns></returns> 57 private AwaitableTask GetExecutableTask(Action action) 58 { 59 var awaitableTask = new AwaitableTask(new Task(action)); 60 AddPenddingTaskToQueue(awaitableTask); 61 return awaitableTask; 62 } 63 64 /// <summary> 65 /// 獲取待執行任務 66 /// </summary> 67 /// <typeparam name="TResult"></typeparam> 68 /// <param name="function"></param> 69 /// <returns></returns> 70 private AwaitableTask<TResult> GetExecutableTask<TResult>(Func<TResult> function) 71 { 72 var awaitableTask = new AwaitableTask<TResult>(new Task<TResult>(function)); 73 AddPenddingTaskToQueue(awaitableTask); 74 return awaitableTask; 75 } 76 77 /// <summary> 78 /// 添加待執行任務到隊列 79 /// </summary> 80 /// <param name="task"></param> 81 /// <returns></returns> 82 private void AddPenddingTaskToQueue(AwaitableTask task) 83 { 84 //添加隊列,加鎖。 85 lock (_queue) 86 { 87 _queue.Enqueue(task); 88 //開始執行任務 89 _autoResetEvent.Set(); 90 } 91 } 92 93 #endregion 94 95 #region 內部運行 96 97 private void InternalRunning() 98 { 99 while (!_isDisposed) 100 { 101 if (_queue.Count == 0) 102 { 103 //等待後續任務 104 _autoResetEvent.WaitOne(); 105 } 106 while (TryGetNextTask(out var task)) 107 { 108 //如已從隊列中刪除 109 if (task.NotExecutable) continue; 110 111 if (UseSingleThread) 112 { 113 task.RunSynchronously(); 114 } 115 else 116 { 117 task.Start(); 118 } 119 } 120 } 121 } 122 /// <summary> 123 /// 上一次異步操做 124 /// </summary> 125 private AwaitableTask _lastDoingTask; 126 private bool TryGetNextTask(out AwaitableTask task) 127 { 128 task = null; 129 while (_queue.Count > 0) 130 { 131 //獲取並從隊列中移除任務 132 if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0)) 133 { 134 //設置進行中的異步操做無效 135 _lastDoingTask?.MarkTaskValid(); 136 _lastDoingTask = task; 137 return true; 138 } 139 //併發操做,設置任務不可執行 140 task.SetNotExecutable(); 141 } 142 return false; 143 } 144 145 #endregion 146 147 #region dispose 148 149 /// <inheritdoc /> 150 public void Dispose() 151 { 152 Dispose(true); 153 GC.SuppressFinalize(this); 154 } 155 156 /// <summary> 157 /// 析構任務隊列 158 /// </summary> 159 ~AsyncTaskQueue() => Dispose(false); 160 161 private void Dispose(bool disposing) 162 { 163 if (_isDisposed) return; 164 if (disposing) 165 { 166 _autoResetEvent.Dispose(); 167 } 168 _thread = null; 169 _autoResetEvent = null; 170 _isDisposed = true; 171 } 172 173 #endregion 174 175 #region 屬性及字段 176 177 /// <summary> 178 /// 是否使用單線程完成任務. 179 /// </summary> 180 public bool UseSingleThread { get; set; } = true; 181 182 /// <summary> 183 /// 自動取消之前的任務。 184 /// </summary> 185 public bool AutoCancelPreviousTask { get; set; } = false; 186 187 private bool _isDisposed; 188 private readonly ConcurrentQueue<AwaitableTask> _queue = new ConcurrentQueue<AwaitableTask>(); 189 private Thread _thread; 190 private AutoResetEvent _autoResetEvent; 191 192 #endregion
添加異步任務隊列類,用於任務的管理,如添加、執行、篩選等:spa
1. 自動取消以前的任務 AutoCancelPreviousTask線程
內部使用線程,循環獲取當前任務列表,若是當前任務被標記NotExecutable不可執行,則跳過。code
NotExecutable是什麼時候標記的?
獲取任務時,標記全部獲取的任務爲NotExecutable。直到任務列表中爲空,那麼只執行最後獲取的一個任務。
2. 標記已經進行的任務無效 MarkTaskValid
當前進行的任務,沒法停止,那麼標記爲無效便可。
1 /// <summary> 2 /// 上一次異步操做 3 /// </summary> 4 private AwaitableTask _lastDoingTask; 5 private bool TryGetNextTask(out AwaitableTask task) 6 { 7 task = null; 8 while (_queue.Count > 0) 9 { 10 //獲取並從隊列中移除任務 11 if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0)) 12 { 13 //設置進行中的異步操做無效 14 _lastDoingTask?.MarkTaskValid(); 15 _lastDoingTask = task; 16 return true; 17 } 18 //併發操做,設置任務不可執行 19 task.SetNotExecutable(); 20 } 21 return false; 22 }
後續執行完後,根據此標記,設置操做結果爲空。
1 /// <summary> 2 /// 執行異步操做 3 /// </summary> 4 /// <typeparam name="T">返回結果類型</typeparam> 5 /// <param name="func">異步操做</param> 6 /// <returns>isInvalid:異步操做是否有效;result:異步操做結果</returns> 7 public async Task<(bool isInvalid, T reslut)> ExecuteAsync<T>(Func<Task<T>> func) 8 { 9 var task = GetExecutableTask(func); 10 var result = await await task; 11 if (!task.IsInvalid) 12 { 13 result = default(T); 14 } 15 return (task.IsInvalid, result); 16 }
實踐測試
啓動10個併發任務,測試實際的任務隊列併發操做管理:
1 public MainWindow() 2 { 3 InitializeComponent(); 4 _asyncTaskQueue = new AsyncTaskQueue 5 { 6 AutoCancelPreviousTask = true, 7 UseSingleThread = true 8 }; 9 } 10 private AsyncTaskQueue _asyncTaskQueue; 11 private void ButtonBase_OnClick(object sender, RoutedEventArgs e) 12 { 13 // 快速啓動9個任務 14 for (var i = 1; i < 10; i++) 15 { 16 Test(_asyncTaskQueue, i); 17 } 18 } 19 public static async void Test(AsyncTaskQueue taskQueue, int num) 20 { 21 var result = await taskQueue.ExecuteAsync(async () => 22 { 23 Debug.WriteLine("輸入:" + num); 24 // 長時間耗時任務 25 await Task.Delay(TimeSpan.FromSeconds(5)); 26 return num * 100; 27 }); 28 Debug.WriteLine($"{num}輸出的:" + result); 29 }
測試結果以下:
一共9次操做,只有最後一次操做結果,纔是有效的。其它8次操做,一次是無效的,7次操做被取消不執行。
原文出處:https://www.cnblogs.com/kybs0/p/11988554.html