爲了更平穩地過渡,在真正進入UE的多線程渲染知識以前,先學習或重溫一下多線程編程的基礎知識。php
多線程(Multithread)編程的思想早在單核時代就已經出現了,當時的操做系統(如Windows95)就已經支持多任務的功能,其原理就是在單核中切換不一樣的上下文(Context),以便每一個進程中的線程都有時間得到執行指令的機會。html
但到了2005年,當單核主頻接近4GHz時,CPU硬件廠商英特爾和AMD發現,速度也會遇到本身的極限:那就是單純的主頻提高,已經沒法明顯提高系統總體性能。前端
隨着單核計算頻率摩爾定律的緩慢終結,Intel率先於2005年發佈了奔騰D和奔騰四至尊版840系列,首次支持了兩個物理級別的線程計算單元。此後十多年,多核CPU獲得蓬勃發展,由AMD製造的Ryzen 3990X處理器已經擁有64個核心128個邏輯線程。node
銳龍(Ryzen)3990X的宣傳海報中赫然凸顯的核心與線程數量。ios
硬件的多核發展,給軟件極大的發揮空間。應用程序能夠充分發揮多核多線程的計算資源,各個應用領域由此也產生多線程編程模型和技術。做爲遊戲的發動機Unreal Engine等商業引擎,一樣能夠利用多線程技術,以便更加充分地提高效率和效果。c++
使用多線程併發帶來的做用總結起來主要有兩點:git
可是,隨着CPU核心數量的提高,計算機得到的效益並不是直線提高,而是遵循Amdahl's law(阿姆達爾定律),Amdahl's law的公式定義以下:github
公式的各個份量含義以下:web
舉個具體的栗子,假設有8核16線程的CPU用於處理某個任務,這個任務有70%的部分是能夠並行處理的,那麼它的理論加速比爲:算法
因而可知,多線程編程帶來的效益並不是跟核心數呈直線正比,實際上它的曲線以下所示:
阿姆達爾定律揭示的核心數和加速比圖例。因而可知,可並行的任務佔比越低,加速比得到的效果越差:當可並行任務佔比爲50%時,16核已經基本達到加速比天花板,不管後面增長多少核心數量,都無濟於事;若是可並行任務佔比爲95%時,到2048個核心纔會達到加速比天花板。
雖然阿姆達爾定律給咱們帶來了殘酷的現實,可是,若是咱們可以提高任務並行佔比到接近100%,則加速比天花板能夠獲得極大提高:
如上公式所示,當\(p=1\)(便可並行的任務佔比100%)時,理論上的加速比和核心數量成線性正比!!
舉個具體的例子,在編譯Unreal Engine工程源碼或Shader時,因爲它們基本是100%的並行佔比,理論上能夠得到接近線性關係的加速比,在多核系統中將極大地縮短編譯時間。
利用多線程併發提升性能的方式有兩種:
上面闡述了多線程併發的益處,接下來講說它的反作用。總結起來,反作用以下:
本小節將闡述多線程編程技術中常涉及的基本概念。
進程(Process)是操做系統執行應用程序的基本單元和實體,它自己只是個容器,一般包含內核對象、地址空間、統計信息和若干線程。它自己並不真正執行代碼指令,而是交由進程內的線程執行。
對Windows而言,操做系統在建立進程時,同時也會給它建立一個線程,該線程被稱爲主線程(Primary thread, Main thread)。
對Unix而言,進程和主線程實際上是同一個東西,操做系統並不知道有線程的存在,線程更接近於lightweight processes(輕量級進程)的概念。
進程有優先級概念,Windows下由低到高爲:低(Low)、低於正常(Below normal)、正常(Normal)、高於正常(Above normal)、高(High)、實時(Real time)。(見下圖)
默認狀況下,進程的優先級爲Normal。優先級高的進程將會優先得到執行機會和時間。
線程(Thread)是能夠執行代碼的實體,一般不能獨立存在,須要依附在某個進程內部。一個進程能夠擁有多個線程,這些線程能夠共享進程的數據,以便並行或併發地執行多個任務。
在單核CPU中,操做系統(如Windows)可能會採用輪循(Round robin)的方式進行調度,使得多個線程看起來是同時運行的。(下圖)
在多核CPU中,線程可能會安排在不一樣的CPU核心同時運行,從而達到並行處理的目的。
採用SMP的Windows在多核CPU的執行示意圖。等待處理的線程被安排到不一樣的CPU核心。
每一個線程可擁有本身的執行指令上下文(如Windows的IP(指令寄存器地址)和SP(棧起始寄存器地址))、執行棧和TLS(Thread Local Storage,線程局部緩存)。
Windows線程建立和初始化示意圖。
線程局部存儲(Thread Local Storage)是一種存儲持續期,對象的生命週期與線程同樣,在線程開始時分配,線程結束時回收。每一個線程有該對象本身的實例,訪問和修改這樣的對象不會形成競爭條件(Race Condition)。
線程也存在優先級概念,優先級越高的將優先得到執行指令的機會。
線程的狀態通常有運行狀態、暫停狀態等。Windows可用如下接口切換線程狀態:
// 暫停線程 DWORD SuspendThread(HANDLE hThread); // 繼續運行線程 DWORD ResumeThread(HANDLE hThread);
同個線程可被屢次暫停,若是要恢復運行狀態,則須要調用同等次數的繼續運行接口。
協程(Coroutine)是一種輕量級(lightweight)的用戶態線程,一般跑在同一個線程,利用同一個線程的不一樣時間片斷執行指令,沒有線程、進程切換和調度的開銷。從使用者角度,能夠利用協程機制實如今同個線程模擬異步的任務和編碼方式。在同個線程內,它不會形成數據競爭,但也會因線程阻塞而阻塞。
纖程(Fiber)如同協程,也是一種輕量級的用戶態線程,能夠使得應用程序獨立決定本身的線程要如何運做。操做系統內核不知道纖程的存在,也不會爲它進行調度。
同個進程容許有多個線程,這些線程能夠共享進程的地址空間、數據結構和上下文。進程內的同一數據塊,可能存在多個線程在某個很小的時間片斷內同時讀寫,這就會形成數據異常,從而致使了不可預料的結果。這種不可預期性便造就了競爭條件(Race Condition)。
避免產生競爭條件的技術有不少,諸如原子操做、臨界區、讀寫鎖、內核對象、信號量、互斥體、柵欄、屏障、事件等等。
至少兩個線程同時執行任務的機制。通常有多核多物理線程的CPU同時執行的行爲,才能夠叫並行,單核的多線程不能稱之爲並行。
至少兩個線程利用時間片(Timeslice)執行任務的機制,是並行的更廣泛形式。即使單核CPU同時執行的多線程,也可稱爲併發。
併發的兩種形式——上:雙物理核心的同時執行(並行);下:單核的多任務切換(併發)。
事實上,併發和並行在多核處理器中是能夠同時存在的,好比下圖所示,存在雙核,每一個核心又同時切換着多個任務:
部分參考文獻嚴格區分了並行和併發,但部分文獻並不明確指出其中的區別。虛幻引擎的多線程渲染架構和API中,常出現並行和併發的概念,因此虛幻是明顯區分二者之間的含義。
線程池提供了一種新的任務併發的方式,調用者只須要傳入一組可並行的任務和分組的策略,即可以使用線程池的若干線程併發地執行任務,使得調用者無需接直接觸線程的調用和管理細節,下降了調用者的成本,也提高了線程的調度效率和吞吐量。
不過,建立一個線程池時,幾個關鍵性的設計問題會影響併發效率,好比:可以使用的線程數量,高效的任務分配方式,以及是否須要等待一個任務完成。
線程池能夠自定義實現,也能夠直接使用C++、操做系統或第三方庫提供的API。
在C++11以前,C++的多線程支持基本爲零,僅提供少許雞肋的volatile
等關鍵字。直到C++11標準,多線程才真正歸入C++標準,並提供了相關關鍵字、STL標準庫,以便使用者實現跨平臺的多線程調用。
固然,對使用者來講,多線程的實現可採用C++11的線程庫,也能夠根據具體的系統平臺提供的多線程API自定義線程庫,還能夠使用諸如ACE、boost::thread等第三方庫。使用C++自帶的多線程庫,有幾個優勢,一是使用簡單方便,依賴少;二是跨平臺,無需關注系統底層。
thread_local是C++是實現線程局部存儲的關鍵,添加了此關鍵字的變量意味着每一個線程都有本身的一份數據,不會共享同一份數據,避免數據競爭。
C11的關鍵字_Thread_local
用於定義線程局部變量。在頭文件<threads.h>
定義了thread_local
爲上述關鍵詞的同義。例如:
#include <threads.h> thread_local int foo = 0;
C++11引入的thread_local
關鍵字用於下述情形:
一、名字空間(全局)變量。
二、文件靜態變量。
三、函數靜態變量。
四、靜態成員變量。
此外,不一樣編譯器提供了各自的方法聲明線程局部變量:
// Visual C++, Intel C/C++ (Windows systems), C++Builder, Digital Mars C++ __declspec(thread) int number; // Solaris Studio C/C++, IBM XL C/C++, GNU C, Clang, Intel C++ Compiler (Linux systems) __thread int number; // C++ Builder int __thread number;
使用了volatile修飾符的變量意味着它在內存中的值可能隨時發生變化,也告訴編譯器不能作任何優化,每次使用到此變量的值都必須從內存中讀取,而不該該直接使用寄存器的值。
舉個具體的栗子吧。假設有如下代碼段:
int a = 10; volatile int *p = &a; int b, c; b = *p; c = *p;
若p
沒有volatile
修飾,則b = *p
和c = *p
只需從內存取一次p
的值,那麼b
和c
的值必然是10
。
若考慮volatile
的影響,假設執行完b = *p
語句以後,p
的值被其它線程修改了,則執行c = *p
會再次從內存中讀取p
的值,此時c
的值再也不是10,而是新的值。
可是,volatile並不能解決多線程的同步問題,只適合如下三種狀況使用:
一、和信號處理(signal handler)相關的場合。
二、和內存映射硬件(memory mapped hardware)相關的場合。
三、和非本地跳轉(setjmp
和 longjmp
)相關的場合。
嚴格來講atomic
並非關鍵字,而是STL的模板類,能夠支持指定類型的原子操做。
使用原子的類型意味着該類型的實例的讀寫操做都是原子性的,沒法被其它線程切割,從而達到線程安全和同步的目標。
可能有些讀者會好奇,爲何對於基本類型的操做也須要原子操做。好比:
int cnt = 0; auto f = [&]{cnt++;}; std::thread t1{f}, t2{f}, t3{f};
以上三個線程同時調用函數f
,該函數只執行cnt++
,在C++維度,彷佛只有一條執行語句,理論上不該該存在同步問題。然而,編譯成彙編指令後,會有多條指令,這就會在多線程中引發線程上下文切換,引發不可預知的行爲。
爲了不這種狀況,就須要加入atomic
類型:
std::atomic<int> cnt{0}; // 給cnt加入原子操做。 auto f = [&]{cnt++;}; std::thread t1{f}, t2{f}, t3{f};
加入atomic
以後,全部線程執行後的結果是肯定的,可以正常給變量計數。atomic
的實現機制與臨界區相似,但效率上比臨界區更快。
爲了更進一步地說明C++的單條語句可能生成多條彙編指令,可藉助Compiler Explorer來實時查探C++彙編後的指令:
Compiler Explorer動態將左側C++語句編譯出的彙編指令。上圖所示的c++代碼編譯後可能存在一對多的彙編指令,由此印證atomic原子操做的必要性。
充分利用std::atomic
的特性和接口,能夠實現不少非阻塞無鎖的線程安全的數據結構和算法,關於這一點的延伸閱讀,強力推薦《C++ Concurrency In Action》。
C++的線程類型是std::thread
,它提供的接口以下表:
接口 | 解析 |
---|---|
join | 加入主線程,使得主線程強制等待該線程執行完。 |
detach | 從主線程分離,使得主線程無需等待該線程執行完。 |
swap | 與另一個線程交換線程對象。 |
joinable | 查詢是否可加入主線程。 |
get_id | 獲取該線程的惟一標識符。 |
native_handle | 返回實現層的線程句柄。 |
hardware_concurrency | 靜態接口,返回硬件支持的併發線程數量。 |
使用範例:
#include <iostream> #include <thread> #include <chrono> void foo() { // simulate expensive operation std::this_thread::sleep_for(std::chrono::seconds(1)); } int main() { std::cout << "starting thread...\n"; std::thread t(foo); // 構造線程對象,且傳入被執行的函數。 std::cout << "waiting for thread to finish..." << std::endl; t.join(); // 加入主線程,使得主線程必須等待該線程執行完畢。 std::cout << "done!\n"; }
輸出:
starting thread... waiting for thread to finish... done!
若是須要在調用線程和新線程之間同步數據,則能夠使用C++的std::promise
和std::future
等機制。示例代碼:
#include <vector> #include <thread> #include <future> #include <numeric> #include <iostream> void accumulate(std::vector<int>::iterator first, std::vector<int>::iterator last, std::promise<int> accumulate_promise) { int sum = std::accumulate(first, last, 0); accumulate_promise.set_value(sum); // Notify future } int main() { // Demonstrate using promise<int> to transmit a result between threads. std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 }; std::promise<int> accumulate_promise; std::future<int> accumulate_future = accumulate_promise.get_future(); std::thread work_thread(accumulate, numbers.begin(), numbers.end(), std::move(accumulate_promise)); // future::get() will wait until the future has a valid result and retrieves it. // Calling wait() before get() is not needed //accumulate_future.wait(); // wait for result std::cout << "result = " << accumulate_future.get() << '\n'; work_thread.join(); // wait for thread completion }
輸出結果:
result = 21
可是,std::thread
的執行並不能保證是異步的,也多是在當前線程執行。
若是須要強制異步,則可以使用std::async
。它能夠指定兩種異步方式:std::launch::async
和std::launch::deferred
,前者表示使用新的線程異步地執行任務,後者表示在當前線程執行,且會被延遲執行。使用範例:
#include <iostream> #include <vector> #include <algorithm> #include <numeric> #include <future> #include <string> #include <mutex> std::mutex m; struct X { void foo(int i, const std::string& str) { std::lock_guard<std::mutex> lk(m); std::cout << str << ' ' << i << '\n'; } void bar(const std::string& str) { std::lock_guard<std::mutex> lk(m); std::cout << str << '\n'; } int operator()(int i) { std::lock_guard<std::mutex> lk(m); std::cout << i << '\n'; return i + 10; } }; template <typename RandomIt> int parallel_sum(RandomIt beg, RandomIt end) { auto len = end - beg; if (len < 1000) return std::accumulate(beg, end, 0); RandomIt mid = beg + len/2; auto handle = std::async(std::launch::async, parallel_sum<RandomIt>, mid, end); int sum = parallel_sum(beg, mid); return sum + handle.get(); } int main() { std::vector<int> v(10000, 1); std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n'; X x; // Calls (&x)->foo(42, "Hello") with default policy: // may print "Hello 42" concurrently or defer execution auto a1 = std::async(&X::foo, &x, 42, "Hello"); // Calls x.bar("world!") with deferred policy // prints "world!" when a2.get() or a2.wait() is called auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!"); // Calls X()(43); with async policy // prints "43" concurrently auto a3 = std::async(std::launch::async, X(), 43); a2.wait(); // prints "world!" std::cout << a3.get() << '\n'; // prints "53" } // if a1 is not done at this point, destructor of a1 prints "Hello 42" here
執行結果:
The sum is 10000 43 Hello 42 world! 53
另外,C++20已經支持輕量級的協程(coroutine)了,相關的關鍵字:co_await
,co_return
,co_yield
,跟C#等腳本語言的概念和用法一模一樣,但行爲和實現機制可能會稍有不一樣,此文不展開探討了。
線程同步的機制有不少,C++支持的有如下幾種:
[2.1.3.1 C++多線程關鍵字](#2.1.3.1 C++多線程關鍵字)已經對std::atomic
作了詳細的解析,能夠防止多線程之間共享數據的數據競險問題。此外,它還提供了豐富多樣的接口和狀態查詢,以便更加精細和高效地同步原子數據,常見接口和解析以下:
接口名 | 解析 |
---|---|
is_lock_free | 檢查原子對象是否無鎖的。 |
store | 存儲值到原子對象。 |
load | 從原子對象加載值。 |
exchange | 獲取原子對象的值,並替換成指定值。 |
compare_exchange_weak, compare_exchange_strong | 將原子對象的值和預期值(expected)對比,若是相同就替換成目標值(desired),並返回true ;若是不一樣,就加載原子對象的值到預期值(expected),並返回false 。weak模式不會卡調用線程,strong模式會卡住調用線程,直到原子對象的值和預期值(expected)相同。 |
fetch_add, fetch_sub, fetch_and, fetch_or, fetch_xor | 獲取原子對象的值,並對其相加、相減等操做。 |
operator ++, operator --, operator +=, operator -=, ... | 對原子對象響應各種操做符,操做符的意義和普通變量一致。 |
此外,C++20還支持wait, notify_one, notify_all等同步接口。
利用compare_exchange_weak
接口能夠很方便地實現線程安全的非阻塞式的數據結構。示例:
#include <atomic> #include <future> #include <iostream> template<typename T> struct node { T data; node* next; node(const T& data) : data(data), next(nullptr) {} }; template<typename T> class stack { public: std::atomic<node<T>*> head; // 堆棧頭, 採用原子操做. public: // 入棧操做 void push(const T& data) { node<T>* new_node = new node<T>(data); // 將原有的頭指針做爲新節點的下一節點. new_node->next = head.load(std::memory_order_relaxed); // 將新的節點和老的頭部節點作對比測試, 若是new_node->next==head, 說明其它線程沒有修改head, 能夠將head替換成new_node, 從而完成push操做. // 反之, 若是new_node->next!=head, 說明其它線程修改了head, 將其它線程修改的head保存到new_node->next, 繼續循環檢測. while(!head.compare_exchange_weak(new_node->next, new_node, std::memory_order_release, std::memory_order_relaxed)) ; // 空循環體 } }; int main() { stack<int> s; auto r1 = std::async(std::launch::async, &stack<int>::push, &s, 1); auto r2 = std::async(std::launch::async, &stack<int>::push, &s, 2); auto r3 = std::async(std::launch::async, &stack<int>::push, &s, 3); r1.wait(); r2.wait(); r3.wait(); // print the stack's values node<int>* node = s.head.load(std::memory_order_relaxed); while(node) { std::cout << node->data << " "; node = node->next; } }
輸出:
2 3 1
因而可知,利用原子及其接口能夠很方便地進行多線程同步,並且因爲是多線程異步入棧,棧的元素不必定與編碼的順序一致。
以上代碼還涉及內存訪問順序的標記:
關於這方面的詳情能夠參看第一篇的內存屏障或者《C++ concurrency in action》的章節5.3 同步操做和強制排序。
std::mutex即互斥量,它會在做用範圍內進入臨界區(Critical section),使得該代碼片斷同時只能由一個線程訪問,當其它線程嘗試執行該片斷時,會被阻塞。std::mutex常與std::lock_guard
,示例代碼:
#include <iostream> #include <map> #include <string> #include <chrono> #include <thread> #include <mutex> std::map<std::string, std::string> g_pages; std::mutex g_pages_mutex; // 聲明互斥量 void save_page(const std::string &url) { // simulate a long page fetch std::this_thread::sleep_for(std::chrono::seconds(2)); std::string result = "fake content"; // 配合std::lock_guard使用, 能夠及時進入和釋放互斥量. std::lock_guard<std::mutex> guard(g_pages_mutex); g_pages[url] = result; } int main() { std::thread t1(save_page, "http://foo"); std::thread t2(save_page, "http://bar"); t1.join(); t2.join(); // safe to access g_pages without lock now, as the threads are joined for (const auto &pair : g_pages) { std::cout << pair.first << " => " << pair.second << '\n'; } }
輸出:
http://bar => fake content http://foo => fake content
此外,手動操做std::mutex
的鎖定和解鎖,能夠實現一些特殊行爲,例如等待某個標記:
#include <chrono> #include <thread> #include <mutex> bool flag; std::mutex m; void wait_for_flag() { std::unique_lock<std::mutex> lk(m); // 這裏採用std::unique_lock而非std::lock_guard. std::unique_lock能夠實現嘗試得到鎖, 若是當前以及被其它線程鎖定, 則延遲直到其它線程釋放, 而後纔得到鎖. while(!flag) { lk.unlock(); // 解鎖互斥量 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 休眠100ms,在此期間,其它線程能夠進入互斥量,以便更改flag標記。 lk.lock(); // 再鎖互斥量 } }
std::condition_variable
和std::condition_variable_any
都是條件變量,都是C++標準庫的實現,它們都須要與互斥量配合使用。因爲std::condition_variable_any
更加通用,會在性能上產生更多的開銷。故而,應當首先考慮使用std::condition_variable
。
利用條件變量的接口,結合互斥量的使用,能夠很方便地執行線程間的等待、通知等操做。示例:
#include <iostream> #include <string> #include <thread> #include <mutex> #include <condition_variable> std::mutex m; std::condition_variable cv; // 聲明條件變量 std::string data; bool ready = false; bool processed = false; void worker_thread() { // 等待直到主線程改變ready爲true. std::unique_lock<std::mutex> lk(m); cv.wait(lk, []{return ready;}); // 得到了互斥量的鎖 std::cout << "Worker thread is processing data\n"; data += " after processing"; // 發送數據給主線程 processed = true; std::cout << "Worker thread signals data processing completed\n"; // 手動解鎖, 以便主線程得到鎖. lk.unlock(); cv.notify_one(); } int main() { std::thread worker(worker_thread); data = "Example data"; // send data to the worker thread { std::lock_guard<std::mutex> lk(m); ready = true; std::cout << "main() signals data ready for processing\n"; } cv.notify_one(); // wait for the worker { std::unique_lock<std::mutex> lk(m); cv.wait(lk, []{return processed;}); } std::cout << "Back in main(), data = " << data << '\n'; worker.join(); }
輸出:
main() signals data ready for processing Worker thread is processing data Worker thread signals data processing completed Back in main(), data = Example data after processing
C++的future(指望)是一種能夠訪問將來的返回值的機制,經常使用於多線程的同步。能夠建立future的類型有: std::async, std::packaged_task, std::promise。
future對象能夠執行wait、wait_for、wait_until,從而實現事件等待和同步,示例代碼:
#include <iostream> #include <future> #include <thread> int main() { // 從packaged_task獲取的future std::packaged_task<int()> task([]{ return 7; }); // wrap the function std::future<int> f1 = task.get_future(); // get a future std::thread t(std::move(task)); // launch on a thread // 從async()獲取的future std::future<int> f2 = std::async(std::launch::async, []{ return 8; }); // 從promise獲取的future std::promise<int> p; std::future<int> f3 = p.get_future(); std::thread( [&p]{ p.set_value_at_thread_exit(9); }).detach(); // 等待全部future std::cout << "Waiting..." << std::flush; f1.wait(); f2.wait(); f3.wait(); std::cout << "Done!\nResults are: " << f1.get() << ' ' << f2.get() << ' ' << f3.get() << '\n'; t.join(); }
輸出:
Waiting...Done! Results are: 7 8 9
多線程按並行內容可分爲數據並行和任務並行兩種。其中數據並行是不一樣的線程攜帶不一樣的數據執行相同的邏輯,最經典的數據並行的應用是MMX指令、SIMD技術、Compute着色器等。任務並行是不一樣的線程執行不一樣的邏輯,數據能夠相同,也能夠不一樣,例如,遊戲引擎常常將文件加載、音頻處理、網絡接收乃至物理模擬都放到單獨的線程,以便它們能夠並行地執行不一樣的任務。
多線程若是按劃分粒度和方式,則有線性劃分、遞歸劃分、任務類型劃分等。
線性劃分法的最簡單應用就是將連續數組的元素平均分紅若干份,每份數據派發到一個線程中執行,例如並行化的std::for_each
和UE裏的ParallelFor
。
線性劃分示意圖。連續數據被均分爲若干份,接着派發到若干線程中並行地執行。
在線性劃分並行執行結束後,一般須要由調用線程合併和同步並行的結果。
遞歸劃分法是將連續數據按照某種規則劃分紅若干份,每一份又可繼續劃分紅更細粒度,直到某種規則中止劃分。經常使用於快速排序。
快速排序有兩個最基本的步驟:將數據劃分到中樞(pivot)元素以前或以後,而後對中樞元素以前和以後的兩半數組再次進行快速排序。因爲只有在一次排序結束後才能知道哪些項在中樞元素以前和以後,因此不能經過對數據的簡單(線性)劃分達到並行。當要對這種算法進行並行化,很天然的會想到使用遞歸。每一級的遞歸都會屢次調用quick_sort函數,由於須要知道哪些元素在中樞元素以前和以後。
遞歸劃分法示意圖。
將一個大框架內的邏輯劃分紅若干個子任務,它們之間一般保持獨立,也能夠有必定依賴,每一個任務派發到一個線程執行,這就意味着真正意義上的線程獨立,每一個線程只須要關注本身所要作的事情便可。
任務劃分示意圖。
合理地安排和劃分子任務,減小它們之間的依賴和等待同步,是提高運行效率的有利武器。不過,要作到這點,每每須要通過精細的設計實現以及反覆調試和修改。
上面這種實現機制常被稱爲Fork-Join(分叉-合併)並行模型,它和串行模型的運行機制對好比下圖:
上:串行運行模型;下:Fork-Join並行運行模型。
GDC2010的演講Task-based Multithreading - How to Program for 100 cores詳細闡述瞭如何採用基於Task的多線程運行機制:
基於Task的多線程比基於線程的架構要好不少,能夠更加充分地利用多核優點,使得每一個核心都保持忙碌狀態:
該文還提到了如何將基於任務的多線程應用於排序、迷宮尋路等實際案例中。
OpenGL及DirectX10以前版本的圖形API,全部的繪製指令是線性和阻塞式的,意味着每次調用Draw接口都不會當即返回,會卡住調用線程。這種CPU和GPU的交互機制在單核時代,對性能的影響不那麼突出,可是隨着多核時代的到來,這種交互機制顯然會嚴重影響運行性能。
若遊戲引擎的渲染器仍然是單線程的,這經常致使CPU的性能瓶頸,阻礙了利用多核計算資源來提升性能或豐富可視化內容。
傳統圖形API線性執行繪製指令示意圖。
單線程渲染器一般會致使單個 CPU 內核滿負荷運行,而其餘內核保持相對空閒,且性能低於可玩的幀率。
傳統圖形API在單線程單Context下設置渲染狀態調用繪製指令,而且繪製指令是阻塞式的,CPU和GPU沒法並行運行,其它CPU核心也會處於空閒等待狀態。
在這些傳統圖形API架構多線程渲染,必須從軟件層面着手,開闢多個線程,用於單獨處理邏輯和渲染指令,以便解除CPU和GPU的相互等待耦合。早在SIGGraph2008有個Talk(Practical Parallel Rendering with DirectX 9 and 10)專門講解如何在DirectX9和10實現軟件級的多線程渲染,核心部分就是在軟件層錄製(Playback)D3D的繪製命令(Command)。
Practical Parallel Rendering with DirectX 9 and 10中提出的一種軟件級的多線程渲染架構。
不過,這種軟件層面的命令錄製存在多種問題,不支持部分圖形API(如狀態查詢),需額外的命令緩存區記錄繪製指令,命令階段沒法建立真正的GPU資源等等。
DirectX11嘗試從硬件層面解決多線程渲染的問題。它支持了兩種設備上下文:即時上下文(Immediate Context)和延遲上下文(Deferred Context)。不一樣的延遲上下文能夠同時在不一樣的線程中使用,生成將在「即時上下文」中執行的命令列表。這種多線程策略容許將複雜的場景分解成併發任務。
DirectX11的多線程模型。
不一樣的延遲上下文能夠同時在不一樣的線程中使用,生成將在即時上下文中執行的命令列表。這種多線程策略容許將複雜的場景分解成併發任務。此外,延遲上下文在某些驅動的支持下,可實現硬件級的加速,而沒必要在即時上下文執行Command List。
爲何使用Deferred Context的Command List提早錄製繪製指令會比直接使用Immediate Context調用繪製指令的效率更高?
答案在於Command List內部會對已經錄製的指令作高度的優化,執行這些優化後的指令會明顯提高效率,比直接單獨每條調用圖形API會高效得多。
在D3D11中命令列表中的命令是被快速記錄下來,而不是當即執行的,直到程序調用ExecuteCommandList方法(調用即返回,不等待)才被GPU真正的執行,此時那些使用延遲渲染設備接口的CPU線程以及主渲染線程又能夠去幹別的事情了,好比繼續下一幀的碰撞檢測、物理變換、動畫插值、光照準備等等,從而爲記錄造成新的命令列表作準備。
不過,基於DirectX11的多線程架構,因爲硬件層面的加速不是必然支持,全部在Deferred Context記錄的指令連同Immediate Context的指令必須由同一個線程(一般是渲染線程)提交給GPU執行。
DirectX11下的多線程架構示意圖。
這種非硬件支持的多線程渲染只是節省了部分CPU時間(多線程錄製指令和繪製同步等待),並不能從硬件層面真正發揮多線程的威力。
相較於DirectX11過渡性的僞多線程模型(稱之僞,是由於當時的大多數驅動並不支持DirectX11的硬件級多線程),DirectX 12 多線程則經過顯著減小 API 調用額外開銷獲得了很大的改進,它取消了 DirectX 11 的設備上下文的概念,直接使用Command List來調用 D3D APIs,而後經過命令隊列將命令列表提交給 GPU,而且全部 DirectX 12顯卡都支持 DirectX 12 多線程的硬件加速。
DirectX12的多線程模型。
從原理上來看,DirectX12與DirectX11多線程渲染框架是相似的,都是經過在不一樣的CPU線程中錄製命令列表(Command Lists),最後再統一執行的方式完成多線程渲染。它們都從根本上屏蔽了使人髮指的Draw Call同步調用,而改成CPU和GPU徹底異步(並行)執行的方式,從而在總體渲染效率和性能上得到巨大的提高。
對於DirectX12,用戶層面有3種命令隊列(Command Queue):複製隊列(Copy Queue)、計算隊列(Compute Queue)和3D隊列(3D Queue),它們能夠並行地執行,而且經過柵欄(Fence)、信號(Signal)或屏障(Barrier)來等待和同步。
GPU硬件層面則有3種引擎:複製引擎(Copy Engine)、計算引擎(Compute Engine)和3D引擎(3D Engine),它們也能夠並行地執行,而且經過柵欄(Fence)、信號(Signal)或屏障(Barrier)來等待和同步。
命令隊列可驅動GPU硬件的若干引擎,但有必定的限制,更具體地,3D Queue能夠驅動GPU硬件的3種引擎,Compute Queue只能驅動Compute Engine和Copy Engine,Copy Queue僅能夠驅動Copy Engine。
在CPU層面,能夠有若干個線程,每一個線程可建立產生若干個命令列表(Command List),每一個命令列表可進入3種Command Queue的其中一種。當這些命令被GPU執行時,每種指令列表裏的命令會壓入不一樣的GPU引擎,以便它們並行地執行。(下圖)
DirectX12中的CPU線程、命令列表、命令隊列、GPU引擎之間的運行機制示意圖。
做爲跨平臺圖形API的新生表明Vulkan,摒棄了傳統圖形API的弊端,直面多核時代的優點,從而從設計和架構上發揮了並行渲染的威力。
綜合上看,Vulkan和DirectX12是很是接近的,都有着Command Buffer、CommandPool、Command Queue和Fence等核心概念,並行模式也很是類似:在不一樣的CPU線程並行地生成Command Buffer指令,最後由主線程收集這些Command Buffer並提交至GPU:
Vulkan圖形API並行示意圖。
而且,Vulkan的CommandPool能夠每幀被不一樣的線程建立,以便減小同步等待,提高並行效率:
Vulkan中的CommandPool在不一樣幀之間的並行示意圖。
此外,Vulkan也存在着和DirectX12相似的各類同步機制:
Vulkan同步機制:semaphore(信號)用於同步Queue;Fence(柵欄)用於同步GPU和CPU;Event(事件)和Barrier(屏障)用於同步Command Buffer。
關於Vulkan的更多用法、剖析、對比可參見文獻Evaluation of multi-threading in Vulkan。
Metal做爲iOS和MacOS系統的專屬圖形API,也是新生代的表明,它既兼容OpenGL這種傳統的圖形API用法,也支持相似Vulkan、DirectX12的新一代圖形API理念和架構。從使用者層面來看,Metal是比較友善的,提供告終構更清晰、概念更友好的API。
從OpenGL遷移到新生代圖形API的成本和收益對比。橫座標是從OpenGL(或ES)遷移其它圖形API的成本,縱座標是潛在的性能收益。可見Metal的遷移成本較低,但潛在的性能比也沒有Vulkan和DirectX12高。
Metal如同Vulkan和DirectX,有着不少類似的概念,諸如Command、Command Buffer、Command Queue及各種同步機制。
Metal基礎概念關係一覽表。其中Command Encoder有3種類型:MTLRenderCommandEncoder、MTLComputeCommandEncoder和MTLBlitCommandEncoder。CommandEncoder錄製命令以後,塞入Command Buffer,最終進入Command Queue命令隊列。
有了相似的概念和機制,Metal一樣能夠方便地實現多線程錄製命令,且從硬件層面支持多線程調度:
Metal多線程模型示意圖。圖中顯示了3個CPU線程同時錄製不一樣類型的Encoder,每一個線程都有專屬的Command Buffer,最終這些Command Buffer統一匯入Command Queue交由GPU執行。
在正式講解UE的多線程渲染以前,先了解一下其它主流商業引擎的多線程架構和設計。
Unity的渲染體系中有幾個核心概念,一個是Client,運行於主線程(邏輯線程),負責產生渲染指令;另外一個是Worker Thread,工做線程,用於協助處理主線程或生成渲染指令等各種子工做。Unity的渲染架構中支持如下幾種模式:
單線程渲染模式,此模式下只有單個Client組件,沒有工做線程。惟一的Client在主線程中產生全部的渲染命令(rendering command,RCMD),而且擁有圖形設備對象,也會在主線程向圖形設備產生調用圖形API命令(graphics API,GCMD),它的調度示意圖以下:
這種模式下,CPU和GPU可能會相互等待,沒法充分利用多核CPU,性能比較最差。
多線程渲染模式,這種模式下和單線程對比,就是多了一條工做線程,即用於生成GCMD的渲染線程,其中渲染線程跑的是GfxDeviceClient對象,專用於生成對應平臺的圖形API指令:
做業化渲染模式,此模式下有多個Client對象,單個渲染線程。此外,有多個做業對象,每一個做業對象跑在專用獨立的線程,用於生成即時圖形命令(intermediate graphics commands,IGCMD)。此外,還有一個工做線程(渲染線程)用於將做業線程生成的IGCMD轉換成圖形API的GCMD,運行示意圖以下:
圖形化做業渲染模式,此模式下有多個Client,多個工做線程,沒有渲染線程。主線程上的多個Client對象驅動工做線程上的對應圖形設備對象,直接生成GCMD,從而避免生成Jobified Rendering模式的IGCMD中間指令。只在支持硬件級多線程的圖形API上可啓用,如DirectX十二、Vulkan等。運行示意圖以下:
Frostbite(寒霜)引擎在早期的時候,將每一幀分紅個步驟:裁剪、構建、渲染,每一個步驟所需的數據都放到雙緩衝內(double buffer),採用級聯方式運行,應用簡單的同步流程。它的運行示意圖以下:
而通過多年的進化,Frostbite在前幾年採用了幀圖(Frame Graph)的多線程渲染模式。該模式旨在將引擎的各種渲染功能(Feature)和上層渲染邏輯(Renderer)和下層資源(Shader、RenderContext、圖形API等)隔離開來,以便作進一步的解耦、優化,其中最重要的優化即開啓多線程渲染。
FrameGraph是高層級的Render Pass和資源的表明,包含了一幀中所用到的全部信息。Pass之間能夠指定順序和依賴關係,下圖是其中的一個示例:
寒霜引擎採用幀圖方式實現的延遲渲染的順序和依賴圖。
其中幀圖的每一幀信息都有三個階段:創建(Setup)、編譯(Compile)和執行(Execute)。
創建階段就是建立各個Render Pass、輸入紋理、輸出紋理、依賴資源等等信息。
編譯階段的工做主要是剔除未使用的Render Pass和資源,計算資源生命週期,以及根據使用標記建立對應的GPU資源,建立GPU資源時又作了大量的優化,諸如:簡化顯存分配算法,在第一次使用時申請最後一次使用後釋放,異步計算外部資源的生命週期,源於綁定標記的精確資源管理,扁平化全部資源的引用以提高GPU高速緩存的命中率等等。編譯階段採用線性遍歷全部的RenderPass,遍歷時會計算資源引用次數、資源的最初和最後使用者、異步等待點和資源屏障等等。
執行階段就按照Setup的順序執行(編譯階段也不會從新排序),只遍歷那些未被剔除的Render Pass並執行它們的回調函數。若是是當即模式,則直接調用設備上下文的API。執行階段纔會根據編譯階段生成的handle真正獲取GPU資源。
最關鍵的是整個過程經過依賴圖(Dependency Grahp)實現自動化異步計算。異步機制在主時間軸開始,會自動同步在不一樣Queue裏的資源,同時會擴展它們的生命週期,以防意外釋放。固然,這個自動化系統也有反作用,如額外增長必定量的內存,可能會引起不可預期的性能瓶頸。因此,寒霜引擎支持手動模式,以便按照預期控制和更改異步運行方式,從而逐Render Pass選擇性加入。
下圖能夠比較簡潔明瞭說明異步計算的運行機制:
寒霜引擎異步計算示意圖。其中SSAO、SSAO Filter的Pass放入到異步隊列,它們會寫入和讀取Raw AO的紋理,即使在同步點以前結束,但Raw AO的生命週期依然會被延長到同步點。
總之,幀圖的渲染架構得益於記錄了該幀全部的信息,以致於能夠經過資源別名(Resource Aliasing)節省大量的內存和顯存,能夠實現半自動化的異步計算,能夠簡化渲染管線控制,能夠製做出更加良好的可視化和診斷工具。
頑皮狗的遊戲引擎採用的也是做業系統,容許非GPU端的邏輯代碼加入到做業系統。做業直接能夠開啓和等待其它做業,對調用者隱藏內存管理細節,提供了簡潔易用的API,性能優化放在了第二位。
其中做業系統運行於纖程(Fiber),每一個纖程相似局部的線程,用戶層提供棧空間,其上下文包含少許的纖程狀態,以便減小寄存器的佔用。實際地運行在線程上,協做型的多線程模型。因爲纖程非系統級的線程,切換上下文會很是快,只保存和恢復寄存器的狀態(程序計數,棧指針,gpr等),故而開銷會很小。
做業系統會開闢若干條工做線程,每條工做線程會鎖定到GPU硬件核心。線程是執行單元,纖程是上下文,做業老是在線程的上下文內執行,採用原子計數器來同步。下圖是頑皮狗引擎的做業系統架構圖:
頑皮狗引擎做業系統架構圖。擁有6個工做線程,160個纖程,3個做業隊列。
做業能夠向做業隊列添加新的做業,同時等待中的做業會放到專門的等待列表,每一個等待中的做業會有引用計數,直到引用計數爲0,纔會從等待隊列中出列,以便繼續執行。
在頑皮狗引擎內,除了IO線程以外的全部東西都是做業,包括遊戲物體更新、動做更新和混合、射線檢測、渲染命令生成等等。可見將做業系統發揮得淋漓盡致,最大程度提高了並行的比例和效率。
爲了提高幀率,將遊戲邏輯和渲染邏輯相分離,並行地執行,不過處理的是不一樣幀的數據,一般遊戲數據領先渲染數據一幀,而渲染邏輯又領先GPU數據一幀。
經過這樣的機制,能夠避免CPU線程之間以及CPU和GPU之間的同步和等待,提高了幀率和吞吐量。
此外,它的內存分配也作了精緻的管理,好比引入了帶標籤的內存堆(Tagged Heap),內存堆以2M爲一塊(Block),每一個Block帶有一個標籤(Game、Render、GPU之一),分配器分配和釋放內存時是在標籤堆裏執行,避免直接向操做系統獲取:
此外,分配器支持爲每一個工做線程分配一個專屬的塊(跟TLS相似),避免數據同步和等待的時間,避免數據競險。
命運(Destiny)是一款第一人稱的動做角色扮演MMORPG,它使用的引擎也被稱爲命運引擎(Destiny’s Engine)。
命運引擎在多線程架構上,採用的技術有基於任務的並行處理,做業管理設計和同步處理,做業的執行也是在纖程上。做業系統執行做業的優先級是FIFO(先進先出),做業圖是異源架構,做業之間存在依賴,但沒有柵欄。
它將每一幀分紅幾個步驟:模擬遊戲物體、物體裁剪、生成渲染命令、執行GPU相關工做、顯示。在線程設計上,會建立6條系統線程,每條線程的內容依次是:模擬循環,其它做業,渲染循環,音頻循環,做業核心和調試Log,異步任務、IO等。
在處理幀之間的數據,也是分離開遊戲模擬和渲染邏輯,遊戲模擬老是領先渲染一幀。遊戲模擬完以後,會將全部數據和狀態拷貝一份(鏡像,Mirror),以供下一幀的渲染使用:
命運引擎爲了最大化CPU和GPU的並行效率,採起了動態加載平衡(dynamic load balancing)和智能做業合批(smart job batching),具體作法是將全部渲染和可見性剔除的工做加入到任務系統,保持低延遲。下圖是並行化計算視圖做業的圖例:
此外,還將模擬邏輯從渲染邏輯中抽離和解耦,採用徹底的數據驅動的渲染管線,全部的排序、內存分配、遍歷等算法都遵循了高速緩存一致性(結構體小量化,數據對齊,使得單個結構體數據能一次性被加載進高速緩存行)。
本章節主要剖析一下UE的多線程基礎、設計及架構,以便後面更好地切入到多線程渲染。
UE的原子操做並無使用C++的Atomic模板,而是本身實現了一套,叫TAtomic。它提供的功能有加載、存儲、賦值等操做,在底層實現上,會採用平臺相關的原子操做接口實現:
// Engine\Source\Runtime\Core\Public\Templates\Atomic.h template <typename T> FORCEINLINE T Load(const volatile T* Element) { // 採起平臺相關的接口加載原子值. auto Result = FPlatformAtomics::AtomicRead((volatile TUnderlyingIntegerType_T<T>*)Element); return *(const T*)&Result; } template <typename T> FORCEINLINE void Store(const volatile T* Element, T Value) { // 採起平臺相關的接口存儲原子值. FPlatformAtomics::InterlockedExchange((volatile TUnderlyingIntegerType_T<T>*)Element, *(const TUnderlyingIntegerType_T<T>*)&Value); } template <typename T> FORCEINLINE T Exchange(volatile T* Element, T Value) { // 採起平臺相關的接口交換原子值. auto Result = FPlatformAtomics::InterlockedExchange((volatile TUnderlyingIntegerType_T<T>*)Element, *(const TUnderlyingIntegerType_T<T>*)&Value); return *(const T*)&Result; }
在內存順序上,不像C++提供了四種模式,UE作了簡化,只提供了兩種模式:
enum class EMemoryOrder { Relaxed, // 順序鬆散, 不會引發重排序 SequentiallyConsistent // 順序一致 };
須要注意的是,TAtomic雖然是模板類,但只對基本類型生效,UE是經過父類TAtomicBaseType_T
來達到檢測的目的:
template <typename T> class TAtomic final : public UE4Atomic_Private::TAtomicBaseType_T<T> { static_assert(TIsTrivial<T>::Value, "TAtomic is only usable with trivial types"); (......) }
UE實現了相似C++的Future和Promise對象,是模板類,抽象了返回值類型。如下是TFuture的聲明:
// Engine\Source\Runtime\Core\Public\Async\Future.h template<typename InternalResultType> class TFutureBase { public: bool IsReady() const; bool IsValid() const; void Wait() const { if (State.IsValid()) { while (!WaitFor(FTimespan::MaxValue())); } } bool WaitFor(const FTimespan& Duration) const { return State.IsValid() ? State->WaitFor(Duration) : false; } bool WaitUntil(const FDateTime& Time) const { return WaitFor(Time - FDateTime::UtcNow()); } protected: typedef TSharedPtr<TFutureState<InternalResultType>, ESPMode::ThreadSafe> StateType; const StateType& GetState() const; template<typename Func> auto Then(Func Continuation); template<typename Func> auto Next(Func Continuation); void Reset(); private: /** Holds the future's state. */ StateType State; }; template<typename ResultType> class TFuture : public TFutureBase<ResultType> { typedef TFutureBase<ResultType> BaseType; public: ResultType Get() const { return this->GetState()->GetResult(); } TSharedFuture<ResultType> Share() { return TSharedFuture<ResultType>(MoveTemp(*this)); } };
TPromise一般要和TFuture配合使用,以下所示:
template<typename InternalResultType> class TPromiseBase : FNoncopyable { typedef TSharedPtr<TFutureState<InternalResultType>, ESPMode::ThreadSafe> StateType; (......) protected: const StateType& GetState(); private: StateType State; // 存儲了Future的狀態. }; template<typename ResultType> class TPromise : public TPromiseBase<ResultType> { public: typedef TPromiseBase<ResultType> BaseType; public: // 獲取Future對象 TFuture<ResultType> GetFuture() { check(!FutureRetrieved); FutureRetrieved = true; return TFuture<ResultType>(this->GetState()); } // 設置Future的值 FORCEINLINE void SetValue(const ResultType& Result) { EmplaceValue(Result); } FORCEINLINE void SetValue(ResultType&& Result) { EmplaceValue(MoveTemp(Result)); } template<typename... ArgTypes> void EmplaceValue(ArgTypes&&... Args) { this->GetState()->EmplaceResult(Forward<ArgTypes>(Args)...); } private: bool FutureRetrieved; };
ParallelFor是UE內置的支持多線程並行處理任務的For循環,在渲染系統中應用得至關廣泛。它支持如下幾種並行方式:
enum class EParallelForFlags { None, // 默認並行方式 ForceSingleThread = 1, // 強制單線程, 經常使用於調試. Unbalanced = 2, // 非任務平衡, 經常使用於具備高度可變計算時間的任務. PumpRenderingThread = 4, // 注入渲染線程. 若是是在渲染線程調用, 須要保證ProcessThread空閒狀態. };
支持的ParallelFor調用方式以下:
inline void ParallelFor(int32 Num, TFunctionRef<void(int32)> Body, bool bForceSingleThread, bool bPumpRenderingThread=false); inline void ParallelFor(int32 Num, TFunctionRef<void(int32)> Body, EParallelForFlags Flags = EParallelForFlags::None); template<typename FunctionType> inline void ParallelForTemplate(int32 Num, const FunctionType& Body, EParallelForFlags Flags = EParallelForFlags::None); inline void ParallelForWithPreWork(int32 Num, TFunctionRef<void(int32)> Body, TFunctionRef<void()> CurrentThreadWorkToDoBeforeHelping, bool bForceSingleThread, bool bPumpRenderingThread = false); inline void ParallelForWithPreWork(int32 Num, TFunctionRef<void(int32)> Body, TFunctionRef<void()> CurrentThreadWorkToDoBeforeHelping, EParallelForFlags Flags = EParallelForFlags::None);
ParallelFor是基於TaskGraph機制實現的,因爲TaskGraph後面才提到,這裏就不涉及其實現。下面展現UE的一個應用案例:
// Engine\Source\Runtime\Engine\Private\Components\ActorComponent.cpp // 並行化增長Primitive到場景的用例. void FRegisterComponentContext::Process() { FSceneInterface* Scene = World->Scene; ParallelFor(AddPrimitiveBatches.Num(), // 數量 [&](int32 Index) //回調函數, Index返回索引 { if (!AddPrimitiveBatches[Index]->IsPendingKill()) { Scene->AddPrimitive(AddPrimitiveBatches[Index]); } }, !FApp::ShouldUseThreadingForPerformance() // 是否多線程處理 ); AddPrimitiveBatches.Empty(); }
UnrealTemplate.h定義了不少基礎模板,用於數據轉換、拷貝、轉移等功能。下面例舉部分常見的函數和類型:
模板名 | 解析 | stl映射 |
---|---|---|
template
ReferencedType* IfAThenAElseB(ReferencedType* A,ReferencedType* B) |
返回A ? A : B | - |
template
void Move(T& A,typename TMoveSupportTraits |
釋放A,將B的數據替換到A,但不會影響B的數據。 | - |
template
void Move(T& A,typename TMoveSupportTraits |
釋放A,將B的數據替換到A,但會影響B的數據。 | - |
FNoncopyable | 派生它便可實現不可拷貝的對象。 | - |
TGuardValue | 帶做業域的值,可指定一個新值和舊值,做用域內是新值,離開做用域變成舊值。 | - |
TScopeCounter | 帶做用域的計數器,做用域內計數器+1,離開做用域後計數器-1 | - |
template
typename TRemoveReference |
將引用轉換成右值,可能會修改源值。 | std::move |
template
T CopyTemp(T& Val) |
強制建立右值的拷貝,不會改變源值。 | - |
template
T&& Forward(typename TRemoveReference |
將引用轉換成右值引用。 | std::forward |
template <typename T, typename ArgType> T StaticCast(ArgType&& Arg) |
靜態類型轉換。 | static_cast |
UE的多線程實現上並無採納C++11標準庫的那一套,而是本身從系統級作了封裝和實現,包括系統線程、線程池、異步任務、任務圖以及相關的通知和同步機制。
FRunnable
是全部能夠在多個線程並行地運行的物體的父類,它提供的基礎接口以下:
// Engine\Source\Runtime\Core\Public\HAL\Runnable.h class CORE_API FRunnable { public: virtual bool Init(); // 初始化, 成功返回True. virtual uint32 Run(); // 運行, 只有Init成功纔會被調用. virtual void Stop(); // 請求提早中止. virtual void Exit(); // 退出, 清理數據. };
FRunnable
及其子類是可運行於多線程的對象,而與之對立的是隻在單線程運行的類FSingleThreadRunnable
:
// Engine\Source\Runtime\Core\Public\Misc\SingleThreadRunnable.h // 多線程禁用下的單線程運行的物體 class CORE_API FSingleThreadRunnable { public: virtual void Tick(); };
FRunnable
的子類很是多,如下是常見的部分核心子類及其解析。
FRenderingThread:運行於渲染線程上的對象。後面有章節會專門剖析。
FRHIThread:運行於RHI線程上的對象。後面有章節會專門剖析。
FRenderingThreadTickHeartbeat:運行於心跳渲染線程上的物體。
FTaskThreadBase:在線程執行的任務父類,後面會有章節專門解析這部分。
FQueuedThread:可存儲在線程池的線程父類。提供的接口以下:
// Engine\Source\Runtime\Core\Private\HAL\ThreadingBase.cpp class FQueuedThread : public FRunnable { protected: FEvent* DoWorkEvent; // 任務執行完畢的事件. TAtomic<bool> TimeToDie; // 是否須要超時. IQueuedWork* volatile QueuedWork; // 被執行的任務. class FQueuedThreadPoolBase* OwningThreadPool; // 所在的線程池. FRunnableThread* Thread; // 真正用於執行任務的線程. virtual uint32 Run() override; public: virtual bool Create(class FQueuedThreadPoolBase* InPool,uint32 InStackSize,EThreadPriority ThreadPriority); bool KillThread(); void DoWork(IQueuedWork* InQueuedWork); };
TAsyncRunnable:異步地在單獨線程運行的任務,是個模板類,聲明以下:
// Engine\Source\Runtime\Core\Public\Async\Async.h template<typename ResultType> class TAsyncRunnable: public FRunnable { public: virtual uint32 Run() override; private: TUniqueFunction<ResultType()> Function; TPromise<ResultType> Promise; TFuture<FRunnableThread*> ThreadFuture; };
FAsyncPurge:輔助類,提供銷燬位於工做線程的UObject對象。
因而可知,FRunnable對象並不能獨立存在,老是要依賴線程來真正地執行任務。
另外,還須要特地提出:FRenderingThread、FQueuedThread聽名字像是真正的線程,然而並非,只是用於處理某些特定任務的可運行物體,實際上仍是要依賴它們內部FRunnableThread的成員對象來執行。
FRunnableThread是可運行線程的父類,提供了一組用於管理線程生命週期的接口。它提供的基礎接口和解析以下:
// Engine\Source\Runtime\Core\Public\HAL\RunnableThread.h class CORE_API FRunnableThread { static uint32 RunnableTlsSlot; // FRunnableThread的TLS插槽索引. public: static uint32 GetTlsSlot(); // 靜態類, 用於建立線程, 需提供一個FRunnable對象, 用於線程執行的任務. static FRunnableThread* Create(FRunnable* InRunnable, const TCHAR* ThreadName, uint32 InStackSize = 0, EThreadPriority InThreadPri, uint64 InThreadAffinityMask,EThreadCreateFlags InCreateFlags); // 設置線程優先級. virtual void SetThreadPriority( EThreadPriority NewPriority ); // 暫停/繼續運行線程 virtual void Suspend( bool bShouldPause = true ); // 銷燬線程, 一般須要指定等待標記bShouldWait爲true, 不然可能引發內存泄漏或死鎖! virtual bool Kill( bool bShouldWait = true ); // 等待執行完畢, 會卡調用線程. virtual void WaitForCompletion(); const uint32 GetThreadID() const; const FString& GetThreadName() const; protected: FString ThreadName; FRunnable* Runnable; // 被執行對象 FEvent* ThreadInitSyncEvent; // 線程初始化完成同步事件, 防止線程未初始化完畢就執行任務. uint64 ThreadAffinityMask; // 親和標記, 用於線程傾向指定的CPU核心執行. TArray<FTlsAutoCleanup*> TlsInstances; // 線程消耗時須要一塊兒清理的Tls對象. EThreadPriority ThreadPriority; uint32 ThreadID; private: virtual void Tick(); };
須要注意的是,FRunnableThread提供了靜態建立接口,建立線程時須要指定一個FRunnable對象,做爲線程執行的任務。它是一個基礎父類,下面是繼承自它的部分核心子類及解析:
FRunnableThreadWin:Windows平臺的線程實現。它的接口和實現以下:
// Engine\Source\Runtime\Core\Private\Windows\WindowsRunnableThread.h class FRunnableThreadWin : public FRunnableThread { HANDLE Thread; // 線程句柄 // 線程回調接口, 建立線程時做爲參數傳入. static ::DWORD STDCALL _ThreadProc( LPVOID pThis ) { check(pThis); return ((FRunnableThreadWin*)pThis)->GuardedRun(); } uint32 GuardedRun(); uint32 Run(); public: // 轉換優先級 static int TranslateThreadPriority(EThreadPriority Priority) { switch (Priority) { case TPri_AboveNormal: return THREAD_PRIORITY_HIGHEST; case TPri_Normal: return THREAD_PRIORITY_HIGHEST - 1; case TPri_BelowNormal: return THREAD_PRIORITY_HIGHEST - 3; case TPri_Highest: return THREAD_PRIORITY_HIGHEST; case TPri_TimeCritical: return THREAD_PRIORITY_HIGHEST; case TPri_Lowest: return THREAD_PRIORITY_HIGHEST - 4; case TPri_SlightlyBelowNormal: return THREAD_PRIORITY_HIGHEST - 2; default: UE_LOG(LogHAL, Fatal, TEXT("Unknown Priority passed to TranslateThreadPriority()")); return TPri_Normal; } } // 設置優先級 virtual void SetThreadPriority( EThreadPriority NewPriority ) override { // Don't bother calling the OS if there is no need ThreadPriority = NewPriority; // Change the priority on the thread ::SetThreadPriority(Thread, TranslateThreadPriority(ThreadPriority)); } virtual void Suspend( bool bShouldPause = true ) override { check(Thread); if (bShouldPause == true) { SuspendThread(Thread); } else { ResumeThread(Thread); } } virtual bool Kill( bool bShouldWait = false ) override { check(Thread && "Did you forget to call Create()?"); bool bDidExitOK = true; // 先中止Runnable對象, 使得其有清理數據的機會 if (Runnable) { Runnable->Stop(); } // 等待線程處理完畢. if (bShouldWait == true) { // Wait indefinitely for the thread to finish. IMPORTANT: It's not safe to just go and // kill the thread with TerminateThread() as it could have a mutex lock that's shared // with a thread that's continuing to run, which would cause that other thread to // dead-lock. (This can manifest itself in code as simple as the synchronization // object that is used by our logging output classes. Trust us, we've seen it!) WaitForSingleObject(Thread,INFINITE); } // 關閉線程句柄 CloseHandle(Thread); Thread = NULL; return bDidExitOK; } virtual void WaitForCompletion( ) override { // Block until this thread exits WaitForSingleObject(Thread,INFINITE); } protected: virtual bool CreateInternal( FRunnable* InRunnable, const TCHAR* InThreadName, uint32 InStackSize = 0, EThreadPriority InThreadPri = TPri_Normal, uint64 InThreadAffinityMask = 0, EThreadCreateFlags InCreateFlags = EThreadCreateFlags::None) override { static bool bOnce = false; if (!bOnce) { bOnce = true; ::SetThreadPriority(::GetCurrentThread(), TranslateThreadPriority(TPri_Normal)); // set the main thread to be normal, since this is no longer the windows default. } check(InRunnable); Runnable = InRunnable; ThreadAffinityMask = InThreadAffinityMask; // 建立初始化完成同步事件. ThreadInitSyncEvent = FPlatformProcess::GetSynchEventFromPool(true); ThreadName = InThreadName ? InThreadName : TEXT("Unnamed UE4"); // Create the new thread { LLM_SCOPE(ELLMTag::ThreadStack); LLM_PLATFORM_SCOPE(ELLMTag::ThreadStackPlatform); // add in the thread size, since it's allocated in a black box we can't track LLM(FLowLevelMemTracker::Get().OnLowLevelAlloc(ELLMTracker::Default, nullptr, InStackSize)); LLM(FLowLevelMemTracker::Get().OnLowLevelAlloc(ELLMTracker::Platform, nullptr, InStackSize)); // 調用Windows API建立線程. Thread = CreateThread(NULL, InStackSize, _ThreadProc, this, STACK_SIZE_PARAM_IS_A_RESERVATION | CREATE_SUSPENDED, (::DWORD *)&ThreadID); } // If it fails, clear all the vars if (Thread == NULL) { Runnable = nullptr; } else { // 加入到線程管理器中. FThreadManager::Get().AddThread(ThreadID, this); ResumeThread(Thread); // Let the thread start up ThreadInitSyncEvent->Wait(INFINITE); SetThreadPriority(InThreadPri); } // 清理同步事件 FPlatformProcess::ReturnSynchEventToPool(ThreadInitSyncEvent); ThreadInitSyncEvent = nullptr; return Thread != NULL; } };
從上面代碼可看出,Windows平臺的線程直接調用Windows API建立和同步信息,從而實現線程的平臺抽象,從平臺依賴抽離出來。
FRunnableThreadPThread:POSIX Thread(簡稱PThread)的父類,經常使用於類Unix POSIX 系統,如Linux、Solaris、Apple等。其實現和Windows平臺相似,這裏就不展開其代碼解析了。它的子類有:
FRunnableThreadApple:蘋果系統(MacOS、iOS)的線程。
FRunnableThreadAndroid:安卓系統的線程。
FRunnableThreadUnix:Unix系統的線程。
FRunnableThreadHoloLens:HoloLens系統的線程。
FFakeThread:假線程,多線程被禁用後的代替品,實際運行於單個線程。
FRunnable和FRunnableThread是相輔相成的,缺一而不可,一個是運行的載體,一個是運行的內容。下面是它們的一個應用示例:
// 派生FRunnable class FMyRunnable : public FRunnable { bool bStop; public: virtual bool Init(void) { bStop = false; return true; } virtual uint32 Run(void) { for (int32 i = 0; i < 10 && !bStop; i++) { FPlatformProcess::Sleep(1.0f); } return 0; } virtual void Stop(void) { bStop = true; } virtual void Exit(void) { } }; void TestRunnableAndRunnableThread() { // 建立Runnable對象 FMyRunnable* MyRunnable = new FMyRunnable; // 建立線程, 傳入MyRunnable FRunnableThread* MyThread = FRunnableThread::Create(MyRunnable, TEXT("MyRunnable")); // 暫停當前線程 FPlatformProcess::Sleep(4.0f); // 等待線程結束 MyRunnable->Stop(); MyThread->WaitForCompletion(); // 清理數據. delete MyThread; delete MyRunnable; }
細心的同窗應該有注意到,建立線程的時候,會將線程加入到FThreadManager中,也就是說全部的線程都由FThreadManager來管理。如下是FThreadManager的聲明:
// Engine\Source\Runtime\Core\Public\HAL\ThreadManager.h class FThreadManager { FCriticalSection ThreadsCritical; // 修改線程列表Threads的臨界區 static bool bIsInitialized; TMap<uint32, class FRunnableThread*, TInlineSetAllocator<256>> Threads; // 線程列表, 注意數據結構是Map, Key是線程ID. public: void AddThread(uint32 ThreadId, class FRunnableThread* Thread); // 增長線程 void RemoveThread(class FRunnableThread* Thread); // 刪除線程 void Tick(); // 幀更新, 只對FFakeThread起做用. const FString& GetThreadName(uint32 ThreadId); void ForEachThread(TFunction<void(uint32, class FRunnableThread*)> Func); // 遍歷線程 static bool IsInitialized(); static FThreadManager& Get(); };
本節將闡述UE的隊列化QueuedWork體系,包含IQueuedWork、TAsyncQueuedWork、FQueuedThreadPool、FQueuedThreadPoolBase等。
IQueuedWork是一組抽象接口,存儲着一組隊列化的任務對象,會被FQueuedThreadPool線程池對象執行。IQueuedWork的接口以下:
// Engine\Source\Runtime\Core\Public\Misc\IQueuedWork.h class IQueuedWork { public: virtual void DoThreadedWork() = 0; // 執行隊列化的任務. virtual void Abandon() = 0; // 提早放棄執行, 並通知隊列裏的全部對象清理數據. };
因爲IQueuedWork只是抽象類,並無實際執行代碼,故而主要子類TAsyncQueuedWork承擔了實現代碼的任務,如下是TAsyncQueuedWork的聲明和實現:
// Engine\Source\Runtime\Core\Public\Async\Async.h template<typename ResultType> class TAsyncQueuedWork : public IQueuedWork { public: virtual void DoThreadedWork() override { SetPromise(Promise, Function); delete this; } virtual void Abandon() override { // not supported } private: TUniqueFunction<ResultType()> Function; // 被執行的函數列表. TPromise<ResultType> Promise; // 用於同步的對象 };
與FRunnable和FRunnableThread相似,TAsyncQueuedWork也不能獨立地執行任務,須要依賴FQueuedThreadPool來執行。下面是FQueuedThreadPool的聲明:
// Engine\Source\Runtime\Core\Public\Misc\QueuedThreadPool.h // 執行IQueuedWork任務列表的線程池. class FQueuedThreadPool { public: // 建立指定數量、棧大小和優先級的線程。 virtual bool Create( uint32 InNumQueuedThreads, uint32 StackSize = (32 * 1024), EThreadPriority ThreadPriority=TPri_Normal ) = 0; // 銷燬線程內的後臺線程. virtual void Destroy() = 0; // 加入隊列化任務. 若是有可用的線程, 則當即執行; 不然會稍後再執行. virtual void AddQueuedWork( IQueuedWork* InQueuedWork ) = 0; // 撤銷指定隊列化任務. virtual bool RetractQueuedWork( IQueuedWork* InQueuedWork ) = 0; // 獲取線程數量. virtual int32 GetNumThreads() const = 0; public: // 建立線程池對象. static FQueuedThreadPool* Allocate(); // 重寫棧大小. static uint32 OverrideStackSize; };
上面能夠看出,FQueuedThreadPool是抽象類,只提供接口,並無實現。實際上,實現是在FQueuedThreadPoolBase中,以下:
// Engine\Source\Runtime\Core\Private\HAL\ThreadingBase.cpp class FQueuedThreadPoolBase : public FQueuedThreadPool { protected: TArray<IQueuedWork*> QueuedWork; // 須要執行的任務列表 TArray<FQueuedThread*> QueuedThreads; // 線程池內的可用線程 TArray<FQueuedThread*> AllThreads; // 線程池內的全部線程 FCriticalSection* SynchQueue; // 同步臨界區 bool TimeToDie; // 超時標記 public: FQueuedThreadPoolBase() : SynchQueue(nullptr) , TimeToDie(0) { } virtual ~FQueuedThreadPoolBase() { Destroy(); } virtual bool Create(uint32 InNumQueuedThreads,uint32 StackSize = (32 * 1024),EThreadPriority ThreadPriority=TPri_Normal) override { // 處理同步鎖. bool bWasSuccessful = true; check(SynchQueue == nullptr); SynchQueue = new FCriticalSection(); FScopeLock Lock(SynchQueue); // Presize the array so there is no extra memory allocated check(QueuedThreads.Num() == 0); QueuedThreads.Empty(InNumQueuedThreads); if( OverrideStackSize > StackSize ) { StackSize = OverrideStackSize; } // 建立線程, 注意建立的是FQueuedThread. for (uint32 Count = 0; Count < InNumQueuedThreads && bWasSuccessful == true; Count++) { FQueuedThread* pThread = new FQueuedThread(); // 利用FQueuedThread對象建立真正的線程. if (pThread->Create(this,StackSize,ThreadPriority) == true) { QueuedThreads.Add(pThread); AllThreads.Add(pThread); } else { // 建立失敗, 清理線程對象. bWasSuccessful = false; delete pThread; } } // 建立線程池失敗, 清理數據. if (bWasSuccessful == false) { Destroy(); } return bWasSuccessful; } virtual void Destroy() override { if (SynchQueue) { { FScopeLock Lock(SynchQueue); TimeToDie = 1; FPlatformMisc::MemoryBarrier(); // Clean up all queued objects for (int32 Index = 0; Index < QueuedWork.Num(); Index++) { QueuedWork[Index]->Abandon(); } // Empty out the invalid pointers QueuedWork.Empty(); } // 等待全部線程執行完成, 注意這裏並無使用同步時間, 而是使用相似自旋鎖的機制. while (1) { { // 訪問AllThreads和QueuedThreads的數據時先鎖定臨界區. 防止其它線程修改數據. FScopeLock Lock(SynchQueue); if (AllThreads.Num() == QueuedThreads.Num()) { break; } } FPlatformProcess::Sleep(0.0f); // 切換當前線程時間片, 防止當前線程佔用cpu時鐘. } // 刪除全部線程. { FScopeLock Lock(SynchQueue); // Now tell each thread to die and delete those for (int32 Index = 0; Index < AllThreads.Num(); Index++) { AllThreads[Index]->KillThread(); delete AllThreads[Index]; } QueuedThreads.Empty(); AllThreads.Empty(); } // 刪除同步鎖. delete SynchQueue; SynchQueue = nullptr; } } int32 GetNumQueuedJobs() const { return QueuedWork.Num(); } virtual int32 GetNumThreads() const { return AllThreads.Num(); } // 加入隊列化任務. void AddQueuedWork(IQueuedWork* InQueuedWork) override { check(InQueuedWork != nullptr); if (TimeToDie) { InQueuedWork->Abandon(); return; } check(SynchQueue); FQueuedThread* Thread = nullptr; { // 操做線程池裏的全部數據前都須要鎖定臨界區. FScopeLock sl(SynchQueue); const int32 AvailableThreadCount = QueuedThreads.Num(); // 沒有可用線程, 加入任務隊列, 稍後再執行. if (AvailableThreadCount == 0) { QueuedWork.Add(InQueuedWork); return; } // 從可用線程池中獲取一個線程, 並將其從可用線程池中刪除. const int32 ThreadIndex = AvailableThreadCount - 1; Thread = QueuedThreads[ThreadIndex]; QueuedThreads.RemoveAt(ThreadIndex, 1, /* do not allow shrinking */ false); } // 執行任務 Thread->DoWork(InQueuedWork); } virtual bool RetractQueuedWork(IQueuedWork* InQueuedWork) override { if (TimeToDie) { return false; // no special consideration for this, refuse the retraction and let shutdown proceed } check(InQueuedWork != nullptr); check(SynchQueue); FScopeLock sl(SynchQueue); return !!QueuedWork.RemoveSingle(InQueuedWork); } // 若是有可用任務,則獲取一個並執行, 不然將線程迴歸可用線程池. 此接口由FQueuedThread調用. IQueuedWork* ReturnToPoolOrGetNextJob(FQueuedThread* InQueuedThread) { check(InQueuedThread != nullptr); IQueuedWork* Work = nullptr; // Check to see if there is any work to be done FScopeLock sl(SynchQueue); if (TimeToDie) { check(!QueuedWork.Num()); // we better not have anything if we are dying } if (QueuedWork.Num() > 0) { // Grab the oldest work in the queue. This is slower than // getting the most recent but prevents work from being // queued and never done Work = QueuedWork[0]; // Remove it from the list so no one else grabs it QueuedWork.RemoveAt(0, 1, /* do not allow shrinking */ false); } if (!Work) { // There was no work to be done, so add the thread to the pool QueuedThreads.Add(InQueuedThread); } return Work; } };
上面的接口ReturnToPoolOrGetNextJob並不是FQueuedThreadPoolBase調用,而是由正在執行任務且執行完畢的FQueuedThread對象主動調用,以下所示:
uint32 FQueuedThread::Run() { while (!TimeToDie.Load(EMemoryOrder::Relaxed)) { bool bContinueWaiting = true; (......) // 讓事件等待. if (bContinueWaiting) { DoWorkEvent->Wait(); } IQueuedWork* LocalQueuedWork = QueuedWork; QueuedWork = nullptr; FPlatformMisc::MemoryBarrier(); check(LocalQueuedWork || TimeToDie.Load(EMemoryOrder::Relaxed)); // well you woke me up, where is the job or termination request? // 不斷地從線程池獲取任務並執行, 直到線程池的全部任務執行完畢. while (LocalQueuedWork) { // 執行任務. LocalQueuedWork->DoThreadedWork(); // 從線程池獲取下一個任務. LocalQueuedWork = OwningThreadPool->ReturnToPoolOrGetNextJob(this); } } return 0; }
從上面能夠看出,FQueuedThreadPool和FQueuedThread的數據和接口巧妙地配合,從而並行化地執行任務。
線程池的機制已經講述完畢,下面講一下UE的全局線程池GThreadPool的初始化過程,此過程在FEngineLoop::PreInitPreStartupScreen中,1.4.6.1 引擎預初始化已經有說起:
// Engine\Source\Runtime\Launch\Private\LaunchEngineLoop.cpp int32 FEngineLoop::PreInitPreStartupScreen(const TCHAR* CmdLine) { (......) { TRACE_THREAD_GROUP_SCOPE("ThreadPool"); // 建立全局線程池 GThreadPool = FQueuedThreadPool::Allocate(); int32 NumThreadsInThreadPool = FPlatformMisc::NumberOfWorkerThreadsToSpawn(); // 若是是純服務器模式, 線程池只有一個線程. if (FPlatformProperties::IsServerOnly()) { NumThreadsInThreadPool = 1; } // 建立工做線程相等的線程數量. verify(GThreadPool->Create(NumThreadsInThreadPool, StackSize * 1024, TPri_SlightlyBelowNormal)); } (......) }
若是須要GThreadPool爲咱們作事,則使用示例以下:
// Engine\Source\Runtime\Engine\Private\ShadowMap.cpp // 多線程編碼紋理 if (bMultithreadedEncode) { // 完成的任務計數器. FThreadSafeCounter Counter(PendingTextures.Num()); // 待編碼的紋理任務列表 TArray<FAsyncEncode<FShadowMapPendingTexture>> AsyncEncodeTasks; AsyncEncodeTasks.Empty(PendingTextures.Num()); // 建立全部任務, 加入到AsyncEncodeTasks列表中. for (auto& PendingTexture : PendingTextures) { PendingTexture.CreateUObjects(); // 建立AsyncEncodeTask auto AsyncEncodeTask = new (AsyncEncodeTasks)FAsyncEncode<FShadowMapPendingTexture>(&PendingTexture, LightingScenario, Counter, TextureCompressorModule); // 將AsyncEncodeTask加入全局線程池並執行. GThreadPool->AddQueuedWork(AsyncEncodeTask); } // 若是還有任務未完成, 則讓當前線程進入睡眠狀態. while (Counter.GetValue() > 0) { GWarn->UpdateProgress(Counter.GetValue(), PendingTextures.Num()); FPlatformProcess::Sleep(0.0001f); } }
TaskGraph直譯是任務圖,使用的圖是DAG(Directed Acyclic Graph,有向非循環圖),能夠指定依賴關係,指定前序和後序任務,但不能有循環依賴。它是UE內迄今爲止最爲複雜的並行任務系統,涉及的概念、運行機制的複雜度都陡增,本節將花大篇幅描述它們,旨在闡述清楚它們的機制和原理。
FBaseGraphTask是運行於TaskGraph的任務,是個基礎父類,其派生的具體任務子類纔會執行任務。它的聲明(節選)以下:
// Engine\Source\Runtime\Core\Public\Async\TaskGraphInterfaces.h class FBaseGraphTask { protected: FBaseGraphTask(int32 InNumberOfPrerequistitesOutstanding); // 先決任務完成或部分地完成. void PrerequisitesComplete(ENamedThreads::Type CurrentThread, int32 NumAlreadyFinishedPrequistes, bool bUnlock = true); // 帶條件(前置任務都已經執行完畢)地執行任務 void ConditionalQueueTask(ENamedThreads::Type CurrentThread) { if (NumberOfPrerequistitesOutstanding.Decrement()==0) { QueueTask(CurrentThread); } } private: // 真正地執行任務, 由子類實現. virtual void ExecuteTask(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread)=0; // 加入到TaskGraph任務隊列中. void QueueTask(ENamedThreads::Type CurrentThreadIfKnown) { checkThreadGraph(LifeStage.Increment() == int32(LS_Queued)); FTaskGraphInterface::Get().QueueTask(this, ThreadToExecuteOn, CurrentThreadIfKnown); } ENamedThreads::Type ThreadToExecuteOn; // 執行任務的線程類型 FThreadSafeCounter NumberOfPrerequistitesOutstanding; // 執行任務前的計數器 };
FBaseGraphTask的惟一子類TGraphTask承接了完成執行任務的代碼。TGraphTask的聲明和實現以下:
template<typename TTask> class TGraphTask final : public FBaseGraphTask { public: // 構造任務的輔助類. class FConstructor { public: // 建立TTask任務對象, 而後設置TGraphTask任務的數據, 以便在適當時機執行. template<typename...T> FGraphEventRef ConstructAndDispatchWhenReady(T&&... Args) { new ((void *)&Owner->TaskStorage) TTask(Forward<T>(Args)...); return Owner->Setup(Prerequisites, CurrentThreadIfKnown); } // 建立TTask任務對象, 而後設置TGraphTask任務的數據, 並持有但不執行. template<typename...T> TGraphTask* ConstructAndHold(T&&... Args) { new ((void *)&Owner->TaskStorage) TTask(Forward<T>(Args)...); return Owner->Hold(Prerequisites, CurrentThreadIfKnown); } private: TGraphTask* Owner; // 所在的TGraphTask對象. const FGraphEventArray* Prerequisites; // 先決任務. ENamedThreads::Type CurrentThreadIfKnown; }; // 建立任務, 注意返回的是FConstructor對象, 以便對任務執行後續操做. static FConstructor CreateTask(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread) { int32 NumPrereq = Prerequisites ? Prerequisites->Num() : 0; if (sizeof(TGraphTask) <= FBaseGraphTask::SMALL_TASK_SIZE) { void *Mem = FBaseGraphTask::GetSmallTaskAllocator().Allocate(); return FConstructor(new (Mem) TGraphTask(TTask::GetSubsequentsMode() == ESubsequentsMode::FireAndForget ? NULL : FGraphEvent::CreateGraphEvent(), NumPrereq), Prerequisites, CurrentThreadIfKnown); } return FConstructor(new TGraphTask(TTask::GetSubsequentsMode() == ESubsequentsMode::FireAndForget ? NULL : FGraphEvent::CreateGraphEvent(), NumPrereq), Prerequisites, CurrentThreadIfKnown); } void Unlock(ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread) { ConditionalQueueTask(CurrentThreadIfKnown); } FGraphEventRef GetCompletionEvent() { return Subsequents; } private: // 執行任務 void ExecuteTask(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread) override { (......) // 處理後續任務. if (TTask::GetSubsequentsMode() == ESubsequentsMode::TrackSubsequents) { Subsequents->CheckDontCompleteUntilIsEmpty(); // we can only add wait for tasks while executing the task } // 執行任務 TTask& Task = *(TTask*)&TaskStorage; { FScopeCycleCounter Scope(Task.GetStatId(), true); Task.DoTask(CurrentThread, Subsequents); Task.~TTask(); checkThreadGraph(ENamedThreads::GetThreadIndex(CurrentThread) <= ENamedThreads::GetRenderThread() || FMemStack::Get().IsEmpty()); // you must mark and pop memstacks if you use them in tasks! Named threads are excepted. } TaskConstructed = false; // 執行後序任務. if (TTask::GetSubsequentsMode() == ESubsequentsMode::TrackSubsequents) { FPlatformMisc::MemoryBarrier(); Subsequents->DispatchSubsequents(NewTasks, CurrentThread); } // 釋聽任務對象數據. if (sizeof(TGraphTask) <= FBaseGraphTask::SMALL_TASK_SIZE) { this->TGraphTask::~TGraphTask(); FBaseGraphTask::GetSmallTaskAllocator().Free(this); } else { delete this; } } // 設置先決任務. void SetupPrereqs(const FGraphEventArray* Prerequisites, ENamedThreads::Type CurrentThreadIfKnown, bool bUnlock) { checkThreadGraph(!TaskConstructed); TaskConstructed = true; TTask& Task = *(TTask*)&TaskStorage; SetThreadToExecuteOn(Task.GetDesiredThread()); int32 AlreadyCompletedPrerequisites = 0; if (Prerequisites) { for (int32 Index = 0; Index < Prerequisites->Num(); Index++) { check((*Prerequisites)[Index]); if (!(*Prerequisites)[Index]->AddSubsequent(this)) { AlreadyCompletedPrerequisites++; } } } PrerequisitesComplete(CurrentThreadIfKnown, AlreadyCompletedPrerequisites, bUnlock); } // 設置任務數據. FGraphEventRef Setup(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread) { FGraphEventRef ReturnedEventRef = Subsequents; // very important so that this doesn't get destroyed before we return SetupPrereqs(Prerequisites, CurrentThreadIfKnown, true); return ReturnedEventRef; } // 持有任務數據. TGraphTask* Hold(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread) { SetupPrereqs(Prerequisites, CurrentThreadIfKnown, false); return this; } // 建立任務. static FConstructor CreateTask(FGraphEventRef SubsequentsToAssume, const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread) { if (sizeof(TGraphTask) <= FBaseGraphTask::SMALL_TASK_SIZE) { void *Mem = FBaseGraphTask::GetSmallTaskAllocator().Allocate(); return FConstructor(new (Mem) TGraphTask(SubsequentsToAssume, Prerequisites ? Prerequisites->Num() : 0), Prerequisites, CurrentThreadIfKnown); } return FConstructor(new TGraphTask(SubsequentsToAssume, Prerequisites ? Prerequisites->Num() : 0), Prerequisites, CurrentThreadIfKnown); } TAlignedBytes<sizeof(TTask),alignof(TTask)> TaskStorage; // 被執行的任務對象. bool TaskConstructed; FGraphEventRef Subsequents; // 後續任務同步對象. };
上面可知TGraphTask雖然是任務,但它執行的實際任務是TTask的模板類,UE的註釋裏邊給出了TTask的基本形式:
class FGenericTask { TSomeType SomeArgument; public: FGenericTask(TSomeType InSomeArgument) // 不能用引用, 可用指針代替之. : SomeArgument(InSomeArgument) { // Usually the constructor doesn't do anything except save the arguments for use in DoWork or GetDesiredThread. } ~FGenericTask() { // you will be destroyed immediately after you execute. Might as well do cleanup in DoWork, but you could also use a destructor. } FORCEINLINE TStatId GetStatId() const { RETURN_QUICK_DECLARE_CYCLE_STAT(FGenericTask, STATGROUP_TaskGraphTasks); } [static] ENamedThreads::Type GetDesiredThread() { return ENamedThreads::[named thread or AnyThread]; } void DoTask(ENamedThreads::Type CurrentThread, const FGraphEventRef& MyCompletionGraphEvent) { // The arguments are useful for setting up other tasks. // Do work here, probably using SomeArgument. MyCompletionGraphEvent->DontCompleteUntil(TGraphTask<FSomeChildTask>::CreateTask(NULL,CurrentThread).ConstructAndDispatchWhenReady()); } };
然而,咱們若是須要定製本身的任務,直接使用或派生TAsyncGraphTask類便可,無需另起爐竈。TAsyncGraphTask和其父類FAsyncGraphTaskBase聲明以下:
// Engine\Source\Runtime\Core\Public\Async\Async.h // 後序任務模式 namespace ESubsequentsMode { enum Type { TrackSubsequents, // 追蹤後序任務 FireAndForget // 無需追蹤任務依賴, 能夠避免線程同步, 提高執行效率. }; } class FAsyncGraphTaskBase { public: TStatId GetStatId() const { return GET_STATID(STAT_TaskGraph_OtherTasks); } // 任務後序模式. static ESubsequentsMode::Type GetSubsequentsMode() { return ESubsequentsMode::FireAndForget; } }; template<typename ResultType> class TAsyncGraphTask : public FAsyncGraphTaskBase { public: // 構造任務, InFunction就是須要執行的代碼段. TAsyncGraphTask(TUniqueFunction<ResultType()>&& InFunction, TPromise<ResultType>&& InPromise, ENamedThreads::Type InDesiredThread = ENamedThreads::AnyThread) : Function(MoveTemp(InFunction)) , Promise(MoveTemp(InPromise)) , DesiredThread(InDesiredThread) { } public: // 執行任務 void DoTask(ENamedThreads::Type CurrentThread, const FGraphEventRef& MyCompletionGraphEvent) { SetPromise(Promise, Function); } ENamedThreads::Type GetDesiredThread() { return DesiredThread; } TFuture<ResultType> GetFuture() { return Promise.GetFuture(); } private: TUniqueFunction<ResultType()> Function; // 被執行的函數對象. TPromise<ResultType> Promise; // 同步對象. ENamedThreads::Type DesiredThread; // 指望執行的線程類型. };
FTaskThreadBase是執行任務的線程父類,定義了一組設置、操做任務的接口,聲明以下:
class FTaskThreadBase : public FRunnable, FSingleThreadRunnable { public: FTaskThreadBase() : ThreadId(ENamedThreads::AnyThread) , PerThreadIDTLSSlot(0xffffffff) , OwnerWorker(nullptr) { NewTasks.Reset(128); } // 設置數據. void Setup(ENamedThreads::Type InThreadId, uint32 InPerThreadIDTLSSlot, FWorkerThread* InOwnerWorker) { ThreadId = InThreadId; check(ThreadId >= 0); PerThreadIDTLSSlot = InPerThreadIDTLSSlot; OwnerWorker = InOwnerWorker; } // 從當前線程初始化. void InitializeForCurrentThread() { // 設置平臺相關的TLS. FPlatformTLS::SetTlsValue(PerThreadIDTLSSlot, OwnerWorker); } ENamedThreads::Type GetThreadId() const; // 用於帶名字的線程處理任務直到線程空閒或RequestQuit被調用. virtual void ProcessTasksUntilQuit(int32 QueueIndex) = 0; // 用於帶名字的線程處理任務直到線程空閒或RequestQuit被調用. virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex); // 請求退出. 會致使線程空閒時退出到調用者. 若是是帶名字的線程, 在ProcessTasksUntilQuit中用以返回給調用者; 無名線程則直接關閉. virtual void RequestQuit(int32 QueueIndex) = 0; // 入隊任務, 假設this線程和當前線程同樣. 若是是帶名字的線程, 會直接進入私有的隊列. virtual void EnqueueFromThisThread(int32 QueueIndex, FBaseGraphTask* Task); // 入隊任務, 假設this線程和當前線程不同. virtual bool EnqueueFromOtherThread(int32 QueueIndex, FBaseGraphTask* Task); // 喚醒線程. virtual void WakeUp(); // 查詢任務是否在處理中. virtual bool IsProcessingTasks(int32 QueueIndex) = 0; // 單線程幀更新 virtual void Tick() override { ProcessTasksUntilIdle(0); } // FRunnable API virtual bool Init() override { InitializeForCurrentThread(); return true; } virtual uint32 Run() override { check(OwnerWorker); // make sure we are started up ProcessTasksUntilQuit(0); FMemory::ClearAndDisableTLSCachesOnCurrentThread(); return 0; } virtual void Stop() override { RequestQuit(-1); } virtual void Exit() override { } virtual FSingleThreadRunnable* GetSingleThreadInterface() override { return this; } protected: ENamedThreads::Type ThreadId; // 線程id(線程索引) uint32 PerThreadIDTLSSlot; // TLS槽. FThreadSafeCounter IsStalled; // 阻塞計數器. 用於觸發阻塞信號. TArray<FBaseGraphTask*> NewTasks; // 待處理的任務列表. FWorkerThread* OwnerWorker; // 所在的工做線程對象. };
FTaskThreadBase只是抽象類,具體的實現由子類FNamedTaskThread和FTaskThreadAnyThread完成。
其中FNamedTaskThread處理帶名字線程的任務:
// 帶名字的任務線程. class FNamedTaskThread : public FTaskThreadBase { public: // 用於帶名字的線程處理任務直到線程空閒或RequestQuit被調用. virtual void ProcessTasksUntilQuit(int32 QueueIndex) override { check(Queue(QueueIndex).StallRestartEvent); // make sure we are started up Queue(QueueIndex).QuitForReturn = false; verify(++Queue(QueueIndex).RecursionGuard == 1); // 不斷地循環處理隊列任務, 直到退出、關閉或平臺不支持多線程。 do { ProcessTasksNamedThread(QueueIndex, FPlatformProcess::SupportsMultithreading()); } while (!Queue(QueueIndex).QuitForReturn && !Queue(QueueIndex).QuitForShutdown && FPlatformProcess::SupportsMultithreading()); // @Hack - quit now when running with only one thread. verify(!--Queue(QueueIndex).RecursionGuard); } // 用於帶名字的線程處理任務直到線程空閒或RequestQuit被調用. virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex) override { check(Queue(QueueIndex).StallRestartEvent); // make sure we are started up Queue(QueueIndex).QuitForReturn = false; verify(++Queue(QueueIndex).RecursionGuard == 1); uint64 ProcessedTasks = ProcessTasksNamedThread(QueueIndex, false); verify(!--Queue(QueueIndex).RecursionGuard); return ProcessedTasks; } // 處理任務. uint64 ProcessTasksNamedThread(int32 QueueIndex, bool bAllowStall) { uint64 ProcessedTasks = 0; (......) TStatId StallStatId; bool bCountAsStall = false; (......) while (!Queue(QueueIndex).QuitForReturn) { // 從隊列首部獲取任務. FBaseGraphTask* Task = Queue(QueueIndex).StallQueue.Pop(0, bAllowStall); TestRandomizedThreads(); if (!Task) { if (bAllowStall) { { FScopeCycleCounter Scope(StallStatId); Queue(QueueIndex).StallRestartEvent->Wait(MAX_uint32, bCountAsStall); if (Queue(QueueIndex).QuitForShutdown) { return ProcessedTasks; } TestRandomizedThreads(); } continue; } else { break; // we were asked to quit } } else // 任務不爲空 { // 執行任務. Task->Execute(NewTasks, ENamedThreads::Type(ThreadId | (QueueIndex << ENamedThreads::QueueIndexShift))); ProcessedTasks++; TestRandomizedThreads(); } } return ProcessedTasks; } virtual void EnqueueFromThisThread(int32 QueueIndex, FBaseGraphTask* Task) override { checkThreadGraph(Task && Queue(QueueIndex).StallRestartEvent); // make sure we are started up uint32 PriIndex = ENamedThreads::GetTaskPriority(Task->ThreadToExecuteOn) ? 0 : 1; int32 ThreadToStart = Queue(QueueIndex).StallQueue.Push(Task, PriIndex); check(ThreadToStart < 0); // if I am stalled, then how can I be queueing a task? } virtual void RequestQuit(int32 QueueIndex) override { // this will not work under arbitrary circumstances. For example you should not attempt to stop threads unless they are known to be idle. if (!Queue(0).StallRestartEvent) { return; } if (QueueIndex == -1) { // we are shutting down checkThreadGraph(Queue(0).StallRestartEvent); // make sure we are started up checkThreadGraph(Queue(1).StallRestartEvent); // make sure we are started up Queue(0).QuitForShutdown = true; Queue(1).QuitForShutdown = true; Queue(0).StallRestartEvent->Trigger(); Queue(1).StallRestartEvent->Trigger(); } else { checkThreadGraph(Queue(QueueIndex).StallRestartEvent); // make sure we are started up Queue(QueueIndex).QuitForReturn = true; } } virtual bool EnqueueFromOtherThread(int32 QueueIndex, FBaseGraphTask* Task) override { TestRandomizedThreads(); checkThreadGraph(Task && Queue(QueueIndex).StallRestartEvent); // make sure we are started up uint32 PriIndex = ENamedThreads::GetTaskPriority(Task->ThreadToExecuteOn) ? 0 : 1; int32 ThreadToStart = Queue(QueueIndex).StallQueue.Push(Task, PriIndex); if (ThreadToStart >= 0) { QUICK_SCOPE_CYCLE_COUNTER(STAT_TaskGraph_EnqueueFromOtherThread_Trigger); checkThreadGraph(ThreadToStart == 0); TASKGRAPH_SCOPE_CYCLE_COUNTER(1, STAT_TaskGraph_EnqueueFromOtherThread_Trigger); Queue(QueueIndex).StallRestartEvent->Trigger(); return true; } return false; } virtual bool IsProcessingTasks(int32 QueueIndex) override { return !!Queue(QueueIndex).RecursionGuard; } private: // 線程任務隊列. struct FThreadTaskQueue { FStallingTaskQueue<FBaseGraphTask, PLATFORM_CACHE_LINE_SIZE, 2> StallQueue; // 阻塞的任務隊列. uint32 RecursionGuard; // 防止循環(遞歸)調用. bool QuitForReturn; // 是否請求退出. bool QuitForShutdown; // 是否請求關閉. FEvent* StallRestartEvent; // 當線程滿載時的阻塞事件. }; FORCEINLINE FThreadTaskQueue& Queue(int32 QueueIndex) { checkThreadGraph(QueueIndex >= 0 && QueueIndex < ENamedThreads::NumQueues); return Queues[QueueIndex]; } FORCEINLINE const FThreadTaskQueue& Queue(int32 QueueIndex) const { checkThreadGraph(QueueIndex >= 0 && QueueIndex < ENamedThreads::NumQueues); return Queues[QueueIndex]; } FThreadTaskQueue Queues[ENamedThreads::NumQueues]; // 帶名字線程專用的任務隊列. };
FTaskThreadAnyThread用於處理無名線程的任務,因爲無名線程有不少個,因此處理任務時和FNamedTaskThread有所不一樣:
class FTaskThreadAnyThread : public FTaskThreadBase { public: virtual void ProcessTasksUntilQuit(int32 QueueIndex) override { if (PriorityIndex != (ENamedThreads::BackgroundThreadPriority >> ENamedThreads::ThreadPriorityShift)) { FMemory::SetupTLSCachesOnCurrentThread(); } check(!QueueIndex); do { // 處理任務 ProcessTasks(); } while (!Queue.QuitForShutdown && FPlatformProcess::SupportsMultithreading()); // @Hack - quit now when running with only one thread. } virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex) override { if (!FPlatformProcess::SupportsMultithreading()) { // 處理任務 return ProcessTasks(); } else { check(0); return 0; } } (......) private: #if UE_EXTERNAL_PROFILING_ENABLED static inline const TCHAR* ThreadPriorityToName(int32 PriorityIdx) { PriorityIdx <<= ENamedThreads::ThreadPriorityShift; if (PriorityIdx == ENamedThreads::HighThreadPriority) { return TEXT("Task Thread HP"); // 高優先級的工做線程 } else if (PriorityIdx == ENamedThreads::NormalThreadPriority) { return TEXT("Task Thread NP"); // 普通優先級的工做線程 } else if (PriorityIdx == ENamedThreads::BackgroundThreadPriority) { return TEXT("Task Thread BP"); // 後臺優先級的工做線程 } else { return TEXT("Task Thread Unknown Priority"); } } #endif // 此處的處理任務與FNamedTaskThread有區別, 在於獲取任務的方式不同, 是從TaskGraph系統中的無名任務隊列獲取任務的. uint64 ProcessTasks() { LLM_SCOPE(ELLMTag::TaskGraphTasksMisc); TStatId StallStatId; bool bCountAsStall = true; uint64 ProcessedTasks = 0; (......) verify(++Queue.RecursionGuard == 1); bool bDidStall = false; while (1) { // 從TaskGraph系統中的無名任務隊列獲取任務的. FBaseGraphTask* Task = FindWork(); if (!Task) { (......) TestRandomizedThreads(); if (FPlatformProcess::SupportsMultithreading()) { FScopeCycleCounter Scope(StallStatId); Queue.StallRestartEvent->Wait(MAX_uint32, bCountAsStall); bDidStall = true; } if (Queue.QuitForShutdown || !FPlatformProcess::SupportsMultithreading()) { break; } TestRandomizedThreads(); (......) continue; } TestRandomizedThreads(); (......) bDidStall = false; Task->Execute(NewTasks, ENamedThreads::Type(ThreadId)); ProcessedTasks++; TestRandomizedThreads(); if (Queue.bStallForTuning) { { FScopeLock Lock(&Queue.StallForTuning); } } } verify(!--Queue.RecursionGuard); return ProcessedTasks; } // 任務隊列數據. struct FThreadTaskQueue { FEvent* StallRestartEvent; uint32 RecursionGuard; bool QuitForShutdown; bool bStallForTuning; FCriticalSection StallForTuning; // 阻塞臨界區 }; // 從TaskGraph系統中獲取任務. FBaseGraphTask* FindWork() { return FTaskGraphImplementation::Get().FindWork(ThreadId); } FThreadTaskQueue Queue; // 任務隊列, 只有第一個用於無名線程. int32 PriorityIndex; };
在理解TaskGraph的實現和使用以前,有必要理解ENamedThreads相關的機制。ENamedThreads是一個命名空間,此空間內提供了編解碼線程、優先級的操做。它的聲明和解析以下:
namespace ENamedThreads { enum Type : int32 { UnusedAnchor = -1, // ----專用(帶名字的)線程---- #if STATS StatsThread, // 統計線程 #endif RHIThread, // RHI線程 AudioThread, // 音頻線程 GameThread, // 遊戲線程 ActualRenderingThread = GameThread + 1, // 實際渲染線程. GetRenderingThread()獲取的渲染多是實際渲染線程也多是遊戲線程. AnyThread = 0xff, // 任意線程(未知線程, 無名線程) // ----隊列索引和優先級---- MainQueue = 0x000, // 主隊列 LocalQueue = 0x100, // 局部隊列 NumQueues = 2, ThreadIndexMask = 0xff, QueueIndexMask = 0x100, QueueIndexShift = 8, // ----隊列任務索引、優先級---- NormalTaskPriority = 0x000, // 普通任務優先級 HighTaskPriority = 0x200, // 高任務優先級 NumTaskPriorities = 2, TaskPriorityMask = 0x200, TaskPriorityShift = 9, // ----線程優先級---- NormalThreadPriority = 0x000, // 普通線程優先級 HighThreadPriority = 0x400, // 高線程優先級 BackgroundThreadPriority = 0x800, // 後臺線程優先級 NumThreadPriorities = 3, ThreadPriorityMask = 0xC00, ThreadPriorityShift = 10, // 組合標記 #if STATS StatsThread_Local = StatsThread | LocalQueue, #endif GameThread_Local = GameThread | LocalQueue, ActualRenderingThread_Local = ActualRenderingThread | LocalQueue, AnyHiPriThreadNormalTask = AnyThread | HighThreadPriority | NormalTaskPriority, AnyHiPriThreadHiPriTask = AnyThread | HighThreadPriority | HighTaskPriority, AnyNormalThreadNormalTask = AnyThread | NormalThreadPriority | NormalTaskPriority, AnyNormalThreadHiPriTask = AnyThread | NormalThreadPriority | HighTaskPriority, AnyBackgroundThreadNormalTask = AnyThread | BackgroundThreadPriority | NormalTaskPriority, AnyBackgroundHiPriTask = AnyThread | BackgroundThreadPriority | HighTaskPriority, }; struct FRenderThreadStatics { private: // 存儲了渲染線程,注意是原子操做類型。 static CORE_API TAtomic<Type> RenderThread; static CORE_API TAtomic<Type> RenderThread_Local; }; // ----設置和獲取渲染線程接口---- Type GetRenderThread(); Type GetRenderThread_Local(); void SetRenderThread(Type Thread); void SetRenderThread_Local(Type Thread); extern CORE_API int32 bHasBackgroundThreads; // 是否有後臺線程 extern CORE_API int32 bHasHighPriorityThreads; // 是否有高優先級線程 // ----設置和獲取線程索引、線程優先級、任務優先級接口---- Type GetThreadIndex(Type ThreadAndIndex); int32 GetQueueIndex(Type ThreadAndIndex); int32 GetTaskPriority(Type ThreadAndIndex); int32 GetThreadPriorityIndex(Type ThreadAndIndex); Type SetPriorities(Type ThreadAndIndex, Type ThreadPriority, Type TaskPriority); Type SetPriorities(Type ThreadAndIndex, int32 PriorityIndex, bool bHiPri); Type SetThreadPriority(Type ThreadAndIndex, Type ThreadPriority); Type SetTaskPriority(Type ThreadAndIndex, Type TaskPriority); }
上面提到了不少任務類型,本節才真正涉及這些任務的管理器和工廠FTaskGraphInterface。FTaskGraphInterface就是任務圖的管理者,提供了任務的操做接口:
class FTaskGraphInterface { virtual void QueueTask(class FBaseGraphTask* Task, ENamedThreads::Type ThreadToExecuteOn, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread) = 0; public: // FTaskGraphInterface對象操做接口 static CORE_API void Startup(int32 NumThreads); static CORE_API void Shutdown(); static CORE_API bool IsRunning(); static CORE_API FTaskGraphInterface& Get(); // 線程操做接口. virtual ENamedThreads::Type GetCurrentThreadIfKnown(bool bLocalQueue = false) = 0; virtual int32 GetNumWorkerThreads() = 0; virtual bool IsThreadProcessingTasks(ENamedThreads::Type ThreadToCheck) = 0; virtual void AttachToThread(ENamedThreads::Type CurrentThread)=0; virtual uint64 ProcessThreadUntilIdle(ENamedThreads::Type CurrentThread)=0; // 任務操做接口. virtual void ProcessThreadUntilRequestReturn(ENamedThreads::Type CurrentThread)=0; virtual void RequestReturn(ENamedThreads::Type CurrentThread)=0; virtual void WaitUntilTasksComplete(const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)=0; virtual void TriggerEventWhenTasksComplete(FEvent* InEvent, const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread, ENamedThreads::Type TriggerThread = ENamedThreads::AnyHiPriThreadHiPriTask)=0; void WaitUntilTaskCompletes(const FGraphEventRef& Task, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread); void TriggerEventWhenTaskCompletes(FEvent* InEvent, const FGraphEventRef& Task, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread, ENamedThreads::Type TriggerThread = ENamedThreads::AnyHiPriThreadHiPriTask); virtual void AddShutdownCallback(TFunction<void()>& Callback) = 0; static void BroadcastSlow_OnlyUseForSpecialPurposes(bool bDoTaskThreads, bool bDoBackgroundThreads, TFunction<void(ENamedThreads::Type CurrentThread)>& Callback); };
FTaskGraphInterface的實現是在FTaskGraphImplementation類中,FTaskGraphImplementation採用了特殊的線程對象WorkerThreads(工做線程)來做爲執行的載體,固然若是是專用的(帶名字的線程,如GameThread、RHI、ActualRenderingThread)線程,則會進入專用的任務隊列。因爲它的實現細節不少,後面再展開討論。
FTaskGraphImplementation繼承並實現了FTaskGraphInterface的接口,部分接口和實現以下:
// Engine\Source\Runtime\Core\Private\Async\TaskGraph.cpp class FTaskGraphImplementation : public FTaskGraphInterface { public: static FTaskGraphImplementation& Get(); // 構造函數, 計算任務線程數量, 建立專用線程和無名線程等. FTaskGraphImplementation(int32) { bCreatedHiPriorityThreads = !!ENamedThreads::bHasHighPriorityThreads; bCreatedBackgroundPriorityThreads = !!ENamedThreads::bHasBackgroundThreads; int32 MaxTaskThreads = MAX_THREADS; // 最大任務線程數量默認是83. int32 NumTaskThreads = FPlatformMisc::NumberOfWorkerThreadsToSpawn(); // 根據硬件核心數量獲取任務線程數量. // 處理不能支持多線程的平臺. if (!FPlatformProcess::SupportsMultithreading()) { MaxTaskThreads = 1; NumTaskThreads = 1; LastExternalThread = (ENamedThreads::Type)(ENamedThreads::ActualRenderingThread - 1); bCreatedHiPriorityThreads = false; bCreatedBackgroundPriorityThreads = false; ENamedThreads::bHasBackgroundThreads = 0; ENamedThreads::bHasHighPriorityThreads = 0; } else { LastExternalThread = ENamedThreads::ActualRenderingThread; } // 專用線程數量 NumNamedThreads = LastExternalThread + 1; // 計算工做線程集數量, 與是否開啓線程高優先級、是否建立後臺優先級線程有關。 NumTaskThreadSets = 1 + bCreatedHiPriorityThreads + bCreatedBackgroundPriorityThreads; // 計算真正須要的任務線程數量, 最大不超過83個. NumThreads = FMath::Max<int32>(FMath::Min<int32>(NumTaskThreads * NumTaskThreadSets + NumNamedThreads, MAX_THREADS), NumNamedThreads + 1); NumThreads = FMath::Min(NumThreads, NumNamedThreads + NumTaskThreads * NumTaskThreadSets); NumTaskThreadsPerSet = (NumThreads - NumNamedThreads) / NumTaskThreadSets; ReentrancyCheck.Increment(); // just checking for reentrancy PerThreadIDTLSSlot = FPlatformTLS::AllocTlsSlot(); // 建立全部任務線程. for (int32 ThreadIndex = 0; ThreadIndex < NumThreads; ThreadIndex++) { check(!WorkerThreads[ThreadIndex].bAttached); // reentrant? // 根據是否專用線程分別建立線程. bool bAnyTaskThread = ThreadIndex >= NumNamedThreads; if (bAnyTaskThread) { WorkerThreads[ThreadIndex].TaskGraphWorker = new FTaskThreadAnyThread(ThreadIndexToPriorityIndex(ThreadIndex)); } else { WorkerThreads[ThreadIndex].TaskGraphWorker = new FNamedTaskThread; } WorkerThreads[ThreadIndex].TaskGraphWorker->Setup(ENamedThreads::Type(ThreadIndex), PerThreadIDTLSSlot, &WorkerThreads[ThreadIndex]); } TaskGraphImplementationSingleton = this; // 賦值this到TaskGraphImplementationSingleton, 以便外部可獲取. // 設置無名線程的屬性. for (int32 ThreadIndex = LastExternalThread + 1; ThreadIndex < NumThreads; ThreadIndex++) { FString Name; const ANSICHAR* GroupName = "TaskGraphNormal"; int32 Priority = ThreadIndexToPriorityIndex(ThreadIndex); EThreadPriority ThreadPri; uint64 Affinity = FPlatformAffinity::GetTaskGraphThreadMask(); if (Priority == 1) { Name = FString::Printf(TEXT("TaskGraphThreadHP %d"), ThreadIndex - (LastExternalThread + 1)); GroupName = "TaskGraphHigh"; ThreadPri = TPri_SlightlyBelowNormal; // we want even hi priority tasks below the normal threads // If the platform defines FPlatformAffinity::GetTaskGraphHighPriorityTaskMask then use it if (FPlatformAffinity::GetTaskGraphHighPriorityTaskMask() != 0xFFFFFFFFFFFFFFFF) { Affinity = FPlatformAffinity::GetTaskGraphHighPriorityTaskMask(); } } else if (Priority == 2) { Name = FString::Printf(TEXT("TaskGraphThreadBP %d"), ThreadIndex - (LastExternalThread + 1)); GroupName = "TaskGraphLow"; ThreadPri = TPri_Lowest; // If the platform defines FPlatformAffinity::GetTaskGraphBackgroundTaskMask then use it if ( FPlatformAffinity::GetTaskGraphBackgroundTaskMask() != 0xFFFFFFFFFFFFFFFF ) { Affinity = FPlatformAffinity::GetTaskGraphBackgroundTaskMask(); } } else { Name = FString::Printf(TEXT("TaskGraphThreadNP %d"), ThreadIndex - (LastExternalThread + 1)); ThreadPri = TPri_BelowNormal; // we want normal tasks below normal threads like the game thread } // 計算線程棧大小. #if WITH_EDITOR uint32 StackSize = 1024 * 1024; #elif ( UE_BUILD_SHIPPING || UE_BUILD_TEST ) uint32 StackSize = 384 * 1024; #else uint32 StackSize = 512 * 1024; #endif // 真正地建立工做線程的執行線程. WorkerThreads[ThreadIndex].RunnableThread = FRunnableThread::Create(&Thread(ThreadIndex), *Name, StackSize, ThreadPri, Affinity); // these are below normal threads so that they sleep when the named threads are active WorkerThreads[ThreadIndex].bAttached = true; if (WorkerThreads[ThreadIndex].RunnableThread) { TRACE_SET_THREAD_GROUP(WorkerThreads[ThreadIndex].RunnableThread->GetThreadID(), GroupName); } } } // 入隊任務. virtual void QueueTask(FBaseGraphTask* Task, ENamedThreads::Type ThreadToExecuteOn, ENamedThreads::Type InCurrentThreadIfKnown = ENamedThreads::AnyThread) final override { TASKGRAPH_SCOPE_CYCLE_COUNTER(2, STAT_TaskGraph_QueueTask); if (ENamedThreads::GetThreadIndex(ThreadToExecuteOn) == ENamedThreads::AnyThread) { TASKGRAPH_SCOPE_CYCLE_COUNTER(3, STAT_TaskGraph_QueueTask_AnyThread); // 多線程支持下的處理. if (FPlatformProcess::SupportsMultithreading()) { // 處理優先級. uint32 TaskPriority = ENamedThreads::GetTaskPriority(Task->ThreadToExecuteOn); int32 Priority = ENamedThreads::GetThreadPriorityIndex(Task->ThreadToExecuteOn); if (Priority == (ENamedThreads::BackgroundThreadPriority >> ENamedThreads::ThreadPriorityShift) && (!bCreatedBackgroundPriorityThreads || !ENamedThreads::bHasBackgroundThreads)) { Priority = ENamedThreads::NormalThreadPriority >> ENamedThreads::ThreadPriorityShift; // we don't have background threads, promote to normal TaskPriority = ENamedThreads::NormalTaskPriority >> ENamedThreads::TaskPriorityShift; // demote to normal task pri } else if (Priority == (ENamedThreads::HighThreadPriority >> ENamedThreads::ThreadPriorityShift) && (!bCreatedHiPriorityThreads || !ENamedThreads::bHasHighPriorityThreads)) { Priority = ENamedThreads::NormalThreadPriority >> ENamedThreads::ThreadPriorityShift; // we don't have hi priority threads, demote to normal TaskPriority = ENamedThreads::HighTaskPriority >> ENamedThreads::TaskPriorityShift; // promote to hi task pri } uint32 PriIndex = TaskPriority ? 0 : 1; check(Priority >= 0 && Priority < MAX_THREAD_PRIORITIES); { TASKGRAPH_SCOPE_CYCLE_COUNTER(4, STAT_TaskGraph_QueueTask_IncomingAnyThreadTasks_Push); // 將任務壓入待執行隊列, 且得到並執行可執行的任務索引(可能無). int32 IndexToStart = IncomingAnyThreadTasks[Priority].Push(Task, PriIndex); if (IndexToStart >= 0) { StartTaskThread(Priority, IndexToStart); } } return; } else { ThreadToExecuteOn = ENamedThreads::GameThread; } } // 如下是不支持多線程的處理. ENamedThreads::Type CurrentThreadIfKnown; if (ENamedThreads::GetThreadIndex(InCurrentThreadIfKnown) == ENamedThreads::AnyThread) { CurrentThreadIfKnown = GetCurrentThread(); } else { CurrentThreadIfKnown = ENamedThreads::GetThreadIndex(InCurrentThreadIfKnown); checkThreadGraph(CurrentThreadIfKnown == ENamedThreads::GetThreadIndex(GetCurrentThread())); } { int32 QueueToExecuteOn = ENamedThreads::GetQueueIndex(ThreadToExecuteOn); ThreadToExecuteOn = ENamedThreads::GetThreadIndex(ThreadToExecuteOn); FTaskThreadBase* Target = &Thread(ThreadToExecuteOn); if (ThreadToExecuteOn == ENamedThreads::GetThreadIndex(CurrentThreadIfKnown)) { Target->EnqueueFromThisThread(QueueToExecuteOn, Task); } else { Target->EnqueueFromOtherThread(QueueToExecuteOn, Task); } } } virtual int32 GetNumWorkerThreads() final override; virtual ENamedThreads::Type GetCurrentThreadIfKnown(bool bLocalQueue) final override; virtual bool IsThreadProcessingTasks(ENamedThreads::Type ThreadToCheck) final override; // 將當前線程導入到指定Index. virtual void AttachToThread(ENamedThreads::Type CurrentThread) final override; // ----處理任務接口---- virtual uint64 ProcessThreadUntilIdle(ENamedThreads::Type CurrentThread) final override; virtual void ProcessThreadUntilRequestReturn(ENamedThreads::Type CurrentThread) final override; virtual void RequestReturn(ENamedThreads::Type CurrentThread) final override; virtual void WaitUntilTasksComplete(const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread) final override; virtual void TriggerEventWhenTasksComplete(FEvent* InEvent, const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread, ENamedThreads::Type TriggerThread = ENamedThreads::AnyHiPriThreadHiPriTask) final override; virtual void AddShutdownCallback(TFunction<void()>& Callback); // ----任務調度接口---- // 開啓指定優先級和索引的任務線程. void StartTaskThread(int32 Priority, int32 IndexToStart); void StartAllTaskThreads(bool bDoBackgroundThreads); FBaseGraphTask* FindWork(ENamedThreads::Type ThreadInNeed); void StallForTuning(int32 Index, bool Stall); void SetTaskThreadPriorities(EThreadPriority Pri); private: // 獲取指定索引的任務線程引用. FTaskThreadBase& Thread(int32 Index) { checkThreadGraph(Index >= 0 && Index < NumThreads); checkThreadGraph(WorkerThreads[Index].TaskGraphWorker->GetThreadId() == Index); return *WorkerThreads[Index].TaskGraphWorker; } // 獲取當前線程索引. ENamedThreads::Type GetCurrentThread(); int32 ThreadIndexToPriorityIndex(int32 ThreadIndex); enum { MAX_THREADS = 26 * (CREATE_HIPRI_TASK_THREADS + CREATE_BACKGROUND_TASK_THREADS + 1) + ENamedThreads::ActualRenderingThread + 1, MAX_THREAD_PRIORITIES = 3 }; FWorkerThread WorkerThreads[MAX_THREADS]; // 全部工做線程(任務線程)對象數組. int32 NumThreads; // 實際上被使用的線程數量. int32 NumNamedThreads; // 專用線程數量. int32 NumTaskThreadSets;// 任務線程集合數量. int32 NumTaskThreadsPerSet; // 每一個集合擁有的任務線程數量. bool bCreatedHiPriorityThreads; bool bCreatedBackgroundPriorityThreads; ENamedThreads::Type LastExternalThread; FThreadSafeCounter ReentrancyCheck; uint32 PerThreadIDTLSSlot; TArray<TFunction<void()> > ShutdownCallbacks; // 銷燬前的回調. FStallingTaskQueue<FBaseGraphTask, PLATFORM_CACHE_LINE_SIZE, 2> IncomingAnyThreadTasks[MAX_THREAD_PRIORITIES]; };
總結起來,TaskGraph會根據線程優先級、是否啓用後臺線程建立不一樣的工做線程集合,而後建立它們的FWorkerThread對象。入隊任務時,會將任務Push到任務列表IncomingAnyThreadTasks(類型是FStallingTaskQueue,線程安全的無鎖的鏈表)中,並取出可執行的任務索引,根據任務的屬性(但願在哪一個線程執行、優先級、任務索引)啓用對應的工做線程去執行。
TaskGraph涉及的工做線程FWorkerThread聲明以下:
struct FWorkerThread { FTaskThreadBase* TaskGraphWorker; // 所在的FTaskThread對象(被FTaskThread對象擁有) FRunnableThread* RunnableThread; // 真正執行任務的可運行線程. bool bAttached; // 是否附加的線程.(通常用於專用線程) };
因而可知,TaskGraph最終也是藉助FRunnableThread來執行任務。TaskGraph系統總算是和FRunnableThread聯繫起來,造成了閉環。
至此,終於將TaskGraph體系的主幹脈絡闡述完了,固然,還有不少技術細節(如同步事件、觸發細節、調度算法、無鎖鏈表以及部分概念)並無涉及,這些就留給讀者本身去研讀UE源碼探索了。
前面作了大量的基礎鋪墊,終於回到了主題,講UE的多線程渲染相關的知識。
UE的場景和渲染模塊涉及到概念很是多,主要類型和解析以下:
類型 | 解析 |
---|---|
UWorld | 包含了一組能夠相互交互的Actor和組件的集合,多個關卡(Level)能夠被加載進UWorld或從UWorld卸載。能夠同時存在多個UWorld實例。 |
ULevel | 關卡,存儲着一組Actor和組件,而且存儲在同一個文件。 |
USceneComponent | 場景組件,是全部能夠被加入到場景的物體的父類,好比燈光、模型、霧等。 |
UPrimitiveComponent | 圖元組件,是全部可渲染或擁有物理模擬的物體父類。是CPU層裁剪的最小粒度單位, |
ULightComponent | 光源組件,是全部光源類型的父類。 |
FScene | 是UWorld在渲染模塊的表明。只有加入到FScene的物體纔會被渲染器感知到。渲染線程擁有FScene的全部狀態(遊戲線程不可直接修改)。 |
FPrimitiveSceneProxy | 圖元場景代理,是UPrimitiveComponent在渲染器的表明,鏡像了UPrimitiveComponent在渲染線程的狀態。 |
FPrimitiveSceneInfo | 渲染器內部狀態(描述了FRendererModule的實現),至關於融合了UPrimitiveComponent and FPrimitiveSceneProxy。只存在渲染器模塊,因此引擎模塊沒法感知到它的存在。 |
FSceneView | 描述了FScene內的單個視圖(view),同個FScene容許有多個view,換言之,一個場景能夠被多個view繪製,或者多個view同時被繪製。每一幀都會建立新的view實例。 |
FViewInfo | view在渲染器的內部表明,只存在渲染器模塊,引擎模塊不可見。 |
FSceneViewState | 存儲了有關view的渲染器私有信息,這些信息須要被跨幀訪問。在Game實例,每一個ULocalPlayer擁有一個FSceneViewState實例。 |
FSceneRenderer | 每幀都會被建立,封裝幀間臨時數據。下派生FDeferredShadingSceneRenderer(延遲着色場景渲染器)和FMobileSceneRenderer(移動端場景渲染器),分別表明PC和移動端的默認渲染器。 |
UE爲告終構清晰,減小模塊之間的依賴,加速迭代速度,劃分了不少模塊,最主要的有引擎模塊、渲染器模塊、核心、RHI、插件等等。上一小節提到了不少概念和類型,它們有些存在於引擎模塊(Engine Module),有些存在於渲染器模塊(Renderer Module),具體以下表:
Engine Module | Renderer Module |
---|---|
UWorld | FScene |
UPrimitiveComponent / FPrimitiveSceneProxy | FPrimitiveSceneInfo |
FSceneView | FViewInfo |
ULocalPlayer | FSceneViewState |
ULightComponent / FLightSceneProxy | FLightSceneInfo |
遊戲線程的對象一般作邏輯更新,在內存中有一份持久的數據,爲了不遊戲線程和渲染線程產生競爭條件,會在渲染線程額外存儲一分內存拷貝,而且使用的是另外的類型,如下是UE比較常見的類型映射關係(遊戲線程對象以U開頭,渲染線程以F開頭):
Game Thread | Rendering Thread |
---|---|
UWorld | FScene |
UPrimitiveComponent | FPrimitiveSceneProxy / FPrimitiveSceneInfo |
- | FSceneView / FViewInfo |
ULocalPlayer | FSceneViewState |
ULightComponent | FLightSceneProxy / FLightSceneInfo |
遊戲線程表明通常由遊戲遊戲線程操做,渲染線程表明主要由渲染線程操做。若是嘗試跨線程操做數據,將會引起不可預料的結果,產生競爭條件。
/** SceneProxy在註冊進場景時,會在遊戲線程中被構造和傳遞數據。 */ FStaticMeshSceneProxy::FStaticMeshSceneProxy(UStaticMeshComponent* InComponent): FPrimitiveSceneProxy(...), Owner(InComponent->GetOwner()) <======== 此處將AActor指針被緩存 ... /** SceneProxy的DrawDynamicElements將被渲染器在渲染線程中調用 */ void FStaticMeshSceneProxy::DrawDynamicElements(...) { if (Owner->AnyProperty) <========== 將會引起競爭條件! 遊戲線程擁有AActor、UObject的全部狀態!!而且UObject對象可能被GC掉,此時再訪問會引發程序崩潰!! }
部分表明比較特殊,如FPrimitiveSceneProxy、FLightSceneProxy ,這些場景代理本屬於引擎模塊,但又屬於渲染線程專屬對象,說明它們是鏈接遊戲線程和渲染線程的橋樑,是線程間傳遞數據的工具人。
默認狀況下,UE存在遊戲線程(Game Thread)、渲染線程(Render Thread)、RHI線程(RHI Thread),它們都獨立地運行在專門的線程上(FRunnableThread)。
遊戲線程經過某些接口向渲染線程的Queue入隊回調接口,以便渲染線程稍後運行時,從渲染線程的Queue獲取回調,一個個地執行,從而生成了Command List。
渲染線程做爲前端(frontend)產生的Command List是平臺無關的,是抽象的圖形API調用;而RHI線程做爲後端(backtend)會執行和轉換渲染線程的Command List成爲指定圖形API的調用(稱爲Graphical Command),並提交到GPU執行。這些線程處理的數據一般是不一樣幀的,譬如遊戲線程處理N幀數據,渲染線程和RHI線程處理N-1幀數據。
但也存在例外,好比渲染線程和RHI線程運行很快,幾乎不存在延遲,這種狀況下,遊戲線程處理N幀,而渲染線程可能處理N或N-1幀,RHI線程也可能在轉換N或N-1幀。可是,渲染線程不能落後遊戲線程一幀,不然遊戲線程會卡住,直到渲染線程處理全部指令。
除此以外,渲染指令是能夠並行地被生成,RHI線程也能夠並行地轉換這些指令,以下所示:
UE4並行生成Command list示意圖。
開啓多線程渲染帶來的收益是幀率更高,幀間變化頻率下降(幀率更穩定)。以Fortnite(堡壘之夜)移動端爲例,在開啓RHI線程以前,渲染線程急劇地上下波動,而加了RHI線程以後,波動平緩許多,和遊戲線程基本保持一致,幀率也提高很多:
遊戲線程被稱爲主線程,是引擎運行的心臟,承載主要的遊戲邏輯、運行流程的工做,也是其它線程的數據發起者。
遊戲線程的建立是運行程序入口的線程,由系統啓動進程時被同時建立的(由於進程至少須要一個線程來工做),在引擎啓動時直接存儲到全局變量中,且稍後會設置到TaskGraph系統中:
// Engine\Source\Runtime\Launch\Private\LaunchEngineLoop.cpp int32 FEngineLoop::PreInitPreStartupScreen(const TCHAR* CmdLine) { (......) // 獲取當前線程id, 存儲到全局變量中. GGameThreadId = FPlatformTLS::GetCurrentThreadId(); GIsGameThreadIdInitialized = true; FPlatformProcess::SetThreadAffinityMask(FPlatformAffinity::GetMainGameMask()); // 設置遊戲線程數據(但不少平臺都是空的實現體) FPlatformProcess::SetupGameThread(); (......) if (bCreateTaskGraphAndThreadPools) { SCOPED_BOOT_TIMING("FTaskGraphInterface::Startup"); FTaskGraphInterface::Startup(FPlatformMisc::NumberOfCores()); // 將當前線程(主線程)附加到TaskGraph的GameThread命名插槽中. 這樣主線程便和TaskGraph聯動了起來. FTaskGraphInterface::Get().AttachToThread(ENamedThreads::GameThread); } }
以上代碼也說明:主線程、遊戲線程和TaskGraph系統的ENamedThreads::GameThread實際上是一回事,都是同一個線程!
通過上面的初始化和設置後,其它地方就能夠經過TaskGraph系統並行地處理任務了,也能夠訪問全局變量,以便判斷遊戲線程是否初始化完,當前線程是否遊戲線程:
bool IsInGameThread() { return GIsGameThreadIdInitialized && FPlatformTLS::GetCurrentThreadId() == GGameThreadId; }
渲染線程與遊戲不一樣,是一條專門用於生成渲染指令和渲染邏輯的獨立線程。RenderingThread.h聲明瞭所有對外的接口,部分以下:
// Engine\Source\Runtime\RenderCore\Public\RenderingThread.h // 是否啓用了獨立的渲染線程, 若是爲false, 則全部渲染命令會被當即執行, 而不是放入渲染命令隊列. extern RENDERCORE_API bool GIsThreadedRendering; // 渲染線程是否應該被建立. 一般被命令行參數或ToggleRenderingThread控制檯參數設置. extern RENDERCORE_API bool GUseThreadedRendering; // 是否開啓RHI線程 extern RENDERCORE_API void SetRHIThreadEnabled(bool bEnableDedicatedThread, bool bEnableRHIOnTaskThreads); (......) // 開啓渲染線程. extern RENDERCORE_API void StartRenderingThread(); // 中止渲染線程. extern RENDERCORE_API void StopRenderingThread(); // 檢查渲染線程是否健康(是否Crash), 若是crash, 則會用UE_Log輸出日誌. extern RENDERCORE_API void CheckRenderingThreadHealth(); // 檢查渲染線程是否健康(是否Crash) extern RENDERCORE_API bool IsRenderingThreadHealthy(); // 增長一個必須在下一個場景繪製前或flush渲染命令前完成的任務. extern RENDERCORE_API void AddFrameRenderPrerequisite(const FGraphEventRef& TaskToAdd); // 手機幀渲染前序任務, 保證全部渲染命令被入隊. extern RENDERCORE_API void AdvanceFrameRenderPrerequisite(); // 等待全部渲染線程的渲染命令被執行完畢. 會卡住遊戲線程, 只能被遊戲線程調用. extern RENDERCORE_API void FlushRenderingCommands(bool bFlushDeferredDeletes = false); extern RENDERCORE_API void FlushPendingDeleteRHIResources_GameThread(); extern RENDERCORE_API void FlushPendingDeleteRHIResources_RenderThread(); extern RENDERCORE_API void TickRenderingTickables(); extern RENDERCORE_API void StartRenderCommandFenceBundler(); extern RENDERCORE_API void StopRenderCommandFenceBundler(); (......)
RenderingThread.h還有一個很是重要的宏ENQUEUE_RENDER_COMMAND
,它的做用是向渲染線程入隊渲染指令。下面是它的聲明和實現:
// 向渲染線程入隊渲染指令, Type指明瞭渲染操做的名字. #define ENQUEUE_RENDER_COMMAND(Type) \ struct Type##Name \ { \ static const char* CStr() { return #Type; } \ static const TCHAR* TStr() { return TEXT(#Type); } \ }; \ EnqueueUniqueRenderCommand<Type##Name>
上面最後一句使用了EnqueueUniqueRenderCommand
命令,繼續追蹤之:
// TSTR是渲染命令名字, LAMBDA是回調函數. template<typename TSTR, typename LAMBDA> FORCEINLINE_DEBUGGABLE void EnqueueUniqueRenderCommand(LAMBDA&& Lambda) { typedef TEnqueueUniqueRenderCommandType<TSTR, LAMBDA> EURCType; // 若是在渲染線程內直接執行回調而不入隊渲染命令. if (IsInRenderingThread()) { FRHICommandListImmediate& RHICmdList = GetImmediateCommandList_ForRenderCommand(); Lambda(RHICmdList); } else { // 須要在獨立的渲染線程執行 if (ShouldExecuteOnRenderThread()) { CheckNotBlockedOnRenderThread(); // 從GraphTask建立任務且在適當時候入隊渲染命令. TGraphTask<EURCType>::CreateTask().ConstructAndDispatchWhenReady(Forward<LAMBDA>(Lambda)); } else // 不在獨立的渲染線程執行, 則直接執行. { EURCType TempCommand(Forward<LAMBDA>(Lambda)); FScopeCycleCounter EURCMacro_Scope(TempCommand.GetStatId()); TempCommand.DoTask(ENamedThreads::GameThread, FGraphEventRef()); } } }
上面說明若是是有獨立的渲染線程,最終會將渲染命令入隊到TaskGraph的任務Queue中,等待合適的時機在渲染線程中被執行。其中TEnqueueUniqueRenderCommandType
就是專用於渲染命令的特殊TaskGraph任務類型,聲明以下:
class RENDERCORE_API FRenderCommand { public: // 全部渲染指令都必須在渲染線程執行. static ENamedThreads::Type GetDesiredThread() { check(!GIsThreadedRendering || ENamedThreads::GetRenderThread() != ENamedThreads::GameThread); return ENamedThreads::GetRenderThread(); } static ESubsequentsMode::Type GetSubsequentsMode() { return ESubsequentsMode::FireAndForget; } }; template<typename TSTR, typename LAMBDA> class TEnqueueUniqueRenderCommandType : public FRenderCommand { public: TEnqueueUniqueRenderCommandType(LAMBDA&& InLambda) : Lambda(Forward<LAMBDA>(InLambda)) {} // 正在執行任務. void DoTask(ENamedThreads::Type CurrentThread, const FGraphEventRef& MyCompletionGraphEvent) { TRACE_CPUPROFILER_EVENT_SCOPE_ON_CHANNEL_STR(TSTR::TStr(), RenderCommandsChannel); FRHICommandListImmediate& RHICmdList = GetImmediateCommandList_ForRenderCommand(); Lambda(RHICmdList); } (......) private: LAMBDA Lambda; // 緩存渲染回調函數. };
爲了更好理解入隊渲染命令操做,舉個具體的例子,以增長燈光到場景爲例:
void FScene::AddLight(ULightComponent* Light) { (......) // Send a command to the rendering thread to add the light to the scene. FScene* Scene = this; FLightSceneInfo* LightSceneInfo = Proxy->LightSceneInfo; // 這裏入隊渲染指令, 以便在渲染線程將燈光數據傳遞到渲染器. ENQUEUE_RENDER_COMMAND(FAddLightCommand)( [Scene, LightSceneInfo](FRHICommandListImmediate& RHICmdList) { CSV_SCOPED_TIMING_STAT_EXCLUSIVE(Scene_AddLight); FScopeCycleCounter Context(LightSceneInfo->Proxy->GetStatId()); Scene->AddLightSceneInfo_RenderThread(LightSceneInfo); }); }
將ENQUEUE_RENDER_COMMAND(FAddLightCommand)
代入前面解析過的宏和模板,並展開,完整的代碼以下:
struct FAddLightCommandName { static const char* CStr() { return "FAddLightCommand"; } static const TCHAR* TStr() { return TEXT("FAddLightCommand"); } }; EnqueueUniqueRenderCommand<FAddLightCommandName>( [Scene, LightSceneInfo](FRHICommandListImmediate& RHICmdList) { CSV_SCOPED_TIMING_STAT_EXCLUSIVE(Scene_AddLight); FScopeCycleCounter Context(LightSceneInfo->Proxy->GetStatId()); Scene->AddLightSceneInfo_RenderThread(LightSceneInfo); }) { typedef TEnqueueUniqueRenderCommandType<FAddLightCommandName, LAMBDA> EURCType; // 若是在渲染線程內直接執行回調而不入隊渲染命令. if (IsInRenderingThread()) { FRHICommandListImmediate& RHICmdList = GetImmediateCommandList_ForRenderCommand(); Lambda(RHICmdList); } else { // 須要在獨立的渲染線程執行 if (ShouldExecuteOnRenderThread()) { CheckNotBlockedOnRenderThread(); // 從GraphTask建立任務且在適當時候入隊渲染命令. TGraphTask<EURCType>::CreateTask().ConstructAndDispatchWhenReady(Forward<LAMBDA>(Lambda)); } else // 不在獨立的渲染線程執行, 則直接執行. { EURCType TempCommand(Forward<LAMBDA>(Lambda)); FScopeCycleCounter EURCMacro_Scope(TempCommand.GetStatId()); TempCommand.DoTask(ENamedThreads::GameThread, FGraphEventRef()); } } }
FRenderingThread承載了渲染線程的主要工做,它的部分接口和實現代碼以下:
// Engine\Source\Runtime\RenderCore\Private\RenderingThread.cpp class FRenderingThread : public FRunnable { private: bool bAcquiredThreadOwnership; // 當沒有獨立的RHI線程時, 渲染線程將被其它線程捕獲. public: FEvent* TaskGraphBoundSyncEvent; // TaskGraph同步事件, 以便在主線程使用渲染線程以前就將渲染線程綁定到TaskGraph體系中. FRenderingThread() { bAcquiredThreadOwnership = false; // 獲取同步事件. TaskGraphBoundSyncEvent = FPlatformProcess::GetSynchEventFromPool(true); RHIFlushResources(); } // FRunnable interface. virtual bool Init(void) override { // 獲取當前線程ID到全局變量GRenderThreadId, 以便其它地方引用. GRenderThreadId = FPlatformTLS::GetCurrentThreadId(); // 處理線程捕獲關係. if (!IsRunningRHIInSeparateThread()) { bAcquiredThreadOwnership = true; RHIAcquireThreadOwnership(); } return true; } (......) virtual uint32 Run(void) override { // 設置TLS. FMemory::SetupTLSCachesOnCurrentThread(); // 設置渲染線程平臺相關的數據. FPlatformProcess::SetupRenderThread(); (......) { // 進入渲染線程主循環. RenderingThreadMain( TaskGraphBoundSyncEvent ); } FMemory::ClearAndDisableTLSCachesOnCurrentThread(); return 0; } };
可見它在運行以後會進入渲染線程邏輯,這裏再進入RenderingThreadMain代碼一探究竟:
void RenderingThreadMain( FEvent* TaskGraphBoundSyncEvent ) { LLM_SCOPE(ELLMTag::RenderingThreadMemory); // 將渲染線程和局部線程線程插槽設置成ActualRenderingThread和ActualRenderingThread_Local. ENamedThreads::Type RenderThread = ENamedThreads::Type(ENamedThreads::ActualRenderingThread); ENamedThreads::SetRenderThread(RenderThread); ENamedThreads::SetRenderThread_Local(ENamedThreads::Type(ENamedThreads::ActualRenderingThread_Local)); // 將當前線程附加到TaskGraph的RenderThread插槽中. FTaskGraphInterface::Get().AttachToThread(RenderThread); FPlatformMisc::MemoryBarrier(); // 觸發同步事件, 通知主線程渲染線程已經附加到TaskGraph, 已經準備好接收任務. if( TaskGraphBoundSyncEvent != NULL ) { TaskGraphBoundSyncEvent->Trigger(); } (......) // 渲染線程不一樣階段的處理. FCoreDelegates::PostRenderingThreadCreated.Broadcast(); check(GIsThreadedRendering); FTaskGraphInterface::Get().ProcessThreadUntilRequestReturn(RenderThread); FPlatformMisc::MemoryBarrier(); check(!GIsThreadedRendering); FCoreDelegates::PreRenderingThreadDestroyed.Broadcast(); (......) // 恢復線程線程到遊戲線程. ENamedThreads::SetRenderThread(ENamedThreads::GameThread); ENamedThreads::SetRenderThread_Local(ENamedThreads::GameThread_Local); FPlatformMisc::MemoryBarrier(); }
不過這裏還留有一個很大的疑問,那就是FRenderingThread只是獲取當前線程做爲渲染線程並附加到TaskGraph中,並無建立線程。那麼是哪裏建立的渲染線程呢?繼續追蹤,結果發現是在StartRenderingThread()
接口中建立了FRenderingThread實例,它的實現代碼以下(節選):
// Engine\Source\Runtime\RenderCore\Private\RenderingThread.cpp void StartRenderingThread() { (......) // Turn on the threaded rendering flag. GIsThreadedRendering = true; // 建立FRenderingThread實例. GRenderingThreadRunnable = new FRenderingThread(); // 建立渲染線程!! GRenderingThread = FRunnableThread::Create(GRenderingThreadRunnable, *BuildRenderingThreadName(ThreadCount), 0, FPlatformAffinity::GetRenderingThreadPriority(), FPlatformAffinity::GetRenderingThreadMask(), FPlatformAffinity::GetRenderingThreadFlags()); (......) // 開啓渲染命令的柵欄. FRenderCommandFence Fence; Fence.BeginFence(); Fence.Wait(); (......) }
若是繼續追蹤,會發現StartRenderingThread()
是在FEngineLoop::PreInitPostStartupScreen
中調用的。
至此,渲染線程的建立、初始化以及主要接口的實現都剖析完了。
RHI線程的工做是轉換渲染指令到指定圖形API,建立、上傳渲染資源到GPU。它的主要邏輯在FRHIThread中,實現代碼以下:
// Engine\Source\Runtime\RenderCore\Private\RenderingThread.cpp class FRHIThread : public FRunnable { public: FRunnableThread* Thread; // 所在的RHI線程. FRHIThread() : Thread(nullptr) { check(IsInGameThread()); } void Start() { // 開始時建立RHI線程. Thread = FRunnableThread::Create(this, TEXT("RHIThread"), 512 * 1024, FPlatformAffinity::GetRHIThreadPriority(), FPlatformAffinity::GetRHIThreadMask(), FPlatformAffinity::GetRHIThreadFlags() ); check(Thread); } virtual uint32 Run() override { LLM_SCOPE(ELLMTag::RHIMisc); // 初始化TLS FMemory::SetupTLSCachesOnCurrentThread(); // 將FRHIThread所在的RHI線程附加到askGraph體系中,並指定到ENamedThreads::RHIThread。 FTaskGraphInterface::Get().AttachToThread(ENamedThreads::RHIThread); // 啓動RHI線程,直到線程返回。 FTaskGraphInterface::Get().ProcessThreadUntilRequestReturn(ENamedThreads::RHIThread); // 清理TLS. FMemory::ClearAndDisableTLSCachesOnCurrentThread(); return 0; } // 單例接口。 static FRHIThread& Get() { static FRHIThread Singleton; // 使用了局部靜態變量,能夠保證線程安全。 return Singleton; } };
可見RHI線程不一樣於渲染線程,是直接在FRHIThread對象內建立實際的線程。而FRHIThread的建立也是在StartRenderingThread()
中:
void StartRenderingThread() { (......) if (GUseRHIThread_InternalUseOnly) { FRHICommandListExecutor::GetImmediateCommandList().ImmediateFlush(EImmediateFlushType::DispatchToRHIThread); if (!FTaskGraphInterface::Get().IsThreadProcessingTasks(ENamedThreads::RHIThread)) { // 建立FRHIThread實例並啓動它. FRHIThread::Get().Start(); } DECLARE_CYCLE_STAT(TEXT("Wait For RHIThread"), STAT_WaitForRHIThread, STATGROUP_TaskGraphTasks); // 建立RHI線程擁有者捕獲任務, 讓遊戲線程等待. FGraphEventRef CompletionEvent = TGraphTask<FOwnershipOfRHIThreadTask>::CreateTask(NULL, ENamedThreads::GameThread).ConstructAndDispatchWhenReady(true, GET_STATID(STAT_WaitForRHIThread)); QUICK_SCOPE_CYCLE_COUNTER(STAT_StartRenderingThread); // 讓遊戲線程或局部線程等待RHI線程處理(捕獲了線程擁有者, 大多數圖形API爲空)完畢. FTaskGraphInterface::Get().WaitUntilTaskCompletes(CompletionEvent, ENamedThreads::GameThread_Local); // 存儲RHI線程id. GRHIThread_InternalUseOnly = FRHIThread::Get().Thread; check(GRHIThread_InternalUseOnly); GIsRunningRHIInDedicatedThread_InternalUseOnly = true; GIsRunningRHIInSeparateThread_InternalUseOnly = true; GRHIThreadId = GRHIThread_InternalUseOnly->GetThreadID(); GRHICommandList.LatchBypass(); } (......) }
那麼渲染線程如何向RHI線程入隊任務呢?答案就在RHICommandList.h中:
// Engine\Source\Runtime\RHI\Public\RHICommandList.h // RHI命令父類 struct FRHICommandBase { FRHICommandBase* Next = nullptr; // 指向下一條RHI命令. // 執行RHI命令並銷燬. virtual void ExecuteAndDestruct(FRHICommandListBase& CmdList, FRHICommandListDebugContext& DebugContext) = 0; }; // RHI命令結構體 template<typename TCmd, typename NameType = FUnnamedRhiCommand> struct FRHICommand : public FRHICommandBase { (......) void ExecuteAndDestruct(FRHICommandListBase& CmdList, FRHICommandListDebugContext& Context) override final { (......) TCmd *ThisCmd = static_cast<TCmd*>(this); ThisCmd->Execute(CmdList); ThisCmd->~TCmd(); } }; // 向RHI線程發送RHI命令的宏. #define FRHICOMMAND_MACRO(CommandName) \ struct PREPROCESSOR_JOIN(CommandName##String, __LINE__) \ { \ static const TCHAR* TStr() { return TEXT(#CommandName); } \ }; \ struct CommandName final : public FRHICommand<CommandName, PREPROCESSOR_JOIN(CommandName##String, __LINE__)>
RHI線程的相關實現機制跟渲染線程類型,且更加簡潔。如下是它的使用示範:
// Engine\Source\Runtime\RHI\Public\RHICommandList.h FRHICOMMAND_MACRO(FRHICommandDrawPrimitive) { uint32 BaseVertexIndex; uint32 NumPrimitives; uint32 NumInstances; FORCEINLINE_DEBUGGABLE FRHICommandDrawPrimitive(uint32 InBaseVertexIndex, uint32 InNumPrimitives, uint32 InNumInstances) : BaseVertexIndex(InBaseVertexIndex) , NumPrimitives(InNumPrimitives) , NumInstances(InNumInstances) { } RHI_API void Execute(FRHICommandListBase& CmdList); }; // Engine\Source\Runtime\RHI\Public\RHICommandListCommandExecutes.inl void FRHICommandDrawPrimitive::Execute(FRHICommandListBase& CmdList) { RHISTAT(DrawPrimitive); INTERNAL_DECORATOR(RHIDrawPrimitive)(BaseVertexIndex, NumPrimitives, NumInstances); }
因而可知,全部的RHI指令都是預先聲明並實現好的,目前存在的RHI渲染指令類型達到近百種(以下),渲染線程建立這些聲明好的RHI指令便可在合適的被推入RHI線程隊列並被執行。
FRHICOMMAND_MACRO(FRHICommandUpdateGeometryCacheBuffer) FRHICOMMAND_MACRO(FRHISubmitFrameToEncoder) FRHICOMMAND_MACRO(FLocalRHICommand) FRHICOMMAND_MACRO(FRHISetSpectatorScreenTexture) FRHICOMMAND_MACRO(FRHISetSpectatorScreenModeTexturePlusEyeLayout) FRHICOMMAND_MACRO(FRHISyncFrameCommand) FRHICOMMAND_MACRO(FRHICommandStat) FRHICOMMAND_MACRO(FRHICommandRHIThreadFence) FRHICOMMAND_MACRO(FRHIAsyncComputeSubmitList) FRHICOMMAND_MACRO(FRHICommandWaitForAndSubmitSubListParallel) FRHICOMMAND_MACRO(FRHICommandWaitForAndSubmitSubList) FRHICOMMAND_MACRO(FRHICommandWaitForAndSubmitRTSubList) FRHICOMMAND_MACRO(FRHICommandSubmitSubList) FRHICOMMAND_MACRO(FRHICommandBeginUpdateMultiFrameResource) FRHICOMMAND_MACRO(FRHICommandEndUpdateMultiFrameResource) FRHICOMMAND_MACRO(FRHICommandBeginUpdateMultiFrameUAV) FRHICOMMAND_MACRO(FRHICommandEndUpdateMultiFrameUAV) FRHICOMMAND_MACRO(FRHICommandSetGPUMask) FRHICOMMAND_MACRO(FRHICommandWaitForTemporalEffect) FRHICOMMAND_MACRO(FRHICommandBroadcastTemporalEffect) FRHICOMMAND_MACRO(FRHICommandSetStencilRef) FRHICOMMAND_MACRO(FRHICommandDrawPrimitive) FRHICOMMAND_MACRO(FRHICommandDrawIndexedPrimitive) FRHICOMMAND_MACRO(FRHICommandSetBlendFactor) FRHICOMMAND_MACRO(FRHICommandSetStreamSource) FRHICOMMAND_MACRO(FRHICommandSetViewport) FRHICOMMAND_MACRO(FRHICommandSetStereoViewport) FRHICOMMAND_MACRO(FRHICommandSetScissorRect) FRHICOMMAND_MACRO(FRHICommandSetRenderTargets) FRHICOMMAND_MACRO(FRHICommandBeginRenderPass) FRHICOMMAND_MACRO(FRHICommandEndRenderPass) FRHICOMMAND_MACRO(FRHICommandNextSubpass) FRHICOMMAND_MACRO(FRHICommandBeginParallelRenderPass) FRHICOMMAND_MACRO(FRHICommandEndParallelRenderPass) FRHICOMMAND_MACRO(FRHICommandBeginRenderSubPass) FRHICOMMAND_MACRO(FRHICommandEndRenderSubPass) FRHICOMMAND_MACRO(FRHICommandBeginComputePass) FRHICOMMAND_MACRO(FRHICommandEndComputePass) FRHICOMMAND_MACRO(FRHICommandBindClearMRTValues) FRHICOMMAND_MACRO(FRHICommandSetGraphicsPipelineState) FRHICOMMAND_MACRO(FRHICommandAutomaticCacheFlushAfterComputeShader) FRHICOMMAND_MACRO(FRHICommandFlushComputeShaderCache) FRHICOMMAND_MACRO(FRHICommandDrawPrimitiveIndirect) FRHICOMMAND_MACRO(FRHICommandDrawIndexedIndirect) FRHICOMMAND_MACRO(FRHICommandDrawIndexedPrimitiveIndirect) FRHICOMMAND_MACRO(FRHICommandSetDepthBounds) FRHICOMMAND_MACRO(FRHICommandClearUAVFloat) FRHICOMMAND_MACRO(FRHICommandClearUAVUint) FRHICOMMAND_MACRO(FRHICommandCopyToResolveTarget) FRHICOMMAND_MACRO(FRHICommandCopyTexture) FRHICOMMAND_MACRO(FRHICommandResummarizeHTile) FRHICOMMAND_MACRO(FRHICommandTransitionTexturesDepth) FRHICOMMAND_MACRO(FRHICommandTransitionTextures) FRHICOMMAND_MACRO(FRHICommandTransitionTexturesArray) FRHICOMMAND_MACRO(FRHICommandTransitionTexturesPipeline) FRHICOMMAND_MACRO(FRHICommandTransitionTexturesArrayPipeline) FRHICOMMAND_MACRO(FRHICommandClearColorTexture) FRHICOMMAND_MACRO(FRHICommandClearDepthStencilTexture) FRHICOMMAND_MACRO(FRHICommandClearColorTextures) FRHICOMMAND_MACRO(FRHICommandSetGlobalUniformBuffers) FRHICOMMAND_MACRO(FRHICommandBuildLocalUniformBuffer) FRHICOMMAND_MACRO(FRHICommandBeginRenderQuery) FRHICOMMAND_MACRO(FRHICommandEndRenderQuery) FRHICOMMAND_MACRO(FRHICommandCalibrateTimers) FRHICOMMAND_MACRO(FRHICommandPollOcclusionQueries) FRHICOMMAND_MACRO(FRHICommandBeginScene) FRHICOMMAND_MACRO(FRHICommandEndScene) FRHICOMMAND_MACRO(FRHICommandBeginFrame) FRHICOMMAND_MACRO(FRHICommandEndFrame) FRHICOMMAND_MACRO(FRHICommandBeginDrawingViewport) FRHICOMMAND_MACRO(FRHICommandEndDrawingViewport) FRHICOMMAND_MACRO(FRHICommandInvalidateCachedState) FRHICOMMAND_MACRO(FRHICommandDiscardRenderTargets) FRHICOMMAND_MACRO(FRHICommandDebugBreak) FRHICOMMAND_MACRO(FRHICommandUpdateTextureReference) FRHICOMMAND_MACRO(FRHICommandUpdateRHIResources) FRHICOMMAND_MACRO(FRHICommandCopyBufferRegion) FRHICOMMAND_MACRO(FRHICommandCopyBufferRegions) FRHICOMMAND_MACRO(FRHICommandClearRayTracingBindings) FRHICOMMAND_MACRO(FRHICommandRayTraceOcclusion) FRHICOMMAND_MACRO(FRHICommandRayTraceIntersection) FRHICOMMAND_MACRO(FRHICommandRayTraceDispatch) FRHICOMMAND_MACRO(FRHICommandSetRayTracingBindings) FRHICOMMAND_MACRO(FClearCachedRenderingDataCommand) FRHICOMMAND_MACRO(FClearCachedElementDataCommand)
本節將講述各個線程之間的數據交換機制和實現細節。首先看看遊戲線程如何將數據傳遞給渲染線程。
遊戲線程在Tick時,會經過UGameEngine、FViewport、UGameViewportClient等對象,纔會進入渲染模塊的調用:
void UGameEngine::Tick( float DeltaSeconds, bool bIdleMode ) { UGameEngine::RedrawViewports() { void FViewport::Draw( bool bShouldPresent) { void UGameViewportClient::Draw() { // 計算ViewFamily、View的各類屬性 ULocalPlayer::CalcSceneView(); // 發送渲染命令 FRendererModule::BeginRenderingViewFamily() { World->SendAllEndOfFrameUpdates(); // 建立場景渲染器 FSceneRenderer* SceneRenderer = FSceneRenderer::CreateSceneRenderer(ViewFamily, ...); // 向渲染線程發送繪製場景指令. ENQUEUE_RENDER_COMMAND(FDrawSceneCommand)( [SceneRenderer](FRHICommandListImmediate& RHICmdList) { RenderViewFamily_RenderThread(RHICmdList, SceneRenderer) { (......) // 調用場景渲染器的繪製接口. SceneRenderer->Render(RHICmdList); (......) } FlushPendingDeleteRHIResources_RenderThread(); }); } }}}}
前面章節也提到,渲染線程使用的是SceneProxy和SceneInfo等對象,那麼遊戲的Actor組件是如何跟場景代理的數據聯繫起來的呢?又是如何更新數據的?
先弄清楚遊戲組件向SceneProxy傳遞數據的機制,答案就藏在FScene::AddPrimitive
:
// Engine\Source\Runtime\Renderer\Private\RendererScene.cpp void FScene::AddPrimitive(UPrimitiveComponent* Primitive) { (......) // 建立圖元的場景代理 FPrimitiveSceneProxy* PrimitiveSceneProxy = Primitive->CreateSceneProxy(); Primitive->SceneProxy = PrimitiveSceneProxy; if(!PrimitiveSceneProxy) { return; } // 建立圖元場景代理的場景信息 FPrimitiveSceneInfo* PrimitiveSceneInfo = new FPrimitiveSceneInfo(Primitive, this); PrimitiveSceneProxy->PrimitiveSceneInfo = PrimitiveSceneInfo; (......) FScene* Scene = this; ENQUEUE_RENDER_COMMAND(AddPrimitiveCommand)( [Params = MoveTemp(Params), Scene, PrimitiveSceneInfo, PreviousTransform = MoveTemp(PreviousTransform)](FRHICommandListImmediate& RHICmdList) { FPrimitiveSceneProxy* SceneProxy = Params.PrimitiveSceneProxy; (......) SceneProxy->CreateRenderThreadResources(); // 在渲染線程中將SceneInfo加入到場景中. Scene->AddPrimitiveSceneInfo_RenderThread(PrimitiveSceneInfo, PreviousTransform); }); }
上面有個關鍵的一句Primitive->CreateSceneProxy()
便是建立組件對應的PrimitiveSceneProxy,在PrimitiveSceneProxy的構造函數中,將組件的全部數據都拷貝了一份:
FPrimitiveSceneProxy::FPrimitiveSceneProxy(const UPrimitiveComponent* InComponent, FName InResourceName) : CustomPrimitiveData(InComponent->GetCustomPrimitiveData()) , TranslucencySortPriority(FMath::Clamp(InComponent->TranslucencySortPriority, SHRT_MIN, SHRT_MAX)) , Mobility(InComponent->Mobility) , LightmapType(InComponent->LightmapType) , StatId() , DrawInGame(InComponent->IsVisible()) , DrawInEditor(InComponent->GetVisibleFlag()) , bReceivesDecals(InComponent->bReceivesDecals) (......) { (......) }
拷貝數據以後,遊戲線程修改的是PrimitiveComponent的數據,而渲染線程修改或訪問的是PrimitiveSceneProxy的數據,彼此不干擾,避免了臨界區和鎖的同步,也保證了線程安全。不過這裏還有疑問,那就是建立PrimitiveSceneProxy的時候會拷貝一份數據,但在建立完以後,PrimitiveComponent是如何向PrimitiveSceneProxy更新數據的呢?
原來是ActorComponent有幾個標記,只要這幾個標記被標記爲true
,便會在適當的時機調用更新接口,以便獲得更新:
// Engine\Source\Runtime\Engine\Classes\Components\ActorComponent.h class ENGINE_API UActorComponent : public UObject, public IInterface_AssetUserData { protected: // 如下接口分別更新對應的狀態, 子類能夠重寫以實現本身的更新邏輯. virtual void DoDeferredRenderUpdates_Concurrent() { (......) if(bRenderStateDirty) { RecreateRenderState_Concurrent(); } else { if(bRenderTransformDirty) { SendRenderTransform_Concurrent(); } if(bRenderDynamicDataDirty) { SendRenderDynamicData_Concurrent(); } } } virtual void CreateRenderState_Concurrent(FRegisterComponentContext* Context) { bRenderStateCreated = true; bRenderStateDirty = false; bRenderTransformDirty = false; bRenderDynamicDataDirty = false; } virtual void SendRenderTransform_Concurrent() { bRenderTransformDirty = false; } virtual void SendRenderDynamicData_Concurrent() { bRenderDynamicDataDirty = false; } private: uint8 bRenderStateDirty:1; // 組件的渲染狀態是否髒的 uint8 bRenderTransformDirty:1; // 組件的變換矩陣是否髒的 uint8 bRenderDynamicDataDirty:1; // 組件的渲染動態數據是否髒的 };
上面protected的接口就是用於刷新組件的數據到對應的SceneProxy,具體的組件子類能夠重寫它,以定製本身的更新邏輯,好比ULightComponent
的變換矩陣更新邏輯以下:
// Engine\Source\Runtime\Engine\Private\Components\LightComponent.cpp void ULightComponent::SendRenderTransform_Concurrent() { // 將變換信息更新到場景. GetWorld()->Scene->UpdateLightTransform(this); Super::SendRenderTransform_Concurrent(); }
而場景的UpdateLightTransform
會將組件的數據組裝起來,並將數據發送到渲染線程執行:
// Engine\Source\Runtime\Renderer\Private\RendererScene.cpp void FScene::UpdateLightTransform(ULightComponent* Light) { if(Light->SceneProxy) { // 組裝組件的數據到結構體(注意這裏不能將Component的地址傳到渲染線程,而是將全部要更新的數據拷貝一份) FUpdateLightTransformParameters Parameters; Parameters.LightToWorld = Light->GetComponentTransform().ToMatrixNoScale(); Parameters.Position = Light->GetLightPosition(); FScene* Scene = this; FLightSceneInfo* LightSceneInfo = Light->SceneProxy->GetLightSceneInfo(); // 將數據發送到渲染線程執行. ENQUEUE_RENDER_COMMAND(UpdateLightTransform)( [Scene, LightSceneInfo, Parameters](FRHICommandListImmediate& RHICmdList) { FScopeCycleCounter Context(LightSceneInfo->Proxy->GetStatId()); // 在渲染線程執行數據更新. Scene->UpdateLightTransform_RenderThread(LightSceneInfo, Parameters); }); } } void FScene::UpdateLightTransform_RenderThread(FLightSceneInfo* LightSceneInfo, const FUpdateLightTransformParameters& Parameters) { (......) // 更新變換矩陣. LightSceneInfo->Proxy->SetTransform(Parameters.LightToWorld, Parameters.Position); (......) }
至此,組件如何向場景代理更新數據的邏輯終於理清了。
須要特別提醒的是,FScene、FSceneProxy等有些接口在遊戲線程調用,而有些接口(通常帶有_RenderThread
的後綴)在渲染線程調用,切記不能跨線程調用,不然會產生競爭條件,甚至引起程序崩潰。
前面也提到,遊戲線程不可能領先於渲染線程超過一幀,不然遊戲線程會等待渲染線程處理完。它們的同步機制涉及兩個關鍵的概念:
// Engine\Source\Runtime\RenderCore\Public\RenderCommandFence.h // 渲染命令柵欄 class RENDERCORE_API FRenderCommandFence { public: // 向渲染命令隊列增長一個柵欄. bSyncToRHIAndGPU是否同步RHI和GPU交換Buffer, 不然只等待渲染線程. void BeginFence(bool bSyncToRHIAndGPU = false); // 等待柵欄被執行. bProcessGameThreadTasks沒有做用. void Wait(bool bProcessGameThreadTasks = false) const; // 是否完成了柵欄. bool IsFenceComplete() const; private: mutable FGraphEventRef CompletionEvent; // 處理完成同步的事件 ENamedThreads::Type TriggerThreadIndex; // 處理完以後須要觸發的線程類型. }; // Engine\Source\Runtime\Engine\Public\UnrealEngine.h class FFrameEndSync { FRenderCommandFence Fence[2]; // 渲染柵欄對. int32 EventIndex; // 當前事件索引 public: // 同步遊戲線程和渲染線程. bAllowOneFrameThreadLag是否容許渲染線程一幀的延遲. void Sync( bool bAllowOneFrameThreadLag ) { Fence[EventIndex].BeginFence(true); // 開啓柵欄, 強制同步RHI和GPU交換鏈的. bool bEmptyGameThreadTasks = !FTaskGraphInterface::Get().IsThreadProcessingTasks(ENamedThreads::GameThread); // 保證遊戲線程至少跑過一次任務. if (bEmptyGameThreadTasks) { FTaskGraphInterface::Get().ProcessThreadUntilIdle(ENamedThreads::GameThread); } // 若是容許延遲, 交換事件索引. if( bAllowOneFrameThreadLag ) { EventIndex = (EventIndex + 1) % 2; } (......) // 開啓柵欄等待. Fence[EventIndex].Wait(bEmptyGameThreadTasks); } };
而FFrameEndSync
的使用是在FEngineLoop::Tick
中:
// Engine\Source\Runtime\Launch\Private\LaunchEngineLoop.cpp void FEngineLoop::Tick() { (......) // 在引擎循環的幀末尾添加遊戲線程和渲染線程的同步事件. { static FFrameEndSync FrameEndSync; // 局部靜態變量, 線程安全. static auto CVarAllowOneFrameThreadLag = IConsoleManager::Get().FindTConsoleVariableDataInt(TEXT("r.OneFrameThreadLag")); // 同步遊戲和渲染線程, 是否容許一幀的延遲可由控制檯命令控制. 默認是開啓的. FrameEndSync.Sync( CVarAllowOneFrameThreadLag->GetValueOnGameThread() != 0 ); } (......) }
並行計算架構已然成爲現代引擎的標配,UE的多線程渲染是隨着多核CPU和新一代圖形API誕生而必然的產物。但就目前而言,渲染線程不少時候仍是單條的(雖然能夠藉助TaskGraph部分地並行)。理想狀況下,是多條渲染線程並行且不依賴地生成渲染命令,而且不須要主線程來驅動,任何線程均可做爲工做線程(亦即沒有UE的命名線程),任何線程均可發起計算任務,避免操做系統級別的功能線程。而這須要操做系統、圖形API、計算機語言共同地不斷演化纔可達成。
最近發佈的UE4.26已經在普及RDG,RDG能夠自動裁剪、優化渲染Pass和資源,是提高引擎總體並行處理的一大利器。
這篇文章本來預計2個月左右完成,然而實際上花了3個多月,幾乎耗盡了筆者的全部業餘時間。本來還有不少技術章節須要添加,但篇幅和時間都超限了,只好做罷。但願此係列文章對學習UE的讀者們有幫助,感謝關注和收藏。