DirectX11 With Windows SDK--27 計算着色器:雙調排序

前言

上一章咱們用一個比較簡單的例子來嘗試使用計算着色器,可是在看這一章內容以前,你還須要瞭解下面的內容:html

章節
26 計算着色器:入門
深刻理解與使用緩衝區資源(結構化緩衝區/有類型緩衝區)
Visual Studio圖形調試器詳細使用教程(編程捕獲部分)

這一章咱們繼續用一個計算着色器的應用實例做爲切入點,進一步瞭解相關知識。git

DirectX11 With Windows SDK完整目錄github

Github項目源碼算法

歡迎加入QQ羣: 727623616 能夠一塊兒探討DX11,以及有什麼問題也能夠在這裏彙報。編程

線程標識符

對於線程組(大小(ThreadDimX, ThreadDimY, ThreadDimZ))中的每個線程,它們都有一個惟一的線程ID值。咱們可使用系統值SV_GroupThreadID來取得,它的索引範圍爲(0, 0, 0)(ThreadDimX - 1, ThreadDimY - 1, ThreadDimZ - 1)緩存

而對於整個線程組來講,因爲線程組集合也是在3D空間中排布,它們也有一個惟一的線程組ID值。咱們可使用系統值SV_GroupID來取得,線程組的索引範圍取決於調用ID3D11DeviceContext::Dispatch時提供的線程組(大小(GroupDimX, GroupDimY, GroupDimZ)),範圍爲(0, 0, 0)(GroupDimX - 1, GroupDimY - 1, GroupDimZ - 1)安全

緊接着就是系統值SV_GroupIndex,它是單個線程組內的線程三維索引的一維展開。若已知線程組的大小爲(ThreadDimX, ThreadDimY, ThreadDimZ),則能夠肯定SV_GroupIndexSV_GroupThreadID知足下面關係:網絡

SV_GroupIndex = SV_GroupThreadID.z * ThreadDimX * ThreadDimY + SV_GroupThreadID.y * ThreadDimX + SV_GroupThreadID.x;

最後就是系統值SV_DispatchThreadID,線程組中的每個線程在ID3D11DeviceContext::Dispatch提供的線程組集合中都有其惟一的線程ID值。若已知線程組的大小爲 (ThreadDimX, ThreadDimY, ThreadDimZ),則能夠肯定SV_DispatchThreadIDSV_GroupThreadIDSV_GroupID知足如下關係:dom

SV_DispatchThreadID.xyz = SV_GroupID.xyz * float3(ThreadDimX, ThreadDimY, ThreadDimZ) + SV_GroupThreadID.xyz;

共享內存和線程同步

在一個線程組內,容許設置一片共享內存區域,使得當前線程組內的全部線程均可以訪問當前的共享內存。一旦設置,那麼每一個線程都會各自配備一份共享內存。共享內存的訪問速度很是快,就像寄存器訪問CPU緩存那樣。函數

共享內存的聲明方式以下:

groupshared float4 g_Cache[256];

對於每一個線程組來講,它所容許分配的總空間最大爲32kb(即8192個標量,或2048個向量)。內部線程一般應該使用SV_ThreadGroupID來寫入共享內存,這樣以保證每一個線程不會出現重複寫入操做,而讀取共享內存通常是線程安全的。

分配太多的共享內存會致使性能問題。假如一個多處理器支持32kb的共享內存,而後你的計算着色器須要20kb的共享內存,這意味着一個多處理器只適合處理一個線程組,由於剩餘的共享內存不足以給新的線程組運行,這也會限制GPU的並行運算,當該線程組由於某些緣由須要等待,會致使當前的多處理器處於閒置狀態。所以保證一個多處理器至少可以處理兩個或以上的線程組(好比每一個線程組分配16kb如下的共享內存),以儘量減小該多處理器的閒置時間。

如今來考慮下面的代碼:

Texture2D g_Input : register(t0);
RWTexture2D<float4> g_Output : register(u0);

groupshared float4 g_Cache[256];

