SSE圖像算法優化系列十一:使用FFT變換實現圖像卷積。

      本文重點主要不在於FFT的SSE優化,而在於使用FFT實現快速卷積的相關技巧和過程。算法

      關於FFT變換,有不少參考的代碼,特別是對於長度爲2的整數次冪的序列,實現起來也是很是簡易的,而對於非2次冪的序列,就稍微有點麻煩了,matlab中是能夠實現任意長度FFT的,FFTW也是能夠的,而Opencv則有選擇性的實現了某些長度序列的變換,查看Opencv的代碼,能夠發現其只有對是4的整數次冪的數據部分採用了SSE優化,好比四、1六、6四、25六、1024這樣的序列部分,所以基4的FFT是最快的,而剩餘的部分則依舊是普通的C語言實現,我這裏主要是把Opencv的代碼摳出來了。多線程

     Opencv關於FFT實現的代碼在Opencv 3.0\opencv\sources\modules\core\src\dxt.cpp中,代碼寫的特別複雜,扣取工做也作的至關艱苦,其基4的SSE優化核心代碼以下所示:ide

// optimized radix-4 transform
template<> struct DFT_VecR4<float>
{
    int operator()(Complex<float>* dst, int N, int n0, int& _dw0, const Complex<float>* wave) const
    {
        int n = 1, i, j, nx, dw, dw0 = _dw0;
        __m128 z = _mm_setzero_ps(), x02=z, x13=z, w01=z, w23=z, y01, y23, t0, t1;
        Cv32suf t; t.i = 0x80000000;
        __m128 neg0_mask = _mm_load_ss(&t.f);
        __m128 neg3_mask = _mm_shuffle_ps(neg0_mask, neg0_mask, _MM_SHUFFLE(0,1,2,3));

        for( ; n*4 <= N; )
        {
            nx = n;
            n *= 4;
            dw0 /= 4;

            for( i = 0; i < n0; i += n )
            {
                Complexf *v0, *v1;

                v0 = dst + i;
                v1 = v0 + nx*2;

                x02 = _mm_loadl_pi(x02, (const __m64*)&v0[0]);
                x13 = _mm_loadl_pi(x13, (const __m64*)&v0[nx]);
                x02 = _mm_loadh_pi(x02, (const __m64*)&v1[0]);
                x13 = _mm_loadh_pi(x13, (const __m64*)&v1[nx]);

                y01 = _mm_add_ps(x02, x13);
                y23 = _mm_sub_ps(x02, x13);
                t1 = _mm_xor_ps(_mm_shuffle_ps(y01, y23, _MM_SHUFFLE(2,3,3,2)), neg3_mask);
                t0 = _mm_movelh_ps(y01, y23);
                y01 = _mm_add_ps(t0, t1);
                y23 = _mm_sub_ps(t0, t1);

                _mm_storel_pi((__m64*)&v0[0], y01);
                _mm_storeh_pi((__m64*)&v0[nx], y01);
                _mm_storel_pi((__m64*)&v1[0], y23);
                _mm_storeh_pi((__m64*)&v1[nx], y23);

                for( j = 1, dw = dw0; j < nx; j++, dw += dw0 )
                {
                    v0 = dst + i + j;
                    v1 = v0 + nx*2;

                    x13 = _mm_loadl_pi(x13, (const __m64*)&v0[nx]);
                    w23 = _mm_loadl_pi(w23, (const __m64*)&wave[dw*2]);
                    x13 = _mm_loadh_pi(x13, (const __m64*)&v1[nx]); // x1, x3 = r1 i1 r3 i3
                    w23 = _mm_loadh_pi(w23, (const __m64*)&wave[dw*3]); // w2, w3 = wr2 wi2 wr3 wi3

                    t0 = _mm_mul_ps(_mm_moveldup_ps(x13), w23);
                    t1 = _mm_mul_ps(_mm_movehdup_ps(x13), _mm_shuffle_ps(w23, w23, _MM_SHUFFLE(2,3,0,1)));
                    x13 = _mm_addsub_ps(t0, t1);
                    // re(x1*w2), im(x1*w2), re(x3*w3), im(x3*w3)
                    x02 = _mm_loadl_pi(x02, (const __m64*)&v1[0]); // x2 = r2 i2
                    w01 = _mm_loadl_pi(w01, (const __m64*)&wave[dw]); // w1 = wr1 wi1
                    x02 = _mm_shuffle_ps(x02, x02, _MM_SHUFFLE(0,0,1,1));
                    w01 = _mm_shuffle_ps(w01, w01, _MM_SHUFFLE(1,0,0,1));
                    x02 = _mm_mul_ps(x02, w01);
                    x02 = _mm_addsub_ps(x02, _mm_movelh_ps(x02, x02));
                    // re(x0) im(x0) re(x2*w1), im(x2*w1)
                    x02 = _mm_loadl_pi(x02, (const __m64*)&v0[0]);

                    y01 = _mm_add_ps(x02, x13);
                    y23 = _mm_sub_ps(x02, x13);
                    t1 = _mm_xor_ps(_mm_shuffle_ps(y01, y23, _MM_SHUFFLE(2,3,3,2)), neg3_mask);
                    t0 = _mm_movelh_ps(y01, y23);
                    y01 = _mm_add_ps(t0, t1);
                    y23 = _mm_sub_ps(t0, t1);

                    _mm_storel_pi((__m64*)&v0[0], y01);
                    _mm_storeh_pi((__m64*)&v0[nx], y01);
                    _mm_storel_pi((__m64*)&v1[0], y23);
                    _mm_storeh_pi((__m64*)&v1[nx], y23);
                }
            }
        }

        _dw0 = dw0;
        return n;
    }
};

  其實也不復雜,總以爲裏面的_mm_loadh_pi用的很好,這也得益於其數據源採用了Complex格式,這樣數據的實部和虛部在內存中是連續的,用SSE加載數據是也就很方便了。函數

  上面只是部分代碼,爲了配合該過程,還有不少的初始化,好比數據的shuffle等。詳見Opencv的文件。測試

  對於2維的FFT變換,我沒有去扣CV的代碼,而是直接先每行進行一維的FFT1D,而後對結果在進行列方向的FFT1D,因爲FFT1D算法須要處理的序列必須是連續的內存,所以,須要對中間的結果進行轉置,處理完後在轉置回來,爲了節省時間,這個轉置也應該用SSE優化。優化

  當2維的寬度和高度相同時,這個轉置是不須要分配另一份額外的內存的,這個叫In-Place轉置,另一個重要優勢就是FFT1D算法也支持In-Place操做。spa

  因爲是對Complex數據進行轉置,咱們能夠換個角度考慮,由於一個Complex正好和double數據佔用一樣的內存,這樣直接利用和double相關的SSE指令就能夠方便的實現Complex相關數據的轉置。好比當寬度和高度同樣時,刻直接使用下述方式。線程

