如何給100億個數字排序

轉自:http://netsmell.com/post/how-sort-10-billion-data.html?ref=myreadhtml

海量數據處理/外部歸併排序 - 分治.cpppnode

 

今天要給100億個數字排序,100億個 int 型數字放在文件裏面大概有 37.2GB,很是大,內存一次裝不下了。那麼確定是要拆分紅小的文件一個一個來處理,最終在合併成一個排好序的大文件。c++

實現思路git

1.把這個37GB的大文件,用哈希分紅1000個小文件,每一個小文件平均38MB左右(理想狀況),把100億個數字對1000取模,模出來的結果在0到999之間,每一個結果對應一個文件,因此我這裏取的哈希函數是 h = x % 1000,哈希函數取得」好」,能使衝突減少,結果分佈均勻。github

2.拆分完了以後,獲得一些幾十MB的小文件,那麼就能夠放進內存裏排序了,能夠用快速排序,歸併排序,堆排序等等。算法

3.1000個小文件內部排好序以後,就要把這些內部有序的小文件,合併成一個大的文件,能夠用二叉堆來作1000路合併的操做,每一個小文件是一路,合併後的大文件仍然有序。函數

首先遍歷1000個文件,每一個文件裏面取第一個數字,組成 (數字, 文件號) 這樣的組合加入到堆裏(假設是從小到大排序,用小頂堆),遍歷完後堆裏有1000個 (數字,文件號) 這樣的元素
而後不斷從堆頂拿元素出來,每拿出一個元素,把它的文件號讀取出來,而後去對應的文件裏,加一個元素進入堆,直到那個文件被讀取完。拿出來的元素固然追加到最終結果的文件裏。
按照上面的操做,直到堆被取空了,此時最終結果文件裏的所有數字就是有序的了。
最後我用c++寫了個實驗程序,具體代碼在這裏能夠看到。post

如何拆分大文件?spa

一個32G的大文件,用fopen()打開不會所有加載到內存的,而後for循環遍歷啊,把每一個數字對1000取模,會獲得0到999種結果,而後每種結果在寫入到新的文件中,就拆分了code


 
 

 

