矩陣乘法的MPI並行計算

一、問題描述

矩陣乘法問題描述以下:ios

  給定矩陣A和B,其中A是m*p大小矩陣,B是p*n大小的矩陣。求C = A*B。算法

求解這個問題最簡單的算法是遍歷A的行和B的列,求得C的相應元素,時間複雜度O(mnp),空間複雜度O(1)。編程

// 矩陣乘法的C++實現
for(int i=0; i<m; i++){
    for(int j=0; j<n; j++){
        float temp = 0.0;
        for(int k=0; k<p; k++){
            temp += A[i*p + k] * B[k*n + j];
        }
        C[i*n + j] = temp;
    }
}

二、最簡單的並行方案

要改進上述算法爲並行算法,須要瞭解到C++ MPI編程的特色:服務器

  a. 各個進程之間不能有依賴。這是由於各個進程能夠以任意的時間順序執行。分佈式

  b. 數據是分佈式存儲的。也就是說,每一個進程有本身獨立的數據備份。spa

有了這兩點認識後,一種最簡單的並行方案就出來了:(假設開啓np個進程)code

  (1). 首先將矩陣A和C按行分爲np塊;blog

  (2). 進程號爲 id 的進程讀取A的第 id 個分塊和B;three

  (3). 進程號爲 id 的進程求解相應的C的第 id 個分塊。進程

 

代碼以下:

/* filename: matMultiplyWithMPI.cpp
* parallel matrix multiplication with MPI * C(m,n) = A(m,p) * B(p,n) * input: three parameters - m, p, n
* @copyright: fengfu-chris
*/ #include<iostream> #include<mpi.h> #include<math.h> #include<stdlib.h>

