數據庫路由中間件MyCat - 源代碼篇(6)
3. 鏈接模塊
3.3 AbstractConnection:
3.3.2 NIOHandler
NIOHandler實際上就是對於業務處理方法的封裝,對於不一樣的鏈接有不一樣的處理方法,也就是不一樣的NIOHandlerjava
public interface NIOHandler { void handle(byte[] data); }
它的實現以及子類會在以後的對應的處理模塊細講。數據庫
3.3.3 NIOSocketWR
實現對於AbstractConnection(實際就是對裏面封裝的channel)進行異步讀寫,將從channel中讀取到的放到AbstractConnection的readBuffer中,將writeBuffer和寫隊列中的數據寫入到channel中。能夠這麼說,AbstractConnection的方法只對它裏面的buffer進行操做,而buffer與channel之間的交互,是經過NIOSocketWR的方法完成的。
下面是它的方法以及對應的說明:緩存
public void register(Selector selector) throws IOException { try { processKey = channel.register(selector, SelectionKey.OP_READ, con); } finally { if (con.isClosed.get()) { clearSelectionKey(); } } } private void clearSelectionKey() { try { SelectionKey key = this.processKey; if (key != null && key.isValid()) { key.attach(null); key.cancel(); } } catch (Exception e) { AbstractConnection.LOGGER.warn("clear selector keys err:" + e); } }
調用關係:
這個方法就是以前講的AbstractionConnection與RW線程綁定,AbstractionConnection封裝的channel須要在RW線程的selector上註冊讀事件以監聽讀事件。markdown
public void doNextWriteCheck() { //檢查是否正在寫,看CAS更新writing值是否成功 if (!writing.compareAndSet(false, true)) { return; } try { //利用緩存隊列和寫緩衝記錄保證寫的可靠性,返回true則爲所有寫入成功 boolean noMoreData = write0(); //由於只有一個線程能夠成功CAS更新writing值,因此這裏不用再CAS writing.set(false); //若是所有寫入成功並且寫入隊列爲空(有可能在寫入過程當中又有新的Bytebuffer加入到隊列),則取消註冊寫事件 //不然,繼續註冊寫事件 if (noMoreData && con.writeQueue.isEmpty()) { if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) != 0)) { disableWrite(); } } else { if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) == 0)) { enableWrite(false); } } } catch (IOException e) { if (AbstractConnection.LOGGER.isDebugEnabled()) { AbstractConnection.LOGGER.debug("caught err:", e); } con.close("err:" + e); } } private boolean write0() throws IOException { int written = 0; ByteBuffer buffer = con.writeBuffer; if (buffer != null) { //只要寫緩衝記錄中還有數據就不停寫入,但若是寫入字節爲0,證實網絡繁忙,則退出 while (buffer.hasRemaining()) { written = channel.write(buffer); if (written > 0) { con.netOutBytes += written; con.processor.addNetOutBytes(written); con.lastWriteTime = TimeUtil.currentTimeMillis(); } else { break; } } //若是寫緩衝中還有數據證實網絡繁忙,計數並退出,不然清空緩衝 if (buffer.hasRemaining()) { con.writeAttempts++; return false; } else { con.writeBuffer = null; con.recycle(buffer); } } //讀取緩存隊列並寫channel while ((buffer = con.writeQueue.poll()) != null) { if (buffer.limit() == 0) { con.recycle(buffer); con.close("quit send"); return true; } buffer.flip(); while (buffer.hasRemaining()) { written = channel.write(buffer); if (written > 0) { con.lastWriteTime = TimeUtil.currentTimeMillis(); con.netOutBytes += written; con.processor.addNetOutBytes(written); con.lastWriteTime = TimeUtil.currentTimeMillis(); } else { break; } } //若是寫緩衝中還有數據證實網絡繁忙,計數,記錄下此次未寫完的數據到寫緩衝記錄並退出,不然回收緩衝 if (buffer.hasRemaining()) { con.writeBuffer = buffer; con.writeAttempts++; return false; } else { con.recycle(buffer); } } return true; } private void disableWrite() { try { SelectionKey key = this.processKey; key.interestOps(key.interestOps() & OP_NOT_WRITE); } catch (Exception e) { AbstractConnection.LOGGER.warn("can't disable write " + e + " con " + con); } } private void enableWrite(boolean wakeup) { boolean needWakeup = false; try { SelectionKey key = this.processKey; key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); needWakeup = true; } catch (Exception e) { AbstractConnection.LOGGER.warn("can't enable write " + e); } if (needWakeup && wakeup) { processKey.selector().wakeup(); } }
這個doNextWriteCheck方法以前也講過,看調用關係:
第一個調用關係沒意義,WriteEventCheckRunner這個類從沒被調用過。
第二個調用很。。。就是將這個方法簡單封裝,估計是爲了好修改,以後會提兩種寫策略對比。
第三個調用是主要調用,全部往AbstractionConnection中寫入都會調用Abstraction.write(ByteBuffer),這個方法先把要寫的放入緩存隊列,以後調用上面這個doNextWriteCheck方法。
第四個和第五個都是定時檢查任務,爲了檢查是否有AbstractionConnection的寫緩存沒有寫完的狀況網絡
@Override public void asynRead() throws IOException { ByteBuffer theBuffer = con.readBuffer; //若是buffer爲空,證實被回收或者是第一次讀,新分配一個buffer給AbstractConnection做爲readBuffer if (theBuffer == null) { theBuffer = con.processor.getBufferPool().allocate(); con.readBuffer = theBuffer; } //從channel中讀取數據,而且保存到對應AbstractConnection的readBuffer中,readBuffer處於write mode,返回讀取了多少字節 int got = channel.read(theBuffer); //調用處理讀取到的數據的方法 con.onReadData(got); }
這個方法以前也講過,異步將channel中的數據讀取到readBuffer中,以後調用對應AbstractConnection的處理方法。
調用關係:
按理說,應該只有在RW線程檢測到讀事件以後,纔會調用這個異步讀方法。可是在FrontendConnection的register()方法和BackendAIOConnection的register()方法都調用了。這是由於這兩個方法在正常工做狀況下爲了註冊一個會先主動發一個握手包,另外一個會先讀取一個握手包。因此都會執行異步讀方法。 異步