Spring整合Netty、WebSocket的互聯網聊天系統

0.前言

最近一段時間在學習Netty網絡框架,又趁着計算機網絡的課程設計,決定以Netty爲核心,以WebSocket爲應用層通訊協議作一個互聯網聊天系統,總體而言就像微信網頁版同樣,但考慮到這個聊天系統的功能很是多,所以只打算實現核心的聊天功能,包括單發、羣發、文件發送,而後把項目與Spring整合作成開源、可拓展的方式,給你們參考、討論、使用,歡迎你們的指點。javascript

關於Nettyhtml

Netty 是一個利用 Java 的高級網絡的能力,隱藏其背後的複雜性而提供一個易於使用的 API 的客戶端/服務器框架。

這裏借用《Essential Netty In Action》的一句話來簡單介紹Netty,詳細的可參考閱讀該書的電子版前端

關於WebSocket通訊協議
簡單說一下WebSocket通訊協議,WebSocket是爲了解決HTTP協議中通訊只能由客戶端發起這個弊端而出現的,WebSocket基於HTTP5協議,借用HTTP進行握手、升級,可以作到輕量的、高效的、雙向的在客戶端和服務端之間傳輸文本數據。詳細可參考如下文章:java

1. 技術準備

  • 操做平臺:Windows 7, 64bit 8G
  • IDE:MyEclipse 2016
  • JDK版本:1.8.0_121
  • 瀏覽器:谷歌瀏覽器、360瀏覽器(極速模式)(涉及網頁前端設計,後端開發表示很苦悶)
  • 涉及技術:git

    • Netty 4
    • WebSocket + HTTP
    • Spring MVC + Spring
    • JQuery
    • Bootstrap 3 + Bootstrap-fileinput
    • Maven 3.5
    • Tomcat 8.0

2. 總體說明

2.1 設計思想

整個通訊系統以Tomcat做爲核心服務器運行,其下另開一個線程運行Netty WebSocket服務器,Tomcat服務器主要處理客戶登陸、我的信息管理等的HTTP類型請求(一般的業務類型),端口爲8080,Netty WebSockt服務器主要處理用戶消息通訊的WebSocket類型請求,端口爲3333。用戶經過瀏覽器登陸後,瀏覽器會維持一個Session對象(有效時間30分鐘)來保持登陸狀態,Tomcat服務器會返回用戶的我的信息,同時記錄在線用戶,根據用戶id創建一條WebSocket鏈接並保存在後端以便進行實時通訊。當一個用戶向另外一用戶發起通訊,服務器會根據消息內容中的對話方用戶id,找到保存的WebSocket鏈接,經過該鏈接發送消息,對方就可以收到即時收到消息。當用戶註銷或退出時,釋放WebSocket鏈接,清空Session對象中的登陸狀態。github

事實上Netty也能夠用做一個HTTP服務器,而這裏使用Spring MVC處理HTTP請求是出於熟悉的緣故,也比較接近傳統開發的方式。web

2.2 系統結構

系統採用B/S(Browser/Server),即瀏覽器/服務器的結構,主要事務邏輯在服務器端(Server)實現。借鑑MVC模式的思想,從上至下具體又分爲視圖層(View)、控制層(Controller)、業務層(Service)、模型層(Model)、數據訪問層(Data Access)算法

2.3 項目結構

項目後端結構:
項目結構(後端)數據庫

項目前端結構:
項目結構(前端)編程

2.4 系統功能模塊

系統只包括兩個模塊:登陸模塊和聊天管理模塊。

  • 登陸模塊:既然做爲一個系統,那麼登陸的角色認證是必不可少的,這裏使用簡單、傳統的Session方式維持登陸狀態,固然也有對應的註銷功能,但這裏的註銷除了清空Session對象,還要釋放WebSocket鏈接,不然形成內存泄露。
  • 聊天管理模塊:系統的核心模塊,這部分主要使用Netty框架實現,功能包括信息、文件的單條和多條發送,也支持表情發送。
  • 其餘模塊:如好友管理模塊、聊天記錄管理、註冊模塊等,我並無實現,有興趣的話能夠自行實現,與傳統的開發方式相似。

