因未測試其餘做者的算法時間和效率,本文不敢自稱是最快的,可是速度也能夠確定說是至關快的,在一臺I5機器上佔用單核的資源處理 3000 * 2000的灰度數據用時約 20ms,而且算法和核心的大小是無關的,即所謂的o(1)算法。html
在實現本算法以前,也曾經參考何凱明在暗通道去霧時提出的一篇參考論文中的算法: STREAMING MAXIMUM-MINIMUM FILTER USING NO MORE THAN THREE COMPARISONS PER ELEMENT ,這篇文章所描述的過程也是o(1)的,速度也至關快,不過仍是不夠快。算法
我曾經本身構思了一個想法,也是基於行列分離的,在速度上比上文的代碼又要快,而且也是o(1)算法,可是算法速度和圖片的內容有關,好比對一個圖進行了一次算法後,再次對結果執行相同的算法,可能後一次就要慢不少,這和我改進的算法自己有關係,可是這種狀況是不多見的。數組
本文所要介紹的算法也是在好久之前就看到過的,可是一直沒有引發個人重視,其對應的參考論文是 A fast algorithm for local minimum and maximum filters on rectangular and octagonal kernels ,當時以爲論文的實現起來可能沒有我本身構思的快,也就沒有去深究。ide
論文的實現步驟也是基於行列分離的,即先進行行方向的一維運算,而後再對結果進行列方向的一維計算,具體的理論描述你們去研究論文吧。函數
那其實論文的核心就是下面這個圖。post
In表示一維行方向的輸入數據,g/h分別運算過程當中的兩個中間數據,和輸入數據大小同樣。性能
如上圖所示,咱們假定須要進行計算的核大小爲R,那麼將一行分爲多個大小爲 D =(2R+1) 的分段,例如圖中R=2, D=5 ,對每個分段進行預處理,其中 x 號位置存放的是箭頭所在直線段上的點中的最大值(最小值),如此處理獲得 g 和 h 兩個數組,那麼對於某個點(索引爲I),其半徑R內的最大(小)值爲:Max/ Min(g(I+R),h(I-R))。測試
過程好簡單。優化
咱們拿一組數據來講明上述過程,假如一行數據以下,咱們要進行膨脹操做(最大值),核半徑爲2:url
In: 20 12 35 9 10 7 32 15 20 45 28
對應的g/h爲:
g: 20 20 35 35 35 7 32 32 32 45 45
h: 35 35 35 10 9 45 45 45 45 45 28
若是咱們要計算第4個點的半徑爲2的最大值,則對應 g(I+R) = g(4+2) = 7, h(I-R)=h(4-2)=35, 獲得結果爲max(7,35) = 35;
若是咱們要計算第6個點的半徑爲2的最大值,則對應 g(I+R) = g(6+2) = 32, h(I-R)=h(6-2)=10, 獲得結果爲max(32,10) = 32;
注意上面索引是以1位小標計數起點的。
邊緣處理:
注意到在邊緣處,好比左側邊緣,當要計算的點的索引小於R時,這時h值是無效的,而在右側邊緣,g值是沒法取到的,可是仔細觀察,問題很簡單,還拿上述數據爲例,若是要計算索引爲2處半徑爲2的最值,因爲h(2-2)是超出索引的(前面說了本例以1爲下標起點),所以不能用前述的方式,那咱們迴歸到問題的本質,計算2處半徑爲2的最值,就是計算max(In(1), In(2), In(3), In(4)), 而這個值不就是g數據中索引爲2+2處的數據嗎,在這以前就已經幫咱們算法,直接取值就能夠了。
在數據尾部(右側),狀況有所不一樣,咱們應該從H中取值。
從上述的分析可知,這個算法有個特性,就是半徑越大,算法的耗時會越小,由於對邊緣的處理只須要拷貝數據,而沒有了更多的計算,可能不少人不信吧。
算法實現:
有了上面的描述,要實現一個快速的腐蝕或膨脹算法相信對部分來講應該是一件很是容易的事情,先行方向處理,在列方向,好簡單。
最近我是迷上了SSE算法優化,因而就思考了這個算法的SSE優化,之前在看SSE的函數時,就一直在想_mm_max_epi8/_mm_min_epi8這種一次性能獲取16個字節數據的最值的函數是否能用在腐蝕和膨脹上,不過因爲他們是對兩個連續數據的比較,在行方向上是難以運用的,可是若是數據比較是列方向上,那不就可使用了嗎。
咱們上述算法就有列方向的比較,不就有了用武之地了。
首先,咱們給出在列方向更新g值/h值在每一個分段範圍內的C語言實現代碼,好比獲取g值大概的代碼以下:
memcpy(G + StartY * ValidDataLength, Src + StartY * Stride, ValidDataLength); for (int Y = StartY + 1; Y < EndY; Y++) { unsigned char *LinePS = Src + Y * Stride; unsigned char *LinePD = G + Y * ValidDataLength; unsigned char *LinePD = G + (Y - 1) * ValidDataLength; for (int X = 0; X < ValidDataLength; X++) { LinePD[X] = IM_Max(LinePS[X], LinePL[X]); } }
StartY爲計算好的分段範圍的起點,EndY爲分段範圍的終點,咱們觀察g數據的規律,知道在分段範圍內第一行的最大值就是數據自己,然後面的則是和前一行比較獲得結果。
很明顯:
for (int X = 0; X < Width * Channel; X++) { LinePD[X] = IM_Max(LinePS[X], LinePL[X]); }
這個代碼很容易向量化,若是這裏是浮點運算,編譯器會直接幫咱們向量處理的,可是對於字節,彷佛編譯器尚未那麼智能,咱們本身手動來向量化,代碼以下:
memcpy(G + StartY * ValidDataLength, Dest + StartY * ValidDataLength, ValidDataLength); // 每段G數據第一行就是原始的數據 for (int Y = StartY + 1; Y < EndY; Y++) { unsigned char *LinePS = Dest + Y * ValidDataLength; unsigned char *LinePD = G + Y * ValidDataLength; unsigned char *LinePL = G + (Y - 1) * ValidDataLength; for (int X = 0; X < Block * BlockSize; X += BlockSize) { _mm_storeu_si128((__m128i *)(LinePD + X), _mm_max_epu8(_mm_loadu_si128((__m128i *)(LinePS + X)), _mm_loadu_si128((__m128i *)(LinePL + X)))); } for (int X = Block * BlockSize; X < ValidDataLength; X++) { LinePD[X] = IM_Max(LinePS[X], LinePL[X]); } }
其中BlockSize = 16, Block = ValidDataLength / BlockSize;
這段代碼很簡單,對於h的處理也是相似的道理。
當咱們得到了g,h的數據後,後面的處理過程的C代碼也很簡單:
for (int Y = Radius; Y < IM_Min(BlockV * Size - Radius, Width); Y++) // 這些位於中間的數據是符合 G+Radius 和 R - Radius 取大的要求的 { unsigned char *LinePG = G + IM_Min(Y + Radius, Width - 1) * Width; unsigned char *LinePH = H + (Y - Radius) * ValidDataLength; unsigned char *LinePD = T + Y * ValidDataLength;for (int X = 0; X < ValidDataLength; X++) { LinePD[X] = IM_Max(LinePG[X], LinePH[X]); } }
又是一樣的道理,內部的for循環可直接用SSE代替。
可是這裏只是對列方向進行處理,行方向有沒有可能用SSE作處理呢,能夠確定的說,絕對能夠,可是除非你很是有耐心,覺得其中各類pack和unpack或者shuffle會讓你瘋掉,並且最後的效率也許還不如直接使用普通的C語言。
那麼如何處理呢,我想你們確定能想到轉置,確實,對數據進行轉置後再進行列方向的處理,而後再轉置回來就至關於對原數據的行方向處理。
關於轉置,一直也是個耗時的過程,可是我在圖像轉置的SSE優化(支持8位、24位、32位),提速4-6倍 一文中提到了利用SSE實現高速的轉置操做,利用它去實現本文的流程則很是靠譜。
那麼咱們貼出總體的大部分處理代碼:
垂直方向的核心:
int IM_Vert_MaxFilter(unsigned char *Src, unsigned char *Dest, int Width, int Height, int Stride, int Radius) { int Channel = Stride / Width; if ((Src == NULL) || (Dest == NULL)) return IM_STATUS_NULLREFRENCE; if ((Width <= 0) || (Height <= 0)) return IM_STATUS_INVALIDPARAMETER; if ((Channel != 1) && (Channel != 3) && (Channel != 4)) return IM_STATUS_INVALIDPARAMETER; // 從節省內存的角度考慮,能夠只須要兩倍額外的內存 unsigned char *G = (unsigned char *)malloc(Height * Width * Channel * sizeof(unsigned char)); unsigned char *H = (unsigned char *)malloc(Height * Width * Channel * sizeof(unsigned char)); if ((G == NULL) || (H == NULL)) { if (G != NULL) free(G); if (H != NULL) free(H); return IM_STATUS_OUTOFMEMORY; } // 垂直方向處理 int Size = Radius * 2 + 1, ValidDataLength = Width * Channel; int BlockSize = 16, Block = ValidDataLength / BlockSize; int BlockV = ((Height % Size) == 0 ? Height / Size : (Height / Size) + 1); for (int Index = 0; Index < BlockV; Index++) { int StartY = Index * Size, EndY = IM_Min(Index * Size + Size, Height); memcpy(G + StartY * ValidDataLength, Src + StartY * Stride, ValidDataLength); // 每段G數據第一行就是原始的數據 for (int Y = StartY + 1; Y < EndY; Y++) { unsigned char *LinePS = Src + Y * Stride; unsigned char *LinePD = G + Y * ValidDataLength; unsigned char *LinePL = G + (Y - 1) * ValidDataLength; for (int X = 0; X < Block * BlockSize; X += BlockSize) { _mm_storeu_si128((__m128i *)(LinePD + X), _mm_max_epu8(_mm_loadu_si128((__m128i *)(LinePS + X)), _mm_loadu_si128((__m128i *)(LinePL + X)))); } for (int X = Block * BlockSize; X < ValidDataLength; X++) { LinePD[X] = IM_Max(LinePS[X], LinePL[X]); } } memcpy(H + StartY * ValidDataLength, G + (EndY - 1) * ValidDataLength, ValidDataLength); // 每段H數據的第一行就是對應G數據的最後一行 memcpy(H + (EndY - 1) * ValidDataLength, Src + (EndY - 1) * Stride, ValidDataLength); // 每段H數據的最後一行就是原始的數據 for (int Y = EndY - 2; Y > StartY; Y--) // 注意循環次數的改變 { unsigned char *LinePS = Src + Y * Stride; unsigned char *LinePD = H + Y * ValidDataLength; unsigned char *LinePL = H + (Y + 1) * ValidDataLength; for (int X = 0; X < Block * BlockSize; X += BlockSize) { _mm_storeu_si128((__m128i *)(LinePD + X), _mm_max_epu8(_mm_loadu_si128((__m128i *)(LinePS + X)), _mm_loadu_si128((__m128i *)(LinePL + X)))); } for (int X = Block * BlockSize; X < ValidDataLength; X++) { LinePD[X] = IM_Max(LinePS[X], LinePL[X]); } } // 針對最值算法,在列方向最後一塊不是Size大小時,後面的數據只能是重複邊緣像素,這樣後面跟的G/H值和Height - 1大小是相同的 } // 整個的數據分爲三個部分,[0, Radius]爲第一組,[Radius, BlockV * Size - Radius]爲第二組,[BlockV * Size - Radius, BlockV * Size]爲第三組, // 第一組數據的結果取G中[Radius, 2 * Radius]的值,第二組數據取G + Radius和H - Radius中的小值,第三組取H - Radius的值。 // 最頂部的一半數據,此時的H數據無效 // // 此處刪除若干代碼 // for (int Y = Radius; Y < IM_Min(BlockV * Size - Radius, Height); Y++) // 這些位於中間的數據是符合 G + Radius 和 R - Radius 取大的要求的 { unsigned char *LinePG = G + IM_Min(Y + Radius, Height - 1) * ValidDataLength; // 有可能超出範圍的 unsigned char *LinePH = H + (Y - Radius) * ValidDataLength; unsigned char *LinePD = Dest + Y * Stride; for (int X = 0; X < Block * BlockSize; X += BlockSize) { _mm_storeu_si128((__m128i *)(LinePD + X), _mm_max_epu8(_mm_loadu_si128((__m128i *)(LinePG + X)), _mm_loadu_si128((__m128i *)(LinePH + X)))); } for (int X = Block * BlockSize; X < ValidDataLength; X++) { LinePD[X] = IM_Max(LinePG[X], LinePH[X]); } } // 最底部的一半數據,此時的G數據無用 // // 此處刪除若干代碼 // free(G); free(H); return IM_STATUS_OK; }
綜合的調用:
int IM_MaxFilter(unsigned char *Src, unsigned char *Dest, int Width, int Height, int Stride, int Radius) { int Channel = Stride / Width; if ((Src == NULL) || (Dest == NULL)) return IM_STATUS_NULLREFRENCE; if ((Width <= 0) || (Height <= 0)) return IM_STATUS_INVALIDPARAMETER; if ((Channel != 1) && (Channel != 3) && (Channel != 4)) return IM_STATUS_INVALIDPARAMETER; int Status = IM_STATUS_OK; unsigned char *T = (unsigned char *)malloc(Height * Stride * sizeof(unsigned char)); if (T == NULL) return IM_STATUS_OUTOFMEMORY; Status = IM_Vert_MaxFilter(Src, T, Width, Height, Stride, Radius); if (Status != IM_STATUS_OK) goto FreeMemory; Status = IM_Transpose(T, Dest, Width, Height, Stride, Height, Width, Height * Channel); // 轉置,注意Dest我只用了Height * Channel的數據 if (Status != IM_STATUS_OK) goto FreeMemory; Status = IM_Vert_MaxFilter(Dest, T, Height, Width, Height * Channel, Radius); if (Status != IM_STATUS_OK) goto FreeMemory; Status = IM_Transpose(T, Dest, Height, Width, Height * Channel, Width, Height, Stride); FreeMemory: free(T); return Status; }
上面代碼中還有不少細節,包括分塊尾部不完整數據的處理,你們能夠本身理解。
有兩處刪除了部分代碼,刪除的代碼是很容易補上去的,由於我不喜歡個人代碼被別人直接複製黏貼。
進一步的分析:
由上面的代碼能夠看到,要實現整個的過程,咱們須要原圖3倍大小的額外內存,那麼是否有下降這個的可能性呢,是有的,在處理列方向時,咱們能夠一次性只處理16列或32列,這樣g/h數據只各須要Height * 16(32) * sizeof(unsigned char)大小的內存,並且這樣作還有一個優點就是在每一個分段內部比較時,因爲更新的內容較少,能夠用一個xmm寄存器保存最值得臨時結果,這樣就不用去加載上一行的內存數據,少了不少次內存讀寫的過程,一個簡單的示例代碼以下所示:
unsigned char *LinePS = Src + StartY * Stride + X; unsigned char *LinePD = G + StartY * 32; __m128i Max1 = _mm_setzero_si128(), Max2 = _mm_setzero_si128(); // 這樣寫能減小一次內存加載 for (int Y = StartY; Y < EndY; Y++) { Max1 = _mm_max_epu8(_mm_loadu_si128((__m128i *)(LinePS + 0)), Max1); // 或者使用一條AVX指令 Max2 = _mm_max_epu8(_mm_loadu_si128((__m128i *)(LinePS + 16)), Max2); _mm_store_si128((__m128i *)(LinePD + 0), Max1); _mm_store_si128((__m128i *)(LinePD + 16), Max2); LinePS += Stride; LinePD += 32; }
在個人筆記本中測試,這個的速度要比上面的版本還快一點,而且有佔用了更少的內存,一箭雙鵰啊。
歡迎你們能提供更快速的算法的實現思路。
本文Demo下載地址: http://files.cnblogs.com/files/Imageshop/SSE_Optimization_Demo.rar,裏面的全部算法都是基於SSE實現的。
若是以爲本文對你有幫助,請爲本文點個贊。
2019.4.26日更新:
在本文的評論裏有網友提出該文的速度仍是不夠快,比起商業軟件還有很大的差距,一直未找到良方,最近有網友說他作的比我這個速度要快,其提到了不須要徹底轉置。雖未提到代碼層面的東西,可是這種事情是一點就通的,要實現起來仍是很容易的。
比起全局轉置,局部轉置能夠只須要分配不多的內存,從速度方面考慮,咱們在進行了垂直方向的優化後,就要進行水平方向處理,此時,我每次轉置32行(或其餘16的倍數),而後利用垂直方向的處理技巧處理轉置後數據的垂直方向最值,處理完成後在轉置到水平方向對應的32行數據中,至於不能被32行整除的那一部分,就用普通的方式處理了。
此種優化後,咱們驚喜的發現,速度較以前有2到3倍的提升,以下圖所示:
其實,總體來看,修改後的代碼計算量上並無什麼減小,那主要耗時下降了,其核心在於減小了內存的Cache Miss,這種技巧在不少算法中也能夠加以推廣。
最後,咱們還注意到一個問題,表面上看垂直方向的代碼更爲簡介,流程也少那些轉置的過程,可是最後實測垂直處理的時間和水平處理的時間的佔比約爲內存·6:4,分析認爲這個的主要緣由是在垂直方向處理時圖比較大,連續訪問垂直方向的內存,Cache miss比較多,而水平方向由於是分塊處理的,中間用到的臨時內存訪問時基本無啥Cache miss(寬度爲32的臨時區都是連續訪問的)。
修改後的算法對於評論區裏博友提到的4096X8192大小的的灰度圖,其耗時大概在70ms,比其說的商業軟件的速度仍是要慢一倍的。在Opencv中,使用cvErode函數,發現他也很是很是的快,和我這裏的速度旗鼓至關,有的時候還快點,查看其代碼,發現他核心的地方是調用hal:morph實現的,可是HAL硬件加速層究竟是個啥傢伙,實在是搞不清楚。