PHP中使用ActiveMQ實現消息隊列

前面咱們已經學了如何部署ActiveMQ, 
咱們知道經過ActiveMQ的一個管理後臺能夠查看任務隊列。php

今天

這裏寫圖片描述 
用PHP來操做ActiveMQ,咱們能夠藉助一個第三方擴展。 
下載:git

composer require fusesource/stomp-php:2.0.*

 

而後新建test.php:github

<?php

require __DIR__.'/vendor/autoload.php'; //引入自動加載的文件

$connect = new \FuseSource\Stomp\Stomp('tcp://10.211.55.13/:61613');
$connect->connect();

$userId = 1001;
$result = $connect->send('email',$userId); //好比發郵件
var_dump($result);

 

這裏寫圖片描述
發送消息成功,打印bool(true)json

咱們在ActiveMQ自帶的管理後臺查看,確實有一個名爲」email」的隊列。 
這裏寫圖片描述服務器

上面咱們發送的一個id,咱們還能夠發送json數據。composer

$data = array('id'=>1001,'email'=>'110@qq.com','content'=>'test');
$result = $connect->send('email',json_encode($data)); 

咱們在MQ後臺能夠查看消息詳細 
這裏寫圖片描述dom

上面的代碼到這裏,還不夠完美。若是咱們服務器重啓了activemq,沒有處理的消息會丟失。 
這個時候咱們須要用到send()方法的第三個參數。tcp

//消息持久化 persistent爲true,字符串的'true'
$result = $connect->send('email',json_encode($data),array('persistent'=>'true'));

前面咱們完成了『發送』

給mq服務器發送消息(email消息)。 
那麼在mq的隊列中的任務,又是怎麼處理的呢?優化

<?php

require __DIR__.'/vendor/autoload.php'; //引入自動加載的文件

$connect = new \FuseSource\Stomp\Stomp('tcp://10.211.55.13/:61613');
$connect->connect();

//訂閱隊列消息
$connect->subscribe('email');

if ($connect->hasFrameToRead()){
    $frame = $connect->readFrame();
    print_r($frame);
}

在mq服務端,訂閱(監聽)隊列消息。 
在服務端是命令行下執行:php mqServer.php 
若是有沒有處理的消息,能夠讀取出來,打印結果以下:ui

FuseSource\Stomp\Frame Object
(
    [command] => MESSAGE
    [headers] => Array ( [expires] => 0 [destination] => /queue/email [priority] => 4 [message-id] => ID:localhost.localdomain-38488-1488196907415-3:2:-1:1:1 [timestamp] => 1489477647931 ) [body] => {"id":1001,"email":"110@qq.com","content":"test"} )

body就把咱們發送的內容讀取出來了。

咱們循環讀取(死循環)一直等待新消息:

do{
    if ($connect->hasFrameToRead()){
        $frame = $connect->readFrame();
        print_r($frame->body);
    }
} while (true);

處理消息以後(在發送郵件等業務完成以後),要通知mq我處理了該條消息了

    if ($connect->hasFrameToRead()){
        $frame = $connect->readFrame();
        //print_r($frame->body);

        //作業務邏輯
        //具體發送郵件的業務
        //send email

        //最後通知mq,咱們已經處理了該條消息
        $connect->ack($frame);
    }

咱們還能夠在優化一下代碼,解決死循環,控制循環(這裏是一種方案演示)

do{
    //會等待,直到有可用消息,才執行後面代碼
    if ($connect->hasFrameToRead()){
        $frame = $connect->readFrame();
        //print_r($frame->body);

        //作業務邏輯
        //具體發送郵件的業務
        //send email
        sleep(2); //模擬延時

        //最後通知mq,咱們已經處理了該條消息
        $connect->ack($frame);
    }

    //控制循環
    $next = true;
    if (file_exists(__DIR__.'/stop')){
        //若是有名爲stop的一個文件
        //就不循環了
        $next = false;
    }
} while ($next);
相關文章
相關標籤/搜索