RPC-Thrift(二)

  TTransport

    TTransport負責數據的傳輸,先看類結構圖。數組

      

    阻塞Server使用TServerSocket,它封裝了ServerSocket實例,ServerSocket實例監聽到客戶端的請求會建立一個Socket對象,並將該Socket對象封裝爲一個TSocket對象用於通訊。
緩存

    非阻塞Server使用TNonblockingServerSocket,它封裝了一個ServerSocketChannel實例,ServerSocketChannel實例監聽到客戶端的請求會建立一個SocketChannel對象,並將該對象封裝成一個TNonblockingSocket對象用於以後的通訊。當讀取完客戶端的請求數據後,保存爲本地的一個TTransport對象,而後封裝爲TFramedTransport對象進行處理。異步

    TTransport

      TTransport是客戶端全部Transport的基類。socket

public abstract class TTransport {
  public abstract boolean isOpen();//transport是否打開
  public boolean peek() {//是否還有數據要讀,若是transport打開時還有數據要讀
    return isOpen();
  }
  public abstract void open() throws TTransportException;//打開transport讀寫數據
  public abstract void close();//關閉transport
  //讀取len長度的數據到字節數組buf,off表示開始讀的位置,返回實際讀取的字節數(不必定爲len)
  public abstract int read(byte[] buf, int off, int len) throws TTransportException;
  //確保讀取len長度的數據到字節數組buf,off表示開始讀的位置,經過循環調用read()實現,返回實際讀取的字節數(len)
  public int readAll(byte[] buf, int off, int len)
    throws TTransportException {
    int got = 0;
    int ret = 0;
    while (got < len) {
      ret = read(buf, off+got, len-got);
      if (ret <= 0) {
        throw new TTransportException(
            "Cannot read. Remote side has closed. Tried to read "
                + len
                + " bytes, but only got "
                + got
                + " bytes. (This is often indicative of an internal error on the server side. Please check your server logs.)");
      }
      got += ret;
    }
    return got;
  }
  //將buf中的所有數據寫到output
  public void write(byte[] buf) throws TTransportException {
    write(buf, 0, buf.length);
  }
  //將buf中off位置開始len長度的數據寫到output
  public abstract void write(byte[] buf, int off, int len) throws TTransportException;
  //清空transport緩存中的數據
  public void flush() throws TTransportException {}
  //獲取本地緩存的數據,沒有則返回null
  public byte[] getBuffer() {
    return null;
  }
  //獲取本地緩存的下一個讀取位置,沒有則返回0
  public int getBufferPosition() {
    return 0;
  }
  //獲取本地緩存的字節數,沒有則返回-1
  public int getBytesRemainingInBuffer() {
    return -1;
  }
  //從本地緩存中消費n個字節
  public void consumeBuffer(int len) {}
}

 

    TIOStreamTransport

      TIOStreamTransport是面向流的TTransport的子類,阻塞式,實現了流的操做。ide

public class TIOStreamTransport extends TTransport {
  private static final Logger LOGGER = LoggerFactory.getLogger(TIOStreamTransport.class.getName());
  protected InputStream inputStream_ = null;//輸入流
  protected OutputStream outputStream_ = null;//輸出流
  //一波構造函數
  protected TIOStreamTransport() {}
  public TIOStreamTransport(InputStream is) {
    inputStream_ = is;
  }
  public TIOStreamTransport(OutputStream os) {
    outputStream_ = os;
  }
  public TIOStreamTransport(InputStream is, OutputStream os) {
    inputStream_ = is;
    outputStream_ = os;
  }
  //streams必須在構造時已經被打開,so一直返回true
  public boolean isOpen() {
    return true;
  }
  //streams必須在構造時已經被打開,不須要這個方法
  public void open() throws TTransportException {}
  //關閉流
  public void close() {
    if (inputStream_ != null) {
      try {
        inputStream_.close();
      } catch (IOException iox) {
        LOGGER.warn("Error closing input stream.", iox);
      }
      inputStream_ = null;
    }
    if (outputStream_ != null) {
      try {
        outputStream_.close();
      } catch (IOException iox) {
        LOGGER.warn("Error closing output stream.", iox);
      }
      outputStream_ = null;
    }
  }
  //將輸入流中的指定數據讀取到buf中
  public int read(byte[] buf, int off, int len) throws TTransportException {
    if (inputStream_ == null) {
      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot read from null inputStream");
    }
    int bytesRead;
    try {
      bytesRead = inputStream_.read(buf, off, len);
    } catch (IOException iox) {
      throw new TTransportException(TTransportException.UNKNOWN, iox);
    }
    if (bytesRead < 0) {
      throw new TTransportException(TTransportException.END_OF_FILE);
    }
    return bytesRead;
  }
  //將buf中的數據寫出的輸出流outputStream_
  public void write(byte[] buf, int off, int len) throws TTransportException {
    if (outputStream_ == null) {
      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream");
    }
    try {
      outputStream_.write(buf, off, len);
    } catch (IOException iox) {
      throw new TTransportException(TTransportException.UNKNOWN, iox);
    }
  }
  //清空輸出流
  public void flush() throws TTransportException {
    if (outputStream_ == null) {
      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot flush null outputStream");
    }
    try {
      outputStream_.flush();
    } catch (IOException iox) {
      throw new TTransportException(TTransportException.UNKNOWN, iox);
    }
  }
}

 

    TSocket

      TSocket類繼承自TIOStreamTransport類,實現了對Socket實例的包裝。inputStream_和outputStream_經過Socket初始化。函數

public class TSocket extends TIOStreamTransport {
  private static final Logger LOGGER = LoggerFactory.getLogger(TSocket.class.getName());
  private Socket socket_ = null;//包裝socket_
  private String host_  = null;//遠程host
  private int port_ = 0;//遠程port
  private int timeout_ = 0;//Socket超時時間
  //三個構造函數
  public TSocket(Socket socket) throws TTransportException {
    socket_ = socket;
    try {
      socket_.setSoLinger(false, 0);
      socket_.setTcpNoDelay(true);
    } catch (SocketException sx) {
      LOGGER.warn("Could not configure socket.", sx);
    }

    if (isOpen()) {
      try {
        //初始化inputStream_和outputStream_
        inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
        outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
      } catch (IOException iox) {
        close();
        throw new TTransportException(TTransportException.NOT_OPEN, iox);
      }
    }
  }
  public TSocket(String host, int port) {
    this(host, port, 0);
  }
  public TSocket(String host, int port, int timeout) {
    host_ = host;
    port_ = port;
    timeout_ = timeout;
    initSocket();
  }
  //初始化Socket
  private void initSocket() {
    socket_ = new Socket();
    try {
      socket_.setSoLinger(false, 0);
      socket_.setTcpNoDelay(true);
      socket_.setSoTimeout(timeout_);
    } catch (SocketException sx) {
      LOGGER.error("Could not configure socket.", sx);
    }
  }
  public void setTimeout(int timeout) {
    timeout_ = timeout;
    try {
      socket_.setSoTimeout(timeout);
    } catch (SocketException sx) {
      LOGGER.warn("Could not set socket timeout.", sx);
    }
  }
  public Socket getSocket() {
    if (socket_ == null) {
      initSocket();
    }
    return socket_;
  }
  //檢查socket_是否鏈接
  public boolean isOpen() {
    if (socket_ == null) {
      return false;
    }
    return socket_.isConnected();
  }
  //打開socket鏈接,初始化輸入流和輸出流
  public void open() throws TTransportException {
    if (isOpen()) {
      throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected.");
    }
    if (host_.length() == 0) {
      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host.");
    }
    if (port_ <= 0) {
      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open without port.");
    }
    if (socket_ == null) {
      initSocket();
    }
    try {
      socket_.connect(new InetSocketAddress(host_, port_), timeout_);
      inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
      outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
    } catch (IOException iox) {
      close();
      throw new TTransportException(TTransportException.NOT_OPEN, iox);
    }
  }
  //關閉socket
  public void close() {
    super.close();
    if (socket_ != null) {
      try {
        socket_.close();
      } catch (IOException iox) {
        LOGGER.warn("Could not close socket.", iox);
      }
      socket_ = null;
    }
  }
}

    TFramedTransport

      TFramedTransport做用是經過message以前的4-byte frame size確保讀到的message時完整的,防止發生粘包拆包的問題。源碼分析

//TFramedTransport做用是經過message以前的4-byte frame size確保讀到的message時完整的,防止發生粘包拆包的問題
public class TFramedTransport extends TTransport {
  protected static final int DEFAULT_MAX_LENGTH = 16384000;//默認的本地緩存最大字節數
  private int maxLength_;//本地緩存最大字節數
  private TTransport transport_ = null;//封裝的transport_,實際經過該對象實現數據的讀取與寫入
  private final TByteArrayOutputStream writeBuffer_ = new TByteArrayOutputStream(1024);//輸出BUffer,將字節數組輸出
  private TMemoryInputTransport readBuffer_ = new TMemoryInputTransport(new byte[0]);//輸入buffer,用於數據讀取
  //工廠類,用於將一個TTransport實例封裝爲TFramedTransport實例
  public static class Factory extends TTransportFactory {
    private int maxLength_;
    public Factory() {
      maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
    }
    public Factory(int maxLength) {
      maxLength_ = maxLength;
    }
    @Override
    public TTransport getTransport(TTransport base) {
      return new TFramedTransport(base, maxLength_);
    }
  }
  //兩個構造函數
  public TFramedTransport(TTransport transport, int maxLength) {
    transport_ = transport;
    maxLength_ = maxLength;
  }
  public TFramedTransport(TTransport transport) {
    transport_ = transport;
    maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
  }
  //同transport_的三個方法
  public void open() throws TTransportException {
    transport_.open();
  }
  public boolean isOpen() {
    return transport_.isOpen();
  }
  public void close() {
    transport_.close();
  }
  //讀數據,一次請求可能調用屢次
  public int read(byte[] buf, int off, int len) throws TTransportException {
    if (readBuffer_ != null) {
      //在一次客戶端的請求中第一次調用該方法時,確定返回got<0,就能夠進入 readFrame()方法。
      int got = readBuffer_.read(buf, off, len);//readBuffer_已讀完或字節數爲0時 確定返回got<0
      if (got > 0) {
        return got;
      }
    }
    readFrame();//從transport_讀到本地緩存readBuffer_
    return readBuffer_.read(buf, off, len);
  }
  @Override
  public byte[] getBuffer() {
    return readBuffer_.getBuffer();
  }
  @Override
  public int getBufferPosition() {
    return readBuffer_.getBufferPosition();
  }
  @Override
  public int getBytesRemainingInBuffer() {
    return readBuffer_.getBytesRemainingInBuffer();
  }
  @Override
  public void consumeBuffer(int len) {
    readBuffer_.consumeBuffer(len);
  }

  private final byte[] i32buf = new byte[4];

  private void readFrame() throws TTransportException {
    transport_.readAll(i32buf, 0, 4); //讀前4個字節,FrameSize
    int size = decodeFrameSize(i32buf);//因爲發送數據方對FrameSize進行了編碼,經過解碼獲得消息的大小
    //校驗FrameSize是否正確
    if (size < 0) {
      throw new TTransportException("Read a negative frame size (" + size + ")!");
    }
    if (size > maxLength_) {
      throw new TTransportException("Frame size (" + size + ") larger than max length (" + maxLength_ + ")!");
    }
    byte[] buff = new byte[size];
    transport_.readAll(buff, 0, size);//將FrameSize大小的所有數據讀到buff
    readBuffer_.reset(buff);//重置本地緩存
  }
  //write是向本地緩存寫入數據,寫完後,全部的調用方都要對輸出流調用flush進行清空,因此下面必定會進入到flush方法,再經過transport_將本地緩存的數據寫出去
  public void write(byte[] buf, int off, int len) throws TTransportException {
    writeBuffer_.write(buf, off, len);
  }
  @Override
  public void flush() throws TTransportException {
    byte[] buf = writeBuffer_.get();
    int len = writeBuffer_.len();
    writeBuffer_.reset();//清空
    encodeFrameSize(len, i32buf);//對FrameSize進行編碼
    transport_.write(i32buf, 0, 4);//先寫數據大小FrameSize
    transport_.write(buf, 0, len);//在寫真實數據
    transport_.flush();//清空
  }
  //如下兩個方法是對FrameSize進行編解碼,將每一個字節高位都位移到低位組成byte數組
  public static final void encodeFrameSize(final int frameSize, final byte[] buf) {
    buf[0] = (byte)(0xff & (frameSize >> 24));
    buf[1] = (byte)(0xff & (frameSize >> 16));
    buf[2] = (byte)(0xff & (frameSize >> 8));
    buf[3] = (byte)(0xff & (frameSize));
  }
  public static final int decodeFrameSize(final byte[] buf) {
    return 
      ((buf[0] & 0xff) << 24) |
      ((buf[1] & 0xff) << 16) |
      ((buf[2] & 0xff) <<  8) |
      ((buf[3] & 0xff));
  }
}

       TFramedTransport用到了TMemoryInputTransport類,TMemoryInputTransport封裝了一個字節數組byte[]來作輸入流的封裝。this