void initMatrixWithRV(float *A, int rows, int cols);
void matMultiplyWithSingleThread(float *A, float *B, float *matResult, int m, int p, int n); int main(int argc, char** argv) { int m = atoi(argv[1]); int p = atoi(argv[2]); int n = atoi(argv[3]); float *A, *B, *C; float *bA, *bC; int myrank, numprocs; MPI_Status status; MPI_Init(&argc, &argv); // 並行開始 MPI_Comm_size(MPI_COMM_WORLD, &numprocs); MPI_Comm_rank(MPI_COMM_WORLD, &myrank); int bm = m / numprocs; bA = new float[bm * p]; B = new float[p * n]; bC = new float[bm * n]; if(myrank == 0){ A = new float[m * p]; C = new float[m * n]; initMatrixWithRV(A, m, p); initMatrixWithRV(B, p, n); } MPI_Barrier(MPI_COMM_WORLD);
  /* step 1: 數據分配 */ MPI_Scatter(A, bm
* p, MPI_FLOAT, bA, bm *p, MPI_FLOAT, 0, MPI_COMM_WORLD); MPI_Bcast(B, p * n, MPI_FLOAT, 0, MPI_COMM_WORLD);
  /* step 2: 並行計算C的各個分塊 */ matMultiplyWithSingleThread(bA, B, bC, bm, p, n); MPI_Barrier(MPI_COMM_WORLD);
  /* step 3: 彙總結果 */ MPI_Gather(bC, bm
* n, MPI_FLOAT, C, bm * n, MPI_FLOAT, 0, MPI_COMM_WORLD);
  /* step 3-1: 解決歷史遺留問題(多餘的分塊) */
int remainRowsStartId = bm * numprocs; if(myrank == 0 && remainRowsStartId < m){ int remainRows = m - remainRowsStartId; matMultiplyWithSingleThread(A + remainRowsStartId * p, B, C + remainRowsStartId * n, remainRows, p, n); } delete[] bA; delete[] B; delete[] bC; if(myrank == 0){ delete[] A; delete[] C; } MPI_Finalize(); // 並行結束 return 0; } void initMatrixWithRV(float *A, int rows, int cols) { srand((unsigned)time(NULL)); for(int i = 0; i < rows*cols; i++){ A[i] = (float)rand() / RAND_MAX; } } void matMultiplyWithSingleThread(float *A, float *B, float *matResult, int m, int p, int n) { for(int i=0; i<m; i++){ for(int j=0; j<n; j++){ float temp = 0; for(int k=0; k<p; k++){ temp += A[i*p+k] * B[k*n + j]; } matResult[i*n+j] = temp; } } }

編譯:

$mpigxx matMultiplyWithMPI.cpp -o matMultiplyWithMPI

運行:

$mpirun -np 8 matMultiplyWithMPI 3000 2000 4000

這裏假設m = 3000, p = 2000, n = 4000。另外,開啓的進程數爲8個。 np的個數能夠大於CPU的個數。

通常來說,只有當矩陣大小大於5000的量級時,開啓幾十上百個進程的威力才能凸顯出來。尤爲是當矩陣量級達到萬維以上時,串行或是少數幾個進程並行的矩陣乘法將變得特別耗時。

三、改進的並行方案:內存考慮

上面的並行方案有個很大的缺陷,那就是 B 的備份數和開啓的進程數一致。這對於內存不是很充裕或矩陣很大的時候,會致使災難!例如,假設 B 是10000*10000維的,用double類型存儲大概佔700M左右的內存。當開啓的進程數達到128個時,單是 B 的備份佔據的內存開銷將達到 128 * 700 M = 90G。 這將耗掉巨大的內存!

有什麼改進的方案呢?

必須瞭解MPI的第三個特色:

  c. 進程之間能夠很方便地通訊,而且支持多種通訊方案。

這樣,就能夠把 B 也同時分佈式的存儲到各個進程對應的內存中,而後利用進程之間的通訊來輪換各個 B 的分塊,從而達到減少內存開銷的效果。固然,幾乎和全部的程序同樣,離不開時間與空間的trade-off。因此,這種方法雖然節省了內存,卻要消耗大量的時間在進程之間的通訊上。

下面給出改進的並行方案:

  (1). 將A和C按行分爲np塊,將B按列分爲np塊(B能夠按列存儲);

  (2). 進程號爲 id 的進程讀取 A 和 B 的第id個分塊;

  (3). 循環np次:

    <1>. 各個進程用各自的A、B分塊求解C的分塊;

    <2>. 輪換B的分塊(例如:id 號進程發送本身當前的B的分塊到 id+1號進程)

 

代碼以下:

/* filename: matMultiplyWithMPI_updated.cpp
 * parallel matrix multiplication with MPI: updated
 * C(m,n) = A(m,p) * B(p,n)
 * input: three parameters - m, p, n
 * @copyright: fengfu-chris
 */
#include<iostream>
#include<mpi.h>
#include<math.h>
#include<stdlib.h>

void initMatrixWithRV(float *A, int rows, int cols);
void copyMatrix(float *A, float *A_copy, int rows, int cols);
// A: m*p, B: p*n  !!! note that B is stored by column first
void matMultiplyWithTransposedB(float *A, float *B, float *matResult, int m, int n, int p);

int main(int argc, char** argv)
{
  int m = atoi(argv[1]);
  int n = atoi(argv[2]);
  int p = atoi(argv[3]);
    
   float *A, *B, *C;
   float *bA, *bB_send, *bB_recv, *bC, *bC_send;
  int myrank, numprocs;

    MPI_Status status;
  
    MPI_Init(&argc, &argv);  // 並行開始
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs); 
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank); 
   
   int bm = m / numprocs;
    int bn = n / numprocs;

    bA = new float[bm * p];
    bB_send = new float[bn * p];
    bB_recv = new float[bn * p];
    bC = new float[bm * bn];
    bC_send = new float[bm * n];
    
    if(myrank == 0){
        A = new float[m * p];
        B = new float[n * p];
        C = new float[m * n];
        
        initMatrixWithRV(A, m, p);
        initMatrixWithRV(B, n, p);
    }

    MPI_Barrier(MPI_COMM_WORLD);
    MPI_Scatter(A, bm * p, MPI_FLOAT, bA, bm * p, MPI_FLOAT, 0, MPI_COMM_WORLD);
    MPI_Scatter(B, bn * p, MPI_FLOAT, bB_recv, bn * p, MPI_FLOAT, 0, MPI_COMM_WORLD);

    int sendTo = (myrank + 1) % numprocs;
    int recvFrom = (myrank - 1 + numprocs) % numprocs;
        
    int circle = 0;  
    do{
        matMultiplyWithTransposedB(bA, bB_recv, bC, bm, bn, p);
        int blocks_col = (myrank - circle + numprocs) % numprocs;
        for(int i=0; i<bm; i++){
            for(int j=0; j<bn; j++){
                bC_send[i*n + blocks_col*bn + j] = bC[i*bn + j];
            }
        }

        if(myrank % 2 == 0){
            copyMatrix(bB_recv, bB_send, bn, p);
            MPI_Ssend(bB_send, bn*p, MPI_FLOAT, sendTo, circle, MPI_COMM_WORLD);
            MPI_Recv(bB_recv, bn*p, MPI_FLOAT, recvFrom, circle, MPI_COMM_WORLD, &status);
        }else{
            MPI_Recv(bB_recv, bn*p, MPI_FLOAT, recvFrom, circle, MPI_COMM_WORLD, &status); 
            MPI_Ssend(bB_send, bn*p, MPI_FLOAT, sendTo, circle, MPI_COMM_WORLD);
            copyMatrix(bB_recv, bB_send, bn, p);    
        }
        
        circle++;
    }while(circle < numprocs);

  MPI_Barrier(MPI_COMM_WORLD);
    MPI_Gather(bC_send, bm * n, MPI_FLOAT, C, bm * n, MPI_FLOAT, 0, MPI_COMM_WORLD);
    
    if(myrank == 0){
        int remainAStartId = bm * numprocs;
        int remainBStartId = bn * numprocs;
        
        for(int i=remainAStartId; i<m; i++){
            for(int j=0; j<n; j++){
                float temp=0;
                for(int k=0; k<p; k++){
                    temp += A[i*p + k] * B[j*p +k];
                }
                C[i*p + j] = temp;
            }
        }
        
        for(int i=0; i<remainAStartId; i++){
            for(int j=remainBStartId; j<n; j++){
                float temp = 0;
                for(int k=0; k<p; k++){
                    temp += A[i*p + k] * B[j*p +k];
                }
                C[i*p + j] = temp;
            }
        }
    }
    
    delete[] bA;
    delete[] bB_send;
    delete[] bB_recv;
    delete[] bC;
    delete[] bC_send;
    
    if(myrank == 0){
        delete[] A;
        delete[] B;
        delete[] C;
    }
    
    MPI_Finalize(); // 並行結束

    return 0;
}

