基於swoole與php協程實現異步非阻塞IO

本文協程調度的實現參考了有讚的zanphp實現:zanphp.io/php

本身寫了一個簡單的swoole+php協程的框架:github.com/neuSnail/Pc… 有興趣的能夠看看,很不成熟歡迎指教。html

Pcs是我參考zanphp作的畢業設計,和zan不一樣的是zan本身寫了一個zan拓展代替swoole來實現eventloop,eventchain等。而pcs選擇繼續使用swoole,使用異步swoole_task來實現異步,對比zan複雜度較低雜易於理解。mysql

關於什麼是協程以及php基於generator的協程是怎麼實現的這裏不作詳細解釋,不瞭解的同窗能夠參考laruence的這篇文章:www.laruence.com/2015/...git

在許多文章中能夠看到這樣的描述:github

「協程能夠在遇到阻塞的時候中斷主動讓渡資源,調度程序選擇其餘的協程運行。從而實現非阻塞IO」 然而php是不支持原生協程的,遇到阻塞時如不交由異步進程來執行是沒有任何意義的,代碼仍是同步執行的,以下所示:sql

function foo() {
    $db=new Db();
    $result=(yield $db->query());
    yield $result;
}
複製代碼

上面的數據庫查詢操做是阻塞的,當調度器調度該協程到這一步時發現執行了阻塞操做,此時調度器該怎麼辦?選擇其他協程執行?那該協程的阻塞操做又該什麼時候執行,交由誰執行呢?因此說在php協程中拋開異步調用談非阻塞IO屬於耍流氓。 而swoole的異步task提供了一個實現異步的解決方案,關於swoole_task能夠參考官方文檔https://wiki.swoole.com/wiki/...數據庫

核心功能實現

  • 將一次請求造成一個協程

首先建立一個swoole_server並設置回調segmentfault

class HttpServer implements Server {
    private $swooleHttpServer;

    public function __construct(\swoole_http_server $swooleHttpServer) {
        $this->swooleHttpServer = $swooleHttpServer;
    }

    public function start() {
        $this->swooleHttpServer->on('start', [$this, 'onStart']);
        $this->swooleHttpServer->on('shutdown', [$this, 'onShutdown']);

        $this->swooleHttpServer->on('workerStart', [$this, 'onWorkerStart']);
        $this->swooleHttpServer->on('workerStop', [$this, 'onWorkerStop']);
        $this->swooleHttpServer->on('workerError', [$this, 'onWorkerError']);
        $this->swooleHttpServer->on('task', [$this, 'onTask']);
        $this->swooleHttpServer->on('finish', [$this, 'onFinish']);


        $this->swooleHttpServer->on('request', [$this, 'onRequest']);

        $this->swooleHttpServer->start();
    }
複製代碼

onRequest方法:bash

public function onRequest(\swoole_http_request $request, \swoole_http_response $response) {
        $requestHandler = new RequestHandler($request, $response);
        $requestHandler->handle();
    }
複製代碼

在ReqeustHandler中執行handle方法,來解析請求的路由,並建立控制器,調用相應的方法,相關實現這裏再也不贅述。swoole

public function handle() {
        $this->context = new Context($this->request, $this->response, $this->getFd());
        $this->router = new Router($this->request);

        try {
            if (false === $this->router->parse()) {
                $this->response->output('');
                return;
            }
            $coroutine = $this->doRun();
            $task = new Task($coroutine, $this->context);
            $task->run();
        } catch (\Exception $e) {
            PcsExceptionHandler::handle($e, $this->response);
        }
    }
    
 private function doRun() {
        $ret = (yield $this->dispatch());
        yield $this->response->send($ret);
    }
複製代碼

上面代碼中的coroutine就是一次請求封裝成的協程,doRun方法中的ret是controller->action()的調用結果,yield $this->response->send($ret);是向對客戶端請求的應答。

$coroutine是這一次請求造成的一個協程(Genetator對象),包含了整個請求的流程,接下來就要對這個協程進行調度來獲取真正的執行結果。

  • 協程調度
namespace Pcs\Coroutine;

use Pcs\Network\Context\Context;

class Task {
    private $coroutine;
    private $context;
    private $status;
    private $scheduler;
    private $sendValue;

    public function __construct(\Generator $coroutine, Context $context) {
        $this->coroutine = $coroutine;
        $this->context = $context;
        $this->scheduler = new Scheduler($this);

    }

