RPC-Thrift(四)

Client

  Thrift客戶端有兩種:同步客戶端和異步客戶端。html

  同步客戶端

    同步客戶端比較簡單,以RPC-Thrift(一)中的的例子爲基礎進行研究源碼,先看一下類圖。apache

    

    TServiceClient:用於以同步方式與TService進行通訊;數組

    Iface接口和Client類都是經過Thrift文件自動生成的代碼。緩存

    TServiceClient

      TServiceClient定義了基礎的向Server發送請求和從Server接收響應的方法。安全

public abstract class TServiceClient {
  public TServiceClient(TProtocol prot) {
    this(prot, prot);
  }
  public TServiceClient(TProtocol iprot, TProtocol oprot) {
    iprot_ = iprot;
    oprot_ = oprot;
  }
  protected TProtocol iprot_;//輸入TProtocol
  protected TProtocol oprot_;//輸出TProtocol
  protected int seqid_;//序列號
  public TProtocol getInputProtocol() {
    return this.iprot_;
  }
  public TProtocol getOutputProtocol() {
    return this.oprot_;
  }
  //向Server發送請求
  protected void sendBase(String methodName, TBase args) throws TException {
    //寫消息頭,seqid_只是簡單的++,非線程安全,接收響應時要進行seqid_的校驗
    oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));
    args.write(oprot_);//寫參數
    oprot_.writeMessageEnd();
    oprot_.getTransport().flush();//發送
  }
  //從Server接收響應
  protected void receiveBase(TBase result, String methodName) throws TException {
    TMessage msg = iprot_.readMessageBegin();//讀消息頭,若沒有數據一直等待,詳見TTransport的實現
    if (msg.type == TMessageType.EXCEPTION) {
      //異常消息經過TApplicationException讀取
      TApplicationException x = TApplicationException.read(iprot_);
      iprot_.readMessageEnd();
      throw x;
    }
    if (msg.seqid != seqid_) {
      //序列號不一致報異常
      throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
    }
    result.read(iprot_);//讀數據,由其result子類實現
    iprot_.readMessageEnd();
  }
}

    Iface

  public interface Iface {
    //thrift中定義的方法
    public ResultCommon sayHello(String paramJson) throws org.apache.thrift.TException;
  }

 

    Client

  public static class Client extends org.apache.thrift.TServiceClient implements Iface {
    //Client工廠類
    public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
      public Factory() {}
      public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
        return new Client(prot);
      }
      public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
        return new Client(iprot, oprot);
      }
    }
    public Client(org.apache.thrift.protocol.TProtocol prot)
    {
      super(prot, prot);
    }
    public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
      super(iprot, oprot);
    }
    //sayHello方法調用入口
    public ResultCommon sayHello(String paramJson) throws org.apache.thrift.TException
    {
      send_sayHello(paramJson);//發送請求
      return recv_sayHello();//接收響應
    }
    //發送請求
    public void send_sayHello(String paramJson) throws org.apache.thrift.TException
    {
      sayHello_args args = new sayHello_args();//組裝參數
      args.setParamJson(paramJson);
      sendBase("sayHello", args);//調用父類的sendBase方法發送請求
    }
    //接收響應
    public ResultCommon recv_sayHello() throws org.apache.thrift.TException
    {
      sayHello_result result = new sayHello_result();
      receiveBase(result, "sayHello");//調用父類的receiveBase方法發送請求
      if (result.isSetSuccess()) {
        return result.success;
      }
      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "sayHello failed: unknown result");
    }
  }

 

 

  異步客戶端

    異步客戶端實現比較複雜,經過回調實現,先看一個異步客戶端的例子。異步客戶端須要使用TNonblockingSocket,經過AsyncMethodCallback接收服務端的回調。併發

 1 String paramJson = "{\"wewe\":\"111\"}";
 2 TNonblockingSocket tNonblockingSocket = new TNonblockingSocket("127.0.0.1", 8090);//使用非阻塞TNonblockingSocket
 3 TAsyncClientManager tAsyncClientManager = new TAsyncClientManager();
 4 HelloService.AsyncClient asyncClient = new HelloService.AsyncClient.Factory(tAsyncClientManager, new TBinaryProtocol.Factory()).getAsyncClient(tNonblockingSocket);
 5 asyncClient.sayHello(paramJson, new AsyncMethodCallback<HelloService.AsyncClient.sayHello_call>() {
 6     @Override
 7     public void onError(Exception exception) {
 8         //...
 9     }
10     @Override
11     public void onComplete(sayHello_call response) {
12         ResultCommon resultCommon = response.getResult();
13         System.out.println(resultCommon.getDesc());
14     }
15 });

    涉及到的類結構圖以下:異步

    

    TAsyncClient:異步客戶端抽象類,經過Thrift文件生成的AsyncClient需繼承該類;async

    TAsyncClientManager:異步客戶端管理類,包含一個selector線程,用於轉換方法調用對象;ide

    TAsyncMethodCall:封裝了異步方法調用,Thrift文件定義的全部方法都會在AsyncClient中生成對應的繼承於TAsyncMethodCall的內部類(如sayHello_call);源碼分析

    AsyncMethodCallback:接收服務端回調的接口,用戶須要定義實現該接口的類。

 

    TAsyncClient

      TAsyncClient爲異步客戶端提供了公共的屬性和方法。

