gRPC中,Server、Client共享的Class不是不少,因此咱們能夠單獨的分別講解Server和Client的源碼。ide
經過第一篇,咱們知道對於gRPC來講,創建Server是很是簡單的,還記得怎麼寫的?仍是以example裏 HelloWorldServer 例子來看oop
server = ServerBuilder.forPort(port)
.addService(new GreeterImpl())
.build()
.start();
你沒有看錯,就是這麼幾行搞定。ui
若是須要看懂gRPC的源碼,首先有幾點須要明白this
Builder模式生成Entityspa
Provider(SPI)模式解耦,動態選擇服務提供方.net
abstract class用於擴展netty
ServerBuilder是一個抽象類,不一樣的服務提供方(Provider),將繼承實現它。如何找到這些繼承者呢?ServerProvider就是用來找到不一樣的provider的。code
如上圖,ServerProvider也是一個抽象類,實現者都有哪些呢?咱們經過SPI模式找到他們。server
經過搜索文件知道gRPC中 io.grpc.ServerProvider 的實現方只有:Nettyblog
io.grpc.netty.NettyServerProvider,
這個類就是ServerProvider的實現者,它的builderForPort返回ServerBuilder
最後,咱們來看下當連接創建時是如何建立handle的。
public void initChannel(Channel ch) throws Exception { eventLoopReferenceCounter.retain(); ch.closeFuture().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { eventLoopReferenceCounter.release(); } }); NettyServerTransport transport = new NettyServerTransport(ch, protocolNegotiator, maxStreamsPerConnection, flowControlWindow, maxMessageSize, maxHeaderListSize); ServerTransportListener transportListener; // This is to order callbacks on the listener, not to guard access to channel. synchronized (NettyServer.this) { if (channel != null && !channel.isOpen()) { // Server already shutdown. ch.close(); return; } transportListener = listener.transportCreated(transport); } transport.start(transportListener); }
看code可知,當一個連接創建時,會生成一個NettyServerTransport,全部的數據處理都將在這裏實現。
public void start(ServerTransportListener listener) { Preconditions.checkState(this.listener == null, "Handler already registered"); this.listener = listener; // Create the Netty handler for the pipeline. final NettyServerHandler grpcHandler = createHandler(listener); HandlerSettings.setAutoWindow(grpcHandler); // Notify when the channel closes. channel.closeFuture().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { notifyTerminated(grpcHandler.connectionError()); } }); ChannelHandler negotiationHandler = protocolNegotiator.newHandler(grpcHandler); channel.pipeline().addLast(negotiationHandler); }
咱們看到當調用start方法是,最重要的就是createHandle,在這個方法裏將看到如何綁定HTTP/2的處理器的。
static NettyServerHandler newHandler(ServerTransportListener transportListener, int maxStreams, int flowControlWindow, int maxHeaderListSize, int maxMessageSize) { Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); // 就是一個log Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class); Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize); // reader Http2FrameReader frameReader = new Http2InboundFrameLogger( new DefaultHttp2FrameReader(headersDecoder), frameLogger); // writer Http2FrameWriter frameWriter = new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger); return newHandler(frameReader, frameWriter, transportListener, maxStreams, flowControlWindow, maxMessageSize); } @VisibleForTesting static NettyServerHandler newHandler(Http2FrameReader frameReader, Http2FrameWriter frameWriter, ServerTransportListener transportListener, int maxStreams, int flowControlWindow, int maxMessageSize) { Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive"); Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive"); Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive"); // 一個channel一個connection Http2Connection connection = new DefaultHttp2Connection(true); // Create the local flow controller configured to auto-refill the connection window. connection.local().flowController( new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true)); Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter); Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader); Http2Settings settings = new Http2Settings(); settings.initialWindowSize(flowControlWindow); settings.maxConcurrentStreams(maxStreams); return new NettyServerHandler(transportListener, decoder, encoder, settings, maxMessageSize); }