    public function run() {
        while (true) {
            try {
                $this->status = $this->scheduler->schedule();
                switch ($this->status) {
                    case TaskStatus::TASK_WAIT:
                        echo "task status: TASK_WAIT\n";
                        return null;

                    case TaskStatus::TASK_DONE:
                        echo "task status: TASK_DONE\n";
                        return null;

                    case TaskStatus::TASK_CONTINUE;
                        echo "task status: TASK_CONTINUE\n";
                        break;
                }

            } catch (\Exception $e) {
                $this->scheduler->throwException($e);
            }
        }
    }
    public function setCoroutine($coroutine) {
        $this->coroutine = $coroutine;
    }

    public function getCoroutine() {
        return $this->coroutine;
    }

    public function valid() {
        if ($this->coroutine->valid()) {
            return true;
        } else {
            return false;
        }
    }

    public function send($value) {
        $this->sendValue = $value;
        $ret = $this->coroutine->send($value);
        return $ret;
    }

    public function getSendVal() {
        return $this->sendValue;
    }
}
複製代碼

Task依賴於Generator對象$coroutine,在Task類中定義了一些get/set方法,以及一些Generator的方法,Task::run()方法用來執行對協程的調度,調度行爲由Schedule來執行,每次調度都會返回當前此次調度的狀態。laruence的文章以及大部分網上的資料都是多個協程共用一個調度器,而這裏run方法會爲每一個協程建立一個調度器,緣由是每一個協程都是一個客戶端的請求,使用一個單獨的調度器能減小相互間的影響,並且多個協程之間的調度順序是swoole來處理的,這裏的調度器不用關心。下面給出調度的代碼:

namespace Pcs\Coroutine;

class Scheduler {
    private $task;
    private $stack;
    const SCHEDULE_CONTINUE = 10;

    public function __construct(Task $task) {
        $this->task = $task;
        $this->stack = new \SplStack();
    }
    
    public function schedule() {
        $coroutine = $this->task->getCoroutine();
        $value = $coroutine->current();

        $status = $this->handleSystemCall($value);
        if ($status !== self::SCHEDULE_CONTINUE) return $status;

        $status = $this->handleStackPush($value);
        if ($status !== self::SCHEDULE_CONTINUE) return $status;

        $status = $this->handleAsyncJob($value);
        if ($status !== self::SCHEDULE_CONTINUE) return $status;

        $status = $this->handelYieldValue($value);
        if ($status !== self::SCHEDULE_CONTINUE) return $status;

        $status = $this->handelStackPop();
        if ($status !== self::SCHEDULE_CONTINUE) return $status;


        return TaskStatus::TASK_DONE;
    }

    public function isStackEmpty() {
        return $this->stack->isEmpty();
    }

    private function handleSystemCall($value) {
        if (!$value instanceof SystemCall) {
            return self::SCHEDULE_CONTINUE;
        }
    }

    private function handleStackPush($value) {
        if (!$value instanceof \Generator) {
            return self::SCHEDULE_CONTINUE;
        }

        $coroutine = $this->task->getCoroutine();
        $this->stack->push($coroutine);
        $this->task->setCoroutine($value);

        return TaskStatus::TASK_CONTINUE;
    }

    private function handleAsyncJob($value) {
        if (!is_subclass_of($value, Async::class)) {
            return self::SCHEDULE_CONTINUE;
        }

        $value->execute([$this, 'asyncCallback']);

        return TaskStatus::TASK_WAIT;
    }

    public function asyncCallback($response, $exception = null) {
        if ($exception !== null
            && $exception instanceof \Exception
        ) {
            $this->throwException($exception, true);
        } else {
            $this->task->send($response);
            $this->task->run();
        }
    }

    private function handelYieldValue($value) {
        if (!$this->task->valid()) {
            return self::SCHEDULE_CONTINUE;
        }

        $ret = $this->task->send($value);
        return TaskStatus::TASK_CONTINUE;
    }


    private function handelStackPop() {
        if ($this->isStackEmpty()) {
            return self::SCHEDULE_CONTINUE;
        }

        $coroutine = $this->stack->pop();
        $this->task->setCoroutine($coroutine);

        $value = $this->task->getSendVal();
        $this->task->send($value);

        return TaskStatus::TASK_CONTINUE;
    }

