rabbit mq的一個實例,異步功能

簡單的使用場景:消息隊列的場景有:解耦,異步,削峯。php

此例用的場景,異步shell

有時候會有請求消耗時間過長,不能老讓用戶等待返回結果,能夠用消息隊列來作異步實現,以前用過workmain等相似的異步,但不如rabbit強大,數據庫

一個實際場景:
咱們作的一個搶任務功能,成功或失敗會有一個日誌記錄,這個功能不必實時返回,能夠使用異步,json

框架tp5,一個任務執行程序consumer,多個任務發佈入口producer,用定時任務監控consumer是否掛掉,掛掉則從新啓動,並記錄日誌掛掉的緣由(有時候會數據庫鏈接失敗。。)bash

producer的代碼:

//存儲任務$data 字符串
function savetask($data){框架

//做爲任務發佈者,
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue_log', false, true, false, false);
//$data = "work1";
$data = json_encode($data);
$msg = new AMQPMessage($data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

$channel->basic_publish($msg, '', 'task_queue_log');

//echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();
}異步

consumer的代碼

//執行任務
function runwork(){函數

//做爲worker執行任務
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue_log', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){
$this->dowork($msg);
//$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue_log', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$connection->close();
}this

回調函數,操做具體任務,好比根據不一樣的類型執行不一樣的函數,存儲日誌
function dowork($msg){
echo " [x] Received ", $msg->body, "\n";
//sleep(substr_count($msg->body, '.'));
//處理程序
$data = json_decode($msg->body,true);
switch ($data['type']){
case 'union_repeat':
$this->union_repeat($data['data']);
break;
case 'union_active':
$this->union_active($data);
break;
case 'union_click':
$this->union_click($data);
break;
case 'task_active':
$this->task_active($data);
break;
case 'task_click':
$this->task_click($data);
break;
case 'task_repeat':
$this->task_repeat($data);
break;

}

echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

}spa

consumer的執行保證

用shell檢查進程是否存在,不存在則開啓

#!/bin/sh
ps -fe|grep runrabbitmqc |grep -v grep
if [ $? -ne 0 ]
then
sh /home/wwwroot/server/runrabbit.sh
else
echo "run....."
fi

runrabbit.sh的代碼:

#!/bin/bash
cd /home/wwwroot/server
php think runrabbitmqc


任務發佈入口producer主要功能就是將須要記錄的日誌發佈到消息隊列,而消費者接受到消息則執行存儲日誌到數據庫

相關文章
相關標籤/搜索