WebSocket 服務端實現

        WebSocket,並不是HTML 5獨有,WebSocket是一種協議。只是在handshake的時候,發送的連接信息頭和HTTP類似。HTML 5只是實現了WebSocket的客戶端。其實,難點在於服務端,服務端相對仍是比較複雜的。javascript

 

        網上demo不少,可是能講到點上的很少,並且也不知道做者有沒有深刻試驗過。WebSokect協議 ,handshake這part其實仍是比較簡單的,比較複雜的仍是數據傳輸(第二部分)比較難。html

 

        上圖是幀格式,對於解釋能夠看文檔也能夠查看文章:http://www.cnblogs.com/imayi/archive/2012/05/05/2485343.htmljava


        也就說,你須要解讀opcode, Payload len 這些比較敏感的位值以後,你纔開始取後面的Payload Data, 好比opcode若是是1,那麼就是讀取字符串,若是是2,那麼就是讀取流,若是是8,那麼就是關閉socket。web

        若是本身用java作服務端,握手協議的響應,換行符不能使「\r\n」或「\n」,由於這不是標準的換行符,實際上是個字符串,只是屏幕顯示的時候是換行,能夠用PrintWrite.println(),或者apache

String newLine = (String) java.security.AccessController.doPrivileged(數組

              new sun.security.action.GetPropertyAction("line.separator"));緩存

        握手以後,根據opcode作相應的操做,JAVA的IO和NIO,在實現方面都缺憾,難以實現。使用IO的話,在得到socket以後,InputStream會處於阻塞,由於阻塞,因此後續作outputStream的操做時就會不方便。NIO的話,是SocketChannel寫入讀出,基本都是ByteBuffer,用這個的話,有時opcode值爲1(讀取字符串),ByteBuffer解碼獲得的字符串常常性是亂碼(這個本人沒有深究)。感受tomcat

    Tomcat在7.0.27以後就開始支持WebSocket,在它之上創建WebSocket是很簡單的,基本就是繼承WebSocketServlet,實現createWebSocketInbound方法和重寫StreamInbound的方法。Tomcat的example裏面有相關的例子。服務器

    ######若是已經知道tomcat的運行流程那麼就略過這一段#####websocket

      tomcat的源代碼研究,網上挺多的。我的就看過王程斯的Tomcat源碼學習

