Spark2.1.0——內置RPC框架詳解

Spark2.1.0——內置RPC框架詳解

         在Spark中不少地方都涉及網絡通訊,好比Spark各個組件間的消息互通、用戶文件與Jar包的上傳、節點間的Shuffle過程、Block數據的複製與備份等。在Spark 0.x.x與Spark 1.x.x版本中,組件間的消息通訊主要藉助於Akka[1],使用Akka能夠輕鬆的構建強有力的高併發與分佈式應用。可是Akka在Spark 2.0.0版本中被移除了,Spark官網文檔對此的描述爲:「Akka的依賴被移除了,所以用戶可使用任何版本的Akka來編程了。」Spark團隊的決策者或許認爲對於Akka具體版本的依賴,限制了用戶對於Akka不一樣版本的使用。儘管如此,筆者依然認爲Akka是一款很是優秀的開源分佈式系統,我參與的一些Java Application或者Java Web就利用Akka的豐富特性實現了分佈式一致性、最終一致性以及分佈式事務等分佈式環境面對的問題。在Spark 1.x.x版本中,用戶文件與Jar包的上傳採用了由Jetty[2]實現的HttpFileServer,但在Spark 2.0.0版本中也被廢棄了,如今使用的是基於Spark內置RPC框架的NettyStreamManager。節點間的Shuffle過程和Block數據的複製與備份這兩個部分在Spark 2.0.0版本中依然沿用了Netty[3],經過對接口和程序進行從新設計將各個組件間的消息互通、用戶文件與Jar包的上傳等內容統一歸入到Spark的RPC框架體系中。html

         咱們先來看看RPC框架的基本架構,如圖1所示。java

圖1       Spark內置RPC框架的基本架構數據庫

TransportContext內部包含傳輸上下文的配置信息TransportConf和對客戶端請求消息進行處理的RpcHandler。TransportConf在建立TransportClientFactory和TransportServer時都是必須的,而RpcHandler只用於建立TransportServer。TransportClientFactory是RPC客戶端的工廠類。TransportServer是RPC服務端的實現。圖中記號的含義以下:編程

記號①:表示經過調用TransportContext的createClientFactory方法建立傳輸客戶端工廠TransportClientFactory的實例。在構造TransportClientFactory的實例時,還會傳遞客戶端引導程序TransportClientBootstrap的列表。此外,TransportClientFactory內部還存在針對每一個Socket地址的鏈接池ClientPool,這個鏈接池緩存的定義以下:bootstrap

  private final ConcurrentHashMap<SocketAddress, ClientPool> connectionPool;

ClientPool的類型定義以下:數組

  private static class ClientPool {
    TransportClient[] clients;
    Object[] locks;

    ClientPool(int size) {
      clients = new TransportClient[size];
      locks = new Object[size];
      for (int i = 0; i < size; i++) {
        locks[i] = new Object();
      }
    }
  } 

因而可知,ClientPool實際是由TransportClient的數組構成,而locks數組中的Object與clients數組中的TransportClient按照數組索引一一對應,經過對每一個TransportClient分別採用不一樣的鎖,下降併發狀況下線程間對鎖的爭用,進而減小阻塞,提升併發度。緩存

記號②:表示經過調用TransportContext的createServer方法建立傳輸服務端TransportServer的實例。在構造TransportServer的實例時,須要傳遞TransportContext、host、port、RpcHandler以及服務端引導程序TransportServerBootstrap的列表。安全

         有了對Spark內置RPC框架的基本架構的瞭解,如今正式介紹Spark的RPC框架所包含的各個組件:服務器

  • TransportContext:傳輸上下文,包含了用於建立傳輸服務端(TransportServer)和傳輸客戶端工廠(TransportClientFactory)的上下文信息,並支持使用TransportChannelHandler設置Netty提供的SocketChannel的Pipeline的實現。
  • TransportConf:傳輸上下文的配置信息。
  • RpcHandler:對調用傳輸客戶端(TransportClient)的sendRPC方法發送的消息進行處理的程序。
  • MessageEncoder:在將消息放入管道前,先對消息內容進行編碼,防止管道另外一端讀取時丟包和解析錯誤。
  • MessageDecoder:對從管道中讀取的ByteBuf進行解析,防止丟包和解析錯誤;
  • TransportFrameDecoder:對從管道中讀取的ByteBuf按照數據幀進行解析;
  • RpcResponseCallback:RpcHandler對請求的消息處理完畢後,進行回調的接口。
  • TransportClientFactory:建立傳輸客戶端(TransportClient)的傳輸客戶端工廠類。
  • ClientPool:在兩個對等節點間維護的關於傳輸客戶端(TransportClient)的池子。ClientPool是TransportClientFactory的內部組件。
  • TransportClient:RPC框架的客戶端,用於獲取預先協商好的流中的連續塊。TransportClient旨在容許有效傳輸大量數據,這些數據將被拆分紅幾百KB到幾MB的塊。當TransportClient處理從流中獲取的獲取的塊時,實際的設置是在傳輸層以外完成的。sendRPC方法可以在客戶端和服務端的同一水平線的通訊進行這些設置。
  • TransportClientBootstrap:當服務端響應客戶端鏈接時在客戶端執行一次的引導程序。
  • TransportRequestHandler:用於處理客戶端的請求並在寫完塊數據後返回的處理程序。
  • TransportResponseHandler:用於處理服務端的響應,而且對發出請求的客戶端進行響應的處理程序。
  • TransportChannelHandler:代理由TransportRequestHandler處理的請求和由TransportResponseHandler處理的響應,並加入傳輸層的處理。
  • TransportServerBootstrap:當客戶端鏈接到服務端時在服務端執行一次的引導程序。
  • TransportServer:RPC框架的服務端,提供高效的、低級別的流服務。

拓展知識:爲何須要MessageEncoder和MessageDecoder?由於在基於流的傳輸裏(好比TCP/IP),接收到的數據首先會被存儲到一個socket接收緩衝裏。不幸的是,基於流的傳輸並非一個數據包隊列,而是一個字節隊列。即便你發送了2個獨立的數據包,操做系統也不會做爲2個消息處理而僅僅認爲是一連串的字節。所以不能保證遠程寫入的數據會被準確地讀取。舉個例子,讓咱們假設操做系統的TCP/TP協議棧已經接收了3個數據包:ABC、DEF、GHI。因爲基於流傳輸的協議的這種統一的性質,在你的應用程序在讀取數據的時候有很高的可能性被分紅下面的片斷:AB、CDEFG、H、I。所以,接收方無論是客戶端仍是服務端,都應該把接收到的數據整理成一個或者多個更有意義而且讓程序的邏輯更好理解的數據。網絡


[1]  Akka是基於Actor併發編程模型實現的併發的分佈式的框架。Akka是用Scala語言編寫的,它提供了Java和Scala兩種語言的API,減小開發人員對併發的細節處理,並保證分佈式調用的最終一致性。在附錄B中有關於Akka的進一步介紹,感興趣的讀者不妨一讀。

[2]  Jetty 是一個開源的Servlet容器,它爲基於Java的Web容器,例如JSP和Servlet提供運行環境。Jetty是使用Java語言編寫的,它的API以一組JAR包的形式發佈。開發人員能夠將Jetty容器實例化成一個對象,能夠迅速爲一些獨立運行的Java應用提供網絡和Web鏈接。在附錄C中有對Jetty的簡單介紹,感興趣的讀者能夠選擇閱讀。

[3]  Netty是由Jboss提供的一個基於NIO的客戶、服務器端編程框架,使用Netty 能夠確保你快速、簡單的開發出一個網絡應用,例如實現了某種協議的客戶,服務端應用。附錄G中有對Netty的簡單介紹,感興趣的讀者能夠一讀。


 

1、RPC配置TransportConf

         上文提到TransportContext中的TransportConf給Spark的RPC框架提供配置信息,它有兩個成員屬性——配置提供者conf和配置的模塊名稱module。這兩個屬性的定義以下:

  private final ConfigProvider conf;
  private final String module;

其中conf是真正的配置提供者,其類型ConfigProvider是一個抽象類,見代碼清單1。

代碼清單1  ConfigProvider的實現

public abstract class ConfigProvider {
  public abstract String get(String name);

  public String get(String name, String defaultValue) {
    try {
      return get(name);
    } catch (NoSuchElementException e) {
      return defaultValue;
    }
  }

