RPC (Remote Procedure Call): In computer science, a remote procedure call (RPC) is a form of inter-process communication (IPC) that allows a computer program to cause a subroutine or procedure to execute in another address space (commonly on another computer on a shared network) without the programmer explicitly coding the details of the remote interaction. That is, the programmer writes essentially the same code whether the subroutine is local to the executing program or remote.
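The basic idea: an RPC framework packs the [service name, method name, method arguments] of a client call and sends them over the network to the server. The server unpacks the packet, looks up the requested service locally, invokes the requested method on that service with the supplied arguments, then packs the result and sends it back to the client over the network. This resembles a RESTful request, which locates the requested service by URI and ultimately returns JSON data over HTTP. RPC frameworks usually encapsulate the underlying network layer and transmit socket-based byte-stream packets, although they can of course simply carry JSON requests and responses over HTTP as well. The underlying philosophy of REST is somewhat different.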
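The pack, look up, invoke, reply cycle described above can be sketched in a few lines of plain Java. This is a minimal in-process sketch, not the code of any real framework: the class `RpcSketch`, its `Request` and `Response` types and the `register`/`handle` methods are hypothetical names, arguments are plain strings rather than serialized messages, and the network hop is replaced by a direct method call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: the "network" is just a direct call to handle().
final class RpcSketch {
    // A request carries the three things named in the text.
    static final class Request {
        final String service;
        final String method;
        final String argument;

        Request(String service, String method, String argument) {
            this.service = service;
            this.method = method;
            this.argument = argument;
        }
    }

    // A response carries either a result or an error code, never both.
    static final class Response {
        final String result;
        final String error;

        Response(String result, String error) {
            this.result = result;
            this.error = error;
        }
    }

    // Server-side registry: service name -> (method name -> implementation).
    private final Map<String, Map<String, Function<String, String>>> services = new HashMap<>();

    void register(String service, String method, Function<String, String> impl) {
        services.computeIfAbsent(service, k -> new HashMap<>()).put(method, impl);
    }

    // What the server does once a request has been unpacked.
    Response handle(Request request) {
        Map<String, Function<String, String>> methods = services.get(request.service);
        if (methods == null) {
            return new Response(null, "SERVICE_NOT_FOUND");
        }
        Function<String, String> impl = methods.get(request.method);
        if (impl == null) {
            return new Response(null, "METHOD_NOT_FOUND");
        }
        return new Response(impl.apply(request.argument), null);
    }

    public static void main(String[] args) {
        RpcSketch server = new RpcSketch();
        server.register("GameService", "findGame", name -> "found " + name);
        System.out.println(server.handle(new Request("GameService", "findGame", "FIFA")).result);
        System.out.println(server.handle(new Request("Missing", "findGame", "FIFA")).error);
    }
}
```

A real framework adds serialization and transport around `handle`, but the lookup by service name and then by method name stays the same.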
Event flow in RPC:
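Why use Protobuf?
To let many different clients (clients written in different languages have different methods, objects and parameters) access a common server, an IDL (interface description language) is usually used to describe the RPC service across platforms. Every client calls the server's methods through the interface that the server defines in the IDL, which gives all sides a common language.
What is Protobuf?
Protobuf is a cross-language object serialization scheme provided by Google that also serves as an IDL, and it is widely used inside Google. A similar project is Facebook's Thrift, but Thrift ships an RPC implementation, whereas Protobuf only provides the mechanism without a concrete implementation. Many comparisons of the two can be found online. Official Protobuf documentation: https://developers.google.com/protocol-buffers/docs/overview
With a common language in place, the remaining problem is data transport: the client's request and data have to reach the server, which requires network communication. In Java, either Mina or Netty can be used, and many comparisons of these two are available online as well.
The Protobuf project lists some third-party RPC implementations: https://code.google.com/p/protobuf/wiki/ThirdPartyAddOns#RPC_Implementations
Let's briefly study the implementation at https://github.com/jcai/fepss-rpc (it seems Protobuf no longer follows this model and now uses code generator plugins instead).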
The .proto file defines the error codes, the RPC request object and the RPC response object:
    package com.aquar.rpc;
    option java_package = "com.aquar.rpc";
    option java_outer_classname = "RpcProtobuf";
    option optimize_for = SPEED;
    // Possible error reasons
    enum ErrorReason {
      BAD_REQUEST_DATA = 0;
      BAD_REQUEST_PROTO = 1;
      SERVICE_NOT_FOUND = 2;
      METHOD_NOT_FOUND = 3;
      RPC_ERROR = 4;
      RPC_FAILED = 5;
      CLIENT_FAILED = 6;
    }
    message Request {
      // RPC request id, used to identify different requests
      optional uint32 id = 1;
      // RPC service full name
      required string service_name = 2;
      // RPC method name
      required string method_name = 3;
      // RPC request proto
      required bytes request_proto = 4;
    }
    message Response {
      // RPC request id
      optional uint32 id = 1;
      // RPC response proto
      optional bytes response_proto = 2;
      // Error, if any
      optional string error = 3;
      // Was callback invoked
      optional bool callback = 4 [default = false];
      // Error reason
      optional ErrorReason error_reason = 5;
    }
Compile it to generate the RpcProtobuf class file:
    protoc -I=./ --java_out=./src ./rpc.proto
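Define the encoder and decoder objects used by Mina
In the byte stream, each frame is laid out as integer + Message object, where the integer is the size of the message and the Message is an RPC request or response object.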
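To make the "integer + Message" layout concrete, here is a minimal sketch of length-prefixed framing using only the JDK. It assumes the size prefix is a base-128 varint, which is what `writeRawVarint32` in the encoder produces. The class `VarintFraming` and its methods are hypothetical helpers written for this illustration, and the payload is an arbitrary byte array standing in for a serialized Message.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: frames a payload as [varint32 size][payload bytes].
final class VarintFraming {
    // Base-128 varint: 7 data bits per byte, high bit set when more bytes follow.
    static void writeVarint32(ByteBuffer out, int value) {
        while ((value & ~0x7F) != 0) {
            out.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.put((byte) value);
    }

    static int readVarint32(ByteBuffer in) {
        int result = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            byte b = in.get();
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("size prefix is longer than 5 bytes");
    }

    static byte[] frame(byte[] payload) {
        // A varint32 never needs more than 5 bytes.
        ByteBuffer buffer = ByteBuffer.allocate(5 + payload.length);
        writeVarint32(buffer, payload.length);
        buffer.put(payload);
        buffer.flip();
        byte[] framed = new byte[buffer.remaining()];
        buffer.get(framed);
        return framed;
    }

    static byte[] unframe(ByteBuffer in) {
        int size = readVarint32(in);
        if (size < 0 || size > in.remaining()) {
            throw new IllegalArgumentException("incomplete or invalid frame");
        }
        byte[] payload = new byte[size];
        in.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] framed = frame("hello".getBytes(StandardCharsets.UTF_8));
        byte[] payload = unframe(ByteBuffer.wrap(framed));
        // prints "6 hello": one prefix byte plus five payload bytes
        System.out.println(framed.length + " " + new String(payload, StandardCharsets.UTF_8));
    }
}
```

Payloads shorter than 128 bytes cost a single prefix byte, and a 300-byte payload needs two (0xAC 0x02).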
Encoder:
    Message msg = (Message) message;
    int size = msg.getSerializedSize();
    // Allocate room for the size prefix plus the serialized message
    IoBuffer buffer = IoBuffer.allocate(SizeContext.computeTotal(size));
    CodedOutputStream cos = CodedOutputStream.newInstance(buffer.asOutputStream());
    // Write the size prefix first, then the message itself
    cos.writeRawVarint32(size);
    msg.writeTo(cos);
    cos.flush();
    buffer.flip();
    out.write(buffer);
Decoder: extends CumulativeProtocolDecoder
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in,
            ProtocolDecoderOutput out) throws Exception {
        SizeContext ctx = SizeContext.get(session, in);
        // Decode only once the whole frame has arrived
        if (ctx.hasEnoughData(in)) {
            try {
                decodeMessage(in, out, ctx);
                return true;
            } finally {
                ctx.shiftPositionAndReset(session, in);
            }
        }
        // Not enough data yet; wait for more bytes
        return false;
    }

    private void decodeMessage(IoBuffer in, ProtocolDecoderOutput out,
            SizeContext ctx) throws IOException {
        Message.Builder builder = newBuilder();
        ctx.getInputStream(in).readMessage(builder, ExtensionRegistry.getEmptyRegistry());
        out.write(builder.build());
    }

    // Subclasses choose the message type to decode (Request or Response)
    protected abstract Message.Builder newBuilder();
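The SizeContext class is used here to help count bytes and perform the buffer-related calculations.
Initialize the server the same way as an ordinary Mina server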
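The internals of SizeContext are not shown in this article, so the following is only a sketch of the general idea behind cumulative decoding: keep the bytes that have arrived so far, and emit a message only once its size prefix and full body are present. `FrameAccumulator` and `feed` are hypothetical names, and the sketch works on plain byte arrays instead of Mina's IoBuffer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the bookkeeping a cumulative decoder needs.
final class FrameAccumulator {
    // Bytes received so far that do not yet form a complete frame.
    private byte[] pending = new byte[0];

    // Feed newly received bytes; returns every payload that is now complete.
    List<byte[]> feed(byte[] chunk) {
        byte[] data = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, data, pending.length, chunk.length);

        List<byte[]> frames = new ArrayList<>();
        int pos = 0;
        while (true) {
            // Try to read the varint32 size prefix starting at pos.
            int size = 0;
            int cursor = pos;
            boolean prefixComplete = false;
            for (int shift = 0; shift < 35 && cursor < data.length; shift += 7) {
                byte b = data[cursor++];
                size |= (b & 0x7F) << shift;
                if ((b & 0x80) == 0) {
                    prefixComplete = true;
                    break;
                }
            }
            if (!prefixComplete && cursor - pos == 5) {
                throw new IllegalStateException("malformed size prefix");
            }
            if (prefixComplete && size < 0) {
                throw new IllegalStateException("negative frame size");
            }
            // Stop when either the prefix or the body is still incomplete.
            if (!prefixComplete || data.length - cursor < size) {
                break;
            }
            frames.add(Arrays.copyOfRange(data, cursor, cursor + size));
            pos = cursor + size;
        }
        // Keep the unconsumed tail for the next call.
        pending = Arrays.copyOfRange(data, pos, data.length);
        return frames;
    }
}
```

Feeding the bytes `{2, 'h'}` returns nothing because the body is one byte short. A following `{'i'}` completes the frame and returns the payload "hi".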
    int processorCount = Runtime.getRuntime().availableProcessors();
    acceptor = new NioSocketAcceptor(processorCount);
    acceptor.setReuseAddress(true);
    acceptor.getSessionConfig().setReuseAddress(true);
    acceptor.getSessionConfig().setReceiveBufferSize(1024);
    acceptor.getSessionConfig().setSendBufferSize(1024);
    acceptor.getSessionConfig().setTcpNoDelay(true);
    acceptor.getSessionConfig().setSoLinger(-1);
    acceptor.setBacklog(1024);
    acceptor.setDefaultLocalAddress(new InetSocketAddress(port));
    // Install the protobuf codec; the server decodes incoming bytes as Request messages
    DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
    chain.addLast("protobuf", new ProtocolCodecFilter(
            new ProtobufEncoder(),
            new ProtobufDecoder() {
                @Override
                protected Builder newBuilder() {
                    return RpcProtobuf.Request.newBuilder();
                }
            }));
    acceptor.setHandler(ioHandler);
    acceptor.bind(new InetSocketAddress(port));
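The server-side IoHandler
The Callback class mainly holds the object that the concrete Service method needs to return, so it can be regarded as a wrapper. The actual method implementation puts the object to be returned to the client into the callback. The server then takes that object out of the callback and stores it in the RpcResponse, and the client obtains the value returned by the server method by parsing the response_proto of the RpcResponse.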
    private Map<String, Service> services;

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        Request rpcRequest = (Request) message;
        if (rpcRequest == null) {
            throw new RpcException(ErrorReason.BAD_REQUEST_DATA, "request data is null!");
        }
        // Get the service/method
        Service service = services.get(rpcRequest.getServiceName());
        if (service == null) {
            throw new RpcException(ErrorReason.SERVICE_NOT_FOUND,
                    "could not find service: " + rpcRequest.getServiceName());
        }
        MethodDescriptor method = service.getDescriptorForType()
                .findMethodByName(rpcRequest.getMethodName());
        if (method == null) {
            throw new RpcException(ErrorReason.METHOD_NOT_FOUND, String.format(
                    "Could not find method %s in service %s",
                    rpcRequest.getMethodName(),
                    service.getDescriptorForType().getFullName()));
        }
        // Parse request
        Message.Builder builder = null;
        try {
            builder = service.getRequestPrototype(method).newBuilderForType()
                    .mergeFrom(rpcRequest.getRequestProto());
            if (!builder.isInitialized()) {
                throw new RpcException(ErrorReason.BAD_REQUEST_PROTO, "Invalid request proto");
            }
        } catch (InvalidProtocolBufferException e) {
            throw new RpcException(ErrorReason.BAD_REQUEST_PROTO, e);
        }
        Message request = builder.build();
        // Call method
        RpcControllerImpl controller = new RpcControllerImpl();
        Callback callback = new Callback();
        try {
            service.callMethod(method, controller, request, callback);
        } catch (RuntimeException e) {
            throw new RpcException(ErrorReason.RPC_ERROR, e);
        }
        // Build and return response (callback is optional)
        Builder responseBuilder = Response.newBuilder();
        if (callback.response != null) {
            responseBuilder.setCallback(true).setResponseProto(
                    callback.response.toByteString());
        } else {
            // Set whether callback was called
            responseBuilder.setCallback(callback.invoked);
        }
        if (controller.failed()) {
            responseBuilder.setError(controller.errorText());
            responseBuilder.setErrorReason(ErrorReason.RPC_FAILED);
        }
        Response rpcResponse = responseBuilder.build();
        outputResponse(session, rpcResponse);
    }

    /**
     * Callback that just saves the response and the fact that it was invoked.
     */
    private class Callback implements RpcCallback<Message> {
        private Message response;
        private boolean invoked = false;

        public void run(Message response) {
            this.response = response;
            invoked = true;
        }
    }
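The capturing-callback pattern used by the handler can be tried in isolation. The sketch below assumes nothing beyond the JDK: `RpcCallback` here is a local stand-in with the same single `run` method as protobuf's interface, and `CapturingCallback` and the string-based `findGame` are hypothetical names used only for illustration.

```java
// Hypothetical, dependency-free sketch of the capturing-callback pattern.
final class CallbackSketch {
    // Local stand-in for com.google.protobuf.RpcCallback<T>.
    interface RpcCallback<T> {
        void run(T result);
    }

    // Remembers the value it was given and the fact that it was invoked.
    static final class CapturingCallback<T> implements RpcCallback<T> {
        T response;
        boolean invoked;

        @Override
        public void run(T result) {
            this.response = result;
            this.invoked = true;
        }
    }

    // A service method in this style returns nothing; it hands its result to `done`.
    static void findGame(String gameName, RpcCallback<String> done) {
        done.run("found " + gameName);
    }

    public static void main(String[] args) {
        CapturingCallback<String> callback = new CapturingCallback<>();
        findGame("FIFA", callback);
        // The caller reads the result back out of the callback afterwards.
        System.out.println(callback.invoked + " " + callback.response); // prints "true found FIFA"
    }
}
```

Keeping `invoked` separate from `response` lets the caller tell "the method called back with null" apart from "the method never called back", which is the distinction the handler records in the Response's callback field.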
Implementing RpcChannel
RpcChannel is an abstract interface for an RPC channel. An RpcChannel represents a communication line to a Service which can be used to call that Service's methods. The Service may be running on another machine. Normally, you should not call an RpcChannel directly, but instead construct a stub Service wrapping it. Starting with version 2.3.0, RPC implementations should not try to build on this, but should instead provide code generator plugins which generate code specific to the particular RPC implementation. This way the generated code can be more appropriate for the implementation in use and can avoid unnecessary layers of indirection.
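RpcChannel is implemented mainly on the client side, which may use a completely different language or communication framework from the server.
The callMethod implementation mainly handles:
a. In sessionOpened(), create the RpcRequest object from the arguments passed to callMethod and write it to the session
b. In messageReceived(), parse the RpcResponse sent by the server and convert it into the return value of the method the client called, that is, the value carried by response_proto in the RpcResponse
c. Finally, pass it to the RpcCallback object that the client supplied when requesting the service method, thereby delivering the actual return value to the client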
    public void callMethod(final MethodDescriptor method,
            final RpcController controller, final Message request,
            final Message responsePrototype, final RpcCallback<Message> done) {
        // check rpc request
        if (!request.isInitialized()) {
            throw new RpcException(ErrorReason.BAD_REQUEST_DATA, "request uninitialized!");
        }
        // using MINA IoConnector
        IoConnector connector = new NioSocketConnector();
        // add protocol buffer codec
        DefaultIoFilterChainBuilder chain = connector.getFilterChain();
        chain.addLast("protobuf", new ProtocolCodecFilter(
                new ProtobufEncoder(),
                new ProtobufDecoder() {
                    @Override
                    protected Message.Builder newBuilder() {
                        return RpcProtobuf.Response.newBuilder();
                    }
                }));
        // connector handler
        connector.setHandler(new IoHandlerAdapter() {
            @Override
            public void messageReceived(IoSession session, Object message) throws Exception {
                Response rpcResponse = (Response) message;
                handleResponse(responsePrototype, rpcResponse, controller, done);
                session.close(true);
            }

            /**
             * @see org.apache.mina.core.service.IoHandlerAdapter#sessionOpened(org.apache.mina.core.session.IoSession)
             */
            @Override
            public void sessionOpened(IoSession session) throws Exception {
                ((SocketSessionConfig) session.getConfig()).setKeepAlive(true);
                // Create request protocol buffer
                Request rpcRequest = Request.newBuilder()
                        .setRequestProto(request.toByteString())
                        .setServiceName(method.getService().getFullName())
                        .setMethodName(method.getName()).build();
                // Write request
                session.write(rpcRequest);
            }

            /**
             * @see org.apache.mina.core.service.IoHandlerAdapter#exceptionCaught(org.apache.mina.core.session.IoSession,
             *      java.lang.Throwable)
             */
            @Override
            public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
                StringBuilder errorBuilder = new StringBuilder();
                errorBuilder.append("client has runtime exception!\n");
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                cause.printStackTrace(new PrintStream(out));
                errorBuilder.append(out.toString());
                controller.setFailed(errorBuilder.toString());
            }
        });
        // connect remote server
        ConnectFuture cf = connector.connect(new InetSocketAddress(host, port));
        try {
            cf.awaitUninterruptibly(); // wait to connect remote server
            cf.getSession().getCloseFuture().awaitUninterruptibly();
        } finally {
            connector.dispose();
        }
    }

    private void handleResponse(Message responsePrototype, Response rpcResponse,
            RpcController controller, RpcCallback<Message> callback) {
        // Check for error
        if (rpcResponse.hasError()) {
            ErrorReason reason = rpcResponse.getErrorReason();
            controller.setFailed(reason.name() + " : " + rpcResponse.getError());
            return;
        }
        if ((callback == null) || !rpcResponse.getCallback()) {
            // No callback needed
            return;
        }
        if (!rpcResponse.hasResponseProto()) {
            // Callback was called with null on server side
            callback.run(null);
            return;
        }
        try {
            Message.Builder builder = responsePrototype.newBuilderForType()
                    .mergeFrom(rpcResponse.getResponseProto());
            Message response = builder.build();
            // Invoke the callback the client supplied with the request,
            // passing the actual return value back to the client
            callback.run(response);
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(e);
        }
    }
Define a custom game-lookup service. The proto file:
    package com.aquar.rpc.services;
    option java_package = "com.aquar.rpc.services";
    option java_outer_classname = "GameServiceProto";
    option java_generic_services = true;
    message Game {
      optional string gameName = 1;
    }
    message Result {
      optional string result = 1;
      optional bool success = 2;
    }
    service GameService {
      rpc findGame(Game) returns (Result);
    }
protoc -I=./ --java_out=./src ./GameService.proto
The server demo mainly initializes the service map, binds the server port and starts the server
    public class ServerDemo {
        public static String host = "127.0.0.1";
        public static int port = 5566;

        public static void main(String[] args) {
            // Register each service under its full name so the handler can look it up
            Map<String, Service> services = new HashMap<String, Service>();
            services.put(GameService.getDescriptor().getFullName(), new GameServiceImpl());
            ServerIoHandler ioHandler = new ServerIoHandler(services);
            RpcServer server = new RpcServer(host, port, ioHandler);
            try {
                server.start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    // Client side: build a stub over the channel and call findGame
    public static void main(String[] args) {
        RpcChannelImpl channel = new RpcChannelImpl(ServerDemo.host, ServerDemo.port);
        RpcControllerImpl controller = channel.newRpcController();
        Stub service = GameService.newStub(channel);
        Game request = Game.newBuilder().setGameName("FIFA").build();
        // Receives the Result sent back by the server
        RpcCallback<Result> done = new RpcCallback<GameServiceProto.Result>() {
            @Override
            public void run(Result result) {
                if (result.getSuccess()) {
                    System.out.println("Client get " + result.getResult());
                }
            }
        };
        System.out.println("Client request Gameservice for findGame: " + request.getGameName());
        service.findGame(controller, request, done);
    }