標籤(空格分隔): swoolephp
這節來學習Swoole最基礎的Server
和Client
。會經過建立一個tcp Server來說解。mysql
<?php class Server { private $serv; public function __construct() { $this->serv = new Swoole\Server('127.0.0.1', 9501); //當啓動一個Swoole應用時,一共會建立2 + n + m個進程,2爲一個Master進程和一個Manager進程,其中n爲Worker進程數。m爲TaskWorker進程數。 //默認若是不設置,swoole底層會根據當前機器有多少CPU核數,啓動對應數量的Reactor線程和Worker進程。我機器爲4核的。Worker爲4。TaskWorker爲0。 //下面我來設置worker_num = 10。看下啓動了多少個進程 $this->serv->set([ 'worker_num' => 10, //'task_worker_num' => 2, 'deamonize' => true, ]); //啓動10個work,總共12個進程。 /* ➜ Event git:(master) pstree |grep server.php | \-+= 54172 yangyi php server.php #Master進程 | \-+- 54173 yangyi php server.php # Manager 進程 | |--- 54174 yangyi php server.php #Work 進程 | |--- 54175 yangyi php server.php | |--- 54176 yangyi php server.php | |--- 54177 yangyi php server.php | |--- 54178 yangyi php server.php | |--- 54179 yangyi php server.php | |--- 54180 yangyi php server.php | |--- 54181 yangyi php server.php | |--- 54182 yangyi php server.php | \--- 54183 yangyi php server.php * */ //增長新的監控的ip:post:mode $this->serv->addlistener("::1", 9500, SWOOLE_SOCK_TCP); //監聽事件 /* * * - onStart * - onShutdown * - onWorkerStart * - onWorkerStop * - onTimer * - onConnect * - onReceive * - onClose * - onTask * - onFinish * - onPipeMessage * - onWorkerError * - onManagerStart * - onManagerStop */ $this->serv->on('Start', array($this, 'onStart')); $this->serv->on('Connect', array($this, 'onConnect')); $this->serv->on('Receive', array($this, 'onReceive')); $this->serv->on('Close', array($this, 'onClose')); //master進程啓動後, fork出Manager進程, 而後觸發ManagerStart $this->serv->on('ManagerStart', function (\swoole_server $server){ echo "On manager start."; }); //manager進程啓動,啓動work進程的時候調用 workid表示第幾個id, 從0開始。 $this->serv->on('WorkerStart', function($serv, $workerId) { echo $workerId . '---'; }); //當一個work進程死掉後,會觸發 $this->serv->on('WorkerStop', function() { echo '--stop'; }); //啓動 $this->serv->start(); } //啓動server時候會觸發。 public function onStart( $serv ) { echo "Start\n"; } //client鏈接成功後觸發。 public function onConnect( $serv, $fd, $from_id ) { $a = $serv->send( $fd, "Hello {$fd}!" ); //var_dump($a); //成功返回true } //接收client發過來的請求 public function onReceive( swoole_server $serv, $fd, $from_id, $data ) { echo "Get Message From Client {$fd}:{$data}\n"; //$serv->send($fd, $data); //關閉該work進程 //$serv->stop(); //宕機 //$serv->shutdown(); //主動關閉 客戶端鏈接,也會觸發onClose事件 //$serv->close($fd); $serv->send($fd, $data); //$list = $serv->connection_list(); // foreach ($list as $fd) { // $serv->send($fd, $data); // } } } //客戶端斷開觸發 public function onClose( $serv, $fd, $from_id ) { echo "Client {$fd} close connection\n"; } } //輸出swoole的版本 echo swoole_version(); // 1.9.0 //輸出本機iP var_dump(swoole_get_local_ip()); /** array(1) { 'en4' => string(13) "172.16.71.149" } */ // 啓動服務器 $server = new Server();
咱們啓動服務端server:react
$ php server.php
0--start 2--start Start 1--start 3--start 4--start 5--start 6--start manager start. 7--start 9--start 8--start
咱們來分析整個server 啓動的步驟:git
- 啓動php server.php後,當前進程fork出Master進程,而後退出。
- Master進程啓動成功以後,fork出Manager進程,並觸發OnManagerStart事件。
- Manager進程啓動成功時候,fork出Worker進程,並觸發OnWorkerStart事件。
server端好了,那麼就會須要client端來鏈接,swoole裏面client分爲同步和異步,先來一個同步clent客戶端。github
<?php // sync 同步客戶端 class client { private $client; public function __construct() { $this->client = new Swoole\Client(SWOOLE_SOCK_TCP | SWOOLE_KEEP); $this->client->connect('127.0.0.1', 9501, 1); } public function connect() { //fwrite(STDOUT, "請輸入消息:"); //$msg = trim(fgets(STDIN)); $msg = rand(1,12); //發送給消息到服務端 $this->client->send( $msg ); //接受服務端發來的信息 $message = $this->client->recv(); echo "Get Message From Server:{$message}\n"; //關閉客戶端 $this->client->close(); } } $client = new Client(); $client->connect();
同步client是同步阻塞的。一整套connect->send()->rev()->close()
是同步進行的。sql
因此,若是是大量的循環數據,就不適合同步client了:數據庫
好比下面:json
<?php // sync 同步客戶端 class client { private $client; public function __construct() { var_dump(swoole_get_local_ip()); $this->client = new Swoole\Client(SWOOLE_SOCK_TCP | SWOOLE_KEEP); $this->client->connect('127.0.0.1', 9501, 1); $i = 0; while ($i < 100) { $this->client->send($i."\n"); $message = $this->client->recv(); echo "Get Message From Server:{$message}\n"; $i++; } } } $client = new Client();
打印的結果就是順序執行。要是想要異步了。segmentfault
<?php //異步客戶端 $client = new Swoole\Client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC); $client->on("connect", function($cli) { var_dump($cli->isConnected()); // true var_dump($cli->getsockname()); //['port' => 57305, 'host'=> '127.0.0.1'] var_dump($cli->sock); // 5 $i = 0; while ($i < 100) { $cli->send($i."\n"); $i++; } //關閉 //$cli->close(); }); $client->on("receive", function($cli, $data){ echo "Receive: $data"; }); $client->on("error", function(swoole_client $cli){ echo "error\n" . $cli->errCode; }); $client->on("close", function(swoole_client $cli){ echo "Connection close\n"; }); $client->connect('127.0.0.1', 9501);
這樣就是一個異步的client了,處理更快,可是隻支持PHP的cli模式
。ruby
總結一下client與server的鏈接過程:
- Client主動Connect的時候,Client其實是與Master進程中的某個Reactor線程發生了鏈接。
- 當TCP的三次握手成功了之後,由這個Reactor線程將鏈接成功的消息告訴Manager進程,再由Manager進程轉交給Worker進程。
- 在這個Worker進程中觸發了OnConnect的方法。
- 當Client向Server發送了一個數據包的時候,首先收到數據包的是Reactor線程,同時Reactor線程會完成組包,再將組好的包交給Manager進程,由Manager進程轉交給Worker。
- 此時Worker進程觸發OnReceive事件。
- 若是在Worker進程中作了什麼處理,而後再用Send方法將數據發回給客戶端時,數據則會沿着這個路徑逆流而上。
關於上面說到的幾個進程,解釋下:
Master進程是一個多線程進程,其中有一組很是重要的線程,叫作Reactor線程(組),每當一個客戶端鏈接上服務器的時候,都會由Master進程從已有的Reactor線程中,根據必定規則挑選一個,專門負責向這個客戶端提供維持連接、處理網絡IO與收發數據等服務。
而Manager進程,某種意義上能夠看作一個代理層,它自己並不直接處理業務,其主要工做是將Master進程中收到的數據轉交給Worker進程,或者將Worker進程中但願發給客戶端的數據轉交給Master進程進行發送。另外,Manager進程還負責監控Worker進程,若是Worker進程由於某些意外掛了,Manager進程會從新拉起新的Worker進程,有點像Supervisor的工做。
Worker進程了,顧名思義,Worker進程其實就是處理各類業務工做的進程,Manager將數據包轉交給Worker進程,而後Worker進程進行具體的處理,並根據實際狀況將結果反饋給客戶端。
在swoole中work進程分爲EventWorker和TaskWorker,對應的配置文件設置爲:
$this->serv->set([ 'worker_num' => 10, #EventWorker 'task_worker_num' => 2, #TaskWorker 'deamonize' => true, ]);
worker是基於event觸發,而task則是manager直接生成的子進程。那麼他們有什麼區別呢?
共同點是:他們都是最底層負責處理業務的進程。
Swoole的業務邏輯部分是同步阻塞運行的,若是遇到一些耗時較大的操做,例如訪問數據庫、廣播消息等,就會影響服務器的響應速度。所以Swoole提供了Task功能,將這些耗時操做放到另外的進程去處理,當前woker進程繼續執行後面的邏輯。運行Task,須要在swoole服務中配置參數task_worker_num
,便可開啓task功能。此外,必須給swoole_server綁定兩個回調函數:onTask
和onFinish
。這兩個回調函數分別用於執行Task任務和處理Task任務的返回結果。
先來寫一個demo,來如何用 taskWoker來處理業務。
taskServer.php
<?php /** * Created by PhpStorm. * User: yangyi * Date: 2016/12/7 * Time: 16:16 */ class taskServer { private $serv; /** * [__construct description] * 構造方法中,初始化 $serv 服務 */ public function __construct() { $this->serv = new Swoole\Server('0.0.0.0', 9501); //初始化swoole服務 $this->serv->set(array( 'worker_num' => 8, 'daemonize' => false, //是否做爲守護進程,此配置通常配合log_file使用 'max_request' => 1000, 'log_file' => './swoole.log', 'task_worker_num' => 8 )); //設置監聽 $this->serv->on('Start', array($this, 'onStart')); $this->serv->on('Connect', array($this, 'onConnect')); $this->serv->on("Receive", array($this, 'onReceive')); $this->serv->on("Close", array($this, 'onClose')); $this->serv->on("Task", array($this, 'onTask')); $this->serv->on("Finish", array($this, 'onFinish')); //開啓 $this->serv->start(); } public function onStart($serv) { echo SWOOLE_VERSION . " onStart\n"; } public function onConnect($serv, $fd) { echo $fd."Client Connect.\n"; } public function onReceive($serv, $fd, $from_id, $data) { echo "Get Message From Client {$fd}:{$data}\n"; // send a task to task worker. $param = array( 'fd' => $fd ); // start a task $serv->task(json_encode($param)); echo "Continue Handle Worker\n"; } public function onClose($serv, $fd) { echo "Client Close.\n"; } public function onTask($serv, $task_id, $from_id, $data) { echo "This Task {$task_id} from Worker {$from_id}\n"; echo "Data: {$data}\n"; for($i = 0 ; $i < 200 ; $i ++ ) { sleep(1); echo "Task {$task_id} Handle {$i} times...\n"; } $fd = json_decode($data, true); $serv->send($fd['fd'] , "Data in Task {$task_id}"); return "Task {$task_id}'s result"; } public function onFinish($serv,$task_id, $data) { echo "Task {$task_id} finish\n"; echo "Result: {$data}\n"; } } $server = new taskServer();
taskClient.php 異步的客戶端
<?php /** * Created by PhpStorm. * User: yangyi * Date: 2016/12/7 * Time: 16:18 */ class taskClient { private $client; public function __construct() { $this->client = new Swoole\Client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC); $this->client->on('Connect', array($this, 'onConnect')); $this->client->on('Receive', array($this, 'onReceive')); $this->client->on('Close', array($this, 'onClose')); $this->client->on('Error', array($this, 'onError')); } public function connect() { if(!$fp = $this->client->connect("127.0.0.1", 9501 , 1)) { echo "Error: {$fp->errMsg}[{$fp->errCode}]\n"; return; } } //connect以後,會調用onConnect方法 public function onConnect($cli) { fwrite(STDOUT, "Enter Msg:"); swoole_event_add(STDIN,function(){ fwrite(STDOUT, "Enter Msg:"); $msg = trim(fgets(STDIN)); $this->send($msg); }); } public function onClose($cli) { echo "Client close connection\n"; } public function onError() { } public function onReceive($cli, $data) { echo "Received: ".$data."\n"; } public function send($data) { $this->client->send($data); } public function isConnected($cli) { return $this->client->isConnected(); } } $client = new taskClient(); $client->connect();
運行一下:
php taskServer.php
php taskClient.php
服務端打印:
$ php task_server.php 1.9.0 onStart 1Client Connect. Get Message From Client 1:12345 Continue Handle Worker This Task 0 from Worker 3 Data: {"fd":1} Task 0 Handle 0 times... Task 0 Handle 1 times... Task 0 finish Result: Task 0's result
客戶端打印:
$ php task_client.php Enter Msg:12345 Enter Msg:Received: Data in Task 0
這裏面有幾點須要注意:
1. 運行Task,必需要在swoole服務中配置參數task_worker_num
,此外,必須給swoole_server綁定兩個回調函數:onTask
和onFinish
。
2. onTash 要return 數據
3. onFinish 會接收到onTash的數據,標記成完成。
4. swoole_event_add 把輸入綁定成事件,這個後續將,這樣client就能夠連續的屢次輸入。
上面說了這麼,圖表總結一下swoole結構:
swoole採用 多線程Reactor+多進程Worker
swoole的處理鏈接流程圖以下:
當請求到達時,swoole是這樣處理的:
請求到達 Main Reactor
|
|
Main Reactor根據Reactor的狀況,將請求註冊給對應的Reactor
(每一個Reactor都有epoll。用來監聽客戶端的變化)
|
|
客戶端有變化時,交給worker來處理
|
|
worker處理完畢,經過進程間通訊(好比管道、共享內存、消息隊列)發給對應的reactor。
|
|
reactor將響應結果發給相應的鏈接
|
|
請求處理完成
由於reactor基於epoll,因此每一個reactor能夠處理無數個鏈接請求。 如此,swoole就輕鬆的處理了高併發。
參考資料:
http://rango.swoole.com/archives/305