1. 將 php-amqplib 拷貝至 yii2 項目,並新建 PhpClient 類(PHP):
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Minimal RabbitMQ producer: publishes one message per call to the
 * "task_queue" queue on the local broker.
 */
class PhpClient
{
    /**
     * Publish a single persistent message to "task_queue".
     *
     * A missing payload (null, '', false, empty array) falls back to the
     * default text "Hello World!". Numeric zero values (0, '0', 0.0) are
     * valid payloads and are sent as-is instead of being replaced.
     *
     * Exceptions raised by php-amqplib (connection refused, channel errors)
     * are not caught here and propagate to the caller.
     *
     * @param mixed $n Message body to publish.
     *
     * @return bool Always true when publishing completed without an exception.
     */
    public static function Call($n)
    {
        require_once __DIR__ . '/vendor/autoload.php';

        // empty() alone would also discard '0' / 0, which are legitimate
        // message bodies; only genuinely absent values get the default text.
        $data = (empty($n) && !is_numeric($n)) ? 'Hello World!' : $n;

        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

        try {
            $channel = $connection->channel();

            // Arguments: queue, passive, durable, exclusive, auto_delete.
            // Must match the declaration used by the consumer side.
            $channel->queue_declare('task_queue', false, true, false, false);

            $msg = new AMQPMessage(
                $data,
                ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
            );

            // Empty exchange name: route directly to the queue named by the routing key.
            $channel->basic_publish($msg, '', 'task_queue');
            $channel->close();
        } finally {
            // Release the socket even when declaring or publishing fails.
            $connection->close();
        }

        return true;
    }
}
2. 在 commands 目錄中新建 controller,簡單實現 tasker 和 worker(yii2):
<?php

namespace app\commands;

use yii\console\Controller;
use Yii;
use PhpAmqpLib\Connection\AMQPStreamConnection;

/**
 * Console commands demonstrating a RabbitMQ work queue:
 * one action consumes "task_queue", the other publishes a test task to it.
 */
class RabbittaskController extends Controller
{
    /**
     * Run a worker that consumes messages from "task_queue" and acknowledges
     * each one after handling it.
     *
     * Blocks for as long as the channel has registered consumer callbacks.
     * Exceptions from php-amqplib or from the message handler propagate to
     * the caller after the connection has been closed.
     */
    public function actionWorker()
    {
        require_once Yii::getAlias('@vendor') . '/rabbitmq/vendor/autoload.php';

        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

        try {
            $channel = $connection->channel();

            // Arguments: queue, passive, durable, exclusive, auto_delete.
            // Must match the declaration used by the publisher.
            $channel->queue_declare('task_queue', false, true, false, false);

            $callback = function ($msg) {
                // $msg->body carries the payload; do the actual work here.

                // Manual ack: sent only after the work above has finished.
                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            };

            // Prefetch count of 1: at most one unacknowledged message at a time.
            $channel->basic_qos(null, 1, null);

            // Fourth argument (no_ack) is false, so the callback must ack explicitly.
            $channel->basic_consume('task_queue', '', false, false, false, false, $callback);

            while (count($channel->callbacks)) {
                $channel->wait();
            }

            $channel->close();
        } finally {
            // Release the socket even if waiting or message handling throws.
            $connection->close();
        }
    }

    /**
     * Publish a single test message ("test!") through PhpClient.
     */
    public function actionNewTask()
    {
        // require_once: a second invocation in the same process must not
        // redeclare the PhpClient class.
        require_once Yii::getAlias('@vendor') . '/rabbitmq/PhpClient.php';

        \PhpClient::Call('test!');
    }
}