TTransport負責數據的傳輸,先看類結構圖。
阻塞Server使用TServerSocket,它封裝了ServerSocket實例。ServerSocket實例監聽到客戶端的請求後會創建一個Socket對象,並將該Socket對象封裝爲一個TSocket對象用於通訊。
非阻塞Server使用TNonblockingServerSocket,它封裝了一個ServerSocketChannel實例。ServerSocketChannel實例監聽到客戶端的請求後會創建一個SocketChannel對象,並將該對象封裝成一個TNonblockingSocket對象用於之後的通訊。當讀取完客戶端的請求數據後,保存爲本地的一個TTransport對象,然後封裝爲TFramedTransport對象進行處理。
TTransport是客戶端所有Transport的基類。
public abstract class TTransport { public abstract boolean isOpen();//transport是否打開 public boolean peek() {//是否還有數據要讀,若是transport打開時還有數據要讀 return isOpen(); } public abstract void open() throws TTransportException;//打開transport讀寫數據 public abstract void close();//關閉transport //讀取len長度的數據到字節數組buf,off表示開始讀的位置,返回實際讀取的字節數(不必定爲len) public abstract int read(byte[] buf, int off, int len) throws TTransportException; //確保讀取len長度的數據到字節數組buf,off表示開始讀的位置,經過循環調用read()實現,返回實際讀取的字節數(len) public int readAll(byte[] buf, int off, int len) throws TTransportException { int got = 0; int ret = 0; while (got < len) { ret = read(buf, off+got, len-got); if (ret <= 0) { throw new TTransportException( "Cannot read. Remote side has closed. Tried to read " + len + " bytes, but only got " + got + " bytes. (This is often indicative of an internal error on the server side. Please check your server logs.)"); } got += ret; } return got; } //將buf中的所有數據寫到output public void write(byte[] buf) throws TTransportException { write(buf, 0, buf.length); } //將buf中off位置開始len長度的數據寫到output public abstract void write(byte[] buf, int off, int len) throws TTransportException; //清空transport緩存中的數據 public void flush() throws TTransportException {} //獲取本地緩存的數據,沒有則返回null public byte[] getBuffer() { return null; } //獲取本地緩存的下一個讀取位置,沒有則返回0 public int getBufferPosition() { return 0; } //獲取本地緩存的字節數,沒有則返回-1 public int getBytesRemainingInBuffer() { return -1; } //從本地緩存中消費n個字節 public void consumeBuffer(int len) {} }
TIOStreamTransport是面向流的TTransport的子類,阻塞式,實現了流的操作。
public class TIOStreamTransport extends TTransport { private static final Logger LOGGER = LoggerFactory.getLogger(TIOStreamTransport.class.getName()); protected InputStream inputStream_ = null;//輸入流 protected OutputStream outputStream_ = null;//輸出流 //一波構造函數 protected TIOStreamTransport() {} public TIOStreamTransport(InputStream is) { inputStream_ = is; } public TIOStreamTransport(OutputStream os) { outputStream_ = os; } public TIOStreamTransport(InputStream is, OutputStream os) { inputStream_ = is; outputStream_ = os; } //streams必須在構造時已經被打開,so一直返回true public boolean isOpen() { return true; } //streams必須在構造時已經被打開,不須要這個方法 public void open() throws TTransportException {} //關閉流 public void close() { if (inputStream_ != null) { try { inputStream_.close(); } catch (IOException iox) { LOGGER.warn("Error closing input stream.", iox); } inputStream_ = null; } if (outputStream_ != null) { try { outputStream_.close(); } catch (IOException iox) { LOGGER.warn("Error closing output stream.", iox); } outputStream_ = null; } } //將輸入流中的指定數據讀取到buf中 public int read(byte[] buf, int off, int len) throws TTransportException { if (inputStream_ == null) { throw new TTransportException(TTransportException.NOT_OPEN, "Cannot read from null inputStream"); } int bytesRead; try { bytesRead = inputStream_.read(buf, off, len); } catch (IOException iox) { throw new TTransportException(TTransportException.UNKNOWN, iox); } if (bytesRead < 0) { throw new TTransportException(TTransportException.END_OF_FILE); } return bytesRead; } //將buf中的數據寫出的輸出流outputStream_ public void write(byte[] buf, int off, int len) throws TTransportException { if (outputStream_ == null) { throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream"); } try { outputStream_.write(buf, off, len); } catch (IOException iox) { throw new TTransportException(TTransportException.UNKNOWN, iox); } } //清空輸出流 public void flush() throws TTransportException { if (outputStream_ == null) { throw new 
TTransportException(TTransportException.NOT_OPEN, "Cannot flush null outputStream"); } try { outputStream_.flush(); } catch (IOException iox) { throw new TTransportException(TTransportException.UNKNOWN, iox); } } }
TSocket類繼承自TIOStreamTransport類,實現了對Socket實例的包裝。inputStream_和outputStream_通過Socket初始化。
public class TSocket extends TIOStreamTransport { private static final Logger LOGGER = LoggerFactory.getLogger(TSocket.class.getName()); private Socket socket_ = null;//包裝socket_ private String host_ = null;//遠程host private int port_ = 0;//遠程port private int timeout_ = 0;//Socket超時時間 //三個構造函數 public TSocket(Socket socket) throws TTransportException { socket_ = socket; try { socket_.setSoLinger(false, 0); socket_.setTcpNoDelay(true); } catch (SocketException sx) { LOGGER.warn("Could not configure socket.", sx); } if (isOpen()) { try { //初始化inputStream_和outputStream_ inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024); outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024); } catch (IOException iox) { close(); throw new TTransportException(TTransportException.NOT_OPEN, iox); } } } public TSocket(String host, int port) { this(host, port, 0); } public TSocket(String host, int port, int timeout) { host_ = host; port_ = port; timeout_ = timeout; initSocket(); } //初始化Socket private void initSocket() { socket_ = new Socket(); try { socket_.setSoLinger(false, 0); socket_.setTcpNoDelay(true); socket_.setSoTimeout(timeout_); } catch (SocketException sx) { LOGGER.error("Could not configure socket.", sx); } } public void setTimeout(int timeout) { timeout_ = timeout; try { socket_.setSoTimeout(timeout); } catch (SocketException sx) { LOGGER.warn("Could not set socket timeout.", sx); } } public Socket getSocket() { if (socket_ == null) { initSocket(); } return socket_; } //檢查socket_是否鏈接 public boolean isOpen() { if (socket_ == null) { return false; } return socket_.isConnected(); } //打開socket鏈接,初始化輸入流和輸出流 public void open() throws TTransportException { if (isOpen()) { throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected."); } if (host_.length() == 0) { throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host."); } if (port_ <= 0) { throw new 
TTransportException(TTransportException.NOT_OPEN, "Cannot open without port."); } if (socket_ == null) { initSocket(); } try { socket_.connect(new InetSocketAddress(host_, port_), timeout_); inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024); outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024); } catch (IOException iox) { close(); throw new TTransportException(TTransportException.NOT_OPEN, iox); } } //關閉socket public void close() { super.close(); if (socket_ != null) { try { socket_.close(); } catch (IOException iox) { LOGGER.warn("Could not close socket.", iox); } socket_ = null; } } }
TFramedTransport的作用是通過message之前的4-byte frame size確保讀到的message是完整的,防止發生粘包拆包的問題。
//TFramedTransport做用是經過message以前的4-byte frame size確保讀到的message時完整的,防止發生粘包拆包的問題 public class TFramedTransport extends TTransport { protected static final int DEFAULT_MAX_LENGTH = 16384000;//默認的本地緩存最大字節數 private int maxLength_;//本地緩存最大字節數 private TTransport transport_ = null;//封裝的transport_,實際經過該對象實現數據的讀取與寫入 private final TByteArrayOutputStream writeBuffer_ = new TByteArrayOutputStream(1024);//輸出BUffer,將字節數組輸出 private TMemoryInputTransport readBuffer_ = new TMemoryInputTransport(new byte[0]);//輸入buffer,用於數據讀取 //工廠類,用於將一個TTransport實例封裝爲TFramedTransport實例 public static class Factory extends TTransportFactory { private int maxLength_; public Factory() { maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH; } public Factory(int maxLength) { maxLength_ = maxLength; } @Override public TTransport getTransport(TTransport base) { return new TFramedTransport(base, maxLength_); } } //兩個構造函數 public TFramedTransport(TTransport transport, int maxLength) { transport_ = transport; maxLength_ = maxLength; } public TFramedTransport(TTransport transport) { transport_ = transport; maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH; } //同transport_的三個方法 public void open() throws TTransportException { transport_.open(); } public boolean isOpen() { return transport_.isOpen(); } public void close() { transport_.close(); } //讀數據,一次請求可能調用屢次 public int read(byte[] buf, int off, int len) throws TTransportException { if (readBuffer_ != null) { //在一次客戶端的請求中第一次調用該方法時,確定返回got<0,就能夠進入 readFrame()方法。 int got = readBuffer_.read(buf, off, len);//readBuffer_已讀完或字節數爲0時 確定返回got<0 if (got > 0) { return got; } } readFrame();//從transport_讀到本地緩存readBuffer_ return readBuffer_.read(buf, off, len); } @Override public byte[] getBuffer() { return readBuffer_.getBuffer(); } @Override public int getBufferPosition() { return readBuffer_.getBufferPosition(); } @Override public int getBytesRemainingInBuffer() { return readBuffer_.getBytesRemainingInBuffer(); } @Override public void consumeBuffer(int len) { 
readBuffer_.consumeBuffer(len); } private final byte[] i32buf = new byte[4]; private void readFrame() throws TTransportException { transport_.readAll(i32buf, 0, 4); //讀前4個字節,FrameSize int size = decodeFrameSize(i32buf);//因爲發送數據方對FrameSize進行了編碼,經過解碼獲得消息的大小 //校驗FrameSize是否正確 if (size < 0) { throw new TTransportException("Read a negative frame size (" + size + ")!"); } if (size > maxLength_) { throw new TTransportException("Frame size (" + size + ") larger than max length (" + maxLength_ + ")!"); } byte[] buff = new byte[size]; transport_.readAll(buff, 0, size);//將FrameSize大小的所有數據讀到buff readBuffer_.reset(buff);//重置本地緩存 } //write是向本地緩存寫入數據,寫完後,全部的調用方都要對輸出流調用flush進行清空,因此下面必定會進入到flush方法,再經過transport_將本地緩存的數據寫出去 public void write(byte[] buf, int off, int len) throws TTransportException { writeBuffer_.write(buf, off, len); } @Override public void flush() throws TTransportException { byte[] buf = writeBuffer_.get(); int len = writeBuffer_.len(); writeBuffer_.reset();//清空 encodeFrameSize(len, i32buf);//對FrameSize進行編碼 transport_.write(i32buf, 0, 4);//先寫數據大小FrameSize transport_.write(buf, 0, len);//在寫真實數據 transport_.flush();//清空 } //如下兩個方法是對FrameSize進行編解碼,將每一個字節高位都位移到低位組成byte數組 public static final void encodeFrameSize(final int frameSize, final byte[] buf) { buf[0] = (byte)(0xff & (frameSize >> 24)); buf[1] = (byte)(0xff & (frameSize >> 16)); buf[2] = (byte)(0xff & (frameSize >> 8)); buf[3] = (byte)(0xff & (frameSize)); } public static final int decodeFrameSize(final byte[] buf) { return ((buf[0] & 0xff) << 24) | ((buf[1] & 0xff) << 16) | ((buf[2] & 0xff) << 8) | ((buf[3] & 0xff)); } }
TFramedTransport用到了TMemoryInputTransport類,TMemoryInputTransport封裝了一個字節數組byte[]來做輸入流的封裝。
public final class TMemoryInputTransport extends TTransport { private byte[] buf_;//保存數據的字節數組 private int pos_;//可讀數據的開始位置 private int endPos_;//可讀數據的的結束位置 public TMemoryInputTransport() { } public TMemoryInputTransport(byte[] buf) { reset(buf); } public TMemoryInputTransport(byte[] buf, int offset, int length) { reset(buf, offset, length); } //重置buf public void reset(byte[] buf) { reset(buf, 0, buf.length); } public void reset(byte[] buf, int offset, int length) { buf_ = buf; pos_ = offset; endPos_ = offset + length; } public void clear() { buf_ = null; } @Override public void close() {} @Override public boolean isOpen() { return true; } @Override public void open() throws TTransportException {} @Override public int read(byte[] buf, int off, int len) throws TTransportException { int bytesRemaining = getBytesRemainingInBuffer();//獲取剩餘可讀的數據大小 int amtToRead = (len > bytesRemaining ? bytesRemaining : len); if (amtToRead > 0) { System.arraycopy(buf_, pos_, buf, off, amtToRead);//將buf_中pos_開始的amtToRead個字節copy到buf中 consumeBuffer(amtToRead);//將可讀數據的開始位置增長amtToRead } return amtToRead; } //不支持寫 @Override public void write(byte[] buf, int off, int len) throws TTransportException { throw new UnsupportedOperationException("No writing allowed!"); } @Override public byte[] getBuffer() { return buf_; } public int getBufferPosition() { return pos_; } //剩餘可讀的數據大小 public int getBytesRemainingInBuffer() { return endPos_ - pos_; } //將可讀數據的開始位置向後移len public void consumeBuffer(int len) { pos_ += len; } }
TNonblockingTransport是TTransport的非阻塞抽象子類。
public abstract class TNonblockingTransport extends TTransport { //鏈接初始化,參考SocketChannel.connect public abstract boolean startConnect() throws IOException; //鏈接完成,SocketChannel.finishConnect() public abstract boolean finishConnect() throws IOException; //註冊到selector public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException; //讀數據到buffer中 public abstract int read(ByteBuffer buffer) throws IOException; //將buffer中的數據寫出 public abstract int write(ByteBuffer buffer) throws IOException; }
TNonblockingSocket是TNonblockingTransport的子類,是非阻塞Socket的實現,用於異步客戶端。
public class TNonblockingSocket extends TNonblockingTransport { private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingSocket.class.getName()); private final SocketAddress socketAddress_;//Host and port info,用於非阻塞鏈接懶加載 private final SocketChannel socketChannel_;//Java NIO中實現非阻塞讀寫的核心類 //一波構造函數 public TNonblockingSocket(String host, int port) throws IOException { this(host, port, 0); } public TNonblockingSocket(String host, int port, int timeout) throws IOException { this(SocketChannel.open(), timeout, new InetSocketAddress(host, port)); } public TNonblockingSocket(SocketChannel socketChannel) throws IOException { this(socketChannel, 0, null); if (!socketChannel.isConnected()) throw new IOException("Socket must already be connected"); } private TNonblockingSocket(SocketChannel socketChannel, int timeout, SocketAddress socketAddress) throws IOException { socketChannel_ = socketChannel; socketAddress_ = socketAddress; socketChannel.configureBlocking(false);//設置socketChannel爲非阻塞 Socket socket = socketChannel.socket(); socket.setSoLinger(false, 0); socket.setTcpNoDelay(true); setTimeout(timeout); } //將SocketChannel註冊到selector上的感興趣事件,當感興趣事件就緒時會收到notify public SelectionKey registerSelector(Selector selector, int interests) throws IOException { return socketChannel_.register(selector, interests); } public void setTimeout(int timeout) { try { socketChannel_.socket().setSoTimeout(timeout); } catch (SocketException sx) { LOGGER.warn("Could not set socket timeout.", sx); } } public SocketChannel getSocketChannel() { return socketChannel_; } //檢查是否處於鏈接狀態 public boolean isOpen() { // isConnected() does not return false after close(), but isOpen() does return socketChannel_.isOpen() && socketChannel_.isConnected(); } //不要調用該方法,該實現類提供了本身的懶加載方法startConnect()用於打開鏈接 public void open() throws TTransportException { throw new RuntimeException("open() is not implemented for TNonblockingSocket"); } //讀數據到buffer中 public int read(ByteBuffer buffer) throws IOException { 
return socketChannel_.read(buffer); } //讀指定數據到buf中,經過socketChannel_實現 public int read(byte[] buf, int off, int len) throws TTransportException { if ((socketChannel_.validOps() & SelectionKey.OP_READ) != SelectionKey.OP_READ) { throw new TTransportException(TTransportException.NOT_OPEN, "Cannot read from write-only socket channel"); } try { return socketChannel_.read(ByteBuffer.wrap(buf, off, len)); } catch (IOException iox) { throw new TTransportException(TTransportException.UNKNOWN, iox); } } //將buffer中的數據寫出 public int write(ByteBuffer buffer) throws IOException { return socketChannel_.write(buffer); } //將buffer中的指定數據寫出 public void write(byte[] buf, int off, int len) throws TTransportException { if ((socketChannel_.validOps() & SelectionKey.OP_WRITE) != SelectionKey.OP_WRITE) { throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to write-only socket channel"); } try { socketChannel_.write(ByteBuffer.wrap(buf, off, len)); } catch (IOException iox) { throw new TTransportException(TTransportException.UNKNOWN, iox); } } //socketChannel_不支持 public void flush() throws TTransportException { } //關閉socket public void close() { try { socketChannel_.close(); } catch (IOException iox) { LOGGER.warn("Could not close socket.", iox); } } //開始初始化 public boolean startConnect() throws IOException { return socketChannel_.connect(socketAddress_); } //是否完成鏈接 public boolean finishConnect() throws IOException { return socketChannel_.finishConnect(); } }
TServerTransport是服務端Transport層共同的父類,主要包括開啓監聽和接收客戶端連接請求兩個方法。
public abstract class TServerTransport { //開啓監聽客戶端鏈接 public abstract void listen() throws TTransportException; //鏈接請求到達後,建立transport實例 public final TTransport accept() throws TTransportException { TTransport transport = acceptImpl();//具體方法由子類實現 if (transport == null) { throw new TTransportException("accept() may not return NULL"); } return transport; } //關閉監聽 public abstract void close(); protected abstract TTransport acceptImpl() throws TTransportException; public void interrupt() {} }
阻塞服務時使用,TServerSocket對ServerSocket類進行包裝,具體實現由TServerSocket完成,代碼相對比較簡單。
public class TServerSocket extends TServerTransport { private static final Logger LOGGER = LoggerFactory.getLogger(TServerSocket.class.getName()); private ServerSocket serverSocket_ = null;//基於ServerSocket實現 private int clientTimeout_ = 0;//接收Client鏈接請求的超時時間 public TServerSocket(ServerSocket serverSocket) { this(serverSocket, 0); } //幾個構造函數略過。。。建立TServerSocket實例 public TServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException { clientTimeout_ = clientTimeout; try { serverSocket_ = new ServerSocket(); serverSocket_.setReuseAddress(true); serverSocket_.bind(bindAddr);// Bind to listening port } catch (IOException ioe) { serverSocket_ = null; throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + "."); } } public void listen() throws TTransportException { if (serverSocket_ != null) { try { serverSocket_.setSoTimeout(0);//等待客戶端鏈接的超時時間,0表示無限超時 } catch (SocketException sx) { LOGGER.error("Could not set socket timeout.", sx); } } } //accept客戶端鏈接並封裝爲TSocket protected TSocket acceptImpl() throws TTransportException { if (serverSocket_ == null) { throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket."); } try { Socket result = serverSocket_.accept(); TSocket result2 = new TSocket(result); result2.setTimeout(clientTimeout_); return result2; } catch (IOException iox) { throw new TTransportException(iox); } } //關閉serverSocket_ public void close() { if (serverSocket_ != null) { try { serverSocket_.close(); } catch (IOException iox) { LOGGER.warn("Could not close server socket.", iox); } serverSocket_ = null; } } public void interrupt() { close(); } public ServerSocket getServerSocket() { return serverSocket_; } }
TNonblockingServerTransport是非阻塞實現的抽象基類,定義了一個向Selector註冊對象的抽象方法。
/**
 * Abstract base of the non-blocking server transports; adds registration with a
 * {@link Selector} to {@link TServerTransport}.
 */
public abstract class TNonblockingServerTransport extends TServerTransport {

  /**
   * Registers this server transport with the given selector.
   *
   * @param selector selector to register with
   */
  public abstract void registerSelector(Selector selector);
}
TNonblockingServerSocket是TNonblockingServerTransport的具體實現,對ServerSocketChannel的封裝。
public class TNonblockingServerSocket extends TNonblockingServerTransport { private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingServerTransport.class.getName()); private ServerSocketChannel serverSocketChannel = null;//接收Client請求,Java NIO中的Channel private ServerSocket serverSocket_ = null;//serverSocketChannel中的對象 private int clientTimeout_ = 0;//客戶端鏈接創建超時時間 //幾個構造函數略過。。。建立TNonblockingServerSocket實例 public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException { clientTimeout_ = clientTimeout; try { serverSocketChannel = ServerSocketChannel.open();//建立serverSocketChannel實例 serverSocketChannel.configureBlocking(false);//設置爲非阻塞 serverSocket_ = serverSocketChannel.socket();//建立serverSocket_實例 serverSocket_.setReuseAddress(true); serverSocket_.bind(bindAddr);//綁定監聽端口 } catch (IOException ioe) { serverSocket_ = null; throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + "."); } } //開啓監聽客戶端鏈接 public void listen() throws TTransportException { if (serverSocket_ != null) { try { serverSocket_.setSoTimeout(0);//等待客戶端鏈接的超時時間,0表示無限超時 } catch (SocketException sx) { sx.printStackTrace(); } } } //接收客戶端鏈接的具體實現,接收客戶端鏈接並封裝爲TNonblockingSocket返回 protected TNonblockingSocket acceptImpl() throws TTransportException { if (serverSocket_ == null) { throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket."); } try { SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel == null) { return null; } TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel); tsocket.setTimeout(clientTimeout_); return tsocket; } catch (IOException iox) { throw new TTransportException(iox); } } //向selector註冊OP_ACCEPT事件,接收新的鏈接 public void registerSelector(Selector selector) { try { serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (ClosedChannelException e) { } } //關閉serverSocket_ public void close() { if (serverSocket_ != 
null) { try { serverSocket_.close(); } catch (IOException iox) { LOGGER.warn("WARNING: Could not close server socket: " + iox.getMessage()); } serverSocket_ = null; } } public void interrupt() { close(); } }
主要介紹了TTransport的實現方式。
參考資料