C# 併發隊列ConcurrentQueue

/// <summary>
/// Demo driver: starts one producer that fills a <see cref="ConcurrentQueue{T}"/>
/// and four processors that drain it concurrently, then cancels the processors
/// two seconds after the producer has finished and waits for them to exit.
/// </summary>
/// <returns>A task that completes once every processor has stopped.</returns>
static async Task RunProgram()
{
    var taskQueue = new ConcurrentQueue<CustomTask>();

    // CancellationTokenSource implements IDisposable; scope it so it is
    // released once all processors have finished.
    using (var cts = new CancellationTokenSource())
    {
        // Read the token once so the worker lambdas do not touch the source object.
        CancellationToken token = cts.Token;

        // Generate work items and add them to the concurrent queue.
        var taskSource = Task.Run(() => TaskProducer(taskQueue));

        // Start four workers that process the queued items at the same time.
        Task[] processors = new Task[4];
        for (int i = 1; i <= 4; i++)
        {
            // Per-iteration copy so each lambda captures its own identifier.
            string processId = i.ToString();
            processors[i - 1] = Task.Run(
                () => TaskProcessor(taskQueue, "Processor " + processId, token)
                );
        }

        await taskSource;

        // Signal cancellation to the processors two seconds after production ends.
        cts.CancelAfter(TimeSpan.FromSeconds(2));
        await Task.WhenAll(processors);
    }
}

/// <summary>
/// Produces twenty work items (ids 0 through 19), waiting 50 ms before each one,
/// enqueues every item and reports it on the console.
/// </summary>
/// <param name="queue">Queue that receives the generated work items.</param>
/// <returns>A task that completes after the last item has been posted.</returns>
static async Task TaskProducer(ConcurrentQueue<CustomTask> queue)
{
    int nextId = 0;
    while (nextId < 20)
    {
        await Task.Delay(50);

        var item = new CustomTask();
        item.Id = nextId;

        queue.Enqueue(item);
        Console.WriteLine("task {0} has been posted", item.Id);

        nextId++;
    }
}

/// <summary>
/// Worker loop: after an initial random delay, repeatedly tries to take one item
/// from the queue, reports it when one was available, waits another random delay,
/// and stops once cancellation has been requested.
/// </summary>
/// <param name="queue">Queue to take work items from.</param>
/// <param name="name">Label printed alongside each processed item.</param>
/// <param name="token">Token checked after every iteration to end the loop.</param>
/// <returns>A task that completes when the loop observes cancellation.</returns>
static async Task TaskProcessor(ConcurrentQueue<CustomTask> queue, string name, CancellationToken token)
{
    await GetRandomDelay();

    while (true)
    {
        CustomTask item;
        if (queue.TryDequeue(out item))
        {
            Console.WriteLine("task {0} has been processed by {1}", item.Id, name);
        }

        await GetRandomDelay();

        // The check sits at the end so at least one dequeue attempt always happens.
        if (token.IsCancellationRequested)
        {
            break;
        }
    }
}

// One generator shared by every caller. Creating a new Random seeded with
// DateTime.Now.Millisecond on each call allows only 1000 distinct seeds and gives
// identical delays to callers that arrive within the same millisecond.
private static readonly Random s_delayRandom = new Random();

// Random instances are not thread-safe, so access is serialized through this lock.
private static readonly object s_delayRandomLock = new object();

/// <summary>
/// Returns a task that completes after a random delay of 0 to 1499 milliseconds.
/// </summary>
/// <returns>The pending delay task.</returns>
static Task GetRandomDelay()
{
    int delay;
    lock (s_delayRandomLock)
    {
        delay = s_delayRandom.Next(1500);
    }
    return Task.Delay(delay);
}

/// <summary>
/// A unit of work passed from the producer to the processors through the queue.
/// </summary>
class CustomTask
{
    private int _id;

    /// <summary>
    /// Identifier of this work item.
    /// </summary>
    public int Id
    {
        get { return _id; }
        set { _id = value; }
    }
}

調用併發隊列測試(程序入口)

/// <summary>
/// Console entry point: runs the concurrent-queue demo to completion, then waits
/// for a key press so the output stays visible.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    // Block until the whole demo has finished.
    RunProgram().Wait();

    Console.ReadKey();
}
相關文章
相關標籤/搜索