public final class TMemoryInputTransport extends TTransport {
  private byte[] buf_;//保存數據的字節數組
  private int pos_;//可讀數據的開始位置
  private int endPos_;//可讀數據的的結束位置
  public TMemoryInputTransport() {
  }
  public TMemoryInputTransport(byte[] buf) {
    reset(buf);
  }
  public TMemoryInputTransport(byte[] buf, int offset, int length) {
    reset(buf, offset, length);
  }
  //重置buf
  public void reset(byte[] buf) {
    reset(buf, 0, buf.length);
  }
  public void reset(byte[] buf, int offset, int length) {
    buf_ = buf;
    pos_ = offset;
    endPos_ = offset + length;
  }
  public void clear() {
    buf_ = null;
  }
  @Override
  public void close() {}
  @Override
  public boolean isOpen() {
    return true;
  }
  @Override
  public void open() throws TTransportException {}
  @Override
  public int read(byte[] buf, int off, int len) throws TTransportException {
    int bytesRemaining = getBytesRemainingInBuffer();//獲取剩餘可讀的數據大小
    int amtToRead = (len > bytesRemaining ? bytesRemaining : len);
    if (amtToRead > 0) {
      System.arraycopy(buf_, pos_, buf, off, amtToRead);//將buf_中pos_開始的amtToRead個字節copy到buf中
      consumeBuffer(amtToRead);//將可讀數據的開始位置增長amtToRead
    }
    return amtToRead;
  }
  //不支持寫
  @Override
  public void write(byte[] buf, int off, int len) throws TTransportException {
    throw new UnsupportedOperationException("No writing allowed!");
  }
  @Override
  public byte[] getBuffer() {
    return buf_;
  }
  public int getBufferPosition() {
    return pos_;
  }
  //剩餘可讀的數據大小
  public int getBytesRemainingInBuffer() {
    return endPos_ - pos_;
  }
  //將可讀數據的開始位置向後移len
  public void consumeBuffer(int len) {
    pos_ += len;
  }
}

 

    TNonblockingTransport    

      TNonblockingTransport是TTransport的非阻塞抽象子類。編碼

public abstract class TNonblockingTransport extends TTransport {
  //鏈接初始化,參考SocketChannel.connect
  public abstract boolean startConnect() throws IOException;
  //鏈接完成,SocketChannel.finishConnect()
  public abstract boolean finishConnect() throws IOException;
  //註冊到selector
  public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException;
  //讀數據到buffer中
  public abstract int read(ByteBuffer buffer) throws IOException;
  //將buffer中的數據寫出
  public abstract int write(ByteBuffer buffer) throws IOException;
}

 

    TNonblockingSocket

      TNonblockingSocket是TNonblockingTransport的子類,是非阻塞Socket的實現,用於異步客戶端。spa

