swoole異步任務數據報表生成

<?php

include 'vendor/autoload.php';

/**
 * TCP server that turns every received client message into an asynchronous
 * task; each task dumps the `dtb_checking` table into a GBK-encoded CSV file
 * created next to this script.
 *
 * Constructing the object registers the event callbacks and starts the server.
 */
class server
{
    /** Table the report is exported from. */
    const TABLE = 'dtb_checking';

    /** Number of rows fetched per SELECT while exporting. */
    const PAGE_SIZE = 500;

    /** How many times a task tries to count the table before giving up. */
    const MAX_ATTEMPTS = 2;

    private $serv;
    private $db;

    /**
     * Creates the Swoole server on 0.0.0.0:9501, applies its settings,
     * registers the event callbacks and starts it.
     */
    public function __construct()
    {
        $this->serv = new swoole_server('0.0.0.0', 9501);

        // Server settings.
        $this->serv->set([
            'worker_num' => 1,
            'daemonize' => false, // run as a daemon or not; usually combined with log_file
            'max_request' => 1000,
            'log_file' => './swoole.log',
            'task_worker_num' => 2
        ]);

        // Event callbacks.
        $this->serv->on('Start', [$this, 'onStart']);
        $this->serv->on('WorkerStart', [$this, 'onWorkerStart']);
        $this->serv->on('Connect', [$this, 'onConnect']);
        $this->serv->on("Receive", [$this, 'onReceive']);
        $this->serv->on("Close", [$this, 'onClose']);
        $this->serv->on("Task", [$this, 'onTask']);
        $this->serv->on("Finish", [$this, 'onFinish']);

        // Start serving.
        $this->serv->start();
    }

    /**
     * Opens a new MySQL connection through Medoo and stores it in $this->db,
     * replacing any previous connection object.
     */
    public function connect()
    {
        $this->db = new medoo([
            'database_type' => 'mysql',
            'database_name' => 'xxxx',
            'server' => '127.0.0.1',
            'username' => 'root',
            'password' => 'xxxxx',
            'prefix' => '',
            'charset' => 'utf8'
        ]);
    }

    /**
     * WorkerStart callback: task workers open their own database connection,
     * ordinary workers only log that they started.
     *
     * @param $ser       server object; its `taskworker` flag selects the branch
     * @param $worker_id id of the worker that started (unused)
     */
    public function onWorkerStart($ser, $worker_id)
    {
        if ($ser->taskworker) {
            $this->connect();
            echo "taskWorker Starting ." . spl_object_hash($this->db) . ".........\n";
        } else {
            echo 'wokerStart...db has=>' . PHP_EOL;
        }
    }

    /**
     * Start callback: prints the Swoole version.
     *
     * @param $serv server object (unused)
     */
    public function onStart($serv)
    {
        echo SWOOLE_VERSION . " onStart\n";
    }

    /**
     * Connect callback: logs the descriptor of the new client.
     *
     * @param $serv server object (unused)
     * @param $fd   client connection descriptor
     */
    public function onConnect($serv, $fd)
    {
        echo $fd . "Client Connect.\n";
    }

    /**
     * Receive callback. Runs in a worker process when client data forwarded
     * by the master process's reactor thread arrives; it queues one export
     * task carrying the client's descriptor and returns without waiting.
     *
     * @param $serv    server object used to dispatch the task
     * @param $fd      client connection descriptor
     * @param $from_id id passed by Swoole (unused)
     * @param $data    raw data sent by the client (only logged)
     */
    public function onReceive($serv, $fd, $from_id, $data)
    {
        echo "Get Message From Client {$fd}:{$data}\n";
        $param = [
            'fd' => $fd
        ];
        $serv->task(json_encode($param));
        echo "Continue Handle Worker\n";
    }

    /**
     * Close callback: logs that a client disconnected.
     *
     * @param $serv server object (unused)
     * @param $fd   client connection descriptor (unused)
     */
    public function onClose($serv, $fd)
    {
        echo "Client Close.\n";
    }

