Swoole從入門到入土(5)——TCP服務器[異步任務]

不管對於B/S仍是C/S,程序再怎麼變,惟一不變的是用戶不想等過久的躁動心情。因此服務端對於客戶的請求,能有多快就多快。若是服務端須要執行很耗時的操做,就須要異步任務處理機制,保證當前的響應速度不受影響。php

如今如下面的一個例子爲引子:編程

<?php
$server = new Swoole\Server('0.0.0.0', 9501);
$server->set([
    'max_wait_time'=>60,
    'reload_async'=>true,
    'worker_num'=>1,
    'task_worker_num'=>1,
    'task_max_request'=>100
]);

//監聽鏈接進入事件
$server->on('Connect', function ($server, $fd) {
    
});


//監聽數據接收事件
$server->on('Receive', function ($server, $fd, $from_id, $data) {
    $server->task("receive to task:$data");    //將任務丟入異步任務列隊
});

$server->on("task",function ($serv, $fd, $from_id, $data){
    //在這裏處理任務
    $serv->finish("$data ok");    //處理完成後,將結果傳給finish
});

$server->on("finish",function ($serv, $task_id, $data){
    //處理任務完成後的事情
    echo "finish $data\n";
});

//監聽鏈接關閉事件
$server->on('Close', function ($server, $fd) {

});

//啓動服務器
$server->start();

 

上面這個例子,在以前的代碼上增長了兩個屬性(task_worker_num和task_max_request)和兩個事件(onTask和onFinish)。代碼中同時設置了這四項,即可以啓動swoole的異步任務處理。是否是很簡單?json

那麼如今分別瞭解關於異步任務的新屬性和事件:服務器

 

新配置:swoole

1) task_worker_num:配置 Task 進程的數量。【默認值:未配置則不啓動 task異步

配置此參數後將會啓用 task 功能。因此 Server 必需要註冊 onTask、onFinish 2 個事件回調函數。若是沒有註冊,服務器程序將沒法啓動。socket

注意:async

Task 進程是同步阻塞的;函數

最大值不得超過 swoole_cpu_num() * 1000;學習

若是單個 task 的處理須要 100ms,那一個進程 1 秒就能夠處理 1/0.1=10 個 task;則task 投遞的速度,如每秒產生 2000 個 task,2000/10=200,就須要設置 task_worker_num => 200,啓用 200 個 Task 進程;

Task 進程內不能使用 Swoole\Server->task 方法。

 

2) task_max_request:設置 task 進程的最大任務數。【默認值:0】

設置 task 進程的最大任務數。一個 task 進程在處理完超過此數值的任務後將自動退出。這個參數是爲了防止 PHP 進程內存溢出。若是不但願進程自動退出能夠設置爲 0。

 

3) task_tmpdir:設置 task 的數據臨時目錄。【默認值:Linux /tmp 目錄】

在 Server 中,若是投遞的數據超過 8180 字節,將啓用臨時文件來保存數據。這裏的 task_tmpdir 就是用來設置臨時文件保存的位置。

注意:

底層默認會使用 /tmp 目錄存儲 task 數據,若是你的 Linux 內核版本太低,/tmp 目錄不是內存文件系統,能夠設置爲 /dev/shm/;

task_tmpdir 目錄不存在,底層會嘗試自動建立

 

4)task_use_object:使用面向對象風格的 Task 回調格式。【默認值:false】

設置爲 true 時,onTask 回調將變成對象模式。

//面向對象風格代碼示例$server = new Swoole\Server('127.0.0.1', 9501);
$server->set([
    'worker_num'      => 1,
    'task_worker_num' => 3,
    'task_use_object' => true,
]);
$server->on('receive', function (Swoole\Server $server, $fd, $tid, $data) {
    $server->task(['fd' => $fd,]);
});
$server->on('Task', function (Swoole\Server $server, Swoole\Server\Task $task) {
    //此處$task是Swoole\Server\Task對象
    $server->send($task->data['fd'], json_encode($server->stats()));
});
$server->start();

 

5) task_ipc_mode(進階):設置 Task 進程與 Worker 進程之間通訊的方式。【默認值:1】

