Over the three-day holiday I took some time to organize and summarize Java NIO and the Reactor pattern. This article does not cover the very fine-grained details, such as how particular APIs are implemented, or why a Buffer has to be flipped (reset) before data that was read into it can be written back out; those belong to NIO basics and are prerequisites for studying the Reactor pattern.
Handle: essentially a resource provided by the operating system that represents events, such as a file descriptor or, in network programming, a Socket descriptor. Events can come from outside or from inside: external events include a client's connection request or data sent by a client; internal events include timer events generated by the operating system. A Handle is in essence a file descriptor, and it is where events originate.

Synchronous Event Demultiplexer: on Linux this means the commonly used I/O multiplexing mechanisms such as select, poll and epoll. In Java NIO the corresponding component is the Selector, and the corresponding blocking method is select.

Event Handler: its callback methods form the application's reaction to a given event. Compared with Java NIO, Netty upgrades this role and provides developers with a large number of callback methods, in which we implement the business logic to run when the corresponding event occurs.

Initiation Dispatcher: effectively the Reactor role. It defines the rules that control how events are scheduled, and it lets the application register and remove event handlers. The Initiation Dispatcher waits for events through the synchronous event demultiplexer; once an event occurs, it first demultiplexes each event, then invokes the event handler, and finally calls the relevant callback method to handle it.

The collaboration works as follows. When the application registers a concrete event handler with the Initiation Dispatcher, it identifies the event about which the handler wants to be notified; that event is associated with a Handle. The Initiation Dispatcher requires each event handler to pass in its internal Handle, which identifies the event handler to the operating system. Once registration is complete, the application calls the handle_events method to start the Initiation Dispatcher's event loop. The Initiation Dispatcher then merges the Handles of all registered event handlers and uses the synchronous event demultiplexer to wait for events on them; for example, the TCP protocol layer uses the select synchronous event demultiplexer to wait for data sent by a client to arrive on the connected socket handle. When a Handle becomes ready (for example, when a TCP socket becomes readable), the synchronous event demultiplexer notifies the Initiation Dispatcher. The Initiation Dispatcher then triggers the event handler's callback in response to the ready Handle: it calls back the handler's handle_events method to perform the application-specific functionality (the code the developer wrote) in response to the event; the type of the event can be passed as a parameter of that method and used inside it to perform additional service-specific work. All of this may seem to have little to do with the title of this article, but that is not so: it is exactly the starting point of what follows.
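Before moving on to the NIO code, here is a minimal sketch of the four roles above in plain Java. It is purely illustrative: Handle, EventHandler and InitiationDispatcher are hypothetical names rather than a real API, and the demultiplexer itself is stubbed out; the point is only the register, handle_events and callback sequence.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the Reactor roles described above; not a real library API.
interface Handle { }                                  // stands in for an OS descriptor

interface EventHandler {
    Handle getHandle();                               // the Handle this handler is bound to
    void handleEvents(String eventType);              // application-specific callback
}

class InitiationDispatcher {

    private final Map<Handle, EventHandler> handlers = new ConcurrentHashMap<>();

    // The application registers a handler; the handler hands over its internal Handle.
    void registerHandler(EventHandler handler) {
        handlers.put(handler.getHandle(), handler);
    }

    // Event loop: block on the synchronous event demultiplexer, then dispatch callbacks.
    void handleEvents() {
        while (true) {
            Handle ready = selectReadyHandle();       // plays the role of select()/poll()/epoll()
            EventHandler handler = handlers.get(ready);
            if (handler != null) {
                handler.handleEvents("READ");         // invoke the application-supplied callback
            }
        }
    }

    // The demultiplexer is deliberately left out of this sketch.
    private Handle selectReadyHandle() {
        throw new UnsupportedOperationException("synchronous event demultiplexer omitted");
    }
}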
/**
 * @Author CoderJiA
 * @Description NIOServer
 * @Date 13/2/19 4:59 PM
 **/
public class NIOServer {

    public static void main(String[] args) throws Exception {
        // 1. Create the ServerSocketChannel, switch it to non-blocking mode and bind port 8899.
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(8899));
        // 2. Create the Selector and register the ServerSocketChannel for OP_ACCEPT to accept connections.
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // 3. Start the event loop.
        while (selector.select() > 0) {
            // Take the keys whose events are ready and handle them one by one,
            // removing each key from the selected set as it is processed.
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey selectionKey = it.next();
                it.remove();
                SocketChannel client;
                try {
                    if (selectionKey.isAcceptable()) {          // accept event is ready
                        // Get the ServerSocketChannel.
                        ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
                        // Accept the connection and register it for reads.
                        client = server.accept();
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_READ);
                    } else if (selectionKey.isReadable()) {     // read event is ready
                        // Get the SocketChannel.
                        client = (SocketChannel) selectionKey.channel();
                        // Create a buffer and read the channel's data into it.
                        ByteBuffer readBuf = ByteBuffer.allocate(1024);
                        int readCount = client.read(readBuf);
                        if (readCount <= 0) {
                            continue;
                        }
                        readBuf.flip();
                        System.out.println(StandardCharsets.UTF_8.decode(readBuf).toString());
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
This example lets us map the code back to the original Reactor pattern: the synchronous event demultiplexer corresponds to the Selector's select() method, while the OP_ACCEPT interest that the ServerSocketChannel registers with the Selector, together with the SocketChannel's OP_READ and OP_WRITE, are events maintained by the operating system and are, in essence, the Handles of the original Reactor pattern.
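To exercise the NIOServer above, any TCP client will do (telnet, nc, and so on). A minimal, hypothetical Java client that sends one UTF-8 message to port 8899 could look like this; it is not part of the original example:

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical test client for the NIOServer above.
public class NIOClient {

    public static void main(String[] args) throws Exception {
        // A blocking channel is fine on the client side for a one-shot message.
        try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8899))) {
            ByteBuffer msg = ByteBuffer.wrap("hello reactor".getBytes(StandardCharsets.UTF_8));
            while (msg.hasRemaining()) {
                client.write(msg);
            }
        }
    }
}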
The basic NIO building blocks used below, as summarized in Doug Lea's Scalable IO in Java:

Channel: Connections to files, sockets etc that support non-blocking reads.
Buffer: Array-like objects that can be directly read or written by Channels.
Selector: Tell which of a set of Channels have IO events.
SelectionKeys: Maintain IO event status and bindings.

/**
 * @Author CoderJiA
 * @Description Reactor
 * @Date 5/4/19 2:25 PM
 **/
public abstract class Reactor implements Runnable {

    protected final Selector selector;
    protected final ServerSocketChannel serverSocket;
    protected final int port;
    protected final long timeout;

    public Reactor(int port, long timeout) throws IOException {
        this.port = port;
        this.timeout = timeout;
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket
                .socket()
                .bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        // Register for OP_ACCEPT and attach the Acceptor so dispatch() can run it later.
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        sk.attach(newAcceptor(selector));
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                if (selector.select(timeout) > 0) {
                    // Dispatch every ready key, then clear the selected set.
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        dispatch(it.next());
                    }
                    selected.clear();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void dispatch(SelectionKey sk) {
        // The attachment is either the Acceptor or a Handler; both are Runnable.
        Runnable r = (Runnable) (sk.attachment());
        if (Objects.nonNull(r)) {
            r.run();
        }
    }

    public abstract Acceptor newAcceptor(Selector selector);
}
/**
 * @Author CoderJiA
 * @Description Acceptor
 * @Date 5/4/19 2:58 PM
 **/
public class Acceptor implements Runnable {

    private final Selector selector;
    private final ServerSocketChannel serverSocket;

    public Acceptor(Selector selector, ServerSocketChannel serverSocket) {
        this.selector = selector;
        this.serverSocket = serverSocket;
    }

    @Override
    public void run() {
        try {
            // Accept the pending connection and hand it to a new Handler,
            // which registers itself with the selector for OP_READ.
            SocketChannel socket = serverSocket.accept();
            if (Objects.nonNull(socket)) {
                new Handler(selector, socket);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
/**
 * @Author CoderJiA
 * @Description Handler
 * @Date 5/4/19 4:25 PM
 **/
public class Handler implements Runnable {

    private static final int MB = 1024 * 1024;
    protected final SocketChannel socket;
    protected final SelectionKey sk;
    protected final ByteBuffer input = ByteBuffer.allocate(MB);
    protected final ByteBuffer output = ByteBuffer.allocate(MB);
    private static final int READING = 0, SENDING = 1;
    private int state = READING;

    public Handler(Selector selector, SocketChannel socket) throws IOException {
        this.socket = socket;
        socket.configureBlocking(false);
        // Register the client channel for reads and attach this Handler for dispatch().
        sk = socket.register(selector, SelectionKey.OP_READ);
        sk.attach(this);
    }

    @Override
    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            // Copy the request into the output buffer so send() echoes it back.
            input.flip();
            output.put(input);
            output.flip();
            state = SENDING;
            sk.interestOps(SelectionKey.OP_WRITE);
        }
        input.clear();
    }

    private void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) {
            sk.cancel();
        }
    }

    private boolean inputIsComplete() {
        return input.position() > 0;
    }

    private boolean outputIsComplete() {
        return !output.hasRemaining();
    }
}
/**
 * @Author CoderJiA
 * @Description EchoReactor
 * @Date 5/4/19 5:01 PM
 **/
public class EchoReactor extends Reactor {

    private static final int PORT = 9999;
    private static final long TIME_OUT = TimeUnit.MILLISECONDS.toMillis(10);

    public EchoReactor(int port, long timeout) throws IOException {
        super(port, timeout);
    }

    @Override
    public Acceptor newAcceptor(Selector selector) {
        return new Acceptor(selector, this.serverSocket);
    }

    public static void main(String[] args) throws IOException {
        new EchoReactor(PORT, TIME_OUT).run();
    }
}
The Reactor corresponds to the Initiation Dispatcher of the original Reactor pattern: it dispatches every ready event to the event handlers, namely the Acceptor and the Handler. The Acceptor hands each accepted SocketChannel over to a Handler, and the Handler performs the reads and writes. This is the single-threaded version of Reactor: one thread handles accepting clients, the data processing and the read/write operations. The data processing is usually the business logic of a real application and tends to be time-consuming, so if one processing step blocks, the whole model appears blocked, and a blocked piece of data processing means new client connections cannot be accepted. The multi-worker-thread version below is derived from this to improve the Handler.
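The blocking problem is easy to demonstrate. Because accepting, reading, processing and writing all run on the one Reactor thread, anything slow inside the Handler keeps the thread away from selector.select(), so new OP_ACCEPT events are not dispatched. The following is a hypothetical modification of the Handler's read() above, with Thread.sleep standing in for slow business logic; it is for illustration only:

// Hypothetical: simulate slow business processing inside the single-threaded Handler.
private void read() throws IOException {
    socket.read(input);
    if (inputIsComplete()) {
        try {
            // While this sleep runs on the Reactor thread, the event loop never returns
            // to selector.select(), so pending connections cannot be accepted.
            Thread.sleep(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        input.flip();
        output.put(input);
        output.flip();
        state = SENDING;
        sk.interestOps(SelectionKey.OP_WRITE);
    }
    input.clear();
}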
Adjust the Handler:
package cn.coderjia.nio.douglea.reactor2;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * @Author CoderJiA
 * @Description Handler
 * @Date 5/4/19 4:25 PM
 **/
public class Handler implements Runnable {

    private static final int MB = 1024 * 1024;
    protected final SocketChannel socket;
    protected final SelectionKey sk;
    protected final ByteBuffer input = ByteBuffer.allocate(MB);
    protected final ByteBuffer output = ByteBuffer.allocate(MB);
    private static final int READING = 0, SENDING = 1, PROCESSING = 3;
    private int state = READING;
    // Worker pool that runs the (potentially slow) business processing off the Reactor thread.
    private static final ExecutorService EXECUTOR_SERVICE =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public Handler(Selector selector, SocketChannel socket) throws IOException {
        this.socket = socket;
        socket.configureBlocking(false);
        sk = socket.register(selector, SelectionKey.OP_READ);
        sk.attach(this);
    }

    @Override
    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            // Copy the request into the output buffer on the Reactor thread (input is
            // cleared below), then hand the slow processing off to the worker pool.
            input.flip();
            output.put(input);
            output.flip();
            state = PROCESSING;
            EXECUTOR_SERVICE.execute(new Processer());
        }
        input.clear();
    }

    private void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) {
            sk.cancel();
        }
    }

    private void process() {
        System.out.println("Handler.process()...");
    }

    private boolean inputIsComplete() {
        return input.position() > 0;
    }

    private boolean outputIsComplete() {
        return !output.hasRemaining();
    }

    class Processer implements Runnable {
        public void run() {
            processAndHandOff();
        }
    }

    synchronized void processAndHandOff() {
        process();
        state = SENDING;
        // The interest-set change is made from a worker thread; the Reactor picks it up
        // on its next pass through select(timeout).
        sk.interestOps(SelectionKey.OP_WRITE);
    }
}
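One subtlety of this hand-off: processAndHandOff() changes the key's interest set from a worker thread while the Reactor may be blocked in select(timeout). With the 10 ms timeout used here the change is picked up quickly, but a common refinement is to wake the selector explicitly. A sketch of that variant of processAndHandOff():

// Hypothetical variant: wake the selector so the switch to OP_WRITE takes effect
// immediately instead of waiting for the current select(timeout) to expire.
synchronized void processAndHandOff() {
    process();
    state = SENDING;
    sk.interestOps(SelectionKey.OP_WRITE);
    sk.selector().wakeup();   // interrupts the Reactor thread's blocked select()
}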
The multi-worker-thread version of the Handler hands the time-consuming process() off to a thread pool. In this version the single Reactor is still responsible both for accepting clients and for the read/write events; in high-concurrency scenarios with a huge number of connections, that Reactor can become the bottleneck. Hence the main/sub Reactor model below.
/**
 * @Author CoderJiA
 * @Description Acceptor3
 * @Date 6/4/19 6:51 PM
 **/
public class Acceptor3 implements Runnable {

    private final ServerSocketChannel serverSocket;

    public Acceptor3(ServerSocketChannel serverSocket) {
        this.serverSocket = serverSocket;
    }

    @Override
    public void run() {
        try {
            SocketChannel socket = serverSocket.accept();
            if (Objects.nonNull(socket)) {
                // Hand the new connection to the next sub-Reactor's selector (round robin).
                new Handler(EchoReactor.nextSubReactor().selector, socket);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Adjust the Reactor:
/**
 * @Author CoderJiA
 * @Description Reactor3
 * @Date 6/4/19 6:51 PM
 **/
public abstract class Reactor3 implements Runnable {

    protected Selector selector;
    protected ServerSocketChannel serverSocket;
    protected final int port;
    protected final long timeout;
    protected final boolean isMainReactor;

    public Reactor3(int port, long timeout, boolean isMainReactor) {
        this.port = port;
        this.timeout = timeout;
        this.isMainReactor = isMainReactor;
    }

    @Override
    public void run() {
        try {
            init();
            while (!Thread.interrupted()) {
                if (selector.select(timeout) > 0) {
                    System.out.println("isMainReactor:" + isMainReactor);
                    // Dispatch every ready key, then clear the selected set.
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        dispatch(it.next());
                    }
                    selected.clear();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void init() throws IOException {
        selector = Selector.open();
        // Only the main Reactor listens for connections; the sub-Reactors just run
        // selectors that Handlers get registered with.
        if (isMainReactor) {
            serverSocket = ServerSocketChannel.open();
            serverSocket
                    .socket()
                    .bind(new InetSocketAddress(port));
            serverSocket.configureBlocking(false);
            SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
            sk.attach(newAcceptor());
        }
    }

    private void dispatch(SelectionKey sk) {
        Runnable r = (Runnable) (sk.attachment());
        if (Objects.nonNull(r)) {
            r.run();
        }
    }

    public abstract Acceptor3 newAcceptor();
}
/**
 * @Author CoderJiA
 * @Description EchoReactor
 * @Date 6/4/19 5:35 PM
 **/
public class EchoReactor extends Reactor3 {

    private static final int PORT = 9999;
    private static final long TIME_OUT = TimeUnit.MILLISECONDS.toMillis(10);
    private static final int SUB_REACTORS_SIZE = 2;
    private static final Reactor3[] SUB_REACTORS = new Reactor3[SUB_REACTORS_SIZE];
    private static final AtomicInteger NEXT_INDEX = new AtomicInteger(0);

    static {
        // Initialize the sub-Reactors (they do not bind the port themselves).
        IntStream.range(0, SUB_REACTORS_SIZE)
                .forEach(i -> SUB_REACTORS[i] = new EchoReactor(PORT, TIME_OUT, false));
    }

    // Pick the next sub-Reactor round robin.
    public static Reactor3 nextSubReactor() {
        int curIdx = NEXT_INDEX.getAndIncrement();
        if (curIdx >= SUB_REACTORS_SIZE) {
            NEXT_INDEX.set(0);
            curIdx = 0;
        }
        return SUB_REACTORS[curIdx % SUB_REACTORS_SIZE];
    }

    public EchoReactor(int port, long timeout, boolean isMainReactor) {
        super(port, timeout, isMainReactor);
    }

    @Override
    public Acceptor3 newAcceptor() {
        return new Acceptor3(this.serverSocket);
    }

    public static void main(String[] args) {
        Reactor3 mainReactor = new EchoReactor(PORT, TIME_OUT, true);
        // Start the main Reactor.
        new Thread(mainReactor).start();
        // Start the sub-Reactors.
        IntStream.range(0, SUB_REACTORS_SIZE).forEach(i -> new Thread(SUB_REACTORS[i]).start());
    }
}
In the main/sub Reactor model, the main Reactor accepts client connections and forwards them to the Acceptor, while the sub-Reactors receive the read/write events and forward them to Handlers.
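To see the division of labour, open a few connections to port 9999 and watch the isMainReactor log lines: the accept is reported by the main Reactor, and the subsequent reads by the sub-Reactors chosen round robin via nextSubReactor(). A small, hypothetical test client (not part of the original code) is enough:

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical client: opens several short-lived connections so consecutive ones
// are handled by different sub-Reactors.
public class MainSubReactorClient {

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 4; i++) {
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999))) {
                ByteBuffer msg = ByteBuffer.wrap(("msg-" + i).getBytes(StandardCharsets.UTF_8));
                while (msg.hasRemaining()) {
                    client.write(msg);
                }
                // Wait for the echo (if any) before closing.
                client.read(ByteBuffer.allocate(64));
            }
        }
    }
}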
References:
Scalable IO in Java
github.com/coderjia061…