咱們上次講到zbus網絡通信的核心API:java
Dispatcher -- 負責-NIO網絡事件Selector引擎的管理,對Selector引擎負載均衡git
IoAdaptor -- 網絡事件的處理,服務器與客戶端共用,負責讀寫,消息分包組包等服務器
Session -- 表明網絡連接,能夠讀寫消息網絡
實際的應用,咱們幾乎只須要作IoAdaptor的個性化實現就能完成高效的網絡通信服務,今天咱們將舉例說明如何個性化這個IoAdaptor。負載均衡
咱們今天要完成的目標是:實現MySQL服務器的透明代理。效果是,你訪問代理服務器跟訪問目標MySQL無差別。async
咱們在測試環境10.17.2.30:3306 這臺機器上提供了MySql,在咱們本地機器上跑起來咱們今天基於zbus.NET實現的一個代理程序,就能達到下面的效果。ide
完成大概不到100 行的代碼, Cool?Let’s roll! 工具
首先,咱們思考透明TCP代理到底在幹啥,透明的TCP代理的業務邏輯其實很是簡單,能夠描述爲,未來自代理上游(發起請求到代理)的數據轉發到目標TCP服務器,把目標服務器回來的數據原路返回代理上游客戶端。 注意這個原路,如何作到原路返回成爲關鍵點。這個示例其實跟MySQL沒有任何關係,原則上任何TCP層面的服務都應該適配。測試
基於zbus.NET怎麼來將上面的邏輯在體現出來,也就是如何個性化IoAdaptor?直觀的講,咱們要處理的幾個事件應該包括:1)從上游客戶端發起的連接請求--代理服務器的Accept事件,2)代理服務器鏈接目標服務器的Connect事件,3)上下游的數據事件onMessage。this
zbus.NET的IoAdaptor提供的個性化事件以下
基本包括一個連接(客戶端或者服務端)的生命週期,與消息的編解碼。
咱們的代理IoAdaptor就是逐一個性化處理。
第一步,編解碼: 透明代理對消息內容不作理解,因此不須要編解碼。
// 透傳不須要編解碼,簡單返回ByteBuffer數據 public IoBuffer encode(Object msg) { if (msg instanceof IoBuffer) { IoBuffer buff = (IoBuffer) msg; return buff; } else { throw new RuntimeException("Message Not Support"); } } // 透傳不須要編解碼,簡單返回ByteBuffer數據 public Object decode(IoBuffer buff) { if (buff.remaining() > 0) { byte[] data = new byte[buff.remaining()]; buff.readBytes(data); return IoBuffer.wrap(data); } else { return null; } }
第二步,代理服務接入:
@Override protected void onSessionAccepted(Session sess) throws IOException { Session target = null; Dispatcher dispatcher = sess.getDispatcher(); try { target = dispatcher.createClientSession(targetAddress, this); } catch (Exception e) { sess.asyncClose(); return; } sess.chain = target; target.chain = sess; dispatcher.registerSession(SelectionKey.OP_CONNECT, target); }
這裏的邏輯思路是,代理服務器每接受到一個請求--經過onSessionAccepted表達,咱們將同時建立一個到目標服務器的連接,今天的例子是目標MySQL服務器,注意上面的處理中把建立目標服務器Session過程與真正連接到目標服務分開(Dispatcher也提供合併兩者的工具方法),是爲了能在沒有發生連接以前綁定上好上下游關係,經過Session的chain變量來表達,也就是當前Session的關聯Session,關聯好以後啓動感興趣Connect事件,邏輯處理完畢。
第三步,連接成功事件(第二步中須要連接到目標服務器)
@Override public void onSessionConnected(Session sess) throws IOException { Session chain = sess.chain; if(chain == null){ sess.asyncClose(); return; } if(sess.isActive() && chain.isActive()){ sess.register(SelectionKey.OP_READ); chain.register(SelectionKey.OP_READ); } }
這裏的一個核心是當上下游都處於連接正常態,上下游Session都啓動感興趣消息讀事件(寫事件是在讀取處理中自動觸發),爲何在這裏作的緣由是必定要等上下游都正常態後才啓動雙方消息處理,否則會出現字節丟失。
第四步,處理上下游數據事件
@Override protected void onMessage(Object msg, Session sess) throws IOException { Session chain = sess.chain; if(chain == null){ sess.asyncClose(); return; } chain.write(msg); }
是否是很是簡單,相似pipeline,從一端的數據寫到另一端。
原則上面4步結束,整個透明代理就完成了,可是爲了處理連接異常清理,咱們增長了Session清理處理,以下
@Override public void onSessionToDestroy(Session sess) throws IOException { try { sess.close(); } catch (IOException e) { //ignore } if (sess.chain == null) return; try { sess.chain.close(); sess.chain.chain = null; sess.chain = null; } catch (IOException e) { } }
工做就是解決上下游連接清理連接。
至此爲止咱們的IoAdaptor個性化就完成了,是否是很是簡單,如今咱們要跑起來測試了,下面的代碼就是上一次講到重複的設置,沒有新意。
public static void main(String[] args) throws Exception { Dispatcher dispatcher = new Dispatcher(); IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); final Server server = new Server(dispatcher, ioAdaptor, 3306); server.start(); }
騷年,包括渣渣import和少量註釋加起來折騰了不到100行,該跑一跑了,仍是那句話,不是HelloWorld,你能夠規模壓力測。看看你是否在本地代理出來了你的目標服務MySQL,gl,hf, gogogo.
完整代碼可運行代碼以下,也可直接到zbus示例代碼庫中找到
package org.zbus.net; import java.io.IOException; import java.nio.channels.SelectionKey; import org.zbus.net.core.Dispatcher; import org.zbus.net.core.IoAdaptor; import org.zbus.net.core.IoBuffer; import org.zbus.net.core.Session; public class TcpProxyAdaptor extends IoAdaptor { private String targetAddress; public TcpProxyAdaptor(String targetAddress) { this.targetAddress = targetAddress; } // 透傳不須要編解碼,簡單返回ByteBuffer數據 public IoBuffer encode(Object msg) { if (msg instanceof IoBuffer) { IoBuffer buff = (IoBuffer) msg; return buff; } else { throw new RuntimeException("Message Not Support"); } } // 透傳不須要編解碼,簡單返回ByteBuffer數據 public Object decode(IoBuffer buff) { if (buff.remaining() > 0) { byte[] data = new byte[buff.remaining()]; buff.readBytes(data); return IoBuffer.wrap(data); } else { return null; } } @Override protected void onSessionAccepted(Session sess) throws IOException { Session target = null; Dispatcher dispatcher = sess.getDispatcher(); try { target = dispatcher.createClientSession(targetAddress, this); } catch (Exception e) { sess.asyncClose(); return; } sess.chain = target; target.chain = sess; dispatcher.registerSession(SelectionKey.OP_CONNECT, target); } @Override public void onSessionConnected(Session sess) throws IOException { Session chain = sess.chain; if(chain == null){ sess.asyncClose(); return; } if(sess.isActive() && chain.isActive()){ sess.register(SelectionKey.OP_READ); chain.register(SelectionKey.OP_READ); } } @Override protected void onMessage(Object msg, Session sess) throws IOException { Session chain = sess.chain; if(chain == null){ sess.asyncClose(); return; } chain.write(msg); } @Override public void onSessionToDestroy(Session sess) throws IOException { try { sess.close(); } catch (IOException e) { //ignore } if (sess.chain == null) return; try { sess.chain.close(); sess.chain.chain = null; sess.chain = null; } catch (IOException e) { } } @SuppressWarnings("resource") public static void main(String[] args) throws Exception { Dispatcher dispatcher = new Dispatcher(); IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); final Server server = new Server(dispatcher, ioAdaptor, 3306); server.setServerName("TcpProxyServer"); server.start(); } }