1 <?php
2 namespace Helloweba\Swoole;
3
4 use swoole_server;
5
6 /**
7 * 任務調度
8 */
9 class Task
10 {
11 protected $serv;
12 protected $host = '127.0.0.1';
13 protected $port = 9506;
14 // 進程名稱
15 protected $taskName = 'swooleTask';
16 // PID路徑
17 protected $pidPath = '/run/swooletask.pid';
18 // 設置運行時參數
19 protected $options = [
20 'worker_num' => 4, //worker進程數,通常設置爲CPU數的1-4倍
21 'daemonize' => true, //啓用守護進程
22 'log_file' => '/data/log/swoole-task.log', //指定swoole錯誤日誌文件
23 'log_level' => 0, //日誌級別 範圍是0-5,0-DEBUG,1-TRACE,2-INFO,3-NOTICE,4-WARNING,5-ERROR
24 'dispatch_mode' => 1, //數據包分發策略,1-輪詢模式
25 'task_worker_num' => 4, //task進程的數量
26 'task_ipc_mode' => 3, //使用消息隊列通訊,並設置爲爭搶模式
27 ];
28
29 public function __construct($options = [])
30 {
31 date_default_timezone_set('PRC');
32 // 構建Server對象,監聽127.0.0.1:9506端口
33 $this->serv = new swoole_server($this->host, $this->port);
34
35 if (!empty($options)) {
36 $this->options = array_merge($this->options, $options);
37 }
38 $this->serv->set($this->options);
39
40 // 註冊事件
41 $this->serv->on('Start', [$this, 'onStart']);
42 $this->serv->on('Connect', [$this, 'onConnect']);
43 $this->serv->on('Receive', [$this, 'onReceive']);
44 $this->serv->on('Task', [$this, 'onTask']);
45 $this->serv->on('Finish', [$this, 'onFinish']);
46 $this->serv->on('Close', [$this, 'onClose']);
47 }
48
49 public function start()
50 {
51 // Run worker
52 $this->serv->start();
53 }
54
55 public function onStart($serv)
56 {
57 // 設置進程名
58 cli_set_process_title($this->taskName);
59 //記錄進程id,腳本實現自動重啓
60 $pid = "{$serv->master_pid}\n{$serv->manager_pid}";
61 file_put_contents($this->pidPath, $pid);
62 }
63
64 //監聽鏈接進入事件
65 public function onConnect($serv, $fd, $from_id)
66 {
67 $serv->send( $fd, "Hello {$fd}!" );
68 }
69
70 // 監聽數據接收事件
71 public function onReceive(swoole_server $serv, $fd, $from_id, $data)
72 {
73 echo "Get Message From Client {$fd}:{$data}\n";
74 //$this->writeLog('接收客戶端參數:'.$fd .'-'.$data);
75 $res['result'] = 'success';
76 $serv->send($fd, json_encode($res)); // 同步返回消息給客戶端
77 $serv->task($data); // 執行異步任務
78 }
79
80 /**
81 * @param $serv swoole_server swoole_server對象
82 * @param $task_id int 任務id
83 * @param $from_id int 投遞任務的worker_id
84 * @param $data string 投遞的數據
85 */
86 public function onTask(swoole_server $serv, $task_id, $from_id, $data)
87 {
88 swoole_timer_tick(30000, function($timer) use ($task_id) { // 啓用定時器,每30秒執行一次
89 $memPercent = $this->getMemoryUsage();
90 echo date('Y-m-d H:i:s') . '當前內存使用率:'.$memPercent."\n";
91 });
92 }
93
94
95 /**
96 * @param $serv swoole_server swoole_server對象
97 * @param $task_id int 任務id
98 * @param $data string 任務返回的數據
99 */
100 public function onFinish(swoole_server $serv, $task_id, $data)
101 {
102 //
103 }
104
105
106 // 監聽鏈接關閉事件
107 public function onClose($serv, $fd, $from_id) {
108 echo "Client {$fd} close connection\n";
109 }
110
111 public function stop()
112 {
113 $this->serv->stop();
114 }
115
116 private function getMemoryUsage()
117 {
118 // MEMORY
119 if (false === ($str = @file("/proc/meminfo"))) return false;
120 $str = implode("", $str);
121 preg_match_all("/MemTotal\s{0,}\:+\s{0,}([\d\.]+).+?MemFree\s{0,}\:+\s{0,}([\d\.]+).+?Cached\s{0,}\:+\s{0,}([\d\.]+).+?SwapTotal\s{0,}\:+\s{0,}([\d\.]+).+?SwapFree\s{0,}\:+\s{0,}([\d\.]+)/s", $str, $buf);
122 //preg_match_all("/Buffers\s{0,}\:+\s{0,}([\d\.]+)/s", $str, $buffers);
123
124 $memTotal = round($buf[1][0]/1024, 2);
125 $memFree = round($buf[2][0]/1024, 2);
126 $memUsed = $memTotal - $memFree;
127 $memPercent = (floatval($memTotal)!=0) ? round($memUsed/$memTotal*100,2):0;
128
129 return $memPercent;
130 }
131 }