在 yii2 中通過 php-amqplib 使用 RabbitMQ

1. 將 php-amqplib 拷貝至 yii2 項目,新建 PhpClient 類

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PhpClient
{
    /**
     * Publishes one persistent message to the "task_queue" queue.
     *
     * Connects to the broker at localhost:5672 with the guest/guest
     * credentials, declares the queue, publishes the message through the
     * default exchange ('') using the queue name as routing key, and then
     * closes the channel and the connection.
     *
     * Exceptions raised by php-amqplib are not caught here; they propagate
     * to the caller after the channel and connection have been closed.
     *
     * @param mixed $n Message body. null, false, '' and an empty array are
     *                 replaced by "Hello World!". Any other scalar
     *                 (including 0 and "0") is sent as its string form.
     *
     * @return bool Always true when publishing did not throw.
     */
    public static function Call($n)
    {
        require_once __DIR__ . '/vendor/autoload.php';

        // Only genuinely blank input falls back to the default body.
        // empty() would also discard the valid payloads 0 and "0".
        $isBlank = $n === null || $n === false || $n === '' || $n === [];
        if ($isBlank) {
            $data = 'Hello World!';
        } else {
            $data = is_scalar($n) ? (string) $n : $n;
        }

        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        try {
            $channel = $connection->channel();
            try {
                // Same arguments the worker uses when it declares the queue.
                $channel->queue_declare('task_queue', false, true, false, false);

                $msg = new AMQPMessage(
                    $data,
                    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
                );

                $channel->basic_publish($msg, '', 'task_queue');
            } finally {
                // Release the channel even when declaring or publishing fails.
                $channel->close();
            }
        } finally {
            // Release the socket even when the channel could not be used.
            $connection->close();
        }

        return true;
    }
}

2. 在 commands 中新建 controller,簡單實現 tasker 和 worker

<?php
namespace app\commands;

use yii\console\Controller;
use Yii;
use PhpAmqpLib\Connection\AMQPStreamConnection;

/**
 * Console commands for the RabbitMQ "task_queue" example:
 * a long-running worker and a command that enqueues a test task.
 */
class RabbittaskController extends Controller
{
    /**
     * Consumes messages from "task_queue" and acknowledges each one.
     *
     * Blocks in the wait loop for as long as the channel has registered
     * consumer callbacks. The channel and the connection are closed when
     * the loop ends, including when an exception interrupts it.
     */
    public function actionWorker()
    {
        // require_once so a repeated invocation in the same process does
        // not load the autoloader file a second time.
        require_once Yii::getAlias('@vendor') . '/rabbitmq/vendor/autoload.php';

        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        try {
            $channel = $connection->channel();
            try {
                // Same arguments the publisher uses when it declares the queue.
                $channel->queue_declare('task_queue', false, true, false, false);

                $callback = function ($msg) {
                    // Handle the task here; the payload is in $msg->body.

                    // Acknowledge only after the work above has finished.
                    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                };

                // Prefetch count of 1: at most one unacknowledged message
                // is delivered to this consumer at a time.
                $channel->basic_qos(null, 1, null);
                // Fourth argument (no_ack) is false, so the callback must ack.
                $channel->basic_consume('task_queue', '', false, false, false, false, $callback);

                while (count($channel->callbacks)) {
                    $channel->wait();
                }
            } finally {
                $channel->close();
            }
        } finally {
            $connection->close();
        }
    }

    /**
     * Publishes the fixed message "test!" to the queue via PhpClient.
     */
    public function actionNewTask()
    {
        // require_once: this file declares the PhpClient class, so loading
        // it twice in one process would be a fatal redeclaration error.
        require_once Yii::getAlias('@vendor') . '/rabbitmq/PhpClient.php';
        \PhpClient::Call('test!');
    }
}
相關文章
相關標籤/搜索