Java中線程模型大體能夠分爲:java
單線程模型中,server端使用一個線程來處理全部的請求,全部的請求必須串行化處理,效率低下。 多線程模型中,server端會爲每一個請求分配一個線程去處理請求,相對單線程模型而言多線程模型效率更高,可是多線程模型的缺點也很明顯:server端爲每一個請求都開闢一個線程來處理請求,若是請求數量很大,則會形成大量線程被建立,形成內存溢出。Java中線程是比較昂貴的對象。線程的數量不該該無限量的增大,當線程數超過必定數目後,增長線程不只不能提升效率,反而會下降效率。 藉助"對象複用"的思想,線程池應運而生,線程池中通常有必定數目的線程,當請求數目超過線程數以後須要排隊等待。這樣就避免了線程會不停地增加。這裏不打算對Java的線程池作過多介紹,有興趣的能夠去看我以前的文章:java線程池。git
Reactor是一種處理模式。Reactor模式是處理併發I/O比較常見的一種模式,用於同步I/O,中心思想是將全部要處理的IO事件註冊到一箇中心I/O多路複用器上,同時主線程/進程阻塞在多路複用器上;一旦有I/O事件到來或是準備就緒(文件描述符或socket可讀、寫),多路複用器返回並將事先註冊的相應I/O事件分發到對應的處理器中。程序員
Reactor也是一種實現機制。Reactor利用事件驅動機制實現,和普通函數調用的不一樣之處在於:應用程序不是主動的調用某個API完成處理,而是偏偏相反,Reactor逆置了事件處理流程,應用程序須要提供相應的接口並註冊到Reactor上,若是相應的事件發生,Reactor將主動調用應用程序註冊的接口,這些接口又稱爲「回調函數」。用「好萊塢原則」來形容Reactor再合適不過了:不要打電話給咱們,咱們會打電話通知你。github
Reactor模型實質上是對I/O多路複用的一層包裝,理論上來講I/O多路複用的效率已經夠高了,爲何還須要Reactor模型呢?答案是I/O多路複用雖然性能已經夠高了,可是編碼複雜,在工程效率上仍是過低。所以出現了Reactor模型。編程
一個個網絡請求可能涉及到多個I/O請求,相比傳統的單線程完整處理請求生命期的方法,I/O複用在人的大腦思惟中並不天然,由於,程序員編程中,處理請求A的時候,假定A請求必須通過多個I/O操做A1-An(兩次IO間可能間隔很長時間),每通過一次I/O操做,再調用I/O複用時,I/O複用的調用返回裏,很是可能再也不有A,而是返回了請求B。即請求A會常常被請求B打斷,處理請求B時,又被C打斷。這種思惟下,編程容易出錯。後端
通常來講處理一個網絡請求須要通過如下五個步驟:數組
Reactor模型有三種線程模型:bash
單線程模型中Reactor既負責Accept新的鏈接請求,又負責分派請求到具體的handler中進行處理,通常不使用這種模型,由於單線程效率比較低下。網絡
下面是基於Java NIO單線程Reactor模型的實現:多線程
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException { // Reactor設置
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(
new InetSocketAddress(port));
serverSocket.configureBlocking(false);
SelectionKey sk =
serverSocket.register(selector,
SelectionKey.OP_ACCEPT); // 監聽Socket鏈接事件
sk.attach(new Acceptor());
}
public void run() { // normally in a new Thread
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey) (it.next());
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null)
r.run();
}
class Acceptor implements Runnable { // inner
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
} catch (IOException ex) { /* ... */ }
}
}
}
final class Handler implements Runnable {
static final int READING = 0, SENDING = 1;
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(1024);
ByteBuffer output = ByteBuffer.allocate(1024);
int state = READING;
Handler(Selector sel, SocketChannel c)
throws IOException {
socket = c;
c.configureBlocking(false);
// Optionally try first read now
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() { /* ... */ }
boolean outputIsComplete() { /* ... */ }
void process() { /* ... */ }
public void run() {
try {
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete()) sk.cancel();
}
}
複製代碼
該模型在事件處理器(Handler)鏈部分採用了多線程(線程池),也是後端程序經常使用的模型。模型圖以下:
比起多線程單Rector模型,它是將Reactor分紅兩部分,mainReactor負責監聽並Accept新鏈接,而後將創建的socket經過多路複用器(Acceptor)分派給subReactor。subReactor負責多路分離已鏈接的socket,讀寫網絡數據;業務處理功能,其交給worker線程池完成。一般,subReactor個數上可與CPU個數等同。其模型圖以下:
Netty的線程模型相似於Reactor模型。Netty中ServerBootstrap用於建立服務端,下圖是它的結構:
下面是一個完整的Netty服務端的例子:
public class TimeServer {
public void bind(int port) {
// Netty的多Reactor線程模型,bossGroup是Acceptor線程池,用於接受鏈接。workGroup是Worker線程池,處理業務。
// bossGroup是Acceptor線程池
EventLoopGroup bossGroup = new NioEventLoopGroup();
// workGroup是Worker線程池
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
// 綁定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服務端監聽端口關閉
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler());
}
}
}
public class TimeServerHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
// msg轉Buf
ByteBuf buf = (ByteBuf) msg;
// 建立緩衝中字節數的字節數組
byte[] req = new byte[buf.readableBytes()];
// 寫入數組
buf.readBytes(req);
String body = new String(req, "UTF-8");
String currenTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(
System.currentTimeMillis()).toString() : "BAD ORDER";
// 將要返回的信息寫入Buffer
ByteBuf resp = Unpooled.copiedBuffer(currenTime.getBytes());
// buffer寫入通道
ctx.write(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// write讀入緩衝數組後經過invoke flush寫入通道
ctx.flush();
}
}
複製代碼