【Zookeeper】源碼分析之網絡通訊(三)之NettyServerCnxn

1、前言緩存

  前面已經學習了NIOServerCnxn,接着繼續學習NettyServerCnxn。服務器

2、NettyServerCnxn源碼分析session

  2.1 類的繼承關係 app

public class NettyServerCnxn extends ServerCnxn {}

  說明:NettyServerCnxn繼承了ServerCnxn抽象類,使用Netty框架來高效處理與客戶端之間的通訊。框架

  2.2 類的內部類ide

  1. SendBufferWriter類 函數

    private class SendBufferWriter extends Writer {
        private StringBuffer sb = new StringBuffer();
        
        /**
         * Check if we are ready to send another chunk.
         * @param force force sending, even if not a full chunk
         */
        // 是否準備好發送另外一塊
        private void checkFlush(boolean force) {
            if ((force && sb.length() > 0) || sb.length() > 2048) { // 當強制發送而且sb大小大於0,或者sb大小大於2048即發送緩存
                sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
                // clear our internal buffer
                sb.setLength(0);
            }
        }

        @Override
        public void close() throws IOException {
            if (sb == null) return;
            // 關閉以前須要強制性發送緩存
            checkFlush(true);
            sb = null; // clear out the ref to ensure no reuse
        }

        @Override
        public void flush() throws IOException {
            checkFlush(true);
        }

        @Override
        public void write(char[] cbuf, int off, int len) throws IOException {
            sb.append(cbuf, off, len);
            checkFlush(false);
        }
    }
SendBufferWriter

  說明:與NIOServerCnxn中相同,該類用來將給客戶端的響應進行分塊,再也不累贅。源碼分析

  2. ResumeMessageEvent類 學習

    static class ResumeMessageEvent implements MessageEvent {
        // 通道
        Channel channel;
        
        // 構造函數
        ResumeMessageEvent(Channel channel) {
            this.channel = channel;
        }
        @Override
        public Object getMessage() {return null;}
        @Override
        public SocketAddress getRemoteAddress() {return null;}
        @Override
        public Channel getChannel() {return channel;}
        @Override
        public ChannelFuture getFuture() {return null;}
    };
ResumeMessageEvent

  說明:ResumeMessageEvent繼承MessageEvent,其表示消息的傳輸或接收。this

  3. CommandThread類 

    private abstract class CommandThread /*extends Thread*/ {
        PrintWriter pw;
        
        CommandThread(PrintWriter pw) {
            this.pw = pw;
        }
        
        public void start() {
            run();
        }

        public void run() {
            try {
                commandRun();
            } catch (IOException ie) {
                LOG.error("Error in running command ", ie);
            } finally {
                cleanupWriterSocket(pw);
            }
        }
        
        public abstract void commandRun() throws IOException;
    }
CommandThread

  說明:其與NIOServerCnxn中相似,也是每一個子類對應着一個命令,值得注意的是針對每一個CMD命令,其僅僅使用一個線程來處理。

  2.3 類的屬性 

public class NettyServerCnxn extends ServerCnxn {
    // 日誌
    Logger LOG = LoggerFactory.getLogger(NettyServerCnxn.class);
    
    // 通道
    Channel channel;
    
    // 通道緩存
    ChannelBuffer queuedBuffer;
    
    // 節流與否
    volatile boolean throttled;
    
    // Byte緩衝區
    ByteBuffer bb;
    
    // 四個字節的緩衝區
    ByteBuffer bbLen = ByteBuffer.allocate(4);
    
    // 會話ID
    long sessionId;
    
    // 會話超時時間
    int sessionTimeout;
    
    // 計數
    AtomicLong outstandingCount = new AtomicLong();

    /** The ZooKeeperServer for this connection. May be null if the server
     * is not currently serving requests (for example if the server is not
     * an active quorum participant.
     */
    // Zookeeper服務器
    private volatile ZooKeeperServer zkServer;

    // NettyServerCnxn工廠
    NettyServerCnxnFactory factory;
    
    // 初始化與否
    boolean initialized;
    
    // 四個字節
    private static final byte[] fourBytes = new byte[4];
    
