基於NIO的消息路由的實現(一) 前言

1、前言:java

已經好久沒有碰編碼了,大概有9年的時間,突飛猛進的框架和新東西讓我眼花繚亂。以前一直在作web相關的應用。因爲項目不大,分佈式開發在我編碼的那個年代裏沒有作過,後來走上管理崗位才接觸到,僅限於溝通交流和方案的策劃,並無真正的作過。現在我有了一點時間和精力,決定本身學習一下,先從簡單的消息通信開始吧。
git

好,背景完畢!下面說說我想作的東西,我想作一個基於NIO的消息路由,而並不基於目前已有的各類優秀框架(mina,netty等等),這麼作的初衷也許跟我我的的習慣有關,我老是以爲若是不明白原理,即便再好的框架當遭遇問題的時候,我也會無從下手,若是我懂得了原理,再選用其餘的框架,也會更駕輕就熟。因此纔沒有使用現今那些優秀的框架,或許是個人一點點偏見吧。web

個人代碼已經發布在 http://git.oschina.net/java616網絡

目已經完成根據客戶端的標識進行消息的異步轉發,仍會持續的迭代和增長。有興趣的能夠下載回去,若是我有作的很差或者不對的地方,敬請指出。框架

2、一些概念和例程異步

NIO是啥我就不說了,咱們來看一下我理解的NIO工做流程,如圖:socket

上圖爲我所理解的NIO的工做過程,若是存在問題,請批評斧正。歸納一下個人理解:分佈式

  • SocketChannel:爲NIO工做過程當中,數據傳輸的通道,客戶端與服務端的每次交互都是經過此通道進行的;學習

  • Selector(多路複用器):會監控其註冊的通道上面的任何事件,得到SelectionKey,事件分爲OP_ACCEPT,OP_CONNECT,OP_WRITE,OP_READ(這是SelectionKey的四個屬性),OP_ACCEPT應該爲服務端接收到客戶端鏈接時的一種狀態,我在客戶端並無用到此狀態;OP_CONNECT則爲客戶端已經鏈接上服務端的一種狀態,我在服務端並無使用這個狀態;測試

  • Buffer:個人應用中,我一直使用ByteBuffer,此類是整個NIO通信的關鍵,必須理解才能進行通信的開發,不然可能產生問題;全部的通信內容都須要在此類中寫入和讀出;


若是想作nio相關的應用,那麼一些概念上的東西是不可迴避的,在這裏推薦:http://www.iteye.com/magazines/132-Java-NIO 。

下面三段代碼,分別完成了服務的建立、服務對事件的監聽以及客戶端對事件的監聽(不可直接拷貝使用,有一些變量沒有聲明,若有興趣,能夠去下載個人源碼)。

  • 服務的建立

//打開一個serversocket通道,ServerSocketChannel是一個監控是否有新鏈接進入的通道。
serverSocketChannel = ServerSocketChannel.open();
//將這個serversokect通道設置爲非阻塞模式
serverSocketChannel.configureBlocking(false);
//綁定serversokect的ip和端口
serverSocketChannel.socket().bind(new InetSocketAddress(cfg.getIp(), cfg.getPort()));
//打開選擇器
selector = Selector.open();
//將此通道註冊給選擇器selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  • 服務對事件的監聽

                //監聽事件key
                selector.select(2000);
                //迭代一組事件key
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    //定義一個socket通道
                    SocketChannel socketChannel = null;

                    int count = 0;

                    SelectionKey key = keys.next();
                    //  Logs.info("有網絡事件被觸發,事件類型爲:" + key.interestOps());
                    //刪除Iterator中的當前key,避免重複處理
                    keys.remove();
                    if (!key.isValid()) {
                        continue;
                    } else if (key.isAcceptable()) {
                        //從客戶端送來的key中獲取ServerSocket通道
                        serverSocketChannel = (ServerSocketChannel) key.channel();
                        //接收此ServerSocket通道中的Socket通道,accept是一個阻塞方法,一直到獲取到鏈接纔會繼續
                        socketChannel = serverSocketChannel.accept();
                        //將此socket通道設置爲非阻塞模式
                        socketChannel.configureBlocking(false);
                        //將此通道註冊到selector,並等待接收客戶端的讀入數據
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        allocToken(socketChannel);

                    } else if (key.isReadable()) {

                        //獲取事件key中的channel
                        socketChannel = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getReadBlock());
                        //清理緩衝區,便於使用
                        byteBuffer.clear();
                        //將channel中的字節流讀入緩衝區
                        count = socketChannel.read(byteBuffer);
                        byteBuffer.flip();
                        //處理粘包
                        if (count > 0) {
                            try {
                                handlePacket(socketChannel, byteBuffer);
                            } catch (Exception e) {
                                e.printStackTrace();
//                                continue;//若是當前包存在非法拋出異常,那麼再也不進行處理直接跳出循環,處理下一個包;此處存疑,測試階段暫時註釋
                            }
                        } else if (count == 0) {
                            continue;
                        } else {
                            socketChannel.close();

                        }

                    } else if (key.isWritable()) {
                        ((SocketChannel) key.channel()).register(selector, SelectionKey.OP_READ);
                    }
                }


  • 客戶端對事件的監聽

            while (true) {
                try {

                    selector.select(3000);

                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    for (int i = 0; keys.hasNext(); i++) {

                        SelectionKey key = keys.next();
                        keys.remove();
                        if (key.isConnectable()) {
                            socketChannel = (SocketChannel) key.channel();
                            if (socketChannel.isConnectionPending()) {
                                if (socketChannel.finishConnect()){
                                    Client.IS_CONNECT =true;
                                    logger.info("-------成功鏈接服務端!-------");
                                }

                            }
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        } else if (key.isReadable()) {
                            //獲取事件key中的channel
                            socketChannel = (SocketChannel) key.channel();
                            ByteBuffer byteBuffer = ByteBuffer.allocate(BLOCK);
                            //清理緩衝區,便於使用
                            byteBuffer.clear();
                            //將channel中的字節流讀入緩衝區
                            String readStr = "";
                            int count = socketChannel.read(byteBuffer);
                            //務必要把buffer的position重置爲0
                            byteBuffer.flip();

                            handlePacket(byteBuffer, count);
//                            socketChannel.register(selector, SelectionKey.OP_READ);
                        } else if (key.isWritable()) {
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        }

                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    continue;
                }

            }

3、我要作的是個啥?

根據我我的對NIO的理解,個人初步想法是要實現一個這樣的東西,如圖:

但在個人不斷深刻開發中,發現上面的圖中不少不成熟的內容,做爲一個完整的消息通信的服務,必須包含以下的內容:

一、對接入鏈接的管理;

二、對鏈接身份的確認;

三、對異常關閉鏈接的回收;

四、根據身份對消息的轉發;

五、鏈路的維持;

六、自動重連;

七、消息的異步處理;

八、消息的響應機制;

九、粘包和斷包的處理;

九、配置體系;

十、通信層與業務層的分離;

………………

網上不少的NIO實例都是能夠運行的,但並不能知足個人工做須要,以上的那些確定還有沒有考慮全的東西,隨着我一點點的開發會逐漸的浮出水面。

在將來的文章中,我會逐步把我本身制定的通信協議,各個模塊的結構,以及代碼貼出來,但願你們可以互相學習,互相幫助。(待續)

相關文章
相關標籤/搜索