使用Ring Buffer構建高性能的文件寫入程序


最近常收到SOD框架的朋友報告的SOD的SQL日誌功能報錯:文件句柄丟失。通過分析得知,這些朋友使用SOD框架開發了訪問量比較大的系統,因爲忘記關閉SQL日誌功能因此出現了很高頻率的日誌寫入操做,從而偶然引發錯誤。後來我建議只記錄出錯的或者執行時間較長的SQL信息,暫時解決了此問題。可是做爲一個熱心造輪子的人,必定要看看能不能造一個更好的輪子出來。html

前面說的錯誤緣由已經很直白了,就是頻繁的日誌寫入致使的,那麼解決方案就是將屢次寫入操做合併成一次寫入操做,而且採用異步寫入方式。要保存屢次操做的內容就要有一個相似「隊列」的東西來保存,而通常的線程安全的隊列,都是「有鎖隊列」,在性能要求很高的系統中,不但願在日誌記錄這個地方耗費多一點計算資源,因此最好有一個「無鎖隊列」,所以最佳方案就是Ring Buffer(環形緩衝區)了。sql

 什麼是Ring Buffer?顧名思義,就是一個內存環,每一次讀寫操做都循環利用這個內存環,從而避免頻繁分配和回收內存,減輕GC壓力,同時因爲Ring Buffer能夠實現爲無鎖的隊列,從而總體上大幅提升系統性能。Ring Buffer的示意圖以下,有關具體原理,請參考此文《Ring Buffer 有什麼特別? 》。數組

 Ring buffer

上文並無詳細說明如何具體讀寫Ring Buffer,可是原理介紹已經足夠咱們怎麼寫一個Ring Buffer程序了,接下來看看我在 .NET上的實現。安全

首先,定一個存放數據的數組,記住必定要用數組,它是實現Ring Buffer的關鍵而且CPU友好。數據結構

const int C_BUFFER_SIZE = 10;//寫入次數緩衝區大小,每次的實際內容大小不固定
string[] RingBuffer = new string[C_BUFFER_SIZE];
int writedTimes = 0;


變量writedTimes 記錄寫入次數,它會一直遞增,不過爲了線程安全的遞增且不使用託管鎖,須要使用原子鎖Interlocked。以後,根據每次 writedTimes 跟環形緩衝區的大小求餘數,獲得當前要寫入的數組位置:多線程

 void SaveFile(string fileName, string text)
 {
            int currP= Interlocked.Increment(ref writedTimes);
            int writeP= currP % C_BUFFER_SIZE ;
            int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1;
            RingBuffer[index] = " Arr[" + index + "]:" + text;
  }


Ring Buffer的核心代碼就這麼點,調用此方法,會一直往緩衝區寫入數據而不會「溢出」,因此寫入Ring Buffer效率很高。併發


一個隊列若是隻生產不消費確定不行的,那麼如何及時消費Ring Buffer的數據呢?簡單的方案就是當Ring Buffer「寫滿」的時候一次性將數據「消費」掉。注意這裏的「寫滿」僅僅是指寫入位置 index達到了數組最大索引位置,而「消費」也不一樣於常見的堆棧,隊列等數據結構,只是讀取緩衝區的數據而不會移除它。框架

因此前面的代碼只須要稍加改造:異步

 void SaveFile(string fileName, string text)
 {
            int currP= Interlocked.Increment(ref writedTimes);
            int writeP= currP % C_BUFFER_SIZE ;
            int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1;
            RingBuffer[index] = " Arr[" + index + "]:" + text;
            if (writeP == 0 )
            {
                 string result = string.Concat( RingBuffer);
                 FlushFile(fileName, result);
            }
  }


writeP == 0 表示當前一輪的緩衝區已經寫滿,而後調用函數 FlushFile 將Ring Buffer的數據鏈接起來,總體寫入文件。async

        void FlushFile(string fileName, string text)
        {
            using (FileStream fs = new FileStream(fileName, FileMode.Append, FileAccess.Write, FileShare.Write, 2048, FileOptions.Asynchronous))
            {
                byte[] buffer = System.Text.Encoding.UTF8.GetBytes(text);
                IAsyncResult writeResult = fs.BeginWrite(buffer, 0, buffer.Length,
                    (asyncResult) =>
                    {
                        fs.EndWrite(asyncResult);
                       
                    },
                    fs);
                //fs.EndWrite(writeResult);//這種方法異步起不到效果
                fs.Flush();
                
            }
        }

在函數 FlushFile 中咱們使用了異步寫入文件的技術,注意 FileOptions.Asynchronous ,使用它才能夠真正利用Windows的完成端口IOCP,將文件異步寫入。

固然這段代碼也可使用.NET最新版本支持的 async/await ,不過我要讓SOD框架繼續支持.NET 2.0,因此只好這樣寫了。

如今,咱們能夠開多線程來測試這個循環隊列效果怎麼樣:

            Task[] arrTask = new Task[20];
            for (int i = 0; i < arrTask.Length; i++)
            {
                arrTask[i] = new Task(obj => SaveFile( (int)obj) ,i);
            }
            for (int i = 0; i < arrTask.Length; i++)
            {
                arrTask[i].Start();
            }
            
            Task.WaitAll(arrTask);
            MessageBox.Show(arrTask.Length +" Task All OK.");

這裏開啓20個Task任務線程來寫入文件,運行此程序,發現20個線程才寫入了10條數據,分析好久才發現,文件異步IO太快的話,會有緩衝區丟失,第一次寫入的10條數據沒法寫入文件,多運行幾回就沒有問題了。因此仍是得想法解決此問題。

