Netty整合WebSocket

WebSocket協議是基於 javascript

TCP的一種新的網絡協議。它實現了瀏覽器與服務器全雙工(full-duplex)通訊——容許服務器主動發送信息給客戶端 ,它是先進行一次Http的鏈接,鏈接成功後轉爲TCP鏈接。html

如今咱們來作一個WebSocket HelloWorld,意思爲接收一條WebSocket客戶端發送過來的消息,而後刷到全部鏈接上的客戶端,你們均可以看到這條消息。前端

@Slf4j
@AllArgsConstructor
public class WebSocketServer {
    private int port;

    public void run() throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss,worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //web基於http協議的解碼器
                            ch.pipeline().addLast(new HttpServerCodec());
                            //對大數據流的支持
                            ch.pipeline().addLast(new ChunkedWriteHandler());
                            //對http message進行聚合,聚合成FullHttpRequest或FullHttpResponse
                            ch.pipeline().addLast(new HttpObjectAggregator(1024 * 64));
                            //websocket服務器處理對協議,用於指定給客戶端鏈接訪問的路徑
                            //該handler會幫你處理一些繁重的複雜的事
                            //會幫你處理握手動做:handshaking(close,ping,pong) ping + pong = 心跳
                            //對於websocket來說,都是以frames進行傳輸的,不一樣的數據類型對應的frames也不一樣
                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
                            //添加咱們的自定義channel處理器
                            ch.pipeline().addLast(new WebSocketHandler());
                        }
                    });
            log.info("服務器啓動中");
            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }

    }
}

channel處理器java

/**
 * TextWebSocketFrame: 在netty中,用於爲websocket專門處理文本的對象,frame是消息的載體
 */
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    //用於記錄和管理全部客戶端的channel
    private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        //獲取客戶端傳輸過來的消息
        String content = msg.text();
        log.info("接收到的數據" + content);
        clients.stream().forEach(channel -> channel.writeAndFlush(
                new TextWebSocketFrame("[服務器在]" + LocalDateTime.now() + "接收到消息:" + content)
        ));
        //下面的方法與上面一致
