MySql實現事務型消息隊列以及php多進程消費設計

因公司業務須要,最近在設計一個通用隊列功能模塊,主體要求兩大點:php

  • 用MySql實現事務型消息隊列(固然,主流的隊列服務可以使用redis或者rabbitmq等,此處討論的是mysql實現)
  • php多進程消費隊列消息

用MySql實現事務型消息隊列

消息隊列的做用有:異步化、解耦和消除峯值等。目前異步化對於我來講使用最頻繁,在不少業務場景下,咱們能夠將實時性要求較低的請求轉爲異步處理,減少系統負載壓力,提升系統穩定性。在離線數據異步處理過程當中,消息隊列要知足如下要求:mysql

  • 消息不能丟失,即便在系統失敗的狀況下。消息一旦被插入就必定會被至少處理一次(只被處理一次是最好的,可是實現起來有難度,因此只要求at-least-once semantic);
  • FIFO順序。(mysql id自增可知足此特性。固然,能夠設計特殊參數作特殊處理)
  • 支持多生產者(mysql支持併發操做,支持此特色)
  • 支持多消費者。每一個消息只能被其中一個消費者處理(業務的處理須要考慮冪等性)。

以上是隊列實現的說明,具體用MySql實現事務型消息隊列能夠參考文章
https://spockwangs.github.io/...git

這次設計的表結構以下:github

