一次多進程的小嚐試

  年前的一次任務,被插隊幫別的組作,須要一批兌換碼存起來,配合新年活動在某應用中,讓用戶抽獎領取,數量在1000w。俺首先固然是懷疑一次活動能有這麼多用戶?但人家爲了噱頭。php

  因爲只是配合別人作一次活動,無需過後統計數據啥的,所以俺把它放到緩存裏邊(一條兌換碼就幾個字符,1000w個也佔不了多少空間),有點意思的就是如何導入這1000w兌換碼了。c++

  若是是c/c++這種天生跑的快的,不作什麼鏈接數據庫等複雜操做,1000w次循環,是能夠1秒鐘完成的,可是服務器上跑腳本,得考慮到當前服務器運行着其餘任務,在腳本程序運行過程當中,還可能鏈接/斷開、讀寫數據庫,讀寫緩存等,自己也要花時間,須要分配大量的CPU時間才能搞定。確定得命令行窗口運行了,時間也不能設限制,可是若是隻是單獨寫一個for循環跑1000w次,這樣也得跑個幾天才行,所以一個簡單的多進程腳本就能夠大大加快速度了。shell

  既然大數據量很是耗時,可能得幾個小時運行完成,這裏應該之後臺做業的形式執行腳本,具體是在執行時最後加上&(如 /usr/bin/php test1.php &),而爲了防止運行過程當中丟失shell會話,最好在執行前加上nohup,這樣可防止進程在執行時被系統掛起而中止,而後重定向一下輸出,命令執行時相似於這樣數據庫

  /usr/bin/php test1.php > test1.log 2>1& &數組

  整個思路也比較簡單,將兌換碼整理成一個一行,放在一個原始文件中,好比名字叫original_code_1.txt緩存

  程序一行行讀取,設定一個行數,好比10w,每讀取這麼多行就換一個新文件寫入,即將這個大文件分紅許多小文件,用一個數組保存這些小文件的路徑名(如../../data/proc/proc_3d423v348k_1.txt)。服務器

  文件劈分完成後,開啓多個進程處理,這個進程是實際執行導入操做的腳本,好比可命名爲import_code_into_cache.php,每次導入一個兌換碼成功,就把它寫到一箇中間文件中,好比../../data/mid/mid_cwe8422ici_1.txt併發

  全部的文件跑完後,將中間數據(mid目錄下的額)彙總到end目錄下(../../data/end/end_1.txt),這樣若是導入中間某種緣由有丟失的,能夠將最終文件數據與原來的總數據對比、整理後,再次導入,能夠叫original_code_2.txt。函數

  proc目錄:存放將大文件切分後的小文件,文件名相似proc_uniqid()_導入編號.txt工具

  mid目錄:存放將小文件導入緩存,若成功則寫入這個文件,文件名相似mid_uniqid()_導入編號.txt

  end目錄:存放最後導入彙總數據,文件名相似end_導入編號.txt

  針對上面的簡單版本,在本身的機子上重寫了下,首先是配置文件config.php,規定程序執行過程當中的必要參數

<?php
    /**
      config.php
      設置各項腳本執行參數
     */
    $num = isset($ARGV[1]) ? $ARGV[1] : 1; // 第幾回導入

    define('DS', DIRECTORY_SEPARATOR);
    define('ROOT_PATH', realpath(dirname(__FILE__))); // 當前文件所在根目錄
    
    $original_dir = ROOT_PATH.DS.'original'.DS;   // 存放原始兌換碼數據目錄
    $proc_dir = ROOT_PATH.DS.'data'.DS.'proc'.DS; // 待處理兌換碼文件目錄,存放切分後的文件
    $mid_dir = ROOT_PATH.DS.'data'.DS.'mid'.DS;   // 生成的中間數據目錄
    $end_dir = ROOT_PATH.DS.'data'.DS.'end'.DS;   // 最終彙總數據目錄

    $original_file = $original_dir.'original_code_'.$num.'.txt'; // 原始兌換碼大文件
    if(!file_exists($original_file))
    {
        echo "file {$original_file} not exist!\n";
    }
    
    $import_script = 'import_code_into_cache.php';   // 執行導入腳本的文件名
    $exec_import_file = ROOT_PATH.DS.$import_script; // 執行導入的腳本的路徑名
    if(!file_exists($exec_import_file))
    {
        echo "the script that execute importion [{$exec_import_file}] not exist!\n";
    }

    $max_proc_num = 5;  // 最大併發進程數,測試
    $max_file_line = 5; // 切分後單文件最大數據行數,這裏是測試
    $is_del_mid = true; // 彙總時是否刪除中間數據

   $num是本次導入的編號,即第幾回,經過腳本參數傳,而後是其餘存放原來大文件、切分後的數據目錄、中間數據目錄以及最終數據彙總結果等,而且規定了最大併發進程數,這裏寫的是5,每一個切分文件的行數,這裏由於是本機測試我寫的是5。以及最終將中間數據刪除。

  經過以上思路,在程序開始以前,能夠先將切分數據目錄,或者稱之爲當前處理數據目錄的proc_dir、中間數據目錄mid_dir以及彙總目錄end_dir進行一次初始化,好比清空原有文件,不存在就新建這個目錄等。第二個函數是對大文件的切分,在一行行讀取原始兌換碼數據,寫到proc_dir目錄下,具體代碼以下:

<?php
    /**
      tool.php
      需用到的公共函數
     */
    function init_dir($dir, $del = false)
    {
        if(!file_exists($dir))
        {
            mkdir($dir, 0777, true);
        }
        else
        {
            chmod($dir, 0777);
            if($del)
            {
                $cmd = 'rm -rf '.$dir.'*';
                exec($cmd);
            }
        }
        
    }   

    // 初始化各個文件目錄
    function init_workspace()
    {
        global $proc_dir, $mid_dir, $end_dir;
        init_dir($proc_dir, true);
        init_dir($mid_dir, true);
        init_dir($end_dir);
    }
    // 劈分原始數據文件
    function get_split_file_list()
    {
        global $original_file, $proc_dir, $max_file_line, $num;
        $file_list = array();
        if(!file_exists($original_file))
        {
            return $file_list;
        }
        $i = 0;
        $handle = fopen($original_file, 'r');
        if(!$handle) return $file_list;

        $proc_handle = null;

        while(!feof($handle)) // 循環讀取大文件
        {
            $line_code = trim(fgets($handle));
            if(empty($line_code)) continue;  // 過濾空白行及fread讀取的文件空的末尾

            if($i % $max_file_line == 0)  // 寫入一個新文件,每一個文件行數最大爲$max_file_line
            {
                if(isset($proc_handle)) fclose($proc_handle);
                $filename = $proc_dir.'proc_'.uniqid().'_'.$num.'.txt'; // 切分文件的命名:proc_隨機串_導入編號.txt
                $file_list[] = $filename;
                if($proc_handle = fopen($filename, 'w'))
                {
                    chmod($filename, 0777);
                    fwrite($proc_handle, $line_code."\n");    // 寫入新的待處理文件,每一個文件行數是$max_file_line
                }      
            }
            else  // 寫入一個已存在的文件
            {      
                fwrite($proc_handle, $line_code."\n");
            }
            ++ $i;
        }

        fclose($handle);
        fclose($proc_handle);
        return $file_list;
    }

  get_split_file_list()函數功能爲把大文件切分紅小文件,將各個文件路徑放入一個數組並返回。

  接下來是啓動進程,開始導入。我圖簡單寫了一個類,這個類大概有這樣幾個方法:(1)get_uniq_mid_file_name,獲取一箇中間數據的文件路徑名;(2)

merge_file,用於將中間數據彙總到最終數據中;(3)get_curr_proc_num,獲取當前正在執行導入的進程數;(4)process_data,根據進程數數否達到上限而開啓導入腳本進程,進行導入操做。代碼以下