    private static final String ZK_NOT_SERVING =
        "This ZooKeeper instance is not currently serving requests";
}
類的屬性

  說明:NettyServerCnxn維護了與客戶端之間的通道緩衝、緩衝區及會話的相關屬性。

  2.4 類的構造函數 

    NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
        // 給屬性賦值
        this.channel = channel;
        this.zkServer = zks;
        this.factory = factory;
        if (this.factory.login != null) { // 須要登陸信息(用戶名和密碼登陸)
            this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
        }
    }
構造函數

  說明:構造函數對NettyServerCnxn中的部分重要屬性進行了賦值,其中還涉及到是否須要用戶登陸。

  2.5 核心函數分析

  1. receiveMessage函數 

    public void receiveMessage(ChannelBuffer message) {
        try {
            while(message.readable() && !throttled) { // 當writerIndex > readerIndex,而且不節流時,知足條件
                if (bb != null) { // 不爲null
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("message readable " + message.readableBytes()
                                + " bb len " + bb.remaining() + " " + bb);
                        ByteBuffer dat = bb.duplicate();
                        dat.flip();
                        LOG.trace(Long.toHexString(sessionId)
                                + " bb 0x"
                                + ChannelBuffers.hexDump(
                                        ChannelBuffers.copiedBuffer(dat)));
                    }

                    if (bb.remaining() > message.readableBytes()) { // bb剩餘空間大於message中可讀字節大小
                        // 肯定新的limit
                        int newLimit = bb.position() + message.readableBytes();
                        bb.limit(newLimit);
                    }
                    // 將message寫入bb中
                    message.readBytes(bb);
                    // 重置bb的limit
                    bb.limit(bb.capacity());

                    if (LOG.isTraceEnabled()) {
                        LOG.trace("after readBytes message readable "
                                + message.readableBytes()
                                + " bb len " + bb.remaining() + " " + bb);
                        ByteBuffer dat = bb.duplicate();
                        dat.flip();
                        LOG.trace("after readbytes "
                                + Long.toHexString(sessionId)
                                + " bb 0x"
                                + ChannelBuffers.hexDump(
                                        ChannelBuffers.copiedBuffer(dat)));
                    }
                    if (bb.remaining() == 0) { // 已經讀完message,表示內容已經所有接收
                        // 統計接收信息
                        packetReceived();
                        // 翻轉,可讀
                        bb.flip();

                        ZooKeeperServer zks = this.zkServer;
                        if (zks == null) { // Zookeeper服務器爲空
                            throw new IOException("ZK down");
                        }
                        if (initialized) { // 未被初始化
                            // 處理bb中包含的包信息
                            zks.processPacket(this, bb);

                            if (zks.shouldThrottle(outstandingCount.incrementAndGet())) { // 是否已經節流
                                // 不接收數據
                                disableRecvNoWait();
                            }
                        } else { // 已經初始化
                            LOG.debug("got conn req request from "
                                    + getRemoteSocketAddress());
                            // 處理鏈接請求
                            zks.processConnectRequest(this, bb);
                            initialized = true;
                        }
                        bb = null;
                    }
                } else { // bb爲null
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("message readable "
                                + message.readableBytes()
                                + " bblenrem " + bbLen.remaining());
                        // 複製bbLen緩衝
                        ByteBuffer dat = bbLen.duplicate();
                        // 翻轉
                        dat.flip();
                        LOG.trace(Long.toHexString(sessionId)
                                + " bbLen 0x"
                                + ChannelBuffers.hexDump(
                                        ChannelBuffers.copiedBuffer(dat)));
                    }

                    if (message.readableBytes() < bbLen.remaining()) { // bb剩餘空間大於message中可讀字節大小
                        // 重設bbLen的limit
                        bbLen.limit(bbLen.position() + message.readableBytes());
                    }
                    // 將message內容寫入bbLen中
                    message.readBytes(bbLen);
                    // 重置bbLen的limit
                    bbLen.limit(bbLen.capacity());
                    if (bbLen.remaining() == 0) { // 已經讀完message,表示內容已經所有接收
                        // 翻轉
                        bbLen.flip();

                        if (LOG.isTraceEnabled()) {
                            LOG.trace(Long.toHexString(sessionId)
                                    + " bbLen 0x"
                                    + ChannelBuffers.hexDump(
                                            ChannelBuffers.copiedBuffer(bbLen)));
                        }
                        // 讀取position後四個字節
                        int len = bbLen.getInt();
                        if (LOG.isTraceEnabled()) {
                            LOG.trace(Long.toHexString(sessionId)
                                    + " bbLen len is " + len);
                        }
                        
                        // 清除緩存
                        bbLen.clear();
                        if (!initialized) { // 未被初始化
                            if (checkFourLetterWord(channel, message, len)) { // 是不是四個字母的命令
                                return;
                            }
                        }
                        if (len < 0 || len > BinaryInputArchive.maxBuffer) {
                            throw new IOException("Len error " + len);
                        }
                        // 根據len從新分配緩衝,以便接收內容
                        bb = ByteBuffer.allocate(len);
                    }
                }
            }
        } catch(IOException e) {
            LOG.warn("Closing connection to " + getRemoteSocketAddress(), e);
            close();
        }
    }
