在Spark中不少地方都涉及網絡通訊,好比Spark各個組件間的消息互通、用戶文件與Jar包的上傳、節點間的Shuffle過程、Block數據的複製與備份等。在Spark 0.x.x與Spark 1.x.x版本中,組件間的消息通訊主要藉助於Akka[1],使用Akka能夠輕鬆的構建強有力的高併發與分佈式應用。可是Akka在Spark 2.0.0版本中被移除了,Spark官網文檔對此的描述爲:「Akka的依賴被移除了,所以用戶可使用任何版本的Akka來編程了。」Spark團隊的決策者或許認爲對於Akka具體版本的依賴,限制了用戶對於Akka不一樣版本的使用。儘管如此,筆者依然認爲Akka是一款很是優秀的開源分佈式系統,我參與的一些Java Application或者Java Web就利用Akka的豐富特性實現了分佈式一致性、最終一致性以及分佈式事務等分佈式環境面對的問題。在Spark 1.x.x版本中,用戶文件與Jar包的上傳採用了由Jetty[2]實現的HttpFileServer,但在Spark 2.0.0版本中也被廢棄了,如今使用的是基於Spark內置RPC框架的NettyStreamManager。節點間的Shuffle過程和Block數據的複製與備份這兩個部分在Spark 2.0.0版本中依然沿用了Netty[3],經過對接口和程序進行從新設計將各個組件間的消息互通、用戶文件與Jar包的上傳等內容統一歸入到Spark的RPC框架體系中。html
咱們先來看看RPC框架的基本架構,如圖1所示。java
圖1 Spark內置RPC框架的基本架構數據庫
TransportContext內部包含傳輸上下文的配置信息TransportConf和對客戶端請求消息進行處理的RpcHandler。TransportConf在建立TransportClientFactory和TransportServer時都是必須的,而RpcHandler只用於建立TransportServer。TransportClientFactory是RPC客戶端的工廠類。TransportServer是RPC服務端的實現。圖中記號的含義以下:編程
記號①:表示經過調用TransportContext的createClientFactory方法建立傳輸客戶端工廠TransportClientFactory的實例。在構造TransportClientFactory的實例時,還會傳遞客戶端引導程序TransportClientBootstrap的列表。此外,TransportClientFactory內部還存在針對每一個Socket地址的鏈接池ClientPool,這個鏈接池緩存的定義以下:bootstrap
private final ConcurrentHashMap<SocketAddress, ClientPool> connectionPool;
ClientPool的類型定義以下:數組
private static class ClientPool { TransportClient[] clients; Object[] locks; ClientPool(int size) { clients = new TransportClient[size]; locks = new Object[size]; for (int i = 0; i < size; i++) { locks[i] = new Object(); } } }
因而可知,ClientPool實際是由TransportClient的數組構成,而locks數組中的Object與clients數組中的TransportClient按照數組索引一一對應,經過對每一個TransportClient分別採用不一樣的鎖,下降併發狀況下線程間對鎖的爭用,進而減小阻塞,提升併發度。緩存
記號②:表示經過調用TransportContext的createServer方法建立傳輸服務端TransportServer的實例。在構造TransportServer的實例時,須要傳遞TransportContext、host、port、RpcHandler以及服務端引導程序TransportServerBootstrap的列表。安全
有了對Spark內置RPC框架的基本架構的瞭解,如今正式介紹Spark的RPC框架所包含的各個組件:服務器
拓展知識:爲何須要MessageEncoder和MessageDecoder?由於在基於流的傳輸裏(好比TCP/IP),接收到的數據首先會被存儲到一個socket接收緩衝裏。不幸的是,基於流的傳輸並非一個數據包隊列,而是一個字節隊列。即便你發送了2個獨立的數據包,操做系統也不會做爲2個消息處理而僅僅認爲是一連串的字節。所以不能保證遠程寫入的數據會被準確地讀取。舉個例子,讓咱們假設操做系統的TCP/TP協議棧已經接收了3個數據包:ABC、DEF、GHI。因爲基於流傳輸的協議的這種統一的性質,在你的應用程序在讀取數據的時候有很高的可能性被分紅下面的片斷:AB、CDEFG、H、I。所以,接收方無論是客戶端仍是服務端,都應該把接收到的數據整理成一個或者多個更有意義而且讓程序的邏輯更好理解的數據。網絡
[1] Akka是基於Actor併發編程模型實現的併發的分佈式的框架。Akka是用Scala語言編寫的,它提供了Java和Scala兩種語言的API,減小開發人員對併發的細節處理,並保證分佈式調用的最終一致性。在附錄B中有關於Akka的進一步介紹,感興趣的讀者不妨一讀。
[2] Jetty 是一個開源的Servlet容器,它爲基於Java的Web容器,例如JSP和Servlet提供運行環境。Jetty是使用Java語言編寫的,它的API以一組JAR包的形式發佈。開發人員能夠將Jetty容器實例化成一個對象,能夠迅速爲一些獨立運行的Java應用提供網絡和Web鏈接。在附錄C中有對Jetty的簡單介紹,感興趣的讀者能夠選擇閱讀。
[3] Netty是由Jboss提供的一個基於NIO的客戶、服務器端編程框架,使用Netty 能夠確保你快速、簡單的開發出一個網絡應用,例如實現了某種協議的客戶,服務端應用。附錄G中有對Netty的簡單介紹,感興趣的讀者能夠一讀。
上文提到TransportContext中的TransportConf給Spark的RPC框架提供配置信息,它有兩個成員屬性——配置提供者conf和配置的模塊名稱module。這兩個屬性的定義以下:
private final ConfigProvider conf; private final String module;
其中conf是真正的配置提供者,其類型ConfigProvider是一個抽象類,見代碼清單1。
代碼清單1 ConfigProvider的實現
public abstract class ConfigProvider { public abstract String get(String name); public String get(String name, String defaultValue) { try { return get(name); } catch (NoSuchElementException e) { return defaultValue; } } public int getInt(String name, int defaultValue) { return Integer.parseInt(get(name, Integer.toString(defaultValue))); } public long getLong(String name, long defaultValue) { return Long.parseLong(get(name, Long.toString(defaultValue))); } public double getDouble(String name, double defaultValue) { return Double.parseDouble(get(name, Double.toString(defaultValue))); } public boolean getBoolean(String name, boolean defaultValue) { return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue))); } }
從代碼清單1,能夠看到ConfigProvider中包括get、getInt、getLong、getDouble、getBoolean等方法,這些方法都是基於抽象方法get獲取值,通過一次類型轉換而實現。這個抽象的get方法將須要子類去實現。
Spark一般使用SparkTransportConf建立TransportConf,其實現見代碼清單2。
代碼清單2 SparkTransportConf的實現
object SparkTransportConf { private val MAX_DEFAULT_NETTY_THREADS = 8 def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = { val conf = _conf.clone val numThreads = defaultNumThreads(numUsableCores) conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString) conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString) new TransportConf(module, new ConfigProvider { override def get(name: String): String = conf.get(name) }) } private def defaultNumThreads(numUsableCores: Int): Int = { val availableCores = if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors() math.min(availableCores, MAX_DEFAULT_NETTY_THREADS) } }
從代碼清單2看到,可使用SparkTransportConf的fromSparkConf方法來構造TransportConf。傳遞的三個參數分別爲SparkConf、模塊名module及可用的內核數numUsableCores。若是numUsableCores小於等於0,那麼線程數是系統可用處理器的數量,不過系統的內核數不可能所有用於網絡傳輸使用,因此這裏還將分配給網絡傳輸的內核數量最多限制在8個。最終肯定的線程數將被用於設置客戶端傳輸線程數(spark.$module.io.clientThreads屬性)和服務端傳輸線程數(spark.$module.io.serverThreads屬性)。fromSparkConf最終構造TransportConf對象時傳遞的ConfigProvider爲實現了get方法的匿名的內部類,get的實現實際是代理了SparkConf的get方法。
TransportClientFactory是建立傳輸客戶端(TransportClient)的工廠類。在說明圖3-1中的記號①時提到過TransportContext的createClientFactory方法能夠建立TransportClientFactory的實例,其實現見代碼清單3。
代碼清單3 建立客戶端工廠
public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) { return new TransportClientFactory(this, bootstraps); } public TransportClientFactory createClientFactory() { return createClientFactory(Lists.<TransportClientBootstrap>newArrayList()); }
能夠看到TransportContext中有兩個重載的createClientFactory方法,它們最終在構造TransportClientFactory時都會傳遞兩個參數:TransportContext和TransportClientBootstrap列表。TransportClientFactory構造器的實現見代碼清單4。
代碼清單4 TransportClientFactory的構造器
public TransportClientFactory( TransportContext context, List<TransportClientBootstrap> clientBootstraps) { this.context = Preconditions.checkNotNull(context); this.conf = context.getConf(); this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps)); this.connectionPool = new ConcurrentHashMap<>(); this.numConnectionsPerPeer = conf.numConnectionsPerPeer(); this.rand = new Random(); IOMode ioMode = IOMode.valueOf(conf.ioMode()); this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode); this.workerGroup = NettyUtils.createEventLoop( ioMode, conf.clientThreads(), conf.getModuleName() + "-client"); this.pooledAllocator = NettyUtils.createPooledByteBufAllocator( conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()); }
TransportClientFactory構造器中的各個變量分別爲:
圖2 TransportClientFactory的connectionPool
TransportClientFactory裏大量使用了NettyUtils,關於NettyUtils的具體實現,請看附錄G。[1]
提示:NIO是指Java中New IO的簡稱,其特色包括:爲全部的原始類型提供(Buffer)緩衝支持;字符集編碼解碼解決方案;提供一個新的原始I/O 抽象Channel,支持鎖和內存映射文件的文件訪問接口;提供多路非阻塞式(non-bloking)的高伸縮性網絡I/O 。其具體使用屬於Java語言的範疇,本文不過多介紹。
[1] Spark將對Netty框架的使用細節都封裝在NettyUtils工具類中,因爲Netty的API使用不屬於本書主要闡述的內容,故此放入附錄G中,對Netty的使用感興趣的讀者能夠選擇閱讀。
TransportClientFactory的clientBootstraps屬性是TransportClientBootstrap的列表。TransportClientBootstrap是在TransportClient上執行的客戶端引導程序,主要對鏈接創建時進行一些初始化的準備(例如驗證、加密)。TransportClientBootstrap所做的操做每每是昂貴的,好在創建的鏈接能夠重用。TransportClientBootstrap的接口定義見代碼清單5。
代碼清單5 TransportClientBootstrap的定義
public interface TransportClientBootstrap { void doBootstrap(TransportClient client, Channel channel) throws RuntimeException; }
TransportClientBootstrap有兩個實現類:EncryptionDisablerBootstrap和SaslClientBootstrap。爲了對TransportClientBootstrap的做用能有更深的瞭解,這裏以EncryptionDisablerBootstrap爲例,EncryptionDisablerBootstrap的實現見代碼清單6。
代碼清單6 EncryptionDisablerBootstrap的實現
private static class EncryptionDisablerBootstrap implements TransportClientBootstrap { @Override public void doBootstrap(TransportClient client, Channel channel) { channel.pipeline().remove(SaslEncryption.ENCRYPTION_HANDLER_NAME); } }
根據代碼清單6,能夠看到EncryptionDisablerBootstrap的做用是移除客戶端管道中的SASL加密。
有了TransportClientFactory,Spark的各個模塊就可使用它建立RPC客戶端TransportClient了。每一個TransportClient實例只能和一個遠端的RPC服務通訊,因此Spark中的組件若是想要和多個RPC服務通訊,就須要持有多個TransportClient實例。建立TransportClient的方法見代碼清單7(實際爲從緩存中獲取TransportClient)。
代碼清單7 從緩存獲取TransportClient
public TransportClient createClient(String remoteHost, int remotePort) throws IOException, InterruptedException { // 建立InetSocketAddress final InetSocketAddress unresolvedAddress = InetSocketAddress.createUnresolved(remoteHost, remotePort); ClientPool clientPool = connectionPool.get(unresolvedAddress); if (clientPool == null) { connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer)); clientPool = connectionPool.get(unresolvedAddress); } int clientIndex = rand.nextInt(numConnectionsPerPeer); // 隨機選擇一個TransportClient TransportClient cachedClient = clientPool.clients[clientIndex]; if (cachedClient != null && cachedClient.isActive()) {// 獲取並返回激活的TransportClient TransportChannelHandler handler = cachedClient.getChannel().pipeline() .get(TransportChannelHandler.class); synchronized (handler) { handler.getResponseHandler().updateTimeOfLastRequest(); } if (cachedClient.isActive()) { logger.trace("Returning cached connection to {}: {}", cachedClient.getSocketAddress(), cachedClient); return cachedClient; } } final long preResolveHost = System.nanoTime(); final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort); final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000; if (hostResolveTimeMs > 2000) { logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs); } else { logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs); } // 建立並返回TransportClient對象 synchronized (clientPool.locks[clientIndex]) { cachedClient = clientPool.clients[clientIndex]; if (cachedClient != null) { if (cachedClient.isActive()) { logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient); return cachedClient; } else { logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress); } } clientPool.clients[clientIndex] = createClient(resolvedAddress); return clientPool.clients[clientIndex]; } }
從代碼清單7得知,建立TransportClient的步驟以下:
代碼清單7的整個執行過程實際解決了TransportClient緩存的使用以及createClient方法的線程安全問題,並無涉及建立TransportClient的實現。TransportClient的建立過程在重載的createClient方法(見代碼清單8)中實現。
代碼清單8 建立TransportClient
private TransportClient createClient(InetSocketAddress address) throws IOException, InterruptedException { logger.debug("Creating new connection to {}", address); // 構建根引導器Bootstrap並對其進行配置 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup) .channel(socketChannelClass) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs()) .option(ChannelOption.ALLOCATOR, pooledAllocator); final AtomicReference<TransportClient> clientRef = new AtomicReference<>(); final AtomicReference<Channel> channelRef = new AtomicReference<>(); // 爲根引導程序設置管道初始化回調函數 bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { TransportChannelHandler clientHandler = context.initializePipeline(ch); clientRef.set(clientHandler.getClient()); channelRef.set(ch); } }); long preConnect = System.nanoTime(); ChannelFuture cf = bootstrap.connect(address);// 使用根引導程序鏈接遠程服務器 if (!cf.await(conf.connectionTimeoutMs())) { throw new IOException( String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs())); } else if (cf.cause() != null) { throw new IOException(String.format("Failed to connect to %s", address), cf.cause()); } TransportClient client = clientRef.get(); Channel channel = channelRef.get(); assert client != null : "Channel future completed successfully with null client"; // Execute any client bootstraps synchronously before marking the Client as successful. long preBootstrap = System.nanoTime(); logger.debug("Connection to {} successful, running bootstraps...", address); try { for (TransportClientBootstrap clientBootstrap : clientBootstraps) { clientBootstrap.doBootstrap(client, channel);// 給TransportClient設置客戶端引導程序 } } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000; logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e); client.close(); throw Throwables.propagate(e); } long postBootstrap = System.nanoTime(); logger.info("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)", address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000); return client; }
從代碼清單8得知,真正建立TransportClient的步驟以下:
TransportServer是RPC框架的服務端,可提供高效的、低級別的流服務。在說明圖1中的記號②時提到過TransportContext的createServer方法用於建立TransportServer,其實現見代碼清單9。
代碼清單9 建立RPC服務端
public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) { return new TransportServer(this, null, port, rpcHandler, bootstraps); } public TransportServer createServer( String host, int port, List<TransportServerBootstrap> bootstraps) { return new TransportServer(this, host, port, rpcHandler, bootstraps); } public TransportServer createServer(List<TransportServerBootstrap> bootstraps) { return createServer(0, bootstraps); } public TransportServer createServer() { return createServer(0, Lists.<TransportServerBootstrap>newArrayList()); }
代碼清單9中列出了四個名爲createServer的重載方法,可是它們最終調用了TransportServer的構造器(見代碼清單10)來建立TransportServer實例。
代碼清單10 TransportServer的構造器
public TransportServer( TransportContext context, String hostToBind, int portToBind, RpcHandler appRpcHandler, List<TransportServerBootstrap> bootstraps) { this.context = context; this.conf = context.getConf(); this.appRpcHandler = appRpcHandler; this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps)); try { init(hostToBind, portToBind); } catch (RuntimeException e) { JavaUtils.closeQuietly(this); throw e; } }
TransportServer的構造器中的各個變量分別爲:
TransportServer的構造器(見代碼清單10)中調用了init方法, init方法用於對TransportServer進行初始化,見代碼清單11。
代碼清單11 TransportServer的初始化
private void init(String hostToBind, int portToBind) { // 根據Netty的API文檔,Netty服務端需同時建立bossGroup和workerGroup IOMode ioMode = IOMode.valueOf(conf.ioMode()); EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server"); EventLoopGroup workerGroup = bossGroup; // 建立一個聚集ByteBuf但對本地線程緩存禁用的分配器 PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator( conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads()); // 建立Netty的服務端根引導程序並對其進行配置 bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NettyUtils.getServerChannelClass(ioMode)) .option(ChannelOption.ALLOCATOR, allocator) .childOption(ChannelOption.ALLOCATOR, allocator); if (conf.backLog() > 0) { bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog()); } if (conf.receiveBuf() > 0) { bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf()); } if (conf.sendBuf() > 0) { bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf()); } // 爲根引導程序設置管道初始化回調函數 bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { RpcHandler rpcHandler = appRpcHandler; for (TransportServerBootstrap bootstrap : bootstraps) { rpcHandler = bootstrap.doBootstrap(ch, rpcHandler); } context.initializePipeline(ch, rpcHandler); } }); // 給根引導程序綁定Socket的監聽端口 InetSocketAddress address = hostToBind == null ? new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind); channelFuture = bootstrap.bind(address); channelFuture.syncUninterruptibly(); port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort(); logger.debug("Shuffle server started on port: {}", port); }
代碼清單11中TransportServer初始化的步驟以下:
小貼士:根據Netty的API文檔,Netty服務端需同時建立bossGroup和workerGroup。
提示:代碼清單11中使用了NettyUtils工具類的不少方法,在附錄G中有對它們的詳細介紹。EventLoopGroup、PooledByteBufAllocator、ServerBootstrap都是Netty提供的API,對於它們的更多介紹,請訪問http://netty.io/。
在代碼清單8建立TransportClient和代碼清單11對TransportServer初始化的實現中都在管道初始化回調函數中調用了TransportContext的initializePipeline方法,initializePipeline方法(見代碼清單12)將調用Netty的API對管道初始化。
代碼清單12 管道初始化
public TransportChannelHandler initializePipeline( SocketChannel channel, RpcHandler channelRpcHandler) { try { TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler); channel.pipeline() .addLast("encoder", ENCODER) .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) .addLast("decoder", DECODER) .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000)) .addLast("handler", channelHandler); return channelHandler; } catch (RuntimeException e) { logger.error("Error while initializing Netty pipeline", e); throw e; } }
根據代碼清單12,initializePipeline方法的執行步驟以下:
對管道進行設置,這裏的ENCODER(即MessageEncoder)派生自Netty的ChannelOutboundHandler接口;DECODER(即MessageDecoder)、TransportChannelHandler以及TransportFrameDecoder(由工具類NettyUtils的靜態方法createFrameDecoder建立)派生自Netty的ChannelInboundHandler接口;IdleStateHandler同時實現了ChannelOutboundHandler和ChannelInboundHandler接口。根據Netty的API行爲,經過addLast方法註冊多個handler時,ChannelInboundHandler按照註冊的前後順序執行;ChannelOutboundHandler按照註冊的前後順序逆序執行,所以在管道兩端(不管是服務端仍是客戶端)處理請求和響應的流程如圖3所示。
代碼清單13 建立TransportChannelHandler
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) { TransportResponseHandler responseHandler = new TransportResponseHandler(channel); TransportClient client = new TransportClient(channel, responseHandler); TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client, rpcHandler); return new TransportChannelHandler(client, responseHandler, requestHandler, conf.connectionTimeoutMs(), closeIdleConnections); }
圖3 管道處理請求和響應的流程圖
TransportChannelHandler實現了Netty的ChannelInboundHandler[1],以便對Netty管道中的消息進行處理。圖3中的這些Handler(除了MessageEncoder)因爲都實現了ChannelInboundHandler接口,做爲自定義的ChannelInboundHandler,於是都要重寫channelRead方法。Netty框架使用工做鏈模式來對每一個ChannelInboundHandler的實現類的channelRead方法進行鏈式調用。TransportChannelHandler實現的channelRead方法見代碼清單14。
代碼清單14 TransportChannelHandler的channelRead實現
@Override public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception { if (request instanceof RequestMessage) { requestHandler.handle((RequestMessage) request); } else if (request instanceof ResponseMessage) { responseHandler.handle((ResponseMessage) request); } else { ctx.fireChannelRead(request); } }
從代碼清單14看到,當TransportChannelHandler讀取到的request是RequestMessage類型時,則將此消息的處理進一步交給TransportRequestHandler,當request是ResponseMessage時,則將此消息的處理進一步交給TransportResponseHandler。
TransportRequestHandler與TransportResponseHandler都繼承自抽象類MessageHandler,MessageHandler定義了子類的規範,詳細定義見代碼清單15。
代碼清單15 MessageHandler規範
public abstract class MessageHandler<T extends Message> { public abstract void handle(T message) throws Exception; public abstract void channelActive(); public abstract void exceptionCaught(Throwable cause); public abstract void channelInactive(); }
MessageHandler中定義的各個方法的做用分別爲:
Spark中MessageHandler類的繼承體系如圖4所示。
圖4 MessageHandler類的繼承體系
根據代碼清單15,咱們知道MessageHandler同時也是一個Java泛型類,其子類能處理的消息都派生自接口Message。Message的定義見代碼清單16。
代碼清單16 Message的定義
public interface Message extends Encodable { Type type(); ManagedBuffer body(); boolean isBodyInFrame();
Message中定義的三個接口方法的做用分別爲:
Message接口繼承了Encodable接口,Encodable的定義見代碼清單17。
代碼清單17 Encodable的定義
public interface Encodable { int encodedLength(); void encode(ByteBuf buf); }
實現Encodable接口的類將能夠轉換到一個ByteBuf中,多個對象將被存儲到預先分配的單個ByteBuf,因此這裏的encodedLength用於返回轉換的對象數量。下面一塊兒來看看Message的類繼承體系,如圖5所示。
圖5 Message的類繼承體系
從圖5看到最終的消息實現類都直接或間接的實現了RequestMessage或ResponseMessage接口,其中RequestMessage的具體實現有四種,分別是:
因爲OneWayMessage 不須要響應,因此ResponseMessage的對於成功或失敗狀態的實現各有三種,分別是:
回頭再看看代碼清單16中對body接口的定義,能夠看到其返回內容體的類型爲ManagedBuffer。ManagedBuffer提供了由字節構成數據的不可變視圖(也就是說ManagedBuffer並不存儲數據,也不是數據的實際來源,這同關係型數據庫的視圖相似)。咱們先來看看抽象類ManagedBuffer中對行爲的定義,見代碼清單18。
代碼清單18 ManagedBuffer的定義
public abstract class ManagedBuffer { public abstract long size(); public abstract ByteBuffer nioByteBuffer() throws IOException; public abstract InputStream createInputStream() throws IOException; public abstract ManagedBuffer retain(); public abstract ManagedBuffer release(); public abstract Object convertToNetty() throws IOException; }
ManagedBuffer中定義了六個方法,分別爲:
ManagedBuffer的具體實現有不少,咱們能夠經過圖6來了解。
圖6 ManagedBuffer的繼承體系
圖6中列出了ManagedBuffer的五個實現類,其中TestManagedBuffer和RecordingManagedBuffer用於測試。NettyManagedBuffer中的緩衝爲io.netty.buffer.ByteBuf,NioManagedBuffer中的緩衝爲java.nio.ByteBuffer。NettyManagedBuffer和NioManagedBuffer的實現都很是簡單,留給讀者自行閱讀。本節挑選FileSegmentManagedBuffer做爲ManagedBuffer具體實現的例子進行介紹。
FileSegmentManagedBuffer的做用爲獲取一個文件中的一段,它一共有四個由final修飾的屬性,所有都經過FileSegmentManagedBuffer的構造器傳入屬性值,這四個屬性爲:
下面將逐個介紹FileSegmentManagedBuffer對於ManagedBuffer的實現。
代碼清單19 nioByteBuffer方法的實現
@Override public ByteBuffer nioByteBuffer() throws IOException { FileChannel channel = null; try { channel = new RandomAccessFile(file, "r").getChannel(); if (length < conf.memoryMapBytes()) { ByteBuffer buf = ByteBuffer.allocate((int) length); channel.position(offset); while (buf.remaining() != 0) { if (channel.read(buf) == -1) { throw new IOException(String.format("Reached EOF before filling buffer\n" + "offset=%s\nfile=%s\nbuf.remaining=%s", offset, file.getAbsoluteFile(), buf.remaining())); } } buf.flip(); return buf; } else { return channel.map(FileChannel.MapMode.READ_ONLY, offset, length); } } catch (IOException e) { try { if (channel != null) { long size = channel.size(); throw new IOException("Error in reading " + this + " (actual file length " + size + ")", e); } } catch (IOException ignored) { // ignore } throw new IOException("Error in opening " + this, e); } finally { JavaUtils.closeQuietly(channel); } }
nioByteBuffer的實現仍是很簡單的,主要利用RandomAccessFile獲取FileChannel,而後使用java.nio.ByteBuffer和FileChannel的API將數據寫入緩衝區java.nio.ByteBuffer中。
代碼清單20 createInputStream的實現
@Override public InputStream createInputStream() throws IOException { FileInputStream is = null; try { is = new FileInputStream(file); ByteStreams.skipFully(is, offset); return new LimitedInputStream(is, length); } catch (IOException e) { try { if (is != null) { long size = file.length(); throw new IOException("Error in reading " + this + " (actual file length " + size + ")", e); } } catch (IOException ignored) { // ignore } finally { JavaUtils.closeQuietly(is); } throw new IOException("Error in opening " + this, e); } catch (RuntimeException e) { JavaUtils.closeQuietly(is); throw e; } }
createInputStream的實現仍是很簡單的,這裏很少做介紹。
代碼清單21 convertToNetty的實現
@Override public Object convertToNetty() throws IOException { if (conf.lazyFileDescriptor()) { return new DefaultFileRegion(file, offset, length); } else { FileChannel fileChannel = new FileInputStream(file).getChannel(); return new DefaultFileRegion(fileChannel, offset, length); } }
[1] ChannelInboundHandler接口的實現及原理不屬於本書要分析的內容,感興趣的同窗能夠閱讀Netty的官方文檔或者研究Netty的源碼。
因爲TransportRequestHandler實際是把請求消息交給RpcHandler進一步處理的,因此這裏對RpcHandler首先作個介紹。RpcHandler是一個抽象類,定義了一些RPC處理器的規範,其主要實現見代碼清單22。
代碼清單22 RpcHandler的實現
public abstract class RpcHandler { private static final RpcResponseCallback ONE_WAY_CALLBACK = new OneWayRpcCallback(); public abstract void receive( TransportClient client, ByteBuffer message, RpcResponseCallback callback); public abstract StreamManager getStreamManager(); public void receive(TransportClient client, ByteBuffer message) { receive(client, message, ONE_WAY_CALLBACK); } public void channelActive(TransportClient client) { } public void channelInactive(TransportClient client) { } public void exceptionCaught(Throwable cause, TransportClient client) { } private static class OneWayRpcCallback implements RpcResponseCallback { private static final Logger logger = LoggerFactory.getLogger(OneWayRpcCallback.class); @Override public void onSuccess(ByteBuffer response) { logger.warn("Response provided for one-way RPC."); } @Override public void onFailure(Throwable e) { logger.error("Error response provided for one-way RPC.", e); } } }
代碼清單22中RpcHandler的各個方法的做用以下:
public interface RpcResponseCallback { void onSuccess(ByteBuffer response); void onFailure(Throwable e); }
介紹完RpcHandler,如今回到TransportRequestHandler的處理過程。TransportRequestHandler處理以上四種RequestMessage的實現見代碼清單23。
代碼清單23 TransportRequestHandler的handle方法
@Override public void handle(RequestMessage request) { if (request instanceof ChunkFetchRequest) { processFetchRequest((ChunkFetchRequest) request); } else if (request instanceof RpcRequest) { processRpcRequest((RpcRequest) request); } else if (request instanceof OneWayMessage) { processOneWayMessage((OneWayMessage) request); } else if (request instanceof StreamRequest) { processStreamRequest((StreamRequest) request); } else { throw new IllegalArgumentException("Unknown request type: " + request); } }
結合代碼清單23,下面逐一詳細分析這四種類型請求的處理過程。
processFetchRequest方法用於處理ChunkFetchRequest類型的消息,其實現見代碼清單24。
代碼清單24 processFetchRequest的實現
private void processFetchRequest(final ChunkFetchRequest req) { if (logger.isTraceEnabled()) { logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), req.streamChunkId); } ManagedBuffer buf; try { streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId); streamManager.registerChannel(channel, req.streamChunkId.streamId); buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex); } catch (Exception e) { logger.error(String.format("Error opening block %s for request from %s", req.streamChunkId, getRemoteAddress(channel)), e); respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e))); return; } respond(new ChunkFetchSuccess(req.streamChunkId, buf)); }
代碼清單24中的streamManager是經過調用RpcHandler的getStreamManager方法獲取的StreamManager。processFetchRequest的處理都依託於RpcHandler的StreamManager,其處理步驟以下:
有關StreamManager的具體實現,讀者能夠參考《Spark內核設計的藝術》一書5.3.5節介紹的NettyStreamManager和《Spark內核設計的藝術》一書6.9.2節介紹的NettyBlockRpcServer中的OneForOneStreamManager。
processRpcRequest方法用於處理RpcRequest類型的消息,其實現見代碼清單25。
代碼清單25 processRpcRequest的實現
private void processRpcRequest(final RpcRequest req) { try { rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() { @Override public void onSuccess(ByteBuffer response) { respond(new RpcResponse(req.requestId, new NioManagedBuffer(response))); } @Override public void onFailure(Throwable e) { respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e))); } }); } catch (Exception e) { logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e); respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e))); } finally { req.body().release(); } }
代碼清單25中將RpcRequest消息的內容體、發送消息的客戶端以及一個RpcResponseCallback類型的匿名內部類做爲參數傳遞給了RpcHandler的receive方法。這就是說真正用於處理RpcRequest消息的是RpcHandler,而非TransportRequestHandler。因爲RpcHandler是抽象類(見代碼清單22),其receive方法也是抽象方法,因此具體的操做將由RpcHandler的實現了receive方法的子類來完成。全部繼承RpcHandler的子類都須要在其receive方法的具體實現中回調RpcResponseCallback的onSuccess(處理成功時)或者onFailure(處理失敗時)方法。從RpcResponseCallback的實現來看,不管處理結果成功仍是失敗,都將調用respond方法對客戶端進行響應。
processStreamRequest方法用於處理StreamRequest類型的消息,其實現見代碼清單26。
代碼清單26 processStreamRequest的實現
private void processStreamRequest(final StreamRequest req) { ManagedBuffer buf; try { buf = streamManager.openStream(req.streamId);// 將獲取到的流數據封裝爲ManagedBuffer } catch (Exception e) { logger.error(String.format( "Error opening stream %s for request from %s", req.streamId, getRemoteAddress(channel)), e); respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e))); return; } if (buf != null) { respond(new StreamResponse(req.streamId, buf.size(), buf)); } else { respond(new StreamFailure(req.streamId, String.format( "Stream '%s' was not found.", req.streamId))); } }
代碼清單26中也使用了RpcHandler的StreamManager,其處理步驟以下:
processOneWayMessage方法用於處理StreamRequest類型的消息,其實現見代碼清單27。
代碼清單27 processOneWayMessage的實現
private void processOneWayMessage(OneWayMessage req) { try { rpcHandler.receive(reverseClient, req.body().nioByteBuffer()); } catch (Exception e) { logger.error("Error while invoking RpcHandler#receive() for one-way message.", e); } finally { req.body().release(); } }
processOneWayMessage方法的實現processRpcRequest很是類似,區別在於processOneWayMessage調用了代碼清單22中ONE_WAY_CALLBACK的receive方法,於是processOneWayMessage在處理完RPC請求後不會對客戶端做出響應。
從以上四種處理的分析能夠看出最終的處理都由RpcHandler及其內部組件完成。除了OneWayMessage的消息外,其他三種消息都是最終調用respond方法響應客戶端,其實現見代碼清單28。
代碼清單28 respond的實現
private void respond(final Encodable result) { final SocketAddress remoteAddress = channel.remoteAddress(); channel.writeAndFlush(result).addListener( new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { logger.trace("Sent result {} to client {}", result, remoteAddress); } else { logger.error(String.format("Error sending result %s to %s; closing connection", result, remoteAddress), future.cause()); channel.close(); } } } ); }
能夠看到respond方法中實際調用了Channel的writeAndFlush方法[1]來響應客戶端。
TransportServer的構造器(見代碼清單10)中的bootstraps是TransportServerBootstrap的列表。接口TransportServerBootstrap定義了服務端引導程序的規範,服務端引導程序旨在當客戶端與服務端創建鏈接以後,在服務端持有的客戶端管道上執行的引導程序。TransportServerBootstrap的定義見代碼清單29。
代碼清單29 TransportServerBootstrap的定義
public interface TransportServerBootstrap { RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler); }
TransportServerBootstrap的doBootstrap方法將對服務端的RpcHandler進行代理,接收客戶端的請求。TransportServerBootstrap有SaslServerBootstrap和EncryptionCheckerBootstrap兩個實現類。爲了更清楚的說明TransportServerBootstrap的意義,咱們以SaslServerBootstrap爲例,來說解其實現(見代碼清單30)。
代碼清單30 SaslServerBootstrap的doBootstrap實現
public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) { return new SaslRpcHandler(conf, channel, rpcHandler, secretKeyHolder); }
根據代碼清單30,咱們知道SaslServerBootstrap的doBootstrap方法實際建立了SaslRpcHandler,SaslRpcHandler負責對管道進行SASL(Simple Authentication and Security Layer)加密。SaslRpcHandler自己也繼承了RpcHandler,因此咱們重點來看其receive方法的實現,見代碼清單31。
代碼清單31 SaslRpcHandler的receive方法
@Override public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) { if (isComplete) { // 將消息傳遞給SaslRpcHandler所代理的下游RpcHandler並返回 delegate.receive(client, message, callback); return; } ByteBuf nettyBuf = Unpooled.wrappedBuffer(message); SaslMessage saslMessage; try { saslMessage = SaslMessage.decode(nettyBuf);// 對客戶端發送的消息進行SASL解密 } finally { nettyBuf.release(); } if (saslServer == null) { // 若是saslServer還未建立,則須要建立SparkSaslServer client.setClientId(saslMessage.appId); saslServer = new SparkSaslServer(saslMessage.appId, secretKeyHolder, conf.saslServerAlwaysEncrypt()); } byte[] response; try { response = saslServer.response(JavaUtils.bufferToArray(// 使用saslServer處理已解密的消息 saslMessage.body().nioByteBuffer())); } catch (IOException ioe) { throw new RuntimeException(ioe); } callback.onSuccess(ByteBuffer.wrap(response)); if (saslServer.isComplete()) { logger.debug("SASL authentication successful for channel {}", client); isComplete = true;// SASL認證交換已經完成 if (SparkSaslServer.QOP_AUTH_CONF.equals(saslServer.getNegotiatedProperty(Sasl.QOP))) { logger.debug("Enabling encryption for channel {}", client); // 對管道進行SASL加密 SaslEncryption.addToChannel(channel, saslServer, conf.maxSaslEncryptedBlockSize()); saslServer = null; } else { saslServer.dispose(); saslServer = null; } } }
根據代碼清單31,SaslRpcHandler處理客戶端消息的步驟以下:
SaslServerBootstrap是經過SaslRpcHandler對下游RpcHandler進行代理的一種TransportServerBootstrap。EncryptionCheckerBootstrap是另外一種TransportServerBootstrap的實現,它經過將自身加入Netty的管道中實現引導,EncryptionCheckerBootstrap的doBootstrap方法的實現見代碼清單32。
代碼清單32 EncryptionCheckerBootstrap的doBootstrap實現
@Override public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) { channel.pipeline().addFirst("encryptionChecker", this); return rpcHandler; }
在詳細介紹了TransportChannelHandler以後咱們就能夠對圖3-3進行擴展,把TransportRequestHandler、TransportServerBootstrap及RpcHandler的處理流程增長進來,如圖7所示。
圖7 RPC框架服務端處理請求、響應流程圖
有讀者可能會問,圖7中並未見TransportServerBootstrap的身影。根據對TransportServerBootstrap的兩種實現的舉例,咱們知道TransportServerBootstrap將可能存在於圖中任何兩個組件的箭頭連線中間,起到引導、包裝、代理的做用。
在介紹完服務端RpcHandler對請求消息的處理以後,如今來看看客戶端發送RPC請求的原理。咱們在分析代碼清單13中的createChannelHandler方法時,看到調用了TransportClient的構造器(見代碼清單33),其中TransportResponseHandler的引用將賦給handler屬性。
代碼清單33 TransportClient的構造器
public TransportClient(Channel channel, TransportResponseHandler handler) { this.channel = Preconditions.checkNotNull(channel); this.handler = Preconditions.checkNotNull(handler); this.timedOut = false; }
TransportClient一共有五個方法用於發送請求,分別爲:
本節只選擇最經常使用的sendRpc和fetchChunk進行分析,其他實現均可以舉一反三。
sendRpc方法的實現見代碼清單34。
代碼清單34 sendRpc的實現
public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) { final long startTime = System.currentTimeMillis(); if (logger.isTraceEnabled()) { logger.trace("Sending RPC to {}", getRemoteAddress(channel)); } // 使用UUID生成請求主鍵requestId final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits()); handler.addRpcRequest(requestId, callback);// 添加requestId與RpcResponseCallback的引用之間的關係 // 發送RPC請求 channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener( new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { long timeTaken = System.currentTimeMillis() - startTime; if (logger.isTraceEnabled()) { logger.trace("Sending request {} to {} took {} ms", requestId, getRemoteAddress(channel), timeTaken); } } else { String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId, getRemoteAddress(channel), future.cause()); logger.error(errorMsg, future.cause()); handler.removeRpcRequest(requestId); channel.close(); try { callback.onFailure(new IOException(errorMsg, future.cause())); } catch (Exception e) { logger.error("Uncaught exception in RPC response callback handler!", e); } } } }); return requestId; }
結合代碼清單34,咱們知道sendRpc方法的實現步驟以下:
1) 使用UUID生成請求主鍵requestId;
2) 調用addRpcRequest向handler(特別提醒下讀者這裏的handler不是RpcHandler,而是經過TransportClient構造器傳入的TransportResponseHandler)添加requestId與回調類RpcResponseCallback的引用之間的關係。TransportResponseHandler的addRpcRequest方法(見代碼清單35)將更新最後一次請求的時間爲當前系統時間,而後將requestId與RpcResponseCallback之間的映射加入到outstandingRpcs緩存中。outstandingRpcs專門用於緩存發出的RPC請求信息。
代碼清單35 添加RPC請求到緩存
public void addRpcRequest(long requestId, RpcResponseCallback callback) { updateTimeOfLastRequest(); outstandingRpcs.put(requestId, callback); }
3) 調用Channel的writeAndFlush方法將RPC請求發送出去,這和在代碼清單28中服務端調用的respond方法響應客戶端的同樣,都是使用Channel的writeAndFlush方法。當發送成功或者失敗時會回調ChannelFutureListener的operationComplete方法。若是發送成功,那麼只會打印requestId、遠端地址及花費時間的日誌,若是發送失敗,除了打印錯誤日誌外,還要調用TransportResponseHandler的removeRpcRequest方法(見代碼清單36)將這次請求從outstandingRpcs緩存中移除。
代碼清單36 從緩存中刪除RPC請求
public void removeRpcRequest(long requestId) { outstandingRpcs.remove(requestId); }
請求發送成功後,客戶端將等待接收服務端的響應。根據圖3,返回的消息也會傳遞給TransportChannelHandler的channelRead方法(見代碼清單14),根據以前的分析,消息的分析將最後交給TransportResponseHandler的handle方法來處理。TransportResponseHandler的handle方法分別對圖5中的六種ResponseMessage進行處理,因爲服務端使用processRpcRequest方法(見代碼清單25)處理RpcRequest類型的消息後返回給客戶端的消息爲RpcResponse或RpcFailure,因此咱們來看看客戶端的TransportResponseHandler的handle方法是如何處理RpcResponse和RpcFailure,見代碼清單37。
代碼清單37 RpcResponse和RpcFailure消息的處理
} else if (message instanceof RpcResponse) { RpcResponse resp = (RpcResponse) message; RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);// 獲取RpcResponseCallback if (listener == null) { logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding", resp.requestId, getRemoteAddress(channel), resp.body().size()); } else { outstandingRpcs.remove(resp.requestId); try { listener.onSuccess(resp.body().nioByteBuffer()); } finally { resp.body().release(); } } } else if (message instanceof RpcFailure) { RpcFailure resp = (RpcFailure) message; RpcResponseCallback listener = outstandingRpcs.get(resp.requestId); // 獲取RpcResponseCallback if (listener == null) { logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding", resp.requestId, getRemoteAddress(channel), resp.errorString); } else { outstandingRpcs.remove(resp.requestId); listener.onFailure(new RuntimeException(resp.errorString)); }
從代碼清單37看到,處理RpcResponse的邏輯爲:
處理RpcFailure的邏輯爲:
fetchChunk的實現見代碼清單38。
代碼清單38 fetchChunk的實現
public void fetchChunk( long streamId, final int chunkIndex, final ChunkReceivedCallback callback) { final long startTime = System.currentTimeMillis(); if (logger.isDebugEnabled()) { logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel)); } final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);// 建立StreamChunkId // 添加StreamChunkId與ChunkReceivedCallback之間的對應關係 handler.addFetchRequest(streamChunkId, callback); // 發送塊請求 channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener( new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { long timeTaken = System.currentTimeMillis() - startTime; if (logger.isTraceEnabled()) { logger.trace("Sending request {} to {} took {} ms", streamChunkId, getRemoteAddress(channel), timeTaken); } } else { String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId, getRemoteAddress(channel), future.cause()); logger.error(errorMsg, future.cause()); handler.removeFetchRequest(streamChunkId); channel.close(); try { callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause())); } catch (Exception e) { logger.error("Uncaught exception in RPC response callback handler!", e); } } } }); }
結合代碼清單38,咱們知道fetchChunk方法的實現步驟以下:
1) 使用流的標記streamId和塊的索引chunkIndex建立StreamChunkId;
2) 調用addFetchRequest向handler(特別提醒下讀者這裏的handler不是RpcHandler,而是經過TransportClient構造器傳入的TransportResponseHandler)添加StreamChunkId與回調類ChunkReceivedCallback的引用之間的關係。TransportResponseHandler的addFetchRequest方法(見代碼清單39)將更新最後一次請求的時間爲當前系統時間,而後將StreamChunkId與ChunkReceivedCallback之間的映射加入到outstandingFetches緩存中。outstandingFetches專門用於緩存發出的塊請求信息。
代碼清單39 添加塊請求到緩存
public void addFetchRequest(StreamChunkId streamChunkId, ChunkReceivedCallback callback) { updateTimeOfLastRequest(); outstandingFetches.put(streamChunkId, callback); }
3) 調用Channel的writeAndFlush方法將塊請求發送出去,這和在代碼清單28中服務端調用的respond方法響應客戶端的同樣,都是使用Channel的writeAndFlush方法。當發送成功或者失敗時會回調ChannelFutureListener的operationComplete方法。若是發送成功,那麼只會打印StreamChunkId、遠端地址及花費時間的日誌,若是發送失敗,除了打印錯誤日誌外,還要調用TransportResponseHandler的removeFetchRequest方法(見代碼清單40)將這次請求從outstandingFetches緩存中移除。
代碼清單40 從緩存中刪除RPC請求
public void removeRpcRequest(long requestId) { outstandingRpcs.remove(requestId); }
請求發送成功後,客戶端將等待接收服務端的響應。根據圖3,返回的消息也會傳遞給TransportChannelHandler的channelRead方法(見代碼清單14),根據以前的分析,消息的分析將最後交給TransportResponseHandler的handle方法來處理。TransportResponseHandler的handle方法分別對圖5中的六種處理結果進行處理,因爲服務端使用processFetchRequest方法(見代碼清單24)處理ChunkFetchRequest類型的消息後返回給客戶端的消息爲ChunkFetchSuccess或ChunkFetchFailure,因此咱們來看看客戶端的TransportResponseHandler的handle方法是如何處理ChunkFetchSuccess和ChunkFetchFailure,見代碼清單41。
代碼清單41 ChunkFetchSuccess和ChunkFetchFailure消息的處理
if (message instanceof ChunkFetchSuccess) { ChunkFetchSuccess resp = (ChunkFetchSuccess) message; ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId); if (listener == null) { logger.warn("Ignoring response for block {} from {} since it is not outstanding", resp.streamChunkId, getRemoteAddress(channel)); resp.body().release(); } else { outstandingFetches.remove(resp.streamChunkId); listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body()); resp.body().release(); } } else if (message instanceof ChunkFetchFailure) { ChunkFetchFailure resp = (ChunkFetchFailure) message; ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId); if (listener == null) { logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding", resp.streamChunkId, getRemoteAddress(channel), resp.errorString); } else { outstandingFetches.remove(resp.streamChunkId); listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException( "Failure while fetching " + resp.streamChunkId + ": " + resp.errorString)); } }
從代碼清單41看到,處理ChunkFetchSuccess的邏輯爲:
處理ChunkFetchFailure的邏輯爲:
在詳細介紹了TransportClient和TransportResponseHandler以後,對於客戶端咱們就能夠擴展圖3,把TransportResponseHandler及TransportClient的處理流程增長進來,如圖8所示。
圖8 客戶端請求、響應流程圖
圖8中的序號①表示調用TransportResponseHandler的addRpcRequest方法(或addFetchRequest方法)將更新最後一次請求的時間爲當前系統時間,而後將requestId與RpcResponseCallback之間的映射加入到outstandingRpcs緩存中(或將StreamChunkId與ChunkReceivedCallback之間的映射加入到outstandingFetches緩存中)。②表示調用Channel的writeAndFlush方法將RPC請求發送出去。圖中的虛線表示當TransportResponseHandler處理RpcResponse和RpcFailure時將從outstandingRpcs緩存中獲取此請求對應的RpcResponseCallback(或處理ChunkFetchSuccess和ChunkFetchFailure時將從outstandingFetches緩存中獲取StreamChunkId對應的ChunkReceivedCallback),並執行回調。此外,TransportClientBootstrap將可能存在於圖8中任何兩個組件的箭頭連線中間。
通過近一年的準備,基於Spark2.1.0版本的《Spark內核設計的藝術 架構設計與實現》一書現已出版發行,圖書如圖:
紙質版售賣連接以下: