Java高效NIO之多人聊天室

系統功能

使用NIO實現一個多人聊天室。聊天室包含如下功能。
服務端java

  1. 處理客戶鏈接
  2. 新鏈接客戶端註冊名字,並進行重名判斷
  3. 新用戶註冊後向客戶端廣播用戶列表
  4. 接收客戶端消息並單播或廣播

客戶端服務器

  1. 向服務端發起鏈接
  2. 用戶註冊名稱
  3. 接收服務端廣播消息
  4. 發送聊天消息,支持單播和廣播

系統設計

系統類設計
系統包括四個類,分別爲:app

  1. 消息處理類:Message,處理消息的編解碼
  2. 消息枚舉:MessageType,定義消息類型
  3. 聊天服務端:ChatServer
  4. 聊天客戶端:ChatClient

NIO聊天室.PNG

系統業務流程socket

  1. 服務端啓動
  2. 客戶端啓動
  3. 客戶端註冊ui

    1. 服務端向客戶端發送註冊用戶名提示,消息類型:REG_SERVER_SYN
    2. 客戶端向服務端發送註冊用戶名,消息類型:REG_CLIENT_ACK
    3. 服務端向客戶端發送註冊確認消息,消息類型:REG_SERVER_ACK
    4. 服務端向全部客戶端廣播用戶列表,消息類型:BROADCAST_USER_LIST
  4. 發送聊天信息this

    1. 客戶端向服務端發送聊天信息,指定toUser爲單播,不然廣播,消息類型:CHAT_MSG_SEND
    2. 服務端接收聊天信息,進行單播或關閉,消息類型:CHAT_MSG_RECEIVE
    3. 客戶端顯示消息內容

消息設計
系統消息採用簡單的特殊字符串 String MSG_SPLIT = "#@@#" 分割字段的格式,消息格式分兩種:spa

  1. message_type#@@#message_content格式,即命令+數據的格式
  2. message_type#@@#option#@@#message_content,其中option爲附加消息,好比客戶端發送單播聊天信息時指定toUser。

程序注意點.net

  1. 服務端爲單線程模式,由一個Selector處理全部消息。
  2. 客戶端註冊後,用戶名信息保存在服務端對應SelectionKey.attachment屬性中。
  3. 經過Selector.keys可獲取全部向Selector註冊的客戶端,獲取客戶端鏈接列表時,須要過濾掉ServerSocketChannel和關閉的Channel selector.keys().stream().filter(item -> item.channel() instanceof SocketChannel && item.channel().isOpen()).collect(Collectors.toSet());
  4. 當Socket鏈接的一端關閉時,另外一端會觸發 OP_READ 事件,但此時 socketChannel.read(byteBuffer) 返回-1或拋IOException,須要捕獲這個異常並關閉socketChannel。
  5. 客戶端由於要同時處理服務端發送的數據和接收客戶端消息輸入,若是單線程,在客戶端輸入消息時,線程阻塞,沒法接受服務端消息。因此客戶端使用2個線程,主線程處理服務端消息,啓動一個子線程接收用戶輸入並處理。
  6. 客戶端分爲兩個階段線程

    1. 初始爲註冊階段 messageType = MessageType.REG_CLIENT_ACK
    2. 收到服務器端註冊成功消息REG_SERVER_ACK後,進入聊天階段 messageType = MessageType.CHAT_MSG_SEND
上述第3步,經過Selector.keys獲取全部向Selector註冊的客戶端時,特別注意要過濾已經關閉的Channel,否則處理客戶端下線事件時,取到的用戶列表會包含剛下線的這個用戶,多是由於Selector只有執行select時纔會去刷新並刪除關閉的Channel的緣由吧。

程序代碼

代碼仍是比較簡單的,設計點上面基本都描述了,代碼沒有註釋,將就着看吧。設計

Message 和 MessageType

