C#使用互斥量(Mutex)實現多進程併發操做時多進程間線程同步操做(進程同步)的簡單示例代碼及使用方法

本文主要是實現操做系統級別的多進程間線程同步(進程同步)的示例代碼及測試結果。代碼通過測試,可供參考,也可直接使用。

承接上一篇博客的業務場景[C#使用讀寫鎖三行代碼簡單解決多線程併發寫入文件時線程同步的問題]。html

隨着服務進程的增多,光憑進程內的線程同步已經不能知足如今的需求,致使多進程同時寫入同一個文件時,同樣提示文件被佔用的問題。服務器

在這種場景下,跨進程級的鎖是不可避免的。在.NET提供的參考中,進程鎖都繼承了System.Threading.WaitHandle類多線程

而在本文中針對單個文件同一時間僅容許單個進程(線程)操做的場景,System.Threading.Mutex類無疑是最簡單也是最合適的選擇併發

該類型的對象可使用命名(字符串)互斥量實現當前會話級或操做系統級的同步需求。我選擇了操做系統級別的同步編寫示例,由於覆蓋面更廣。函數

 

下面是實現代碼,註釋很詳細就不細說了:

 

namespace WaitHandleExample
{
    class Program
    {
        static void Main(string[] args)
        {
            #region 簡單使用
            //var mutexKey = MutexExample.GetFilePathMutexKey("文件路徑");
            //MutexExample.MutexExec(mutexKey, () =>
            //{
            //    Console.WriteLine("須要進程同步執行的代碼");
            //});
            #endregion

            #region 測試代碼
            var filePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "test.log").ToUpper();
            var mutexKey = MutexExample.GetFilePathMutexKey(filePath);

            //同時開啓N個寫入線程
            Parallel.For(0, LogCount, e =>
            {
                //沒使用互斥鎖操做寫入,大量寫入錯誤;FileStream包含FileShare的構造函數也僅實現了進程內的線程同步,多進程同時寫入時也會出錯
                //WriteLog(filePath);

                //使用互斥鎖操做寫入,因爲同一時間僅有一個線程操做,因此不會出錯
                MutexExample.MutexExec(mutexKey, () =>
                {
                    WriteLog(filePath);
                });
            });

            Console.WriteLine(string.Format("Log Count:{0}.\t\tWrited Count:{1}.\tFailed Count:{2}.", LogCount.ToString(), WritedCount.ToString(), FailedCount.ToString()));
            Console.Read();
            #endregion
        }


        /// <summary>
        /// C#互斥量使用示例代碼
        /// </summary>
        /// <remarks>已在通過測試並上線運行,可直接使用</remarks>
        public static class MutexExample
        {
            /// <summary>
            /// 進程間同步執行的簡單例子
            /// </summary>
            /// <param name="action">同步處理代碼</param>
            /// <param name="mutexKey">操做系統級的同步鍵
            /// (若是將 name 指定爲 null 或空字符串,則建立一個局部互斥體。 
            /// 若是名稱之前綴「Global\」開頭,則 mutex 在全部終端服務器會話中均爲可見。 
            /// 若是名稱之前綴「Local\」開頭,則 mutex 僅在建立它的終端服務器會話中可見。 
            /// 若是建立已命名 mutex 時不指定前綴,則它將採用前綴「Local\」。)</param>
            /// <remarks>不重試且不考慮異常狀況處理的簡單例子</remarks>
            [Obsolete(error: false, message: "請使用MutexExec")]
            public static void MutexExecEasy(string mutexKey, Action action)
            {
                //聲明一個已命名的互斥體,實現進程間同步;該命名互斥體不存在則自動建立,已存在則直接獲取
                using (Mutex mut = new Mutex(false, mutexKey))
                {
                    try
                    {
                        //上鎖,其餘線程需等待釋放鎖以後才能執行處理;若其餘線程已經上鎖或優先上鎖,則先等待其餘線程執行完畢
                        mut.WaitOne();
                        //執行處理代碼(在調用WaitHandle.WaitOne至WaitHandle.ReleaseMutex的時間段裏,只有一個線程處理,其餘線程都得等待釋放鎖後才能執行該代碼段)
                        action();
                    }
                    finally
                    {
                        //釋放鎖,讓其餘進程(或線程)得以繼續執行
                        mut.ReleaseMutex();
                    }
                }
            }


            /// <summary>
            /// 獲取文件名對應的進程同步鍵
            /// </summary>
            /// <param name="filePath">文件路徑(請注意大小寫及空格)</param>
            /// <returns>進程同步鍵(互斥體名稱)</returns>
            public static string GetFilePathMutexKey(string filePath)
            {
                //生成文件對應的同步鍵,可自定義格式(互斥體名稱對特殊字符支持不友好,遂轉換爲BASE64格式字符串)
                var fileKey = Convert.ToBase64String(Encoding.Default.GetBytes(string.Format(@"FILE\{0}", filePath)));
                //轉換爲操做系統級的同步鍵
                var mutexKey = string.Format(@"Global\{0}", fileKey);
                return mutexKey;
            }

            /// <summary>
            /// 進程間同步執行
            /// </summary>
            /// <param name="mutexKey">操做系統級的同步鍵
            /// (若是將 name 指定爲 null 或空字符串,則建立一個局部互斥體。 
            /// 若是名稱之前綴「Global\」開頭,則 mutex 在全部終端服務器會話中均爲可見。 
            /// 若是名稱之前綴「Local\」開頭,則 mutex 僅在建立它的終端服務器會話中可見。 
            /// 若是建立已命名 mutex 時不指定前綴,則它將採用前綴「Local\」。)</param>
            /// <param name="action">同步處理操做</param>
            public static void MutexExec(string mutexKey, Action action)
            {
                MutexExec(mutexKey: mutexKey, action: action, recursive: false);
            }

            /// <summary>
            /// 進程間同步執行
            /// </summary>
            /// <param name="mutexKey">操做系統級的同步鍵
            /// (若是將 name 指定爲 null 或空字符串,則建立一個局部互斥體。 
            /// 若是名稱之前綴「Global\」開頭,則 mutex 在全部終端服務器會話中均爲可見。 
            /// 若是名稱之前綴「Local\」開頭,則 mutex 僅在建立它的終端服務器會話中可見。 
            /// 若是建立已命名 mutex 時不指定前綴,則它將採用前綴「Local\」。)</param>
            /// <param name="action">同步處理操做</param>
            /// <param name="recursive">指示當前調用是否爲遞歸處理,遞歸處理時檢測到異常則拋出異常,避免進入無限遞歸</param>
            private static void MutexExec(string mutexKey, Action action, bool recursive)
            {
                //聲明一個已命名的互斥體,實現進程間同步;該命名互斥體不存在則自動建立,已存在則直接獲取
                //initiallyOwned: false:默認當前線程並不擁有已存在互斥體的所屬權,即默認本線程並不是爲首次建立該命名互斥體的線程
                //注意:併發聲明同名的命名互斥體時,若間隔時間太短,則可能同時聲明瞭多個名稱相同的互斥體,而且同名的多個互斥體之間並不一樣步,高併發用戶請另行處理
                using (Mutex mut = new Mutex(initiallyOwned: false, name: mutexKey))
                {
                    try
                    {
                        //上鎖,其餘線程需等待釋放鎖以後才能執行處理;若其餘線程已經上鎖或優先上鎖,則先等待其餘線程執行完畢
                        mut.WaitOne();
                        //執行處理代碼(在調用WaitHandle.WaitOne至WaitHandle.ReleaseMutex的時間段裏,只有一個線程處理,其餘線程都得等待釋放鎖後才能執行該代碼段)
                        action();
                    }
                    //當其餘進程已上鎖且沒有正常釋放互斥鎖時(譬如進程突然關閉或退出),則會拋出AbandonedMutexException異常
                    catch (AbandonedMutexException ex)
                    {
                        //避免進入無限遞歸
                        if (recursive)
                            throw ex;

                        //非遞歸調用,由其餘進程拋出互斥鎖解鎖異常時,重試執行
                        MutexExec(mutexKey: mutexKey, action: action, recursive: true);
                    }
                    finally
                    {
                        //釋放鎖,讓其餘進程(或線程)得以繼續執行
                        mut.ReleaseMutex();
                    }
                }
            }
        }


        #region 測試寫文件的代碼
        static int LogCount = 500;
        static int WritedCount = 0;
        static int FailedCount = 0;
        static void WriteLog(string logFilePath)
        {
            try
            {
                var now = DateTime.Now;
                var logContent = string.Format("Tid: {0}{1} {2}.{3}\r\n", Thread.CurrentThread.ManagedThreadId.ToString().PadRight(4), now.ToLongDateString(), now.ToLongTimeString(), now.Millisecond.ToString());
                File.AppendAllText(logFilePath, logContent);
                WritedCount++;
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                FailedCount++;
            }
        }
        #endregion
    }
}

 

測試不使用進程同步,多進程多線程同時寫入文件:

測試結果:6個進程同時進行3000次寫入請求,僅成功寫入277次高併發

 

測試使用互斥量進行進程同步,多進程多線程同時寫入文件:

測試結果:6個進程同時進行3000次寫入請求,所有成功寫入測試

 

補充:

進程同步的資源消耗及效率比線程同步要差得多,請根據實際場景合理使用。spa

本文雖然是用寫入文件做爲示例,但進程同步的代碼使用場景與文件操做無關。操作系統

Semaphore類(信號燈)雖然能夠限制同時操做的線程數,甚至把最大同時操做數設置爲1時,行爲與Mutex類(互斥量)相似;可是因爲信號燈在其餘進程中出現異常退出時並不能接收到異常通知,只能經過等待超時觸發異常,並不適合如今的場景,因此並沒講述。線程

關於進程同步的其餘深刻了解及應用,請參閱其餘資料

相關文章
相關標籤/搜索