receiveMessage

  說明:該函數用於接收ChannelBuffer中的數據,函數在while循環體中,當writerIndex大於readerIndex(表示ChannelBuffer中還有可讀內容)且throttled爲false時執行while循環體,該函數大體能夠分爲兩部分,首先是當bb不爲空時,表示已經準備好讀取ChannelBuffer中的內容,其流程以下

                if (bb != null) { // 不爲null,表示已經準備好讀取message
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("message readable " + message.readableBytes()
                                + " bb len " + bb.remaining() + " " + bb);
                        ByteBuffer dat = bb.duplicate();
                        dat.flip();
                        LOG.trace(Long.toHexString(sessionId)
                                + " bb 0x"
                                + ChannelBuffers.hexDump(
                                        ChannelBuffers.copiedBuffer(dat)));
                    }

                    if (bb.remaining() > message.readableBytes()) { // bb剩餘空間大於message中可讀字節大小
                        // 肯定新的limit
                        int newLimit = bb.position() + message.readableBytes();
                        bb.limit(newLimit);
                    }
                    // 將message寫入bb中
                    message.readBytes(bb);
                    // 重置bb的limit
                    bb.limit(bb.capacity());

                    if (LOG.isTraceEnabled()) {
                        LOG.trace("after readBytes message readable "
                                + message.readableBytes()
                                + " bb len " + bb.remaining() + " " + bb);
                        ByteBuffer dat = bb.duplicate();
                        dat.flip();
                        LOG.trace("after readbytes "
                                + Long.toHexString(sessionId)
                                + " bb 0x"
                                + ChannelBuffers.hexDump(
                                        ChannelBuffers.copiedBuffer(dat)));
                    }
                    if (bb.remaining() == 0) { // 已經讀完message,表示內容已經所有接收
                        // 統計接收信息
                        packetReceived();
                        // 翻轉,可讀
                        bb.flip();

                        ZooKeeperServer zks = this.zkServer;
                        if (zks == null) { // Zookeeper服務器爲空
                            throw new IOException("ZK down");
                        }
                        if (initialized) { // 未被初始化
                            // 處理bb中包含的包信息
                            zks.processPacket(this, bb);

                            if (zks.shouldThrottle(outstandingCount.incrementAndGet())) { // 是否已經節流
                                // 不接收數據
                                disableRecvNoWait();
                            }
                        } else { // 已經初始化
                            LOG.debug("got conn req request from "
                                    + getRemoteSocketAddress());
                            // 處理鏈接請求
                            zks.processConnectRequest(this, bb);
                            initialized = true;
                        }
                        bb = null;
                    }
                }

  其中主要的部分是判斷bb的剩餘空間是否大於message中的內容,簡單而言,就是判斷bb是否還有足夠空間存儲message內容,而後設置bb的limit,以後將message內容讀入bb緩衝中,以後再次肯定時候已經讀完message內容,統計接收信息,再根據是否已經初始化來處理包或者是鏈接請求,其中的請求內容都存儲在bb中。而當bb爲空時,其流程以下 

                else { // bb爲null
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("message readable "
                                + message.readableBytes()
                                + " bblenrem " + bbLen.remaining());
                        // 複製bbLen緩衝
                        ByteBuffer dat = bbLen.duplicate();
                        // 翻轉
                        dat.flip();
                        LOG.trace(Long.toHexString(sessionId)
                                + " bbLen 0x"
                                + ChannelBuffers.hexDump(
                                        ChannelBuffers.copiedBuffer(dat)));
                    }

                    if (message.readableBytes() < bbLen.remaining()) { // bb剩餘空間大於message中可讀字節大小
                        // 重設bbLen的limit
                        bbLen.limit(bbLen.position() + message.readableBytes());
                    }
                    // 將message內容寫入bbLen中
                    message.readBytes(bbLen);
                    // 重置bbLen的limit
                    bbLen.limit(bbLen.capacity());
                    if (bbLen.remaining() == 0) { // 已經讀完message,表示內容已經所有接收
                        // 翻轉
                        bbLen.flip();

                        if (LOG.isTraceEnabled()) {
                            LOG.trace(Long.toHexString(sessionId)
                                    + " bbLen 0x"
                                    + ChannelBuffers.hexDump(
                                            ChannelBuffers.copiedBuffer(bbLen)));
                        }
                        // 讀取position後四個字節
                        int len = bbLen.getInt();
                        if (LOG.isTraceEnabled()) {
                            LOG.trace(Long.toHexString(sessionId)
                                    + " bbLen len is " + len);
                        }
                        
                        // 清除緩存
                        bbLen.clear();
                        if (!initialized) { // 未被初始化
                            if (checkFourLetterWord(channel, message, len)) { // 是不是四個字母的命令
                                return;
                            }
                        }
                        if (len < 0 || len > BinaryInputArchive.maxBuffer) {
                            throw new IOException("Len error " + len);
                        }
                        // 根據len從新分配緩衝,以便接收內容
                        bb = ByteBuffer.allocate(len);
                    }
                }

  當bb爲空時,表示尚未給bb分配足夠的內存空間來讀取message,首先仍是將message內容(後續內容的長度)讀入bbLen中,而後再肯定讀入的內容表明後續真正內容的長度len,而後再根據len來爲bb分配存儲空間,方便後續讀取真正的內容。

  2. sendResponse函數 

    public void sendResponse(ReplyHeader h, Record r, String tag)
            throws IOException {
        if (!channel.isOpen()) {
            return;
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // Make space for length
        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
        try {
            // 向baos中寫入四個字節(空)
            baos.write(fourBytes);
            // 寫入記錄
            bos.writeRecord(h, "header");
            if (r != null) { 
                // 寫入記錄
                bos.writeRecord(r, tag);
            }
            // 關閉
            baos.close();
        } catch (IOException e) {
            LOG.error("Error serializing response");
        }
        
        // 轉化爲Byte Array
        byte b[] = baos.toByteArray();
        // 將Byte Array封裝成ByteBuffer
        ByteBuffer bb = ByteBuffer.wrap(b);
        bb.putInt(b.length - 4).rewind();
        // 發送緩衝
        sendBuffer(bb);
        if (h.getXid() > 0) {
            // zks cannot be null otherwise we would not have gotten here!
            if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) {
                enableRecv();
            }
        }
    }

  說明:其首先會將header和record都寫入baos,以後再將baos轉化爲ByteBuffer,以後在調用sendBuffer來發送緩衝,而sendBuffer完成的操做是將ByteBuffer寫入ChannelBuffer中。

  3. process函數 

    public void process(WatchedEvent event) {
        // 建立響應頭
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                     "Deliver event " + event + " to 0x"
                                     + Long.toHexString(this.sessionId)
                                     + " through " + this);
        }

        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();

        try {
            // 發送響應
            sendResponse(h, e, "notification");
        } catch (IOException e1) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1);
            }
            close();
        }
    }

  說明:首先建立ReplyHeader,而後再調用sendResponse來發送響應,最後調用close函數進行後續關閉處理。

3、總結

  本篇博文講解了基於Netty完成服務端與客戶端之間的通訊,其效率相對較高,也謝謝各位園友的觀看~ 

相關文章
相關標籤/搜索