年前的一次任務,被插隊幫別的組作,須要一批兌換碼存起來,配合新年活動在某應用中,讓用戶抽獎領取,數量在1000w。俺首先固然是懷疑一次活動能有這麼多用戶?但人家爲了噱頭。php
因爲只是配合別人作一次活動,無需過後統計數據啥的,所以俺把它放到緩存裏邊(一條兌換碼就幾個字符,1000w個也佔不了多少空間),有點意思的就是如何導入這1000w兌換碼了。c++
若是是c/c++這種天生跑的快的,不作什麼鏈接數據庫等複雜操做,1000w次循環,是能夠1秒鐘完成的,可是服務器上跑腳本,得考慮到當前服務器運行着其餘任務,在腳本程序運行過程當中,還可能鏈接/斷開、讀寫數據庫,讀寫緩存等,自己也要花時間,須要分配大量的CPU時間才能搞定。確定得命令行窗口運行了,時間也不能設限制,可是若是隻是單獨寫一個for循環跑1000w次,這樣也得跑個幾天才行,所以一個簡單的多進程腳本就能夠大大加快速度了。shell
既然大數據量很是耗時,可能得幾個小時運行完成,這裏應該之後臺做業的形式執行腳本,具體是在執行時最後加上&(如 /usr/bin/php test1.php &),而爲了防止運行過程當中丟失shell會話,最好在執行前加上nohup,這樣可防止進程在執行時被系統掛起而中止,而後重定向一下輸出,命令執行時相似於這樣數據庫
/usr/bin/php test1.php > test1.log 2>1& &數組
整個思路也比較簡單,將兌換碼整理成一個一行,放在一個原始文件中,好比名字叫original_code_1.txt緩存
程序一行行讀取,設定一個行數,好比10w,每讀取這麼多行就換一個新文件寫入,即將這個大文件分紅許多小文件,用一個數組保存這些小文件的路徑名(如../../data/proc/proc_3d423v348k_1.txt)。服務器
文件劈分完成後,開啓多個進程處理,這個進程是實際執行導入操做的腳本,好比可命名爲import_code_into_cache.php,每次導入一個兌換碼成功,就把它寫到一箇中間文件中,好比../../data/mid/mid_cwe8422ici_1.txt併發
全部的文件跑完後,將中間數據(mid目錄下的額)彙總到end目錄下(../../data/end/end_1.txt),這樣若是導入中間某種緣由有丟失的,能夠將最終文件數據與原來的總數據對比、整理後,再次導入,能夠叫original_code_2.txt。函數
proc目錄:存放將大文件切分後的小文件,文件名相似proc_uniqid()_導入編號.txt工具
mid目錄:存放將小文件導入緩存,若成功則寫入這個文件,文件名相似mid_uniqid()_導入編號.txt
end目錄:存放最後導入彙總數據,文件名相似end_導入編號.txt
針對上面的簡單版本,在本身的機子上重寫了下,首先是配置文件config.php,規定程序執行過程當中的必要參數
<?php /** config.php 設置各項腳本執行參數 */ $num = isset($ARGV[1]) ? $ARGV[1] : 1; // 第幾回導入 define('DS', DIRECTORY_SEPARATOR); define('ROOT_PATH', realpath(dirname(__FILE__))); // 當前文件所在根目錄 $original_dir = ROOT_PATH.DS.'original'.DS; // 存放原始兌換碼數據目錄 $proc_dir = ROOT_PATH.DS.'data'.DS.'proc'.DS; // 待處理兌換碼文件目錄,存放切分後的文件 $mid_dir = ROOT_PATH.DS.'data'.DS.'mid'.DS; // 生成的中間數據目錄 $end_dir = ROOT_PATH.DS.'data'.DS.'end'.DS; // 最終彙總數據目錄 $original_file = $original_dir.'original_code_'.$num.'.txt'; // 原始兌換碼大文件 if(!file_exists($original_file)) { echo "file {$original_file} not exist!\n"; } $import_script = 'import_code_into_cache.php'; // 執行導入腳本的文件名 $exec_import_file = ROOT_PATH.DS.$import_script; // 執行導入的腳本的路徑名 if(!file_exists($exec_import_file)) { echo "the script that execute importion [{$exec_import_file}] not exist!\n"; } $max_proc_num = 5; // 最大併發進程數,測試 $max_file_line = 5; // 切分後單文件最大數據行數,這裏是測試 $is_del_mid = true; // 彙總時是否刪除中間數據
$num是本次導入的編號,即第幾回,經過腳本參數傳,而後是其餘存放原來大文件、切分後的數據目錄、中間數據目錄以及最終數據彙總結果等,而且規定了最大併發進程數,這裏寫的是5,每一個切分文件的行數,這裏由於是本機測試我寫的是5。以及最終將中間數據刪除。
經過以上思路,在程序開始以前,能夠先將切分數據目錄,或者稱之爲當前處理數據目錄的proc_dir、中間數據目錄mid_dir以及彙總目錄end_dir進行一次初始化,好比清空原有文件,不存在就新建這個目錄等。第二個函數是對大文件的切分,在一行行讀取原始兌換碼數據,寫到proc_dir目錄下,具體代碼以下:
<?php /** tool.php 需用到的公共函數 */ function init_dir($dir, $del = false) { if(!file_exists($dir)) { mkdir($dir, 0777, true); } else { chmod($dir, 0777); if($del) { $cmd = 'rm -rf '.$dir.'*'; exec($cmd); } } } // 初始化各個文件目錄 function init_workspace() { global $proc_dir, $mid_dir, $end_dir; init_dir($proc_dir, true); init_dir($mid_dir, true); init_dir($end_dir); } // 劈分原始數據文件 function get_split_file_list() { global $original_file, $proc_dir, $max_file_line, $num; $file_list = array(); if(!file_exists($original_file)) { return $file_list; } $i = 0; $handle = fopen($original_file, 'r'); if(!$handle) return $file_list; $proc_handle = null; while(!feof($handle)) // 循環讀取大文件 { $line_code = trim(fgets($handle)); if(empty($line_code)) continue; // 過濾空白行及fread讀取的文件空的末尾 if($i % $max_file_line == 0) // 寫入一個新文件,每一個文件行數最大爲$max_file_line { if(isset($proc_handle)) fclose($proc_handle); $filename = $proc_dir.'proc_'.uniqid().'_'.$num.'.txt'; // 切分文件的命名:proc_隨機串_導入編號.txt $file_list[] = $filename; if($proc_handle = fopen($filename, 'w')) { chmod($filename, 0777); fwrite($proc_handle, $line_code."\n"); // 寫入新的待處理文件,每一個文件行數是$max_file_line } } else // 寫入一個已存在的文件 { fwrite($proc_handle, $line_code."\n"); } ++ $i; } fclose($handle); fclose($proc_handle); return $file_list; }
get_split_file_list()函數功能爲把大文件切分紅小文件,將各個文件路徑放入一個數組並返回。
接下來是啓動進程,開始導入。我圖簡單寫了一個類,這個類大概有這樣幾個方法:(1)get_uniq_mid_file_name,獲取一箇中間數據的文件路徑名;(2)
merge_file,用於將中間數據彙總到最終數據中;(3)get_curr_proc_num,獲取當前正在執行導入的進程數;(4)process_data,根據進程數數否達到上限而開啓導入腳本進程,進行導入操做。代碼以下
<?php /** model.php 根據進程數,調用導入腳本,開始導入 */ class proj_manger { public $num; // 第幾回導入 public $max_proc_num; // 最大併發進程數 public $file_list; // 待處理劈分的文件(路徑) public $mid_dir; // 中間數據目錄 public $end_dir; // 最終彙總數據目錄 public $comm_mid_file; // 中間數據文件的統稱 public $import_script; // 執行導入的腳本名 public $exec_import_file; // 執行導入的腳本全路徑 public $is_del_mid_file; // 是否刪除中間數據 public function __construct($num, $max_proc_num, &$file_list, $mid_dir, $end_dir, $import_script, $exec_import_file, $is_del_mid_file) { $this->num = $num; $this->max_proc_num = $max_proc_num; $this->file_list = $file_list; $this->mid_dir = $mid_dir; $this->end_dir = $end_dir; $this->comm_mid_file = $this->mid_dir.'mid_*_'.$this->num.'.txt'; $this->import_script = $import_script; $this->exec_import_file = $exec_import_file; $this->is_del_mid_file = $is_del_mid_file; } // 獲取一箇中間數據文件路徑名 public function get_uniq_mid_file_name() { return $this->mid_dir.'mid_'.uniqid().'_'.$this->num.'.txt'; } // 將中間數據合併到最終數據目錄中的文件 private function merge_file() { $end_file = $this->end_dir.'end_'.$this->num.'.txt'; // 最終合併文件路徑名 $cmd = "cat {$this->comm_mid_file} > {$end_file}"; exec($cmd); // 執行合併 echo "merge file, execute cmd=>{$cmd}\n"; if($this->is_del_mid_file) { $cmd = "rm -rf {$this->comm_mid_file}"; exec($cmd); echo "delete middle file, execute cmd=>{$cmd}\n"; } } // 獲取當前進程數 public function get_curr_proc_num() { $cmd = "ps -ef | grep {$this->import_script} | grep -v grep | wc -l"; $handle = popen($cmd, 'r'); $pnum = trim(fread($handle, 512)); pclose($handle); echo "get current total import process num, execute cmd=>{$cmd}\n"; return (int)$pnum; } // 開啓進程,處理數據 public function process_data() { while(true) { $curr_proc_num = $this->get_curr_proc_num(); // 獲取當前正在執行導入腳本的進程數 if(count($this->file_list) > 0) // 若是存在需處理的文件數據 { if($curr_proc_num < $this->max_proc_num) { $curr_proc_file = array_pop($this->file_list); $curr_mid_file = $this->get_uniq_mid_file_name(); $cmd = "nohup /usr/bin/php {$this->exec_import_file} {$curr_proc_file} {$curr_mid_file} > import_code.log 2>&1 &"; // 開啓一個新進程 echo "start new proc: {$cmd}\n"; exec($cmd); } else // 進程數已達規定最大數目 { echo "please wait......\n"; } } else { // 處理完畢,合併文件並退出 $this->merge_file(); break; } sleep(1); } } }
這一行:$cmd = "nohup /usr/bin/php {$this->exec_import_file} {$curr_proc_file} {$curr_mid_file} > import_code.log 2>&1 &",大概就是
/usr/bin/php import_code_into_cache.php proc_232dh2842_1.txt mid_c2382nxr335_1.txt > import_code.log 2>&1 &
真正執行導入的是import_code_into_cache.php這個腳本,傳入的兩個參數是兩個問價名,一個是分配給當前這個腳本(進程)的切分後的文件,一個他要寫入的中間文件名,所以這裏是一個進程處理一個切分後的小文件,若是規定切分文件行數是10W,就是一個進程處理10w條記錄。若是當前進程數已達最大值,輸出please wait.......,只要一個小文件處理完並完整退出,它的進程就會消失,總的進程數就會減1,while循環的條件是true,所以又會檢查數組還有沒有數據,有的話檢查當前進程數是否小於最大值,小於的話,從新啓動一個import_code_into_cache進程,接續處理數組中的下一個文件名,因爲處理的是是從數組中array_pop出來的,每處理完一個,數組中少一個文件路徑名,等全部的處理完file_list長度爲0,進入while內最外層的else分支,開始彙總數據並break,break會退出這個死循環。
下面是實際導入程序import_code_into_cache.php
<?php /** import_code_into_cache.php 實際執行導入的腳本,導入數據並寫入中間數據文件 */ // 處理傳入的兩個文件名參數 $curr_proc_file = isset($argv[1]) ? $argv[1] : ''; // 當前處理的文件 $curr_mid_file = isset($argv[2]) ? $argv[2] : ''; // 當前要寫入的中間數據文件 $proc_handle = fopen($curr_proc_file, 'r'); $mid_handle = fopen($curr_mid_file, 'w'); if($proc_handle && $mid_handle) { chmod($curr_mid_file, 0777); while(!feof($proc_handle)) { $line = fgets($proc_handle); // TODO: 執行導入緩存等其餘操做,獲取操做結果 // 若是導入成功,寫入中間數據文件,這裏直接寫入 fwrite($mid_handle, $line); } } sleep(10); // 延遲10秒,僅僅爲了看進程測試
這裏實際導入腳本沒作任何有意義的操做,僅爲了演示,while循環讀取切分後的小文件,接下來能夠執行你想要的操做,成功後再fwrite進中間數據文件。
也就是說,並沒直接nohup /usr/bin/php import_code_into_cache.php *** 啓動導入程序,而是在已經運行的腳本中啓動的,並且對於一個正常完成的程序,它的進程會自動消失掉,這是檢測進程數的關鍵。
最後,須要一個腳本把全部這一切連起來,這是整個導入程序的直接入口(import.php)
<?php /** import.php 導入程序的直接入口 */ if($argc <= 1) { exit("input cmd like: nohup /usr/bin/php import.php 1 > import.log 2>&1 &"); } date_default_timezone_set("Asia/Shanghai"); set_time_limit(0); // 無時間限制 ini_set('memory_limit', '512M'); // 內存限制爲512M,僅爲示例 echo "\nprocess start at ".date('Y-m-d H:i:s')."\n\n"; $start_time = microtime(true); // 記錄開始時間 $GLOBALS['ARGV_INPUT'] = $argv; // 接收腳本參數 require_once('config.php'); // 配置文件 require_once('tool.php'); // 工具函數文件 require_once('model.php'); // 執行導入的類文件 init_workspace(); // 初始化待處理目錄、中間數據目錄、最終合併數據目錄 $file_list = get_split_file_list(); // 對文件進行劈分,分給各個進程處理 $file_num = count($file_list); if($file_num > 0) { echo "total file num: {$file_num}\n"; $proc_data_obj = new proj_manger($num, $max_proc_num, $file_list, $mid_dir, $end_dir, $import_script, $exec_import_file, $is_del_mid); $proc_data_obj->process_data(); } else { echo "no splited files to be processed.\n"; } $end_time = microtime(true); $length = sprintf('%.3f', $end_time - $start_time); echo "process end, spend time: ".$length." seconds.\n"; // 記錄本次執行時間 exit;
所以整個導入程序運行時是這樣工做的,先進入當前腳本所在目錄,確保當前目錄下的original目錄下有原始文件且名稱對應,執行/usr/bin/php import.php 1 > import.log 2>&1 &,整個導入程序便開始。這裏先調用了init_workspace簡單初始化一下各個須要的目錄,而後切分文件,而後new了一個proj_manager類並直接開始執行process_data方法,進行導入,從這個方法裏邊,執行實際導入數據的腳本,並保證同時最多有若干個進程在處理數據。
這裏在給import.php傳參數時只傳了一個數字,表示第幾回導入,若是想作的有擴展性點,好比某天爲某個應用某次導入數據,就能夠多傳幾個參數,並且文件的命名也會不一樣,甚至存放文件的目錄徹底換成一個日期爲名稱的目錄下邊,存放待處理數據、中間數據和彙總數據,具體看就要看需求了。
全部的代碼文件
original目錄存放原始大數據量文件,data目錄分別有三個目錄proc、mid、end,做用如前所述,正在處理的小文件目錄、中間數據文件目錄、最終彙總數據文件目錄。
original目錄下有一個測試文件 ,36行數據
運行示例以下(在個人mbp下邊)
運行後,ps -ef | grep import能夠看到有5個執行導入的進程,進程正在跑,完事兒後做業完成給出了Done的完成狀態,並顯示做業號[1]以及後邊的一串這個做業所執行的指令。
再看看proc和end目錄下的文件
測試時是每一個文件最多5行,總共36行數據,是能對上的。
固然俺在這兒沒講什麼設計,基本是怎麼簡單怎麼來,之後再須要處理這種大量的重複動做,拿這個爲模板,改改又能用了-_-