// 對 2 億個數字進行排序, 約 10 G 的文件, 每一個數字 int 能表示
  3 // 算法流程
  4 // 將 10 G 的文件散列到 300 個文件中, 每一個文件大約 35 MB
  5 // 對 35 MB 的小文件內部排序, 或者分發到多臺計算機中, 並行處理 MapReduce
  6 // 最後使用最小堆, 進行 300 路歸併排序, 合成大文件
  7 // 再寫一個算法判斷 2 億個數字是否有序
  8  
  9 #include <stdio.h>
  10 #include <stdlib.h>
  11 #include <time.h>
  12 #include <io.h>
  13 #include <queue>
  14  
  15 #define FILE_NUM 300 // 哈希文件數
  16 #define HASH(a) (a % FILE_NUM)
  17  
  18 int num = 6000000; // 2 億個數字, 手動改
  19 char path[20] = "c:\\data.dat"; // 待排文件
  20 char result[20] = "c:\\result.dat"; // 排序後文件
  21 char tmpdir[100] = "c:\\hashfile"; // 臨時目錄
  22  
  23 // 隨機生成 2 億個數字
  24 int write_file(void)
  25 {
  26   FILE *out = NULL;
  27   int i;
  28  
  29   printf("\n正在生成 %d 個數字...\n\n", num);
  30   out = fopen(path, "wt");
  31   if (out == NULL) return 0;
  32  
  33   unsigned int s, e;
  34   e = s = clock();
  35   for (i=0; i<num; i++)
  36   {
  37     e = clock();
  38     if (e - s > 1000) // 計算進度
  39     {
  40       printf("\r處理進度 %0.2f %%\t", (i * 100.0) / num);
  41       s = e;
  42     }
  43     fprintf(out, "%d\n",
  44         (rand() % 31623) * (rand() % 31623));
  45   }
  46   fclose(out);
  47   return 1;
  48 }
  49  
  50 // 對 2 億個數字進行哈希, 分散到子文件中
  51 // 入口參數: path, tmpdir
  52 int map(void)
  53 {
  54   FILE *in = NULL;
  55   FILE *tmp[FILE_NUM + 5];
  56   char hashfile[512]; // 哈希文件地址
  57   int data, add;
  58   int i;
  59  
  60   printf("\r正在哈希 %s\n\n", path);
  61   in = fopen(path, "rt");
  62   if (in == NULL) return 0;
  63   for (i=0; i<FILE_NUM; i++) tmp[i] = NULL;
  64  
  65   // 開始哈希, 核心代碼要儘量的加速
  66   unsigned int s, e;
  67   e = s = clock();
  68   i = 0;
  69   while (fscanf(in, "%d", &data) != EOF)
  70   {
  71     add = HASH(data);
  72     if (tmp[add] == NULL)
  73     {
  74       sprintf(hashfile, "%s\\hash_%d.~tmp", tmpdir, add);
  75       tmp[add] = fopen(hashfile, "a");
  76     }
  77     fprintf(tmp[add], "%d\n", data);
  78  
  79     i++;
  80     e = clock(); // 計算進度
  81     if (e - s > 1000)
  82     {
  83       printf("\r處理進度 %0.2f %%\t", (i * 100.0) / num);
  84       s = e;
  85     }
  86   }
  87   for (i=0; i<FILE_NUM; i++)
  88   if (tmp[i]) fclose(tmp[i]);
  89   fclose(in);
  90  
  91   return 1;
  92 }
  93  
  94 // 對 300 個文件逐個排序, 採用堆排序 STL 的優先隊列
  95 void calc(void)
  96 {
  97   int fileexist(char *path); // 判斷文件存在
  98   std::priority_queue<int> q; // 堆排序
  99   char hashfile[512];
  100   FILE *fp = NULL;
  101   int i, data;
  102  
  103   // 逐個處理 300 個文件, 或者將這些文件發送到其它計算機中並行處理
  104   for (i=0; i<FILE_NUM; i++)
  105   {
  106     sprintf(hashfile, "%s\\hash_%d.~tmp", tmpdir, i);
  107     if (fileexist(hashfile))
  108     {
  109       printf("\r正在排序 hash_%d.~tmp\t", i);
  110  
  111       // 小文件從磁盤加入內存中
  112       fp = fopen(hashfile, "rt");
  113       while (fscanf(fp, "%d", &data) != EOF)
  114       {
  115         q.push(data);
  116         // 優先隊列默認是大頂堆, 即降序排序
  117         // 要升序須要重載 () 運算符
  118       }
  119       fclose(fp);
  120  
  121       // 排序後再從內存寫回磁盤
  122       fp = fopen(hashfile, "wt"); // 覆蓋模式寫
  123       while (!q.empty())
  124       {
  125         fprintf(fp, "%d\n", q.top());
  126         q.pop();
  127       }
  128       fclose(fp);
  129     }
  130   }
  131 }
  132  
  133 typedef struct node // 隊列結點
  134 {
  135   int data;
  136   int id; // 哈希文件的編號
  137   bool operator < (const node &a) const
  138   { return data < a.data; }
  139 }node;
  140  
  141 // 將 300 個有序文件合併成一個文件, K 路歸併排序
  142 int reduce(void)
  143 {
  144   int fileexist(char *path);
  145   std::priority_queue<node> q; // 堆排序
  146   FILE *file[FILE_NUM + 5];
  147   FILE *out = NULL;
  148   char hashfile[512];
  149   node tmp, p;
  150   int i, count = 0;
  151  
  152   printf("\r正在合併 %s\n\n", result);
  153   out = fopen(result, "wt");
  154   if (out == NULL) return 0;
  155   for (i=0; i<FILE_NUM; i++) file[i] = NULL;
  156   for (i=0; i<FILE_NUM; i++) // 打開所有哈希文件
  157   {
  158     sprintf(hashfile, "%s\\hash_%d.~tmp", tmpdir, i);
  159     if (fileexist(hashfile))
  160     {
  161       file[i] = fopen(hashfile, "rt");
  162       fscanf(file[i], "%d", &tmp.data);
  163       tmp.id = i;
  164       q.push(tmp); // 初始化隊列
  165       count++; // 計數器
  166       printf("\r入隊進度 %0.2f %%\t", (count * 100.0) / FILE_NUM);
  167     }
  168   }
  169   unsigned int s, e;
  170   e = s = clock();
  171   while (!q.empty()) // 開始 K 路歸併
  172   {
  173     tmp = q.top();
  174     q.pop();
  175     // 將堆頂的元素寫回磁盤, 再從磁盤中拿一個到內存
  176     fprintf(out, "%d\n", tmp.data);
  177     if (fscanf(file[tmp.id], "%d", &p.data) != EOF)
  178     {
  179       p.id = tmp.id;
  180       q.push(p);
  181       count++;
  182     }
  183  
  184     e = clock(); // 計算進度
  185     if (e - s > 1000)
  186     {
  187       printf("\r處理進度 %0.2f %%\t", (count * 100.0) / num);
  188       s = e;
  189     }
  190   }
  191   for (i=0; i<FILE_NUM; i++)
  192   if (file[i]) fclose(file[i]);
  193   fclose(out);
  194  
  195   return 1;
  196 }
  197  
  198 int check(void) // 檢查是否降序排序
  199 {
  200   FILE *in = NULL;
  201   int max = 0x7FFFFFFF;
  202   int data;
  203   int count = 0;
  204  
  205   printf("\r正在檢查文件正確性...\n\n");
  206   in = fopen(result, "rt");
  207   if (in == NULL) return 0;
  208  
  209   unsigned int s, e;
  210   e = s = clock();
  211   while (fscanf(in, "%d", &data) != EOF)
  212   {
  213     if (data <= max) max = data;
  214     else
  215     {
  216       fclose(in);
  217       return 0;
  218     }
  219     count++;
  220     e = clock(); // 計算進度
  221     if (e - s > 1000)
  222     {
  223       printf("\r處理進度 %0.2f %%\t", (count * 100.0) / num);
  224       s = e;
  225     }
  226   }
  227   fclose(in);
  228   return 1;
  229 }
  230  
  231 // 判斷文件存在
  232 int fileexist(char *path)
  233 {
  234   FILE *fp = NULL;
  235  
  236   fp = fopen(path, "rt");
  237   if (fp)
  238   {
  239     fclose(fp);
  240     return 1;
  241   }
  242   else return 0;
  243 }
  244  
  245 int main(void)
  246 {
  247   char cmd_del[200]; // 刪除目錄
  248   char cmd_att[200]; // 設置隱藏
  249   char cmd_mkdir[200]; // 創建目錄
  250  
  251   // 初始化 cmd 命令, 創建工做目錄
  252   sprintf(cmd_del, "rmdir /s /q %s", tmpdir);
  253   sprintf(cmd_att, "attrib +h %s", tmpdir);
  254   sprintf(cmd_mkdir, "mkdir %s", tmpdir);
  255   if (access(path, 0) == 0) system(cmd_del);
  256   system(cmd_mkdir); // 創建工做目錄
  257   system(cmd_att); // 隱藏目錄
  258  
  259   // 隨機生成 2 億個數字
  260   if (!write_file()) return 0;
  261  
  262   map(); // 對 2 億個數字進行哈希, 即 Map
  263   calc(); // 對 300 個文件逐個排序
  264   reduce(); // 最後將 300 個有序文件合併成一個文件, 即 reduce
  265   if (check()) printf("\r排序正確!\t\t\t\n\n");
  266   else printf("\r排序錯誤!\t\t\t\n\n");
  267  
  268   system(cmd_del); // 刪除哈希文件
  269   remove(path); // 刪除 2 億數字文件
  270   remove(result); // 刪除排序後的文件
  271  
  272   return 0;
  273 }
相關文章
相關標籤/搜索