//        clients.writeAndFlush(new TextWebSocketFrame("[服務器在]" + LocalDateTime.now() +
//                "接收到消息:" + content));
    }

    /**
     * 當客戶端鏈接服務端以後(打開鏈接)
     * 獲取客戶端的channel,而且放到ChannelGroup中去進行管理
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        clients.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        //當觸發handlerRemoved,ChannelGroup會自動移除對應的客戶端的channel
        //因此下面這條語句可不寫
//        clients.remove(ctx.channel());
        log.info("客戶端斷開,channel對應的長id爲:" + ctx.channel().id().asLongText());
        log.info("客戶端斷開,channel對應的短id爲:" + ctx.channel().id().asShortText());
    }
}

服務器啓動jquery

@SpringBootApplication
public class WebsocketApplication {

   public static void main(String[] args) throws InterruptedException {
      SpringApplication.run(WebsocketApplication.class, args);
      new WebSocketServer(10101).run();
   }

}

客戶端寫一個html文件,代碼以下web

<!DOCTYPE html>
<html>
    <head>
        <meta charset="utf-8" />
        <title></title>
    </head>
    <body>
        <div>發送消息:</div>
        <input type="text" id="msgContent" />
        <input type="button" value="發送" onclick="CHAT.chat()" />
        
        <div>接受消息:</div>
        <div id="receiveMsg" style="background-color: gainsboro;"></div>
        
        <script type="application/javascript">
            window.CHAT = {
                socket: null,
                init: function() {
                    if (window.WebSocket) {
                        CHAT.socket = new WebSocket("ws://127.0.0.1:10101/ws");
                        CHAT.socket.onopen = function() {
                            console.log("鏈接創建成功");
                        },
                        CHAT.socket.onclose = function() {
                            console.log("鏈接關閉");
                        },
                        CHAT.socket.onerror = function() {
                            console.log("發生錯誤");
                        },
                        CHAT.socket.onmessage = function(e) {
                            console.log("接收到消息" + e.data);
                            var receiveMsg = document.getElementById("receiveMsg");
                            var html = receiveMsg.innerHTML;
                            receiveMsg.innerHTML = html + "<br/>" + e.data;
                        }
                    }else {
                        alert("瀏覽器不支持WebSocket協議...");
                    }
                },
                chat: function() {
                    var msg = document.getElementById("msgContent");
                    CHAT.socket.send(msg.value);
                }
            }
            CHAT.init();
        </script>
    </body>
</html>

打開瀏覽器以下ajax

此時咱們輸入一段話,發送數據庫

咱們能夠看到全部打開的鏈接,都會收到相同的消息。json

此時服務器的日誌bootstrap

2019-10-16 22:31:16.066  INFO 1376 --- [ntLoopGroup-3-5] c.g.w.netty.websocket.WebSocketHandler   : 接收到的數據helloworld
2019-10-16 22:31:33.131  INFO 1376 --- [ntLoopGroup-3-5] c.g.w.netty.websocket.WebSocketHandler   : 接收到的數據你好,中國

若是咱們關閉一個頁面,服務器的日誌爲

2019-10-16 22:36:39.390  INFO 1376 --- [ntLoopGroup-3-7] c.g.w.netty.websocket.WebSocketHandler   : 客戶端斷開,channel對應的長id爲:acde48fffe001122-00000560-00000007-9ca78ac7e2b907ab-0b15dfcb
2019-10-16 22:36:39.390  INFO 1376 --- [ntLoopGroup-3-7] c.g.w.netty.websocket.WebSocketHandler   : 客戶端斷開,channel對應的短id爲:0b15dfcb

如今咱們來作一個點對點的聊天功能。

首先WebSocketServer添加空閒處理器

@Slf4j
@AllArgsConstructor
public class WebSocketServer {
    private int port;

    public void run() throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss,worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //web基於http協議的解碼器
                            ch.pipeline().addLast(new HttpServerCodec());
                            //對大數據流的支持
                            ch.pipeline().addLast(new ChunkedWriteHandler());
                            //對http message進行聚合,聚合成FullHttpRequest或FullHttpResponse
                            ch.pipeline().addLast(new HttpObjectAggregator(1024 * 64));
                            //websocket服務器處理對協議,用於指定給客戶端鏈接訪問的路徑
                            //該handler會幫你處理一些繁重的複雜的事
                            //會幫你處理握手動做:handshaking(close,ping,pong) ping + pong = 心跳
                            //對於websocket來說,都是以frames進行傳輸的,不一樣的數據類型對應的frames也不一樣
                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
                            //針對客戶端,若是1分鐘時沒有向服務端發送讀寫心跳(All),則主動斷開
                            //若是是讀空閒或者是寫空閒,不處理
                            ch.pipeline().addLast(new IdleStateHandler(20,40,60));
                            //添加咱們的自定義channel處理器
                            ch.pipeline().addLast(new WebSocketHandler());
                        }
                    });
            log.info("服務器啓動中");
            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }

    }
}

而後咱們須要增長兩個實體類——消息類和聊天類

首先添加一個消息的接口

public interface Chat {
    /**
     * 保存消息到數據庫
     * @param chatMsg
     */
    public void save(Chat chatMsg);

    /**
     * 簽名消息
     * @param msgIdList
     */
    public void updateMsgSigned(List<Long> msgIdList);

    /**
     * 查找未讀未簽名消息
     * @param acceptUserId
     * @return
     */
    public List<ChatMsg> findUnReadChat(Long acceptUserId);
}

添加一個消息的類來實現該接口

@ToString
@Data
public class ChatMsg implements Serializable,Chat{
    @JSONField(serializeUsing = ToStringSerializer.class)
    private Long senderId; //發送者的用戶id
    @JSONField(serializeUsing = ToStringSerializer.class)
    private Long receiverId; //接收者的用戶id
    private String msg;
    @JSONField(serializeUsing = ToStringSerializer.class)
    private Long msgId; //用於消息的簽收
    private MsgSignFlagEnum signed; //消息簽收狀態
    private LocalDateTime createDate;

    @Override
    @Transactional
    public void save(Chat chatMsg) {
        ChatDao chatDao = SpringBootUtil.getBean(ChatDao.class);
        IdService idService = SpringBootUtil.getBean(IdService.class);
        ((ChatMsg)chatMsg).setMsgId(idService.genId());
        ((ChatMsg)chatMsg).setCreateDate(LocalDateTime.now());
        chatDao.saveChat((ChatMsg) chatMsg);
    }

    @Transactional
    @Override
    public void updateMsgSigned(List<Long> msgIdList) {
        ChatDao chatDao = SpringBootUtil.getBean(ChatDao.class);
        chatDao.updateMsgSigned(msgIdList);
    }

    @Transactional
    @Override
    public List<ChatMsg> findUnReadChat(Long acceptUserId) {
        ChatDao chatDao = SpringBootUtil.getBean(ChatDao.class);
        return chatDao.findUnReadMsg(acceptUserId);
    }
}

其中MsgSignFlagEnum爲一個枚舉類型,代碼以下

