thinkphp 實現rabbitMq常駐進程消費隊列

1,項目一級目錄新建一個server文件php

#!/usr/bin/env php
<?php
try {
    require __DIR__ . "/start.php";
} catch (\Exception $e) {}

$queueNames = config('queue.qnames');
if (empty($queueNames)) exit('隊列名稱未配置');

$option = $argv[1];
$queueName = isset($argv[2]) ? $argv[2] : null;
if ($queueName && !in_array($queueName, $queueNames)) exit('隊列名稱不存在');

if ($queueName) {
    unset($queueNames);
    $queueNames[] = $queueName;
}

echo "開始操做... \n";
switch ($option) {
    case 'start':
        startQueue($queueNames);
        break;

    case 'stop':
        stopQueue($queueNames);
        break;

    case 'reload':
        stopQueue($queueNames);
        startQueue($queueNames);
        break;

    case 'monitor':
        demon();
        break;

    default:
        echo '操做類型: start|stop|reload|monitor' . "\n";
        break;
}

echo "結束操做... \n";

function startQueue(array $queueNames)
{
    $correct = 'dispatch/Correct/index$';
    $ret = isProcessExist($correct);
    if($ret ==  0){
        $cmd = "nohup php start.php dispatch/Correct/index >> /tmp/correct.log &";
        system($cmd,$result);
        ($result == 0) or die("$cmd 啓動失敗 \n"); 
        echo "$correct 隊列已啓動 \n"; 
    }else {
        echo "$correct 進程已存在,無需重啓 \n";
    }
    foreach ($queueNames as $_queueName) {
        $proccessname = "dispatch/Consume/index/qname/$_queueName$";
        if(isProcessExist($proccessname) == 1) {
            echo $_queueName . "進程已經存在,無需重啓 \n";
        }else {
            $command = "nohup php start.php dispatch/Consume/index/qname/" . $_queueName." >> /tmp/$_queueName.log &";
            system($command,$result);
            ($result == 0) or die("$command 啓動失敗 \n");
            echo $_queueName . "隊列已啓動 \n";
        } 
    }
}

function demon()
{
    $queues = config('queue.qnames'); 
    
    
    foreach($queues as $queue) {
        if(!isProcessExist($queue)) {
            try{
                startQueue([$queue]);
            }catch(\Exception $e){
                echo "隊列監控啓動失敗: ".$e->getMessage()."\n";
            }
        }
    }
}

function stopQueue(array $queueNames)
{
    $redisconf = config('redis');
    $redis = \library\Redis::getInstance($redisconf);
    foreach ($queueNames as $key => $_queueName) {
        $redis->hset('script:signal', $_queueName, false);
        \library\Queue::getInstance()->addEvent('Stop', [])->push($_queueName);
        $proccessname = "dispatch/Consume/index/qname/$_queueName$";
        while(true) {
            usleep(200000); // 0.2s
            $result = isProcessExist($proccessname);
            if(!$result) {
                break;
            }
            unset($result);
        }
        echo $_queueName . "隊列已中止 \n";
    }
}

function isProcessExist($processname) 
{
    $ps = 'ps axu|grep "'.$processname.'"|grep -v "grep"|wc -l';
    $ret = shell_exec("$ps");
    $ret = rtrim($ret, "\r\n");
    return $ret;
}
View Code

2,項目一級目錄新建一個start.phpredis

 

3,修改application/extra/queue.phpthinkphp

 

<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

return [
//    'connector' => 'Sync'
    'qnames'=>[
        'Trade',        //投資,變現,返本相關
        'Activity',        //活動相關
        'Sms',            //發短信相關
        'Repay',        //借款人還款
        'Interest',     //計息,派息相關
        'DebtOnline',   //債權上線相關
        'Other'            //其餘
    ],
    'driver' => env('QUEUE_DRIVER', 'redis'),
];
View Code
相關文章
相關標籤/搜索