這是一個進階屬性,正常狀況下是不用設置,使用默認值便可。要了解這個屬性,請先看文末的高級話題 :什麼是IPC? 

 知道了什麼是IPC後,這個屬性可取如下三個值:

其中:

模式1:支持定向投遞,可在 task 和 taskwait 方法中使用 dst_worker_id,指定目標 Task進程。dst_worker_id 設置爲 -1 時,底層會判斷每一個 Task 進程的狀態,向當前狀態爲空閒的進程投遞任務。

模式二、3:消息隊列模式使用操做系統提供的內存隊列存儲數據,未指定 mssage_queue_key 消息隊列 Key,將使用私有隊列,在 Server 程序終止後會刪除消息隊列。指定消息隊列 Key 後 Server 程序終止後,消息隊列中的數據不會刪除,所以進程重啓後仍然能取到數據。這二者的不一樣之處在於,模式2 支持定向投遞,$serv->task($data, $task_worker_id) 能夠指定投遞到哪一個 task 進程。模式3 是徹底爭搶模式, task 進程會爭搶隊列,將沒法使用定向投遞,task/taskwait 將沒法指定目標進程 ID,即便指定了 $task_worker_id,在模式3 下也是無效的。另外:模式3 會影響 sendMessage 方法,使 sendMessage 發送的消息會隨機被某一個 task 進程獲取。

 

 

新函數

函數task:投遞一個異步任務到 task_worker 池中。此函數是非阻塞的,執行完畢會當即返回。Worker 進程能夠繼續處理新的請求。

Swoole\Server->task(mixed $data, int $dstWorkerId = -1): int

$data:要投遞的任務數據,必須是可序列化的 PHP 變量。

$dstWorkerId:能夠指定要給投遞給哪一個 Task 進程,傳入 ID 便可,範圍參考 $worker_id;默認值:-1。

 

函數finish: 用於在 Task 進程中通知 Worker 進程,投遞的任務已完成。此函數能夠傳遞結果數據給 Worker 進程(即,觸發worker進程的onFinish事件)。

Swoole\Server->finish(mixed $data)

$data:任務處理的結果內容

注意:·

finish 方法能夠連續屢次調用,Worker 進程會屢次觸發 onFinish 事件;

在 onTask 回調函數中調用過 finish 方法後,return 數據依然會觸發 onFinish 事件;

Server->finish 是可選的。若是 Worker 進程不關心任務執行的結果,不須要調用此函數;

在 onTask 回調函數中 return 字符串,等同於調用 finish;

 

 

新事件

1) 事件onTask:在 task 進程內被調用。worker 進程可使用 task 函數向 task_worker 進程投遞新的任務。當前的 Task 進程在調用 onTask 回調函數時會將進程狀態切換爲忙碌,這時將再也不接收新的 Task,當 onTask 函數返回時會將進程狀態切換爲空閒而後繼續接收新的 Task。

function onTask(Swoole\Server $server, int $task_id, int $src_worker_id, mixed $data);

$server:Swoole\Server 對象

$task_id:執行任務的 task 進程 id【$task_id 和 $src_worker_id 組合起來纔是全局惟一的,不一樣的 worker 進程投遞的任務 ID 可能會有相同

$src_worker_id:投遞任務的 worker 進程 id

$data:任務的數據內容

注意:

若是開啓了 task_enable_coroutine 則回調函數原型是:

$server->on('Task', function (Swoole\Server $server, Swoole\Server\Task $task) {
    $task->worker_id;              //來自哪一個`Worker`進程
    $task->id;                     //任務的編號
    $task->flags;                  //任務的類型,taskwait, task, taskCo, taskWaitMulti 可能使用不一樣的 flags
    $task->data;                   //任務的數據
    co::sleep(0.2);                //協程 API
    $task->finish([123, 'hello']); //完成任務,結束並返回數據
});

在 onTask 函數中 return 字符串(return 的變量能夠是任意非 null 的 PHP 變量),表示將此內容返回給 worker 進程。也能夠經過 Swoole\Server->finish() 來觸發 onFinish 函數,而無需再 return。此時worker 進程中會觸發 onFinish 函數,表示投遞的 task 已完成。

