做者:bromine
連接:https://www.jianshu.com/p/411...
來源:簡書
著做權歸做者全部,本文已得到做者受權轉載,並對原文進行了從新的排版。
Swoft Github: https://github.com/swoft-clou...php
Swoft提供了一個自建RPC(遠程方法調用)實現,讓你能夠方便的調用其餘Swoft上的服務。git
RPC有兩種啓動方式Http伴隨啓動和RPC單獨啓動。值得一提的是目前swoole的tcp服務即RPC服務,暫沒有其餘的tcp服務功能,因此基本上tcp相關的配置指代的就是RPC。github
Swoft 的 RPC 服務在Http服務啓動時候伴隨啓動json
//Swoft\Http\Server\Http\HttpServer.php /** * Http Server */ class HttpServer extends AbstractServer /** * Start Server * * @throws \Swoft\Exception\RuntimeException */ public function start() { //code ... //根據.env配置文件Server區段的TCPABLE字段決定是否啓動RPC服務 if ((int)$this->serverSetting['tcpable'] === 1) { $this->registerRpcEvent(); } //code .... } }
初始化流程即根據相關注解註冊一個swoole監聽segmentfault
//Swoft\Http\Server\Http\HttpServer.php /** * Register rpc event, swoft/rpc-server required * * @throws \Swoft\Exception\RuntimeException */ protected function registerRpcEvent() { //含有@SwooleListener且type爲SwooleEvent::TYPE_PORT的Bean,即RpcEventListener $swooleListeners = SwooleListenerCollector::getCollector(); if (!isset($swooleListeners[SwooleEvent::TYPE_PORT][0]) || empty($swooleListeners[SwooleEvent::TYPE_PORT][0])) { throw new RuntimeException("Please use swoft/rpc-server, run 'composer require swoft/rpc-server'"); } //添加swoole RPC相關的tcp監聽端口,使用的是.env文件中的TCP區段配置 $this->listen = $this->server->listen($this->tcpSetting['host'], $this->tcpSetting['port'], $this->tcpSetting['type']); $tcpSetting = $this->getListenTcpSetting(); $this->listen->set($tcpSetting); //根據RpcEventListener的相關注解添加監聽處理句柄 $swooleRpcPortEvents = $swooleListeners[SwooleEvent::TYPE_PORT][0]; $this->registerSwooleEvents($this->listen, $swooleRpcPortEvents); }
因爲是第一版,根據@SwooleListener
獲取RPC監聽Bean的相關處理暫時還有點生硬。
目前swoft中type爲SwooleEvent::TYPE_PORT
的@SwooleListener
只有RpcEventListener
一個,若是添加了同類Bean容易出問題,穩定版出的時候應該會有相關優化。服務器
入口從Swoft\Http\Server\Command\ServerCommand
換成Swoft\Rpc\Server\Command\RpcCommand
,流程和Http大同小異,就是swoole的設定監聽,僅僅是少了HTTP相關的監聽接口和事件而已,此處再也不贅述。swoole
RPC服務器和HTTP服務器的區別僅僅在於與客戶端交互報文格式和報文所在的網絡層(Swoft的RPC基於TCP層次),運行原理基本相通,都是路由,中間件,RPC Service(對應Http的Controller),你徹底能夠以Http服務的思路去理解他。網絡
Swoole的RPC-TCP監聽設置好後,RPC服務端就能夠開始接受請求了。RpcEventListener
的負責的工做僅僅是把收到的數據轉發給\Swoft\Rpc\Server\ServiceDispatcher
分發。Dispatcher
會將請求傳遞給各個Middleware中間件,最終最終傳遞給HandlerAdapterMiddleware
處理。app
PackerMiddleware 是RPC中比較重要的一箇中間件,負責將TCP請求中數據流解包和數據流封包。composer
<?php //Swoft\Rpc\Server\Middleware.PackerMiddleware namespace Swoft\Rpc\Server\Middleware; /** * service packer * * @Bean() */ class PackerMiddleware implements MiddlewareInterface { /** * packer middleware * * @param \Psr\Http\Message\ServerRequestInterface $request * @param \Psr\Http\Server\RequestHandlerInterface $handler * * @return \Psr\Http\Message\ResponseInterface */ public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface { //獲取servicePacker Bean(\Swoft\Rpc\Packer\ServicePacker)用於字符串解包封包 $packer = service_packer(); $data = $request->getAttribute(self::ATTRIBUTE_DATA); $data = $packer->unpack($data); // 觸發一個RpcServerEvent::BEFORE_RECEIVE事件,默認只有一個用於添加請求上下文信息的BeforeReceiveListener // 利用中間件觸發流程關鍵事件的作法耦合有點高,猜想之後會調整 App::trigger(RpcServerEvent::BEFORE_RECEIVE, null, $data); //替換解包後的解包到Request中,提供給後續中間件和Handler使用 $request = $request->withAttribute(self::ATTRIBUTE_DATA, $data); /* @var \Swoft\Rpc\Server\Rpc\Response $response */ $response = $handler->handle($request); //爲Response封包返回給RPC客戶端 $serviceResult = $response->getAttribute(HandlerAdapter::ATTRIBUTE); $serviceResult = $packer->pack($serviceResult); return $response->withAttribute(HandlerAdapter::ATTRIBUTE, $serviceResult); } }
RouterMiddleware負責根據RPC請求的method,version,interface 獲取處理的RPC服務類,充當了路由的做用
<?php //Swoft\Rpc\Server\Middleware\RouterMiddleware.php /** * Service router * * @Bean() */ class RouterMiddleware implements MiddlewareInterface { /** * get handler from router * * @param \Psr\Http\Message\ServerRequestInterface $request * @param \Psr\Http\Server\RequestHandlerInterface $handler * * @return \Psr\Http\Message\ResponseInterface */ public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface { // service data $data = $request->getAttribute(PackerMiddleware::ATTRIBUTE_DATA); $method = $data['method']??""; $version = $data['version']??""; $interface = $data['interface']??""; /* @var \Swoft\Rpc\Server\Router\HandlerMapping $serviceRouter */ $serviceRouter = App::getBean('serviceRouter'); //路由匹配,即向Swoft\Rpc\Server\Router\HandlerMapping->$routes獲取RPC服務信息 $serviceHandler = $serviceRouter->getHandler($interface, $version, $method); // deliver service data $request = $request->withAttribute(self::ATTRIBUTE, $serviceHandler); return $handler->handle($request); } }
Swoft啓動階段會掃描並初始化註解信息(參考註解章節),註解初始化完畢後會觸發一個AppEvent::APPLICATION_LOADER
事件,此時會未來自@Service的全部RPC的路由信息註冊到Swoft\Rpc\Server\Router\HandlerMapping->$routes
中,用於serviceRouter Bean
的路由匹配。
HandlerAdapterMiddleware
最終轉發請求給HandlerAdapter
處理,HandlerAdapter會使用剛剛RouterMiddleware匹配到的服務類信息轉發請求並封裝Response最終返回給ServiceDispatcher,ServiceDispatcher會返回TCP流給客戶端而後結束本次請求。
<?php //Swoft\Rpc\Server\Router\HandlerAdapter.php /** * Service handler adapter * @Bean("serviceHandlerAdapter") */ class HandlerAdapter implements HandlerAdapterInterface { /** * Execute service handler * * @param \Psr\Http\Message\ServerRequestInterface $request * @param array $handler * @return Response */ public function doHandler(ServerRequestInterface $request, array $handler): Response { // RPC方法的各個參數 $data = $request->getAttribute(PackerMiddleware::ATTRIBUTE_DATA); $params = $data['params'] ?? []; //路由解析出來的,處理該請求的服務Bean和方法 list($serviceClass, $method) = $handler; $service = App::getBean($serviceClass); // execute handler with params $response = PhpHelper::call([$service, $method], $params); $response = ResponseHelper::formatData($response); // 構造Response返回客戶端 if (! $response instanceof Response) { $response = (new Response())->withAttribute(self::ATTRIBUTE, $response); } return $response; } }
在Bean的屬性中聲明@Reference
,Swoft即會根據@var
聲明的類型注入相應的RPC客戶端實例。
/** * @Reference(name="useraaaaaa") * * @var DemoInterface */ private $demoService;
依賴注入的實現會專門另外用一篇文章單獨解釋,這裏先看看RPC客戶端的相關代碼。
namespace Swoft\Rpc\Client\Service; /** * The proxy of service */ class ServiceProxy { /** * @param string $className * @param string $interfaceClass */ public static function loadProxyClass(string $className, string $interfaceClass) { $reflectionClass = new \ReflectionClass($interfaceClass); $reflectionMethods = $reflectionClass->getMethods(\ReflectionMethod::IS_PUBLIC); $template = "class $className extends \\Swoft\\Rpc\\Client\\Service implements {$interfaceClass} {"; //\Swoft\Rpc\Client\Service::class // the template of methods $template .= self::getMethodsTemplate($reflectionMethods); $template .= "}"; eval($template); } //code ... }
和AOP同樣,原理同樣是使用了動態代理,更具體的說法是動態遠程代理。
RPC動態客戶端類實現了客戶端聲明的Interface類型(如DemoInterface)並繼承了\Swoft\Rpc\Client\Service
類。
動態類的實現很簡單,對於接口顯式聲明的方法,實際上都是調用\Swoft\Rpc\Client\Service->call()
方法。
interface DemoInterface { /** * @param array $ids * @return array */ public function getUsers(array $ids); }
class 動態生成RPC客戶端類 extends \Swoft\Rpc\Client\Service implements \App\Lib\DemoInterface { public function getUsers ( array $ids ) { $params = func_get_args(); return $this->call('getUsers', $params); } //code ... }
對於自動生成的defer方法,則是經過魔術方法__call()
,調用\Swoft\Rpc\Client\Service->deferCall()
/** * @param string $name * @param array $arguments * * @return ResultInterface * @throws RpcClientException */ function __call(string $name, array $arguments) { $method = $name; $prefix = self::DEFER_PREFIX;//'defer' if (strpos($name, $prefix) !== 0) { throw new RpcClientException(sprintf('the method of %s is not exist! ', $name)); } if ($name == $prefix) { $method = array_shift($arguments); } elseif (strpos($name, $prefix) === 0) { $method = lcfirst(ltrim($name, $prefix)); } return $this->deferCall($method, $arguments); }
咱們這裏只看具備表明性的call()
方法,deferCall()
大體相同。
RPC客戶端動態類的本質是將客戶端的參數和接口信息根據Swoft本身的格式傳遞給RPC服務端,而後將服務器返回的數據解包取出返回值返回給RPC的調用者,對外假裝成一個普通的對象,屏蔽遠程調用操做。
// Swoft\Rpc\Client\Service.php /** * Do call service * * @param string $func * @param array $params * * @throws \Throwable * @return mixed */ public function call(string $func, array $params) { $profileKey = $this->interface . '->' . $func; //根據@reference的fallback屬性獲取降級處理句柄,在RPC服務調用失敗的時候能夠會使用fallback句柄代替 $fallback = $this->getFallbackHandler($func); try { $connectPool = $this->getPool(); $circuitBreaker = $this->getBreaker(); /* @var $client AbstractServiceConnection */ $client = $connectPool->getConnection(); //數據封包,和RPC服務端一致 $packer = service_packer(); $type = $this->getPackerName(); $data = $packer->formatData($this->interface, $this->version, $func, $params); $packData = $packer->pack($data, $type); //經過熔斷器調用接口 $result = $circuitBreaker->call([$client, 'send'], [$packData], $fallback); if ($result === null || $result === false) { return null; } //和defercall不一致這裏直接收包,解包 App::profileStart($profileKey); $result = $client->recv(); App::profileEnd($profileKey); $connectPool->release($client); App::debug(sprintf('%s call %s success, data=%', $this->interface, $func, json_encode($data, JSON_UNESCAPED_UNICODE))); $result = $packer->unpack($result); $data = $packer->checkData($result); } catch (\Throwable $throwable) { if (empty($fallback)) { throw $throwable; } //RPC調用失敗則調用降級句柄,代替實際RPC服務直接返回 $data = PhpHelper::call($fallback, $params); } return $data; }
熔斷器的swoft-RPC的另外一重要概念,RPC的全部請求都經過熔斷器發送。
熔斷器使用狀態模式實現,熔斷器有開啓,半開,關閉 3種狀態,不一樣狀態下熔斷器會持有不一樣的狀態實例,狀態根據RPC調用狀況切換,熔斷器根據持有狀態實例的不一樣,行爲也有所不一樣。
<?php //Swoft\Sg\Circuit\CloseState.php /** * close狀態的熔斷器,對全部RPC調用都經過協程客戶端發送到RPC服務器 * 關閉狀態及切換 * 1. 重置failCounter=0 successCount=0 * 2. 操做失敗,failCounter計數 * 3. 操做失敗必定計數,切換爲open開啓狀態 */ class CloseState extends CircuitBreakerState { /** * 熔斷器調用 * * @param mixed $callback 回調函數 * @param array $params 參數 * @param mixed $fallback 失敗回調 * * @return mixed 返回結果 */ public function doCall($callback, $params = [], $fallback = null) { list($class, $method) = $callback; try { if ($class == null) { throw new \Exception($this->circuitBreaker->serviceName . "服務,鏈接創建失敗(null)"); } if ($class instanceof Client && $class->isConnected() == false) { throw new \Exception($this->circuitBreaker->serviceName . "服務,當前鏈接已斷開"); } //調用swoole協程客戶端的send()方法發送數據 $data = $class->$method(...$params); } catch (\Exception $e) { //遞增失敗計數 if ($this->circuitBreaker->isClose()) { $this->circuitBreaker->incFailCount(); } App::error($this->circuitBreaker->serviceName . "服務,當前[關閉狀態],服務端調用失敗,開始服務降級容錯處理,error=" . $e->getMessage()); //RPC調用失敗則使用降級接口 $data = $this->circuitBreaker->fallback($fallback); } //失敗次數過線則切換狀態 $failCount = $this->circuitBreaker->getFailCounter(); $switchToFailCount = $this->circuitBreaker->getSwitchToFailCount(); if ($failCount >= $switchToFailCount && $this->circuitBreaker->isClose()) { App::trace($this->circuitBreaker->serviceName . "服務,當前[關閉狀態],服務失敗次數達到上限,開始切換爲開啓狀態,failCount=" . $failCount); $this->circuitBreaker->switchToOpenState(); } App::trace($this->circuitBreaker->serviceName . "服務,當前[關閉狀態],failCount=" . $this->circuitBreaker->getFailCounter()); return $data; } }
<?php \\Swoft\Sg\Circuit\OpenState .php; /** * open狀態的熔斷器,對全部RPC調用都使用降級句柄代替 * 開啓狀態及切換(open) * 1. 重置failCounter=0 successCounter=0 * 2. 請求當即返回錯誤響應 * 3. 定時器必定時間後切換爲半開狀態(open) */ class OpenState extends CircuitBreakerState { /** * 熔斷器調用 * * @param mixed $callback 回調函數 * @param array $params 參數 * @param mixed $fallback 失敗回調 * * @return mixed 返回結果 */ public function doCall($callback, $params = [], $fallback = null) { $data = $this->circuitBreaker->fallback(); App::trace($this->getServiceName() . "服務,當前[開啓狀態],執行服務fallback服務降級容錯處理"); $nowTime = time(); if ($this->circuitBreaker->isOpen() && $nowTime > $this->circuitBreaker->getSwitchOpenToHalfOpenTime() ) { $delayTime = $this->circuitBreaker->getDelaySwitchTimer(); // swoole定時器不是嚴格的,3s容錯時間 ,定時切換狀態的半開 $switchToHalfStateTime = $nowTime + ($delayTime / 1000) + 3; App::getTimer()->addAfterTimer('openState', $delayTime, [$this, 'delayCallback']); $this->circuitBreaker->setSwitchOpenToHalfOpenTime($switchToHalfStateTime); App::trace($this->getServiceName() . "服務,當前[開啓狀態],建立延遲觸發器,一段時間後狀態切換爲半開狀態"); } return $data; } }
半開熔斷器是熔斷器關閉狀態和熔斷器開啓狀態的過分狀態,半開熔斷器的全部RPC調用都是加鎖的,連續成功或者連續失敗到閾值後會切換到關閉狀態或者開啓狀態,代碼相似,此處再也不累述,有興趣的讀者能夠自行研究。
Swoft源碼剖析系列目錄: https://segmentfault.com/a/11...