swoole實現Timer定時器、心跳檢測及Task進階實例:mysql鏈接池

Table of Contents

環境說明: 系統:Ubuntu14.04 (安裝教程包括CentOS6.5)
PHP版本:PHP-5.5.10
swoole版本:1.7.7-stablephp

1.Timer定時器

在實際應用中,每每會遇到須要每隔一段時間重複作一件事,好比心跳檢測、訂閱消息、數據庫備份等工做。一般,咱們會藉助PHP的time()以及相關函數本身實現一個定時器,或者使用crontab工具來實現。可是,自定義的定時器容易出錯,而使用crontab則須要編寫額外的腳本文件,不管是遷移仍是調試都比較麻煩。
所以,Swoole提供了一個內置的Timer定時器功能,經過函數addtimer便可在Swoole中添加一個定時器,該定時器會在創建以後,按照預先設定好的時間間隔,每到對應的時間就會調用一次回調函數onTimer通知Server。
簡單示例以下:mysql

$this->serv->on('Timer', array($this, 'onTimer'));

    public function onWorkerStart( $serv , $worker_id) {
        // 在Worker進程開啓時綁定定時器
        // 只有當worker_id爲0時才添加定時器,避免重複添加
        if( $worker_id == 0 ) {
            $serv->addtimer(500);
            $serv->addtimer(1000);
            $serv->addtimer(1500);
        }
    }

    public function onTimer($serv, $interval) {
        switch( $interval ) {
            case 500: { // 
                echo "Do Thing A at interval 500\n";
                break;
            }
            case 1000:{
                echo "Do Thing B at interval 1000\n";
                break;
            }
            case 1500:{
                echo "Do Thing C at interval 1500\n";
                break;
            }
        }
    }

能夠看到,在onWorkerStart回調函數中,經過addtimer添加了三個定時器,時間間隔分別爲500、1000、1500。而在onTimer回調中,正好經過間隔的不一樣來區分不一樣的定時器回調,從而執行不一樣的操做。
須要注意的是,在上述示例中,當1000ms的定時器被觸發時,500ms的定時器一樣會被觸發,可是不能保證會在1000ms定時器前觸發仍是後觸發,所以須要注意,定時器中的操做不能依賴其餘定時器的執行結果。git

點此查看完整示例github

(PS:在Swoole-1.7.7版本,新提供了一個after函數, 這個功能的用法會在之後的教程中給出。)sql

2.心跳檢測

上文提到過,使用Timer定時器功能能夠實現發送心跳包的功能。事實上,Swoole已經內置了心跳檢測功能,能自動close掉長時間沒有數據來往的鏈接。而開啓心跳檢測功能,只須要設置heartbeat_check_intervalheartbeat_idle_time便可。以下:數據庫

$this->serv->set(
    array(
        'heartbeat_check_interval' => 60,
        'heartbeat_idle_time' => 600,
    )
);

其中heartbeat_idle_time的默認值是heartbeat_check_interval的兩倍。 在設置這兩個選項後,swoole會在內部啓動一個線程,每隔heartbeat_check_interval秒後遍歷一次所有鏈接,檢查最近一次發送數據的時間和當前時間的差,若是這個差值大於heartbeat_idle_time,則會強制關閉這個鏈接,並經過回調onClose通知Server進程。 點此查看完整示例 小技巧: 結合以前的Timer功能,若是咱們想維持鏈接,就設置一個略小於若是這個差值大於heartbeat_idle_time的定時器,在定時器內向全部鏈接發送一個心跳包。若是收到心跳回應,則判斷鏈接正常,若是沒有收到,則關閉這個鏈接或者再次嘗試發送。json

3.Task進階:MySQL鏈接池

上一章中我簡單講解了如何開啓和使用Task功能。這一節,我將提供一個Task的高級用法。swoole

