PHP 多任務協程處理

本文首發於 PHP 多任務協程處理,轉載請註明出處!

上週 有幸和同事一塊兒在 SilverStripe 分享最近的工做事宜。今天我計劃分享 PHP 異步編程,不過因爲上週我聊過 ReactPHP;我決定討論一些不同的內容。因此本文將探討多任務協程這方面的內容。php

另外我還計劃把這個主題加入到我正在籌備的一本 PHP 異步編程的圖書中。雖然這本書相比本文來講會涉及更多細節,但我以爲本文依然具備實際意義!html

那麼,開始吧!
new MyIterator(
generators make code asynchronous without extensionsreact

這就是本文咱們要討論的問題。不過咱們會從更簡單更熟悉的示例開始。git

一切從數組開始

咱們能夠經過簡單的遍從來使用數組:github

$array = ["foo", "bar", "baz"];
 
foreach ($array as $key => $value) {
    print "item: " . $key . "|" . $value . "\n";
}
 
for ($i = 0; $i < count($array); $i++) {
    print "item: " . $i . "|" . $array[$i] . "\n";
}

這是咱們平常編碼所依賴的基本實現。能夠經過遍歷數組獲取每一個元素的鍵名和鍵值。shell

固然,若是咱們但願可以知道在什麼時候可使用數組。PHP 提供了一個方便的內置函數:數據庫

print is_array($array) ? "yes" : "no"; // yes

類數組處理

有時,咱們須要對一些數據使用相同的方式進行遍歷處理,但它們並不是數組類型。好比對 DOMDocument 類進行處理:編程

$document = new DOMDocument();
$document->loadXML("<div></div>");

$elements = $document->getElementsByTagName("div");
print_r($elements); // DOMNodeList Object ( [length] => 1 )

這顯然不是一個數組,可是它有一個 length 屬性。咱們能像遍歷數組同樣,對其進行遍歷麼?咱們能夠判斷它是否實現了下面這個特殊的接口:數組

print ($elements instanceof Traversable) ? "yes" : "no"; // yes

這真的太有用了。它不會致使咱們在遍歷非可遍歷數據時觸發錯誤。咱們僅需在處理前進行檢測便可。promise

不過,這會引起另一個問題:咱們可否讓自定義類也擁有這個功能呢?回答是確定的!第一個實現方法相似以下:

class MyTraversable implements Traversable
{
    //  在這裏編碼...
}

若是咱們執行這個類,咱們將看到一個錯誤信息:

PHP Fatal error: Class MyTraversable must implement interface Traversable as part of either Iterator or IteratorAggregate

Iterator(迭代器)

咱們沒法直接實現 Traversable,可是咱們能夠嘗試第二種方案:

class MyTraversable implements Iterator
{
    //  在這裏編碼...
}

這個接口須要咱們實現 5 個方法。讓咱們完善咱們的迭代器:

class MyTraversable implements Iterator
{
    protected $data;

    protected $index = 0;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function current()
    {
        return $this->data[$this->index];
    }

    public function next()
    {
        return $this->data[$this->index++];
    }

    public function key()
    {
        return $this->index;
    }

    public function rewind()
    {
        $this->index = 0;
    }

    public function valid()
    {
        return $this->index < count($this->data);
    }
}

這邊咱們須要注意幾個事項:

  1. 咱們須要存儲構造器方法傳入的 $data 數組,以便後續咱們能夠從中獲取它的元素。
  2. 還須要一個內部索引(或指針)來跟蹤 currentnext 元素。
  3. rewind() 僅僅重置 index 屬性,這樣 current()next() 才能正常工做。
  4. 鍵名並不是只能是數字類型!這裏使用數組索引是爲了保證示例足夠簡單。

咱們能夠向下面這樣運行這段代碼:

$iterator = new MyTraversable(["foo", "bar", "baz"]);
 
foreach ($iterator as $key => $value) {
    print "item: " . $key . "|" . $value . "\n";
}

這看起來須要處理太多工做,可是這是可以像數組同樣使用 foreach/for 功能的一個簡潔實現。

IteratorAggregate(聚合迭代器)

還記得第二個接口拋出的 Traversable 異常麼?下面看一個比實現 Iterator 接口更快的實現吧:

class MyIteratorAggregate implements IteratorAggregate
{
    protected $data;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function getIterator()
    {
        return new ArrayIterator($this->data);
    }
}

這裏咱們做弊了。相比於實現一個完整的 Iterator,咱們經過 ArrayIterator() 裝飾。不過,這相比於經過實現完整的 Iterator 簡化了很多代碼。

so what does this have to do with generators?

兄弟莫急!先讓咱們比較一些代碼。首先,咱們在不使用生成器的狀況下從文件中讀取每一行數據:

$content = file_get_contents(__FILE__);

$lines = explode("\n", $content);

foreach ($lines as $i => $line) {
    print $i . ". " . $line . "\n";
}

這段代碼讀取文件自身,而後會打印出每行的行號和代碼。那麼爲何咱們不使用生成器呢!

function lines($file) {
    $handle = fopen($file, 'r');

    while (!feof($handle)) {
        yield trim(fgets($handle));
    }

    fclose($handle);
}

foreach (lines(__FILE__) as $i => $line) {
    print $i . ". " . $line . "\n";
}

我知道這看起來更加複雜。不錯,不過這是由於咱們沒有使用 file_get_contents() 函數。一個生成器看起來就像是一個函數,可是它會在每次獲取到 yield 關鍵詞是中止運行。

生成器看起來有點像迭代器:

print_r(lines(__FILE__)); // Generator Object ( )

儘管它不是迭代器,它是一個 Generator。它的內部定義了什麼方法呢?

print_r(get_class_methods(lines(__FILE__)));
 
// Array
// (
//     [0] => rewind
//     [1] => valid
//     [2] => current
//     [3] => key
//     [4] => next
//     [5] => send
//     [6] => throw
//     [7] => __wakeup
// )
若是你讀取一個大文件,而後使用 memory_get_peak_usage(),你會注意到生成器的代碼會使用固定的內存,不管這個文件有多大。它每次進度去一行。而是用 file_get_contents() 函數讀取整個文件,會使用更大的內存。這就是在迭代處理這類事物時,生成器的能給咱們帶來的優點!

Send(發送數據)

能夠將數據發送到生成器中。看下下面這個生成器:

<?php
$generator = call_user_func(function() {
    yield "foo";
});

print $generator->current() . "\n"; // foo
注意這裏咱們如何在 call_user_func() 函數中封裝生成器函數的?這裏僅僅是一個簡單的函數定義,而後當即調用它獲取一個新的生成器實例...

咱們已經見過 yield 的用法。咱們能夠經過擴展這個生成器來接收數據:

$generator = call_user_func(function() {
    $input = (yield "foo");

    print "inside: " . $input . "\n";
});

print $generator->current() . "\n";

$generator->send("bar");

數據經過 yield 關鍵字傳入和返回。首先,執行 current() 代碼直到遇到 yield,返回 foosend() 將輸出傳入到生成器打印輸入的位置。你須要習慣這種用法。

拋出異常(Throw)

因爲咱們須要同這些函數進行交互,可能但願將異常推送到生成器中。這樣這些函數就能夠自行處理異常。

看看下面這個示例:

$multiply = function($x, $y) {
    yield $x * $y;
};

print $multiply(5, 6)->current(); // 30

如今讓咱們將它封裝到另外一個函數中:

$calculate = function ($op, $x, $y) use ($multiply) {
    if ($op === 'multiply') {
        $generator = $multiply($x, $y);

        return $generator->current();
    }
};

print $calculate("multiply", 5, 6); // 30

這裏咱們經過一個普通閉包將乘法生成器封裝起來。如今讓咱們驗證無效參數:

$calculate = function ($op, $x, $y) use ($multiply) {

    if ($op === "multiply") {
        $generator = $multiply($x, $y);

        if (!is_numeric($x) || !is_numeric($y)) {
            throw new InvalidArgumentException();
        }

        return $generator->current();
    }
};

print $calculate('multiply', 5, 'foo'); // PHP Fatal error...

若是咱們但願可以經過生成器處理異常?咱們怎樣才能將異常傳入生成器呢!

$multiply = function ($x, $y) {
    try {
        yield $x * $y;
    } catch (InvalidArgumentException $exception) {
        print "ERRORS!";
    }
};

$calculate = function ($op, $x, $y) use ($multiply) {

    if ($op === "multiply") {
        $generator = $multiply($x, $y);

        if (!is_numeric($x) || !is_numeric($y)) {
            $generator->throw(new InvalidArgumentException());
        }

        return $generator->current();
    }
};
print $calculate('multiply', 5, 'foo'); // PHP Fatal error...

棒呆了!咱們不只能夠像迭代器同樣使用生成器。還能夠經過它們發送數據並拋出異常。它們是可中斷和可恢復的函數。有些語言把這些函數叫作……

coroutines

咱們可使用協程(coroutines)來構建異步代碼。讓咱們來建立一個簡單的任務調度程序。首先咱們須要一個 Task 類:

class Task
{
    protected $generator;

    public function __construct(Generator $generator)
    {
        $this->generator = $generator;
    }

    public function run()
    {
        $this->generator->next();
    }

    public function finished()
    {
        return !$this->generator->valid();
    }
}

Task 是普通生成器的裝飾器。咱們將生成器賦值給它的成員變量以供後續使用,而後實現一個簡單的 run()finished() 方法。run() 方法用於執行任務,finished() 方法用於讓調度程序知道什麼時候終止運行。

而後咱們須要一個 Scheduler 類:

class Scheduler
{
    protected $queue;

    public function __construct()
    {
        $this->queue = new SplQueue();
    }

    public function enqueue(Task $task)
    {
        $this->queue->enqueue($task);
    }

    pulic function run()
    {
        while (!$this->queue->isEmpty()) {
            $task = $this->queue->dequeue();
            $task->run();

            if (!$task->finished()) {
                $this->queue->enqueue($task);
            }
        }
    }
}

Scheduler 用於維護一個待執行的任務隊列。run() 會彈出隊列中的全部任務並執行它,直到運行完整個隊列任務。若是某個任務沒有執行完畢,當這個任務本次運行完成後,咱們將再次入列。

SplQueue 對於這個示例來說再合適不過了。它是一種 FIFO(先進先出:fist in first out) 數據結構,可以確保每一個任務都可以獲取足夠的處理時間。

咱們能夠像這樣運行這段代碼:

$scheduler = new Scheduler();

$task1 = new Task(call_user_func(function() {
    for ($i = 0; $i < 3; $i++) {
        print "task1: " . $i . "\n";
        yield;
    }
}));

$task2 = new Task(call_user_func(function() {
    for ($i = 0; $i < 6; $i++) {
        print "task2: " . $i . "\n";
        yield;
    }
}));

$scheduler->enqueue($task1);
$scheduler->enqueue($task2);

$scheduler->run();

運行時,咱們將看到以下執行結果:

task 1: 0
task 1: 1
task 2: 0
task 2: 1
task 1: 2
task 2: 2
task 2: 3
task 2: 4
task 2: 5

這幾乎就是咱們想要的執行結果。不過有個問題發生在首次運行每一個任務時,它們都執行了兩次。咱們能夠對 Task 類稍做修改來修復這個問題:

class Task
{
    protected $generator;

    protected $run = false;

    public function __construct(Generator $generator)
    {
        $this->generator = $generator;
    }

    public function run()
    {
        if ($this->run) {
            $this->generator->next();
        } else {
            $this->generator->current();
        }

        $this->run = true;
    }

    public function finished()
    {
        return !$this->generator->valid();
    }
}

咱們須要調整首次 run() 方法調用,從生成器當前有效的指針讀取運行。後續調用能夠從下一個指針讀取運行...

so simple!

有些人基於這個思路實現了一些超讚的類庫。咱們來看看其中的兩個...

RecoilPHP

RecoilPHP 是一套基於協程的類庫,它最使人印象深入的是用於 ReactPHP 內核。能夠將事件循環在 RecoilPHP 和 RecoilPHP 之間進行交換,而你的程序無需架構上的調整。

咱們來看一下 ReactPHP 異步 DNS 解決方案:

function resolve($domain, $resolver) {
    $resolver
        ->resolve($domain)
        ->then(function ($ip) use ($domain) {
            print "domain: " . $domain . "\n";
            print "ip: " . $ip . "\n";
        }, function ($error) {            
            print $error . "\n";
        })
}

function run()
{
    $loop = React\EventLoop\Factory::create();
 
    $factory = new React\Dns\Resolver\Factory();
 
    $resolver = $factory->create("8.8.8.8", $loop);
 
    resolve("silverstripe.org", $resolver);
    resolve("wordpress.org", $resolver);
    resolve("wardrobecms.com", $resolver);
    resolve("pagekit.com", $resolver);
 
    $loop->run();
}
 
run();

resolve() 接收域名和 DNS 解析器,並使用 ReactPHP 執行標準的 DNS 查找。不用太過糾結與 resolve() 函數內部。重要的是這個函數不是生成器,而是一個函數!

run() 建立一個 ReactPHP 事件循環,DNS 解析器(這裏是個工廠實例)解析若干域名。一樣,這個也不是一個生成器。

想知道 RecoilPHP 到底有何不一樣?還但願掌握更多細節!

use Recoil\Recoil;
 
function resolve($domain, $resolver)
{
    try {
        $ip = (yield $resolver->resolve($domain));
 
        print "domain: " . $domain . "\n";
        print "ip: " . $ip . "\n";
    } catch (Exception $exception) {
        print $exception->getMessage() . "\n";
    }
}
 
function run()
{
    $loop = (yield Recoil::eventLoop());
 
    $factory = new React\Dns\Resolver\Factory();
 
    $resolver = $factory->create("8.8.8.8", $loop);
 
    yield [
        resolve("silverstripe.org", $resolver),
        resolve("wordpress.org", $resolver),
        resolve("wardrobecms.com", $resolver),
        resolve("pagekit.com", $resolver),
    ];
}
 
Recoil::run("run");

經過將它集成到 ReactPHP 來完成一些使人稱奇的工做。每次運行 resolve() 時,RecoilPHP 會管理由 $resoler->resolve() 返回的 promise 對象,而後將數據發送給生成器。此時咱們就像在編寫同步代碼同樣。與咱們在其餘一步模型中使用回調代碼不一樣,這裏只有一個指令列表。

RecoilPHP 知道它應該管理一個有執行 run() 函數時返回的 yield 數組。RoceilPHP 還支持基於協程的數據庫(PDO)和日誌庫。

IcicleIO

IcicleIO 爲了一全新的方案實現 ReactPHP 同樣的目標,而僅僅使用協程功能。相比 ReactPHP 它僅包含極少的組件。可是,核心的異步流、服務器、Socket、事件循環特性一個不落。

讓咱們看一個 socket 服務器示例:

use Icicle\Coroutine\Coroutine;
use Icicle\Loop\Loop;
use Icicle\Socket\Client\ClientInterface;
use Icicle\Socket\Server\ServerInterface;
use Icicle\Socket\Server\ServerFactory;
 
$factory = new ServerFactory();
 
$coroutine = Coroutine::call(function (ServerInterface $server) {
    $clients = new SplObjectStorage();
     
    $handler = Coroutine::async(
        function (ClientInterface $client) use (&$clients) {
            $clients->attach($client);
             
            $host = $client->getRemoteAddress();
            $port = $client->getRemotePort();
             
            $name = $host . ":" . $port;
             
            try {
                foreach ($clients as $stream) {
                    if ($client !== $stream) {
                        $stream->write($name . "connected.\n");
                    }
                }
 
                yield $client->write("Welcome " . $name . "!\n");
                 
                while ($client->isReadable()) {
                    $data = trim(yield $client->read());
                     
                    if ("/exit" === $data) {
                        yield $client->end("Goodbye!\n");
                    } else {
                        $message = $name . ":" . $data . "\n";
                        
                        foreach ($clients as $stream) {
                            if ($client !== $stream) {
                                $stream->write($message);
                            }
                        }
                    }
                }
            } catch (Exception $exception) {
                $client->close($exception);
            } finally {
                $clients->detach($client);
                foreach ($clients as $stream) {
                    $stream->write($name . "disconnected.\n");
                }
            }
        }
    );
     
    while ($server->isOpen()) {
        $handler(yield $server->accept());
    }
}, $factory->create("127.0.0.1", 6000));
 
Loop::run();

據我所知,這段代碼所作的事情以下:

  1. 在 127.0.0.1 和 6000 端口建立一個服務器實例,而後將其傳入外部生成器.
  2. 外部生成器運行,同時服務器等待新鏈接。當服務器接收一個鏈接它將其傳入內部生成器。
  3. 內部生成器寫入消息到 socket。當 socket 可讀時運行。
  4. 每次 socket 向服務器發送消息時,內部生成器檢測消息是不是退出標識。若是是,通知其餘 socket。不然,其它 socket 發送這個相同的消息。

打開命令行終端輸入 nc localhost 6000 查看執行結果!

該示例使用 SplObjectStorage 跟蹤 socket 鏈接。這樣咱們就能夠向全部 socket 發送消息。

mathematical!

這個話題能夠包含不少內容。但願您能看到生成器是如何建立的,以及它們如何幫助編寫迭代程序和異步代碼。

若是你有問題,能夠隨時問我

感謝 Nikita Popov(還有它的啓蒙教程 Cooperative multitasking using coroutines (in PHP!) ), Anthony FerraraJoe Watkins。這些研究工做澤被蒼生,給我以寫做此篇文章的靈感。關注他們吧,好麼?

原文

Co-operative PHP Multitasking

相關文章
相關標籤/搜索