    /**
     * Task callback: exports the report table to a CSV file.
     *
     * The row count doubles as a connection probe: when it comes back as
     * false the connection is re-created and the count is tried once more.
     * A real implementation should inspect the actual error (timeout etc.);
     * this only demonstrates recovery from a dead connection.
     *
     * @param $serv    server object (unused)
     * @param $task_id id of this task
     * @param $from_id id of the worker that dispatched the task
     * @param $data    task payload (currently unused)
     * @return string "Task <id>'s result" on success, "Task <id> failed" when
     *                the table could not be counted or the export broke off
     */
    public function onTask($serv, $task_id, $from_id, $data)
    {
        echo "This Task {$task_id} from Worker {$from_id}\n";
        for ($attempt = 0; $attempt < self::MAX_ATTEMPTS; $attempt++) {
            $total = $this->db->count(self::TABLE);
            echo '總條數:' . $total . PHP_EOL;
            if ($total === false) {
                echo '從新連' . PHP_EOL;
                try {
                    $this->connect();
                } catch (Exception $e) {
                    // Keep the task worker alive; the next attempt (or task) will probe again.
                    echo 'Reconnect failed: ' . $e->getMessage() . PHP_EOL;
                }
                continue;
            }
            if ($this->exportCsv($total)) {
                return "Task {$task_id}'s result";
            }
            break;
        }
        return "Task {$task_id} failed";
    }

    /**
     * Finish callback: logs the result string returned by onTask().
     *
     * @param $serv    server object (unused)
     * @param $task_id id of the finished task
     * @param $data    value returned by the task
     */
    public function onFinish($serv, $task_id, $data)
    {
        echo "Task {$task_id} finish\n";
        echo "Result: {$data}\n";
    }

    /**
     * Writes the report table page by page into a new, randomly named CSV
     * file beside this script, converting each exported column from UTF-8
     * to GBK.
     *
     * @param int $total number of rows reported by count(); fixes the page count
     * @return bool true when every page was written, false when the file
     *              could not be opened or a page query failed (the partial
     *              file is removed in that case)
     */
    private function exportCsv($total)
    {
        $path = __DIR__ . DIRECTORY_SEPARATOR . rand() . '.csv';
        $fp = fopen($path, 'w');
        if ($fp === false) {
            echo 'Cannot open ' . $path . ' for writing' . PHP_EOL;
            return false;
        }

        $complete = true;
        $pages = (int) ceil($total / self::PAGE_SIZE);
        for ($p = 1; $p <= $pages; $p++) {
            echo '第' . $p . '頁' . PHP_EOL;
            $rows = $this->db->select(
                self::TABLE,
                ['id', 'real_name', 'account_no'],
                [
                    // A fixed order keeps offset paging from repeating or skipping rows.
                    'ORDER' => 'id',
                    'LIMIT' => [($p - 1) * self::PAGE_SIZE, self::PAGE_SIZE]
                ]
            );
            if (!is_array($rows)) {
                echo 'Query for page ' . $p . ' failed' . PHP_EOL;
                $complete = false;
                break;
            }
            foreach ($rows as $row) {
                fputcsv($fp, [
                    iconv('utf-8', 'gbk', $row['id']),
                    iconv('utf-8', 'gbk', $row['real_name']),
                    iconv('utf-8', 'gbk', $row['account_no']),
                ]);
            }
        }

        fclose($fp);
        if (!$complete) {
            // Do not leave a truncated report behind.
            unlink($path);
        }
        return $complete;
    }
}

// Entry point: constructing the object runs the constructor, which configures
// the Swoole server and calls start() on it.
// NOTE(review): start() presumably blocks until the server shuts down, so code
// placed after this line would only run then — confirm against the Swoole docs.
$server = new server();
相關文章
相關標籤/搜索