  public int getInt(String name, int defaultValue) {
    return Integer.parseInt(get(name, Integer.toString(defaultValue)));
  }

  public long getLong(String name, long defaultValue) {
    return Long.parseLong(get(name, Long.toString(defaultValue)));
  }

  public double getDouble(String name, double defaultValue) {
    return Double.parseDouble(get(name, Double.toString(defaultValue)));
  }

  public boolean getBoolean(String name, boolean defaultValue) {
    return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue)));
  }
}

從代碼清單1,能夠看到ConfigProvider中包括get、getInt、getLong、getDouble、getBoolean等方法,這些方法都是基於抽象方法get獲取值,通過一次類型轉換而實現。這個抽象的get方法將須要子類去實現。

         Spark一般使用SparkTransportConf建立TransportConf,其實現見代碼清單2。

代碼清單2  SparkTransportConf的實現

object SparkTransportConf {
  private val MAX_DEFAULT_NETTY_THREADS = 8
  def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
    val conf = _conf.clone
    val numThreads = defaultNumThreads(numUsableCores)
    conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
    conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)

    new TransportConf(module, new ConfigProvider {
      override def get(name: String): String = conf.get(name)
    })
  }
  private def defaultNumThreads(numUsableCores: Int): Int = {
    val availableCores =
      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
    math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
  }
}

從代碼清單2看到,可使用SparkTransportConf的fromSparkConf方法來構造TransportConf。傳遞的三個參數分別爲SparkConf、模塊名module及可用的內核數numUsableCores。若是numUsableCores小於等於0,那麼線程數是系統可用處理器的數量,不過系統的內核數不可能所有用於網絡傳輸使用,因此這裏還將分配給網絡傳輸的內核數量最多限制在8個。最終肯定的線程數將被用於設置客戶端傳輸線程數(spark.$module.io.clientThreads屬性)和服務端傳輸線程數(spark.$module.io.serverThreads屬性)。fromSparkConf最終構造TransportConf對象時傳遞的ConfigProvider爲實現了get方法的匿名的內部類,get的實現實際是代理了SparkConf的get方法。

 

2、RPC客戶端工廠TransportClientFactory

         TransportClientFactory是建立傳輸客戶端(TransportClient)的工廠類。在說明圖3-1中的記號①時提到過TransportContext的createClientFactory方法能夠建立TransportClientFactory的實例,其實現見代碼清單3。

代碼清單3  建立客戶端工廠

  public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
    return new TransportClientFactory(this, bootstraps);
  }

  public TransportClientFactory createClientFactory() {
    return createClientFactory(Lists.<TransportClientBootstrap>newArrayList());
  }

能夠看到TransportContext中有兩個重載的createClientFactory方法,它們最終在構造TransportClientFactory時都會傳遞兩個參數:TransportContext和TransportClientBootstrap列表。TransportClientFactory構造器的實現見代碼清單4。

代碼清單4  TransportClientFactory的構造器

  public TransportClientFactory(
      TransportContext context,
      List<TransportClientBootstrap> clientBootstraps) {
    this.context = Preconditions.checkNotNull(context);
    this.conf = context.getConf();
    this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));
    this.connectionPool = new ConcurrentHashMap<>();
    this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
    this.rand = new Random();

    IOMode ioMode = IOMode.valueOf(conf.ioMode());
    this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
    this.workerGroup = NettyUtils.createEventLoop(
        ioMode,
        conf.clientThreads(),
        conf.getModuleName() + "-client");
    this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
      conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
  } 

TransportClientFactory構造器中的各個變量分別爲:

  • context:即參數傳遞的TransportContext的引用;
  • conf:即TransportConf,這裏經過調用TransportContext的getConf獲取;
  • clientBootstraps:即參數傳遞的TransportClientBootstrap列表;
  • connectionPool:即針對每一個Socket地址的鏈接池ClientPool的緩存;connectionPool的數據結構較爲複雜,爲便於讀者理解,這裏以圖2來表示connectionPool的數據結構。

圖2       TransportClientFactory的connectionPool

  • numConnectionsPerPeer:即從TransportConf獲取的key爲」spark.+模塊名+.io.numConnectionsPerPeer」的屬性值。此屬性值用於指定對等節點間的鏈接數。這裏的模塊名實際爲TransportConf的module字段,Spark的不少組件都利用RPC框架構建,它們之間按照模塊名區分,例如RPC模塊的key爲「spark.rpc.io.numConnectionsPerPeer」;
  • rand:對Socket地址對應的鏈接池ClientPool中緩存的TransportClient進行隨機選擇,對每一個鏈接作負載均衡;
  • ioMode:IO模式,即從TransportConf獲取key爲」spark.+模塊名+.io.mode」的屬性值。默認值爲NIO,Spark還支持EPOLL;
  • socketChannelClass:客戶端Channel被建立時使用的類,經過ioMode來匹配,默認爲NioSocketChannel,Spark還支持EpollEventLoopGroup;
  • workerGroup:根據Netty的規範,客戶端只有worker組,因此此處建立workerGroup。workerGroup的實際類型是NioEventLoopGroup;
  • pooledAllocator :聚集ByteBuf但對本地線程緩存禁用的分配器。

TransportClientFactory裏大量使用了NettyUtils,關於NettyUtils的具體實現,請看附錄G。[1]


提示:NIO是指Java中New IO的簡稱,其特色包括:爲全部的原始類型提供(Buffer)緩衝支持;字符集編碼解碼解決方案;提供一個新的原始I/O 抽象Channel,支持鎖和內存映射文件的文件訪問接口;提供多路非阻塞式(non-bloking)的高伸縮性網絡I/O 。其具體使用屬於Java語言的範疇,本文不過多介紹。


[1] Spark將對Netty框架的使用細節都封裝在NettyUtils工具類中,因爲Netty的API使用不屬於本書主要闡述的內容,故此放入附錄G中,對Netty的使用感興趣的讀者能夠選擇閱讀。


 2.一、客戶端引導程序TransportClientBootstrap

         TransportClientFactory的clientBootstraps屬性是TransportClientBootstrap的列表。TransportClientBootstrap是在TransportClient上執行的客戶端引導程序,主要對鏈接創建時進行一些初始化的準備(例如驗證、加密)。TransportClientBootstrap所做的操做每每是昂貴的,好在創建的鏈接能夠重用。TransportClientBootstrap的接口定義見代碼清單5。

代碼清單5         TransportClientBootstrap的定義

public interface TransportClientBootstrap {
  void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}

TransportClientBootstrap有兩個實現類:EncryptionDisablerBootstrap和SaslClientBootstrap。爲了對TransportClientBootstrap的做用能有更深的瞭解,這裏以EncryptionDisablerBootstrap爲例,EncryptionDisablerBootstrap的實現見代碼清單6。

代碼清單6         EncryptionDisablerBootstrap的實現

  private static class EncryptionDisablerBootstrap implements TransportClientBootstrap {
    @Override
    public void doBootstrap(TransportClient client, Channel channel) {
      channel.pipeline().remove(SaslEncryption.ENCRYPTION_HANDLER_NAME);
    }
  }

根據代碼清單6,能夠看到EncryptionDisablerBootstrap的做用是移除客戶端管道中的SASL加密。

2.二、建立Rpc客戶端TransportClient

         有了TransportClientFactory,Spark的各個模塊就可使用它建立RPC客戶端TransportClient了。每一個TransportClient實例只能和一個遠端的RPC服務通訊,因此Spark中的組件若是想要和多個RPC服務通訊,就須要持有多個TransportClient實例。建立TransportClient的方法見代碼清單7(實際爲從緩存中獲取TransportClient)。

