Unity實踐—多線程任務隊列實現

Unity 已可以使用 Thread、Task 等處理多線程任務,但缺乏成熟的多線程任務隊列工具,因此在此實現一個,代碼已上傳 Git 項目 GRUnityTools,可直接下載源碼或經過 UPM 使用git

本文原地址:Unity實踐—多線程任務隊列實現github

實現目標

  1. 串行與併發隊列c#

    隊列是首要實現目標,且須要串行與併發兩種隊列,以覆蓋不一樣需求api

  2. 同步與異步執行網絡

    因任務隊列過多可能阻塞主線程,因此除同步執行外還須要多線程異步操做多線程

  3. 主線程同步併發

    由於有多線程,但 Unity 部分操做只能在主線程執行,因此還須要線程同步到主線程異步

實現方式

  1. Taskasync

    Task 爲當前 .Net 提供的實用性最高的多線程接口,可實現任務的監控與操縱ide

  2. TaskScheduler

    Task 專用調度器,可更便捷地實現 Task 隊列調度

  3. Loom

    Loom 爲網絡上廣爲流傳的 Unity 中調用主線程的工具類,目前找不到源碼最原始地址,代碼拷貝自知乎

實現過程

方案選擇

最初即決定使用 Task 做爲隊列基本單位,但徹底沒有考慮 TaskScheduler。原計劃手動實現一個調度器,負責保存傳入的 Task 放入隊列,可設置同步異步,根據設置實現對隊列的不一樣操做。後來再研究微軟官方文檔時發如今其 Task 文檔的示例中有一個 LimitedConcurrencyLevelTaskScheduler 的演示代碼,直接經過 TaskScheduler 實現了可控併發數量的調度器,且當設置併發數爲1時隊列中的任務會逐一按順序執行即產生了串行隊列效果

TaskScheduler 有兩種使用方式

方式一:爲 TaskFactory 配置 TaskScheduler,經過 TaksFactory 使用配置的調度器啓動 Task

//建立併發數32的調度器
LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(32); 
//方式1 
TaskFactory factory = new TaskFactory(scheduler);
factory.StartNew(()=>{
  //執行任務
});
複製代碼

方式二:直接使用 Task.Start(TaskFactory) 方法

//建立併發數1的調度器(此時即爲串行隊列效果)
LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(1);
//聲明一個 Task 對象
Task task = new Task(()=>{
  //任務
});
//啓動 Task 指定調度器
task.Start(scheduler);
複製代碼

編寫源碼

建立名爲 TaskQueue 的類,添加變量

//根據需求設置默認併發數
private const int DefaultConcurrentCount = 32;
//線程鎖
private static object _lock = new object();
//默認靜態串行隊列對象
private static TaskQueue _defaultSerial;
//默認靜態併發隊列對象
private static TaskQueue _defaultConcurrent;
//持有的調度器
private LimitedConcurrencyLevelTaskScheduler _scheduler;  

//提供默認串行隊列
public static TaskQueue DefaultSerailQueue
{
    get
    {
        if (_defaultSerial == null)
        {
            lock (_lock)
            {
                if (_defaultSerial == null)
                {
                    _defaultSerial = new TaskQueue(1);
                }
            }
        }
        return _defaultSerial;
    }
}

//提供默認併發隊列
public static TaskQueue DefaultConcurrentQueue
{
    get
    {
        if (_defaultConcurrent == null)
        {
            lock (_lock)
            {
                if (_defaultConcurrent == null)
                {
                    _defaultConcurrent = new TaskQueue(DefaultConcurrentCount);
                }
            }
        }
        return _defaultConcurrent;
    }
}
複製代碼

提供快捷構造方法

//默認構造方法,因 Loom 爲 UnityEngine.Monobehaviour對象,因此必須執行初始化方法將其加入場景中
public TaskQueue(int concurrentCount)
{
    _scheduler = new LimitedConcurrencyLevelTaskScheduler(concurrentCount);
    Loom.Initialize();
}
//建立串行隊列
public static TaskQueue CreateSerialQueue()
{
    return new TaskQueue(1);
}
//建立併發隊列
public static TaskQueue CreateConcurrentQueue()
{
    return new TaskQueue(DefaultConcurrentCount);
}
複製代碼

