Dubbo源碼解析(十五)遠程通訊——Mina

遠程通信——Mina

目標:介紹基於Mina的來實現的遠程通訊、介紹dubbo-remoting-mina內的源碼解析。

前言

Apache MINA是一個網絡應用程序框架,可幫助用戶輕鬆開發高性能和高可擴展性的網絡應用程序。它經過Java NIO在各類傳輸(如TCP / IP和UDP / IP)上提供抽象的事件驅動異步API。它一般被稱爲NIO框架庫、客戶端服務器框架庫或者網絡套接字庫。那麼本問就要講解在dubbo項目中,基於mina的API實現服務端和客戶端來完成遠程通信這件事情。java

下面是mina實現的包結構:git

mina包結構

源碼分析

(一)MinaChannel

該類繼承了AbstractChannel,是基於mina實現的通道。github

1.屬性

private static final Logger logger = LoggerFactory.getLogger(MinaChannel.class);

/**
 * 通道的key
 */
private static final String CHANNEL_KEY = MinaChannel.class.getName() + ".CHANNEL";

/**
 * mina中的一個句柄,表示兩個端點之間的鏈接,與傳輸類型無關
 */
private final IoSession session;

該類的屬性除了封裝了一個CHANNEL_KEY之外,還用到了mina中的IoSession,它封裝着一個鏈接所須要的方法,好比得到遠程地址等。segmentfault

2.getOrAddChannel

static MinaChannel getOrAddChannel(IoSession session, URL url, ChannelHandler handler) {
    // 若是鏈接session爲空,則返回空
    if (session == null) {
        return null;
    }
    // 得到MinaChannel實例
    MinaChannel ret = (MinaChannel) session.getAttribute(CHANNEL_KEY);
    // 若是不存在,則建立
    if (ret == null) {
        // 建立一個MinaChannel實例
        ret = new MinaChannel(session, url, handler);
        // 若是兩個端點鏈接
        if (session.isConnected()) {
            // 把新建立的MinaChannel添加到session 中
            MinaChannel old = (MinaChannel) session.setAttribute(CHANNEL_KEY, ret);
            // 若是屬性的舊值不爲空,則從新設置舊值
            if (old != null) {
                session.setAttribute(CHANNEL_KEY, old);
                ret = old;
            }
        }
    }
    return ret;
}

該方法是一個得到MinaChannel對象的方法,其中每個MinaChannel都會被放在session的屬性值中。緩存

3.removeChannelIfDisconnected

static void removeChannelIfDisconnected(IoSession session) {
    if (session != null && !session.isConnected()) {
        session.removeAttribute(CHANNEL_KEY);
    }
}

該方法是當沒有鏈接時移除該通道,比較簡單。服務器

4.send

@Override
public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        // 發送消息,返回future
        WriteFuture future = session.write(message);
        // 若是已經發送過了
        if (sent) {
            // 得到延遲時間
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 等待timeout的鏈接時間後查看是否發送成功
            success = future.join(timeout);
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }

    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}

該方法是最關鍵的發送消息,其中調用到了session的write方法,就是mina封裝的發送消息。而且根據返回的WriteFuture對象來判斷是否發送成功。網絡

(二)MinaHandler

該類繼承了IoHandlerAdapter,是通道處理器實現類,其中就是mina項目中IoHandler接口的幾個 方法。session

/**
 * url對象
 */
private final URL url;

/**
 * 通道處理器對象
 */
private final ChannelHandler handler;

該類有兩個屬性,上述提到的實現IoHandler接口方法都是調用了handler來實現的,我就舉例講一個,其餘的都同樣的寫法:app

@Override
public void sessionOpened(IoSession session) throws Exception {
    // 得到MinaChannel對象
    MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
    try {
        // 調用接連該通道
        handler.connected(channel);
    } finally {
        // 若是沒有鏈接則移除通道
        MinaChannel.removeChannelIfDisconnected(session);
    }
}

該方法在IoHandler中叫作sessionOpened,其實就是鏈接方法,因此調用的是handler.connected。其餘方法也同樣,請自行查看。框架

(三)MinaClient

該類繼承了AbstractClient類,是基於mina實現的客戶端類。

