二路歸併排序簡介及其並行化 分類: 算法與數據結構 2015-05-08 17:46 112人閱讀 評論(0) 收藏

1、歸併排序簡介

1.算法思想

歸併排序屬於比較類非線性時間排序,號稱比較類排序中性能最佳者,在數據中應用中較廣。
歸併排序是分治法(Divide and Conquer)的一個典型的應用。將已有序的子序列合併,獲得徹底有序的序列;即先使每一個子序列有序,再使子序列段間有序。若將兩個有序表合併成一個有序表,稱爲二路歸併。java

2.二路歸併排序過程描述

設有數列{16,23,100,3,38,128,23}
初始狀態:16,23,100,3,38,128,23
第一次歸併後:{6,23},{3,100},{38,128},{23};
第二次歸併後:{3,6,23,100},{23,38,128};
第三次歸併後:{3,6,23,23,38,100,128}。
完成排序。算法

3.二路歸併複雜度分析

時間複雜度:O(nlogn),是最好、最壞和平均的時間性能,排序性能不受待排序的數據的混亂程度影響,比較穩定,這也是相對於快排的優點所在。
空間複雜度爲:O(n)。
穩定性:穩定,從上文排序過程當中能夠看出,黑體23一直在前面。 windows

2、二路歸併實現

1.C/C++串行實現

/************************************************ *函數名稱:mergearray *參數:a:待歸併數組;first:開始下標;mid:中間下標; * last:結束下標;temp:臨時數組 *說明:將有二個有序數列a[first...mid]和a[mid...last]合併 *************************************************/
void mergearray(int a[], int first, int mid, int last, int temp[])  
{  
    int i = first, j = mid + 1,k =0;    
    while (i <= mid && j <= last)  
    {  
        if (a[i] <= a[j])  
            temp[k++] = a[i++];  
        else  
            temp[k++] = a[j++];  
    }    
    while (i<= mid)  
        temp[k++] = a[i++];  

    while (j <= last)  
        temp[k++] = a[j++];   
    for (i=0; i < k; i++)  
        a[first+i] = temp[i];  
}  
/************************************************ *函數名稱:mergesort *參數:a:待歸併數組;first:開始下標; * last:結束下標;temp:臨時數組 *說明:實現給定數組區間的二路歸併排序 *************************************************/
void mergesort(int a[], int first, int last, int temp[])  
{  

    if (first < last)  
    {  
        int mid = (first + last) / 2;       
        mergesort(a, first, mid, temp);    //左邊有序 
        mergesort(a, mid + 1, last, temp); //右邊有序 
        mergearray(a, first, mid, last, temp); //再將二個有序數列合併 
    }  
}

本機測試100 * 1024 * 1024 =100M個32bits整型,串行須要15.536s,如下是本機軟硬件參數,爲Linux平臺。
這裏寫圖片描述 數組

2.C/C++並行實現

2.1並行思路

將待排序數組經過偏移量進行邏輯切分爲多塊,將每一個塊傳遞給多個線程調用二路歸併排序函數進行排序。待各個塊內有序後,再合併各個塊整合成有序數列。緩存

2.2並行代碼

線程函數,供建立出來的線程調用。markdown

/*******************************************
*函數名稱:merge_exec
*參數:   para指針,用於接收線程下邊,表示第幾個線程
*說明:   調用二路歸併排序
*******************************************/
void* merge_exec(void *para)
{
  int threadIndex=*(int*)para; 
  int blockLen=DataNum/threadNum;
  int* temp=new int[blockLen];
  int offset=threadIndex*blockLen;
  mergesort(randInt,offset,offset+blockLen-1,temp);
}

合併多個已經排好序的塊。代碼以下:多線程

/*********************************************** *函數名稱:mergeBlocks *參數: pDataArray:塊內有序的數組 arrayLen:數組長度 * blockNum:塊數 resultArray:存放排序的結果 *說明: 合併有序的塊 ************************************************/
inline void mergeBlocks(int* const pDataArray,int arrayLen,const int blockNum,int* const resultArray)
{
    int blockLen=arrayLen/blockNum;
    int blockIndex[blockNum];//各個塊中元素在數組中的下標,VC可能不支持變量做爲數組的長度,解決辦法可以使用宏定義
    for(int i=0;i<blockNum;++i)//初始化塊內元素起始下標
    {
        blockIndex[i]=i*blockLen;
    }
    int smallest=0;
    for(int i=0;i<arrayLen;++i)//掃描全部塊內的全部元素
    {  
      for(int j=0;j<blockNum;++j)//以第一個未掃描完的塊內元素做爲最小數
      {
       if(blockIndex[j]<(j*blockLen+blockLen))
       {
        smallest=pDataArray[blockIndex[j]];
        break;
       }
      }
      for(int j=0;j<blockNum;++j)//掃描各個塊,尋找最小數
      {
        if((blockIndex[j]<(j*blockLen+blockLen))&&(pDataArray[blockIndex[j]]<smallest))
        {
          smallest=pDataArray[blockIndex[j]];
        }
      }
      for(int j=0;j<blockNum;++j)//肯定哪一個塊內元素下標進行自增
      {
        if((blockIndex[j]<(j*blockLen+blockLen))&&(pDataArray[blockIndex[j]]==smallest))
        {
          ++blockIndex[j];
          break;
        }
      }
      resultArray[i]=smallest;//本次循環最小數放入結果數組
    }
}

main函數中建立多線程完成並行排序,代碼以下:ide

int main(int argc,char* argv[])
{
    int threadBum=8;
    int blockNum=threadNum;
    struct timeval ts,te;
    srand(time(NULL));
    for(int i=0;i<DataNum;++i)
    {
      randInt[i]=rand();
    }
    pthread_t tid[blockNum],ret[blockNum],threadIndex[blockNum];

    //--------Two-way Merge Sort-------
    gettimeofday(&ts,NULL);
    for(int i = 0; i < threadNum; ++i)
    {
        threadIndex[i]=i;
        ret[i] = pthread_create(&tid[i], NULL,merge_exec,(void *)(threadIndex+i));
        if(ret[i] != 0){
             cout<<"thread "<<i<<" create error!"<<endl;
             break;
         }
    }
    for(int i = 0; i <threadNum; ++i)
    {
         pthread_join(tid[i], NULL);
    }
    mergeBlocks(randInt,DataNum,threadNum,resultInt);
    gettimeofday(&te,NULL);
    cout<<"MergeSort time: "<<(te.tv_sec-ts.tv_sec)*1000+(te.tv_usec-ts.tv_usec)/1000<<"ms"<<endl;
    }

8線程狀況下,測試性能爲4.223s,加速比3.68。針對機器的緩存大小,經過提升緩存命中率,可繼續進行算法優化,提升排序性能。函數

參考文獻

[1]http://baike.baidu.com/link?url=nQp1WY3UpyMMgJ1mr0qL6amEmDZZb2MLtxrwMTVIfFyaQaTAA1LXC5JqnrDqm_teLpX2TwCpKMdoPLXQ0jHrCa#6
[2]http://blog.csdn.net/morewindows/article/details/6678165性能

相關文章
相關標籤/搜索