dubbo 2.5.10 版本,netty仍然使用的是netty的3.10.5版本,咱們從下面的代碼能夠看出,SPI默認使用的是「netty」,而不是「netty4」。java
package com.alibaba.dubbo.remoting; import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.extension.Adaptive; import com.alibaba.dubbo.common.extension.SPI; import javax.sound.midi.Receiver; /** * Transporter. (SPI, Singleton, ThreadSafe) * <p> * <a href="http://en.wikipedia.org/wiki/Transport_Layer">Transport Layer</a> * <a href="http://en.wikipedia.org/wiki/Client%E2%80%93server_model">Client/Server</a> * * @see com.alibaba.dubbo.remoting.Transporters */ @SPI("netty") public interface Transporter {
無論是NettyClient,仍是NettyServer建立Channel的工廠類ChannelFactory地建立方式都是同樣的,代碼以下:算法
// ChannelFactory's closure has a DirectMemory leak, using static to avoid // https://issues.jboss.org/browse/NETTY-424 private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientBoss", true)), Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientWorker", true)), Constants.DEFAULT_IO_THREADS);
顧名思義,ChannelFactory類是用來建立Channel的,那Channel是用來作什麼的呢?一句話歸納就是:全部與I/O相關的操做都是由Channel來實現的。從上面的代碼能夠看出Dubbo建立了2個I/O線程池,分別爲Boss線程池和Workder線程池,這2個線程池都是初始化爲「無邊界」的cached線程池,也就是說剛開始都是「來者不拒」,但實際上Boss線程默認最大隻容許1個線程,而Work線程池最大爲Constants.DEFAULT_IO_THREADS指定的線程數,即:CPU核數+1與32兩者取最小值。Boss線程負責進行處理全部的鏈接請求,鏈接請求處理完成後把後續任務的處理轉交給Work線程來處理。代碼以下:bootstrap
private static final int DEFAULT_BOSS_COUNT = 1; public NioClientSocketChannelFactory( Executor bossExecutor, Executor workerExecutor, int workerCount) { this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, workerCount); }
public static final int DEFAULT_IO_THREADS = Math.min(Runtime.getRuntime().availableProcessors() + 1, 32);
從引導類的相關代碼能夠看出Client與Server是創建的TCP長鏈接(keepAlive=true),鏈接超時時間是從請求的Url中讀取。啓動TCP_NODELAY,就意味着禁用了Nagle算法,容許小包的發送。對於延時敏感型,同時數據傳輸量比較小的應用,開啓TCP_NODELAY選項無疑是一個正確的選擇。網絡
bootstrap = new ClientBootstrap(channelFactory); // config // @see org.jboss.netty.channel.socket.SocketChannelConfig bootstrap.setOption("keepAlive", true); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("connectTimeoutMillis", getTimeout());
每一個Channel都會在同一個線程內默認建立一個對應的ChannelPipeline,能夠在ChannelPipeline中註冊一個或多個ChannelHandler(用來處理相關事件的回調)。從下面的代碼能夠看出,在ChannelPipleline中註冊了編碼處理器、解碼處理器以及自定義的nettyHanlder。框架
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } });
編碼主要使用了TelnetCodec 與 TransportCodec,TelnetCodec用於字符串的編碼,TransportCodec用於對其它對象進行編碼。socket
TelnetCodec(字符串編碼):tcp
public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException { if (message instanceof String) { if (isClientSide(channel)) { message = message + "\r\n"; } byte[] msgData = ((String) message).getBytes(getCharset(channel).name()); buffer.writeBytes(msgData); } else { super.encode(channel, buffer, message); } }
TransportCodec(對象編碼):ide
public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException { OutputStream output = new ChannelBufferOutputStream(buffer); ObjectOutput objectOutput = getSerialization(channel).serialize(channel.getUrl(), output); encodeData(channel, objectOutput, message); objectOutput.flushBuffer(); }
引導類和ChannelPiple建立好以後就能夠進行執行connect操做了this
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
上面這行代碼用來阻塞當前線程直到connect完成或者大於設定的超時時間。若是在設定的超時時間前完成connect,則經過future來判斷鏈接是否成功。鏈接成功則把當前使用的Channel的寫功能掛起。若是鏈接不成功則拋出相關的異常。具體代碼以下:編碼
protected void doConnect() throws Throwable { long start = System.currentTimeMillis(); ChannelFuture future = bootstrap.connect(getConnectAddress()); try { boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS); if (ret && future.isSuccess()) { Channel newChannel = future.getChannel(); newChannel.setInterestOps(Channel.OP_READ_WRITE); try { // Close old channel Channel oldChannel = NettyClient.this.channel; // copy reference if (oldChannel != null) { try { if (logger.isInfoEnabled()) { logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel); } oldChannel.close(); } finally { NettyChannel.removeChannelIfDisconnected(oldChannel); } } } finally { if (NettyClient.this.isClosed()) { try { if (logger.isInfoEnabled()) { logger.info("Close new netty channel " + newChannel + ", because the client closed."); } newChannel.close(); } finally { NettyClient.this.channel = null; NettyChannel.removeChannelIfDisconnected(newChannel); } } else { NettyClient.this.channel = newChannel; } } } else if (future.getCause() != null) { throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause()); } else { throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + " client-side timeout " + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()); } } finally { if (!isConnected()) { future.cancel(); } } }
鏈接成功,經過future獲取到的當前使用的Channel會經過getChannel()方法來把此Channel放到一個靜態的channelMap中去,代碼以下:
@Override protected com.alibaba.dubbo.remoting.Channel getChannel() { Channel c = channel; if (c == null || !c.isActive()) return null; return NettyChannel.getOrAddChannel(c, getUrl(), this); } private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>(); static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) { if (ch == null) { return null; } NettyChannel ret = channelMap.get(ch); if (ret == null) { NettyChannel nettyChannel = new NettyChannel(ch, url, handler); if (ch.isActive()) { ret = channelMap.putIfAbsent(ch, nettyChannel); } if (ret == null) { ret = nettyChannel; } } return ret; }
若是Client斷開了與Server的鏈接,則刪除channelMap中的於當前鏈接有關的Channel
@Override protected void doDisConnect() throws Throwable { try { NettyChannel.removeChannelIfDisconnected(channel); } catch (Throwable t) { logger.warn(t.getMessage()); } }