通常狀況下,常常會遇到一個單線程程序時執行對CPU,MEMORY,IO利用率上不來,且速度慢下問題;那麼,怎麼解決這些問題呢?api
據我我的經驗來講有如下兩種方式:數組
一、並行、多線程(Parallel、Task、ThreadPool)安全
二、多進程MultipleProcess多線程
剛好工做中又一次遇到單線程程序性能低的問題,本次我主要想嘗試使用ThreadPool來實現多線程,而且在實現多線程任務同步結束。異步
一個ManualResetEvent結合Interlocked來實現線程同步結束。函數
1 static void Main(string[] args) 2 { 3 using (ManualResetEvent finish = new ManualResetEvent(false)) 4 { 5 int maxThreadCount = 100; 6 for (var i = 0; i < 100; i++) { 7 ThreadPool.QueueUserWorkItem((Object state)=> { 8 Console.WriteLine("task:{0}",state); 9 10 // 以原子操做的形式遞減指定變量的值並存儲結果。 11 if (Interlocked.Decrement(ref maxThreadCount) == 0) { 12 // 將事件狀態設置爲有信號,從而容許一個或多個等待線程繼續執行。 13 finish.Set(); 14 } 15 }, i); 16 } 17 18 // 阻止當前線程,直到當前 System.Threading.WaitHandle 收到信號。 19 finish.WaitOne(); 20 } 21 22 Console.WriteLine("Complete!"); 23 Console.ReadKey();
上邊的代碼是可行性,當系統線程數超過系統容許最大數時,線程會被在線程池中排隊等待。性能
ManualResetEvent集合(每個線程由集合中的惟一一個ManualResetEvent對象來實現線程的同步跟蹤)結合WaitHandle.WaitAll(ManualResetEvent集合)來實現線程同步結束。ui
using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading; namespace ThreadPoolTest { class MyTask { private ManualResetEvent finish = null; public MyTask(ManualResetEvent finish) { this.finish = finish; } public void MyTaskThreadPoolCallback(Object state) { Console.WriteLine("task:{0}", state); // 將事件狀態設置爲有信號,從而容許一個或多個等待線程繼續執行。 this.finish.Set(); } } class Program { static void Main(string[] args) { const int maxThreadCount = 64; ManualResetEvent[] finishItems = new ManualResetEvent[maxThreadCount]; MyTask[] myTaskItems = new MyTask[maxThreadCount] ; for (var i = 0; i < maxThreadCount; i++) { finishItems[i] = new ManualResetEvent(false); MyTask myTask = new MyTask(finishItems[i]); myTaskItems[i] = myTask; ThreadPool.QueueUserWorkItem(myTask.MyTaskThreadPoolCallback, i); } // 等待指定數組中的全部元素都收到信號。 WaitHandle.WaitAll(finishItems); Console.WriteLine("Complete!"); Console.ReadKey(); } } }
儘管這種想法不錯,可是存在一些問題:好比ManualResetEvent集合數量不容許超過系統容許的最大數量,個人計算機系統容許的最大數量是64,當我把配置超過64時(const int maxThreadCount = 65;),就會拋出異常。this
但是通常狀況下遇到這種業務的狀況下,只要修改多線程,必然會遇到某個對象不容許被多個線程操做的問題。spa
好比:
一、多個線程同時向一個文件中寫入內容,這種狀況通常使用鎖來包成被訪問對象的安全性。好比:互斥鎖(lock、Mutex)、讀寫鎖(ReadWriteLock)、Monitor、Semaphore(信號燈)、Interlocked(內存共享)等。
二、多個線程同時修改一個非線程安全集合對象(List,Collection,Dictionary,Bag,Queue,Stack,ArrayList,Array,HashTable等)時,每每會拋出異常。針對這種狀況,須要使用命名空間System.Collections.Concurrent.*下支持線程安全的集合、字典、隊列、棧等對象來替代。
咱們須要對一個多行文本文件進行解析,根據具體地址解析其中的經緯度信息。若是解析過程當中解析失敗的行,須要記錄到一個_error.txt;解析成功的記錄行,記錄到_result.txt。使用單線程分析過程當中已經遇到了性能低問題,需求解決方案是使用ThreadPool技術。
1 private static int maxThreadCount = 0; 2 private static int fakeMaxThreadCount = int.MaxValue; 3 private static ManualResetEvent finish = new ManualResetEvent(false); 4 private static object errorLocker = new object(); 5 private static object resultLocker = new object(); 6 private static object maxThreadCountLcker = new object(); 7 8 public void ParserFile(string filePath) 9 { 10 using (StreamWriter writerError = new StreamWriter(filePath + "_error")) 11 { 12 using (StreamWriter writerResult = new StreamWriter(filePath + "_result")) 13 { 14 finish = new ManualResetEvent(false); 15 using (StreamReader reader = new StreamReader(filePath)) 16 { 17 string line = reader.ReadLine(); 18 while (line != null) 19 { 20 maxThreadCount++; 21 ThreadPool.QueueUserWorkItem(DoWork, new object[] { line, writerError, writerResult 22 }); 23 24 line = reader.ReadLine(); 25 } 26 } 27 28 maxThreadCount++; 29 lock (maxThreadCountLcker) 30 { 31 fakeMaxThreadCount = maxThreadCount; 32 } 33 34 ThreadPool.QueueUserWorkItem(DoWork, new object[] { }); 35 36 finish.WaitOne(); 37 finish.Close(); 38 finish.Dispose(); 39 } 40 } 41 } 42 43 44 45 private void DoWork(object state) 46 { 47 object[] objectItem = state as object[]; 48 if (objectItem.Length != 3) 49 { 50 if (Interlocked.Decrement(ref fakeMaxThreadCount) == 0) 51 { 52 finish.Set(); 53 } 54 return; 55 } 56 string line = objectItem[0].ToString(); 57 StreamWriter writerError = objectItem[1] as StreamWriter; 58 StreamWriter writerResult = objectItem[2] as StreamWriter; 59 60 try 61 { 62 string[] fields = line.Split(new char[] { '|' }); 63 64 string imsi = fields[0]; 65 string city = fields[1]; 66 string county = fields[2]; 67 string address = fields[3]; 68 69 // http://restapi.amap.com/v3/geocode/geo?key=7de8697669288fc848e12a08f58d995e&s=rsv3&city=**市&address=**省**市**區**路23號 70 string uri = " http://restapi.amap.com/v3/geocode/geo"; 71 string parameter = string.Format("key={0}&s={1}&city={2}&address={3}", "7de8697669288fc848e12a08f58d995e", "rsv3", "**(市名稱)", address); 72 73 // {"status":"1","info":"OK","infocode":"10000","count":"1","geocodes":[{"formatted_address":"***省**市**區***路|23號","province":"***","citycode":"***","city":"***市","district":"***區","township":[],"neighborhood":{"name":[],"type":[]},"building":{"name":[],"type":[]},"adcode":"330105","street":[],"number":[],"location":"120.151367,30.362293","level":"門牌號"}]} 74 string result = GetRequesetContext(uri, parameter); 75 if (string.IsNullOrEmpty(result) || result.IndexOf("location") == -1) 76 { 77 lock (errorLocker) 78 { 79 writerError.WriteLine(result); 80 } 81 } 82 else 83 { 84 int indexCount = 0; 85 List<string> lnglatItems = new List<string>(); 86 foreach (string resultItem in result.Split(new string[] { "\",\"", ",\"" }, StringSplitOptions.RemoveEmptyEntries)) 87 { 88 if (resultItem.IndexOf("location") != -1) 89 { 90 indexCount++; 91 lnglatItems.Add(resultItem.Split(new char[] { ':' })[1].Replace("\"", string.Empty)); 92 } 93 } 94 if (indexCount == 1) 95 { 96 lock (resultLocker) 97 { 98 writerResult.WriteLine(address + "|" + lnglatItems[0] + "|" + imsi); 99 } 100 } 101 else 102 { 103 lock (resultLocker) 104 { 105 writerError.WriteLine(address + "|" + string.Join(",", lnglatItems) + "|" + imsi); 106 } 107 } 108 } 109 } 110 catch (Exception ex) 111 { 112 logger.Error("{0}\r\n{1}", ex.Message, ex.StackTrace); 113 lock (errorLocker) 114 { 115 writerError.WriteLine(line); 116 } 117 } 118 finally 119 { 120 lock (maxThreadCountLcker) 121 { 122 if (Interlocked.Decrement(ref fakeMaxThreadCount) == 0) 123 { 124 finish.Set(); 125 } 126 } 127 } 128 }
備註:
關於ThreadPool線程池內最大線程控制函數:SetMaxThreads 設置能夠同時處於活動狀態的線程池的請求數目。 全部大於此數目的請求將保持排隊狀態,直到線程池線程變爲可用。
[SecurityPermissionAttribute(SecurityAction.Demand, ControlThread = true)] public static bool SetMaxThreads( int workerThreads, int completionPortThreads )workerThreads:線程池中輔助線程的最大數目。
completionPortThreads:線程池中異步 I/O 線程的最大數目。
可是,須要注意事項:
不能將輔助線程的數目或 I/O 完成線程的數目設置爲小於計算機的處理器數目。
若是承載了公共語言運行時,例如由 Internet 信息服務 (IIS) 或 SQL Server 承載,主機可能會限制或禁止更改線程池大小。
更改線程池中的最大線程數時需謹慎。 雖然這類更改可能對您的代碼有益,但對您使用的代碼庫可能會有不利的影響。
將線程池大小設置得太大可能致使性能問題。 若是同時執行的線程太多,任務切換開銷就成爲影響性能的一個主要因素。