Jetty源碼學習9-WebSocket

引言 javascript

經過NIO+Continunation+HttpClient可使Jetty具備異步長鏈接的功能,但有些應用場景確須要服務器「推」的功能,好比說:聊天室、實時消息提醒、股票行情等對於實時要求比較高的應用,能想到的實時推送的解決方案大體能夠分爲下面幾種: html

一、輪詢:前臺ajax輪詢實時消息。 java

二、applet:已經OUT了不是~並且亦有安全方面的問題 web

三、長鏈接:在一次TCP的鏈接上發送屢次數據,除非手動close,但須要在HTTP協議的基礎上作協議的轉換並應用在客戶端和服務端,這些工做須要本身來實現。 ajax

我最初接觸到websocket是設計一個資源遠程加載的平臺。設想你在本地開發web應用,你只須要告訴平臺你的應用在本地的地址。第三方的人員(主管或者運營人員,亦或是一個項目組的同事)能夠隨時經過訪問平臺看到你工做的成果,由於是實時的,因此溝通會更加有效。 apache

本文描述的websocket就是一個非Http的雙向鏈接(其實也跟Http息息相關,下文有詳解),有了它你不須要沒事去輪詢實現推的功能;有了它你能夠對註冊到平臺上的計算機作一些事情(確實有安全隱患,不過都是開發環境也就無所謂了)。 瀏覽器

一個簡單的實例 安全

爲了研究websocket須要搭建一個功能環境,修改了網上的一段實例並調試無誤,貼出來主要代碼,須要完整工程的同窗請留言。 服務器

實例流程以下: websocket

A 客戶端創建websocket鏈接後發送給服務端want消息告知服務端。

B 服務端接收到消息,判斷若是是want命令的話,返回給全部客戶端begin消息。

C 客戶端接受消息,判斷若是是begin命令的話,即讀取本地文件發送給服務端。

D 服務端接收消息,判斷若是是非want命令的話,將讀取的內容加上後綴返回給客戶端。

E 客戶端接受消息,判斷若是是非begin命令的話,將讀取的內容顯示在html頁面上。

一、前臺JS

下面三段js主要是實現服務端讀取已註冊的用戶計算機上的文件,很邪惡的有木有。

//讀取本地的文件
function read(file) { 
     if(typeof window.ActiveXObject != 'undefined') {  
         var content = ""; 
         try { 
             var fso = new ActiveXObject("Scripting.FileSystemObject");   
             var reader = fso.openTextFile(file, 1); 
             while(!reader.AtEndofStream) { 
                 content += reader.readline(); 
                 content += "\n"; 
             }  
             // close the reader 
             reader.close(); 
         } 
         catch (e) {  
             alert("Internet Explore read local file error: \n" + e);  
         }           
         return content; 
     } 
     else if(document.implementation && document.implementation.createDocument) { 
         var content = ""
         try { 
             netscape.security.PrivilegeManager.enablePrivilege('UniversalXPConnect'); 
             var lf = Components.classes["@mozilla.org/file/local;1"].createInstance(Components.interfaces.nsILocalFile); 
             lf.initWithPath(file); 
             if (lf.exists() == false) {    
                 alert("File does not exist");   
             }               
             var fis = Components.classes["@mozilla.org/network/file-input-stream;1"].createInstance(Components.interfaces.nsIFileInputStream);   
             fis.init(lf, 0x01, 00004, null);   
             var sis = Components.classes["@mozilla.org/scriptableinputstream;1"].createInstance(Components.interfaces.nsIScriptableInputStream);   
             sis.init(fis);   
             var converter = Components.classes["@mozilla.org/intl/scriptableunicodeconverter"].createInstance(Components.interfaces.nsIScriptableUnicodeConverter);   
             var insis = sis.read(sis.available());
             converter.charset = "GBK";   
             content = converter.ConvertToUnicode(insis); 
         } 
         catch (e) {  
             alert("Mozilla Firefox read local file error: \n" + e);  
         }        
         return content; 
     } 
 } 
 
 </script> 
        <script type='text/javascript'>
            //判斷當前瀏覽器是否支持websocket
            if (!window.WebSocket)
                alert("window.WebSocket unsuport!");
			else
				alert("suport!");

            function $() {
                return document.getElementById(arguments[0]);
            }
            function $F() {
                return document.getElementById(arguments[0]).value;
            }

            function getKeyCode(ev) {
                if (window.event)
                    return window.event.keyCode;
                return ev.keyCode;
            }
            //websocket主要實現
            var server = {
                connect : function() {
                    var location ="ws://localhost:8888/petstore/servlet/a?key=123";
					alert("before conn!");
                    this._ws =new WebSocket(location);
					alert("has conned!");
                    this._ws.onopen =this._onopen;
                    this._ws.onmessage =this._onmessage;
                    this._ws.onclose =this._onclose;
					server._send("want");
                },

                _onopen : function() {
				
                },

                _send : function(message) {
                    if (this._ws)
                        this._ws.send(message);
                },

                send : function(text) {
                    if (text !=null&& text.length >0)
                        server._send(text);
                },

                _onmessage : function(m) {				
                    if (m.data) {
						if (m.data=="begin") {
							var res = read("/Users/apple/workspace/apache/chenshuai.html");
							server._send(res);
						}
						else {
					
                        var messageBox = $('messageBox');
                        var spanText = document.createElement('span');
                        spanText.className ='text';
                        spanText.innerHTML = m.data;
                        var lineBreak = document.createElement('br');
                        messageBox.appendChild(spanText);
                        messageBox.appendChild(lineBreak);
                        messageBox.scrollTop = messageBox.scrollHeight
                                - messageBox.clientHeight;
								}
                    }
                },

                _onclose : function(m) {
                    this._ws =null;
                }
            };
        </script>

        <script type='text/javascript'>
            //顯式觸發websocket的創建
            $('connect').onclick =function(event) {
				alert("has clicked!");
                server.connect();
                return false;
            };
        </script>