代碼清單7        從緩存獲取TransportClient 

  public TransportClient createClient(String remoteHost, int remotePort)
      throws IOException, InterruptedException {
    // 建立InetSocketAddress
    final InetSocketAddress unresolvedAddress =
      InetSocketAddress.createUnresolved(remoteHost, remotePort);

    ClientPool clientPool = connectionPool.get(unresolvedAddress);
    if (clientPool == null) {
      connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer));
      clientPool = connectionPool.get(unresolvedAddress);
    }
    
    int clientIndex = rand.nextInt(numConnectionsPerPeer); // 隨機選擇一個TransportClient
    TransportClient cachedClient = clientPool.clients[clientIndex];

    if (cachedClient != null && cachedClient.isActive()) {// 獲取並返回激活的TransportClient
      TransportChannelHandler handler = cachedClient.getChannel().pipeline()
        .get(TransportChannelHandler.class);
      synchronized (handler) {
        handler.getResponseHandler().updateTimeOfLastRequest();
      }

      if (cachedClient.isActive()) {
        logger.trace("Returning cached connection to {}: {}",
          cachedClient.getSocketAddress(), cachedClient);
        return cachedClient;
      }
    }

    final long preResolveHost = System.nanoTime();
    final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort);
    final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
    if (hostResolveTimeMs > 2000) {
      logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
    } else {
      logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
    }
    // 建立並返回TransportClient對象
    synchronized (clientPool.locks[clientIndex]) {
      cachedClient = clientPool.clients[clientIndex];

      if (cachedClient != null) {
        if (cachedClient.isActive()) {
          logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient);
          return cachedClient;
        } else {
          logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);
        }
      }
      clientPool.clients[clientIndex] = createClient(resolvedAddress); 
      return clientPool.clients[clientIndex];
    }
  } 

從代碼清單7得知,建立TransportClient的步驟以下:

  1. 調用InetSocketAddress的靜態方法createUnresolved構建InetSocketAddress(這種方式建立InetSocketAddress,能夠在緩存中已經有TransportClient時避免沒必要要的域名解析),而後從connectionPool中獲取與此地址對應的ClientPool,若是沒有則須要新建ClientPool,並放入緩存connectionPool中;
  2. 根據numConnectionsPerPeer的大小(使用「spark.+模塊名+.io.numConnectionsPerPeer」屬性配置),從ClientPool中隨機選擇一個TransportClient;
  3. 若是ClientPool的clients中在隨機產生索引位置不存在TransportClient或者TransportClient沒有激活,則進入第5)步,不然對此TransportClient進行第4)步的檢查;
  4. 更新TransportClient的channel中配置的TransportChannelHandler的最後一次使用時間,確保channel沒有超時,而後檢查TransportClient是不是激活狀態,最後返回此TransportClient給調用方;
  5. 因爲緩存中沒有TransportClient可用,因而調用InetSocketAddress的構造器建立InetSocketAddress對象(直接使用InetSocketAddress的構造器建立InetSocketAddress,會進行域名解析),在這一步驟多個線程可能會產生競態條件(因爲沒有同步處理,因此多個線程極有可能同時執行到此處,都發現緩存中沒有TransportClient可用,因而都使用InetSocketAddress的構造器建立InetSocketAddress);
  6. 第5步中建立InetSocketAddress的過程當中產生的競態條件若是不妥善處理,會產生線程安全問題,因此到了ClientPool的locks數組發揮做用的時候了。按照隨機產生的數組索引,locks數組中的鎖對象能夠對clients數組中的TransportClient一對一進行同步。即使以前產生了競態條件,可是在這一步只能有一個線程進入臨界區。在臨界區內,先進入的線程調用重載的createClient方法建立TransportClient對象並放入ClientPool的clients數組中。當率先進入臨界區的線程退出臨界區後,其餘線程才能進入,此時發現ClientPool的clients數組中已經存在了TransportClient對象,那麼將再也不建立TransportClient,而是直接使用它。

代碼清單7的整個執行過程實際解決了TransportClient緩存的使用以及createClient方法的線程安全問題,並無涉及建立TransportClient的實現。TransportClient的建立過程在重載的createClient方法(見代碼清單8)中實現。

代碼清單8         建立TransportClient

  private TransportClient createClient(InetSocketAddress address)
      throws IOException, InterruptedException {
    logger.debug("Creating new connection to {}", address);
    // 構建根引導器Bootstrap並對其進行配置
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(workerGroup)
      .channel(socketChannelClass)
      .option(ChannelOption.TCP_NODELAY, true)
      .option(ChannelOption.SO_KEEPALIVE, true)
      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
      .option(ChannelOption.ALLOCATOR, pooledAllocator);

    final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
    final AtomicReference<Channel> channelRef = new AtomicReference<>();
    // 爲根引導程序設置管道初始化回調函數
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
      @Override
      public void initChannel(SocketChannel ch) {
        TransportChannelHandler clientHandler = context.initializePipeline(ch);
        clientRef.set(clientHandler.getClient());
        channelRef.set(ch);
      }
    });

    long preConnect = System.nanoTime();
    ChannelFuture cf = bootstrap.connect(address);// 使用根引導程序鏈接遠程服務器
    if (!cf.await(conf.connectionTimeoutMs())) {
      throw new IOException(
        String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
    } else if (cf.cause() != null) {
      throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
    }

    TransportClient client = clientRef.get();
    Channel channel = channelRef.get();
    assert client != null : "Channel future completed successfully with null client";

    // Execute any client bootstraps synchronously before marking the Client as successful.
    long preBootstrap = System.nanoTime();
    logger.debug("Connection to {} successful, running bootstraps...", address);
    try {
      for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
        clientBootstrap.doBootstrap(client, channel);// 給TransportClient設置客戶端引導程序
      }
    } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
      long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
      logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);
      client.close();
      throw Throwables.propagate(e);
    }
    long postBootstrap = System.nanoTime();

    logger.info("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
      address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);

    return client;
  }

從代碼清單8得知,真正建立TransportClient的步驟以下:

  1. 構建根引導器Bootstrap並對其進行配置;
  2. 爲根引導程序設置管道初始化回調函數,此回調函數將調用TransportContext的initializePipeline方法初始化Channel的pipeline;
  3. 使用根引導程序鏈接遠程服務器,當鏈接成功對管道初始化時會回調初始化回調函數,將TransportClient和Channel對象分別設置到原子引用clientRef與channelRef中;
  4. 給TransportClient設置客戶端引導程序,即設置TransportClientFactory中的TransportClientBootstrap列表;
  5. 最後返回此TransportClient對象。

 3、RPC服務器TransportServer

         TransportServer是RPC框架的服務端,可提供高效的、低級別的流服務。在說明圖1中的記號②時提到過TransportContext的createServer方法用於建立TransportServer,其實現見代碼清單9。

代碼清單9         建立RPC服務端

  public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, null, port, rpcHandler, bootstraps);
  }

  public TransportServer createServer(
      String host, int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, host, port, rpcHandler, bootstraps);
  }

  public TransportServer createServer(List<TransportServerBootstrap> bootstraps) {
    return createServer(0, bootstraps);
  }

  public TransportServer createServer() {
    return createServer(0, Lists.<TransportServerBootstrap>newArrayList());
  }

代碼清單9中列出了四個名爲createServer的重載方法,可是它們最終調用了TransportServer的構造器(見代碼清單10)來建立TransportServer實例。

代碼清單10         TransportServer的構造器

  public TransportServer(
      TransportContext context,
      String hostToBind,
      int portToBind,
      RpcHandler appRpcHandler,
      List<TransportServerBootstrap> bootstraps) {
    this.context = context;
    this.conf = context.getConf();
    this.appRpcHandler = appRpcHandler;
    this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));

    try {
      init(hostToBind, portToBind);
    } catch (RuntimeException e) {
      JavaUtils.closeQuietly(this);
      throw e;
    }
  }

TransportServer的構造器中的各個變量分別爲:

  • context:即參數傳遞的TransportContext的引用;
  • conf:即TransportConf,這裏經過調用TransportContext的getConf獲取;
  • appRpcHandler:即RPC請求處理器RpcHandler;
  • bootstraps:即參數傳遞的TransportServerBootstrap列表;

         TransportServer的構造器(見代碼清單10)中調用了init方法, init方法用於對TransportServer進行初始化,見代碼清單11。

代碼清單11         TransportServer的初始化

  private void init(String hostToBind, int portToBind) {
    // 根據Netty的API文檔,Netty服務端需同時建立bossGroup和workerGroup
    IOMode ioMode = IOMode.valueOf(conf.ioMode());
    EventLoopGroup bossGroup =
      NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
    EventLoopGroup workerGroup = bossGroup;
    // 建立一個聚集ByteBuf但對本地線程緩存禁用的分配器
    PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
      conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
    // 建立Netty的服務端根引導程序並對其進行配置
    bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup)
      .channel(NettyUtils.getServerChannelClass(ioMode))
      .option(ChannelOption.ALLOCATOR, allocator)
      .childOption(ChannelOption.ALLOCATOR, allocator);

    if (conf.backLog() > 0) {
      bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
    }
    if (conf.receiveBuf() > 0) {
      bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
    }
    if (conf.sendBuf() > 0) {
      bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
    }
    // 爲根引導程序設置管道初始化回調函數
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        RpcHandler rpcHandler = appRpcHandler;
        for (TransportServerBootstrap bootstrap : bootstraps) {
          rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
        }
        context.initializePipeline(ch, rpcHandler);
      }
    });
    // 給根引導程序綁定Socket的監聽端口
    InetSocketAddress address = hostToBind == null ?
        new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
    channelFuture = bootstrap.bind(address);
    channelFuture.syncUninterruptibly();

    port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
    logger.debug("Shuffle server started on port: {}", port);
  }

