當業務的數據量和訪問量急劇增長的狀況下,咱們須要對數據進行水平拆分,從而下降單庫的壓力,而且數據的水平拆分須要對業務透明,屏蔽掉水平拆分的細節。而且,前端業務的高併發會致使後端的數據庫鏈接過多,從而DB的性能低下。前端
Cobar就是解決這些問題的一款分庫分表中間件,Cobar以proxy的形式位於前端應用和後端數據庫之間,Cobar對前端暴露的接口是MySQL通訊協議,其將前端傳輸過來的SQL語句按照sharding規則路由到後端的數據庫實例上,再合併多個實例返回的結果,從而模擬單庫下的數據庫行爲。react
Cobar的使用方法就很少介紹了,本文的主要內容是剖析Cobar的源代碼。sql
結構圖以下:數據庫
咱們先來看CobarServer的代碼:編程
private CobarServer() { this.config = new CobarConfig(); SystemConfig system = config.getSystem(); MySQLLexer.setCStyleCommentVersion(system.getParserCommentVersion()); this.timer = new Timer(NAME + "Timer", true); this.initExecutor = ExecutorUtil.create("InitExecutor", system.getInitExecutor()); this.timerExecutor = ExecutorUtil.create("TimerExecutor", system.getTimerExecutor()); this.managerExecutor = ExecutorUtil.create("ManagerExecutor", system.getManagerExecutor()); this.sqlRecorder = new SQLRecorder(system.getSqlRecordCount()); this.isOnline = new AtomicBoolean(true); this.startupTime = TimeUtil.currentTimeMillis(); }
上面是CobarServer的構造函數,它的限定是private的。segmentfault
private static final CobarServer INSTANCE = new CobarServer(); public static final CobarServer getInstance() { return INSTANCE; }
而CobarServer又有一個私有的靜態變量INSTANCE
,以及獲取這個私有靜態變量的靜態方法,顯然,這是一個單例設計模式,使程序運行的時候全局只有一個CobarServer對象。後端
咱們再來看CobarServer的startup()方法,此方法中構造了一個NIOAcceptor(綁定服務器端口,接受客戶端的鏈接),設計模式
server = new NIOAcceptor(NAME + "Server", system.getServerPort(), sf);
構造了一個接收前端鏈接的非阻塞Acceptor,讓咱們在來看NIOAcceptor類的代碼。數組
public final class NIOAcceptor extends Thread { private static final Logger LOGGER = Logger.getLogger(NIOAcceptor.class); private static final AcceptIdGenerator ID_GENERATOR = new AcceptIdGenerator(); private final int port; private final Selector selector; private final ServerSocketChannel serverChannel; private final FrontendConnectionFactory factory; private NIOProcessor[] processors; private int nextProcessor; private long acceptCount; public NIOAcceptor(String name, int port, FrontendConnectionFactory factory) throws IOException { super.setName(name); this.port = port; this.selector = Selector.open(); # 生成選擇器 this.serverChannel = ServerSocketChannel.open(); this.serverChannel.socket().bind(new InetSocketAddress(port)); # 綁定服務器端口 this.serverChannel.configureBlocking(false); # 設置非阻塞模式 this.serverChannel.register(selector, SelectionKey.OP_ACCEPT); # 監聽ACCEPT事件, this.factory = factory; # 設置前端鏈接的工廠 } }
以上的代碼都是NIO編程中很常見的操做。下面咱們看run()方法,服務器
@Override public void run() { final Selector selector = this.selector; for (;;) { ++acceptCount; try { selector.select(1000L); # select操做是阻塞的,若沒有監聽到相應的事件,則一直阻塞,直到超過1000毫秒,則返回 Set<SelectionKey> keys = selector.selectedKeys(); try { for (SelectionKey key : keys) { if (key.isValid() && key.isAcceptable()) { accept(); # 接受鏈接,這個方法很關鍵 } else { key.cancel(); } } } finally { keys.clear(); } } catch (Throwable e) { LOGGER.warn(getName(), e); } } }
以上的run方法也是常見的NIO中監聽事件的套路,其中accept()方法是定義的私有函數,accept方法是爲了將channel與selector綁定,代碼以下,
private void accept() { SocketChannel channel = null; try { channel = serverChannel.accept(); # 爲新的鏈接分配socket channel.configureBlocking(false); # 設置爲非阻塞模式 # factory將channel進行封裝,進行相應的設置,返回一個FrontendConnection,connection本質上就是一個封裝好的channel FrontendConnection c = factory.make(channel); c.setAccepted(true); c.setId(ID_GENERATOR.getId()); # 爲鏈接設置ID NIOProcessor processor = nextProcessor(); # 爲鏈接分配processor,NIOAcceptor中包含了一個NIOProcessor數組,分配的策略即根據下標不斷後移,到達數組末尾後又從數組的起始位置開始分配 c.setProcessor(processor); # 回調NIOProcessor的postRegister方法,而processor的postRegister調用的是NIOReactor類的postRegister方法 processor.postRegister(c); } catch (Throwable e) { closeChannel(channel); LOGGER.warn(getName(), e); } }
讓我來看NIOProcessor的postRegister方法,
public void postRegister(NIOConnection c) { reactor.postRegister(c); }
NIOProcessor類中定義了一個NIOReactor類的成員變量reactor,而postRegister調用的是NIOReactor的postRegister方法。下面讓咱們來看NIOReactor的postRegister代碼,
final void postRegister(NIOConnection c) { # 只是先將前端鏈接插入R線程的阻塞隊列中,並無馬上將channel與selector進行綁定 reactorR.registerQueue.offer(c); # 喚醒R線程的selector,若以前的select操做沒有返回的話則當即返回 reactorR.selector.wakeup(); }
既然channel與selector沒有馬上進行綁定,那它們是何時綁定的呢?咱們來看NIOReactor中內部類R的run()方法,
@Override public void run() { final Selector selector = this.selector; for (;;) { ++reactCount; try { selector.select(1000L); # 將connection與selector進行綁定 register(selector); Set<SelectionKey> keys = selector.selectedKeys(); try { for (SelectionKey key : keys) { Object att = key.attachment(); if (att != null && key.isValid()) { int readyOps = key.readyOps(); if ((readyOps & SelectionKey.OP_READ) != 0) { read((NIOConnection) att); } else if ((readyOps & SelectionKey.OP_WRITE) != 0) { write((NIOConnection) att); } else { key.cancel(); } } else { key.cancel(); } } } finally { keys.clear(); } } catch (Throwable e) { LOGGER.warn(name, e); } } }
在run方法中,當select方法返回的時候,就會進行channel和selector的綁定,由於當connection插入到阻塞隊列中的時候,會對selector進行wakeup(),即select(1000L)方法會當即返回,因此沒必要擔憂channel會卡一秒鐘纔會和selector進行綁定。
咱們再來看R線程的register方法,
private void register(Selector selector) { NIOConnection c = null; # 將R線程阻塞隊列中的全部鏈接都輪詢取出,與selector進行綁定 while ((c = registerQueue.poll()) != null) { try { c.register(selector); } catch (Throwable e) { c.error(ErrorCode.ERR_REGISTER, e); } } }
關於NIOAcceptor爲什麼先將connection放入Reactor的阻塞隊列,而不是直接綁定。筆者的觀點是,若是由NIOAcceptor負責綁定則會形成鎖競爭,selector的register方法會爭用鎖,會致使NIOAcceptor線程和R、W線程競爭selector的鎖,若acceptor中處理綁定connection的邏輯,則NIOAcceptor就不能快速地處理大量的鏈接,整個系統的吞吐就會下降。因此Cobar中的設計是將connection的綁定放到R線程的阻塞隊列中去,讓R線程來完成connection的綁定工做。
圖就隨意看看吧-.-,有點醜。
以上。