在PHP中,訪問MySQL數據庫每每是性能提高的瓶頸。而MySQL鏈接池我想你們都不陌生,這是一個很好的提高數據庫訪問性能的方式。傳統的MySQL鏈接池,是預先申請必定數量的鏈接,每個新的請求都會佔用其中一個鏈接,請求結束後再將鏈接放回池中,若是全部鏈接都被佔用,新來的鏈接則會進入等待狀態。
知道了MySQL鏈接池的實現原理,那咱們來看如何使用Swoole實現一個鏈接池。
首先,Swoole容許開啓必定量的Task Worker進程,咱們可讓每一個進程都擁有一個MySQL鏈接,並保持這個鏈接,這樣,咱們就建立了一個鏈接池。
其次,設置swoole的dispatch_mode爲搶佔模式(主進程會根據Worker的忙閒狀態選擇投遞,只會投遞給處於閒置狀態的Worker)。這樣,每一個task都會被投遞給閒置的Task Worker。這樣,咱們保證了每一個新的task都會被閒置的Task Worker處理,若是所有Task Worker都被佔用,則會進入等待隊列。併發

下面直接上關鍵代碼:框架

public function onWorkerStart( $serv , $worker_id) {
    echo "onWorkerStart\n";
    // 斷定是否爲Task Worker進程
    if( $worker_id >= $serv->setting['worker_num'] ) {
        $this->pdo = new PDO(
            "mysql:host=localhost;port=3306;dbname=Test", 
            "root", 
            "123456", 
            array(
                PDO::MYSQL_ATTR_INIT_COMMAND => "SET NAMES 'UTF8';",
                PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
                PDO::ATTR_PERSISTENT => true
            )
        );
    }
}

首先,在每一個Task Worker進程中,建立一個MySQL鏈接。這裏我選用了PDO擴展。

public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
    $sql = array(
        'sql'=>'select * from Test where pid > ?',
        'param' => array(
            0
        ),
        'fd' => $fd
    );
    $serv->task( json_encode($sql) );
}

其次,在須要的時候,經過task函數投遞一個任務(也就是發起一次SQL請求)

public function onTask($serv,$task_id,$from_id, $data) {
    $sql = json_decode( $data , true );

    $statement = $this->pdo->prepare($sql['sql']);
    $statement->execute($sql['param']);     

    $result = $statement->fetchAll(PDO::FETCH_ASSOC);
    $serv->send( $sql['fd'],json_encode($result));
    return true;
}

最後,在onTask回調中,根據請求過來的SQL語句以及相應的參數,發起一次MySQL請求,並將獲取到的結果經過send發送給客戶端(或者經過return返回給Worker進程)。並且,這樣的一次MySQL請求還不會阻塞Worker進程,Worker進程能夠繼續處理其餘的邏輯。

能夠看到,簡單十幾行代碼,就實現了一個高效的異步MySQL鏈接池。
經過測試,單個客戶端一共發起1W次select請求,共耗時9s;
1W次insert請求,共耗時21s。
(客戶端會在每次收到前一個請求的結果後纔會發起下一次請求,而不是併發)。

點此查看完整服務端代碼
點此查看完整客戶端代碼

4.Task實戰:yii中應用task

在YII框架中結合了swoole 的task 作了異步處理。 本例中 主要用到 一、protected/commands/ServerCommand.php 用來作server。 二、protected/event/下的文件 這裏是在異步中的具體實現。

客戶端調用參照 TestController

<?php
class TestController extends Controller{
    public function actionTT(){
        $message['uid'] = 2;
        $message['email'] = '83212019@qq.com';
        $message['title'] = '接口報警郵件';
        $message['contents'] = "'EmailEvent'接口請求過程出錯! 錯誤信息以下:err_no:'00000' err_msg:'測試隊列' 請求參數爲:'[]'";
        $message['type'] = 2;

        $data['param'] = $message;
        $data['class'] = 'Email';
        $client = new EventClient();
        $data = $client->send($data);
    }
}
?>

有個task表是用來記錄異步任務的。若是失敗重試3次。sql在protected/data/sql.sql裏。
點此查看完整客戶端代碼

下章預告:Swoole多端口監聽、熱重啓以及Timer進階:簡單crontab

相關文章
相關標籤/搜索