代碼清單11中TransportServer初始化的步驟以下:

  1. 建立bossGroup和workerGroup;
  2. 建立一個聚集ByteBuf但對本地線程緩存禁用的分配器;
  3. 調用Netty的API建立Netty的服務端根引導程序並對其進行配置;
  4. 爲根引導程序設置管道初始化回調函數,此回調函數首先設置TransportServerBootstrap到根引導程序中,而後調用TransportContext的initializePipeline方法初始化Channel的pipeline;
  5. 給根引導程序綁定Socket的監聽端口,最後返回監聽的端口。

小貼士:根據Netty的API文檔,Netty服務端需同時建立bossGroup和workerGroup。

提示:代碼清單11中使用了NettyUtils工具類的不少方法,在附錄G中有對它們的詳細介紹。EventLoopGroup、PooledByteBufAllocator、ServerBootstrap都是Netty提供的API,對於它們的更多介紹,請訪問http://netty.io/


 4、管道初始化

         在代碼清單8建立TransportClient和代碼清單11對TransportServer初始化的實現中都在管道初始化回調函數中調用了TransportContext的initializePipeline方法,initializePipeline方法(見代碼清單12)將調用Netty的API對管道初始化。

代碼清單12         管道初始化

  public TransportChannelHandler initializePipeline(
      SocketChannel channel,
      RpcHandler channelRpcHandler) {
    try {
      TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
      channel.pipeline()
        .addLast("encoder", ENCODER)
        .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
        .addLast("decoder", DECODER)
        .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
        .addLast("handler", channelHandler);
      return channelHandler;
    } catch (RuntimeException e) {
      logger.error("Error while initializing Netty pipeline", e);
      throw e;
    }
  }

 根據代碼清單12,initializePipeline方法的執行步驟以下:

  1. 調用createChannelHandler方法建立TransportChannelHandler,從createChannelHandler的實現(見代碼清單13)中能夠看到真正建立TransportClient是在這裏發生的。從TransportClient的構造過程看到RpcHandler 與TransportClient毫無關係,TransportClient只使用了TransportResponseHandler。TransportChannelHandler在服務端將代理TransportRequestHandler對請求消息進行處理,並在客戶端代理TransportResponseHandler對響應消息進行處理。
  2. 對管道進行設置,這裏的ENCODER(即MessageEncoder)派生自Netty的ChannelOutboundHandler接口;DECODER(即MessageDecoder)、TransportChannelHandler以及TransportFrameDecoder(由工具類NettyUtils的靜態方法createFrameDecoder建立)派生自Netty的ChannelInboundHandler接口;IdleStateHandler同時實現了ChannelOutboundHandler和ChannelInboundHandler接口。根據Netty的API行爲,經過addLast方法註冊多個handler時,ChannelInboundHandler按照註冊的前後順序執行;ChannelOutboundHandler按照註冊的前後順序逆序執行,所以在管道兩端(不管是服務端仍是客戶端)處理請求和響應的流程如圖3所示。

 代碼清單13         建立TransportChannelHandler

  private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
    TransportClient client = new TransportClient(channel, responseHandler);
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
      rpcHandler);
    return new TransportChannelHandler(client, responseHandler, requestHandler,
      conf.connectionTimeoutMs(), closeIdleConnections);
  }

圖3       管道處理請求和響應的流程圖

 5、TransportChannelHandler詳解

 

         TransportChannelHandler實現了Netty的ChannelInboundHandler[1],以便對Netty管道中的消息進行處理。圖3中的這些Handler(除了MessageEncoder)因爲都實現了ChannelInboundHandler接口,做爲自定義的ChannelInboundHandler,於是都要重寫channelRead方法。Netty框架使用工做鏈模式來對每一個ChannelInboundHandler的實現類的channelRead方法進行鏈式調用。TransportChannelHandler實現的channelRead方法見代碼清單14。

代碼清單14       TransportChannelHandler的channelRead實現

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
    if (request instanceof RequestMessage) {
      requestHandler.handle((RequestMessage) request);
    } else if (request instanceof ResponseMessage) {
      responseHandler.handle((ResponseMessage) request);
    } else {
      ctx.fireChannelRead(request);
    }
  }

從代碼清單14看到,當TransportChannelHandler讀取到的request是RequestMessage類型時,則將此消息的處理進一步交給TransportRequestHandler,當request是ResponseMessage時,則將此消息的處理進一步交給TransportResponseHandler。

5.一、MessageHandler的繼承體系

         TransportRequestHandler與TransportResponseHandler都繼承自抽象類MessageHandler,MessageHandler定義了子類的規範,詳細定義見代碼清單15。

代碼清單15         MessageHandler規範

public abstract class MessageHandler<T extends Message> {
  public abstract void handle(T message) throws Exception;
  public abstract void channelActive();
  public abstract void exceptionCaught(Throwable cause);
  public abstract void channelInactive();
}

MessageHandler中定義的各個方法的做用分別爲:

  • handle:用於對接收到的單個消息進行處理;
  • channelActive:當channel激活時調用;
  • exceptionCaught:當捕獲到channel發生異常時調用;
  • channelInactive:當channel非激活時調用;

Spark中MessageHandler類的繼承體系如圖4所示。

圖4       MessageHandler類的繼承體系

5.二、Message的繼承體系

         根據代碼清單15,咱們知道MessageHandler同時也是一個Java泛型類,其子類能處理的消息都派生自接口Message。Message的定義見代碼清單16。

代碼清單16         Message的定義

public interface Message extends Encodable {
  Type type();
  ManagedBuffer body();
  boolean isBodyInFrame();

Message中定義的三個接口方法的做用分別爲:

  • type:返回消息的類型;
  • body:返回消息中可選的內容體;
  • isBodyInFrame:用於判斷消息的主體是否包含在消息的同一幀中。

Message接口繼承了Encodable接口,Encodable的定義見代碼清單17。

代碼清單17         Encodable的定義

public interface Encodable {
  int encodedLength();
  void encode(ByteBuf buf);
}

實現Encodable接口的類將能夠轉換到一個ByteBuf中,多個對象將被存儲到預先分配的單個ByteBuf,因此這裏的encodedLength用於返回轉換的對象數量。下面一塊兒來看看Message的類繼承體系,如圖5所示。

圖5       Message的類繼承體系

從圖5看到最終的消息實現類都直接或間接的實現了RequestMessage或ResponseMessage接口,其中RequestMessage的具體實現有四種,分別是:

  • ChunkFetchRequest:請求獲取流的單個塊的序列。
  • RpcRequest:此消息類型由遠程的RPC服務端進行處理,是一種須要服務端向客戶端回覆的RPC請求信息類型。
  • OneWayMessage:此消息也須要由遠程的RPC服務端進行處理,與RpcRequest不一樣的是不須要服務端向客戶端回覆。
  • StreamRequest:此消息表示向遠程的服務發起請求,以獲取流式數據。

因爲OneWayMessage 不須要響應,因此ResponseMessage的對於成功或失敗狀態的實現各有三種,分別是:

  • ChunkFetchSuccess:處理ChunkFetchRequest成功後返回的消息;
  • ChunkFetchFailure:處理ChunkFetchRequest失敗後返回的消息;
  • RpcResponse:處理RpcRequest成功後返回的消息;
  • RpcFailure:處理RpcRequest失敗後返回的消息;
  • StreamResponse:處理StreamRequest成功後返回的消息;
  • StreamFailure:處理StreamRequest失敗後返回的消息;

5.三、ManagedBuffer的繼承體系

