<?php namespace Message\Controller; use Think\Controller; use Thrift\Exception\TException; use Thrift\Protocol\TBinaryProtocol; use Thrift\Transport\TBufferedTransport; use Thrift\Transport\THttpClient; use Thrift\Transport\TPhpStream; use Thrift\TMultiplexedProcessor; use Thrift\Protocol\TMultiplexedProtocol; use Message\Services\MessageServie; use Rpc\Msg\MessageClient; use Rpc\Msg\MessageProcessor; use Thrift\Factory\TBinaryProtocolFactory; use Thrift\Factory\TTransportFactory; use Thrift\Server\TServerSocket; use Thrift\Server\TSimpleServer; use Thrift\Server\TForkingServer; use Thrift\Transport\TSocket;
server.php
/**
 * Start a Thrift RPC server that exposes two services through one multiplexed processor.
 *
 * The message processor is registered under the service name "MessageApiAction" and the
 * send-message processor under "sendMsg". The server listens on 0.0.0.0:9988 using a
 * TForkingServer. Any exception raised while setting up or serving is written to the
 * ThinkPHP log and not rethrown.
 *
 * @return void
 */
public function message_rpc()
{
    try {
        // One generated processor per service, each wrapping its handler implementation.
        // Handles the queue-consumer RPC requests.
        $messageService = new \Rpc\Msg\MessageProcessor(new MessageServie());
        // Handles the message-sending RPC requests.
        $sendService = new \Rpc\Msg\SendMsgProcessor(new \Message\Services\SendMsgServie());

        // Register both services on the multiplexed processor; clients select a
        // service by using the same name in their TMultiplexedProtocol.
        $multiplexer = new TMultiplexedProcessor();
        $multiplexer->registerProcessor("MessageApiAction", $messageService);
        $multiplexer->registerProcessor("sendMsg", $sendService);

        // Transport and protocol factories; the same factory is used for input and output.
        $transportFactory = new TTransportFactory();
        $protocolFactory = new TBinaryProtocolFactory(true, true);

        // Listening socket on all interfaces.
        $listener = new TServerSocket('0.0.0.0', '9988');

        $rpcServer = new TForkingServer(
            $multiplexer,
            $listener,
            $transportFactory,
            $transportFactory,
            $protocolFactory,
            $protocolFactory
        );
        $rpcServer->serve();
    } catch (\Exception $failure) {
        // TException is an \Exception subclass, so this single handler covers both
        // Thrift errors and any other exception with the same logging behaviour.
        \Think\Log::write($failure->getMessage());
    }
}
client API
/**
 * Call the "MessageApiAction" service on the remote Thrift server and print the result.
 *
 * Thrift errors are caught and their message is printed instead of the result.
 * The transport is closed whether or not the call succeeds.
 *
 * @return void
 */
public function local()
{
    $transport = null;

    try {
        ini_set('memory_limit', '1024M');

        $transport = $this->createRpcTransport();
        $protocol = new TBinaryProtocol($transport);
        // The service name must match the one registered on the server's multiplexed processor.
        $client = new MessageClient(new TMultiplexedProtocol($protocol, "MessageApiAction"));

        $transport->open();
        $result = $client->MessageApiAction('message api');
        print_r($result);
    } catch (TException $tx) {
        print_r($tx->getMessage());
    } finally {
        // Always release the socket, including when the RPC call throws.
        if ($transport !== null && $transport->isOpen()) {
            $transport->close();
        }
    }
}

/**
 * Call the "sendMsg" service on the remote Thrift server and print the result.
 *
 * Thrift errors are caught and their message is printed instead of the result.
 * The transport is closed whether or not the call succeeds.
 *
 * @return void
 */
public function send_msg()
{
    $transport = null;

    try {
        ini_set('memory_limit', '1024M');

        $transport = $this->createRpcTransport();
        $protocol = new TBinaryProtocol($transport);
        // The service name must match the one registered on the server's multiplexed processor.
        $client = new \Rpc\Msg\SendMsgClient(new TMultiplexedProtocol($protocol, "sendMsg"));

        $transport->open();
        $result = $client->sendMsg('send msg');
        print_r($result);
    } catch (TException $tx) {
        print_r($tx->getMessage());
    } finally {
        // Always release the socket, including when the RPC call throws.
        if ($transport !== null && $transport->isOpen()) {
            $transport->close();
        }
    }
}

/**
 * Build the (not yet opened) buffered socket transport shared by the RPC client actions.
 *
 * @return TBufferedTransport
 */
private function createRpcTransport()
{
    $socket = new TSocket('192.168.1.188', '9988');
    // Receive timeout; the Thrift TSocket API takes this value in milliseconds.
    $socket->setRecvTimeout(50000);
    $socket->setDebug(true);

    // 1024-byte read and write buffers.
    return new TBufferedTransport($socket, 1024, 1024);
}