1.屬性

/**
 * 套接字鏈接集合
 */
private static final Map<String, SocketConnector> connectors = new ConcurrentHashMap<String, SocketConnector>();

/**
 * 鏈接的key
 */
private String connectorKey;
/**
 * 套接字鏈接者
 */
private SocketConnector connector;

/**
 * 一個句柄
 */
private volatile IoSession session; // volatile, please copy reference to use

該類中的屬性都跟mina項目中封裝類有關係。

2.doOpen

@Override
protected void doOpen() throws Throwable {
    // 用url來做爲key
    connectorKey = getUrl().toFullString();
    // 先從集合中取套接字鏈接
    SocketConnector c = connectors.get(connectorKey);
    if (c != null) {
        connector = c;
        // 若是爲空
    } else {
        // set thread pool. 設置線程池
        connector = new SocketConnector(Constants.DEFAULT_IO_THREADS,
                Executors.newCachedThreadPool(new NamedThreadFactory("MinaClientWorker", true)));
        // config 得到套接字鏈接配置
        SocketConnectorConfig cfg = (SocketConnectorConfig) connector.getDefaultConfig();
        cfg.setThreadModel(ThreadModel.MANUAL);
        // 啓用TCP_NODELAY
        cfg.getSessionConfig().setTcpNoDelay(true);
        // 啓用SO_KEEPALIVE
        cfg.getSessionConfig().setKeepAlive(true);
        int timeout = getConnectTimeout();
        // 設置鏈接超時時間
        cfg.setConnectTimeout(timeout < 1000 ? 1 : timeout / 1000);
        // set codec.
        // 設置編解碼器
        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(), getUrl(), this)));
        // 加入集合
        connectors.put(connectorKey, connector);
    }
}

該方法是打開客戶端,在mina中用套接字鏈接者connector來表示。其中的操做就是新建一個connector,而且設置相應的屬性,而後加入集合。

3.doConnect

@Override
protected void doConnect() throws Throwable {
    // 鏈接服務器
    ConnectFuture future = connector.connect(getConnectAddress(), new MinaHandler(getUrl(), this));
    long start = System.currentTimeMillis();
    final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
    // 用於對線程的阻塞和喚醒
    final CountDownLatch finish = new CountDownLatch(1); // resolve future.awaitUninterruptibly() dead lock
    // 加入監聽器
    future.addListener(new IoFutureListener() {
        @Override
        public void operationComplete(IoFuture future) {
            try {
                // 若是已經讀完了
                if (future.isReady()) {
                    // 建立得到該鏈接的IoSession實例
                    IoSession newSession = future.getSession();
                    try {
                        // Close old channel 關閉舊的session
                        IoSession oldSession = MinaClient.this.session; // copy reference
                        if (oldSession != null) {
                            try {
                                if (logger.isInfoEnabled()) {
                                    logger.info("Close old mina channel " + oldSession + " on create new mina channel " + newSession);
                                }
                                // 關閉鏈接
                                oldSession.close();
                            } finally {
                                // 移除通道
                                MinaChannel.removeChannelIfDisconnected(oldSession);
                            }
                        }
                    } finally {
                        // 若是MinaClient關閉了
                        if (MinaClient.this.isClosed()) {
                            try {
                                if (logger.isInfoEnabled()) {
                                    logger.info("Close new mina channel " + newSession + ", because the client closed.");
                                }
                                // 關閉session
                                newSession.close();
                            } finally {
                                MinaClient.this.session = null;
                                MinaChannel.removeChannelIfDisconnected(newSession);
                            }
                        } else {
                            // 設置新的session
                            MinaClient.this.session = newSession;
                        }
                    }
                }
            } catch (Exception e) {
                exception.set(e);
            } finally {
                // 減小數量,釋放全部等待的線程
                finish.countDown();
            }
        }
    });
    try {
        // 當前線程等待,直到鎖存器倒計數到零,除非線程被中斷,或者指定的等待時間過去
        finish.await(getConnectTimeout(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + " client-side timeout "
                + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start)
                + "ms) from netty client " + NetUtils.getLocalHost() + " using dubbo version "
                + Version.getVersion() + ", cause: " + e.getMessage(), e);
    }
    Throwable e = exception.get();
    if (e != null) {
        throw e;
    }
}

