RPC (Remote Procedure Call): In computer science, a remote procedure call (RPC) is a form of inter-process communication (IPC) that allows a computer program to cause a subroutine or procedure to execute in another address space (commonly on another computer on a shared network) without the programmer explicitly coding the details for this remote interaction. That is, the programmer writes essentially the same code whether the subroutine is local to the executing program or remote.
Why use Protobuf?
爲了讓多種不同的Clients(不同語言的客戶端,其方法、對象、參數都不相同)能夠訪問共同的Server,一般使用IDL(Interface description language)來描述不同平臺的RPC服務。大家都使用Server用IDL定義的接口來調用Server上的方法,這樣纔有共同的語言。
What is Protobuf?
簡單學習下 https://github.com/jcai/fepss-rpc 的實現(貌似Protobuf如今不按這個模式搞了,改用code generator plugins了)
.proto文件中定義了錯誤碼、RPC請求對象、RPC響應對象 package com.aquar.rpc;
// Java code-generation options: target package, name of the generated outer
// class, and the optimization mode.
option java_package = "com.aquar.rpc";
option java_outer_classname = "RpcProtobuf";
option optimize_for = SPEED;
// Possible error reasons
enum ErrorReason {
// Envelope the client writes to the server for a single RPC call.
message Request {
  // RPC request id, used to identify different requests
  optional uint32 id = 1;
  // RPC service full name
  required string service_name = 2;
  // RPC method name
  required string method_name = 3;
  // Serialized request message for the target method
  required bytes request_proto = 4;
}
// Envelope the server writes back to the client after handling a Request.
message Response {
  // RPC request id
  optional uint32 id = 1;
  // Serialized response message, present when the callback produced one
  optional bytes response_proto = 2;
  // Error text, if any
  optional string error = 3;
  // Whether the server-side callback was invoked
  optional bool callback = 4 [default = false];
  // Error reason
  optional ErrorReason error_reason = 5;
}
編譯生成RpcProtobuf類文件 protoc -I=./ --java_out=./src ./rpc.proto
字節流中結構爲 整數+Message對象,其中整數代表整個消息大小,Message爲RPC請求或響應對象
// Frame layout on the wire: a varint32 holding the message size, then the message itself.
Message payload = (Message) message;
int payloadSize = payload.getSerializedSize();

// SizeContext.computeTotal is expected to cover the size prefix as well as the body.
IoBuffer frame = IoBuffer.allocate(SizeContext.computeTotal(payloadSize));
CodedOutputStream codedOut = CodedOutputStream.newInstance(frame.asOutputStream());
codedOut.writeRawVarint32(payloadSize);
payload.writeTo(codedOut);
// Push anything still buffered in the coded stream into the IoBuffer.
codedOut.flush();

// Flip from writing to reading before handing the buffer downstream.
frame.flip();
out.write(frame);
Decoder: extends CumulativeProtocolDecoder
/**
 * Decodes one message from the accumulated bytes when the size context reports
 * that enough data is available.
 *
 * @return {@code true} after a message has been decoded, {@code false} when
 *         more data is needed first
 */
@Override
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
        throws Exception {
    SizeContext sizeContext = SizeContext.get(session, in);
    if (!sizeContext.hasEnoughData(in)) {
        return false;
    }
    try {
        decodeMessage(in, out, sizeContext);
    } finally {
        // Always advance/reset the context, even when decoding fails.
        sizeContext.shiftPositionAndReset(session, in);
    }
    return true;
}

/**
 * Reads a single message into a fresh builder and forwards the built message
 * to the decoder output.
 */
private void decodeMessage(IoBuffer in, ProtocolDecoderOutput out, SizeContext ctx)
        throws IOException {
    Message.Builder messageBuilder = newBuilder();
    ctx.getInputStream(in).readMessage(messageBuilder, ExtensionRegistry.getEmptyRegistry());
    Message decoded = messageBuilder.build();
    out.write(decoded);
}

/** Supplies the builder for the concrete message type this decoder produces. */
protected abstract Message.Builder newBuilder();
// Size the acceptor from the number of available processors.
int processorCount = Runtime.getRuntime().availableProcessors();
acceptor = new NioSocketAcceptor(processorCount);

// Listening-socket options.
acceptor.setReuseAddress(true);
acceptor.setBacklog(1024);

// Per-session socket options.
acceptor.getSessionConfig().setReuseAddress(true);
acceptor.getSessionConfig().setReceiveBufferSize(1024);
acceptor.getSessionConfig().setSendBufferSize(1024);
acceptor.getSessionConfig().setTcpNoDelay(true);
acceptor.getSessionConfig().setSoLinger(-1);

// Protobuf codec: the server decodes incoming bytes as RpcProtobuf.Request messages.
DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
ProtobufEncoder responseEncoder = new ProtobufEncoder();
ProtobufDecoder requestDecoder = new ProtobufDecoder() {
    @Override
    protected Builder newBuilder() {
        return RpcProtobuf.Request.newBuilder();
    }
};
chain.addLast("protobuf", new ProtocolCodecFilter(responseEncoder, requestDecoder));

acceptor.setHandler(ioHandler);

// Bind to the configured port; the same address is also recorded as the default.
InetSocketAddress localAddress = new InetSocketAddress(port);
acceptor.setDefaultLocalAddress(localAddress);
acceptor.bind(localAddress);
/** Registered services, looked up by the service name carried in each request. */
private Map<String, Service> services;

/**
 * Handles one decoded RPC request: resolves the service and method, parses the
 * embedded request message, invokes the method and writes the response back
 * through {@code outputResponse}.
 *
 * @throws RpcException if the request is null, the service or method is
 *         unknown, the embedded request cannot be parsed, or the call throws
 *         a RuntimeException
 */
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
    Request incoming = (Request) message;
    if (incoming == null) {
        throw new RpcException(ErrorReason.BAD_REQUEST_DATA, "request data is null!");
    }

