Unity Job System

  參考連接 : 數組

  http://esprog.hatenablog.com/entry/2018/05/19/150313安全

  https://blogs.unity3d.com/2018/10/22/what-is-a-job-system/多線程

 

  Job系統做爲一個多線程系統, 它由於跟ECS有天生的融合關係因此比較重要的樣子, 我也按照使用類型的分類來看看Job System到底怎麼樣.函數

  Job說實話就是一套封裝的多線程系統, 我相信全部開發人員都能本身封裝一套, 因此Unity推出這個的時候跟着ECS一塊兒推出, 由於單獨推出來的話確定推不動, 多線程, 線程安全, 線程鎖, 線程共享資源, 這些都沒什麼區別, 我從一個簡單列表的功能來講吧.oop

  先來一個普通的多線程 :  測試

using System.Collections;
using System.Collections.Generic;
using UnityEngine;

using System;
using System.Threading;

public class NormalListAccessTest01 : MonoBehaviour
{
    public class RunData
    {
        public List<int> datas = new List<int>();
        public float speed;
        public float deltaTime;
    }

    public static void RunOnThread<T>(System.Action<T> call, T obj, System.Action endCall = null)
    {
        System.Threading.ThreadPool.QueueUserWorkItem((_obj) =>
        {
            call.Invoke(obj);
            if(endCall != null)
            { ThreadMaster.Instance.CallFromMainThread(endCall); }
        });
    }

    private void OnGUI()
    {
        if(GUI.Button(new Rect(100, 100, 100, 50), "Run Test"))
        {
            ThreadMaster.GetOrCreate();
            var data = new RunData();
            data.deltaTime = Time.deltaTime;
            data.speed = 100.0f;
            for(int i = 0; i < 10000; i++)
            {
                data.datas.Add(i);
            }
            RunOnThread<RunData>((_data) =>
            {
                // 這是在工做線程裏
                Debug.Log("Start At : " + System.DateTime.Now.ToString("HH:mm:ss fff"));
                var move = _data.deltaTime * _data.speed;
                for(int i = 0; i < _data.datas.Count; i++)
                {
                    var val = _data.datas[i] + 1;
                    _data.datas[i] = val;
                }
            }, data, () =>
            {
                // 這是在主線程裏
                Debug.Log(data.datas[0]);
                Debug.Log("End At : " + System.DateTime.Now.ToString("HH:mm:ss fff"));
            });
        }
    }
}

  線程轉換的一個簡單封裝ThreadMaster : 優化

using System.Collections;
using System.Collections.Generic;
using UnityEngine;

public class ThreadMaster : MonoBehaviour
{
    private static ThreadMaster _instance;
    public static ThreadMaster Instance
    {
        get
        {
            return GetOrCreate();
        }
    }

    private volatile List<System.Action> _calls = new List<System.Action>();

    public static ThreadMaster GetOrCreate()
    {
        if(_instance == false)
        {
            _instance = new GameObject("ThreadMaster").AddComponent<ThreadMaster>();
        }
        return _instance;
    }
    public void CallFromMainThread(System.Action call)
    {
        _calls.Add(call);
    }
    void Update()
    {
        if(_calls.Count > 0)
        {
            for(int i = 0; i < _calls.Count; i++)
            {
                var call = _calls[i];
                call.Invoke();
            }
            _calls.Clear();
        }
    }
}

  沒有加什麼鎖, 簡單運行沒有問題, 下面來個Job的跑一下:  this

using UnityEngine;
using Unity.Collections;
using Unity.Jobs;

public class JobSystemSample00 : MonoBehaviour
{
    struct VelocityJob : IJob
    {
        public NativeArray<int> datas;

        public void Execute()
        {
            for(var i = 0; i < datas.Length; i++)
            {
                datas[i] = datas[i] + 1;
            }
        }
    }

    public void Test()
    {
        var datas = new NativeArray<int>(100, Allocator.Persistent);

        var job = new VelocityJob()
        {
            datas = datas
        };

        JobHandle jobHandle = job.Schedule();
        JobHandle.ScheduleBatchedJobs();

        //Debug.Log(datas[0]);     // Error : You must call JobHandle.Complete()
        jobHandle.Complete();
        Debug.Log(datas[0]);

        datas.Dispose();
    }

    private void OnGUI()
    {
        if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test"))
        {
            Test();
        }
    }
}

  這裏就有一個大問題了, 在有註釋的地方 // Error : You must call JobHandle.Complete(), 是說在Job沒有調用Complete()時, 去獲取相關數組內容是非法的! 而這個jobHandle.Complete(); 沒法經過工做線程去調用, 也就是說Job的運行它是沒法自行結束的, 沒法發出運行結束的通知的, 對比上面封裝的普通多線程弱爆了.  而這個Complete()函數若是在工做線程執行完成前調用, 會強制當即執行(文檔也是寫 Wait for the job to complete), 也就是說它只能在主線程調用而且會阻塞主線程, 這樣就能夠定性了, 它的Job System不是爲了提供通常使用的多線程封裝給咱們用的, 但是它又是很強大的, 由於它能使用高效的內存結構, 能保證數據訪問安全, 能在須要的時候調用Complete方法強制等待工做線程執行完畢(若是沒猜錯的話, 引擎對這個作了很大優化, 並非簡單等待), 還有BurstCompile等, 若是咱們封裝成功了的話, 就是很好的多線程庫了.spa

  PS : 打個比方一個mesh的渲染, 在渲染以前必須計算完全部座標轉換, Job的好處就是能夠進行多線程並行的計算, 而後還能被主線程強制執行完畢, 比在主線程中單獨計算強多了. 而這個強制執行纔是核心邏輯.線程

  通過幾回測試, 幾乎沒有辦法簡單擴展Job系統來讓它成爲像上面同樣擁有自動完成通知的系統, 以下 : 

  1. 添加JobHandle變量到IJob中, 在Execute結束時調用  

    struct VelocityJob : IJob
    {
        public NativeArray<int> datas;

        [Unity.Collections.LowLevel.Unsafe.NativeDisableUnsafePtrRestriction]
        public JobHandle selfHandle;    // 是這個IJob調用Schedule的句柄

        public void Execute()
        {
            for(var i = 0; i < datas.Length; i++)
            {
                datas[i] = datas[i] + 1;
            }
            selfHandle.Complete();
        }
    }

  報錯, InvalidOperationException: VelocityJob.selfHandle.jobGroup uses unsafe Pointers which is not allowed. 沒法解決, 直接就沒法在IJob結構體中添加JobHandle變量. 而且沒法在工做線程中調用Complete方法.

  2. 添加回調函數進去

    struct VelocityJob : IJob
    {
        public NativeArray<int> datas;

        public System.Action endCall;

        public void Execute()
        {
            for(var i = 0; i < datas.Length; i++)
            {
                datas[i] = datas[i] + 1;
            }
            if(endCall != null)
            {
                endCall.Invoke();
            }
        }
    }

  報錯, Job系統的struct裏面只能存在值類型的變量 !!-_-

  3. 使用全局的引用以及線程轉換邏輯來作成自動回調的形式, 雖然可使用了但是很是浪費資源 :

using UnityEngine;
using Unity.Collections;
using Unity.Jobs;
using System.Collections.Generic;

public class JobSystemSample01 : MonoBehaviour
{
    private static int _id = 0;
    public static int NewID => _id++;
    public static Dictionary<int, IJobCall> ms_handleRef = new Dictionary<int, IJobCall>();

    public class IJobCall
    {
        public JobHandle jobHandle;
        public System.Action endCall;
    }
    struct VelocityJob : IJob
    {
        public NativeArray<int> datas;

        public int refID;
        public void Execute()
        {
            for(var i = 0; i < datas.Length; i++)
            {
                datas[i] = datas[i] + 1;
            }
            var handle = ms_handleRef[refID];
            ThreadMaster.Instance.CallFromMainThread(() =>
            {
                handle.jobHandle.Complete();
                if(handle.endCall != null)
                {
                    handle.endCall.Invoke();
                }
            });
        }
    }

    public void Test()
    {
        ThreadMaster.GetOrCreate();
        var datas = new NativeArray<int>(100, Allocator.Persistent);
        int id = NewID;
        var job = new VelocityJob() { refID = id, datas = datas };
        ms_handleRef[id] = new IJobCall()
        {
            jobHandle = job.Schedule(),
            endCall = () => { Debug.Log(datas[0]); datas.Dispose(); }
        };
    }

    private void OnGUI()
    {
        if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test"))
        {
            Test();
        }
    }
}

  經過上面封裝就能夠做爲通常多線程使用了, 而且咱們得到了引擎提供的數據安全和高效邏輯性, 再加上利用BurstCpmpile和只讀屬性, 可以提高一些計算效率吧. ECS on Job已經在另一篇中說過了, 這裏忽略了.

  ----------------------------------------------

  當我測試到IJobParallelFor的時候, 發現並行並不像GPU那樣的並行那麼美好, 由於GPU它自己就是全並行的, 像卷積之類的, 它跟像素的處理順序自己就沒有關係, 但是咱們的邏輯有些會受順序的影響. 先看看下面的代碼 : 