該方法是客戶端鏈接服務器的實現方法。其中用到了CountDownLatch來表明完成完成事件,它來作一個線程等待,直到1個線程完成上述的動做,也就是鏈接完成結束,才釋放等待的線程。保證每次只有一條線程去鏈接,解決future.awaitUninterruptibly()死鎖問題。

其餘方法請自行查看我寫的註釋。

(四)MinaServer

該類繼承了AbstractServer,是基於mina實現的服務端實現類。

1.屬性

private static final Logger logger = LoggerFactory.getLogger(MinaServer.class);

/**
 * 套接字接收者對象
 */
private SocketAcceptor acceptor;

2.doOpen

@Override
protected void doOpen() throws Throwable {
    // set thread pool.
    // 建立套接字接收者對象
    acceptor = new SocketAcceptor(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            Executors.newCachedThreadPool(new NamedThreadFactory("MinaServerWorker",
                    true)));
    // config
    // 設置配置
    SocketAcceptorConfig cfg = (SocketAcceptorConfig) acceptor.getDefaultConfig();
    cfg.setThreadModel(ThreadModel.MANUAL);
    // set codec. 設置編解碼器
    acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(), getUrl(), this)));

    // 開啓服務器
    acceptor.bind(getBindAddress(), new MinaHandler(getUrl(), this));
}

該方法是建立服務器,而且打開服務器。關鍵就是調用了acceptor的方法。

3.doClose

@Override
protected void doClose() throws Throwable {
    try {
        if (acceptor != null) {
            // 取消綁定,也就是關閉服務器
            acceptor.unbind(getBindAddress());
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(), e);
    }
}

該方法是關閉服務器,就是調用了acceptor.unbind方法。

4.getChannels

@Override
public Collection<Channel> getChannels() {
    // 得到鏈接到該服務器到全部鏈接句柄
    Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress());
    Collection<Channel> channels = new HashSet<Channel>();
    for (IoSession session : sessions) {
        if (session.isConnected()) {
            // 每次都用一個鏈接句柄建立一個通道
            channels.add(MinaChannel.getOrAddChannel(session, getUrl(), this));
        }
    }
    return channels;
}

該方法是得到全部鏈接該服務器的通道。

5.getChannel

@Override
public Channel getChannel(InetSocketAddress remoteAddress) {
    // 得到鏈接到該服務器到全部鏈接句柄
    Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress());
    // 遍歷全部句柄,找到要找的通道
    for (IoSession session : sessions) {
        if (session.getRemoteAddress().equals(remoteAddress)) {
            return MinaChannel.getOrAddChannel(session, getUrl(), this);
        }
    }
    return null;
}

該方法是得到地址對應的單個通道。

(五)MinaTransporter

public class MinaTransporter implements Transporter {

    public static final String NAME = "mina";

    @Override
    public Server bind(URL url, ChannelHandler handler) throws RemotingException {
        // 建立MinaServer實例
        return new MinaServer(url, handler);
    }

    @Override
    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
        // 建立MinaClient實例
        return new MinaClient(url, handler);
    }

}

該類實現了Transporter接口,是基於mina的傳輸層實現。能夠看到,bind和connect方法分別就是建立了MinaServer和MinaClient實例。這裏我建議查看一下《dubbo源碼解析(九)遠程通訊——Transport層》。

(六)MinaCodecAdapter

該類是基於mina實現的編解碼類,實現了ProtocolCodecFactory。

1.屬性

/**
 * 編碼對象
 */
private final ProtocolEncoder encoder = new InternalEncoder();

/**
 * 解碼對象
 */
private final ProtocolDecoder decoder = new InternalDecoder();

/**
 * 編解碼器
 */
private final Codec2 codec;

/**
 * url對象
 */
private final URL url;

/**
 * 通道處理器對象
 */
private final ChannelHandler handler;

/**
 * 緩衝區大小
 */
private final int bufferSize;

屬性比較好理解,該編解碼器用到了ProtocolEncoder和ProtocolDecoder,而InternalEncoder和InternalDecoder兩個類是該類的內部類,它們實現了ProtocolEncoder和ProtocolDecoder,關鍵的編解碼邏輯在這兩個類中實現。