二、後臺servlet

public class MyWebSocketServlet extends WebSocketServlet {
    private static final long serialVersionUID = -7289719281366784056L;
    public static String newLine = System.getProperty("line.separator");

    private final Set<TailorSocket> _members = new CopyOnWriteArraySet<TailorSocket>();
    private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();


    public void init() throws ServletException {
        super.init();
    }

    protected void doGet(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        getServletContext().getNamedDispatcher("default").forward(request,
                response);
    }

    public WebSocket doWebSocketConnect(HttpServletRequest request,
            String protocol) {
    	String key = (String) request.getParameter("key");
        return new TailorSocket(key);
    }

    class TailorSocket implements WebSocket.OnTextMessage {
        private Connection _connection;
        private String key;
        
        public String getKey() {
        	return key;
        }
        
        public TailorSocket(String key) {
        	this.key = key;
        }

        public void onClose(int closeCode, String message) {
            _members.remove(this);
        }

        public void sendMessage(String data) throws IOException {
            _connection.sendMessage(data);
        }

    
        public void onMessage(String data) {
        	for(TailorSocket member : _members){
                System.out.println("Trying to send to Member!");
                if(member.isOpen()){
                    System.out.println("Sending!");
                    try {
                    	if (data.equals("want")) {
                    		member.sendMessage("begin");
                    	}
                    	else {
                    		member.sendMessage(data+member.getKey());
                    	}                    
                    } catch (IOException e) {                   
                    }
                }
            }     		
            System.out.println("Received: "+data);
        }

        public boolean isOpen() {
            return _connection.isOpen();
        }


