基於OpenMP的矩陣乘法實現及效率提高分析

一.  矩陣乘法串行實現linux

例子選擇兩個1024*1024的矩陣相乘,根據矩陣乘法運算獲得運算結果。其中,兩個矩陣中的數爲double類型,初值由隨機數函數產生。代碼以下:ios

#include <iostream>  
#include <omp.h> // OpenMP編程須要包含的頭文件
#include <time.h>
#include <stdlib.h>

using namespace std;

#define MatrixOrder 1024
#define FactorIntToDouble 1.1; //使用rand()函數產生int型隨機數,將其乘以因子轉化爲double型;

double firstParaMatrix [MatrixOrder] [MatrixOrder] = {0.0};  
double secondParaMatrix [MatrixOrder] [MatrixOrder] = {0.0};
double matrixMultiResult [MatrixOrder] [MatrixOrder] = {0.0};

//計算matrixMultiResult[row][col]
double calcuPartOfMatrixMulti(int row,int col)
{
    double resultValue = 0;
    for(int transNumber = 0 ; transNumber < MatrixOrder ; transNumber++) {
        resultValue += firstParaMatrix [row] [transNumber] * secondParaMatrix [transNumber] [col] ;
    }
    return resultValue;
}

/* * * * * * * * * * * * * * * * * * * * * * * * *
* 使用隨機數爲乘數矩陣和被乘數矩陣賦double型初值 *
* * * * * * * * * * * * * * * * * * * * * * * * */
void matrixInit()
{
    for(int row = 0 ; row < MatrixOrder ; row++ ) {
        for(int col = 0 ; col < MatrixOrder ;col++){
            srand(row+col);
            firstParaMatrix [row] [col] = ( rand() % 10 ) * FactorIntToDouble;
            secondParaMatrix [row] [col] = ( rand() % 10 ) * FactorIntToDouble;
        }
    }
}

/* * * * * * * * * * * * * * * * * * * * * * * 
* 實現矩陣相乘 *                    
* * * * * * * * * * * * * * * * * * * * * * * */
void matrixMulti()
{
    for(int row = 0 ; row < MatrixOrder ; row++){
        for(int col = 0; col < MatrixOrder ; col++){
            matrixMultiResult [row] [col] = calcuPartOfMatrixMulti (row,col);
        }
    }
}

int main()  
{ 
    matrixInit();

    clock_t t1 = clock(); //開始計時;
    matrixMulti();
    clock_t t2 = clock(); //結束計時
      cout<<"time: "<<t2-t1<<endl; 
    
    system("pause");

    return 0;  
}  

 

 矩陣乘法並行實現算法

使用#pragma omp parallel for爲for循環添加並行,使用num_threads()函數指定並行線程數。編程

   使用VS2010編譯,須要先在項目屬性中選擇支持openmp,在頭文件中包含<omp.h>便可使用openmp爲矩陣乘法實現並行。框架

   代碼以下:函數

#include <iostream>  
#include <omp.h> // OpenMP編程須要包含的頭文件
#include <time.h>
#include <stdlib.h>

using namespace std;

#define MatrixOrder 1024
#define FactorIntToDouble 1.1; //使用rand()函數產生int型隨機數,將其乘以因子轉化爲double型;

double firstParaMatrix [MatrixOrder] [MatrixOrder] = {0.0};  
double secondParaMatrix [MatrixOrder] [MatrixOrder] = {0.0};
double matrixMultiResult [MatrixOrder] [MatrixOrder] = {0.0};

//計算matrixMultiResult[row][col]
double calcuPartOfMatrixMulti(int row,int col)
{
    double resultValue = 0;
    for(int transNumber = 0 ; transNumber < MatrixOrder ; transNumber++) {
        resultValue += firstParaMatrix [row] [transNumber] * secondParaMatrix [transNumber] [col] ;
    }
    return resultValue;
}

/* * * * * * * * * * * * * * * * * * * * * * * * *
* 使用隨機數爲乘數矩陣和被乘數矩陣賦double型初值 *
* * * * * * * * * * * * * * * * * * * * * * * * */
void matrixInit()
{
    #pragma omp parallel for num_threads(64)
    for(int row = 0 ; row < MatrixOrder ; row++ ) {
        for(int col = 0 ; col < MatrixOrder ;col++){
            srand(row+col);
            firstParaMatrix [row] [col] = ( rand() % 10 ) * FactorIntToDouble;
            secondParaMatrix [row] [col] = ( rand() % 10 ) * FactorIntToDouble;
        }
    }
    //#pragma omp barrier
}