public enum MsgSignFlagEnum implements LocalisableAll {
    unsign(0,"未簽收"),
    signed(1,"已簽收");

    public final int type;
    public final String value;

    private MsgSignFlagEnum(int type,String value) {
        this.type = type;
        this.value = value;
    }

    @Override
    public int getType() {
        return type;
    }

    @Override
    public String getValue() {
        return value;
    }
}

創建一個消息的工廠

public class ChatMsgFactory {
    public static Chat createChatMsgService() {
        return new ChatMsg();
    }
}

而後是聊天類

@Data
public class DataContent implements Serializable {
    private Integer action; //動做類型
    private ChatMsg chatMsg; //用戶的聊天內容entity
    private String extand; //擴展字段
}

咱們根據動做類型來定義一個枚舉

public enum MsgActionEnum {
    CONNECT(1,"第一次(或重連)初始化鏈接"),
    CHAT(2,"聊天消息"),
    SIGNED(3,"消息簽收"),
    KEEPALIVE(4,"客戶端保持心跳");

    public final Integer type;
    public final String content;

    private MsgActionEnum(Integer type,String content) {
        this.type = type;
        this.content = content;
    }
}

在寫WebSocketHandler以前,咱們須要將用戶Id跟Channel作一個綁定

/**
 * 用戶id和channel的關聯關係的處理
 */
@Slf4j
public class UserChannelRel {
    private static Map<Long,Channel> manager = new HashMap<>();

    public static void put(Long senderId,Channel channel) {
        manager.put(senderId,channel);
    }

    public static Channel get(Long senderId) {
        return manager.get(senderId);
    }

    public static void output() {
        manager.entrySet().stream().forEach(entry ->
            log.info("UserId:" + entry.getKey() + ",ChannelId:" +
                    entry.getValue().id().asLongText())
        );
    }
}

最後WebSocketHandler改寫以下

/**
 * TextWebSocketFrame: 在netty中,用於爲websocket專門處理文本的對象,frame是消息的載體
 */
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    //用於記錄和管理全部客戶端的channel
    private static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private Chat chatMsgService = ChatMsgFactory.createChatMsgService();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        //獲取客戶端傳輸過來的消息
        String content = msg.text();
        log.info("content爲:" + content);
        Channel currentChannel = ctx.channel();
        //解析傳輸過來的消息轉成聊天對象
        DataContent dataContent = JSONObject.parseObject(content,DataContent.class);
        //獲取聊天對象的動做
        Integer action = dataContent.getAction();

        if (action == MsgActionEnum.CONNECT.type) {
            //當websocket第一次open的時候,初始化channel,把用的channel和userId關聯起來
            Long senderId = dataContent.getChatMsg().getSenderId();
            UserChannelRel.put(senderId,currentChannel);
            //測試
            users.stream().forEach(channel -> log.info(channel.id().asLongText()));
            UserChannelRel.output();
        }else if (action == MsgActionEnum.CHAT.type) {
            //聊天類型的消息,把聊天記錄保存到數據庫,同時標記消息的簽收狀態[未簽收]
            ChatMsg chatMsg = dataContent.getChatMsg();
            String msgText = chatMsg.getMsg();
            Long receiverId = chatMsg.getReceiverId();
            Long senderId = chatMsg.getSenderId();
            //保存數據庫
            chatMsgService.save(chatMsg);
            Channel receiverChannel = UserChannelRel.get(receiverId);
            if (receiverChannel == null) {
                //接收方離線狀態,此處無需處理
            }else {
                Channel findChannel = users.find(receiverChannel.id());
                if (findChannel != null) {
                    findChannel.writeAndFlush(new TextWebSocketFrame(
                            JSONObject.toJSONString(chatMsg)
                    ));
                }else {
                    //接收方離線,此處無需處理
                }
            }
        }else if (action == MsgActionEnum.SIGNED.type) {
            //簽收消息類型,針對具體的消息進行簽收,修改數據庫中對應消息的簽收狀態[已簽收]
            //擴展字段在signed類型的消息中,表明須要去簽收的消息id,逗號間隔
            String msgIdsStr = dataContent.getExtand();
            log.info("extand爲:" + msgIdsStr);
            String[] msgIds = msgIdsStr.split(",");
            List<Long> msgIdList = new ArrayList<>();
            for (String mId : msgIds) {
                if (!StringUtils.isEmpty(mId)) {
                    msgIdList.add(Long.valueOf(mId));
                }
            }
            log.info(msgIdList.toString());
            if (!CollectionUtils.isEmpty(msgIdList)) {
                //批量簽收
                chatMsgService.updateMsgSigned(msgIdList);
            }
        }else if (action == MsgActionEnum.KEEPALIVE.type) {
            //心跳類型的消息
            log.info("收到來自channel爲[" + currentChannel + "]的心跳包");
        }
    }

    /**
     * 當客戶端鏈接服務端以後(打開鏈接)
     * 獲取客戶端的channel,而且放到ChannelGroup中去進行管理
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        users.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        //當觸發handlerRemoved,ChannelGroup會自動移除對應的客戶端的channel
        //因此下面這條語句可不寫
//        clients.remove(ctx.channel());
        log.info("客戶端斷開,channel對應的長id爲:" + ctx.channel().id().asLongText());
        log.info("客戶端斷開,channel對應的短id爲:" + ctx.channel().id().asShortText());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.channel().close();
        users.remove(ctx.channel());
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        //IdleStateEvent是一個用戶事件,包含讀空閒/寫空閒/讀寫空閒
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                log.info("進入讀空閒");
            }else if (event.state() == IdleState.WRITER_IDLE) {
                log.info("進入寫空閒");
            }else if (event.state() == IdleState.ALL_IDLE) {
                log.info("channel關閉前,用戶數量爲:" + users.size());
                //關閉無用的channel,以防資源浪費
                ctx.channel().close();
                log.info("channel關閉後,用戶數量爲:" + users.size());
            }

        }
    }
}

此處咱們能夠看到有兩個地方註釋了//接收方離線狀態,此處無需處理。因此咱們須要在用戶上線的時候獲取未簽名的消息,只須要經過Controller從數據庫獲取就好

@RestController
public class UnReadMessageController {
   private Chat chatService = ChatMsgFactory.createChatMsgService();

   @SuppressWarnings("unchecked")
   @PostMapping("/notification-anon/getunreadmeg")
   public Result<List<ChatMsg>> getUnReadMessage(@RequestParam("receiverid") Long receiverId) {
      return Result.success(chatService.findUnReadChat(receiverId));
   }
}

因爲咱們登陸用的是OAuth框架,在user模塊添加

@PostMapping("/users-anon/finduser")
public LoginAppUser findUserByName(@RequestParam("username") String username) {
    return appUserService.findByUsername(username);
}

修改網關登陸,登陸後能夠獲取用戶的信息,首先添加User模塊的feign

@FeignClient("user-center")
public interface UserClient {
    @PostMapping("/users-anon/finduser")
    LoginAppUser findUserByName(@RequestParam("username") String username);
}
/**
 * 登錄、刷新token、退出
 *
 * @author 關鍵
 */