<?php
    /**
      model.php
      根據進程數,調用導入腳本,開始導入
     */
    class proj_manger
    {
        public $num;               // 第幾回導入
        public $max_proc_num;      // 最大併發進程數
        public $file_list;         // 待處理劈分的文件(路徑)
        public $mid_dir;           // 中間數據目錄
        public $end_dir;           // 最終彙總數據目錄
        public $comm_mid_file;     // 中間數據文件的統稱
        public $import_script;     // 執行導入的腳本名
        public $exec_import_file;  // 執行導入的腳本全路徑
        public $is_del_mid_file;   // 是否刪除中間數據

        public function __construct($num, $max_proc_num, &$file_list, $mid_dir, $end_dir, $import_script, $exec_import_file, $is_del_mid_file)
        {
            $this->num = $num;
            $this->max_proc_num = $max_proc_num;
            $this->file_list = $file_list;
            $this->mid_dir = $mid_dir;
            $this->end_dir = $end_dir;

            $this->comm_mid_file = $this->mid_dir.'mid_*_'.$this->num.'.txt';

            $this->import_script = $import_script;
            $this->exec_import_file = $exec_import_file;

            $this->is_del_mid_file = $is_del_mid_file;
        }
        // 獲取一箇中間數據文件路徑名
        public function get_uniq_mid_file_name()
        {  
            return $this->mid_dir.'mid_'.uniqid().'_'.$this->num.'.txt';
        }
        // 將中間數據合併到最終數據目錄中的文件
        private function merge_file()
        {

            $end_file = $this->end_dir.'end_'.$this->num.'.txt';  // 最終合併文件路徑名
            $cmd = "cat {$this->comm_mid_file} > {$end_file}";
            exec($cmd); // 執行合併
            echo "merge file, execute cmd=>{$cmd}\n";
            if($this->is_del_mid_file)
            {
                $cmd = "rm -rf {$this->comm_mid_file}";
                exec($cmd);
                echo "delete middle file, execute cmd=>{$cmd}\n";
            }
        }
        // 獲取當前進程數
        public function get_curr_proc_num()
        {
            $cmd = "ps -ef | grep {$this->import_script} | grep -v grep | wc -l";
            $handle = popen($cmd, 'r');
            $pnum = trim(fread($handle, 512));
            pclose($handle);
            echo "get current total import process num, execute cmd=>{$cmd}\n";
            return (int)$pnum; 
        }      
        // 開啓進程,處理數據
        public function process_data()
        {
            while(true)
            {
                $curr_proc_num = $this->get_curr_proc_num(); // 獲取當前正在執行導入腳本的進程數
                if(count($this->file_list) > 0) // 若是存在需處理的文件數據
                {
                    if($curr_proc_num < $this->max_proc_num)
                    {
                        $curr_proc_file = array_pop($this->file_list);
                        $curr_mid_file = $this->get_uniq_mid_file_name();
                        $cmd = "nohup /usr/bin/php {$this->exec_import_file} {$curr_proc_file} {$curr_mid_file} > import_code.log 2>&1 &";
                        // 開啓一個新進程
                        echo "start new proc: {$cmd}\n";
                        exec($cmd);
                    }
                    else  // 進程數已達規定最大數目
                    {
                        echo "please wait......\n";
                    }
                }
                else
                {
                    // 處理完畢,合併文件並退出
                    $this->merge_file();
                    break;
                }
                sleep(1);
            }
        }
    }

  這一行:$cmd = "nohup /usr/bin/php {$this->exec_import_file} {$curr_proc_file} {$curr_mid_file} > import_code.log 2>&1 &",大概就是

  /usr/bin/php import_code_into_cache.php proc_232dh2842_1.txt mid_c2382nxr335_1.txt > import_code.log 2>&1 &

  真正執行導入的是import_code_into_cache.php這個腳本,傳入的兩個參數是兩個問價名,一個是分配給當前這個腳本(進程)的切分後的文件,一個他要寫入的中間文件名,所以這裏是一個進程處理一個切分後的小文件,若是規定切分文件行數是10W,就是一個進程處理10w條記錄。若是當前進程數已達最大值,輸出please wait.......,只要一個小文件處理完並完整退出,它的進程就會消失,總的進程數就會減1,while循環的條件是true,所以又會檢查數組還有沒有數據,有的話檢查當前進程數是否小於最大值,小於的話,從新啓動一個import_code_into_cache進程,接續處理數組中的下一個文件名,因爲處理的是是從數組中array_pop出來的,每處理完一個,數組中少一個文件路徑名,等全部的處理完file_list長度爲0,進入while內最外層的else分支,開始彙總數據並break,break會退出這個死循環。

  下面是實際導入程序import_code_into_cache.php

