使用NIO實現一個多人聊天室。聊天室包含如下功能。
服務端
客戶端
系統類設計
系統包括四個類,分別爲:Message、MessageType、ChatServer、ChatClient
系統業務流程
客戶端註冊
發送聊天信息
消息設計
系統消息採用簡單的特殊字符串 String MSG_SPLIT = "#@@#"
分割字段的格式,消息格式分兩種:action#@@#message 和 action#@@#option#@@#message
程序注意點
selector.keys().stream().filter(item -> item.channel() instanceof SocketChannel && item.channel().isOpen()).collect(Collectors.toSet());
。OP_READ
事件,但此時 socketChannel.read(byteBuffer)
返回-1或拋IOException,需要處理這兩種情況並關閉socketChannel。客戶端分爲兩個階段
messageType = MessageType.REG_CLIENT_ACK
REG_SERVER_ACK
後,進入聊天階段 messageType = MessageType.CHAT_MSG_SEND
。上述第3步,通過 Selector.keys() 獲取所有向 Selector 註冊的客戶端時,要特別注意過濾已經關閉的 Channel,否則處理客戶端下線事件時,取到的用戶列表會包含剛下線的這個用戶。這可能是因爲 Selector 只有在執行 select 時纔會刷新並刪除已關閉的 Channel(已取消的 key 要到下一次 select 操作時纔會從 key 集合中移除)。
代碼還是比較簡單的,設計點上面基本都描述了,代碼沒有註釋,將就着看吧。
Message 和 MessageType
package chart;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/**
 * A chat protocol message together with its text wire format.
 *
 * <p>On the wire a message is either {@code action#@@#message} or
 * {@code action#@@#option#@@#message}, encoded with {@link #CHARSET}. The
 * {@code option} field carries the peer user name (recipient for
 * {@code chat_send}, sender for {@code chat_receive}).
 *
 * <p>NOTE(review): the format has no length prefix or terminator, so two
 * messages delivered in one TCP read cannot be told apart by {@link #decode}.
 */
public class Message {
    /** Separator placed between the fields of an encoded message. */
    public static final String MSG_SPLIT = "#@@#";
    /** Charset used for every encoded message. */
    public static final Charset CHARSET = StandardCharsets.UTF_8;

    /** A message has at most three fields: action, option, message. */
    private static final int MAX_FIELDS = 3;
    /** Compiled once; the separator is quoted so it is always matched literally. */
    private static final Pattern SPLIT_PATTERN = Pattern.compile(Pattern.quote(MSG_SPLIT));

    private MessageType action;
    private String option;
    private String message;

    /**
     * Creates a three-field message.
     *
     * @param action  message type
     * @param option  peer user name; {@code null} or empty means "no option"
     * @param message payload text
     */
    public Message(MessageType action, String option, String message) {
        this.action = action;
        this.option = option;
        this.message = message;
    }

    /**
     * Creates a two-field message without an option.
     *
     * @param action  message type
     * @param message payload text
     */
    public Message(MessageType action, String message) {
        this.action = action;
        this.message = message;
    }

    /**
     * Serializes this message to its wire format.
     *
     * @return a buffer positioned at the start of the encoded bytes
     */
    public ByteBuffer encode() {
        StringBuilder builder = new StringBuilder(action.getAction());
        // The option field is omitted entirely when absent, giving the two-field form.
        if (option != null && !option.isEmpty()) {
            builder.append(MSG_SPLIT).append(option);
        }
        builder.append(MSG_SPLIT).append(message);
        return CHARSET.encode(builder.toString());
    }

    /**
     * Parses a wire-format string.
     *
     * <p>At most {@value #MAX_FIELDS} fields are split off, so a payload that
     * itself contains {@link #MSG_SPLIT} is kept intact in the third field
     * instead of causing the whole message to be rejected.
     *
     * @param message the received text; may be {@code null}
     * @return the parsed message, or {@code null} when the input is empty or
     *         does not contain an action plus at least one non-empty field
     */
    public static Message decode(String message) {
        if (message == null || message.isEmpty()) {
            return null;
        }
        String[] fields = SPLIT_PATTERN.split(message, MAX_FIELDS);
        // A positive limit keeps trailing empty fields; drop them so that input
        // such as "action#@@#" is still rejected as having no payload.
        int count = fields.length;
        while (count > 0 && fields[count - 1].isEmpty()) {
            count--;
        }
        if (count < 2) {
            return null;
        }
        MessageType messageType = MessageType.getActionType(fields[0]);
        if (count == 2) {
            return new Message(messageType, fields[1]);
        }
        return new Message(messageType, fields[1], fields[2]);
    }