下面是各類同步、異步、主線程執行方法,方法會將執行的 Task 返回,以便在實際使用中須要對 Task 有其餘操做

需注意 RunSyncOnMainThreadRunAsyncOnMainThread 爲獨立的執行系統與隊列無關,若在主線程執行方法直接在主線程同步執行便可

//異步執行,因Task自己會開啓新線程因此直接調用便可
public Task RunAsync(Action action)
{
    Task t = new Task(action);
    t.Start(_scheduler);
    return t;
}

//同步執行,使用 Task 提供的 RunSynchronously 方法
public Task RunSync(Action action)
{
    Task t = new Task(action);
    t.RunSynchronously(_scheduler);
    return t;
}

//同步執行主線程方法
//爲避免主線程調用該方法因此須要判斷當前線程,若爲主線程則直接執行,防止死鎖  
//爲保證線程同步,此處使用信號量,僅在主線程方法執行完成後纔會釋放信號
public static void RunSyncOnMainThread(Action action)
{
    if (Thread.CurrentThread.ManagedThreadId == 1)
    {
        action();
    }
    else
    {
        Semaphore sem = new Semaphore(0, 1);
        Loom.QueueOnMainThread((o => { 
            action();
            sem.Release();
        }), null);
        sem.WaitOne();
    }
}

//因 Loom 自己即爲不會當即執行方法,因此直接調用便可
public static void RunAsyncOnMainThread(Action action)
{
    Loom.QueueOnMainThread((o => { action(); }), null);
}
複製代碼

擴展延遲執行方法,因延遲自己爲異步操做,因此只提供異步執行方式

// 此處使用async、await 關鍵字實現延遲操做, delay 爲秒,Task.Delay 參數爲毫秒
public Task RunAsync(Action action, float delay)
{
    Task t = Task.Run(async () =>
    {
        await Task.Delay((int) (delay * 1000));
        return RunAsync(action);
    });
    return t;
}
複製代碼

實現效果

併發隊列異步執行 併發異步

併發隊列同步執行

串行隊列異步執行

串行隊列同步執行

併發隊列延遲執行

子線程異步執行主線程

子線程同步執行主線程

到此一個多線程任務隊列工具就完成了,通常的需求基本能夠知足,後續還可提供更多擴展功能,如傳參、取消任務等

另外我我的想盡力將這套工具脫離 UnityEngine.Monobehaviour,但目前還沒找到除 Loom 外其餘 Unity 獲取主線程的方法,固然 Loom 自己仍然是一個很巧妙的工具

若想了解 LimitedConcurrencyLevelTaskSchedulerLoom 可繼續想下看

其餘

LimitedConcurrencyLevelTaskScheduler

TaskScheduler 爲抽象類,想自定義任務調度需繼承該類,並複寫部份內部調度方法

LimitedConcurrencyLevelTaskScheduler ,如下簡稱爲 LCLTS,爲微軟官方文檔提供的示例代碼,用於調度任務,控制併發數

LCLTS 工做流程
  1. 將 Task 入隊放入鏈表
  2. 判斷當前已執行任務數量,若未達到最大值則經過 ThreadPool 分配工做線程執行,並計數+1
  3. 標記已分匹配的線程並死循環執行任務隊列,將已執行的任務出隊
  4. 任務隊列爲空時退出循環,並移除標記
  5. 如有任務想插隊到線程執行,先檢查當前線程標記,若無標記則沒法執行插隊操做,該操做爲避免任務佔用隊列外繁忙線程
  6. 若插隊成功則檢查該 Task 是否已在隊列中,若存在則出隊執行,若不存在則直接執行
