基於swoole的swoolefy實現相似go的waitGroup多併發協程調度

swoolefy是一個基於swoole實現的輕量級高性能的常駐內存型的API和Web應用服務框架, 高度封裝了http,websocket,udp服務器,以及基於tcp實現可擴展的rpc服務, 同時支持composer包方式安裝部署項目。基於實用,swoolefy抽象Event事件處理類, 實現與底層的回調的解耦,支持協程調度,同步|異步調用,全局事件註冊,心跳檢查,異步任務,多進程(池)等, 內置view、log、session、mysql、redis、mongodb等經常使用組件等。php

目前swoolefy4.2+版本徹底支持swoole4.2.13+的協程,推薦使用swoole4.3+mysql

GitHub:https://github.com/bingcool/s...git

下面主要講解一下如何實現了相似go的waitGroup的功能
一、定義GoWaitGroup的類:github

<?php
/**
+----------------------------------------------------------------------
| swoolefy framework bases on swoole extension development, we can use it easily!
+----------------------------------------------------------------------
| Licensed ( https://opensource.org/licenses/MIT )
+----------------------------------------------------------------------
| Author: bingcool <bingcoolhuang@gmail.com || 2437667702@qq.com>
+----------------------------------------------------------------------
 */

namespace Swoolefy\Core;

use Swoole\Coroutine\Channel;

class GoWaitGroup {
    /**
     * @var int
     */
    private $count = 0;

    /**
     * @var Channel
     */
    private $chan;

    /**
     * @var array
     */
    private $result = [];

    /**
     * WaitGroup constructor
     */
    public function __construct() {
        $this->chan = new Channel;
    }

    /**
     * add
     */
    public function go(\Closure $go_func = null) {
        $this->count++;
        if($go_func instanceof \Closure) {
            go($go_func);
        }
    }

    /**
     * start
     */
    public function start() {
        $this->count++;
        return $this->count;
    }

    /**
     * done
     */
    public function done(string $key, $data = null) {
        if(!empty($data)) {
            $this->result[$key] = $data;
        }
        $this->chan->push(1);
    }

    /**
     * wait
     */
    public function wait() {
        while($this->count--) {
            $this->chan->pop();
        }
        $result = $this->result;
        $this->result = [];
        $this->count = 0;
        return $result;
    }

}

二、在swoolefy中調用web

class GroupController extends BController {
    public function waitgroup() {
        // 建立一個waitGroup實例
        $wg = new \Swoolefy\Core\GoWaitGroup();
         
         // 第一種方式,直接$wg->go()函數中執行go的協程函數
        $wg->go(function() use ($wg) {
            // 掛起協程
            $fp = stream_socket_client("tcp://www.baidu.com:80", $errno, $errstr, 30);
            // 協程返回的數據
            $wg->done('mysql', 'mysql');
        });

        $wg->go(function() use ($wg) {
            sleep(1);
            $wg->done('tengxun', 'weixin and qq');
        });

        //掛起當前協程,等待全部任務完成後恢復
        //$result = $wg->wait();
        //這裏 $result 包含了 1 個任務執行結果
        //var_dump($result);
        
        //第二種方式,添加$wg->start(),啓動協程,而後使用swoole的原生go執行協程函數
        $wg->start();
        go(function () use ($wg) {
            // 掛起協程
            sleep(1);
            $wg->done('taobao', 'ali baba');
        });
        
         //第二種方式,添加$wg->start(),啓動協程,而後使用swoole的原生go執行協程函數
        $wg->start();
        go(function () use ($wg) {
            // 掛起協程
            sleep(1);
            $wg->done('baidu', 'baidu');
        });
        // 以上三個協程將會併發調用,wait()函數實現等待三個協程數據返回
        //掛起當前協程,讓出cpu控制權,cpu能夠作其餘的事情,直到待全部任務完成後恢復
        $result = $wg->wait();
        //這裏 $result 包含了 2 個任務執行結果
        var_dump($result);
    }
}

至此一個最簡單的併發調用就完成了,你能夠愉快使用gowaitGroup的協程調用了redis

相關文章
相關標籤/搜索