文章首發於公衆號【大數據學徒】,感興趣請搜索 dashujuxuetu 或者文末掃碼關注。html
由於 Apache Zeppelin 中使用了 Jetty,因此我須要補充一下 Jetty 的相關背景,以便閱讀 Zeppelin 的相關代碼,其中 WebSocket 是很重要的內容,所以經過閱讀學習 Jetty 的 WebSocket API 文檔 總結出了本文的內容。java
內容提要:web
WebSocket 是一種應用層傳輸協議,能夠在單個 TCP 鏈接上進行全雙工通訊,容許服務端主動向客戶端推送數據,協議開銷小,協議頭是
ws
(明文) 或wss
(加密),端口和 HTTP 同樣使用 80 或 443,經過 HTTP/1.1 的Upgrade
頭部來創建鏈接。更多 WebSocket 的介紹請參考 WebSocket 維基百科。apache
一個 WebSocket 鏈接有四種基本的狀態:設計模式
Jetty 提供了比 Java 原生 WebSocket 更強大的 API,包含了一些服務端和客戶端通用的 API,是基於 WebSocket 消息的事件驅動型 API。每一個 WebSocket 鏈接會接收到四種事件:api
org.eclipse.jetty.websocket.api.Session
對象,這是一個重要的對象,通常經過它來和對端通訊。WebSocket 的最基本用法是對一個 POJO 類使用 WebSocket 註解,好比:websocket
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
// WebSocket 註解
@WebSocket(maxTextMessageSize = 64 * 1024)
public class AnnotatedEchoSocket {
// onWebSocketMessage註解,用於標記收到消息時的處理函數
@OnWebSocketMessage
public void onText(Session session, String message) {
if (session.isOpen())
{
System.out.printf("Echoing back message [%s]%n", message);
// 將收到的消息原樣返回
session.getRemote().sendString(message, null);
}
}
}
複製代碼
可用的註解有如下幾種:session
註解 | 做用 |
---|---|
@WebSocket | 必需的類級別註解,標記 POJO 類爲 WebSocket |
@OnWebSocketConnect | 可選的方法級別註解,標記處理 Connect 事件的函數 |
@OnWebSocketClose | 可選的方法級別註解,標記處理 Close 事件的函數 |
@OnWebSocketMessage | 可選的方法級別註解,標記處理 Message 事件的函數 |
@OnWebSocketError | 可選的方法級別註解,標記處理 Error 事件的函數 |
@OnWebSocketFrame | 可選的方法級別註解,標記處理 Frame 事件的函數 |
其中 @WebSocket
註解的必須是是公有的非抽象類,其它註解所註解的必須是公有的非抽象的、且返回值爲空的方法。eclipse
Jetty 也提供了WebSocketListener
接口,有上面的 5 個方法,以及 WebSocketAdapter
類,實現了這個接口,不過都是空實現,但提供瞭如下很是實用的方法來檢查 Session 的狀態。socket
服務端有兩個重要的抽象類和接口,WebSocketServlet
和 WebSocketCreator
,使用 WebSocketServlet
的例子:
@WebServlet(name = "MyEcho WebSocket Servlet", urlPatterns = {"/echo"})
public class MyEchoServlet extends WebSocketServlet {
@Override
public void configure(WebSocketServletFactory factory) {
// set a 10 second timeout
factory.getPolicy().setIdleTimeout(10000);
// register MyEchoSocket as the WebSocket to create on Upgrade
factory.register(MyEchoSocket.class);
}
}
複製代碼
類上有一個註解,代表這個 Servlet
會在 /echo
這個 URI 上收到 HTTP Upgrade 請求時建立,這個類有一個 configure()
方法,參數是 WebSocketServletFactory
類型,在這個方法內須要作的事,除了設置一些參數之外,須要使用 register()
方法註冊用來處理請求的 Servlet
,在這個例子中註冊了 MyEchoServlet
自身,也能夠不這麼作,另一種作法是,調用 WebSocketServletFactory
的 setCreator()
方法,不直接註冊 Servlet
,而是讓設置的 WebSocketCreator
來建立合適的 Servlet
,看一個 WebSocketCreator
的例子:
public class MyAdvancedEchoCreator implements WebSocketCreator {
private MyBinaryEchoSocket binaryEcho;
private MyEchoSocket textEcho;
public MyAdvancedEchoCreator() {
// 建立好能夠複用的 Socket
this.binaryEcho = new MyBinaryEchoSocket();
this.textEcho = new MyEchoSocket();
}
@Override
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
for (String subprotocol : req.getSubProtocols()) {
if ("binary".equals(subprotocol)) {
resp.setAcceptedSubProtocol(subprotocol);
return binaryEcho;
}
if ("text".equals(subprotocol)) {
resp.setAcceptedSubProtocol(subprotocol);
return textEcho;
}
}
// 其它狀況就返回 null
return null;
}
}
複製代碼
WebSocketCreator
接口須要實現 createWebSocket(ServletUpgradeRequest, ServletUpgradeResponse)
方法,這裏 MyAdvancedEchoCreator
實現類,根據子協議是文本類型仍是二進制類型生成了不一樣的 WebSocket
來處理請求。
看起來這裏使用了抽象工廠模式,但我對設計模式所知有限,就不展開講了。
客戶端須要的依賴是:
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
<version>${project.version}</version>
</dependency>
複製代碼
直接看一個客戶端例子,很清晰:
public class SimpleEchoClient {
public static void main(String[] args) {
// 指定服務端 URI
String destUri = "ws://echo.websocket.org";
if (args.length > 0) {
destUri = args[0];
}
// 建立 WebSocketClient 對象
WebSocketClient client = new WebSocketClient();
SimpleEchoSocket socket = new SimpleEchoSocket();
try {
client.start();
URI echoUri = new URI(destUri);
ClientUpgradeRequest request = new ClientUpgradeRequest();
// 發送 HTTP Upgrade 請求,創建鏈接
client.connect(socket, echoUri, request);
System.out.printf("Connecting to : %s%n", echoUri);
// 等待關閉
socket.awaitClose(5, TimeUnit.SECONDS);
}
catch (Throwable t) {...}
finally {...}
}
}
複製代碼
歡迎交流討論,吐槽建議,分享收藏。