[numthreads(256, 1, 1)]
void CS(uint3 GTid : SV_GroupThreadID,
    uint3 DTid : SV_DispatchThreadID)
{
    // 將紋理像素值緩存到共享內存
    g_Cache[GTid.x] = g_Input[DTid.xy];
    
    // 取出共享內存的值進行計算
    
    // 注意!!相鄰的兩個線程可能沒有完成對紋理的採樣
    // 以及存儲到共享內存的操做
    float left = g_Cache[GTid.x - 1];
    float right = g_Cache[GTid.x + 1];
    
    // ...
}

由於多個線程同時運行,同一時間各個線程當前執行的指令有所誤差,有的線程可能已經完成了共享內存的賦值操做,有的線程可能還在進行紋理採樣操做。若是當前線程正在讀取相鄰的共享內存片斷,結果將是未定義的。爲了解決這個問題,咱們必須在讀取共享內存以前讓當前線程等待線程組內其它的全部線程完成寫入操做。這裏咱們可使用GroupMemoryBarrierWithGroupSync函數:

Texture2D g_Input : register(t0);
RWTexture2D<float4> g_Output : register(u0);

groupshared float4 g_Cache[256];

[numthreads(256, 1, 1)]
void CS(uint3 GTid : SV_GroupThreadID,
    uint3 DTid : SV_DispatchThreadID)
{
    // 將紋理像素值緩存到共享內存
    g_Cache[GTid.x] = g_Input[DTid.xy];
    
    // 等待全部線程完成寫入
    GroupMemoryBarrierWithGroupSync();
    
    // 如今讀取操做是線程安全的,能夠開始進行計算
    float left = g_Cache[GTid.x - 1];
    float right = g_Cache[GTid.x + 1];
    
    // ...
}

雙調排序

雙調序列

所謂雙調序列(Bitonic Sequence),是指由一個非嚴格遞增序列X(容許相鄰兩個數相等)和非嚴格遞減序列Y構成的序列,好比序列\((5, 3, 2, 1, 4, 6, 6, 12)\)

定義:一個序列\(a_1 , a_2, ..., a_n\)是雙調序列,須要知足下面條件:

  1. 存在一個\(a_k(1 <= k <= n)\),使得\(a_1 >= ... >= a_k <= ... <= a_n\)成立,或者\(a_1 <= ... <= a_k >= ... >= a_n\)成立;
  2. 序列循環移位後仍可以知足條件(1)

Batcher歸併網絡

Batcher歸併網絡是由一系列Batcher比較器組成的,Batcher比較器是指在兩個輸入端給定輸入值x和y,再在兩個輸出端輸出最大值\(max(x, y)\)和最小值\(min(x, y)\)

雙調歸併網絡

雙調歸併網絡是基於Batch定理而構建的。該定理是說將任意一個長爲2n的雙調序列分爲等長的兩半X和Y,將X中的元素與Y中的元素按原序比較,即\(a_i\)\(a_{i+n}(i <= n)\)比較,將較大者放入MAX序列,較小者放入MIN序列。則獲得的MAX序列和MIN序列仍然是雙調序列,而且MAX序列中的任意一個元素不小於MIN序列中的任意一個元素。

根據這個原理,咱們能夠將一個n元素的雙調序列經過上述方式進行比較操做來獲得一個MAX序列和一個MIN序列,而後對這兩個序列進行遞歸處理,直到序列不可再分割爲止。最終歸併獲得的爲一個有序序列。

這裏咱們用一張圖來描述雙調排序的全過程:

其中箭頭方向指的是兩個數交換後,箭頭段的數爲較大值,圓點段的數爲較小值。

咱們能夠總結出以下規律:

  1. 每一趟排序結束會產生連續的雙調序列,除了最後一趟排序會產生咱們所須要的單調序列
  2. 對於2^k個元素的任意序列,須要進行k趟排序才能產生單調序列
  3. 對於由\(2^{k-1}\)個元素的單調遞增序列和\(2^{k-1}\)個元素的單調遞減序列組成的雙調序列,須要進行k趟交換才能產生2^k個元素的單調遞增序列
  4. 在第n趟排序中的第m趟交換,若兩個比較數中較小的索引值爲i,那麼與之進行交換的數索引爲\(i+2^{n-m}\)

