本文首發於 PHP 多任務協程處理,轉載請註明出處!
上週 有幸和同事一塊兒在 SilverStripe 分享最近的工做事宜。今天我計劃分享 PHP 異步編程,不過因爲上週我聊過 ReactPHP;我決定討論一些不同的內容。因此本文將探討多任務協程這方面的內容。php
另外我還計劃把這個主題加入到我正在籌備的一本 PHP 異步編程的圖書中。雖然這本書相比本文來講會涉及更多細節,但我以爲本文依然具備實際意義!html
那麼,開始吧!
new MyIterator(
react
這就是本文咱們要討論的問題。不過咱們會從更簡單更熟悉的示例開始。git
咱們能夠經過簡單的遍從來使用數組:github
$array = ["foo", "bar", "baz"]; foreach ($array as $key => $value) { print "item: " . $key . "|" . $value . "\n"; } for ($i = 0; $i < count($array); $i++) { print "item: " . $i . "|" . $array[$i] . "\n"; }
這是咱們平常編碼所依賴的基本實現。能夠經過遍歷數組獲取每一個元素的鍵名和鍵值。shell
固然,若是咱們但願可以知道在什麼時候可使用數組。PHP 提供了一個方便的內置函數:數據庫
print is_array($array) ? "yes" : "no"; // yes
有時,咱們須要對一些數據使用相同的方式進行遍歷處理,但它們並不是數組類型。好比對 DOMDocument 類進行處理:編程
$document = new DOMDocument(); $document->loadXML("<div></div>"); $elements = $document->getElementsByTagName("div"); print_r($elements); // DOMNodeList Object ( [length] => 1 )
這顯然不是一個數組,可是它有一個 length 屬性。咱們能像遍歷數組同樣,對其進行遍歷麼?咱們能夠判斷它是否實現了下面這個特殊的接口:數組
print ($elements instanceof Traversable) ? "yes" : "no"; // yes
這真的太有用了。它不會致使咱們在遍歷非可遍歷數據時觸發錯誤。咱們僅需在處理前進行檢測便可。promise
不過,這會引起另一個問題:咱們可否讓自定義類也擁有這個功能呢?回答是確定的!第一個實現方法相似以下:
class MyTraversable implements Traversable { // 在這裏編碼... }
若是咱們執行這個類,咱們將看到一個錯誤信息:
PHP Fatal error: Class MyTraversable must implement interface Traversable as part of either Iterator or IteratorAggregate
咱們沒法直接實現 Traversable,可是咱們能夠嘗試第二種方案:
class MyTraversable implements Iterator { // 在這裏編碼... }
這個接口須要咱們實現 5 個方法。讓咱們完善咱們的迭代器:
class MyTraversable implements Iterator { protected $data; protected $index = 0; public function __construct($data) { $this->data = $data; } public function current() { return $this->data[$this->index]; } public function next() { return $this->data[$this->index++]; } public function key() { return $this->index; } public function rewind() { $this->index = 0; } public function valid() { return $this->index < count($this->data); } }
這邊咱們須要注意幾個事項:
咱們能夠向下面這樣運行這段代碼:
$iterator = new MyTraversable(["foo", "bar", "baz"]); foreach ($iterator as $key => $value) { print "item: " . $key . "|" . $value . "\n"; }
這看起來須要處理太多工做,可是這是可以像數組同樣使用 foreach/for 功能的一個簡潔實現。
還記得第二個接口拋出的 Traversable 異常麼?下面看一個比實現 Iterator 接口更快的實現吧:
class MyIteratorAggregate implements IteratorAggregate { protected $data; public function __construct($data) { $this->data = $data; } public function getIterator() { return new ArrayIterator($this->data); } }
這裏咱們做弊了。相比於實現一個完整的 Iterator,咱們經過 ArrayIterator() 裝飾。不過,這相比於經過實現完整的 Iterator 簡化了很多代碼。
兄弟莫急!先讓咱們比較一些代碼。首先,咱們在不使用生成器的狀況下從文件中讀取每一行數據:
$content = file_get_contents(__FILE__); $lines = explode("\n", $content); foreach ($lines as $i => $line) { print $i . ". " . $line . "\n"; }
這段代碼讀取文件自身,而後會打印出每行的行號和代碼。那麼爲何咱們不使用生成器呢!
function lines($file) { $handle = fopen($file, 'r'); while (!feof($handle)) { yield trim(fgets($handle)); } fclose($handle); } foreach (lines(__FILE__) as $i => $line) { print $i . ". " . $line . "\n"; }
我知道這看起來更加複雜。不錯,不過這是由於咱們沒有使用 file_get_contents() 函數。一個生成器看起來就像是一個函數,可是它會在每次獲取到 yield 關鍵詞是中止運行。
生成器看起來有點像迭代器:
print_r(lines(__FILE__)); // Generator Object ( )
儘管它不是迭代器,它是一個 Generator。它的內部定義了什麼方法呢?
print_r(get_class_methods(lines(__FILE__))); // Array // ( // [0] => rewind // [1] => valid // [2] => current // [3] => key // [4] => next // [5] => send // [6] => throw // [7] => __wakeup // )
若是你讀取一個大文件,而後使用 memory_get_peak_usage(),你會注意到生成器的代碼會使用固定的內存,不管這個文件有多大。它每次進度去一行。而是用 file_get_contents() 函數讀取整個文件,會使用更大的內存。這就是在迭代處理這類事物時,生成器的能給咱們帶來的優點!
能夠將數據發送到生成器中。看下下面這個生成器:
<?php $generator = call_user_func(function() { yield "foo"; }); print $generator->current() . "\n"; // foo
注意這裏咱們如何在 call_user_func() 函數中封裝生成器函數的?這裏僅僅是一個簡單的函數定義,而後當即調用它獲取一個新的生成器實例...
咱們已經見過 yield 的用法。咱們能夠經過擴展這個生成器來接收數據:
$generator = call_user_func(function() { $input = (yield "foo"); print "inside: " . $input . "\n"; }); print $generator->current() . "\n"; $generator->send("bar");
數據經過 yield 關鍵字傳入和返回。首先,執行 current() 代碼直到遇到 yield,返回 foo。send() 將輸出傳入到生成器打印輸入的位置。你須要習慣這種用法。
因爲咱們須要同這些函數進行交互,可能但願將異常推送到生成器中。這樣這些函數就能夠自行處理異常。
看看下面這個示例:
$multiply = function($x, $y) { yield $x * $y; }; print $multiply(5, 6)->current(); // 30
如今讓咱們將它封裝到另外一個函數中:
$calculate = function ($op, $x, $y) use ($multiply) { if ($op === 'multiply') { $generator = $multiply($x, $y); return $generator->current(); } }; print $calculate("multiply", 5, 6); // 30
這裏咱們經過一個普通閉包將乘法生成器封裝起來。如今讓咱們驗證無效參數:
$calculate = function ($op, $x, $y) use ($multiply) { if ($op === "multiply") { $generator = $multiply($x, $y); if (!is_numeric($x) || !is_numeric($y)) { throw new InvalidArgumentException(); } return $generator->current(); } }; print $calculate('multiply', 5, 'foo'); // PHP Fatal error...
若是咱們但願可以經過生成器處理異常?咱們怎樣才能將異常傳入生成器呢!
$multiply = function ($x, $y) { try { yield $x * $y; } catch (InvalidArgumentException $exception) { print "ERRORS!"; } }; $calculate = function ($op, $x, $y) use ($multiply) { if ($op === "multiply") { $generator = $multiply($x, $y); if (!is_numeric($x) || !is_numeric($y)) { $generator->throw(new InvalidArgumentException()); } return $generator->current(); } }; print $calculate('multiply', 5, 'foo'); // PHP Fatal error...
棒呆了!咱們不只能夠像迭代器同樣使用生成器。還能夠經過它們發送數據並拋出異常。它們是可中斷和可恢復的函數。有些語言把這些函數叫作……
咱們可使用協程(coroutines)來構建異步代碼。讓咱們來建立一個簡單的任務調度程序。首先咱們須要一個 Task 類:
class Task { protected $generator; public function __construct(Generator $generator) { $this->generator = $generator; } public function run() { $this->generator->next(); } public function finished() { return !$this->generator->valid(); } }
Task 是普通生成器的裝飾器。咱們將生成器賦值給它的成員變量以供後續使用,而後實現一個簡單的 run() 和 finished() 方法。run() 方法用於執行任務,finished() 方法用於讓調度程序知道什麼時候終止運行。
而後咱們須要一個 Scheduler 類:
class Scheduler { protected $queue; public function __construct() { $this->queue = new SplQueue(); } public function enqueue(Task $task) { $this->queue->enqueue($task); } pulic function run() { while (!$this->queue->isEmpty()) { $task = $this->queue->dequeue(); $task->run(); if (!$task->finished()) { $this->queue->enqueue($task); } } } }
Scheduler 用於維護一個待執行的任務隊列。run() 會彈出隊列中的全部任務並執行它,直到運行完整個隊列任務。若是某個任務沒有執行完畢,當這個任務本次運行完成後,咱們將再次入列。
SplQueue 對於這個示例來說再合適不過了。它是一種 FIFO(先進先出:fist in first out) 數據結構,可以確保每一個任務都可以獲取足夠的處理時間。
咱們能夠像這樣運行這段代碼:
$scheduler = new Scheduler(); $task1 = new Task(call_user_func(function() { for ($i = 0; $i < 3; $i++) { print "task1: " . $i . "\n"; yield; } })); $task2 = new Task(call_user_func(function() { for ($i = 0; $i < 6; $i++) { print "task2: " . $i . "\n"; yield; } })); $scheduler->enqueue($task1); $scheduler->enqueue($task2); $scheduler->run();
運行時,咱們將看到以下執行結果:
task 1: 0 task 1: 1 task 2: 0 task 2: 1 task 1: 2 task 2: 2 task 2: 3 task 2: 4 task 2: 5
這幾乎就是咱們想要的執行結果。不過有個問題發生在首次運行每一個任務時,它們都執行了兩次。咱們能夠對 Task 類稍做修改來修復這個問題:
class Task { protected $generator; protected $run = false; public function __construct(Generator $generator) { $this->generator = $generator; } public function run() { if ($this->run) { $this->generator->next(); } else { $this->generator->current(); } $this->run = true; } public function finished() { return !$this->generator->valid(); } }
咱們須要調整首次 run() 方法調用,從生成器當前有效的指針讀取運行。後續調用能夠從下一個指針讀取運行...
有些人基於這個思路實現了一些超讚的類庫。咱們來看看其中的兩個...
RecoilPHP 是一套基於協程的類庫,它最使人印象深入的是用於 ReactPHP 內核。能夠將事件循環在 RecoilPHP 和 RecoilPHP 之間進行交換,而你的程序無需架構上的調整。
咱們來看一下 ReactPHP 異步 DNS 解決方案:
function resolve($domain, $resolver) { $resolver ->resolve($domain) ->then(function ($ip) use ($domain) { print "domain: " . $domain . "\n"; print "ip: " . $ip . "\n"; }, function ($error) { print $error . "\n"; }) } function run() { $loop = React\EventLoop\Factory::create(); $factory = new React\Dns\Resolver\Factory(); $resolver = $factory->create("8.8.8.8", $loop); resolve("silverstripe.org", $resolver); resolve("wordpress.org", $resolver); resolve("wardrobecms.com", $resolver); resolve("pagekit.com", $resolver); $loop->run(); } run();
resolve() 接收域名和 DNS 解析器,並使用 ReactPHP 執行標準的 DNS 查找。不用太過糾結與 resolve() 函數內部。重要的是這個函數不是生成器,而是一個函數!
run() 建立一個 ReactPHP 事件循環,DNS 解析器(這裏是個工廠實例)解析若干域名。一樣,這個也不是一個生成器。
想知道 RecoilPHP 到底有何不一樣?還但願掌握更多細節!
use Recoil\Recoil; function resolve($domain, $resolver) { try { $ip = (yield $resolver->resolve($domain)); print "domain: " . $domain . "\n"; print "ip: " . $ip . "\n"; } catch (Exception $exception) { print $exception->getMessage() . "\n"; } } function run() { $loop = (yield Recoil::eventLoop()); $factory = new React\Dns\Resolver\Factory(); $resolver = $factory->create("8.8.8.8", $loop); yield [ resolve("silverstripe.org", $resolver), resolve("wordpress.org", $resolver), resolve("wardrobecms.com", $resolver), resolve("pagekit.com", $resolver), ]; } Recoil::run("run");
經過將它集成到 ReactPHP 來完成一些使人稱奇的工做。每次運行 resolve() 時,RecoilPHP 會管理由 $resoler->resolve() 返回的 promise 對象,而後將數據發送給生成器。此時咱們就像在編寫同步代碼同樣。與咱們在其餘一步模型中使用回調代碼不一樣,這裏只有一個指令列表。
RecoilPHP 知道它應該管理一個有執行 run() 函數時返回的 yield 數組。RoceilPHP 還支持基於協程的數據庫(PDO)和日誌庫。
IcicleIO 爲了一全新的方案實現 ReactPHP 同樣的目標,而僅僅使用協程功能。相比 ReactPHP 它僅包含極少的組件。可是,核心的異步流、服務器、Socket、事件循環特性一個不落。
讓咱們看一個 socket 服務器示例:
use Icicle\Coroutine\Coroutine; use Icicle\Loop\Loop; use Icicle\Socket\Client\ClientInterface; use Icicle\Socket\Server\ServerInterface; use Icicle\Socket\Server\ServerFactory; $factory = new ServerFactory(); $coroutine = Coroutine::call(function (ServerInterface $server) { $clients = new SplObjectStorage(); $handler = Coroutine::async( function (ClientInterface $client) use (&$clients) { $clients->attach($client); $host = $client->getRemoteAddress(); $port = $client->getRemotePort(); $name = $host . ":" . $port; try { foreach ($clients as $stream) { if ($client !== $stream) { $stream->write($name . "connected.\n"); } } yield $client->write("Welcome " . $name . "!\n"); while ($client->isReadable()) { $data = trim(yield $client->read()); if ("/exit" === $data) { yield $client->end("Goodbye!\n"); } else { $message = $name . ":" . $data . "\n"; foreach ($clients as $stream) { if ($client !== $stream) { $stream->write($message); } } } } } catch (Exception $exception) { $client->close($exception); } finally { $clients->detach($client); foreach ($clients as $stream) { $stream->write($name . "disconnected.\n"); } } } ); while ($server->isOpen()) { $handler(yield $server->accept()); } }, $factory->create("127.0.0.1", 6000)); Loop::run();
據我所知,這段代碼所作的事情以下:
打開命令行終端輸入 nc localhost 6000 查看執行結果!
該示例使用 SplObjectStorage 跟蹤 socket 鏈接。這樣咱們就能夠向全部 socket 發送消息。
這個話題能夠包含不少內容。但願您能看到生成器是如何建立的,以及它們如何幫助編寫迭代程序和異步代碼。
若是你有問題,能夠隨時問我。
感謝 Nikita Popov(還有它的啓蒙教程 Cooperative multitasking using coroutines (in PHP!) ), Anthony Ferrara 和 Joe Watkins。這些研究工做澤被蒼生,給我以寫做此篇文章的靈感。關注他們吧,好麼?