轉自:https://newt0n.github.io/2017/02/10/PHP-%E5%8D%8F%E7%A8%8B%E5%8E%9F%E7%90%86/php
最先的服務器端程序都是經過多進程、多線程來解決併發IO的問題。進程模型出現的最先,從Unix 系統誕生就開始有了進程的概念。最先的服務器端程序通常都是 Accept 一個客戶端鏈接就建立一個進程,而後子進程進入循環同步阻塞地與客戶端鏈接進行交互,收發處理數據。python
多線程模式出現要晚一些,線程與進程相比更輕量,並且線程之間共享內存堆棧,因此不一樣的線程之間交互很是容易實現。好比實現一個聊天室,客戶端鏈接之間能夠交互,聊天室中的玩家能夠任意的其餘人發消息。用多線程模式實現很是簡單,線程中能夠直接向某一個客戶端鏈接發送數據。而多進程模式就要用到管道、消息隊列、共享內存等等統稱進程間通訊(IPC)複雜的技術才能實現。git
最簡單的多進程服務端模型github
$serv = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr)
or die("Create server failed");
while(1) {
$conn = stream_socket_accept($serv);
if (pcntl_fork() == 0) {
$request = fread($conn);
// do something
// $response = "hello world";
fwrite($response);
fclose($conn);
exit(0);
}
}
|
多進程/線程模型的流程是:golang
建立一個 socket
,綁定服務器端口(bind
),監聽端口(listen
),在 PHP 中用 stream_socket_server
一個函數就能完成上面 3 個步驟,固然也可使用更底層的sockets
擴展分別實現。數據庫
進入 while
循環,阻塞在 accept
操做上,等待客戶端鏈接進入。此時程序會進入睡眠狀態,直到有新的客戶端發起 connect
到服務器,操做系統會喚醒此進程。accept
函數返回客戶端鏈接的 socket
主進程在多進程模型下經過 fork
(php: pcntl_fork)建立子進程,多線程模型下使用 pthread_create
(php: new Thread)建立子線程。編程
下文如無特殊聲明將使用進程同時表示進程/線程。windows
子進程建立成功後進入 while
循環,阻塞在 recv
(php:fread)調用上,等待客戶端向服務器發送數據。收到數據後服務器程序進行處理而後使用 send
(php: fwrite)向客戶端發送響應。長鏈接的服務會持續與客戶端交互,而短鏈接服務通常收到響應就會 close
。數組
當客戶端鏈接關閉時,子進程退出並銷燬全部資源,主進程會回收掉此子進程。
這種模式最大的問題是,進程建立和銷燬的開銷很大。因此上面的模式沒辦法應用於很是繁忙的服務器程序。對應的改進版解決了此問題,這就是經典的 Leader-Follower
模型。
$serv = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr)
or die("Create server failed");
for($i = 0; $i < 32; $i++) {
if (pcntl_fork() == 0) {
while(1) {
$conn = stream_socket_accept($serv);
if ($conn == false) continue;
// do something
$request = fread($conn);
// $response = "hello world";
fwrite($response);
fclose($conn);
}
exit(0);
}
}
|
它的特色是程序啓動後就會建立 N 個進程。每一個子進程進入 Accept
,等待新的鏈接進入。當客戶端鏈接到服務器時,其中一個子進程會被喚醒,開始處理客戶端請求,而且再也不接受新的 TCP 鏈接。當此鏈接關閉時,子進程會釋放,從新進入 Accept
,參與處理新的鏈接。
這個模型的優點是徹底能夠複用進程,沒有額外消耗,性能很是好。不少常見的服務器程序都是基於此模型的,好比 Apache、PHP-FPM。
多進程模型也有一些缺點。
這種模型嚴重依賴進程的數量解決併發問題,一個客戶端鏈接就須要佔用一個進程,工做進程的數量有多少,併發處理能力就有多少。操做系統能夠建立的進程數量是有限的。
啓動大量進程會帶來額外的進程調度消耗。數百個進程時可能進程上下文切換調度消耗佔 CPU 不到 1% 能夠忽略不計,若是啓動數千甚至數萬個進程,消耗就會直線上升。調度消耗可能佔到 CPU 的百分之幾十甚至 100%。
談到多進程以及相似同時執行多個任務的模型,就不得不先談談並行和併發。
是指能處理多個同時活動的能力,併發事件之間不必定要同一時刻發生。
是指同時刻發生的兩個併發事件,具備併發的含義,但併發不必定並行。
正確的併發設計的標準是:
使多個操做能夠在重疊的時間段內進行。
two tasks can start, run, and complete in overlapping time periods
參考:
在瞭解 PHP 協程前,還有 迭代器 和 生成器 這兩個概念須要先認識一下。
PHP5 開始內置了 Iterator
即迭代器接口,因此若是你定義了一個類,並實現了Iterator
接口,那麼你的這個類對象就是 ZEND_ITER_OBJECT
便可迭代的,不然就是 ZEND_ITER_PLAIN_OBJECT
。
對於 ZEND_ITER_PLAIN_OBJECT
的類,foreach
會獲取該對象的默認屬性數組,而後對該數組進行迭代。
而對於 ZEND_ITER_OBJECT
的類對象,則會經過調用對象實現的 Iterator
接口相關函數來進行迭代。
任何實現了 Iterator
接口的類都是可迭代的,即均可以用 foreach
語句來遍歷。
interface Iterator extends Traversable
{
// 獲取當前內部標量指向的元素的數據
public mixed current()
// 獲取當前標量
public scalar key()
// 移動到下一個標量
public void next()
// 重置標量
public void rewind()
// 檢查當前標量是否有效
public boolean valid()
}
|
PHP 自帶的 range 函數原型:
range — 根據範圍建立數組,包含指定的元素
array range (mixed $start , mixed $end [, number $step = 1 ])
創建一個包含指定範圍單元的數組。
在不使用迭代器的狀況要實現一個和 PHP 自帶的 range
函數相似的功能,可能會這麼寫:
function range ($start, $end, $step = 1)
{
$ret = [];
for ($i = $start; $i <= $end; $i += $step) {
$ret[] = $i;
}
return $ret;
}
|
須要將生成的全部元素放在內存數組中,若是須要生成一個很是大的集合,則會佔用巨大的內存。
來看看迭代實現的 range
,咱們叫作 xrange
,他實現了 Iterator
接口必須的 5 個方法:
class Xrange implements Iterator
{
protected $start;
protected $limit;
protected $step;
protected $current;
public function __construct($start, $limit, $step = 1)
{
$this->start = $start;
$this->limit = $limit;
$this->step = $step;
}
public function rewind()
{
$this->current = $this->start;
}
public function next()
{
$this->current += $this->step;
}
public function current()
{
return $this->current;
}
public function key()
{
return $this->current + 1;
}
public function valid()
{
return $this->current <= $this->limit;
}
}
|
使用時代碼以下:
foreach (new Xrange(0, 9) as $key => $val) {
echo $key, ' ', $val, "\n";
}
|
輸出:
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
|
看上去功能和 range()
函數所作的一致,不一樣點在於迭代的是一個 對象(Object)
而不是數組:
var_dump(new Xrange(0, 9));
|
輸出:
object(Xrange)#1 (4) {
["start":protected]=>
int(0)
["limit":protected]=>
int(9)
["step":protected]=>
int(1)
["current":protected]=>
NULL
}
|
另外,內存的佔用狀況也徹底不一樣:
// range
$startMemory = memory_get_usage();
$arr = range(0, 500000);
echo 'range(): ', memory_get_usage() - $startMemory, " bytes\n";
unset($arr);
// xrange
$startMemory = memory_get_usage();
$arr = new Xrange(0, 500000);
echo 'xrange(): ', memory_get_usage() - $startMemory, " bytes\n";
|
輸出:
xrange(): 624 bytes
range(): 72194784 bytes
|
range()
函數在執行後佔用了 50W 個元素內存空間,而 xrange
對象在整個迭代過程當中只佔用一個對象的內存。
在喜聞樂見的各類 PHP 框架裏有很多生成器的實例,好比 Yii2 中用來構建 SQL 語句的 \yii\db\Query
類:
$query = (new \yii\db\Query)->from('user');
// yii\db\BatchQueryResult
foreach ($query->batch() as $users) {
// 每次循環獲得多條 user 記錄
}
|
來看一下 batch()
作了什麼:
/**
* Starts a batch query.
*
* A batch query supports fetching data in batches, which can keep the memory usage under a limit.
* This method will return a [[BatchQueryResult]] object which implements the [[\Iterator]] interface
* and can be traversed to retrieve the data in batches.
*
* For example,
*
*
* $query = (new Query)->from('user');
* foreach ($query->batch() as $rows) {
* // $rows is an array of 10 or fewer rows from user table
* }
*
*
* @param integer $batchSize the number of records to be fetched in each batch.
* @param Connection $db the database connection. If not set, the "db" application component will be used.
* @return BatchQueryResult the batch query result. It implements the [[\Iterator]] interface
* and can be traversed to retrieve the data in batches.
*/
public function batch($batchSize = 100, $db = null)
{
return Yii::createObject([
'class' => BatchQueryResult::className(),
'query' => $this,
'batchSize' => $batchSize,
'db' => $db,
'each' => false,
]);
}
|
實際上返回了一個 BatchQueryResult
類,類的源碼實現了 Iterator
接口 5 個關鍵方法:
class BatchQueryResult extends Object implements \Iterator
{
public $db;
public $query;
public $batchSize = 100;
public $each = false;
private $_dataReader;
private $_batch;
private $_value;
private $_key;
/**
* Destructor.
*/
public function __destruct()
{
// make sure cursor is closed
$this->reset();
}
/**
* Resets the batch query.
* This method will clean up the existing batch query so that a new batch query can be performed.
*/
public function reset()
{
if ($this->_dataReader !== null) {
$this->_dataReader->close();
}
$this->_dataReader = null;
$this->_batch = null;
$this->_value = null;
$this->_key = null;
}
/**
* Resets the iterator to the initial state.
* This method is required by the interface [[\Iterator]].
*/
public function rewind()
{
$this->reset();
$this->next();
}
/**
* Moves the internal pointer to the next dataset.
* This method is required by the interface [[\Iterator]].
*/
public function next()
{
if ($this->_batch === null || !$this->each || $this->each && next($this->_batch) === false) {
$this->_batch = $this->fetchData();
reset($this->_batch);
}
if ($this->each) {
$this->_value = current($this->_batch);
if ($this->query->indexBy !== null) {
$this->_key = key($this->_batch);
} elseif (key($this->_batch) !== null) {
$this->_key++;
} else {
$this->_key = null;
}
} else {
$this->_value = $this->_batch;
$this->_key = $this->_key === null ? 0 : $this->_key + 1;
}
}
/**
* Fetches the next batch of data.
* @return array the data fetched
*/
protected function fetchData()
{
// ...
}
/**
* Returns the index of the current dataset.
* This method is required by the interface [[\Iterator]].
* @return integer the index of the current row.
*/
public function key()
{
return $this->_key;
}
/**
* Returns the current dataset.
* This method is required by the interface [[\Iterator]].
* @return mixed the current dataset.
*/
public function current()
{
return $this->_value;
}
/**
* Returns whether there is a valid dataset at the current position.
* This method is required by the interface [[\Iterator]].
* @return boolean whether there is a valid dataset at the current position.
*/
public function valid()
{
return !empty($this->_batch);
}
}
|
以迭代器的方式實現了相似分頁取的效果,同時避免了一次性取出全部數據佔用太多的內存空間。
雖然迭代器僅需繼承接口便可實現,但畢竟須要定義一整個類而後實現接口的全部方法,實在是不怎麼方便。
生成器則提供了一種更簡單的方式來實現簡單的對象迭代,相比定義類來實現
PHP ManualIterator
接口的方式,性能開銷和複雜度大大下降。
生成器容許在 foreach
代碼塊中迭代一組數據而不須要建立任何數組。一個生成器函數,就像一個普通的有返回值的自定義函數相似,但普通函數只返回一次, 而生成器能夠根據須要經過 yield
關鍵字返回屢次,以便連續生成須要迭代返回的值。
一個最簡單的例子就是使用生成器來從新實現 xrange()
函數。效果和上面咱們用迭代器實現的差很少,但實現起來要簡單的多。
xrange
函數
function xrange($start, $limit, $step = 1) {
for ($i = 0; $i < $limit; $i += $step) {
yield $i + 1 => $i;
}
}
foreach (xrange(0, 9) as $key => $val) {
printf("%d %d \n", $key, $val);
}
// 輸出
// 1 0
// 2 1
// 3 2
// 4 3
// 5 4
// 6 5
// 7 6
// 8 7
// 9 8
|
實際上生成器生成的正是一個迭代器對象實例,該迭代器對象繼承了 Iterator
接口,同時也包含了生成器對象自有的接口,具體能夠參考 Generator 類的定義以及語法參考。
同時須要注意的是:
須要注意的是 yield
關鍵字,這是生成器的關鍵。經過上面的例子能夠看出,yield
會將當前產生的值傳遞給 foreach
,換句話說,foreach
每一次迭代過程都會從 yield
處取一個值,直到整個遍歷過程再也不能執行到 yield
時遍歷結束,此時生成器函數簡單的退出,而調用生成器的上層代碼還能夠繼續執行,就像一個數組已經被遍歷完了。
yield
最簡單的調用形式看起來像一個 return
申明,不一樣的是 yield
暫停當前過程的執行並返回值,而 return
是中斷當前過程並返回值。暫停當前過程,意味着將處理權轉交由上一級繼續進行,直到上一級再次調用被暫停的過程,該過程又會從上一次暫停的位置繼續執行。這像是什麼呢?若是以前已經在鳥哥的文章中粗略看過,應該知道這很像操做系統的進程調度,多個進程在一個 CPU 核心上執行,在系統調度下每個進程執行一段指令就被暫停,切換到下一個進程,這樣外部用戶看起來就像是同時在執行多個任務。
但僅僅如此還不夠,yield
除了能夠返回值之外,還能接收值,也就是能夠在兩個層級間實現雙向通訊。
來看看如何傳遞一個值給 yield
:
function printer()
{
while (true) {
printf("receive: %s\n", yield);
}
}
$printer = printer();
$printer->send('hello');
$printer->send('world');
// 輸出
receive: hello
receive: world
|
根據 PHP 官方文檔的描述能夠知道 Generator
對象除了實現 Iterator
接口中的必要方法之外,還有一個 send
方法,這個方法就是向 yield
語句處傳遞一個值,同時從 yield
語句處繼續執行,直至再次遇到 yield
後控制權回到外部。
既然 yield
能夠在其位置中斷並返回或者接收一個值,那能不能同時進行接收和返回呢?固然,這也是實現協程的根本。對上述代碼作出修改:
function printer()
{
$i = 0;
while (true) {
printf("receive: %s\n", (yield ++$i));
}
}
$printer = printer();
printf("%d\n", $printer->current());
$printer->send('hello');
printf("%d\n", $printer->current());
$printer->send('world');
printf("%d\n", $printer->current());
// 輸出
1
receive: hello
2
receive: world
3
|
這是另外一個例子:
function gen() {
$ret = (yield 'yield1');
var_dump($ret);
$ret = (yield 'yield2');
var_dump($ret);
}
$gen = gen();
var_dump($gen->current()); // string(6) "yield1"
var_dump($gen->send('ret1')); // string(4) "ret1" (第一個 var_dump)
// string(6) "yield2" (繼續執行到第二個 yield,吐出了返回值)
var_dump($gen->send('ret2')); // string(4) "ret2" (第二個 var_dump)
// NULL (var_dump 以後沒有其餘語句,因此此次 ->send() 的返回值爲 null)
|
current
方法是迭代器 Iterator
接口必要的方法,foreach
語句每一次迭代都會經過其獲取當前值,然後調用迭代器的 next
方法。在上述例子裏則是手動調用了 current
方法獲取值。
上述例子已經足以表示 yield 可以做爲實現雙向通訊的工具,也就是具有了後續實現協程的基本條件。
上面的例子若是第一次接觸並稍加思考,難免會疑惑爲何一個 yield
既是語句又是表達式,並且這兩種狀況還同時存在:
yield
,首先它都是語句,而跟在 yield
後面的任何表達式的值將做爲調用生成器函數的返回值,若是 yield
後面沒有任何表達式(變量、常量都是表達式),那麼它會返回 NULL
,這一點和 return
語句一致。yield
也是表達式,它的值就是 send
函數傳過來的值(至關於一個特殊變量,只不過賦值是經過 send
函數進行的)。只要調用send方法,而且生成器對象的迭代並未終結,那麼當前位置的 yield
就會獲得 send
方法傳遞過來的值,這和生成器函數有沒有把這個值賦值給某個變量沒有任何關係。這個地方可能須要仔細品味上面兩個 send()
方法的例子才能理解。但能夠簡單的記住:
除了 send()
方法,還有一種控制生成器執行的方法是 next()
函數:
Next()
,恢復生成器函數的執行直到下一個 yield
Send()
,向生成器傳入一個值,恢復執行直到下一個 yield
對於單核處理器,多進程實現多任務的原理是讓操做系統給一個任務每次分配必定的 CPU 時間片,而後中斷、讓下一個任務執行必定的時間片接着再中斷並繼續執行下一個,如此反覆。因爲切換執行任務的速度很是快,給外部用戶的感覺就是多個任務的執行是同時進行的。
多進程的調度是由操做系統來實現的,進程自身不能控制本身什麼時候被調度,也就是說:
進程的調度是由外層調度器搶佔式實現的
而協程要求當前正在運行的任務自動把控制權回傳給調度器,這樣就能夠繼續運行其餘任務。這與『搶佔式』的多任務正好相反, 搶佔多任務的調度器能夠強制中斷正在運行的任務, 無論它本身有沒有意願。『協做式多任務』在 Windows 的早期版本 (windows95) 和 Mac OS 中有使用, 不過它們後來都切換到『搶佔式多任務』了。理由至關明確:若是僅依靠程序自動交出控制的話,那麼一些惡意程序將會很容易佔用所有 CPU 時間而不與其餘任務共享。
協程的調度是由協程自身主動讓出控制權到外層調度器實現的
回到剛纔生成器實現 xrange
函數的例子,整個執行過程的交替能夠用下圖來表示:
協程能夠理解爲純用戶態的線程,經過協做而不是搶佔來進行任務切換。相對於進程或者線程,協程全部的操做均可以在用戶態而非操做系統內核態完成,建立和切換的消耗很是低。
簡單的說 Coroutine(協程) 就是提供一種方法來中斷當前任務的執行,保存當前的局部變量,下次再過來又能夠恢復當前局部變量繼續執行。
咱們能夠把大任務拆分紅多個小任務輪流執行,若是有某個小任務在等待系統 IO,就跳過它,執行下一個小任務,這樣往復調度,實現了 IO 操做和 CPU 計算的並行執行,整體上就提高了任務的執行效率,這也即是協程的意義。
PHP 從 5.5 開始支持生成器及 yield
關鍵字,而 PHP 協程則由 yield
來實現。
要理解協程,首先要理解:代碼是代碼,函數是函數。函數包裹的代碼賦予了這段代碼附加的意義:無論是否顯式的指明返回值,當函數內的代碼塊執行完後都會返回到調用層。而當調用層調用某個函數的時候,必須等這個函數返回,當前函數才能繼續執行,這就構成了後進先出,也就是 Stack
。
而協程包裹的代碼,不是函數,不徹底遵照函數的附加意義,協程執行到某個點,協會協程會 yield
返回一個值而後掛起,而不是 return
一個值而後結束,當再次調用協程的時候,會在上次 yield
的點繼續執行。
因此協程違背了一般操做系統和 x86 的 CPU 認定的代碼執行方式,也就是 Stack
的這種執行方式,須要運行環境(好比 php,python 的 yield 和 golang 的 goroutine)本身調度,來實現任務的中斷和恢復,具體到 PHP,就是靠 yield
來實現。
堆棧式調用 和 協程調用的對比:
結合以前的例子,能夠總結一下 yield
能作的就是:
send()
實現不一樣任務間的雙向通訊,也就能夠實現任務和調度器之間的通訊。yield
就是 PHP 實現協程的方式。
下面是雄文 Cooperative multitasking using coroutines (in PHP!) 裏一個簡單但完整的例子,來展現如何具體的在 PHP 裏實現協程任務的調度。
首先是一個任務類:
Task
class Task
{
// 任務 ID
protected $taskId;
// 協程對象
protected $coroutine;
// send() 值
protected $sendVal = null;
// 是否首次 yield
protected $beforeFirstYield = true;
public function __construct($taskId, Generator $coroutine) {
$this->taskId = $taskId;
$this->coroutine = $coroutine;
}
public function getTaskId() {
return $this->taskId;
}
public function setSendValue($sendVal) {
$this->sendVal = $sendVal;
}
public function run() {
// 如以前提到的在send以前, 當迭代器被建立後第一次 yield 以前,一個 renwind() 方法會被隱式調用
// 因此實際上發生的應該相似:
// $this->coroutine->rewind();
// $this->coroutine->send();
// 這樣 renwind 的執行將會致使第一個 yield 被執行, 而且忽略了他的返回值.
// 真正當咱們調用 yield 的時候, 咱們獲得的是第二個yield的值,致使第一個yield的值被忽略。
// 因此這個加上一個是否第一次 yield 的判斷來避免這個問題
if ($this->beforeFirstYield) {
$this->beforeFirstYield = false;
return $this->coroutine->current();
} else {
$retval = $this->coroutine->send($this->sendVal);
$this->sendVal = null;
return $retval;
}
}
public function isFinished() {
return !$this->coroutine->valid();
}
}
|
接下來是調度器,比 foreach
是要複雜一點,但好歹也能算個正兒八經的 Scheduler
:)
Scheduler
class Scheduler
{
protected $maxTaskId = 0;
protected $taskMap = []; // taskId => task
protected $taskQueue;
public function __construct() {
$this->taskQueue = new SplQueue();
}
// (使用下一個空閒的任務id)建立一個新任務,而後把這個任務放入任務map數組裏. 接着它經過把任務放入任務隊列裏來實現對任務的調度. 接着run()方法掃描任務隊列, 運行任務.若是一個任務結束了, 那麼它將從隊列裏刪除, 不然它將在隊列的末尾再次被調度。
public function newTask(Generator $coroutine) {
$tid = ++$this->maxTaskId;
$task = new Task($tid, $coroutine);
$this->taskMap[$tid] = $task;
$this->schedule($task);
return $tid;
}
public function schedule(Task $task) {
// 任務入隊
$this->queue->enqueue($task);
}
public function run() {
while (!$this->queue->isEmpty()) {
// 任務出隊
$task = $this->queue->dequeue();
$task->run();
if ($task->isFinished()) {
unset($this->taskMap[$task->getTaskId()]);
} else {
$this->schedule($task);
}
}
}
}
|
隊列可使每一個任務得到同等的 CPU 使用時間,
Demo
function task1() {
for ($i = 1; $i <= 10; ++$i) {
echo "This is task 1 iteration $i.\n";
yield;
}
}
function task2() {
for ($i = 1; $i <= 5; ++$i) {
echo "This is task 2 iteration $i.\n";
yield;
}
}
$scheduler = new Scheduler;
$scheduler->newTask(task1());
$scheduler->newTask(task2());
$scheduler->run();
|
輸出:
This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.
|
結果正是咱們期待的,最初的 5 次迭代,兩個任務是交替進行的,而在第二個任務結束後,只有第一個任務繼續執行到結束。
若想真正的發揮出協程的做用,那必定是在一些涉及到阻塞 IO 的場景,咱們都知道 Web 服務器最耗時的部分一般都是 socket 讀取數據等操做上,若是進程對每一個請求都掛起的等待 IO 操做,那處理效率就過低了,接下來咱們看個支持非阻塞 IO 的 Scheduler:
<?php
class Scheduler
{
protected $maxTaskId = 0;
protected $tasks = []; // taskId => task
protected $queue;
// resourceID => [socket, tasks]
protected $waitingForRead = [];
protected $waitingForWrite = [];
public function __construct() {
// SPL 隊列
$this->queue = new SplQueue();
}
public function newTask(Generator $coroutine) {
$tid = ++$this->maxTaskId;
$task = new Task($tid, $coroutine);
$this->tasks[$tid] = $task;
$this->schedule($task);
return $tid;
}
public function schedule(Task $task) {
// 任務入隊
$this->queue->enqueue($task);
}
public function run() {
while (!$this->queue->isEmpty()) {
// 任務出隊
$task = $this->queue->dequeue();
$task->run();
if ($task->isFinished()) {
unset($this->tasks[$task->getTaskId()]);
} else {
$this->schedule($task);
}
}
}
public function waitForRead($socket, Task $task)
{
if (isset($this->waitingForRead[(int)$socket])) {
$this->waitingForRead[(int)$socket][1][] = $task;
} else {
$this->waitingForRead[(int)$socket] = [$socket, [$task]];
}
}
public function waitForWrite($socket, Task $task)
{
if (isset($this->waitingForWrite[(int)$socket])) {
$this->waitingForWrite[(int)$socket][1][] = $task;
} else {
$this->waitingForWrite[(int)$socket] = [$socket, [$task]];
}
}
/**
* @param $timeout 0 represent
*/
protected function ioPoll($timeout)
{
$rSocks = [];
foreach ($this->waitingForRead as list($socket)) {
$rSocks[] = $socket;
}
$wSocks = [];
foreach ($this->waitingForWrite as list($socket)) {
$wSocks[] = $socket;
}
$eSocks = [];
// $timeout 爲 0 時, stream_select 爲當即返回,爲 null 時則會阻塞的等,見
http://php.net/manual/zh/function.stream-select.php
if (!@stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
return;
}
foreach ($rSocks as $socket) {
list(, $tasks) = $this->waitingForRead[(int)$socket];
unset($this->waitingForRead[(int)$socket]);
foreach ($tasks as $task) {
$this->schedule($task);
}
}
foreach ($wSocks as $socket) {
list(, $tasks) = $this->waitingForWrite[(int)$socket];
unset($this->waitingForWrite[(int)$socket]);
foreach ($tasks as $task) {
$this->schedule($task);
}
}
}
/**
* 檢查隊列是否爲空,若爲空則掛起的執行 stream_select,不然檢查完 IO 狀態當即返回,詳見 ioPoll()
* 做爲任務加入隊列後,因爲 while true,會被一直重複的加入任務隊列,實現每次任務前檢查 IO 狀態
* @return Generator object for newTask
*
*/
protected function ioPollTask()
{
while (true) {
if ($this->taskQueue->isEmpty()) {
$this->ioPoll(null);
} else {
$this->ioPoll(0);
}
yield;
}
}
/**
* $scheduler = new Scheduler;
* $scheduler->newTask(Web Server Generator);
* $scheduler->withIoPoll()->run();
*
* 新建 Web Server 任務後先執行 withIoPoll() 將 ioPollTask() 做爲任務入隊
*
* @return $this
*/
public function withIoPoll()
{
$this->newTask($this->ioPollTask());
return $this;
}
}
|
這個版本的 Scheduler 里加入一個永不退出的任務,而且經過 stream_select
支持的特性來實現快速的來回檢查各個任務的 IO 狀態,只有 IO 完成的任務纔會繼續執行,而 IO 還未完成的任務則會跳過,完整的代碼和例子能夠戳這裏。
也就是說任務交替執行的過程當中,一旦遇到須要 IO 的部分,調度器就會把 CPU 時間分配給不須要 IO 的任務,等到當前任務遇到 IO 或者以前的任務 IO 結束纔再次調度 CPU 時間,以此實現 CPU 和 IO 並行來提高執行效率,相似下圖:
若是想將一個單進程任務改形成併發執行,咱們能夠選擇改形成多進程或者協程:
多進程改造
協程改造
PHP 的協程或者其餘語言中,好比 Python、Lua 等都有協程的概念,和 Go 協程有些類似,不過有兩點不一樣:
runtime.GOMAXPROCS()
指定可同時使用的 CPU 個數),協程通常來講只是併發。channel
來通訊;協程經過 yield
讓出和恢復操做來通訊。Go 協程比普通協程更強大,也很容易從協程的邏輯複用到 Go 協程,並且在 Go 的開發中也使用的極爲廣泛,有興趣的話能夠了解一下做爲對比。
我的感受 PHP 的協程在實際使用中想要徒手實現和應用並不方便並且場景有限,但瞭解其概念及實現原理對更好的理解併發不無裨益。
若是想更多的瞭解協程的實際應用場景不妨試試已經大名鼎鼎的 Swoole,其對多種協議的 client 作了底層的協程封裝,幾乎能夠作到以同步編程的寫法實現協程異步 IO 的效果。