    /** Encodes the server's initial "please register" request. */
    public static ByteBuffer encodeRegSyn() {
        return encodeRegSyn(false);
    }

    /**
     * Encodes the server's registration request.
     *
     * @param duplicate {@code true} when the previously offered name was taken
     */
    public static ByteBuffer encodeRegSyn(boolean duplicate) {
        String message = duplicate
                ? "This name is used, Please input another name."
                : "Please input your name to register.";
        return new Message(MessageType.REG_SERVER_SYN, message).encode();
    }

    /** Encodes a chat message addressed to everyone. */
    public static ByteBuffer encodeSendMsg(String msg) {
        return encodeSendMsg(null, msg);
    }

    /**
     * Encodes a chat message sent by a client.
     *
     * @param toUser recipient name; {@code null} or empty omits the option field
     * @param msg    chat text
     */
    public static ByteBuffer encodeSendMsg(String toUser, String msg) {
        return new Message(MessageType.CHAT_MSG_SEND, toUser, msg).encode();
    }

    /**
     * Encodes a chat message relayed by the server.
     *
     * @param fromUser name of the sender
     * @param msg      chat text
     */
    public static ByteBuffer encodeReceiveMsg(String fromUser, String msg) {
        return new Message(MessageType.CHAT_MSG_RECEIVE, fromUser, msg).encode();
    }

    /** Encodes the client's registration reply carrying the chosen user name. */
    public static ByteBuffer encodeRegClientAck(String username) {
        return new Message(MessageType.REG_CLIENT_ACK, username).encode();
    }

    /** Encodes the server's welcome message confirming a registration. */
    public static ByteBuffer encodeRegServerAck(String username) {
        String message = username + ", Welcome to join the chat.";
        return new Message(MessageType.REG_SERVER_ACK, message).encode();
    }

    /** Encodes the user list as the {@code [a, b, c]} text form. */
    public static ByteBuffer encodePublishUserList(List<String> userList) {
        String message = Arrays.toString(userList.toArray());
        return new Message(MessageType.BROADCAST_USER_LIST, message).encode();
    }

    public MessageType getAction() {
        return action;
    }

    public void setAction(MessageType action) {
        this.action = action;
    }

    public String getOption() {
        return option;
    }

    public void setOption(String option) {
        this.option = option;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

/** Message kinds of the chat protocol, each with the token used on the wire. */
enum MessageType {
    REG_SERVER_SYN("reg_server_syn"),
    CHAT_MSG_SEND("chat_send"),
    CHAT_MSG_RECEIVE("chat_receive"),
    UNKNOWN("unknown"),
    REG_SERVER_ACK("reg_server_ack"),
    REG_CLIENT_ACK("reg_client_ack"),
    BROADCAST_USER_LIST("broadcast_user_list");

    private final String action;

    MessageType(String action) {
        this.action = action;
    }

    /** Returns the wire token of this type. */
    public String getAction() {
        return action;
    }

    /**
     * Looks up a type by its wire token.
     *
     * @param action token read from the wire
     * @return the matching type, or {@link #UNKNOWN} when nothing matches
     */
    public static MessageType getActionType(String action) {
        for (MessageType messageType : MessageType.values()) {
            if (messageType.getAction().equals(action)) {
                return messageType;
            }
        }
        return UNKNOWN;
    }
}
ChatServer
package chart;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Single-threaded NIO chat server.
 *
 * <p>One {@link Selector} multiplexes the listening socket and every client
 * socket. The registered user name of a client is stored as the attachment of
 * its {@link SelectionKey}; a key without attachment belongs to a client that
 * has connected but not yet registered.
 *
 * <p>NOTE(review): each read is decoded as exactly one message. Messages that
 * are coalesced or split by TCP, or larger than the read buffer, are not
 * reassembled. Writes are attempted once; a partial non-blocking write is not
 * retried.
 */
public class ChatServer {
    public static final int SERVER_PORT = 8080;

    private static final int READ_BUFFER_SIZE = 1024;
    private static final long SELECT_TIMEOUT_MILLIS = 100L;

    Selector selector;
    ServerSocketChannel serverSocketChannel;
    /** Loop flag; volatile so another thread can stop the server. */
    volatile boolean running = true;

    /**
     * Binds to {@link #SERVER_PORT} and serves clients until {@code running}
     * becomes {@code false} or an unrecoverable I/O error occurs.
     *
     * @throws IOException if the selector or listening socket fails
     */
    public void runServer() throws IOException {
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(SERVER_PORT));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("Server started.");
            while (running) {
                if (selector.select(SELECT_TIMEOUT_MILLIS) == 0) {
                    continue;
                }
                Iterator<SelectionKey> selected = selector.selectedKeys().iterator();
                while (selected.hasNext()) {
                    SelectionKey key = selected.next();
                    // Selected keys must be removed by hand or they are reported again.
                    selected.remove();
                    dealEvent(key);
                }
            }
        } finally {
            if (selector != null && selector.isOpen()) {
                // Closing the selector does not close the registered channels.
                for (SelectionKey key : selector.keys()) {
                    closeQuietly(key.channel());
                }
                selector.close();
            }
            if (serverSocketChannel != null && serverSocketChannel.isOpen()) {
                serverSocketChannel.close();
            }
        }
    }

