PHP多進程消費隊列

引言

最近開發一個小功能,用到了隊列mcq,啓動一個進程消費隊列數據,後邊發現一個進程處理不過來了,又加了一個進程,過了段時間又處理不過來了......php

這種方式每次都要修改crontab,若是進程掛掉了,不會及時的啓動,要等到下次crontab執行的時候纔會啓動。關閉(重啓)進程的時候用的是kill,這可能會丟失正在處理的數據,好比下面這個例子,咱們假設sleep過程就是處理邏輯,這裏爲了明顯看出效果,將處理時間放大到10s:linux

<?php
$i = 1;
while (1) {
    echo "開始第[{$i}]次循環\n";
    sleep(10);
    echo "結束第[{$i}]次循環\n";
    $i++;
}

當咱們運行腳本以後,等到循環開始以後,給進程發送 kill {$pid},默認發送的是編號爲15的SIGTERM信號。假設$i是從隊列拿到的,拿到2的時候,正在處理,咱們給程序發送了kill信號,和隊列數據丟失同樣,問題比較大,所以我要想辦法解決這些問題。nginx

開始第[1]次循環
結束第[1]次循環
開始第[2]次循環


[1]    28372 terminated  php t.php

nginx進程模型

這時候我想到了nginx,nginx做爲高性能服務器的中流砥柱,爲成千上萬的企業和我的服務,他的進程模型比較經典,以下所示:git

nginx 進程模型 圖片來自網絡

管理員經過master進程和nginx進行交互,從/path/to/nginx.pid讀取nginx master進程的pid,發送信號給master進程,master根據不一樣的信號作出不一樣的處理,而後反饋信息給管理員。worker是master進程fork出來的,master負責管理worker,不會去處理業務,worker纔是具體業務的處理者,master能夠控制worker的退出、啓動,當worker意外退出,master會收到子進程退出的消息,也會從新啓動新的worker進程補充上來,不讓業務處理受影響。nginx還能夠平滑退出,不丟失任何一個正在處理的數據,更新配置時nginx能夠作到不影響線上服務來加載新的配置,這在請求量很大的時候特別有用。github

進程設計

看了nginx的進模型,咱們徹底能夠開發一個相似的類庫來知足處理mcq數據的需求,作到單文件控制全部進程、能夠平滑退出、能夠查看子進程狀態。不須要太複雜,由於咱們處理隊列數據接收必定的延遲,作到nginx那樣不間斷服務比較麻煩,費時費力,意義不是很大。設計的進程模型跟nginx相似,更像是nginx的簡化版本。
多進程模型shell

進程信號量設計

信號量是進程間通信的一種方式,比較簡單,單功能也比較弱,只能發送信號給進程,進程根據信號作出不一樣的處理。編程

master進程啓動的時候保存pid到文件/path/to/daeminze.pid,管理員經過信號和master進程通信,master進程安裝3種信號,碰到不一樣的信號,作出不一樣的處理,以下所示:windows

SIGINT  => 平滑退出,處理完正在處理的數據再退出
SIGTERM => 暴力退出,不管進程是否正在處理數據直接退出
SIGUSR1 => 查看進程狀態,查看進程佔用內存,運行時間等信息

master進程經過信號和worker進程通信,worker進程安裝了2個信號,以下所示:服務器

SIGINT  => 平滑退出
SIGUSR1 => 查看worker進程自身狀態

爲何worker進程只安裝2個信號呢,少了個SIGTERM,由於master進程收到信號SIGTERM以後,向worker進程發送SIGKILL信號,默認強制關閉進程便可。網絡

worker進程是經過master進程fork出來的,這樣master進程能夠經過pcntl_wait來等待子進程退出事件,當有子進程退出的時候返回子進程pid,作處理並啓動新的進程補充上來。

master進程也經過pcntl_wait來等待接收信號,當有信號到達的時候,會返回-1,這個地方還有些坑,在下文中會詳細講。

PHP中有2種信號觸發的方式,第一種方式是declare(ticks = 1);,這種效率不高,Zend每執行一次低級語句,都會去檢查進程中是否有未處理的信號,如今已經不多使用了,PHP 5.3.0及以前的版本可能會用到這個。

第二種是經過pcntl_signal_dispatch來調用未處理的信號,PHP 5.4.0及以後的版本適用,能夠巧妙的將該函數放在循環中,性能上基本沒什麼損失,如今推薦適用。

PHP安裝修信號量

