WebSocket協議是基於TCP的一種新的網絡協議。它實現了瀏覽器與服務器全雙工(full-duplex)通訊——容許服務器主動發送信息給客戶端。 WebSocket通訊協議於2011年被IETF定爲標準RFC 6455,並被RFC7936所補充規範。通俗的講就是服務器和客戶端能夠均可以主動的向對方發送消息,而不是請求-響應的模式了。html
java-websocket-1.3.8.jar oracle官方APIjava
private static WebSocketClient webSocketClient;
private void connetToServer(final String str){
try {
webSocketClient = new WebSocketClient(new URI("ws://192.168.25.188:8080/websocket"), new Draft_6455() {},null,100000) {
@Override
public void onOpen(ServerHandshake handshakedata) {
}
@Override
public void onMessage(String message) {
}
@Override
public void onClose(int code, String reason, boolean remote) {
}
@Override
public void onClosing(int code, String reason, boolean remote) {
super.onClosing(code, reason, remote);
}
@Override
public void onError(Exception ex) {
}
};
} catch (URISyntaxException e) {
e.printStackTrace();
}
webSocketClient.connect();
}
複製代碼
這裏須要注意的是,WwebSocket在握手鍊接階段借用了http協議,握手鍊接階段的報文跟http報文同樣。差很少以下形式:git
GET / HTTP/1.1
Host: ws://192.168.25.188:8080/
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
Upgrade: websocket
Origin: null
Sec-WebSocket-Version: 13
User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36
Accept-Encoding: gzip, deflate, sdch
Accept-Language: zh-CN,zh;q=0.8
Sec-WebSocket-Key: VR+OReqwhymoQ21dBtoIMQ==
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
複製代碼
跟http協議請求頭部,主要不同的是Connetiong:Upgrad和Upgrade: websocket以及Sec-WebSocket-Version: 13這些就告訴服務器說我要握手的完成的是哪一個版本的websocket協議。github
鏈接完成以後,能夠向服務端發送消息,直接調用WebSocketClient中的發送方法。有下面兩個發送消息的方法。web
public void send( String text )
}
public void send( byte[] data )
}
複製代碼
正常關閉 調用webSocket的close()方法。數組
WebSocketClient和WebSocketImpl使用了代理模式。瀏覽器
public void connect() {
if( writeThread != null )
throw new IllegalStateException( "WebSocketClient objects are not reuseable" );
writeThread = new Thread( this );
writeThread.start();
}
複製代碼
websocketclient的connet()方法,啓動一個線程,這個線程裏面的操做是什麼呢?就查看WebSocketClient的繼承在Runable實現的run()方法。bash
對run()方法作了適合的化簡服務器
public void run() {
InputStream istream;
socket = new Socket( proxy );
socket.setTcpNoDelay( isTcpNoDelay() );
socket.setReuseAddress( isReuseAddr() );
istream = socket.getInputStream();
ostream = socket.getOutputStream();
sendHandshake();
writeThread = new Thread( new WebsocketWriteThread() );
writeThread.start();
byte[] rawbuffer = new byte[ WebSocketImpl.RCVBUF ];
int readBytes;
while ( !isClosing() && !isClosed() && ( readBytes = istream.read( rawbuffer ) ) != -1 ) {
engine.decode( ByteBuffer.wrap( rawbuffer, 0, readBytes ) );
}
engine.eot();
}
複製代碼
在run方法中作了五件事:websocket
發送握手以後,在流中接收到服務端的返回,就交給engine的decode()方法,decode()方法最終會調用onWebsocketHandshakeReceivedAsClient()方法和onOpen() 至此完成握手鍊接。
//NOT_YET_CONNECTED, CONNECTING, OPEN, CLOSING, CLOSED
完成握手鍊接創建的通道以後,發送消息就很簡單了。關鍵代碼以下:
webSocketClient:
public void send( String text ) throws NotYetConnectedException {
engine.send( text );
}
複製代碼
WebsocketImpl:
private void send( Collection<Framedata> frames ) {
ArrayList<ByteBuffer> outgoingFrames = new ArrayList<ByteBuffer>();
for( Framedata f : frames ) {
outgoingFrames.add( draft.createBinaryFrame( f ) );
}
write( outgoingFrames );
}
private void write( ByteBuffer buf ) {
outQueue.put( buf );
wsl.onWriteDemand( this );
}
複製代碼
客戶端向服務器發送消息,歸根到底就是engin把要發送給的字節數組或者字符串轉換成ByteBuffer,而後放在outQueue的隊列中。等待WebSocketClient中啓動的寫線程在outQueue隊列讀出來,寫入socket的流中。
接受消息在websocketClient的run()方法中一直在流中讀數據,交給engin的decode()處理。
websocket:
private void decodeFrames( ByteBuffer socketBuffer ) {
List<Framedata> frames;
frames = draft.translateFrame( socketBuffer );
for( Framedata f : frames ) {
if( DEBUG )
System.out.println( "matched frame: " + f );
draft.processFrame( this, f );
}
}
複製代碼
Draft_6455:
public void processFrame( WebSocketImpl webSocketImpl, Framedata frame ) throws InvalidDataException {
Framedata.Opcode curop = frame.getOpcode();
if( curop == Framedata.Opcode.CLOSING ) {
}else if( curop == Framedata.Opcode.PING ) {
webSocketImpl.getWebSocketListener().onWebsocketPing( webSocketImpl, frame );
} else if( curop == Framedata.Opcode.PONG ) {
webSocketImpl.updateLastPong();
webSocketImpl.getWebSocketListener().onWebsocketPong( webSocketImpl, frame );
} else if( !frame.isFin() || curop == Framedata.Opcode.CONTINUOUS ) {
} else if( current_continuous_frame != null ) {
} else if( curop == Framedata.Opcode.TEXT ) {
webSocketImpl.getWebSocketListener().onWebsocketMessage( webSocketImpl, Charsetfunctions.stringUtf8( frame.getPayloadData() ) );
} else if( curop == Framedata.Opcode.BINARY ) {
webSocketImpl.getWebSocketListener().onWebsocketMessage( webSocketImpl, frame.getPayloadData() );
}
}
複製代碼
在Draft_draft_6455中首先檢查接受的數據幀的類型,連續的幀、仍是字節、仍是text或或者心跳檢測幀,而後作相應的處理,回調WebSocketListener相應的方法,接受到正常的String或者字節的時候,最終回調到onMessage()方法,用戶就接受到信息了。
由於WebSocket是基於TCP協議的,而正常的TCP協議的斷開須要通過「四次揮手」。爲何呢?TCP是雙工通訊的,客戶端和服務端都正常的向對方推送消息,因此揮手的過程以下:
客戶端:我要關閉了啊。
服務端:好的,我知道你要關閉了。
服務端:我要關閉了兄弟。
客戶端:好的我知道你要關閉了
複製代碼
通過以上過程,整個TCP協議就關閉了。
固然也會存在單項關閉的問題。客戶端關了,這時候客戶端不能向服務端發送消息了,可是服務端沒有關,服務端能夠向客戶端發送消息。相反也是。
那麼再java-websocket中是如何實現關閉的呢?
在WebsocketImpl中close方法向服務端發送一個關閉幀。
public synchronized void close( int code, String message, boolean remote ) {
if( isOpen() ) {
CloseFrame closeFrame = new CloseFrame();
closeFrame.setReason( message );
closeFrame.setCode( code );
closeFrame.isValid();
sendFrame( closeFrame );
}
setReadyState( READYSTATE.CLOSING );
}
複製代碼
服務端接受到關閉後會向客戶端發送一個關閉確認幀,客戶端接收到以後最會調用WebsocketImpl的closeConnetion()。
public synchronized void closeConnection( int code, String message, boolean remote ) {
if( getReadyState() == READYSTATE.CLOSED ) {
return;
}
if( getReadyState() == READYSTATE.OPEN ) {
if( code == CloseFrame.ABNORMAL_CLOSE ) {
setReadyState( READYSTATE.CLOSING );
}
}
this.wsl.onWebsocketClose( this, code, message, remote );
setReadyState( READYSTATE.CLOSED );
}
複製代碼
closeconnetion作的主要事情就是把客戶端的狀態設置爲closed,而後回調WebSocketLisetener的onWebsocketClose()。 至此,關閉了。
可能會有疑問若是客戶端已經處於關閉狀態了,若是服務端發送來關閉消息,客戶端就沒有辦法向服端關閉確認了。這個問題的解決,是使用超時機制,也就是服務端再發送關閉信息一段時間後沒有接受到確認,他就默認是客戶端收到了他的關閉信息。
須要 特別提出的是,WebSocket可能會出現不可預知的斷開鏈接了,可是這時候客戶端還不知道,客端戶依舊開心的處於「Open」的狀態。這個問題怎麼解決呢?
開心的是java-websocket中提供了心跳檢測,一旦檢測到不在線了鏈接斷了就會啓動從新鏈接機制。具體這個心跳檢測多長時間檢測一次咱們能夠經過WebSocketClient的以下方法
public void setConnectionLostTimeout( int connectionLostTimeout )
複製代碼
進行設置,系統默認的是60秒。經過判斷ping和pong一旦檢測到websocket斷開了就會調用engine的closeConnetion()方法。
當在監聽close方法中監聽到是ping不通的狀態致使的關閉的時候,就啓動一個線程,作五次鏈接操做,這部分的具體實現,能夠在Demo中查看,再次具體不作詳解了。
private static OkHttpClient sClient;
private static WebSocket sWebSocket;
public static WebSocket startRequest(String url,WebSocketListener socketListener) {
if (sClient == null) {
sClient = new OkHttpClient();
}
if (sWebSocket == null) {
Request request = new Request.Builder().url(url).build();
sWebSocket = sClient.newWebSocket(request, socketListener);
}
return sWebSocket;
}
複製代碼
webSocketClient = WebSocketClient.startRequest("ws://192.168.25.188:8080/websocket", new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
super.onOpen(webSocket, response);
}
@Override
public void onMessage(WebSocket webSocket, String text) {
super.onMessage(webSocket, text);
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
super.onMessage(webSocket, bytes);
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
super.onClosing(webSocket, code, reason);
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
super.onClosed(webSocket, code, reason);
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
super.onFailure(webSocket, t, response);
}
});
複製代碼
固然能夠根據須要繼承WebSocketListener而後本身實現,好比把onMessage(WebSocket webSocket, String text) 和onMessage(WebSocket webSocket, ByteString bytes) 方法合併同一回調onMessage(String msg);
本文的demo地址: github.com/poecook/And…