# 需求
-
基於NIO實現 -
支持同時多個客戶端接入 -
支持客戶端發送文本消息到服務器 -
支持客戶端自定義羣聊名稱 -
接收到客戶端發送的消息以後,服務器須要將消息轉發給目前在線的全部其餘客戶端 -
支持客戶端退出羣聊 -
服務端中止服務後,客戶端自動斷開鏈接
# 技術介紹
-
Non-blockingI/O
編程模型 -
Channel
通道 -
ServerSocketChannel
服務端通道 -
SocketChannel
客戶端通道 -
ByteBuffer
NIO中使用的讀寫緩衝區 -
Selector
多路複用器 -
將 channel
註冊在多路複用器上,並監聽相應的事件 -
多線程 -
線程池
# 代碼
舒適提示:注意看代碼註釋喲~ 跟上節奏,很簡單😼java
-
服務器
/**
* 基於NIO實現的聊天室服務端
*
* @author futao
* @date 2020/7/8
*/
@Slf4j
public class NioChatServer {
/**
* 用於處理通道上的事件的線程池(可選的)
*/
private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(10);
/**
* 啓動聊天室
*/
public void start() {
try {
//服務端Socket通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//將通道設置成非阻塞
serverSocketChannel.configureBlocking(false);
//綁定主機與監聽端口
serverSocketChannel.bind(new InetSocketAddress("localhost", Constants.SERVER_PORT));
//多路複用器
Selector selector = Selector.open();
//將服務端通道註冊到多路複用器上,並設置監聽事件接入事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
log.debug("{} 基於NIO的聊天室在[{}]端口啓動成功 {}", StringUtils.repeat("=", 30), Constants.SERVER_PORT, StringUtils.repeat("=", 30));
while (true) {
// 觸發了事件的通道數量,該方法會阻塞
int eventCountTriggered = selector.select();
if (eventCountTriggered <= 0) {
continue;
}
// 獲取到全部觸發的事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 遍歷事件進行處理
for (SelectionKey selectionKey : selectionKeys) {
// 處理事件
selectionKeyHandler(selectionKey, selector);
}
// 清除事件記錄
selectionKeys.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 事件處理器
*
* @param selectionKey 觸發的事件信息
* @param selector 多路複用器
*/
private void selectionKeyHandler(SelectionKey selectionKey, Selector selector) {
if (selectionKey.isAcceptable()) {
//若是觸發的是SocketChannel接入事件
try {
// ServerSocketChannel上觸發的客戶端SocketChannel接入
SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
log.debug("客戶端[{}]成功接入聊天服務器", socketChannel.socket().getPort());
// 將客戶端SocketChannel通道設置成非阻塞
socketChannel.configureBlocking(false);
// 將客戶端通道註冊到多路複用器,並監聽這個通道上發生的可讀事件
socketChannel.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
} else if (selectionKey.isReadable()) {
// 觸發的是可讀事件
// 獲取到可讀事件的客戶端通道
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//建立緩衝區
ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
try {
// 讀取通道上的數據寫入緩衝區(返回0或者-1說明讀到了末尾)
while (socketChannel.read(byteBuffer) > 0) {
}
//切換爲讀模式
byteBuffer.flip();
// 接收到的消息
String message = String.valueOf(Constants.CHARSET.decode(byteBuffer));
log.info("接收到來自客戶端[{}]的數據:[{}]", socketChannel.socket().getPort(), message);
// 是否退出
quit(message, selector, selectionKey);
// 消息轉發
forwardMessage(message, selector, selectionKey);
// 清除緩衝區的數據
byteBuffer.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 客戶端退出
*
* @param message 消息
* @param selector 多路複用器
* @param selectionKey 觸發的selectionKey
*/
public void quit(String message, Selector selector, SelectionKey selectionKey) {
if (StringUtils.isBlank(message) || Constants.KEY_WORD_QUIT.equals(message)) {
int port = ((SocketChannel) selectionKey.channel()).socket().getPort();
// 客戶端下線
selectionKey.cancel();
log.debug("客戶端[{}]下線", port);
// 由於發生了監聽事件和channel的變動,因此須要通知selector從新整理selector所監聽的事件
selector.wakeup();
}
}
/**
* 轉發消息
*
* @param message 須要轉發的消息
* @param selector 多路複用器
* @param curSelectionKey 當前觸發的selectionKey
*/
public void forwardMessage(String message, Selector selector, SelectionKey curSelectionKey) {
// 建立緩衝區
ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
// 數據寫入緩衝區
byteBuffer.put(message.getBytes(Constants.CHARSET));
// 切換爲讀模式
byteBuffer.flip();
// 在首尾進行標記,由於須要給每一個客戶端發送一樣的數據,須要重複讀取
byteBuffer.mark();
// 當前註冊在多路複用器上的SelectionKey集合
Set<SelectionKey> keys = selector.keys();
for (SelectionKey key : keys) {
// 消息不能轉發給本身 and 只轉發給客戶端SocketChannel
if (curSelectionKey.equals(key) || !(key.channel() instanceof SocketChannel)) {
continue;
}
// 客戶端SocketChannel
SocketChannel socketChannel = (SocketChannel) key.channel();
// 若是緩衝區中還有數據就一直寫
while (byteBuffer.hasRemaining()) {
try {
// 數據寫入通道
socketChannel.write(byteBuffer);
} catch (IOException e) {
e.printStackTrace();
}
}
// 重置到上次mark的地方,即首位
byteBuffer.reset();
}
// 清除緩衝區的數據
byteBuffer.clear();
}
public static void main(String[] args) {
new NioChatServer().start();
}
}
-
客戶端
/**
* 基於NIO實現的羣聊客戶端
*
* @author futao
* @date 2020/7/8
*/
@Getter
@Setter
@Slf4j
public class NioChatClient {
/**
* 用於處理用戶輸入數據的單個線程線程池,使用線程池是爲了便於關閉
*/
private static final ExecutorService USER_INPUT_HANDLER = Executors.newSingleThreadExecutor();
/**
* 用戶名
*/
private String userName;
/**
* 啓動客戶端
*/
public void start() {
try {
// 建立客戶端通道
SocketChannel socketChannel = SocketChannel.open();
// 將通道設置爲非阻塞
socketChannel.configureBlocking(false);
// 建立多路複用器
Selector selector = Selector.open();
// 將客戶端通道註冊到多路複用器,並監聽可讀事件
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 嘗試鏈接到聊天服務器
socketChannel.connect(new InetSocketAddress("localhost", Constants.SERVER_PORT));
while (true) {
// 阻塞等待通道上的事件觸發。返回觸發的通道的數量
int eventCountTriggered = selector.select();
if (eventCountTriggered <= 0) {
continue;
}
// 獲取到全部觸發的事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 遍歷事件進行處理
for (SelectionKey selectionKey : selectionKeys) {
// 處理事件
selectionKeyHandler(selectionKey, selector);
}
// 清除事件記錄
selectionKeys.clear();
}
} catch (IOException e) {
e.printStackTrace();
} catch (ClosedSelectorException e) {
log.debug("成功退出聊天室...");
}
}
/**
* 處理器
*
* @param selectionKey 觸發的selectionKey
* @param selector 多路複用器
*/
private void selectionKeyHandler(SelectionKey selectionKey, Selector selector) {
if (selectionKey.isConnectable()) {
// 觸發的是成功接入服務器的事件
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
try {
// 判斷此通道上的鏈接操做是否正在進行中
if (socketChannel.isConnectionPending()) {
// 完成鏈接套接字通道的過程
socketChannel.finishConnect();
log.debug("成功接入聊天服務器");
// 將通道設置成非阻塞
socketChannel.configureBlocking(false);
// 將通道註冊到多路複用器,並監聽可讀事件
socketChannel.register(selector, SelectionKey.OP_READ);
// 建立緩衝區,用於處理將用戶輸入的數據寫入通道
ByteBuffer byteBuffer = ByteBuffer.allocate(4 * 1024);
// 在新線程中處理用戶輸入
USER_INPUT_HANDLER.execute(() -> {
while (!Thread.currentThread().isInterrupted()) {
//先清空緩衝區中的數據
byteBuffer.clear();
// 獲取用戶輸入的文本
String message = new Scanner(System.in).nextLine();
// 將數據寫入緩衝區
byteBuffer.put(String.format("【%s】: %s", userName, message).getBytes(Constants.CHARSET));
// 將緩衝區設置爲讀模式
byteBuffer.flip();
try {
// 當緩衝區中還有數據
while (byteBuffer.hasRemaining()) {
// 將數據寫入通道
socketChannel.write(byteBuffer);
}
} catch (IOException e) {
e.printStackTrace();
}
// 判斷是否退出羣聊
if (quit(message, selector, selectionKey)) {
// 跳出循環,結束線程
break;
}
}
try {
// 關閉多路複用器
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
// 關閉線程池
USER_INPUT_HANDLER.shutdown();
});
}
} catch (IOException e) {
e.printStackTrace();
}
} else if (selectionKey.isReadable()) {
// 觸發的是可讀事件
// 獲取到可讀事件的通道
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//建立緩衝區
ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
try {
// 將通道上的數據寫入緩衝區(返回0或者-1說明讀到了末尾)
while (socketChannel.read(byteBuffer) > 0) {
}
// 切換成讀模式
byteBuffer.flip();
String message = String.valueOf(Constants.CHARSET.decode(byteBuffer));
byteBuffer.clear();
log.info("接收到數據:[{}]", message);
if (StringUtils.isBlank(message)) {
log.debug("服務器拉胯,下車...");
selector.close();
USER_INPUT_HANDLER.shutdownNow();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 退出羣聊
*
* @param message 消息
* @param selector 多路複用器
* @param selectionKey 觸發的selectionKey
* @return 是否退出
*/
public boolean quit(String message, Selector selector, SelectionKey selectionKey) {
if (Constants.KEY_WORD_QUIT.equals(message)) {
selectionKey.cancel();
selector.wakeup();
return true;
}
return false;
}
public static void main(String[] args) {
NioChatClient nioChatClient = new NioChatClient();
nioChatClient.setUserName("小9");
nioChatClient.start();
}
}
# 測試
-
接入
-
客戶端發送消息
-
消息轉發
-
客戶端下線
-
服務器宕機
# 源代碼
* https://github.com/FutaoSmile/learn-IO/tree/master/practice/src/main/java/com/futao/practice/chatroom/nio
# 系列文章
【BIO】基於BIO實現簡單動態HTTP服務器git
【BIO】經過指定消息大小實現的多人聊天室-終極版本github
BIO在聊天室項目中的演化web
歡迎在評論區留下你看文章時的思考,及時說出,有助於加深記憶和理解,還能和像你同樣也喜歡這個話題的讀者相遇~編程
本文分享自微信公衆號 - 喜歡天文(AllUnderControl)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。服務器