Dubbo源碼解析(十三)遠程通訊——Grizzly

遠程通信——Grizzly

目標:介紹基於Grizzly的來實現的遠程通訊、介紹dubbo-remoting-grizzly內的源碼解析。java

前言

Grizzly NIO框架的設計初衷是幫助開發者更好地利用Java NIO API,構建強大的可擴展的服務器應用。關於Grizzly我也沒有很熟悉,因此只能根據grizzly在dubbo的遠程通信中應用稍微講解一下。git

下面是dubbo-remoting-grizzly下的包結構:github

dubbo-remoting-grizzly包結構

源碼分析

(一)GrizzlyChannel

1.屬性

private static final Logger logger = LoggerFactory.getLogger(GrizzlyChannel.class);

/** * 通道key */
private static final String CHANNEL_KEY = GrizzlyChannel.class.getName() + ".CHANNEL";

/** * 通道屬性 */
private static final Attribute<GrizzlyChannel> ATTRIBUTE = Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(CHANNEL_KEY);

/** * Grizzly的鏈接實例 */
private final Connection<?> connection;
複製代碼

能夠看到,該類中的ATTRIBUTE和connection都是Grizzly涉及到的屬性,ATTRIBUTE中封裝了GrizzlyChannel的實例還有Connection實例。Grizzly把鏈接的一些鏈接的方法定義在了Connection接口中,包括得到遠程地址、檢測通道是否鏈接等方法。segmentfault

2.send

@Override
@SuppressWarnings("rawtypes")
public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    int timeout = 0;
    try {
        // 發送消息,得到GrizzlyFuture實例
        GrizzlyFuture future = connection.write(message);
        if (sent) {
            // 得到延遲多少時間得到響應
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 得到請求的值
            future.get(timeout, TimeUnit.MILLISECONDS);
        }
    } catch (TimeoutException e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit", e);
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
}
複製代碼

該方法是發送消息的方法,調用了connection的write方法,它會返回一個GrizzlyFuture實例,GrizzlyFuture繼承了java.util.concurrent.Future。緩存

其餘方法比較簡單,實現的方法都基本調用connection的方法或者Grizzly.DEFAULT_ATTRIBUTE_BUILDER的方法。服務器

(二)GrizzlyHandler

該類是Grizzly的通道處理器,它繼承了BaseFilter。app

/** * url */
private final URL url;

/** * 通道處理器 */
private final ChannelHandler handler;
複製代碼

該類有兩個屬性,這兩個屬性應該很熟悉了。而該類有handleConnect、handleClose、handleRead、handleWrite、exceptionOccurred的實現方法,分別調用了ChannelHandler中封裝的五個方法。舉個例子:框架

@Override
public NextAction handleConnect(FilterChainContext ctx) throws IOException {
    // 得到Connection鏈接實例
    Connection<?> connection = ctx.getConnection();
    // 得到GrizzlyChannel通道
    GrizzlyChannel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
    try {
        // 鏈接
        handler.connected(channel);
    } catch (RemotingException e) {
        throw new IOException(StringUtils.toString(e));
    } finally {
        GrizzlyChannel.removeChannelIfDisconnected(connection);
    }
    return ctx.getInvokeAction();
}
複製代碼

能夠看到得到GrizzlyChannel通道就調用了handler.connected進行鏈接。而其餘四個方法也差很少,感興趣的能夠自行查看代碼。less

(三)GrizzlyClient

該類是Grizzly的客戶端實現類,繼承了AbstractClient類ide

/** * Grizzly中的傳輸對象 */
private TCPNIOTransport transport;

/** * 鏈接實例 */
private volatile Connection<?> connection; // volatile, please copy reference to use
複製代碼

能夠看到屬性中有TCPNIOTransport的實例,在該類中實現的客戶端方法都調用了transport中的方法,TCPNIOTransport中封裝了建立,鏈接,斷開鏈接等方法。

咱們來看建立客戶端的邏輯:

@Override
protected void doOpen() throws Throwable {
    // 作一些過濾,用於處理消息
    FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
    filterChainBuilder.add(new TransportFilter());
    filterChainBuilder.add(new GrizzlyCodecAdapter(getCodec(), getUrl(), this));
    filterChainBuilder.add(new GrizzlyHandler(getUrl(), this));
    // 傳輸構建者
    TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
    // 得到線程池配置實例
    ThreadPoolConfig config = builder.getWorkerThreadPoolConfig();
    // 設置線程池的配置,包括核心線程數等
    config.setPoolName(CLIENT_THREAD_POOL_NAME)
            .setQueueLimit(-1)
            .setCorePoolSize(0)
            .setMaxPoolSize(Integer.MAX_VALUE)
            .setKeepAliveTime(60L, TimeUnit.SECONDS);
    // 設置建立屬性
    builder.setTcpNoDelay(true).setKeepAlive(true)
            .setConnectionTimeout(getConnectTimeout())
            .setIOStrategy(SameThreadIOStrategy.getInstance());
    // 建立一個transport
    transport = builder.build();
    transport.setProcessor(filterChainBuilder.build());
    // 建立客戶端
    transport.start();
}
複製代碼

能夠看到首先是設置及一些過濾,在上面對信息先處理,而後設置了線程池的配置,建立transport,而且用transport來進行建立客戶端。

(四)GrizzlyServer

該類是Grizzly的服務器實現類,繼承了AbstractServer類。

/** * 鏈接該服務器的客戶端通道集合 */
private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>

/** * 傳輸實例 */
private TCPNIOTransport transport;
複製代碼

該類中有兩個屬性,其中transport用法跟GrizzlyClient中的同樣。

我也就講解一個doOpen方法:

@Override
protected void doOpen() throws Throwable {
    // 增長過濾器,來處理信息
    FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
    filterChainBuilder.add(new TransportFilter());

    filterChainBuilder.add(new GrizzlyCodecAdapter(getCodec(), getUrl(), this));
    filterChainBuilder.add(new GrizzlyHandler(getUrl(), this));
    TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
    // 得到線程池配置
    ThreadPoolConfig config = builder.getWorkerThreadPoolConfig();
    config.setPoolName(SERVER_THREAD_POOL_NAME).setQueueLimit(-1);
    // 得到url配置中線程池類型
    String threadpool = getUrl().getParameter(Constants.THREADPOOL_KEY, Constants.DEFAULT_THREADPOOL);
    if (Constants.DEFAULT_THREADPOOL.equals(threadpool)) {
        // 優先從url得到線程池的線程數,默認線程數爲200
        int threads = getUrl().getPositiveParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        // 設置線程池配置
        config.setCorePoolSize(threads).setMaxPoolSize(threads)
                .setKeepAliveTime(0L, TimeUnit.SECONDS);
        // 若是是cached類型的線程池
    } else if ("cached".equals(threadpool)) {
        int threads = getUrl().getPositiveParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        // 設置核心線程數爲0、最大線程數爲threads
        config.setCorePoolSize(0).setMaxPoolSize(threads)
                .setKeepAliveTime(60L, TimeUnit.SECONDS);
    } else {
        throw new IllegalArgumentException("Unsupported threadpool type " + threadpool);
    }
    builder.setKeepAlive(true).setReuseAddress(false)
            .setIOStrategy(SameThreadIOStrategy.getInstance());
    // 建立transport
    transport = builder.build();
    transport.setProcessor(filterChainBuilder.build());
    transport.bind(getBindAddress());
    // 開啓服務器
    transport.start();
}
複製代碼

該方法是建立服務器,能夠看到操做跟GrizzlyClient中的差很少。

(五)GrizzlyTransporter

該類實現了Transporter接口,是基於Grizzly的傳輸層實現。

public class GrizzlyTransporter implements Transporter {

    public static final String NAME = "grizzly";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        // 返回GrizzlyServer實例
        return new GrizzlyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        // // 返回GrizzlyClient實例
        return new GrizzlyClient(url, listener);
    }

}
複製代碼

能夠看到,bind和connect方法分別就是建立了GrizzlyServer和GrizzlyClient實例。這裏我建議查看一下《dubbo源碼解析(九)遠程通訊——Transport層》。

(六)GrizzlyCodecAdapter

該類是Grizzly編解碼類,繼承了BaseFilter。

1.屬性和構造方法

/** * 編解碼器 */
private final Codec2 codec;

/** * url */
private final URL url;

/** * 通道處理器 */
private final ChannelHandler handler;

/** * 緩存大小 */
private final int bufferSize;

/** * 空緩存區 */
private ChannelBuffer previousData = ChannelBuffers.EMPTY_BUFFER;

