這篇文章主要是接下來Netty源碼閱讀過程的預熱,Netty的線程模型與這邊文章提供的三種相關。Netty沒有第二種,可是在主從Reactor多工做線程上又多了個多主線程模型。個人全部的NIO相關文章,都是爲接下來Netty源碼閱讀系列的準備。java
/**
* @Author CoderJiA
* @Description NIOServer
* @Date 13/2/19 下午4:59
**/
public class NIOServer {
public static void main(String[] args) throws Exception{
// 1.建立ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(8899));
// 2.建立Selector,並ServerSocketChannel註冊OP_ACCEPT事件,接收鏈接。
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 3.開啓輪詢
while (selector.select() > 0) {
// 從selector全部事件就緒的key,並遍歷處理。
Set<SelectionKey> selectionKeys = selector.selectedKeys();
selectionKeys.forEach(selectionKey -> {
SocketChannel client;
try {
if (selectionKey.isAcceptable()) { // 接受事件就緒
// 獲取serverSocketChannel
ServerSocketChannel server = (ServerSocketChannel)selectionKey.channel();
// 接收鏈接
client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) { // 讀事件就緒
// 獲取socketChannel
client = (SocketChannel) selectionKey.channel();
// 建立buffer,並將獲取socketChannel中的數據讀入到buffer中
ByteBuffer readBuf = ByteBuffer.allocate(1024);
int readCount = client.read(readBuf);
if (readCount <= 0) {
return;
}
Charset charset = Charset.forName(StandardCharsets.UTF_8.name());
readBuf.flip();
System.out.println(String.valueOf(charset.decode(readBuf).array()));
}
} catch (IOException e) {
e.printStackTrace();
}
selectionKeys.remove(selectionKey);
});
}
}
複製代碼
/**
* @Author CoderJiA
* @Description NIOServer
**/
public class NIOServer {
public static void main(String[] args) throws Exception{
// 1.建立ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(8899));
// 2.建立Selector,並ServerSocketChannel註冊OP_ACCEPT事件,接收鏈接。
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 3.開啓輪詢
while (selector.select() > 0) {
// 從selector全部事件就緒的key,並遍歷處理。
Set<SelectionKey> selectionKeys = selector.selectedKeys();
selectionKeys.forEach(selectionKey -> {
SocketChannel client;
try {
if (selectionKey.isAcceptable()) { // 接受事件就緒
// 獲取serverSocketChannel
ServerSocketChannel server = (ServerSocketChannel)selectionKey.channel();
// 接收鏈接
client = server.accept();
client.configureBlocking(false);
SelectionKey readKey = client.register(selector, SelectionKey.OP_READ);
readKey.attach(new Processor());
} else if (selectionKey.isReadable()) { // 讀事件就緒
Processor processor = (Processor) selectionKey.attachment();
processor.process(selectionKey);
}
} catch (IOException e) {
e.printStackTrace();
}
selectionKeys.remove(selectionKey);
});
}
}
}
複製代碼
/**
* @Author CoderJiA
* @Description Processor
**/
public class Processor {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(16);
public void process(SelectionKey selectionKey) {
EXECUTOR_SERVICE.execute(() -> {
try {
// 獲取socketChannel
SocketChannel client = (SocketChannel) selectionKey.channel();
// 建立buffer,並將獲取socketChannel中的數據讀入到buffer中
ByteBuffer readBuf = ByteBuffer.allocate(1024);
int readCount = client.read(readBuf);
if (readCount <= 0) {
return;
}
Charset charset = Charset.forName(StandardCharsets.UTF_8.name());
readBuf.flip();
System.out.println(charset.decode(readBuf).array());
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
複製代碼
/**
* @Author CoderJiA
* @Description NIOServer
* @Date 13/2/19 下午4:59
**/
public class NIOServer {
public static void main(String[] args) throws Exception{
// 1.建立ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(8899));
// 2.建立Selector,並ServerSocketChannel註冊OP_ACCEPT事件,接收鏈接。
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 3.建立processor
int coreProcessorNum = Runtime.getRuntime().availableProcessors();
Processor[] processors = new Processor[coreProcessorNum];
IntStream.range(0, processors.length).forEach(i -> processors[i] = new Processor());
// 4.開啓輪詢
MutableInt index = new MutableInt(0);
while (selector.select() > 0) {
// 主Reactor監聽鏈接
Set<SelectionKey> selectionKeys = selector.selectedKeys();
selectionKeys.forEach(selectionKey -> {
try {
if (selectionKey.isAcceptable()) { // 接受事件就緒
// 獲取serverSocketChannel
ServerSocketChannel server = (ServerSocketChannel)selectionKey.channel();
// 接收鏈接
SocketChannel client = server.accept();
client.configureBlocking(false);
Processor processor = processors[(index.getValue()) % coreProcessorNum];
addVal(coreProcessorNum, index);
processor.addSocketChannel(client);
processor.wakeup();
}
} catch (IOException e) {
e.printStackTrace();
}
selectionKeys.remove(selectionKey);
});
}
}
private static void addVal(int coreProcessorNum, MutableInt index) {
if (index.getValue() > coreProcessorNum * 100) {
index.setValue(0);
} else {
index.increment();
}
}
}
複製代碼
/**
* @Author CoderJiA
* @Description Processor
**/
public class Processor {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
private Selector selector;
public Processor() {
try {
this.selector = Selector.open();
start();
} catch (Exception e) {
e.printStackTrace();
}
}
public void addSocketChannel(SocketChannel socketChannel) throws ClosedChannelException {
System.out.println("processor addSocketChannel...");
socketChannel.register(this.selector, SelectionKey.OP_READ);
}
public void wakeup() {
System.out.println("processor wakeup...");
this.selector.wakeup();
}
private void start() {
System.out.println("processor start...");
EXECUTOR_SERVICE.execute(this::run);
}
private void run() {
System.out.println("processor run...");
try {
while (true) {
if (selector.select(100) <= 0) {
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
selectionKeys.forEach(selectionKey -> {
try {
if (selectionKey.isReadable()) {
// 獲取socketChannel
SocketChannel client = (SocketChannel) selectionKey.channel();
// 建立buffer,並將獲取socketChannel中的數據讀入到buffer中
ByteBuffer readBuf = ByteBuffer.allocate(1024);
int readCount = client.read(readBuf);
if (readCount <= 0) {
return;
}
Charset charset = Charset.forName("UTF-8");
readBuf.flip();
System.out.println(charset.decode(readBuf).array());
}
selectionKeys.remove(selectionKey);
} catch (Exception e) {
e.printStackTrace();
}
});
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
複製代碼
參考文章地址:www.jasongj.com/java/nio_re…react
代碼地址: github.com/coderjia061…git