因公司業務須要,最近在設計一個通用隊列功能模塊,主體要求兩大點:php
消息隊列的做用有:異步化、解耦和消除峯值等。目前異步化對於我來講使用最頻繁,在不少業務場景下,咱們能夠將實時性要求較低的請求轉爲異步處理,減少系統負載壓力,提升系統穩定性。在離線數據異步處理過程當中,消息隊列要知足如下要求:mysql
以上是隊列實現的說明,具體用MySql實現事務型消息隊列能夠參考文章
https://spockwangs.github.io/...git
這次設計的表結構以下:github
CREATE TABLE `comom_queue` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id', `type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '隊列類型,代碼業務備註', `conn_id` int(11) NOT NULL DEFAULT '0' COMMENT '消費者標識', `param_content` text COMMENT '隊列入參', `callback` varchar(255) NOT NULL DEFAULT '' COMMENT '隊列消費回調函數', `status` tinyint(2) NOT NULL DEFAULT '0' COMMENT '0新建 1消費中 2成功 3失敗 4需重試', `create_time` int(11) NOT NULL DEFAULT '0' COMMENT '建立時間', `update_time` int(11) NOT NULL DEFAULT '0' COMMENT '狀態變動時間', `preexec_time` int(11) NOT NULL DEFAULT '0' COMMENT '預消費時間', `p_key` varchar(100) NOT NULL DEFAULT '' COMMENT '業務惟一標識key,查詢用', `mark` varchar(255) NOT NULL DEFAULT '' COMMENT '備註', PRIMARY KEY (`id`), KEY `indx_s` (`p_key`,`type`) USING BTREE, KEY `indx_exec` (`conn_id`,`status`) USING BTREE, KEY `indx_ty` (`type`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
說明下幾個字段的設計:redis
這次php多進程的實現依賴pcntl,posix擴展,讀者可自行檢查是否安裝了此拓展。queue隊列服務設計和實現包括如下功能點:sql
很少說了,直接看代碼,抽離出來的queue服務類代碼以下:數組
<?php /** * Created by PhpStorm. * User: Javion * Date: 2018/12/7 * Time: 15:10 */ abstract class queue { protected $process = []; // 子進程數組 ['type' => 'process_num'] protected $child = []; // 子進程pid數組 protected $result = []; // 計算的結果 protected $overTime = 0; //主進程超時時間 protected $startTime; //主進程運行時間 protected $childOverTime = 3600; //子進程超時時間 protected $alarm_time = 2; public function __construct($process = [], $overTime = 0, $childOverTime = 3600) { if (!function_exists('pcntl_fork')) { die("pcntl_fork not existing"); } $this->process = $process; $this->overTime = $overTime; $this->childOverTime = $childOverTime; $this->startTime = time(); } /** * 設置子進程 */ public function setProcess($process) { $this->process = $process; } /** * 設置檢測時間間隔 單位s */ public function setAlarmTime($time){ $this->alarm_time = $time; } /** * fork 子進程 */ protected function forkProcess() { //循環建立每一個type 的消費子進程 $process = $this->process; foreach($process as $key => $num) { for ($i = 0; $i < $num; $i++){ $this->forkOneProcess($key); } } return $this; } /** * 建立子進程操做 * @param $key * @return $this */ private function forkOneProcess($key) { $pid = pcntl_fork(); if ($pid == 0) { $id = getmypid(); $this->processDo($id, $key); exit(0); } else if ($pid > 0) { //記錄子進程信息 $childProcess = array( 'pid' => $pid, 'type' => $key, 'create_time' => time() ); $this->child[$pid] = $childProcess; } return $this; } /** * 子進程作的事情,消費者 */ abstract protected function processDo($id, $key); /** * 隊列數量檢測 */ abstract protected function checkQueueNum(); /** * 等待子進程結束 */ protected function waiteProcess() { while(count($this->child)) { foreach($this->child as $pid => $item){ $res = pcntl_waitpid($pid,$status,WNOHANG); pcntl_signal_dispatch(); if ( -1 == $res || $res > 0 ) { unset($this->child[$pid]); echo "pid $pid 退出", PHP_EOL; //判斷主進程是否超時 未超時拉起新的子進程 $leftTime = time() - $this->startTime; if ($this->overTime > $leftTime){ $this->forkOneProcess($item['type']); echo "建立新進程", PHP_EOL; } }//判斷子進程是否存在且超時,超過期限20分鐘則強制退出 elseif (posix_kill($pid, 0) && (time() - $item['create_time'] - 20*60) > $this->childOverTime){ posix_kill($pid, SIGUSR1); echo "pid $pid 退出2", PHP_EOL; } } } return $this; } /** * 隊列檢測 */ protected function timeHandler(){ $this->checkQueueNum(); pcntl_alarm($this->alarm_time); } /** * 啓動 */ public function runProcess() { //註冊信號 pcntl_signal(SIGALRM, array($this, 'timeHandler')); pcntl_alarm($this->alarm_time); $leftTime = time() - $this->startTime; while(($this->overTime ==0 || $this->overTime > $leftTime)){ echo "新進程processlist", PHP_EOL; $this->forkProcess()->waiteProcess(); $leftTime = time() - $this->startTime; } } }
最後一個功能點:各個業務子進程數可配置正常拉起數和最大進程數,根據隊列積壓狀況,子進程動態啓動進程數 暫未實現。目前的queue服務設計如上,請各位看官多多指教!併發