目標:介紹基於Mina的來實現的遠程通訊、介紹dubbo-remoting-mina內的源碼解析。
Apache MINA是一個網絡應用程序框架,可幫助用戶輕鬆開發高性能和高可擴展性的網絡應用程序。它經過Java NIO在各類傳輸(如TCP / IP和UDP / IP)上提供抽象的事件驅動異步API。它一般被稱爲NIO框架庫、客戶端服務器框架庫或者網絡套接字庫。那麼本問就要講解在dubbo項目中,基於mina的API實現服務端和客戶端來完成遠程通信這件事情。java
下面是mina實現的包結構:git
該類繼承了AbstractChannel,是基於mina實現的通道。github
private static final Logger logger = LoggerFactory.getLogger(MinaChannel.class); /** * 通道的key */ private static final String CHANNEL_KEY = MinaChannel.class.getName() + ".CHANNEL"; /** * mina中的一個句柄,表示兩個端點之間的鏈接,與傳輸類型無關 */ private final IoSession session;
該類的屬性除了封裝了一個CHANNEL_KEY之外,還用到了mina中的IoSession,它封裝着一個鏈接所須要的方法,好比得到遠程地址等。segmentfault
static MinaChannel getOrAddChannel(IoSession session, URL url, ChannelHandler handler) { // 若是鏈接session爲空,則返回空 if (session == null) { return null; } // 得到MinaChannel實例 MinaChannel ret = (MinaChannel) session.getAttribute(CHANNEL_KEY); // 若是不存在,則建立 if (ret == null) { // 建立一個MinaChannel實例 ret = new MinaChannel(session, url, handler); // 若是兩個端點鏈接 if (session.isConnected()) { // 把新建立的MinaChannel添加到session 中 MinaChannel old = (MinaChannel) session.setAttribute(CHANNEL_KEY, ret); // 若是屬性的舊值不爲空,則從新設置舊值 if (old != null) { session.setAttribute(CHANNEL_KEY, old); ret = old; } } } return ret; }
該方法是一個得到MinaChannel對象的方法,其中每個MinaChannel都會被放在session的屬性值中。緩存
static void removeChannelIfDisconnected(IoSession session) { if (session != null && !session.isConnected()) { session.removeAttribute(CHANNEL_KEY); } }
該方法是當沒有鏈接時移除該通道,比較簡單。服務器
@Override public void send(Object message, boolean sent) throws RemotingException { super.send(message, sent); boolean success = true; int timeout = 0; try { // 發送消息,返回future WriteFuture future = session.write(message); // 若是已經發送過了 if (sent) { // 得到延遲時間 timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 等待timeout的鏈接時間後查看是否發送成功 success = future.join(timeout); } } catch (Throwable e) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if (!success) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); } }
該方法是最關鍵的發送消息,其中調用到了session的write方法,就是mina封裝的發送消息。而且根據返回的WriteFuture對象來判斷是否發送成功。網絡
該類繼承了IoHandlerAdapter,是通道處理器實現類,其中就是mina項目中IoHandler接口的幾個 方法。session
/** * url對象 */ private final URL url; /** * 通道處理器對象 */ private final ChannelHandler handler;
該類有兩個屬性,上述提到的實現IoHandler接口方法都是調用了handler來實現的,我就舉例講一個,其餘的都同樣的寫法:app
@Override public void sessionOpened(IoSession session) throws Exception { // 得到MinaChannel對象 MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler); try { // 調用接連該通道 handler.connected(channel); } finally { // 若是沒有鏈接則移除通道 MinaChannel.removeChannelIfDisconnected(session); } }
該方法在IoHandler中叫作sessionOpened,其實就是鏈接方法,因此調用的是handler.connected。其餘方法也同樣,請自行查看。框架
該類繼承了AbstractClient類,是基於mina實現的客戶端類。
/** * 套接字鏈接集合 */ private static final Map<String, SocketConnector> connectors = new ConcurrentHashMap<String, SocketConnector>(); /** * 鏈接的key */ private String connectorKey; /** * 套接字鏈接者 */ private SocketConnector connector; /** * 一個句柄 */ private volatile IoSession session; // volatile, please copy reference to use
該類中的屬性都跟mina項目中封裝類有關係。
@Override protected void doOpen() throws Throwable { // 用url來做爲key connectorKey = getUrl().toFullString(); // 先從集合中取套接字鏈接 SocketConnector c = connectors.get(connectorKey); if (c != null) { connector = c; // 若是爲空 } else { // set thread pool. 設置線程池 connector = new SocketConnector(Constants.DEFAULT_IO_THREADS, Executors.newCachedThreadPool(new NamedThreadFactory("MinaClientWorker", true))); // config 得到套接字鏈接配置 SocketConnectorConfig cfg = (SocketConnectorConfig) connector.getDefaultConfig(); cfg.setThreadModel(ThreadModel.MANUAL); // 啓用TCP_NODELAY cfg.getSessionConfig().setTcpNoDelay(true); // 啓用SO_KEEPALIVE cfg.getSessionConfig().setKeepAlive(true); int timeout = getConnectTimeout(); // 設置鏈接超時時間 cfg.setConnectTimeout(timeout < 1000 ? 1 : timeout / 1000); // set codec. // 設置編解碼器 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(), getUrl(), this))); // 加入集合 connectors.put(connectorKey, connector); } }
該方法是打開客戶端,在mina中用套接字鏈接者connector來表示。其中的操做就是新建一個connector,而且設置相應的屬性,而後加入集合。
@Override protected void doConnect() throws Throwable { // 鏈接服務器 ConnectFuture future = connector.connect(getConnectAddress(), new MinaHandler(getUrl(), this)); long start = System.currentTimeMillis(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); // 用於對線程的阻塞和喚醒 final CountDownLatch finish = new CountDownLatch(1); // resolve future.awaitUninterruptibly() dead lock // 加入監聽器 future.addListener(new IoFutureListener() { @Override public void operationComplete(IoFuture future) { try { // 若是已經讀完了 if (future.isReady()) { // 建立得到該鏈接的IoSession實例 IoSession newSession = future.getSession(); try { // Close old channel 關閉舊的session IoSession oldSession = MinaClient.this.session; // copy reference if (oldSession != null) { try { if (logger.isInfoEnabled()) { logger.info("Close old mina channel " + oldSession + " on create new mina channel " + newSession); } // 關閉鏈接 oldSession.close(); } finally { // 移除通道 MinaChannel.removeChannelIfDisconnected(oldSession); } } } finally { // 若是MinaClient關閉了 if (MinaClient.this.isClosed()) { try { if (logger.isInfoEnabled()) { logger.info("Close new mina channel " + newSession + ", because the client closed."); } // 關閉session newSession.close(); } finally { MinaClient.this.session = null; MinaChannel.removeChannelIfDisconnected(newSession); } } else { // 設置新的session MinaClient.this.session = newSession; } } } } catch (Exception e) { exception.set(e); } finally { // 減小數量,釋放全部等待的線程 finish.countDown(); } } }); try { // 當前線程等待,直到鎖存器倒計數到零,除非線程被中斷,或者指定的等待時間過去 finish.await(getConnectTimeout(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + " client-side timeout " + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: " + e.getMessage(), e); } Throwable e = exception.get(); if (e != null) { throw e; } }
該方法是客戶端鏈接服務器的實現方法。其中用到了CountDownLatch來表明完成完成事件,它來作一個線程等待,直到1個線程完成上述的動做,也就是鏈接完成結束,才釋放等待的線程。保證每次只有一條線程去鏈接,解決future.awaitUninterruptibly()死鎖問題。
其餘方法請自行查看我寫的註釋。
該類繼承了AbstractServer,是基於mina實現的服務端實現類。
private static final Logger logger = LoggerFactory.getLogger(MinaServer.class); /** * 套接字接收者對象 */ private SocketAcceptor acceptor;
@Override protected void doOpen() throws Throwable { // set thread pool. // 建立套接字接收者對象 acceptor = new SocketAcceptor(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), Executors.newCachedThreadPool(new NamedThreadFactory("MinaServerWorker", true))); // config // 設置配置 SocketAcceptorConfig cfg = (SocketAcceptorConfig) acceptor.getDefaultConfig(); cfg.setThreadModel(ThreadModel.MANUAL); // set codec. 設置編解碼器 acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(), getUrl(), this))); // 開啓服務器 acceptor.bind(getBindAddress(), new MinaHandler(getUrl(), this)); }
該方法是建立服務器,而且打開服務器。關鍵就是調用了acceptor的方法。
@Override protected void doClose() throws Throwable { try { if (acceptor != null) { // 取消綁定,也就是關閉服務器 acceptor.unbind(getBindAddress()); } } catch (Throwable e) { logger.warn(e.getMessage(), e); } }
該方法是關閉服務器,就是調用了acceptor.unbind方法。
@Override public Collection<Channel> getChannels() { // 得到鏈接到該服務器到全部鏈接句柄 Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress()); Collection<Channel> channels = new HashSet<Channel>(); for (IoSession session : sessions) { if (session.isConnected()) { // 每次都用一個鏈接句柄建立一個通道 channels.add(MinaChannel.getOrAddChannel(session, getUrl(), this)); } } return channels; }
該方法是得到全部鏈接該服務器的通道。
@Override public Channel getChannel(InetSocketAddress remoteAddress) { // 得到鏈接到該服務器到全部鏈接句柄 Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress()); // 遍歷全部句柄,找到要找的通道 for (IoSession session : sessions) { if (session.getRemoteAddress().equals(remoteAddress)) { return MinaChannel.getOrAddChannel(session, getUrl(), this); } } return null; }
該方法是得到地址對應的單個通道。
public class MinaTransporter implements Transporter { public static final String NAME = "mina"; @Override public Server bind(URL url, ChannelHandler handler) throws RemotingException { // 建立MinaServer實例 return new MinaServer(url, handler); } @Override public Client connect(URL url, ChannelHandler handler) throws RemotingException { // 建立MinaClient實例 return new MinaClient(url, handler); } }
該類實現了Transporter接口,是基於mina的傳輸層實現。能夠看到,bind和connect方法分別就是建立了MinaServer和MinaClient實例。這裏我建議查看一下《dubbo源碼解析(九)遠程通訊——Transport層》。
該類是基於mina實現的編解碼類,實現了ProtocolCodecFactory。
/** * 編碼對象 */ private final ProtocolEncoder encoder = new InternalEncoder(); /** * 解碼對象 */ private final ProtocolDecoder decoder = new InternalDecoder(); /** * 編解碼器 */ private final Codec2 codec; /** * url對象 */ private final URL url; /** * 通道處理器對象 */ private final ChannelHandler handler; /** * 緩衝區大小 */ private final int bufferSize;
屬性比較好理解,該編解碼器用到了ProtocolEncoder和ProtocolDecoder,而InternalEncoder和InternalDecoder兩個類是該類的內部類,它們實現了ProtocolEncoder和ProtocolDecoder,關鍵的編解碼邏輯在這兩個類中實現。
public MinaCodecAdapter(Codec2 codec, URL url, ChannelHandler handler) { this.codec = codec; this.url = url; this.handler = handler; int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE); // 若是緩存區大小在16字節之內,則設置配置大小,若是不是,則設置8字節的緩衝區大小 this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE; }
private class InternalEncoder implements ProtocolEncoder { @Override public void dispose(IoSession session) throws Exception { } @Override public void encode(IoSession session, Object msg, ProtocolEncoderOutput out) throws Exception { // 動態分配一個1k的緩衝區 ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(1024); // 得到通道 MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler); try { // 編碼 codec.encode(channel, buffer, msg); } finally { // 檢測是否斷開鏈接,若是斷開,則移除 MinaChannel.removeChannelIfDisconnected(session); } // 寫數據到out中 out.write(ByteBuffer.wrap(buffer.toByteBuffer())); out.flush(); } }
該內部類是編碼類,其中的encode方法中寫到了編碼核心調用的是codec.encode。
private class InternalDecoder implements ProtocolDecoder { private ChannelBuffer buffer = ChannelBuffers.EMPTY_BUFFER; @Override public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { int readable = in.limit(); if (readable <= 0) return; ChannelBuffer frame; // 若是緩衝區還有可讀字節數 if (buffer.readable()) { // 若是緩衝區是DynamicChannelBuffer類型的 if (buffer instanceof DynamicChannelBuffer) { // 往buffer中寫入數據 buffer.writeBytes(in.buf()); frame = buffer; } else { // 緩衝區大小 int size = buffer.readableBytes() + in.remaining(); // 動態分配一個緩衝區 frame = ChannelBuffers.dynamicBuffer(size > bufferSize ? size : bufferSize); // buffer的數據把寫到frame frame.writeBytes(buffer, buffer.readableBytes()); // 把流中的數據寫到frame frame.writeBytes(in.buf()); } } else { // 不然是基於Java NIO的ByteBuffer生成的緩衝區 frame = ChannelBuffers.wrappedBuffer(in.buf()); } // 得到通道 Channel channel = MinaChannel.getOrAddChannel(session, url, handler); Object msg; int savedReadIndex; try { do { // 得到讀索引 savedReadIndex = frame.readerIndex(); try { // 解碼 msg = codec.decode(channel, frame); } catch (Exception e) { buffer = ChannelBuffers.EMPTY_BUFFER; throw e; } // 拆包 if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) { frame.readerIndex(savedReadIndex); break; } else { if (savedReadIndex == frame.readerIndex()) { buffer = ChannelBuffers.EMPTY_BUFFER; throw new Exception("Decode without read data."); } if (msg != null) { // 把數據寫到輸出流裏面 out.write(msg); } } } while (frame.readable()); } finally { // 若是frame還有可讀數據 if (frame.readable()) { //丟棄可讀數據 frame.discardReadBytes(); buffer = frame; } else { buffer = ChannelBuffers.EMPTY_BUFFER; } MinaChannel.removeChannelIfDisconnected(session); } } @Override public void dispose(IoSession session) throws Exception { } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { } }
該內部類是解碼類,其中decode方法中關鍵的是調用了codec.decode,其他的操做是利用緩衝區對數據的沖刷流轉。
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了基於mina的來實現的遠程通訊、介紹dubbo-remoting-mina內的源碼解析,關鍵須要對mina有所瞭解。下一篇我會講解基於netty3實現遠程通訊部分。