         回頭再看看代碼清單16中對body接口的定義,能夠看到其返回內容體的類型爲ManagedBuffer。ManagedBuffer提供了由字節構成數據的不可變視圖(也就是說ManagedBuffer並不存儲數據,也不是數據的實際來源,這同關係型數據庫的視圖相似)。咱們先來看看抽象類ManagedBuffer中對行爲的定義,見代碼清單18。

代碼清單18         ManagedBuffer的定義

public abstract class ManagedBuffer {
  public abstract long size();
  public abstract ByteBuffer nioByteBuffer() throws IOException;
  public abstract InputStream createInputStream() throws IOException;
  public abstract ManagedBuffer retain();
  public abstract ManagedBuffer release();
  public abstract Object convertToNetty() throws IOException;
}

ManagedBuffer中定義了六個方法,分別爲:

  • size:返回數據的字節數。
  • nioByteBuffer:將數據按照Nio的ByteBuffer類型返回。
  • createInputStream:將數據按照InputStream返回。
  • retain:當有新的使用者使用此視圖時,增長引用此視圖的引用數。
  • release:當有使用者再也不使用此視圖時,減小引用此視圖的引用數。當引用數爲0時釋放緩衝區。
  • convertToNetty:將緩衝區的數據轉換爲Netty的對象,用來將數據寫到外部。此方法返回的數據類型要麼是io.netty.buffer.ByteBuf,要麼是io.netty.channel.FileRegion。

ManagedBuffer的具體實現有不少,咱們能夠經過圖6來了解。

圖6       ManagedBuffer的繼承體系

圖6中列出了ManagedBuffer的五個實現類,其中TestManagedBuffer和RecordingManagedBuffer用於測試。NettyManagedBuffer中的緩衝爲io.netty.buffer.ByteBuf,NioManagedBuffer中的緩衝爲java.nio.ByteBuffer。NettyManagedBuffer和NioManagedBuffer的實現都很是簡單,留給讀者自行閱讀。本節挑選FileSegmentManagedBuffer做爲ManagedBuffer具體實現的例子進行介紹。

         FileSegmentManagedBuffer的做用爲獲取一個文件中的一段,它一共有四個由final修飾的屬性,所有都經過FileSegmentManagedBuffer的構造器傳入屬性值,這四個屬性爲:

  • conf:即TransportConf。
  • file:所要讀取的文件。
  • offset:所要讀取文件的偏移量。
  • length:所要讀取的長度。

下面將逐個介紹FileSegmentManagedBuffer對於ManagedBuffer的實現。

  • NIO方式讀取文件。FileSegmentManagedBuffer實現的nioByteBuffer方法見代碼清單19。

代碼清單19         nioByteBuffer方法的實現

  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    FileChannel channel = null;
    try {
      channel = new RandomAccessFile(file, "r").getChannel();
      if (length < conf.memoryMapBytes()) {
        ByteBuffer buf = ByteBuffer.allocate((int) length);
        channel.position(offset);
        while (buf.remaining() != 0) {
          if (channel.read(buf) == -1) {
            throw new IOException(String.format("Reached EOF before filling buffer\n" +
              "offset=%s\nfile=%s\nbuf.remaining=%s",
              offset, file.getAbsoluteFile(), buf.remaining()));
          }
        }
        buf.flip();
        return buf;
      } else {
        return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
      }
    } catch (IOException e) {
      try {
        if (channel != null) {
          long size = channel.size();
          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
            e);
        }
      } catch (IOException ignored) {
        // ignore
      }
      throw new IOException("Error in opening " + this, e);
    } finally {
      JavaUtils.closeQuietly(channel);
    }
  }

nioByteBuffer的實現仍是很簡單的,主要利用RandomAccessFile獲取FileChannel,而後使用java.nio.ByteBuffer和FileChannel的API將數據寫入緩衝區java.nio.ByteBuffer中。

  • 文件流方式讀取文件。FileSegmentManagedBuffer實現的createInputStream方法見代碼清單20。

代碼清單20         createInputStream的實現

  @Override
  public InputStream createInputStream() throws IOException {
    FileInputStream is = null;
    try {
      is = new FileInputStream(file);
      ByteStreams.skipFully(is, offset);
      return new LimitedInputStream(is, length);
    } catch (IOException e) {
      try {
        if (is != null) {
          long size = file.length();
          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
              e);
        }
      } catch (IOException ignored) {
        // ignore
      } finally {
        JavaUtils.closeQuietly(is);
      }
      throw new IOException("Error in opening " + this, e);
    } catch (RuntimeException e) {
      JavaUtils.closeQuietly(is);
      throw e;
    }
  }

createInputStream的實現仍是很簡單的,這裏很少做介紹。

  • 將數據轉換爲Netty對象。FileSegmentManagedBuffer實現的convertToNetty方法見代碼清單21。

代碼清單21         convertToNetty的實現

  @Override
  public Object convertToNetty() throws IOException {
    if (conf.lazyFileDescriptor()) {
      return new DefaultFileRegion(file, offset, length);
    } else {
      FileChannel fileChannel = new FileInputStream(file).getChannel();
      return new DefaultFileRegion(fileChannel, offset, length);
    }
  }
  • 其餘方法的實現。其餘方法因爲實現很是簡單,因此這裏就不一一列出了,感興趣的讀者能夠自行查閱。

[1] ChannelInboundHandler接口的實現及原理不屬於本書要分析的內容,感興趣的同窗能夠閱讀Netty的官方文檔或者研究Netty的源碼。

6、服務端RpcHandler詳解

 

         因爲TransportRequestHandler實際是把請求消息交給RpcHandler進一步處理的,因此這裏對RpcHandler首先作個介紹。RpcHandler是一個抽象類,定義了一些RPC處理器的規範,其主要實現見代碼清單22。

代碼清單22         RpcHandler的實現

public abstract class RpcHandler {

  private static final RpcResponseCallback ONE_WAY_CALLBACK = new OneWayRpcCallback();

  public abstract void receive(
      TransportClient client,
      ByteBuffer message,
      RpcResponseCallback callback);

  public abstract StreamManager getStreamManager();

  public void receive(TransportClient client, ByteBuffer message) {
    receive(client, message, ONE_WAY_CALLBACK);
  }

  public void channelActive(TransportClient client) { }

  public void channelInactive(TransportClient client) { }

  public void exceptionCaught(Throwable cause, TransportClient client) { }

  private static class OneWayRpcCallback implements RpcResponseCallback {

    private static final Logger logger = LoggerFactory.getLogger(OneWayRpcCallback.class);

    @Override
    public void onSuccess(ByteBuffer response) {
      logger.warn("Response provided for one-way RPC.");
    }

    @Override
    public void onFailure(Throwable e) {
      logger.error("Error response provided for one-way RPC.", e);
    }

  }

}

代碼清單22中RpcHandler的各個方法的做用以下:

  • receive:這是一個抽象方法,用來接收單一的RPC消息,具體處理邏輯須要子類去實現。receive接收三個參數,分別是TransportClient、ByteBuffer和RpcResponseCallback。RpcResponseCallback用於對請求處理結束後進行回調,不管處理結果是成功仍是失敗,RpcResponseCallback都會被調用一次。RpcResponseCallback的接口定義以下:
public interface RpcResponseCallback {
  void onSuccess(ByteBuffer response);
  void onFailure(Throwable e);
}
  • 重載的receive:只接收TransportClient和ByteBuffer兩個參數,RpcResponseCallback爲默認的ONE_WAY_CALLBACK,其類型爲OneWayRpcCallback,從代碼清單22中OneWayRpcCallback的實現能夠看出其onSuccess和onFailure只是打印日誌,並無針對客戶端作回覆處理。
  • channelActive:當與給定客戶端相關聯的channel處於活動狀態時調用。
  • channelInactive:當與給定客戶端相關聯的channel處於非活動狀態時調用。
  • exceptionCaught:當channel產生異常時調用。
  • getStreamManager:獲取StreamManager,StreamManager能夠從流中獲取單個的塊,所以它也包含着當前正在被TransportClient獲取的流的狀態。

介紹完RpcHandler,如今回到TransportRequestHandler的處理過程。TransportRequestHandler處理以上四種RequestMessage的實現見代碼清單23。

代碼清單23         TransportRequestHandler的handle方法

  @Override
  public void handle(RequestMessage request) {
    if (request instanceof ChunkFetchRequest) {
      processFetchRequest((ChunkFetchRequest) request);
    } else if (request instanceof RpcRequest) {
      processRpcRequest((RpcRequest) request);
    } else if (request instanceof OneWayMessage) {
      processOneWayMessage((OneWayMessage) request);
    } else if (request instanceof StreamRequest) {
      processStreamRequest((StreamRequest) request);
    } else {
      throw new IllegalArgumentException("Unknown request type: " + request);
    }
  }

