這是關於 Swoole 學習的第七篇文章:Swoole RPC 的實現。php
有位讀者說 「上篇文章,下載代碼後直接運行成功,代碼簡潔明瞭,簡直是 Swoole 入門最好的 Demo 」。react
「哈哈哈...」json
還有讀者說 「有一塊兒學習的組織羣嗎,能夠在裏面進行疑難答疑?」segmentfault
這個還真沒有,總以爲維護一個微信羣不容易,由於本身自己就不愛在羣裏說話,另外,本身也在不少微信羣中,開始氛圍挺好的,你們都聊聊技術,後來技術聊的少了改爲聊八卦啦,再後來慢慢就安靜了,還有在羣裏起衝突的...緩存
固然我也知道維護一個微信羣的好處是很是大的,若是有這方面經驗的同窗,我們一塊兒交流交流 ~ 安全
還有出版社找我寫書的.微信
他們也真是放心,我本身肚子裏幾滴墨水仍是知道的,目前確定是不行,之後嘛,再說。負載均衡
還有一些大佬加了微信,多是出於對晚輩的提攜吧,偷偷告訴你,從大佬的朋友圈能學到不少東西。框架
我真誠的建議,作技術的應該本身多總結總結,將本身會的東西寫出來分享給你們,先不說給別人帶來太多的價值,反正對本身的幫助是很是很是大的,這方面想交流的同窗,能夠加我,我能夠給你無私分享。curl
可能都會說時間少,時間只要擠,總會有的,每一個人都 24 小時,時間對每一個人是最公平的。說到這推薦你們讀一下《暗時間》這本書,這是我這本書的 讀書筆記,你們能夠瞅瞅。
開始今天的文章吧,這篇文章實現了一個簡單的 RPC 遠程調用,在實現以前須要先了解什麼是 RPC,不清楚的能夠看下以前發的這篇文章 《我眼中的 RPC》。
下面的演示代碼主要使用了 Swoole 的 Task 任務池,經過 OnRequest/OnReceive 得到信息交給 Task 去處理。
舉個工做中的例子吧,在電商系統中的兩個模塊,我的中心模塊和訂單管理模塊,這兩個模塊是獨立部署的,可能不在一個機房,可能不是一個域名,如今我的中心須要經過 用戶ID 和 訂單類型 獲取訂單數據。
//代碼片斷 <?php $demo = [ 'type' => 'SW', 'token' => 'Bb1R3YLipbkTp5p0', 'param' => [ 'class' => 'Order', 'method' => 'get_list', 'param' => [ 'uid' => 1, 'type' => 2, ], ], ]; $ch = curl_init(); $options = [ CURLOPT_URL => 'http://10.211.55.4:9509/', CURLOPT_POST => 1, CURLOPT_POSTFIELDS => json_encode($demo), ]; curl_setopt_array($ch, $options); curl_exec($ch); curl_close($ch);
//代碼片斷 $demo = [ 'type' => 'SW', 'token' => 'Bb1R3YLipbkTp5p0', 'param' => [ 'class' => 'Order', 'method' => 'get_list', 'param' => [ 'uid' => 1, 'type' => 2, ], ], ]; $this->client->send(json_encode($demo));
發出請求後,分配給 Task ,並等待 Task 執行完成後,再返回。
發出請求後,分配給 Task 以後,就直接返回。
$demo = [ 'type' => 'SW', 'token' => 'Bb1R3YLipbkTp5p0', 'param' => [ 'class' => 'Order', 'method' => 'get_list', 'param' => [ 'uid' => 1, 'type' => 2, ], ], ];
<?php if (!defined('SERVER_PATH')) exit("No Access"); class OnRequest { private static $query; private static $code; private static $msg; private static $data; public static function run($serv, $request, $response) { try { $data = decrypt($request->rawContent()); self::$query = $data; if (empty($data)) { self::$code = '-1'; self::$msg = '非法請求'; self::end($request, $response); } //TODO 驗證Token switch ($data['type']) { case 'SW': //單個請求,等待結果 $task = [ 'request' => $data, 'server' => 'http' ]; $rs = $serv->task(json_encode($task), -1, function ($serv, $task_id, $rs_data) use ($request, $response) { self::$code = '1'; self::$msg = '成功'; self::$data = $rs_data['response']; self::end($request, $response); }); if ($rs === false) { self::$code = '-1'; self::$msg = '失敗'; self::end($request, $response); } break; case 'SN': //單個請求,不等待結果 $task = [ 'request' => $data, 'server' => 'http' ]; $rs = $serv->task(json_encode($task)); if ($rs === false) { self::$code = '-1'; self::$msg = '失敗'; } else { self::$code = '1'; self::$msg = '成功'; } self::end($request, $response); break; default: self::$code = '-1'; self::$msg = '非法請求'; self::end($request, $response); } } catch(Exception $e) { } } private static function end($request = null, $response = null) { $rs['request_method'] = $request->server['request_method']; $rs['request_time'] = $request->server['request_time']; $rs['response_time'] = time(); $rs['code'] = self::$code; $rs['msg'] = self::$msg; $rs['data'] = self::$data; $rs['query'] = self::$query; $response->end(json_encode($rs)); self::$data = []; return; } }
<?php if (!defined('SERVER_PATH')) exit("No Access"); class OnReceive { private static $request_time; private static $query; private static $code; private static $msg; private static $data; public static function run($serv, $fd, $reactor_id, $data) { try { self::$request_time = time(); $data = decrypt($data); self::$query = $data; //TODO 驗證Token switch ($data['type']) { case 'SW': //單個請求,等待結果 $task = [ 'fd' => $fd, 'request' => $data, 'server' => 'tcp', 'request_time' => self::$request_time, ]; $rs = $serv->task(json_encode($task)); if ($rs === false) { self::$code = '-1'; self::$msg = '失敗'; self::handlerTask($serv, $fd); } break; case 'SN': //單個請求,不等待結果 $task = [ 'fd' => $fd, 'request' => $data, 'server' => 'tcp', 'request_time' => self::$request_time, ]; $rs = $serv->task(json_encode($task)); if ($rs === false) { self::$code = '-1'; self::$msg = '失敗'; } else { self::$code = '1'; self::$msg = '成功'; } self::handlerTask($serv, $fd); break; default: self::$code = '-1'; self::$msg = '非法請求'; self::handlerTask($serv, $fd); } } catch(Exception $e) { } } private static function handlerTask($serv, $fd) { $rs['request_method'] = 'TCP'; $rs['request_time'] = self::$request_time; $rs['response_time'] = time(); $rs['code'] = self::$code; $rs['msg'] = self::$msg; $rs['data'] = self::$data; $rs['query'] = self::$query; $serv->send($fd, json_encode($rs)); } }
Demo 代碼僅供參考,裏面有不少不嚴謹的地方!
服務的調用方與提供方中間須要有一個服務註冊中心,很顯然上面的代碼中沒有,須要本身去實現。
服務註冊中心,負責管理 IP、Port 信息,提供給調用方使用,還要能負載均衡和故障切換。
根據本身的狀況,服務註冊中心實現可容易可複雜,用 Redis 也行,用 Zookeeper、Consul 也行。
感興趣的也能夠了解下網關 Kong ,包括 身份認證、權限認證、流量控制、監控預警...
再推薦一個 Swoole RPC 框架 Hprose,支持多語言。
就到這了,上面的 Demo 須要源碼的,加我微信。(菜單-> 加我微信-> 掃我)
本文歡迎轉發,轉發請註明做者和出處,謝謝!