雙調排序的空間複雜度爲\(O(n)\),時間複雜度爲\(O(n{(lg(n))}^2)\),看起來比\(O(nlg(n))\)系列的排序算法慢上一截,可是得益於GPU的並行計算,能夠看做同一時間內有n個線程在運行,使得最終的時間複雜度能夠降爲\(O({(lg(n))}^2)\),效率又上了一個檔次。

須要注意的是,雙調排序要求排序元素的數目爲\(2^k, (k>=1)\),若是元素個數爲\(2^k < n < 2^{k+1}\),則須要填充數據到\(2^{k+1}\)個。若須要進行升序排序,則須要填充足夠的最大值;若須要進行降序排序,則須要填充足夠的最小值。

排序核心代碼實現

本HLSL實現參考了directx-sdk-samples,雖然裏面的實現看起來比較簡潔,可是理解它的算法實現費了我很多的時間。我的以本身可以理解的形式對它的實現進行了修改,所以這裏以我這邊的實現版原本講解。

首先是排序須要用到的資源和常量緩衝區,定義在BitonicSort.hlsli

// BitonicSort.hlsli
Buffer<uint> g_Input : register(t0);
RWBuffer<uint> g_Data : register(u0);

cbuffer CB : register(b0)
{
    uint g_Level;        // 2^須要排序趟數
    uint g_DescendMask;  // 降低序列掩碼
    uint g_MatrixWidth;  // 矩陣寬度(要求寬度>=高度且都爲2的倍數)
    uint g_MatrixHeight; // 矩陣高度
}

而後是核心的排序算法:

// BitonicSort_CS.hlsl
#include "BitonicSort.hlsli"

#define BITONIC_BLOCK_SIZE 512

groupshared uint shared_data[BITONIC_BLOCK_SIZE];

[numthreads(BITONIC_BLOCK_SIZE, 1, 1)]
void CS(uint3 Gid : SV_GroupID,
    uint3 DTid : SV_DispatchThreadID,
    uint3 GTid : SV_GroupThreadID,
    uint GI : SV_GroupIndex)
{
    // 寫入共享數據
    shared_data[GI] = g_Data[DTid.x];
    GroupMemoryBarrierWithGroupSync();
    
    // 進行排序
    for (uint j = g_Level >> 1; j > 0; j >>= 1)
    {
        uint smallerIndex = GI & ~j;
        uint largerIndex = GI | j;
        bool isDescending = (bool) (g_DescendMask & DTid.x);
        bool isSmallerIndex = (GI == smallerIndex);
        uint result = ((shared_data[smallerIndex] <= shared_data[largerIndex]) == (isDescending == isSmallerIndex)) ?
            shared_data[largerIndex] : shared_data[smallerIndex];
        GroupMemoryBarrierWithGroupSync();

        shared_data[GI] = result;
        GroupMemoryBarrierWithGroupSync();
    }
    
    // 保存結果
    g_Data[DTid.x] = shared_data[GI];
}

能夠看到,咱們實際上能夠將遞歸過程轉化成迭代來實現。

如今咱們先從核心排序算法講起,因爲受到線程組的線程數目、共享內存大小限制,這裏定義一個線程組包含512個線程,即一個線程組最大容許排序的元素數目爲512。共享內存在這是用於臨時緩存中間排序的結果。

首先,咱們須要將數據寫入共享內存中:

// 寫入共享數據
shared_data[GI] = g_Data[DTid.x];
GroupMemoryBarrierWithGroupSync();

接着就是要開始遞歸排序的過程,其中g_Level的含義爲單個雙調序列的長度,它也說明了須要對該序列進行\(lg(g_Level)\)趟遞歸交換。

在一個線程中,咱們僅知道該線程對應的元素,但如今咱們還須要作兩件事情:

  1. 找到須要與該線程對應元素進行Batcher比較的另外一個元素
  2. 判斷當前線程對應元素與另外一個待比較元素相比,是較小索引仍是較大索引

這裏用到了位運算的魔法。先舉個例子,當前j爲4,則待比較兩個元素的索引分別爲2和6,這兩個索引值的區別在於索引2(二進制010)和索引6(二進制110),前者二進制第三位爲0,後者二進制第三位爲1.

但只要咱們知道上述其中的一個索引,就能夠求出另外一個索引。較小索引值的索引能夠經過屏蔽二進制的第三位獲得,而較大索引值的索引能夠經過按位或運算使得第三位爲1來獲得:

uint smallerIndex = GI & ~j;
uint largerIndex = GI | j;
bool isSmallerIndex = (GI == smallerIndex);

而後就是判斷當前元素是位於當前趟排序完成後的遞增序列仍是遞減序列,好比序列\((4, 6, 4, 3, 5, 7, 2, 1)\),如今要進行第二趟排序,那麼先後4個數將分別生成遞增序列和遞減序列,咱們能夠設置g_DescendMask的值爲4(二進制100),這樣二進制索引範圍在100到111的值(對應十進制4-7)處在遞減序列,若是這個雙調序列長度爲16,那麼索引4-7和12-15的兩段序列均可以經過g_DescendMask來判斷出處在遞減序列:

bool isDescending = (bool) (g_DescendMask & DTid.x);

最後就是要肯定當前線程對應的共享內存元素須要獲得較小值,仍是較大值了。這裏又以一個雙調序列\((2, 5, 7, 4)\)爲例,待比較的兩個元素爲5和4,當前趟排序會將它變爲單調遞增序列,即所處的序列爲遞增序列,當前線程對應的元素爲5,shared_data[smallerIndex] <= shared_data[largerIndex]的比較結果爲>,那麼它將拿到(較小值)較大索引的值。通過第一趟交換後將變成\((2, 4, 7, 5)\),第二趟交換就不討論了。

根據對元素所處序列、元素當前索引和比較結果的討論,能夠產生出八種狀況:

所處序列 當前索引 比較結果 取值結果
遞減 小索引 <= (較大值)較大索引的值
遞減 大索引 <= (較小值)較大索引的值
遞增 小索引 <= (較小值)較小索引的值
遞增 大索引 <= (較大值)較小索引的值
遞減 小索引 > (較大值)較小索引的值
遞減 大索引 > (較小值)較小索引的值
遞增 小索引 > (較小值)較大索引的值
遞增 大索引 > (較大值)較大索引的值

顯然現有的變量判斷較大/較小索引值比判斷較大值/較小值容易得多。上述結果表能夠整理成下面的代碼:

uint result = ((shared_data[smallerIndex] <= shared_data[largerIndex]) == (isDescending == isSmallerIndex)) ?
    shared_data[largerIndex] : shared_data[smallerIndex];
GroupMemoryBarrierWithGroupSync();

shared_data[GI] = result;
GroupMemoryBarrierWithGroupSync();

在C++中,如今有以下資源和着色器:

ComPtr<ID3D11Buffer> m_pConstantBuffer;             // 常量緩衝區
ComPtr<ID3D11Buffer> m_pTypedBuffer1;               // 有類型緩衝區1
ComPtr<ID3D11Buffer> m_pTypedBuffer2;               // 有類型緩衝區2
ComPtr<ID3D11Buffer> m_pTypedBufferCopy;            // 用於拷貝的有類型緩衝區
ComPtr<ID3D11UnorderedAccessView> m_pDataUAV1;      // 有類型緩衝區1對應的無序訪問視圖
ComPtr<ID3D11UnorderedAccessView> m_pDataUAV2;      // 有類型緩衝區2對應的無序訪問視圖
ComPtr<ID3D11ShaderResourceView> m_pDataSRV1;       // 有類型緩衝區1對應的着色器資源視圖
ComPtr<ID3D11ShaderResourceView> m_pDataSRV2;       // 有類型緩衝區2對應的着色器資源視圖

而後就是對512個元素進行排序的部分代碼(size爲2的次冪):

void GameApp::SetConstants(UINT level, UINT descendMask, UINT matrixWidth, UINT matrixHeight);

//
// GameApp::GPUSort
//

m_pd3dImmediateContext->CSSetShader(m_pBitonicSort_CS.Get(), nullptr, 0);
m_pd3dImmediateContext->CSSetUnorderedAccessViews(0, 1, m_pDataUAV1.GetAddressOf(), nullptr);

// 按行數據進行排序,先排序level <= BLOCK_SIZE 的全部狀況
for (UINT level = 2; level <= size && level <= BITONIC_BLOCK_SIZE; level *= 2)
{
    SetConstants(level, level, 0, 0);
    m_pd3dImmediateContext->Dispatch((size + BITONIC_BLOCK_SIZE - 1) / BITONIC_BLOCK_SIZE, 1, 1);
}

