//測試函數 static async Task RunProgram() { var taskQueue = new ConcurrentQueue<CustomTask>(); var cts = new CancellationTokenSource(); //生成任務添加至併發隊列 var taskSource = Task.Run(() => TaskProducer(taskQueue)); //同時啓動四個任務處理隊列中的任務 Task[] processors = new Task[4]; for(int i =1;i <= 4; i++) { string processId = i.ToString(); processors[i - 1] = Task.Run( () => TaskProcessor(taskQueue, "Processor " + processId, cts.Token) ); } await taskSource; //向任務發送取消信號 cts.CancelAfter(TimeSpan.FromSeconds(2)); await Task.WhenAll(processors); } //產生任務 static async Task TaskProducer(ConcurrentQueue<CustomTask> queue) { for(int i= 0;i < 20; i++) { await Task.Delay(50); var workItem = new CustomTask { Id = i }; queue.Enqueue(workItem); Console.WriteLine("task {0} has been posted", workItem.Id); } } //執行任務 static async Task TaskProcessor(ConcurrentQueue<CustomTask> queue, string name, CancellationToken token) { CustomTask workItem; bool dequeueSuccesful = false; await GetRandomDelay(); do { dequeueSuccesful = queue.TryDequeue(out workItem); if (dequeueSuccesful) { Console.WriteLine("task {0} has been processed by {1}", workItem.Id, name); } await GetRandomDelay(); } while (!token.IsCancellationRequested); } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } }
調用併發
static void Main(string[] args) { Task t = RunProgram(); t.Wait(); Console.ReadKey(); }