剖析虛幻渲染體系(02)- 多線程渲染

 

 

2.1 多線程編程基礎

爲了更平穩地過渡,在真正進入UE的多線程渲染知識以前,先學習或重溫一下多線程編程的基礎知識。php

2.1.1 多線程概述

多線程(Multithread)編程的思想早在單核時代就已經出現了,當時的操做系統(如Windows95)就已經支持多任務的功能,其原理就是在單核中切換不一樣的上下文(Context),以便每一個進程中的線程都有時間得到執行指令的機會。html

但到了2005年,當單核主頻接近4GHz時,CPU硬件廠商英特爾和AMD發現,速度也會遇到本身的極限:那就是單純的主頻提高,已經沒法明顯提高系統總體性能。前端

隨着單核計算頻率摩爾定律的緩慢終結,Intel率先於2005年發佈了奔騰D和奔騰四至尊版840系列,首次支持了兩個物理級別的線程計算單元。此後十多年,多核CPU獲得蓬勃發展,由AMD製造的Ryzen 3990X處理器已經擁有64個核心128個邏輯線程。node

銳龍(Ryzen)3990X的宣傳海報中赫然凸顯的核心與線程數量。ios

硬件的多核發展,給軟件極大的發揮空間。應用程序能夠充分發揮多核多線程的計算資源,各個應用領域由此也產生多線程編程模型和技術。做爲遊戲的發動機Unreal Engine等商業引擎,一樣能夠利用多線程技術,以便更加充分地提高效率和效果。c++

使用多線程併發帶來的做用總結起來主要有兩點:git

  • 分離關注點。經過將相關的代碼與無關的代碼分離,能夠使程序更容易理解和測試,從而減小出錯的可能性。好比,遊戲引擎中一般將文件加載、網絡傳輸放入獨立的線程中,既能夠不阻礙主線程,也能夠分離邏輯代碼,使得更加清晰可擴展。
  • 提高性能。人多力量大,這樣的道理一樣用到CPU上(核多力量大)。相同量級的任務,若是可以分散到多個CPU中同時運行,必然會帶來效率的提高。

可是,隨着CPU核心數量的提高,計算機得到的效益並不是直線提高,而是遵循Amdahl's law(阿姆達爾定律),Amdahl's law的公式定義以下:github

\[S_{latency}(s) = \frac{1}{(1-p) + \frac{p}{s}} \]

公式的各個份量含義以下:web

  • \(S_{latency}\):整個任務在多線程處理中理論上得到的加速比。
  • \(s\):用於執行任務並行部分的硬件資源的線程數量。
  • \(p\):可並行處理的任務佔比。

舉個具體的栗子,假設有8核16線程的CPU用於處理某個任務,這個任務有70%的部分是能夠並行處理的,那麼它的理論加速比爲:算法

\[S_{latency}(16) = \frac{1}{(1-0.7) + \frac{0.7}{16}} = 2.9 \]

因而可知,多線程編程帶來的效益並不是跟核心數呈直線正比,實際上它的曲線以下所示:

阿姆達爾定律揭示的核心數和加速比圖例。因而可知,可並行的任務佔比越低,加速比得到的效果越差:當可並行任務佔比爲50%時,16核已經基本達到加速比天花板,不管後面增長多少核心數量,都無濟於事;若是可並行任務佔比爲95%時,到2048個核心纔會達到加速比天花板。

雖然阿姆達爾定律給咱們帶來了殘酷的現實,可是,若是咱們可以提高任務並行佔比到接近100%,則加速比天花板能夠獲得極大提高:

\[S_{latency}(s) = \frac{1}{(1-p) + \frac{p}{s}} = \frac{1}{(1-1) + \frac{1}{s}} = s \]

如上公式所示,當\(p=1\)(便可並行的任務佔比100%)時,理論上的加速比和核心數量成線性正比!!

舉個具體的例子,在編譯Unreal Engine工程源碼或Shader時,因爲它們基本是100%的並行佔比,理論上能夠得到接近線性關係的加速比,在多核系統中將極大地縮短編譯時間。

利用多線程併發提升性能的方式有兩種:

  • 任務並行(task parallelism)。將一個單個任務分紅幾部分,且各自並行運行,從而下降總運行時間。這種方式雖然看起來很簡單直觀,但實際操做中可能會很複雜,由於在各個部分之間可能存在着依賴。
  • 數據並行(data parallelism)。任務並行的是算法(執行指令)部分,即每一個線程執行的指令不同;而數據並行是指令相同,但執行的數據不同。SIMD也是數據並行的一種方式。

上面闡述了多線程併發的益處,接下來講說它的反作用。總結起來,反作用以下:

  • 致使數據競爭。多線程訪問經常會交叉執行同一段代碼,或者操做同一個資源,又或者多核CPU的高度緩存同步問題,由此變化帶來各類數據不一樣步或數據讀寫錯誤,由此產生了各類各樣的異常結果,這即是數據競爭。
  • 邏輯複雜化,難以調試。因爲多線程的併發方式不惟一,不可預知,因此爲了不數據競爭,經常加入複雜多樣的同步操做,代碼也會變得離散、片斷化、繁瑣、難以理解,增長代碼的輔助,對後續的維護、擴展都帶來不可估量的阻礙。也會引起小几率事件難以重現的BUG,給調試和查錯增長了數量級的難度。
  • 不必定可以提高效益。多線程技術用獲得確實會帶來效率的提高,但並不是絕對,常和物理核心、同步機制、運行時狀態、併發佔比等等因素相關,在某些極端狀況,或者用得不夠穩當,可能反而會下降程序效率。

2.1.2 多線程概念

本小節將闡述多線程編程技術中常涉及的基本概念。

  • 進程(Process)

進程(Process)是操做系統執行應用程序的基本單元和實體,它自己只是個容器,一般包含內核對象、地址空間、統計信息和若干線程。它自己並不真正執行代碼指令,而是交由進程內的線程執行。

對Windows而言,操做系統在建立進程時,同時也會給它建立一個線程,該線程被稱爲主線程(Primary thread, Main thread)。

對Unix而言,進程和主線程實際上是同一個東西,操做系統並不知道有線程的存在,線程更接近於lightweight processes(輕量級進程)的概念。

進程有優先級概念,Windows下由低到高爲:低(Low)、低於正常(Below normal)、正常(Normal)、高於正常(Above normal)、高(High)、實時(Real time)。(見下圖)

默認狀況下,進程的優先級爲Normal。優先級高的進程將會優先得到執行機會和時間。

  • 線程(Thread)

線程(Thread)是能夠執行代碼的實體,一般不能獨立存在,須要依附在某個進程內部。一個進程能夠擁有多個線程,這些線程能夠共享進程的數據,以便並行或併發地執行多個任務。

在單核CPU中,操做系統(如Windows)可能會採用輪循(Round robin)的方式進行調度,使得多個線程看起來是同時運行的。(下圖)

在多核CPU中,線程可能會安排在不一樣的CPU核心同時運行,從而達到並行處理的目的。

採用SMP的Windows在多核CPU的執行示意圖。等待處理的線程被安排到不一樣的CPU核心。

每一個線程可擁有本身的執行指令上下文(如Windows的IP(指令寄存器地址)和SP(棧起始寄存器地址))、執行棧和TLS(Thread Local Storage,線程局部緩存)。

Windows線程建立和初始化示意圖。

線程局部存儲(Thread Local Storage)是一種存儲持續期,對象的生命週期與線程同樣,在線程開始時分配,線程結束時回收。每一個線程有該對象本身的實例,訪問和修改這樣的對象不會形成競爭條件(Race Condition)。

線程也存在優先級概念,優先級越高的將優先得到執行指令的機會。

線程的狀態通常有運行狀態、暫停狀態等。Windows可用如下接口切換線程狀態:

// 暫停線程
DWORD SuspendThread(HANDLE hThread);
// 繼續運行線程
DWORD ResumeThread(HANDLE hThread);

同個線程可被屢次暫停,若是要恢復運行狀態,則須要調用同等次數的繼續運行接口。

  • 協程(Coroutine)

協程(Coroutine)是一種輕量級(lightweight)的用戶態線程,一般跑在同一個線程,利用同一個線程的不一樣時間片斷執行指令,沒有線程、進程切換和調度的開銷。從使用者角度,能夠利用協程機制實如今同個線程模擬異步的任務和編碼方式。在同個線程內,它不會形成數據競爭,但也會因線程阻塞而阻塞。

  • 纖程(Fiber)

纖程(Fiber)如同協程,也是一種輕量級的用戶態線程,能夠使得應用程序獨立決定本身的線程要如何運做。操做系統內核不知道纖程的存在,也不會爲它進行調度。

  • 競爭條件(Race Condition)

同個進程容許有多個線程,這些線程能夠共享進程的地址空間、數據結構和上下文。進程內的同一數據塊,可能存在多個線程在某個很小的時間片斷內同時讀寫,這就會形成數據異常,從而致使了不可預料的結果。這種不可預期性便造就了競爭條件(Race Condition)

避免產生競爭條件的技術有不少,諸如原子操做、臨界區、讀寫鎖、內核對象、信號量、互斥體、柵欄、屏障、事件等等。

  • 並行(Parallelism)

至少兩個線程同時執行任務的機制。通常有多核多物理線程的CPU同時執行的行爲,才能夠叫並行,單核的多線程不能稱之爲並行。

  • 併發(Concurrency)

至少兩個線程利用時間片(Timeslice)執行任務的機制,是並行的更廣泛形式。即使單核CPU同時執行的多線程,也可稱爲併發。

併發的兩種形式——上:雙物理核心的同時執行(並行);下:單核的多任務切換(併發)。

事實上,併發和並行在多核處理器中是能夠同時存在的,好比下圖所示,存在雙核,每一個核心又同時切換着多個任務:

部分參考文獻嚴格區分了並行和併發,但部分文獻並不明確指出其中的區別。虛幻引擎的多線程渲染架構和API中,常出現並行和併發的概念,因此虛幻是明顯區分二者之間的含義。

  • 線程池(Thread Pool)

線程池提供了一種新的任務併發的方式,調用者只須要傳入一組可並行的任務和分組的策略,即可以使用線程池的若干線程併發地執行任務,使得調用者無需接直接觸線程的調用和管理細節,下降了調用者的成本,也提高了線程的調度效率和吞吐量。

不過,建立一個線程池時,幾個關鍵性的設計問題會影響併發效率,好比:可以使用的線程數量,高效的任務分配方式,以及是否須要等待一個任務完成。

線程池能夠自定義實現,也能夠直接使用C++、操做系統或第三方庫提供的API。

2.1.3 C++的多線程

在C++11以前,C++的多線程支持基本爲零,僅提供少許雞肋的volatile等關鍵字。直到C++11標準,多線程才真正歸入C++標準,並提供了相關關鍵字、STL標準庫,以便使用者實現跨平臺的多線程調用。

固然,對使用者來講,多線程的實現可採用C++11的線程庫,也能夠根據具體的系統平臺提供的多線程API自定義線程庫,還能夠使用諸如ACE、boost::thread等第三方庫。使用C++自帶的多線程庫,有幾個優勢,一是使用簡單方便,依賴少;二是跨平臺,無需關注系統底層。

2.1.3.1 C++多線程關鍵字

  • thread_local

thread_local是C++是實現線程局部存儲的關鍵,添加了此關鍵字的變量意味着每一個線程都有本身的一份數據,不會共享同一份數據,避免數據競爭。

C11的關鍵字_Thread_local用於定義線程局部變量。在頭文件<threads.h>定義了thread_local爲上述關鍵詞的同義。例如:

#include <threads.h>
thread_local int foo = 0;

C++11引入的thread_local關鍵字用於下述情形:

一、名字空間(全局)變量。

二、文件靜態變量。

三、函數靜態變量。

四、靜態成員變量。

此外,不一樣編譯器提供了各自的方法聲明線程局部變量:

// Visual C++, Intel C/C++ (Windows systems), C++Builder, Digital Mars C++
__declspec(thread) int number;

// Solaris Studio C/C++, IBM XL C/C++, GNU C, Clang, Intel C++ Compiler (Linux systems)
__thread int number;

// C++ Builder
int __thread number;
  • volatile

使用了volatile修飾符的變量意味着它在內存中的值可能隨時發生變化,也告訴編譯器不能作任何優化,每次使用到此變量的值都必須從內存中讀取,而不該該直接使用寄存器的值。

舉個具體的栗子吧。假設有如下代碼段:

int a = 10;
volatile int *p = &a;
int b, c;
b = *p;
c = *p;

p沒有volatile修飾,則b = *pc = *p只需從內存取一次p的值,那麼bc的值必然是10

若考慮volatile的影響,假設執行完b = *p語句以後,p的值被其它線程修改了,則執行c = *p會再次從內存中讀取p的值,此時c的值再也不是10,而是新的值。

可是,volatile並不能解決多線程的同步問題,只適合如下三種狀況使用:

一、和信號處理(signal handler)相關的場合。

二、和內存映射硬件(memory mapped hardware)相關的場合。

三、和非本地跳轉(setjmplongjmp)相關的場合。

  • std::atomic

嚴格來講atomic並非關鍵字,而是STL的模板類,能夠支持指定類型的原子操做。

使用原子的類型意味着該類型的實例的讀寫操做都是原子性的,沒法被其它線程切割,從而達到線程安全和同步的目標。

可能有些讀者會好奇,爲何對於基本類型的操做也須要原子操做。好比:

int cnt = 0;
auto f = [&]{cnt++;};
std::thread t1{f}, t2{f}, t3{f};

以上三個線程同時調用函數f,該函數只執行cnt++,在C++維度,彷佛只有一條執行語句,理論上不該該存在同步問題。然而,編譯成彙編指令後,會有多條指令,這就會在多線程中引發線程上下文切換,引發不可預知的行爲。

爲了不這種狀況,就須要加入atomic類型:

std::atomic<int> cnt{0};	// 給cnt加入原子操做。
auto f = [&]{cnt++;};
std::thread t1{f}, t2{f}, t3{f};

加入atomic以後,全部線程執行後的結果是肯定的,可以正常給變量計數。atomic的實現機制與臨界區相似,但效率上比臨界區更快。

爲了更進一步地說明C++的單條語句可能生成多條彙編指令,可藉助Compiler Explorer來實時查探C++彙編後的指令:

Compiler Explorer動態將左側C++語句編譯出的彙編指令。上圖所示的c++代碼編譯後可能存在一對多的彙編指令,由此印證atomic原子操做的必要性。

充分利用std::atomic的特性和接口,能夠實現不少非阻塞無鎖的線程安全的數據結構和算法,關於這一點的延伸閱讀,強力推薦《C++ Concurrency In Action》

2.1.3.2 C++線程

C++的線程類型是std::thread,它提供的接口以下表:

接口 解析
join 加入主線程,使得主線程強制等待該線程執行完。
detach 從主線程分離,使得主線程無需等待該線程執行完。
swap 與另一個線程交換線程對象。
joinable 查詢是否可加入主線程。
get_id 獲取該線程的惟一標識符。
native_handle 返回實現層的線程句柄。
hardware_concurrency 靜態接口,返回硬件支持的併發線程數量。

使用範例:

#include <iostream>
#include <thread>
#include <chrono>

void foo()
{
    // simulate expensive operation
    std::this_thread::sleep_for(std::chrono::seconds(1));
}
 
int main()
{
    std::cout << "starting thread...\n";
    std::thread t(foo); // 構造線程對象,且傳入被執行的函數。
 
    std::cout << "waiting for thread to finish..." << std::endl;
    t.join(); // 加入主線程,使得主線程必須等待該線程執行完畢。
 
    std::cout << "done!\n";
}

輸出:

starting thread...
waiting for thread to finish...
done!

若是須要在調用線程和新線程之間同步數據,則能夠使用C++的std::promisestd::future等機制。示例代碼:

#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
 
void accumulate(std::vector<int>::iterator first,
                std::vector<int>::iterator last,
                std::promise<int> accumulate_promise)
{
    int sum = std::accumulate(first, last, 0);
    accumulate_promise.set_value(sum);  // Notify future
}
 
int main()
{
    // Demonstrate using promise<int> to transmit a result between threads.
    std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
    std::promise<int> accumulate_promise;
    std::future<int> accumulate_future = accumulate_promise.get_future();
    std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
                            std::move(accumulate_promise));
 
    // future::get() will wait until the future has a valid result and retrieves it.
    // Calling wait() before get() is not needed
    //accumulate_future.wait();  // wait for result
    std::cout << "result = " << accumulate_future.get() << '\n';
    work_thread.join();  // wait for thread completion
}

輸出結果:

result = 21

可是,std::thread的執行並不能保證是異步的,也多是在當前線程執行。

若是須要強制異步,則可以使用std::async。它能夠指定兩種異步方式:std::launch::asyncstd::launch::deferred,前者表示使用新的線程異步地執行任務,後者表示在當前線程執行,且會被延遲執行。使用範例:

#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <future>
#include <string>
#include <mutex>
 
std::mutex m;
struct X {
    void foo(int i, const std::string& str) {
        std::lock_guard<std::mutex> lk(m);
        std::cout << str << ' ' << i << '\n';
    }
    void bar(const std::string& str) {
        std::lock_guard<std::mutex> lk(m);
        std::cout << str << '\n';
    }
    int operator()(int i) {
        std::lock_guard<std::mutex> lk(m);
        std::cout << i << '\n';
        return i + 10;
    }
};
 
template <typename RandomIt>
int parallel_sum(RandomIt beg, RandomIt end)
{
    auto len = end - beg;
    if (len < 1000)
        return std::accumulate(beg, end, 0);
 
    RandomIt mid = beg + len/2;
    auto handle = std::async(std::launch::async,
                             parallel_sum<RandomIt>, mid, end);
    int sum = parallel_sum(beg, mid);
    return sum + handle.get();
}
 
int main()
{
    std::vector<int> v(10000, 1);
    std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
 
    X x;
    // Calls (&x)->foo(42, "Hello") with default policy:
    // may print "Hello 42" concurrently or defer execution
    auto a1 = std::async(&X::foo, &x, 42, "Hello");
    // Calls x.bar("world!") with deferred policy
    // prints "world!" when a2.get() or a2.wait() is called
    auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
    // Calls X()(43); with async policy
    // prints "43" concurrently
    auto a3 = std::async(std::launch::async, X(), 43);
    a2.wait();                     // prints "world!"
    std::cout << a3.get() << '\n'; // prints "53"
} // if a1 is not done at this point, destructor of a1 prints "Hello 42" here

執行結果:

The sum is 10000
43
Hello 42
world!
53

另外,C++20已經支持輕量級的協程(coroutine)了,相關的關鍵字:co_awaitco_returnco_yield,跟C#等腳本語言的概念和用法一模一樣,但行爲和實現機制可能會稍有不一樣,此文不展開探討了。

2.1.3.3 C++多線程同步

線程同步的機制有不少,C++支持的有如下幾種:

  • std::atomic