@Slf4j
@RestController
public class TokenController {

    @Autowired
    private Oauth2Client oauth2Client;
    @Autowired
    private UserClient userClient;

    /**
     * 系統登錄<br>
     * 根據用戶名登陸<br>
     * 採用oauth2密碼模式獲取access_token和refresh_token
     *
     * @param username
     * @param password
     * @return
     */
    @SuppressWarnings("unchecked")
    @PostMapping("/sys/login")
    public Result<Map> login(@RequestParam String username,@RequestParam String password) {
        Map<String, String> parameters = new HashMap<>();
        parameters.put(OAuth2Utils.GRANT_TYPE, "password");
        parameters.put(OAuth2Utils.CLIENT_ID, "system");
        parameters.put("client_secret", "system");
        parameters.put(OAuth2Utils.SCOPE, "app");
//    parameters.put("username", username);
        // 爲了支持多類型登陸,這裏在username後拼裝上登陸類型
        parameters.put("username", username + "|" + CredentialType.USERNAME.name());
        parameters.put("password", password);

        Map<String, Object> tokenInfo = oauth2Client.postAccessToken(parameters);
        AppUser user = userClient.findUserByName(username);
        tokenInfo.put("user",user);
        saveLoginLog(username, "用戶名密碼登錄");

        return Result.success(tokenInfo);
    }

    @Autowired
    private LogClient logClient;

    /**
     * 登錄日誌
     *
     * @param username
     */
    private void saveLoginLog(String username, String remark) {
        log.info("{}登錄", username);
        // 異步
        CompletableFuture.runAsync(() -> {
            try {
                Log log = Log.builder().username(username).module(LogModule.LOGIN).remark(remark).createTime(new Date())
                        .build();
                logClient.save(log);
            } catch (Exception e) {
                // do nothing
            }

        });
    }