給更多的數據排序

上述代碼容許咱們對元素個數爲2到512的序列進行排序,但緩衝區的元素數目必須爲2的次冪。因爲在CS4.0中,一個線程組最多容許一個線程組包含768個線程,這意味着雙調排序僅容許在一個線程組中對最多512個元素進行排序。

接下來咱們看一個例子,假若有一個16元素的序列,然而線程組僅容許包含最多4個線程,那咱們將其放置在一個4x4的矩陣內:

而後對矩陣轉置:

能夠看到,經過轉置後,列數據變換到行數據的位置,這樣咱們就能夠進行跨度更大的交換操做了。處理完大跨度的交換後,咱們再轉置回來,處理行數據便可。

如今假定咱們已經對行數據排完序,下圖演示了剩餘的排序過程:

可是在線程組容許最大線程數爲4的狀況下,經過二維矩陣最多也只能排序16個數。。。。也許能夠考慮三維矩陣轉置法,這樣就能夠排序64個數了哈哈哈。。。

不過還有一個狀況咱們要考慮,就是元素數目不爲(2x2)的倍數,沒法構成一個方陣,但咱們也能夠把它變成對兩個方陣轉置。這時矩陣的寬是高的兩倍:

因爲元素個數爲32,它的最大索引跨度爲16,轉置後的索引跨度爲2,不會越界到另外一個方陣進行比較。可是當g_Level到32時,此時進行的是單調排序,g_DescendMask也必須設爲最大值32(而不是4),避免產生雙調序列。

經過下面的轉置算法,使得本來只能排序512個數的算法如今能夠排序最大262144個數了。

負責轉置的着色器實現以下:

// MatrixTranspose_CS.hlsl
#include "BitonicSort.hlsli"

#define TRANSPOSE_BLOCK_SIZE 16

groupshared uint shared_data[TRANSPOSE_BLOCK_SIZE * TRANSPOSE_BLOCK_SIZE];

[numthreads(TRANSPOSE_BLOCK_SIZE, TRANSPOSE_BLOCK_SIZE, 1)]
void CS(uint3 Gid : SV_GroupID,
    uint3 DTid : SV_DispatchThreadID,
    uint3 GTid : SV_GroupThreadID,
    uint GI : SV_GroupIndex)
{
    uint index = DTid.y * g_MatrixWidth + DTid.x;
    shared_data[GI] = g_Input[index];
    GroupMemoryBarrierWithGroupSync();
    
    uint2 outPos = DTid.yx % g_MatrixHeight + DTid.xy / g_MatrixHeight * g_MatrixHeight;
    g_Data[outPos.y * g_MatrixWidth + outPos.x] = shared_data[GI];
}

最後是GPU排序用的函數:

#define BITONIC_BLOCK_SIZE 512

#define TRANSPOSE_BLOCK_SIZE 16

