目標:介紹基於netty3的來實現的遠程通訊、介紹dubbo-remoting-netty內的源碼解析。
如今dubbo默認的網絡傳輸Transport接口默認實現的仍是基於netty3實現的網絡傳輸,不過立刻後面默認實現就要改成netty4了。因爲netty4對netty3對兼容性不是很好,因此保留了兩個版本的實現。java
下面是包結構:git
該類繼承了AbstractChannel類,是基於netty3實現的通道。github
/** * 通道集合 */ private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>(); /** * 通道 */ private final org.jboss.netty.channel.Channel channel; /** * 屬性集合 */ private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();
static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) { if (ch == null) { return null; } // 首先從集合中取通道 NettyChannel ret = channelMap.get(ch); // 若是爲空,則新建 if (ret == null) { NettyChannel nc = new NettyChannel(ch, url, handler); // 若是通道鏈接着 if (ch.isConnected()) { // 加入集合 ret = channelMap.putIfAbsent(ch, nc); } if (ret == null) { ret = nc; } } return ret; }
該方法是得到通道,當通道在集合中沒有的時候,新建一個通道。算法
static void removeChannelIfDisconnected(org.jboss.netty.channel.Channel ch) { if (ch != null && !ch.isConnected()) { channelMap.remove(ch); } }
該方法是當通道沒有鏈接的時候,從集合中移除它。bootstrap
@Override public void send(Object message, boolean sent) throws RemotingException { super.send(message, sent); boolean success = true; int timeout = 0; try { // 寫入數據,發送消息 ChannelFuture future = channel.write(message); // 若是已經發送過 if (sent) { // 得到超時時間 timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 等待timeout的鏈接時間後查看是否發送成功 success = future.await(timeout); } // 看是否有異常 Throwable cause = future.getCause(); if (cause != null) { throw cause; } } catch (Throwable e) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if (!success) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); } }
該方法是發送消息,其中用到了channe.write方法傳輸消息,而且經過返回的future來判斷是否發送成功。segmentfault
@Override public void close() { try { super.close(); } catch (Exception e) { logger.warn(e.getMessage(), e); } try { // 若是通道斷開,則移除該通道 removeChannelIfDisconnected(channel); } catch (Exception e) { logger.warn(e.getMessage(), e); } try { // 清空屬性 attributes.clear(); } catch (Exception e) { logger.warn(e.getMessage(), e); } try { if (logger.isInfoEnabled()) { logger.info("Close netty channel " + channel); } // 關閉通道 channel.close(); } catch (Exception e) { logger.warn(e.getMessage(), e); } }
該方法是關閉通道,作了三個操做,分別是從集合中移除、清除屬性、關閉通道。緩存
其餘實現方法比較簡單,我就講解了。服務器
該類繼承了SimpleChannelHandler類,是基於netty3的通道處理器,而該類被加上了@Sharable註解,也就是說該處理器能夠從屬於多個ChannelPipeline網絡
/** * 通道集合,key是主機地址 ip:port */ private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel> /** * url對象 */ private final URL url; /** * 通道 */ private final ChannelHandler handler;
該類的屬性比較簡單,而且該類中實現的方法都是調用了屬性handler的方法,我舉一個例子來說,其餘的能夠本身查看源碼,比較簡單。app
@Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { // 得到通道實例 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); try { if (channel != null) { // 保存該通道,加入到集合中 channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel); } // 鏈接 handler.connected(channel); } finally { NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } }
該方法是通道鏈接的方法,其中先獲取了通道實例,而後吧該實例加入到集合中,最好帶哦用handler.connected來進行鏈接。
該類繼承了AbstractClient,是基於netty3實現的客戶端類。
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class); // ChannelFactory's closure has a DirectMemory leak, using static to avoid // https://issues.jboss.org/browse/NETTY-424 /** * 通道工廠,用static來避免直接緩存區的一個OOM問題 */ private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientBoss", true)), Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientWorker", true)), Constants.DEFAULT_IO_THREADS); /** * 客戶端引導對象 */ private ClientBootstrap bootstrap; /** * 通道 */ private volatile Channel channel; // volatile, please copy reference to use
上述屬性中ChannelFactory用了static修飾,爲了不netty3中會有直接緩衝內存泄漏的現象,具體的討論能夠訪問註釋中的討論。
@Override protected void doOpen() throws Throwable { // 設置日誌工廠 NettyHelper.setNettyLoggerFactory(); // 實例化客戶端引導類 bootstrap = new ClientBootstrap(channelFactory); // config // @see org.jboss.netty.channel.socket.SocketChannelConfig // 配置選擇項 bootstrap.setOption("keepAlive", true); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("connectTimeoutMillis", getConnectTimeout()); // 建立通道處理器 final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); // 設置責任鏈路 bootstrap.setPipelineFactory(new ChannelPipelineFactory() { /** * 得到通道 * @return */ @Override public ChannelPipeline getPipeline() { // 新建編解碼 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); // 得到管道 ChannelPipeline pipeline = Channels.pipeline(); // 設置解碼器 pipeline.addLast("decoder", adapter.getDecoder()); // 設置編碼器 pipeline.addLast("encoder", adapter.getEncoder()); // 設置通道處理器 pipeline.addLast("handler", nettyHandler); // 返回通道 return pipeline; } }); }
該方法是建立客戶端,而且打開,其中的邏輯就是用netty3的客戶端引導類來建立一個客戶端,若是對netty不熟悉的朋友能夠先補補netty知識。
@Override protected void doConnect() throws Throwable { long start = System.currentTimeMillis(); // 用引導類鏈接 ChannelFuture future = bootstrap.connect(getConnectAddress()); try { // 在超時時間內是否鏈接完成 boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS); if (ret && future.isSuccess()) { // 得到通道 Channel newChannel = future.getChannel(); // 異步修改此通道 newChannel.setInterestOps(Channel.OP_READ_WRITE); try { // Close old channel 關閉舊的通道 Channel oldChannel = NettyClient.this.channel; // copy reference if (oldChannel != null) { try { if (logger.isInfoEnabled()) { logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel); } // 關閉 oldChannel.close(); } finally { // 移除通道 NettyChannel.removeChannelIfDisconnected(oldChannel); } } } finally { // 若是客戶端關閉 if (NettyClient.this.isClosed()) { try { if (logger.isInfoEnabled()) { logger.info("Close new netty channel " + newChannel + ", because the client closed."); } // 關閉通道 newChannel.close(); } finally { NettyClient.this.channel = null; NettyChannel.removeChannelIfDisconnected(newChannel); } } else { NettyClient.this.channel = newChannel; } } } else if (future.getCause() != null) { throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause()); } else { throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + " client-side timeout " + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()); } } finally { // 若是客戶端沒有鏈接 if (!isConnected()) { // 取消future future.cancel(); } } }
該方法是客戶端鏈接服務器的方法。其中調用了bootstrap.connect。後面的邏輯是用來檢測是否鏈接,最後若是未鏈接,則會取消該鏈接任務。
@Override protected void doClose() throws Throwable { /*try { bootstrap.releaseExternalResources(); } catch (Throwable t) { logger.warn(t.getMessage()); }*/ }
在這裏不能關閉是由於channelFactory 是靜態屬性,被多個 NettyClient 共用。因此不能釋放資源。
該類繼承了AbstractServer,實現了Server,是基於netty3實現的服務器類。
/** * 鏈接該服務器的通道集合 */ private Map<String, Channel> channels; // <ip:port, channel> /** * 服務器引導類對象 */ private ServerBootstrap bootstrap; /** * 通道 */ private org.jboss.netty.channel.Channel channel;
@Override protected void doOpen() throws Throwable { // 設置日誌工廠 NettyHelper.setNettyLoggerFactory(); // 建立線程池 ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); // 新建通道工廠 ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); // 新建服務引導類對象 bootstrap = new ServerBootstrap(channelFactory); // 新建通道處理器 final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); // 得到通道集合 channels = nettyHandler.getChannels(); // https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); // 禁用nagle算法,將數據當即發送出去。納格算法是以減小封包傳送量來增進TCP/IP網絡的效能 bootstrap.setOption("child.tcpNoDelay", true); // 設置管道工廠 bootstrap.setPipelineFactory(new ChannelPipelineFactory() { /** * 得到通道 * @return */ @Override public ChannelPipeline getPipeline() { // 新建編解碼器 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); // 得到通道 ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ // 設置解碼器 pipeline.addLast("decoder", adapter.getDecoder()); // 設置編碼器 pipeline.addLast("encoder", adapter.getEncoder()); // 設置通道處理器 pipeline.addLast("handler", nettyHandler); // 返回通道 return pipeline; } }); // bind 綁定地址,也就是啓用服務器 channel = bootstrap.bind(getBindAddress()); }
該方法是建立服務器,而且打開服務器。一樣建立服務器的方式跟正常的用netty建立服務器方式同樣,只是新加了編碼器和解碼器。還有一個注意點就是這裏ServerBootstrap 的可選項。
@Override protected void doClose() throws Throwable { try { if (channel != null) { // unbind.關閉通道 channel.close(); } } catch (Throwable e) { logger.warn(e.getMessage(), e); } try { // 得到全部鏈接該服務器的通道集合 Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels(); if (channels != null && !channels.isEmpty()) { // 遍歷通道集合 for (com.alibaba.dubbo.remoting.Channel channel : channels) { try { // 關閉通道鏈接 channel.close(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } } } } catch (Throwable e) { logger.warn(e.getMessage(), e); } try { if (bootstrap != null) { // release external resource. 回收資源 bootstrap.releaseExternalResources(); } } catch (Throwable e) { logger.warn(e.getMessage(), e); } try { if (channels != null) { // 清空集合 channels.clear(); } } catch (Throwable e) { logger.warn(e.getMessage(), e); } }
該方法是關閉服務器,一系列的操做很清晰,我就很少說了。
@Override public Collection<Channel> getChannels() { Collection<Channel> chs = new HashSet<Channel>(); for (Channel channel : this.channels.values()) { // 若是通道鏈接,則加入集合,返回 if (channel.isConnected()) { chs.add(channel); } else { channels.remove(NetUtils.toAddressString(channel.getRemoteAddress())); } } return chs; }
該方法是返回鏈接該服務器的通道集合,而且用了HashSet保存,不會重複。
public class NettyTransporter implements Transporter { public static final String NAME = "netty"; @Override public Server bind(URL url, ChannelHandler listener) throws RemotingException { // 建立一個NettyServer return new NettyServer(url, listener); } @Override public Client connect(URL url, ChannelHandler listener) throws RemotingException { // 建立一個NettyClient return new NettyClient(url, listener); } }
該類就是基於netty3的Transporter實現類,一樣兩個方法也是分別建立了NettyServer和NettyClient。
該類是設置日誌的工具類,其中基於netty3的InternalLoggerFactory實現類一個DubboLoggerFactory。這個我就不講解了,比較好理解,不理解也無傷大雅。
該類是基於netty3實現的編解碼類。
/** * 編碼者 */ private final ChannelHandler encoder = new InternalEncoder(); /** * 解碼者 */ private final ChannelHandler decoder = new InternalDecoder(); /** * 編解碼器 */ private final Codec2 codec; /** * url對象 */ private final URL url; /** * 緩衝區大小 */ private final int bufferSize; /** * 通道對象 */ private final com.alibaba.dubbo.remoting.ChannelHandler handler;
InternalEncoder和InternalDecoder屬性是該類的內部類,分別掌管着編碼和解碼
public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) { this.codec = codec; this.url = url; this.handler = handler; int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE); // 若是緩存區大小在16字節之內,則設置配置大小,若是不是,則設置8字節的緩衝區大小 this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE; }
你會發現對於緩存區大小的規則都是同樣的。
@Sharable private class InternalEncoder extends OneToOneEncoder { @Override protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception { // 動態分配一個1k的緩衝區 com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024); // 得到通道對象 NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler); try { // 編碼 codec.encode(channel, buffer, msg); } finally { NettyChannel.removeChannelIfDisconnected(ch); } // 基於buteBuffer建立一個緩衝區,而且寫入數據 return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer()); } }
該內部類實現類編碼的邏輯,主要調用了codec.encode。
private class InternalDecoder extends SimpleChannelUpstreamHandler { private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception { Object o = event.getMessage(); // 若是消息不是一個ChannelBuffer類型 if (!(o instanceof ChannelBuffer)) { // 轉發事件到與此上下文關聯的處理程序最近的上游 ctx.sendUpstream(event); return; } ChannelBuffer input = (ChannelBuffer) o; // 若是可讀數據不大於0,直接返回 int readable = input.readableBytes(); if (readable <= 0) { return; } com.alibaba.dubbo.remoting.buffer.ChannelBuffer message; if (buffer.readable()) { // 判斷buffer是不是動態分配的緩衝區 if (buffer instanceof DynamicChannelBuffer) { // 寫入數據 buffer.writeBytes(input.toByteBuffer()); message = buffer; } else { // 須要的緩衝區大小 int size = buffer.readableBytes() + input.readableBytes(); // 動態生成緩衝區 message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer( size > bufferSize ? size : bufferSize); // 把buffer數據寫入message message.writeBytes(buffer, buffer.readableBytes()); // 把input數據寫入message message.writeBytes(input.toByteBuffer()); } } else { // 不然 基於ByteBuffer經過buffer來建立一個新的緩衝區 message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer( input.toByteBuffer()); } NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); Object msg; int saveReaderIndex; try { // decode object. do { saveReaderIndex = message.readerIndex(); try { // 解碼 msg = codec.decode(channel, message); } catch (IOException e) { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; throw e; } // 拆包 if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) { message.readerIndex(saveReaderIndex); break; } else { // 若是已經到達讀索引,則沒有數據可解碼 if (saveReaderIndex == message.readerIndex()) { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; throw new IOException("Decode without read data."); } // if (msg != null) { // 將消息發送到指定關聯的處理程序最近的上游 Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress()); } } } while (message.readable()); } finally { // 若是消息還有可讀數據,則丟棄 if (message.readable()) { message.discardReadBytes(); buffer = message; } else { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; } NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { ctx.sendUpstream(e); } }
該內部類實現瞭解碼的邏輯,其中大部分邏輯都在對數據作讀寫,關鍵的解碼調用了codec.decode。
該類是建立緩衝區的工廠類。它實現了ChannelBufferFactory接口,也就是實現類它的三種得到緩衝區的方法。
public class NettyBackedChannelBufferFactory implements ChannelBufferFactory { /** * 單例 */ private static final NettyBackedChannelBufferFactory INSTANCE = new NettyBackedChannelBufferFactory(); public static ChannelBufferFactory getInstance() { return INSTANCE; } @Override public ChannelBuffer getBuffer(int capacity) { return new NettyBackedChannelBuffer(ChannelBuffers.dynamicBuffer(capacity)); } @Override public ChannelBuffer getBuffer(byte[] array, int offset, int length) { org.jboss.netty.buffer.ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(length); buffer.writeBytes(array, offset, length); return new NettyBackedChannelBuffer(buffer); } @Override public ChannelBuffer getBuffer(ByteBuffer nioBuffer) { return new NettyBackedChannelBuffer(ChannelBuffers.wrappedBuffer(nioBuffer)); } }
能夠看到,都是建立了一個NettyBackedChannelBuffer,下面講解NettyBackedChannelBuffer。
該類是基於netty3的buffer從新實現的緩衝區,它實現了ChannelBuffer接口,而且有一個屬性:
private org.jboss.netty.buffer.ChannelBuffer buffer;
那麼其中的幾乎全部方法都是調用了這個buffer的方法,由於我在dubbo源碼解析(十一)遠程通訊——Buffer中寫到ChannelBuffer接口方法定義跟netty中的緩衝區定義幾乎同樣,連註釋都幾乎同樣。全部知識單純的調用了buffer的方法。具體的代碼能夠查看個人GitHub
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了基於netty3的來實現的遠程通訊、介紹dubbo-remoting-netty內的源碼解析,關鍵須要對netty有所瞭解。下一篇我會講解基於netty4實現遠程通訊部分。