<?php
    /**
      import_code_into_cache.php
      實際執行導入的腳本,導入數據並寫入中間數據文件
     */
    // 處理傳入的兩個文件名參數
    $curr_proc_file = isset($argv[1]) ? $argv[1] : '';  // 當前處理的文件
    $curr_mid_file = isset($argv[2]) ? $argv[2] : '';   // 當前要寫入的中間數據文件

    $proc_handle = fopen($curr_proc_file, 'r');
    $mid_handle = fopen($curr_mid_file, 'w');   
    
    if($proc_handle && $mid_handle)
    {
        chmod($curr_mid_file, 0777);
        while(!feof($proc_handle))
        {
            $line = fgets($proc_handle);
            // TODO: 執行導入緩存等其餘操做,獲取操做結果
            // 若是導入成功,寫入中間數據文件,這裏直接寫入
            fwrite($mid_handle, $line);
        }
    }

    sleep(10);  // 延遲10秒,僅僅爲了看進程測試

  這裏實際導入腳本沒作任何有意義的操做,僅爲了演示,while循環讀取切分後的小文件,接下來能夠執行你想要的操做,成功後再fwrite進中間數據文件。

  也就是說,並沒直接nohup /usr/bin/php import_code_into_cache.php *** 啓動導入程序,而是在已經運行的腳本中啓動的,並且對於一個正常完成的程序,它的進程會自動消失掉,這是檢測進程數的關鍵。

  最後,須要一個腳本把全部這一切連起來,這是整個導入程序的直接入口(import.php)

<?php
    /**
      import.php
      導入程序的直接入口
     */
    if($argc <= 1)
    {
        exit("input cmd like: nohup /usr/bin/php import.php 1 > import.log 2>&1 &");
    }
    
    date_default_timezone_set("Asia/Shanghai");
    set_time_limit(0);                   // 無時間限制
    ini_set('memory_limit', '512M');     // 內存限制爲512M,僅爲示例
    
    echo "\nprocess start at ".date('Y-m-d H:i:s')."\n\n";

    $start_time = microtime(true);       // 記錄開始時間
    $GLOBALS['ARGV_INPUT'] = $argv;      // 接收腳本參數

    require_once('config.php'); // 配置文件
    require_once('tool.php');   // 工具函數文件
    require_once('model.php');  // 執行導入的類文件

    init_workspace();  // 初始化待處理目錄、中間數據目錄、最終合併數據目錄
    $file_list = get_split_file_list();  // 對文件進行劈分,分給各個進程處理
    $file_num = count($file_list);      

    if($file_num > 0)
    {
        echo "total file num: {$file_num}\n";
        $proc_data_obj = new proj_manger($num, $max_proc_num, $file_list, $mid_dir, $end_dir, $import_script, $exec_import_file, $is_del_mid);
        $proc_data_obj->process_data();
    }
    else
    {
        echo "no splited files to be processed.\n";
    }

    $end_time = microtime(true);
    $length = sprintf('%.3f', $end_time - $start_time);
    echo "process end, spend time: ".$length." seconds.\n";  // 記錄本次執行時間
    exit;

  所以整個導入程序運行時是這樣工做的,先進入當前腳本所在目錄,確保當前目錄下的original目錄下有原始文件且名稱對應,執行/usr/bin/php import.php 1 > import.log 2>&1 &,整個導入程序便開始。這裏先調用了init_workspace簡單初始化一下各個須要的目錄,而後切分文件,而後new了一個proj_manager類並直接開始執行process_data方法,進行導入,從這個方法裏邊,執行實際導入數據的腳本,並保證同時最多有若干個進程在處理數據。

  這裏在給import.php傳參數時只傳了一個數字,表示第幾回導入,若是想作的有擴展性點,好比某天爲某個應用某次導入數據,就能夠多傳幾個參數,並且文件的命名也會不一樣,甚至存放文件的目錄徹底換成一個日期爲名稱的目錄下邊,存放待處理數據、中間數據和彙總數據,具體看就要看需求了。

  全部的代碼文件  

  original目錄存放原始大數據量文件,data目錄分別有三個目錄proc、mid、end,做用如前所述,正在處理的小文件目錄、中間數據文件目錄、最終彙總數據文件目錄。

  original目錄下有一個測試文件 ,36行數據

  運行示例以下(在個人mbp下邊)

  

  運行後,ps -ef | grep import能夠看到有5個執行導入的進程,進程正在跑,完事兒後做業完成給出了Done的完成狀態,並顯示做業號[1]以及後邊的一串這個做業所執行的指令。

  再看看proc和end目錄下的文件

       

  測試時是每一個文件最多5行,總共36行數據,是能對上的。

  固然俺在這兒沒講什麼設計,基本是怎麼簡單怎麼來,之後再須要處理這種大量的重複動做,拿這個爲模板,改改又能用了-_-

相關文章
相關標籤/搜索