public abstract class TAsyncClient {
  protected final TProtocolFactory ___protocolFactory;
  protected final TNonblockingTransport ___transport;
  protected final TAsyncClientManager ___manager;//異步客戶端管理類
  protected TAsyncMethodCall ___currentMethod;//異步方法調用
  private Exception ___error;
  private long ___timeout;
  public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport) {
    this(protocolFactory, manager, transport, 0);
  }
  public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport, long timeout) {
    this.___protocolFactory = protocolFactory;
    this.___manager = manager;
    this.___transport = transport;
    this.___timeout = timeout;
  }
  public TProtocolFactory getProtocolFactory() {
    return ___protocolFactory;
  }
  public long getTimeout() {
    return ___timeout;
  }
  public boolean hasTimeout() {
    return ___timeout > 0;
  }
  public void setTimeout(long timeout) {
    this.___timeout = timeout;
  }
  //客戶端是否處於異常狀態
  public boolean hasError() {
    return ___error != null;
  }
  public Exception getError() {
    return ___error;
  }
  //檢查是否準備就緒,若是當前Cilent正在執行一個方法或處於error狀態則報異常
  protected void checkReady() {
    if (___currentMethod != null) {
      throw new IllegalStateException("Client is currently executing another method: " + ___currentMethod.getClass().getName());
    }
    if (___error != null) {
      throw new IllegalStateException("Client has an error!", ___error);
    }
  }
  //執行完成時delegate方法會調用該方法,將___currentMethod置爲null
  protected void onComplete() {
    ___currentMethod = null;
  }
  //執行出現異常時delegate方法會調用該方法,
  protected void onError(Exception exception) {
    ___transport.close();//關閉鏈接
    ___currentMethod = null;//將___currentMethod置爲null
    ___error = exception;//異常信息
  }
}

 

    AsyncClient

      AsyncClient類是經過Thrift文件自動生成的,在該類中含有每一個方法的調用入口,而且爲每一個方法生成了一個方法調用類方法名_call,如sayHello_call。sayHello_call實現了父類TAsyncMethodCall的連個抽象方法:write_args和getResult,由於每一個方法的參數和返回值不一樣,因此這兩個方法須要具體子類實現。

  public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
    //AsyncClient工廠類
    public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
      private org.apache.thrift.async.TAsyncClientManager clientManager;
      private org.apache.thrift.protocol.TProtocolFactory protocolFactory;
      public Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
        this.clientManager = clientManager;
        this.protocolFactory = protocolFactory;
      }
      public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) {
        return new AsyncClient(protocolFactory, clientManager, transport);
      }
    }
    public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) {
      super(protocolFactory, clientManager, transport);
    }
    //sayHello方法調用入口
    public void sayHello(String paramJson, org.apache.thrift.async.AsyncMethodCallback<sayHello_call> resultHandler) throws org.apache.thrift.TException {
      checkReady();//檢查當前Client是否可用
      //建立方法調用實例
      sayHello_call method_call = new sayHello_call(paramJson, resultHandler, this, ___protocolFactory, ___transport);
      this.___currentMethod = method_call;
      //調用TAsyncClientManager的call方法
      ___manager.call(method_call);
    }
    public static class sayHello_call extends org.apache.thrift.async.TAsyncMethodCall {
      private String paramJson;
      public sayHello_call(String paramJson, org.apache.thrift.async.AsyncMethodCallback<sayHello_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
        super(client, protocolFactory, transport, resultHandler, false);
        this.paramJson = paramJson;
      }
      //發送請求
      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("sayHello", org.apache.thrift.protocol.TMessageType.CALL, 0));
        sayHello_args args = new sayHello_args();
        args.setParamJson(paramJson);
        args.write(prot);
        prot.writeMessageEnd();
      }
      //獲取返回結果
      public ResultCommon getResult() throws org.apache.thrift.TException {
        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
          throw new IllegalStateException("Method call not finished!");
        }
        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
        return (new Client(prot)).recv_sayHello();
      }
    }
  }  

 

    TAsyncClientManager

      TAsyncClientManager是異步客戶端管理類,它爲維護了一個待處理的方法調用隊列pendingCalls,並經過SelectThread線程監聽selector事件,當有就緒事件時進行方法調用的處理。