onTask 函數執行時遇到致命錯誤退出,或者被外部進程強制 kill,當前的任務會被丟棄,但不會影響其餘正在排隊的 Task。

 

2)事件onFinish:在 worker 進程被調用,當 worker 進程投遞的任務在 task 進程中完成時被觸發。

function onFinish(Swoole\Server $server, int $task_id, mixed $data)

$server:Swoole\Server 對象;

$task_id:執行任務的 task 進程 id;

$data:任務處理的結果內容。

注意:

- task 進程的 onTask 事件中沒有調用 finish 方法或者 return 結果,worker 進程不會觸發 onFinish。

-執行 onFinish 邏輯的 worker 進程與下發 task 任務的 worker 進程是同一個進程。

 

 

關於異步任務的注意點:

-使用消息隊列通訊,若是 Task進程 處理能力低於投遞速度,可能會引發 Worker 進程阻塞。

-使用消息隊列通訊後 task 進程沒法支持協程 (開啓 task_enable_coroutine)。

 

 

 

----------- 高級話題分隔線--------------

什麼是IPC

同一臺主機上兩個進程間通訊 (簡稱 IPC) 的方式有不少種,在 Swoole 下咱們使用了 2 種方式 Unix Socket 和 sysvmsg,下面分別介紹:

第一種:Unix Socket

全名 UNIX Domain Socket, 簡稱 UDS, 使用套接字的 API (socket,bind,listen,connect,read,write,close 等),和 TCP/IP 不一樣的是不須要指定 ip 和 port,而是經過一個文件名來表示 (例如 FPM 和 Nginx 之間的 /tmp/php-fcgi.sock),UDS 是 Linux 內核實現的全內存通訊,無任何 IO 消耗。在 1 進程 write,1 進程 read,每次讀寫 1024 字節數據的測試中,100 萬次通訊僅需 1.02 秒,並且功能很是的強大,Swoole 下默認用的就是這種 IPC 方式

Swoole 下面使用 UDS 通信有兩種類型:SOCK_STREAM 和 SOCK_DGRAM,能夠簡單的理解爲 TCP 和 UDP 的區別,當使用 SOCK_STREAM 類型的時候一樣須要考慮 TCP 粘包問題。

當使用 SOCK_DGRAM 類型的時候不須要考慮粘包問題,每一個 send() 的數據都是有邊界的,發送多大的數據接收的時候就收到多大的數據,沒有傳輸過程當中的丟包、亂序問題,send 寫入和 recv 讀取的順序是徹底一致的。send 返回成功後必定是能夠 recv 到。
在 IPC 傳輸的數據比較小時很是適合用 SOCK_DGRAM 這種方式,因爲 IP 包每一個最大有 64k 的限制,因此用 SOCK_DGRAM 進行 IPC 時候單次發送數據不能大於 64k,同時要注意收包速度太慢操做系統緩衝區滿了會丟棄包,由於 UDP 是容許丟包的,能夠適當調大緩衝區。

第二種:sysvmsg

即 Linux 提供的消息隊列,這種 IPC 方式經過一個文件名來做爲 key 進行通信,這種方式很是的不靈活,實際項目使用的並很少,不作過多介紹。此種 IPC 方式只有兩個場景下有用:

1)防止丟數據,若是整個服務都掛掉,再次啓動隊列中的消息也在,能夠繼續消費,但一樣有髒數據的問題。

2)能夠外部投遞數據,好比 Swoole 下的 Worker進程經過消息隊列給 Task進程投遞任務,第三方的進程也能夠投遞任務到隊列裏面讓 Task 消費,甚至能夠在命令行手動添加消息到隊列。

 

 

 

---------------------------  我是可愛的分割線  ----------------------------

最後博主借地宣傳一下,漳州編程小組招新了,這是一個面向漳州青少年信息學/軟件設計的學習小組,有意向的同窗點擊連接,聯繫我吧。

相關文章
相關標籤/搜索