c#:ThreadPool實現並行分析,並實現線程同步結束

  • 背景:

通常狀況下,常常會遇到一個單線程程序時執行對CPU,MEMORY,IO利用率上不來,且速度慢下問題;那麼,怎麼解決這些問題呢?api

據我我的經驗來講有如下兩種方式:數組

一、並行、多線程(Parallel、Task、ThreadPool)安全

二、多進程MultipleProcess多線程

剛好工做中又一次遇到單線程程序性能低的問題,本次我主要想嘗試使用ThreadPool來實現多線程,而且在實現多線程任務同步結束。異步

  • ThreadPool線程同步結束示例一:

一個ManualResetEvent結合Interlocked來實現線程同步結束。函數

 1  static void Main(string[] args)
 2         {
 3             using (ManualResetEvent finish = new ManualResetEvent(false))
 4             {
 5                 int maxThreadCount = 100;
 6                 for (var i = 0; i < 100; i++) {
 7                     ThreadPool.QueueUserWorkItem((Object state)=> {
 8                         Console.WriteLine("task:{0}",state);
 9 
10                         // 以原子操做的形式遞減指定變量的值並存儲結果。
11                         if (Interlocked.Decrement(ref maxThreadCount) == 0) {
12                             // 將事件狀態設置爲有信號,從而容許一個或多個等待線程繼續執行。
13                             finish.Set();
14                         }                        
15                     }, i);
16                 }
17 
18                 // 阻止當前線程,直到當前 System.Threading.WaitHandle 收到信號。
19                 finish.WaitOne();
20             }
21 
22             Console.WriteLine("Complete!");
23             Console.ReadKey();

上邊的代碼是可行性,當系統線程數超過系統容許最大數時,線程會被在線程池中排隊等待。性能

  • ThreadPool線程同步結束示例二: 

ManualResetEvent集合(每個線程由集合中的惟一一個ManualResetEvent對象來實現線程的同步跟蹤)結合WaitHandle.WaitAll(ManualResetEvent集合)來實現線程同步結束。ui

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;

namespace ThreadPoolTest
{
    class MyTask
    {
        private ManualResetEvent finish = null;
        public MyTask(ManualResetEvent finish)
        {
            this.finish = finish;
        }

        public void MyTaskThreadPoolCallback(Object state)
        {
            Console.WriteLine("task:{0}", state);

            // 將事件狀態設置爲有信號,從而容許一個或多個等待線程繼續執行。
            this.finish.Set();
        }
    }


    class Program
    {
        static void Main(string[] args)
        {
            const int maxThreadCount = 64;
            ManualResetEvent[] finishItems = new ManualResetEvent[maxThreadCount];
            MyTask[] myTaskItems = new MyTask[maxThreadCount]
                ;
            for (var i = 0; i < maxThreadCount; i++)
            {
                finishItems[i] = new ManualResetEvent(false);

                MyTask myTask = new MyTask(finishItems[i]);
                myTaskItems[i] = myTask;

                ThreadPool.QueueUserWorkItem(myTask.MyTaskThreadPoolCallback, i);
            }

            // 等待指定數組中的全部元素都收到信號。
            WaitHandle.WaitAll(finishItems);

            Console.WriteLine("Complete!");
            Console.ReadKey();
        }


    }
}

儘管這種想法不錯,可是存在一些問題:好比ManualResetEvent集合數量不容許超過系統容許的最大數量,個人計算機系統容許的最大數量是64,當我把配置超過64時(const int maxThreadCount = 65;),就會拋出異常。this

 

 

  • 實現多線程時,須要注意事項:

但是通常狀況下遇到這種業務的狀況下,只要修改多線程,必然會遇到某個對象不容許被多個線程操做的問題。spa

好比:

一、多個線程同時向一個文件中寫入內容,這種狀況通常使用鎖來包成被訪問對象的安全性。好比:互斥鎖(lock、Mutex)、讀寫鎖(ReadWriteLock)、Monitor、Semaphore(信號燈)、Interlocked(內存共享)等。

二、多個線程同時修改一個非線程安全集合對象(List,Collection,Dictionary,Bag,Queue,Stack,ArrayList,Array,HashTable等)時,每每會拋出異常。針對這種狀況,須要使用命名空間System.Collections.Concurrent.*下支持線程安全的集合、字典、隊列、棧等對象來替代。

  • 業務場景:

咱們須要對一個多行文本文件進行解析,根據具體地址解析其中的經緯度信息。若是解析過程當中解析失敗的行,須要記錄到一個_error.txt;解析成功的記錄行,記錄到_result.txt。使用單線程分析過程當中已經遇到了性能低問題,需求解決方案是使用ThreadPool技術。

  • 業務實現:
  1         private static int maxThreadCount = 0;
  2         private static int fakeMaxThreadCount = int.MaxValue;
  3         private static ManualResetEvent finish = new ManualResetEvent(false);
  4         private static object errorLocker = new object();
  5         private static object resultLocker = new object();
  6         private static object maxThreadCountLcker = new object();
  7 
  8         public void ParserFile(string filePath)
  9         {
 10             using (StreamWriter writerError = new StreamWriter(filePath + "_error"))
 11             {
 12                 using (StreamWriter writerResult = new StreamWriter(filePath + "_result"))
 13                 {
 14                     finish = new ManualResetEvent(false);
 15                     using (StreamReader reader = new StreamReader(filePath))
 16                     {
 17                         string line = reader.ReadLine();
 18                         while (line != null)
 19                         {
 20                             maxThreadCount++;
 21                             ThreadPool.QueueUserWorkItem(DoWork, new object[] { line, writerError, writerResult
 22 });
 23 
 24                             line = reader.ReadLine();
 25                         }
 26                     }
 27 
 28                     maxThreadCount++;
 29                     lock (maxThreadCountLcker)
 30                     {
 31                         fakeMaxThreadCount = maxThreadCount;
 32                     }
 33 
 34                     ThreadPool.QueueUserWorkItem(DoWork, new object[] { });
 35 
 36                     finish.WaitOne();
 37                     finish.Close();
 38                     finish.Dispose();
 39                 }
 40             }
 41         }
 42 
 43 
 44 
 45         private void DoWork(object state)
 46         {
 47             object[] objectItem = state as object[];
 48             if (objectItem.Length != 3)
 49             {
 50                 if (Interlocked.Decrement(ref fakeMaxThreadCount) == 0)
 51                 {
 52                     finish.Set();
 53                 }
 54                 return;
 55             }
 56             string line = objectItem[0].ToString();
 57             StreamWriter writerError = objectItem[1] as StreamWriter;
 58             StreamWriter writerResult = objectItem[2] as StreamWriter;
 59 
 60             try
 61             {
 62                 string[] fields = line.Split(new char[] { '|' });
 63 
 64                 string imsi = fields[0];
 65                 string city = fields[1];
 66                 string county = fields[2];
 67                 string address = fields[3];
 68 
 69                 // http://restapi.amap.com/v3/geocode/geo?key=7de8697669288fc848e12a08f58d995e&s=rsv3&city=**市&address=**省**市**區**路23號
 70                 string uri = " http://restapi.amap.com/v3/geocode/geo";
 71                 string parameter = string.Format("key={0}&s={1}&city={2}&address={3}", "7de8697669288fc848e12a08f58d995e", "rsv3", "**(市名稱)", address);
 72 
 73                 // {"status":"1","info":"OK","infocode":"10000","count":"1","geocodes":[{"formatted_address":"***省**市**區***路|23號","province":"***","citycode":"***","city":"***市","district":"***區","township":[],"neighborhood":{"name":[],"type":[]},"building":{"name":[],"type":[]},"adcode":"330105","street":[],"number":[],"location":"120.151367,30.362293","level":"門牌號"}]}
 74                 string result = GetRequesetContext(uri, parameter);
 75                 if (string.IsNullOrEmpty(result) || result.IndexOf("location") == -1)
 76                 {
 77                     lock (errorLocker)
 78                     {
 79                         writerError.WriteLine(result);
 80                     }
 81                 }
 82                 else
 83                 {
 84                     int indexCount = 0;
 85                     List<string> lnglatItems = new List<string>();
 86                     foreach (string resultItem in result.Split(new string[] { "\",\"", ",\"" }, StringSplitOptions.RemoveEmptyEntries))
 87                     {
 88                         if (resultItem.IndexOf("location") != -1)
 89                         {
 90                             indexCount++;
 91                             lnglatItems.Add(resultItem.Split(new char[] { ':' })[1].Replace("\"", string.Empty));
 92                         }
 93                     }
 94                     if (indexCount == 1)
 95                     {
 96                         lock (resultLocker)
 97                         {
 98                             writerResult.WriteLine(address + "|" + lnglatItems[0] + "|" + imsi);
 99                         }
100                     }
101                     else
102                     {
103                         lock (resultLocker)
104                         {                            
105                             writerError.WriteLine(address + "|" + string.Join(",", lnglatItems) + "|" + imsi);
106                         }
107                     }
108                 }
109             }
110             catch (Exception ex)
111             {
112                 logger.Error("{0}\r\n{1}", ex.Message, ex.StackTrace);
113                 lock (errorLocker)
114                 {
115                     writerError.WriteLine(line);
116                 }
117             }
118             finally
119             {
120                 lock (maxThreadCountLcker)
121                 {
122                     if (Interlocked.Decrement(ref fakeMaxThreadCount) == 0)
123                     {
124                         finish.Set();
125                     }
126                 }
127             }
128         }

 備註:

關於ThreadPool線程池內最大線程控制函數:SetMaxThreads 設置能夠同時處於活動狀態的線程池的請求數目。 全部大於此數目的請求將保持排隊狀態,直到線程池線程變爲可用。

[SecurityPermissionAttribute(SecurityAction.Demand, ControlThread = true)]
public static bool SetMaxThreads(
    int workerThreads,
    int completionPortThreads
)

workerThreads:線程池中輔助線程的最大數目。

completionPortThreads:線程池中異步 I/O 線程的最大數目。

可是,須要注意事項:

不能將輔助線程的數目或 I/O 完成線程的數目設置爲小於計算機的處理器數目。

若是承載了公共語言運行時,例如由 Internet 信息服務 (IIS) 或 SQL Server 承載,主機可能會限制或禁止更改線程池大小。

更改線程池中的最大線程數時需謹慎。 雖然這類更改可能對您的代碼有益,但對您使用的代碼庫可能會有不利的影響。

將線程池大小設置得太大可能致使性能問題。 若是同時執行的線程太多,任務切換開銷就成爲影響性能的一個主要因素。

相關文章
相關標籤/搜索