C# 並行開發總結

本文內容 均參考自 《C#並行高級編程》 編程

TPL 支持 數據並行(有大量數據要處理,必須對每一個數據執行一樣的操做, 任務並行(有好多能夠併發運行的操做),流水線(任務並行和數據並行的結合體)併發

在.net 4.0 引入新的 Task Parallel Library 處理 並行開發 。 負載均衡

Parallel類  異步

關鍵詞   :  spa

Parallel.For   and Parallel.Foreach    -  負載均衡的多任務 .net

Parallel.Invoke                              -  並行運行多任務 線程

ParallelOptions                              -  指定最大並行度  (實例化一個類並修改MaxDegreeOfParallelism 屬性的值 )    orm

Environment.ProcessorCount          -   內核最大數 blog

 

命令式任務並行 進程

關鍵詞 : Task類 , 一個task 類表示一個異步操做 (須要考慮到任務的開銷)  

啓動任務使用Task類的Start  方法 ,  等待線程完成使用WaitAll 方法 ,  經過CancellationTokenSource 的Cancel方法來 中斷 Task的運行 

怎樣表達任務間的父子關係 ?   TaskCreationOption的AttachToParent來完成 

怎樣來表達串行任務 ? Task.ContinueWith

 

 

併發集合 

BlockingCollection 

ConcurrentDictionary/ConcurrentQueue/ConcurrentStack

 

下面來一個例子是實操 C# 多任務併發。

場景 :  主進程 打開 一個 生產者線程 和 一個消費線程 。 他們之間能夠相互對話, 如([動詞,名詞]) say,hello  task,a  .   生產者說一句話 消費者聽, 消費者或應答或提交新的任務或結束本身。  

代碼 

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;

namespace TPLTest
{
	class Program
	{
		public static readonly int MAX_DATA_LENGTH         =  256; 
		private static BlockingCollection<Message> bcstr   = new BlockingCollection<Message>(MAX_DATA_LENGTH) ;
		public static readonly string SAY_THANKS           = "thanks";
		public static readonly string SAY_WELCOME          = "welcome!";
		public static readonly string BYE                  = "bye";
		public static readonly string SAY                  = "say";
		public static readonly string TASK                 = "task";
		public static readonly string TIMEOUT              = "timeout";
		public static readonly string ONLINE               = "ONLINE";
		public static readonly string WHAT                 = "What?";
		public static readonly int    WAIT                 = 20000;
		public static void Main(string[] args)
		{
			
			//消費者線程  
			ParallelOptions po = new ParallelOptions();
			po.MaxDegreeOfParallelism = -1; 
			Parallel.Invoke(po,() => {
				int selfID = Environment.CurrentManagedThreadId;
				Message customer = new Message();
				customer.CustomerThreadID = selfID ;
				customer.content = ONLINE;
				Console.WriteLine(customer.ToString(false));
				while(true){
					if (bcstr.TryTake(out customer, WAIT))
					{
					    customer.CustomerThreadID = selfID ;
					    customer.doAction();
					    Console.WriteLine(" ");
					    Console.WriteLine(customer.ToString(false));
						if (customer.endThread()){
					    	break;
					    } 
					     
					} else {
						if (customer == null)
						{
							customer = new Message();
						}
					    customer.CustomerThreadID = selfID ;
					    customer.content  =  TIMEOUT;   
					    Console.WriteLine(customer.ToString(false));
					}
				}
			}, 
			() => {
				int prdID = Environment.CurrentManagedThreadId;
				Message productor  = new Message(); 
				productor.ProductorThreadID = prdID; 
				productor.content           = ONLINE;
				Console.WriteLine(productor.ToString(true));
				while(true){
					Console.Write("Productor Behavior (i.e. say,hello) :   ");
					string msgContent = Console.ReadLine();
					productor       = new Message(); 
					productor.ProductorThreadID = prdID; 
					productor.key   = msgContent.Split(',')[0];
					productor.content = msgContent.Split(',')[1]; 
					bcstr.Add(productor);
					if (productor.endThread()) {
						break;
					}
				}
			});
			
			
			
		}
	}
	
	class Message
	{
		public int    ProductorThreadID {get; set;}
		public int    CustomerThreadID  {get; set;}
		public string key {get; set;}
		public string content{get; set;}
		public bool   endThread()
		{
			return string.Compare(key, Program.BYE) == 0;
		}
		
		public string ToString(bool isProductor){
			return string.Format("{0} Thread ID {1} : {2}", isProductor ? "Productor" : "Customer",  
			                                                isProductor ?  ProductorThreadID.ToString() : CustomerThreadID.ToString(), 
			                                                content);
		}
		
		public void doAction(){
			if (string.Compare(key, Program.SAY) == 0) {
				content = string.Compare(content, Program.SAY_THANKS) == 0 ?  Program.SAY_WELCOME : Program.WHAT;
			}
			
			if (string.Compare(key, Program.TASK) == 0) {
				Task taskA = Task.Factory.StartNew(() => {
				    Console.WriteLine("task A begin ");
				    Task ChildOfFatehrA = Task.Factory.StartNew(() => {
				     	Console.WriteLine("Sub task A begin ");
				    	Thread.Sleep(1000);
				    	Console.WriteLine("Sub task A end ");                          	
					});
				    ChildOfFatehrA.Wait();
				    Console.WriteLine("task A end ");
				    
				});
				taskA.ContinueWith(taskB => {
					Console.WriteLine("task B begin ");
				    Thread.Sleep(5000);
				    Console.WriteLine("task B end ");                     
				});
				taskA.Wait(); 
			}
		}
			
	}
}
相關文章
相關標籤/搜索