系統功能圖

到這裏,可能會有人出現疑問了,首先是前面的涉及技術中沒有ORM框架(Mybatis或Hibernate),這裏又沒有實現好友管理的功能,那用戶如何肯定本身的好友併發送信息呢?
其實,這裏我在dao層的實現裏並無鏈接數據庫,而是用硬編碼的方式固定好系統的用戶以及用戶的好友表、羣組表,之因此這麼作是由於當初考慮到這個課程設計要是鏈接數據庫就太麻煩了光是已有的模塊(包括先後端)就差很少3k行代碼了,時間上十分划不來,因而就用了硬編碼的方式偷懶,後面會再說明系統用戶的狀況。

2.5 用戶狀態轉換圖

因爲本系統涉及多個用戶狀態,有必要進行說明,下面給出本系統的用戶狀態轉換圖。
狀態轉換圖

2.6 系統界面

系統聊天界面以下:
聊天界面

聊天界面

3. 核心編碼

3.1 Netty WebSocket服務端

3.1.1 Netty服務器啓動與關閉

不得不說的是,當關閉Tomcat服務器時,也要釋放Netty相關資源,不然會形成內存泄漏,關閉方法以下面的close(),若是隻是使用shutdownGracefully()方法的話,關閉時會報內存泄露Memory Leak異常(但IDE可能來不及輸出到控制檯)

/**
 * 描述: Netty WebSocket服務器
 *      使用獨立的線程啓動
 * @author Kanarien
 * @version 1.0
 * @date 2018年5月18日 上午11:22:51
 */
public class WebSocketServer implements Runnable{

    private final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);

    @Autowired
    private EventLoopGroup bossGroup;
    @Autowired
    private EventLoopGroup workerGroup;
    @Autowired
    private ServerBootstrap serverBootstrap;

    private int port;
    private ChannelHandler childChannelHandler;
    private ChannelFuture serverChannelFuture;

  // 構造方法少了會報錯
    public WebSocketServer() {}

    @Override
    public void run() {
        build();
    }

    /**
     * 描述:啓動Netty Websocket服務器
     */
    public void build() {
        try {
      long begin = System.currentTimeMillis();
    serverBootstrap.group(bossGroup, workerGroup) //boss輔助客戶端的tcp鏈接請求  worker負責與客戶端以前的讀寫操做
             .channel(NioServerSocketChannel.class) //配置客戶端的channel類型
             .option(ChannelOption.SO_BACKLOG, 1024) //配置TCP參數,握手字符串長度設置
             .option(ChannelOption.TCP_NODELAY, true) //TCP_NODELAY算法,儘量發送大塊數據,減小充斥的小塊數據
             .childOption(ChannelOption.SO_KEEPALIVE, true)//開啓心跳包活機制,就是客戶端、服務端創建鏈接處於ESTABLISHED狀態,超過2小時沒有交流,機制會被啓動
             .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(592048))//配置固定長度接收緩存區分配器
             .childHandler(childChannelHandler); //綁定I/O事件的處理類,WebSocketChildChannelHandler中定義
    long end = System.currentTimeMillis();
            logger.info("Netty Websocket服務器啓動完成,耗時 " + (end - begin) + " ms,已綁定端口 " + port + " 阻塞式等候客戶端鏈接");

            serverChannelFuture = serverBootstrap.bind(port).sync();
        } catch (Exception e) {
            logger.info(e.getMessage());
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            e.printStackTrace();
        }

    }

  /**
     * 描述:關閉Netty Websocket服務器,主要是釋放鏈接
     *     鏈接包括:服務器鏈接serverChannel,
     *     客戶端TCP處理鏈接bossGroup,
     *     客戶端I/O操做鏈接workerGroup
     *
     *     若只使用
     *         bossGroupFuture = bossGroup.shutdownGracefully();
     *         workerGroupFuture = workerGroup.shutdownGracefully();
     *     會形成內存泄漏。
     */
    public void close(){
        serverChannelFuture.channel().close();
        Future<?> bossGroupFuture = bossGroup.shutdownGracefully();
        Future<?> workerGroupFuture = workerGroup.shutdownGracefully();

        try {
            bossGroupFuture.await();
            workerGroupFuture.await();
        } catch (InterruptedException ignore) {
            ignore.printStackTrace();
        }
    }

    public ChannelHandler getChildChannelHandler() {
        return childChannelHandler;
    }

    public void setChildChannelHandler(ChannelHandler childChannelHandler) {
        this.childChannelHandler = childChannelHandler;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

}

