轉自:http://netsmell.com/post/how-sort-10-billion-data.html?ref=myreadhtml
海量數據處理/外部歸併排序 - 分治.cpppnode
今天要給100億個數字排序,100億個 int 型數字放在文件裏面大概有 37.2GB,很是大,內存一次裝不下了。那麼確定是要拆分紅小的文件一個一個來處理,最終在合併成一個排好序的大文件。c++
實現思路git
1.把這個37GB的大文件,用哈希分紅1000個小文件,每一個小文件平均38MB左右(理想狀況),把100億個數字對1000取模,模出來的結果在0到999之間,每一個結果對應一個文件,因此我這裏取的哈希函數是 h = x % 1000,哈希函數取得」好」,能使衝突減少,結果分佈均勻。github
2.拆分完了以後,獲得一些幾十MB的小文件,那麼就能夠放進內存裏排序了,能夠用快速排序,歸併排序,堆排序等等。算法
3.1000個小文件內部排好序以後,就要把這些內部有序的小文件,合併成一個大的文件,能夠用二叉堆來作1000路合併的操做,每一個小文件是一路,合併後的大文件仍然有序。函數
首先遍歷1000個文件,每一個文件裏面取第一個數字,組成 (數字, 文件號) 這樣的組合加入到堆裏(假設是從小到大排序,用小頂堆),遍歷完後堆裏有1000個 (數字,文件號) 這樣的元素
而後不斷從堆頂拿元素出來,每拿出一個元素,把它的文件號讀取出來,而後去對應的文件裏,加一個元素進入堆,直到那個文件被讀取完。拿出來的元素固然追加到最終結果的文件裏。
按照上面的操做,直到堆被取空了,此時最終結果文件裏的所有數字就是有序的了。
最後我用c++寫了個實驗程序,具體代碼在這裏能夠看到。post
如何拆分大文件?spa
一個32G的大文件,用fopen()打開不會所有加載到內存的,而後for循環遍歷啊,把每一個數字對1000取模,會獲得0到999種結果,而後每種結果在寫入到新的文件中,就拆分了code
// 對 2 億個數字進行排序, 約 10 G 的文件, 每一個數字 int 能表示 | ||
3 | // 算法流程 | |
4 | // 將 10 G 的文件散列到 300 個文件中, 每一個文件大約 35 MB | |
5 | // 對 35 MB 的小文件內部排序, 或者分發到多臺計算機中, 並行處理 MapReduce | |
6 | // 最後使用最小堆, 進行 300 路歸併排序, 合成大文件 | |
7 | // 再寫一個算法判斷 2 億個數字是否有序 | |
8 | ||
9 | #include <stdio.h> | |
10 | #include <stdlib.h> | |
11 | #include <time.h> | |
12 | #include <io.h> | |
13 | #include <queue> | |
14 | ||
15 | #define FILE_NUM 300 // 哈希文件數 | |
16 | #define HASH(a) (a % FILE_NUM) | |
17 | ||
18 | int num = 6000000; // 2 億個數字, 手動改 | |
19 | char path[20] = "c:\\data.dat"; // 待排文件 | |
20 | char result[20] = "c:\\result.dat"; // 排序後文件 | |
21 | char tmpdir[100] = "c:\\hashfile"; // 臨時目錄 | |
22 | ||
23 | // 隨機生成 2 億個數字 | |
24 | int write_file(void) | |
25 | { | |
26 | FILE *out = NULL; | |
27 | int i; | |
28 | ||
29 | printf("\n正在生成 %d 個數字...\n\n", num); | |
30 | out = fopen(path, "wt"); | |
31 | if (out == NULL) return 0; | |
32 | ||
33 | unsigned int s, e; | |
34 | e = s = clock(); | |
35 | for (i=0; i<num; i++) | |
36 | { | |
37 | e = clock(); | |
38 | if (e - s > 1000) // 計算進度 | |
39 | { | |
40 | printf("\r處理進度 %0.2f %%\t", (i * 100.0) / num); | |
41 | s = e; | |
42 | } | |
43 | fprintf(out, "%d\n", | |
44 | (rand() % 31623) * (rand() % 31623)); | |
45 | } | |
46 | fclose(out); | |
47 | return 1; | |
48 | } | |
49 | ||
50 | // 對 2 億個數字進行哈希, 分散到子文件中 | |
51 | // 入口參數: path, tmpdir | |
52 | int map(void) | |
53 | { | |
54 | FILE *in = NULL; | |
55 | FILE *tmp[FILE_NUM + 5]; | |
56 | char hashfile[512]; // 哈希文件地址 | |
57 | int data, add; | |
58 | int i; | |
59 | ||
60 | printf("\r正在哈希 %s\n\n", path); | |
61 | in = fopen(path, "rt"); | |
62 | if (in == NULL) return 0; | |
63 | for (i=0; i<FILE_NUM; i++) tmp[i] = NULL; | |
64 | ||
65 | // 開始哈希, 核心代碼要儘量的加速 | |
66 | unsigned int s, e; | |
67 | e = s = clock(); | |
68 | i = 0; | |
69 | while (fscanf(in, "%d", &data) != EOF) | |
70 | { | |
71 | add = HASH(data); | |
72 | if (tmp[add] == NULL) | |
73 | { | |
74 | sprintf(hashfile, "%s\\hash_%d.~tmp", tmpdir, add); | |
75 | tmp[add] = fopen(hashfile, "a"); | |
76 | } | |
77 | fprintf(tmp[add], "%d\n", data); | |
78 | ||
79 | i++; | |
80 | e = clock(); // 計算進度 | |
81 | if (e - s > 1000) | |
82 | { | |
83 | printf("\r處理進度 %0.2f %%\t", (i * 100.0) / num); | |
84 | s = e; | |
85 | } | |
86 | } | |
87 | for (i=0; i<FILE_NUM; i++) | |
88 | if (tmp[i]) fclose(tmp[i]); | |
89 | fclose(in); | |
90 | ||
91 | return 1; | |
92 | } | |
93 | ||
94 | // 對 300 個文件逐個排序, 採用堆排序 STL 的優先隊列 | |
95 | void calc(void) | |
96 | { | |
97 | int fileexist(char *path); // 判斷文件存在 | |
98 | std::priority_queue<int> q; // 堆排序 | |
99 | char hashfile[512]; | |
100 | FILE *fp = NULL; | |
101 | int i, data; | |
102 | ||
103 | // 逐個處理 300 個文件, 或者將這些文件發送到其它計算機中並行處理 | |
104 | for (i=0; i<FILE_NUM; i++) | |
105 | { | |
106 | sprintf(hashfile, "%s\\hash_%d.~tmp", tmpdir, i); | |
107 | if (fileexist(hashfile)) | |
108 | { | |
109 | printf("\r正在排序 hash_%d.~tmp\t", i); | |
110 | ||
111 | // 小文件從磁盤加入內存中 | |
112 | fp = fopen(hashfile, "rt"); | |
113 | while (fscanf(fp, "%d", &data) != EOF) | |
114 | { | |
115 | q.push(data); | |
116 | // 優先隊列默認是大頂堆, 即降序排序 | |
117 | // 要升序須要重載 () 運算符 | |
118 | } | |
119 | fclose(fp); | |
120 | ||
121 | // 排序後再從內存寫回磁盤 | |
122 | fp = fopen(hashfile, "wt"); // 覆蓋模式寫 | |
123 | while (!q.empty()) | |
124 | { | |
125 | fprintf(fp, "%d\n", q.top()); | |
126 | q.pop(); | |
127 | } | |
128 | fclose(fp); | |
129 | } | |
130 | } | |
131 | } | |
132 | ||
133 | typedef struct node // 隊列結點 | |
134 | { | |
135 | int data; | |
136 | int id; // 哈希文件的編號 | |
137 | bool operator < (const node &a) const | |
138 | { return data < a.data; } | |
139 | }node; | |
140 | ||
141 | // 將 300 個有序文件合併成一個文件, K 路歸併排序 | |
142 | int reduce(void) | |
143 | { | |
144 | int fileexist(char *path); | |
145 | std::priority_queue<node> q; // 堆排序 | |
146 | FILE *file[FILE_NUM + 5]; | |
147 | FILE *out = NULL; | |
148 | char hashfile[512]; | |
149 | node tmp, p; | |
150 | int i, count = 0; | |
151 | ||
152 | printf("\r正在合併 %s\n\n", result); | |
153 | out = fopen(result, "wt"); | |
154 | if (out == NULL) return 0; | |
155 | for (i=0; i<FILE_NUM; i++) file[i] = NULL; | |
156 | for (i=0; i<FILE_NUM; i++) // 打開所有哈希文件 | |
157 | { | |
158 | sprintf(hashfile, "%s\\hash_%d.~tmp", tmpdir, i); | |
159 | if (fileexist(hashfile)) | |
160 | { | |
161 | file[i] = fopen(hashfile, "rt"); | |
162 | fscanf(file[i], "%d", &tmp.data); | |
163 | tmp.id = i; | |
164 | q.push(tmp); // 初始化隊列 | |
165 | count++; // 計數器 | |
166 | printf("\r入隊進度 %0.2f %%\t", (count * 100.0) / FILE_NUM); | |
167 | } | |
168 | } | |
169 | unsigned int s, e; | |
170 | e = s = clock(); | |
171 | while (!q.empty()) // 開始 K 路歸併 | |
172 | { | |
173 | tmp = q.top(); | |
174 | q.pop(); | |
175 | // 將堆頂的元素寫回磁盤, 再從磁盤中拿一個到內存 | |
176 | fprintf(out, "%d\n", tmp.data); | |
177 | if (fscanf(file[tmp.id], "%d", &p.data) != EOF) | |
178 | { | |
179 | p.id = tmp.id; | |
180 | q.push(p); | |
181 | count++; | |
182 | } | |
183 | ||
184 | e = clock(); // 計算進度 | |
185 | if (e - s > 1000) | |
186 | { | |
187 | printf("\r處理進度 %0.2f %%\t", (count * 100.0) / num); | |
188 | s = e; | |
189 | } | |
190 | } | |
191 | for (i=0; i<FILE_NUM; i++) | |
192 | if (file[i]) fclose(file[i]); | |
193 | fclose(out); | |
194 | ||
195 | return 1; | |
196 | } | |
197 | ||
198 | int check(void) // 檢查是否降序排序 | |
199 | { | |
200 | FILE *in = NULL; | |
201 | int max = 0x7FFFFFFF; | |
202 | int data; | |
203 | int count = 0; | |
204 | ||
205 | printf("\r正在檢查文件正確性...\n\n"); | |
206 | in = fopen(result, "rt"); | |
207 | if (in == NULL) return 0; | |
208 | ||
209 | unsigned int s, e; | |
210 | e = s = clock(); | |
211 | while (fscanf(in, "%d", &data) != EOF) | |
212 | { | |
213 | if (data <= max) max = data; | |
214 | else | |
215 | { | |
216 | fclose(in); | |
217 | return 0; | |
218 | } | |
219 | count++; | |
220 | e = clock(); // 計算進度 | |
221 | if (e - s > 1000) | |
222 | { | |
223 | printf("\r處理進度 %0.2f %%\t", (count * 100.0) / num); | |
224 | s = e; | |
225 | } | |
226 | } | |
227 | fclose(in); | |
228 | return 1; | |
229 | } | |
230 | ||
231 | // 判斷文件存在 | |
232 | int fileexist(char *path) | |
233 | { | |
234 | FILE *fp = NULL; | |
235 | ||
236 | fp = fopen(path, "rt"); | |
237 | if (fp) | |
238 | { | |
239 | fclose(fp); | |
240 | return 1; | |
241 | } | |
242 | else return 0; | |
243 | } | |
244 | ||
245 | int main(void) | |
246 | { | |
247 | char cmd_del[200]; // 刪除目錄 | |
248 | char cmd_att[200]; // 設置隱藏 | |
249 | char cmd_mkdir[200]; // 創建目錄 | |
250 | ||
251 | // 初始化 cmd 命令, 創建工做目錄 | |
252 | sprintf(cmd_del, "rmdir /s /q %s", tmpdir); | |
253 | sprintf(cmd_att, "attrib +h %s", tmpdir); | |
254 | sprintf(cmd_mkdir, "mkdir %s", tmpdir); | |
255 | if (access(path, 0) == 0) system(cmd_del); | |
256 | system(cmd_mkdir); // 創建工做目錄 | |
257 | system(cmd_att); // 隱藏目錄 | |
258 | ||
259 | // 隨機生成 2 億個數字 | |
260 | if (!write_file()) return 0; | |
261 | ||
262 | map(); // 對 2 億個數字進行哈希, 即 Map | |
263 | calc(); // 對 300 個文件逐個排序 | |
264 | reduce(); // 最後將 300 個有序文件合併成一個文件, 即 reduce | |
265 | if (check()) printf("\r排序正確!\t\t\t\n\n"); | |
266 | else printf("\r排序錯誤!\t\t\t\n\n"); | |
267 | ||
268 | system(cmd_del); // 刪除哈希文件 | |
269 | remove(path); // 刪除 2 億數字文件 | |
270 | remove(result); // 刪除排序後的文件 | |
271 | ||
272 | return 0; | |
273 | } |