[2.1.3.1 C++多線程關鍵字](#2.1.3.1 C++多線程關鍵字)已經對std::atomic作了詳細的解析,能夠防止多線程之間共享數據的數據競險問題。此外,它還提供了豐富多樣的接口和狀態查詢,以便更加精細和高效地同步原子數據,常見接口和解析以下:

接口名 解析
is_lock_free 檢查原子對象是否無鎖的。
store 存儲值到原子對象。
load 從原子對象加載值。
exchange 獲取原子對象的值,並替換成指定值。
compare_exchange_weak, compare_exchange_strong 將原子對象的值和預期值(expected)對比,若是相同就替換成目標值(desired),並返回true;若是不一樣,就加載原子對象的值到預期值(expected),並返回false。weak模式不會卡調用線程,strong模式會卡住調用線程,直到原子對象的值和預期值(expected)相同。
fetch_add, fetch_sub, fetch_and, fetch_or, fetch_xor 獲取原子對象的值,並對其相加、相減等操做。
operator ++, operator --, operator +=, operator -=, ... 對原子對象響應各種操做符,操做符的意義和普通變量一致。

此外,C++20還支持wait, notify_one, notify_all等同步接口。

利用compare_exchange_weak接口能夠很方便地實現線程安全的非阻塞式的數據結構。示例:

#include <atomic>
#include <future>
#include <iostream>

template<typename T>
struct node
{
    T data;
    node* next;
    node(const T& data) : data(data), next(nullptr) {}
};
 
template<typename T>
class stack
{
 public:
    std::atomic<node<T>*> head;	// 堆棧頭, 採用原子操做.
 public:
    // 入棧操做
    void push(const T& data)
    {
        node<T>* new_node = new node<T>(data);
 
        // 將原有的頭指針做爲新節點的下一節點.
        new_node->next = head.load(std::memory_order_relaxed);
 
        // 將新的節點和老的頭部節點作對比測試, 若是new_node->next==head, 說明其它線程沒有修改head, 能夠將head替換成new_node, 從而完成push操做.
        // 反之, 若是new_node->next!=head, 說明其它線程修改了head, 將其它線程修改的head保存到new_node->next, 繼續循環檢測.
        while(!head.compare_exchange_weak(new_node->next, new_node,
                                        std::memory_order_release,
                                        std::memory_order_relaxed))
            ; // 空循環體
    }
};

int main()
{
    stack<int> s;
    
    auto r1 = std::async(std::launch::async, &stack<int>::push, &s, 1);
    auto r2 = std::async(std::launch::async, &stack<int>::push, &s, 2);
    auto r3 = std::async(std::launch::async, &stack<int>::push, &s, 3);
    
    r1.wait();
    r2.wait();
    r3.wait();
    
    // print the stack's values
    node<int>* node = s.head.load(std::memory_order_relaxed);
    while(node)
    {
        std::cout << node->data << " ";
        node = node->next;
    }
}

輸出:

2 3 1

因而可知,利用原子及其接口能夠很方便地進行多線程同步,並且因爲是多線程異步入棧,棧的元素不必定與編碼的順序一致。

以上代碼還涉及內存訪問順序的標記:

  • 排序一致序列(sequentially consistent)。
  • 獲取-釋放序列(memory_order_consume, memory_order_acquire, memory_order_release和memory_order_acq_rel)。
  • 自由序列(memory_order_relaxed)。

關於這方面的詳情能夠參看第一篇的內存屏障或者《C++ concurrency in action》的章節5.3 同步操做和強制排序

  • std::mutex

std::mutex即互斥量,它會在做用範圍內進入臨界區(Critical section),使得該代碼片斷同時只能由一個線程訪問,當其它線程嘗試執行該片斷時,會被阻塞。std::mutex常與std::lock_guard,示例代碼:

#include <iostream>
#include <map>
#include <string>
#include <chrono>
#include <thread>
#include <mutex>
 
std::map<std::string, std::string> g_pages;
std::mutex g_pages_mutex;	// 聲明互斥量
 
void save_page(const std::string &url)
{
    // simulate a long page fetch
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::string result = "fake content";
 	
    // 配合std::lock_guard使用, 能夠及時進入和釋放互斥量.
    std::lock_guard<std::mutex> guard(g_pages_mutex);
    g_pages[url] = result;
}
 
int main() 
{
    std::thread t1(save_page, "http://foo");
    std::thread t2(save_page, "http://bar");
    t1.join();
    t2.join();
 
    // safe to access g_pages without lock now, as the threads are joined
    for (const auto &pair : g_pages) {
        std::cout << pair.first << " => " << pair.second << '\n';
    }
}

輸出:

http://bar => fake content
http://foo => fake content

此外,手動操做std::mutex的鎖定和解鎖,能夠實現一些特殊行爲,例如等待某個標記:

#include <chrono>
#include <thread>
#include <mutex>

bool flag;
std::mutex m;

void wait_for_flag()
{
    std::unique_lock<std::mutex> lk(m); // 這裏採用std::unique_lock而非std::lock_guard. std::unique_lock能夠實現嘗試得到鎖, 若是當前以及被其它線程鎖定, 則延遲直到其它線程釋放, 而後纔得到鎖.
    while(!flag)
    {
        lk.unlock(); // 解鎖互斥量
        std::this_thread::sleep_for(std::chrono::milliseconds(100));  // 休眠100ms,在此期間,其它線程能夠進入互斥量,以便更改flag標記。
        lk.lock();   // 再鎖互斥量
    }
}
  • std::condition_variable

std::condition_variablestd::condition_variable_any都是條件變量,都是C++標準庫的實現,它們都須要與互斥量配合使用。因爲std::condition_variable_any更加通用,會在性能上產生更多的開銷。故而,應當首先考慮使用std::condition_variable

利用條件變量的接口,結合互斥量的使用,能夠很方便地執行線程間的等待、通知等操做。示例:

#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
 
std::mutex m;
std::condition_variable cv;	// 聲明條件變量
std::string data;
bool ready = false;
bool processed = false;
 
void worker_thread()
{
    // 等待直到主線程改變ready爲true.
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{return ready;});
 
    // 得到了互斥量的鎖
    std::cout << "Worker thread is processing data\n";
    data += " after processing";
 
    // 發送數據給主線程
    processed = true;
    std::cout << "Worker thread signals data processing completed\n";
 
    // 手動解鎖, 以便主線程得到鎖.
    lk.unlock();
    cv.notify_one();
}
 
int main()
{
    std::thread worker(worker_thread);
 
    data = "Example data";
    // send data to the worker thread
    {
        std::lock_guard<std::mutex> lk(m);
        ready = true;
        std::cout << "main() signals data ready for processing\n";
    }
    cv.notify_one();
 
    // wait for the worker
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return processed;});
    }
    std::cout << "Back in main(), data = " << data << '\n';
 
    worker.join();
}

輸出:

main() signals data ready for processing
Worker thread is processing data
Worker thread signals data processing completed
Back in main(), data = Example data after processing
  • std::future

C++的future(指望)是一種能夠訪問將來的返回值的機制,經常使用於多線程的同步。能夠建立future的類型有: std::async, std::packaged_task, std::promise。

future對象能夠執行wait、wait_for、wait_until,從而實現事件等待和同步,示例代碼:

#include <iostream>
#include <future>
#include <thread>
 
int main()
{
    // 從packaged_task獲取的future
    std::packaged_task<int()> task([]{ return 7; }); // wrap the function
    std::future<int> f1 = task.get_future();  // get a future
    std::thread t(std::move(task)); // launch on a thread
 
    // 從async()獲取的future
    std::future<int> f2 = std::async(std::launch::async, []{ return 8; });
 
    // 從promise獲取的future
    std::promise<int> p;
    std::future<int> f3 = p.get_future();
    std::thread( [&p]{ p.set_value_at_thread_exit(9); }).detach();
 	
    // 等待全部future
    std::cout << "Waiting..." << std::flush;
    f1.wait();
    f2.wait();
    f3.wait();
    std::cout << "Done!\nResults are: " << f1.get() << ' ' << f2.get() << ' ' << f3.get() << '\n';
    t.join();
}

輸出:

Waiting...Done!
Results are: 7 8 9

2.1.4 多線程實現機制

多線程按並行內容可分爲數據並行和任務並行兩種。其中數據並行是不一樣的線程攜帶不一樣的數據執行相同的邏輯,最經典的數據並行的應用是MMX指令、SIMD技術、Compute着色器等。任務並行是不一樣的線程執行不一樣的邏輯,數據能夠相同,也能夠不一樣,例如,遊戲引擎常常將文件加載、音頻處理、網絡接收乃至物理模擬都放到單獨的線程,以便它們能夠並行地執行不一樣的任務。

多線程若是按劃分粒度和方式,則有線性劃分、遞歸劃分、任務類型劃分等。

線性劃分法的最簡單應用就是將連續數組的元素平均分紅若干份,每份數據派發到一個線程中執行,例如並行化的std::for_each和UE裏的ParallelFor

線性劃分示意圖。連續數據被均分爲若干份,接着派發到若干線程中並行地執行。

在線性劃分並行執行結束後,一般須要由調用線程合併和同步並行的結果。

遞歸劃分法是將連續數據按照某種規則劃分紅若干份,每一份又可繼續劃分紅更細粒度,直到某種規則中止劃分。經常使用於快速排序。

快速排序有兩個最基本的步驟:將數據劃分到中樞(pivot)元素以前或以後,而後對中樞元素以前和以後的兩半數組再次進行快速排序。因爲只有在一次排序結束後才能知道哪些項在中樞元素以前和以後,因此不能經過對數據的簡單(線性)劃分達到並行。當要對這種算法進行並行化,很天然的會想到使用遞歸。每一級的遞歸都會屢次調用quick_sort函數,由於須要知道哪些元素在中樞元素以前和以後。

遞歸劃分法示意圖。

將一個大框架內的邏輯劃分紅若干個子任務,它們之間一般保持獨立,也能夠有必定依賴,每一個任務派發到一個線程執行,這就意味着真正意義上的線程獨立,每一個線程只須要關注本身所要作的事情便可。

任務劃分示意圖。

合理地安排和劃分子任務,減小它們之間的依賴和等待同步,是提高運行效率的有利武器。不過,要作到這點,每每須要通過精細的設計實現以及反覆調試和修改。

上面這種實現機制常被稱爲Fork-Join(分叉-合併)並行模型,它和串行模型的運行機制對好比下圖:

上:串行運行模型;下:Fork-Join並行運行模型。

GDC2010的演講Task-based Multithreading - How to Program for 100 cores詳細闡述瞭如何採用基於Task的多線程運行機制:

基於Task的多線程比基於線程的架構要好不少,能夠更加充分地利用多核優點,使得每一個核心都保持忙碌狀態:

該文還提到了如何將基於任務的多線程應用於排序、迷宮尋路等實際案例中。

 

2.2 現代圖形API的多線程特性

2.2.1 傳統圖形API的多線程特性

OpenGL及DirectX10以前版本的圖形API,全部的繪製指令是線性和阻塞式的,意味着每次調用Draw接口都不會當即返回,會卡住調用線程。這種CPU和GPU的交互機制在單核時代,對性能的影響不那麼突出,可是隨着多核時代的到來,這種交互機制顯然會嚴重影響運行性能。

若遊戲引擎的渲染器仍然是單線程的,這經常致使CPU的性能瓶頸,阻礙了利用多核計算資源來提升性能或豐富可視化內容。

傳統圖形API線性執行繪製指令示意圖。

單線程渲染器一般會致使單個 CPU 內核滿負荷運行,而其餘內核保持相對空閒,且性能低於可玩的幀率。

傳統圖形API在單線程單Context下設置渲染狀態調用繪製指令,而且繪製指令是阻塞式的,CPU和GPU沒法並行運行,其它CPU核心也會處於空閒等待狀態。

在這些傳統圖形API架構多線程渲染,必須從軟件層面着手,開闢多個線程,用於單獨處理邏輯和渲染指令,以便解除CPU和GPU的相互等待耦合。早在SIGGraph2008有個Talk(Practical Parallel Rendering with DirectX 9 and 10)專門講解如何在DirectX9和10實現軟件級的多線程渲染,核心部分就是在軟件層錄製(Playback)D3D的繪製命令(Command)。

Practical Parallel Rendering with DirectX 9 and 10中提出的一種軟件級的多線程渲染架構。

不過,這種軟件層面的命令錄製存在多種問題,不支持部分圖形API(如狀態查詢),需額外的命令緩存區記錄繪製指令,命令階段沒法建立真正的GPU資源等等。

DirectX11嘗試從硬件層面解決多線程渲染的問題。它支持了兩種設備上下文:即時上下文(Immediate Context)延遲上下文(Deferred Context)。不一樣的延遲上下文能夠同時在不一樣的線程中使用,生成將在「即時上下文」中執行的命令列表。這種多線程策略容許將複雜的場景分解成併發任務。

DirectX11的多線程模型。

不一樣的延遲上下文能夠同時在不一樣的線程中使用,生成將在即時上下文中執行的命令列表。這種多線程策略容許將複雜的場景分解成併發任務。此外,延遲上下文在某些驅動的支持下,可實現硬件級的加速,而沒必要在即時上下文執行Command List。

爲何使用Deferred Context的Command List提早錄製繪製指令會比直接使用Immediate Context調用繪製指令的效率更高?

答案在於Command List內部會對已經錄製的指令作高度的優化,執行這些優化後的指令會明顯提高效率,比直接單獨每條調用圖形API會高效得多。

在D3D11中命令列表中的命令是被快速記錄下來,而不是當即執行的,直到程序調用ExecuteCommandList方法(調用即返回,不等待)才被GPU真正的執行,此時那些使用延遲渲染設備接口的CPU線程以及主渲染線程又能夠去幹別的事情了,好比繼續下一幀的碰撞檢測、物理變換、動畫插值、光照準備等等,從而爲記錄造成新的命令列表作準備。

不過,基於DirectX11的多線程架構,因爲硬件層面的加速不是必然支持,全部在Deferred Context記錄的指令連同Immediate Context的指令必須由同一個線程(一般是渲染線程)提交給GPU執行。

DirectX11下的多線程架構示意圖。

這種非硬件支持的多線程渲染只是節省了部分CPU時間(多線程錄製指令和繪製同步等待),並不能從硬件層面真正發揮多線程的威力。

2.2.2 DirectX12的多線程特性

相較於DirectX11過渡性的僞多線程模型(稱之僞,是由於當時的大多數驅動並不支持DirectX11的硬件級多線程),DirectX 12 多線程則經過顯著減小 API 調用額外開銷獲得了很大的改進,它取消了 DirectX 11 的設備上下文的概念,直接使用Command List來調用 D3D APIs,而後經過命令隊列將命令列表提交給 GPU,而且全部 DirectX 12顯卡都支持 DirectX 12 多線程的硬件加速。

DirectX12的多線程模型。

從原理上來看,DirectX12與DirectX11多線程渲染框架是相似的,都是經過在不一樣的CPU線程中錄製命令列表(Command Lists),最後再統一執行的方式完成多線程渲染。它們都從根本上屏蔽了使人髮指的Draw Call同步調用,而改成CPU和GPU徹底異步(並行)執行的方式,從而在總體渲染效率和性能上得到巨大的提高。

對於DirectX12,用戶層面有3種命令隊列(Command Queue):複製隊列(Copy Queue)計算隊列(Compute Queue)3D隊列(3D Queue),它們能夠並行地執行,而且經過柵欄(Fence)、信號(Signal)或屏障(Barrier)來等待和同步。

GPU硬件層面則有3種引擎:複製引擎(Copy Engine)計算引擎(Compute Engine)3D引擎(3D Engine),它們也能夠並行地執行,而且經過柵欄(Fence)、信號(Signal)或屏障(Barrier)來等待和同步。

命令隊列可驅動GPU硬件的若干引擎,但有必定的限制,更具體地,3D Queue能夠驅動GPU硬件的3種引擎,Compute Queue只能驅動Compute Engine和Copy Engine,Copy Queue僅能夠驅動Copy Engine。

在CPU層面,能夠有若干個線程,每一個線程可建立產生若干個命令列表(Command List),每一個命令列表可進入3種Command Queue的其中一種。當這些命令被GPU執行時,每種指令列表裏的命令會壓入不一樣的GPU引擎,以便它們並行地執行。(下圖)

DirectX12中的CPU線程、命令列表、命令隊列、GPU引擎之間的運行機制示意圖。

2.2.3 Vulkan的多線程特性

做爲跨平臺圖形API的新生表明Vulkan,摒棄了傳統圖形API的弊端,直面多核時代的優點,從而從設計和架構上發揮了並行渲染的威力。

綜合上看,Vulkan和DirectX12是很是接近的,都有着Command Buffer、CommandPool、Command Queue和Fence等核心概念,並行模式也很是類似:在不一樣的CPU線程並行地生成Command Buffer指令,最後由主線程收集這些Command Buffer並提交至GPU:

Vulkan圖形API並行示意圖。

而且,Vulkan的CommandPool能夠每幀被不一樣的線程建立,以便減小同步等待,提高並行效率:

Vulkan中的CommandPool在不一樣幀之間的並行示意圖。

此外,Vulkan也存在着和DirectX12相似的各類同步機制:

Vulkan同步機制:semaphore(信號)用於同步Queue;Fence(柵欄)用於同步GPU和CPU;Event(事件)和Barrier(屏障)用於同步Command Buffer。

關於Vulkan的更多用法、剖析、對比可參見文獻Evaluation of multi-threading in Vulkan

2.2.4 Metal的多線程特性

Metal做爲iOS和MacOS系統的專屬圖形API,也是新生代的表明,它既兼容OpenGL這種傳統的圖形API用法,也支持相似Vulkan、DirectX12的新一代圖形API理念和架構。從使用者層面來看,Metal是比較友善的,提供告終構更清晰、概念更友好的API。

從OpenGL遷移到新生代圖形API的成本和收益對比。橫座標是從OpenGL(或ES)遷移其它圖形API的成本,縱座標是潛在的性能收益。可見Metal的遷移成本較低,但潛在的性能比也沒有Vulkan和DirectX12高。

Metal如同Vulkan和DirectX,有着不少類似的概念,諸如Command、Command Buffer、Command Queue及各種同步機制。

Metal基礎概念關係一覽表。其中Command Encoder有3種類型:MTLRenderCommandEncoder、MTLComputeCommandEncoder和MTLBlitCommandEncoder。CommandEncoder錄製命令以後,塞入Command Buffer,最終進入Command Queue命令隊列。

有了相似的概念和機制,Metal一樣能夠方便地實現多線程錄製命令,且從硬件層面支持多線程調度:

Metal多線程模型示意圖。圖中顯示了3個CPU線程同時錄製不一樣類型的Encoder,每一個線程都有專屬的Command Buffer,最終這些Command Buffer統一匯入Command Queue交由GPU執行。

 

2.3 遊戲引擎的多線程渲染

在正式講解UE的多線程渲染以前,先了解一下其它主流商業引擎的多線程架構和設計。

2.3.1 Unity

Unity的渲染體系中有幾個核心概念,一個是Client,運行於主線程(邏輯線程),負責產生渲染指令;另外一個是Worker Thread,工做線程,用於協助處理主線程或生成渲染指令等各種子工做。Unity的渲染架構中支持如下幾種模式:

  • Singlethreaded Rendering

單線程渲染模式,此模式下只有單個Client組件,沒有工做線程。惟一的Client在主線程中產生全部的渲染命令(rendering command,RCMD),而且擁有圖形設備對象,也會在主線程向圖形設備產生調用圖形API命令(graphics API,GCMD),它的調度示意圖以下:

這種模式下,CPU和GPU可能會相互等待,沒法充分利用多核CPU,性能比較最差。

  • **Multithreaded Rendering **

多線程渲染模式,這種模式下和單線程對比,就是多了一條工做線程,即用於生成GCMD的渲染線程,其中渲染線程跑的是GfxDeviceClient對象,專用於生成對應平臺的圖形API指令:

  • Jobified Rendering

做業化渲染模式,此模式下有多個Client對象,單個渲染線程。此外,有多個做業對象,每一個做業對象跑在專用獨立的線程,用於生成即時圖形命令(intermediate graphics commands,IGCMD)。此外,還有一個工做線程(渲染線程)用於將做業線程生成的IGCMD轉換成圖形API的GCMD,運行示意圖以下:

  • Graphics Jobs

圖形化做業渲染模式,此模式下有多個Client,多個工做線程,沒有渲染線程。主線程上的多個Client對象驅動工做線程上的對應圖形設備對象,直接生成GCMD,從而避免生成Jobified Rendering模式的IGCMD中間指令。只在支持硬件級多線程的圖形API上可啓用,如DirectX十二、Vulkan等。運行示意圖以下:

**2.3.2 Frostbite **

Frostbite(寒霜)引擎在早期的時候,將每一幀分紅個步驟:裁剪、構建、渲染,每一個步驟所需的數據都放到雙緩衝內(double buffer),採用級聯方式運行,應用簡單的同步流程。它的運行示意圖以下:

而通過多年的進化,Frostbite在前幾年採用了幀圖(Frame Graph)的多線程渲染模式。該模式旨在將引擎的各種渲染功能(Feature)和上層渲染邏輯(Renderer)和下層資源(Shader、RenderContext、圖形API等)隔離開來,以便作進一步的解耦、優化,其中最重要的優化即開啓多線程渲染。

FrameGraph是高層級的Render Pass和資源的表明,包含了一幀中所用到的全部信息。Pass之間能夠指定順序和依賴關係,下圖是其中的一個示例:

寒霜引擎採用幀圖方式實現的延遲渲染的順序和依賴圖。

其中幀圖的每一幀信息都有三個階段:創建(Setup)、編譯(Compile)和執行(Execute)。

創建階段就是建立各個Render Pass、輸入紋理、輸出紋理、依賴資源等等信息。

編譯階段的工做主要是剔除未使用的Render Pass和資源,計算資源生命週期,以及根據使用標記建立對應的GPU資源,建立GPU資源時又作了大量的優化,諸如:簡化顯存分配算法,在第一次使用時申請最後一次使用後釋放,異步計算外部資源的生命週期,源於綁定標記的精確資源管理,扁平化全部資源的引用以提高GPU高速緩存的命中率等等。編譯階段採用線性遍歷全部的RenderPass,遍歷時會計算資源引用次數、資源的最初和最後使用者、異步等待點和資源屏障等等。

執行階段就按照Setup的順序執行(編譯階段也不會從新排序),只遍歷那些未被剔除的Render Pass並執行它們的回調函數。若是是當即模式,則直接調用設備上下文的API。執行階段纔會根據編譯階段生成的handle真正獲取GPU資源。

最關鍵的是整個過程經過依賴圖(Dependency Grahp)實現自動化異步計算。異步機制在主時間軸開始,會自動同步在不一樣Queue裏的資源,同時會擴展它們的生命週期,以防意外釋放。固然,這個自動化系統也有反作用,如額外增長必定量的內存,可能會引起不可預期的性能瓶頸。因此,寒霜引擎支持手動模式,以便按照預期控制和更改異步運行方式,從而逐Render Pass選擇性加入。

下圖能夠比較簡潔明瞭說明異步計算的運行機制:

寒霜引擎異步計算示意圖。其中SSAO、SSAO Filter的Pass放入到異步隊列,它們會寫入和讀取Raw AO的紋理,即使在同步點以前結束,但Raw AO的生命週期依然會被延長到同步點。

總之,幀圖的渲染架構得益於記錄了該幀全部的信息,以致於能夠經過資源別名(Resource Aliasing)節省大量的內存和顯存,能夠實現半自動化的異步計算,能夠簡化渲染管線控制,能夠製做出更加良好的可視化和診斷工具。

2.3.3 Naughty Dog Engine

頑皮狗的遊戲引擎採用的也是做業系統,容許非GPU端的邏輯代碼加入到做業系統。做業直接能夠開啓和等待其它做業,對調用者隱藏內存管理細節,提供了簡潔易用的API,性能優化放在了第二位。

其中做業系統運行於纖程(Fiber),每一個纖程相似局部的線程,用戶層提供棧空間,其上下文包含少許的纖程狀態,以便減小寄存器的佔用。實際地運行在線程上,協做型的多線程模型。因爲纖程非系統級的線程,切換上下文會很是快,只保存和恢復寄存器的狀態(程序計數,棧指針,gpr等),故而開銷會很小。

做業系統會開闢若干條工做線程,每條工做線程會鎖定到GPU硬件核心。線程是執行單元,纖程是上下文,做業老是在線程的上下文內執行,採用原子計數器來同步。下圖是頑皮狗引擎的做業系統架構圖:

頑皮狗引擎做業系統架構圖。擁有6個工做線程,160個纖程,3個做業隊列。

做業能夠向做業隊列添加新的做業,同時等待中的做業會放到專門的等待列表,每一個等待中的做業會有引用計數,直到引用計數爲0,纔會從等待隊列中出列,以便繼續執行。