        public void onOpen(Connection connection) {
            _members.add(this);
            _connection = connection;
            try {
                connection.sendMessage("onOpen:Server received Web Socket upgrade and added it to Receiver List.");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
三、web.xml配置
<servlet>
        <servlet-name>WebSocket</servlet-name>
        <servlet-class>com.alibaba.myX3.dal.dataobject.MyWebSocketServlet</servlet-class>
    </servlet>
    <servlet-mapping>
        <servlet-name>WebSocket</servlet-name>
        <url-pattern>/servlet/*</url-pattern>
    </servlet-mapping>
四、前臺頁面
<body>
        <div id='messageBox'></div>
        <div id='input'>
            <div>
                <input id='connect' class='button' type='submit' name='Connect'
                    value='Connect' />
            </div>
        </div>
        <script type='text/javascript'>
            $('connect').onclick =function(event) {
				alert("has clicked!");
                server.connect();
                return false;
            };
        </script>

        <p>
            JAVA Jetty for WebSocket
        </p>
    </body>

五、運行結果

1)本機的文件

2)點擊鏈接以後的結果

只要瀏覽器過關,任何人均可以看到你本地文件的內容了~推送功能算是完成了。

HTTP狀態碼101

給出狀態碼的預備知識,對於理解websocket挺有用的。

1xx:這一類型的狀態碼,表明請求已被接受,須要繼續處理。這類響應是臨時響應,只包含狀態行和某些可選的響應頭信息,並以空行結束。因爲 HTTP/1.0 協議中沒有定義任何 1xx 狀態碼,因此除非在某些試驗條件下,服務器禁止向此類客戶端發送 1xx 響應。

101:服務器已經理解了客戶端的請求,並將經過Upgrade 消息頭通知客戶端採用不一樣的協議來完成這個請求。在發送完這個響應最後的空行後,服務器將會切換到在Upgrade 消息頭中定義的那些協議。

WebSocket原理

客戶端不在本文的研究範圍,這裏只分析Jetty是如何實現的,其實也大同小異,無非是新的協議罷了~

一、WebSocket模型

紅色:請求的入口,它定義了WebSocket並持有WebSocketFactory,從而初始化WebSocket鏈接並設置鏈接的WebSocket值。

藍色:至關於HTTP協議的HttpConnection,不需解釋。

橙色:新的協議天然須要新的解析和生產規則了。

二、模擬一次鏈接的創建

A 簡要流程:

B 詳細流程:

1)客戶端請求創建WebSocket鏈接

var location ="ws://localhost:8888/petstore/servlet/a?key=123";
					alert("before conn!");
                    this._ws =new WebSocket(location);
此時會向服務器發出:http://localhosts:8888/petstore/servlet/a?key=123,天然不是ws協議,服務器也得認識才行啊,因此ws協議的最初的創建也是依賴了http協議。

2)服務端servlet處理請求

//判斷客戶端是否要求更新協議爲websocket
if ("websocket".equalsIgnoreCase(request.getHeader("Upgrade")))
        {
            String origin = request.getHeader("Origin");
            if (origin==null)
                origin = request.getHeader("Sec-WebSocket-Origin");
            if (!_acceptor.checkOrigin(request,origin))
            {
                response.sendError(HttpServletResponse.SC_FORBIDDEN);
                return false;
            }

            // Try each requested protocol
            WebSocket websocket = null;

            @SuppressWarnings("unchecked")
            Enumeration<String> protocols = request.getHeaders("Sec- 
            WebSocket-Protocol");
            String protocol=null;
            while (protocol==null && protocols!=null && 
            protocols.hasMoreElements())
            {
                String candidate = protocols.nextElement();
                for (String p : parseProtocols(candidate))
                {
                    websocket = _acceptor.doWebSocketConnect(request, p);
                    if (websocket != null)
                    {
                        protocol = p;
                        break;
                    }
                }
            }

            // Did we get a websocket?
            if (websocket == null)
            {
               // 用servlet提供的webSocket實現,上面的實例有詳細實現
               websocket = _acceptor.doWebSocketConnect(request, null);

               if (websocket==null)
               {
                     
               response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
               return false;
               }
           }

            // 告訴客戶端,我已經準備好了切換協議了
            upgrade(request, response, websocket, protocol);
            return true;
        }

        return false;

看下upgrade的實現:

AbstractHttpConnection http = AbstractHttpConnection.getCurrentConnection();
if (http instanceof BlockingHttpConnection)
      throw new IllegalStateException("Websockets not supported on blocking 
      connectors");
//用着同樣的信道,並木有從新創建socket鏈接
ConnectedEndPoint endp = (ConnectedEndPoint)http.getEndPoint();
connection = new WebSocketServletConnectionRFC6455(this, websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol, extensions, draft);
// Set the defaults
connection.getConnection().setMaxBinaryMessageSize(_maxBinaryMessageSize);
connection.getConnection().setMaxTextMessageSize(_maxTextMessageSize);
// 完成「握手」階段,總要告訴點客戶端什麼,大體就是:我已經換好協議了,你那邊能夠發送新協議格式的數據了啊!
connection.handshake(request, response, protocol);
response.flushBuffer();
// Give the connection any unused data from the HTTP connection.       
connection.fillBuffersFrom(((HttpParser)http.getParser()).getHeaderBuffer());      
connection.fillBuffersFrom(((HttpParser)http.getParser()).getBodyBuffer());
// 至此換了新的鏈接,新的協議解析器和生成器,總不至於還用外面的HttpConnection吧,那我就把新的協議放在request裏面,把協議發生改變的標示放在reponse裏面,後面jetty判斷response若是有協議改變的話就會更新endpoint的connection了。
LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),draft,protocol,connection);
request.setAttribute("org.eclipse.jetty.io.Connection", connection);

3)Jetty替換原來的Http協議爲最新的WebSocket協議

// look for a switched connection instance?
if (_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101)
{
    Connection switched=
         (Connection)_request.getAttribute("org.eclipse.jetty.io.Connection");
    if (switched!=null)
         connection=switched;
}

C 報文數據

URL:http://localhost:8888/petstore/servlet/a?key=123

狀態碼:101

報文頭信息:

三、模擬請求的接受和發送

1)WebSocketParserRFC6455解析請求的參數

progress=true;
_handler.onFrame(_flags, _opcode, data);
_bytesNeeded=0;
_state=State.START;
2)接着看下ws協議框架處理器 WSFrameHandler是如何處理解析出的data的。
//示例中的websocket就是該類型的,所以由它來處理
if(_onTextMessage!=null)
    {
    if (_connection.getMaxTextMessageSize()<=0)
    {
        // No size limit, so handle only final frames
        if (lastFrame)
        //調用servlet中定義的websocket的回調接口                                                      
            _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
        else
        {
            LOG.warn("Frame discarded. Text aggregation disabled for 
            {}",_endp);                                    
            _connection.close(WebSocketConnectionD08.CLOSE_BADDATA,"Text frame   
            aggregation disabled");
        }
   }

3)看下servlet中定義的websocket:

class TailorSocket implements WebSocket.OnTextMessage {
        private Connection _connection;

        public void onClose(int closeCode, String message) {
            _members.remove(this);
        }

        public void sendMessage(String data) throws IOException {
            _connection.sendMessage(data);
        }

    
        public void onMessage(String data) {
        	for(TailorSocket member : _members){
                System.out.println("Trying to send to Member!");
                if(member.isOpen()){
                    System.out.println("Sending!");
                    try {
                    	if (data.equals("want")) {
                    		member.sendMessage("begin");
                    	}
                    	else {
                    		member.sendMessage(data+member.getKey());
                    	}                    
                    } catch (IOException e) {                   
                    }
                }
            }     		
            System.out.println("Received: "+data);
        }

        public boolean isOpen() {
            return _connection.isOpen();
        }


        public void onOpen(Connection connection) {
            _members.add(this);
            _connection = connection;
            try {
                connection.sendMessage("onOpen:Server received Web Socket upgrade and added it to Receiver List.");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

4)至於最後的發送數據,無非就是利用WebSocketGenerator生成協議格式的數據flush到信道中,不詳述了。

總結

學習了WebSocket感受對於HttpConnection和Http狀態碼的認識更加深入了,之後能夠基於Http定製好玩的協議,不過須要客戶端的配合。

Jetty實現了服務器推的功能而無需輪詢,缺陷就是支持的瀏覽器太少。大體流程是如此:

一、客戶端首先發送一條要求切換協議格式的http請求要求創建websocket鏈接。

二、服務端的servlet處理請求時發現Http請求中要求切換協議,所以在原有的信道上建立了新的協議鏈接器,並返回給客戶端101狀態碼,告訴客戶端已經創建好了新的協議鏈接器了,此即上面註釋中說的握手。

三、客戶端收到101狀態碼後返回客戶端的websocket實例,並開始發送和接受數據。

相關文章
相關標籤/搜索