歸併排序屬於比較類非線性時間排序,號稱比較類排序中性能最佳者,在數據中應用中較廣。
歸併排序是分治法(Divide and Conquer)的一個典型的應用。將已有序的子序列合併,獲得徹底有序的序列;即先使每一個子序列有序,再使子序列段間有序。若將兩個有序表合併成一個有序表,稱爲二路歸併。java
設有數列{16,23,100,3,38,128,23}
初始狀態:16,23,100,3,38,128,23
第一次歸併後:{6,23},{3,100},{38,128},{23};
第二次歸併後:{3,6,23,100},{23,38,128};
第三次歸併後:{3,6,23,23,38,100,128}。
完成排序。算法
時間複雜度:O(nlogn),是最好、最壞和平均的時間性能,排序性能不受待排序的數據的混亂程度影響,比較穩定,這也是相對於快排的優點所在。
空間複雜度爲:O(n)。
穩定性:穩定,從上文排序過程當中能夠看出,黑體23一直在前面。 windows
/************************************************ *函數名稱:mergearray *參數:a:待歸併數組;first:開始下標;mid:中間下標; * last:結束下標;temp:臨時數組 *說明:將有二個有序數列a[first...mid]和a[mid...last]合併 *************************************************/
void mergearray(int a[], int first, int mid, int last, int temp[])
{
int i = first, j = mid + 1,k =0;
while (i <= mid && j <= last)
{
if (a[i] <= a[j])
temp[k++] = a[i++];
else
temp[k++] = a[j++];
}
while (i<= mid)
temp[k++] = a[i++];
while (j <= last)
temp[k++] = a[j++];
for (i=0; i < k; i++)
a[first+i] = temp[i];
}
/************************************************ *函數名稱:mergesort *參數:a:待歸併數組;first:開始下標; * last:結束下標;temp:臨時數組 *說明:實現給定數組區間的二路歸併排序 *************************************************/
void mergesort(int a[], int first, int last, int temp[])
{
if (first < last)
{
int mid = (first + last) / 2;
mergesort(a, first, mid, temp); //左邊有序
mergesort(a, mid + 1, last, temp); //右邊有序
mergearray(a, first, mid, last, temp); //再將二個有序數列合併
}
}
本機測試100 * 1024 * 1024 =100M個32bits整型,串行須要15.536s,如下是本機軟硬件參數,爲Linux平臺。
數組
將待排序數組經過偏移量進行邏輯切分爲多塊,將每一個塊傳遞給多個線程調用二路歸併排序函數進行排序。待各個塊內有序後,再合併各個塊整合成有序數列。緩存
線程函數,供建立出來的線程調用。markdown
/*******************************************
*函數名稱:merge_exec
*參數: para指針,用於接收線程下邊,表示第幾個線程
*說明: 調用二路歸併排序
*******************************************/
void* merge_exec(void *para)
{
int threadIndex=*(int*)para;
int blockLen=DataNum/threadNum;
int* temp=new int[blockLen];
int offset=threadIndex*blockLen;
mergesort(randInt,offset,offset+blockLen-1,temp);
}
合併多個已經排好序的塊。代碼以下:多線程
/*********************************************** *函數名稱:mergeBlocks *參數: pDataArray:塊內有序的數組 arrayLen:數組長度 * blockNum:塊數 resultArray:存放排序的結果 *說明: 合併有序的塊 ************************************************/
inline void mergeBlocks(int* const pDataArray,int arrayLen,const int blockNum,int* const resultArray)
{
int blockLen=arrayLen/blockNum;
int blockIndex[blockNum];//各個塊中元素在數組中的下標,VC可能不支持變量做爲數組的長度,解決辦法可以使用宏定義
for(int i=0;i<blockNum;++i)//初始化塊內元素起始下標
{
blockIndex[i]=i*blockLen;
}
int smallest=0;
for(int i=0;i<arrayLen;++i)//掃描全部塊內的全部元素
{
for(int j=0;j<blockNum;++j)//以第一個未掃描完的塊內元素做爲最小數
{
if(blockIndex[j]<(j*blockLen+blockLen))
{
smallest=pDataArray[blockIndex[j]];
break;
}
}
for(int j=0;j<blockNum;++j)//掃描各個塊,尋找最小數
{
if((blockIndex[j]<(j*blockLen+blockLen))&&(pDataArray[blockIndex[j]]<smallest))
{
smallest=pDataArray[blockIndex[j]];
}
}
for(int j=0;j<blockNum;++j)//肯定哪一個塊內元素下標進行自增
{
if((blockIndex[j]<(j*blockLen+blockLen))&&(pDataArray[blockIndex[j]]==smallest))
{
++blockIndex[j];
break;
}
}
resultArray[i]=smallest;//本次循環最小數放入結果數組
}
}
main函數中建立多線程完成並行排序,代碼以下:ide
int main(int argc,char* argv[])
{
int threadBum=8;
int blockNum=threadNum;
struct timeval ts,te;
srand(time(NULL));
for(int i=0;i<DataNum;++i)
{
randInt[i]=rand();
}
pthread_t tid[blockNum],ret[blockNum],threadIndex[blockNum];
//--------Two-way Merge Sort-------
gettimeofday(&ts,NULL);
for(int i = 0; i < threadNum; ++i)
{
threadIndex[i]=i;
ret[i] = pthread_create(&tid[i], NULL,merge_exec,(void *)(threadIndex+i));
if(ret[i] != 0){
cout<<"thread "<<i<<" create error!"<<endl;
break;
}
}
for(int i = 0; i <threadNum; ++i)
{
pthread_join(tid[i], NULL);
}
mergeBlocks(randInt,DataNum,threadNum,resultInt);
gettimeofday(&te,NULL);
cout<<"MergeSort time: "<<(te.tv_sec-ts.tv_sec)*1000+(te.tv_usec-ts.tv_usec)/1000<<"ms"<<endl;
}
8線程狀況下,測試性能爲4.223s,加速比3.68。針對機器的緩存大小,經過提升緩存命中率,可繼續進行算法優化,提升排序性能。函數
[1]http://baike.baidu.com/link?url=nQp1WY3UpyMMgJ1mr0qL6amEmDZZb2MLtxrwMTVIfFyaQaTAA1LXC5JqnrDqm_teLpX2TwCpKMdoPLXQ0jHrCa#6
[2]http://blog.csdn.net/morewindows/article/details/6678165性能