使用swoole的Process實現生產者消費者模型

零.源碼

<?php
abstract class Schedule{
    protected $_consumerList = array();
    protected $_msgqkey = null;

    protected $_consumerNum = 2;
    protected $_finishFlag = 'ALLDONE';

    public function __construct($cNum = 0){
        if ($cNum){
            $this->_consumerNum = $cNum;
        }
    }

    public function setConsumerNum($num = 0){
        if ($num){
            $this->_consumerNum = $num;
            return true;
        }

        return false;
    }

    public function setFinishFlag($flag = null){
        if ($flag){
            $this->_finishFlag = $flag;
            return true;
        }

        return false;
    }

    public function run(){
        $this->_consumerList = array();
        for($i=0; $i<$this->_consumerNum; $i++){
            $consumer = new swoole_process(function($worker){
                $this->_consumerFunc($worker);
            });

            if ($this->_msgqkey){
                $consumer->useQueue($this->_msgqkey);
            }
            else{
                $consumer->useQueue();
            }
            $pid = $consumer->start();

            $this->_consumerList[$pid] = $consumer;
        }

        $producer = new swoole_process(function($worker){
            //echo "i'm passer\n";
            exit(0);
        });

        if ($this->_msgqkey){
            $producer->useQueue($this->_msgqkey);
        }
        else{
            $producer->useQueue();
        }

        $pid = $producer->start();
        echo "begin:\n";
        echo sprintf("msgqkey:%s\n", $producer->msgQueueKey);

        $this->_producerFunc($producer);
    }

    protected function _producerFunc($worker){
        if ($this->_onlyConsume()){
            return;
        }

        foreach ($this->doProduce($worker) as $data){
            $worker->push($data);
        }

        //任務數據被取完
        while(true){
            $c = $worker->statQueue();
            $n = $c['queue_num'];
            if ($n === 0){
                break;  
            }
        }

        //放入consumer進程程結束標識
        foreach($this->_consumerList as $pid => $w){
            $w->push($this->_finishFlag);
        }

        //確認結束
        while(true){
            $c = $worker->statQueue();
            $n = $c['queue_num'];
            if ($n === 0){
                break;  
            }
        }

        $worker->freeQueue();
    }

    protected function _consumerFunc($worker){
        while(1){
            $data = $worker->pop();
            $pid = $worker->pid;
            if ($data == $this->_finishFlag){
                echo "consumer $pid exit\n";
                $worker->exit(0);
            }
            else{
                $this->doConsume($data, $worker);
            }
        }
    }

    protected function _onlyConsume(){
        return !! $this->_msgqkey;
    }

    abstract protected function doProduce($worker);

    abstract protected function doConsume($data, $worker);
}

一.功能說明

  1. 實現了生產者消費者模型,一個生產者向任務隊列寫數據,N個消費者取數據作處理。
  2. 數據處理完後生產者與消費者自動退出
  3. 在消費者意外掛掉的狀況下,容許單獨運行消費者繼續處理以前隊列中的任務

二.使用說明

1. 生產者消費者demo

<?php
class Taskdemo extends Schedule{
    protected $_consumerNum = 5;

    protected function doProduce($worker){
        $all = 100;
        for($i=0; $i<$all; $i+=4){
            yield json_encode(array('data'=>$i));
        }
    }

    protected function doConsume($data, $worker){
        //your process
        sleep(1);
        echo "consumer:{$worker->pid} redv {$data}\n";
    }
}

說明 
1. 要繼承Schedule 
2. _consumerNum爲消費者個數,不設置,默認2個。 
3. doProduce($worker)用於產生任務數據的函數,要求返回值必須是數組或迭代器,每一項爲一條任務數據。$worker爲swoole進程句柄。 
4. doConsume($data,$worker)用於消費者處理數據的函數。$data爲單條消息,$worker爲swoole進程句柄。 
5. 通常狀況進程句柄$worker都不會用到,能夠忽略php

2. 處理程序中途掛掉的狀況

步驟:linux

1.確認當前隊列的key 
程序運行時,會打出json

msgqkey:1078263
  •  

也能夠使用命令行數組

ipcs -q
------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages    
0x001073f7 2359298    ballqiu    666        165          15

key值便是所須要的swoole

  1. 修改Taskdemo,加入一行代碼
protected $_msgqkey = 0x001073f7;
  1. 從新運行程序
  2. 手動刪除隊列
ipcrm -q $msgqkey
  •  

三.實現原理

  • 使用swoole的Process,主進程調用doProduce()向消息隊列寫任務數據,fork出的n個子進程從隊列取數據。隊列就是linux用於進程間通訊的消息隊列。
  • 子進程從隊列裏不停取任務處理,若是拿到「完成標識串」(一個特定字符串),就退出。
  • 主進程發現隊列數據被處理完後,若是有n個子進程,就向隊列發n個到「完成標識串」。而後再次檢查隊列,隊列空時刪除隊列,自身退出。

四.注意事項

  • 消息隊列的一些使用上的限制,能夠參見這裏
  • 默認的」完成標識串」是ALLDONE,如需修改,可在Taskdemo中增長
protected $_finishFlag = 'youflag';
相關文章
相關標籤/搜索