源碼解釋
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
   //ThreadStatic 線程變量特性,代表是當前線程是否正在處理任務
   [ThreadStatic]
   private static bool _currentThreadIsProcessingItems;

  // 任務隊列,使用鏈表比 List 和 Array 更方便執行插隊出隊操做(隊列中不會出現空位)
   private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // 該隊列由 lock(_tasks) 鎖定

   // 最大併發數
   private readonly int _maxDegreeOfParallelism;

   // 當前已分配入隊的任務數量 
   private int _delegatesQueuedOrRunning = 0;

   // 帶併發數的構造方法
   public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
   {
       if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
       _maxDegreeOfParallelism = maxDegreeOfParallelism;
   }

   // 將 Task 放入調度隊列
   protected sealed override void QueueTask(Task task)
   {
      //將任務放入列表,檢查當前執行數是否達到最大值,若未達到則分配線程執行,並計數+1
       lock (_tasks)
       {
           _tasks.AddLast(task);
           if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
           {
               ++_delegatesQueuedOrRunning;
               NotifyThreadPoolOfPendingWork();
           }
       }
   }

   // 使用 ThreadPool 將 Task 分配到工做線程
   private void NotifyThreadPoolOfPendingWork()
   {
       ThreadPool.UnsafeQueueUserWorkItem(_ =>
       {
         	 //標記當前線程正在執行任務,當有 Task 想插入此線程執行時會檢查該狀態
           _currentThreadIsProcessingItems = true;
           try
           {
               // 死循環處理全部隊列中 Task
               while (true)
               {
                   Task item;
                   lock (_tasks)
                   {
                       // 任務隊列執行完後退出循環,並將佔用標記置爲 false
                       if (_tasks.Count == 0)
                       {
                           --_delegatesQueuedOrRunning;
                           break;
                       }

                       // 若還有 Task 則獲取第一個,並出隊
                       item = _tasks.First.Value;
                       _tasks.RemoveFirst();
                   }

                   // 執行 Task
                   base.TryExecuteTask(item);
               }
           }
           // 線程佔用標記置爲 false
           finally { _currentThreadIsProcessingItems = false; }
       }, null);
   }

   // 嘗試在當前線程執行指定任務
   protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
   {
       // 若當前線程沒有在執行任務則沒法執行插隊操做
       if (!_currentThreadIsProcessingItems) return false;

       // 若該任務已在隊列中,則出隊
       if (taskWasPreviouslyQueued) 
          // 嘗試執行 Task
          if (TryDequeue(task)) 
            return base.TryExecuteTask(task);
          else
             return false; 
       else 
          return base.TryExecuteTask(task);
   }

   // 嘗試將已調度的 Task 移出調度隊列
   protected sealed override bool TryDequeue(Task task)
   {
       lock (_tasks) return _tasks.Remove(task);
   }

   // 獲取最大併發數
   public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }

   // 獲取已調度任務隊列迭代器
   protected sealed override IEnumerable<Task> GetScheduledTasks()
   {
       bool lockTaken = false;
       try
       {
         	 // Monitor.TryEnter 做用爲線程鎖,其語法糖爲 lock (_tasks)
           Monitor.TryEnter(_tasks, ref lockTaken);
           if (lockTaken) return _tasks;
           else throw new NotSupportedException();
       }
       finally
       {
           if (lockTaken) Monitor.Exit(_tasks);
       }
   }
}
複製代碼

Loom

Loom 經過繼承 UnityEngine.MonoBehaviour,使用 Unity 主線程生命週期 Update 在主線程執行方法,同時 Loom 也支持簡單的多線程異步執行

Loom 結構和流程

Loom 包含兩個隊列有延遲方法隊列和無延遲方法隊列,兩條隊列方法均可執行傳參方法

  1. Actionparam 以及延遲時間打包入結構體放入延遲或無延遲隊列
  2. 若爲延遲任務,則使用 Time.time 獲取添加任務的時間加上延遲時間獲得預約執行時間打包入延遲任務結構體併入隊
  3. 待一個 Update 週期執行,清空執行隊列舊任務,取出無延遲隊列全部對象,放入執行隊列,清空無延遲隊列,遍歷執行執行隊列任務
  4. 同一個 Update 週期,清空延遲執行隊列舊任務,取出預計執行時間小於等於當前時間的任務,放入延遲執行隊列,將取出的任務移出延遲隊列,遍歷執行延遲執行隊列任務
