目標:介紹基於netty4的來實現的遠程通訊、介紹dubbo-remoting-netty4內的源碼解析。
netty4對netty3兼容性不是很好,而且netty4在不少的術語和api也發生了改變,致使升級netty4會很艱辛,網上應該有不少相關文章,高版本的總有高版本的優點所在,因此dubbo也須要與時俱進,又新增了基於netty4來實現遠程通信模塊。下面講解的,若是跟上一篇文章有重複的地方我就略過去了。關鍵仍是要把遠程通信的api那幾篇看懂,看這幾篇實現纔會很簡單。java
下面是包的結構:git
該類繼承了AbstractChannel,是基於netty4的通道實現類github
/** * 通道集合 */ private static final ConcurrentMap<Channel, NettyChannel> channelMap = new ConcurrentHashMap<Channel, NettyChannel>(); /** * 通道 */ private final Channel channel; /** * 屬性集合 */ private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();
屬性跟netty3實現的通道類屬性幾乎同樣,我就不講解了。bootstrap
static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) { if (ch == null) { return null; } // 首先從集合中取通道 NettyChannel ret = channelMap.get(ch); // 若是爲空,則新建 if (ret == null) { NettyChannel nettyChannel = new NettyChannel(ch, url, handler); // 若是通道還活躍着 if (ch.isActive()) { // 加入集合 ret = channelMap.putIfAbsent(ch, nettyChannel); } if (ret == null) { ret = nettyChannel; } } return ret; }
該方法是得到通道,若是集合中沒有找到對應通道,則建立一個,而後加入集合。segmentfault
@Override public void send(Object message, boolean sent) throws RemotingException { super.send(message, sent); boolean success = true; int timeout = 0; try { // 寫入數據,發送消息 ChannelFuture future = channel.writeAndFlush(message); // 若是已經發送過 if (sent) { // 得到超時時間 timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 等待timeout的鏈接時間後查看是否發送成功 success = future.await(timeout); } // 得到異常 Throwable cause = future.cause(); // 若是異常不爲空,則拋出異常 if (cause != null) { throw cause; } } catch (Throwable e) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if (!success) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); } }
該方法是發送消息,調用了channel.writeAndFlush方法,與netty3的實現只是調用的api不一樣。api
@Override public void close() { try { super.close(); } catch (Exception e) { logger.warn(e.getMessage(), e); } try { // 移除通道 removeChannelIfDisconnected(channel); } catch (Exception e) { logger.warn(e.getMessage(), e); } try { // 清理屬性集合 attributes.clear(); } catch (Exception e) { logger.warn(e.getMessage(), e); } try { if (logger.isInfoEnabled()) { logger.info("Close netty channel " + channel); } // 關閉通道 channel.close(); } catch (Exception e) { logger.warn(e.getMessage(), e); } }
該方法就是操做了四個步驟,比較清晰。服務器
該類繼承了ChannelDuplexHandler,是基於netty4實現的客戶端通道處理實現類。這裏的設計與netty3實現的通道處理器有所不一樣,netty3實現的通道處理器是被客戶端和服務端統一使用的,而在這裏服務端和客戶端使用了兩個不一樣的Handler來處理。而且netty3的NettyHandler是基於netty3的SimpleChannelHandler設計的,而這裏是基於netty4的ChannelDuplexHandler。ide
/** * url對象 */ private final URL url; /** * 通道 */ private final ChannelHandler handler;
該類的屬性只有兩個,下面實現的方法也都是調用了handler的方法,我就舉一個例子:oop
@Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise future) throws Exception { // 得到通道 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try { // 斷開鏈接 handler.disconnected(channel); } finally { // 從集合中移除 NettyChannel.removeChannelIfDisconnected(ctx.channel()); } }
能夠看到分了三部,得到通道對象,調用handler方法,最後檢測一下通道是否活躍。其餘方法也是差很少。源碼分析
該類繼承了ChannelDuplexHandler,是基於netty4實現的服務端通道處理實現類。
/** * 鏈接該服務器的通道數 key爲ip:port */ private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel> /** * url對象 */ private final URL url; /** * 通道處理器 */ private final ChannelHandler handler;
該類有三個屬性,比NettyClientHandler多了一個屬性channels,下面的實現方法也是同樣的,都是調用了handler方法,來看一個例子:
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 激活事件 ctx.fireChannelActive(); // 得到通道 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try { // 若是通道不爲空,則加入集合中 if (channel != null) { channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel); } // 鏈接該通道 handler.connected(channel); } finally { // 若是通道不活躍,則移除通道 NettyChannel.removeChannelIfDisconnected(ctx.channel()); } }
該方法是通道活躍的時候調用了handler.connected,差很少也是常規套路,就多了激活事件和加入到通道中。其餘方法也差很少。
該類繼承了AbstractClient,是基於netty4實現的客戶端實現類。
/** * NioEventLoopGroup對象 */ private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true)); /** * 客戶端引導類 */ private Bootstrap bootstrap; /** * 通道 */ private volatile Channel channel; // volatile, please copy reference to use
屬性裏的NioEventLoopGroup對象是netty4中的對象,什麼用處請看netty的解析。
@Override protected void doOpen() throws Throwable { // 建立一個客戶端的通道處理器 final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this); // 建立一個引導類 bootstrap = new Bootstrap(); // 設置可選項 bootstrap.group(nioEventLoopGroup) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()) .channel(NioSocketChannel.class); // 若是鏈接超時時間小於3s,則設置爲3s,也就是說最低的超時時間爲3s if (getConnectTimeout() < 3000) { bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000); } else { bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout()); } // 建立一個客戶端 bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { // 編解碼器 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); // 加入責任鏈 ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("handler", nettyClientHandler); } }); }
該方法仍是作了建立客戶端,而且打開的操做,其中不少的參數設置操做。
其餘方法跟 dubbo源碼解析(十六)遠程通訊——Netty3中寫到的NettyClient實現同樣。
該類繼承了AbstractServer,實現了Server。是基於netty4實現的服務器類
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class); /** * 鏈接該服務器的通道集合 key爲ip:port */ private Map<String, Channel> channels; // <ip:port, channel> /** * 服務器引導類 */ private ServerBootstrap bootstrap; /** * 通道 */ private io.netty.channel.Channel channel; /** * boss線程組 */ private EventLoopGroup bossGroup; /** * worker線程組 */ private EventLoopGroup workerGroup;
屬性相較netty3而言,新增了兩個線程組,一樣也是由於netty3和netty4的設計不一樣。
@Override protected void doOpen() throws Throwable { // 建立服務引導類 bootstrap = new ServerBootstrap(); // 建立boss線程組 bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); // 建立worker線程組 workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true)); // 建立服務器處理器 final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); // 得到通道集合 channels = nettyServerHandler.getChannels(); // 設置ventLoopGroup還有可選項 bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // 編解碼器 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); // 增長責任鏈 ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("handler", nettyServerHandler); } }); // bind 綁定 ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); // 等待綁定完成 channelFuture.syncUninterruptibly(); // 設置通道 channel = channelFuture.channel(); }
該方法是建立服務器,而且開啓。若是熟悉netty4點朋友應該以爲仍是很好理解的。其餘方法跟《 dubbo源碼解析(十六)遠程通訊——Netty3》中寫到的NettyClient實現同樣,處理close中要多關閉兩個線程組
該類跟 《dubbo源碼解析(十六)遠程通訊——Netty3》中的NettyTransporter同樣的實現。
該類是基於netty4的編解碼器。
/** * 編碼器 */ private final ChannelHandler encoder = new InternalEncoder(); /** * 解碼器 */ private final ChannelHandler decoder = new InternalDecoder(); /** * 編解碼器 */ private final Codec2 codec; /** * url對象 */ private final URL url; /** * 通道處理器 */ private final com.alibaba.dubbo.remoting.ChannelHandler handler;
屬性跟基於netty3實現的編解碼同樣。
private class InternalEncoder extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { // 建立緩衝區 com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out); // 得到通道 Channel ch = ctx.channel(); // 得到netty通道 NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler); try { // 編碼 codec.encode(channel, buffer, msg); } finally { // 檢測通道是否活躍 NettyChannel.removeChannelIfDisconnected(ch); } } }
該內部類是編碼器的抽象,主要的編碼仍是調用了codec.encode。
private class InternalDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception { // 建立緩衝區 ChannelBuffer message = new NettyBackedChannelBuffer(input); // 得到通道 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); Object msg; int saveReaderIndex; try { // decode object. do { // 記錄讀索引 saveReaderIndex = message.readerIndex(); try { // 解碼 msg = codec.decode(channel, message); } catch (IOException e) { throw e; } // 拆包 if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) { message.readerIndex(saveReaderIndex); break; } else { //is it possible to go here ? if (saveReaderIndex == message.readerIndex()) { throw new IOException("Decode without read data."); } // 讀取數據 if (msg != null) { out.add(msg); } } } while (message.readable()); } finally { NettyChannel.removeChannelIfDisconnected(ctx.channel()); } } }
該內部類是解碼器的抽象類,其中關鍵的是調用了codec.decode。
該類是緩衝區類。
/** * 緩衝區 */ private ByteBuf buffer;
其中的方法幾乎都調用了該屬性的方法。而ByteBuf是netty4中的字節數據的容器。
這兩個類是用於用於格式化的,是從netty4中複製出來的,其中而且稍微作了一下改動。我就再也不講解了。
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了基於netty4的來實現的遠程通訊、介紹dubbo-remoting-netty4內的源碼解析,關鍵須要對netty4有所瞭解。下一篇我會講解基於zookeeper實現遠程通訊部分。