WebSocket協議是基於 javascript
TCP的一種新的網絡協議。它實現了瀏覽器與服務器全雙工(full-duplex)通訊——容許服務器主動發送信息給客戶端 ,它是先進行一次Http的鏈接,鏈接成功後轉爲TCP鏈接。html
如今咱們來作一個WebSocket HelloWorld,意思爲接收一條WebSocket客戶端發送過來的消息,而後刷到全部鏈接上的客戶端,你們均可以看到這條消息。前端
@Slf4j @AllArgsConstructor public class WebSocketServer { private int port; public void run() throws InterruptedException { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss,worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childOption(ChannelOption.TCP_NODELAY,true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //web基於http協議的解碼器 ch.pipeline().addLast(new HttpServerCodec()); //對大數據流的支持 ch.pipeline().addLast(new ChunkedWriteHandler()); //對http message進行聚合,聚合成FullHttpRequest或FullHttpResponse ch.pipeline().addLast(new HttpObjectAggregator(1024 * 64)); //websocket服務器處理對協議,用於指定給客戶端鏈接訪問的路徑 //該handler會幫你處理一些繁重的複雜的事 //會幫你處理握手動做:handshaking(close,ping,pong) ping + pong = 心跳 //對於websocket來說,都是以frames進行傳輸的,不一樣的數據類型對應的frames也不一樣 ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws")); //添加咱們的自定義channel處理器 ch.pipeline().addLast(new WebSocketHandler()); } }); log.info("服務器啓動中"); ChannelFuture future = bootstrap.bind(port).sync(); future.channel().closeFuture().sync(); } finally { worker.shutdownGracefully(); boss.shutdownGracefully(); } } }
channel處理器java
/** * TextWebSocketFrame: 在netty中,用於爲websocket專門處理文本的對象,frame是消息的載體 */ @Slf4j public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { //用於記錄和管理全部客戶端的channel private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //獲取客戶端傳輸過來的消息 String content = msg.text(); log.info("接收到的數據" + content); clients.stream().forEach(channel -> channel.writeAndFlush( new TextWebSocketFrame("[服務器在]" + LocalDateTime.now() + "接收到消息:" + content) )); //下面的方法與上面一致 // clients.writeAndFlush(new TextWebSocketFrame("[服務器在]" + LocalDateTime.now() + // "接收到消息:" + content)); } /** * 當客戶端鏈接服務端以後(打開鏈接) * 獲取客戶端的channel,而且放到ChannelGroup中去進行管理 * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { clients.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { //當觸發handlerRemoved,ChannelGroup會自動移除對應的客戶端的channel //因此下面這條語句可不寫 // clients.remove(ctx.channel()); log.info("客戶端斷開,channel對應的長id爲:" + ctx.channel().id().asLongText()); log.info("客戶端斷開,channel對應的短id爲:" + ctx.channel().id().asShortText()); } }
服務器啓動jquery
@SpringBootApplication public class WebsocketApplication { public static void main(String[] args) throws InterruptedException { SpringApplication.run(WebsocketApplication.class, args); new WebSocketServer(10101).run(); } }
客戶端寫一個html文件,代碼以下web
<!DOCTYPE html> <html> <head> <meta charset="utf-8" /> <title></title> </head> <body> <div>發送消息:</div> <input type="text" id="msgContent" /> <input type="button" value="發送" onclick="CHAT.chat()" /> <div>接受消息:</div> <div id="receiveMsg" style="background-color: gainsboro;"></div> <script type="application/javascript"> window.CHAT = { socket: null, init: function() { if (window.WebSocket) { CHAT.socket = new WebSocket("ws://127.0.0.1:10101/ws"); CHAT.socket.onopen = function() { console.log("鏈接創建成功"); }, CHAT.socket.onclose = function() { console.log("鏈接關閉"); }, CHAT.socket.onerror = function() { console.log("發生錯誤"); }, CHAT.socket.onmessage = function(e) { console.log("接收到消息" + e.data); var receiveMsg = document.getElementById("receiveMsg"); var html = receiveMsg.innerHTML; receiveMsg.innerHTML = html + "<br/>" + e.data; } }else { alert("瀏覽器不支持WebSocket協議..."); } }, chat: function() { var msg = document.getElementById("msgContent"); CHAT.socket.send(msg.value); } } CHAT.init(); </script> </body> </html>
打開瀏覽器以下ajax
此時咱們輸入一段話,發送數據庫
咱們能夠看到全部打開的鏈接,都會收到相同的消息。json
此時服務器的日誌bootstrap
2019-10-16 22:31:16.066 INFO 1376 --- [ntLoopGroup-3-5] c.g.w.netty.websocket.WebSocketHandler : 接收到的數據helloworld
2019-10-16 22:31:33.131 INFO 1376 --- [ntLoopGroup-3-5] c.g.w.netty.websocket.WebSocketHandler : 接收到的數據你好,中國
若是咱們關閉一個頁面,服務器的日誌爲
2019-10-16 22:36:39.390 INFO 1376 --- [ntLoopGroup-3-7] c.g.w.netty.websocket.WebSocketHandler : 客戶端斷開,channel對應的長id爲:acde48fffe001122-00000560-00000007-9ca78ac7e2b907ab-0b15dfcb
2019-10-16 22:36:39.390 INFO 1376 --- [ntLoopGroup-3-7] c.g.w.netty.websocket.WebSocketHandler : 客戶端斷開,channel對應的短id爲:0b15dfcb
如今咱們來作一個點對點的聊天功能。
首先WebSocketServer添加空閒處理器
@Slf4j @AllArgsConstructor public class WebSocketServer { private int port; public void run() throws InterruptedException { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss,worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childOption(ChannelOption.TCP_NODELAY,true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //web基於http協議的解碼器 ch.pipeline().addLast(new HttpServerCodec()); //對大數據流的支持 ch.pipeline().addLast(new ChunkedWriteHandler()); //對http message進行聚合,聚合成FullHttpRequest或FullHttpResponse ch.pipeline().addLast(new HttpObjectAggregator(1024 * 64)); //websocket服務器處理對協議,用於指定給客戶端鏈接訪問的路徑 //該handler會幫你處理一些繁重的複雜的事 //會幫你處理握手動做:handshaking(close,ping,pong) ping + pong = 心跳 //對於websocket來說,都是以frames進行傳輸的,不一樣的數據類型對應的frames也不一樣 ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws")); //針對客戶端,若是1分鐘時沒有向服務端發送讀寫心跳(All),則主動斷開 //若是是讀空閒或者是寫空閒,不處理 ch.pipeline().addLast(new IdleStateHandler(20,40,60)); //添加咱們的自定義channel處理器 ch.pipeline().addLast(new WebSocketHandler()); } }); log.info("服務器啓動中"); ChannelFuture future = bootstrap.bind(port).sync(); future.channel().closeFuture().sync(); } finally { worker.shutdownGracefully(); boss.shutdownGracefully(); } } }
而後咱們須要增長兩個實體類——消息類和聊天類
首先添加一個消息的接口
public interface Chat { /** * 保存消息到數據庫 * @param chatMsg */ public void save(Chat chatMsg); /** * 簽名消息 * @param msgIdList */ public void updateMsgSigned(List<Long> msgIdList); /** * 查找未讀未簽名消息 * @param acceptUserId * @return */ public List<ChatMsg> findUnReadChat(Long acceptUserId); }
添加一個消息的類來實現該接口
@ToString @Data public class ChatMsg implements Serializable,Chat{ @JSONField(serializeUsing = ToStringSerializer.class) private Long senderId; //發送者的用戶id @JSONField(serializeUsing = ToStringSerializer.class) private Long receiverId; //接收者的用戶id private String msg; @JSONField(serializeUsing = ToStringSerializer.class) private Long msgId; //用於消息的簽收 private MsgSignFlagEnum signed; //消息簽收狀態 private LocalDateTime createDate; @Override @Transactional public void save(Chat chatMsg) { ChatDao chatDao = SpringBootUtil.getBean(ChatDao.class); IdService idService = SpringBootUtil.getBean(IdService.class); ((ChatMsg)chatMsg).setMsgId(idService.genId()); ((ChatMsg)chatMsg).setCreateDate(LocalDateTime.now()); chatDao.saveChat((ChatMsg) chatMsg); } @Transactional @Override public void updateMsgSigned(List<Long> msgIdList) { ChatDao chatDao = SpringBootUtil.getBean(ChatDao.class); chatDao.updateMsgSigned(msgIdList); } @Transactional @Override public List<ChatMsg> findUnReadChat(Long acceptUserId) { ChatDao chatDao = SpringBootUtil.getBean(ChatDao.class); return chatDao.findUnReadMsg(acceptUserId); } }
其中MsgSignFlagEnum爲一個枚舉類型,代碼以下
public enum MsgSignFlagEnum implements LocalisableAll { unsign(0,"未簽收"), signed(1,"已簽收"); public final int type; public final String value; private MsgSignFlagEnum(int type,String value) { this.type = type; this.value = value; } @Override public int getType() { return type; } @Override public String getValue() { return value; } }
創建一個消息的工廠
public class ChatMsgFactory { public static Chat createChatMsgService() { return new ChatMsg(); } }
而後是聊天類
@Data public class DataContent implements Serializable { private Integer action; //動做類型 private ChatMsg chatMsg; //用戶的聊天內容entity private String extand; //擴展字段 }
咱們根據動做類型來定義一個枚舉
public enum MsgActionEnum { CONNECT(1,"第一次(或重連)初始化鏈接"), CHAT(2,"聊天消息"), SIGNED(3,"消息簽收"), KEEPALIVE(4,"客戶端保持心跳"); public final Integer type; public final String content; private MsgActionEnum(Integer type,String content) { this.type = type; this.content = content; } }
在寫WebSocketHandler以前,咱們須要將用戶Id跟Channel作一個綁定
/** * 用戶id和channel的關聯關係的處理 */ @Slf4j public class UserChannelRel { private static Map<Long,Channel> manager = new HashMap<>(); public static void put(Long senderId,Channel channel) { manager.put(senderId,channel); } public static Channel get(Long senderId) { return manager.get(senderId); } public static void output() { manager.entrySet().stream().forEach(entry -> log.info("UserId:" + entry.getKey() + ",ChannelId:" + entry.getValue().id().asLongText()) ); } }
最後WebSocketHandler改寫以下
/** * TextWebSocketFrame: 在netty中,用於爲websocket專門處理文本的對象,frame是消息的載體 */ @Slf4j public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { //用於記錄和管理全部客戶端的channel private static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private Chat chatMsgService = ChatMsgFactory.createChatMsgService(); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //獲取客戶端傳輸過來的消息 String content = msg.text(); log.info("content爲:" + content); Channel currentChannel = ctx.channel(); //解析傳輸過來的消息轉成聊天對象 DataContent dataContent = JSONObject.parseObject(content,DataContent.class); //獲取聊天對象的動做 Integer action = dataContent.getAction(); if (action == MsgActionEnum.CONNECT.type) { //當websocket第一次open的時候,初始化channel,把用的channel和userId關聯起來 Long senderId = dataContent.getChatMsg().getSenderId(); UserChannelRel.put(senderId,currentChannel); //測試 users.stream().forEach(channel -> log.info(channel.id().asLongText())); UserChannelRel.output(); }else if (action == MsgActionEnum.CHAT.type) { //聊天類型的消息,把聊天記錄保存到數據庫,同時標記消息的簽收狀態[未簽收] ChatMsg chatMsg = dataContent.getChatMsg(); String msgText = chatMsg.getMsg(); Long receiverId = chatMsg.getReceiverId(); Long senderId = chatMsg.getSenderId(); //保存數據庫 chatMsgService.save(chatMsg); Channel receiverChannel = UserChannelRel.get(receiverId); if (receiverChannel == null) { //接收方離線狀態,此處無需處理 }else { Channel findChannel = users.find(receiverChannel.id()); if (findChannel != null) { findChannel.writeAndFlush(new TextWebSocketFrame( JSONObject.toJSONString(chatMsg) )); }else { //接收方離線,此處無需處理 } } }else if (action == MsgActionEnum.SIGNED.type) { //簽收消息類型,針對具體的消息進行簽收,修改數據庫中對應消息的簽收狀態[已簽收] //擴展字段在signed類型的消息中,表明須要去簽收的消息id,逗號間隔 String msgIdsStr = dataContent.getExtand(); log.info("extand爲:" + msgIdsStr); String[] msgIds = msgIdsStr.split(","); List<Long> msgIdList = new ArrayList<>(); for (String mId : msgIds) { if (!StringUtils.isEmpty(mId)) { msgIdList.add(Long.valueOf(mId)); } } log.info(msgIdList.toString()); if (!CollectionUtils.isEmpty(msgIdList)) { //批量簽收 chatMsgService.updateMsgSigned(msgIdList); } }else if (action == MsgActionEnum.KEEPALIVE.type) { //心跳類型的消息 log.info("收到來自channel爲[" + currentChannel + "]的心跳包"); } } /** * 當客戶端鏈接服務端以後(打開鏈接) * 獲取客戶端的channel,而且放到ChannelGroup中去進行管理 * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { users.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { //當觸發handlerRemoved,ChannelGroup會自動移除對應的客戶端的channel //因此下面這條語句可不寫 // clients.remove(ctx.channel()); log.info("客戶端斷開,channel對應的長id爲:" + ctx.channel().id().asLongText()); log.info("客戶端斷開,channel對應的短id爲:" + ctx.channel().id().asShortText()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.channel().close(); users.remove(ctx.channel()); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { //IdleStateEvent是一個用戶事件,包含讀空閒/寫空閒/讀寫空閒 if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.info("進入讀空閒"); }else if (event.state() == IdleState.WRITER_IDLE) { log.info("進入寫空閒"); }else if (event.state() == IdleState.ALL_IDLE) { log.info("channel關閉前,用戶數量爲:" + users.size()); //關閉無用的channel,以防資源浪費 ctx.channel().close(); log.info("channel關閉後,用戶數量爲:" + users.size()); } } } }
此處咱們能夠看到有兩個地方註釋了//接收方離線狀態,此處無需處理。因此咱們須要在用戶上線的時候獲取未簽名的消息,只須要經過Controller從數據庫獲取就好
@RestController public class UnReadMessageController { private Chat chatService = ChatMsgFactory.createChatMsgService(); @SuppressWarnings("unchecked") @PostMapping("/notification-anon/getunreadmeg") public Result<List<ChatMsg>> getUnReadMessage(@RequestParam("receiverid") Long receiverId) { return Result.success(chatService.findUnReadChat(receiverId)); } }
因爲咱們登陸用的是OAuth框架,在user模塊添加
@PostMapping("/users-anon/finduser") public LoginAppUser findUserByName(@RequestParam("username") String username) { return appUserService.findByUsername(username); }
修改網關登陸,登陸後能夠獲取用戶的信息,首先添加User模塊的feign
@FeignClient("user-center") public interface UserClient { @PostMapping("/users-anon/finduser") LoginAppUser findUserByName(@RequestParam("username") String username); }
/** * 登錄、刷新token、退出 * * @author 關鍵 */ @Slf4j @RestController public class TokenController { @Autowired private Oauth2Client oauth2Client; @Autowired private UserClient userClient; /** * 系統登錄<br> * 根據用戶名登陸<br> * 採用oauth2密碼模式獲取access_token和refresh_token * * @param username * @param password * @return */ @SuppressWarnings("unchecked") @PostMapping("/sys/login") public Result<Map> login(@RequestParam String username,@RequestParam String password) { Map<String, String> parameters = new HashMap<>(); parameters.put(OAuth2Utils.GRANT_TYPE, "password"); parameters.put(OAuth2Utils.CLIENT_ID, "system"); parameters.put("client_secret", "system"); parameters.put(OAuth2Utils.SCOPE, "app"); // parameters.put("username", username); // 爲了支持多類型登陸,這裏在username後拼裝上登陸類型 parameters.put("username", username + "|" + CredentialType.USERNAME.name()); parameters.put("password", password); Map<String, Object> tokenInfo = oauth2Client.postAccessToken(parameters); AppUser user = userClient.findUserByName(username); tokenInfo.put("user",user); saveLoginLog(username, "用戶名密碼登錄"); return Result.success(tokenInfo); } @Autowired private LogClient logClient; /** * 登錄日誌 * * @param username */ private void saveLoginLog(String username, String remark) { log.info("{}登錄", username); // 異步 CompletableFuture.runAsync(() -> { try { Log log = Log.builder().username(username).module(LogModule.LOGIN).remark(remark).createTime(new Date()) .build(); logClient.save(log); } catch (Exception e) { // do nothing } }); } /** * 系統刷新refresh_token * * @param refresh_token * @return */ @PostMapping("/sys/refresh_token") public Map<String, Object> refresh_token(String refresh_token) { Map<String, String> parameters = new HashMap<>(); parameters.put(OAuth2Utils.GRANT_TYPE, "refresh_token"); parameters.put(OAuth2Utils.CLIENT_ID, "system"); parameters.put("client_secret", "system"); parameters.put(OAuth2Utils.SCOPE, "app"); parameters.put("refresh_token", refresh_token); return oauth2Client.postAccessToken(parameters); } /** * 退出 * * @param access_token */ @GetMapping("/sys/logout") public void logout(String access_token, @RequestHeader(required = false, value = "Authorization") String token) { if (StringUtils.isBlank(access_token)) { if (StringUtils.isNoneBlank(token)) { access_token = token.substring(OAuth2AccessToken.BEARER_TYPE.length() + 1); } } oauth2Client.removeToken(access_token); } }
作如上修改後,咱們登陸後能夠得到用戶的Id,以上省略Dao,Mapper
這個時候服務端就所有完成了,咱們再來看一下客戶端
先創建一個全局變量app.js
window.app = { /** * 和後端的枚舉對應 */ CONNECT: 1, //"第一次(或重連)初始化鏈接"), CHAT: 2, //"聊天消息"), SIGNED: 3, //"消息簽收"), KEEPALIVE: 4, //"客戶端保持心跳"); /** * 和後端的ChatMsg聊天模型對象保持一致 * @param {Object} senderId * @param {Object} receiverId * @param {Object} msg * @param {Object} msgId */ ChatMsg: function(senderId,receiverId,msg,msgId) { this.senderId = senderId; this.receiverId = receiverId; this.msg = msg; this.msgId = msgId; }, /** * 構建消息模型對象 * @param {Object} action * @param {Object} chatMsg * @param {Object} extand */ DataContent: function(action,chatMsg,extand) { this.action = action; this.chatMsg = chatMsg; this.extand = extand; } }
因爲這裏只是聊天樣例,沒有處理登陸功能,咱們如今就以1,2來表明兩個用戶的id
用戶id爲1的代碼,文件名index.html
<!DOCTYPE html> <html> <head> <meta charset="utf-8" /> <title></title> </head> <body> <div>發送消息:</div> <input type="text" id="msgContent" /> <input type="button" value="發送" onclick="CHAT.chat(1,2,msgContent.value,app.CHAT,null)" /> <div>接受消息:</div> <div id="receiveMsg" style="background-color: gainsboro;"></div> <script type="application/javascript" src="js/app.js"></script> <script type="application/javascript" src="js/mui.min.js"></script> <script type="application/javascript"> window.CHAT = { socket: null, init: function() { if (window.WebSocket) { CHAT.socket = new WebSocket("ws://127.0.0.1:9999/ws"); CHAT.socket.onopen = function() { console.log("鏈接創建成功"); CHAT.chat(1,null,null,app.CONNECT,null); //每次鏈接的時候獲取未讀消息 fetchUnReadMsg(); //定時發送心跳,30秒一次 setInterval("CHAT.keepalive()",30000); }, CHAT.socket.onclose = function() { console.log("鏈接關閉"); }, CHAT.socket.onerror = function() { console.log("發生錯誤"); }, CHAT.socket.onmessage = function(e) { console.log("接收到消息" + e.data); var receiveMsg = document.getElementById("receiveMsg"); var html = receiveMsg.innerHTML; var chatMsg = JSON.parse(e.data); receiveMsg.innerHTML = html + "<br/>" + chatMsg.msg; //消息簽收 CHAT.chat(chatMsg.receiverId,null,null,app.SIGNED,String(chatMsg.msgId)); } }else { alert("瀏覽器不支持WebSocket協議..."); } }, chat: function(senderId,receiverId,msg,action,extand) { var chatMsg = new app.ChatMsg(senderId,receiverId,msg,null); var dataContent = new app.DataContent(action,chatMsg,extand); CHAT.socket.send(JSON.stringify(dataContent)); }, keepalive: function() { CHAT.chat(1,null,null,app.KEEPALIVE,null); fetchUnReadMsg(); } } CHAT.init(); function fetchUnReadMsg() { mui.ajax('http://127.0.0.1:8008/notification-anon/getunreadmeg?receiverid=1',{ data:{}, dataType:'json',//服務器返回json格式數據 type:'post',//HTTP請求類型 timeout:10000,//超時時間設置爲10秒; success:function(data){ if (data.code == 200) { var contactList = data.data; var ids = ""; console.log(JSON.stringify(contactList)); var receiveMsg = document.getElementById("receiveMsg"); for (var i = 0;i < contactList.length;i++) { var msgObj = contactList[i]; var html = receiveMsg.innerHTML; receiveMsg.innerHTML = html + "<br/>" + msgObj.msg; ids = ids + msgObj.msgId + ","; } //批量簽收未讀消息 CHAT.chat(1,null,null,app.SIGNED,ids); } } }); } </script> </body> </html>
用戶id爲2的代碼,文件名receive.html
<!DOCTYPE html> <html> <head> <meta charset="utf-8" /> <title></title> </head> <body> <div>發送消息:</div> <input type="text" id="msgContent" /> <input type="button" value="發送" onclick="CHAT.chat(2,1,msgContent.value,app.CHAT,null)" /> <div>接受消息:</div> <div id="receiveMsg" style="background-color: gainsboro;"></div> <script type="application/javascript" src="js/app.js"></script> <script type="application/javascript" src="js/mui.min.js"></script> <script type="application/javascript"> window.CHAT = { socket: null, init: function() { if (window.WebSocket) { CHAT.socket = new WebSocket("ws://127.0.0.1:9999/ws"); CHAT.socket.onopen = function() { console.log("鏈接創建成功"); CHAT.chat(2,null,null,app.CONNECT,null); //每次鏈接的時候獲取未讀消息 fetchUnReadMsg(); //定時發送心跳,30秒一次 setInterval("CHAT.keepalive()",30000); }, CHAT.socket.onclose = function() { console.log("鏈接關閉"); }, CHAT.socket.onerror = function() { console.log("發生錯誤"); }, CHAT.socket.onmessage = function(e) { console.log("接收到消息" + e.data); var receiveMsg = document.getElementById("receiveMsg"); var html = receiveMsg.innerHTML; var chatMsg = JSON.parse(e.data); receiveMsg.innerHTML = html + "<br/>" + chatMsg.msg; //消息簽收 CHAT.chat(chatMsg.receiverId,null,null,app.SIGNED,String(chatMsg.msgId)); } }else { alert("瀏覽器不支持WebSocket協議..."); } }, chat: function(senderId,receiverId,msg,action,extand) { var chatMsg = new app.ChatMsg(senderId,receiverId,msg,null); var dataContent = new app.DataContent(action,chatMsg,extand); CHAT.socket.send(JSON.stringify(dataContent)); }, keepalive: function() { CHAT.chat(2,null,null,app.KEEPALIVE,null); fetchUnReadMsg(); } } CHAT.init(); function fetchUnReadMsg() { mui.ajax('http://127.0.0.1:8008/notification-anon/getunreadmeg?receiverid=2',{ data:{}, dataType:'json',//服務器返回json格式數據 type:'post',//HTTP請求類型 timeout:10000,//超時時間設置爲10秒; success:function(data){ if (data.code == 200) { var contactList = data.data; var ids = ""; console.log(JSON.stringify(contactList)); var receiveMsg = document.getElementById("receiveMsg"); for (var i = 0;i < contactList.length;i++) { var msgObj = contactList[i]; var html = receiveMsg.innerHTML; receiveMsg.innerHTML = html + "<br/>" + msgObj.msg; ids = ids + msgObj.msgId + ","; } //批量簽收未讀消息 CHAT.chat(2,null,null,app.SIGNED,ids); } } }); </script> </body> </html>
這裏都是經過Json來作數據的序列化的,以後會修改成ProtoBuffer來處理。
如今來增長髮圖片的功能。首先咱們須要搭建好一個fastdfs服務器,具體能夠參考分佈式文件系統FastDFS安裝配置 以及Springboot 2.0+FastDFS開發配置 。
這裏只放入咱們須要的Controller
@RestController public class FastDFSController { @Autowired private FastDFSClientWrapper dfsClient; @PostMapping("/files-anon/fdfsupload") @SuppressWarnings("unchecked") public Result<String> upload(@RequestParam("file") MultipartFile file) throws Exception { String imgUrl = dfsClient.uploadFile(file); return Result.success(imgUrl); } }
而後修改咱們的用戶id爲1的前端代碼
<!DOCTYPE html> <html> <head> <meta charset="utf-8" /> <title></title> </head> <body> <div>發送消息:</div> <input type="text" id="msgContent" /> <input type="button" value="發送" onclick="CHAT.chat(1,2,msgContent.value,app.CHAT,null)" /> <input type="file" id="file" name="file"> <input type="button" id="button" value="發送圖片" > <div>接受消息:</div> <div id="receiveMsg" style="background-color: gainsboro;"></div> <script type="application/javascript" src="js/app.js"></script> <script type="application/javascript" src="js/mui.min.js"></script> <script type="application/javascript" src="js/jquery-3.3.1.min.js"></script> <script type="application/javascript"> window.CHAT = { socket: null, init: function() { if (window.WebSocket) { CHAT.socket = new WebSocket("ws://127.0.0.1:9999/ws"); CHAT.socket.onopen = function() { console.log("鏈接創建成功"); CHAT.chat(1,null,null,app.CONNECT,null); //每次鏈接的時候獲取未讀消息 fetchUnReadMsg(); //定時發送心跳,30秒一次 setInterval("CHAT.keepalive()",30000); }, CHAT.socket.onclose = function() { console.log("鏈接關閉"); }, CHAT.socket.onerror = function() { console.log("發生錯誤"); }, CHAT.socket.onmessage = function(e) { console.log("接收到消息" + e.data); var receiveMsg = document.getElementById("receiveMsg"); var html = receiveMsg.innerHTML; var chatMsg = JSON.parse(e.data); receiveMsg.innerHTML = html + "<br/>" + chatMsg.msg; //消息簽收 CHAT.chat(chatMsg.receiverId,null,null,app.SIGNED,String(chatMsg.msgId)); } }else { alert("瀏覽器不支持WebSocket協議..."); } }, chat: function(senderId,receiverId,msg,action,extand) { var chatMsg = new app.ChatMsg(senderId,receiverId,msg,null); var dataContent = new app.DataContent(action,chatMsg,extand); CHAT.socket.send(JSON.stringify(dataContent)); }, keepalive: function() { CHAT.chat(1,null,null,app.KEEPALIVE,null); fetchUnReadMsg(); } } CHAT.init(); function fetchUnReadMsg() { mui.ajax('http://127.0.0.1:8008/notification-anon/getunreadmeg?receiverid=2',{ data:{}, dataType:'json',//服務器返回json格式數據 type:'post',//HTTP請求類型 timeout:10000,//超時時間設置爲10秒; success:function(data){ if (data.code == 200) { var contactList = data.data; var ids = ""; console.log(JSON.stringify(contactList)); var receiveMsg = document.getElementById("receiveMsg"); for (var i = 0;i < contactList.length;i++) { var msgObj = contactList[i]; var html = receiveMsg.innerHTML; receiveMsg.innerHTML = html + "<br/>" + msgObj.msg; ids = ids + msgObj.msgId + ","; } //批量簽收未讀消息 CHAT.chat(2,null,null,app.SIGNED,ids); } } }); } $(function () { $("#button").click(function () { var form = new FormData(); form.append("file", document.getElementById("file").files[0]); $.ajax({ url: "http://xxx.xxx.xxx.xxx:8010/files-anon/fdfsupload", //後臺url data: form, cache: false, async: false, type: "POST", //類型,POST或者GET dataType: 'json', //數據返回類型,能夠是xml、json等 processData: false, contentType: false, success: function (data) { //成功,回調函數 if (data.code == 200) { console.log(data.data); CHAT.chat(1,2,"<img src='" + data.data + "' height='200' width='200' />",app.CHAT,null); } } }); }) }) </script> </body> </html>
意思即爲使用ajax訪問咱們的上傳文件Controller,獲取上傳成功後的url,將該url拼接到<img />的標籤中,當稱普通聊天信息發送出去便可。
用戶id爲2的前端代碼相同。
<!DOCTYPE html> <html> <head> <meta charset="utf-8" /> <title></title> </head> <body> <div>發送消息:</div> <input type="text" id="msgContent" /> <input type="button" value="發送" onclick="CHAT.chat(2,1,msgContent.value,app.CHAT,null)" /> <input type="file" id="file" name="file"> <input type="button" id="button" value="發送圖片" > <div>接受消息:</div> <div id="receiveMsg" style="background-color: gainsboro;"></div> <script type="application/javascript" src="js/app.js"></script> <script type="application/javascript" src="js/mui.min.js"></script> <script type="application/javascript" src="js/jquery-3.3.1.min.js"></script> <script type="application/javascript"> window.CHAT = { socket: null, init: function() { if (window.WebSocket) { CHAT.socket = new WebSocket("ws://127.0.0.1:9999/ws"); CHAT.socket.onopen = function() { console.log("鏈接創建成功"); CHAT.chat(2,null,null,app.CONNECT,null); //每次鏈接的時候獲取未讀消息 fetchUnReadMsg(); //定時發送心跳,30秒一次 setInterval("CHAT.keepalive()",30000); }, CHAT.socket.onclose = function() { console.log("鏈接關閉"); }, CHAT.socket.onerror = function() { console.log("發生錯誤"); }, CHAT.socket.onmessage = function(e) { console.log("接收到消息" + e.data); var receiveMsg = document.getElementById("receiveMsg"); var html = receiveMsg.innerHTML; var chatMsg = JSON.parse(e.data); receiveMsg.innerHTML = html + "<br/>" + chatMsg.msg; //消息簽收 CHAT.chat(chatMsg.receiverId,null,null,app.SIGNED,String(chatMsg.msgId)); } }else { alert("瀏覽器不支持WebSocket協議..."); } }, chat: function(senderId,receiverId,msg,action,extand) { var chatMsg = new app.ChatMsg(senderId,receiverId,msg,null); var dataContent = new app.DataContent(action,chatMsg,extand); CHAT.socket.send(JSON.stringify(dataContent)); }, keepalive: function() { CHAT.chat(2,null,null,app.KEEPALIVE,null); fetchUnReadMsg(); } } CHAT.init(); function fetchUnReadMsg() { mui.ajax('http://127.0.0.1:8008/notification-anon/getunreadmeg?receiverid=2',{ data:{}, dataType:'json',//服務器返回json格式數據 type:'post',//HTTP請求類型 timeout:10000,//超時時間設置爲10秒; success:function(data){ if (data.code == 200) { var contactList = data.data; var ids = ""; console.log(JSON.stringify(contactList)); var receiveMsg = document.getElementById("receiveMsg"); for (var i = 0;i < contactList.length;i++) { var msgObj = contactList[i]; var html = receiveMsg.innerHTML; receiveMsg.innerHTML = html + "<br/>" + msgObj.msg; ids = ids + msgObj.msgId + ","; } //批量簽收未讀消息 CHAT.chat(2,null,null,app.SIGNED,ids); } } }); } $(function () { $("#button").click(function () { var form = new FormData(); form.append("file", document.getElementById("file").files[0]); $.ajax({ url: "http://xxx.xxx.xxx.xxx:8010/files-anon/fdfsupload", //後臺url data: form, cache: false, async: false, type: "POST", //類型,POST或者GET dataType: 'json', //數據返回類型,能夠是xml、json等 processData: false, contentType: false, success: function (data) { //成功,回調函數 if (data.code == 200) { console.log(data.data); CHAT.chat(2,1,"<img src='" + data.data + "' height='200' width='200' />",app.CHAT,null); } } }); }) }) </script> </body> </html>