Cobar源碼解析(一)

簡介

當業務的數據量和訪問量急劇增長的狀況下,咱們須要對數據進行水平拆分,從而下降單庫的壓力,而且數據的水平拆分須要對業務透明,屏蔽掉水平拆分的細節。而且,前端業務的高併發會致使後端的數據庫鏈接過多,從而DB的性能低下。前端

Cobar就是解決這些問題的一款分庫分表中間件,Cobar以proxy的形式位於前端應用和後端數據庫之間,Cobar對前端暴露的接口是MySQL通訊協議,其將前端傳輸過來的SQL語句按照sharding規則路由到後端的數據庫實例上,再合併多個實例返回的結果,從而模擬單庫下的數據庫行爲。react

Cobar的使用方法就很少介紹了,本文的主要內容是剖析Cobar的源代碼。sql

Cobar的前端鏈接模型

結構圖以下:數據庫

alt text

咱們先來看CobarServer的代碼:編程

private CobarServer() {
        this.config = new CobarConfig();
        SystemConfig system = config.getSystem();
        MySQLLexer.setCStyleCommentVersion(system.getParserCommentVersion());
        this.timer = new Timer(NAME + "Timer", true);
        this.initExecutor = ExecutorUtil.create("InitExecutor", system.getInitExecutor());
        this.timerExecutor = ExecutorUtil.create("TimerExecutor", system.getTimerExecutor());
        this.managerExecutor = ExecutorUtil.create("ManagerExecutor", system.getManagerExecutor());
        this.sqlRecorder = new SQLRecorder(system.getSqlRecordCount());
        this.isOnline = new AtomicBoolean(true);
        this.startupTime = TimeUtil.currentTimeMillis();
    }

上面是CobarServer的構造函數,它的限定是private的。segmentfault

private static final CobarServer INSTANCE = new CobarServer();

public static final CobarServer getInstance() {
        return INSTANCE;
    }

而CobarServer又有一個私有的靜態變量INSTANCE,以及獲取這個私有靜態變量的靜態方法,顯然,這是一個單例設計模式,使程序運行的時候全局只有一個CobarServer對象。後端

咱們再來看CobarServer的startup()方法,此方法中構造了一個NIOAcceptor(綁定服務器端口,接受客戶端的鏈接),設計模式

server = new NIOAcceptor(NAME + "Server", system.getServerPort(), sf);

構造了一個接收前端鏈接的非阻塞Acceptor,讓咱們在來看NIOAcceptor類的代碼。數組

public final class NIOAcceptor extends Thread {
    private static final Logger LOGGER = Logger.getLogger(NIOAcceptor.class);
    private static final AcceptIdGenerator ID_GENERATOR = new AcceptIdGenerator();

    private final int port;
    private final Selector selector;
    private final ServerSocketChannel serverChannel;
    private final FrontendConnectionFactory factory;
    private NIOProcessor[] processors;
    private int nextProcessor;
    private long acceptCount;
     
     public NIOAcceptor(String name, int port, FrontendConnectionFactory factory) throws IOException {
        super.setName(name);
        this.port = port;
        this.selector = Selector.open();    # 生成選擇器
        this.serverChannel = ServerSocketChannel.open();
        this.serverChannel.socket().bind(new InetSocketAddress(port));    # 綁定服務器端口
        this.serverChannel.configureBlocking(false);    # 設置非阻塞模式
        this.serverChannel.register(selector, SelectionKey.OP_ACCEPT);    # 監聽ACCEPT事件,
        this.factory = factory;    # 設置前端鏈接的工廠
    }
}

以上的代碼都是NIO編程中很常見的操做。下面咱們看run()方法,服務器

@Override
    public void run() {
        final Selector selector = this.selector;
        for (;;) {
            ++acceptCount;
            try {
                selector.select(1000L);    # select操做是阻塞的,若沒有監聽到相應的事件,則一直阻塞,直到超過1000毫秒,則返回
                Set<SelectionKey> keys = selector.selectedKeys();
                try {
                    for (SelectionKey key : keys) {
                        if (key.isValid() && key.isAcceptable()) {
                            accept();        # 接受鏈接,這個方法很關鍵
                        } else {
                            key.cancel();
                        }
                    }
                } finally {
                    keys.clear();
                }
            } catch (Throwable e) {
                LOGGER.warn(getName(), e);
            }
        }
    }