public class TNonblockingSocket extends TNonblockingTransport {
  private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingSocket.class.getName());
  private final SocketAddress socketAddress_;//Host and port info,用於非阻塞鏈接懶加載
  private final SocketChannel socketChannel_;//Java NIO中實現非阻塞讀寫的核心類
  //一波構造函數
  public TNonblockingSocket(String host, int port) throws IOException {
    this(host, port, 0);
  }
  public TNonblockingSocket(String host, int port, int timeout) throws IOException {
    this(SocketChannel.open(), timeout, new InetSocketAddress(host, port));
  }
  public TNonblockingSocket(SocketChannel socketChannel) throws IOException {
    this(socketChannel, 0, null);
    if (!socketChannel.isConnected()) throw new IOException("Socket must already be connected");
  }
  private TNonblockingSocket(SocketChannel socketChannel, int timeout, SocketAddress socketAddress)
      throws IOException {
    socketChannel_ = socketChannel;
    socketAddress_ = socketAddress;
    socketChannel.configureBlocking(false);//設置socketChannel爲非阻塞
    Socket socket = socketChannel.socket();
    socket.setSoLinger(false, 0);
    socket.setTcpNoDelay(true);
    setTimeout(timeout);
  }
  //將SocketChannel註冊到selector上的感興趣事件,當感興趣事件就緒時會收到notify
  public SelectionKey registerSelector(Selector selector, int interests) throws IOException {
    return socketChannel_.register(selector, interests);
  }
  public void setTimeout(int timeout) {
    try {
      socketChannel_.socket().setSoTimeout(timeout);
    } catch (SocketException sx) {
      LOGGER.warn("Could not set socket timeout.", sx);
    }
  }
  public SocketChannel getSocketChannel() {
    return socketChannel_;
  }
  //檢查是否處於鏈接狀態
  public boolean isOpen() {
    // isConnected() does not return false after close(), but isOpen() does
    return socketChannel_.isOpen() && socketChannel_.isConnected();
  }
  //不要調用該方法,該實現類提供了本身的懶加載方法startConnect()用於打開鏈接
  public void open() throws TTransportException {
    throw new RuntimeException("open() is not implemented for TNonblockingSocket");
  }
  //讀數據到buffer中
  public int read(ByteBuffer buffer) throws IOException {
    return socketChannel_.read(buffer);
  }
  //讀指定數據到buf中,經過socketChannel_實現
  public int read(byte[] buf, int off, int len) throws TTransportException {
    if ((socketChannel_.validOps() & SelectionKey.OP_READ) != SelectionKey.OP_READ) {
      throw new TTransportException(TTransportException.NOT_OPEN,
        "Cannot read from write-only socket channel");
    }
    try {
      return socketChannel_.read(ByteBuffer.wrap(buf, off, len));
    } catch (IOException iox) {
      throw new TTransportException(TTransportException.UNKNOWN, iox);
    }
  }
  //將buffer中的數據寫出
  public int write(ByteBuffer buffer) throws IOException {
    return socketChannel_.write(buffer);
  }
  //將buffer中的指定數據寫出
  public void write(byte[] buf, int off, int len) throws TTransportException {
    if ((socketChannel_.validOps() & SelectionKey.OP_WRITE) != SelectionKey.OP_WRITE) {
      throw new TTransportException(TTransportException.NOT_OPEN,
        "Cannot write to write-only socket channel");
    }
    try {
      socketChannel_.write(ByteBuffer.wrap(buf, off, len));
    } catch (IOException iox) {
      throw new TTransportException(TTransportException.UNKNOWN, iox);
    }
  }
  //socketChannel_不支持
  public void flush() throws TTransportException {
  }
  //關閉socket
  public void close() {
    try {
      socketChannel_.close();
    } catch (IOException iox) {
      LOGGER.warn("Could not close socket.", iox);
    }
  }
  //開始初始化
  public boolean startConnect() throws IOException {
    return socketChannel_.connect(socketAddress_);
  }
  //是否完成鏈接
  public boolean finishConnect() throws IOException {
    return socketChannel_.finishConnect();
  }
}

    TServerTransport

      服務端Transport層共同的父類,主要包括開啓監聽和接收客戶端鏈接請求兩個方法。

public abstract class TServerTransport {
  //開啓監聽客戶端鏈接
  public abstract void listen() throws TTransportException;
  //鏈接請求到達後,建立transport實例
  public final TTransport accept() throws TTransportException {
    TTransport transport = acceptImpl();//具體方法由子類實現
    if (transport == null) {
      throw new TTransportException("accept() may not return NULL");
    }
    return transport;
  }
  //關閉監聽
  public abstract void close();
  protected abstract TTransport acceptImpl() throws TTransportException;
  public void interrupt() {}
}

 

     TServerSocket

      阻塞服務時使用,TServerSocket對ServerSocket類進行包裝,具體實現由TServerSocket完成,代碼相對比較簡單。