inline void Inplace_TransposeSSE2X2D(double *Src, double *Dest, int Length)
{
    __m128d S0 = _mm_loadu_pd(Src);                        
    __m128d S1 = _mm_loadu_pd((Src + Length));            
    __m128d D0 = _mm_loadu_pd(Dest);                
    __m128d D1 = _mm_loadu_pd((Dest + Length));            
    _mm_storeu_pd(Dest, _mm_unpacklo_pd(S0, S1));
    _mm_storeu_pd(Dest + Length, _mm_unpackhi_pd(S0, S1));
    _mm_storeu_pd(Src, _mm_unpacklo_pd(D0, D1));
    _mm_storeu_pd(Src + Length, _mm_unpackhi_pd(D0, D1));
}

  參考系統的Intrinsic的_MM_TRANSPOSE4_PS函數,能夠發現double類型的轉置方式不太同樣,能夠直接使用有關的unpack函數,而不使用shuffle,固然使用shuffle也是沒有問題的。3d

  那麼最後我實現的FFT2D函數以下所示:code

int FFT2D(Complex *Input, Complex *Output, int Width, int Height, bool Invert, int StartZeroLines = 0, int EndZeroLines = 0)
{
    if ((Input == NULL) || (Output == NULL))                                return IM_STATUS_NULLREFRENCE;
    if ((IsPowerOfTwo(Width) == false) || (IsPowerOfTwo(Height) == false))    return IM_STATUS_INVALIDPARAMETER;

    if (Width == Height)
    {
        void *Buffer = FFT_Init(Width);
        if (Buffer == NULL)    return IM_STATUS_OUTOFMEMORY;

        for (int Y = StartZeroLines; Y < Height - EndZeroLines; Y++)
            FFT1D(Input + Y * Width, Output + Y * Width, Buffer, Width, Invert);            //    先水平方向變換
        InPlace_TransposeD((double *)Output, Width);                                        //    再位轉置

        for (int Y = 0; Y < Height; Y++)
            FFT1D(Output + Y * Width, Output + Y * Width, Buffer, Width, Invert);            //    再垂直方向變換
        InPlace_TransposeD((double *)Output, Width);
        FFT_Free(Buffer);
    }
    else
    {
        void *Buffer = FFT_Init(Width);
        if (Buffer == NULL)    return IM_STATUS_OUTOFMEMORY;

        for (int Y = StartZeroLines; Y < Height - EndZeroLines; Y++)
            FFT1D(Input + Y * Width, Output + Y * Width, Buffer, Width, Invert);
        Complex *Temp = (Complex *)malloc(Width * Height * sizeof(Complex));
        if (Temp == NULL)
        {
            FFT_Free(Buffer);
            return IM_STATUS_OUTOFMEMORY;
        }
        TransposeD((double *)Output, (double *)Temp, Width, Height);
        FFT_Free(Buffer);

        Buffer = FFT_Init(Height);                                                            //    必需要二次初始化
        if (Buffer == NULL)    return IM_STATUS_OUTOFMEMORY;

        for (int Y = 0; Y < Width; Y++)
            FFT1D(Temp + Y * Height, Temp + Y * Height, Buffer, Height, Invert);
        TransposeD((double *)Temp, (double *)Output, Height, Width);
        FFT_Free(Buffer);
        free(Temp);
    }
    return IM_STATUS_OK;
}

  注意在函數的最後我增長了StartZeroLines 和EndZeroLines 兩個參數,他們主要是在進行行方向的FFT1D是忽略最前面的StartZeroLines和最後面的EndZeroLines 個全0的行,由於全0的變換後面的結果仍是0,沒有必要進行計算,減小計算時間。在opencv中也是有個nonzero_rows的,可是他只是針對前面的行,而實際中,好比不少狀況是,先要擴展數據,而後把有效數據放置在左上角,這樣只有右下角有非零元素,因此opencv這樣作就沒法起到加速做用了。

  對扣取的代碼進行了實際的測試,1024*1024的數據,進行100次正反變換,耗時3000ms,使用matlab進行一樣的操做,耗時約5500ms,而且觀察任務管理器,在4核PC上CPU使用率100%,說明他內部使用了多線程,不過有一點就是matlab使用的是double類型的數據。據說matlab最新版使用的就是FFTW庫,不過不管如何,這個速度仍是能夠接受和至關快的。

  下面咱們重點談下基於FFT的圖像卷積的實現,理論上若是圖像a大小爲N * M,卷積核b大小爲 X * Y,則卷積實現的過程以下:

  首先擴展數據,擴展後的大小爲 (N + X - 1) * (M + Y - 1),將卷積核數據放置到擴展後的數據的左上角,其餘元素填充0,獲得bb, 對bb進行FFT2D正向變換獲得B,而後也將圖像數據放置到圖像的左上角,其餘元素填充爲0,獲得aa,對aa也進行FFT2D正向變換獲得B,接着對A和B進行點乘獲得C,最後對C進行逆向的FFT變換獲得D,最後取D的中間部分有效數據就是卷積的結果。

  舉個例子,假設圖像數據爲:

       

  卷積核爲:

         

  擴展後的圖像數據爲:

        

  擴展後的卷積數據爲:

        

  進行上述操做:D = ifft2(fft2(aa).*fft2(bb)),獲得:

        

  中間部分的數據就是卷積的有效數據。

  仔細觀察,能夠發現這樣的卷積在邊緣處是有問題,他把超出邊緣部分的數據所有用0代替了,所以這種卷積仍是不完美的,一種解決辦法就是首先把原圖沿着邊緣擴展,擴展的方式能夠是重複或者鏡像,擴展的大小固然就和卷積核的大小有關了,通常按卷積核中心對稱分佈,這時,邊緣擴展後的圖像大小爲  (N + X - 1) * (M + Y - 1)。

  也就是說合理的過程是進行兩次擴展,在進行FFT變換前的最終數據維度爲 (N + X - 1 + X - 1) * (M + Y - 1 + Y - 1),在進行逆變換獲得的結果中從第(X - 1, Y - 1)座標處取有效值。

  咱們在仔細審視下上述過程,第1、內存佔用方面,須要2個 (N + X - 1 + X - 1) * (M + Y - 1 + Y - 1) * sizeof(Complex)大小的內存,這很可觀的,特備是對於移動設備,第二,因爲咱們目前的FFT都是隻能處理2的整數次冪的序列,若是以全圖爲對象,則有可能會增長很大的無效計算,好比N + X - 1 + X - 1 若是等於1025,則須要調整至於2048,尺寸約大,這種無效計算的可能性越大,而這於咱們前面但願利用2的整數次冪的FFT最快的初衷就矛盾了。

  一種解決方法就是分塊計算,好比咱們把圖像分紅不少個知足條件 (NN+ X - 1 + X - 1)  = 256 和  (MM + Y - 1 + Y - 1) = 256的塊,其中NN * MM就是圖像分塊大的大小,這樣對卷積核那部分的FFT就是一次256*256的二維卷積,而且是公共的,這比原始的計算一塊很大的(N + X - 1 + X - 1) * (M + Y - 1 + Y - 1)要節省不少時間,同時,只佔用了很小的一塊內存。當卷積核的大小不大於50時,每次有效的計算的塊NN * MM相對於總體的2D FFT計算來講佔比仍是至關高的。這樣可有效的減小1025尺寸直接變成了2048這樣的FFT計算。

  另外注意一點,FFT卷積是虛部和實部的做用是同樣的,也就是說咱們能夠同時進行兩個不想關元素的計算,好比對於32位圖像,能夠把一個塊的Blue份量填充到實部,把Green份量填充到虛部,這樣一次性就完成了2個份量的計算。而對於灰度圖,則能夠同時計算兩個塊,即第一個塊的灰度值填充到實部,第二個塊的灰度填充到虛部。這樣一次性就完成了2個塊的計算。

  還有,在每一個塊的底部,有X - 1 + X -1個行是要填充爲0的,所以這些的行的一維FFT變換是沒有必要的,能夠優化掉。

  優化後的部分代碼以下:

int IM_FFTConv2(unsigned char *Src, unsigned char *Dest, int Width, int Height, int Stride, float *Kernel, int KerWidth, int KerHeight)                //    閾值
{
    int Channel = Stride / Width;
    if ((Src == NULL) || (Dest == NULL))                        return IM_STATUS_NULLREFRENCE;
    if ((Width <= 0) || (Height <= 0))                            return IM_STATUS_INVALIDPARAMETER;
    if ((KerWidth >50) || (KerHeight > 50))                        return IM_STATUS_INVALIDPARAMETER;            // 卷積核越大,每次的有效計算量就越小了
    if ((Channel != 1) && (Channel != 3) && (Channel != 4))        return IM_STATUS_INVALIDPARAMETER;

    int Status = IM_STATUS_OK;

    const int TileWidth = 256, TileHeight = 256;

    int HalfW = KerWidth / 2, HalfH = KerHeight / 2;
    int ValidW = TileWidth + 1 - KerWidth - (KerWidth - 1);                                //    第一個加1是由於卷積擴展的矩陣大小爲N+M-1,第二個-1是由於爲了取得有效數據,還要對邊緣進行擴展,擴展的大小爲KerHeight - 1,好比KerHeight=5,則每邊須要擴展2個像素,一共擴展4個像素
    int ValidH = TileHeight + 1 - KerHeight - (KerHeight - 1);                            //    默認的卷積的效果再卷積周邊是用0來填充的,若是分塊處理時,這明顯是不能知足要求的,會帶來明顯的塊於塊之間的分界線

    int TileAmountX = (Width / ValidW) + (Width % ValidW ? 1 : 0);                        //    圖像須要分紅的塊數
    int TileAmountY = (Height / ValidH) + (Height % ValidH ? 1 : 0);

    Complex *Conv = (Complex *)malloc(TileWidth * TileHeight * sizeof(Complex));        //    須要將卷積核擴展到TileWidth和TileHeight大小                    
    Complex *Tile = (Complex *)malloc(TileWidth * TileHeight * sizeof(Complex) * 3);    //    每一個小塊對應的數據,當爲24位模式時須要3分內存,灰度只需一份

    int *RowOffset = (int *)malloc((TileAmountX * ValidW + KerWidth) * sizeof(int));    //    每一個小塊取樣時的座標偏移,這樣在中間的塊也能夠取到周邊合理的只,在邊緣處則位鏡像值
    int *ColOffset = (int *)malloc((TileAmountY * ValidH + KerHeight) * sizeof(int));

    if ((Conv == NULL) || (Tile == NULL) || (RowOffset == NULL) || (ColOffset == NULL))
    {
        Status = IM_STATUS_OUTOFMEMORY;
        goto FreeMemory;
    }
    Status = IM_GetOffsetPos(RowOffset, Width, HalfW, TileAmountX * ValidW + (KerWidth - HalfW) - Width, IM_EDGE_MIRROR);            //    左右對稱
    if (Status != IM_STATUS_OK) goto FreeMemory;
    Status = IM_GetOffsetPos(ColOffset, Height, HalfH, TileAmountY * ValidH + (KerHeight - HalfH) - Height, IM_EDGE_MIRROR);
    if (Status != IM_STATUS_OK) goto FreeMemory;

    memset(Conv, 0, TileWidth * TileHeight * sizeof(Complex));                //    卷積核的其餘元素都爲0,這裏先總體賦值爲0
    for (int Y = 0; Y < KerHeight; Y++)
    {
        int Index = Y * KerWidth;
        for (int X = 0; X < KerWidth; X++)
        {
            Conv[Y * TileWidth + X].Real = Kernel[Index + X];                //    卷積核須要放置在左上角
        }
    }
    
    Status = FFT2D(Conv, Conv, TileWidth, TileHeight, false, 0, TileHeight - KerHeight);                //    對卷積核進行FFT變換,注意行方向上下部都爲0,這樣能夠節省部分計算時間    
    if (Status != IM_STATUS_OK)    goto FreeMemory;

    if (Channel == 1)                                                                                    //    單通道時能夠一次性處理2個塊
    {

    }
    else if (Channel == 3)                                                                // 3通道時能夠利用兩個塊的信息分別填充到BG  RB  GR 序列裏,這樣就更快了
    {

    }
    else if (Channel == 4)
    {
        Complex *TileBG = Tile, *TileRA = Tile + TileWidth * TileHeight;
        for (int TileY = 0; TileY < TileAmountY; TileY++)
        {
            for (int TileX = 0; TileX < TileAmountX; TileX++)
            {
                IM_Rectangle SrcR, ValidR;
                IM_SetRect(&SrcR, TileX * ValidW, TileY * ValidH, TileX * ValidW + ValidW, TileY * ValidH + ValidH);
                IM_SetRect(&ValidR, 0, 0, Width, Height);
                IM_IntersectRect(&ValidR, ValidR, SrcR);
                for (int Y = ValidR.Top; Y < ValidR.Bottom + KerHeight - 1; Y++)
                {
                    byte *LinePS = Src + ColOffset[Y] * Stride;
                    int Index = (Y - ValidR.Top) * TileWidth;
                    for (int X = ValidR.Left; X < ValidR.Right + KerWidth - 1; X++)
                    {
                        byte *Sample = LinePS + RowOffset[X] * 4;
                        TileBG[Index].Real = Sample[0];
                        TileBG[Index].Imag = Sample[1];
                        TileRA[Index].Real = Sample[2];
                        TileRA[Index].Imag = Sample[3];
                        Index++;
                    }
                }
                Status = FFT2D(TileBG, TileBG, TileWidth, TileHeight, false, 0, TileHeight - (ValidR.Bottom + KerHeight - 1 - ValidR.Top));
                if (Status != IM_STATUS_OK)    goto FreeMemory;

                Status = FFT2D(TileRA, TileRA, TileWidth, TileHeight, false, 0, TileHeight - (ValidR.Bottom + KerHeight - 1 - ValidR.Top));
                if (Status != IM_STATUS_OK)    goto FreeMemory;

                for (int Y = 0; Y < TileWidth * TileHeight; Y++)
                {
                    float Temp = TileBG[Y].Real;
                    TileBG[Y].Real = TileBG[Y].Real * Conv[Y].Real - TileBG[Y].Imag * Conv[Y].Imag;        
                    TileBG[Y].Imag = Temp * Conv[Y].Imag + TileBG[Y].Imag * Conv[Y].Real;
                    Temp = TileRA[Y].Real;
                    TileRA[Y].Real = TileRA[Y].Real * Conv[Y].Real - TileRA[Y].Imag * Conv[Y].Imag;
                    TileRA[Y].Imag = Temp * Conv[Y].Imag + TileRA[Y].Imag * Conv[Y].Real;
                }
                FFT2D(TileBG, TileBG, TileWidth, TileHeight, true);
                if (Status != IM_STATUS_OK)    goto FreeMemory;

                FFT2D(TileRA, TileRA, TileWidth, TileHeight, true);
                if (Status != IM_STATUS_OK)    goto FreeMemory;

                for (int Y = ValidR.Top; Y < ValidR.Bottom; Y++)
                {
                    byte *LinePD = Dest + Y * Stride + ValidR.Left * 4;
                    int Index = (Y - ValidR.Top + KerHeight - 1) * TileWidth + KerWidth - 1;
                    for (int X = ValidR.Left; X < ValidR.Right; X++)
                    {
                        LinePD[0] = IM_ClampToByte(TileBG[Index].Real);                        //    取有效的數據
                        LinePD[1] = IM_ClampToByte(TileBG[Index].Imag);
                        LinePD[2] = IM_ClampToByte(TileRA[Index].Real);
                        LinePD[3] = IM_ClampToByte(TileRA[Index].Imag);
                        LinePD += 4;
                        Index++;
                    }
                }
            }
        }
    }
FreeMemory:
    if (RowOffset != NULL) free(RowOffset);
    if (ColOffset != NULL) free(ColOffset);
    if (Conv != NULL) free(Conv);
    if (Tile != NULL) free(Tile);
    return Status;
}

 

  程序裏我固定每一個塊的大小爲256*256,這主要是考慮256是4的整數次冪,是可以徹底用SSE優化掉的,同時也佔用更少的內存。

  若是考慮更好的處理,在最後一列以及最後一行那個塊,由於有效元素的減小,能夠考慮使用更小的塊來計算,不過這會增長程序的複雜性。本例沒有考慮了。

  速度測試:

  3000*3000的灰度圖    卷積核5*5         328ms

                     卷積核15*15        348ms

                卷積核25*25       400ms

                卷積核49*49       600ms

  之前我寫過基於SSE直接卷積,同一個機器上也彙報下測試速度:

  3000*3000的灰度圖    卷積核5*5         264ms

                     卷積核15*15       550ms

                卷積核25*25       1300ms

  因此我之前那篇文章的有些結論是錯誤的。

  測試工程的地址:http://files.cnblogs.com/files/Imageshop/SSE_Optimization_Demo.rar

 

 

相關文章
相關標籤/搜索