服務端對於每一個到達的客戶端都從新開啓一個線程專門處理它們之間的交互。 這種交互在邏輯上是客戶端與服務端直接進行通訊。 隨着高併發的場景到來,服務器處理上下文切換,建立和銷燬線程的代價,將會讓服務器不堪重負。java
Channel --> 區別於單向的InputStream/OutputStream,它是雙向的。 Selector --> 主要的控制器 Buffer --> 讀寫兩種模式能夠經過flip切換。編程
ServerSocketChannel先建立,後綁定一個端口。 設置爲非阻塞模式。 將channel註冊到selector上,監聽鏈接事件。 開始循環等待新接入的鏈接。安全
循環內: 每次調用selector.select()將會阻塞地等待至少一個channel準備就緒,返回準備就緒地channel數量。 若是數量爲零,開始下一輪select(); 數量不爲零,則將這些準備就緒地channel取出來。 根據這些channel對應的當初向selector註冊的類型(accept/read),執行對應的業務邏輯。服務器
public void start() throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8000));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服務器啓動成功!");
for (;;) {
int readyChannels = selector.select();
if (readyChannels == 0) continue;
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = (SelectionKey) iterator.next();
if (selectionKey.isAcceptable()) {
acceptHandler(serverSocketChannel, selector);
}
if (selectionKey.isReadable()) {
readHandler(selectionKey, selector);
}
}
}
}
複製代碼
創建鏈接成功後,設置非阻塞模式,而且將這個剛剛創建的channel,註冊到服務端的Selector。併發
private void acceptHandler(ServerSocketChannel serverSocketChannel, Selector selector) throws IOException {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
socketChannel.write(Charset.forName("UTF-8")
.encode("你與聊天室裏其餘人都不是朋友關係,請注意隱私安全"));
}
複製代碼
private void readHandler(SelectionKey selectionKey, Selector selector) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
String request = "";
while (socketChannel.read(byteBuffer) > 0) {
byteBuffer.flip();
request += Charset.forName("UTF-8").decode(byteBuffer);
}
// 將channel再次註冊到selector上,監聽他的可讀事件
socketChannel.register(selector, SelectionKey.OP_READ);
if (request.length() > 0) {
broadCast(selector, socketChannel, request);
}
}
複製代碼
private void broadCast(Selector selector, SocketChannel sourceChannel, String request) {
// 獲取到全部已經接入的客戶端channel
Set<SelectionKey> selectionKeySet = selector.keys();
selectionKeySet.forEach(selectionKey -> {
Channel targetChannel = selectionKey.channel();
// 剔除發消息的客戶端
if (targetChannel instanceof SocketChannel
&& targetChannel != sourceChannel) {
try {
// 將信息發送到targetChannel客戶端
((SocketChannel) targetChannel).write(
Charset.forName("UTF-8").encode(request));
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
複製代碼
public void start(String nickname) throws IOException {
SocketChannel socketChannel = SocketChannel.open(
new InetSocketAddress("127.0.0.1", 8000));
Selector selector = Selector.open();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
//開啓一個線程專門處理服務端發來的消息
new Thread(new NioClientHandler(selector)).start();
//向服務端發送消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String request = scanner.nextLine();
if (request != null && request.length() > 0) {
socketChannel.write(
Charset.forName("UTF-8")
.encode(nickname + " : " + request));
}
}
}
複製代碼
與服務端的start方法相似。 但客戶端的Selector只註冊了一個讀事件的SocketChannel。 所以該Selector,實際上就只是不斷地監聽服務端有沒有消息傳過來。 若是有消息傳來那麼該Selector中綁定的這個惟一的channel就會編程已經就緒的狀態,將會執行它的readHadler()。 因此客戶端使用NIO和BIO的性能影響差異不大。socket
@Override
public void run() {
try {
for (;;) {
int readyChannels = selector.select();
if (readyChannels == 0) continue;
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = (SelectionKey) iterator.next();
if (selectionKey.isReadable()) {
readHandler(selectionKey, selector);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
複製代碼
readHandleride
private void readHandler(SelectionKey selectionKey, Selector selector) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
String response = "";
while (socketChannel.read(byteBuffer) > 0) {
byteBuffer.flip();
response += Charset.forName("UTF-8").decode(byteBuffer);
}
socketChannel.register(selector, SelectionKey.OP_READ);
if (response.length() > 0) {
System.out.println(response);
}
}
複製代碼