c# 基於文件系統實現的隊列處理類

現實業務中常常遇到須要隊列處理的問題。數據庫

問題場景:服務器

客戶端記錄設備運行數據,傳輸給服務器。在傳輸中可能存在延遲或中斷狀況。
當中斷時,系統傳輸數據可能由於沒法傳輸或電腦重啓,會致使服務器數據記錄不連續。併發

設想的解決方案:異步

當客戶端採集到設備數據後,將數據先插入處理隊列。
另外一個服務程序從隊列中取出數據發送到服務器。
數據記錄與數據上傳作到徹底獨立,而且是異步操做。
數據記錄在硬盤上,保障了數據的連續性和可靠性。
若是一直沒法上傳到服務器,會致使隊列中等待處理的數據積攢過多。函數

現有解決方案:性能

使用數據庫作隊列(可能遇到的問題:客戶端性能有限,數據庫可靠性差,系統斷電重啓,可能致使數據庫損壞等)
使用專業的消息隊列(學習成本高,部署複雜,功能複雜,性能高)
本身實現簡單的隊列服務(基於文件系統實現,無需部署,簡單可靠,性能基本知足需求)學習

基於文件的隊列處理設計this

主要用的C#函數:File.AppendAllText、StreamWriter.WriteLine、File.ReadAllText、File.WriteAllText、File.Deletespa

好處:設計

基於系統文件處理函數
隊列大小依賴於磁盤大小
隊列內容爲一行文本,無格式要求

單文件隊列

原理:

將業務數據追加到待處理文件中。
處理程序從文件頭部讀取業務數據,處理完成後,將頭部處理過的業務數據刪除。

遇到的問題:

當業務處理程序一直失敗時,隊列文件會隨着時間的增加愈來愈大。致使文件讀寫緩慢。
由於業務數據寫入與業務數據讀取操做的是同一個文件,在大量併發時,存在文件鎖定問題。
由於待處理文件過大,磁盤操做數據量過大,致使磁盤性能不足。

多文件隊列

處理示意圖

原理:

業務數據寫入時,按日期生成文件寫入。一天一個文件,避免單文件過大問題。
業務程序,處理時,將最先的待處理隊列文件更名爲正在處理文件。避免業務寫入和業務處理同時操做一個文件問題。
業務程序處理完成後,記錄已經處理到的文件行數,避免屢次重寫隊列文件。

使用方法:

LsLib.FileQueue fileQueue = new LsLib.FileQueue();

// 插入待處理隊列
fileQueue.Push(new string[] { "業務數據內容,字符串(中間不能有換行)" });

// 清空隊列
fileQueue.Clear();

// 隊列待處理數量
fileQueue.Count();

// 獲取待處理數據
fileQueue.GetList(10);

// 設置最先的數據處理完成
fileQueue.Pop(10);

源代碼:

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Windows.Forms;

namespace LsLib
{
    /// <summary>
    /// 文件堆棧
    /// </summary>
    class FileQueue
    {
        private string QueueName = "file";
        private string FileQueuePre = "";
        private string FileQueueRunPath = "";
        private string FileQueueRunIndexPath = "";
        private string FileQueueCountPath = "";

        public FileQueue(string queueName = "file")
        {
            this.QueueName = queueName;
            this.FileQueuePre = Application.StartupPath + "\\queue\\" + queueName;
            this.FileQueueRunPath = Application.StartupPath + "\\queue\\" + queueName + "_run.dat";
            this.FileQueueRunIndexPath = Application.StartupPath + "\\queue\\" + queueName + "_run_index.dat";
            this.FileQueueCountPath = Application.StartupPath + "\\queue\\" + queueName + "_count.dat";
        }

