最近看了《netty in action》,關於netty的線程模型不太理解,因而學習了一下java nio的知識,利用java nio寫個簡單的服務器便於理解。html
java nio有3個重要的概念, Channels ,Buffers ,Selectors。經過他們咱們能夠用單個的線程監聽多個數據通道。 java nio能夠進行阻塞的io操做,也能夠進行非阻塞的io操做,咱們更可能是用非阻塞式的操做。
Java NIO 系列教程java
NIO 入門ios
完整代碼.碼雲git
USR-TCP232
SocketTool2web
服務器端使用兩個線程,一個線程負責 accept 鏈接,另外一個線程負責處理 接收到的數據 服務器
package me.zingon.nioserver;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
/** * Created by zhidi on 2018-6-29. */
public class SocketAccepter implements Runnable{
private Integer port = 9999;
private ServerSocketChannel server=null;
//接受到的鏈接
private BlockingQueue<SocketChannel> queue=null;
Selector acceptSelector=Selector.open();
public SocketAccepter(Integer port, BlockingQueue<SocketChannel> queue) throws IOException {
this.port = port;
this.queue = queue;
}
@Override
public void run() {
try {
this.server = ServerSocketChannel.open();
//配置爲非阻塞
this.server.configureBlocking(false);
//註冊accept事件
this.server.register(acceptSelector, SelectionKey.OP_ACCEPT);
this.server.bind(new InetSocketAddress(port));
System.out.println("服務器在 "+port +"端口啓動");
} catch (IOException e) {
e.printStackTrace();
}
try {
SocketChannel socketChannel=this.server.accept();
} catch (IOException e) {
e.printStackTrace();
}
while (true){
int count = 0;
try {
count = acceptSelector.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
if(count ==0 ){
continue;
}
Iterator<SelectionKey> iterator = acceptSelector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
try {
//接受鏈接
SocketChannel sc = ssc.accept();
queue.add(sc);
System.out.println("服務器接收鏈接 :" + sc);
} catch (IOException e) {
e.printStackTrace();
}
iterator.remove();
}
}
}
}
package me.zingon.nioserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
/** * Created by zhidi on 2018-6-29. */
public class SocketLoop implements Runnable {
//已接受鏈接
private BlockingQueue<SocketChannel> queue;
//讀selector
private Selector readSelector=Selector.open();
//寫selector
private Selector writeSelector=Selector.open();
private ByteBuffer readBuf=ByteBuffer.allocate(1024*64);
private Queue<String> msgQueue=new LinkedList<>();
public SocketLoop(BlockingQueue<SocketChannel> queue) throws IOException {
this.queue = queue;
}
@Override
public void run() {
System.out.println("服務器處理線程啓動");
while (true){
//處理已接收鏈接
registerNewChannel();
try {
readFromChannels();
writeToChannels();
} catch (IOException e) {
e.printStackTrace();
}
}
}
//給socketChannel註冊讀/寫時間
private void registerNewChannel(){
SocketChannel sc=null;
while( (sc = queue.poll()) != null){
try {
sc.configureBlocking(false);
sc.register(readSelector, SelectionKey.OP_READ);
sc.register(writeSelector,SelectionKey.OP_WRITE);
} catch (ClosedChannelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
//從讀就緒的channel中讀取數據,添加到msgQueue中
private void readFromChannels() throws IOException {
int count = readSelector.selectNow();
if(count == 0){
return;
}
Iterator<SelectionKey> iterator = readSelector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
if(!key.isValid()){
continue;
}
SocketChannel channel = (SocketChannel) key.channel();
try {
channel.read(readBuf);
readBuf.flip();
StringBuilder sb=new StringBuilder();
while(readBuf.hasRemaining()) {
sb.append((char)readBuf.get());
}
readBuf.compact();
System.out.println(sb.toString());
msgQueue.add(sb.toString());
} catch (IOException e) {
e.printStackTrace();
key.cancel();
channel.socket().close();
channel.close();
}
iterator.remove();
}
}
//當寫就緒而且msgQueue不爲空時,將msgQueue中的數據發送給全部寫就緒channel
private void writeToChannels() throws IOException {
int count = writeSelector.selectNow();
if(count == 0){
return;
}
Iterator<SelectionKey> iterator = writeSelector.selectedKeys().iterator();
String msg=msgQueue.poll();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
if(!key.isValid()){
continue;
}
if(msg == null){
iterator.remove();
return;
}
SocketChannel channel = (SocketChannel) key.channel();
byte[] asd=(Thread.currentThread().getName()+msg).getBytes();
ByteBuffer bf=ByteBuffer.allocate(asd.length);
bf.put(asd);
bf.flip();
while(bf.hasRemaining()) {
try {
channel.write(bf);
} catch (IOException e) {
key.cancel();
channel.socket().close();
channel.close();
e.printStackTrace();
}
}
bf.clear();
iterator.remove();
}
}
}