/* * * * * * * * * * * * * * * * * * * * * * * 
* 實現矩陣相乘 *                    
* * * * * * * * * * * * * * * * * * * * * * * */
void matrixMulti()
{

    #pragma omp parallel for num_threads(64)
    for(int row = 0 ; row < MatrixOrder ; row++){
        for(int col = 0; col < MatrixOrder ; col++){
            matrixMultiResult [row] [col] = calcuPartOfMatrixMulti (row,col);
        }
    }
    //#pragma omp barrier
}

int main()  
{ 
    matrixInit();

    clock_t t1 = clock(); //開始計時;
    matrixMulti();
    clock_t t2 = clock(); //結束計時
      cout<<"time: "<<t2-t1<<endl; 
    
    system("pause");

    return 0;  
}  

 

三 效率對比spa

運行以上兩種方法,對比程序運行時間。線程

當矩陣階數爲1024時,串行和並行中矩陣乘法耗費時間以下:code

串行:blog

並行:

可看出,階數爲1024時並行花費的時間大約是串行的五分之一。

 

當改變矩陣階數,並行和串行所花費時間以下:

 

128

256

512

1024

2048

並行

0

31

164

3491

43203

串行

16

100

516

15584

134818

 

   畫成折線圖以下:

 

   加速比曲線以下(將串行時間除以並行時間):

 

 

 

   從以上圖表能夠看出當矩陣規模不大(階數小於500)時,並行算法與串行算法差距不大,當階數到達1000、2000時,差距就很是明顯。並且,並不是隨着矩陣規模越大,加速比就會越大。在本機硬件條件下,並行線程數爲64時,大約在1024附近會有較高加速比。

 

四 矩陣分塊相乘並行算法

將矩陣乘法的計算轉化爲其各自分塊矩陣相乘然後相加,可以有效減小乘數矩陣和被乘數矩陣調入內存的次數,可加快程序執行。

代碼以下:

#include <iostream>  
#include <omp.h> // OpenMP編程須要包含的頭文件
#include <time.h>
#include <stdlib.h>

using namespace std;

#define MatrixOrder 2048
#define FactorIntToDouble 1.1; //使用rand()函數產生int型隨機數,將其乘以因子轉化爲double型;

double firstParaMatrix [MatrixOrder] [MatrixOrder] = {0.0};  
double secondParaMatrix [MatrixOrder] [MatrixOrder] = {0.0};
double matrixMultiResult [MatrixOrder] [MatrixOrder] = {0.0};

/* * * * * * * * * * * * * * * * * * * * * * * * *
* 使用隨機數爲乘數矩陣和被乘數矩陣賦double型初值 *
* * * * * * * * * * * * * * * * * * * * * * * * */
void matrixInit()
{
    //#pragma omp parallel for num_threads(64)
    for(int row = 0 ; row < MatrixOrder ; row++ ) {
        for(int col = 0 ; col < MatrixOrder ;col++){
            srand(row+col);
            firstParaMatrix [row] [col] = ( rand() % 10 ) * FactorIntToDouble;
            secondParaMatrix [row] [col] = ( rand() % 10 ) * FactorIntToDouble;
        }
    }
    //#pragma omp barrier
}

void smallMatrixMult (int upperOfRow , int bottomOfRow , 
                      int leftOfCol , int rightOfCol ,
                      int transLeft ,int transRight )
{
    int row=upperOfRow;
    int col=leftOfCol;
    int trans=transLeft;

    #pragma omp parallel for num_threads(64)
    for(int row = upperOfRow ; row <= bottomOfRow ; row++){
        for(int col = leftOfCol ; col < rightOfCol ; col++){
            for(int trans = transLeft ; trans <= transRight ; trans++){
                matrixMultiResult [row] [col] += firstParaMatrix [row] [trans] * secondParaMatrix [trans] [col] ;
            }
        }
    }
    //#pragma omp barrier
}