    /** Dispatches one ready key to the accept or read handler. */
    private void dealEvent(SelectionKey key) throws IOException {
        // A key may have been cancelled while handling an earlier key of the same batch.
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            acceptClient(key);
        } else if (key.isReadable()) {
            readFromClient(key);
        }
    }

    /** Accepts a pending connection and asks the new client to register. */
    private void acceptClient(SelectionKey key) throws IOException {
        System.out.println("Accept client connection.");
        SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
        // A non-blocking accept returns null when no connection is pending any more.
        if (client == null) {
            return;
        }
        try {
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
            client.write(Message.encodeRegSyn());
        } catch (IOException e) {
            // A client that fails during setup must not take the server down.
            closeQuietly(client);
        }
    }

    /** Reads one chunk from a client and treats EOF or a read error as a disconnect. */
    private void readFromClient(SelectionKey key) throws IOException {
        System.out.println("Receive message from client.");
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
        int bytesRead;
        try {
            bytesRead = client.read(buffer);
        } catch (IOException e) {
            disconnect(key);
            return;
        }
        if (bytesRead < 0) {
            // Orderly close by the peer: without this the key stays readable
            // forever and the select loop spins.
            disconnect(key);
            return;
        }
        if (bytesRead == 0) {
            return;
        }
        buffer.flip();
        dealMsg(Message.CHARSET.decode(buffer).toString(), key);
    }

    /** Closes the client's channel and publishes the updated user list. */
    private void disconnect(SelectionKey key) throws IOException {
        closeQuietly(key.channel());
        String username = (String) key.attachment();
        System.out.println(String.format("User %s disconnected", username));
        broadcastUserList();
    }

    /** Decodes a received text and runs the matching protocol step. */
    private void dealMsg(String msg, SelectionKey key) throws IOException {
        System.out.println(String.format("Message info is: %s", msg));
        Message message = Message.decode(msg);
        if (message == null) {
            return;
        }
        switch (message.getAction()) {
            case REG_CLIENT_ACK:
                registerUser(key, message.getMessage());
                break;
            case CHAT_MSG_SEND:
                relayChat(key, message);
                break;
            default:
                // Other message types are server-to-client only; ignore them.
                break;
        }
    }

    /** Registers {@code username} for the client unless another connection already uses it. */
    private void registerUser(SelectionKey key, String username) throws IOException {
        for (SelectionKey other : getConnectedChannel()) {
            if (username.equals(other.attachment())) {
                send(key, Message.encodeRegSyn(true));
                return;
            }
        }
        key.attach(username);
        send(key, Message.encodeRegServerAck(username));
        System.out.println(String.format("New user joined: %s,", username));
        broadcastUserList();
    }

    /** Forwards a chat message to the named recipient, or to everyone else when none is named. */
    private void relayChat(SelectionKey senderKey, Message message) throws IOException {
        String toUser = message.getOption();
        String fromUser = (String) senderKey.attachment();
        ByteBuffer payload = Message.encodeReceiveMsg(fromUser, message.getMessage());
        boolean recipientLost = false;
        for (SelectionKey target : getConnectedChannel()) {
            if (target == senderKey) {
                continue;
            }
            if (toUser == null || toUser.equals(target.attachment())) {
                // duplicate() gives each write its own position over the same bytes.
                if (!send(target, payload.duplicate())) {
                    recipientLost = true;
                }
            }
        }
        if (recipientLost) {
            broadcastUserList();
        }
    }

    /**
     * Sends the names of all registered, still-connected users to every
     * connected client.
     *
     * <p>A client whose write fails is closed, and the list is sent again so
     * the remaining clients do not keep a stale entry.
     *
     * @throws IOException declared for compatibility; write failures of
     *         individual clients are handled internally
     */
    public void broadcastUserList() throws IOException {
        boolean clientLost;
        do {
            clientLost = false;
            Set<SelectionKey> keySet = getConnectedChannel();
            List<String> uList = keySet.stream()
                    .map(SelectionKey::attachment)
                    .filter(attachment -> attachment != null)
                    .map(Object::toString)
                    .collect(Collectors.toList());
            ByteBuffer payload = Message.encodePublishUserList(uList);
            for (SelectionKey keyItem : keySet) {
                if (!send(keyItem, payload.duplicate())) {
                    clientLost = true;
                }
            }
            // Each retry follows the loss of at least one channel, so this terminates.
        } while (clientLost);
    }

    /**
     * Writes a payload to one client, closing only that client on failure.
     *
     * @return {@code true} if the write call succeeded, {@code false} if the
     *         client was closed because of an I/O error
     */
    private boolean send(SelectionKey target, ByteBuffer payload) {
        SocketChannel channel = (SocketChannel) target.channel();
        try {
            channel.write(payload);
            return true;
        } catch (IOException e) {
            System.out.println(String.format("User %s disconnected", target.attachment()));
            closeQuietly(channel);
            return false;
        }
    }

    /**
     * Returns the keys of all client channels that are still open.
     *
     * <p>Closed channels must be filtered explicitly: their keys stay in
     * {@code selector.keys()} until the next selection operation.
     */
    private Set<SelectionKey> getConnectedChannel() {
        return selector.keys().stream()
                .filter(item -> item.channel() instanceof SocketChannel && item.channel().isOpen())
                .collect(Collectors.toSet());
    }

    /** Closes a resource, ignoring failures because nothing can be done about them here. */
    private static void closeQuietly(Closeable resource) {
        try {
            resource.close();
        } catch (IOException ignored) {
            // Best effort: the channel is being discarded anyway.
        }
    }

    public static void main(String[] args) throws IOException {
        new ChatServer().runServer();
    }
}
ChatClient
package chart;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Console chat client built on a non-blocking {@link SocketChannel}.
 *
 * <p>Two threads cooperate: the caller of {@link #runClient()} runs the
 * selector loop and handles everything received from the server, while a
 * console thread reads lines from standard input and sends them. The client
 * starts in the registration stage (every line is sent as a user name) and
 * switches to the chat stage once the server acknowledges the registration.
 * In the chat stage a line of the form {@code user#text} is addressed to
 * {@code user}; any other line is sent without a recipient.
 *
 * <p>NOTE(review): each read is decoded as exactly one message; messages
 * coalesced or split by TCP are not reassembled.
 */
public class ChatClient {
    Selector selector;
    SocketChannel socketChannel;
    // The three fields below are written by the selector thread and read by the
    // console thread, so they must be volatile to be visible across threads.
    volatile boolean running = true;
    volatile MessageType messageType = MessageType.REG_CLIENT_ACK;
    volatile String prompt = "User Name:";

    /**
     * Connects to the server on localhost and processes events until an I/O
     * error occurs or the server goes away.
     *
     * @throws IOException if connecting or selecting fails
     */
    public void runClient() throws IOException {
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("127.0.0.1", ChatServer.SERVER_PORT));
            System.out.println("Client connecting to server.");
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            while (running) {
                if (selector.select(100) == 0) {
                    continue;
                }
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> selected = selectedKeys.iterator();
                while (selected.hasNext()) {
                    SelectionKey key = selected.next();
                    // Selected keys must be removed by hand or they are reported again.
                    selected.remove();
                    dealEvent(key);
                }
            }
        } finally {
            // Lets the console loop stop after its current readLine returns.
            running = false;
            if (selector != null && selector.isOpen()) {
                selector.close();
            }
            // isOpen, not isConnected: a channel that never connected must be closed too.
            if (socketChannel != null && socketChannel.isOpen()) {
                socketChannel.close();
            }
        }
    }

    /** Handles connection completion and incoming data for the client channel. */
    private void dealEvent(SelectionKey key) throws IOException {
        if (key.isConnectable()) {
            SocketChannel channel = (SocketChannel) key.channel();
            if (channel.isConnectionPending()) {
                channel.finishConnect();
            }
            // Connected: from now on only incoming data is of interest.
            key.interestOps(SelectionKey.OP_READ);
            startConsoleThread(channel);
        } else if (key.isReadable()) {
            readFromServer(key);
        }
    }

    /** Starts the thread that forwards console input to the server. */
    private void startConsoleThread(SocketChannel channel) {
        Thread consoleThread = new Thread(() -> readConsoleLoop(channel), "chat-client-console");
        // Daemon, so a readLine blocked on stdin cannot keep the JVM alive
        // after the selector loop has ended.
        consoleThread.setDaemon(true);
        consoleThread.start();
    }

    /** Reads console lines and sends each as a registration or chat message. */
    private void readConsoleLoop(SocketChannel channel) {
        try {
            // Give the server's registration request time to arrive and be printed first.
            Thread.sleep(500);
            printMsgAndPrompt("Start to interconnect with server.");
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            while (running) {
                System.out.print(prompt);
                String line = reader.readLine();
                if (line == null) {
                    // End of standard input: readLine would return null forever.
                    break;
                }
                if (line.isEmpty()) {
                    continue;
                }
                ByteBuffer outgoing = encodeUserInput(line);
                // A non-blocking write may accept only part of the buffer.
                while (outgoing.hasRemaining()) {
                    channel.write(outgoing);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can observe the interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /** Encodes a console line according to the current protocol stage. */
    private ByteBuffer encodeUserInput(String line) {
        if (messageType == MessageType.REG_CLIENT_ACK) {
            return Message.encodeRegClientAck(line);
        }
        String[] parts = line.split("#", 2);
        if (parts.length == 2) {
            return Message.encodeSendMsg(parts[0], parts[1]);
        }
        return Message.encodeSendMsg(line);
    }

    /** Reads one chunk from the server; EOF or a read error terminates the client. */
    private void readFromServer(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0) {
                // Orderly close by the server: without this the key stays
                // readable forever and the select loop spins.
                System.out.println("Server exit.");
                System.exit(0);
            }
            if (bytesRead == 0) {
                return;
            }
            buffer.flip();
            dealMsg(Message.CHARSET.decode(buffer).toString());
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("Server exit.");
            System.exit(0);
        }
    }

    /** Decodes a server message and updates the console and protocol stage. */
    private void dealMsg(String msg) {
        Message message = Message.decode(msg);
        if (message == null) {
            return;
        }
        switch (message.getAction()) {
            case REG_SERVER_SYN:
                printMsgAndPrompt(message.getMessage());
                break;
            case CHAT_MSG_RECEIVE:
                printMsgAndPrompt(String.format("MSG from %s: %s", message.getOption(), message.getMessage()));
                break;
            case REG_SERVER_ACK:
                // Registration accepted: subsequent console lines are chat messages.
                messageType = MessageType.CHAT_MSG_SEND;
                prompt = "Input your message:";
                printMsgAndPrompt(message.getMessage());
                break;
            case BROADCAST_USER_LIST:
                printMsgAndPrompt(String.format("User list: %s", message.getMessage()));
                break;
            default:
                break;
        }
    }

    /** Prints a line followed by the current input prompt. */
    private void printMsgAndPrompt(String msg) {
        System.out.println(msg);
        System.out.print(prompt);
    }

    public static void main(String[] args) throws IOException {
        new ChatClient().runClient();
    }
}