官網地址:http://activemq.apache.org/php
解壓後基本目錄結構:html
進入 bin 目錄:前端
./activemq star # 啓動activeMQ服務 ./activemq stop # 關閉activeMQ服務
ActiveMQ 默認啓動時,啓動了內置的 jetty 服務器,提供一個用於監控 ActiveMQ 的 admin 應用(默認端口爲8161,默認帳號密碼都是admin):git
STOMP 是一個簡單的可互操做的協議,被用於經過中間服務器在客戶端之間進行異步消息傳遞。它定義了一種在客戶端與服務端進行消息傳遞的文本格式。github
STOMP 是基於幀的協議。以 command 字符串開始,以 EOL 結束,command 下面是0個或多個 <key>:<value> 格式的 header 條目。每一個條目由 EOL 結束。一個空白行(即額外EOL)表示 header 結束和 body 開始。body 鏈接着 NULL 字節(ASCII 中用 ctrl+@表示,看起來是 ^@)。web
大概的協議格式:apache
CONNECT accept-version:1.0,1.1,2.0 host:www.jtthink.com ^@
STOMP 1.2規範:https://stomp.github.io/stomp-specification-1.2.htmljson
下載地址:http://pecl.php.net/package/stomp服務器
$ wget http://pecl.php.net/get/stomp-2.0.2.tgz $ tar zxf stomp-2.0.2.tgz $ cd stomp-2.0.2 $ phpize $ ./configure --enable-stomp --with-php-config=/usr/local/php/bin/php-config $ make && make install
完成後能夠在結果中看見 extension 安裝路徑,在 php.ini 中添加節點:app
[stomp] extension=/usr/local/php/lib/php/extensions/no-debug-non-zts-20170718/stomp.so
驗證安裝結果:
php -m | grep Stomp
進入管理後臺,建立一個新的 Queue:test
並在該 test 隊列中發送幾條消息
<?php //61613是STOMP鏈接默認的端口,在ActiveMQ目錄conf/activemq.xml文件可修改 $broker = 'tcp://ActiveMQ服務IP:61613'; $queue = '/queue/test'; try { $stomp = new Stomp($broker); $stomp->subscribe($queue); while($stomp->hasFrame()) { //訂閱一個消息隊列 $frame = $stomp->readFrame(); //輸出消息體內容 echo $frame->body.PHP_EOL; } } catch(StompException $e) { echo $e->getMessage(); }
運行上面的代碼:
以最簡單的用戶註冊爲例,當用戶提交註冊時,分別向部署好的多個 ActiveMQ 中間站發送消息,來處理不一樣的業務流程(如信息入庫、驗證短信發送等)。
前端註冊頁面:
<?php if(isset($_POST["username"])) { $broker = 'tcp://ActiveMQ服務IP:61613'; $queue1 = '/queue/userreg'; // 用戶數據入庫隊列 $queue2 = '/queue/usersmsg'; // 用戶短信發送隊列 // 模擬數據 $userID = rand(50,500); $user = new stdClass(); $user->userID = $userID; $user->userName = $_POST["username"]; $user->userPass = $_POST["userpass"]; $user->regDate = date('Y-m-d h:i:s'); $msg = new stdClass(); $msg->userID = $userID; // 開啓事務發送消息 $stomp = new Stomp($broker,"txl"); $stomp->begin('userReg'); if($stomp->send($queue1, json_encode($user), array('transaction'=>'userReg')) && $stomp->send($queue2, json_encode($msg), array('transaction'=>'userReg'))) { $stomp->commit('userReg'); // 提交事務 } unset($stomp); } ?> <html> <head> <style> .container{margin:50px auto;width:500px;} .container div{line-height:21pt;margin-top:30px} .container .text{width:150px;height:25px;} </style> </head> <body> <div class="container"> <form method="post"> <h3>用戶註冊演示界面</h3> <div> 用戶名:<input type="text" class="text" name="username"/> </div> <div> 密 碼:<input type="text" class="text" name="userpass"/> </div> <div> <input type="submit" value="提交註冊"> </div> </form> </div> </body> </html>
用戶信息入庫中間站:
<?php $broker = 'tcp://ActiveMQ服務IP:61613'; $queue = '/queue/userreg'; try { $stomp = new Stomp($broker,'txl'); // 訂閱 userreg 隊列 $stomp->subscribe($queue); while(true) { if($stomp->hasFrame()) { $frame = $stomp->readFrame(); $userObj = json_decode($frame->body); // 這裏調用用戶數據入庫接口 echo $frame->body.' user reg is done...'.PHP_EOL; $stomp->ack($frame); // 該條記錄已處理完畢 } } } catch(StompException $e) { echo $e->getMessage(); }
用戶短信發送中間站:
<?php $broker = 'tcp://ActiveMQ服務IP:61613'; $queue = '/queue/usersmsg'; try { $stomp = new Stomp($broker,'txl'); // 訂閱 short message 隊列 $stomp->subscribe($queue); while(true) { if($stomp->hasFrame()) { $frame = $stomp->readFrame(); $userObj = json_decode($frame->body); // 這裏調用用戶短信發送接口 echo $frame->body.' user short message is done...'.PHP_EOL; $stomp->ack($frame); // 該條記錄已處理完畢 } } } catch(StompException $e) { echo $e->getMessage(); }
測試結果: