php 利用activeMq+stomp實現消息隊列

php 利用activeMq+stomp實現消息隊列

1、activeMq概述

  ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。php

2、特性列表

⒈ 多種語言和協議編寫客戶端。語言: Java,C,C++,C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP, AMQP
⒉ 徹底支持 JMS1.1和 J2EE 1.4規範 (持久化,XA消息, 事務)
⒊ 對Spring的支持,ActiveMQ能夠很容易內嵌到使用Spring的系統裏面去,並且也支持Spring2.0的特性
⒋ 經過了常見J2EE 服務器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的測試,其中經過JCA 1.5 resource adaptors的配置,可讓ActiveMQ能夠自動的部署到任何兼容J2EE 1.4 商業服務器上
⒌ 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
⒍ 支持經過JDBC和journal提供高速的消息持久化
⒎ 從設計上保證了高性能的集羣,客戶端- 服務器,點對點
⒏ 支持 Ajax
⒐ 支持與Axis的整合
⒑ 能夠很容易的調用內嵌JMS provider,進行測試

3、運行環境

  在本地或者服務器中安裝activeMq,以Windows爲例:java

A、 windows下部署apache

ActiveMQ部署其實很簡單,和全部Java同樣,要跑java程序就必須先安裝JDK並配置好環境變量,這個很簡單。json

而後解壓下載的apache-activemq-5.10-20140603.133406-78-bin.zip壓縮包到一個目錄,獲得解壓後的目錄結構以下圖:windows

進入bin目錄,發現有win32和win64兩個文件夾,這2個文件夾分別對應windows32位和windows64位操做系統的啓動腳本。瀏覽器

個人實驗環境是windowsXP,就進入win32目錄,會看到以下目錄結構。服務器

其中activemq.bat即是啓動腳本,雙擊啓動。app

ActiveMQ默認啓動到8161端口,啓動完了後在瀏覽器地址欄輸入:http://localhost:8161/admin要求輸入用戶名密碼,默認用戶名密碼爲admin、admin,這個用戶名密碼是在conf/users.properties中配置的。輸入用戶名密碼後即可看到以下圖的ActiveMQ控制檯界面了。tcp

4、生產消息

  本文使用stomp協議實現mq隊列 ,項目中要加載stomp 包;  ide

  在須要使用隊列的地方,導入該包:

  

use App\Libraries\Stomp\Stomp;

  將消息放入activeMq中:

  

    protected function sendToMQ($destination, $msg_data, $persistent = false)
    {
        try {
            $con = new Stomp(config('app.mq_url'));
            $con->connect();
            $con->begin("Transaction");
            $con->send($destination, json_encode($msg_data), array('persistent'=> $persistent));
            $con->commit("Transaction");
            $con->disconnect();
        } catch (\Exception $e) {
            app('log')->warn($e->getMessage());
        }
    }

  1.destination是指隊列名稱;

  2.msg_data隊列中存放的數據;

  3.persistent是否同步; 

  4.app.mq_url是指activeMq安裝的服務器地址及端口號,如tcp://localhost:61613;

  通過段代碼以後,咱們就將數據msg_data放到了隊列destination中了;

5、消費消息

  在activeMq中的消息如何消費呢?通常狀況下咱們會創建一個cronjob來定時消費隊列的消息,消費隊列消息主要代碼以下:

        try {
            $this->consumer = new Stomp($this->activemq_uri); //$this->acitvemq_uri就是上面的activeMq地址app.mq_url
            if (!$this->consumer->isConnected()) {
                $this->consumer->connect();
                $this->consumer->setReadTimeout(3);
            }
            app('log')->info($this->log_remark . "connect to active mq success");
        } catch (StompException $e) {
            app('log')->info("connect to active mq failed : " . $e->getMessage());
            die();
        }
        $queue = self::getQueue();//獲得隊列名稱,上面定義的destination
        $this->consumer->subscribe($queue);  //訂閱
        //循環讀幀
        while ($this->consumer->hasFrameToRead()) {
            try {
                $message = $this->consumer->readFrame();
                $data = json_decode($message->body, true);  // 這裏其實就是獲得了上面的隊列消息msg_data
                //驗證數據並更新數據
                $handle_result = self::removeDuplicateExpressInfo($data['traces'], $data['shipping_id']);
                if ($handle_result) {
                    $this->consumer->ack($message);
                }
            } catch (\Exception $e) {
                app('log')->info("handle with message failed : " . $e->getMessage());
                if (!$this->consumer->isConnected()) {
                    $this->consumer->connect();
                } else {
                    break;
                }
            }
        }
        $this->consumer->unsubscribe($queue);    //釋放訂閱
        $this->consumer->disconnect();      //端口鏈接
相關文章
相關標籤/搜索