/* * * * * * * * * * * * * * * * * * * * * * * 
* 實現矩陣相乘 *                    
* * * * * * * * * * * * * * * * * * * * * * * */
void matrixMulti(int upperOfRow , int bottomOfRow , 
                 int leftOfCol , int rightOfCol ,
                 int transLeft ,int transRight )
{
    if ( ( bottomOfRow - upperOfRow ) < 512 ) 
        smallMatrixMult ( upperOfRow , bottomOfRow , 
                          leftOfCol , rightOfCol , 
                          transLeft , transRight );

    else
    {
        #pragma omp task
        {
            matrixMulti( upperOfRow , ( upperOfRow + bottomOfRow ) / 2 ,
                         leftOfCol , ( leftOfCol + rightOfCol ) / 2 ,
                         transLeft , ( transLeft + transRight ) / 2 );
            matrixMulti( upperOfRow , ( upperOfRow + bottomOfRow ) / 2 ,
                         leftOfCol , ( leftOfCol + rightOfCol ) / 2 ,
                         ( transLeft + transRight ) / 2 + 1 , transRight );
        }

        #pragma omp task
        {
            matrixMulti( upperOfRow , ( upperOfRow + bottomOfRow ) / 2 ,
                         ( leftOfCol + rightOfCol ) / 2 + 1 , rightOfCol ,
                         transLeft , ( transLeft + transRight ) / 2 );
            matrixMulti( upperOfRow , ( upperOfRow + bottomOfRow ) / 2 ,
                         ( leftOfCol + rightOfCol ) / 2 + 1 , rightOfCol ,
                         ( transLeft + transRight ) / 2 + 1 , transRight );
        }
        

        #pragma omp task
        {
            matrixMulti( ( upperOfRow + bottomOfRow ) / 2 + 1 , bottomOfRow ,
                         leftOfCol , ( leftOfCol + rightOfCol ) / 2 ,
                         transLeft , ( transLeft + transRight ) / 2 );
            matrixMulti( ( upperOfRow + bottomOfRow ) / 2 + 1 , bottomOfRow ,
                         leftOfCol , ( leftOfCol + rightOfCol ) / 2 ,
                         ( transLeft + transRight ) / 2 + 1 , transRight );
        }

        #pragma omp task
        {
            matrixMulti( ( upperOfRow + bottomOfRow ) / 2 + 1 , bottomOfRow ,
                         ( leftOfCol + rightOfCol ) / 2 + 1 , rightOfCol ,
                         transLeft , ( transLeft + transRight ) / 2 );
            matrixMulti( ( upperOfRow + bottomOfRow ) / 2 + 1 , bottomOfRow ,
                         ( leftOfCol + rightOfCol ) / 2 + 1 , rightOfCol ,
                         ( transLeft + transRight ) / 2 + 1 , transRight );
        }

        #pragma omp taskwait
    }
}

int main()  
{ 
    matrixInit();

    clock_t t1 = clock(); //開始計時;

    //smallMatrixMult( 0 , MatrixOrder - 1 , 0 , MatrixOrder -1 , 0 , MatrixOrder -1 );
    matrixMulti( 0 , MatrixOrder - 1 , 0 , MatrixOrder -1 , 0 , MatrixOrder -1 );

    clock_t t2 = clock(); //結束計時
    cout<<"time: "<<t2-t1<<endl; 
    
    system("pause");

    return 0;  
}  

  因爲task是openmp 3.0版本支持的特性,尚不支持VS2010,2013,2015,支持的編譯器包括GCC,PGI,INTEL等。

      程序大體框架與前面並行算法區別不大,只是將計算的矩陣大小約定爲512,大於512的矩陣就分塊,直到小於512.具體大小可根據實際狀況而定。

 

五  小結

  本文首先實現基於串行算法的高階矩陣相乘和基於OpenMP的並行算法的高階矩陣相乘。接着,對比了128,256,512,1024,2049階數下,兩種算法的耗費時間,並經過表格和曲線圖的形式直觀表現時間的差異,發現,兩種算法並不是隨着階數的增大,加速比一直增大,具體緣由應該和本機運行環境有關。最後,根據矩陣分塊能有效減小數據加載進內存次數,完成了矩陣分塊相乘並行算法的代碼。考慮到編譯環境的限制,未及時將結果運行出來。下一步可裝linux虛擬機,使用gcc編譯器得出算法的運行時間,進行進一步的分析對比。

相關文章
相關標籤/搜索