<?php include 'vendor/autoload.php'; class server { private $serv; private $db; /** * [__construct description] * 構造方法中,初始化 $serv 服務 */ public function __construct() { $this->serv = new swoole_server('0.0.0.0', 9501); //初始化swoole服務 $this->serv->set(array( 'worker_num' => 1, 'daemonize' => false, //是否做爲守護進程,此配置通常配合log_file使用 'max_request' => 1000, 'log_file' => './swoole.log', 'task_worker_num' => 2 )); //設置監聽 $this->serv->on('Start', array($this, 'onStart')); $this->serv->on('WorkerStart', array($this, 'onWorkerStart')); $this->serv->on('Connect', array($this, 'onConnect')); $this->serv->on("Receive", array($this, 'onReceive')); $this->serv->on("Close", array($this, 'onClose')); $this->serv->on("Task", array($this, 'onTask')); $this->serv->on("Finish", array($this, 'onFinish')); //開啓 $this->serv->start(); } /** * 鏈接MySQL */ public function connect() { $this->db = new medoo([ 'database_type' => 'mysql', 'database_name' => 'xxxx', 'server' => '127.0.0.1', 'username' => 'root', 'password' => 'xxxxx', 'prefix' => '', 'charset' => 'utf8' ]); } /** * 在worker啓動的時候,建立MySQL鏈接池 * @param $ser * @param $worker_id */ public function onWorkerStart($ser, $worker_id) { if ($ser->taskworker) { $this->connect(); echo "taskWorker Starting ." . spl_object_hash($this->db) . ".........\n"; } else { echo 'wokerStart...db has=>' . PHP_EOL; } } public function onStart($serv) { echo SWOOLE_VERSION . " onStart\n"; } public function onConnect($serv, $fd) { echo $fd . "Client Connect.\n"; } /** * 這個發生在worker進程中,當接受到來自mater進程中reactor線程轉發過來的客戶端 * @param $serv * @param $fd * @param $from_id * @param $data */ public function onReceive($serv, $fd, $from_id, $data) { echo "Get Message From Client {$fd}:{$data}\n"; $param = array( 'fd' => $fd ); $serv->task(json_encode($param)); echo "Continue Handle Worker\n"; } public function onClose($serv, $fd) { echo "Client Close.\n"; } /** * 異步任務,用來生成csv表格數據導出 * @param $serv * @param $task_id * @param $from_id * @param $data * @return string */ public function onTask($serv, $task_id, $from_id, $data) { echo "This Task {$task_id} from Worker {$from_id}\n"; for ($i = 0; $i < 2; $i++) { $all_pager = $this->db->count('dtb_checking'); echo '總條數:' . $all_pager . PHP_EOL; // 其實這裏須要判斷是不是鏈接超時,等一系列操做,可是這裏就是爲了測試數據庫鏈接失效狀況。 if ($all_pager === false) { echo '從新連' . PHP_EOL; $this->connect(); continue; } $fp = fopen(__DIR__ . DIRECTORY_SEPARATOR . rand() . '.csv', 'w'); $page_size = 500; $page = ceil($all_pager / $page_size); for ($p = 1; $p <= $page; $p++) { echo '第' . $p . '頁' . PHP_EOL; $start = ($p - 1) * $page_size; $list = $this->db->select('dtb_checking', ['id', 'real_name', 'account_no'], ['LIMIT' => [$start, 500]]); foreach ($list as $k => $row) { $temp = [ iconv('utf-8', 'gbk', $row['id']), iconv('utf-8', 'gbk', $row['real_name']), iconv('utf-8', 'gbk', $row['account_no']), ]; fputcsv($fp, $temp); } } break; } return "Task {$task_id}'s result"; } public function onFinish($serv, $task_id, $data) { echo "Task {$task_id} finish\n"; echo "Result: {$data}\n"; } } $server = new server();