IO分兩個階段:
1.通知內核準備數據。2.數據從內核緩衝區拷貝到應用緩衝區
根據這2點IO類型能夠分紅:
1.阻塞IO,在兩個階段上面都是阻塞的。
2.非阻塞IO,在第1階段,程序不斷的輪詢直到數據準備好,第2階段仍是阻塞的
3.IO複用,在第1階段,當一個或者多個IO準備就緒時,通知程序,第2階段仍是阻塞的,在第1階段仍是輪詢實現的,只是
全部的IO都集中在一個地方,這個地方進行輪詢
4.信號IO,當數據準備完畢的時候,信號通知程序數據準備完畢,第2階段阻塞
5.異步IO,1,2都不阻塞,windows的iocp是真正的異步IO
Java NIO
java NIO在linux上面是用epoll實現的,屬於IO複用類型。
Selector:IO的多路複用器,通道須要向其註冊,在數據準備階段由他進行狀態的輪詢
SelectionKey:通道向selector註冊後會建立一個SelectionKey,SelectionKey維繫通道和selector的關係.SelectionKey包含兩個整數集一個爲interest集合,一個爲ready集合.interest集合指定Selector須要監聽的事件.ready集合爲Selector爲SelectorKey監聽後已經準備就緒的能夠進行操做的事件.ready集合特別須要注意,這個裏面可能有阻塞的行爲,如OP_READ事件,只是暗示可讀,可是真正的數據此時尚未到來,此時就會阻塞了。
epoll有三個方法epoll_create(),epoll_ctl(),epoll_wait():
int epoll_create(int size)
建立epoll文件,用於存放epoll_ctl註冊的event。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
註冊fd到epfd中,op爲操做數(add,mod,delete) ,epoll_event爲註冊感興趣的事件,這個裏面也註冊了回調函數,當對應的fd的設備ready時,就調用回調函數,將這個fd加入epfd的ready set當中,epoll_wait()一直就在那裏等待ready set。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
等待ready set,當準備好有數據的時候返回數據的個數,epoll_event爲感興趣的事件集合,maxevents爲事件集合的個數。
JDK中向Selector註冊事件流程(以linux epoll爲例):
關鍵代碼以下:
AbstractSelectableChannel.register():
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
if (!isOpen()) //channel關閉拋異常
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)//不合理的註冊值,拋異常
throw new IllegalArgumentException();
synchronized (regLock) {//有鎖就有可能有線程的阻塞和切換 關鍵點1
if (blocking)
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel); //查看原來有沒有註冊過
if (k != null) {//註冊過直接設置後返回
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);//沒有註冊的話執行selector的register
addKey(k);
}
return k;
}
}
SelectionImpl.register():
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object p_w_upload)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(p_w_upload);
synchronized (publicKeys) {//又是鎖,可能阻塞或者線程切換
implRegister(k);
}
k.interestOps(ops);
return k;
}
EPollSelectorImpl.implRegister():
protected void implRegister(SelectionKeyImpl ski) {
SelChImpl ch = ski.channel;
fdToKey.put(Integer.valueOf(ch.getFDVal()), ski);//關鍵點2 fd和Selectionkey對應的map
pollWrapper.add(ch);
keys.add(ski);
}
EPollArrayWrapper.add():
void add(SelChImpl channel) {
synchronized (updateList) {
updateList.add(new Updator(channel, EPOLL_CTL_ADD));//關鍵點3
}
}
關鍵點1.register()方法中有同步語句,可能當前線程就阻塞了,線程切換會有性能的損耗。
關鍵點2.fdToKey是個key爲fd,value爲selectionKey的map
關鍵點3.Updator是個內部類:
private static class Updator {
SelChImpl channel;
int opcode;
int events;
Updator(SelChImpl channel, int opcode, int events) {
this.channel = channel;
this.opcode = opcode;
this.events = events;
}
Updator(SelChImpl channel, int opcode) {
this(channel, opcode, 0);
}
}
表明channel以及對channel的操做類型以及操做的事件,新註冊的操做類型爲add,操做事件爲0,表明沒有事件。
在SelectionImpl.register()中最後還要執行SelectionKeyImpl.interestOps()方法註冊操做事件(前面添加的時候操做事件是爲0的)
SelectionKeyImpl:
public SelectionKey interestOps(int ops) {
ensureValid();//檢測是否可用
return nioInterestOps(ops);
}
SelectionKey nioInterestOps(int ops) { // package-private
if ((ops & ~channel().validOps()) != 0)
throw new IllegalArgumentException();
//真正的進行epoll感興趣事件的註冊
channel.translateAndSetInterestOps(ops, this);
interestOps = ops;
return this;
}
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
int newOps = 0;
//進行java的ops和epoll的ops轉換,每一個具體的channel是有區別的
if ((ops & SelectionKey.OP_READ) != 0)
newOps |= PollArrayWrapper.POLLIN;
if ((ops & SelectionKey.OP_WRITE) != 0)
newOps |= PollArrayWrapper.POLLOUT;
if ((ops & SelectionKey.OP_CONNECT) != 0)
newOps |= PollArrayWrapper.POLLCONN;
sk.selector.putEventOps(sk, newOps);
}
void setInterest(SelChImpl channel, int mask) {
synchronized (updateList) {
// if the previous pending operation is to add this file descriptor
// to epoll then update its event set
if (updateList.size() > 0) {//關鍵點1
Updator last = updateList.getLast();
//這個確定是剛纔那個註冊的channel,直接進行事件的更新
if (last.channel == channel && last.opcode == EPOLL_CTL_ADD) {
last.events = mask;
return;
}
}
// update existing registration 程序運行到這裏的話,說明前面已經有更新的updator加入了,這裏只好新加入一個
updateList.add(new Updator(channel, EPOLL_CTL_MOD, mask));
}
}
關鍵點1.這裏是考慮到併發的問題了,可是我有個疑問爲何註冊要分兩個步驟執行,爲何不直接在EPollSelectorImpl.implRegister()加入 updateList.add(new Updator(channel, EPOLL_CTL_ADD, mask))呢?
至此register()動做已經完成。
select()
代碼以下:
public int select(long timeout)
throws IOException
{
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect((timeout == 0) ? -1 : timeout);
}
private int lockAndDoSelect(long timeout) throws IOException {
synchronized (this) {
if (!isOpen())
throw new ClosedSelectorException();
synchronized (publicKeys) {
synchronized (publicSelectedKeys) {
return doSelect(timeout);
}
}
}
}
protected int doSelect(long timeout)
throws IOException
{
if (closed)
throw new ClosedSelectorException();
//反註冊過程 刪除取消的通道的key(interest keys,ready keys(也就是selected keys),channel的key set)
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);//關鍵點1
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
void processDeregisterQueue() throws IOException {
// Precondition: Synchronized on this, keys, and selectedKeys
//遍歷取消的key
Set cks = cancelledKeys();
synchronized (cks) {
Iterator i = cks.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
try {
//底層的取消實現
implDereg(ski);
} catch (SocketException se) {
IOException ioe = new IOException(
"Error deregistering key");
ioe.initCause(se);
throw ioe;
} finally {
//刪除取消的key
i.remove();
}
}
}
}
protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert (ski.getIndex() >= 0);
SelChImpl ch = ski.channel;
int fd = ch.getFDVal();
fdToKey.remove(new Integer(fd));//hashmap中去除
pollWrapper.release(ch);//這步很關鍵
ski.setIndex(-1);
keys.remove(ski);//總的key set去除
selectedKeys.remove(ski);//已經準備好的set去除
deregister((AbstractSelectionKey)ski);//移除channel的集合
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
}
void release(SelChImpl channel) {
//空閒隊列刪除了
synchronized (updateList) {
// flush any pending updates
for (Iterator<Updator> it = updateList.iterator(); it.hasNext();) {
if (it.next().channel == channel) {
it.remove();
}
}
// remove from the idle set (if present)
idleSet.remove(channel);
//調用native 通知本channel對應的fd被被刪除
// remove from epoll (if registered)
epollCtl(epfd, EPOLL_CTL_DEL, channel.getFDVal(), 0);
}
}
看關鍵點1的代碼:
int poll(long timeout) throws IOException {
updateRegistrations();//在poll前 先把update裏面不須要的條目處理掉
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
void updateRegistrations() {
synchronized (updateList) {
Updator u = null;
while ((u = updateList.poll()) != null) {
SelChImpl ch = u.channel;
if (!ch.isOpen())
continue;
// if the events are 0 then file descriptor is put into "idle
// set" to prevent it being polled
if (u.events == 0) {//這個表示interest事件爲0
boolean added = idleSet.add(u.channel); //關鍵點1
//先加入到idleSet裏面 若是是此次加入的 並且操做行爲是mod,那麼就是個刪除這個channel對應的fd
// if added to idle set then remove from epoll if registered
if (added && (u.opcode == EPOLL_CTL_MOD))
epollCtl(epfd, EPOLL_CTL_DEL, ch.getFDVal(), 0);
} else {
//關鍵點 2
// events are specified. If file descriptor was in idle set
// it must be re-registered (by converting opcode to ADD)
boolean idle = false;
//若是idleSet不爲空並且有這個Updator 說明關鍵點1處代碼返回true,操做行爲爲add,mod的話在epollCtl會被刪除掉
if (!idleSet.isEmpty())
idle = idleSet.remove(u.channel);
int opcode = (idle) ? EPOLL_CTL_ADD : u.opcode;
epollCtl(epfd, opcode, ch.getFDVal(), u.events);
}
}
}
}
updateList是在EPollArrayWrapper.setInterest()和add()方法中添加的,當有多個線程的時候,經過channel register()有可能剛加入的Updator會被updateRegistrations()獲得,獲得的就是Channel第一次register的updatot,這個時候events爲0,被加入到idleSet接着setInterest()被調用(channel register()最後一步),多了一個updator,這個時候再執行Selector.select(),顯然會到關鍵點2操做行爲add,是沒有問題的。執行完updateRegistrations()方法,而後就epollWait()方法的調用,這個就是epoll的native方法了
總結:
1.Channel的register方法最後加入channel的感興趣的事件到updatorList中
2.Selector的select的方法主要是對updatorList進行運做,首先去除全部cancelkey(),也就刪除了對應的底層的updatorList的條目,而後迭代updatorList根據updator的event事件進行處理,也就是執行epoll的epollCtl方法,以後就是執行epollWait等待epollCtl的channel對應的callback函數的執行了。