3.1.2 Netty服務器處理鏈

獨立出處理器鏈類,方便修改與注入,省得混在一塊兒顯得混亂。

@Component
public class WebSocketChildChannelHandler extends ChannelInitializer<SocketChannel>{

    @Resource(name = "webSocketServerHandler")
    private ChannelHandler webSocketServerHandler;

    @Resource(name = "httpRequestHandler")
    private ChannelHandler httpRequestHandler;

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast("http-codec", new HttpServerCodec()); // HTTP編碼解碼器
        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); // 把HTTP頭、HTTP體拼成完整的HTTP請求
        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); // 分塊,方便大文件傳輸,不過實質上都是短的文本數據
        ch.pipeline().addLast("http-handler", httpRequestHandler);
        ch.pipeline().addLast("websocket-handler",webSocketServerHandler);
    }

}

3.1.3 Netty服務器HTTP請求處理器

值得一提的是,當在處理鏈中使用Spring注入處理器的bean的時候,若是處理器類不使用@Sharable標籤的話,會出現錯誤。若是不使用Spring注入bean的方式,那麼應該new一個新的處理器對象,如ch.pipeline().addLast("http-handler", new HttpRequestHandler())。另外,判斷HTTP請求仍是WebSocket請求的方式稍微不太優雅,但我按照《Essential Netty in Action》中的方法去試,結果有問題的,只好用下面的if語句判斷。

@Component
@Sharable
public class HttpRequestHandler extends SimpleChannelInboundHandler<Object> {

    /**
     * 描述:讀取完鏈接的消息後,對消息進行處理。
     * 這裏僅處理HTTP請求,WebSocket請求交給下一個處理器。
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            ctx.fireChannelRead(((WebSocketFrame) msg).retain());
        }
    }

    /**
     * 描述:處理Http請求,主要是完成HTTP協議到Websocket協議的升級
     * @param ctx
     * @param req
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        if (!req.decoderResult().isSuccess()) {
            sendHttpResponse(ctx, req,
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }

        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                "ws:/" + ctx.channel() + "/websocket", null, false);
        WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);
        Constant.webSocketHandshakerMap.put(ctx.channel().id().asLongText(), handshaker);// 保存握手類到全局變量,方便之後關閉鏈接

        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
    }

    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
        // 返回應答給客戶端
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
        // 若是是非Keep-Alive,關閉鏈接
        boolean keepAlive = HttpUtil.isKeepAlive(req);
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!keepAlive) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * 描述:異常處理,關閉channel
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

3.1.4 Netty服務器WebSocket請求處理器

考慮到規範性與可維護性,switch語句中的case常量應該放在常量類中聲明比較好。另外說下羣發的邏輯(屬於業務邏輯,這裏沒有給出代碼),羣發也就是在一個羣中發言,後端會掃描羣中在線的用戶,逐一發送信息。用戶的WebSocket鏈接(即ChannelHandlerContext對象),會保存在全局變量onlineUserMap中,以用戶id做鍵,方便操做鏈接。關於表情的發送邏輯,與單發邏輯相同,不一樣的是發送內容爲對應的img標籤字符串。

@Component
@Sharable
public class WebSocketServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServerHandler.class);

    @Autowired
    private ChatService chatService;

    /**
     * 描述:讀取完鏈接的消息後,對消息進行處理。
     *      這裏主要是處理WebSocket請求
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
        handlerWebSocketFrame(ctx, msg);
    }

    /**
     * 描述:處理WebSocketFrame
     * @param ctx
     * @param frame
     * @throws Exception
     */
    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        // 關閉請求
        if (frame instanceof CloseWebSocketFrame) {
            WebSocketServerHandshaker handshaker =
                    Constant.webSocketHandshakerMap.get(ctx.channel().id().asLongText());
            if (handshaker == null) {
                sendErrorMessage(ctx, "不存在的客戶端鏈接!");
            } else {
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            }
            return;
        }
        // ping請求
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // 只支持文本格式,不支持二進制消息
        if (!(frame instanceof TextWebSocketFrame)) {
            sendErrorMessage(ctx, "僅支持文本(Text)格式,不支持二進制消息");
        }