結合代碼清單23,下面逐一詳細分析這四種類型請求的處理過程。

6.一、處理塊獲取請求

         processFetchRequest方法用於處理ChunkFetchRequest類型的消息,其實現見代碼清單24。

代碼清單24         processFetchRequest的實現

  private void processFetchRequest(final ChunkFetchRequest req) {
    if (logger.isTraceEnabled()) {
      logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
        req.streamChunkId);
    }

    ManagedBuffer buf;
    try {
      streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId);
      streamManager.registerChannel(channel, req.streamChunkId.streamId);
      buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
    } catch (Exception e) {
      logger.error(String.format("Error opening block %s for request from %s",
        req.streamChunkId, getRemoteAddress(channel)), e);
      respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
      return;
    }

    respond(new ChunkFetchSuccess(req.streamChunkId, buf));
  }

代碼清單24中的streamManager是經過調用RpcHandler的getStreamManager方法獲取的StreamManager。processFetchRequest的處理都依託於RpcHandler的StreamManager,其處理步驟以下:

  1. 調用StreamManager的checkAuthorization方法,校驗客戶端是否有權限從給定的流中讀取;
  2. 調用StreamManager的registerChannel方法,將一個流和一條(只能是一條)客戶端的TCP鏈接關聯起來,這能夠保證對於單個的流只會有一個客戶端讀取。流關閉以後就永遠不可以重用了;
  3. 調用StreamManager的getChunk方法,獲取單個的塊(塊被封裝爲ManagedBuffer)。因爲單個的流只能與單個的TCP鏈接相關聯,所以getChunk方法不能爲了某個特殊的流而並行調用;
  4. 將ManagedBuffer和流的塊Id封裝爲ChunkFetchSuccess後,調用respond方法返回給客戶端。

有關StreamManager的具體實現,讀者能夠參考《Spark內核設計的藝術》一書5.3.5節介紹的NettyStreamManager和《Spark內核設計的藝術》一書6.9.2節介紹的NettyBlockRpcServer中的OneForOneStreamManager。

6.二、處理RPC請求

         processRpcRequest方法用於處理RpcRequest類型的消息,其實現見代碼清單25。

代碼清單25         processRpcRequest的實現

  private void processRpcRequest(final RpcRequest req) {
    try {
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
        @Override
        public void onSuccess(ByteBuffer response) {
          respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
        }

        @Override
        public void onFailure(Throwable e) {
          respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
        }
      });
    } catch (Exception e) {
      logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
    } finally {
      req.body().release();
    }
  }

代碼清單25中將RpcRequest消息的內容體、發送消息的客戶端以及一個RpcResponseCallback類型的匿名內部類做爲參數傳遞給了RpcHandler的receive方法。這就是說真正用於處理RpcRequest消息的是RpcHandler,而非TransportRequestHandler。因爲RpcHandler是抽象類(見代碼清單22),其receive方法也是抽象方法,因此具體的操做將由RpcHandler的實現了receive方法的子類來完成。全部繼承RpcHandler的子類都須要在其receive方法的具體實現中回調RpcResponseCallback的onSuccess(處理成功時)或者onFailure(處理失敗時)方法。從RpcResponseCallback的實現來看,不管處理結果成功仍是失敗,都將調用respond方法對客戶端進行響應。

6.三、處理流請求

         processStreamRequest方法用於處理StreamRequest類型的消息,其實現見代碼清單26。

代碼清單26         processStreamRequest的實現

  private void processStreamRequest(final StreamRequest req) {
    ManagedBuffer buf;
    try {
      buf = streamManager.openStream(req.streamId);// 將獲取到的流數據封裝爲ManagedBuffer
    } catch (Exception e) {
      logger.error(String.format(
        "Error opening stream %s for request from %s", req.streamId, getRemoteAddress(channel)), e);
      respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
      return;
    }

    if (buf != null) {
      respond(new StreamResponse(req.streamId, buf.size(), buf));
    } else {
      respond(new StreamFailure(req.streamId, String.format(
        "Stream '%s' was not found.", req.streamId)));
    }
  }

代碼清單26中也使用了RpcHandler的StreamManager,其處理步驟以下:

  1. 調用StreamManager的openStream方法將獲取到的流數據封裝爲ManagedBuffer;
  2. 當成功或失敗時調用respond方法向客戶端響應。

6.四、處理無需回覆的RPC請求

         processOneWayMessage方法用於處理StreamRequest類型的消息,其實現見代碼清單27。

代碼清單27         processOneWayMessage的實現

  private void processOneWayMessage(OneWayMessage req) {
    try {
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
    } catch (Exception e) {
      logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
    } finally {
      req.body().release();
    }
  }

processOneWayMessage方法的實現processRpcRequest很是類似,區別在於processOneWayMessage調用了代碼清單22中ONE_WAY_CALLBACK的receive方法,於是processOneWayMessage在處理完RPC請求後不會對客戶端做出響應。

         從以上四種處理的分析能夠看出最終的處理都由RpcHandler及其內部組件完成。除了OneWayMessage的消息外,其他三種消息都是最終調用respond方法響應客戶端,其實現見代碼清單28。

代碼清單28         respond的實現

  private void respond(final Encodable result) {
    final SocketAddress remoteAddress = channel.remoteAddress();
    channel.writeAndFlush(result).addListener(
      new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (future.isSuccess()) {
            logger.trace("Sent result {} to client {}", result, remoteAddress);
          } else {
            logger.error(String.format("Error sending result %s to %s; closing connection",
              result, remoteAddress), future.cause());
            channel.close();
          }
        }
      }
    );
  }

能夠看到respond方法中實際調用了Channel的writeAndFlush方法[1]來響應客戶端。



[1] Channel的writeAndFlush方法涉及Netty的實現細節及原理,這並非本書所要闡述的內容,有興趣的讀者能夠訪問Netty官網:http://netty.io獲取更多信息。

 

7、服務端引導程序TransportServerBootstrap

       TransportServer的構造器(見代碼清單10)中的bootstraps是TransportServerBootstrap的列表。接口TransportServerBootstrap定義了服務端引導程序的規範,服務端引導程序旨在當客戶端與服務端創建鏈接以後,在服務端持有的客戶端管道上執行的引導程序。TransportServerBootstrap的定義見代碼清單29。

代碼清單29         TransportServerBootstrap的定義 

public interface TransportServerBootstrap {
  RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}

TransportServerBootstrap的doBootstrap方法將對服務端的RpcHandler進行代理,接收客戶端的請求。TransportServerBootstrap有SaslServerBootstrap和EncryptionCheckerBootstrap兩個實現類。爲了更清楚的說明TransportServerBootstrap的意義,咱們以SaslServerBootstrap爲例,來說解其實現(見代碼清單30)。

代碼清單30         SaslServerBootstrap的doBootstrap實現

  public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
    return new SaslRpcHandler(conf, channel, rpcHandler, secretKeyHolder);
  }

根據代碼清單30,咱們知道SaslServerBootstrap的doBootstrap方法實際建立了SaslRpcHandler,SaslRpcHandler負責對管道進行SASL(Simple Authentication and Security Layer)加密。SaslRpcHandler自己也繼承了RpcHandler,因此咱們重點來看其receive方法的實現,見代碼清單31。

