此文已由做者張鎬薪受權網易雲社區發佈。
前端
歡迎訪問網易雲社區,瞭解更多網易技術產品運營經驗。react
Title:MySql鏈接創建以及認證過程client->MySql:1.TCP鏈接請求 MySql->client:2.接受TCP鏈接client->MySql:3.TCP鏈接創建MySql->client:4.握手包HandshakePacketclient->MySql:5.認證包AuthPacketMySql->client:6.若是驗證成功,則返回OkPacketclient->MySql:7.默認會發送查詢版本信息的包MySql->client:8.返回結果包
NIOReactor其實就是一個網絡事件反應轉發器。 不少地方會用到NIOReactor,這裏先講FrontendConnection和NIOReactor綁定這一部分。上一節說到,NIOAcceptor的accept()最後將FrontendConnection交給了NIOReactor池其中的一個NIOReactor。調用的是 postRegister(AbstractConnection c)方法。sql
final void postRegister(AbstractConnection c) { reactorR.registerQueue.offer(c); reactorR.selector.wakeup(); }
postRegister將剛纔傳入的FrontendConnection放入RW線程的註冊隊列。以後,喚醒RW線程的selector。 爲何放入RW線程的註冊隊列,而不是直接註冊呢?若是是直接註冊,那麼就是NIOAcceptor這個線程負責註冊,這裏就會有鎖競爭,由於NIOAcceptor這個線程和每一個RW線程會去競爭selector的鎖。這樣NIOAcceptor就不能高效的處理鏈接。因此,更好的方式是將FrontendConnection放入RW線程的註冊隊列,以後讓RW線程本身完成註冊工做。 RW線程的源代碼:後端
private final class RW implements Runnable { private final Selector selector; private final ConcurrentLinkedQueue<AbstractConnection> registerQueue; private long reactCount; private RW() throws IOException { this.selector = Selector.open(); this.registerQueue = new ConcurrentLinkedQueue<AbstractConnection>(); } @Override public void run() { final Selector selector = this.selector; Set<SelectionKey> keys = null; for (;;) { ++reactCount; try { selector.select(500L); //從註冊隊列中取出AbstractConnection以後註冊讀事件 //以後作一些列操做,請參考下面註釋 register(selector); keys = selector.selectedKeys(); for (SelectionKey key : keys) { AbstractConnection con = null; try { Object att = key.attachment(); if (att != null) { con = (AbstractConnection) att; if (key.isValid() && key.isReadable()) { try { //異步讀取數據並處理數據 con.asynRead(); } catch (IOException e) { con.close("program err:" + e.toString()); continue; } catch (Exception e) { LOGGER.debug("caught err:", e); con.close("program err:" + e.toString()); continue; } } if (key.isValid() && key.isWritable()) { //異步寫數據 con.doNextWriteCheck(); } } else { key.cancel(); } } catch (CancelledKeyException e) { if (LOGGER.isDebugEnabled()) { LOGGER.debug(con + " socket key canceled"); } } catch (Exception e) { LOGGER.warn(con + " " + e); } } } catch (Exception e) { LOGGER.warn(name, e); } finally { if (keys != null) { keys.clear(); } } } } private void register(Selector selector) { AbstractConnection c = null; if (registerQueue.isEmpty()) { return; } while ((c = registerQueue.poll()) != null) { try { //註冊讀事件 ((NIOSocketWR) c.getSocketWR()).register(selector); //鏈接註冊,對於FrontendConnection是發送HandshakePacket並異步讀取響應 //響應爲AuthPacket,讀取其中的信息,驗證用戶名密碼等信息,若是符合條件 //則發送OkPacket c.register(); } catch (Exception e) { c.close("register err" + e.toString()); } } } }
由於NIOAcceptor線程和RW線程這兩個都會操做RW線程的註冊隊列,因此要用ConcurrentLinkedQueue RW線程不斷檢查selector中須要響應的事件,並若是註冊隊列不爲空,就不斷註冊其中的AbstractConnection,在這裏就是FrontendConnection。 以後執行FrontendConnection的register()方法:緩存
@Override public void register() throws IOException { if (!isClosed.get()) { // 生成認證數據 byte[] rand1 = RandomUtil.randomBytes(8); byte[] rand2 = RandomUtil.randomBytes(12); // 保存認證數據 byte[] seed = new byte[rand1.length + rand2.length]; System.arraycopy(rand1, 0, seed, 0, rand1.length); System.arraycopy(rand2, 0, seed, rand1.length, rand2.length); this.seed = seed; // 發送握手數據包 HandshakePacket hs = new HandshakePacket(); hs.packetId = 0; hs.protocolVersion = Versions.PROTOCOL_VERSION; hs.serverVersion = Versions.SERVER_VERSION; hs.threadId = id; hs.seed = rand1; hs.serverCapabilities = getServerCapabilities(); hs.serverCharsetIndex = (byte) (charsetIndex & 0xff); hs.serverStatus = 2; hs.restOfScrambleBuff = rand2; // 異步寫,本節就講到這裏 hs.write(this); // 異步讀取並處理,這個與RW線程中的asynRead()相同,以後客戶端收到握手包返回AuthPacket(就是下一節)就是從這裏開始看。 this.asynRead(); } }
這個方法就是生成HandshakePacket併發送出去,以後異步讀取響應。 以前的示例中MySql的HandshakePacket結構: 能夠總結出: HandshakePacket:安全
packet length(3 bytes)網絡
packet number (1)架構
protocol version (1)併發
version (null terminated string)框架
thread id (4)
salt (8)
server capabilities (2)
server charset (1)
server status (2)
unused (13)
salt (12)
0x00 --- 結束
這裏咱們看下MyCat中的實現這一部分MySql協議棧的packet類結構: 這裏能夠看出,每一個包都實現了本身的包長度和信息方法,而且針對前段後端鏈接都有讀寫方法實現,因此,以後讀寫數據都會根據場景不一樣調用這些類中的方法。這些包就是整個MySql協議棧除邏輯外的內容實現。 HandshakePacket.write(FrontendConnection c)方法將上面傳入的數據封裝成ByteBuffer,並傳入給FrontendConnection c的write(ByteBuffer buffer),這個方法直接繼承自AbstractConnection:
public final void write(ByteBuffer buffer) { //首先判斷是否爲壓縮協議 if(isSupportCompress()) { //CompressUtil爲壓縮協議輔助工具類 ByteBuffer newBuffer= CompressUtil.compressMysqlPacket(buffer,this,compressUnfinishedDataQueue); //將要寫的數據先放入寫緩存隊列 writeQueue.offer(newBuffer); } else { //將要寫的數據先放入寫緩存隊列 writeQueue.offer(buffer); } try { //處理寫事件,這個方法比較複雜,須要重點分析其思路 this.socketWR.doNextWriteCheck(); } catch (Exception e) { LOGGER.warn("write err:", e); this.close("write err:" + e); } }
如代碼註釋中所述,先將要寫的數據放入寫緩衝隊列,以後調用NIOSocketWR.doNextWriteCheck()處理寫事件。
public void doNextWriteCheck() { //檢查是否正在寫,看CAS更新writing值是否成功 if (!writing.compareAndSet(false, true)) { return; } try { //利用緩存隊列和寫緩衝記錄保證寫的可靠性,返回true則爲所有寫入成功 boolean noMoreData = write0(); //由於只有一個線程能夠成功CAS更新writing值,因此這裏不用再CAS writing.set(false); //若是所有寫入成功並且寫入隊列爲空(有可能在寫入過程當中又有新的Bytebuffer加入到隊列),則取消註冊寫事件 //不然,繼續註冊寫事件 if (noMoreData && con.writeQueue.isEmpty()) { if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) != 0)) { disableWrite(); } } else { if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) == 0)) { enableWrite(false); } } } catch (IOException e) { if (AbstractConnection.LOGGER.isDebugEnabled()) { AbstractConnection.LOGGER.debug("caught err:", e); } con.close("err:" + e); } } private boolean write0() throws IOException { int written = 0; ByteBuffer buffer = con.writeBuffer; if (buffer != null) { //只要寫緩衝記錄中還有數據就不停寫入,但若是寫入字節爲0,證實網絡繁忙,則退出 while (buffer.hasRemaining()) { written = channel.write(buffer); if (written > 0) { con.netOutBytes += written; con.processor.addNetOutBytes(written); con.lastWriteTime = TimeUtil.currentTimeMillis(); } else { break; } } //若是寫緩衝中還有數據證實網絡繁忙,計數並退出,不然清空緩衝 if (buffer.hasRemaining()) { con.writeAttempts++; return false; } else { con.writeBuffer = null; con.recycle(buffer); } } //讀取緩存隊列並寫channel while ((buffer = con.writeQueue.poll()) != null) { if (buffer.limit() == 0) { con.recycle(buffer); con.close("quit send"); return true; } buffer.flip(); while (buffer.hasRemaining()) { written = channel.write(buffer); if (written > 0) { con.lastWriteTime = TimeUtil.currentTimeMillis(); con.netOutBytes += written; con.processor.addNetOutBytes(written); con.lastWriteTime = TimeUtil.currentTimeMillis(); } else { break; } } //若是寫緩衝中還有數據證實網絡繁忙,計數,記錄下此次未寫完的數據到寫緩衝記錄並退出,不然回收緩衝 if (buffer.hasRemaining()) { con.writeBuffer = buffer; con.writeAttempts++; return false; } else { con.recycle(buffer); } } return true; } private void disableWrite() { try { SelectionKey key = this.processKey; key.interestOps(key.interestOps() & OP_NOT_WRITE); } catch (Exception e) { AbstractConnection.LOGGER.warn("can't disable write " + e + " con " + con); } } private void enableWrite(boolean wakeup) { boolean needWakeup = false; try { SelectionKey key = this.processKey; key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); needWakeup = true; } catch (Exception e) { AbstractConnection.LOGGER.warn("can't enable write " + e); } if (needWakeup && wakeup) { processKey.selector().wakeup(); } }
註釋已經很詳細,如此執行完,便成功將握手包發送給了客戶端。 在這裏稍微吐槽下,因爲MyCat在網絡通訊上同時作了AIO和NIO,可是在設計上AbstractionConnection和這些並無關係。可是又涉及到緩存隊列,因此設計上出現了一些以下的類模式: 這樣應該是不推薦這麼設計的,目前我還沒想好如何去改善
更多網易技術、產品、運營經驗分享請點擊。
相關文章:
【推薦】 一行代碼搞定Dubbo接口調用
【推薦】 網易美學-系統架構系列1-分佈式與服務化
【推薦】 Vue框架核心之數據劫持