tp5下基於Swoole通用隊列實現

一【隊列用途】

通常隊列主要用途就是異步任務,用來緩解:php

1)耗時操做,如生成圖片等;html

2)並行任務,如消息下發通知、批量處理任務等。linux

理論上不須要實時返回結果的請求均可以放到隊列裏面執行(固然實際項目中,不可能這麼傻的用隊列),隊列的好處是:thinkphp

1)它能夠快速的返回請求,減小服務端由於等待而產生的壓力,同時不影響業務的處理。數據庫

2)另外在處理大批量的任務時能很好的分散處理壓力。json

二【隊列擴展說明】

一、geaman隊列

公司老的項目一直都是使用geaman做爲隊列服務,蠻穩定的,比較喜歡它,但也有不能忍受的地方:php框架

1)易學習掌握,使用方便。你們能夠先封裝了一個geaman通用工具類,包含server和client,後來的使用者就能夠很方便的再業務中使用;服務器

2)能夠經過命令查看隊列的執行狀況,這對於查bug頗有效;swoole

3)不能肯定異步任務的最終執行狀況,如,插庫任務,丟到隊列若是插庫失敗,調用者並不會知道,這對於須要結果的隊列任務來講會讓人很抓狂。固然這種狀況咱們也有不少迂迴的解決方案,如,數據表作標識符等。網絡

二、swoole隊列

對於swoole,咱們很早就接觸瞭解,它跟php的結合無疑是最佳搭檔,一度有一段時期咱們用了swoole框架來開發,惋惜後面因爲人爲緣由都放棄了;但整體開發體驗仍是很不錯的,包括它的擴展性、給開發人員的自由度、自己性能、特別對於比較底層的功能實現讓人不能罷手,好比異步隊列、異步文件讀寫、網絡通訊部分等;另外,swoole不會像純C寫的php框架那麼重,如phalcon、鳥哥的yaf,純c框架,學習曲線高不說,在出現問題須要調試也讓人煩躁。

回到隊列正題,這裏咱們只用到swoole的異步隊列功能,swoole隊列特色(根據咱們使用狀況的小結):

1)上手快,使用簡單方便,簡單到直接拷貝官方示例就能夠跑起來;但不能輕敵,想要用好swoole的隊列及高級點功能仍是須要費點功夫,根據本身服務器的配置狀況來進行微調。

2)對window開發環境不友好,擴展在linux下安裝方便,不過官方也給出了在window下使用方法。若是你是window環境開發的話,我的建議,在虛擬機弄個linux環境。

3)它提供了每一個隊列執行狀況的回調,很好的解決上面geaman的第3點問題

4)不能查看隊列狀況(即上面geaman的第2點),即你不知道目前隊列是否有任務存在

5)每一個服務隊列須要佔用一個監聽端口,即當你有多個不一樣業務須要跑隊列任務時,你須要swoole server端監聽不一樣的端口,來爲不一樣的業務隊列服務;----通用隊列實現就爲了緩解此問題。

官方環境依賴說明:https://wiki.swoole.com/wiki/page/7.html

想了解更多swoole內容,你們能夠參考官網:http://www.swoole.com/ 

三【swoole的通用隊列實現】

一、swoole隊列 -- Server端啓動代碼

$serv = new Swoole\Server("127.0.0.1", 8888);//監聽本機8888端口

//swoole配置信息

$serv->set([

// 通常設置爲服務器CPU數的1-4倍

'worker_num'      => 12,

// task進程的數量(通常任務都是同步阻塞的,能夠設置爲單進程單線程)

'task_worker_num' => 20,

'daemonize'       => true,

'open_eof_split'  => true,//打開eof_split檢測

'package_eof'     => PHP_EOL,//設置EOF

// 以守護進程執行

//  'task_ipc_mode' => 1,  // 使用unix socket通訊,默認模式

'log_file'        => $this->logFile,

// swoole日誌

// 數據包分發策略(dispatch_mode=1/3時,底層會屏蔽onConnect/onClose事件,

// 緣由是這2種模式下沒法保證onConnect/onClose/onReceive的順序,非請求響應式的服務器程序,請不要使用模式1或3)

//  'dispatch_mode' => 2,        // 固定模式,根據鏈接的文件描述符分配worker。這樣能夠保證同一個鏈接發來的數據只會被同一個worker處理

]);