代碼清單31         SaslRpcHandler的receive方法

  @Override
  public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
    if (isComplete) {
      // 將消息傳遞給SaslRpcHandler所代理的下游RpcHandler並返回
      delegate.receive(client, message, callback);
      return;
    }

    ByteBuf nettyBuf = Unpooled.wrappedBuffer(message);
    SaslMessage saslMessage;
    try {
      saslMessage = SaslMessage.decode(nettyBuf);// 對客戶端發送的消息進行SASL解密
    } finally {
      nettyBuf.release();
    }

    if (saslServer == null) {
      // 若是saslServer還未建立,則須要建立SparkSaslServer
      client.setClientId(saslMessage.appId);
      saslServer = new SparkSaslServer(saslMessage.appId, secretKeyHolder,
        conf.saslServerAlwaysEncrypt());
    }

    byte[] response;
    try {
      response = saslServer.response(JavaUtils.bufferToArray(// 使用saslServer處理已解密的消息
        saslMessage.body().nioByteBuffer()));
    } catch (IOException ioe) {
      throw new RuntimeException(ioe);
    }
    callback.onSuccess(ByteBuffer.wrap(response));

    if (saslServer.isComplete()) {
      logger.debug("SASL authentication successful for channel {}", client);
      isComplete = true;// SASL認證交換已經完成
      if (SparkSaslServer.QOP_AUTH_CONF.equals(saslServer.getNegotiatedProperty(Sasl.QOP))) {
        logger.debug("Enabling encryption for channel {}", client);
        // 對管道進行SASL加密
        SaslEncryption.addToChannel(channel, saslServer, conf.maxSaslEncryptedBlockSize());
        saslServer = null;
      } else {
        saslServer.dispose();
        saslServer = null;
      }
    }
  }

根據代碼清單31,SaslRpcHandler處理客戶端消息的步驟以下:

  1. 若是SASL認證交換已經完成(isComplete等於true),則將消息傳遞給SaslRpcHandler所代理的下游RpcHandler並返回。
  2. 若是SASL認證交換未完成(isComplete等於false),則對客戶端發送的消息進行SASL解密。
  3. 若是saslServer還未建立,則須要建立SparkSaslServer。當SaslRpcHandler接收到客戶端的第一條消息時會作此操做。
  4. 使用saslServer處理已解密的消息,並將處理結果經過RpcResponseCallback的回調方法返回給客戶端。
  5. 若是SASL認證交換已經完成,則將isComplete置爲true。
  6. 對管道進行SASL加密。

SaslServerBootstrap是經過SaslRpcHandler對下游RpcHandler進行代理的一種TransportServerBootstrap。EncryptionCheckerBootstrap是另外一種TransportServerBootstrap的實現,它經過將自身加入Netty的管道中實現引導,EncryptionCheckerBootstrap的doBootstrap方法的實現見代碼清單32。

代碼清單32         EncryptionCheckerBootstrap的doBootstrap實現

    @Override
    public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
      channel.pipeline().addFirst("encryptionChecker", this);
      return rpcHandler;
    }

         在詳細介紹了TransportChannelHandler以後咱們就能夠對圖3-3進行擴展,把TransportRequestHandler、TransportServerBootstrap及RpcHandler的處理流程增長進來,如圖7所示。

圖7       RPC框架服務端處理請求、響應流程圖

有讀者可能會問,圖7中並未見TransportServerBootstrap的身影。根據對TransportServerBootstrap的兩種實現的舉例,咱們知道TransportServerBootstrap將可能存在於圖中任何兩個組件的箭頭連線中間,起到引導、包裝、代理的做用。

8、客戶端TransportClient詳解

         在介紹完服務端RpcHandler對請求消息的處理以後,如今來看看客戶端發送RPC請求的原理。咱們在分析代碼清單13中的createChannelHandler方法時,看到調用了TransportClient的構造器(見代碼清單33),其中TransportResponseHandler的引用將賦給handler屬性。

代碼清單33         TransportClient的構造器 

  public TransportClient(Channel channel, TransportResponseHandler handler) {
    this.channel = Preconditions.checkNotNull(channel);
    this.handler = Preconditions.checkNotNull(handler);
    this.timedOut = false;
  }

TransportClient一共有五個方法用於發送請求,分別爲:

  • fetchChunk:從遠端協商好的流中請求單個塊;
  • stream:使用流的ID,從遠端獲取流數據;
  • sendRpc:向服務端發送RPC的請求,經過At least Once Delivery原則保證請求不會丟失;
  • sendRpcSync:向服務端發送異步的RPC的請求,並根據指定的超時時間等待響應;
  • send:向服務端發送RPC的請求,可是並不指望能獲取響應,於是不能保證投遞的可靠性;

本節只選擇最經常使用的sendRpc和fetchChunk進行分析,其他實現均可以舉一反三。

8.一、發送RPC請求

         sendRpc方法的實現見代碼清單34。

代碼清單34         sendRpc的實現

  public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
    final long startTime = System.currentTimeMillis();
    if (logger.isTraceEnabled()) {
      logger.trace("Sending RPC to {}", getRemoteAddress(channel));
    }
    // 使用UUID生成請求主鍵requestId
    final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
    handler.addRpcRequest(requestId, callback);// 添加requestId與RpcResponseCallback的引用之間的關係
    // 發送RPC請求
    channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
      new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (future.isSuccess()) {
            long timeTaken = System.currentTimeMillis() - startTime;
            if (logger.isTraceEnabled()) {
              logger.trace("Sending request {} to {} took {} ms", requestId,
                getRemoteAddress(channel), timeTaken);
            }
          } else {
            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
              getRemoteAddress(channel), future.cause());
            logger.error(errorMsg, future.cause());
            handler.removeRpcRequest(requestId);
            channel.close();
            try {
              callback.onFailure(new IOException(errorMsg, future.cause()));
            } catch (Exception e) {
              logger.error("Uncaught exception in RPC response callback handler!", e);
            }
          }
        }
      });

    return requestId;
  }

結合代碼清單34,咱們知道sendRpc方法的實現步驟以下:

1)    使用UUID生成請求主鍵requestId;

2)    調用addRpcRequest向handler(特別提醒下讀者這裏的handler不是RpcHandler,而是經過TransportClient構造器傳入的TransportResponseHandler)添加requestId與回調類RpcResponseCallback的引用之間的關係。TransportResponseHandler的addRpcRequest方法(見代碼清單35)將更新最後一次請求的時間爲當前系統時間,而後將requestId與RpcResponseCallback之間的映射加入到outstandingRpcs緩存中。outstandingRpcs專門用於緩存發出的RPC請求信息。

代碼清單35         添加RPC請求到緩存

  public void addRpcRequest(long requestId, RpcResponseCallback callback) {
    updateTimeOfLastRequest();
    outstandingRpcs.put(requestId, callback);
  }

3)    調用Channel的writeAndFlush方法將RPC請求發送出去,這和在代碼清單28中服務端調用的respond方法響應客戶端的同樣,都是使用Channel的writeAndFlush方法。當發送成功或者失敗時會回調ChannelFutureListener的operationComplete方法。若是發送成功,那麼只會打印requestId、遠端地址及花費時間的日誌,若是發送失敗,除了打印錯誤日誌外,還要調用TransportResponseHandler的removeRpcRequest方法(見代碼清單36)將這次請求從outstandingRpcs緩存中移除。

代碼清單36         從緩存中刪除RPC請求

  public void removeRpcRequest(long requestId) {
    outstandingRpcs.remove(requestId);
  }

請求發送成功後,客戶端將等待接收服務端的響應。根據圖3,返回的消息也會傳遞給TransportChannelHandler的channelRead方法(見代碼清單14),根據以前的分析,消息的分析將最後交給TransportResponseHandler的handle方法來處理。TransportResponseHandler的handle方法分別對圖5中的六種ResponseMessage進行處理,因爲服務端使用processRpcRequest方法(見代碼清單25)處理RpcRequest類型的消息後返回給客戶端的消息爲RpcResponse或RpcFailure,因此咱們來看看客戶端的TransportResponseHandler的handle方法是如何處理RpcResponse和RpcFailure,見代碼清單37。

代碼清單37         RpcResponse和RpcFailure消息的處理

    } else if (message instanceof RpcResponse) {
      RpcResponse resp = (RpcResponse) message;
      RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);// 獲取RpcResponseCallback
      if (listener == null) {
        logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
          resp.requestId, getRemoteAddress(channel), resp.body().size());
      } else {
        outstandingRpcs.remove(resp.requestId);
        try {
          listener.onSuccess(resp.body().nioByteBuffer());
        } finally {
          resp.body().release();
        }
      }
    } else if (message instanceof RpcFailure) {
      RpcFailure resp = (RpcFailure) message;
      RpcResponseCallback listener = outstandingRpcs.get(resp.requestId); // 獲取RpcResponseCallback
      if (listener == null) {
        logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
          resp.requestId, getRemoteAddress(channel), resp.errorString);
      } else {
        outstandingRpcs.remove(resp.requestId);
        listener.onFailure(new RuntimeException(resp.errorString));
      }