一般狀況下咱們都是使用託管鎖來解決這種併發問題,但本文的目的就是要實現一個「無鎖環形緩衝區」,不能在此「功虧一簣」,因此此時「信號量」上場了。

同步能夠分爲鎖定和信號同步,信號同步機制中涉及的類型都繼承自抽象類WaitHandle,這些類型有EventWaitHandle(類型化爲AutoResetEvent、ManualResetEvent)、Semaphore以及Mutex。見下圖:

首先聲明一個 ManualResetEvent對象:

ManualResetEvent ChangeEvent = new ManualResetEvent(true);

這裏咱們將 ManualResetEvent 對象設置成 「終止狀態」,意味着程序一開始是容許全部線程不等待的,當咱們須要消費Ring Buffer的時候再將  ManualResetEvent 設置成「非終止狀態」,阻塞其它線程。簡單說就是當要寫文件的時候將環形緩衝區阻塞,直到文件寫完才容許繼續寫入環形緩衝區。

對應的新的代碼調整以下:

 void SaveFile(string fileName, string text)
 {
            ChangeEvent.WaitOne();
            int currP= Interlocked.Increment(ref writedTimes);
            int writeP= currP % C_BUFFER_SIZE ;
            int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1;
            RingBuffer[index] = " Arr[" + index + "]:" + text;
            if (writeP == 0 )
            {
                 ChangeEvent.Reset();
                 string result = string.Concat( RingBuffer);
                 FlushFile(fileName, result);
            }
  }

而後,再FlushFile 方法的 回掉方法中,加入設置終止狀態的代碼,部分代碼以下:

(asyncResult) =>
                    {
                        fs.EndWrite(asyncResult);
                        ChangeEvent.Set();
                     }

OK,如今咱們的程序具有高性能的安全的寫入日誌文件的功能了,咱們來看看演示程序測試的日誌結果實例:

 Arr[0]:Thread index:0--FFFFFFF
 Arr[1]:Thread index:1--FFFFFFF
 Arr[2]:Thread index:8--FFFFFFF
 Arr[3]:Thread index:9--FFFFFFF
 Arr[4]:Thread index:3--FFFFFFF
 Arr[5]:Thread index:2--FFFFFFF
 Arr[6]:Thread index:4--FFFFFFF
 Arr[7]:Thread index:10--FFFFFFF
 Arr[8]:Thread index:5--FFFFFFF
 Arr[9]:Thread index:6--FFFFFFF
 Arr[0]:Thread index:7--FFFFFFF
 Arr[1]:Thread index:11--FFFFFFF
 Arr[2]:Thread index:12--FFFFFFF
 Arr[3]:Thread index:13--FFFFFFF
 Arr[4]:Thread index:14--FFFFFFF
 Arr[5]:Thread index:15--FFFFFFF
 Arr[6]:Thread index:16--FFFFFFF
 Arr[7]:Thread index:17--FFFFFFF
 Arr[8]:Thread index:18--FFFFFFF
 Arr[9]:Thread index:19--FFFFFFF

測試結果符合預期!
到此,咱們今天的主題就所有介紹完成了,不過要讓本文的代碼可以符合實際的運行,還要解決每次只寫入少許數據而且將它按期寫入日誌文件的問題,這裏貼出真正的局部代碼:

 

PS:有朋友說採用信號量並不能徹底保證程序安全,查閱了MSDN也說若是信號量狀態改變尚未來得及應用,那麼是起不到做用的,因此還須要檢查業務狀態標記,也就是在設置非終止狀態後,立刻設置一個操做標記,在其它線程中,須要檢查此標記,以免「漏網之魚」引發不指望的結果。

再具體實現上,咱們能夠實現一個「自旋鎖」,循環檢查此狀態標記,爲了防止發生死鎖,還須要有鎖超時機制,代碼以下:

 void SaveFile(string fileName, string text)
        {
            ChangeEvent.WaitOne(10000);
            int currP= Interlocked.Increment(ref WritedTimes);
            int writeP= currP % C_BUFFER_SIZE ;
            int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1;
           
            if (writeP == 0 )
            {
                ChangeEvent.Reset();
                IsReading = true;
                RingBuffer[index] = " Arr[" + index + "]:" + text;

                LastWriteTime = DateTime.Now;
                WritingIndex = 0;
                SaveFile(fileName,RingBuffer);
            }
            else if (DateTime.Now.Subtract(LastWriteTime).TotalSeconds > C_WRITE_TIMESPAN)
            {
                ChangeEvent.Reset();
                IsReading = true;
                RingBuffer[index] = " Arr[" + index + "]:" + text;

                int length = index - WritingIndex + 1;
                if (length <= 0)
                    length = 1;
                string[] newArr = new string[length];
                Array.Copy(RingBuffer, WritingIndex, newArr, 0, length);

                LastWriteTime = DateTime.Now;
                WritingIndex = index + 1;
                SaveFile(fileName, newArr);
            }
            else
            {
                //防止漏網之魚的線程在信號量產生做用以前修改數據
                //採用「自旋鎖」等待
                int count = 0;
                while (IsReading)
                {
                    if (count++ > 10000000)
                    {
                        Thread.Sleep(50);
                        break;
                    }
                }
                RingBuffer[index] = " Arr[" + index + "]:" + text;
            }
        }

 


完整的Ring Buffer代碼會在最新版本的SOD框架源碼中,有關本篇文章測試程序的完整源碼,請加QQ羣討論獲取,

羣號碼:SOD框架高級羣 18215717 ,加羣請註明 PDF.NET技術交流 ,不然可能被拒絕。

相關文章
相關標籤/搜索