        // 客服端發送過來的消息
        String request = ((TextWebSocketFrame)frame).text();
        LOGGER.info("服務端收到新信息:" + request);
        JSONObject param = null;
        try {
            param = JSONObject.parseObject(request);
        } catch (Exception e) {
            sendErrorMessage(ctx, "JSON字符串轉換出錯!");
            e.printStackTrace();
        }
        if (param == null) {
            sendErrorMessage(ctx, "參數爲空!");
            return;
        }

        String type = (String) param.get("type");
        switch (type) {
            case "REGISTER":
                chatService.register(param, ctx);
                break;
            case "SINGLE_SENDING":
                chatService.singleSend(param, ctx);
                break;
            case "GROUP_SENDING":
                chatService.groupSend(param, ctx);
                break;
            case "FILE_MSG_SINGLE_SENDING":
                chatService.FileMsgSingleSend(param, ctx);
                break;
            case "FILE_MSG_GROUP_SENDING":
                chatService.FileMsgGroupSend(param, ctx);
                break;
            default:
                chatService.typeError(ctx);
                break;
        }
    }

    /**
     * 描述:客戶端斷開鏈接
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        chatService.remove(ctx);
    }

    /**
     * 異常處理:關閉channel
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }


    private void sendErrorMessage(ChannelHandlerContext ctx, String errorMsg) {
        String responseJson = new ResponseJson()
                .error(errorMsg)
                .toString();
        ctx.channel().writeAndFlush(new TextWebSocketFrame(responseJson));
    }

}

3.1.5 文件上傳

文件上傳的思路是先把文件上傳到服務器上,再返回給對方文件的url以及相關信息。文件上傳並無使用WebSocket鏈接來上傳,而是直接使用HTTP鏈接結合Spring的接口簡單的實現了,可自行修改實現使用WebSocket鏈接來上傳文件。另外,文件保存的路徑是http://localhost:8080/WebSocket/UploadFile,若是Tomcat端口不是8080或者想改變存儲路徑的話,請注意修改常量。

@Service
public class FileUploadServiceImpl implements FileUploadService{

    private final static String SERVER_URL_PREFIX = "http://localhost:8080/WebSocket/";
    private final static String FILE_STORE_PATH = "UploadFile";

    @Override
    public ResponseJson upload(MultipartFile file, HttpServletRequest request) {
        // 重命名文件,防止重名
        String filename = getRandomUUID();
        String suffix = "";
        String originalFilename = file.getOriginalFilename();
        String fileSize = FileUtils.getFormatSize(file.getSize());
        // 截取文件的後綴名
        if (originalFilename.contains(".")) {
            suffix = originalFilename.substring(originalFilename.lastIndexOf("."));
        }
        filename = filename + suffix;
        String prefix = request.getSession().getServletContext().getRealPath("/") + FILE_STORE_PATH;
        System.out.println("存儲路徑爲:" + prefix + "\\" + filename);
        Path filePath = Paths.get(prefix, filename);
        try {
            Files.copy(file.getInputStream(), filePath);
        } catch (IOException e) {
            e.printStackTrace();
            return new ResponseJson().error("文件上傳發生錯誤!");
        }
        return new ResponseJson().success()
                .setData("originalFilename", originalFilename)
                .setData("fileSize", fileSize)
                .setData("fileUrl", SERVER_URL_PREFIX + FILE_STORE_PATH + "\\" + filename);
    }

    private String getRandomUUID() {
        return UUID.randomUUID().toString().replace("-", "");
    }

}

3.2 WebSocket客戶端

3.2.1 瀏覽器客戶端代碼

下面只展現核心的websocket鏈接代碼。補充說明:考慮到瀏覽器的兼容性,經測試,建議使用谷歌瀏覽器和360瀏覽器(極速模式),火狐瀏覽器和IE11的界面有點問題。也說明一下,UI設計的排版是從網上找的,由修改了下,本身嘔心瀝血的用JS補充了動態功能,包括:

  • 新消息紅標籤提醒
  • 新消息置頂
  • 客戶端保存已發聊天記錄
  • 用戶己方聊天信息靠左,接收信息靠右
  • 聊天信息框的寬度動態計算

詳細可見chatroom.js文件

<script type="text/javascript">
    var userId;
    var socket;
    var sentMessageMap;

    setUserInfo();
    setSentMessageMap();

    if(!window.WebSocket){
        window.WebSocket = window.MozWebSocket;
    }
    if(window.WebSocket){
        socket = new WebSocket("ws://localhost:3333");
        socket.onmessage = function(event){
            var json = JSON.parse(event.data);
            if (json.status == 200) {
                var type = json.data.type;
                console.log("收到一條新信息,類型爲:" + type);
                switch(type) {
                    case "REGISTER":
                        ws.registerReceive();
                        break;
                    case "SINGLE_SENDING":
                        ws.singleReceive(json.data);
                        break;
                    case "GROUP_SENDING":
                        ws.groupReceive(json.data);
                        break;
                    case "FILE_MSG_SINGLE_SENDING":
                        ws.fileMsgSingleRecieve(json.data);
                        break;
                    case "FILE_MSG_GROUP_SENDING":
                        ws.fileMsgGroupRecieve(json.data);
                        break;
                    default:
                        console.log("不正確的類型!");
                }
            } else {
                alert(json.msg);
                console.log(json.msg);
            }
        };

        // 鏈接成功1秒後,將用戶信息註冊到服務器在線用戶表
        socket.onopen = setTimeout(function(event){
              console.log("WebSocket已成功鏈接!");
              ws.register();
        }, 1000)

        socket.onclose = function(event){
              console.log("WebSocket已關閉...");
        };
    } else {
          alert("您的瀏覽器不支持WebSocket!");
    }

</script>

4. 效果及操做演示

4.1 登陸操做

登陸入口爲:http://localhost:8080/WebSocket/login 或 http://localhost:8080/WebSocket/
當前系統用戶固定爲9個,羣組1個,包括9人用戶。

  • 用戶1 用戶名:Member001 密碼:001
  • 用戶2 用戶名:Member002 密碼:002

······

  • 用戶9 用戶名:Member009 密碼:009

登陸入口

登陸界面

4.2 聊天演示

聊天演示

4.3 文件上傳演示

文件上傳演示

5.補充

爲了使項目具備更好的可拓展性、可讀性、可維護性,不少地方都使用Spring的Bean進行注入,也運用了面向接口編程的思想,當運用上Mybatis等ORM框架的時候,直接修改dao層實現便可,無需改動其餘地方,同時也在適當的地方加上了註釋。

最後附上git源碼地址:Kanarien GitHub


Copyright © 2018, GDUT CSCW back-end Kanarien, All Rights Reserved
相關文章
相關標籤/搜索