一提到高併發,就沒有辦法繞開I/O複用,再具體到特定的平臺linux, 就沒辦法繞開epoll. epoll爲啥高效的原理就不講了,感興趣的同窗能夠自行搜索研究一下。php
php怎麼玩epoll?首先得安裝個libevent庫,再裝個event擴展或者libevent擴展就能夠愉快地玩耍了.html
有些人搞不清楚libevent庫跟libevent擴展的區別,簡單來講,libevent庫是C語言對epoll的封裝,跟PHP毛關係都沒有;libevent擴展就是PHP跟libevent庫的溝通橋樑。實際上PHP的不少擴展就是幹這個事的,有一些優秀的C語言庫,PHP想直接拿來用,就經過PHP擴展的方式接入到PHP。react
libevent擴展和event擴展隨便選一個裝,我我的更喜歡event擴展,由於更面向對象一點。本身去http://pecl.php.net裏面搜跟本身PHP版本對應的擴展,下好編譯安裝一下就OK了.電腦裝了多個版本的PHP編譯的時候注意一下,phpize的版本要對應上,別搞錯了,典型的五步曲:linux
phpize
./configure
make
make install
php -m | grep event #看看裝上了沒
複製代碼
咱們要實現的服務器,傳輸層是TCP協議,應用層協議太多太複雜,限於篇幅,會簡單地以HTTP服務器舉個例子,HTTP協議自己就很複雜,要實現起來細節上有不少考究,咱們也不會徹底實現HTTP協議。nginx
首先,建立一個socket,三步曲,socket_create、socket_bind、socket_listen,爲何是這三步曲呢?很簡單,無論你傳輸層協議是啥,你下面的網絡層協議你得選個版本吧,IPV4仍是IPV6,傳輸層工做方式你得選一個吧,全雙工、半雙工仍是單工,TCP仍是UDP你也得選一個吧,socket_create就是這三個選項;肯定了網絡層和傳輸層,你得告訴我監聽哪一個端口吧,這就對應了socket_bind;而後你得開啓監聽,並指定一個客戶端的隊列長度吧,這就是socket_listen乾的事。shell
建立完了,同步阻塞咱就不介紹了,一個進程同時最多hold處一個鏈接,多幾個鏈接同時請求,就得等唄,超過了socket_listen指定的隊列長度,就得返回504了。多進程也同樣,幾個進程就有幾個併發,進程又是昂貴資源,並且進程的上下文切換費時費力,致使整個系統效率低下。服務器
不要緊,咱有epoll,hold住萬千請求不是夢,先實現一個Reactor。libevent庫就是Reactor模式,直接調用函數就是在使用Reactor模式,因此無需糾結到底php怎麼實現Reactor模式。網絡
<?php
use Event;
use EventBase;
class Reactor {
protected $reactor;
protected $events;
public static $instance = null;
const READ = Event::READ | Event::PERSIST;
const WRITE = Event::WRITE | Event::PERSIST;
public static function getInstance() {
if (is_null(self::$instance)) {
self::$instance = new self();
self::$instance->reactor = new EventBase;
}
return self::$instance;
}
public function add($fd, $what, $cb, $arg = null) {
switch ($what) {
case self::READ:
$event = new Event($this->reactor, $fd, self::READ, $cb, $arg);
break;
case self::WRITE:
$event = new Event($this->reactor, $fd, self::WRITE, $cb, $arg);
break;
default:
$event = new Event($this->reactor, $fd, $what, $cb, $arg);
break;
}
$event->add();
$this->events[(int) $fd][$what] = $event;
}
public function del($fd, $what = 'all') {
$events = $this->events[(int) $fd];
if ($what == 'all') {
foreach ($events as $event) {
$event->free();
}
} else {
if ($what != self::READ && $what != self::WRITE) {
throw new \Exception('不存在的事件');
}
$events[$what]->free();
}
}
public function run() {
$this->reactor->loop();
}
public function stop() {
foreach ($this->events as $events) {
foreach ($events as $event) {
$event->free();
}
}
$this->reactor->stop();
}
}
複製代碼
上面的代碼很簡單,簡單解釋一下概念,EventBase就是個容器,裏面裝的Event實例,這麼一說,上面的代碼就很是好懂了。 而後一個Server.數據結構
<?php
use Throwable;
use Monolog\Handler\StreamHandler;
class Server {
protected $ip;
protected $port;
protected $socket;
protected $reactor;
public function __construct($ip, $port) {
$this->ip = $ip;
$this->port = $port;
}
public function start() {
$socket = $this->createTcpConnection();
stream_set_blocking($socket, false);
Reactor::getInstance()->add($socket, Reactor::READ, function($socket) {
$conn = stream_socket_accept($socket);
stream_set_blocking($conn, false);
(new Connection($conn))->handle();
});
Reactor::getInstance()->run();
}
public function createTcpConnection() {
$schema = sprintf("tcp://%s:%d", $this->ip, $this->port);
$socket = stream_socket_server($schema, $errno, $errstr);
if ($errno) {
throw new \Exception($errstr);
}
return $socket;
}
}
複製代碼
Connection併發
<?php
class Connection {
protected $conn;
protected $read_buffer = '';
protected $write_buffer = '';
public function __construct($conn) {
$this->conn = $conn;
}
public function handle() {
Reactor::getInstance()->add($this->conn, Reactor::READ, \Closure::fromCallable([$this, 'read']));
}
private function read($conn) {
$this->read_buffer = '';
if (is_resource($conn)) {
while ($content = fread($conn, 65535)) {
$this->read_buffer .= $content;
}
}
if ($this->read_buffer) {
Reactor::getInstance()->add($conn, Reactor::WRITE, \Closure::fromCallable([$this, 'write']));
} else {
Reactor::getInstance()->del($conn);
fclose($conn);
}
}
private function write($conn) {
if (is_resource($conn)) {
fwrite($conn, "HTTP/1.1 200 OK\r\nContent-Type: text/html;charset=utf8\r\nContent-Length:11\r\nConnection: keep-alive\r\n\r\nHello!world");
}
}
}
複製代碼
先建立Socket的三步曲,設置成非阻塞模式。而後把socket加到Reactor中監聽可讀事件,可讀的意思就是,緩衝區有數據了,纔可讀。可讀事件發生,說明有新鏈接來了,用stream_socket_accept
接收新鏈接Conn,把Conn放到Reactor中監聽可讀事件,可讀事件發生,說明客戶端有數據發送過來了,循環讀直到沒數據,而後把Conn放到Reactor中監聽可寫事件,可寫事件發生,說明客戶端數據發送完了,把協議組裝一下寫入響應。
應用層若是是HTTP協議要注意一下Connection: keep-alive頭,由於要複用鏈接,不要一寫完就關閉鏈接。
擼完收工,用ab
測一下併發,加-k
參數複用鏈接,i5+8G,3W的併發沒啥問題,固然咱們這兒沒有磁盤I/O,實際狀況要從磁盤讀取文件,讀文件要經過linux的系統調用,並且有幾回的文件拷貝操做,花銷比較大,經常使用的解決思路是sendfile,零拷貝直接從一個FD到另外一個FD,效率比較高,缺點就是PHP沒有現成的已經實現sendfile的擴展,得本身動手,開發成本有點高。
ab測試PO圖:
這就是PHP實現高併發服務器的思路了,只要是用EPOLL解決的,思路都同樣,都是三步曲,放到Reactor下監聽FD事件。固然這個只是最簡單的模型,還有不少能夠改進的地方,好比說多進程,抄襲一下nginx,一個主進程+N個工做進程,多進程的目的仍是想利用多核並行工做。C語言實現也是這樣,只是你可能不用libevent庫,本身封裝EPOLL,畢竟libevent庫有點重,你也用不到libevent的不少東西;固然了,C語言有一堆的數據結構以及定義在數據結構上的操做要寫,沒有GC,本身管理內存,還要有良好的設計,上多進程還得搞一搞IPC進程間通訊的東西,開發難度比PHP要大地多,開發週期也很長,有興趣的同窗能夠本身擼一個玩。