[Sw] 使用 Swoole Server task/協程 處理大數據量異步任務時注意

 

關於 Buffered Query 和 Unbuffered Query:http://www.php.net/manual/zh/mysqlinfo.concepts.buffering.phpphp

對於結果集小的查詢,通常就開啓 Buffered Query 一次取回(fetchAll);html

對於結果集很大的查詢,能夠開啓 Unbuffered Query 來遍歷資源一條條 fetch,避免撐爆客戶端內存;mysql

PDO 屬性設置:http://php.net/manual/zh/pdo.setattribute.phpreact

 

其它解決方案:sql

1. 高頻投遞(依賴進程數),少許處理(每批次數據),能夠本身用 Process 實現進程池處理隊列任務,或者使用自帶的 task 功能。數據庫

2. 使用自帶 task 功能的狀況下,若是 worker 不須要與 task worker 通信,那麼 onTask 不要使用 return 返回數據,減小消耗。編程

3. worker 使用 task( ) 投遞頻率必須小於 task 進程數(task_worker_num),能夠程序來限制。swoole

  比方說 $taskWorkerNum 是 50,併發

  任務投遞次數累加 $deliverNo,fetch

  onTask 內完成任務時計數 $serv->atomic->add(1),

  完成任務數 $serv->atomic->get() 得到。

  那麼在投遞以後須要進行判斷,投遞總數 - 完成數 >= 任務進程數,說明投遞次數滿了,暫停一下子,保證 task 進程不是滿負荷工做。

/**
 * 調度工做
* https://cnblogs.com/farwish
*/ public function onWorkerStart(\Swoole\Server $server, $workerId) { if ($workerId == 0) { $data = [1, 2, 3]; foreach ($data as $item) { // 限流與投遞 while (($server->deliverNo - $server->atomic->get()) >= $this->taskWorker) { echo "等待空閒 task 進程\n"; sleep(1); } $server->task($item); $server->deliverNo++; } // 任務結束後退出 server while (true) { if ($server->deliverNo == $server->atomic->get()) { $server->shutdown(); } sleep(1); } } }

服務初始化部分:

    public function initTaskServer()
    {
        $server = new \Swoole\Server('0.0.0.0');

        $server->atomic = new \Swoole\Atomic(0);
        $server->deliverNo = 0;

        $server->set([
            'worker_num' => 1,
            'task_worker_num' => $this->taskWorkerNum,
            'task_ipc_mode' => 1,
            'task_max_request' => 5000,
        ]);

        $server->on('workerStart', [$this, 'onWorkerStart']);
        $server->on('task', [$this, 'onTask']);
        $server->on('receive', [$this, 'onReceive']);
        $server->on('finish', [$this, 'onFinish']);

        $server->start();
    }

    protected function onWorkerStart(Server $server, $workerId)
    {
    }

    protected function onTask(Server $server, $taskId, $fromId, $data)
    {
    }

    protected function onReceive(Server $server, $fd, $reactorId, $data)
    {
    }

    protected function onFinish(Server $server, $taskId, $data)
    {
    }

 

4. 不使用 server 和 task 多進程的狀況,利用 swoole 協程中的 channel 實現 producer、consumer 模式,生產者 unbuffer query 持續 push 數據到通道,消費者持續 pop 消費;生產者沒有數據時可退出,消費者檢測到生產者退出後也隨即退出。

缺點是在複雜場景下(好比多層查詢再加循環處理),編程會比較困難,好比:等待全部子協程結束的功能(WaitGroup)、多 consumer 的場景,須要本身封裝不少組件。

 

其它:

多進程、多協程的狀況下,須要配合使用數據庫鏈接池,由於數據庫併發鏈接數資源有限。

多進程只是利用到了多核,計算密集型場景有優點;協程併發相比更輕量,單進程內利用I/O切換實現併發,適合IO密集型場景。 

 

Doc:https://wiki.swoole.com/wiki/page/481.html

Course:http://www.yzmedu.com/course/330

Link:http://www.javashuo.com/article/p-nhftlnts-d.html

相關文章
相關標籤/搜索