在頑皮狗引擎內,除了IO線程以外的全部東西都是做業,包括遊戲物體更新、動做更新和混合、射線檢測、渲染命令生成等等。可見將做業系統發揮得淋漓盡致,最大程度提高了並行的比例和效率。

爲了提高幀率,將遊戲邏輯和渲染邏輯相分離,並行地執行,不過處理的是不一樣幀的數據,一般遊戲數據領先渲染數據一幀,而渲染邏輯又領先GPU數據一幀。

經過這樣的機制,能夠避免CPU線程之間以及CPU和GPU之間的同步和等待,提高了幀率和吞吐量。

此外,它的內存分配也作了精緻的管理,好比引入了帶標籤的內存堆(Tagged Heap),內存堆以2M爲一塊(Block),每一個Block帶有一個標籤(Game、Render、GPU之一),分配器分配和釋放內存時是在標籤堆裏執行,避免直接向操做系統獲取:

此外,分配器支持爲每一個工做線程分配一個專屬的塊(跟TLS相似),避免數據同步和等待的時間,避免數據競險。

2.3.4 Destiny’s Engine

命運(Destiny)是一款第一人稱的動做角色扮演MMORPG,它使用的引擎也被稱爲命運引擎(Destiny’s Engine)。

命運引擎在多線程架構上,採用的技術有基於任務的並行處理,做業管理設計和同步處理,做業的執行也是在纖程上。做業系統執行做業的優先級是FIFO(先進先出),做業圖是異源架構,做業之間存在依賴,但沒有柵欄。

它將每一幀分紅幾個步驟:模擬遊戲物體、物體裁剪、生成渲染命令、執行GPU相關工做、顯示。在線程設計上,會建立6條系統線程,每條線程的內容依次是:模擬循環,其它做業,渲染循環,音頻循環,做業核心和調試Log,異步任務、IO等。

在處理幀之間的數據,也是分離開遊戲模擬和渲染邏輯,遊戲模擬老是領先渲染一幀。遊戲模擬完以後,會將全部數據和狀態拷貝一份(鏡像,Mirror),以供下一幀的渲染使用:

命運引擎爲了最大化CPU和GPU的並行效率,採起了動態加載平衡(dynamic load balancing)和智能做業合批(smart job batching),具體作法是將全部渲染和可見性剔除的工做加入到任務系統,保持低延遲。下圖是並行化計算視圖做業的圖例:

此外,還將模擬邏輯從渲染邏輯中抽離和解耦,採用徹底的數據驅動的渲染管線,全部的排序、內存分配、遍歷等算法都遵循了高速緩存一致性(結構體小量化,數據對齊,使得單個結構體數據能一次性被加載進高速緩存行)。

 

2.4 UE的多線程機制

本章節主要剖析一下UE的多線程基礎、設計及架構,以便後面更好地切入到多線程渲染。

2.4.1 UE的多線程基礎

  • TAtomic

UE的原子操做並無使用C++的Atomic模板,而是本身實現了一套,叫TAtomic。它提供的功能有加載、存儲、賦值等操做,在底層實現上,會採用平臺相關的原子操做接口實現:

// Engine\Source\Runtime\Core\Public\Templates\Atomic.h

template <typename T>
FORCEINLINE T Load(const volatile T* Element)
{
    // 採起平臺相關的接口加載原子值.
    auto Result = FPlatformAtomics::AtomicRead((volatile TUnderlyingIntegerType_T<T>*)Element);
    return *(const T*)&Result;
}

template <typename T>
FORCEINLINE void Store(const volatile T* Element, T Value)
{
    // 採起平臺相關的接口存儲原子值.
    FPlatformAtomics::InterlockedExchange((volatile TUnderlyingIntegerType_T<T>*)Element, *(const TUnderlyingIntegerType_T<T>*)&Value);
}

template <typename T>
FORCEINLINE T Exchange(volatile T* Element, T Value)
{
    // 採起平臺相關的接口交換原子值.
    auto Result = FPlatformAtomics::InterlockedExchange((volatile TUnderlyingIntegerType_T<T>*)Element, *(const TUnderlyingIntegerType_T<T>*)&Value);
    return *(const T*)&Result;
}

在內存順序上,不像C++提供了四種模式,UE作了簡化,只提供了兩種模式:

enum class EMemoryOrder
{
	Relaxed,	// 順序鬆散, 不會引發重排序
	SequentiallyConsistent	// 順序一致
};

須要注意的是,TAtomic雖然是模板類,但只對基本類型生效,UE是經過父類TAtomicBaseType_T來達到檢測的目的:

template <typename T>
class TAtomic final : public UE4Atomic_Private::TAtomicBaseType_T<T>
{
	static_assert(TIsTrivial<T>::Value, "TAtomic is only usable with trivial types");
    
    (......)
}
  • TFuture

UE實現了相似C++的Future和Promise對象,是模板類,抽象了返回值類型。如下是TFuture的聲明:

// Engine\Source\Runtime\Core\Public\Async\Future.h

template<typename InternalResultType>
class TFutureBase
{
public:
	bool IsReady() const;
	bool IsValid() const;

	void Wait() const
	{
		if (State.IsValid())
		{
			while (!WaitFor(FTimespan::MaxValue()));
		}
	}
	bool WaitFor(const FTimespan& Duration) const
	{
		return State.IsValid() ? State->WaitFor(Duration) : false;
	}
	bool WaitUntil(const FDateTime& Time) const
	{
		return WaitFor(Time - FDateTime::UtcNow());
	}

protected:
	typedef TSharedPtr<TFutureState<InternalResultType>, ESPMode::ThreadSafe> StateType;

	const StateType& GetState() const;

	template<typename Func>
	auto Then(Func Continuation);

	template<typename Func>
	auto Next(Func Continuation);

	void Reset();

private:

	/** Holds the future's state. */
	StateType State;
};

template<typename ResultType>
class TFuture : public TFutureBase<ResultType>
{
	typedef TFutureBase<ResultType> BaseType;

public:

	ResultType Get() const
	{
		return this->GetState()->GetResult();
	}

	TSharedFuture<ResultType> Share()
	{
		return TSharedFuture<ResultType>(MoveTemp(*this));
	}
};
  • TPromise

TPromise一般要和TFuture配合使用,以下所示:

template<typename InternalResultType>
class TPromiseBase : FNoncopyable
{
	typedef TSharedPtr<TFutureState<InternalResultType>, ESPMode::ThreadSafe> StateType;

    (......)
    
protected:
	const StateType& GetState();

private:
	StateType State; // 存儲了Future的狀態.
};

template<typename ResultType>
class TPromise : public TPromiseBase<ResultType>
{
public:
	typedef TPromiseBase<ResultType> BaseType;

public:
    // 獲取Future對象
	TFuture<ResultType> GetFuture()
	{
		check(!FutureRetrieved);
		FutureRetrieved = true;

		return TFuture<ResultType>(this->GetState());
	}
	
    // 設置Future的值
	FORCEINLINE void SetValue(const ResultType& Result)
	{
		EmplaceValue(Result);
	}

	FORCEINLINE void SetValue(ResultType&& Result)
	{
		EmplaceValue(MoveTemp(Result));
	}

	template<typename... ArgTypes>
	void EmplaceValue(ArgTypes&&... Args)
	{
		this->GetState()->EmplaceResult(Forward<ArgTypes>(Args)...);
	}

private:
	bool FutureRetrieved;
};
  • ParallelFor

ParallelFor是UE內置的支持多線程並行處理任務的For循環,在渲染系統中應用得至關廣泛。它支持如下幾種並行方式:

enum class EParallelForFlags
{
	None, // 默認並行方式
	ForceSingleThread = 1, // 強制單線程, 經常使用於調試.
	Unbalanced = 2, // 非任務平衡, 經常使用於具備高度可變計算時間的任務.
	PumpRenderingThread = 4, // 注入渲染線程. 若是是在渲染線程調用, 須要保證ProcessThread空閒狀態.
};

支持的ParallelFor調用方式以下:

inline void ParallelFor(int32 Num, TFunctionRef<void(int32)> Body, bool bForceSingleThread, bool bPumpRenderingThread=false);

inline void ParallelFor(int32 Num, TFunctionRef<void(int32)> Body, EParallelForFlags Flags = EParallelForFlags::None);

template<typename FunctionType>
inline void ParallelForTemplate(int32 Num, const FunctionType& Body, EParallelForFlags Flags = EParallelForFlags::None);

inline void ParallelForWithPreWork(int32 Num, TFunctionRef<void(int32)> Body, TFunctionRef<void()> CurrentThreadWorkToDoBeforeHelping, bool bForceSingleThread, bool bPumpRenderingThread = false);

inline void ParallelForWithPreWork(int32 Num, TFunctionRef<void(int32)> Body, TFunctionRef<void()> CurrentThreadWorkToDoBeforeHelping, EParallelForFlags Flags = EParallelForFlags::None);

ParallelFor是基於TaskGraph機制實現的,因爲TaskGraph後面才提到,這裏就不涉及其實現。下面展現UE的一個應用案例:

// Engine\Source\Runtime\Engine\Private\Components\ActorComponent.cpp

// 並行化增長Primitive到場景的用例.
void FRegisterComponentContext::Process()
{
	FSceneInterface* Scene = World->Scene;
	ParallelFor(AddPrimitiveBatches.Num(), // 數量
		[&](int32 Index) //回調函數, Index返回索引
		{
			if (!AddPrimitiveBatches[Index]->IsPendingKill())
			{
				Scene->AddPrimitive(AddPrimitiveBatches[Index]);
			}
		},
		!FApp::ShouldUseThreadingForPerformance() // 是否多線程處理
	);
	AddPrimitiveBatches.Empty();
}
  • 基礎模板

UnrealTemplate.h定義了不少基礎模板,用於數據轉換、拷貝、轉移等功能。下面例舉部分常見的函數和類型:

模板名 解析 stl映射
template
ReferencedType* IfAThenAElseB(ReferencedType* A,ReferencedType* B)
返回A ? A : B -
template
void Move(T& A,typename TMoveSupportTraits ::Copy B)
釋放A,將B的數據替換到A,但不會影響B的數據。 -
template
void Move(T& A,typename TMoveSupportTraits ::Move B)
釋放A,將B的數據替換到A,但會影響B的數據。 -
FNoncopyable 派生它便可實現不可拷貝的對象。 -
TGuardValue 帶做業域的值,可指定一個新值和舊值,做用域內是新值,離開做用域變成舊值。 -
TScopeCounter 帶做用域的計數器,做用域內計數器+1,離開做用域後計數器-1 -
template
typename TRemoveReference ::Type&& MoveTemp(T&& Obj)
將引用轉換成右值,可能會修改源值。 std::move
template
T CopyTemp(T& Val)
強制建立右值的拷貝,不會改變源值。 -
template
T&& Forward(typename TRemoveReference ::Type& Obj)
將引用轉換成右值引用。 std::forward
template <typename T, typename ArgType>
T StaticCast(ArgType&& Arg)
靜態類型轉換。 static_cast

2.4.2 UE的多線程實現

UE的多線程實現上並無採納C++11標準庫的那一套,而是本身從系統級作了封裝和實現,包括系統線程、線程池、異步任務、任務圖以及相關的通知和同步機制。

2.4.2.1 FRunnable

FRunnable是全部能夠在多個線程並行地運行的物體的父類,它提供的基礎接口以下:

// Engine\Source\Runtime\Core\Public\HAL\Runnable.h

class CORE_API FRunnable
{
public:
	virtual bool Init();	// 初始化, 成功返回True.
	virtual uint32 Run();	// 運行, 只有Init成功纔會被調用.
	virtual void Stop();	// 請求提早中止.
	virtual void Exit();	// 退出, 清理數據.
};

FRunnable及其子類是可運行於多線程的對象,而與之對立的是隻在單線程運行的類FSingleThreadRunnable

// Engine\Source\Runtime\Core\Public\Misc\SingleThreadRunnable.h

// 多線程禁用下的單線程運行的物體
class CORE_API FSingleThreadRunnable
{
public:
	virtual void Tick();
};

FRunnable的子類很是多,如下是常見的部分核心子類及其解析。

  • FRenderingThread:運行於渲染線程上的對象。後面有章節會專門剖析。

  • FRHIThread:運行於RHI線程上的對象。後面有章節會專門剖析。

  • FRenderingThreadTickHeartbeat:運行於心跳渲染線程上的物體。

  • FTaskThreadBase:在線程執行的任務父類,後面會有章節專門解析這部分。

  • FQueuedThread:可存儲在線程池的線程父類。提供的接口以下:

    // Engine\Source\Runtime\Core\Private\HAL\ThreadingBase.cpp
    
    class FQueuedThread : public FRunnable
    {
    protected:
    	FEvent* DoWorkEvent; // 任務執行完畢的事件.
    	TAtomic<bool> TimeToDie; // 是否須要超時.
    	IQueuedWork* volatile QueuedWork; // 被執行的任務.
    	class FQueuedThreadPoolBase* OwningThreadPool; // 所在的線程池.
    	FRunnableThread* Thread; // 真正用於執行任務的線程.
    
    	virtual uint32 Run() override;
        
    public:
    	virtual bool Create(class FQueuedThreadPoolBase* InPool,uint32 InStackSize,EThreadPriority ThreadPriority);
    	bool KillThread();
    	void DoWork(IQueuedWork* InQueuedWork);
    };
  • TAsyncRunnable:異步地在單獨線程運行的任務,是個模板類,聲明以下:

    // Engine\Source\Runtime\Core\Public\Async\Async.h
    
    template<typename ResultType>
    class TAsyncRunnable: public FRunnable
    {
    public:
    	virtual uint32 Run() override;
    
    private:
    	TUniqueFunction<ResultType()> Function;
    	TPromise<ResultType> Promise;
    	TFuture<FRunnableThread*> ThreadFuture;
    };
  • FAsyncPurge:輔助類,提供銷燬位於工做線程的UObject對象。

因而可知,FRunnable對象並不能獨立存在,老是要依賴線程來真正地執行任務。

另外,還須要特地提出:FRenderingThread、FQueuedThread聽名字像是真正的線程,然而並非,只是用於處理某些特定任務的可運行物體,實際上仍是要依賴它們內部FRunnableThread的成員對象來執行。

2.4.2.2 FRunnableThread

FRunnableThread是可運行線程的父類,提供了一組用於管理線程生命週期的接口。它提供的基礎接口和解析以下:

// Engine\Source\Runtime\Core\Public\HAL\RunnableThread.h

class CORE_API FRunnableThread
{
	static uint32 RunnableTlsSlot;	// FRunnableThread的TLS插槽索引.

public:
	static uint32 GetTlsSlot();
    // 靜態類, 用於建立線程, 需提供一個FRunnable對象, 用於線程執行的任務.
	static FRunnableThread* Create(FRunnable* InRunnable, const TCHAR* ThreadName, uint32 InStackSize = 0,
                                   EThreadPriority InThreadPri, uint64 InThreadAffinityMask,EThreadCreateFlags InCreateFlags);

    // 設置線程優先級.
	virtual void SetThreadPriority( EThreadPriority NewPriority );
    // 暫停/繼續運行線程
	virtual void Suspend( bool bShouldPause = true );
    // 銷燬線程, 一般須要指定等待標記bShouldWait爲true, 不然可能引發內存泄漏或死鎖!
	virtual bool Kill( bool bShouldWait = true );
    // 等待執行完畢, 會卡調用線程.
	virtual void WaitForCompletion();
    
	const uint32 GetThreadID() const;
	const FString& GetThreadName() const;

protected:
	FString ThreadName;
	FRunnable* Runnable; // 被執行對象
	FEvent* ThreadInitSyncEvent; // 線程初始化完成同步事件, 防止線程未初始化完畢就執行任務.
	uint64 ThreadAffinityMask; // 親和標記, 用於線程傾向指定的CPU核心執行.
	TArray<FTlsAutoCleanup*> TlsInstances; // 線程消耗時須要一塊兒清理的Tls對象.
	EThreadPriority ThreadPriority;
	uint32 ThreadID;

private:
	virtual void Tick();
};

須要注意的是,FRunnableThread提供了靜態建立接口,建立線程時須要指定一個FRunnable對象,做爲線程執行的任務。它是一個基礎父類,下面是繼承自它的部分核心子類及解析:

  • FRunnableThreadWin:Windows平臺的線程實現。它的接口和實現以下:

    // Engine\Source\Runtime\Core\Private\Windows\WindowsRunnableThread.h
    
    class FRunnableThreadWin : public FRunnableThread
    {
    	HANDLE Thread; // 線程句柄
    	
        // 線程回調接口, 建立線程時做爲參數傳入.
    	static ::DWORD STDCALL _ThreadProc( LPVOID pThis )
    	{
    		check(pThis);
    		return ((FRunnableThreadWin*)pThis)->GuardedRun();
    	}
    
    	uint32 GuardedRun();
    	uint32 Run();
    
    public:
        // 轉換優先級
    	static int TranslateThreadPriority(EThreadPriority Priority)
    	{
    		switch (Priority)
    		{
    		case TPri_AboveNormal: return THREAD_PRIORITY_HIGHEST;
    		case TPri_Normal: return THREAD_PRIORITY_HIGHEST - 1;
    		case TPri_BelowNormal: return THREAD_PRIORITY_HIGHEST - 3;
    		case TPri_Highest: return THREAD_PRIORITY_HIGHEST;
    		case TPri_TimeCritical: return THREAD_PRIORITY_HIGHEST;
    		case TPri_Lowest: return THREAD_PRIORITY_HIGHEST - 4;
    		case TPri_SlightlyBelowNormal: return THREAD_PRIORITY_HIGHEST - 2;
    		default: UE_LOG(LogHAL, Fatal, TEXT("Unknown Priority passed to TranslateThreadPriority()")); return TPri_Normal;
    		}
    	}
    	
        // 設置優先級
    	virtual void SetThreadPriority( EThreadPriority NewPriority ) override
    	{
    		// Don't bother calling the OS if there is no need
            ThreadPriority = NewPriority;
            // Change the priority on the thread
            ::SetThreadPriority(Thread, TranslateThreadPriority(ThreadPriority));
    	}
    	
    	virtual void Suspend( bool bShouldPause = true ) override
    	{
    		check(Thread);
    		if (bShouldPause == true)
    		{
    			SuspendThread(Thread);
    		}
    		else
    		{
    			ResumeThread(Thread);
    		}
    	}
    
    	virtual bool Kill( bool bShouldWait = false ) override
    	{
    		check(Thread && "Did you forget to call Create()?");
    		bool bDidExitOK = true;
    		// 先中止Runnable對象, 使得其有清理數據的機會
    		if (Runnable)
    		{
    			Runnable->Stop();
    		}
    		// 等待線程處理完畢.
    		if (bShouldWait == true)
    		{
    			// Wait indefinitely for the thread to finish.  IMPORTANT:  It's not safe to just go and
    			// kill the thread with TerminateThread() as it could have a mutex lock that's shared
    			// with a thread that's continuing to run, which would cause that other thread to
    			// dead-lock.  (This can manifest itself in code as simple as the synchronization
    			// object that is used by our logging output classes.  Trust us, we've seen it!)
    			WaitForSingleObject(Thread,INFINITE);
    		}
    		// 關閉線程句柄
    		CloseHandle(Thread);
    		Thread = NULL;
    
    		return bDidExitOK;
    	}
    
    	virtual void WaitForCompletion( ) override
    	{
    		// Block until this thread exits
    		WaitForSingleObject(Thread,INFINITE);
    	}
    
    protected:
    
    	virtual bool CreateInternal( FRunnable* InRunnable, const TCHAR* InThreadName,
    		uint32 InStackSize = 0,
    		EThreadPriority InThreadPri = TPri_Normal, uint64 InThreadAffinityMask = 0,
    		EThreadCreateFlags InCreateFlags = EThreadCreateFlags::None) override
    	{
    		static bool bOnce = false;
    		if (!bOnce)
    		{
    			bOnce = true;
    			::SetThreadPriority(::GetCurrentThread(), TranslateThreadPriority(TPri_Normal)); // set the main thread to be normal, since this is no longer the windows default.
    		}
    
    		check(InRunnable);
    		Runnable = InRunnable;
    		ThreadAffinityMask = InThreadAffinityMask;
    
    		// 建立初始化完成同步事件.
    		ThreadInitSyncEvent	= FPlatformProcess::GetSynchEventFromPool(true);
    
    		ThreadName = InThreadName ? InThreadName : TEXT("Unnamed UE4");
    
    		// Create the new thread
    		{
    			LLM_SCOPE(ELLMTag::ThreadStack);
    			LLM_PLATFORM_SCOPE(ELLMTag::ThreadStackPlatform);
    			// add in the thread size, since it's allocated in a black box we can't track
    			LLM(FLowLevelMemTracker::Get().OnLowLevelAlloc(ELLMTracker::Default, nullptr, InStackSize));
    			LLM(FLowLevelMemTracker::Get().OnLowLevelAlloc(ELLMTracker::Platform, nullptr, InStackSize));
    
    			// 調用Windows API建立線程.
    			Thread = CreateThread(NULL, InStackSize, _ThreadProc, this, STACK_SIZE_PARAM_IS_A_RESERVATION | CREATE_SUSPENDED, (::DWORD *)&ThreadID);
    		}
    
    		// If it fails, clear all the vars
    		if (Thread == NULL)
    		{
    			Runnable = nullptr;
    		}
    		else
    		{
                // 加入到線程管理器中.
    			FThreadManager::Get().AddThread(ThreadID, this);
    			ResumeThread(Thread);
    
    			// Let the thread start up
    			ThreadInitSyncEvent->Wait(INFINITE);
    
    			SetThreadPriority(InThreadPri);
    		}
    
    		// 清理同步事件
    		FPlatformProcess::ReturnSynchEventToPool(ThreadInitSyncEvent);
    		ThreadInitSyncEvent = nullptr;
    		return Thread != NULL;
    	}
    };

    從上面代碼可看出,Windows平臺的線程直接調用Windows API建立和同步信息,從而實現線程的平臺抽象,從平臺依賴抽離出來。

  • FRunnableThreadPThread:POSIX Thread(簡稱PThread)的父類,經常使用於類Unix POSIX 系統,如Linux、Solaris、Apple等。其實現和Windows平臺相似,這裏就不展開其代碼解析了。它的子類有:

    • FRunnableThreadApple:蘋果系統(MacOS、iOS)的線程。

    • FRunnableThreadAndroid:安卓系統的線程。

    • FRunnableThreadUnix:Unix系統的線程。

  • FRunnableThreadHoloLens:HoloLens系統的線程。

  • FFakeThread:假線程,多線程被禁用後的代替品,實際運行於單個線程。