public class TAsyncClientManager {
  private static final Logger LOGGER = LoggerFactory.getLogger(TAsyncClientManager.class.getName());
  private final SelectThread selectThread;
  //TAsyncMethodCall待處理隊列
  private final ConcurrentLinkedQueue<TAsyncMethodCall> pendingCalls = new ConcurrentLinkedQueue<TAsyncMethodCall>();
  //初始化TAsyncClientManager,新建selectThread線程並啓動
  public TAsyncClientManager() throws IOException {
    this.selectThread = new SelectThread();
    selectThread.start();
  }
  //方法調用
  public void call(TAsyncMethodCall method) throws TException {
    if (!isRunning()) {
      throw new TException("SelectThread is not running");
    }
    method.prepareMethodCall();//作方法調用前的準備
    pendingCalls.add(method);//加入待處理隊列
    selectThread.getSelector().wakeup();//喚醒selector,很重要,由於首次執行方法調用時select Thread還阻塞在selector.select()上
  }
  public void stop() {
    selectThread.finish();
  }
  public boolean isRunning() {
    return selectThread.isAlive();
  }
  //SelectThread線程類,處理方法調用的核心
  private class SelectThread extends Thread {
    private final Selector selector;
    private volatile boolean running;
    private final TreeSet<TAsyncMethodCall> timeoutWatchSet = new TreeSet<TAsyncMethodCall>(new TAsyncMethodCallTimeoutComparator());