        /// <summary>
        /// 插入堆棧
        /// </summary>
        /// <param name="str"></param>
        /// <returns></returns>
        public bool Push(string[] strList)
        {
            int tryIndex = 0;
            string filePath = this.FileQueuePre + "_list" + DateTime.Now.ToString("_yyyyMMdd") + ".dat";

            while (tryIndex < 5)
            {
                try
                {
                    using (StreamWriter sw = new StreamWriter(filePath, true))
                    {
                        foreach (var str in strList)
                        {
                            sw.WriteLine(str);
                        }
                    }

                    SetCount(strList.Length);

                    return true;
                }
                catch (Exception ex)
                {
                    tryIndex++;
                    Thread.Sleep(100);
                }
            }

            return false;
        }

        // 設置隊列待處理數量
        private int SetCount(int i)
        {
            int count = 0;
            if (File.Exists(this.FileQueueCountPath))
            {
                count = int.Parse(File.ReadAllText(this.FileQueueCountPath));
            }

            count += i;

            File.WriteAllText(this.FileQueueCountPath, count.ToString());

            return count;
        }

        /// <summary>
        /// 清空堆棧
        /// </summary>
        /// <returns></returns>
        public bool Clear()
        {
            string[] fileList = Directory.GetFiles(Application.StartupPath + "\\queue\\", QueueName + "_*.dat");

            foreach (var file in fileList)
            {
                File.Delete(file);
            }

            return true;
        }

        /// <summary>
        /// 堆棧待處理數量
        /// </summary>
        /// <returns></returns>
        public int Count()
        {
            int count = 0;
            if (File.Exists(this.FileQueueCountPath))
            {
                count = int.Parse(File.ReadAllText(this.FileQueueCountPath));
            }

            return count;
        }

        /// <summary>
        /// 獲取待處理列表
        /// </summary>
        /// <param name="count"></param>
        /// <returns></returns>
        public List<string> GetList(int count = 1)
        {
            List<string> list = new List<string>();

            bool isFirst = false;
            if (!File.Exists(this.FileQueueRunPath))
            {
                string[] fileList = Directory.GetFiles(Application.StartupPath + "\\queue\\", QueueName + "_list_*.dat");
                if (fileList.Length == 0)
                {
                    return list;
                }

                Array.Sort(fileList);

                File.Move(fileList[0], this.FileQueueRunPath);
                isFirst = true;
            }

            int startIndex = 0;
            int totalCount = 0;

            if (File.Exists(this.FileQueueRunIndexPath))
            {
                string strIndex = File.ReadAllText(this.FileQueueRunIndexPath);
                string[] arrIndex = strIndex.Split(',');

                startIndex = int.Parse(arrIndex[0]);
                totalCount = int.Parse(arrIndex[1]);
            }

            int index = 0;
            using (StreamReader sm = File.OpenText(this.FileQueueRunPath))
            {
                while (true)
                {
                    string str = sm.ReadLine();
                    if (str == null)
                    {
                        break;
                    }
                    str = str.Trim();

                    if (str == "")
                    {
                        continue;
                    }

                    totalCount++;
                    if (index < startIndex)
                    {
                        index++;
                        continue;
                    }

                    if (list.Count < count)
                    {
                        list.Add(str);
                    }

                    if (list.Count >= count && !isFirst)
                    {
                        break;
                    }
                }
            }

            if (isFirst)
            {
                File.WriteAllText(this.FileQueueRunIndexPath, "0," + totalCount);
            }

            return list;
        }

        /// <summary>
        /// 出棧
        /// </summary>
        /// <param name="count"></param>
        /// <returns></returns>
        public bool Pop(int count = 1)
        {
            if (!File.Exists(this.FileQueueRunIndexPath))
            {
                return false;
            }

            string strIndex = File.ReadAllText(this.FileQueueRunIndexPath);
            string[] arrIndex = strIndex.Split(',');

            int startIndex = int.Parse(arrIndex[0]) + count;
            int totalCount = int.Parse(arrIndex[1]);

            SetCount(-1 * count);

            if (startIndex >= totalCount)
            {
                File.Delete(this.FileQueueRunIndexPath);
                File.Delete(this.FileQueueRunPath);
            }
            else
            {
                File.WriteAllText(this.FileQueueRunIndexPath, startIndex + "," + totalCount);
            }

            return true;
        }
    }
}
相關文章
相關標籤/搜索