Thrift RPC in Practice (2): Thrift Network Service Models

The TServer class hierarchy

TSimpleServer and TThreadPoolServer are blocking service models.
TNonblockingServer, THsHaServer, and TThreadedSelectorServer are non-blocking (NIO) service models.

1 Definition of the TServer abstract class

The inner static class Args is defined so that TServer can wire together the software stack (transport layer, protocol layer, and processor layer).

public abstract class TServer {
    protected TProcessorFactory processorFactory_;
    protected TServerTransport serverTransport_;
    protected TTransportFactory inputTransportFactory_;
    protected TTransportFactory outputTransportFactory_;
    protected TProtocolFactory inputProtocolFactory_;
    protected TProtocolFactory outputProtocolFactory_;
    private boolean isServing;
    protected TServerEventHandler eventHandler_;
    protected volatile boolean stopped_ = false;

    protected TServer(TServer.AbstractServerArgs args) {
        this.processorFactory_ = args.processorFactory;
        this.serverTransport_ = args.serverTransport;
        this.inputTransportFactory_ = args.inputTransportFactory;
        this.outputTransportFactory_ = args.outputTransportFactory;
        this.inputProtocolFactory_ = args.inputProtocolFactory;
        this.outputProtocolFactory_ = args.outputProtocolFactory;
    }

    // Start serving; implemented by each concrete subclass
    public abstract void serve();

    // Stop serving
    public void stop() {
    }

    public abstract static class AbstractServerArgs<T extends TServer.AbstractServerArgs<T>> {
        final TServerTransport serverTransport;
        TProcessorFactory processorFactory;
        TTransportFactory inputTransportFactory = new TTransportFactory();
        TTransportFactory outputTransportFactory = new TTransportFactory();
        TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
        TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();

        public AbstractServerArgs(TServerTransport transport) {
            this.serverTransport = transport;
        }
    }
    
    // Inner static class Args: wires the software stack (transport, protocol, processor) for TServer
    public static class Args extends TServer.AbstractServerArgs<TServer.Args> {
        public Args(TServerTransport transport) {
            super(transport);
        }
    }
}
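To see how Args wires the stack together, here is a minimal bootstrap sketch for a TSimpleServer. The Hello service, HelloHandler, and port 9090 are hypothetical placeholders for your own generated IDL code:

import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;

public class SimpleServerBootstrap {
  public static void main(String[] args) throws Exception {
    // Hello/HelloHandler are hypothetical; substitute your generated service
    TProcessor processor = new Hello.Processor<>(new HelloHandler());

    TServerTransport transport = new TServerSocket(9090);        // transport layer
    TServer server = new TSimpleServer(
        new TServer.Args(transport)
            .processor(processor)                                // processing layer
            .protocolFactory(new TBinaryProtocol.Factory()));    // protocol layer

    server.serve(); // blocks, serving one connection at a time
  }
}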

2 TSimpleServer

TSimpleServer works in the simplest blocking I/O mode. Its implementation is concise and easy to follow, but it can accept and process only one socket connection at a time, so it is inefficient; it mainly serves to demonstrate how Thrift works and is rarely used in real development. The core of its serve() method:

public void serve() {
  try {
    serverTransport_.listen(); // start listening, i.e. new ServerSocket(port)
  } catch (TTransportException ttx) {
    LOGGER.error("Error occurred during listening.", ttx);
    return;
  }
  setServing(true);
  while (!stopped_) { // stopped_ starts out false
    TTransport client = null;
    TProcessor processor = null;
    TTransport inputTransport = null;
    TTransport outputTransport = null;
    TProtocol inputProtocol = null;
    TProtocol outputProtocol = null;
    ServerContext connectionContext = null;
    try {
      client = serverTransport_.accept(); // accept a client, i.e. ServerSocket.accept()
      if (client != null) {
        processor = processorFactory_.getProcessor(client);
        inputTransport = inputTransportFactory_.getTransport(client);
        outputTransport = outputTransportFactory_.getTransport(client);
        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
        if (eventHandler_ != null) {
          connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
        }
        while (true) {
          if (eventHandler_ != null) {
            eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
          }
          if (!processor.process(inputProtocol, outputProtocol)) { // process one request; returns false when the client closes
            break;
          }
        }
      }
    } catch (TTransportException ttx) {
      // Client died, just move on
    } catch (TException tx) {
      if (!stopped_) {
        LOGGER.error("Thrift error occurred during processing of message.", tx);
      }
    } catch (Exception x) {
      if (!stopped_) {
        LOGGER.error("Error occurred during processing of message.", x);
      }
    }

    if (eventHandler_ != null) {
      eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
    }

    if (inputTransport != null) {
      inputTransport.close();
    }

    if (outputTransport != null) {
      outputTransport.close();
    }

  }
  setServing(false);
}
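For completeness, here is a blocking client that pairs with this server. Hello.Client and its greet method are again hypothetical generated code:

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class SimpleClient {
  public static void main(String[] args) throws Exception {
    TTransport transport = new TSocket("localhost", 9090);
    transport.open();
    TProtocol protocol = new TBinaryProtocol(transport);

    // Hello.Client/greet are hypothetical; each call blocks until the
    // server's process() loop writes the response back
    Hello.Client client = new Hello.Client(protocol);
    System.out.println(client.greet("thrift"));

    transport.close();
  }
}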

3 TThreadPoolServer

TThreadPoolServer also works with blocking sockets: the main thread blocks listening on the server socket for new connections, while business processing is handed to a worker thread pool.

(Working-mode diagram omitted.)

A simplified sketch of the main-thread loop:

// Create the thread-pool executor
private static ExecutorService createDefaultExecutorService(Args args) {
  SynchronousQueue<Runnable> executorQueue =
    new SynchronousQueue<Runnable>();
  return new ThreadPoolExecutor(args.minWorkerThreads,
                                args.maxWorkerThreads,
                                args.stopTimeoutVal,
                                args.stopTimeoutUnit,
                                executorQueue);
}

stopped_ = false;
serverTransport_.listen();
while (!stopped_) {
  try {
    TTransport client = serverTransport_.accept();
    WorkerProcess wp = new WorkerProcess(client);
    while (true) {
      try {
        executorService_.execute(wp); // ThreadPoolExecutor.execute(wp)
        break;
      } catch (Throwable t) {
        // pool saturated: the real implementation waits with a timeout
        // before retrying the submission
      }
    }
  } catch (TTransportException ttx) {
    // accept failed or transport closed; loop back and re-check stopped_
  }
}

executorService_.shutdown();

Advantages of TThreadPoolServer:
The thread-pool model splits the accepting thread from the worker threads that serve client connections: data reading and business processing are both done by the pool, and the main thread only accepts new connections, so under heavy concurrency new connections are still accepted promptly. This model suits servers that can bound the number of concurrent clients in advance; every request is then handled by a pool thread without delay, and performance is very good.
Disadvantages of TThreadPoolServer:
Throughput is capped by the pool's capacity. When the number of concurrent requests exceeds the number of pool threads, new requests have to queue and wait.
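A minimal construction sketch (imports as in the earlier bootstrap example; the processor is the same hypothetical one). minWorkerThreads and maxWorkerThreads bound the pool:

TServerTransport transport = new TServerSocket(9090);
TThreadPoolServer.Args poolArgs = new TThreadPoolServer.Args(transport)
    .processor(processor)   // hypothetical processor from the generated code
    .minWorkerThreads(5)    // threads kept ready even when idle
    .maxWorkerThreads(100); // hard cap on concurrently served connections
TServer server = new TThreadPoolServer(poolArgs);
server.serve(); // the accept loop runs on the calling thread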

4 TNonblockingServer

TNonblockingServer also works single-threaded, but in NIO style: using the Channel/Selector mechanism, it processes connections through an I/O event model.

All sockets are registered with the selector, and a single thread monitors them all by looping over the selector. Each time select() returns, the thread services every socket in the ready state: sockets with incoming data are read, sockets with pending output are written, and for the listening socket a new connection socket is created and registered with the selector.

(Working-principle diagram omitted.)

public void run() {
  try {
    while (!stopped_) {
      select();                  // block until I/O events are ready
      processInterestChanges();  // apply queued interest-set updates
    }
  } catch (Throwable t) {
    LOGGER.error("run() exiting due to uncaught error", t);
  }
}
private void select() {
  try {
    // wait for io events.
    selector.select();

    // process the io events we received
    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();

      // skip if not valid
      if (!key.isValid()) {
        cleanupSelectionKey(key);
        continue;
      }

      // if the key is marked Accept, then it has to be the server
      // transport.
      if (key.isAcceptable()) {
        handleAccept();
      } else if (key.isReadable()) {
        // deal with reads
        handleRead(key);
      } else if (key.isWritable()) {
        // deal with writes
        handleWrite(key);
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  } catch (IOException e) {
    LOGGER.warn("Got an IOException while selecting!", e);
  }
}

Advantages of TNonblockingServer:
Compared with TSimpleServer, the efficiency gain comes from I/O multiplexing: TNonblockingServer uses non-blocking I/O to monitor and handle accept/read/write events, watching the state of many sockets at once.
Disadvantages of TNonblockingServer:
Business processing still runs sequentially on the single thread. When handlers are complex or slow, for example an interface method that spends a long time on a database query, the whole server stalls, because pending invocations still execute one after another.
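A minimal construction sketch. Note that the nonblocking servers frame their messages, so clients must wrap their socket in a framed transport, e.g. new TBinaryProtocol(new TFramedTransport(new TSocket("localhost", 9090))):

TNonblockingServerTransport transport = new TNonblockingServerSocket(9090);
TNonblockingServer.Args nbArgs = new TNonblockingServer.Args(transport)
    .processor(processor) // hypothetical processor from the generated code
    .protocolFactory(new TBinaryProtocol.Factory());
TServer server = new TNonblockingServer(nbArgs);
server.serve(); // one thread runs both the select loop and the handlers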

5 THsHaServer

To address TNonblockingServer's weakness, THsHaServer extends TNonblockingServer and introduces a thread pool: once a request has been read, its processing is handed to the pool. THsHaServer is a Half-Sync/Half-Async design: the async half covers the I/O events (accept/read/write), while the sync half is the handler's synchronous execution of each RPC.

/**
 * Helper to create an invoker pool
 */
protected static ExecutorService createInvokerPool(Args options) {
  int minWorkerThreads = options.minWorkerThreads;
  int maxWorkerThreads = options.maxWorkerThreads;
  int stopTimeoutVal = options.stopTimeoutVal;
  TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;

  LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
  ExecutorService invoker = new ThreadPoolExecutor(minWorkerThreads,
    maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);

  return invoker;
}

Advantages of THsHaServer:
Compared with TNonblockingServer, once a request has been read THsHaServer hands the business processing to a thread pool, and the main thread immediately returns to the next loop iteration, which greatly improves throughput.
Disadvantages of THsHaServer:
The single main thread still has to monitor all sockets and perform all reads and writes. With many concurrent connections sending large payloads, new connection requests on the listening socket may not be accepted in time.
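A minimal construction sketch, using the minWorkerThreads/maxWorkerThreads knobs read by createInvokerPool above:

TNonblockingServerTransport transport = new TNonblockingServerSocket(9090);
THsHaServer.Args hshaArgs = new THsHaServer.Args(transport);
hshaArgs.processor(processor); // hypothetical processor from the generated code
hshaArgs.minWorkerThreads(5);  // invoker pool lower bound
hshaArgs.maxWorkerThreads(20); // invoker pool upper bound
TServer server = new THsHaServer(hshaArgs);
server.serve(); // one I/O thread plus the invoker pool for handlers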

6 TThreadedSelectorServer

TThreadedSelectorServer extends the nonblocking servers above: it separates the accept selector thread from the read/write selector threads and adds a worker thread pool. It too is a Half-Sync/Half-Async service model.

TThreadedSelectorServer is the most advanced server mode Thrift currently provides. Internally it consists of the following parts:
(1) One AcceptThread, dedicated to handling new connections on the listening socket.
(2) Several SelectorThread objects that perform the network I/O on the client sockets; all reading and writing of network data is done by these threads.
(3) One SelectorThreadLoadBalancer, which decides which SelectorThread a new connection accepted by the AcceptThread should be assigned to.
(4) One ExecutorService worker pool: when a SelectorThread detects an incoming invocation on a client socket, it reads the request and hands it to a pool thread, which executes the handler callback for that RPC (this part is synchronous).


Because a dedicated AcceptThread handles new connection requests, TThreadedSelectorServer can respond promptly to large bursts of concurrent connections; and because it spreads network I/O across several SelectorThreads, it reads and writes quickly and copes well with I/O-heavy workloads. A construction sketch follows.
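A minimal sketch with illustrative thread counts (imports as in the earlier examples):

TNonblockingServerTransport transport = new TNonblockingServerSocket(9090);
TThreadedSelectorServer.Args tssArgs = new TThreadedSelectorServer.Args(transport)
    .processor(processor) // hypothetical processor from the generated code
    .protocolFactory(new TBinaryProtocol.Factory())
    .selectorThreads(4)   // threads performing read/write I/O
    .workerThreads(32);   // pool threads executing the handler callbacks
TServer server = new TThreadedSelectorServer(tssArgs);
server.serve();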

Key code taking a connection from the accept thread to the selectorThreads:

@Override
protected boolean startThreads() {
  try {
    for (int i = 0; i < args.selectorThreads; ++i) {
      selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
    }
    acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
      createSelectorThreadLoadBalancer(selectorThreads)); // create the load balancer
    for (SelectorThread thread : selectorThreads) {
      thread.start();
    }
    acceptThread.start();
    return true;
  } catch (IOException e) {
    LOGGER.error("Failed to start threads!", e);
    return false;
  }
}

Key code in the AcceptThread: its select loop watches only accept events, and handleAccept() hands each new connection to the SelectorThread chosen by the load balancer (under the FAIR_ACCEPT policy the hand-off is routed through the invoker pool, throttling accepts when the workers are busy):

private void select() {
  try {
    // wait for connect events.
    acceptSelector.select();

    // process the io events we received
    Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();

      // skip if not valid
      if (!key.isValid()) {
        continue;
      }

      if (key.isAcceptable()) {
        handleAccept();
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  } catch (IOException e) {
    LOGGER.warn("Got an IOException while selecting!", e);
  }
}
private void handleAccept() {
  final TNonblockingTransport client = doAccept();
  if (client != null) {
    // Pass this connection to a selector thread
    final SelectorThread targetThread = threadChooser.nextThread();

    if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
      doAddAccept(targetThread, client);
    } else {
      // FAIR_ACCEPT
      try {
        invoker.submit(new Runnable() {
          public void run() {
            doAddAccept(targetThread, client);
          }
        });
      } catch (RejectedExecutionException rx) {
        LOGGER.warn("ExecutorService rejected accept registration!", rx);
        // close immediately
        client.close();
      }
    }
  }
}

