1,項目一級目錄新建一個server文件php
#!/usr/bin/env php <?php try { require __DIR__ . "/start.php"; } catch (\Exception $e) {} $queueNames = config('queue.qnames'); if (empty($queueNames)) exit('隊列名稱未配置'); $option = $argv[1]; $queueName = isset($argv[2]) ? $argv[2] : null; if ($queueName && !in_array($queueName, $queueNames)) exit('隊列名稱不存在'); if ($queueName) { unset($queueNames); $queueNames[] = $queueName; } echo "開始操做... \n"; switch ($option) { case 'start': startQueue($queueNames); break; case 'stop': stopQueue($queueNames); break; case 'reload': stopQueue($queueNames); startQueue($queueNames); break; case 'monitor': demon(); break; default: echo '操做類型: start|stop|reload|monitor' . "\n"; break; } echo "結束操做... \n"; function startQueue(array $queueNames) { $correct = 'dispatch/Correct/index$'; $ret = isProcessExist($correct); if($ret == 0){ $cmd = "nohup php start.php dispatch/Correct/index >> /tmp/correct.log &"; system($cmd,$result); ($result == 0) or die("$cmd 啓動失敗 \n"); echo "$correct 隊列已啓動 \n"; }else { echo "$correct 進程已存在,無需重啓 \n"; } foreach ($queueNames as $_queueName) { $proccessname = "dispatch/Consume/index/qname/$_queueName$"; if(isProcessExist($proccessname) == 1) { echo $_queueName . "進程已經存在,無需重啓 \n"; }else { $command = "nohup php start.php dispatch/Consume/index/qname/" . $_queueName." >> /tmp/$_queueName.log &"; system($command,$result); ($result == 0) or die("$command 啓動失敗 \n"); echo $_queueName . "隊列已啓動 \n"; } } } function demon() { $queues = config('queue.qnames'); foreach($queues as $queue) { if(!isProcessExist($queue)) { try{ startQueue([$queue]); }catch(\Exception $e){ echo "隊列監控啓動失敗: ".$e->getMessage()."\n"; } } } } function stopQueue(array $queueNames) { $redisconf = config('redis'); $redis = \library\Redis::getInstance($redisconf); foreach ($queueNames as $key => $_queueName) { $redis->hset('script:signal', $_queueName, false); \library\Queue::getInstance()->addEvent('Stop', [])->push($_queueName); $proccessname = "dispatch/Consume/index/qname/$_queueName$"; while(true) { usleep(200000); // 0.2s $result = isProcessExist($proccessname); if(!$result) { break; } unset($result); } echo $_queueName . "隊列已中止 \n"; } } function isProcessExist($processname) { $ps = 'ps axu|grep "'.$processname.'"|grep -v "grep"|wc -l'; $ret = shell_exec("$ps"); $ret = rtrim($ret, "\r\n"); return $ret; }
2,項目一級目錄新建一個start.phpredis
3,修改application/extra/queue.phpthinkphp
<?php // +---------------------------------------------------------------------- // | ThinkPHP [ WE CAN DO IT JUST THINK IT ] // +---------------------------------------------------------------------- // | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved. // +---------------------------------------------------------------------- // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 ) // +---------------------------------------------------------------------- // | Author: yunwuxin <448901948@qq.com> // +---------------------------------------------------------------------- return [ // 'connector' => 'Sync' 'qnames'=>[ 'Trade', //投資,變現,返本相關 'Activity', //活動相關 'Sms', //發短信相關 'Repay', //借款人還款 'Interest', //計息,派息相關 'DebtOnline', //債權上線相關 'Other' //其餘 ], 'driver' => env('QUEUE_DRIVER', 'redis'), ];