public class TServerSocket extends TServerTransport {
  private static final Logger LOGGER = LoggerFactory.getLogger(TServerSocket.class.getName());
  private ServerSocket serverSocket_ = null;//基於ServerSocket實現
  private int clientTimeout_ = 0;//接收Client鏈接請求的超時時間
  public TServerSocket(ServerSocket serverSocket) {
    this(serverSocket, 0);
  }
  //幾個構造函數略過。。。建立TServerSocket實例
  public TServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
    clientTimeout_ = clientTimeout;
    try {
      serverSocket_ = new ServerSocket();
      serverSocket_.setReuseAddress(true);
      serverSocket_.bind(bindAddr);// Bind to listening port
    } catch (IOException ioe) {
      serverSocket_ = null;
      throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + ".");
    }
  }
  public void listen() throws TTransportException {
    if (serverSocket_ != null) {
      try {
        serverSocket_.setSoTimeout(0);//等待客戶端鏈接的超時時間,0表示無限超時
      } catch (SocketException sx) {
        LOGGER.error("Could not set socket timeout.", sx);
      }
    }
  }
  //accept客戶端鏈接並封裝爲TSocket
  protected TSocket acceptImpl() throws TTransportException {
    if (serverSocket_ == null) {
      throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
    }
    try {
      Socket result = serverSocket_.accept();
      TSocket result2 = new TSocket(result);
      result2.setTimeout(clientTimeout_);
      return result2;
    } catch (IOException iox) {
      throw new TTransportException(iox);
    }
  }
  //關閉serverSocket_
  public void close() {
    if (serverSocket_ != null) {
      try {
        serverSocket_.close();
      } catch (IOException iox) {
        LOGGER.warn("Could not close server socket.", iox);
      }
      serverSocket_ = null;
    }
  }
  public void interrupt() {
    close();
  }
  public ServerSocket getServerSocket() {
    return serverSocket_;
  }
}

 

    TNonblockingServerTransport

      TNonblockingServerTransport是非阻塞實現的抽象基類,定義了一個向Selector註冊對象的抽象方法。

public abstract class TNonblockingServerTransport extends TServerTransport {
  public abstract void registerSelector(Selector selector);
}

 

    TNonblockingServerSocket

      TNonblockingServerSocket是TNonblockingServerTransport的具體實現,對ServerSocketChannel的封裝。

public class TNonblockingServerSocket extends TNonblockingServerTransport {
  private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingServerTransport.class.getName());
  private ServerSocketChannel serverSocketChannel = null;//接收Client請求,Java NIO中的Channel
  private ServerSocket serverSocket_ = null;//serverSocketChannel中的對象
  private int clientTimeout_ = 0;//客戶端鏈接創建超時時間
  //幾個構造函數略過。。。建立TNonblockingServerSocket實例
  public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
    clientTimeout_ = clientTimeout;
    try {
      serverSocketChannel = ServerSocketChannel.open();//建立serverSocketChannel實例
      serverSocketChannel.configureBlocking(false);//設置爲非阻塞
      serverSocket_ = serverSocketChannel.socket();//建立serverSocket_實例
      serverSocket_.setReuseAddress(true);
      serverSocket_.bind(bindAddr);//綁定監聽端口
    } catch (IOException ioe) {
      serverSocket_ = null;
      throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + ".");
    }
  }
  //開啓監聽客戶端鏈接
  public void listen() throws TTransportException {
    if (serverSocket_ != null) {
      try {
        serverSocket_.setSoTimeout(0);//等待客戶端鏈接的超時時間,0表示無限超時
      } catch (SocketException sx) {
        sx.printStackTrace();
      }
    }
  }
  //接收客戶端鏈接的具體實現,接收客戶端鏈接並封裝爲TNonblockingSocket返回
  protected TNonblockingSocket acceptImpl() throws TTransportException {
    if (serverSocket_ == null) {
      throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
    }
    try {
      SocketChannel socketChannel = serverSocketChannel.accept();
      if (socketChannel == null) {
        return null;
      }
      TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
      tsocket.setTimeout(clientTimeout_);
      return tsocket;
    } catch (IOException iox) {
      throw new TTransportException(iox);
    }
  }
  //向selector註冊OP_ACCEPT事件,接收新的鏈接
  public void registerSelector(Selector selector) {
    try {
      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    } catch (ClosedChannelException e) {
    }
  }
  //關閉serverSocket_
  public void close() {
    if (serverSocket_ != null) {
      try {
        serverSocket_.close();
      } catch (IOException iox) {
        LOGGER.warn("WARNING: Could not close server socket: " + iox.getMessage());
      }
      serverSocket_ = null;
    }
  }
  public void interrupt() {
    close();
  }
}

 

   總結

    注意介紹了TTransport的實現方式。

 

 

 

 

 

 參考資料

  Thrift源碼系列----2.TTransport層源碼分析

相關文章
相關標籤/搜索