buddo源碼分析-transport組件之Netty(一)

dubbo 2.5.10 版本,netty仍然使用的是netty的3.10.5版本,咱們從下面的代碼能夠看出,SPI默認使用的是「netty」,而不是「netty4」。java

package com.alibaba.dubbo.remoting;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Adaptive;
import com.alibaba.dubbo.common.extension.SPI;

import javax.sound.midi.Receiver;

/**
 * Transporter. (SPI, Singleton, ThreadSafe)
 * <p>
 * <a href="http://en.wikipedia.org/wiki/Transport_Layer">Transport Layer</a>
 * <a href="http://en.wikipedia.org/wiki/Client%E2%80%93server_model">Client/Server</a>
 *
 * @see com.alibaba.dubbo.remoting.Transporters
 */ @SPI("netty") public interface Transporter {

無論是NettyClient,仍是NettyServer建立Channel的工廠類ChannelFactory地建立方式都是同樣的,代碼以下:算法

// ChannelFactory's closure has a DirectMemory leak, using static to avoid
// https://issues.jboss.org/browse/NETTY-424
private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory(
    Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientBoss", true)),
    Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientWorker", true)),
    Constants.DEFAULT_IO_THREADS);

顧名思義,ChannelFactory類是用來建立Channel的,那Channel是用來作什麼的呢?一句話歸納就是:全部與I/O相關的操做都是由Channel來實現的。從上面的代碼能夠看出Dubbo建立了2個I/O線程池,分別爲Boss線程池和Workder線程池,這2個線程池都是初始化爲「無邊界」的cached線程池,也就是說剛開始都是「來者不拒」,但實際上Boss線程默認最大隻容許1個線程,而Work線程池最大爲Constants.DEFAULT_IO_THREADS指定的線程數,即:CPU核數+1與32兩者取最小值。Boss線程負責進行處理全部的鏈接請求,鏈接請求處理完成後把後續任務的處理轉交給Work線程來處理。代碼以下:bootstrap

private static final int DEFAULT_BOSS_COUNT = 1;
public NioClientSocketChannelFactory(
        Executor bossExecutor, Executor workerExecutor, int workerCount) {
    this(bossExecutor, workerExecutor, DEFAULT_BOSS_COUNT, workerCount);
}
public static final int DEFAULT_IO_THREADS = Math.min(Runtime.getRuntime().availableProcessors() + 1, 32);

 從引導類的相關代碼能夠看出Client與Server是創建的TCP長鏈接(keepAlive=true),鏈接超時時間是從請求的Url中讀取。啓動TCP_NODELAY,就意味着禁用了Nagle算法,容許小包的發送。對於延時敏感型,同時數據傳輸量比較小的應用,開啓TCP_NODELAY選項無疑是一個正確的選擇。網絡

bootstrap = new ClientBootstrap(channelFactory);
// config
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());

每一個Channel都會在同一個線程內默認建立一個對應的ChannelPipeline,能夠在ChannelPipeline中註冊一個或多個ChannelHandler(用來處理相關事件的回調)。從下面的代碼能夠看出,在ChannelPipleline中註冊了編碼處理器、解碼處理器以及自定義的nettyHanlder。框架

final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() {
        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
        ChannelPipeline pipeline = Channels.pipeline();
 pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline;
    }
});

編碼主要使用了TelnetCodec 與 TransportCodec,TelnetCodec用於字符串的編碼,TransportCodec用於對其它對象進行編碼。socket

TelnetCodec(字符串編碼):tcp

    public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException {
        if (message instanceof String) {
            if (isClientSide(channel)) {
                message = message + "\r\n";
            }
            byte[] msgData = ((String) message).getBytes(getCharset(channel).name());
            buffer.writeBytes(msgData);
        } else {
            super.encode(channel, buffer, message);
        }
    }

TransportCodec(對象編碼):ide

    public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException {
        OutputStream output = new ChannelBufferOutputStream(buffer);
        ObjectOutput objectOutput = getSerialization(channel).serialize(channel.getUrl(), output);
        encodeData(channel, objectOutput, message);
        objectOutput.flushBuffer();
    } 

引導類和ChannelPiple建立好以後就能夠進行執行connect操做了this

boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);

上面這行代碼用來阻塞當前線程直到connect完成或者大於設定的超時時間。若是在設定的超時時間前完成connect,則經過future來判斷鏈接是否成功。鏈接成功則把當前使用的Channel的寫功能掛起。若是鏈接不成功則拋出相關的異常。具體代碼以下:編碼

  protected void doConnect() throws Throwable {
        long start = System.currentTimeMillis();
        ChannelFuture future = bootstrap.connect(getConnectAddress());
        try {
            boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS); if (ret && future.isSuccess()) {
                Channel newChannel = future.getChannel();
                newChannel.setInterestOps(Channel.OP_READ_WRITE);
                try {
                    // Close old channel
                    Channel oldChannel = NettyClient.this.channel; // copy reference
                    if (oldChannel != null) {
                        try {
                            if (logger.isInfoEnabled()) {
                                logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                            }
                            oldChannel.close();
                        } finally {
                            NettyChannel.removeChannelIfDisconnected(oldChannel);
                        }
                    }
                } finally {
                    if (NettyClient.this.isClosed()) {
                        try {
                            if (logger.isInfoEnabled()) {
                                logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                            }
                            newChannel.close();
                        } finally {
                            NettyClient.this.channel = null;
                            NettyChannel.removeChannelIfDisconnected(newChannel);
                        }
                    } else {
                        NettyClient.this.channel = newChannel;
                    }
                }
            } else if (future.getCause() != null) {
                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                        + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
            } else {
                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                        + getRemoteAddress() + " client-side timeout "
                        + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
            }
        } finally {
            if (!isConnected()) {
                future.cancel();
            }
        }
    }

鏈接成功,經過future獲取到的當前使用的Channel會經過getChannel()方法來把此Channel放到一個靜態的channelMap中去,代碼以下:

@Override
protected com.alibaba.dubbo.remoting.Channel getChannel() {
    Channel c = channel;
    if (c == null || !c.isActive())
        return null;
    return NettyChannel.getOrAddChannel(c, getUrl(), this);
}


private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>(); static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) {
    if (ch == null) {
        return null;
    }
    NettyChannel ret = channelMap.get(ch);
    if (ret == null) {
        NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
        if (ch.isActive()) {
            ret = channelMap.putIfAbsent(ch, nettyChannel);
        }
        if (ret == null) {
            ret = nettyChannel;
        }
    }
    return ret;
}

 若是Client斷開了與Server的鏈接,則刪除channelMap中的於當前鏈接有關的Channel

    @Override
    protected void doDisConnect() throws Throwable {
        try {
            NettyChannel.removeChannelIfDisconnected(channel);
        } catch (Throwable t) {
            logger.warn(t.getMessage());
        }
    }

總結:

  1. Dubbo底層的網絡通信默認採用的是Netty框架;
  2. Netty Client採用2級I/O線程池,分別爲:Boss線程池(默認最大隻容許1個線程)、Worker線程池(默認最大隻容許CPU核心數+1或32,兩者取其小);Boss負責處理鏈接請求,後續任務由Workder來處理;
  3. Netty Client 與 Netty Server 之間創建的是TCP長鏈接;
  4. 由引導類(bootStrap)來註冊ChannelPipeline與創建鏈接(connect),在ChannelPipeline中須要註冊項目編碼、解碼等ChannelHanlder以處理相關事件的回調。

 Dubbo 選擇 Transporter 的流程圖:

相關文章
相關標籤/搜索