在laravel框架中使用mq

本文寫於2018-11-28php

一、部署laravel項目linux

https://github.com/laravel/laravel  經過git克隆項目,或者下載zip包而後解壓等方式均可以把laravel框架源碼下載下來。laravel

而後composer install 安裝各類依賴git

而後複製.env.example 爲.env文件,執行php artisan key:generate 生成APP_KEYgithub

 

二、上傳文件到github【備註:這一步能夠略過】segmentfault

1)git bash 到項目目錄,而後git init,初始化本地倉庫windows

2)git remote add origin 遠程庫地址數組

3)git add . 把文件緩存到緩衝區緩存

4)git commit -m '初始提交'bash

5)git push origin master  推送本地代碼到遠程庫master分支

 

三、安裝mq依賴

從  https://packagist.org/?query=rabbitmq 選擇一個依賴包

從上面截圖看到,能用的就是第一個和最後一個依賴。暫時先使用第一個依賴,後續有時間補充下第二個依賴的使用。

composer require php-amqplib/php-amqplib  便可安裝依賴

 四、laravel框架中使用mq

1)在config目錄下新增mq.php,文件內容:

<?php
return [
	'host' => env('MQ_HOST', '127.0.0.1'),
	'port' => env('MQ_PORT', 5672),
	'user' => env('MQ_USER', 'guest'),
	'password' => env('MQ_PASSWORD', 'guest'),
	'queue' => env('MQ_QUEUE', 'default'),
	'exchange' => env('MQ_EXCHANGE', 'default'),
	'key' => env('MQ_KEY', 'default'),
];

 具體的配置信息,能夠在.env文件中配置,也能夠修改這個文件。

2)新增MqSend.php文件,路徑:app/Console/Commands

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

class MqSend extends Command
{
    /**
     * 控制檯命令 signature 的名稱。
     *
     * @var string
     */
    protected $signature = 'mq:send {msg}';
    /**
     * 控制檯命令說明。
     *
     * @var string
     */
    protected $description = 'send messages to rabbitMQ';

    /**
     * 建立一個新的命令實例。
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * 執行控制檯命令。
     *
     * @return mixed
     */
    public function handle()
    {
        $host = config('mq.host');
        $port = config('mq.port');
        $user = config('mq.user');
        $password = config('mq.password');
        $queue = config('mq.queue');        // 隊列名稱
        $exchange = config('mq.exchange');  // 交換機名稱
        $key = config('mq.key');            // 隊列綁定交換機時配置的routingKey

        $connection = new AMQPConnection($host, $port, $user, $password);
        $channel = $connection->channel();

        /**
         * 若是管理後臺上已經配置了交換機、隊列,以及綁定了關係,則不須要下面的3條語句
         */
        $channel->exchange_declare($exchange, 'direct', false, true, false);    // 初始化交換機
        $channel->queue_declare($queue, false, true, false, false);             // 初始化隊列
        $channel->queue_bind($queue, $exchange, $key);  // 將隊列與某個交換機進行綁定,並使用路由關鍵字


        $msg = '[' . date('Y-m-d H:i:s') . '] ' . $this->argument('msg');
        $data = new AMQPMessage($msg, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
        $channel->basic_publish($data, $exchange, $queue);
        echo "[X] Sent: $msg \n";

        $channel->close();
        $connection->close();
    }
}

3)新增MqReceive.php文件, 路徑:app/Console/Commands

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

class MqReceive extends Command
{
    /**
     * 控制檯命令 signature 的名稱。
     *
     * @var string
     */
    protected $signature = 'mq:receive';
    /**
     * 控制檯命令說明。
     *
     * @var string
     */
    protected $description = 'receive messages to rabbitMQ';

    /**
     * 建立一個新的命令實例。
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * 執行控制檯命令。
     *
     * @return mixed
     */
    public function handle()
    {
        $host = config('mq.host');
        $port = config('mq.port');
        $user = config('mq.user');
        $password = config('mq.password');
        $queue = config('mq.queue');        // 隊列名稱
        $exchange = config('mq.exchange');  // 交換機名稱
        $key = config('mq.key');            // 隊列綁定交換機時配置的routingKey

        $connection = new AMQPConnection($host, $port, $user, $password);
        $channel = $connection->channel();

        /**
         * 若是管理後臺上已經配置了交換機、隊列,以及綁定了關係,則不須要下面的3條語句
         */
        $channel->queue_declare($queue, false, true, false, false);

        echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

        $callback = function($msg) {
            echo " [x] Received ", $msg->body, "\n";
        };

        $channel->basic_consume($queue, '', false, true, false, false, $callback);

        while(count($channel->callbacks)) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }
}

4)修改app/console/Kernel.php文件,在$commands數組中增長:

    protected $commands = [
        Commands\MqSend::class,
        Commands\MqReceive::class,
    ];

這樣子就能在php artisan看到有mq的命令:

5)生產者發佈消息

發佈成功。代碼中設置了交換機、隊列名、理由關鍵詞,這些默認值都是default,在rabbitMQ管理後臺能夠看到有新增了交換機、隊列,隊列裏面也有消息。

【備註】windows系統上可能執行生產者腳本會報錯:

這是由於windows不支持這個SOCKET_EAGAIN常量。

參考:https://github.com/php-amqplib/php-amqplib/issues/619

要改下vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php文件:

        // self::$ERRNO_EQUALS_EAGAIN = 'errno=' . SOCKET_EAGAIN;
        // windows不支持SOCKET_EAGAIN,因此會出現未定義的報錯。在linux上SOCKET_EAGAIN是SOCKET_EWOULDBLOCK的別名
        // https://github.com/php-amqplib/php-amqplib/issues/619  使用SOCKET_EWOULDBLOCK替換
        self::$ERRNO_EQUALS_EAGAIN = 'errno=' . (defined('SOCKET_EAGAIN') ? SOCKET_EAGAIN : SOCKET_EWOULDBLOCK);
        self::$ERRNO_EQUALS_EWOULDBLOCK = 'errno=' . SOCKET_EWOULDBLOCK;
        self::$ERRNO_EQUALS_EINTR = 'errno=' . SOCKET_EINTR;

修改完以後就沒問題了。

 

6)消費者接收消息

消息有2條,都接收到了。按Ctrl+C能夠退出消費者,由於消費是阻塞,一直在等待接收消息。

 

五、參考文檔:

1)laravel中新增artisan命令:https://laravel-china.org/docs/laravel/5.7/artisan/2276

2)laravel中使用 php-amqplib/php-amqplib 依賴包

https://segmentfault.com/a/1190000012308675

https://segmentfault.com/a/1190000011825148

相關文章
相關標籤/搜索