FRunnable和FRunnableThread是相輔相成的,缺一而不可,一個是運行的載體,一個是運行的內容。下面是它們的一個應用示例:

// 派生FRunnable
class FMyRunnable : public FRunnable
{
	bool bStop;
public:
	virtual bool Init(void) 
	{
		bStop = false;
		return true;
	}

	virtual uint32 Run(void)
	{
		for (int32 i = 0; i < 10 && !bStop; i++)
		{
			FPlatformProcess::Sleep(1.0f);
		}

		return 0;
	}

	virtual void Stop(void)
	{
		bStop = true;
	}

	virtual void Exit(void)
	{
	}

};

void TestRunnableAndRunnableThread()
{
    // 建立Runnable對象
    FMyRunnable* MyRunnable = new FMyRunnable;
    // 建立線程, 傳入MyRunnable
    FRunnableThread* MyThread = FRunnableThread::Create(MyRunnable, TEXT("MyRunnable"));
	
    // 暫停當前線程
    FPlatformProcess::Sleep(4.0f);

    // 等待線程結束
    MyRunnable->Stop();
    MyThread->WaitForCompletion();

    // 清理數據.
    delete MyThread;
    delete MyRunnable;
}

細心的同窗應該有注意到,建立線程的時候,會將線程加入到FThreadManager中,也就是說全部的線程都由FThreadManager來管理。如下是FThreadManager的聲明:

// Engine\Source\Runtime\Core\Public\HAL\ThreadManager.h

class FThreadManager
{
	FCriticalSection ThreadsCritical; // 修改線程列表Threads的臨界區
	static bool bIsInitialized;

	TMap<uint32, class FRunnableThread*, TInlineSetAllocator<256>> Threads; // 線程列表, 注意數據結構是Map, Key是線程ID.

public:
	void AddThread(uint32 ThreadId, class FRunnableThread* Thread); // 增長線程
	void RemoveThread(class FRunnableThread* Thread); // 刪除線程

	void Tick(); // 幀更新, 只對FFakeThread起做用.

	const FString& GetThreadName(uint32 ThreadId);
	void ForEachThread(TFunction<void(uint32, class FRunnableThread*)> Func); // 遍歷線程
	
    static bool IsInitialized();
	static FThreadManager& Get();
};

2.4.2.3 QueuedWork

本節將闡述UE的隊列化QueuedWork體系,包含IQueuedWork、TAsyncQueuedWork、FQueuedThreadPool、FQueuedThreadPoolBase等。

  • IQueuedWork和TAsyncQueuedWork

IQueuedWork是一組抽象接口,存儲着一組隊列化的任務對象,會被FQueuedThreadPool線程池對象執行。IQueuedWork的接口以下:

// Engine\Source\Runtime\Core\Public\Misc\IQueuedWork.h

class IQueuedWork
{
public:
	virtual void DoThreadedWork() = 0; // 執行隊列化的任務.
	virtual void Abandon() = 0; // 提早放棄執行, 並通知隊列裏的全部對象清理數據.
};

因爲IQueuedWork只是抽象類,並無實際執行代碼,故而主要子類TAsyncQueuedWork承擔了實現代碼的任務,如下是TAsyncQueuedWork的聲明和實現:

// Engine\Source\Runtime\Core\Public\Async\Async.h

template<typename ResultType>
class TAsyncQueuedWork : public IQueuedWork
{
public:
	virtual void DoThreadedWork() override
	{
		SetPromise(Promise, Function);
		delete this;
	}

	virtual void Abandon() override
	{
		// not supported
	}

private:
	TUniqueFunction<ResultType()> Function; // 被執行的函數列表.
	TPromise<ResultType> Promise; // 用於同步的對象
};
  • FQueuedThreadPool和FQueuedThreadPoolBase

與FRunnable和FRunnableThread相似,TAsyncQueuedWork也不能獨立地執行任務,須要依賴FQueuedThreadPool來執行。下面是FQueuedThreadPool的聲明:

// Engine\Source\Runtime\Core\Public\Misc\QueuedThreadPool.h

// 執行IQueuedWork任務列表的線程池.
class FQueuedThreadPool
{
public:
    // 建立指定數量、棧大小和優先級的線程。
	virtual bool Create( uint32 InNumQueuedThreads, uint32 StackSize = (32 * 1024), EThreadPriority ThreadPriority=TPri_Normal ) = 0;
    // 銷燬線程內的後臺線程.
	virtual void Destroy() = 0;
    // 加入隊列化任務. 若是有可用的線程, 則當即執行; 不然會稍後再執行.
	virtual void AddQueuedWork( IQueuedWork* InQueuedWork ) = 0;
    // 撤銷指定隊列化任務.
	virtual bool RetractQueuedWork( IQueuedWork* InQueuedWork ) = 0;
    // 獲取線程數量.
	virtual int32 GetNumThreads() const = 0;

public:
    // 建立線程池對象.
	static FQueuedThreadPool* Allocate();
    // 重寫棧大小.
	static uint32 OverrideStackSize;
};

上面能夠看出,FQueuedThreadPool是抽象類,只提供接口,並無實現。實際上,實現是在FQueuedThreadPoolBase中,以下:

// Engine\Source\Runtime\Core\Private\HAL\ThreadingBase.cpp

class FQueuedThreadPoolBase : public FQueuedThreadPool
{
protected:
	TArray<IQueuedWork*> QueuedWork; // 須要執行的任務列表
	TArray<FQueuedThread*> QueuedThreads; // 線程池內的可用線程
	TArray<FQueuedThread*> AllThreads;    // 線程池內的全部線程
	FCriticalSection* SynchQueue; // 同步臨界區
	bool TimeToDie; // 超時標記

public:
	FQueuedThreadPoolBase()
		: SynchQueue(nullptr)
		, TimeToDie(0)
	{ }
	virtual ~FQueuedThreadPoolBase()
	{
		Destroy();
	}

	virtual bool Create(uint32 InNumQueuedThreads,uint32 StackSize = (32 * 1024),EThreadPriority ThreadPriority=TPri_Normal) override
	{
		// 處理同步鎖.
		bool bWasSuccessful = true;
		check(SynchQueue == nullptr);
		SynchQueue = new FCriticalSection();
		FScopeLock Lock(SynchQueue);
		// Presize the array so there is no extra memory allocated
		check(QueuedThreads.Num() == 0);
		QueuedThreads.Empty(InNumQueuedThreads);

		if( OverrideStackSize > StackSize )
		{
			StackSize = OverrideStackSize;
		}

		// 建立線程, 注意建立的是FQueuedThread.
		for (uint32 Count = 0; Count < InNumQueuedThreads && bWasSuccessful == true; Count++)
		{
			FQueuedThread* pThread = new FQueuedThread();
			// 利用FQueuedThread對象建立真正的線程.
			if (pThread->Create(this,StackSize,ThreadPriority) == true)
			{
				QueuedThreads.Add(pThread);
				AllThreads.Add(pThread);
			}
			else
			{
				// 建立失敗, 清理線程對象.
				bWasSuccessful = false;
				delete pThread;
			}
		}
		// 建立線程池失敗, 清理數據.
		if (bWasSuccessful == false)
		{
			Destroy();
		}
		return bWasSuccessful;
	}

	virtual void Destroy() override
	{
		if (SynchQueue)
		{
			{
				FScopeLock Lock(SynchQueue);
				TimeToDie = 1;
				FPlatformMisc::MemoryBarrier();
				// Clean up all queued objects
				for (int32 Index = 0; Index < QueuedWork.Num(); Index++)
				{
					QueuedWork[Index]->Abandon();
				}
				// Empty out the invalid pointers
				QueuedWork.Empty();
			}
			// 等待全部線程執行完成, 注意這裏並無使用同步時間, 而是使用相似自旋鎖的機制.
			while (1)
			{
				{
                    // 訪問AllThreads和QueuedThreads的數據時先鎖定臨界區. 防止其它線程修改數據.
					FScopeLock Lock(SynchQueue);
					if (AllThreads.Num() == QueuedThreads.Num())
					{
						break;
					}
				}
				FPlatformProcess::Sleep(0.0f); // 切換當前線程時間片, 防止當前線程佔用cpu時鐘.
			}
			// 刪除全部線程.
			{
				FScopeLock Lock(SynchQueue);
				// Now tell each thread to die and delete those
				for (int32 Index = 0; Index < AllThreads.Num(); Index++)
				{
					AllThreads[Index]->KillThread();
					delete AllThreads[Index];
				}
				QueuedThreads.Empty();
				AllThreads.Empty();
			}
            // 刪除同步鎖.
			delete SynchQueue;
			SynchQueue = nullptr;
		}
	}

	int32 GetNumQueuedJobs() const
	{
		return QueuedWork.Num();
	}
	virtual int32 GetNumThreads() const 
	{
		return AllThreads.Num();
	}
    
    // 加入隊列化任務.
	void AddQueuedWork(IQueuedWork* InQueuedWork) override
	{
		check(InQueuedWork != nullptr);

		if (TimeToDie)
		{
			InQueuedWork->Abandon();
			return;
		}

		check(SynchQueue);

		FQueuedThread* Thread = nullptr;

		{
            // 操做線程池裏的全部數據前都須要鎖定臨界區.
			FScopeLock sl(SynchQueue);
			const int32 AvailableThreadCount = QueuedThreads.Num();
            
            // 沒有可用線程, 加入任務隊列, 稍後再執行.
			if (AvailableThreadCount == 0)
			{
				QueuedWork.Add(InQueuedWork);
				return;
			}
			
            // 從可用線程池中獲取一個線程, 並將其從可用線程池中刪除.
			const int32 ThreadIndex = AvailableThreadCount - 1;

			Thread = QueuedThreads[ThreadIndex];
			QueuedThreads.RemoveAt(ThreadIndex, 1, /* do not allow shrinking */ false);
		}

		// 執行任務
		Thread->DoWork(InQueuedWork);
	}

	virtual bool RetractQueuedWork(IQueuedWork* InQueuedWork) override
	{
		if (TimeToDie)
		{
			return false; // no special consideration for this, refuse the retraction and let shutdown proceed
		}
		check(InQueuedWork != nullptr);
		check(SynchQueue);
		FScopeLock sl(SynchQueue);
		return !!QueuedWork.RemoveSingle(InQueuedWork);
	}
	
    // 若是有可用任務,則獲取一個並執行, 不然將線程迴歸可用線程池. 此接口由FQueuedThread調用.
	IQueuedWork* ReturnToPoolOrGetNextJob(FQueuedThread* InQueuedThread)
	{
		check(InQueuedThread != nullptr);
		IQueuedWork* Work = nullptr;
		// Check to see if there is any work to be done
		FScopeLock sl(SynchQueue);
		if (TimeToDie)
		{
			check(!QueuedWork.Num());  // we better not have anything if we are dying
		}
		if (QueuedWork.Num() > 0)
		{
			// Grab the oldest work in the queue. This is slower than
			// getting the most recent but prevents work from being
			// queued and never done
			Work = QueuedWork[0];
			// Remove it from the list so no one else grabs it
			QueuedWork.RemoveAt(0, 1, /* do not allow shrinking */ false);
		}
		if (!Work)
		{
			// There was no work to be done, so add the thread to the pool
			QueuedThreads.Add(InQueuedThread);
		}
		return Work;
	}
};

上面的接口ReturnToPoolOrGetNextJob並不是FQueuedThreadPoolBase調用,而是由正在執行任務且執行完畢的FQueuedThread對象主動調用,以下所示:

uint32 FQueuedThread::Run()
{
	while (!TimeToDie.Load(EMemoryOrder::Relaxed))
	{
		bool bContinueWaiting = true;
        
        (......)
		
        // 讓事件等待.
		if (bContinueWaiting)
		{
			DoWorkEvent->Wait();
		}

		IQueuedWork* LocalQueuedWork = QueuedWork;
		QueuedWork = nullptr;
		FPlatformMisc::MemoryBarrier();
		check(LocalQueuedWork || TimeToDie.Load(EMemoryOrder::Relaxed)); // well you woke me up, where is the job or termination request?
        // 不斷地從線程池獲取任務並執行, 直到線程池的全部任務執行完畢.
		while (LocalQueuedWork)
		{
			// 執行任務.
			LocalQueuedWork->DoThreadedWork();
			// 從線程池獲取下一個任務.
			LocalQueuedWork = OwningThreadPool->ReturnToPoolOrGetNextJob(this);
		}
	}
	return 0;
}

從上面能夠看出,FQueuedThreadPool和FQueuedThread的數據和接口巧妙地配合,從而並行化地執行任務。

  • GThreadPool

線程池的機制已經講述完畢,下面講一下UE的全局線程池GThreadPool的初始化過程,此過程在FEngineLoop::PreInitPreStartupScreen中,1.4.6.1 引擎預初始化已經有說起:

// Engine\Source\Runtime\Launch\Private\LaunchEngineLoop.cpp

int32 FEngineLoop::PreInitPreStartupScreen(const TCHAR* CmdLine)
{
	(......)
    
	{
		TRACE_THREAD_GROUP_SCOPE("ThreadPool");
        // 建立全局線程池
        GThreadPool = FQueuedThreadPool::Allocate();
        int32 NumThreadsInThreadPool = FPlatformMisc::NumberOfWorkerThreadsToSpawn();

        // 若是是純服務器模式, 線程池只有一個線程.
        if (FPlatformProperties::IsServerOnly())
        {
            NumThreadsInThreadPool = 1;
        }
        // 建立工做線程相等的線程數量.
        verify(GThreadPool->Create(NumThreadsInThreadPool, StackSize * 1024, TPri_SlightlyBelowNormal));
	}
    
    (......)
}

若是須要GThreadPool爲咱們作事,則使用示例以下:

// Engine\Source\Runtime\Engine\Private\ShadowMap.cpp

// 多線程編碼紋理
if (bMultithreadedEncode)
{
    // 完成的任務計數器.
    FThreadSafeCounter Counter(PendingTextures.Num());
    // 待編碼的紋理任務列表
    TArray<FAsyncEncode<FShadowMapPendingTexture>> AsyncEncodeTasks;
    AsyncEncodeTasks.Empty(PendingTextures.Num());
    // 建立全部任務, 加入到AsyncEncodeTasks列表中.
    for (auto& PendingTexture : PendingTextures)
    {
        PendingTexture.CreateUObjects();
        // 建立AsyncEncodeTask
        auto AsyncEncodeTask = new (AsyncEncodeTasks)FAsyncEncode<FShadowMapPendingTexture>(&PendingTexture, LightingScenario, Counter, TextureCompressorModule);
        // 將AsyncEncodeTask加入全局線程池並執行.
        GThreadPool->AddQueuedWork(AsyncEncodeTask);
    }
	// 若是還有任務未完成, 則讓當前線程進入睡眠狀態.
    while (Counter.GetValue() > 0)
    {
        GWarn->UpdateProgress(Counter.GetValue(), PendingTextures.Num());
        FPlatformProcess::Sleep(0.0001f);
    }
}

2.4.2.4 TaskGraph

TaskGraph直譯是任務圖,使用的圖是DAG(Directed Acyclic Graph,有向非循環圖),能夠指定依賴關係,指定前序和後序任務,但不能有循環依賴。它是UE內迄今爲止最爲複雜的並行任務系統,涉及的概念、運行機制的複雜度都陡增,本節將花大篇幅描述它們,旨在闡述清楚它們的機制和原理。

  • FBaseGraphTask

FBaseGraphTask是運行於TaskGraph的任務,是個基礎父類,其派生的具體任務子類纔會執行任務。它的聲明(節選)以下:

// Engine\Source\Runtime\Core\Public\Async\TaskGraphInterfaces.h

class FBaseGraphTask
{
protected:
	FBaseGraphTask(int32 InNumberOfPrerequistitesOutstanding);
    
    // 先決任務完成或部分地完成.
	void PrerequisitesComplete(ENamedThreads::Type CurrentThread, int32 NumAlreadyFinishedPrequistes, bool bUnlock = true);
	
    // 帶條件(前置任務都已經執行完畢)地執行任務
	void ConditionalQueueTask(ENamedThreads::Type CurrentThread)
	{
		if (NumberOfPrerequistitesOutstanding.Decrement()==0)
		{
			QueueTask(CurrentThread);
		}
	}

private:
    // 真正地執行任務, 由子類實現.
	virtual void ExecuteTask(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread)=0;
	
    // 加入到TaskGraph任務隊列中.
	void QueueTask(ENamedThreads::Type CurrentThreadIfKnown)
	{
		checkThreadGraph(LifeStage.Increment() == int32(LS_Queued));
		FTaskGraphInterface::Get().QueueTask(this, ThreadToExecuteOn, CurrentThreadIfKnown);
	}

	ENamedThreads::Type ThreadToExecuteOn; // 執行任務的線程類型
	FThreadSafeCounter  NumberOfPrerequistitesOutstanding; // 執行任務前的計數器
};
  • TGraphTask

FBaseGraphTask的惟一子類TGraphTask承接了完成執行任務的代碼。TGraphTask的聲明和實現以下:

template<typename TTask>
class TGraphTask final : public FBaseGraphTask
{
public:
    // 構造任務的輔助類.
	class FConstructor
	{
	public:
		// 建立TTask任務對象, 而後設置TGraphTask任務的數據, 以便在適當時機執行.
		template<typename...T>
		FGraphEventRef ConstructAndDispatchWhenReady(T&&... Args)
		{
			new ((void *)&Owner->TaskStorage) TTask(Forward<T>(Args)...);
			return Owner->Setup(Prerequisites, CurrentThreadIfKnown);
		}

		// 建立TTask任務對象, 而後設置TGraphTask任務的數據, 並持有但不執行.
		template<typename...T>
		TGraphTask* ConstructAndHold(T&&... Args)
		{
			new ((void *)&Owner->TaskStorage) TTask(Forward<T>(Args)...);
			return Owner->Hold(Prerequisites, CurrentThreadIfKnown);
		}

	private:
		TGraphTask*				Owner; // 所在的TGraphTask對象.
		const FGraphEventArray*	Prerequisites; // 先決任務.
		ENamedThreads::Type		CurrentThreadIfKnown;
	};

