1、前言:java
已經好久沒有碰編碼了,大概有9年的時間,突飛猛進的框架和新東西讓我眼花繚亂。以前一直在作web相關的應用。因爲項目不大,分佈式開發在我編碼的那個年代裏沒有作過,後來走上管理崗位才接觸到,僅限於溝通交流和方案的策劃,並無真正的作過。現在我有了一點時間和精力,決定本身學習一下,先從簡單的消息通信開始吧。
git
好,背景完畢!下面說說我想作的東西,我想作一個基於NIO的消息路由,而並不基於目前已有的各類優秀框架(mina,netty等等),這麼作的初衷也許跟我我的的習慣有關,我老是以爲若是不明白原理,即便再好的框架當遭遇問題的時候,我也會無從下手,若是我懂得了原理,再選用其餘的框架,也會更駕輕就熟。因此纔沒有使用現今那些優秀的框架,或許是個人一點點偏見吧。web
個人代碼已經發布在 http://git.oschina.net/java616網絡
目已經完成根據客戶端的標識進行消息的異步轉發,仍會持續的迭代和增長。有興趣的能夠下載回去,若是我有作的很差或者不對的地方,敬請指出。框架
2、一些概念和例程異步
NIO是啥我就不說了,咱們來看一下我理解的NIO工做流程,如圖:socket
上圖爲我所理解的NIO的工做過程,若是存在問題,請批評斧正。歸納一下個人理解:分佈式
SocketChannel:爲NIO工做過程當中,數據傳輸的通道,客戶端與服務端的每次交互都是經過此通道進行的;學習
Selector(多路複用器):會監控其註冊的通道上面的任何事件,得到SelectionKey,事件分爲OP_ACCEPT,OP_CONNECT,OP_WRITE,OP_READ(這是SelectionKey的四個屬性),OP_ACCEPT應該爲服務端接收到客戶端鏈接時的一種狀態,我在客戶端並無用到此狀態;OP_CONNECT則爲客戶端已經鏈接上服務端的一種狀態,我在服務端並無使用這個狀態;測試
Buffer:個人應用中,我一直使用ByteBuffer,此類是整個NIO通信的關鍵,必須理解才能進行通信的開發,不然可能產生問題;全部的通信內容都須要在此類中寫入和讀出;
若是想作nio相關的應用,那麼一些概念上的東西是不可迴避的,在這裏推薦:http://www.iteye.com/magazines/132-Java-NIO 。
下面三段代碼,分別完成了服務的建立、服務對事件的監聽以及客戶端對事件的監聽(不可直接拷貝使用,有一些變量沒有聲明,若有興趣,能夠去下載個人源碼)。
服務的建立
//打開一個serversocket通道,ServerSocketChannel是一個監控是否有新鏈接進入的通道。 serverSocketChannel = ServerSocketChannel.open(); //將這個serversokect通道設置爲非阻塞模式 serverSocketChannel.configureBlocking(false); //綁定serversokect的ip和端口 serverSocketChannel.socket().bind(new InetSocketAddress(cfg.getIp(), cfg.getPort())); //打開選擇器 selector = Selector.open(); //將此通道註冊給選擇器selector serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
服務對事件的監聽
//監聽事件key selector.select(2000); //迭代一組事件key Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while (keys.hasNext()) { //定義一個socket通道 SocketChannel socketChannel = null; int count = 0; SelectionKey key = keys.next(); // Logs.info("有網絡事件被觸發,事件類型爲:" + key.interestOps()); //刪除Iterator中的當前key,避免重複處理 keys.remove(); if (!key.isValid()) { continue; } else if (key.isAcceptable()) { //從客戶端送來的key中獲取ServerSocket通道 serverSocketChannel = (ServerSocketChannel) key.channel(); //接收此ServerSocket通道中的Socket通道,accept是一個阻塞方法,一直到獲取到鏈接纔會繼續 socketChannel = serverSocketChannel.accept(); //將此socket通道設置爲非阻塞模式 socketChannel.configureBlocking(false); //將此通道註冊到selector,並等待接收客戶端的讀入數據 socketChannel.register(selector, SelectionKey.OP_READ); allocToken(socketChannel); } else if (key.isReadable()) { //獲取事件key中的channel socketChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getReadBlock()); //清理緩衝區,便於使用 byteBuffer.clear(); //將channel中的字節流讀入緩衝區 count = socketChannel.read(byteBuffer); byteBuffer.flip(); //處理粘包 if (count > 0) { try { handlePacket(socketChannel, byteBuffer); } catch (Exception e) { e.printStackTrace(); // continue;//若是當前包存在非法拋出異常,那麼再也不進行處理直接跳出循環,處理下一個包;此處存疑,測試階段暫時註釋 } } else if (count == 0) { continue; } else { socketChannel.close(); } } else if (key.isWritable()) { ((SocketChannel) key.channel()).register(selector, SelectionKey.OP_READ); } }
客戶端對事件的監聽
while (true) { try { selector.select(3000); Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); for (int i = 0; keys.hasNext(); i++) { SelectionKey key = keys.next(); keys.remove(); if (key.isConnectable()) { socketChannel = (SocketChannel) key.channel(); if (socketChannel.isConnectionPending()) { if (socketChannel.finishConnect()){ Client.IS_CONNECT =true; logger.info("-------成功鏈接服務端!-------"); } } socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { //獲取事件key中的channel socketChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(BLOCK); //清理緩衝區,便於使用 byteBuffer.clear(); //將channel中的字節流讀入緩衝區 String readStr = ""; int count = socketChannel.read(byteBuffer); //務必要把buffer的position重置爲0 byteBuffer.flip(); handlePacket(byteBuffer, count); // socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isWritable()) { socketChannel.register(selector, SelectionKey.OP_READ); } } } catch (IOException e) { e.printStackTrace(); continue; } }
3、我要作的是個啥?
根據我我的對NIO的理解,個人初步想法是要實現一個這樣的東西,如圖:
但在個人不斷深刻開發中,發現上面的圖中不少不成熟的內容,做爲一個完整的消息通信的服務,必須包含以下的內容:
一、對接入鏈接的管理;
二、對鏈接身份的確認;
三、對異常關閉鏈接的回收;
四、根據身份對消息的轉發;
五、鏈路的維持;
六、自動重連;
七、消息的異步處理;
八、消息的響應機制;
九、粘包和斷包的處理;
九、配置體系;
十、通信層與業務層的分離;
………………
網上不少的NIO實例都是能夠運行的,但並不能知足個人工做須要,以上的那些確定還有沒有考慮全的東西,隨着我一點點的開發會逐漸的浮出水面。
在將來的文章中,我會逐步把我本身制定的通信協議,各個模塊的結構,以及代碼貼出來,但願你們可以互相學習,互相幫助。(待續)