分析.Net裏線程同步機制

我 們知道並行編程模型兩種:一種是基於消息式的,第二種是基於共享內存式的。 前段時間項目中遇到了第二種 使用多線程開發並行程序共享資源的問題 ,今天以實際案例出發對.net裏的共享內存式的線程同步機制作個總結,因爲某些類庫的應用屬於基礎,因此本次不對基本使用作出講解,基本使用 MSDN是最好的教程。

    1、volatile關鍵字

     基本介紹: 封裝了 Thread.VolatileWrite() 和  Thread.VolatileRead()的實現 ,主要做用是強制刷新高速緩存。編程

     使用場景: 適用於在多核多CPU的機器上 解決變量在內存和高速緩存同步不及時的問題。c#

     案例:參考下文   2、原子操做的 案例 或者 System.Collections.Concurrent命名空間下的 ConcurrentQueue ,ConcurrentDictionary  等併發集合的實現方式。       緩存

    2、原子操做(Interlock)

      基本介紹: 原 子操做是 實現Spinlock,Monitor,ReadWriterLock鎖的基礎,其實現原理是在計算機總線上標誌一個信號來表示資源已經被佔用 若是其餘指令進行修改則等待本次操做完成後才能進行,由於原子操做是在硬件上實現的 因此速度很是快,大約在50個時鐘週期。其實原子操做也能夠看作一種鎖。安全

      使用場景:性 能要求較高的場合,須要對字段進行快速的同步或者對變量進行原子形式的跟新操做(例如:int b=0;  b=b+1  實際分解爲多條彙編指令,在多線程狀況下 多條彙編指令並行的執行可能致使錯誤的結果,因此要保證執行 b=b+1 生成的彙編指令是一個原子形式執行 ),例如實現一個並行隊列,異步隊列等。多線程

     案例:一個基於事件觸發機制隊列的實現併發

001. /// <summary>
002. /// 表示一個實時處理隊列
003. /// </summary>
004. public class ProcessQueue<T>
005. {
006. #region [成員]
007.  
008. private ConcurrentQueue<IEnumerable<T>> queue;
009.  
010. private Action<IEnumerable<T>> PublishHandler;
011.  
012. //指定處理的線程數
013. private int core = Environment.ProcessorCount;
014.  
015. //正在運行的線程數
016. private int runingCore = 0;
017.  
018. public event Action<Exception> OnException;
019.  
020. //隊列是否正在處理數據
021. private int isProcessing=0;
022.  
023. //隊列是否可用
024. private bool enabled = true;
025.  
026. #endregion
027.  
028. #region 構造函數
029.  
030. public ProcessQueue(Action<IEnumerable<T>> handler)
031. {
032.  
033. queue = new ConcurrentQueue<IEnumerable<T>>();
034.  
035. PublishHandler = handler;
036. this.OnException += ProcessException.OnProcessException;
037. }
038.  
039. #endregion
040.  
041. #region [方法]
042.  
043. /// <summary>
044. /// 入隊
045. /// </summary>
046. /// <param name="items">數據集合</param>
047. public void Enqueue(IEnumerable<T> items)
048. {
049. if (items != null)
050. {
051. queue.Enqueue(items);
052. }
053.  
054. //判斷是否隊列有線程正在處理
055. if (enabled && Interlocked.CompareExchange(ref isProcessing, 1, 0) == 0)
056. {
057. if (!queue.IsEmpty)
058. {
059. ThreadPool.QueueUserWorkItem(ProcessItemLoop);
060. }
061. else
062. {
063. Interlocked.Exchange(ref isProcessing, 0);
064. }
065. }
066. }
067.  
068. /// <summary>
069. /// 開啓隊列數據處理
070. /// </summary>
071. public void Start()
072. {
073. Thread process_Thread = new Thread(PorcessItem);
074. process_Thread.IsBackground = true;
075. process_Thread.Start();
076. }
077.  
078. /// <summary>
079. /// 循環處理數據項
080. /// </summary>
081. /// <param name="state"></param>
082. private void ProcessItemLoop(object state)
083. {
084. //表示一個線程遞歸 當處理完當前數據時 則開起線程處理隊列中下一條數據 遞歸終止條件是隊列爲空時
085. //可是可能會出現 隊列有數據可是沒有線程去處理的狀況 全部一個監視線程監視隊列中的數據是否爲空,若是爲空
086. //而且沒有線程去處理則開啓遞歸線程
087.  
088. if (!enabled && queue.IsEmpty)
089. {
090. Interlocked.Exchange(ref isProcessing, 0);
091. return;
092. }
093.  
094. //處理的線程數 是否小於當前CPU核數
095. if (Thread.VolatileRead(ref runingCore) <= core * 2*)
096. {
097. IEnumerable<T> publishFrame;
098. //出隊之後交給線程池處理
099. if (queue.TryDequeue(out publishFrame))
100. {
101. Interlocked.Increment(ref runingCore);
102. try
103. {
104. PublishHandler(publishFrame);
105.  
106. if (enabled && !queue.IsEmpty)
107. {   
108. ThreadPool.QueueUserWorkItem(ProcessItemLoop);
109. }
110. else
111. {
112. Interlocked.Exchange(ref isProcessing, 0);
113. }
114.  
115. }
116. catch (Exception ex)
117. {
118. OnProcessException(ex);
119. }
120.  
121. finally
122. {
123. Interlocked.Decrement(ref runingCore);
124. }
125. }
126. }
127.  
128. }
129.  
130. /// <summary>
131. ///定時處理幀 線程調用函數 
132. ///主要是監視入隊的時候線程 沒有來的及處理的狀況
133. /// </summary>
134. private void PorcessItem(object state)
135. {
136. int sleepCount=0;
137. int sleepTime = 1000;
138. while (enabled)
139. {
140. //若是隊列爲空則根據循環的次數肯定睡眠的時間
141. if (queue.IsEmpty)
142. {
143. if (sleepCount == 0)
144. {
145. sleepTime = 1000;
146. }
147. else if (sleepCount == 3)
148. {
149. sleepTime = 1000 * 3;
150. }
151. else if (sleepCount == 5)
152. {
153. sleepTime = 1000 * 5;
154. }
155. else if (sleepCount == 8)
156. {
157. sleepTime = 1000 * 8;
158. }
159. else if (sleepCount == 10)
160. {
161. sleepTime = 1000 * 10;
162. }
163. else
164. {
165. sleepTime = 1000 * 50;
166. }
167. sleepCount++;
168. Thread.Sleep(sleepTime);
169. }
170. else
171. {
172. //判斷是否隊列有線程正在處理
173. if (enabled && Interlocked.CompareExchange(ref isProcessing, 1, 0) == 0)
174. {
175. if (!queue.IsEmpty)
176. {
177. ThreadPool.QueueUserWorkItem(ProcessItemLoop);
178. }
179. else
180. {
181. Interlocked.Exchange(ref isProcessing, 0);
182. }
183. sleepCount = 0;
184. sleepTime = 1000;
185. }
186. }
187. }
188. }
189.  
190. /// <summary>
191. /// 中止隊列
192. /// </summary>
193. public void Stop()
194. {
195. this.enabled = false;
196.  
197. }
198.  
199. /// <summary>
200. /// 觸發異常處理事件
201. /// </summary>
202. /// <param name="ex">異常</param>
203. private void OnProcessException(Exception ex)
204. {
205. var tempException = OnException;
206. Interlocked.CompareExchange(ref tempException, null, null);
207.  
208. if (tempException != null)
209. {
210. OnException(ex);
211. }
212. }
213.  
214. #endregion
215.  
216. }

 

    3、自旋鎖(Spinlock)

      基本介紹:  在原子操做基礎上實現的鎖,用戶態的鎖,缺點是線程一直不釋放CPU時間片。操做系統進行一次線程用戶態到內核態的切換大約須要500個時鐘週期,能夠根據這個進行參考咱們的線程是進行用戶等待仍是轉到內核的等待.。異步

      使用場景:線程等待資源時間較短的狀況下使用。函數

      案例: 和最經常使用的Monitor 使用方法同樣  這裏就不舉例了,在實際場景中應該優先選擇使用Monitor,除非是線程等待資源的時間特別的短oop

 

    4、監視器(Monitor)

      基本介紹:  原子操做基礎上實現的鎖,開始處於用戶態,自旋一段時間進入內核態的等待釋放CPU時間片,缺點使用不當容易形成死鎖    c#實現的關鍵字是Lock。         性能

      使用場景:  全部須要加鎖的場景均可以使用。

      案例: 案例太多了,這裏就不列出了。

    5、讀寫鎖(ReadWriterLock)

      原理分析:   原子操做基礎上實現的鎖,

      使用場景:適用於寫的次數少,讀的頻率高的狀況。

    案例:一個線程安全的緩存實現(.net 4.0 能夠使用基礎類庫中的  ConcurrentDictionary<K,V>)  注意:老版本ReaderWriterLock已經被淘汰,新版的是ReaderWriterLockSlim