CREATE TABLE `comom_queue` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '隊列類型,代碼業務備註',
  `conn_id` int(11) NOT NULL DEFAULT '0' COMMENT '消費者標識',
  `param_content` text COMMENT '隊列入參',
  `callback` varchar(255) NOT NULL DEFAULT '' COMMENT '隊列消費回調函數',
  `status` tinyint(2) NOT NULL DEFAULT '0' COMMENT '0新建 1消費中 2成功 3失敗 4需重試',
  `create_time` int(11) NOT NULL DEFAULT '0' COMMENT '建立時間',
  `update_time` int(11) NOT NULL DEFAULT '0' COMMENT '狀態變動時間',
  `preexec_time` int(11) NOT NULL DEFAULT '0' COMMENT '預消費時間',
  `p_key` varchar(100) NOT NULL DEFAULT '' COMMENT '業務惟一標識key,查詢用',
  `mark` varchar(255) NOT NULL DEFAULT '' COMMENT '備註',
  PRIMARY KEY (`id`),
  KEY `indx_s` (`p_key`,`type`) USING BTREE,
  KEY `indx_exec` (`conn_id`,`status`) USING BTREE,
  KEY `indx_ty` (`type`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

說明下幾個字段的設計:redis

  • callback 隊列中不一樣的業務消息有不一樣的業務處理,利用callback值回調對應的業務方法
  • type 隊列業務類型,區分不一樣的業務,可用不一樣的消費者分開消費。在FIFO的特色外,可單獨開消費者對有特殊要求(消息優先級高)的業務消息進行消費
  • preexec_time 預消費時間,有的業務消息有消費時間要求,可設置出隊列時間

php多進程消費設計

這次php多進程的實現依賴pcntl,posix擴展,讀者可自行檢查是否安裝了此拓展。queue隊列服務設計和實現包括如下功能點:sql

  • 主進程和子進程的運行時間可配
  • 主進程(master進程)建立和監聽子進程行爲
  • 建立定時器信號,主進程(master進程)定時監聽隊列信息,可用於消息堆積通知等
  • 子進程(worker進程)消費消息
  • 針對不一樣的業務消息可配置不一樣數量的子進程
  • 各個業務子進程數可配置正常拉起數和最大進程數,根據隊列積壓狀況,子進程動態啓動進程數(暫未實現,後續添加)

很少說了,直接看代碼,抽離出來的queue服務類代碼以下:數組

<?php

/**
 * Created by PhpStorm.
 * User: Javion
 * Date: 2018/12/7
 * Time: 15:10
 */
abstract class queue
{
    protected $process = []; // 子進程數組  ['type' => 'process_num']
    protected $child   = []; // 子進程pid數組
    protected $result  = []; // 計算的結果
    protected $overTime = 0; //主進程超時時間
    protected $startTime; //主進程運行時間
    protected $childOverTime = 3600; //子進程超時時間
    protected $alarm_time = 2;
    public function __construct($process = [], $overTime = 0, $childOverTime = 3600)
    {
        if (!function_exists('pcntl_fork')) {
            die("pcntl_fork not existing");
        }
        $this->process  = $process;
        $this->overTime = $overTime;
        $this->childOverTime = $childOverTime;
        $this->startTime = time();
    }
    /**
     * 設置子進程
     */
    public function setProcess($process)
    {
        $this->process = $process;
    }

    /**
     * 設置檢測時間間隔 單位s
     */
    public function setAlarmTime($time){
        $this->alarm_time = $time;
    }

    /**
     * fork 子進程
     */
    protected function forkProcess()
    {
        //循環建立每一個type 的消費子進程
        $process  = $this->process;
        foreach($process as $key => $num) {
            for ($i = 0; $i < $num; $i++){
                $this->forkOneProcess($key);
            }
        }
        return $this;
    }

    /**
     * 建立子進程操做
     * @param $key
     * @return $this
     */
    private function forkOneProcess($key)
    {
        $pid = pcntl_fork();
        if ($pid == 0) {
            $id = getmypid();
            $this->processDo($id, $key);
            exit(0);
        } else if ($pid > 0) {
            //記錄子進程信息
            $childProcess = array(
                'pid' => $pid,
                'type' => $key,
                'create_time' => time()
            );
            $this->child[$pid] = $childProcess;
        }
        return $this;
    }

    /**
     * 子進程作的事情,消費者
     */
    abstract protected function processDo($id, $key);

    /**
     * 隊列數量檢測
     */
    abstract protected function checkQueueNum();

    /**
     * 等待子進程結束
     */
    protected function waiteProcess()
    {
        while(count($this->child)) {
            foreach($this->child as $pid => $item){
                $res = pcntl_waitpid($pid,$status,WNOHANG);
                pcntl_signal_dispatch();
                if ( -1 == $res || $res > 0 ) {
                    unset($this->child[$pid]);
                    echo "pid $pid 退出", PHP_EOL;
                    //判斷主進程是否超時 未超時拉起新的子進程
                    $leftTime = time() - $this->startTime;
                    if ($this->overTime > $leftTime){
                        $this->forkOneProcess($item['type']);
                        echo "建立新進程", PHP_EOL;
                    }
                }//判斷子進程是否存在且超時,超過期限20分鐘則強制退出
                elseif (posix_kill($pid, 0) && (time() - $item['create_time'] - 20*60) > $this->childOverTime){
                    posix_kill($pid, SIGUSR1);
                    echo "pid $pid 退出2", PHP_EOL;
                }
            }
        }

        return $this;
    }

    /**
     * 隊列檢測
     */
    protected function timeHandler(){
        $this->checkQueueNum();
        pcntl_alarm($this->alarm_time);
    }

    /**
     * 啓動
     */
    public function runProcess() {
        //註冊信號
        pcntl_signal(SIGALRM, array($this, 'timeHandler'));
        pcntl_alarm($this->alarm_time);
        $leftTime = time() - $this->startTime;
        while(($this->overTime ==0 || $this->overTime > $leftTime)){
            echo "新進程processlist", PHP_EOL;
            $this->forkProcess()->waiteProcess();
            $leftTime = time() - $this->startTime;
        }
    }
}

最後一個功能點:各個業務子進程數可配置正常拉起數和最大進程數,根據隊列積壓狀況,子進程動態啓動進程數 暫未實現。目前的queue服務設計如上,請各位看官多多指教!併發

相關文章
相關標籤/搜索