In the earlier source-code walkthroughs, both the SparkEnv part of "Spark source code analysis, part 2: the initialization of SparkContext" and the ClientApp startup process in "Spark source code analysis, part 4: the creation and startup of TaskScheduler" touched on Spark's built-in RPC. This post takes the RPC layer out on its own and dissects it.
Although RPC is a small part of Spark, working it out clearly still takes quite a bit of effort, so the plan is to analyze a small piece each day; once the analysis is complete, a dedicated summary article will follow.
As the opening post of the RPC analysis, this one mainly walks through how a NettyRpcEnv is created.
We take the RPC-creation call in org.apache.spark.deploy.ClientApp#start as the entry point:
// 1. Create the RPC environment
val rpcEnv = RpcEnv.create("driverClient", Utils.localHostName(), 0, conf,
  new SecurityManager(conf))
Below is the sequence diagram for creating a NettyRpcEnv (apologies that it is not pretty):
RpcEnv is a Scala companion object (essentially a Java-style singleton). It delegates to NettyRpcEnvFactory to create the NettyRpcEnv instance; serialization uses Spark's built-in Java serialization, and the Utils class is then used to start the server with retries. Once the server starts successfully, the NettyRpcEnv is returned to the caller. An abridged sketch of this creation path follows.
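The sketch below is a simplified reconstruction of org.apache.spark.rpc.netty.NettyRpcEnvFactory#create; exact signatures vary across Spark versions, so treat it as an outline of the flow described above rather than verbatim source.

// Abridged sketch of NettyRpcEnvFactory#create (simplified, not verbatim source).
def create(config: RpcEnvConfig): RpcEnv = {
  val sparkConf = config.conf
  // RPC messages are (de)serialized with Spark's built-in JavaSerializer.
  val javaSerializerInstance =
    new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
  val nettyEnv = new NettyRpcEnv(
    sparkConf, javaSerializerInstance, config.advertiseAddress, config.securityManager)
  if (!config.clientMode) {
    // startServiceOnPort retries on successive ports until the server binds.
    val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
      nettyEnv.startServer(config.bindAddress, actualPort)
      (nettyEnv, nettyEnv.address.port)
    }
    Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
  }
  nettyEnv
}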
The code of org.apache.spark.rpc.netty.NettyRpcEnv#startServer is as follows:
def startServer(bindAddress: String, port: Int): Unit = {
  val bootstraps: java.util.List[TransportServerBootstrap] =
    if (securityManager.isAuthenticationEnabled()) {
      java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
    } else {
      java.util.Collections.emptyList()
    }
  server = transportContext.createServer(bindAddress, port, bootstraps)
  dispatcher.registerRpcEndpoint(
    RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
The TransportServer constructor calls its init method. The source of org.apache.spark.network.server.TransportServer#init is as follows:
private void init(String hostToBind, int portToBind) {

  IOMode ioMode = IOMode.valueOf(conf.ioMode());
  EventLoopGroup bossGroup =
    NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
  EventLoopGroup workerGroup = bossGroup;

  PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
    conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

  bootstrap = new ServerBootstrap()
    .group(bossGroup, workerGroup)
    .channel(NettyUtils.getServerChannelClass(ioMode))
    .option(ChannelOption.ALLOCATOR, allocator)
    .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
    .childOption(ChannelOption.ALLOCATOR, allocator);

  this.metrics = new NettyMemoryMetrics(
    allocator, conf.getModuleName() + "-server", conf);

  if (conf.backLog() > 0) {
    bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
  }

  if (conf.receiveBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
  }

  if (conf.sendBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
  }

  bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) {
      RpcHandler rpcHandler = appRpcHandler;
      for (TransportServerBootstrap bootstrap : bootstraps) {
        rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
      }
      context.initializePipeline(ch, rpcHandler);
    }
  });

  InetSocketAddress address = hostToBind == null ?
    new InetSocketAddress(portToBind) : new InetSocketAddress(hostToBind, portToBind);
  channelFuture = bootstrap.bind(address);
  channelFuture.syncUninterruptibly();

  port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
  logger.debug("Shuffle server started on port: {}", port);
}
Its main job is to initialize the Netty server through the Netty API: build a ServerBootstrap on a boss/worker event-loop group with a pooled allocator, apply the configured socket options, install a ChannelInitializer that wraps the RpcHandler with the TransportServerBootstraps before TransportContext wires up the channel pipeline, and finally bind to the requested address and record the actual port. The sketch below strips this pattern down to its essentials.
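To see the pattern in isolation, here is a minimal, self-contained sketch against the plain Netty API. The class name MinimalNettyServer and the empty pipeline are illustrative only, not Spark code; Spark additionally chooses between NIO and epoll, installs its allocator and metrics, and fills the pipeline.

import java.net.InetSocketAddress

import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.{ChannelInitializer, ChannelOption}
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel

// Illustrative only: the same bootstrap pattern TransportServer#init uses,
// minus Spark's allocator, metrics and pipeline wiring.
object MinimalNettyServer {
  def main(args: Array[String]): Unit = {
    val bossGroup = new NioEventLoopGroup(1) // accepts incoming connections
    val workerGroup = bossGroup              // Spark reuses the boss group for I/O
    try {
      val bootstrap = new ServerBootstrap()
        .group(bossGroup, workerGroup)
        .channel(classOf[NioServerSocketChannel])
        .option[java.lang.Boolean](ChannelOption.SO_REUSEADDR, java.lang.Boolean.TRUE)
        .childHandler(new ChannelInitializer[SocketChannel] {
          override def initChannel(ch: SocketChannel): Unit = {
            // In Spark, TransportContext.initializePipeline installs the
            // encoder/decoder and RpcHandler here; this sketch leaves it empty.
          }
        })
      // Bind to an ephemeral port, then read back the port actually chosen.
      val future = bootstrap.bind(new InetSocketAddress(0)).syncUninterruptibly()
      val port = future.channel().localAddress().asInstanceOf[InetSocketAddress].getPort
      println(s"Listening on port $port")
      future.channel().close().syncUninterruptibly()
    } finally {
      bossGroup.shutdownGracefully()
    }
  }
}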
The source of org.apache.spark.rpc.netty.Dispatcher#registerRpcEndpoint is as follows:
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    if (stopped) {
      throw new IllegalStateException("RpcEnv has been stopped")
    }
    if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    val data = endpoints.get(name)
    endpointRefs.put(data.endpoint, data.ref)
    receivers.offer(data)  // for the OnStart message
  }
  endpointRef
}
When an EndpointData is constructed, its Inbox enqueues an OnStart message, so OnStart is always the first message processed for a newly registered endpoint. The toy sketch below mimics that behaviour.
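This is a toy analogue, not Spark source: ToyInbox, Rpc and the println handlers are made up for illustration. The point it demonstrates is that the OnStart marker is enqueued in the constructor, so it is guaranteed to be processed before any other message.

import java.util.LinkedList

sealed trait InboxMessage
case object OnStart extends InboxMessage
final case class Rpc(payload: Any) extends InboxMessage

class ToyInbox {
  private val messages = new LinkedList[InboxMessage]()
  // OnStart should be the first message to process.
  messages.add(OnStart)

  def post(m: InboxMessage): Unit = synchronized { messages.add(m) }

  def process(): Unit = synchronized {
    var msg = messages.poll()
    while (msg != null) {
      msg match {
        case OnStart      => println("endpoint.onStart() would run here")
        case Rpc(payload) => println(s"handle $payload")
      }
      msg = messages.poll()
    }
  }
}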
In Inbox's process method there is the following code:
case OnStart =>
  endpoint.onStart()
  if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
    inbox.synchronized {
      if (!stopped) {
        enableConcurrent = true
      }
    }
  }
This invokes the endpoint's onStart method and then decides whether concurrent message processing is enabled (it is enabled only for endpoints that are not ThreadSafeRpcEndpoint). Here the endpoint is RpcEndpointVerifier, and its onStart method is as follows:
/**
 * Invoked before [[RpcEndpoint]] starts to handle any message.
 */
def onStart(): Unit = {
  // By default, do nothing.
}
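RpcEndpointVerifier simply inherits this no-op. To see what the hook is for, an endpoint can override it; the EchoEndpoint below is a hypothetical example written for illustration (RpcEndpoint is private[spark], so such code only compiles inside Spark's own packages), not part of Spark.

import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

// Hypothetical endpoint. onStart runs exactly once, after the endpoint is
// registered with the Dispatcher and before any other message is delivered.
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {

  override def onStart(): Unit = {
    // Typical uses: start timers, connect to a master, pre-warm caches, etc.
    println(s"EchoEndpoint started at ${rpcEnv.address}")
  }

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg: String => context.reply(msg) // echo the request back to the caller
  }
}

// Registering it makes the Dispatcher deliver OnStart, which calls onStart():
//   val ref = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
//   ref.askSync[String]("ping")   // returns "ping"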
In other words, it does nothing and returns immediately. That completes the walkthrough of NettyRpcEnv initialization: the RpcEnv companion object asks the Netty RPC factory to create a NettyRpcEnv, the TransportServer is started with the retry mechanism, the NettyRpcEnv registers the RpcEndpointVerifier with the Dispatcher, and the NettyRpcEnv is finally returned to the API caller, at which point creation is complete. The Dispatcher, TransportServer and other components are not examined in depth here; they will each be dissected in later posts.