從代碼清單37看到,處理RpcResponse的邏輯爲:

  1. 使用RpcResponse對應的RpcRequest的主鍵requestId,從outstandingRpcs緩存中獲取註冊的RpcResponseCallback,此處的RpcResponseCallback即爲代碼清單34中傳遞給sendRpc方法的RpcResponseCallback;
  2. 移除outstandingRpcs緩存中requestId和RpcResponseCallback的註冊信息;
  3. 調用RpcResponseCallback的onSuccess方法,處理成功響應後的具體邏輯。這裏的RpcResponseCallback須要各個使用TransportClient的sendRpc方法的場景中分別實現;
  4. 最後釋放RpcResponse的body,回收資源。

處理RpcFailure的邏輯爲:

  1. 使用RpcFailure對應的RpcRequest的主鍵requestId,從outstandingRpcs緩存中獲取註冊的RpcResponseCallback,此處的RpcResponseCallback即爲代碼清單34中傳遞給sendRpc方法的RpcResponseCallback;
  2. 移除outstandingRpcs緩存中requestId和RpcResponseCallback的註冊信息;
  3. 調用RpcResponseCallback的onFailure方法,處理失敗響應後的具體邏輯。這裏的RpcResponseCallback須要在使用TransportClient的sendRpc方法時指定或實現。

8.二、發送獲取塊請求

         fetchChunk的實現見代碼清單38。

代碼清單38         fetchChunk的實現

  public void fetchChunk(
      long streamId,
      final int chunkIndex,
      final ChunkReceivedCallback callback) {
    final long startTime = System.currentTimeMillis();
    if (logger.isDebugEnabled()) {
      logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
    }

    final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);// 建立StreamChunkId
    // 添加StreamChunkId與ChunkReceivedCallback之間的對應關係
    handler.addFetchRequest(streamChunkId, callback);
    // 發送塊請求
    channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
      new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (future.isSuccess()) {
            long timeTaken = System.currentTimeMillis() - startTime;
            if (logger.isTraceEnabled()) {
              logger.trace("Sending request {} to {} took {} ms", streamChunkId,
                getRemoteAddress(channel), timeTaken);
            }
          } else {
            String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
              getRemoteAddress(channel), future.cause());
            logger.error(errorMsg, future.cause());
            handler.removeFetchRequest(streamChunkId);
            channel.close();
            try {
              callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
            } catch (Exception e) {
              logger.error("Uncaught exception in RPC response callback handler!", e);
            }
          }
        }
      });
  }

結合代碼清單38,咱們知道fetchChunk方法的實現步驟以下:

1)    使用流的標記streamId和塊的索引chunkIndex建立StreamChunkId;

2)    調用addFetchRequest向handler(特別提醒下讀者這裏的handler不是RpcHandler,而是經過TransportClient構造器傳入的TransportResponseHandler)添加StreamChunkId與回調類ChunkReceivedCallback的引用之間的關係。TransportResponseHandler的addFetchRequest方法(見代碼清單39)將更新最後一次請求的時間爲當前系統時間,而後將StreamChunkId與ChunkReceivedCallback之間的映射加入到outstandingFetches緩存中。outstandingFetches專門用於緩存發出的塊請求信息。

代碼清單39         添加塊請求到緩存

  public void addFetchRequest(StreamChunkId streamChunkId, ChunkReceivedCallback callback) {
    updateTimeOfLastRequest();
    outstandingFetches.put(streamChunkId, callback);
  }

3)    調用Channel的writeAndFlush方法將塊請求發送出去,這和在代碼清單28中服務端調用的respond方法響應客戶端的同樣,都是使用Channel的writeAndFlush方法。當發送成功或者失敗時會回調ChannelFutureListener的operationComplete方法。若是發送成功,那麼只會打印StreamChunkId、遠端地址及花費時間的日誌,若是發送失敗,除了打印錯誤日誌外,還要調用TransportResponseHandler的removeFetchRequest方法(見代碼清單40)將這次請求從outstandingFetches緩存中移除。

代碼清單40         從緩存中刪除RPC請求

  public void removeRpcRequest(long requestId) {

    outstandingRpcs.remove(requestId);

  }

請求發送成功後,客戶端將等待接收服務端的響應。根據圖3,返回的消息也會傳遞給TransportChannelHandler的channelRead方法(見代碼清單14),根據以前的分析,消息的分析將最後交給TransportResponseHandler的handle方法來處理。TransportResponseHandler的handle方法分別對圖5中的六種處理結果進行處理,因爲服務端使用processFetchRequest方法(見代碼清單24)處理ChunkFetchRequest類型的消息後返回給客戶端的消息爲ChunkFetchSuccess或ChunkFetchFailure,因此咱們來看看客戶端的TransportResponseHandler的handle方法是如何處理ChunkFetchSuccess和ChunkFetchFailure,見代碼清單41。

代碼清單41         ChunkFetchSuccess和ChunkFetchFailure消息的處理

    if (message instanceof ChunkFetchSuccess) {
      ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
      ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
      if (listener == null) {
        logger.warn("Ignoring response for block {} from {} since it is not outstanding",
          resp.streamChunkId, getRemoteAddress(channel));
        resp.body().release();
      } else {
        outstandingFetches.remove(resp.streamChunkId);
        listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body());
        resp.body().release();
      }
    } else if (message instanceof ChunkFetchFailure) {
      ChunkFetchFailure resp = (ChunkFetchFailure) message;
      ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
      if (listener == null) {
        logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
          resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
      } else {
        outstandingFetches.remove(resp.streamChunkId);
        listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
          "Failure while fetching " + resp.streamChunkId + ": " + resp.errorString));
      }
    }

從代碼清單41看到,處理ChunkFetchSuccess的邏輯爲:

  1. 使用ChunkFetchSuccess對應的StreamChunkId,從outstandingFetches緩存中獲取註冊的ChunkReceivedCallback,此處的ChunkReceivedCallback即爲代碼清單38中傳遞給fetchChunk方法的ChunkReceivedCallback;
  2. 移除outstandingFetches緩存中StreamChunkId和ChunkReceivedCallback的註冊信息;
  3. 調用ChunkReceivedCallback的onSuccess方法,處理成功響應後的具體邏輯。這裏的ChunkReceivedCallback須要各個使用TransportClient的fetchChunk方法的場景中分別實現;
  4. 最後釋放ChunkFetchSuccess的body,回收資源。

處理ChunkFetchFailure的邏輯爲:

  1. 使用ChunkFetchFailure對應的StreamChunkId,從outstandingFetches緩存中獲取註冊的ChunkReceivedCallback,此處的ChunkReceivedCallback即爲代碼清單38中傳遞給fetchChunk方法的ChunkReceivedCallback;
  2. 移除outstandingFetches緩存中StreamChunkId和ChunkReceivedCallback的註冊信息;
  3. 調用ChunkReceivedCallback的onFailure方法,處理失敗響應後的具體邏輯。這裏的ChunkReceivedCallback須要各個使用TransportClient的fetchChunk方法的場景中分別實現。

         在詳細介紹了TransportClient和TransportResponseHandler以後,對於客戶端咱們就能夠擴展圖3,把TransportResponseHandler及TransportClient的處理流程增長進來,如圖8所示。

圖8       客戶端請求、響應流程圖

圖8中的序號①表示調用TransportResponseHandler的addRpcRequest方法(或addFetchRequest方法)將更新最後一次請求的時間爲當前系統時間,而後將requestId與RpcResponseCallback之間的映射加入到outstandingRpcs緩存中(或將StreamChunkId與ChunkReceivedCallback之間的映射加入到outstandingFetches緩存中)。②表示調用Channel的writeAndFlush方法將RPC請求發送出去。圖中的虛線表示當TransportResponseHandler處理RpcResponse和RpcFailure時將從outstandingRpcs緩存中獲取此請求對應的RpcResponseCallback(或處理ChunkFetchSuccess和ChunkFetchFailure時將從outstandingFetches緩存中獲取StreamChunkId對應的ChunkReceivedCallback),並執行回調。此外,TransportClientBootstrap將可能存在於圖8中任何兩個組件的箭頭連線中間。

 

關於《Spark內核設計的藝術 架構設計與實現》

通過近一年的準備,基於Spark2.1.0版本的《Spark內核設計的藝術 架構設計與實現》一書現已出版發行,圖書如圖:

Spark內核設計的藝術

 

紙質版售賣連接以下:

京東:https://item.jd.com/12302500.html

相關文章
相關標籤/搜索