public GrizzlyCodecAdapter(Codec2 codec, URL url, ChannelHandler handler) {
    this.codec = codec;
    this.url = url;
    this.handler = handler;
    int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);
    // 若是緩存區大小在16k之內,則設置配置大小,若是不是,則設置8k的緩衝區大小
    this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;
}
複製代碼

2.handleWrite

@Override
public NextAction handleWrite(FilterChainContext context) throws IOException {
    Connection<?> connection = context.getConnection();
    GrizzlyChannel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
    try {
        // 分配一個1024的動態緩衝區
        ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer(1024); // Do not need to close

        // 得到消息
        Object msg = context.getMessage();
        // 編碼
        codec.encode(channel, channelBuffer, msg);

        // 檢測是否鏈接
        GrizzlyChannel.removeChannelIfDisconnected(connection);
        // 分配緩衝區
        Buffer buffer = connection.getTransport().getMemoryManager().allocate(channelBuffer.readableBytes());
        // 把channelBuffer的數據寫到buffer
        buffer.put(channelBuffer.toByteBuffer());
        buffer.flip();
        buffer.allowBufferDispose(true);
        // 設置到上下文
        context.setMessage(buffer);
    } finally {
        GrizzlyChannel.removeChannelIfDisconnected(connection);
    }
    return context.getInvokeAction();
}
複製代碼

該方法是寫數據,能夠發現編碼調用的是 codec.encode,其餘的我都在註釋裏寫明瞭,關鍵仍是對前面兩篇文章的一些內容須要理解。

3.handleRead

@Override
public NextAction handleRead(FilterChainContext context) throws IOException {
    Object message = context.getMessage();
    Connection<?> connection = context.getConnection();
    Channel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
    try {
        // 若是接收的是一個數據包
        if (message instanceof Buffer) { // receive a new packet
            Buffer grizzlyBuffer = (Buffer) message; // buffer

            ChannelBuffer frame;

            // 若是緩衝區可讀
            if (previousData.readable()) {
                // 若是該緩衝區是動態的緩衝區
                if (previousData instanceof DynamicChannelBuffer) {
                    // 寫入數據
                    previousData.writeBytes(grizzlyBuffer.toByteBuffer());
                    frame = previousData;
                } else {
                    // 得到須要的緩衝區大小
                    int size = previousData.readableBytes() + grizzlyBuffer.remaining();
                    // 新建一個動態緩衝區
                    frame = ChannelBuffers.dynamicBuffer(size > bufferSize ? size : bufferSize);
                    // 寫入previousData中的數據
                    frame.writeBytes(previousData, previousData.readableBytes());
                    // 寫入grizzlyBuffer中的數據
                    frame.writeBytes(grizzlyBuffer.toByteBuffer());
                }
            } else {
                // 不然是基於Java NIO的ByteBuffer生成的緩衝區
                frame = ChannelBuffers.wrappedBuffer(grizzlyBuffer.toByteBuffer());
            }

            Object msg;
            int savedReadIndex;

            do {
                savedReadIndex = frame.readerIndex();
                try {
                    // 解碼
                    msg = codec.decode(channel, frame);
                } catch (Exception e) {
                    previousData = ChannelBuffers.EMPTY_BUFFER;
                    throw new IOException(e.getMessage(), e);
                }
                // 拆包
                if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                    frame.readerIndex(savedReadIndex);
                    // 結束調用鏈
                    return context.getStopAction();
                } else {
                    if (savedReadIndex == frame.readerIndex()) {
                        // 沒有可讀內容
                        previousData = ChannelBuffers.EMPTY_BUFFER;
                        throw new IOException("Decode without read data.");
                    }
                    if (msg != null) {
                        // 把解碼後信息放入上下文
                        context.setMessage(msg);
                        // 繼續下面的調用鏈
                        return context.getInvokeAction();
                    } else {
                        return context.getInvokeAction();
                    }
                }
            } while (frame.readable());
        } else { // Other events are passed down directly
            return context.getInvokeAction();
        }
    } finally {
        GrizzlyChannel.removeChannelIfDisconnected(connection);
    }
}
複製代碼

該方法是讀數據,直接調用了codec.decode進行解碼。

後記

該部分相關的源碼解析地址:github.com/CrazyHZM/in…

該文章講解了基於grizzly的來實現的遠程通訊、介紹dubbo-remoting-grizzly內的源碼解析,關鍵須要對grizzly有所瞭解。下一篇我會講解基於http實現遠程通訊部分。

相關文章
相關標籤/搜索