01. class CacheManager<K, V>
02. {
03. #region [成員]
04.  
05. private ReaderWriterLockSlim readerWriterLockSlim;
06.  
07. private Dictionary<K, V> containter;
08.  
09. #endregion
10.  
11. #region [構造函數]
12.  
13. public CacheManager()
14. {
15. this.readerWriterLockSlim = new ReaderWriterLockSlim();
16. this.containter = new Dictionary<K, V>();
17. }
18.  
19. #endregion
20.  
21. #region [方法]
22.  
23. public void Add(K key, V value)
24. {
25. readerWriterLockSlim.EnterWriteLock();
26.  
27. try
28. {
29. containter.Add(key, value);
30. }
31.  
32. finally
33. {
34. readerWriterLockSlim.ExitWriteLock();
35. }
36. }
37.  
38. public V Get(K key)
39. {
40.  
41. bool result = false;
42. V value;
43.  
44. do
45. {
46. readerWriterLockSlim.EnterReadLock();
47.  
48. try
49. {
50. result = containter.TryGetValue(key, out value);
51. }
52.  
53. finally
54. {
55. readerWriterLockSlim.ExitWriteLock();
56. }
57.  
58. } while (!result);
59.  
60. return value;
61. }
62.  
63. #endregion
64. }

 

      .net中還有其餘的線程同步機制:ManualResetEventSlim ,AutoResetEvent ,SemaphoreSlim 這裏就逐個進行不介紹 具體在《CLR Via C# 》中解釋的很是詳細,但在具體的實際開發中我尚未使用到。

      最好的線程同步機制是沒有同步,這取決於良好的設計,固然有些狀況下沒法避免使用鎖。 在性能要求不高的場合基本的lock就能知足要求,但性能要求比較苛刻的情就需求更具實際場景進行選擇哪一種線程同步機制。

免費培訓課:http://www.jinhusns.com/Products/Curriculum/?type=xcj

源碼分享:http://www.jinhusns.com/Products/Download/?type=xcj

相關文章
相關標籤/搜索