Spark Source Code Analysis, Part 5 -- Dissecting Spark RPC: Creating the NettyRpcEnv

In earlier posts in this series -- the SparkEnv discussion in "Spark Source Code Analysis, Part 2 -- The Initialization of SparkContext" and the ClientApp startup in "Spark Source Code Analysis, Part 4 -- The Creation and Startup of the TaskScheduler" -- we already touched on Spark's built-in RPC. This post pulls RPC out and dissects it on its own.

Although the RPC code in Spark is not large, sorting it out clearly still takes considerable effort, so I plan to analyze a small piece each day; once the analysis is complete, there will be a dedicated summary post.

As the opening post of the RPC analysis, this article walks through the creation of the NettyRpcEnv.

A Spark RPC Usage Example

We take the RPC-creation API call in org.apache.spark.deploy.ClientApp#start as our entry point:

// 1. Create the RPC environment
val rpcEnv = RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
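
For context, the rest of ClientApp#start (abridged from Spark 2.x; details vary across versions) uses the returned rpcEnv to look up the Master endpoint and register a client endpoint:

val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
  map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
rpcEnv.awaitTermination()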

Creating the NettyRpcEnv

Below is a sequence diagram of the NettyRpcEnv creation flow (apologies for the rough drawing):

[sequence diagram image not reproduced here]

RpcEnv is a Scala companion object (essentially a Java-style singleton). It calls NettyRpcEnvFactory to create the NettyRpcEnv object, with serialization handled by the built-in Java serializer, and then uses a Utils helper to start the server with retries. Once startup succeeds, the environment is returned to the caller.
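
The factory method itself is short. A lightly abridged sketch of org.apache.spark.rpc.netty.NettyRpcEnvFactory#create (Spark 2.x; exact signatures vary across versions) shows all three steps -- Java serialization, NettyRpcEnv construction, and the retrying server start via Utils.startServiceOnPort:

def create(config: RpcEnvConfig): RpcEnv = {
  val sparkConf = config.conf
  // A JavaSerializerInstance is safe to share across threads.
  val javaSerializerInstance =
    new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
  val nettyEnv =
    new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
      config.securityManager, config.numUsableCores)
  if (!config.clientMode) {
    // Utils.startServiceOnPort retries on successive ports if binding fails.
    val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
      nettyEnv.startServer(config.bindAddress, actualPort)
      (nettyEnv, nettyEnv.address.port)
    }
    try {
      Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
    } catch {
      case NonFatal(e) =>
        nettyEnv.shutdown()
        throw e
    }
  }
  nettyEnv
}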

The code of org.apache.spark.rpc.netty.NettyRpcEnv#startServer is as follows:

def startServer(bindAddress: String, port: Int): Unit = {
  val bootstraps: java.util.List[TransportServerBootstrap] =
    if (securityManager.isAuthenticationEnabled()) {
      java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
    } else {
      java.util.Collections.emptyList()
    }
  server = transportContext.createServer(bindAddress, port, bootstraps)
  dispatcher.registerRpcEndpoint(
    RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}

The transportContext.createServer call above simply builds a TransportServer, and the TransportServer constructor calls its init method. The source of org.apache.spark.network.server.TransportServer#init is as follows:

private void init(String hostToBind, int portToBind) {

  IOMode ioMode = IOMode.valueOf(conf.ioMode());
  EventLoopGroup bossGroup =
    NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
  EventLoopGroup workerGroup = bossGroup;

  PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
    conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

  bootstrap = new ServerBootstrap()
    .group(bossGroup, workerGroup)
    .channel(NettyUtils.getServerChannelClass(ioMode))
    .option(ChannelOption.ALLOCATOR, allocator)
    .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
    .childOption(ChannelOption.ALLOCATOR, allocator);

  this.metrics = new NettyMemoryMetrics(
    allocator, conf.getModuleName() + "-server", conf);

  if (conf.backLog() > 0) {
    bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
  }

  if (conf.receiveBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
  }

  if (conf.sendBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
  }

  bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) {
      RpcHandler rpcHandler = appRpcHandler;
      for (TransportServerBootstrap bootstrap : bootstraps) {
        rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
      }
      context.initializePipeline(ch, rpcHandler);
    }
  });

  InetSocketAddress address = hostToBind == null ?
      new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
  channelFuture = bootstrap.bind(address);
  channelFuture.syncUninterruptibly();

  port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
  logger.debug("Shuffle server started on port: {}", port);
}

Its main job is to initialize the Netty server through the Netty API. Note that bossGroup and workerGroup here are the same EventLoopGroup instance, so one pool handles both accepting connections and I/O.
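
To see the same pattern in isolation, here is a minimal, self-contained Scala sketch (not Spark code) of a Netty echo server that mirrors what init does: one EventLoopGroup shared between the boss and worker roles, a ChannelInitializer as the child handler, then bind and sync:

import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter, ChannelInitializer, ChannelOption}
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel

object MiniEchoServer {
  def main(args: Array[String]): Unit = {
    // One group plays both boss and worker, as in TransportServer#init.
    val group = new NioEventLoopGroup(2)
    try {
      val bootstrap = new ServerBootstrap()
        .group(group, group)
        .channel(classOf[NioServerSocketChannel])
        .option[java.lang.Boolean](ChannelOption.SO_REUSEADDR, true)
        .childHandler(new ChannelInitializer[SocketChannel] {
          override def initChannel(ch: SocketChannel): Unit = {
            // Echo every inbound buffer straight back to the client.
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter {
              override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit =
                ctx.writeAndFlush(msg)
            })
          }
        })
      val channelFuture = bootstrap.bind(0).syncUninterruptibly() // port 0: pick any free port
      println(s"bound to ${channelFuture.channel().localAddress()}")
      channelFuture.channel().closeFuture().sync()
    } finally {
      group.shutdownGracefully()
    }
  }
}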

The source of org.apache.spark.rpc.netty.Dispatcher#registerRpcEndpoint is as follows:

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    if (stopped) {
      throw new IllegalStateException("RpcEnv has been stopped")
    }
    if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    val data = endpoints.get(name)
    endpointRefs.put(data.endpoint, data.ref)
    receivers.offer(data)  // for the OnStart message
  }
  endpointRef
}

When an EndpointData is constructed, its Inbox enqueues an OnStart message, as the abridged sketch below shows.
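The relevant constructor code, abridged from Dispatcher.scala and Inbox.scala (Spark 2.x):

// Dispatcher.scala: each registered endpoint is wrapped in an EndpointData,
// which builds an Inbox for it.
private class EndpointData(
    val name: String,
    val endpoint: RpcEndpoint,
    val ref: NettyRpcEndpointRef) {
  val inbox = new Inbox(ref, endpoint)
}

// Inbox.scala: the constructor enqueues OnStart before any other message.
// OnStart should be the first message to process
inbox.synchronized {
  messages.add(OnStart)
}
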
In Inbox#process, the OnStart message is then handled by the following code:

case OnStart =>
  endpoint.onStart()
  if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
    inbox.synchronized {
      if (!stopped) {
        enableConcurrent = true
      }
    }
  }

This invokes the endpoint's onStart method and initializes whether concurrent message processing is allowed (it is enabled unless the endpoint is a ThreadSafeRpcEndpoint). The endpoint here is the RpcEndpointVerifier, whose onStart method is as follows:

/**
 * Invoked before [[RpcEndpoint]] starts to handle any message.
 */
def onStart(): Unit = {
  // By default, do nothing.
}
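
Note that RpcEndpointVerifier does not override onStart, so this default no-op is what runs. Its actual work happens in receiveAndReply, where it answers CheckExistence queries so that remote callers can verify that an endpoint exists before obtaining a reference to it (abridged from RpcEndpointVerifier.scala, Spark 2.x):

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RpcEndpointVerifier.CheckExistence(name) => context.reply(dispatcher.verify(name))
}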

That is, it does nothing and returns immediately. This completes the walkthrough of NettyRpcEnv initialization: the RpcEnv companion object asks the Netty RPC factory to create a NettyRpcEnv object, starts the TransportServer with a retry mechanism, and registers the RpcEndpointVerifier with the Dispatcher. Finally the NettyRpcEnv is returned to the API caller, and its creation is complete. Components such as the Dispatcher and TransportServer are not explored in depth here; each will be analyzed in later posts.