Loom 的使用

用戶可將 Loom 腳本掛載在已有對象上,也可直接代碼調用方法,Loom 會自動在場景中添加一個不會銷燬的 Loom 單例對象

代碼中使用 QueueOnMainThread 將延遲和無延遲方法加入主線程隊列, RunAsync 執行異步方法

public class Loom :MonoBehaviour
{
    public static int maxThreads = 8;
    static int numThreads;

    private static Loom _current;
    //private int _count;
    public static Loom Current
    {
        get
        {
            Initialize();
            return _current;
        }
    }

    void Awake()
    {
        _current = this;
        initialized = true;
    }

    static bool initialized;

    [RuntimeInitializeOnLoadMethod]
    public static void Initialize()
    {
        if (!initialized)
        {

            if (!Application.isPlaying)
                return;
            initialized = true;
            var g = new GameObject("Loom");
            _current = g.AddComponent<Loom>();
#if !ARTIST_BUILD
            UnityEngine.Object.DontDestroyOnLoad(g);
#endif
        }
    }
    public struct NoDelayedQueueItem
    {
        public Action<object> action;
        public object param;
    }

    private List<NoDelayedQueueItem> _actions = new List<NoDelayedQueueItem>();
    public struct DelayedQueueItem
    {
        public float time;
        public Action<object> action;
        public object param;
    }
    private List<DelayedQueueItem> _delayed = new List<DelayedQueueItem>();

    List<DelayedQueueItem> _currentDelayed = new List<DelayedQueueItem>();

    public static void QueueOnMainThread(Action<object> taction, object tparam)
    {
        QueueOnMainThread(taction, tparam, 0f);
    }
    public static void QueueOnMainThread(Action<object> taction, object tparam, float time)
    {
        if (time != 0)
        {
            lock (Current._delayed)
            {
                Current._delayed.Add(new DelayedQueueItem { time = Time.time + time, action = taction, param = tparam });
            }
        }
        else
        {
            lock (Current._actions)
            {
                Current._actions.Add(new NoDelayedQueueItem { action = taction, param = tparam });
            }
        }
    }

    public static Thread RunAsync(Action a)
    {
        Initialize();
        while (numThreads >= maxThreads)
        {
            Thread.Sleep(100);
        }
        Interlocked.Increment(ref numThreads);
        ThreadPool.QueueUserWorkItem(RunAction, a);
        return null;
    }

    private static void RunAction(object action)
    {
        try
        {
            ((Action)action)();
        }
        catch
        {
        }
        finally
        {
            Interlocked.Decrement(ref numThreads);
        }

    }


    void OnDisable()
    {
        if (_current == this)
        {

            _current = null;
        }
    }



    // Use this for initialization
    void Start()
    {

    }

    List<NoDelayedQueueItem> _currentActions = new List<NoDelayedQueueItem>();

    // Update is called once per frame
    void Update()
    {
        if (_actions.Count > 0)
        {
            lock (_actions)
            {
                _currentActions.Clear();
                _currentActions.AddRange(_actions);
                _actions.Clear();
            }
            for (int i = 0; i < _currentActions.Count; i++)
            {
                _currentActions[i].action(_currentActions[i].param);
            }
        }

        if (_delayed.Count > 0)
        {
            lock (_delayed)
            {
                _currentDelayed.Clear();
                _currentDelayed.AddRange(_delayed.Where(d => d.time <= Time.time));
                for (int i = 0; i < _currentDelayed.Count; i++)
                {
                    _delayed.Remove(_currentDelayed[i]);
                }
            }

            for (int i = 0; i < _currentDelayed.Count; i++)
            {
                _currentDelayed[i].action(_currentDelayed[i].param);
            }
        }
    }
}
複製代碼
相關文章
相關標籤/搜索