The previous post mentioned that Play's underlying network communication is built on Netty, so I took a rough look at Netty and summarize it below. (The Netty version is 3.2.5; implementations may differ considerably between versions.)
1. Netty components
ChannelBuffer:
A combination of the transport buffer and the abstracted logical buffer: it merges the multiple underlying NIO buffers into one buffer that represents the complete message content, so it can be thought of as a single message.
Channel:
A wrapper around Java's old-I/O and NIO input/output channels.
ChannelPipeline:
Netty's ChannelPipeline contains two paths: upstream and downstream. Upstream corresponds to inbound traffic: received messages and passive state changes. Downstream corresponds to outbound traffic: sent messages and actively initiated state changes. The ChannelPipeline interface exposes two important methods, sendUpstream(ChannelEvent e) and sendDownstream(ChannelEvent e), corresponding to the upstream and downstream directions respectively.
Handler:
The ChannelHandlers in a ChannelPipeline likewise come in two kinds: ChannelUpstreamHandler and ChannelDownstreamHandler. The handlers on each path are independent of each other, and each interface has a single method: ChannelUpstreamHandler.handleUpstream and ChannelDownstreamHandler.handleDownstream.
A handler implements the messageReceived method, which is where the protocol-specific logic goes:
@Override
public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent messageEvent) throws Exception {
    // do something
}
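To make the two directions concrete, here is a minimal sketch assuming the Netty 3.x API; EchoUpstreamHandler and LoggingDownstreamHandler are hypothetical names, not classes from Netty or Play. The upstream handler receives a ChannelBuffer and echoes it back; the downstream handler sees every outgoing write before it reaches the ChannelSink.

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.*;

// Hypothetical upstream handler: receives an inbound ChannelBuffer and echoes it back.
public class EchoUpstreamHandler extends SimpleChannelUpstreamHandler {
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        ChannelBuffer msg = (ChannelBuffer) e.getMessage();
        // write() sends a downstream event through the pipeline toward the ChannelSink
        e.getChannel().write(msg);
    }
}

// Hypothetical downstream handler: intercepts outbound events on their way to the sink.
class LoggingDownstreamHandler implements ChannelDownstreamHandler {
    public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
        if (e instanceof MessageEvent) {
            ChannelBuffer buf = (ChannelBuffer) ((MessageEvent) e).getMessage();
            System.out.println("writing " + buf.readableBytes() + " bytes");
        }
        ctx.sendDownstream(e); // pass the event on toward the ChannelSink
    }
}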
2. Programming model
1. Implement your own handler.
2. Implement your own pipeline factory and register the handler in the pipeline.
3. Point the Netty bootstrap at your pipeline factory (see the sketch below).
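Putting the three steps together, a minimal bootstrap sketch might look like the following, assuming the Netty 3.x bootstrap API; MyServer, the port 8080, and EchoUpstreamHandler (from the sketch above) are illustrative and not Play's actual setup.

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class MyServer {
    public static void main(String[] args) {
        // Step 3: the bootstrap owns the boss/worker thread pools via the channel factory.
        ServerBootstrap bootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),   // boss threads
                        Executors.newCachedThreadPool())); // worker threads

        // Step 2: the pipeline factory registers our handler (step 1) for every new channel.
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(new EchoUpstreamHandler());
            }
        });

        bootstrap.bind(new InetSocketAddress(8080));
    }
}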
3. Threading model (the Reactor pattern)
Netty's threading model follows the Reactor pattern. As shown in the figure below, the boss thread (the reactor) uses a selector to receive requests and then delegates them to the acceptor; the worker threads (represented by the thread pool in the figure) pick up tasks from the acceptor and execute them.
---------------------------------------------- Initialization ----------------------------------------------
Reading the code of Netty's NioServerSocketChannelFactory class shows in detail how the components of the Reactor pattern are initialized.
NioServerSocketChannelFactory is responsible for constructing a NioServerSocket and holds a reference to a ChannelSink, which manages the worker thread pool:
NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) {
    workers = new NioWorker[workerCount];
    for (int i = 0; i < workers.length; i++) {
        workers[i] = new NioWorker(id, i + 1, workerExecutor);
    }
}
NioServerSocketPipelineSink implements the ChannelSink interface and provides the eventSunk method, which handles both the initialization that happens when the server socket binds to a TCP port and the setup of newly accepted client socket connections, as the following code shows:
public void eventSunk(
        ChannelPipeline pipeline, ChannelEvent e) throws Exception {
    Channel channel = e.getChannel();
    if (channel instanceof NioServerSocketChannel) {
        // the server socket binds to the port and the channel is created
        handleServerSocket(e);
    } else if (channel instanceof NioSocketChannel) {
        // a client socket connection is being established
        handleAcceptedSocket(e);
    }
}
handleServerSocket calls the following bind method, which initializes the Boss thread that plays the role of the main reactor.
private void bind(
        NioServerSocketChannel channel, ChannelFuture future,
        SocketAddress localAddress) {
    boolean bound = false;
    boolean bossStarted = false;
    try {
        channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
        bound = true;
        future.setSuccess();
        fireChannelBound(channel, channel.getLocalAddress());
        Executor bossExecutor =
                ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
        DeadLockProofWorker.start(
                bossExecutor,
                new ThreadRenamingRunnable(
                        new Boss(channel),
                        "New I/O server boss #" + id + " (" + channel + ')'));
        bossStarted = true;
    } catch (Throwable t) {
        future.setFailure(t);
        fireExceptionCaught(channel, t);
    } finally {
        if (!bossStarted && bound) {
            close(channel, future);
        }
    }
}
---------------------------------------------- Initialization ----------------------------------------------
From this point on, the Boss thread uses an NIO selector to listen for client-connection I/O events on the socket port we specified. Its main loop looks like this:
public void run() {
    final Thread currentThread = Thread.currentThread();
    channel.shutdownLock.lock();
    for (;;) {
        if (selector.select(1000) > 0) {
            selector.selectedKeys().clear();
        }
        SocketChannel acceptedSocket = channel.socket.accept();
        if (acceptedSocket != null) {
            registerAcceptedChannel(acceptedSocket, currentThread);
        }
    }
}
It does just one thing: it registers the accepted client socket connection with the acceptor. Presumably to keep the boss thread responsive, the registration work is wrapped into a Runnable object, put into a registration task queue, and handed to the worker thread pool to execute. The worker's main loop looks like this:
public void run() {
    thread = Thread.currentThread();
    boolean shutdown = false;
    Selector selector = this.selector;
    for (;;) {
        wakenUp.set(false);
        try {
            SelectorUtil.select(selector);
            if (wakenUp.get()) {
                selector.wakeup();
            }
            cancelledKeys = 0;
            processRegisterTaskQueue();
            processWriteTaskQueue();
            processSelectedKeys(selector.selectedKeys());
        } catch (Throwable t) {
            logger.warn(
                    "Unexpected exception in the selector loop.", t);
        }
    }
}
The worker runs in a pipelined fashion: in each iteration of the loop it performs the following three tasks:
1. Take the client connections accepted by the main reactor from the acceptor and register them with its own selector (each worker has its own selector); the sketch after this list illustrates the hand-off.
2. If the write task queue has pending work, take one write task and execute it.
3. For channels already registered with the selector, handle their read or write I/O events.
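The hand-off in task 1 is essentially a producer/consumer queue between the boss and the worker. The following is a simplified, illustrative sketch of that mechanism (WorkerSketch is a hypothetical class, not Netty's actual code): the boss thread offers a registration Runnable, and the worker drains the queue at the top of each loop iteration before registering the channel with its own selector.

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the boss -> worker registration hand-off.
class WorkerSketch {
    private final Selector selector;
    private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<Runnable>();

    WorkerSketch(Selector selector) {
        this.selector = selector;
    }

    // Called from the boss thread: wrap the registration into a task and wake the worker up.
    void register(final SocketChannel accepted) {
        registerTaskQueue.offer(new Runnable() {
            public void run() {
                try {
                    accepted.configureBlocking(false);
                    accepted.register(selector, SelectionKey.OP_READ);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        selector.wakeup(); // break out of select() so the task is picked up promptly
    }

    // Called from the worker thread at the top of each loop iteration.
    void processRegisterTaskQueue() {
        for (Runnable task; (task = registerTaskQueue.poll()) != null; ) {
            task.run();
        }
    }
}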
4. How Play threads and Netty threads cooperate
Play threads and Netty threads cooperate on a request as follows:
Request -> netty master -> netty worker -> Play starts a new thread of its own -> netty response -> netty worker.
A more detailed look at this cooperation follows.
1. The Netty worker hands request processing over to Play
messageReceived in PlayHandler:
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
        throws Exception {
    ....
    Invoker.invoke(new NettyInvokation(request, response, ctx, nettyRequest, e));
}
By submitting a task to a thread pool inside messageReceived, the business logic of the request is processed elsewhere; once messageReceived returns, the worker thread detaches from the request, and the request no longer occupies a worker. This keeps the long-lived connection open while leaving Netty's worker threads free.
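The pattern described here (the worker only enqueues work, while an application thread produces the response later) can be sketched as follows, assuming the Netty 3.x API; AsyncBusinessHandler, appThreads, and the pool size are hypothetical, not Play's actual classes.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.util.CharsetUtil;

// Hypothetical handler: hands the heavy work to an application thread pool
// so the Netty worker thread returns from messageReceived immediately.
public class AsyncBusinessHandler extends SimpleChannelUpstreamHandler {
    private static final ExecutorService appThreads = Executors.newFixedThreadPool(16);

    @Override
    public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) {
        appThreads.execute(new Runnable() {
            public void run() {
                // ... business logic runs on the application thread ...
                // Writing from here sends a downstream event back into the pipeline,
                // which the Netty worker eventually flushes to the socket.
                e.getChannel().write(ChannelBuffers.copiedBuffer("done", CharsetUtil.UTF_8));
            }
        });
        // messageReceived returns right away; the worker thread is free again.
    }
}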
2. Play hands the response data back to the Netty worker
PlayHandler's copyResponse method, which writes the response data back, ultimately delegates to the following method:
public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
    ChannelFuture future = future(channel);
    channel.getPipeline().sendDownstream(
            new DownstreamMessageEvent(channel, future, message, remoteAddress));
    return future;
}
As you can see, the response data, carried as a message, goes back into the pipeline registered with the channel. Every Netty pipeline ends with a ChannelSink that does the final work; for a Play application, that sink is the NioServerSocketPipelineSink mentioned above:
private void handleAcceptedSocket(ChannelEvent e) {
    if (e instanceof ChannelStateEvent) {
        // omitted here
    } else if (e instanceof MessageEvent) {
        MessageEvent event = (MessageEvent) e;
        NioSocketChannel channel = (NioSocketChannel) event.getChannel();
        boolean offered = channel.writeBuffer.offer(event);
        assert offered;
        channel.worker.writeFromUserCode(channel);
    }
}
The worker's writeFromUserCode method puts the write task into the worker's write task queue and then lets the worker's main loop handle it:
void writeFromUserCode(final NioSocketChannel channel) {
    if (!channel.isConnected()) {
        cleanUpWriteBuffer(channel);
        return;
    }
    if (scheduleWriteIfNecessary(channel)) {
        // enqueued into the worker's write task queue
        return;
    }
    // From here, we are sure Thread.currentThread() == workerThread.
    if (channel.writeSuspended) {
        return;
    }
    if (channel.inWriteNowLoop) {
        return;
    }
    write0(channel);
}
If write0 cannot finish writing the data within one pass, it sets the channel's OP_WRITE interest so that the main loop's processSelectedKeys method keeps writing; only once all the data has been written is the OP_WRITE flag cleared.
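The OP_WRITE toggling that write0 performs is plain NIO interest-op manipulation. Here is a simplified sketch of the idea (OpWriteSketch is hypothetical, not Netty's actual code; key is the channel's SelectionKey on the worker's selector):

import java.nio.channels.SelectionKey;

// Simplified illustration of how a partial write is handed back to the selector loop.
final class OpWriteSketch {
    // Called when a write could not flush everything: ask the selector to tell us
    // when the socket becomes writable again.
    static void setOpWrite(SelectionKey key) {
        int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) == 0) {
            key.interestOps(interestOps | SelectionKey.OP_WRITE);
        }
    }

    // Called once the pending buffer is fully written: stop asking for OP_WRITE,
    // otherwise the selector would spin because sockets are usually writable.
    static void clearOpWrite(SelectionKey key) {
        int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) != 0) {
            key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
        }
    }
}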