消息隊列ActiveMQ初步

安裝ActiveMQ

官網地址:http://activemq.apache.org/php

解壓後基本目錄結構:html

  • bin存放的是腳本文件
  • conf存放的是基本配置文件
  • data存放的是日誌文件
  • docs存放的是說明文檔
  • examples存放的是簡單的實例
  • lib存放的是activemq所需jar包
  • webapps用於存放項目的目錄

進入 bin 目錄:前端

./activemq star # 啓動activeMQ服務
./activemq stop # 關閉activeMQ服務

ActiveMQ 默認啓動時,啓動了內置的 jetty 服務器,提供一個用於監控 ActiveMQ 的 admin 應用(默認端口爲8161,默認帳號密碼都是admin):git

PHP經過STOMP協議鏈接ActiveMQ

STOMP 是一個簡單的可互操做的協議,被用於經過中間服務器在客戶端之間進行異步消息傳遞。它定義了一種在客戶端與服務端進行消息傳遞的文本格式。github

STOMP 是基於幀的協議。以 command 字符串開始,以 EOL 結束,command 下面是0個或多個 <key>:<value> 格式的 header 條目。每一個條目由 EOL 結束。一個空白行(即額外EOL)表示 header 結束和 body 開始。body 鏈接着 NULL 字節(ASCII 中用 ctrl+@表示,看起來是 ^@)。web

大概的協議格式:apache

CONNECT
accept-version:1.0,1.1,2.0
host:www.jtthink.com

^@

STOMP 1.2規範:https://stomp.github.io/stomp-specification-1.2.htmljson

PHP擴展——STOMP安裝

下載地址:http://pecl.php.net/package/stomp服務器

$ wget http://pecl.php.net/get/stomp-2.0.2.tgz
$ tar zxf stomp-2.0.2.tgz
$ cd stomp-2.0.2
$ phpize 
$ ./configure --enable-stomp --with-php-config=/usr/local/php/bin/php-config
$ make && make install

完成後能夠在結果中看見 extension 安裝路徑,在 php.ini 中添加節點:app

[stomp]
extension=/usr/local/php/lib/php/extensions/no-debug-non-zts-20170718/stomp.so

驗證安裝結果:

php -m | grep Stomp

手動建立一個測試隊列

進入管理後臺,建立一個新的 Queue:test

並在該 test 隊列中發送幾條消息

PHP鏈接並獲取消息

<?php
//61613是STOMP鏈接默認的端口,在ActiveMQ目錄conf/activemq.xml文件可修改
$broker = 'tcp://ActiveMQ服務IP:61613';
$queue  = '/queue/test';

try {
    $stomp = new Stomp($broker);
    $stomp->subscribe($queue);

    while($stomp->hasFrame()) {
    //訂閱一個消息隊列
        $frame = $stomp->readFrame();
    //輸出消息體內容
        echo $frame->body.PHP_EOL;
    }
} catch(StompException $e) {
    echo $e->getMessage();
}

運行上面的代碼:

PHP+ActiveMQ 多步驟業務流程處理

以最簡單的用戶註冊爲例,當用戶提交註冊時,分別向部署好的多個 ActiveMQ 中間站發送消息,來處理不一樣的業務流程(如信息入庫、驗證短信發送等)。

前端註冊頁面:

<?php
    if(isset($_POST["username"]))
    {    
        $broker = 'tcp://ActiveMQ服務IP:61613';
        $queue1 = '/queue/userreg';        // 用戶數據入庫隊列
        $queue2 = '/queue/usersmsg';      // 用戶短信發送隊列
        
        // 模擬數據
        $userID = rand(50,500);
        
        $user = new stdClass();
        $user->userID = $userID;
        $user->userName = $_POST["username"];
        $user->userPass = $_POST["userpass"];
        $user->regDate = date('Y-m-d h:i:s');
        
        $msg = new stdClass();
        $msg->userID = $userID;
    
        // 開啓事務發送消息
        $stomp = new Stomp($broker,"txl");
        $stomp->begin('userReg');
        if($stomp->send($queue1, json_encode($user), array('transaction'=>'userReg')) && $stomp->send($queue2, json_encode($msg), array('transaction'=>'userReg'))) {
            $stomp->commit('userReg');    // 提交事務
        }
        
        unset($stomp);
    }
?>
<html>
<head>
<style>
 .container{margin:50px auto;width:500px;}
  .container div{line-height:21pt;margin-top:30px}
    .container .text{width:150px;height:25px;}
</style>
</head>
<body>
  <div class="container">
  <form method="post">
  <h3>用戶註冊演示界面</h3>
 <div>
  用戶名:<input type="text" class="text" name="username"/>
 </div>
  <div>&nbsp;&nbsp;&nbsp;&nbsp;碼:<input type="text" class="text" name="userpass"/>
 </div>
 <div>
   <input type="submit" value="提交註冊">
 </div>
 </form>
  </div>
</body>
</html>

用戶信息入庫中間站:

<?php
$broker = 'tcp://ActiveMQ服務IP:61613';
$queue  = '/queue/userreg';

try {
    $stomp = new Stomp($broker,'txl');
    // 訂閱 userreg 隊列
    $stomp->subscribe($queue);

    while(true) {
        if($stomp->hasFrame()) {
            $frame = $stomp->readFrame();
            $userObj = json_decode($frame->body);

            // 這裏調用用戶數據入庫接口

            echo $frame->body.' user reg is done...'.PHP_EOL;
            $stomp->ack($frame);    // 該條記錄已處理完畢
        }
    }    
} catch(StompException $e) {
    echo $e->getMessage();
}

用戶短信發送中間站:

<?php
$broker = 'tcp://ActiveMQ服務IP:61613';
$queue = '/queue/usersmsg';    

try {
    $stomp = new Stomp($broker,'txl');
    // 訂閱 short message 隊列
    $stomp->subscribe($queue);

    while(true) {
        if($stomp->hasFrame()) {
            $frame = $stomp->readFrame();
            $userObj = json_decode($frame->body);
            
            // 這裏調用用戶短信發送接口

            echo $frame->body.' user short message is done...'.PHP_EOL;
            $stomp->ack($frame);    // 該條記錄已處理完畢
            
        }
    }    
} catch(StompException $e) {
    echo $e->getMessage();
}

測試結果:

ActiveMQ 中 queue、topic 的區別

  • queue(點對點):一個消息生產者發送的消息,只能有一個訂閱者接收。若是發佈一條消息沒有消費者閱讀,消息會保存起來,直至有消費者訂閱。
  • topic(發佈/訂閱):一對多發佈,一條消息能夠多個消費者訂閱。沒有訂閱的,就無法接收以前的消息 。

參考:《ActiveMQ——activemq的詳細說明,queue、topic的區別》

相關文章
相關標籤/搜索