最近常收到SOD框架的朋友報告的SOD的SQL日誌功能報錯:文件句柄丟失。通過分析得知,這些朋友使用SOD框架開發了訪問量比較大的系統,因爲忘記關閉SQL日誌功能因此出現了很高頻率的日誌寫入操做,從而偶然引發錯誤。後來我建議只記錄出錯的或者執行時間較長的SQL信息,暫時解決了此問題。可是做爲一個熱心造輪子的人,必定要看看能不能造一個更好的輪子出來。html
前面說的錯誤緣由已經很直白了,就是頻繁的日誌寫入致使的,那麼解決方案就是將屢次寫入操做合併成一次寫入操做,而且採用異步寫入方式。要保存屢次操做的內容就要有一個相似「隊列」的東西來保存,而通常的線程安全的隊列,都是「有鎖隊列」,在性能要求很高的系統中,不但願在日誌記錄這個地方耗費多一點計算資源,因此最好有一個「無鎖隊列」,所以最佳方案就是Ring Buffer(環形緩衝區)了。sql
什麼是Ring Buffer?顧名思義,就是一個內存環,每一次讀寫操做都循環利用這個內存環,從而避免頻繁分配和回收內存,減輕GC壓力,同時因爲Ring Buffer能夠實現爲無鎖的隊列,從而總體上大幅提升系統性能。Ring Buffer的示意圖以下,有關具體原理,請參考此文《Ring Buffer 有什麼特別? 》。數組
上文並無詳細說明如何具體讀寫Ring Buffer,可是原理介紹已經足夠咱們怎麼寫一個Ring Buffer程序了,接下來看看我在 .NET上的實現。安全
首先,定一個存放數據的數組,記住必定要用數組,它是實現Ring Buffer的關鍵而且CPU友好。數據結構
const int C_BUFFER_SIZE = 10;//寫入次數緩衝區大小,每次的實際內容大小不固定 string[] RingBuffer = new string[C_BUFFER_SIZE];
int writedTimes = 0;
變量writedTimes 記錄寫入次數,它會一直遞增,不過爲了線程安全的遞增且不使用託管鎖,須要使用原子鎖Interlocked。以後,根據每次 writedTimes 跟環形緩衝區的大小求餘數,獲得當前要寫入的數組位置:多線程
void SaveFile(string fileName, string text) { int currP= Interlocked.Increment(ref writedTimes); int writeP= currP % C_BUFFER_SIZE ; int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1; RingBuffer[index] = " Arr[" + index + "]:" + text; }
Ring Buffer的核心代碼就這麼點,調用此方法,會一直往緩衝區寫入數據而不會「溢出」,因此寫入Ring Buffer效率很高。併發
一個隊列若是隻生產不消費確定不行的,那麼如何及時消費Ring Buffer的數據呢?簡單的方案就是當Ring Buffer「寫滿」的時候一次性將數據「消費」掉。注意這裏的「寫滿」僅僅是指寫入位置 index達到了數組最大索引位置,而「消費」也不一樣於常見的堆棧,隊列等數據結構,只是讀取緩衝區的數據而不會移除它。框架
因此前面的代碼只須要稍加改造:異步
void SaveFile(string fileName, string text) { int currP= Interlocked.Increment(ref writedTimes); int writeP= currP % C_BUFFER_SIZE ; int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1; RingBuffer[index] = " Arr[" + index + "]:" + text; if (writeP == 0 ) { string result = string.Concat( RingBuffer); FlushFile(fileName, result); } }
writeP == 0 表示當前一輪的緩衝區已經寫滿,而後調用函數 FlushFile 將Ring Buffer的數據鏈接起來,總體寫入文件。async
void FlushFile(string fileName, string text) { using (FileStream fs = new FileStream(fileName, FileMode.Append, FileAccess.Write, FileShare.Write, 2048, FileOptions.Asynchronous)) { byte[] buffer = System.Text.Encoding.UTF8.GetBytes(text); IAsyncResult writeResult = fs.BeginWrite(buffer, 0, buffer.Length, (asyncResult) => { fs.EndWrite(asyncResult); }, fs); //fs.EndWrite(writeResult);//這種方法異步起不到效果 fs.Flush(); } }
在函數 FlushFile 中咱們使用了異步寫入文件的技術,注意 FileOptions.Asynchronous ,使用它才能夠真正利用Windows的完成端口IOCP,將文件異步寫入。
固然這段代碼也可使用.NET最新版本支持的 async/await ,不過我要讓SOD框架繼續支持.NET 2.0,因此只好這樣寫了。
如今,咱們能夠開多線程來測試這個循環隊列效果怎麼樣:
Task[] arrTask = new Task[20]; for (int i = 0; i < arrTask.Length; i++) { arrTask[i] = new Task(obj => SaveFile( (int)obj) ,i); } for (int i = 0; i < arrTask.Length; i++) { arrTask[i].Start(); } Task.WaitAll(arrTask); MessageBox.Show(arrTask.Length +" Task All OK.");
這裏開啓20個Task任務線程來寫入文件,運行此程序,發現20個線程才寫入了10條數據,分析好久才發現,文件異步IO太快的話,會有緩衝區丟失,第一次寫入的10條數據沒法寫入文件,多運行幾回就沒有問題了。因此仍是得想法解決此問題。
一般狀況下咱們都是使用託管鎖來解決這種併發問題,但本文的目的就是要實現一個「無鎖環形緩衝區」,不能在此「功虧一簣」,因此此時「信號量」上場了。
同步能夠分爲鎖定和信號同步,信號同步機制中涉及的類型都繼承自抽象類WaitHandle,這些類型有EventWaitHandle(類型化爲AutoResetEvent、ManualResetEvent)、Semaphore以及Mutex。見下圖:
首先聲明一個 ManualResetEvent對象:
ManualResetEvent ChangeEvent = new ManualResetEvent(true);
這裏咱們將 ManualResetEvent 對象設置成 「終止狀態」,意味着程序一開始是容許全部線程不等待的,當咱們須要消費Ring Buffer的時候再將 ManualResetEvent 設置成「非終止狀態」,阻塞其它線程。簡單說就是當要寫文件的時候將環形緩衝區阻塞,直到文件寫完才容許繼續寫入環形緩衝區。
對應的新的代碼調整以下:
void SaveFile(string fileName, string text) { ChangeEvent.WaitOne(); int currP= Interlocked.Increment(ref writedTimes); int writeP= currP % C_BUFFER_SIZE ; int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1; RingBuffer[index] = " Arr[" + index + "]:" + text; if (writeP == 0 ) { ChangeEvent.Reset(); string result = string.Concat( RingBuffer); FlushFile(fileName, result); } }
而後,再FlushFile 方法的 回掉方法中,加入設置終止狀態的代碼,部分代碼以下:
(asyncResult) =>
{
fs.EndWrite(asyncResult);
ChangeEvent.Set();
}
OK,如今咱們的程序具有高性能的安全的寫入日誌文件的功能了,咱們來看看演示程序測試的日誌結果實例:
Arr[0]:Thread index:0--FFFFFFF Arr[1]:Thread index:1--FFFFFFF Arr[2]:Thread index:8--FFFFFFF Arr[3]:Thread index:9--FFFFFFF Arr[4]:Thread index:3--FFFFFFF Arr[5]:Thread index:2--FFFFFFF Arr[6]:Thread index:4--FFFFFFF Arr[7]:Thread index:10--FFFFFFF Arr[8]:Thread index:5--FFFFFFF Arr[9]:Thread index:6--FFFFFFF Arr[0]:Thread index:7--FFFFFFF Arr[1]:Thread index:11--FFFFFFF Arr[2]:Thread index:12--FFFFFFF Arr[3]:Thread index:13--FFFFFFF Arr[4]:Thread index:14--FFFFFFF Arr[5]:Thread index:15--FFFFFFF Arr[6]:Thread index:16--FFFFFFF Arr[7]:Thread index:17--FFFFFFF Arr[8]:Thread index:18--FFFFFFF Arr[9]:Thread index:19--FFFFFFF
測試結果符合預期!
到此,咱們今天的主題就所有介紹完成了,不過要讓本文的代碼可以符合實際的運行,還要解決每次只寫入少許數據而且將它按期寫入日誌文件的問題,這裏貼出真正的局部代碼:
PS:有朋友說採用信號量並不能徹底保證程序安全,查閱了MSDN也說若是信號量狀態改變尚未來得及應用,那麼是起不到做用的,因此還須要檢查業務狀態標記,也就是在設置非終止狀態後,立刻設置一個操做標記,在其它線程中,須要檢查此標記,以免「漏網之魚」引發不指望的結果。
再具體實現上,咱們能夠實現一個「自旋鎖」,循環檢查此狀態標記,爲了防止發生死鎖,還須要有鎖超時機制,代碼以下:
void SaveFile(string fileName, string text) { ChangeEvent.WaitOne(10000); int currP= Interlocked.Increment(ref WritedTimes); int writeP= currP % C_BUFFER_SIZE ; int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1; if (writeP == 0 ) { ChangeEvent.Reset(); IsReading = true; RingBuffer[index] = " Arr[" + index + "]:" + text; LastWriteTime = DateTime.Now; WritingIndex = 0; SaveFile(fileName,RingBuffer); } else if (DateTime.Now.Subtract(LastWriteTime).TotalSeconds > C_WRITE_TIMESPAN) { ChangeEvent.Reset(); IsReading = true; RingBuffer[index] = " Arr[" + index + "]:" + text; int length = index - WritingIndex + 1; if (length <= 0) length = 1; string[] newArr = new string[length]; Array.Copy(RingBuffer, WritingIndex, newArr, 0, length); LastWriteTime = DateTime.Now; WritingIndex = index + 1; SaveFile(fileName, newArr); } else { //防止漏網之魚的線程在信號量產生做用以前修改數據 //採用「自旋鎖」等待 int count = 0; while (IsReading) { if (count++ > 10000000) { Thread.Sleep(50); break; } } RingBuffer[index] = " Arr[" + index + "]:" + text; } }
完整的Ring Buffer代碼會在最新版本的SOD框架源碼中,有關本篇文章測試程序的完整源碼,請加QQ羣討論獲取,
羣號碼:SOD框架高級羣 18215717 ,加羣請註明 PDF.NET技術交流 ,不然可能被拒絕。