ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。php
在本地或者服務器中安裝activeMq,以Windows爲例:java
A、 windows下部署apache
ActiveMQ部署其實很簡單,和全部Java同樣,要跑java程序就必須先安裝JDK並配置好環境變量,這個很簡單。json
而後解壓下載的apache-activemq-5.10-20140603.133406-78-bin.zip壓縮包到一個目錄,獲得解壓後的目錄結構以下圖:windows
進入bin目錄,發現有win32和win64兩個文件夾,這2個文件夾分別對應windows32位和windows64位操做系統的啓動腳本。瀏覽器
個人實驗環境是windowsXP,就進入win32目錄,會看到以下目錄結構。服務器
其中activemq.bat即是啓動腳本,雙擊啓動。app
ActiveMQ默認啓動到8161端口,啓動完了後在瀏覽器地址欄輸入:http://localhost:8161/admin要求輸入用戶名密碼,默認用戶名密碼爲admin、admin,這個用戶名密碼是在conf/users.properties中配置的。輸入用戶名密碼後即可看到以下圖的ActiveMQ控制檯界面了。tcp
本文使用stomp協議實現mq隊列 ,項目中要加載stomp 包; ide
在須要使用隊列的地方,導入該包:
use App\Libraries\Stomp\Stomp;
將消息放入activeMq中:
protected function sendToMQ($destination, $msg_data, $persistent = false) { try { $con = new Stomp(config('app.mq_url')); $con->connect(); $con->begin("Transaction"); $con->send($destination, json_encode($msg_data), array('persistent'=> $persistent)); $con->commit("Transaction"); $con->disconnect(); } catch (\Exception $e) { app('log')->warn($e->getMessage()); } }
1.destination是指隊列名稱;
2.msg_data隊列中存放的數據;
3.persistent是否同步;
4.app.mq_url是指activeMq安裝的服務器地址及端口號,如tcp://localhost:61613;
通過段代碼以後,咱們就將數據msg_data放到了隊列destination中了;
在activeMq中的消息如何消費呢?通常狀況下咱們會創建一個cronjob來定時消費隊列的消息,消費隊列消息主要代碼以下:
try { $this->consumer = new Stomp($this->activemq_uri); //$this->acitvemq_uri就是上面的activeMq地址app.mq_url if (!$this->consumer->isConnected()) { $this->consumer->connect(); $this->consumer->setReadTimeout(3); } app('log')->info($this->log_remark . "connect to active mq success"); } catch (StompException $e) { app('log')->info("connect to active mq failed : " . $e->getMessage()); die(); } $queue = self::getQueue();//獲得隊列名稱,上面定義的destination $this->consumer->subscribe($queue); //訂閱 //循環讀幀 while ($this->consumer->hasFrameToRead()) { try { $message = $this->consumer->readFrame(); $data = json_decode($message->body, true); // 這裏其實就是獲得了上面的隊列消息msg_data //驗證數據並更新數據 $handle_result = self::removeDuplicateExpressInfo($data['traces'], $data['shipping_id']); if ($handle_result) { $this->consumer->ack($message); } } catch (\Exception $e) { app('log')->info("handle with message failed : " . $e->getMessage()); if (!$this->consumer->isConnected()) { $this->consumer->connect(); } else { break; } } } $this->consumer->unsubscribe($queue); //釋放訂閱 $this->consumer->disconnect(); //端口鏈接