using UnityEngine;
using Unity.Collections;
using Unity.Jobs;


public class IJobParallelForSample01 : MonoBehaviour
{
    struct VelocityJob : IJobParallelFor
    {
        public NativeArray<int> datas;

        public void Execute(int index)
        {
            if(index == 0)
            {
                index = datas.Length - 1;
            }
            datas[index] = datas[index - 1] + 1;
        }
    }

    public void Test()
    {
        var datas = new NativeArray<int>(100, Allocator.Persistent);
        for(int i = 0; i < datas.Length; i++)
        {
            datas[i] = i;
        }
        var job = new VelocityJob()
        {
            datas = datas
        };

        var jobHandle = job.Schedule(datas.Length, 20);
        JobHandle.ScheduleBatchedJobs();
        
        jobHandle.Complete();
        Debug.Log(datas[0]);

        datas.Dispose();
    }

    private void OnGUI()
    {
        if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test"))
        {
            Test();
        }
    }
}

  主要的是Schedule的方法上 : public static JobHandle Schedule<T>(this T jobData, int arrayLength, int innerloopBatchCount, JobHandle dependsOn = default) where T : struct, IJobParallelFor;

  第二個參數innerloopBatchCount表示的是分塊的大小, 好比咱們數組長度是100,  每20個元素分紅一塊, 一共能夠分5塊, 若是你的CPU核心數大於等於5它就能開5個線程來處理, 但是你不能去獲取這個塊以外的Index的數據:

  顯然這裏數據每20個一組被分爲了5組, 在5個線程裏, 而後跨組獲取數據就報錯了.

  測試一下線程數是否5個 : 

    struct VelocityJob : IJobParallelFor
    {
        public NativeArray<int> datas;

        public void Execute(int index)
        {
            throw new System.Exception(index + " ERROR");
        }
    }

   5個線程報錯, 應該每一個線程內的處理也是按照for的順序來的.

  把每一個塊改爲5的大小, 看看它能開幾個線程:

 var jobHandle = job.Schedule(datas.Length, 5);

  恩開了8個, 個人機器確實是8核的, 不過它的分塊不是我想的0-5-10-15, 或者0-12-24-36 而是整10的, 不知道爲何, 由於按照我設定每一個分組是5, 而總體平均100/8=12.5而不該該是整10的, 具體不詳.

  若是咱們要跟其它元素進行交互, 就只能把處理單元設置到跟數組同樣大, 才能在一個塊中處理:

using UnityEngine;
using Unity.Collections;
using Unity.Jobs;


public class IJobParallelForSample01 : MonoBehaviour
{
    struct VelocityJob : IJobParallelFor
    {
        public NativeArray<int> datas;

        public void Execute(int index)
        {
            if(index > 0 && index < datas.Length - 1)
            {
                datas[index] = datas[datas.Length - 1];
            }
        }
    }

    public void Test()
    {
        var datas = new NativeArray<int>(10, Allocator.Persistent);
        for(int i = 0; i < datas.Length; i++)
        {
            datas[i] = i;
        }
        var job = new VelocityJob()
        {
            datas = datas
        };

        var jobHandle = job.Schedule(datas.Length, datas.Length);
        JobHandle.ScheduleBatchedJobs();

        jobHandle.Complete();
        Debug .Log(datas[0]);

        datas.Dispose();
    }

    private void OnGUI()
    {
        if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test"))
        {
            Test();
        }
    }
}

 

  順便測試一下各個線程的分配狀況:

    private volatile static Dictionary<int, List<int>> ms_threads = new Dictionary<int, List<int>>();

    struct VelocityJob : IJobParallelFor
    {
        public NativeArray<int> datas;

        public void Execute(int index)
        {
            Debug.Log(index + " : " + System.Threading.Thread.CurrentThread.ManagedThreadId);
            lock(ms_threads)
            {
                List<int> val = null;
                ms_threads.TryGetValue(System.Threading.Thread.CurrentThread.ManagedThreadId, out val);
                if(val == null)
                {
                    val = new List<int>();
                    ms_threads[System.Threading.Thread.CurrentThread.ManagedThreadId] = val;
                }
                val.Add(index);
            }
        }
    }
        var jobHandle = job.Schedule(100, 5);

  結果是分爲8個線程, 4個線程的塊爲10, 4個爲15

  因此不能想固然的去獲取其它Index的內容, 畢竟分塊邏輯不必定.

相關文章
相關標籤/搜索