package chart;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class Message {
    public static final String MSG_SPLIT = "#@@#";
    public static final Charset CHARSET = StandardCharsets.UTF_8;

    private MessageType action;
    private String option;
    private String message;

    public Message(MessageType action, String option, String message) {
        this.action = action;
        this.option = option;
        this.message = message;
    }

    public Message(MessageType action, String message) {
        this.action = action;
        this.message = message;
    }

    public ByteBuffer encode() {
        StringBuilder builder = new StringBuilder(action.getAction());
        if (option != null && option.length() > 0) {
            builder.append(MSG_SPLIT);
            builder.append(option);
        }
        builder.append(MSG_SPLIT);
        builder.append(message);

        return CHARSET.encode(builder.toString());
    }

    public static Message decode(String message) {
        if (message == null || message.length() == 0)
            return null;
        String[] msgArr = message.split(MSG_SPLIT);
        MessageType messageType = msgArr.length > 1 ? MessageType.getActionType(msgArr[0]) : null;

        switch (msgArr.length) {
            case 2:
                return new Message(messageType, msgArr[1]);
            case 3:
                return new Message(messageType, msgArr[1], msgArr[2]);
            default:
                return null;
        }
    }

    public static ByteBuffer encodeRegSyn() {
        return encodeRegSyn(false);
    }

    public static ByteBuffer encodeRegSyn(boolean duplicate) {
        MessageType action = MessageType.REG_SERVER_SYN;
        String message = "Please input your name to register.";
        if (duplicate) {
            message = "This name is used, Please input another name.";
        }
        return new Message(action, message).encode();
    }

    public static ByteBuffer encodeSendMsg(String msg) {
        return encodeSendMsg(null, msg);
    }

    public static ByteBuffer encodeSendMsg(String toUser, String msg) {
        MessageType action = MessageType.CHAT_MSG_SEND;
        String option = toUser;
        String message = msg;
        return new Message(action, option, message).encode();
    }

    public static ByteBuffer encodeReceiveMsg(String fromUser, String msg) {
        MessageType action = MessageType.CHAT_MSG_RECEIVE;
        String option = fromUser;
        String message = msg;
        return new Message(action, option, message).encode();
    }

    public static ByteBuffer encodeRegClientAck(String username) {
        MessageType action = MessageType.REG_CLIENT_ACK;
        String message = username;
        return new Message(action, message).encode();
    }

    public static ByteBuffer encodeRegServerAck(String username) {
        MessageType action = MessageType.REG_SERVER_ACK;
        String message = username + ", Welcome to join the chat.";
        return new Message(action, message).encode();
    }

    public static ByteBuffer encodePublishUserList(List<String> userList) {
        MessageType action = MessageType.BROADCAST_USER_LIST;
        String message = Arrays.toString(userList.toArray());
        return new Message(action, message).encode();
    }

    public MessageType getAction() {
        return action;
    }

    public void setAction(MessageType action) {
        this.action = action;
    }

    public String getOption() {
        return option;
    }

    public void setOption(String option) {
        this.option = option;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

enum MessageType {
    REG_SERVER_SYN("reg_server_syn"),
    CHAT_MSG_SEND("chat_send"),
    CHAT_MSG_RECEIVE("chat_receive"),
    UNKNOWN("unknown"),
    REG_SERVER_ACK("reg_server_ack"),
    REG_CLIENT_ACK("reg_client_ack"),
    BROADCAST_USER_LIST("broadcast_user_list");

    private String action;

    MessageType(String action) {
        this.action = action;
    }

    public String getAction() {
        return action;
    }

    public static MessageType getActionType(String action) {
        for (MessageType messageType : MessageType.values()) {
            if (messageType.getAction().equals(action)) {
                return messageType;
            }
        }
        return UNKNOWN;
    }
}

ChatServer

package chart;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ChatServer {
    public static final int SERVER_PORT = 8080;

    Selector selector;
    ServerSocketChannel serverSocketChannel;
    boolean running = true;

    public void runServer() throws IOException {
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();

            serverSocketChannel.bind(new InetSocketAddress(SERVER_PORT));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("Server started.");

            while (running) {
                int eventCount = selector.select(100);
                if (eventCount == 0)
                    continue;
                Set<SelectionKey> set = selector.selectedKeys();
                Iterator<SelectionKey> keyIterable = set.iterator();
                while (keyIterable.hasNext()) {
                    SelectionKey key = keyIterable.next();
                    keyIterable.remove();
                    dealEvent(key);
                }
            }
        } finally {
            if (selector != null && selector.isOpen())
                selector.close();
            if (serverSocketChannel != null && serverSocketChannel.isOpen())
                serverSocketChannel.close();
        }
    }

    private void dealEvent(SelectionKey key) throws IOException {
        if (key.isAcceptable()) {
            System.out.println("Accept client connection.");
            SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
            socketChannel.write(Message.encodeRegSyn());
        }
        if (key.isReadable()) {
            SocketChannel socketChannel = null;
            try {
                System.out.println("Receive message from client.");
                socketChannel = (SocketChannel) key.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                socketChannel.read(byteBuffer);
                byteBuffer.flip();
                String msg = Message.CHARSET.decode(byteBuffer).toString();
                dealMsg(msg, key);
            } catch (IOException e) {
                socketChannel.close();
                String username = (String) key.attachment();
                System.out.println(String.format("User %s disconnected", username));
                broadcastUserList();
            }
        }
    }

    private void dealMsg(String msg, SelectionKey key) throws IOException {
        System.out.println(String.format("Message info is: %s", msg));
        Message message = Message.decode(msg);
        if (message == null)
            return;

        SocketChannel currentChannel = (SocketChannel) key.channel();
        Set<SelectionKey> keySet = getConnectedChannel();
        switch (message.getAction()) {
            case REG_CLIENT_ACK:
                String username = message.getMessage();
                for (SelectionKey keyItem : keySet) {
                    String channelUser = (String) keyItem.attachment();
                    if (channelUser != null && channelUser.equals(username)) {
                        currentChannel.write(Message.encodeRegSyn(true));
                        return;
                    }
                }
                key.attach(username);
                currentChannel.write(Message.encodeRegServerAck(username));
                System.out.println(String.format("New user joined: %s,", username));
                broadcastUserList();
                break;
            case CHAT_MSG_SEND:
                String toUser = message.getOption();
                String msg2 = message.getMessage();
                String fromUser = (String) key.attachment();

                for (SelectionKey keyItem : keySet) {
                    if (keyItem == key) {
                        continue;
                    }
                    String channelUser = (String) keyItem.attachment();
                    SocketChannel channel = (SocketChannel) keyItem.channel();
                    if (toUser == null || toUser.equals(channelUser)) {
                        channel.write(Message.encodeReceiveMsg(fromUser, msg2));
                    }
                }
                break;
        }
    }

    public void broadcastUserList() throws IOException {
        Set<SelectionKey> keySet = getConnectedChannel();
        List<String> uList = keySet.stream().filter(item -> item.attachment() != null).map(SelectionKey::attachment)
                .map(Object::toString).collect(Collectors.toList());
        for (SelectionKey keyItem : keySet) {
            SocketChannel channel = (SocketChannel) keyItem.channel();
            channel.write(Message.encodePublishUserList(uList));
        }
    }

    private Set<SelectionKey> getConnectedChannel() {
        return selector.keys().stream()
                .filter(item -> item.channel() instanceof SocketChannel && item.channel().isOpen())
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) throws IOException {
        new ChatServer().runServer();
    }
}

ChatClient

package chart;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class ChatClient {
    Selector selector;
    SocketChannel socketChannel;
    boolean running = true;

    MessageType messageType = MessageType.REG_CLIENT_ACK;
    String prompt = "User Name:";

    public void runClient() throws IOException {
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("127.0.0.1", ChatServer.SERVER_PORT));
            System.out.println("Client connecting to server.");

            socketChannel.register(selector, SelectionKey.OP_CONNECT);

            while (running) {
                int eventCount = selector.select(100);
                if (eventCount == 0)
                    continue;
                Set<SelectionKey> set = selector.selectedKeys();
                Iterator<SelectionKey> keyIterable = set.iterator();
                while (keyIterable.hasNext()) {
                    SelectionKey key = keyIterable.next();
                    keyIterable.remove();
                    dealEvent(key);
                }
            }
        } finally {
            if (selector != null && selector.isOpen())
                selector.close();

            if (socketChannel != null && socketChannel.isConnected())
                socketChannel.close();
        }
    }

    private void dealEvent(SelectionKey key) throws IOException {
        if (key.isConnectable()) {
            SocketChannel channel = (SocketChannel) key.channel();
            if (channel.isConnectionPending()) {
                channel.finishConnect();
            }
            channel.register(selector, SelectionKey.OP_READ);

            new Thread(() -> {
                try {
                    Thread.sleep(500);
                    printMsgAndPrompt("Start to interconnect with server.");
                    BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
                    while (running) {
                        System.out.print(prompt);
                        String msg = reader.readLine();
                        if (msg == null || msg.length() == 0)
                            continue;

                        if (messageType == MessageType.REG_CLIENT_ACK) {
                            ByteBuffer bufferMsg = Message.encodeRegClientAck(msg);
                            channel.write(bufferMsg);
                        } else {
                            String[] msgArr = msg.split("#", 2);
                            ByteBuffer bufferMsg = Message.encodeSendMsg(msg);
                            if (msgArr.length == 2) {
                                bufferMsg = Message.encodeSendMsg(msgArr[0], msgArr[1]);
                            }

                            channel.write(bufferMsg);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {

                }
            }).start();
        } else if (key.isReadable()) {
            try {
                SocketChannel channel = (SocketChannel) key.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                channel.read(byteBuffer);
                byteBuffer.flip();
                String msg = Message.CHARSET.decode(byteBuffer).toString();
                dealMsg(msg);
            } catch (IOException e) {
                e.printStackTrace();
                System.out.println("Server exit.");
                System.exit(0);
            }
        }
    }

    private void dealMsg(String msg) {
        Message message = Message.decode(msg);
        if (message == null)
            return;

        switch (message.getAction()) {
            case REG_SERVER_SYN:
                printMsgAndPrompt(message.getMessage());
                break;
            case CHAT_MSG_RECEIVE:
                printMsgAndPrompt(String.format("MSG from %s: %s", message.getOption(), message.getMessage()));
                break;
            case REG_SERVER_ACK:
                messageType = MessageType.CHAT_MSG_SEND;
                prompt = "Input your message:";
                printMsgAndPrompt(message.getMessage());
                break;
            case BROADCAST_USER_LIST:
                printMsgAndPrompt(String.format("User list: %s", message.getMessage()));
                break;
            default:
        }
    }

    private void printMsgAndPrompt(String msg) {
        System.out.println(msg);
        System.out.print(prompt);
    }

    public static void main(String[] args) throws IOException {
        new ChatClient().runClient();
    }
}
相關文章
相關標籤/搜索