1.檢測新連接->2.建立NioSocketChannel->3.分配線程及註冊selector->4.向selector註冊讀事件java
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
// 省略代碼...
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 調用與ServerSocketChannel綁定的
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
複製代碼
doReadMessages()算法
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// 接收客戶端鏈接
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// netty建立本身的客戶端channel
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
//省略代碼...
}
return 0;
}
複製代碼
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
// 建立channel對應的config
config = new NioSocketChannelConfig(this, socket.socket());
}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
// 指定channel關注讀事件
super(parent, ch, SelectionKey.OP_READ);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
//省略代碼..
}
}
複製代碼
public SocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) {
try {
// 若是不是安卓就true
javaSocket.setTcpNoDelay(tcpNoDelay);
} catch (SocketException e) {
throw new ChannelException(e);
}
return this;
}
當開啓nagle算法時,客戶端首先發送大小爲1字節的第一個分組,隨後其它分組到達發送緩衝區,因爲上一個分組的應答尚未收到,因此TCP會先緩存新來的這4個小分組,並將其從新分組,組成一個大小爲8(2+3+1+2)字節的」較大的」小分組。當第一個小分組的應答收到後,客戶端將這個8字節的分組發送。總共發送的報文段(分組)個數爲2。當傳輸數據存在大量交互數據時,nagle算法能夠有效減小網絡中的報文段個數
/**
* Disable Nagle's algorithm for this connection. Written data * to the network is not buffered pending acknowledgement of * previously written data. */ @Native public final static int TCP_NODELAY = 0x0001; 複製代碼
以前建立Channel過程當中,客戶端和服務端channel有公共成員變量。它們的類繼承關係如圖: promise
服務端Channel的pipeline構成 Head --> ServerBootstrapAcceptor --> tail緩存
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
複製代碼
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 關鍵點在這,這句話的意思是 serverSocketChannel.pipeline().fireChannelRead(nioSocketChannel)
pipeline.fireChannelRead(readBuf.get(i));
}
// serverAcceptor的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 省略代碼 ...
// child 也就是咱們上面說的 nioSocketChannel
final Channel child = (Channel) msg;
// 那麼這句話的意思就是 把nioSocketChannel註冊到EventLoop的其中一個的selector上
childGroup.register(child).addListener(...);
}
複製代碼
public ChannelFuture register(Channel channel) {
// next()其實就是咱們以前在EventLoop中提到的EventLoop選擇的問題,這塊是個輪詢詳情請看EventLoop那篇文章
return next().register(channel);
}
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 這塊實際上是調用與客戶端Channel所對應的unsafe
promise.channel().unsafe().register(this, promise);
return promise;
}
// 一直跟到最後的register.
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 這塊尚未監聽具體的事件
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}
複製代碼
// readInterestOp上邊設置的是OP_READ,也就是監聽讀事件
protected void doBeginRead() throws Exception {
...
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
複製代碼