現實業務中常常遇到須要隊列處理的問題。數據庫
問題場景:服務器
客戶端記錄設備運行數據,傳輸給服務器。在傳輸中可能存在延遲或中斷狀況。
當中斷時,系統傳輸數據可能由於沒法傳輸或電腦重啓,會致使服務器數據記錄不連續。併發
設想的解決方案:異步
當客戶端採集到設備數據後,將數據先插入處理隊列。
另外一個服務程序從隊列中取出數據發送到服務器。
數據記錄與數據上傳作到徹底獨立,而且是異步操做。
數據記錄在硬盤上,保障了數據的連續性和可靠性。
若是一直沒法上傳到服務器,會致使隊列中等待處理的數據積攢過多。函數
現有解決方案:性能
使用數據庫作隊列(可能遇到的問題:客戶端性能有限,數據庫可靠性差,系統斷電重啓,可能致使數據庫損壞等)
使用專業的消息隊列(學習成本高,部署複雜,功能複雜,性能高)
本身實現簡單的隊列服務(基於文件系統實現,無需部署,簡單可靠,性能基本知足需求)學習
基於文件的隊列處理設計this
主要用的C#函數:File.AppendAllText、StreamWriter.WriteLine、File.ReadAllText、File.WriteAllText、File.Deletespa
好處:設計
基於系統文件處理函數
隊列大小依賴於磁盤大小
隊列內容爲一行文本,無格式要求
單文件隊列
原理:
將業務數據追加到待處理文件中。
處理程序從文件頭部讀取業務數據,處理完成後,將頭部處理過的業務數據刪除。
遇到的問題:
當業務處理程序一直失敗時,隊列文件會隨着時間的增加愈來愈大。致使文件讀寫緩慢。
由於業務數據寫入與業務數據讀取操做的是同一個文件,在大量併發時,存在文件鎖定問題。
由於待處理文件過大,磁盤操做數據量過大,致使磁盤性能不足。
多文件隊列
處理示意圖
原理:
業務數據寫入時,按日期生成文件寫入。一天一個文件,避免單文件過大問題。
業務程序,處理時,將最先的待處理隊列文件更名爲正在處理文件。避免業務寫入和業務處理同時操做一個文件問題。
業務程序處理完成後,記錄已經處理到的文件行數,避免屢次重寫隊列文件。
使用方法:
LsLib.FileQueue fileQueue = new LsLib.FileQueue(); // 插入待處理隊列 fileQueue.Push(new string[] { "業務數據內容,字符串(中間不能有換行)" }); // 清空隊列 fileQueue.Clear(); // 隊列待處理數量 fileQueue.Count(); // 獲取待處理數據 fileQueue.GetList(10); // 設置最先的數據處理完成 fileQueue.Pop(10);
源代碼:
using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading; using System.Windows.Forms; namespace LsLib { /// <summary> /// 文件堆棧 /// </summary> class FileQueue { private string QueueName = "file"; private string FileQueuePre = ""; private string FileQueueRunPath = ""; private string FileQueueRunIndexPath = ""; private string FileQueueCountPath = ""; public FileQueue(string queueName = "file") { this.QueueName = queueName; this.FileQueuePre = Application.StartupPath + "\\queue\\" + queueName; this.FileQueueRunPath = Application.StartupPath + "\\queue\\" + queueName + "_run.dat"; this.FileQueueRunIndexPath = Application.StartupPath + "\\queue\\" + queueName + "_run_index.dat"; this.FileQueueCountPath = Application.StartupPath + "\\queue\\" + queueName + "_count.dat"; } /// <summary> /// 插入堆棧 /// </summary> /// <param name="str"></param> /// <returns></returns> public bool Push(string[] strList) { int tryIndex = 0; string filePath = this.FileQueuePre + "_list" + DateTime.Now.ToString("_yyyyMMdd") + ".dat"; while (tryIndex < 5) { try { using (StreamWriter sw = new StreamWriter(filePath, true)) { foreach (var str in strList) { sw.WriteLine(str); } } SetCount(strList.Length); return true; } catch (Exception ex) { tryIndex++; Thread.Sleep(100); } } return false; } // 設置隊列待處理數量 private int SetCount(int i) { int count = 0; if (File.Exists(this.FileQueueCountPath)) { count = int.Parse(File.ReadAllText(this.FileQueueCountPath)); } count += i; File.WriteAllText(this.FileQueueCountPath, count.ToString()); return count; } /// <summary> /// 清空堆棧 /// </summary> /// <returns></returns> public bool Clear() { string[] fileList = Directory.GetFiles(Application.StartupPath + "\\queue\\", QueueName + "_*.dat"); foreach (var file in fileList) { File.Delete(file); } return true; } /// <summary> /// 堆棧待處理數量 /// </summary> /// <returns></returns> public int Count() { int count = 0; if (File.Exists(this.FileQueueCountPath)) { count = int.Parse(File.ReadAllText(this.FileQueueCountPath)); } return count; } /// <summary> /// 獲取待處理列表 /// </summary> /// <param name="count"></param> /// <returns></returns> public List<string> GetList(int count = 1) { List<string> list = new List<string>(); bool isFirst = false; if (!File.Exists(this.FileQueueRunPath)) { string[] fileList = Directory.GetFiles(Application.StartupPath + "\\queue\\", QueueName + "_list_*.dat"); if (fileList.Length == 0) { return list; } Array.Sort(fileList); File.Move(fileList[0], this.FileQueueRunPath); isFirst = true; } int startIndex = 0; int totalCount = 0; if (File.Exists(this.FileQueueRunIndexPath)) { string strIndex = File.ReadAllText(this.FileQueueRunIndexPath); string[] arrIndex = strIndex.Split(','); startIndex = int.Parse(arrIndex[0]); totalCount = int.Parse(arrIndex[1]); } int index = 0; using (StreamReader sm = File.OpenText(this.FileQueueRunPath)) { while (true) { string str = sm.ReadLine(); if (str == null) { break; } str = str.Trim(); if (str == "") { continue; } totalCount++; if (index < startIndex) { index++; continue; } if (list.Count < count) { list.Add(str); } if (list.Count >= count && !isFirst) { break; } } } if (isFirst) { File.WriteAllText(this.FileQueueRunIndexPath, "0," + totalCount); } return list; } /// <summary> /// 出棧 /// </summary> /// <param name="count"></param> /// <returns></returns> public bool Pop(int count = 1) { if (!File.Exists(this.FileQueueRunIndexPath)) { return false; } string strIndex = File.ReadAllText(this.FileQueueRunIndexPath); string[] arrIndex = strIndex.Split(','); int startIndex = int.Parse(arrIndex[0]) + count; int totalCount = int.Parse(arrIndex[1]); SetCount(-1 * count); if (startIndex >= totalCount) { File.Delete(this.FileQueueRunIndexPath); File.Delete(this.FileQueueRunPath); } else { File.WriteAllText(this.FileQueueRunIndexPath, startIndex + "," + totalCount); } return true; } } }