本文內容 均參考自 《C#並行高級編程》 編程
TPL 支持 數據並行(有大量數據要處理,必須對每一個數據執行一樣的操做, 任務並行(有好多能夠併發運行的操做),流水線(任務並行和數據並行的結合體)併發
在.net 4.0 引入新的 Task Parallel Library 處理 並行開發 。 負載均衡
Parallel類 異步
關鍵詞 : spa
Parallel.For and Parallel.Foreach - 負載均衡的多任務 .net
Parallel.Invoke - 並行運行多任務 線程
ParallelOptions - 指定最大並行度 (實例化一個類並修改MaxDegreeOfParallelism 屬性的值 ) orm
Environment.ProcessorCount - 內核最大數 blog
命令式任務並行 進程
關鍵詞 : Task類 , 一個task 類表示一個異步操做 (須要考慮到任務的開銷)
啓動任務使用Task類的Start 方法 , 等待線程完成使用WaitAll 方法 , 經過CancellationTokenSource 的Cancel方法來 中斷 Task的運行
怎樣表達任務間的父子關係 ? TaskCreationOption的AttachToParent來完成
怎樣來表達串行任務 ? Task.ContinueWith
併發集合
BlockingCollection
ConcurrentDictionary/ConcurrentQueue/ConcurrentStack
下面來一個例子是實操 C# 多任務併發。
場景 : 主進程 打開 一個 生產者線程 和 一個消費線程 。 他們之間能夠相互對話, 如([動詞,名詞]) say,hello task,a . 生產者說一句話 消費者聽, 消費者或應答或提交新的任務或結束本身。
代碼
using System; using System.Threading; using System.Threading.Tasks; using System.Collections.Concurrent; namespace TPLTest { class Program { public static readonly int MAX_DATA_LENGTH = 256; private static BlockingCollection<Message> bcstr = new BlockingCollection<Message>(MAX_DATA_LENGTH) ; public static readonly string SAY_THANKS = "thanks"; public static readonly string SAY_WELCOME = "welcome!"; public static readonly string BYE = "bye"; public static readonly string SAY = "say"; public static readonly string TASK = "task"; public static readonly string TIMEOUT = "timeout"; public static readonly string ONLINE = "ONLINE"; public static readonly string WHAT = "What?"; public static readonly int WAIT = 20000; public static void Main(string[] args) { //消費者線程 ParallelOptions po = new ParallelOptions(); po.MaxDegreeOfParallelism = -1; Parallel.Invoke(po,() => { int selfID = Environment.CurrentManagedThreadId; Message customer = new Message(); customer.CustomerThreadID = selfID ; customer.content = ONLINE; Console.WriteLine(customer.ToString(false)); while(true){ if (bcstr.TryTake(out customer, WAIT)) { customer.CustomerThreadID = selfID ; customer.doAction(); Console.WriteLine(" "); Console.WriteLine(customer.ToString(false)); if (customer.endThread()){ break; } } else { if (customer == null) { customer = new Message(); } customer.CustomerThreadID = selfID ; customer.content = TIMEOUT; Console.WriteLine(customer.ToString(false)); } } }, () => { int prdID = Environment.CurrentManagedThreadId; Message productor = new Message(); productor.ProductorThreadID = prdID; productor.content = ONLINE; Console.WriteLine(productor.ToString(true)); while(true){ Console.Write("Productor Behavior (i.e. say,hello) : "); string msgContent = Console.ReadLine(); productor = new Message(); productor.ProductorThreadID = prdID; productor.key = msgContent.Split(',')[0]; productor.content = msgContent.Split(',')[1]; bcstr.Add(productor); if (productor.endThread()) { break; } } }); } } class Message { public int ProductorThreadID {get; set;} public int CustomerThreadID {get; set;} public string key {get; set;} public string content{get; set;} public bool endThread() { return string.Compare(key, Program.BYE) == 0; } public string ToString(bool isProductor){ return string.Format("{0} Thread ID {1} : {2}", isProductor ? "Productor" : "Customer", isProductor ? ProductorThreadID.ToString() : CustomerThreadID.ToString(), content); } public void doAction(){ if (string.Compare(key, Program.SAY) == 0) { content = string.Compare(content, Program.SAY_THANKS) == 0 ? Program.SAY_WELCOME : Program.WHAT; } if (string.Compare(key, Program.TASK) == 0) { Task taskA = Task.Factory.StartNew(() => { Console.WriteLine("task A begin "); Task ChildOfFatehrA = Task.Factory.StartNew(() => { Console.WriteLine("Sub task A begin "); Thread.Sleep(1000); Console.WriteLine("Sub task A end "); }); ChildOfFatehrA.Wait(); Console.WriteLine("task A end "); }); taskA.ContinueWith(taskB => { Console.WriteLine("task B begin "); Thread.Sleep(5000); Console.WriteLine("task B end "); }); taskA.Wait(); } } } }