void GameApp::GPUSort()
{
    UINT size = (UINT)m_RandomNums.size();

    m_pd3dImmediateContext->CSSetShader(m_pBitonicSort_CS.Get(), nullptr, 0);
    m_pd3dImmediateContext->CSSetUnorderedAccessViews(0, 1, m_pDataUAV1.GetAddressOf(), nullptr);

    // 按行數據進行排序,先排序level <= BLOCK_SIZE 的全部狀況
    for (UINT level = 2; level <= size && level <= BITONIC_BLOCK_SIZE; level *= 2)
    {
        SetConstants(level, level, 0, 0);
        m_pd3dImmediateContext->Dispatch((size + BITONIC_BLOCK_SIZE - 1) / BITONIC_BLOCK_SIZE, 1, 1);
    }
    
    // 計算相近的矩陣寬高(寬>=高且須要都爲2的次冪)
    UINT matrixWidth = 2, matrixHeight = 2;
    while (matrixWidth * matrixWidth < size)
    {
        matrixWidth *= 2;
    }
    matrixHeight = size / matrixWidth;

    // 排序level > BLOCK_SIZE 的全部狀況
    ComPtr<ID3D11ShaderResourceView> pNullSRV;
    for (UINT level = BITONIC_BLOCK_SIZE * 2; level <= size; level *= 2)
    {
        // 若是達到最高等級,則爲全遞增序列
        if (level == size)
        {
            SetConstants(level / matrixWidth, level, matrixWidth, matrixHeight);
        }
        else
        {
            SetConstants(level / matrixWidth, level / matrixWidth, matrixWidth, matrixHeight);
        }
        // 先進行轉置,並把數據輸出到Buffer2
        m_pd3dImmediateContext->CSSetShader(m_pMatrixTranspose_CS.Get(), nullptr, 0);
        m_pd3dImmediateContext->CSSetShaderResources(0, 1, pNullSRV.GetAddressOf());
        m_pd3dImmediateContext->CSSetUnorderedAccessViews(0, 1, m_pDataUAV2.GetAddressOf(), nullptr);
        m_pd3dImmediateContext->CSSetShaderResources(0, 1, m_pDataSRV1.GetAddressOf());
        m_pd3dImmediateContext->Dispatch(matrixWidth / TRANSPOSE_BLOCK_SIZE, 
            matrixHeight / TRANSPOSE_BLOCK_SIZE, 1);

        // 對Buffer2排序列數據
        m_pd3dImmediateContext->CSSetShader(m_pBitonicSort_CS.Get(), nullptr, 0);
        m_pd3dImmediateContext->Dispatch(size / BITONIC_BLOCK_SIZE, 1, 1);

        // 接着轉置回來,並把數據輸出到Buffer1
        SetConstants(matrixWidth, level, matrixWidth, matrixHeight);
        m_pd3dImmediateContext->CSSetShader(m_pMatrixTranspose_CS.Get(), nullptr, 0);
        m_pd3dImmediateContext->CSSetShaderResources(0, 1, pNullSRV.GetAddressOf());
        m_pd3dImmediateContext->CSSetUnorderedAccessViews(0, 1, m_pDataUAV1.GetAddressOf(), nullptr);
        m_pd3dImmediateContext->CSSetShaderResources(0, 1, m_pDataSRV2.GetAddressOf());
        m_pd3dImmediateContext->Dispatch(matrixWidth / TRANSPOSE_BLOCK_SIZE,
            matrixHeight / TRANSPOSE_BLOCK_SIZE, 1);

        // 對Buffer1排序剩餘行數據
        m_pd3dImmediateContext->CSSetShader(m_pBitonicSort_CS.Get(), nullptr, 0);
        m_pd3dImmediateContext->Dispatch(size / BITONIC_BLOCK_SIZE, 1, 1);
    }
}

最後是std::sort和雙調排序(使用NVIDIA GTX 850M)的比較結果:

元素數目 CPU排序(s) GPU排序(s) GPU排序+寫回內存總時(s)
512 0.000020 0.000003 0.000186
1024 0.000043 0.000007 0.000226
2048 0.000102 0.000009 0.000310
4096 0.000245 0.000009 0.000452
8192 0.000512 0.000010 0.000771
16384 0.001054 0.000012 0.000869
32768 0.002114 0.000012 0.001819
65536 0.004448 0.000014 0.002625
131072 0.009600 0.000015 0.005130
262144 0.021555 0.000021 0.008983
524288 0.048621 0.000030 0.015652
1048576 0.111219 0.000044 0.040927
2097152 0.257118 0.000072 0.188662
4194304 0.444995 0.000083 0.300240
8388608 0.959173 0.000124 0.465746
16777216 2.178453 0.000177 0.915898
33554432 5.869439 0.000361 2.269989

能夠初步看到雙調排序的排序用時比較穩定,而快速排序明顯隨元素數目增加而變慢。固然,若是GPU排序的數據量再更大一些的話,能夠看到時間的明顯增加。

可是!若是你用GPU排序後須要取回到CPU,由於GPU->CPU的速度比較慢,而且須要等待資源結束佔用才能開始取出,故須要排較大數目的數據(約10000以上)纔有明顯效益

DirectX11 With Windows SDK完整目錄

Github項目源碼

歡迎加入QQ羣: 727623616 能夠一塊兒探討DX11,以及有什麼問題也能夠在這裏彙報。

相關文章
相關標籤/搜索