    public SelectThread() throws IOException {
      this.selector = SelectorProvider.provider().openSelector();
      this.running = true;
      this.setName("TAsyncClientManager#SelectorThread " + this.getId());
      setDaemon(true);//非守護線程
    }
    public Selector getSelector() {
      return selector;
    }
    public void finish() {
      running = false;
      selector.wakeup();
    }
    public void run() {
      while (running) {
        try {
          try {
            
            if (timeoutWatchSet.size() == 0) {
              //若是超時TAsyncMethodCall監控集合爲空,直接無限期阻塞監聽select()事件。TAsyncClientManager剛初始化時是空的
              selector.select();
            } else {
              //若是超時TAsyncMethodCall監控集合不爲空,則計算Set中第一個元素的超時時間戳是否到期
              long nextTimeout = timeoutWatchSet.first().getTimeoutTimestamp();
              long selectTime = nextTimeout - System.currentTimeMillis();
              if (selectTime > 0) {
                //尚未到期,超時監聽select()事件,超過selectTime自動喚醒selector
                selector.select(selectTime);
              } else {
                //已經到期,馬上監聽select()事件,不會阻塞selector
                selector.selectNow();
              }
            }
          } catch (IOException e) {
            LOGGER.error("Caught IOException in TAsyncClientManager!", e);
          }
          //監聽到就緒事件或者selector被喚醒會執行到此處
          transitionMethods();//處理就緒keys
          timeoutMethods();//超時方法調用處理
          startPendingMethods();//處理pending的方法調用
        } catch (Exception exception) {
          LOGGER.error("Ignoring uncaught exception in SelectThread", exception);
        }
      }
    }
    //監聽到就緒事件或者selector被喚醒,若是有就緒的SelectionKey就調用methodCall.transition(key);
    private void transitionMethods() {
      try {
        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
        while (keys.hasNext()) {
          SelectionKey key = keys.next();
          keys.remove();
          if (!key.isValid()) {
            //跳過無效key,方法調用出現異常或key被取消等會致使無效key
            continue;
          }
          TAsyncMethodCall methodCall = (TAsyncMethodCall)key.attachment();
          //調用methodCall的transition方法,執行相關的動做並將methodCall的狀態轉換爲下一個狀態
          methodCall.transition(key);
          //若是完成或發生錯誤,從timeoutWatchSet刪除該methodCall
          if (methodCall.isFinished() || methodCall.getClient().hasError()) {
            timeoutWatchSet.remove(methodCall);
          }
        }
      } catch (ClosedSelectorException e) {
        LOGGER.error("Caught ClosedSelectorException in TAsyncClientManager!", e);
      }
    }
    //超時方法調用處理
    private void timeoutMethods() {
      Iterator<TAsyncMethodCall> iterator = timeoutWatchSet.iterator();
      long currentTime = System.currentTimeMillis();
      while (iterator.hasNext()) {
        TAsyncMethodCall methodCall = iterator.next();
        if (currentTime >= methodCall.getTimeoutTimestamp()) {
          //若是超時,從timeoutWatchSet中刪除並調用onError()方法
          iterator.remove();
          methodCall.onError(new TimeoutException("Operation " + methodCall.getClass() + " timed out after " + (currentTime - methodCall.getStartTime()) + " ms."));
        } else {
          //若是沒有超時,說明以後的TAsyncMethodCall也不會超時,跳出循環,由於越早進入timeoutWatchSet的TAsyncMethodCall越先超時。
          break;
        }
      }
    }
    //開始等待的方法調用,循環處理pendingCalls中的methodCall
    private void startPendingMethods() {
      TAsyncMethodCall methodCall;
      while ((methodCall = pendingCalls.poll()) != null) {
        // Catch registration errors. method will catch transition errors and cleanup.
        try {
          //向selector註冊並設置初次狀態
          methodCall.start(selector);
          //若是客戶端指定了超時時間且transition成功,將methodCall加入到timeoutWatchSet
          TAsyncClient client = methodCall.getClient();
          if (client.hasTimeout() && !client.hasError()) {
            timeoutWatchSet.add(methodCall);
          }
        } catch (Exception exception) {
          //異常處理
          LOGGER.warn("Caught exception in TAsyncClientManager!", exception);
          methodCall.onError(exception);
        }
      }
    }
  }
  //TreeSet用的比較器,判斷是不是同一個TAsyncMethodCall實例
  private static class TAsyncMethodCallTimeoutComparator implements Comparator<TAsyncMethodCall> {
    public int compare(TAsyncMethodCall left, TAsyncMethodCall right) {
      if (left.getTimeoutTimestamp() == right.getTimeoutTimestamp()) {
        return (int)(left.getSequenceId() - right.getSequenceId());
      } else {
        return (int)(left.getTimeoutTimestamp() - right.getTimeoutTimestamp());
      }
    }
  }
}

 

 

    TAsyncMethodCall

      TAsyncMethodCall實現了對方法調用的封裝。一次方法調用過程就是一個TAsyncMethodCall實例的生命週期。TAsyncMethodCall實例在整個生命週期內有如下狀態,正常狀況下的狀態狀態過程爲:CONNECTING -> WRITING_REQUEST_SIZE -> WRITING_REQUEST_BODY -> READING_RESPONSE_SIZE -> READING_RESPONSE_BODY -> RESPONSE_READ,若是任何一個過程當中發生了異常則直接轉換爲ERROR狀態。

  public static enum State {
    CONNECTING,//鏈接狀態
    WRITING_REQUEST_SIZE,//寫請求size
    WRITING_REQUEST_BODY,//寫請求體
    READING_RESPONSE_SIZE,//讀響應size
    READING_RESPONSE_BODY,//讀響應體
    RESPONSE_READ,//讀響應完成
    ERROR;//異常狀態
  }

 

      TAsyncMethodCall的源碼分析以下:

