最近一段時間在學習Netty網絡框架,又趁着計算機網絡的課程設計,決定以Netty爲核心,以WebSocket爲應用層通訊協議作一個互聯網聊天系統,總體而言就像微信網頁版同樣,但考慮到這個聊天系統的功能很是多,所以只打算實現核心的聊天功能,包括單發、羣發、文件發送,而後把項目與Spring整合作成開源、可拓展的方式,給你們參考、討論、使用,歡迎你們的指點。javascript
關於Nettyhtml
Netty 是一個利用 Java 的高級網絡的能力,隱藏其背後的複雜性而提供一個易於使用的 API 的客戶端/服務器框架。
這裏借用《Essential Netty In Action》的一句話來簡單介紹Netty,詳細的可參考閱讀該書的電子版前端
關於WebSocket通訊協議
簡單說一下WebSocket通訊協議,WebSocket是爲了解決HTTP協議中通訊只能由客戶端發起這個弊端而出現的,WebSocket基於HTTP5協議,借用HTTP進行握手、升級,可以作到輕量的、高效的、雙向的在客戶端和服務端之間傳輸文本數據。詳細可參考如下文章:java
涉及技術:git
整個通訊系統以Tomcat做爲核心服務器運行,其下另開一個線程運行Netty WebSocket服務器,Tomcat服務器主要處理客戶登陸、我的信息管理等的HTTP類型請求(一般的業務類型),端口爲8080,Netty WebSockt服務器主要處理用戶消息通訊的WebSocket類型請求,端口爲3333。用戶經過瀏覽器登陸後,瀏覽器會維持一個Session對象(有效時間30分鐘)來保持登陸狀態,Tomcat服務器會返回用戶的我的信息,同時記錄在線用戶,根據用戶id創建一條WebSocket鏈接並保存在後端以便進行實時通訊。當一個用戶向另外一用戶發起通訊,服務器會根據消息內容中的對話方用戶id,找到保存的WebSocket鏈接,經過該鏈接發送消息,對方就可以收到即時收到消息。當用戶註銷或退出時,釋放WebSocket鏈接,清空Session對象中的登陸狀態。github
事實上Netty也能夠用做一個HTTP服務器,而這裏使用Spring MVC處理HTTP請求是出於熟悉的緣故,也比較接近傳統開發的方式。web
系統採用B/S(Browser/Server),即瀏覽器/服務器的結構,主要事務邏輯在服務器端(Server)實現。借鑑MVC模式的思想,從上至下具體又分爲視圖層(View)、控制層(Controller)、業務層(Service)、模型層(Model)、數據訪問層(Data Access)算法
項目後端結構:
數據庫
項目前端結構:
編程
系統只包括兩個模塊:登陸模塊和聊天管理模塊。
到這裏,可能會有人出現疑問了,首先是前面的涉及技術中沒有ORM框架(Mybatis或Hibernate),這裏又沒有實現好友管理的功能,那用戶如何肯定本身的好友併發送信息呢?
其實,這裏我在dao層的實現裏並無鏈接數據庫,而是用硬編碼的方式固定好系統的用戶以及用戶的好友表、羣組表,之因此這麼作是由於當初考慮到這個課程設計要是鏈接數據庫就太麻煩了光是已有的模塊(包括先後端)就差很少3k行代碼了,時間上十分划不來,因而就用了硬編碼的方式偷懶,後面會再說明系統用戶的狀況。
因爲本系統涉及多個用戶狀態,有必要進行說明,下面給出本系統的用戶狀態轉換圖。
系統聊天界面以下:
不得不說的是,當關閉Tomcat服務器時,也要釋放Netty相關資源,不然會形成內存泄漏,關閉方法以下面的close()
,若是隻是使用shutdownGracefully()
方法的話,關閉時會報內存泄露Memory Leak異常(但IDE可能來不及輸出到控制檯)
/** * 描述: Netty WebSocket服務器 * 使用獨立的線程啓動 * @author Kanarien * @version 1.0 * @date 2018年5月18日 上午11:22:51 */ public class WebSocketServer implements Runnable{ private final Logger logger = LoggerFactory.getLogger(WebSocketServer.class); @Autowired private EventLoopGroup bossGroup; @Autowired private EventLoopGroup workerGroup; @Autowired private ServerBootstrap serverBootstrap; private int port; private ChannelHandler childChannelHandler; private ChannelFuture serverChannelFuture; // 構造方法少了會報錯 public WebSocketServer() {} @Override public void run() { build(); } /** * 描述:啓動Netty Websocket服務器 */ public void build() { try { long begin = System.currentTimeMillis(); serverBootstrap.group(bossGroup, workerGroup) //boss輔助客戶端的tcp鏈接請求 worker負責與客戶端以前的讀寫操做 .channel(NioServerSocketChannel.class) //配置客戶端的channel類型 .option(ChannelOption.SO_BACKLOG, 1024) //配置TCP參數,握手字符串長度設置 .option(ChannelOption.TCP_NODELAY, true) //TCP_NODELAY算法,儘量發送大塊數據,減小充斥的小塊數據 .childOption(ChannelOption.SO_KEEPALIVE, true)//開啓心跳包活機制,就是客戶端、服務端創建鏈接處於ESTABLISHED狀態,超過2小時沒有交流,機制會被啓動 .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(592048))//配置固定長度接收緩存區分配器 .childHandler(childChannelHandler); //綁定I/O事件的處理類,WebSocketChildChannelHandler中定義 long end = System.currentTimeMillis(); logger.info("Netty Websocket服務器啓動完成,耗時 " + (end - begin) + " ms,已綁定端口 " + port + " 阻塞式等候客戶端鏈接"); serverChannelFuture = serverBootstrap.bind(port).sync(); } catch (Exception e) { logger.info(e.getMessage()); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); e.printStackTrace(); } } /** * 描述:關閉Netty Websocket服務器,主要是釋放鏈接 * 鏈接包括:服務器鏈接serverChannel, * 客戶端TCP處理鏈接bossGroup, * 客戶端I/O操做鏈接workerGroup * * 若只使用 * bossGroupFuture = bossGroup.shutdownGracefully(); * workerGroupFuture = workerGroup.shutdownGracefully(); * 會形成內存泄漏。 */ public void close(){ serverChannelFuture.channel().close(); Future<?> bossGroupFuture = bossGroup.shutdownGracefully(); Future<?> workerGroupFuture = workerGroup.shutdownGracefully(); try { bossGroupFuture.await(); workerGroupFuture.await(); } catch (InterruptedException ignore) { ignore.printStackTrace(); } } public ChannelHandler getChildChannelHandler() { return childChannelHandler; } public void setChildChannelHandler(ChannelHandler childChannelHandler) { this.childChannelHandler = childChannelHandler; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } }
獨立出處理器鏈類,方便修改與注入,省得混在一塊兒顯得混亂。
@Component public class WebSocketChildChannelHandler extends ChannelInitializer<SocketChannel>{ @Resource(name = "webSocketServerHandler") private ChannelHandler webSocketServerHandler; @Resource(name = "httpRequestHandler") private ChannelHandler httpRequestHandler; @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("http-codec", new HttpServerCodec()); // HTTP編碼解碼器 ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); // 把HTTP頭、HTTP體拼成完整的HTTP請求 ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); // 分塊,方便大文件傳輸,不過實質上都是短的文本數據 ch.pipeline().addLast("http-handler", httpRequestHandler); ch.pipeline().addLast("websocket-handler",webSocketServerHandler); } }
值得一提的是,當在處理鏈中使用Spring注入處理器的bean的時候,若是處理器類不使用@Sharable標籤的話,會出現錯誤。若是不使用Spring注入bean的方式,那麼應該new一個新的處理器對象,如ch.pipeline().addLast("http-handler", new HttpRequestHandler())
。另外,判斷HTTP請求仍是WebSocket請求的方式稍微不太優雅,但我按照《Essential Netty in Action》中的方法去試,結果有問題的,只好用下面的if語句判斷。
@Component @Sharable public class HttpRequestHandler extends SimpleChannelInboundHandler<Object> { /** * 描述:讀取完鏈接的消息後,對消息進行處理。 * 這裏僅處理HTTP請求,WebSocket請求交給下一個處理器。 */ @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { handleHttpRequest(ctx, (FullHttpRequest) msg); } else if (msg instanceof WebSocketFrame) { ctx.fireChannelRead(((WebSocketFrame) msg).retain()); } } /** * 描述:處理Http請求,主要是完成HTTP協議到Websocket協議的升級 * @param ctx * @param req */ private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { if (!req.decoderResult().isSuccess()) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( "ws:/" + ctx.channel() + "/websocket", null, false); WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req); Constant.webSocketHandshakerMap.put(ctx.channel().id().asLongText(), handshaker);// 保存握手類到全局變量,方便之後關閉鏈接 if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), req); } } private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { // 返回應答給客戶端 if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } // 若是是非Keep-Alive,關閉鏈接 boolean keepAlive = HttpUtil.isKeepAlive(req); ChannelFuture f = ctx.channel().writeAndFlush(res); if (!keepAlive) { f.addListener(ChannelFutureListener.CLOSE); } } /** * 描述:異常處理,關閉channel */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
考慮到規範性與可維護性,switch語句中的case常量應該放在常量類中聲明比較好。另外說下羣發的邏輯(屬於業務邏輯,這裏沒有給出代碼),羣發也就是在一個羣中發言,後端會掃描羣中在線的用戶,逐一發送信息。用戶的WebSocket鏈接(即ChannelHandlerContext對象),會保存在全局變量onlineUserMap中,以用戶id做鍵,方便操做鏈接。關於表情的發送邏輯,與單發邏輯相同,不一樣的是發送內容爲對應的img標籤字符串。
@Component @Sharable public class WebSocketServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> { private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServerHandler.class); @Autowired private ChatService chatService; /** * 描述:讀取完鏈接的消息後,對消息進行處理。 * 這裏主要是處理WebSocket請求 */ @Override protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception { handlerWebSocketFrame(ctx, msg); } /** * 描述:處理WebSocketFrame * @param ctx * @param frame * @throws Exception */ private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { // 關閉請求 if (frame instanceof CloseWebSocketFrame) { WebSocketServerHandshaker handshaker = Constant.webSocketHandshakerMap.get(ctx.channel().id().asLongText()); if (handshaker == null) { sendErrorMessage(ctx, "不存在的客戶端鏈接!"); } else { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); } return; } // ping請求 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } // 只支持文本格式,不支持二進制消息 if (!(frame instanceof TextWebSocketFrame)) { sendErrorMessage(ctx, "僅支持文本(Text)格式,不支持二進制消息"); } // 客服端發送過來的消息 String request = ((TextWebSocketFrame)frame).text(); LOGGER.info("服務端收到新信息:" + request); JSONObject param = null; try { param = JSONObject.parseObject(request); } catch (Exception e) { sendErrorMessage(ctx, "JSON字符串轉換出錯!"); e.printStackTrace(); } if (param == null) { sendErrorMessage(ctx, "參數爲空!"); return; } String type = (String) param.get("type"); switch (type) { case "REGISTER": chatService.register(param, ctx); break; case "SINGLE_SENDING": chatService.singleSend(param, ctx); break; case "GROUP_SENDING": chatService.groupSend(param, ctx); break; case "FILE_MSG_SINGLE_SENDING": chatService.FileMsgSingleSend(param, ctx); break; case "FILE_MSG_GROUP_SENDING": chatService.FileMsgGroupSend(param, ctx); break; default: chatService.typeError(ctx); break; } } /** * 描述:客戶端斷開鏈接 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { chatService.remove(ctx); } /** * 異常處理:關閉channel */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } private void sendErrorMessage(ChannelHandlerContext ctx, String errorMsg) { String responseJson = new ResponseJson() .error(errorMsg) .toString(); ctx.channel().writeAndFlush(new TextWebSocketFrame(responseJson)); } }
文件上傳的思路是先把文件上傳到服務器上,再返回給對方文件的url以及相關信息。文件上傳並無使用WebSocket鏈接來上傳,而是直接使用HTTP鏈接結合Spring的接口簡單的實現了,可自行修改實現使用WebSocket鏈接來上傳文件。另外,文件保存的路徑是http://localhost:8080/WebSocket/UploadFile
,若是Tomcat端口不是8080或者想改變存儲路徑的話,請注意修改常量。
@Service public class FileUploadServiceImpl implements FileUploadService{ private final static String SERVER_URL_PREFIX = "http://localhost:8080/WebSocket/"; private final static String FILE_STORE_PATH = "UploadFile"; @Override public ResponseJson upload(MultipartFile file, HttpServletRequest request) { // 重命名文件,防止重名 String filename = getRandomUUID(); String suffix = ""; String originalFilename = file.getOriginalFilename(); String fileSize = FileUtils.getFormatSize(file.getSize()); // 截取文件的後綴名 if (originalFilename.contains(".")) { suffix = originalFilename.substring(originalFilename.lastIndexOf(".")); } filename = filename + suffix; String prefix = request.getSession().getServletContext().getRealPath("/") + FILE_STORE_PATH; System.out.println("存儲路徑爲:" + prefix + "\\" + filename); Path filePath = Paths.get(prefix, filename); try { Files.copy(file.getInputStream(), filePath); } catch (IOException e) { e.printStackTrace(); return new ResponseJson().error("文件上傳發生錯誤!"); } return new ResponseJson().success() .setData("originalFilename", originalFilename) .setData("fileSize", fileSize) .setData("fileUrl", SERVER_URL_PREFIX + FILE_STORE_PATH + "\\" + filename); } private String getRandomUUID() { return UUID.randomUUID().toString().replace("-", ""); } }
下面只展現核心的websocket鏈接代碼。補充說明:考慮到瀏覽器的兼容性,經測試,建議使用谷歌瀏覽器和360瀏覽器(極速模式),火狐瀏覽器和IE11的界面有點問題。也說明一下,UI設計的排版是從網上找的,由修改了下,本身嘔心瀝血的用JS補充了動態功能,包括:
詳細可見chatroom.js文件
<script type="text/javascript"> var userId; var socket; var sentMessageMap; setUserInfo(); setSentMessageMap(); if(!window.WebSocket){ window.WebSocket = window.MozWebSocket; } if(window.WebSocket){ socket = new WebSocket("ws://localhost:3333"); socket.onmessage = function(event){ var json = JSON.parse(event.data); if (json.status == 200) { var type = json.data.type; console.log("收到一條新信息,類型爲:" + type); switch(type) { case "REGISTER": ws.registerReceive(); break; case "SINGLE_SENDING": ws.singleReceive(json.data); break; case "GROUP_SENDING": ws.groupReceive(json.data); break; case "FILE_MSG_SINGLE_SENDING": ws.fileMsgSingleRecieve(json.data); break; case "FILE_MSG_GROUP_SENDING": ws.fileMsgGroupRecieve(json.data); break; default: console.log("不正確的類型!"); } } else { alert(json.msg); console.log(json.msg); } }; // 鏈接成功1秒後,將用戶信息註冊到服務器在線用戶表 socket.onopen = setTimeout(function(event){ console.log("WebSocket已成功鏈接!"); ws.register(); }, 1000) socket.onclose = function(event){ console.log("WebSocket已關閉..."); }; } else { alert("您的瀏覽器不支持WebSocket!"); } </script>
登陸入口爲:http://localhost:8080/WebSocket/login 或 http://localhost:8080/WebSocket/
當前系統用戶固定爲9個,羣組1個,包括9人用戶。
······
爲了使項目具備更好的可拓展性、可讀性、可維護性,不少地方都使用Spring的Bean進行注入,也運用了面向接口編程的思想,當運用上Mybatis等ORM框架的時候,直接修改dao層實現便可,無需改動其餘地方,同時也在適當的地方加上了註釋。
最後附上git源碼地址:Kanarien GitHub
Copyright © 2018, GDUT CSCW back-end Kanarien, All Rights Reserved