    /**
     * 系統刷新refresh_token
     *
     * @param refresh_token
     * @return
     */
    @PostMapping("/sys/refresh_token")
    public Map<String, Object> refresh_token(String refresh_token) {
        Map<String, String> parameters = new HashMap<>();
        parameters.put(OAuth2Utils.GRANT_TYPE, "refresh_token");
        parameters.put(OAuth2Utils.CLIENT_ID, "system");
        parameters.put("client_secret", "system");
        parameters.put(OAuth2Utils.SCOPE, "app");
        parameters.put("refresh_token", refresh_token);

        return oauth2Client.postAccessToken(parameters);
    }

    /**
     * 退出
     *
     * @param access_token
     */
    @GetMapping("/sys/logout")
    public void logout(String access_token, @RequestHeader(required = false, value = "Authorization") String token) {
        if (StringUtils.isBlank(access_token)) {
            if (StringUtils.isNoneBlank(token)) {
                access_token = token.substring(OAuth2AccessToken.BEARER_TYPE.length() + 1);
            }
        }
        oauth2Client.removeToken(access_token);
    }
}

作如上修改後,咱們登陸後能夠得到用戶的Id,以上省略Dao,Mapper

這個時候服務端就所有完成了,咱們再來看一下客戶端

先創建一個全局變量app.js

window.app = {
	
	/**
	 * 和後端的枚舉對應
	 */
	CONNECT: 1,			//"第一次(或重連)初始化鏈接"),
	CHAT: 2,			//"聊天消息"),
	SIGNED: 3,			//"消息簽收"),
	KEEPALIVE: 4,		//"客戶端保持心跳");

	/**
	* 和後端的ChatMsg聊天模型對象保持一致
	* @param {Object} senderId
	* @param {Object} receiverId
	* @param {Object} msg
	* @param {Object} msgId
	*/
	ChatMsg: function(senderId,receiverId,msg,msgId) {
		this.senderId = senderId;
		this.receiverId = receiverId;
		this.msg = msg;
		this.msgId = msgId;
	},

	/**
	* 構建消息模型對象
	* @param {Object} action
	* @param {Object} chatMsg
	* @param {Object} extand
	*/
	DataContent: function(action,chatMsg,extand) {
		this.action = action;
		this.chatMsg = chatMsg;
		this.extand = extand;
	}
}

因爲這裏只是聊天樣例,沒有處理登陸功能,咱們如今就以1,2來表明兩個用戶的id

用戶id爲1的代碼,文件名index.html

<!DOCTYPE html>
<html>
	<head>
		<meta charset="utf-8" />
		<title></title>
	</head>
	<body>
		<div>發送消息:</div>
		<input type="text" id="msgContent" />
		<input type="button" value="發送" onclick="CHAT.chat(1,2,msgContent.value,app.CHAT,null)" />
		
		<div>接受消息:</div>
		<div id="receiveMsg" style="background-color: gainsboro;"></div>
		<script type="application/javascript" src="js/app.js"></script>
		<script type="application/javascript" src="js/mui.min.js"></script>
		<script type="application/javascript">
			window.CHAT = {
				socket: null,
				init: function() {
					if (window.WebSocket) {
						CHAT.socket = new WebSocket("ws://127.0.0.1:9999/ws");
						CHAT.socket.onopen = function() {
							console.log("鏈接創建成功");
							
							CHAT.chat(1,null,null,app.CONNECT,null);
							//每次鏈接的時候獲取未讀消息
							fetchUnReadMsg();
							//定時發送心跳,30秒一次
							setInterval("CHAT.keepalive()",30000);
						},
						CHAT.socket.onclose = function() {
							console.log("鏈接關閉");
						},
						CHAT.socket.onerror = function() {
							console.log("發生錯誤");
						},
						CHAT.socket.onmessage = function(e) {
							console.log("接收到消息" + e.data);
							var receiveMsg = document.getElementById("receiveMsg");
							var html = receiveMsg.innerHTML;
							var chatMsg = JSON.parse(e.data);
							receiveMsg.innerHTML = html + "<br/>" + chatMsg.msg;
							//消息簽收
							CHAT.chat(chatMsg.receiverId,null,null,app.SIGNED,String(chatMsg.msgId));
						}
					}else {
						alert("瀏覽器不支持WebSocket協議...");
					}
				},
				chat: function(senderId,receiverId,msg,action,extand) {
					var chatMsg = new app.ChatMsg(senderId,receiverId,msg,null);
					var dataContent = new app.DataContent(action,chatMsg,extand);
					CHAT.socket.send(JSON.stringify(dataContent));
				},
				keepalive: function() {
					CHAT.chat(1,null,null,app.KEEPALIVE,null);
					fetchUnReadMsg();
				}
			}
			CHAT.init();
			function fetchUnReadMsg() {
				mui.ajax('http://127.0.0.1:8008/notification-anon/getunreadmeg?receiverid=1',{
					data:{},
					dataType:'json',//服務器返回json格式數據
					type:'post',//HTTP請求類型
					timeout:10000,//超時時間設置爲10秒;
					success:function(data){
						if (data.code == 200) {
							var contactList = data.data;
							var ids = "";
							console.log(JSON.stringify(contactList));
							var receiveMsg = document.getElementById("receiveMsg");
							for (var i = 0;i < contactList.length;i++) {
								var msgObj = contactList[i];
								var html = receiveMsg.innerHTML;
								receiveMsg.innerHTML = html + "<br/>" + msgObj.msg;
								ids = ids + msgObj.msgId + ",";
							}
							//批量簽收未讀消息
							CHAT.chat(1,null,null,app.SIGNED,ids);
						}
					}
				});
			}
		</script>
	</body>
