前面咱們已經學了如何部署ActiveMQ,
咱們知道經過ActiveMQ的一個管理後臺能夠查看任務隊列。php
用PHP來操做ActiveMQ,咱們能夠藉助一個第三方擴展。
下載:git
composer require fusesource/stomp-php:2.0.*
而後新建test.php:github
<?php require __DIR__.'/vendor/autoload.php'; //引入自動加載的文件 $connect = new \FuseSource\Stomp\Stomp('tcp://10.211.55.13/:61613'); $connect->connect(); $userId = 1001; $result = $connect->send('email',$userId); //好比發郵件 var_dump($result);
發送消息成功,打印bool(true)
json
咱們在ActiveMQ自帶的管理後臺查看,確實有一個名爲」email」的隊列。
服務器
上面咱們發送的一個id,咱們還能夠發送json數據。composer
$data = array('id'=>1001,'email'=>'110@qq.com','content'=>'test'); $result = $connect->send('email',json_encode($data));
咱們在MQ後臺能夠查看消息詳細
dom
上面的代碼到這裏,還不夠完美。若是咱們服務器重啓了activemq,沒有處理的消息會丟失。
這個時候咱們須要用到send()
方法的第三個參數。tcp
//消息持久化 persistent爲true,字符串的'true'
$result = $connect->send('email',json_encode($data),array('persistent'=>'true'));
給mq服務器發送消息(email消息)。
那麼在mq的隊列中的任務,又是怎麼處理的呢?優化
<?php require __DIR__.'/vendor/autoload.php'; //引入自動加載的文件 $connect = new \FuseSource\Stomp\Stomp('tcp://10.211.55.13/:61613'); $connect->connect(); //訂閱隊列消息 $connect->subscribe('email'); if ($connect->hasFrameToRead()){ $frame = $connect->readFrame(); print_r($frame); }
在mq服務端,訂閱(監聽)隊列消息。
在服務端是命令行下執行:php mqServer.php
若是有沒有處理的消息,能夠讀取出來,打印結果以下:ui
FuseSource\Stomp\Frame Object
(
[command] => MESSAGE
[headers] => Array ( [expires] => 0 [destination] => /queue/email [priority] => 4 [message-id] => ID:localhost.localdomain-38488-1488196907415-3:2:-1:1:1 [timestamp] => 1489477647931 ) [body] => {"id":1001,"email":"110@qq.com","content":"test"} )
body
就把咱們發送的內容讀取出來了。
咱們循環讀取(死循環)一直等待新消息:
do{ if ($connect->hasFrameToRead()){ $frame = $connect->readFrame(); print_r($frame->body); } } while (true);
處理消息以後(在發送郵件等業務完成以後),要通知mq我處理了該條消息了
if ($connect->hasFrameToRead()){ $frame = $connect->readFrame(); //print_r($frame->body); //作業務邏輯 //具體發送郵件的業務 //send email //最後通知mq,咱們已經處理了該條消息 $connect->ack($frame); }
咱們還能夠在優化一下代碼,解決死循環,控制循環(這裏是一種方案演示)
do{ //會等待,直到有可用消息,才執行後面代碼 if ($connect->hasFrameToRead()){ $frame = $connect->readFrame(); //print_r($frame->body); //作業務邏輯 //具體發送郵件的業務 //send email sleep(2); //模擬延時 //最後通知mq,咱們已經處理了該條消息 $connect->ack($frame); } //控制循環 $next = true; if (file_exists(__DIR__.'/stop')){ //若是有名爲stop的一個文件 //就不循環了 $next = false; } } while ($next);