這一章講了MPI非阻塞通訊的原理和一些函數接口,最後再用非阻塞通訊方式實現Jacobi迭代,記錄學習中的一些知識。程序員
(1)阻塞通訊與非阻塞通訊緩存
阻塞通訊調用時,整個程序只能執行通訊相關的內容,而沒法執行計算相關的內容;函數
非阻塞調用的初衷是儘可能讓通訊和計算重疊進行,提升程序總體執行效率。學習
總體對比見下圖:優化
(2)非阻塞通訊的要素spa
非阻塞通訊調用返回意味着通訊開始啓動;而非阻塞通訊完成則須要調用其餘的接口來查詢。翻譯
要素1:非阻塞通訊的調用接口設計
要素2:非阻塞通訊的完成查詢接口code
理想的非阻塞通訊設計應該以下:對象
非阻塞通訊的 發送 和 接受 過程都須要同時具有以上兩個要素,「調用+完成」
「調用」按照通訊方式的不一樣(標準、緩存、同步、就緒),有各類函數接口,具體用到哪一個就查手冊的性質。
這裏「完成」是重點,由於程序員須要知道非阻塞調用是否執行完成了,來作下一步的操做。
MPI爲「完成」定義了一個內部變量MPI_Request request,每一個request與一個在非阻塞調用發生時與該調用發生關聯(這裏的調用包括髮送和接收)。
「完成」不區分通訊方式的不一樣,統一用MPI_Wait系列函數來完成,這裏對MPI_Wait函數作一點說明:
1)MPI_Wait(MPI_Request *request),均等着request執行完畢了,再往下進行
2)對於非重複非阻塞通訊,MPI_Wait系列函數調用的返回,還意味着request對象被釋放了,程序員不用再顯式釋放request變量。
3)對於重複非阻塞通訊,MPI_Wait系列函數調用的返回,意味着將於request對象關聯的非阻塞通訊處於不激活狀態,並不釋放request
關於2)3)看後面的代碼示例就瞭解了
(3)非阻塞調用實現Jacobi迭代
有了非阻塞調用的技術,能夠再將Jacobi迭代的程序效率提高,其整體的實現思路以下:
1)先計算Jacobi迭代下次計算所須要的邊界數據,這些數據與每一個計算節點中的計算無關,能夠先獨立計算好
2)啓動非阻塞通訊,將邊界數據在進程間傳遞
3)計算每一個計算節點能夠獨立計算的部分;此時,2)中啓動的非阻塞通訊也在進行中,這時通訊和計算就重疊了
4)等着非阻塞通訊完成,再進行下一次迭代
再回顧一下以前用阻塞通訊實現Jacobi迭代的思路:
1)先傳遞邊界數據
2)等着數據都傳遞完了,再進行計算
3)等着計算完成了,進行下一次迭代
能夠看到阻塞通訊中實現Jacobi迭代的程序中,在同一計算節點下,通訊和計算是分別進行的,效率不如非阻塞通訊。
總結起來:
「單機程序 → 阻塞通訊MPI程序」 實現單機計算到多機計算,用並行代替串行提升效率。
「阻塞MPI程序 → 非阻塞MPI程序」 不只將多臺機器之間的並行,並且還能將每臺機器的通訊與計算過程並行,實現更高效的並行。
(4)非阻塞通訊實現Jacobi迭代的代碼
書上的源代碼是Fortan的,數據存儲是列優先的,矩陣按列分塊;下面的代碼是我翻譯的C的代碼,數據存儲是行優先的,矩陣按行分塊。
1 #include "mpi.h" 2 #include <stdio.h> 3 #include <stdlib.h> 4 5 #define N 8 6 #define SIZE N/4 7 #define T 2 8 9 void print_matrix(int myid, float myRows[][N]); 10 11 int main(int argc, char *argv[]) 12 { 13 float matrix1[SIZE+2][N], matrix2[SIZE+2][N]; 14 int myid; 15 MPI_Status status[4]; 16 MPI_Request request[4]; 17 18 MPI_Init(&argc, &argv); 19 MPI_Comm_rank(MPI_COMM_WORLD, &myid); 20 21 // 初始化 22 int i,j; 23 for(i=0; i<SIZE+2; i++) 24 { 25 for(j=0; j<N; j++) 26 { 27 matrix1[i][j] = matrix2[i][j] = 0; 28 } 29 } 30 if(0==myid) // 按行劃分 上面第一分塊矩陣 上邊界 31 { 32 for(j=0; j<N; j++) matrix1[1][j] = matrix2[1][j] = N; 33 } 34 if (3==myid) { // 按行劃分 最下面一分塊矩陣 下邊界 35 for(j=0; j<N; j++) matrix1[SIZE][j] = matrix2[SIZE][j] = N; 36 } 37 for(i=1; i<SIZE+1; i++) // 每一個矩陣的兩側邊界 38 { 39 matrix1[i][0] = matrix1[i][N-1] = matrix2[i][0] = matrix2[i][N-1] = N; 40 } 41 // 引入虛擬進程 並計算每一個進程上下相鄰進程 42 int up_proc_id = myid==0 ? MPI_PROC_NULL : myid-1; 43 int down_proc_id = myid==3 ? MPI_PROC_NULL : myid+1; 44 // jacobi迭代過程 45 int t,row,col; 46 for(t=0; t<T; t++) 47 { 48 // 1 計算邊界數據 49 if(0==myid) // 最上的矩陣塊 50 { 51 for (col=1; col<N-1; col++) 52 { 53 matrix2[SIZE][col] = (matrix1[SIZE][col-1]+matrix1[SIZE][col+1]+matrix1[SIZE+1][col]+matrix1[SIZE-1][col])*0.25; 54 } 55 } 56 else if (3==myid) { // 最下的矩陣塊 57 for (col=1; col<N-1; col++) 58 { 59 matrix2[1][col] = (matrix1[1][col-1]+matrix1[1][col+1]+matrix1[2][col]+matrix1[0][col])*0.25; 60 } 61 } 62 else { 63 for(col=1; col<N-1; col++) // 中間的矩陣塊 64 { 65 matrix2[SIZE][col] = (matrix1[SIZE][col-1]+matrix1[SIZE][col+1]+matrix1[SIZE+1][col]+matrix1[SIZE-1][col])*0.25; 66 matrix2[1][col] = (matrix1[1][col-1]+matrix1[1][col+1]+matrix1[2][col]+matrix1[0][col])*0.25; 67 } 68 } 69 // 2 利用非阻塞函數傳遞邊界數據 爲下一次計算作準備 70 int tag1 = 1, tag2 = 2; 71 MPI_Isend(&matrix2[1][0], N, MPI_FLOAT, up_proc_id, tag1, MPI_COMM_WORLD, &request[0]); 72 MPI_Isend(&matrix2[SIZE][0], N, MPI_FLOAT, down_proc_id, tag2, MPI_COMM_WORLD, &request[1]); 73 MPI_Irecv(&matrix1[SIZE+1][0], N, MPI_FLOAT, down_proc_id, tag1, MPI_COMM_WORLD, &request[2]); 74 MPI_Irecv(&matrix1[0][0], N, MPI_FLOAT, up_proc_id, tag2, MPI_COMM_WORLD, &request[3]); 75 // 3 計算中間數據 76 int begin_row = 0==myid ? 2 : 1; 77 int end_row = 3==myid ? (SIZE-1) : SIZE; 78 for (row=begin_row; row<end_row; row++) 79 { 80 for (col=1; col<N-1; col++) 81 { 82 matrix2[row][col] = (matrix1[row][col-1]+matrix1[row][col+1]+matrix1[row+1][col]+matrix1[row-1][col])*0.25; 83 } 84 } 85 // 4 更新矩陣 並等待各個進程間數據傳遞完畢 86 for (row=begin_row; row<=end_row; row++) 87 { 88 for (col=1; col<N-1; col++) 89 { 90 matrix1[row][col] = matrix2[row][col]; 91 } 92 } 93 MPI_Waitall(4, &request[0], &status[0]); 94 } 95 MPI_Barrier(MPI_COMM_WORLD); 96 print_matrix(myid, matrix1); 97 MPI_Finalize(); 98 } 99 100 101 void print_matrix(int myid, float myRows[][N]) 102 { 103 int i,j; 104 int buf[1]; 105 MPI_Status status; 106 buf[0] = 1; 107 if ( myid>0 ) { 108 MPI_Recv(buf, 1, MPI_INT, myid-1, 0, MPI_COMM_WORLD, &status); 109 } 110 printf("Result in process %d:\n", myid); 111 for ( i = 0; i<SIZE+2; i++) 112 { 113 for ( j = 0; j<N; j++) 114 printf("%1.3f\t", myRows[i][j]); 115 printf("\n"); 116 } 117 if ( myid<3 ) { 118 MPI_Send(buf, 1, MPI_INT, myid+1, 0, MPI_COMM_WORLD); 119 } 120 MPI_Barrier(MPI_COMM_WORLD); 121 }
程序的執行結果以下:
上述程序設計的邏輯以下:
1)各個分塊矩陣的邊界數據是能夠須要通訊交換的
2)先計算邊界數據,儘可能把須要通訊交換並且又相對獨立的數據先計算出來
3)用非阻塞通訊傳遞分塊矩陣的邊界數據;同時每一個節點內計算內部的數據;計算與通訊並行
4)等到每一個計算節點的2個發送、2個接收,總共4個非阻塞調用都完成了,進行下一輪迭代
(5)重複非阻塞通訊
上面實現Jacobi迭代的代碼中,以進程1和進程2爲例:
1)迭代一輪兩者之間就須要互相通訊一次
2)每次互相通訊,隨着MPI_Wait的執行,request通訊對象釋放,兩個進程通訊徹底被切斷了
3)兩個進程之間每次通訊,有一些通訊鏈接操做都是重複的,最好不用每次通訊都從新執行這些鏈接操做,以此提升效率
4)所以,比上面實現jacobi迭代更優化一些的作法是:每次不徹底掐斷兩個進程的非阻塞通訊,保持那些基礎的通用的操做,每次迭代只須要更新須要傳輸的數據,再激活兩個進程之間的非阻塞通訊
依照上面的思路,MPI給出了重複非阻塞的通訊調用實現。用重複非阻塞的通訊再實現一次Jacobi迭代,代碼以下:
1 #include "mpi.h" 2 #include <stdio.h> 3 #include <stdlib.h> 4 5 #define N 8 6 #define SIZE N/4 7 #define T 2 8 9 void print_matrix(int myid, float myRows[][N]); 10 11 int main(int argc, char *argv[]) 12 { 13 float matrix1[SIZE+2][N], matrix2[SIZE+2][N]; 14 int myid; 15 MPI_Status status[4]; 16 MPI_Request request[4]; 17 18 MPI_Init(&argc, &argv); 19 MPI_Comm_rank(MPI_COMM_WORLD, &myid); 20 21 // 初始化 22 int i,j; 23 for(i=0; i<SIZE+2; i++) 24 { 25 for(j=0; j<N; j++) 26 { 27 matrix1[i][j] = matrix2[i][j] = 0; 28 } 29 } 30 if(0==myid) // 按行劃分 上面第一分塊矩陣 上邊界 31 { 32 for(j=0; j<N; j++) matrix1[1][j] = matrix2[1][j] = N; 33 } 34 if (3==myid) { // 按行劃分 最下面一分塊矩陣 下邊界 35 for(j=0; j<N; j++) matrix1[SIZE][j] = matrix2[SIZE][j] = N; 36 } 37 for(i=1; i<SIZE+1; i++) // 每一個矩陣的兩側邊界 38 { 39 matrix1[i][0] = matrix1[i][N-1] = matrix2[i][0] = matrix2[i][N-1] = N; 40 } 41 // 引入虛擬進程 並計算每一個進程上下相鄰進程 42 int up_proc_id = myid==0 ? MPI_PROC_NULL : myid-1; 43 int down_proc_id = myid==3 ? MPI_PROC_NULL : myid+1; 44 // 初始化重複非阻塞通訊 45 int tag1 = 1, tag2 = 2; 46 MPI_Send_init(&matrix2[1][0], N, MPI_FLOAT, up_proc_id, tag1, MPI_COMM_WORLD, &request[0]); 47 MPI_Send_init(&matrix2[SIZE][0], N, MPI_FLOAT, down_proc_id, tag2, MPI_COMM_WORLD, &request[1]); 48 MPI_Recv_init(&matrix1[SIZE+1][0], N, MPI_FLOAT, down_proc_id, tag1, MPI_COMM_WORLD, &request[2]); 49 MPI_Recv_init(&matrix1[0][0], N, MPI_FLOAT, up_proc_id, tag2, MPI_COMM_WORLD, &request[3]); 50 // jacobi迭代過程 51 int t,row,col; 52 for(t=0; t<T; t++) 53 { 54 // 1 計算邊界數據 55 if(0==myid) // 最上的矩陣塊 56 { 57 for (col=1; col<N-1; col++) 58 { 59 matrix2[SIZE][col] = (matrix1[SIZE][col-1]+matrix1[SIZE][col+1]+matrix1[SIZE+1][col]+matrix1[SIZE-1][col])*0.25; 60 } 61 } 62 else if (3==myid) { // 最下的矩陣塊 63 for (col=1; col<N-1; col++) 64 { 65 matrix2[1][col] = (matrix1[1][col-1]+matrix1[1][col+1]+matrix1[2][col]+matrix1[0][col])*0.25; 66 } 67 } 68 else { 69 for(col=1; col<N-1; col++) // 中間的矩陣塊 70 { 71 matrix2[SIZE][col] = (matrix1[SIZE][col-1]+matrix1[SIZE][col+1]+matrix1[SIZE+1][col]+matrix1[SIZE-1][col])*0.25; 72 matrix2[1][col] = (matrix1[1][col-1]+matrix1[1][col+1]+matrix1[2][col]+matrix1[0][col])*0.25; 73 } 74 } 75 // 2 啓動重複非阻塞通訊 76 MPI_Startall(4, &request[0]); 77 // 3 計算中間數據 78 int begin_row = 0==myid ? 2 : 1; 79 int end_row = 3==myid ? (SIZE-1) : SIZE; 80 for (row=begin_row; row<end_row; row++) 81 { 82 for (col=1; col<N-1; col++) 83 { 84 matrix2[row][col] = (matrix1[row][col-1]+matrix1[row][col+1]+matrix1[row+1][col]+matrix1[row-1][col])*0.25; 85 } 86 } 87 // 4 更新矩陣 並等待各個進程間數據傳遞完畢 88 for (row=begin_row; row<=end_row; row++) 89 { 90 for (col=1; col<N-1; col++) 91 { 92 matrix1[row][col] = matrix2[row][col]; 93 } 94 } 95 MPI_Waitall(4, &request[0], &status[0]); 96 } 97 int n; 98 for(n = 0; n < 4; n++) MPI_Request_free(&request[n]); // 釋放非阻塞通訊對象 99 MPI_Barrier(MPI_COMM_WORLD); 100 print_matrix(myid, matrix1); 101 MPI_Finalize(); 102 } 103 104 105 void print_matrix(int myid, float myRows[][N]) 106 { 107 int i,j; 108 int buf[1]; 109 MPI_Status status; 110 buf[0] = 1; 111 if ( myid>0 ) { 112 MPI_Recv(buf, 1, MPI_INT, myid-1, 0, MPI_COMM_WORLD, &status); 113 } 114 printf("Result in process %d:\n", myid); 115 for ( i = 0; i<SIZE+2; i++) 116 { 117 for ( j = 0; j<N; j++) 118 printf("%1.3f\t", myRows[i][j]); 119 printf("\n"); 120 } 121 if ( myid<3 ) { 122 MPI_Send(buf, 1, MPI_INT, myid+1, 0, MPI_COMM_WORLD); 123 } 124 MPI_Barrier(MPI_COMM_WORLD); 125 }
1)上述的代碼不難理解,能夠查閱相關函數手冊;最核心的思想就是,若是兩個進程有屢次迭代通訊,就能夠用這種重複非阻塞的通訊函數。
2)另外,對於重複非阻塞通訊的調用,在調用MPI_Wait系列函數時,不會釋放與通訊關聯的request函數(即上面說的保持一些共性的通訊設定操做,不徹底掐斷),所以,須要在line98中,程序員手動釋放非則色通訊操做對象