下面引用一下他的插圖:

    大致來說就是經過監聽幾個端口,運用線程池處理Socket,以後adapter打包數據給容器。

    下面羅嗦一下:

    tomcat啓動時,會啓用幾個AcceptorThread 監控端口,JIoEndpointAcceptor - Acceptor(線程),將接收到的socket放入  JIoEndpointAcceptor - processSocket 處理。processSocket 裏面 socket被打包以後SocketWrapper放入JIoEndpointAcceptor - SocketProcessor(線程)處理  ( SSL handshake在這裏作處理)。SocketProcessor線程裏面,socket被交給 AbstractProtocol  process方法進行處理處理的過程,建立Http11Processor,Http11Protocol - Http11ConnectionHandler - createProcessorprocessor以後會被註冊,Http11Processor  register(processor);被註冊緩存起來,以便其餘socket過來能夠沿用(對了,Http11Processor  繼承於 AbstractHttp11Processor。繼續,socket被Http11Processor  - process(SocketWrapper<S> socketWrapper) 處理生成Request對象,要知道tomcat爲咱們作了封裝,咱們的輸入輸出都只須要調用Request和Response。Http11Processor  的process都作了什麼呢??getInputBuffer().init(socketWrapper, endpoint);InternalInputBuffer  得到socket的inputStream,getOutputBuffer().init(socketWrapper, endpoint);InternalOutputBuffer  得到socket的outputStream。讀取inputStream的值到buf裏面,prepareRequest   將值包裝到Request。

######################end

        好吧,很亂,就着源代碼畫着圖就比較容易理解。並且這個只是Http11的普通IO處理流程,tomcat還能處理ajp協議。並且對於HTTP11還有另外一套的NIO處理流程。

        將上面的流程,主要是由於:WebSocket啓用的是JIOEndpoint,而不是JNIOEndpoint等。

        每次交互都會由Http11Processor  process處理,若是處理的時候返回的狀態是upgrade,也就是http升級協議(websocket協議)。那麼tomcat就會封裝出一個UpgradeInbound 。

UpgradeInbound inbound = processor.getUpgradeInbound();
// Release the Http11 processor to be re-used
release(socket, processor, false, false);
// Create the light-weight upgrade processor
processor = createUpgradeProcessor(socket, inbound);
inbound.onUpgradeComplete();

以後就會開始握手:

state = processor.upgradeDispatch();

        這些過程還會建立WsOutbound 和 StreamInbound、WsInputStream(繼承inputStream),很明顯,Ws就是websocket的意思,這些輸入流和輸出流都是WebSocket協議定製的。WsOutbound負責輸出結果,StreamInbound和WsInputStream負責接收數據, tomcat對字節和字符串處理是不同的。
涉及到的對象(org.apache.catalina):

        WebSokect協議有70頁左右,沒有深刻的看下去,太煎熬了(但願有牛人,能夠翻譯一下,造福人類)。看了些協議,又看了tomcat的WebSocket實現。如今就是貼本身作的demo(demo很簡單,請別噴):

客戶端:

<!DOCTYPE HTML>
<html>
<head>
<title>Web Socket Demo --  EchoClient</title>
<meta charset="utf-8">
</head>
<script type="text/javascript">
	var websocket = null;
	function connect(){
		var msg = document.getElementById("msg");
		try{
			var readyState = new Array("正在鏈接", "已創建鏈接", "正在關閉鏈接"
						, "已關閉鏈接");
			var host = "ws://localhost:8000";
			websocket = new WebSocket(host);
			websocket.onopen = function(){
				msg.innerHTML += "<p>Socket狀態: " + readyState[websocket.readyState] + "</p>";
			}
			websocket.onmessage = function(event){
				msg.innerHTML += "<p>接收信息: " + event.data + "</p>";
			}
			websocket.onclose = function(){
				msg.innerHTML += "<p>Socket狀態: " + readyState[websocket.readyState] + "</p>";
			}
			msg = document.getElementById("msg");
			msg.innerHTML += "<p>Socket狀態: " + readyState[websocket.readyState] + "</p>";
		}catch(exception){
			msg.innerHTML += "<p>有錯誤發生</p>";
		}
	}

	function send(){
		var msg = document.getElementById("msg");
		var text = document.getElementById("text").value;
		if(text == ""){
			msg.innerHTML += "<p>請輸入一些文字</p>";
			return;
		}
		try{
			websocket.send(text);
			msg.innerHTML += "<p>發送數據:  " + text + "</p>";
		}catch(exception){
			msg.innerHTML += "<p>發送數據出錯</p>";
		}
		document.getElementById("text").value = "";
	}

	function disconnect(){
		websocket.close();
	}
</script>
<body>
	<h1>WebSocket客戶端實例</h1>
	<div id="msg" style="height: 300px;"></div>
	<p>請輸入一些文字</p>
	<input type="text" id="text"/>
	<button id="connect" onclick="connect();">創建鏈接</button>
	<button id="send" onclick="send();">發送數據</button>
	<button id="disconnect" onclick="disconnect();">斷開鏈接</button>
</body>
</html>

略過,不說,哪哪都有這部分的代碼。

服務端:

         上面說了,服務端是挺難的。

一、處理握手,本人使用InputStream.read(byte[], off, len)方法讀取了字節以後再處理。

二、使用InputStream.read()方法逐一讀取字節,解析FIN,opcode,PayloadLen,mask(掩碼,根據websocket協議,客戶端傳過來的數據必須經過掩碼計算再傳輸)等信息,固然rsv在個人demo沒有,沒用上。

三、知道數據長度了(PS:JAVA的UTF8裏面中文可能佔用3-4個字節),讀取數據,而後經過解碼獲取真正的byte值。

四、輸出結果,結果無需掩碼,可是也有格式要求,看協議,懶得看,就看tomcat源碼。

package socket;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.security.MessageDigest;


/*
 * 垃圾程序,只求速成,沒有效率,複用這個概念,望諒解
 * */
public class EchoServer {
	private int port = 8000;
	private ServerSocket serverSocket;

	public EchoServer() throws IOException {
		serverSocket = new ServerSocket(port);
		System.out.println("服務器啓動");
	}

	private void service() {
		Socket socket = null;
		while (true) {
			try {
				socket = serverSocket.accept();
				Thread workThread = new Thread(new Handler(socket));
				workThread.start();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	class Handler implements Runnable {
		private Socket socket;
		private boolean hasHandshake = false;
		Charset charset = Charset.forName("UTF-8");  
		
		public Handler(Socket socket) {
			this.socket = socket;
		}

		private PrintWriter getWriter(Socket socket) throws IOException {
			OutputStream socketOut = socket.getOutputStream();
			return new PrintWriter(socketOut, true);
		}


		public String echo(String msg) {
			return "echo:" + msg;
		}

		public void run() {
			
			try {
				System.out.println("New connection accepted"
						+ socket.getInetAddress() + ":" + socket.getPort());
				InputStream in = socket.getInputStream();
			    
				PrintWriter pw = getWriter(socket);
				//讀入緩存
				byte[] buf = new byte[1024];
				//讀到字節
				int len = in.read(buf, 0, 1024);
				//讀到字節數組
				byte[] res = new byte[len];
				System.arraycopy(buf, 0, res, 0, len);
				String key = new String(res);
				if(!hasHandshake && key.indexOf("Key") > 0){
					//握手
					key = key.substring(0, key.indexOf("==") + 2);
					key = key.substring(key.indexOf("Key") + 4, key.length()).trim();
					key+= "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
					MessageDigest md = MessageDigest.getInstance("SHA-1");  
					md.update(key.getBytes("utf-8"), 0, key.length());
					byte[] sha1Hash = md.digest();  
				        sun.misc.BASE64Encoder encoder = new sun.misc.BASE64Encoder();  
					key = encoder.encode(sha1Hash);  
					pw.println("HTTP/1.1 101 Switching Protocols");
					pw.println("Upgrade: websocket");
					pw.println("Connection: Upgrade");
					pw.println("Sec-WebSocket-Accept: " + key);
					pw.println();
					pw.flush();
					hasHandshake = true;
					
					//接收數據
					byte[] first = new byte[1];
					//這裏會阻塞
				        int read = in.read(first, 0, 1);
				        while(read > 0){
				    	    int b = first[0] & 0xFF;
				    	    //1爲字符數據,8爲關閉socket
				            byte opCode = (byte) (b & 0x0F);
				        
				            if(opCode == 8){
				        	socket.getOutputStream().close();
				        	break;
				            }
				            b = in.read();
				            int payloadLength = b & 0x7F;
				            if (payloadLength == 126) {
				                byte[] extended = new byte[2];
				                in.read(extended, 0, 2);
				                int shift = 0;
				                payloadLength = 0;
				                for (int i = extended.length - 1; i >= 0; i--) {
				                	payloadLength = payloadLength + ((extended[i] & 0xFF) << shift);
				                    shift += 8;
				                }

				            } else if (payloadLength == 127) {
				                byte[] extended = new byte[8];
				                in.read(extended, 0, 8);
				                int shift = 0;
				                payloadLength = 0;
				                for (int i = extended.length - 1; i >= 0; i--) {
				                	payloadLength = payloadLength + ((extended[i] & 0xFF) << shift);
				                    shift += 8;
				                }
				            }
				        
				        //掩碼
				        byte[] mask = new byte[4];
				        in.read(mask, 0, 4);
				        int readThisFragment = 1;
				        ByteBuffer byteBuf = ByteBuffer.allocate(payloadLength + 10);
				        byteBuf.put("echo: ".getBytes("UTF-8"));
				        while(payloadLength > 0){
				        	 int masked = in.read();
				             masked = masked ^ (mask[(int) ((readThisFragment - 1) % 4)] & 0xFF);
				             byteBuf.put((byte) masked);
				             payloadLength--;
				             readThisFragment++;
				        }
				        byteBuf.flip();
				        responseClient(byteBuf, true);
				        printRes(byteBuf.array());
				        in.read(first, 0, 1);
				    }
				    
				}
				in.close();
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				try {
					if (socket != null)
						socket.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}

		private void responseClient(ByteBuffer byteBuf, boolean finalFragment) throws IOException {
		    OutputStream out = socket.getOutputStream();
		    int first = 0x00;
			//是不是輸出最後的WebSocket響應片斷
	            if (finalFragment) {
	                first = first + 0x80;
	                first = first + 0x1;
	            }
	            out.write(first);
	        

	            if (byteBuf.limit() < 126) {
	                out.write(byteBuf.limit());
	            } else if (byteBuf.limit() < 65536) {
	        	out.write(126);
	        	out.write(byteBuf.limit() >>> 8);
	        	out.write(byteBuf.limit() & 0xFF);
	            } else {
	            // Will never be more than 2^31-1
	        	out.write(127);
	        	out.write(0);
	        	out.write(0);
	        	out.write(0);
	        	out.write(0);
	        	out.write(byteBuf.limit() >>> 24);
	        	out.write(byteBuf.limit() >>> 16);
	        	out.write(byteBuf.limit() >>> 8);
	        	out.write(byteBuf.limit() & 0xFF);

	            }

	            // Write the content
	            out.write(byteBuf.array(), 0, byteBuf.limit());
	            out.flush();
		}

		
		private void printRes(byte[] array) {
			ByteArrayInputStream  byteIn = new ByteArrayInputStream(array);
			InputStreamReader reader = new InputStreamReader(byteIn, charset.newDecoder());
			int b = 0;
			String res = "";
			try {
				while((b = reader.read()) > 0){
					res += (char)b;
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
			System.out.println(res);
		}
	}

	public static void main(String[] args) throws IOException {
		new EchoServer().service();
	}
}

        本文出自:      http://my.oschina.net/u/590484/blog/71797

                                                                                                                        by lin_bobo

相關文章
相關標籤/搜索