	// 建立任務, 注意返回的是FConstructor對象, 以便對任務執行後續操做.
	static FConstructor CreateTask(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
	{
		int32 NumPrereq = Prerequisites ? Prerequisites->Num() : 0;
		if (sizeof(TGraphTask) <= FBaseGraphTask::SMALL_TASK_SIZE)
		{
			void *Mem = FBaseGraphTask::GetSmallTaskAllocator().Allocate();
			return FConstructor(new (Mem) TGraphTask(TTask::GetSubsequentsMode() == ESubsequentsMode::FireAndForget ? NULL : FGraphEvent::CreateGraphEvent(), NumPrereq), Prerequisites, CurrentThreadIfKnown);
		}
		return FConstructor(new TGraphTask(TTask::GetSubsequentsMode() == ESubsequentsMode::FireAndForget ? NULL : FGraphEvent::CreateGraphEvent(), NumPrereq), Prerequisites, CurrentThreadIfKnown);
	}

	void Unlock(ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
	{
		ConditionalQueueTask(CurrentThreadIfKnown);
	}
	
	FGraphEventRef GetCompletionEvent()
	{
		return Subsequents;
	}

private:
	// 執行任務
	void ExecuteTask(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread) override
	{
		(......)
		
        // 處理後續任務.
		if (TTask::GetSubsequentsMode() == ESubsequentsMode::TrackSubsequents)
		{
			Subsequents->CheckDontCompleteUntilIsEmpty(); // we can only add wait for tasks while executing the task
		}
		
        // 執行任務
		TTask& Task = *(TTask*)&TaskStorage;
		{
			FScopeCycleCounter Scope(Task.GetStatId(), true); 
			Task.DoTask(CurrentThread, Subsequents);
			Task.~TTask();
			checkThreadGraph(ENamedThreads::GetThreadIndex(CurrentThread) <= ENamedThreads::GetRenderThread() || FMemStack::Get().IsEmpty()); // you must mark and pop memstacks if you use them in tasks! Named threads are excepted.
		}
		
		TaskConstructed = false;
		
        // 執行後序任務.
		if (TTask::GetSubsequentsMode() == ESubsequentsMode::TrackSubsequents)
		{
			FPlatformMisc::MemoryBarrier();
			Subsequents->DispatchSubsequents(NewTasks, CurrentThread);
		}
		
        // 釋聽任務對象數據.
		if (sizeof(TGraphTask) <= FBaseGraphTask::SMALL_TASK_SIZE)
		{
			this->TGraphTask::~TGraphTask();
			FBaseGraphTask::GetSmallTaskAllocator().Free(this);
		}
		else
		{
			delete this;
		}
	}
	
    // 設置先決任務.
	void SetupPrereqs(const FGraphEventArray* Prerequisites, ENamedThreads::Type CurrentThreadIfKnown, bool bUnlock)
	{
		checkThreadGraph(!TaskConstructed);
		TaskConstructed = true;
		TTask& Task = *(TTask*)&TaskStorage;
		SetThreadToExecuteOn(Task.GetDesiredThread());
		int32 AlreadyCompletedPrerequisites = 0;
		if (Prerequisites)
		{
			for (int32 Index = 0; Index < Prerequisites->Num(); Index++)
			{
				check((*Prerequisites)[Index]);
				if (!(*Prerequisites)[Index]->AddSubsequent(this))
				{
					AlreadyCompletedPrerequisites++;
				}
			}
		}
		PrerequisitesComplete(CurrentThreadIfKnown, AlreadyCompletedPrerequisites, bUnlock);
	}

	// 設置任務數據.
	FGraphEventRef Setup(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
	{
		FGraphEventRef ReturnedEventRef = Subsequents; // very important so that this doesn't get destroyed before we return
		SetupPrereqs(Prerequisites, CurrentThreadIfKnown, true);
		return ReturnedEventRef;
	}

	// 持有任務數據.
	TGraphTask* Hold(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
	{
		SetupPrereqs(Prerequisites, CurrentThreadIfKnown, false);
		return this;
	}

	// 建立任務.
	static FConstructor CreateTask(FGraphEventRef SubsequentsToAssume, const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
	{
		if (sizeof(TGraphTask) <= FBaseGraphTask::SMALL_TASK_SIZE)
		{
			void *Mem = FBaseGraphTask::GetSmallTaskAllocator().Allocate();
			return FConstructor(new (Mem) TGraphTask(SubsequentsToAssume, Prerequisites ? Prerequisites->Num() : 0), Prerequisites, CurrentThreadIfKnown);
		}
		return FConstructor(new TGraphTask(SubsequentsToAssume, Prerequisites ? Prerequisites->Num() : 0), Prerequisites, CurrentThreadIfKnown);
	}

	TAlignedBytes<sizeof(TTask),alignof(TTask)> TaskStorage; // 被執行的任務對象.
	bool						TaskConstructed;
	FGraphEventRef				Subsequents; // 後續任務同步對象.
};
  • TAsyncGraphTask

上面可知TGraphTask雖然是任務,但它執行的實際任務是TTask的模板類,UE的註釋裏邊給出了TTask的基本形式:

class FGenericTask
{
	TSomeType	SomeArgument;
public:
	FGenericTask(TSomeType InSomeArgument) // 不能用引用, 可用指針代替之.
		: SomeArgument(InSomeArgument)
	{
		// Usually the constructor doesn't do anything except save the arguments for use in DoWork or GetDesiredThread.
	}
	~FGenericTask()
	{
		// you will be destroyed immediately after you execute. Might as well do cleanup in DoWork, but you could also use a destructor.
	}
	FORCEINLINE TStatId GetStatId() const
	{
		RETURN_QUICK_DECLARE_CYCLE_STAT(FGenericTask, STATGROUP_TaskGraphTasks);
	}

	[static] ENamedThreads::Type GetDesiredThread()
	{
		return ENamedThreads::[named thread or AnyThread];
	}
	void DoTask(ENamedThreads::Type CurrentThread, const FGraphEventRef& MyCompletionGraphEvent)
	{
		// The arguments are useful for setting up other tasks. 
		// Do work here, probably using SomeArgument.
		MyCompletionGraphEvent->DontCompleteUntil(TGraphTask<FSomeChildTask>::CreateTask(NULL,CurrentThread).ConstructAndDispatchWhenReady());
	}
};

然而,咱們若是須要定製本身的任務,直接使用或派生TAsyncGraphTask類便可,無需另起爐竈。TAsyncGraphTask和其父類FAsyncGraphTaskBase聲明以下:

// Engine\Source\Runtime\Core\Public\Async\Async.h

// 後序任務模式
namespace ESubsequentsMode
{
	enum Type
	{
		TrackSubsequents, // 追蹤後序任務
		FireAndForget     // 無需追蹤任務依賴, 能夠避免線程同步, 提高執行效率.
	};
}

class FAsyncGraphTaskBase
{
public:
	TStatId GetStatId() const
	{
		return GET_STATID(STAT_TaskGraph_OtherTasks);
	}
	
    // 任務後序模式.
	static ESubsequentsMode::Type GetSubsequentsMode()
	{
		return ESubsequentsMode::FireAndForget;
	}
};

template<typename ResultType>
class TAsyncGraphTask : public FAsyncGraphTaskBase
{
public:
    // 構造任務, InFunction就是須要執行的代碼段.
	TAsyncGraphTask(TUniqueFunction<ResultType()>&& InFunction, TPromise<ResultType>&& InPromise, ENamedThreads::Type InDesiredThread = ENamedThreads::AnyThread)
		: Function(MoveTemp(InFunction))
		, Promise(MoveTemp(InPromise))
		, DesiredThread(InDesiredThread)
	{ }

public:
    // 執行任務
	void DoTask(ENamedThreads::Type CurrentThread, const FGraphEventRef& MyCompletionGraphEvent)
	{
		SetPromise(Promise, Function);
	}

	ENamedThreads::Type GetDesiredThread()
	{
		return DesiredThread;
	}

	TFuture<ResultType> GetFuture()
	{
		return Promise.GetFuture();
	}

private:
	TUniqueFunction<ResultType()> Function; // 被執行的函數對象.
	TPromise<ResultType> Promise; // 同步對象.
	ENamedThreads::Type DesiredThread; // 指望執行的線程類型.
};
  • FTaskThreadBase

FTaskThreadBase是執行任務的線程父類,定義了一組設置、操做任務的接口,聲明以下:

class FTaskThreadBase : public FRunnable, FSingleThreadRunnable
{
public:
	FTaskThreadBase()
		: ThreadId(ENamedThreads::AnyThread)
		, PerThreadIDTLSSlot(0xffffffff)
		, OwnerWorker(nullptr)
	{
		NewTasks.Reset(128);
	}

	// 設置數據.
	void Setup(ENamedThreads::Type InThreadId, uint32 InPerThreadIDTLSSlot, FWorkerThread* InOwnerWorker)
	{
		ThreadId = InThreadId;
		check(ThreadId >= 0);
		PerThreadIDTLSSlot = InPerThreadIDTLSSlot;
		OwnerWorker = InOwnerWorker;
	}

	// 從當前線程初始化.
	void InitializeForCurrentThread()
	{
        // 設置平臺相關的TLS.
		FPlatformTLS::SetTlsValue(PerThreadIDTLSSlot, OwnerWorker);
	}

	ENamedThreads::Type GetThreadId() const;
	
    // 用於帶名字的線程處理任務直到線程空閒或RequestQuit被調用.
	virtual void ProcessTasksUntilQuit(int32 QueueIndex) = 0;

	// 用於帶名字的線程處理任務直到線程空閒或RequestQuit被調用.
	virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex);
    
    // 請求退出. 會致使線程空閒時退出到調用者. 若是是帶名字的線程, 在ProcessTasksUntilQuit中用以返回給調用者; 無名線程則直接關閉.
	virtual void RequestQuit(int32 QueueIndex) = 0;

    // 入隊任務, 假設this線程和當前線程同樣. 若是是帶名字的線程, 會直接進入私有的隊列.
	virtual void EnqueueFromThisThread(int32 QueueIndex, FBaseGraphTask* Task);

    // 入隊任務, 假設this線程和當前線程不同.
	virtual bool EnqueueFromOtherThread(int32 QueueIndex, FBaseGraphTask* Task);
	
    // 喚醒線程.
	virtual void WakeUp();
	
    // 查詢任務是否在處理中.
	virtual bool IsProcessingTasks(int32 QueueIndex) = 0;

	// 單線程幀更新
	virtual void Tick() override
	{
		ProcessTasksUntilIdle(0);
	}

	// FRunnable API

	virtual bool Init() override
	{
		InitializeForCurrentThread();
		return true;
	}
	virtual uint32 Run() override
	{
		check(OwnerWorker); // make sure we are started up
		ProcessTasksUntilQuit(0);
		FMemory::ClearAndDisableTLSCachesOnCurrentThread();
		return 0;
	}
	virtual void Stop() override
	{
		RequestQuit(-1);
	}
	virtual void Exit() override
	{
	}
	virtual FSingleThreadRunnable* GetSingleThreadInterface() override
	{
		return this;
	}

protected:
	ENamedThreads::Type		ThreadId; // 線程id(線程索引)
	uint32					PerThreadIDTLSSlot; // TLS槽.
	FThreadSafeCounter		IsStalled; // 阻塞計數器. 用於觸發阻塞信號.
	TArray<FBaseGraphTask*> NewTasks; // 待處理的任務列表.
	FWorkerThread* OwnerWorker; // 所在的工做線程對象.
};

FTaskThreadBase只是抽象類,具體的實現由子類FNamedTaskThread和FTaskThreadAnyThread完成。

其中FNamedTaskThread處理帶名字線程的任務:

// 帶名字的任務線程.
class FNamedTaskThread : public FTaskThreadBase
{
public:
    // 用於帶名字的線程處理任務直到線程空閒或RequestQuit被調用.
	virtual void ProcessTasksUntilQuit(int32 QueueIndex) override
	{
		check(Queue(QueueIndex).StallRestartEvent); // make sure we are started up

		Queue(QueueIndex).QuitForReturn = false;
		verify(++Queue(QueueIndex).RecursionGuard == 1);
        
        // 不斷地循環處理隊列任務, 直到退出、關閉或平臺不支持多線程。
		do
		{
			ProcessTasksNamedThread(QueueIndex, FPlatformProcess::SupportsMultithreading());
		} while (!Queue(QueueIndex).QuitForReturn && !Queue(QueueIndex).QuitForShutdown && FPlatformProcess::SupportsMultithreading()); // @Hack - quit now when running with only one thread.
		verify(!--Queue(QueueIndex).RecursionGuard);
	}
	
    // 用於帶名字的線程處理任務直到線程空閒或RequestQuit被調用.
	virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex) override
	{
		check(Queue(QueueIndex).StallRestartEvent); // make sure we are started up

		Queue(QueueIndex).QuitForReturn = false;
		verify(++Queue(QueueIndex).RecursionGuard == 1);
		uint64 ProcessedTasks = ProcessTasksNamedThread(QueueIndex, false);
		verify(!--Queue(QueueIndex).RecursionGuard);
		return ProcessedTasks;
	}
	
    // 處理任務.
	uint64 ProcessTasksNamedThread(int32 QueueIndex, bool bAllowStall)
	{
		uint64 ProcessedTasks = 0;

		(......)
        
		TStatId StallStatId;
		bool bCountAsStall = false;
        
        (......)

		while (!Queue(QueueIndex).QuitForReturn)
		{
            // 從隊列首部獲取任務.
			FBaseGraphTask* Task = Queue(QueueIndex).StallQueue.Pop(0, bAllowStall);
			TestRandomizedThreads();
			if (!Task)
			{
				if (bAllowStall)
				{
					{
						FScopeCycleCounter Scope(StallStatId);
						Queue(QueueIndex).StallRestartEvent->Wait(MAX_uint32, bCountAsStall);
						if (Queue(QueueIndex).QuitForShutdown)
						{
							return ProcessedTasks;
						}
						TestRandomizedThreads();
					}
					continue;
				}
				else
				{
					break; // we were asked to quit
				}
			}
			else // 任務不爲空
			{
                // 執行任務.
				Task->Execute(NewTasks, ENamedThreads::Type(ThreadId | (QueueIndex << ENamedThreads::QueueIndexShift)));
				ProcessedTasks++;
				TestRandomizedThreads();
			}
		}
		return ProcessedTasks;
	}
    
	virtual void EnqueueFromThisThread(int32 QueueIndex, FBaseGraphTask* Task) override
	{
		checkThreadGraph(Task && Queue(QueueIndex).StallRestartEvent); // make sure we are started up
		uint32 PriIndex = ENamedThreads::GetTaskPriority(Task->ThreadToExecuteOn) ? 0 : 1;
		int32 ThreadToStart = Queue(QueueIndex).StallQueue.Push(Task, PriIndex);
		check(ThreadToStart < 0); // if I am stalled, then how can I be queueing a task?
	}

	virtual void RequestQuit(int32 QueueIndex) override
	{
		// this will not work under arbitrary circumstances. For example you should not attempt to stop threads unless they are known to be idle.
		if (!Queue(0).StallRestartEvent)
		{
			return;
		}
		if (QueueIndex == -1)
		{
			// we are shutting down
			checkThreadGraph(Queue(0).StallRestartEvent); // make sure we are started up
			checkThreadGraph(Queue(1).StallRestartEvent); // make sure we are started up
			Queue(0).QuitForShutdown = true;
			Queue(1).QuitForShutdown = true;
			Queue(0).StallRestartEvent->Trigger();
			Queue(1).StallRestartEvent->Trigger();
		}
		else
		{
			checkThreadGraph(Queue(QueueIndex).StallRestartEvent); // make sure we are started up
			Queue(QueueIndex).QuitForReturn = true;
		}
	}

	virtual bool EnqueueFromOtherThread(int32 QueueIndex, FBaseGraphTask* Task) override
	{
		TestRandomizedThreads();
		checkThreadGraph(Task && Queue(QueueIndex).StallRestartEvent); // make sure we are started up

		uint32 PriIndex = ENamedThreads::GetTaskPriority(Task->ThreadToExecuteOn) ? 0 : 1;
		int32 ThreadToStart = Queue(QueueIndex).StallQueue.Push(Task, PriIndex);

		if (ThreadToStart >= 0)
		{
			QUICK_SCOPE_CYCLE_COUNTER(STAT_TaskGraph_EnqueueFromOtherThread_Trigger);
			checkThreadGraph(ThreadToStart == 0);
			TASKGRAPH_SCOPE_CYCLE_COUNTER(1, STAT_TaskGraph_EnqueueFromOtherThread_Trigger);
			Queue(QueueIndex).StallRestartEvent->Trigger();
			return true;
		}
		return false;
	}

	virtual bool IsProcessingTasks(int32 QueueIndex) override
	{
		return !!Queue(QueueIndex).RecursionGuard;
	}

private:
    // 線程任務隊列.
	struct FThreadTaskQueue
	{
		FStallingTaskQueue<FBaseGraphTask, PLATFORM_CACHE_LINE_SIZE, 2> StallQueue; // 阻塞的任務隊列.

		uint32 RecursionGuard; // 防止循環(遞歸)調用.
		bool QuitForReturn; // 是否請求退出.
		bool QuitForShutdown; // 是否請求關閉.
		FEvent*	StallRestartEvent; // 當線程滿載時的阻塞事件.
	};

	FORCEINLINE FThreadTaskQueue& Queue(int32 QueueIndex)
	{
		checkThreadGraph(QueueIndex >= 0 && QueueIndex < ENamedThreads::NumQueues);
		return Queues[QueueIndex];
	}
	FORCEINLINE const FThreadTaskQueue& Queue(int32 QueueIndex) const
	{
		checkThreadGraph(QueueIndex >= 0 && QueueIndex < ENamedThreads::NumQueues);
		return Queues[QueueIndex];
	}

	FThreadTaskQueue Queues[ENamedThreads::NumQueues]; // 帶名字線程專用的任務隊列.
};

FTaskThreadAnyThread用於處理無名線程的任務,因爲無名線程有不少個,因此處理任務時和FNamedTaskThread有所不一樣:

class FTaskThreadAnyThread : public FTaskThreadBase
{
public:
	virtual void ProcessTasksUntilQuit(int32 QueueIndex) override
	{
		if (PriorityIndex != (ENamedThreads::BackgroundThreadPriority >> ENamedThreads::ThreadPriorityShift))
		{
			FMemory::SetupTLSCachesOnCurrentThread();
		}
		check(!QueueIndex);
		do
		{
            // 處理任務
			ProcessTasks();			
		} while (!Queue.QuitForShutdown && FPlatformProcess::SupportsMultithreading()); // @Hack - quit now when running with only one thread.
	}

	virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex) override
	{
		if (!FPlatformProcess::SupportsMultithreading())
		{
            // 處理任務
			return ProcessTasks();
		}
		else
		{
			check(0);
			return 0;
		}
	}
    
    (......)

private:

#if UE_EXTERNAL_PROFILING_ENABLED
	static inline const TCHAR* ThreadPriorityToName(int32 PriorityIdx)
	{
		PriorityIdx <<= ENamedThreads::ThreadPriorityShift;
		if (PriorityIdx == ENamedThreads::HighThreadPriority)
		{
			return TEXT("Task Thread HP"); // 高優先級的工做線程
		}
		else if (PriorityIdx == ENamedThreads::NormalThreadPriority)
		{
			return TEXT("Task Thread NP"); // 普通優先級的工做線程
		}
		else if (PriorityIdx == ENamedThreads::BackgroundThreadPriority)
		{
			return TEXT("Task Thread BP"); // 後臺優先級的工做線程
		}
		else
		{
			return TEXT("Task Thread Unknown Priority");
		}
	}
#endif

	// 此處的處理任務與FNamedTaskThread有區別, 在於獲取任務的方式不同, 是從TaskGraph系統中的無名任務隊列獲取任務的.
	uint64 ProcessTasks()
	{
		LLM_SCOPE(ELLMTag::TaskGraphTasksMisc);

		TStatId StallStatId;
		bool bCountAsStall = true;
		uint64 ProcessedTasks = 0;
		
        (......)
        
		verify(++Queue.RecursionGuard == 1);
		bool bDidStall = false;
		while (1)
		{
            // 從TaskGraph系統中的無名任務隊列獲取任務的.
			FBaseGraphTask* Task = FindWork();
			if (!Task)
			{
				(......)

				TestRandomizedThreads();
				if (FPlatformProcess::SupportsMultithreading())
				{
					FScopeCycleCounter Scope(StallStatId);
					Queue.StallRestartEvent->Wait(MAX_uint32, bCountAsStall);
					bDidStall = true;
				}
				if (Queue.QuitForShutdown || !FPlatformProcess::SupportsMultithreading())
				{
					break;
				}
				TestRandomizedThreads();
				
                (......)
                
				continue;
			}
			TestRandomizedThreads();
			
            (......)
            
			bDidStall = false;
			Task->Execute(NewTasks, ENamedThreads::Type(ThreadId));
			ProcessedTasks++;
			TestRandomizedThreads();
			if (Queue.bStallForTuning)
			{
				{
					FScopeLock Lock(&Queue.StallForTuning);
				}
			}
		}
		verify(!--Queue.RecursionGuard);
		return ProcessedTasks;
	}

    // 任務隊列數據.
	struct FThreadTaskQueue
	{
		FEvent* StallRestartEvent;
		uint32 RecursionGuard;
		bool QuitForShutdown;
		bool bStallForTuning;
        
		FCriticalSection StallForTuning; // 阻塞臨界區
	};

	// 從TaskGraph系統中獲取任務.
	FBaseGraphTask* FindWork()
    {
		return FTaskGraphImplementation::Get().FindWork(ThreadId);
	}

	FThreadTaskQueue Queue; // 任務隊列, 只有第一個用於無名線程.

	int32 PriorityIndex;
};
  • ENamedThreads

在理解TaskGraph的實現和使用以前,有必要理解ENamedThreads相關的機制。ENamedThreads是一個命名空間,此空間內提供了編解碼線程、優先級的操做。它的聲明和解析以下:

namespace ENamedThreads
{
	enum Type : int32
	{
		UnusedAnchor = -1,
		
        // ----專用(帶名字的)線程----
#if STATS
		StatsThread, // 統計線程
#endif
		RHIThread,   // RHI線程
		AudioThread, // 音頻線程
		GameThread,  // 遊戲線程
		ActualRenderingThread = GameThread + 1, // 實際渲染線程. GetRenderingThread()獲取的渲染多是實際渲染線程也多是遊戲線程.

		AnyThread = 0xff,  // 任意線程(未知線程, 無名線程)

        // ----隊列索引和優先級----
		MainQueue =			0x000, // 主隊列
		LocalQueue =		0x100, // 局部隊列

		NumQueues =			2,
		ThreadIndexMask =	0xff,
		QueueIndexMask =	0x100,
		QueueIndexShift =	8,

		// ----隊列任務索引、優先級----
		NormalTaskPriority =	0x000, // 普通任務優先級
		HighTaskPriority =		0x200, // 高任務優先級

		NumTaskPriorities =		2,
		TaskPriorityMask =		0x200,
		TaskPriorityShift =		9,
		
        // ----線程優先級----
		NormalThreadPriority = 0x000, // 普通線程優先級
		HighThreadPriority = 0x400,   // 高線程優先級
		BackgroundThreadPriority = 0x800, // 後臺線程優先級

		NumThreadPriorities = 3,
		ThreadPriorityMask = 0xC00,
		ThreadPriorityShift = 10,

		// 組合標記
#if STATS
		StatsThread_Local = StatsThread | LocalQueue,
#endif
		GameThread_Local = GameThread | LocalQueue,
		ActualRenderingThread_Local = ActualRenderingThread | LocalQueue,

		AnyHiPriThreadNormalTask = AnyThread | HighThreadPriority | NormalTaskPriority,
		AnyHiPriThreadHiPriTask = AnyThread | HighThreadPriority | HighTaskPriority,

		AnyNormalThreadNormalTask = AnyThread | NormalThreadPriority | NormalTaskPriority,
		AnyNormalThreadHiPriTask = AnyThread | NormalThreadPriority | HighTaskPriority,

		AnyBackgroundThreadNormalTask = AnyThread | BackgroundThreadPriority | NormalTaskPriority,
		AnyBackgroundHiPriTask = AnyThread | BackgroundThreadPriority | HighTaskPriority,
	};

	struct FRenderThreadStatics
	{
	private:
		// 存儲了渲染線程,注意是原子操做類型。
		static CORE_API TAtomic<Type> RenderThread;
		static CORE_API TAtomic<Type> RenderThread_Local;
	};

    // ----設置和獲取渲染線程接口----
	Type GetRenderThread();
	Type GetRenderThread_Local();
	void SetRenderThread(Type Thread);
	void SetRenderThread_Local(Type Thread);

	extern CORE_API int32 bHasBackgroundThreads;   // 是否有後臺線程
	extern CORE_API int32 bHasHighPriorityThreads; // 是否有高優先級線程
	
    // ----設置和獲取線程索引、線程優先級、任務優先級接口----
	Type GetThreadIndex(Type ThreadAndIndex);
	int32 GetQueueIndex(Type ThreadAndIndex);
	int32 GetTaskPriority(Type ThreadAndIndex);
	int32 GetThreadPriorityIndex(Type ThreadAndIndex);
    
	Type SetPriorities(Type ThreadAndIndex, Type ThreadPriority, Type TaskPriority);
	Type SetPriorities(Type ThreadAndIndex, int32 PriorityIndex, bool bHiPri);
	Type SetThreadPriority(Type ThreadAndIndex, Type ThreadPriority);
	Type SetTaskPriority(Type ThreadAndIndex, Type TaskPriority);
}
  • FTaskGraphInterface

上面提到了不少任務類型,本節才真正涉及這些任務的管理器和工廠FTaskGraphInterface。FTaskGraphInterface就是任務圖的管理者,提供了任務的操做接口:

class FTaskGraphInterface
{
	virtual void QueueTask(class FBaseGraphTask* Task, ENamedThreads::Type ThreadToExecuteOn, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread) = 0;

public:
    // FTaskGraphInterface對象操做接口
	static CORE_API void Startup(int32 NumThreads);
	static CORE_API void Shutdown();
    static CORE_API bool IsRunning();
	static CORE_API FTaskGraphInterface& Get();
	
    // 線程操做接口.
	virtual ENamedThreads::Type GetCurrentThreadIfKnown(bool bLocalQueue = false) = 0;
	virtual	int32 GetNumWorkerThreads() = 0;
	virtual bool IsThreadProcessingTasks(ENamedThreads::Type ThreadToCheck) = 0;
	virtual void AttachToThread(ENamedThreads::Type CurrentThread)=0;
	virtual uint64 ProcessThreadUntilIdle(ENamedThreads::Type CurrentThread)=0;
	
    // 任務操做接口.
	virtual void ProcessThreadUntilRequestReturn(ENamedThreads::Type CurrentThread)=0;
	virtual void RequestReturn(ENamedThreads::Type CurrentThread)=0;
	virtual void WaitUntilTasksComplete(const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)=0;
	virtual void TriggerEventWhenTasksComplete(FEvent* InEvent, const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread, ENamedThreads::Type TriggerThread = ENamedThreads::AnyHiPriThreadHiPriTask)=0;
	void WaitUntilTaskCompletes(const FGraphEventRef& Task, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread);
	void TriggerEventWhenTaskCompletes(FEvent* InEvent, const FGraphEventRef& Task, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread, ENamedThreads::Type TriggerThread = ENamedThreads::AnyHiPriThreadHiPriTask);
	virtual void AddShutdownCallback(TFunction<void()>& Callback) = 0;
	static void BroadcastSlow_OnlyUseForSpecialPurposes(bool bDoTaskThreads, bool bDoBackgroundThreads, TFunction<void(ENamedThreads::Type CurrentThread)>& Callback);
};

FTaskGraphInterface的實現是在FTaskGraphImplementation類中,FTaskGraphImplementation採用了特殊的線程對象WorkerThreads(工做線程)來做爲執行的載體,固然若是是專用的(帶名字的線程,如GameThread、RHI、ActualRenderingThread)線程,則會進入專用的任務隊列。因爲它的實現細節不少,後面再展開討論。

  • FTaskGraphImplementation

FTaskGraphImplementation繼承並實現了FTaskGraphInterface的接口,部分接口和實現以下:

// Engine\Source\Runtime\Core\Private\Async\TaskGraph.cpp

class FTaskGraphImplementation : public FTaskGraphInterface
{
public:
	static FTaskGraphImplementation& Get();

	// 構造函數, 計算任務線程數量, 建立專用線程和無名線程等.
	FTaskGraphImplementation(int32)
	{
		bCreatedHiPriorityThreads = !!ENamedThreads::bHasHighPriorityThreads;
		bCreatedBackgroundPriorityThreads = !!ENamedThreads::bHasBackgroundThreads;

		int32 MaxTaskThreads = MAX_THREADS; // 最大任務線程數量默認是83.
		int32 NumTaskThreads = FPlatformMisc::NumberOfWorkerThreadsToSpawn(); // 根據硬件核心數量獲取任務線程數量.

		// 處理不能支持多線程的平臺.
		if (!FPlatformProcess::SupportsMultithreading())
		{
			MaxTaskThreads = 1;
			NumTaskThreads = 1;
			LastExternalThread = (ENamedThreads::Type)(ENamedThreads::ActualRenderingThread - 1);
			bCreatedHiPriorityThreads = false;
			bCreatedBackgroundPriorityThreads = false;
			ENamedThreads::bHasBackgroundThreads = 0;
			ENamedThreads::bHasHighPriorityThreads = 0;
		}
		else
		{
			LastExternalThread = ENamedThreads::ActualRenderingThread;
		}
        
		// 專用線程數量
		NumNamedThreads = LastExternalThread + 1;
        // 計算工做線程集數量, 與是否開啓線程高優先級、是否建立後臺優先級線程有關。
		NumTaskThreadSets = 1 + bCreatedHiPriorityThreads + bCreatedBackgroundPriorityThreads;
        // 計算真正須要的任務線程數量, 最大不超過83個.
		NumThreads = FMath::Max<int32>(FMath::Min<int32>(NumTaskThreads * NumTaskThreadSets + NumNamedThreads, MAX_THREADS), NumNamedThreads + 1);
		NumThreads = FMath::Min(NumThreads, NumNamedThreads + NumTaskThreads * NumTaskThreadSets);

		NumTaskThreadsPerSet = (NumThreads - NumNamedThreads) / NumTaskThreadSets;

		ReentrancyCheck.Increment(); // just checking for reentrancy
		PerThreadIDTLSSlot = FPlatformTLS::AllocTlsSlot();
		
        // 建立全部任務線程.
		for (int32 ThreadIndex = 0; ThreadIndex < NumThreads; ThreadIndex++)
		{
			check(!WorkerThreads[ThreadIndex].bAttached); // reentrant?
            // 根據是否專用線程分別建立線程.
			bool bAnyTaskThread = ThreadIndex >= NumNamedThreads;
			if (bAnyTaskThread)
			{
				WorkerThreads[ThreadIndex].TaskGraphWorker = new FTaskThreadAnyThread(ThreadIndexToPriorityIndex(ThreadIndex));
			}
			else
			{
				WorkerThreads[ThreadIndex].TaskGraphWorker = new FNamedTaskThread;
			}
			WorkerThreads[ThreadIndex].TaskGraphWorker->Setup(ENamedThreads::Type(ThreadIndex), PerThreadIDTLSSlot, &WorkerThreads[ThreadIndex]);
		}

		TaskGraphImplementationSingleton = this; // 賦值this到TaskGraphImplementationSingleton, 以便外部可獲取.

        // 設置無名線程的屬性.
		for (int32 ThreadIndex = LastExternalThread + 1; ThreadIndex < NumThreads; ThreadIndex++)
		{
			FString Name;
			const ANSICHAR* GroupName = "TaskGraphNormal";
			int32 Priority = ThreadIndexToPriorityIndex(ThreadIndex);
			EThreadPriority ThreadPri;
			uint64 Affinity = FPlatformAffinity::GetTaskGraphThreadMask();
			if (Priority == 1)
			{
				Name = FString::Printf(TEXT("TaskGraphThreadHP %d"), ThreadIndex - (LastExternalThread + 1));
				GroupName = "TaskGraphHigh";
				ThreadPri = TPri_SlightlyBelowNormal; // we want even hi priority tasks below the normal threads

				// If the platform defines FPlatformAffinity::GetTaskGraphHighPriorityTaskMask then use it
				if (FPlatformAffinity::GetTaskGraphHighPriorityTaskMask() != 0xFFFFFFFFFFFFFFFF)
				{
					Affinity = FPlatformAffinity::GetTaskGraphHighPriorityTaskMask();
				}
			}
			else if (Priority == 2)
			{
				Name = FString::Printf(TEXT("TaskGraphThreadBP %d"), ThreadIndex - (LastExternalThread + 1));
				GroupName = "TaskGraphLow";
				ThreadPri = TPri_Lowest;
				// If the platform defines FPlatformAffinity::GetTaskGraphBackgroundTaskMask then use it
				if ( FPlatformAffinity::GetTaskGraphBackgroundTaskMask() != 0xFFFFFFFFFFFFFFFF )
				{
					Affinity = FPlatformAffinity::GetTaskGraphBackgroundTaskMask();
				}
			}
			else
			{
				Name = FString::Printf(TEXT("TaskGraphThreadNP %d"), ThreadIndex - (LastExternalThread + 1));
				ThreadPri = TPri_BelowNormal; // we want normal tasks below normal threads like the game thread
			}
            
            // 計算線程棧大小.
#if WITH_EDITOR
			uint32 StackSize = 1024 * 1024;
#elif ( UE_BUILD_SHIPPING || UE_BUILD_TEST )
			uint32 StackSize = 384 * 1024;
#else
			uint32 StackSize = 512 * 1024;
#endif
            // 真正地建立工做線程的執行線程.
			WorkerThreads[ThreadIndex].RunnableThread = FRunnableThread::Create(&Thread(ThreadIndex), *Name, StackSize, ThreadPri, Affinity); // these are below normal threads so that they sleep when the named threads are active
			WorkerThreads[ThreadIndex].bAttached = true;
            
			if (WorkerThreads[ThreadIndex].RunnableThread)
			{
				TRACE_SET_THREAD_GROUP(WorkerThreads[ThreadIndex].RunnableThread->GetThreadID(), GroupName);
			}
		}
	}
	
    // 入隊任務.
	virtual void QueueTask(FBaseGraphTask* Task, ENamedThreads::Type ThreadToExecuteOn, ENamedThreads::Type InCurrentThreadIfKnown = ENamedThreads::AnyThread) final override
	{
		TASKGRAPH_SCOPE_CYCLE_COUNTER(2, STAT_TaskGraph_QueueTask);

		if (ENamedThreads::GetThreadIndex(ThreadToExecuteOn) == ENamedThreads::AnyThread)
		{
			TASKGRAPH_SCOPE_CYCLE_COUNTER(3, STAT_TaskGraph_QueueTask_AnyThread);
            // 多線程支持下的處理.
			if (FPlatformProcess::SupportsMultithreading())
			{
                // 處理優先級.
				uint32 TaskPriority = ENamedThreads::GetTaskPriority(Task->ThreadToExecuteOn);
				int32 Priority = ENamedThreads::GetThreadPriorityIndex(Task->ThreadToExecuteOn);
				if (Priority == (ENamedThreads::BackgroundThreadPriority >> ENamedThreads::ThreadPriorityShift) && (!bCreatedBackgroundPriorityThreads || !ENamedThreads::bHasBackgroundThreads))
				{
					Priority = ENamedThreads::NormalThreadPriority >> ENamedThreads::ThreadPriorityShift; // we don't have background threads, promote to normal
					TaskPriority = ENamedThreads::NormalTaskPriority >> ENamedThreads::TaskPriorityShift; // demote to normal task pri
				}
				else if (Priority == (ENamedThreads::HighThreadPriority >> ENamedThreads::ThreadPriorityShift) && (!bCreatedHiPriorityThreads || !ENamedThreads::bHasHighPriorityThreads))
				{
					Priority = ENamedThreads::NormalThreadPriority >> ENamedThreads::ThreadPriorityShift; // we don't have hi priority threads, demote to normal
					TaskPriority = ENamedThreads::HighTaskPriority >> ENamedThreads::TaskPriorityShift; // promote to hi task pri
				}
                
				uint32 PriIndex = TaskPriority ? 0 : 1;
				check(Priority >= 0 && Priority < MAX_THREAD_PRIORITIES);
				{
					TASKGRAPH_SCOPE_CYCLE_COUNTER(4, STAT_TaskGraph_QueueTask_IncomingAnyThreadTasks_Push);
                    // 將任務壓入待執行隊列, 且得到並執行可執行的任務索引(可能無).
					int32 IndexToStart = IncomingAnyThreadTasks[Priority].Push(Task, PriIndex);
					if (IndexToStart >= 0)
					{
						StartTaskThread(Priority, IndexToStart);
					}
				}
				return;
			}
			else
			{
				ThreadToExecuteOn = ENamedThreads::GameThread;
			}
		}
        
        // 如下是不支持多線程的處理.
		ENamedThreads::Type CurrentThreadIfKnown;
		if (ENamedThreads::GetThreadIndex(InCurrentThreadIfKnown) == ENamedThreads::AnyThread)
		{
			CurrentThreadIfKnown = GetCurrentThread();
		}
		else
		{
			CurrentThreadIfKnown = ENamedThreads::GetThreadIndex(InCurrentThreadIfKnown);
			checkThreadGraph(CurrentThreadIfKnown == ENamedThreads::GetThreadIndex(GetCurrentThread()));
		}
		{
			int32 QueueToExecuteOn = ENamedThreads::GetQueueIndex(ThreadToExecuteOn);
			ThreadToExecuteOn = ENamedThreads::GetThreadIndex(ThreadToExecuteOn);
			FTaskThreadBase* Target = &Thread(ThreadToExecuteOn);
			if (ThreadToExecuteOn == ENamedThreads::GetThreadIndex(CurrentThreadIfKnown))
			{
				Target->EnqueueFromThisThread(QueueToExecuteOn, Task);
			}
			else
			{
				Target->EnqueueFromOtherThread(QueueToExecuteOn, Task);
			}
		}
	}

	virtual	int32 GetNumWorkerThreads() final override;
	virtual ENamedThreads::Type GetCurrentThreadIfKnown(bool bLocalQueue) final override;
	virtual bool IsThreadProcessingTasks(ENamedThreads::Type ThreadToCheck) final override;

	// 將當前線程導入到指定Index.
	virtual void AttachToThread(ENamedThreads::Type CurrentThread) final override;
    
    // ----處理任務接口----
    
	virtual uint64 ProcessThreadUntilIdle(ENamedThreads::Type CurrentThread) final override;
	virtual void ProcessThreadUntilRequestReturn(ENamedThreads::Type CurrentThread) final override;
	virtual void RequestReturn(ENamedThreads::Type CurrentThread) final override;
	virtual void WaitUntilTasksComplete(const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread) final override;
	virtual void TriggerEventWhenTasksComplete(FEvent* InEvent, const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread, ENamedThreads::Type TriggerThread = ENamedThreads::AnyHiPriThreadHiPriTask) final override;
	virtual void AddShutdownCallback(TFunction<void()>& Callback);

	// ----任務調度接口----

    // 開啓指定優先級和索引的任務線程.
	void StartTaskThread(int32 Priority, int32 IndexToStart);
	void StartAllTaskThreads(bool bDoBackgroundThreads);
	FBaseGraphTask* FindWork(ENamedThreads::Type ThreadInNeed);
	void StallForTuning(int32 Index, bool Stall);
	void SetTaskThreadPriorities(EThreadPriority Pri);

private:
	// 獲取指定索引的任務線程引用.
	FTaskThreadBase& Thread(int32 Index)
	{
		checkThreadGraph(Index >= 0 && Index < NumThreads);
		checkThreadGraph(WorkerThreads[Index].TaskGraphWorker->GetThreadId() == Index);
		return *WorkerThreads[Index].TaskGraphWorker;
	}

	// 獲取當前線程索引.
	ENamedThreads::Type GetCurrentThread();
	int32 ThreadIndexToPriorityIndex(int32 ThreadIndex);

	enum
	{
		MAX_THREADS = 26 * (CREATE_HIPRI_TASK_THREADS + CREATE_BACKGROUND_TASK_THREADS + 1) + ENamedThreads::ActualRenderingThread + 1,
		MAX_THREAD_PRIORITIES = 3
	};

	FWorkerThread		WorkerThreads[MAX_THREADS]; // 全部工做線程(任務線程)對象數組.
	int32				NumThreads;       // 實際上被使用的線程數量.
	int32				NumNamedThreads;  // 專用線程數量.
	int32				NumTaskThreadSets;// 任務線程集合數量.
	int32				NumTaskThreadsPerSet; // 每一個集合擁有的任務線程數量.
    
	bool				bCreatedHiPriorityThreads;
	bool				bCreatedBackgroundPriorityThreads;

	ENamedThreads::Type LastExternalThread;
	FThreadSafeCounter	ReentrancyCheck;
	uint32				PerThreadIDTLSSlot;

	TArray<TFunction<void()> > ShutdownCallbacks; // 銷燬前的回調.

	FStallingTaskQueue<FBaseGraphTask, PLATFORM_CACHE_LINE_SIZE, 2>	IncomingAnyThreadTasks[MAX_THREAD_PRIORITIES];
};

總結起來,TaskGraph會根據線程優先級、是否啓用後臺線程建立不一樣的工做線程集合,而後建立它們的FWorkerThread對象。入隊任務時,會將任務Push到任務列表IncomingAnyThreadTasks(類型是FStallingTaskQueue,線程安全的無鎖的鏈表)中,並取出可執行的任務索引,根據任務的屬性(但願在哪一個線程執行、優先級、任務索引)啓用對應的工做線程去執行。

TaskGraph涉及的工做線程FWorkerThread聲明以下:

struct FWorkerThread
{
	FTaskThreadBase*	TaskGraphWorker; // 所在的FTaskThread對象(被FTaskThread對象擁有)
	FRunnableThread*	RunnableThread;  // 真正執行任務的可運行線程.
	bool				bAttached; // 是否附加的線程.(通常用於專用線程)
};

因而可知,TaskGraph最終也是藉助FRunnableThread來執行任務。TaskGraph系統總算是和FRunnableThread聯繫起來,造成了閉環。

至此,終於將TaskGraph體系的主幹脈絡闡述完了,固然,還有不少技術細節(如同步事件、觸發細節、調度算法、無鎖鏈表以及部分概念)並無涉及,這些就留給讀者本身去研讀UE源碼探索了。

 

2.5 UE的多線程渲染

前面作了大量的基礎鋪墊,終於回到了主題,講UE的多線程渲染相關的知識。

2.5.1 UE的多線程渲染基礎

2.5.1.1 場景和渲染模塊主要類型

UE的場景和渲染模塊涉及到概念很是多,主要類型和解析以下:

類型 解析
UWorld 包含了一組能夠相互交互的Actor和組件的集合,多個關卡(Level)能夠被加載進UWorld或從UWorld卸載。能夠同時存在多個UWorld實例。
ULevel 關卡,存儲着一組Actor和組件,而且存儲在同一個文件。
USceneComponent 場景組件,是全部能夠被加入到場景的物體的父類,好比燈光、模型、霧等。
UPrimitiveComponent 圖元組件,是全部可渲染或擁有物理模擬的物體父類。是CPU層裁剪的最小粒度單位,
ULightComponent 光源組件,是全部光源類型的父類。
FScene 是UWorld在渲染模塊的表明。只有加入到FScene的物體纔會被渲染器感知到。渲染線程擁有FScene的全部狀態(遊戲線程不可直接修改)。
FPrimitiveSceneProxy 圖元場景代理,是UPrimitiveComponent在渲染器的表明,鏡像了UPrimitiveComponent在渲染線程的狀態。
FPrimitiveSceneInfo 渲染器內部狀態(描述了FRendererModule的實現),至關於融合了UPrimitiveComponent and FPrimitiveSceneProxy。只存在渲染器模塊,因此引擎模塊沒法感知到它的存在。
FSceneView 描述了FScene內的單個視圖(view),同個FScene容許有多個view,換言之,一個場景能夠被多個view繪製,或者多個view同時被繪製。每一幀都會建立新的view實例。
FViewInfo view在渲染器的內部表明,只存在渲染器模塊,引擎模塊不可見。
FSceneViewState 存儲了有關view的渲染器私有信息,這些信息須要被跨幀訪問。在Game實例,每一個ULocalPlayer擁有一個FSceneViewState實例。
FSceneRenderer 每幀都會被建立,封裝幀間臨時數據。下派生FDeferredShadingSceneRenderer(延遲着色場景渲染器)和FMobileSceneRenderer(移動端場景渲染器),分別表明PC和移動端的默認渲染器。

2.5.1.2 引擎模塊和渲染模塊表明

UE爲告終構清晰,減小模塊之間的依賴,加速迭代速度,劃分了不少模塊,最主要的有引擎模塊、渲染器模塊、核心、RHI、插件等等。上一小節提到了不少概念和類型,它們有些存在於引擎模塊(Engine Module),有些存在於渲染器模塊(Renderer Module),具體以下表:

Engine Module Renderer Module
UWorld FScene
UPrimitiveComponent / FPrimitiveSceneProxy FPrimitiveSceneInfo
FSceneView FViewInfo
ULocalPlayer FSceneViewState
ULightComponent / FLightSceneProxy FLightSceneInfo

2.5.1.3 遊戲線程和渲染線程表明

遊戲線程的對象一般作邏輯更新,在內存中有一份持久的數據,爲了不遊戲線程和渲染線程產生競爭條件,會在渲染線程額外存儲一分內存拷貝,而且使用的是另外的類型,如下是UE比較常見的類型映射關係(遊戲線程對象以U開頭,渲染線程以F開頭):

Game Thread Rendering Thread
UWorld FScene
UPrimitiveComponent FPrimitiveSceneProxy / FPrimitiveSceneInfo
- FSceneView / FViewInfo
ULocalPlayer FSceneViewState
ULightComponent FLightSceneProxy / FLightSceneInfo

遊戲線程表明通常由遊戲遊戲線程操做,渲染線程表明主要由渲染線程操做。若是嘗試跨線程操做數據,將會引起不可預料的結果,產生競爭條件。

/** SceneProxy在註冊進場景時,會在遊戲線程中被構造和傳遞數據。 */
FStaticMeshSceneProxy::FStaticMeshSceneProxy(UStaticMeshComponent* InComponent):
    FPrimitiveSceneProxy(...),
    Owner(InComponent->GetOwner()) <======== 此處將AActor指針被緩存
    ...

    /** SceneProxy的DrawDynamicElements將被渲染器在渲染線程中調用 */
    void FStaticMeshSceneProxy::DrawDynamicElements(...)
    {
        if (Owner->AnyProperty) <========== 將會引起競爭條件!  遊戲線程擁有AActor、UObject的全部狀態!!而且UObject對象可能被GC掉,此時再訪問會引發程序崩潰!!
    }

部分表明比較特殊,如FPrimitiveSceneProxy、FLightSceneProxy ,這些場景代理本屬於引擎模塊,但又屬於渲染線程專屬對象,說明它們是鏈接遊戲線程和渲染線程的橋樑,是線程間傳遞數據的工具人。

2.5.2 UE的多線程渲染總覽

默認狀況下,UE存在遊戲線程(Game Thread)、渲染線程(Render Thread)、RHI線程(RHI Thread),它們都獨立地運行在專門的線程上(FRunnableThread)。

遊戲線程經過某些接口向渲染線程的Queue入隊回調接口,以便渲染線程稍後運行時,從渲染線程的Queue獲取回調,一個個地執行,從而生成了Command List。

渲染線程做爲前端(frontend)產生的Command List是平臺無關的,是抽象的圖形API調用;而RHI線程做爲後端(backtend)會執行和轉換渲染線程的Command List成爲指定圖形API的調用(稱爲Graphical Command),並提交到GPU執行。這些線程處理的數據一般是不一樣幀的,譬如遊戲線程處理N幀數據,渲染線程和RHI線程處理N-1幀數據。

但也存在例外,好比渲染線程和RHI線程運行很快,幾乎不存在延遲,這種狀況下,遊戲線程處理N幀,而渲染線程可能處理N或N-1幀,RHI線程也可能在轉換N或N-1幀。可是,渲染線程不能落後遊戲線程一幀,不然遊戲線程會卡住,直到渲染線程處理全部指令。

除此以外,渲染指令是能夠並行地被生成,RHI線程也能夠並行地轉換這些指令,以下所示:

UE4並行生成Command list示意圖。

開啓多線程渲染帶來的收益是幀率更高,幀間變化頻率下降(幀率更穩定)。以Fortnite(堡壘之夜)移動端爲例,在開啓RHI線程以前,渲染線程急劇地上下波動,而加了RHI線程以後,波動平緩許多,和遊戲線程基本保持一致,幀率也提高很多:

2.5.3 遊戲線程和渲染線程的實現

2.5.3.1 遊戲線程的實現

遊戲線程被稱爲主線程,是引擎運行的心臟,承載主要的遊戲邏輯、運行流程的工做,也是其它線程的數據發起者。

遊戲線程的建立是運行程序入口的線程,由系統啓動進程時被同時建立的(由於進程至少須要一個線程來工做),在引擎啓動時直接存儲到全局變量中,且稍後會設置到TaskGraph系統中:

// Engine\Source\Runtime\Launch\Private\LaunchEngineLoop.cpp

int32 FEngineLoop::PreInitPreStartupScreen(const TCHAR* CmdLine)
{
	(......)
    
    // 獲取當前線程id, 存儲到全局變量中.
	GGameThreadId = FPlatformTLS::GetCurrentThreadId();
	GIsGameThreadIdInitialized = true;

	FPlatformProcess::SetThreadAffinityMask(FPlatformAffinity::GetMainGameMask());
    // 設置遊戲線程數據(但不少平臺都是空的實現體)
	FPlatformProcess::SetupGameThread();
    
    (......)
    
    if (bCreateTaskGraphAndThreadPools)
	{
		SCOPED_BOOT_TIMING("FTaskGraphInterface::Startup");
		FTaskGraphInterface::Startup(FPlatformMisc::NumberOfCores());
        // 將當前線程(主線程)附加到TaskGraph的GameThread命名插槽中. 這樣主線程便和TaskGraph聯動了起來.
		FTaskGraphInterface::Get().AttachToThread(ENamedThreads::GameThread);
	}
}

以上代碼也說明:主線程、遊戲線程和TaskGraph系統的ENamedThreads::GameThread實際上是一回事,都是同一個線程!

通過上面的初始化和設置後,其它地方就能夠經過TaskGraph系統並行地處理任務了,也能夠訪問全局變量,以便判斷遊戲線程是否初始化完,當前線程是否遊戲線程:

bool IsInGameThread()
{
    return GIsGameThreadIdInitialized && FPlatformTLS::GetCurrentThreadId() == GGameThreadId;
}

2.5.3.2 渲染線程的實現

渲染線程與遊戲不一樣,是一條專門用於生成渲染指令和渲染邏輯的獨立線程。RenderingThread.h聲明瞭所有對外的接口,部分以下:

// Engine\Source\Runtime\RenderCore\Public\RenderingThread.h

// 是否啓用了獨立的渲染線程, 若是爲false, 則全部渲染命令會被當即執行, 而不是放入渲染命令隊列.
extern RENDERCORE_API bool GIsThreadedRendering;

// 渲染線程是否應該被建立. 一般被命令行參數或ToggleRenderingThread控制檯參數設置.
extern RENDERCORE_API bool GUseThreadedRendering;

// 是否開啓RHI線程
extern RENDERCORE_API void SetRHIThreadEnabled(bool bEnableDedicatedThread, bool bEnableRHIOnTaskThreads);

(......)

// 開啓渲染線程.
extern RENDERCORE_API void StartRenderingThread();

// 中止渲染線程.
extern RENDERCORE_API void StopRenderingThread();

// 檢查渲染線程是否健康(是否Crash), 若是crash, 則會用UE_Log輸出日誌.
extern RENDERCORE_API void CheckRenderingThreadHealth();

// 檢查渲染線程是否健康(是否Crash)
extern RENDERCORE_API bool IsRenderingThreadHealthy();

// 增長一個必須在下一個場景繪製前或flush渲染命令前完成的任務.
extern RENDERCORE_API void AddFrameRenderPrerequisite(const FGraphEventRef& TaskToAdd);

// 手機幀渲染前序任務, 保證全部渲染命令被入隊.
extern RENDERCORE_API void AdvanceFrameRenderPrerequisite();

// 等待全部渲染線程的渲染命令被執行完畢. 會卡住遊戲線程, 只能被遊戲線程調用.
extern RENDERCORE_API void FlushRenderingCommands(bool bFlushDeferredDeletes = false);

extern RENDERCORE_API void FlushPendingDeleteRHIResources_GameThread();
extern RENDERCORE_API void FlushPendingDeleteRHIResources_RenderThread();

extern RENDERCORE_API void TickRenderingTickables();

extern RENDERCORE_API void StartRenderCommandFenceBundler();
extern RENDERCORE_API void StopRenderCommandFenceBundler();

(......)

RenderingThread.h還有一個很是重要的宏ENQUEUE_RENDER_COMMAND,它的做用是向渲染線程入隊渲染指令。下面是它的聲明和實現:

// 向渲染線程入隊渲染指令, Type指明瞭渲染操做的名字.
#define ENQUEUE_RENDER_COMMAND(Type) \
	struct Type##Name \
	{  \
		static const char* CStr() { return #Type; } \
		static const TCHAR* TStr() { return TEXT(#Type); } \
	}; \
	EnqueueUniqueRenderCommand<Type##Name>

上面最後一句使用了EnqueueUniqueRenderCommand命令,繼續追蹤之:

// TSTR是渲染命令名字, LAMBDA是回調函數.
template<typename TSTR, typename LAMBDA>
FORCEINLINE_DEBUGGABLE void EnqueueUniqueRenderCommand(LAMBDA&& Lambda)
{
	typedef TEnqueueUniqueRenderCommandType<TSTR, LAMBDA> EURCType;

    // 若是在渲染線程內直接執行回調而不入隊渲染命令.
	if (IsInRenderingThread())
	{
		FRHICommandListImmediate& RHICmdList = GetImmediateCommandList_ForRenderCommand();
		Lambda(RHICmdList);
	}
	else
	{
        // 須要在獨立的渲染線程執行
		if (ShouldExecuteOnRenderThread())
		{
			CheckNotBlockedOnRenderThread();
            // 從GraphTask建立任務且在適當時候入隊渲染命令.
			TGraphTask<EURCType>::CreateTask().ConstructAndDispatchWhenReady(Forward<LAMBDA>(Lambda));
		}
		else // 不在獨立的渲染線程執行, 則直接執行.
		{
			EURCType TempCommand(Forward<LAMBDA>(Lambda));
			FScopeCycleCounter EURCMacro_Scope(TempCommand.GetStatId());
			TempCommand.DoTask(ENamedThreads::GameThread, FGraphEventRef());
		}
	}
}

上面說明若是是有獨立的渲染線程,最終會將渲染命令入隊到TaskGraph的任務Queue中,等待合適的時機在渲染線程中被執行。其中TEnqueueUniqueRenderCommandType就是專用於渲染命令的特殊TaskGraph任務類型,聲明以下:

class RENDERCORE_API FRenderCommand
{
public:
	// 全部渲染指令都必須在渲染線程執行.
	static ENamedThreads::Type GetDesiredThread()
	{
		check(!GIsThreadedRendering || ENamedThreads::GetRenderThread() != ENamedThreads::GameThread);
		return ENamedThreads::GetRenderThread();
	}

	static ESubsequentsMode::Type GetSubsequentsMode()
	{
		return ESubsequentsMode::FireAndForget;
	}
};

template<typename TSTR, typename LAMBDA>
class TEnqueueUniqueRenderCommandType : public FRenderCommand
{
public:
	TEnqueueUniqueRenderCommandType(LAMBDA&& InLambda) : Lambda(Forward<LAMBDA>(InLambda)) {}
	
    // 正在執行任務.
	void DoTask(ENamedThreads::Type CurrentThread, const FGraphEventRef& MyCompletionGraphEvent)
	{
		TRACE_CPUPROFILER_EVENT_SCOPE_ON_CHANNEL_STR(TSTR::TStr(), RenderCommandsChannel);
		FRHICommandListImmediate& RHICmdList = GetImmediateCommandList_ForRenderCommand();
		Lambda(RHICmdList);
	}

	(......)
    
private:
	LAMBDA Lambda; // 緩存渲染回調函數.
};

爲了更好理解入隊渲染命令操做,舉個具體的例子,以增長燈光到場景爲例:

void FScene::AddLight(ULightComponent* Light)
{
    (......)

    // Send a command to the rendering thread to add the light to the scene.
    FScene* Scene = this;
    FLightSceneInfo* LightSceneInfo = Proxy->LightSceneInfo;

    // 這裏入隊渲染指令, 以便在渲染線程將燈光數據傳遞到渲染器.
    ENQUEUE_RENDER_COMMAND(FAddLightCommand)(
        [Scene, LightSceneInfo](FRHICommandListImmediate& RHICmdList)
        {
            CSV_SCOPED_TIMING_STAT_EXCLUSIVE(Scene_AddLight);
            FScopeCycleCounter Context(LightSceneInfo->Proxy->GetStatId());
            Scene->AddLightSceneInfo_RenderThread(LightSceneInfo);
        });
}

ENQUEUE_RENDER_COMMAND(FAddLightCommand)代入前面解析過的宏和模板,並展開,完整的代碼以下:

struct FAddLightCommandName
{
    static const char* CStr() { return "FAddLightCommand"; }
    static const TCHAR* TStr() { return TEXT("FAddLightCommand"); }
};

EnqueueUniqueRenderCommand<FAddLightCommandName>(
    [Scene, LightSceneInfo](FRHICommandListImmediate& RHICmdList)
    {
        CSV_SCOPED_TIMING_STAT_EXCLUSIVE(Scene_AddLight);
        FScopeCycleCounter Context(LightSceneInfo->Proxy->GetStatId());
        Scene->AddLightSceneInfo_RenderThread(LightSceneInfo);
    })
{
	typedef TEnqueueUniqueRenderCommandType<FAddLightCommandName, LAMBDA> EURCType;

    // 若是在渲染線程內直接執行回調而不入隊渲染命令.
	if (IsInRenderingThread())
	{
		FRHICommandListImmediate& RHICmdList = GetImmediateCommandList_ForRenderCommand();
		Lambda(RHICmdList);
	}
	else
	{
        // 須要在獨立的渲染線程執行
		if (ShouldExecuteOnRenderThread())
		{
			CheckNotBlockedOnRenderThread();
            // 從GraphTask建立任務且在適當時候入隊渲染命令.
			TGraphTask<EURCType>::CreateTask().ConstructAndDispatchWhenReady(Forward<LAMBDA>(Lambda));
		}
		else // 不在獨立的渲染線程執行, 則直接執行.
		{
			EURCType TempCommand(Forward<LAMBDA>(Lambda));
			FScopeCycleCounter EURCMacro_Scope(TempCommand.GetStatId());
			TempCommand.DoTask(ENamedThreads::GameThread, FGraphEventRef());
		}
	}
}

FRenderingThread承載了渲染線程的主要工做,它的部分接口和實現代碼以下:

// Engine\Source\Runtime\RenderCore\Private\RenderingThread.cpp

class FRenderingThread : public FRunnable
{
private:
	bool bAcquiredThreadOwnership;	// 當沒有獨立的RHI線程時, 渲染線程將被其它線程捕獲.

public:
	FEvent* TaskGraphBoundSyncEvent; // TaskGraph同步事件, 以便在主線程使用渲染線程以前就將渲染線程綁定到TaskGraph體系中.

	FRenderingThread()
	{
		bAcquiredThreadOwnership = false;
        // 獲取同步事件.
		TaskGraphBoundSyncEvent	= FPlatformProcess::GetSynchEventFromPool(true);
		RHIFlushResources();
	}

	// FRunnable interface.
	virtual bool Init(void) override
	{
        // 獲取當前線程ID到全局變量GRenderThreadId, 以便其它地方引用.
		GRenderThreadId = FPlatformTLS::GetCurrentThreadId();
		
        // 處理線程捕獲關係.
		if (!IsRunningRHIInSeparateThread())
		{
			bAcquiredThreadOwnership = true;
			RHIAcquireThreadOwnership();
		}

		return true; 
	}
    
    (......)
    
	virtual uint32 Run(void) override
	{
        // 設置TLS.
		FMemory::SetupTLSCachesOnCurrentThread();
        // 設置渲染線程平臺相關的數據.
		FPlatformProcess::SetupRenderThread();

        (......)
		
        {
            // 進入渲染線程主循環.
            RenderingThreadMain( TaskGraphBoundSyncEvent );
        }
        
		FMemory::ClearAndDisableTLSCachesOnCurrentThread();
		return 0;
	}
};

可見它在運行以後會進入渲染線程邏輯,這裏再進入RenderingThreadMain代碼一探究竟:

void RenderingThreadMain( FEvent* TaskGraphBoundSyncEvent )
{
	LLM_SCOPE(ELLMTag::RenderingThreadMemory);
	
    // 將渲染線程和局部線程線程插槽設置成ActualRenderingThread和ActualRenderingThread_Local.
	ENamedThreads::Type RenderThread = ENamedThreads::Type(ENamedThreads::ActualRenderingThread);

	ENamedThreads::SetRenderThread(RenderThread);
	ENamedThreads::SetRenderThread_Local(ENamedThreads::Type(ENamedThreads::ActualRenderingThread_Local));
	
    // 將當前線程附加到TaskGraph的RenderThread插槽中.
	FTaskGraphInterface::Get().AttachToThread(RenderThread);
	FPlatformMisc::MemoryBarrier();

	// 觸發同步事件, 通知主線程渲染線程已經附加到TaskGraph, 已經準備好接收任務.
	if( TaskGraphBoundSyncEvent != NULL )
	{
		TaskGraphBoundSyncEvent->Trigger();
	}

	(......)
	
    // 渲染線程不一樣階段的處理.
	FCoreDelegates::PostRenderingThreadCreated.Broadcast();
	check(GIsThreadedRendering);
	FTaskGraphInterface::Get().ProcessThreadUntilRequestReturn(RenderThread);
	FPlatformMisc::MemoryBarrier();
	check(!GIsThreadedRendering);
	FCoreDelegates::PreRenderingThreadDestroyed.Broadcast();
	
	(......)
	
    // 恢復線程線程到遊戲線程.
	ENamedThreads::SetRenderThread(ENamedThreads::GameThread);
	ENamedThreads::SetRenderThread_Local(ENamedThreads::GameThread_Local);
	FPlatformMisc::MemoryBarrier();
}

不過這裏還留有一個很大的疑問,那就是FRenderingThread只是獲取當前線程做爲渲染線程並附加到TaskGraph中,並無建立線程。那麼是哪裏建立的渲染線程呢?繼續追蹤,結果發現是在StartRenderingThread()接口中建立了FRenderingThread實例,它的實現代碼以下(節選):

// Engine\Source\Runtime\RenderCore\Private\RenderingThread.cpp

void StartRenderingThread()
{
    (......)

	// Turn on the threaded rendering flag.
	GIsThreadedRendering = true;

	// 建立FRenderingThread實例.
	GRenderingThreadRunnable = new FRenderingThread();

    // 建立渲染線程!!
	GRenderingThread = FRunnableThread::Create(GRenderingThreadRunnable, *BuildRenderingThreadName(ThreadCount), 0, FPlatformAffinity::GetRenderingThreadPriority(), FPlatformAffinity::GetRenderingThreadMask(), FPlatformAffinity::GetRenderingThreadFlags());
	
    (......)

	// 開啓渲染命令的柵欄.
	FRenderCommandFence Fence;
	Fence.BeginFence();
	Fence.Wait();

	(......)
}

若是繼續追蹤,會發現StartRenderingThread()是在FEngineLoop::PreInitPostStartupScreen中調用的。

至此,渲染線程的建立、初始化以及主要接口的實現都剖析完了。

2.5.3.3 RHI線程的實現

RHI線程的工做是轉換渲染指令到指定圖形API,建立、上傳渲染資源到GPU。它的主要邏輯在FRHIThread中,實現代碼以下:

// Engine\Source\Runtime\RenderCore\Private\RenderingThread.cpp

class FRHIThread : public FRunnable
{
public:
	FRunnableThread* Thread;	// 所在的RHI線程.

	FRHIThread()
		: Thread(nullptr)
	{
		check(IsInGameThread());
	}
    
    void Start()
	{
        // 開始時建立RHI線程.
		Thread = FRunnableThread::Create(this, TEXT("RHIThread"), 512 * 1024, FPlatformAffinity::GetRHIThreadPriority(),
			FPlatformAffinity::GetRHIThreadMask(), FPlatformAffinity::GetRHIThreadFlags()
			);
		check(Thread);
	}

	virtual uint32 Run() override
	{
		LLM_SCOPE(ELLMTag::RHIMisc);
		
        // 初始化TLS
		FMemory::SetupTLSCachesOnCurrentThread();
        // 將FRHIThread所在的RHI線程附加到askGraph體系中,並指定到ENamedThreads::RHIThread。
		FTaskGraphInterface::Get().AttachToThread(ENamedThreads::RHIThread);
        // 啓動RHI線程,直到線程返回。
		FTaskGraphInterface::Get().ProcessThreadUntilRequestReturn(ENamedThreads::RHIThread);
        // 清理TLS.
		FMemory::ClearAndDisableTLSCachesOnCurrentThread();
		return 0;
	}
    
	// 單例接口。
	static FRHIThread& Get()
	{
		static FRHIThread Singleton; // 使用了局部靜態變量,能夠保證線程安全。
		return Singleton;
	}
};

可見RHI線程不一樣於渲染線程,是直接在FRHIThread對象內建立實際的線程。而FRHIThread的建立也是在StartRenderingThread()中:

void StartRenderingThread()
{
	(......)

	if (GUseRHIThread_InternalUseOnly)
	{
		FRHICommandListExecutor::GetImmediateCommandList().ImmediateFlush(EImmediateFlushType::DispatchToRHIThread);		
		if (!FTaskGraphInterface::Get().IsThreadProcessingTasks(ENamedThreads::RHIThread))
		{
            // 建立FRHIThread實例並啓動它.
			FRHIThread::Get().Start();
		}
		DECLARE_CYCLE_STAT(TEXT("Wait For RHIThread"), STAT_WaitForRHIThread, STATGROUP_TaskGraphTasks);
		
        // 建立RHI線程擁有者捕獲任務, 讓遊戲線程等待.
		FGraphEventRef CompletionEvent = TGraphTask<FOwnershipOfRHIThreadTask>::CreateTask(NULL, ENamedThreads::GameThread).ConstructAndDispatchWhenReady(true, GET_STATID(STAT_WaitForRHIThread));
		QUICK_SCOPE_CYCLE_COUNTER(STAT_StartRenderingThread);
        // 讓遊戲線程或局部線程等待RHI線程處理(捕獲了線程擁有者, 大多數圖形API爲空)完畢.
		FTaskGraphInterface::Get().WaitUntilTaskCompletes(CompletionEvent, ENamedThreads::GameThread_Local);
        // 存儲RHI線程id.
		GRHIThread_InternalUseOnly = FRHIThread::Get().Thread;
		check(GRHIThread_InternalUseOnly);
		GIsRunningRHIInDedicatedThread_InternalUseOnly = true;
		GIsRunningRHIInSeparateThread_InternalUseOnly = true;
		GRHIThreadId = GRHIThread_InternalUseOnly->GetThreadID();
        
		GRHICommandList.LatchBypass();
	}
	
    (......)
}

那麼渲染線程如何向RHI線程入隊任務呢?答案就在RHICommandList.h中:

// Engine\Source\Runtime\RHI\Public\RHICommandList.h

// RHI命令父類
struct FRHICommandBase
{
	FRHICommandBase* Next = nullptr; // 指向下一條RHI命令.
    // 執行RHI命令並銷燬.
	virtual void ExecuteAndDestruct(FRHICommandListBase& CmdList, FRHICommandListDebugContext& DebugContext) = 0;
};

// RHI命令結構體
template<typename TCmd, typename NameType = FUnnamedRhiCommand>
struct FRHICommand : public FRHICommandBase
{
	(......)

	void ExecuteAndDestruct(FRHICommandListBase& CmdList, FRHICommandListDebugContext& Context) override final
	{
		(......)
		
		TCmd *ThisCmd = static_cast<TCmd*>(this);

		ThisCmd->Execute(CmdList);
		ThisCmd->~TCmd();
	}
};

// 向RHI線程發送RHI命令的宏.
#define FRHICOMMAND_MACRO(CommandName)								\
struct PREPROCESSOR_JOIN(CommandName##String, __LINE__)				\
{																	\
	static const TCHAR* TStr() { return TEXT(#CommandName); }		\
};																	\
struct CommandName final : public FRHICommand<CommandName, PREPROCESSOR_JOIN(CommandName##String, __LINE__)>

RHI線程的相關實現機制跟渲染線程類型,且更加簡潔。如下是它的使用示範:

// Engine\Source\Runtime\RHI\Public\RHICommandList.h
FRHICOMMAND_MACRO(FRHICommandDrawPrimitive)
{
	uint32 BaseVertexIndex;
	uint32 NumPrimitives;
	uint32 NumInstances;
    
	FORCEINLINE_DEBUGGABLE FRHICommandDrawPrimitive(uint32 InBaseVertexIndex, uint32 InNumPrimitives, uint32 InNumInstances)
		: BaseVertexIndex(InBaseVertexIndex)
		, NumPrimitives(InNumPrimitives)
		, NumInstances(InNumInstances)
	{
	}
	RHI_API void Execute(FRHICommandListBase& CmdList);
};

// Engine\Source\Runtime\RHI\Public\RHICommandListCommandExecutes.inl
void FRHICommandDrawPrimitive::Execute(FRHICommandListBase& CmdList)
{
	RHISTAT(DrawPrimitive);
	INTERNAL_DECORATOR(RHIDrawPrimitive)(BaseVertexIndex, NumPrimitives, NumInstances);
}

因而可知,全部的RHI指令都是預先聲明並實現好的,目前存在的RHI渲染指令類型達到近百種(以下),渲染線程建立這些聲明好的RHI指令便可在合適的被推入RHI線程隊列並被執行。

FRHICOMMAND_MACRO(FRHICommandUpdateGeometryCacheBuffer)
FRHICOMMAND_MACRO(FRHISubmitFrameToEncoder)
FRHICOMMAND_MACRO(FLocalRHICommand)
FRHICOMMAND_MACRO(FRHISetSpectatorScreenTexture)
FRHICOMMAND_MACRO(FRHISetSpectatorScreenModeTexturePlusEyeLayout)
FRHICOMMAND_MACRO(FRHISyncFrameCommand)
FRHICOMMAND_MACRO(FRHICommandStat)
FRHICOMMAND_MACRO(FRHICommandRHIThreadFence)
FRHICOMMAND_MACRO(FRHIAsyncComputeSubmitList)
FRHICOMMAND_MACRO(FRHICommandWaitForAndSubmitSubListParallel)
FRHICOMMAND_MACRO(FRHICommandWaitForAndSubmitSubList)
FRHICOMMAND_MACRO(FRHICommandWaitForAndSubmitRTSubList)
FRHICOMMAND_MACRO(FRHICommandSubmitSubList)
FRHICOMMAND_MACRO(FRHICommandBeginUpdateMultiFrameResource)
FRHICOMMAND_MACRO(FRHICommandEndUpdateMultiFrameResource)
FRHICOMMAND_MACRO(FRHICommandBeginUpdateMultiFrameUAV)
FRHICOMMAND_MACRO(FRHICommandEndUpdateMultiFrameUAV)
FRHICOMMAND_MACRO(FRHICommandSetGPUMask)
FRHICOMMAND_MACRO(FRHICommandWaitForTemporalEffect)
FRHICOMMAND_MACRO(FRHICommandBroadcastTemporalEffect)
FRHICOMMAND_MACRO(FRHICommandSetStencilRef)
FRHICOMMAND_MACRO(FRHICommandDrawPrimitive)
FRHICOMMAND_MACRO(FRHICommandDrawIndexedPrimitive)
FRHICOMMAND_MACRO(FRHICommandSetBlendFactor)
FRHICOMMAND_MACRO(FRHICommandSetStreamSource)
FRHICOMMAND_MACRO(FRHICommandSetViewport)
FRHICOMMAND_MACRO(FRHICommandSetStereoViewport)
FRHICOMMAND_MACRO(FRHICommandSetScissorRect)
FRHICOMMAND_MACRO(FRHICommandSetRenderTargets)
FRHICOMMAND_MACRO(FRHICommandBeginRenderPass)
FRHICOMMAND_MACRO(FRHICommandEndRenderPass)
FRHICOMMAND_MACRO(FRHICommandNextSubpass)
FRHICOMMAND_MACRO(FRHICommandBeginParallelRenderPass)
FRHICOMMAND_MACRO(FRHICommandEndParallelRenderPass)
FRHICOMMAND_MACRO(FRHICommandBeginRenderSubPass)
FRHICOMMAND_MACRO(FRHICommandEndRenderSubPass)
FRHICOMMAND_MACRO(FRHICommandBeginComputePass)
FRHICOMMAND_MACRO(FRHICommandEndComputePass)
FRHICOMMAND_MACRO(FRHICommandBindClearMRTValues)
FRHICOMMAND_MACRO(FRHICommandSetGraphicsPipelineState)
FRHICOMMAND_MACRO(FRHICommandAutomaticCacheFlushAfterComputeShader)
FRHICOMMAND_MACRO(FRHICommandFlushComputeShaderCache)
FRHICOMMAND_MACRO(FRHICommandDrawPrimitiveIndirect)
FRHICOMMAND_MACRO(FRHICommandDrawIndexedIndirect)
FRHICOMMAND_MACRO(FRHICommandDrawIndexedPrimitiveIndirect)
FRHICOMMAND_MACRO(FRHICommandSetDepthBounds)
FRHICOMMAND_MACRO(FRHICommandClearUAVFloat)
FRHICOMMAND_MACRO(FRHICommandClearUAVUint)
FRHICOMMAND_MACRO(FRHICommandCopyToResolveTarget)
FRHICOMMAND_MACRO(FRHICommandCopyTexture)
FRHICOMMAND_MACRO(FRHICommandResummarizeHTile)
FRHICOMMAND_MACRO(FRHICommandTransitionTexturesDepth)
FRHICOMMAND_MACRO(FRHICommandTransitionTextures)
FRHICOMMAND_MACRO(FRHICommandTransitionTexturesArray)
FRHICOMMAND_MACRO(FRHICommandTransitionTexturesPipeline)
FRHICOMMAND_MACRO(FRHICommandTransitionTexturesArrayPipeline)
FRHICOMMAND_MACRO(FRHICommandClearColorTexture)
FRHICOMMAND_MACRO(FRHICommandClearDepthStencilTexture)
FRHICOMMAND_MACRO(FRHICommandClearColorTextures)
FRHICOMMAND_MACRO(FRHICommandSetGlobalUniformBuffers)
FRHICOMMAND_MACRO(FRHICommandBuildLocalUniformBuffer)
FRHICOMMAND_MACRO(FRHICommandBeginRenderQuery)
FRHICOMMAND_MACRO(FRHICommandEndRenderQuery)
FRHICOMMAND_MACRO(FRHICommandCalibrateTimers)
FRHICOMMAND_MACRO(FRHICommandPollOcclusionQueries)
FRHICOMMAND_MACRO(FRHICommandBeginScene)
FRHICOMMAND_MACRO(FRHICommandEndScene)
FRHICOMMAND_MACRO(FRHICommandBeginFrame)
FRHICOMMAND_MACRO(FRHICommandEndFrame)
FRHICOMMAND_MACRO(FRHICommandBeginDrawingViewport)
FRHICOMMAND_MACRO(FRHICommandEndDrawingViewport)
FRHICOMMAND_MACRO(FRHICommandInvalidateCachedState)
FRHICOMMAND_MACRO(FRHICommandDiscardRenderTargets)
FRHICOMMAND_MACRO(FRHICommandDebugBreak)
FRHICOMMAND_MACRO(FRHICommandUpdateTextureReference)
FRHICOMMAND_MACRO(FRHICommandUpdateRHIResources)
FRHICOMMAND_MACRO(FRHICommandCopyBufferRegion)
FRHICOMMAND_MACRO(FRHICommandCopyBufferRegions)
FRHICOMMAND_MACRO(FRHICommandClearRayTracingBindings)
FRHICOMMAND_MACRO(FRHICommandRayTraceOcclusion)
FRHICOMMAND_MACRO(FRHICommandRayTraceIntersection)
FRHICOMMAND_MACRO(FRHICommandRayTraceDispatch)
FRHICOMMAND_MACRO(FRHICommandSetRayTracingBindings)
FRHICOMMAND_MACRO(FClearCachedRenderingDataCommand)
FRHICOMMAND_MACRO(FClearCachedElementDataCommand)

2.5.4 遊戲線程和渲染線程的交互

本節將講述各個線程之間的數據交換機制和實現細節。首先看看遊戲線程如何將數據傳遞給渲染線程。

遊戲線程在Tick時,會經過UGameEngine、FViewport、UGameViewportClient等對象,纔會進入渲染模塊的調用:

void UGameEngine::Tick( float DeltaSeconds, bool bIdleMode )
{
    UGameEngine::RedrawViewports()
    {
        void FViewport::Draw( bool bShouldPresent)
        {
            void UGameViewportClient::Draw()
            {
                // 計算ViewFamily、View的各類屬性
                ULocalPlayer::CalcSceneView();
                // 發送渲染命令
                FRendererModule::BeginRenderingViewFamily()
                {
                    World->SendAllEndOfFrameUpdates();
                    // 建立場景渲染器
                    FSceneRenderer* SceneRenderer = FSceneRenderer::CreateSceneRenderer(ViewFamily, ...);
                    // 向渲染線程發送繪製場景指令.
                    ENQUEUE_RENDER_COMMAND(FDrawSceneCommand)(
                    [SceneRenderer](FRHICommandListImmediate& RHICmdList)
                    {
                        RenderViewFamily_RenderThread(RHICmdList, SceneRenderer)
                        {
                            (......)
                            // 調用場景渲染器的繪製接口.
                            SceneRenderer->Render(RHICmdList);
                            (......)
                        }
                        FlushPendingDeleteRHIResources_RenderThread();
                    });
                }
}}}}

前面章節也提到,渲染線程使用的是SceneProxy和SceneInfo等對象,那麼遊戲的Actor組件是如何跟場景代理的數據聯繫起來的呢?又是如何更新數據的?

先弄清楚遊戲組件向SceneProxy傳遞數據的機制,答案就藏在FScene::AddPrimitive

// Engine\Source\Runtime\Renderer\Private\RendererScene.cpp

void FScene::AddPrimitive(UPrimitiveComponent* Primitive)
{
	(......)
    
	// 建立圖元的場景代理
	FPrimitiveSceneProxy* PrimitiveSceneProxy = Primitive->CreateSceneProxy();
	Primitive->SceneProxy = PrimitiveSceneProxy;
	if(!PrimitiveSceneProxy)
	{
		return;
	}

	// 建立圖元場景代理的場景信息
	FPrimitiveSceneInfo* PrimitiveSceneInfo = new FPrimitiveSceneInfo(Primitive, this);
	PrimitiveSceneProxy->PrimitiveSceneInfo = PrimitiveSceneInfo;
    
    (......)

	FScene* Scene = this;

	ENQUEUE_RENDER_COMMAND(AddPrimitiveCommand)(
		[Params = MoveTemp(Params), Scene, PrimitiveSceneInfo, PreviousTransform = MoveTemp(PreviousTransform)](FRHICommandListImmediate& RHICmdList)
		{
			FPrimitiveSceneProxy* SceneProxy = Params.PrimitiveSceneProxy;
			
            (......)

			SceneProxy->CreateRenderThreadResources();
            // 在渲染線程中將SceneInfo加入到場景中.
			Scene->AddPrimitiveSceneInfo_RenderThread(PrimitiveSceneInfo, PreviousTransform);
		});
}

上面有個關鍵的一句Primitive->CreateSceneProxy()便是建立組件對應的PrimitiveSceneProxy,在PrimitiveSceneProxy的構造函數中,將組件的全部數據都拷貝了一份:

FPrimitiveSceneProxy::FPrimitiveSceneProxy(const UPrimitiveComponent* InComponent, FName InResourceName)
:
	CustomPrimitiveData(InComponent->GetCustomPrimitiveData())
,	TranslucencySortPriority(FMath::Clamp(InComponent->TranslucencySortPriority, SHRT_MIN, SHRT_MAX))
,	Mobility(InComponent->Mobility)
,	LightmapType(InComponent->LightmapType)
,	StatId()
,	DrawInGame(InComponent->IsVisible())
,	DrawInEditor(InComponent->GetVisibleFlag())
,	bReceivesDecals(InComponent->bReceivesDecals)

(......)

{
	(......)
}

拷貝數據以後,遊戲線程修改的是PrimitiveComponent的數據,而渲染線程修改或訪問的是PrimitiveSceneProxy的數據,彼此不干擾,避免了臨界區和鎖的同步,也保證了線程安全。不過這裏還有疑問,那就是建立PrimitiveSceneProxy的時候會拷貝一份數據,但在建立完以後,PrimitiveComponent是如何向PrimitiveSceneProxy更新數據的呢?

原來是ActorComponent有幾個標記,只要這幾個標記被標記爲true,便會在適當的時機調用更新接口,以便獲得更新:

// Engine\Source\Runtime\Engine\Classes\Components\ActorComponent.h

class ENGINE_API UActorComponent : public UObject, public IInterface_AssetUserData
{
protected:
    // 如下接口分別更新對應的狀態, 子類能夠重寫以實現本身的更新邏輯.
    virtual void DoDeferredRenderUpdates_Concurrent()
    {
		(......)
        
        if(bRenderStateDirty)
        {
            RecreateRenderState_Concurrent();
        }
        else
        {
            if(bRenderTransformDirty)
            {
                SendRenderTransform_Concurrent();
            }
            if(bRenderDynamicDataDirty)
            {
                SendRenderDynamicData_Concurrent();
            }
        }
    }
    virtual void CreateRenderState_Concurrent(FRegisterComponentContext* Context)
    {
        bRenderStateCreated = true;

        bRenderStateDirty = false;
        bRenderTransformDirty = false;
        bRenderDynamicDataDirty = false;
    }
	virtual void SendRenderTransform_Concurrent()
    {
		bRenderTransformDirty = false;
	}
	virtual void SendRenderDynamicData_Concurrent()
    {
		bRenderDynamicDataDirty = false;
	}
    
private:
	uint8 bRenderStateDirty:1; // 組件的渲染狀態是否髒的
	uint8 bRenderTransformDirty:1; // 組件的變換矩陣是否髒的
	uint8 bRenderDynamicDataDirty:1; // 組件的渲染動態數據是否髒的
};

上面protected的接口就是用於刷新組件的數據到對應的SceneProxy,具體的組件子類能夠重寫它,以定製本身的更新邏輯,好比ULightComponent的變換矩陣更新邏輯以下:

// Engine\Source\Runtime\Engine\Private\Components\LightComponent.cpp

void ULightComponent::SendRenderTransform_Concurrent()
{
	// 將變換信息更新到場景.
	GetWorld()->Scene->UpdateLightTransform(this);
	Super::SendRenderTransform_Concurrent();
}

而場景的UpdateLightTransform會將組件的數據組裝起來,並將數據發送到渲染線程執行:

// Engine\Source\Runtime\Renderer\Private\RendererScene.cpp

void FScene::UpdateLightTransform(ULightComponent* Light)
{
	if(Light->SceneProxy)
	{
        // 組裝組件的數據到結構體(注意這裏不能將Component的地址傳到渲染線程,而是將全部要更新的數據拷貝一份)
		FUpdateLightTransformParameters Parameters;
		Parameters.LightToWorld = Light->GetComponentTransform().ToMatrixNoScale();
		Parameters.Position = Light->GetLightPosition();
		FScene* Scene = this;
		FLightSceneInfo* LightSceneInfo = Light->SceneProxy->GetLightSceneInfo();
        // 將數據發送到渲染線程執行.
		ENQUEUE_RENDER_COMMAND(UpdateLightTransform)(
			[Scene, LightSceneInfo, Parameters](FRHICommandListImmediate& RHICmdList)
			{
				FScopeCycleCounter Context(LightSceneInfo->Proxy->GetStatId());
                // 在渲染線程執行數據更新.
				Scene->UpdateLightTransform_RenderThread(LightSceneInfo, Parameters);
			});
	}
}

void FScene::UpdateLightTransform_RenderThread(FLightSceneInfo* LightSceneInfo, const FUpdateLightTransformParameters& Parameters)
{
	(......)

	// 更新變換矩陣.
	LightSceneInfo->Proxy->SetTransform(Parameters.LightToWorld, Parameters.Position);
		
	(......)
}

至此,組件如何向場景代理更新數據的邏輯終於理清了。

須要特別提醒的是,FScene、FSceneProxy等有些接口在遊戲線程調用,而有些接口(通常帶有_RenderThread的後綴)在渲染線程調用,切記不能跨線程調用,不然會產生競爭條件,甚至引起程序崩潰。

2.5.5 遊戲線程和渲染線程的同步

前面也提到,遊戲線程不可能領先於渲染線程超過一幀,不然遊戲線程會等待渲染線程處理完。它們的同步機制涉及兩個關鍵的概念:

// Engine\Source\Runtime\RenderCore\Public\RenderCommandFence.h

// 渲染命令柵欄
class RENDERCORE_API FRenderCommandFence
{
public:
    // 向渲染命令隊列增長一個柵欄. bSyncToRHIAndGPU是否同步RHI和GPU交換Buffer, 不然只等待渲染線程.
	void BeginFence(bool bSyncToRHIAndGPU = false); 

    // 等待柵欄被執行. bProcessGameThreadTasks沒有做用.
	void Wait(bool bProcessGameThreadTasks = false) const;

	// 是否完成了柵欄.
	bool IsFenceComplete() const;

private:
	mutable FGraphEventRef CompletionEvent; // 處理完成同步的事件
	ENamedThreads::Type TriggerThreadIndex; // 處理完以後須要觸發的線程類型.
};

// Engine\Source\Runtime\Engine\Public\UnrealEngine.h
class FFrameEndSync
{
	FRenderCommandFence Fence[2]; // 渲染柵欄對.
	int32 EventIndex; // 當前事件索引
public:
    // 同步遊戲線程和渲染線程. bAllowOneFrameThreadLag是否容許渲染線程一幀的延遲.
	void Sync( bool bAllowOneFrameThreadLag )
    {
        Fence[EventIndex].BeginFence(true); // 開啓柵欄, 強制同步RHI和GPU交換鏈的.

        bool bEmptyGameThreadTasks = !FTaskGraphInterface::Get().IsThreadProcessingTasks(ENamedThreads::GameThread);
		
        // 保證遊戲線程至少跑過一次任務.
        if (bEmptyGameThreadTasks)
        {
            FTaskGraphInterface::Get().ProcessThreadUntilIdle(ENamedThreads::GameThread);
        }

        // 若是容許延遲, 交換事件索引.
        if( bAllowOneFrameThreadLag )
        {
            EventIndex = (EventIndex + 1) % 2;
        }

        (......)
        
        // 開啓柵欄等待.
        Fence[EventIndex].Wait(bEmptyGameThreadTasks);
    }
};

FFrameEndSync的使用是在FEngineLoop::Tick中:

// Engine\Source\Runtime\Launch\Private\LaunchEngineLoop.cpp

void FEngineLoop::Tick()
{
	(......)
    
    // 在引擎循環的幀末尾添加遊戲線程和渲染線程的同步事件.
    {
        static FFrameEndSync FrameEndSync; // 局部靜態變量, 線程安全.
        static auto CVarAllowOneFrameThreadLag = IConsoleManager::Get().FindTConsoleVariableDataInt(TEXT("r.OneFrameThreadLag"));
        // 同步遊戲和渲染線程, 是否容許一幀的延遲可由控制檯命令控制. 默認是開啓的.
        FrameEndSync.Sync( CVarAllowOneFrameThreadLag->GetValueOnGameThread() != 0 );
	}
    
    (......)
}

 

2.6 多線程渲染結語

並行計算架構已然成爲現代引擎的標配,UE的多線程渲染是隨着多核CPU和新一代圖形API誕生而必然的產物。但就目前而言,渲染線程不少時候仍是單條的(雖然能夠藉助TaskGraph部分地並行)。理想狀況下,是多條渲染線程並行且不依賴地生成渲染命令,而且不須要主線程來驅動,任何線程均可做爲工做線程(亦即沒有UE的命名線程),任何線程均可發起計算任務,避免操做系統級別的功能線程。而這須要操做系統、圖形API、計算機語言共同地不斷演化纔可達成。

最近發佈的UE4.26已經在普及RDG,RDG能夠自動裁剪、優化渲染Pass和資源,是提高引擎總體並行處理的一大利器。

這篇文章本來預計2個月左右完成,然而實際上花了3個多月,幾乎耗盡了筆者的全部業餘時間。本來還有不少技術章節須要添加,但篇幅和時間都超限了,只好做罷。但願此係列文章對學習UE的讀者們有幫助,感謝關注和收藏。

 

特別說明

  • 感謝全部參考文獻的做者,部分圖片來自參考文獻和網絡,侵刪。
  • 本系列文章爲筆者原創,只發表在博客園上,歡迎分享本文連接,但未經贊成,不容許轉載
  • 系列文章,未完待續,完整目錄請戳內容綱目
  • 系列文章,未完待續,完整目錄請戳內容綱目
  • 系列文章,未完待續,完整目錄請戳內容綱目

 

參考文獻

相關文章
相關標籤/搜索