    public function throwException($e, $isFirstCall = false) {
        if ($this->isStackEmpty()) {
            $this->task->getCoroutine()->throw($e);
            return;
        }

        try {
            if ($isFirstCall) {
                $coroutine = $this->task->getCoroutine();
            } else {
                $coroutine = $this->stack->pop();
            }

            $this->task->setCoroutine($coroutine);
            $coroutine->throw($e);

            $this->task->run();
        } catch (\Exception $e) {
            $this->throwException($e);
        }
    }
}
複製代碼

Scheduler中的schedule方法會獲取當前Task的協程,並經過current()方法獲取當前中斷點的返回值,接着依次調用5個方法來對返回值進行處理。 1:handleSystemCall 若是返回的值是SystemCall類型的對象,則執行系統調用,如killTask之類的操做,systemCall是第一優先級。 2:handleStackPush 在A函數中調用B函數,則B函數稱爲A函數的子例程(子函數),然而在協程中卻不能像普通函數那樣調用。

function funcA() {
    return funcB();
}

function genA() {
    yield genB();
}
複製代碼

在funcA中funcB();會返回funcB的執行結果,可是在genA中,yield genB();會返回一個Generator對象,而不是genB的最終執行結果。想獲得genB的執行結果須要對genB進行調度,而genB中又可能有genC()genD()的協程嵌套,因此爲了讓協程像函數一眼正常調用,這裏使用協程棧來實現。

coroutine1
如上圖,當調度器獲取到GenA(父協程)的返回值is instance of Generator時,調度器會把父協程push到stack中,而後把子協程分配給Task,繼續調度子協程。如此反覆直到最後一個子協程返回,而後開始pop,將stack中的協程依次取出,接下來會在handleStackPop裏詳細說明。 3:handleAsyncJob handleAsyncJob是整個協程調度的核心

private function handleAsyncJob($value)
    {
        if (!is_subclass_of($value, Async::class)) {
            return self::SCHEDULE_CONTINUE;
        }

        $value->execute([$this, 'asyncCallback']);

        return TaskStatus::TASK_WAIT;
    }

    public function asyncCallback($response, $exception = null)
    {
        if ($exception !== null
            && $exception instanceof \Exception
        ) {
            $this->throwException($exception, true);
        } else {
            $this->task->send($response);
            $this->task->run();
        }
    }
複製代碼

當協程調度的返回值是繼承了Async的子類或者是實現了Asycn接口的實例的時候,會執行Async的execute方法。這裏用mysqli數據庫查詢類舉例。

public function execute(callable $callback)
    {
        $this->callback = $callback;
        $serv = ServerHolder::getServer();
        $serv->task($this->sql, -1, [$this, 'queryReady']);

    }

    public function queryReady(\swoole_http_server $serv, $task_id, $data)
    {
        $queryResult = unserialize($data);
        $exception = null;
        if ($queryResult->errno != 0) {

            $exception = new \Exception($queryResult->error);
        }
        call_user_func_array($this->callback, [$queryResult, $exception]);
    }

複製代碼

execute方法接收一個函數做爲該異步操做完成以後的回調函數,在Mysqli類中的execute方法中,啓動了一個異步swoole_task,將sql操做交給swoole_task異步執行,在執行結束後會執行queryReady方法,該方法在解析異步返回數據以後執行$this->callback()也就是以前在調度器中傳入的 asyncCallback方法,該方法在檢測異常以後會執行send()方法將異步執行的結果發送到中斷處,繼續執行。 handleAsyncJob不會等待異步操做的返回結果,而是直接返回TASK_WAIT信號,回到上面的Task->run()方法能夠看到TASK_WAIT信號會致使run()方法返回null,釋放當前worker,調度流程圖以下圖所示,(segmentfault不支持圖片縮放也不支持html語法是真的難受)

coroutine2

4:handleYieldValue

private function handelYieldValue($value)
    {
        if (!$this->task->valid()) {
            return self::SCHEDULE_CONTINUE;
        }

        $ret = $this->task->send($value);
        return TaskStatus::TASK_CONTINUE;
    }

複製代碼

若是某次yield的返回值既不是異步調用也不是Generator,那麼判斷當前的generator是不是valid(是否執行完)若是執行完畢,繼續調度,執行下面的handleStackPush方法,不然的話返回Task_Continue繼續調度,也就是說在一個generator中屢次yield,最後只會取最後一次yield的返回值。 5:handleStackPush 當上一步中判斷!$this->task->valid()也就是當前生成器執行完畢的時候,會執行本方法來控制以前的協程stack進行pop操做,首先檢查Stac是不是非空,非空的話pop出一個父協程,並將當前協程的返回值send()到父協程中斷出繼續執行。

協程優點在哪裏

當一次請求遇到IO的時候,同步操做會致使當前請求阻塞在IO處等待IO返回,體如今swoole上就是一個請求一直佔用一個worker。

coroutine3

可是當使用了協程調度以後,用戶能夠在阻塞的地方經過yield手動中斷,交由swoole_task去異步操做,同時釋放worker佔用來處理其餘請求。 當異步處理執行結束後再繼續調度。

coroutine4

注意php的協程只負責中斷,異步操做是Swoole_task作的

相關文章
相關標籤/搜索