public abstract class TAsyncMethodCall<T> {
  private static final int INITIAL_MEMORY_BUFFER_SIZE = 128;
  private static AtomicLong sequenceIdCounter = new AtomicLong(0);//序列號計數器private State state = null;//狀態在start()方法中初始化
  protected final TNonblockingTransport transport;
  private final TProtocolFactory protocolFactory;
  protected final TAsyncClient client;
  private final AsyncMethodCallback<T> callback;//回調實例
  private final boolean isOneway;
  private long sequenceId;//序列號
  
  private ByteBuffer sizeBuffer;//Java NIO概念,frameSize buffer
  private final byte[] sizeBufferArray = new byte[4];//4字節的消息Size字節數組
  private ByteBuffer frameBuffer;//Java NIO概念,frame buffer

  private long startTime = System.currentTimeMillis();

  protected TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback<T> callback, boolean isOneway) {
    this.transport = transport;
    this.callback = callback;
    this.protocolFactory = protocolFactory;
    this.client = client;
    this.isOneway = isOneway;
    this.sequenceId = TAsyncMethodCall.sequenceIdCounter.getAndIncrement();
  }
  protected State getState() {
    return state;
  }
  protected boolean isFinished() {
    return state == State.RESPONSE_READ;
  }
  protected long getStartTime() {
    return startTime;
  }
  protected long getSequenceId() {
    return sequenceId;
  }
  public TAsyncClient getClient() {
    return client;
  }
  public boolean hasTimeout() {
    return client.hasTimeout();
  }
  public long getTimeoutTimestamp() {
    return client.getTimeout() + startTime;
  }
  //將請求寫入protocol,由子類實現
  protected abstract void write_args(TProtocol protocol) throws TException;
  //方法調用前的準備處理,初始化frameBuffer和sizeBuffer
  protected void prepareMethodCall() throws TException {
    //TMemoryBuffer內存緩存傳輸類,繼承了TTransport
    TMemoryBuffer memoryBuffer = new TMemoryBuffer(INITIAL_MEMORY_BUFFER_SIZE);
    TProtocol protocol = protocolFactory.getProtocol(memoryBuffer);
    write_args(protocol);//將請求寫入protocol

    int length = memoryBuffer.length();
    frameBuffer = ByteBuffer.wrap(memoryBuffer.getArray(), 0, length);

    TFramedTransport.encodeFrameSize(length, sizeBufferArray);
    sizeBuffer = ByteBuffer.wrap(sizeBufferArray);
  }
  //向selector註冊並設置開始狀態,多是鏈接狀態或寫狀態
  void start(Selector sel) throws IOException {
    SelectionKey key;
    if (transport.isOpen()) {
      state = State.WRITING_REQUEST_SIZE;
      key = transport.registerSelector(sel, SelectionKey.OP_WRITE);
    } else {
      state = State.CONNECTING;
      key = transport.registerSelector(sel, SelectionKey.OP_CONNECT);
      //若是是非阻塞鏈接初始化會當即成功,轉換爲寫狀態並修改感興趣事件
      if (transport.startConnect()) {
        registerForFirstWrite(key);
      }
    }
    key.attach(this);//將本methodCall附加在key上
  }
  void registerForFirstWrite(SelectionKey key) throws IOException {
    state = State.WRITING_REQUEST_SIZE;
    key.interestOps(SelectionKey.OP_WRITE);
  }
  protected ByteBuffer getFrameBuffer() {
    return frameBuffer;
  }
  //轉換爲下一個狀態,根據不一樣的狀態作不一樣的處理。該方法只會在selector thread中被調用,不用擔憂併發
  protected void transition(SelectionKey key) {
    // 確保key是有效的
    if (!key.isValid()) {
      key.cancel();
      Exception e = new TTransportException("Selection key not valid!");
      onError(e);
      return;
    }
    try {
      switch (state) {
        case CONNECTING:
          doConnecting(key);//建鏈接
          break;
        case WRITING_REQUEST_SIZE:
          doWritingRequestSize();//寫請求size
          break;
        case WRITING_REQUEST_BODY:
          doWritingRequestBody(key);//寫請求體
          break;
        case READING_RESPONSE_SIZE:
          doReadingResponseSize();//讀響應size
          break;
        case READING_RESPONSE_BODY:
          doReadingResponseBody(key);//讀響應體
          break;
        default: // RESPONSE_READ, ERROR, or bug
          throw new IllegalStateException("Method call in state " + state
              + " but selector called transition method. Seems like a bug...");
      }
    } catch (Exception e) {
      key.cancel();
      key.attach(null);
      onError(e);
    }
  }
  //出現異常時的處理
  protected void onError(Exception e) {
    client.onError(e);//置Client異常信息
    callback.onError(e);//回調異常方法
    state = State.ERROR;//置當前對象爲ERROR狀態
  }
  //讀響應消息體
  private void doReadingResponseBody(SelectionKey key) throws IOException {
    if (transport.read(frameBuffer) < 0) {
      throw new IOException("Read call frame failed");
    }
    if (frameBuffer.remaining() == 0) {
      cleanUpAndFireCallback(key);
    }
  }
  //方法調用完成的處理
  private void cleanUpAndFireCallback(SelectionKey key) {
    state = State.RESPONSE_READ;//狀態轉換爲讀取response完成
    key.interestOps(0);//清空感興趣事件
    key.attach(null);//清理key的附加信息
    client.onComplete();//將client的___currentMethod置爲null
    callback.onComplete((T)this);//回調onComplete方法
  }
  //讀響應size,一樣可能須要多屢次直到把sizeBuffer讀滿
  private void doReadingResponseSize() throws IOException {
    if (transport.read(sizeBuffer) < 0) {
      throw new IOException("Read call frame size failed");
    }
    if (sizeBuffer.remaining() == 0) {
      state = State.READING_RESPONSE_BODY;
      //讀取FrameSize完成,爲frameBuffer分配FrameSize大小的空間用於讀取響應體
      frameBuffer = ByteBuffer.allocate(TFramedTransport.decodeFrameSize(sizeBufferArray));
    }
  }
  //寫請求體
  private void doWritingRequestBody(SelectionKey key) throws IOException {
    if (transport.write(frameBuffer) < 0) {
      throw new IOException("Write call frame failed");
    }
    if (frameBuffer.remaining() == 0) {
      if (isOneway) {
        //若是是單向RPC,此時方法調用已經結束,清理key並進行回調
        cleanUpAndFireCallback(key);
      } else {
        //非單向RPC,狀態轉換爲READING_RESPONSE_SIZE
        state = State.READING_RESPONSE_SIZE;
        //重置sizeBuffer,準備讀取frame size
        sizeBuffer.rewind();
        key.interestOps(SelectionKey.OP_READ);//修改感興趣事件
      }
    }
  }
  //寫請求size到transport,可能會寫屢次直到sizeBuffer.remaining() == 0才轉換狀態
  private void doWritingRequestSize() throws IOException {
    if (transport.write(sizeBuffer) < 0) {
      throw new IOException("Write call frame size failed");
    }
    if (sizeBuffer.remaining() == 0) {
      state = State.WRITING_REQUEST_BODY;
    }
  }
  //創建鏈接
  private void doConnecting(SelectionKey key) throws IOException {
    if (!key.isConnectable() || !transport.finishConnect()) {
      throw new IOException("not connectable or finishConnect returned false after we got an OP_CONNECT");
    }
    registerForFirstWrite(key);
  }
}

 

 

  總結

    最後總結一下異步客戶端的處理流程,以下圖所示。

    須要注意的是,一個AsyncClient實例只能同時處理一個方法調用,必須等待前一個方法調用完成後才能使用該AsyncClient實例調用其餘方法,疑問:和同步客戶端相比有什麼優點?不用等返回結果,能夠幹其餘的活?又能幹什麼活呢?若是客戶端使用了鏈接池(也是AsyncClient實例池,一個AsyncClient實例對應一個鏈接),該線程不用等待前一個鏈接進行方法調用的返回結果,就能夠去線程池獲取一個可用的鏈接,使用新的鏈接進行方法調用,而原來的鏈接在收到返回結果後,狀態變爲可用,返回給鏈接池。這樣相對於同步客戶端單個線程串行發送請求的狀況,異步客戶端單個線程進行發送請求的效率會大大提升,須要的線程數變小,可是可能須要的鏈接數會增大,單個請求的響應時間會變長。在線程數是性能瓶頸,或對請求的響應時間要求不高的狀況下,使用異步客戶端比較合適。

 

    

相關文章
相關標籤/搜索