以上的run方法也是常見的NIO中監聽事件的套路,其中accept()方法是定義的私有函數,accept方法是爲了將channel與selector綁定,代碼以下,

private void accept() {
        SocketChannel channel = null;
        try {
            channel = serverChannel.accept();    # 爲新的鏈接分配socket
            channel.configureBlocking(false);    # 設置爲非阻塞模式
            # factory將channel進行封裝,進行相應的設置,返回一個FrontendConnection,connection本質上就是一個封裝好的channel
            FrontendConnection c = factory.make(channel);
            c.setAccepted(true);
            c.setId(ID_GENERATOR.getId());    # 爲鏈接設置ID
            NIOProcessor processor = nextProcessor();    # 爲鏈接分配processor,NIOAcceptor中包含了一個NIOProcessor數組,分配的策略即根據下標不斷後移,到達數組末尾後又從數組的起始位置開始分配
            c.setProcessor(processor);
            # 回調NIOProcessor的postRegister方法,而processor的postRegister調用的是NIOReactor類的postRegister方法
            processor.postRegister(c);    
        } catch (Throwable e) {
            closeChannel(channel);
            LOGGER.warn(getName(), e);
        }
    }

讓我來看NIOProcessor的postRegister方法,

public void postRegister(NIOConnection c) {
        reactor.postRegister(c);
}

NIOProcessor類中定義了一個NIOReactor類的成員變量reactor,而postRegister調用的是NIOReactor的postRegister方法。下面讓咱們來看NIOReactor的postRegister代碼,

final void postRegister(NIOConnection c) {
        # 只是先將前端鏈接插入R線程的阻塞隊列中,並無馬上將channel與selector進行綁定
        reactorR.registerQueue.offer(c);
        # 喚醒R線程的selector,若以前的select操做沒有返回的話則當即返回
        reactorR.selector.wakeup();
}

既然channel與selector沒有馬上進行綁定,那它們是何時綁定的呢?咱們來看NIOReactor中內部類R的run()方法,

@Override
        public void run() {
            final Selector selector = this.selector;
            for (;;) {
                ++reactCount;
                try {
                    selector.select(1000L);
                    # 將connection與selector進行綁定
                    register(selector);   
                    Set<SelectionKey> keys = selector.selectedKeys();
                    try {
                        for (SelectionKey key : keys) {
                            Object att = key.attachment();
                            if (att != null && key.isValid()) {
                                int readyOps = key.readyOps();
                                if ((readyOps & SelectionKey.OP_READ) != 0) {
                                    read((NIOConnection) att);
                                } else if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                                    write((NIOConnection) att);
                                } else {
                                    key.cancel();
                                }
                            } else {
                                key.cancel();
                            }
                        }
                    } finally {
                        keys.clear();
                    }
                } catch (Throwable e) {
                    LOGGER.warn(name, e);
                }
            }
        }

在run方法中,當select方法返回的時候,就會進行channel和selector的綁定,由於當connection插入到阻塞隊列中的時候,會對selector進行wakeup(),即select(1000L)方法會當即返回,因此沒必要擔憂channel會卡一秒鐘纔會和selector進行綁定。

咱們再來看R線程的register方法,

private void register(Selector selector) {
            NIOConnection c = null;
            # 將R線程阻塞隊列中的全部鏈接都輪詢取出,與selector進行綁定
            while ((c = registerQueue.poll()) != null) {
                try {
                    c.register(selector);
                } catch (Throwable e) {
                    c.error(ErrorCode.ERR_REGISTER, e);
                }
            }
        }

總結

關於NIOAcceptor爲什麼先將connection放入Reactor的阻塞隊列,而不是直接綁定。筆者的觀點是,若是由NIOAcceptor負責綁定則會形成鎖競爭,selector的register方法會爭用鎖,會致使NIOAcceptor線程和R、W線程競爭selector的鎖,若acceptor中處理綁定connection的邏輯,則NIOAcceptor就不能快速地處理大量的鏈接,整個系統的吞吐就會下降。因此Cobar中的設計是將connection的綁定放到R線程的阻塞隊列中去,讓R線程來完成connection的綁定工做。

圖就隨意看看吧-.-,有點醜。

以上。


原文連接

https://segmentfault.com/a/11...

相關文章
相關標籤/搜索