2.構造方法

public MinaCodecAdapter(Codec2 codec, URL url, ChannelHandler handler) {
    this.codec = codec;
    this.url = url;
    this.handler = handler;
    int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);
    // 若是緩存區大小在16字節之內,則設置配置大小,若是不是,則設置8字節的緩衝區大小
    this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;
}

3.InternalEncoder

private class InternalEncoder implements ProtocolEncoder {

    @Override
    public void dispose(IoSession session) throws Exception {
    }

    @Override
    public void encode(IoSession session, Object msg, ProtocolEncoderOutput out) throws Exception {
        // 動態分配一個1k的緩衝區
        ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(1024);
        // 得到通道
        MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
        try {
            // 編碼
            codec.encode(channel, buffer, msg);
        } finally {
            // 檢測是否斷開鏈接,若是斷開,則移除
            MinaChannel.removeChannelIfDisconnected(session);
        }
        // 寫數據到out中
        out.write(ByteBuffer.wrap(buffer.toByteBuffer()));
        out.flush();
    }
}

該內部類是編碼類,其中的encode方法中寫到了編碼核心調用的是codec.encode。

4.InternalDecoder

private class InternalDecoder implements ProtocolDecoder {

    private ChannelBuffer buffer = ChannelBuffers.EMPTY_BUFFER;

    @Override
    public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception {
        int readable = in.limit();
        if (readable <= 0) return;

        ChannelBuffer frame;

        // 若是緩衝區還有可讀字節數
        if (buffer.readable()) {
            // 若是緩衝區是DynamicChannelBuffer類型的
            if (buffer instanceof DynamicChannelBuffer) {
                // 往buffer中寫入數據
                buffer.writeBytes(in.buf());
                frame = buffer;
            } else {
                // 緩衝區大小
                int size = buffer.readableBytes() + in.remaining();
                // 動態分配一個緩衝區
                frame = ChannelBuffers.dynamicBuffer(size > bufferSize ? size : bufferSize);
                // buffer的數據把寫到frame
                frame.writeBytes(buffer, buffer.readableBytes());
                // 把流中的數據寫到frame
                frame.writeBytes(in.buf());
            }
        } else {
            // 不然是基於Java NIO的ByteBuffer生成的緩衝區
            frame = ChannelBuffers.wrappedBuffer(in.buf());
        }

        // 得到通道
        Channel channel = MinaChannel.getOrAddChannel(session, url, handler);
        Object msg;
        int savedReadIndex;

        try {
            do {
                // 得到讀索引
                savedReadIndex = frame.readerIndex();
                try {
                    // 解碼
                    msg = codec.decode(channel, frame);
                } catch (Exception e) {
                    buffer = ChannelBuffers.EMPTY_BUFFER;
                    throw e;
                }
                // 拆包
                if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                    frame.readerIndex(savedReadIndex);
                    break;
                } else {
                    if (savedReadIndex == frame.readerIndex()) {
                        buffer = ChannelBuffers.EMPTY_BUFFER;
                        throw new Exception("Decode without read data.");
                    }
                    if (msg != null) {
                        // 把數據寫到輸出流裏面
                        out.write(msg);
                    }
                }
            } while (frame.readable());
        } finally {
            // 若是frame還有可讀數據
            if (frame.readable()) {
                //丟棄可讀數據
                frame.discardReadBytes();
                buffer = frame;
            } else {
                buffer = ChannelBuffers.EMPTY_BUFFER;
            }
            MinaChannel.removeChannelIfDisconnected(session);
        }
    }

    @Override
    public void dispose(IoSession session) throws Exception {
    }

    @Override
    public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
    }
}

該內部類是解碼類,其中decode方法中關鍵的是調用了codec.decode,其他的操做是利用緩衝區對數據的沖刷流轉。

後記

該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...

該文章講解了基於mina的來實現的遠程通訊、介紹dubbo-remoting-mina內的源碼解析,關鍵須要對mina有所瞭解。下一篇我會講解基於netty3實現遠程通訊部分。

相關文章
相關標籤/搜索