$serv->on('Receive', function($serv, $fd, $from_id, $data) {

$task_id = $serv->task("Async");

    echo "Dispath AsyncTask: id=$task_id\n";

});

$serv->on('Task', function ($serv, $task_id, $from_id, $data) {

    echo "New AsyncTask[id=$task_id]".PHP_EOL;

    $serv->finish("$data -> OK");

});

$serv->on('Finish', function ($serv, $task_id, $data) {

    echo "AsyncTask[$task_id] Finish: $data".PHP_EOL;

});

$serv->start();

二、swoole隊列 -- client代碼

$client = new \swoole_client(SWOOLE_SOCK_TCP);
$client->connect('127.0.0.1', 8888, 1);//與server端對應
$client->send($data.PHP_EOL);

經過以上代碼,就能夠愉快的使用swoole提供的隊列功能;

三、通用隊列實現

1)將以上Server和Client封裝爲各自基類,之後上層的server及client業務均需集成相應的基類,如圖:

 

2)創建一個Server.php類

本代碼基於thinkphp5實現,若是是其餘框架能夠相似實現,具體說明見下面代碼及註釋說明:以異步發郵件作爲例子說明

<?php
/**
 * swoole通用服務
 */
namespace app\console\swooletask;
use app\console\common\ServerCommand;
use think\Log;
//繼承Server基類
class Server extends ServerCommand {

//實際業務的接口地址前綴
    private $swooleTaskPath = '\\app\console\\swooletask\\task\\';
    protected function configure() {
       //此爲tp5特有的cli命名模式 $this->setName('kmads-common-swoole')->setDescription('swoole通用服務');
    }

    public function onReceive($serv, $fd, $from_id, $data) {
        $data = trim($data);
        $datas = json_decode($data, true);

/*注意:核心點都在這裏

* 實現思路:client端經過傳遞不一樣的type來區分業務隊列,而後通

* 過神奇的函數call_user_func_array對接到實際業務logic代碼

*/
        switch($datas['type']) {

    //異步發郵件功能
            case 'Email': {
                $taskClass = $this->swooleTaskPath . 'email\\' . $datas['type'] . 'Task';
                call_user_func_array(array(
                    new $taskClass(),
                    'index'
                ), array(
                    $serv,
                    $datas['data']
                ));
                break;
            }
            case Test: {
                $taskClass = $this->swooleTaskPath . 'wechat\\' . $datas['type'] . 'Task';
                call_user_func_array(array(
                    new $taskClass(),
                    'index'
                ), array(
                    $serv,
                    $datas['data']
                ));
                break;
            }

            default:
                return [
                    'type'   => 'undefided',
                    'status' => false
                ];
        }
    }
    public function onTask($serv, $task_id, $src_worker_id, $data) {
        $className = $data['class'];
        $action    = $data['action'];
        return call_user_func_array(array(
            new $className(),
            $action
        ), array($data['datas']));
    }
}

業務的對應實現接口:EmailTask.php

class EmailTask{
    /**
     * swoole task執行觸發入口
     * @param $serv
     * @param $data
     */
    public function index($serv,$data) {

//添加swoole隊列收到請求數據onReceive的其餘邏輯

//執行swoole隊列的task任務;task與worker的關係見官網文檔
        $serv->task([
            'class'  => '\app\console\swooletask\logic\Email',
            'action' => 'sendEmail',
            'datas'  => $data
        ]);
    }
}

task實際邏輯處理,logic/Email.php -- 這個就是實現實際的發郵件功能,代碼就不貼出來了

目錄結構:你們能夠根據本身的業務結構進行自由組織

 

四、client端調用

$email = [
    'type' => 'Email',//隊列業務類型
    'data' => $data,//你須要傳遞的數據
];
$client = new \ClientCommand();
$client->sendData(json_encode($email));//基類ClientCommand提供的發送數據到server端的方法

四【小結】

一、經過定義type及實現相應type的接口就能夠很方便的擴展一個swoole隊列需求,而不須要單獨監聽一個端口

二、Client端調用,引入ClientCommand 基類便可調用

 五【使用過程坑的記錄】

【坑1】使用swoole跑隊列任務,且隊列裏涉及數據庫操做時,注意進程間數據庫鏈接會被共享,致使數據庫執行失敗或不執行現象

【解決方案1】加了上面那段代碼後,會對每個隊列進程建進程對應的連接,不影響其餘進程,如:

附,官方回覆:https://group.swoole.com/question/106787

相關文章
相關標籤/搜索