</html>

用戶id爲2的代碼,文件名receive.html

<!DOCTYPE html>
<html>
	<head>
		<meta charset="utf-8" />
		<title></title>
	</head>
	<body>
		<div>發送消息:</div>
		<input type="text" id="msgContent" />
		<input type="button" value="發送" onclick="CHAT.chat(2,1,msgContent.value,app.CHAT,null)" />
		
		<div>接受消息:</div>
		<div id="receiveMsg" style="background-color: gainsboro;"></div>
		<script type="application/javascript" src="js/app.js"></script>
		<script type="application/javascript" src="js/mui.min.js"></script>
		<script type="application/javascript">
			window.CHAT = {
				socket: null,
				init: function() {
					if (window.WebSocket) {
						CHAT.socket = new WebSocket("ws://127.0.0.1:9999/ws");
						CHAT.socket.onopen = function() {
							console.log("鏈接創建成功");
							
							CHAT.chat(2,null,null,app.CONNECT,null);
							//每次鏈接的時候獲取未讀消息
							fetchUnReadMsg();
							//定時發送心跳,30秒一次
							setInterval("CHAT.keepalive()",30000);
						},
						CHAT.socket.onclose = function() {
							console.log("鏈接關閉");
						},
						CHAT.socket.onerror = function() {
							console.log("發生錯誤");
						},
						CHAT.socket.onmessage = function(e) {
							console.log("接收到消息" + e.data);
							var receiveMsg = document.getElementById("receiveMsg");
							var html = receiveMsg.innerHTML;
							var chatMsg = JSON.parse(e.data);
							receiveMsg.innerHTML = html + "<br/>" + chatMsg.msg;
							//消息簽收
							CHAT.chat(chatMsg.receiverId,null,null,app.SIGNED,String(chatMsg.msgId));
						}
					}else {
						alert("瀏覽器不支持WebSocket協議...");
					}
				},
				chat: function(senderId,receiverId,msg,action,extand) {
					var chatMsg = new app.ChatMsg(senderId,receiverId,msg,null);
					var dataContent = new app.DataContent(action,chatMsg,extand);
					CHAT.socket.send(JSON.stringify(dataContent));
				},
				keepalive: function() {
					CHAT.chat(2,null,null,app.KEEPALIVE,null);
					fetchUnReadMsg();
				}
			}
			CHAT.init();
			function fetchUnReadMsg() {
				mui.ajax('http://127.0.0.1:8008/notification-anon/getunreadmeg?receiverid=2',{
					data:{},
					dataType:'json',//服務器返回json格式數據
					type:'post',//HTTP請求類型
					timeout:10000,//超時時間設置爲10秒;
					success:function(data){
						if (data.code == 200) {
							var contactList = data.data;
							var ids = "";
							console.log(JSON.stringify(contactList));
							var receiveMsg = document.getElementById("receiveMsg");
							for (var i = 0;i < contactList.length;i++) {
								var msgObj = contactList[i];
								var html = receiveMsg.innerHTML;
								receiveMsg.innerHTML = html + "<br/>" + msgObj.msg;
								ids = ids + msgObj.msgId + ",";
							}
							//批量簽收未讀消息
							CHAT.chat(2,null,null,app.SIGNED,ids);
						}
					}
				});
		</script>
	</body>
</html>

這裏都是經過Json來作數據的序列化的,以後會修改成ProtoBuffer來處理。

如今來增長髮圖片的功能。首先咱們須要搭建好一個fastdfs服務器,具體能夠參考分佈式文件系統FastDFS安裝配置 以及Springboot 2.0+FastDFS開發配置

這裏只放入咱們須要的Controller

@RestController
public class FastDFSController {
    @Autowired
    private FastDFSClientWrapper dfsClient;

    @PostMapping("/files-anon/fdfsupload")
    @SuppressWarnings("unchecked")
    public Result<String> upload(@RequestParam("file") MultipartFile file) throws Exception {
        String imgUrl = dfsClient.uploadFile(file);
        return Result.success(imgUrl);
    }
}

而後修改咱們的用戶id爲1的前端代碼

<!DOCTYPE html>
<html>
	<head>
		<meta charset="utf-8" />
		<title></title>
	</head>
	<body>
		<div>發送消息:</div>
		<input type="text" id="msgContent" />
		<input type="button" value="發送" onclick="CHAT.chat(1,2,msgContent.value,app.CHAT,null)" />
		<input type="file" id="file" name="file">
		<input type="button" id="button" value="發送圖片" >
		<div>接受消息:</div>
		<div id="receiveMsg" style="background-color: gainsboro;"></div>
		<script type="application/javascript" src="js/app.js"></script>
		<script type="application/javascript" src="js/mui.min.js"></script>
		<script type="application/javascript" src="js/jquery-3.3.1.min.js"></script>
		<script type="application/javascript">
			window.CHAT = {
				socket: null,
				init: function() {
					if (window.WebSocket) {
						CHAT.socket = new WebSocket("ws://127.0.0.1:9999/ws");
						CHAT.socket.onopen = function() {
							console.log("鏈接創建成功");
							
							CHAT.chat(1,null,null,app.CONNECT,null);
							//每次鏈接的時候獲取未讀消息
							fetchUnReadMsg();
							//定時發送心跳,30秒一次
							setInterval("CHAT.keepalive()",30000);
						},
						CHAT.socket.onclose = function() {
							console.log("鏈接關閉");
						},
						CHAT.socket.onerror = function() {
							console.log("發生錯誤");
						},
						CHAT.socket.onmessage = function(e) {
							console.log("接收到消息" + e.data);
							var receiveMsg = document.getElementById("receiveMsg");
							var html = receiveMsg.innerHTML;
							var chatMsg = JSON.parse(e.data);
							receiveMsg.innerHTML = html + "<br/>" + chatMsg.msg;
							//消息簽收
							CHAT.chat(chatMsg.receiverId,null,null,app.SIGNED,String(chatMsg.msgId));
						}
					}else {
						alert("瀏覽器不支持WebSocket協議...");
					}
				},
				chat: function(senderId,receiverId,msg,action,extand) {
					var chatMsg = new app.ChatMsg(senderId,receiverId,msg,null);
					var dataContent = new app.DataContent(action,chatMsg,extand);
					CHAT.socket.send(JSON.stringify(dataContent));
				},
				keepalive: function() {
					CHAT.chat(1,null,null,app.KEEPALIVE,null);
					fetchUnReadMsg();
				}
			}
			CHAT.init();
			function fetchUnReadMsg() {
				mui.ajax('http://127.0.0.1:8008/notification-anon/getunreadmeg?receiverid=2',{
					data:{},
					dataType:'json',//服務器返回json格式數據
					type:'post',//HTTP請求類型
					timeout:10000,//超時時間設置爲10秒;
					success:function(data){
						if (data.code == 200) {
							var contactList = data.data;
							var ids = "";
							console.log(JSON.stringify(contactList));
							var receiveMsg = document.getElementById("receiveMsg");
							for (var i = 0;i < contactList.length;i++) {
								var msgObj = contactList[i];
								var html = receiveMsg.innerHTML;
								receiveMsg.innerHTML = html + "<br/>" + msgObj.msg;
								ids = ids + msgObj.msgId + ",";
							}
							//批量簽收未讀消息
							CHAT.chat(2,null,null,app.SIGNED,ids);
						}
					}
				});
			}
			$(function () {
			        $("#button").click(function () {
			            var form = new FormData();
			            form.append("file", document.getElementById("file").files[0]);
			             $.ajax({
			                 url: "http://xxx.xxx.xxx.xxx:8010/files-anon/fdfsupload",        //後臺url
			                 data: form,
			                 cache: false,
			                 async: false,
			                 type: "POST",                   //類型,POST或者GET
			                 dataType: 'json',              //數據返回類型,能夠是xml、json等
			                 processData: false,
			                 contentType: false,
			                 success: function (data) {      //成功,回調函數
			                     if (data.code == 200) {
			                     	console.log(data.data);
									CHAT.chat(1,2,"<img src='" + data.data + "' height='200' width='200' />",app.CHAT,null);
			                     }   
			                 }
			             });
			
			        })
			
			    })
		</script>
	</body>
</html>

意思即爲使用ajax訪問咱們的上傳文件Controller,獲取上傳成功後的url,將該url拼接到<img />的標籤中,當稱普通聊天信息發送出去便可。

用戶id爲2的前端代碼相同。

<!DOCTYPE html>
<html>
	<head>
		<meta charset="utf-8" />
		<title></title>
	</head>
	<body>
		<div>發送消息:</div>
		<input type="text" id="msgContent" />
		<input type="button" value="發送" onclick="CHAT.chat(2,1,msgContent.value,app.CHAT,null)" />
		<input type="file" id="file" name="file">
		<input type="button" id="button" value="發送圖片" >
		<div>接受消息:</div>
		<div id="receiveMsg" style="background-color: gainsboro;"></div>
		<script type="application/javascript" src="js/app.js"></script>
		<script type="application/javascript" src="js/mui.min.js"></script>
		<script type="application/javascript" src="js/jquery-3.3.1.min.js"></script>
		<script type="application/javascript">
			window.CHAT = {
				socket: null,
				init: function() {
					if (window.WebSocket) {
						CHAT.socket = new WebSocket("ws://127.0.0.1:9999/ws");
						CHAT.socket.onopen = function() {
							console.log("鏈接創建成功");
							
							CHAT.chat(2,null,null,app.CONNECT,null);
							//每次鏈接的時候獲取未讀消息
							fetchUnReadMsg();
							//定時發送心跳,30秒一次
							setInterval("CHAT.keepalive()",30000);
						},
						CHAT.socket.onclose = function() {
							console.log("鏈接關閉");
						},
						CHAT.socket.onerror = function() {
							console.log("發生錯誤");
						},
						CHAT.socket.onmessage = function(e) {
							console.log("接收到消息" + e.data);
							var receiveMsg = document.getElementById("receiveMsg");
							var html = receiveMsg.innerHTML;
							var chatMsg = JSON.parse(e.data);
							receiveMsg.innerHTML = html + "<br/>" + chatMsg.msg;
							//消息簽收
							CHAT.chat(chatMsg.receiverId,null,null,app.SIGNED,String(chatMsg.msgId));
						}
					}else {
						alert("瀏覽器不支持WebSocket協議...");
					}
				},
				chat: function(senderId,receiverId,msg,action,extand) {
					var chatMsg = new app.ChatMsg(senderId,receiverId,msg,null);
					var dataContent = new app.DataContent(action,chatMsg,extand);
					CHAT.socket.send(JSON.stringify(dataContent));
				},
				keepalive: function() {
					CHAT.chat(2,null,null,app.KEEPALIVE,null);
					fetchUnReadMsg();
				}
			}
			CHAT.init();
			function fetchUnReadMsg() {
				mui.ajax('http://127.0.0.1:8008/notification-anon/getunreadmeg?receiverid=2',{
					data:{},
					dataType:'json',//服務器返回json格式數據
					type:'post',//HTTP請求類型
					timeout:10000,//超時時間設置爲10秒;
					success:function(data){
						if (data.code == 200) {
							var contactList = data.data;
							var ids = "";
							console.log(JSON.stringify(contactList));
							var receiveMsg = document.getElementById("receiveMsg");
							for (var i = 0;i < contactList.length;i++) {
								var msgObj = contactList[i];
								var html = receiveMsg.innerHTML;
								receiveMsg.innerHTML = html + "<br/>" + msgObj.msg;
								ids = ids + msgObj.msgId + ",";
							}
							//批量簽收未讀消息
							CHAT.chat(2,null,null,app.SIGNED,ids);
						}
					}
				});
			}
			$(function () {
			        $("#button").click(function () {
			            var form = new FormData();
			            form.append("file", document.getElementById("file").files[0]);
			             $.ajax({
			                 url: "http://xxx.xxx.xxx.xxx:8010/files-anon/fdfsupload",        //後臺url
			                 data: form,
			                 cache: false,
			                 async: false,
			                 type: "POST",                   //類型,POST或者GET
			                 dataType: 'json',              //數據返回類型,能夠是xml、json等
			                 processData: false,
			                 contentType: false,
			                 success: function (data) {      //成功,回調函數
			                     if (data.code == 200) {
			                     	console.log(data.data);
									CHAT.chat(2,1,"<img src='" + data.data + "' height='200' width='200' />",app.CHAT,null);
			                     }   
			                 }
			             });
			
			        })
			
			    })
		</script>
	</body>
</html>
相關文章
相關標籤/搜索