    // Resolve the target service.
    String serviceName = incoming.getServiceName();
    Service target = services.get(serviceName);
    if (target == null) {
        throw new RpcException(ErrorReason.SERVICE_NOT_FOUND,
                "could not find service: " + serviceName);
    }

    // Resolve the target method on that service.
    String methodName = incoming.getMethodName();
    MethodDescriptor method = target.getDescriptorForType().findMethodByName(methodName);
    if (method == null) {
        throw new RpcException(ErrorReason.METHOD_NOT_FOUND, String.format(
                "Could not find method %s in service %s",
                methodName, target.getDescriptorForType().getFullName()));
    }

    // Parse the embedded request using the method's request prototype.
    Message.Builder requestBuilder = null;
    try {
        requestBuilder = target.getRequestPrototype(method)
                .newBuilderForType()
                .mergeFrom(incoming.getRequestProto());
        if (!requestBuilder.isInitialized()) {
            throw new RpcException(ErrorReason.BAD_REQUEST_PROTO, "Invalid request proto");
        }
    } catch (InvalidProtocolBufferException e) {
        throw new RpcException(ErrorReason.BAD_REQUEST_PROTO, e);
    }
    Message request = requestBuilder.build();

    // Invoke the method; the callback records whatever the service hands back.
    RpcControllerImpl controller = new RpcControllerImpl();
    Callback callback = new Callback();
    try {
        target.callMethod(method, controller, request, callback);
    } catch (RuntimeException e) {
        throw new RpcException(ErrorReason.RPC_ERROR, e);
    }

    // Build the response (the callback is optional).
    Builder reply = Response.newBuilder();
    Message result = callback.response;
    if (result == null) {
        // No payload: only report whether the callback was invoked.
        reply.setCallback(callback.invoked);
    } else {
        reply.setCallback(true);
        reply.setResponseProto(result.toByteString());
    }
    if (controller.failed()) {
        reply.setError(controller.errorText());
        reply.setErrorReason(ErrorReason.RPC_FAILED);
    }

    outputResponse(session, reply.build());
}

/**
 * Callback that just saves the response and the fact that it was invoked.
 */
private class Callback implements RpcCallback<Message> {
    private Message response;
    private boolean invoked = false;