PHP經過pcntl_signal安裝信號,函數聲明以下所示:

bool pcntl_signal ( int $signo , [callback $handler [, bool $restart_syscalls = true ] )

第三個參數restart_syscalls不太好理解,找了不少資料,也沒太查明白,通過試驗發現,這個參數對pcntl_wait函數接收信號有影響,當設置爲缺省值true的時候,發送信號,進程用pcntl_wait收不到,必須設置爲false才能夠,看看下面這個例子:

<?php
$i = 0;
while ($i<5) {
    $pid = pcntl_fork();
    $random = rand(10, 50);
    if ($pid == 0) {
        sleep($random);
        exit();
    }
    echo "child {$pid} sleep {$random}\n";
    $i++;
}

pcntl_signal(SIGINT,  function($signo) {
     echo "Ctrl + C\n";
});

while (1) {
    $pid = pcntl_wait($status);
    var_dump($pid);
    pcntl_signal_dispatch();
}

運行以後,咱們對父進程發送kill -SIGINT {$pid}信號,發現pcntl_wait沒有反應,等到有子進程退出的時候,發送過的SIGINT會一個個執行,好比下面結果:

child 29643 sleep 48
child 29644 sleep 24
child 29645 sleep 37
child 29646 sleep 20
child 29647 sleep 31
int(29643)
Ctrl + C
Ctrl + C
Ctrl + C
Ctrl + C
int(29646)

這是運行腳本以後立刻給父進程發送了四次SIGINT信號,等到一個子進程推出的時候,全部信號都會觸發。

但當把安裝信號的第三個參數設置爲false

pcntl_signal(SIGINT,  function($signo) {
     echo "Ctrl + C\n";
}, false);

這時候給父進程發送SIGINT信號,pcntl_wait會立刻返回-1,信號對應的事件也會觸發。

因此第三個參數大概意思就是,是否從新註冊此信號,若是爲false只註冊一次,觸發以後就返回,pcntl_wait就能收到消息,若是爲true,會重複註冊,不會返回,pcntl_wait收不到消息。

信號量和系統調用

信號量會打斷系統調用,讓系統調用馬上返回,好比sleep,當進程正在sleep的時候,收到信號,sleep會立刻返回剩餘sleep秒數,好比:

<?php
pcntl_signal(SIGINT,  function($signo) {
     echo "Ctrl + C\n";
}, false);

while (true) {
    pcntl_signal_dispatch();
    echo "123\n";
    $limit = sleep(2);
    echo "limit sleep [{$limit}] s\n";
}

運行以後,按Ctrl + C,結果以下所示:

123
^Climit sleep [1] s
Ctrl + C
123
limit sleep [0] s
123
^Climit sleep [1] s
Ctrl + C
123
^Climit sleep [2] s

daemon(守護)進程

這種進程通常設計爲daemon進程,不受終端控制,不與終端交互,長時間運行在後臺,而對於一個進程,咱們能夠經過下面幾個步驟把他升級爲一個標準的daemon進程:

protected function daemonize()
{
    $pid = pcntl_fork();
    if (-1 == $pid) {
        throw new Exception("fork進程失敗");
    } elseif ($pid != 0) {
        exit(0);
    }
    if (-1 == posix_setsid()) {
        throw new Exception("新創建session會話失敗");
    }

    $pid = pcntl_fork();
    if (-1 == $pid) {
        throw new Exception("fork進程失敗");
    } else if($pid != 0) {
        exit(0);
    }

    umask(0);
    chdir("/");
}

攏共分五步:

  1. fork子進程,父進程退出。
  2. 設置子進程爲會話組長,進程組長。
  3. 再次fork,父進程退出,子進程繼續運行。
  4. 恢復文件掩碼爲0
  5. 切換當前目錄到根目錄/

第2步是爲第1步作準備,設置進程爲會話組長,必要條件是進程非進程組長,所以作第一次fork,進程組長(父進程)退出,子進程經過posix_setsid()設置爲會話組長,同時也爲進程組長。

第3步是爲了避免讓進程從新控制終端,由於一個進程控制一個終端的必要條件是會話組長(pid=sid)。

第4步是爲了恢復默認的文件掩碼,避免以前作的操做對文件掩碼作了設置,帶來沒必要要的麻煩。關於文件掩碼, linux中,文件掩碼在建立文件、文件夾的時候會用到,文件的默認權限爲666,文件夾爲777,建立文件(夾)的時候會用默認值減去掩碼的值做爲建立文件(夾)的最終值,好比掩碼022下建立文件666 - 222 = 644,建立文件夾777 - 022 = 755

掩碼 新建文件權限 新建文件夾權限
umask(0) 666 (-rw-rw-rw-) 777 (drwxrwxrwx)
umask(022) 644 (-rw-r--r--) 755 (drwxr-xr-x)

第5步是切換了當前目錄到根目錄/,網上說避免起始運行他的目錄不能被正確卸載,這個不是太瞭解。

對應5步,每一步的各類id變化信息:

操做後 pid ppid pgid sid
開始 17723 31381 17723 31381
第一次fork 17723 1 17723 31381
posix_setsid() 17740 1 17740 17740
第二次fork 17840 1 17740 17740

另外,會話、進程組、進程的關係以下圖所示,這張圖有助於更好的理解。
會話、進程組、進程關係

至此,你也能夠輕鬆地造出一個daemon進程了。

命令設計

我準備給這個類庫設計6個命令,以下所示:

  1. start 啓動命令
  2. restart 強制重啓
  3. stop 平滑中止
  4. reload 平滑重啓
  5. quit 強制中止
  6. status 查看進程狀態

啓動命令

啓動命令就是默認的流程,按照默認流程走就是啓動命令,啓動命令會檢測pid文件中是否已經有pid,pid對應的進程是否健康,是否須要從新啓動。

強制中止命令

管理員經過入口文件結合pid給master進程發送SIGTERM信號,master進程給全部子進程發送SIGKILL信號,等待全部worker進程退出後,master進程也退出。

強制重啓命令

強制中止命令 + 啓動命令

平滑中止命令

平滑中止命令,管理員給master進程發送SIGINT信號,master進程給全部子進程發送SIGINT,worker進程將自身狀態標記爲stoping,當worker進程下次循環的時候會根據stoping決定中止,不在接收新的數據,等全部worker進程退出以後,master進程也退出。

平滑重啓命令

平滑中止命令 + 啓動命令

查看進程狀態

查看進程狀態這個借鑑了workerman的思路,管理員給master進程發送SIGUSR1信號,告訴主進程,我要看全部進程的信息,master進程,master進程將自身的進程信息寫入配置好的文件路徑A中,而後發送SIGUSR1,告訴worker進程把本身的信息也寫入文件A中,因爲這個過程是異步的,不知道worker進程啥時候寫完,因此master進程在此處等待,等全部worker進程都寫入文件以後,格式化全部的信息輸出,最後輸出的內容以下所示:

➜/dir /usr/local/bin/php DaemonMcn.php status
Daemon [DaemonMcn] 信息:
-------------------------------- master進程狀態 --------------------------------
pid       佔用內存       處理次數       開始時間                 運行時間
16343     0.75M          --             2018-05-15 09:42:45      0 天 0 時 3 分
12 slaver
-------------------------------- slaver進程狀態 --------------------------------
任務task-mcq:
16345     0.75M          236            2018-05-15 09:42:45      0 天 0 時 3 分
16346     0.75M          236            2018-05-15 09:42:45      0 天 0 時 3 分
--------------------------------------------------------------------------------
任務test-mcq:
16348     0.75M          49             2018-05-15 09:42:45      0 天 0 時 3 分
16350     0.75M          49             2018-05-15 09:42:45      0 天 0 時 3 分
16358     0.75M          49             2018-05-15 09:42:45      0 天 0 時 3 分
16449     0.75M          1              2018-05-15 09:46:40      0 天 0 時 0 分
--------------------------------------------------------------------------------

等待worker進程將進程信息寫入文件的時候,這個地方用了個比較trick的方法,每一個worker進程輸出一行信息,統計文件的行數,達到worker進程的行數以後表示全部worker進程都將信息寫入完畢,不然,每一個1s檢測一次。

其餘設計

另外還加了兩個比較實用的功能,一個是worker進程運行時間限制,一個是worker進程循環處理次數限制,防止長時間循環進程出現內存溢出等意外狀況。時間默認是1小時,運行次數默認是10w次。

除此以外,也能夠支持多任務,每一個任務幾個進程獨立開,統一由master進程管理。

代碼已經放到github中,有興趣的能夠試試,不支持windows哦,有什麼錯誤還望指出來。

參考文章

憑記憶想到的參考文章,查了好多忘記了

  1. Linux多任務編程(七)---Linux守護進程及其基礎實驗
  2. workerman源碼
相關文章
相關標籤/搜索