void initMatrixWithRV(float *A, int rows, int cols)
{
    srand((unsigned)time(NULL));
    for(int i = 0; i < rows*cols; i++){
        A[i] = (float)rand() / RAND_MAX;
    }
}
void copyMatrix(float *A, float *A_copy, int rows, int cols)
{
    for(int i=0; i<rows*cols; i++){
        A_copy[i] = A[i];
    }
}

void matMultiplyWithTransposedB(float *A, float *B, float *matResult, int m, int p, int n)
{
    for(int i=0; i<m; i++){
        for(int j=0; j<n; j++){
            float temp = 0;
            for(int k=0; k<p; k++){
                temp += A[i*p+k] * B[j*p+k];
            }
            matResult[i*n+j] = temp;
        }
    }
}

 

這裏最須要注意的地方就是B的輪換。 有兩點須要注意:

   (1) 防阻塞機制。這裏採用奇偶原則:偶數號進程先發送,再接收;奇數號進程則相反。這樣能夠避免全部進程同時發送形成死鎖的狀況;

 (2) 數據備份。發送和接收的信息存儲在不一樣的矩陣中,這樣保證原來的信息不會被覆蓋。

 

這種方法的優勢是顯而易見的。對於足夠牛的服務器/計算機集羣,開啓成百上千個進程來並行徹底不是問題。

 

並行不易,且行且珍惜!

相關文章
相關標籤/搜索