目標:介紹基於Mina的來實現的遠程通訊、介紹dubbo-remoting-mina內的源碼解析。java
Apache MINA是一個網絡應用程序框架,可幫助用戶輕鬆開發高性能和高可擴展性的網絡應用程序。它經過Java NIO在各類傳輸(如TCP / IP和UDP / IP)上提供抽象的事件驅動異步API。它一般被稱爲NIO框架庫、客戶端服務器框架庫或者網絡套接字庫。那麼本問就要講解在dubbo項目中,基於mina的API實現服務端和客戶端來完成遠程通信這件事情。git
下面是mina實現的包結構:github
該類繼承了AbstractChannel,是基於mina實現的通道。segmentfault
private static final Logger logger = LoggerFactory.getLogger(MinaChannel.class);
/** * 通道的key */
private static final String CHANNEL_KEY = MinaChannel.class.getName() + ".CHANNEL";
/** * mina中的一個句柄,表示兩個端點之間的鏈接,與傳輸類型無關 */
private final IoSession session;
複製代碼
該類的屬性除了封裝了一個CHANNEL_KEY之外,還用到了mina中的IoSession,它封裝着一個鏈接所須要的方法,好比得到遠程地址等。緩存
static MinaChannel getOrAddChannel(IoSession session, URL url, ChannelHandler handler) {
// 若是鏈接session爲空,則返回空
if (session == null) {
return null;
}
// 得到MinaChannel實例
MinaChannel ret = (MinaChannel) session.getAttribute(CHANNEL_KEY);
// 若是不存在,則建立
if (ret == null) {
// 建立一個MinaChannel實例
ret = new MinaChannel(session, url, handler);
// 若是兩個端點鏈接
if (session.isConnected()) {
// 把新建立的MinaChannel添加到session 中
MinaChannel old = (MinaChannel) session.setAttribute(CHANNEL_KEY, ret);
// 若是屬性的舊值不爲空,則從新設置舊值
if (old != null) {
session.setAttribute(CHANNEL_KEY, old);
ret = old;
}
}
}
return ret;
}
複製代碼
該方法是一個得到MinaChannel對象的方法,其中每個MinaChannel都會被放在session的屬性值中。服務器
static void removeChannelIfDisconnected(IoSession session) {
if (session != null && !session.isConnected()) {
session.removeAttribute(CHANNEL_KEY);
}
}
複製代碼
該方法是當沒有鏈接時移除該通道,比較簡單。網絡
@Override
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
// 發送消息,返回future
WriteFuture future = session.write(message);
// 若是已經發送過了
if (sent) {
// 得到延遲時間
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 等待timeout的鏈接時間後查看是否發送成功
success = future.join(timeout);
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
複製代碼
該方法是最關鍵的發送消息,其中調用到了session的write方法,就是mina封裝的發送消息。而且根據返回的WriteFuture對象來判斷是否發送成功。session
該類繼承了IoHandlerAdapter,是通道處理器實現類,其中就是mina項目中IoHandler接口的幾個 方法。app
/** * url對象 */
private final URL url;
/** * 通道處理器對象 */
private final ChannelHandler handler;
複製代碼
該類有兩個屬性,上述提到的實現IoHandler接口方法都是調用了handler來實現的,我就舉例講一個,其餘的都同樣的寫法:框架
@Override
public void sessionOpened(IoSession session) throws Exception {
// 得到MinaChannel對象
MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
try {
// 調用接連該通道
handler.connected(channel);
} finally {
// 若是沒有鏈接則移除通道
MinaChannel.removeChannelIfDisconnected(session);
}
}
複製代碼
該方法在IoHandler中叫作sessionOpened,其實就是鏈接方法,因此調用的是handler.connected。其餘方法也同樣,請自行查看。
該類繼承了AbstractClient類,是基於mina實現的客戶端類。
/** * 套接字鏈接集合 */
private static final Map<String, SocketConnector> connectors = new ConcurrentHashMap<String, SocketConnector>();
/** * 鏈接的key */
private String connectorKey;
/** * 套接字鏈接者 */
private SocketConnector connector;
/** * 一個句柄 */
private volatile IoSession session; // volatile, please copy reference to use
複製代碼
該類中的屬性都跟mina項目中封裝類有關係。
@Override
protected void doOpen() throws Throwable {
// 用url來做爲key
connectorKey = getUrl().toFullString();
// 先從集合中取套接字鏈接
SocketConnector c = connectors.get(connectorKey);
if (c != null) {
connector = c;
// 若是爲空
} else {
// set thread pool. 設置線程池
connector = new SocketConnector(Constants.DEFAULT_IO_THREADS,
Executors.newCachedThreadPool(new NamedThreadFactory("MinaClientWorker", true)));
// config 得到套接字鏈接配置
SocketConnectorConfig cfg = (SocketConnectorConfig) connector.getDefaultConfig();
cfg.setThreadModel(ThreadModel.MANUAL);
// 啓用TCP_NODELAY
cfg.getSessionConfig().setTcpNoDelay(true);
// 啓用SO_KEEPALIVE
cfg.getSessionConfig().setKeepAlive(true);
int timeout = getConnectTimeout();
// 設置鏈接超時時間
cfg.setConnectTimeout(timeout < 1000 ? 1 : timeout / 1000);
// set codec.
// 設置編解碼器
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(), getUrl(), this)));
// 加入集合
connectors.put(connectorKey, connector);
}
}
複製代碼
該方法是打開客戶端,在mina中用套接字鏈接者connector來表示。其中的操做就是新建一個connector,而且設置相應的屬性,而後加入集合。
@Override
protected void doConnect() throws Throwable {
// 鏈接服務器
ConnectFuture future = connector.connect(getConnectAddress(), new MinaHandler(getUrl(), this));
long start = System.currentTimeMillis();
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
// 用於對線程的阻塞和喚醒
final CountDownLatch finish = new CountDownLatch(1); // resolve future.awaitUninterruptibly() dead lock
// 加入監聽器
future.addListener(new IoFutureListener() {
@Override
public void operationComplete(IoFuture future) {
try {
// 若是已經讀完了
if (future.isReady()) {
// 建立得到該鏈接的IoSession實例
IoSession newSession = future.getSession();
try {
// Close old channel 關閉舊的session
IoSession oldSession = MinaClient.this.session; // copy reference
if (oldSession != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old mina channel " + oldSession + " on create new mina channel " + newSession);
}
// 關閉鏈接
oldSession.close();
} finally {
// 移除通道
MinaChannel.removeChannelIfDisconnected(oldSession);
}
}
} finally {
// 若是MinaClient關閉了
if (MinaClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new mina channel " + newSession + ", because the client closed.");
}
// 關閉session
newSession.close();
} finally {
MinaClient.this.session = null;
MinaChannel.removeChannelIfDisconnected(newSession);
}
} else {
// 設置新的session
MinaClient.this.session = newSession;
}
}
}
} catch (Exception e) {
exception.set(e);
} finally {
// 減小數量,釋放全部等待的線程
finish.countDown();
}
}
});
try {
// 當前線程等待,直到鎖存器倒計數到零,除非線程被中斷,或者指定的等待時間過去
finish.await(getConnectTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start)
+ "ms) from netty client " + NetUtils.getLocalHost() + " using dubbo version "
+ Version.getVersion() + ", cause: " + e.getMessage(), e);
}
Throwable e = exception.get();
if (e != null) {
throw e;
}
}
複製代碼
該方法是客戶端鏈接服務器的實現方法。其中用到了CountDownLatch來表明完成完成事件,它來作一個線程等待,直到1個線程完成上述的動做,也就是鏈接完成結束,才釋放等待的線程。保證每次只有一條線程去鏈接,解決future.awaitUninterruptibly()死鎖問題。
其餘方法請自行查看我寫的註釋。
該類繼承了AbstractServer,是基於mina實現的服務端實現類。
private static final Logger logger = LoggerFactory.getLogger(MinaServer.class);
/** * 套接字接收者對象 */
private SocketAcceptor acceptor;
複製代碼
@Override
protected void doOpen() throws Throwable {
// set thread pool.
// 建立套接字接收者對象
acceptor = new SocketAcceptor(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
Executors.newCachedThreadPool(new NamedThreadFactory("MinaServerWorker",
true)));
// config
// 設置配置
SocketAcceptorConfig cfg = (SocketAcceptorConfig) acceptor.getDefaultConfig();
cfg.setThreadModel(ThreadModel.MANUAL);
// set codec. 設置編解碼器
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(), getUrl(), this)));
// 開啓服務器
acceptor.bind(getBindAddress(), new MinaHandler(getUrl(), this));
}
複製代碼
該方法是建立服務器,而且打開服務器。關鍵就是調用了acceptor的方法。
@Override
protected void doClose() throws Throwable {
try {
if (acceptor != null) {
// 取消綁定,也就是關閉服務器
acceptor.unbind(getBindAddress());
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
複製代碼
該方法是關閉服務器,就是調用了acceptor.unbind方法。
@Override
public Collection<Channel> getChannels() {
// 得到鏈接到該服務器到全部鏈接句柄
Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress());
Collection<Channel> channels = new HashSet<Channel>();
for (IoSession session : sessions) {
if (session.isConnected()) {
// 每次都用一個鏈接句柄建立一個通道
channels.add(MinaChannel.getOrAddChannel(session, getUrl(), this));
}
}
return channels;
}
複製代碼
該方法是得到全部鏈接該服務器的通道。
@Override
public Channel getChannel(InetSocketAddress remoteAddress) {
// 得到鏈接到該服務器到全部鏈接句柄
Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress());
// 遍歷全部句柄,找到要找的通道
for (IoSession session : sessions) {
if (session.getRemoteAddress().equals(remoteAddress)) {
return MinaChannel.getOrAddChannel(session, getUrl(), this);
}
}
return null;
}
複製代碼
該方法是得到地址對應的單個通道。
public class MinaTransporter implements Transporter {
public static final String NAME = "mina";
@Override
public Server bind(URL url, ChannelHandler handler) throws RemotingException {
// 建立MinaServer實例
return new MinaServer(url, handler);
}
@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
// 建立MinaClient實例
return new MinaClient(url, handler);
}
}
複製代碼
該類實現了Transporter接口,是基於mina的傳輸層實現。能夠看到,bind和connect方法分別就是建立了MinaServer和MinaClient實例。這裏我建議查看一下《dubbo源碼解析(九)遠程通訊——Transport層》。
該類是基於mina實現的編解碼類,實現了ProtocolCodecFactory。
/** * 編碼對象 */
private final ProtocolEncoder encoder = new InternalEncoder();
/** * 解碼對象 */
private final ProtocolDecoder decoder = new InternalDecoder();
/** * 編解碼器 */
private final Codec2 codec;
/** * url對象 */
private final URL url;
/** * 通道處理器對象 */
private final ChannelHandler handler;
/** * 緩衝區大小 */
private final int bufferSize;
複製代碼
屬性比較好理解,該編解碼器用到了ProtocolEncoder和ProtocolDecoder,而InternalEncoder和InternalDecoder兩個類是該類的內部類,它們實現了ProtocolEncoder和ProtocolDecoder,關鍵的編解碼邏輯在這兩個類中實現。
public MinaCodecAdapter(Codec2 codec, URL url, ChannelHandler handler) {
this.codec = codec;
this.url = url;
this.handler = handler;
int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);
// 若是緩存區大小在16字節之內,則設置配置大小,若是不是,則設置8字節的緩衝區大小
this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;
}
複製代碼
private class InternalEncoder implements ProtocolEncoder {
@Override
public void dispose(IoSession session) throws Exception {
}
@Override
public void encode(IoSession session, Object msg, ProtocolEncoderOutput out) throws Exception {
// 動態分配一個1k的緩衝區
ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(1024);
// 得到通道
MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
try {
// 編碼
codec.encode(channel, buffer, msg);
} finally {
// 檢測是否斷開鏈接,若是斷開,則移除
MinaChannel.removeChannelIfDisconnected(session);
}
// 寫數據到out中
out.write(ByteBuffer.wrap(buffer.toByteBuffer()));
out.flush();
}
}
複製代碼
該內部類是編碼類,其中的encode方法中寫到了編碼核心調用的是codec.encode。
private class InternalDecoder implements ProtocolDecoder {
private ChannelBuffer buffer = ChannelBuffers.EMPTY_BUFFER;
@Override
public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception {
int readable = in.limit();
if (readable <= 0) return;
ChannelBuffer frame;
// 若是緩衝區還有可讀字節數
if (buffer.readable()) {
// 若是緩衝區是DynamicChannelBuffer類型的
if (buffer instanceof DynamicChannelBuffer) {
// 往buffer中寫入數據
buffer.writeBytes(in.buf());
frame = buffer;
} else {
// 緩衝區大小
int size = buffer.readableBytes() + in.remaining();
// 動態分配一個緩衝區
frame = ChannelBuffers.dynamicBuffer(size > bufferSize ? size : bufferSize);
// buffer的數據把寫到frame
frame.writeBytes(buffer, buffer.readableBytes());
// 把流中的數據寫到frame
frame.writeBytes(in.buf());
}
} else {
// 不然是基於Java NIO的ByteBuffer生成的緩衝區
frame = ChannelBuffers.wrappedBuffer(in.buf());
}
// 得到通道
Channel channel = MinaChannel.getOrAddChannel(session, url, handler);
Object msg;
int savedReadIndex;
try {
do {
// 得到讀索引
savedReadIndex = frame.readerIndex();
try {
// 解碼
msg = codec.decode(channel, frame);
} catch (Exception e) {
buffer = ChannelBuffers.EMPTY_BUFFER;
throw e;
}
// 拆包
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
frame.readerIndex(savedReadIndex);
break;
} else {
if (savedReadIndex == frame.readerIndex()) {
buffer = ChannelBuffers.EMPTY_BUFFER;
throw new Exception("Decode without read data.");
}
if (msg != null) {
// 把數據寫到輸出流裏面
out.write(msg);
}
}
} while (frame.readable());
} finally {
// 若是frame還有可讀數據
if (frame.readable()) {
//丟棄可讀數據
frame.discardReadBytes();
buffer = frame;
} else {
buffer = ChannelBuffers.EMPTY_BUFFER;
}
MinaChannel.removeChannelIfDisconnected(session);
}
}
@Override
public void dispose(IoSession session) throws Exception {
}
@Override
public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
}
}
複製代碼
該內部類是解碼類,其中decode方法中關鍵的是調用了codec.decode,其他的操做是利用緩衝區對數據的沖刷流轉。
該部分相關的源碼解析地址:github.com/CrazyHZM/in…
該文章講解了基於mina的來實現的遠程通訊、介紹dubbo-remoting-mina內的源碼解析,關鍵須要對mina有所瞭解。下一篇我會講解基於netty3實現遠程通訊部分。