    public void run(Message response) {
        this.response = response;
        invoked = true;
    }
}
RpcChannel is the abstract interface for an RPC channel. An RpcChannel represents a communication line to a Service which can be used to call that Service's methods. The Service may be running on another machine. Normally, you should not call an RpcChannel directly, but instead construct a stub Service wrapping it. Starting with version 2.3.0, RPC implementations should not try to build on this, but should instead provide code generator plugins which generate code specific to the particular RPC implementation. This way the generated code can be more appropriate for the implementation in use and can avoid unnecessary layers of indirection.
a. 在sessionOpened()中根據callMethod傳入的參數建立RpcRequest對象,並寫入session
b. 在messageReceived()方法中,解析Server傳來的RpcResponse,將它轉爲客戶端所調用的方法的返回值,即RpcResponse中的response_proto所對應的返回值 c. 最終回調給客戶端請求服務方法時傳入的RpcCallback對象中,從而將實際的返回值傳給了客戶端
public void callMethod(final MethodDescriptor method, final RpcController controller, final Message request, final Message responsePrototype, final RpcCallback<Message> done) { // check rpc request if (!request.isInitialized()) { throw new RpcException(ErrorReason.BAD_REQUEST_DATA, "request uninitialized!"); } // using MINA IoConnector IoConnector connector = new NioSocketConnector(); // add protocol buffer codec DefaultIoFilterChainBuilder chain = connector.getFilterChain(); chain.addLast("protobuf", new ProtocolCodecFilter( new ProtobufEncoder(), new ProtobufDecoder() { @Override protected Message.Builder newBuilder() { return RpcProtobuf.Response.newBuilder(); } })); // connector handler connector.setHandler(new IoHandlerAdapter() { @Override public void messageReceived(IoSession session, Object message) throws Exception { Response rpcResponse = (Response) message; handleResponse(responsePrototype, rpcResponse, controller, done); session.close(true); } /** * @see org.apache.mina.core.service.IoHandlerAdapter#sessionOpened(org.apache.mina.core.session.IoSession) */ @Override public void sessionOpened(IoSession session) throws Exception { ((SocketSessionConfig) session.getConfig()).setKeepAlive(true); // Create request protocol buffer Request rpcRequest = Request.newBuilder() .setRequestProto(request.toByteString()) .setServiceName(method.getService().getFullName()) .setMethodName(method.getName()).build(); // Write request session.write(rpcRequest); } /** * @see org.apache.mina.core.service.IoHandlerAdapter#exceptionCaught(org.apache.mina.core.session.IoSession, * java.lang.Throwable) */ @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { StringBuilder errorBuilder = new StringBuilder(); errorBuilder.append("client has runtime exception!\n"); ByteArrayOutputStream out = new ByteArrayOutputStream(); cause.printStackTrace(new PrintStream(out)); errorBuilder.append(out.toString()); 
controller.setFailed(errorBuilder.toString()); } }); // connect remote server ConnectFuture cf = connector.connect(new InetSocketAddress(host, port)); try { cf.awaitUninterruptibly();// wait to connect remote server cf.getSession().getCloseFuture().awaitUninterruptibly(); } finally { connector.dispose(); } } private void handleResponse(Message responsePrototype, Response rpcResponse, RpcController controller, RpcCallback<Message> callback) { // Check for error if (rpcResponse.hasError()) { ErrorReason reason = rpcResponse.getErrorReason(); controller .setFailed(reason.name() + " : " + rpcResponse.getError()); return; } if ((callback == null) || !rpcResponse.getCallback()) { // No callback needed return; } if (!rpcResponse.hasResponseProto()) { // Callback was called with null on server side callback.run(null); return; } try { Message.Builder builder = responsePrototype.newBuilderForType() .mergeFrom(rpcResponse.getResponseProto()); Message response = builder.build(); //調用客戶端請求時定義的回調接口,將實際的返回值response傳個客戶端 callback.run(response); } catch (InvalidProtocolBufferException e) { throw new RuntimeException(e); } }
package com.aquar.rpc.services;
// Java code-generation options for the demo service.
option java_package = "com.aquar.rpc.services";
option java_outer_classname = "GameServiceProto";
// Ask protoc to emit the generic service classes (GameService and its Stub)
// that the demo code uses.
option java_generic_services = true;
// Request message for GameService.findGame.
message Game {
  optional string gameName = 1;
}
// Response message for GameService.findGame.
message Result {
  optional string result = 1;
  optional bool success = 2;
}
// Demo RPC service with a single lookup method.
service GameService {
  rpc findGame(Game) returns (Result);
}
protoc -I=./ --java_out=./src ./GameService.proto
public class ServerDemo { public static String host = ""; public static int port = 5566; public static void main(String[] args) { Map<String, Service> services = new HashMap<String, Service>(); services.put(GameService.getDescriptor().getFullName(), new GameServiceImpl()); ServerIoHandler ioHandler = new ServerIoHandler(services); RpcServer server = new RpcServer(host, port, ioHandler); try { server.start(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
/**
 * Client demo: calls {@code GameService.findGame} through an
 * {@code RpcChannelImpl} pointed at the server demo's host and port, and
 * prints the outcome.
 */
public static void main(String[] args) {
    RpcChannelImpl channel = new RpcChannelImpl(ServerDemo.host, ServerDemo.port);
    RpcControllerImpl controller = channel.newRpcController();
    Stub service = GameService.newStub(channel);

    Game request = Game.newBuilder().setGameName("FIFA").build();
    RpcCallback<Result> done = new RpcCallback<GameServiceProto.Result>() {
        @Override
        public void run(Result result) {
            // The channel may deliver null when the server invoked its callback
            // without a response, so guard before dereferencing.
            if (result == null) {
                System.err.println("Client got no result for findGame");
                return;
            }
            if (result.getSuccess()) {
                System.out.println("Client get " + result.getResult());
            }
        }
    };

    System.out.println("Client request Gameservice for findGame: " + request.getGameName());
    service.findGame(controller, request, done);

    // Failures are reported through the controller, not thrown; surface them.
    if (controller.failed()) {
        System.err.println("Client request failed: " + controller.errorText());
    }
}