php 事務處理,ActiveMQ的發送消息,與處理消息 php ActiveMQ的發送消息,與處理消息

能夠經過鏈式發送->處理->發送。。。的方式處理相似事務型業務邏輯

 

好比 發送一個註冊消息,消息隊列處理完註冊之後,緊接着發送一個新手優惠券贈送,贈送完再發一個其它後續邏輯處理的消息等待後續隊列處理php

 

 

php ActiveMQ的發送消息,與處理消息

咱們以一個簡單的用戶註冊爲例,當用戶點擊註冊按鈕後,咱們發送一個消息,後臺php接收到該消息而後處理。html

1.php代碼以下:前端

1
2
3
4
5
6
7
8
9
<?php
$stomp  new  Stomp( 'tcp://192.168.1.222:61613' );
 
$obj  new  Stdclass();
//下面這些數據,實際中是用戶經過前端頁面post來的,這裏只作演示
$obj ->username =  'test' ;
$obj ->password =  '123456' ;
//發送一個註冊消息到隊列,咱們這裏模擬用戶註冊
$stomp ->send( '/queue/userReg' , json_encode( $obj ));

2.php代碼以下:數據庫

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php
$stomp  new  Stomp( 'tcp://192.168.1.222:61613' );
//訂閱只對一個有效,若是啓動多個腳本,只有一個會接收到消息
$stomp ->subscribe( '/queue/userReg' );
 
while (true) {
     //判斷是否有讀取的信息
     if ( $stomp ->hasFrame()) {
         $frame  $stomp ->readFrame();
 
         $data  = json_decode( $frame ->body, true);
         var_dump( $data );
 
         //咱們經過獲取的數據
         //處理相應的邏輯,好比存入數據庫,發送驗證碼等一系列操做。
         //$db->query("insert into user values('{$username}','{$password}')");
         //sendVerify();
 
         //表示消息被處理掉了,ack()函數很重要
         $stomp ->ack( $frame );
     }
     sleep(1);
}

分別運行上面兩個腳本文件json

1
2
> /data/php56/bin/php 1.php
> /data/php56/bin/php 2.php

咱們還能夠把上面的2.php代碼分紅多步執行。tcp

2.php代碼以下:函數

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?php
$stomp  new  Stomp( 'tcp://192.168.1.222:61613' );
$stomp ->subscribe( '/queue/userReg' );
 
while (true) {
     //判斷是否有讀取的信息
     if ( $stomp ->hasFrame()) {
         $frame  $stomp ->readFrame();
 
         $data  = json_decode( $frame ->body, true);
 
         //註冊信息入庫
         //$ret = db->query("insert into user values('{$data['username']}', '{$data['password']}')");
         //這裏演示直接設成true了
         $ret  = true;
         if ( $ret ) {
             echo  $data [ 'username' ],  '入庫成功' , PHP_EOL;
             //若是入庫成功,再次把數據發送到另外一個消息隊列中,進行下一步處理
             $stomp ->send( '/queue/sendVerify' $frame ->body);
 
             $stomp ->ack( $frame );
         }
     }
     sleep(1);
}

3.php代碼以下:post

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<?php
$stomp  new  Stomp( 'tcp://192.168.1.222:61613' );
$stomp ->subscribe( '/queue/sendVerify' );
 
while (true) {
     //判斷是否有讀取的信息
     if ( $stomp ->hasFrame()) {
         $frame  $stomp ->readFrame();
 
         $data  = json_decode( $frame ->body, true);
 
         //$ret = sendVerify()發送驗證碼,實際中應該是請求某接口
         $ret  = true;
         if ( $ret ) {
             echo  $data [ 'username' ],  '發送驗證碼成功' , PHP_EOL;
 
             $stomp ->ack( $frame );
         }
     }
     sleep(1);
}

再次分別運行上面的三個腳本url

1
2
3
> /data/php56/bin/php 1.php
> /data/php56/bin/php 2.php
> /data/php56/bin/php 3.php

相關文章
相關標籤/搜索