「 Epoll 是Linux內核的高性能、可擴展的I/O事件通知機制。 java
在linux2.5.44首次引入epoll,它設計的目的旨在取代既有的select、poll系統函數,讓須要大量操做文件描述符的程序得以發揮更優異的性能(wikipedia example: 舊有的系統函數所花費的時間複雜度爲O(n), epoll的時間複雜度O(log n))。epoll實現的功能與poll相似,都是監聽多個文件描述符上的事件。linux
epoll底層是由可配置的操做系統內核對象建構而成,並以文件描述符(file descriptor)的形式呈現於用戶空間(from wikipedia: 在操做系統中,虛擬內存一般會被分紅用戶空間,與核心空間這兩個區段。這是存儲器保護機制中的一環。內核**、核心擴展(kernel extensions)、以及驅動程序,運行在覈心空間**上。而其餘的應用程序,則運行在用戶空間上。全部運行在用戶空間的應用程序,都被統稱爲用戶級(userland))。編程
它是一個用來管理軟件發出的數據I/O的一個程序,並將數據交由CPU和電腦其餘電子組件處理,可是直接對硬件操做是很是複雜的,一般內核提供一種硬件抽象的方法來完成(由內核決定一個程序在何時對某部分硬件操做多長時間),經過這些方法來完成進程間通訊和系統調用。windows
宏內核簡單來講,首先定義了一個高階的抽象接口,叫系統調用(System call))來實現操做系統的功能,例如進程管理,文件系統,和存儲管理等等,這些功能由多個運行在內核態的程序來完成。數組
微內核:緩存
微內核結構由硬件抽象層和系統調用組成;包括了建立一個系統必需的幾個部分;如線程管理,地址空間和進程間通訊等。微核的目標是將系統服務的實現和系統的基本操做規則分離開來。服務器
linux就是使用的宏內核。由於它可以在運行時將模塊調入執行,使擴充內核的功能變得更簡單。markdown
epoll作了什麼事?網絡
epoll 經過使用紅黑樹(RB-tree)搜索被監視的文件描述符(file descriptor)。app
在 epoll 實例上註冊事件時,epoll 會將該事件添加到 epoll 實例的紅黑樹上並註冊一個回調函數,當事件發生時會將事件添加到就緒鏈表中。
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
複製代碼
向內核申請空間,建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大。這個參數不一樣於select()中的第一個參數,給出最大監聽的fd+1的值。在最初的實現中,調用者經過 size 參數告知內核須要監聽的文件描述符數量。若是監聽的文件描述符數量超過 size, 則內核會自動擴容。而如今 size 已經沒有這種語義了,可是調用者調用時 size 依然必須大於 0,以保證後向兼容性。須要注意的是,當建立好epoll句柄後,它就是會佔用一個fd值,在linux下若是查看/proc/進程id/fd/,是可以看到這個fd的。
向 epfd 對應的內核epoll 實例添加、修改或刪除對 fd 上事件 event 的監聽。op 能夠爲 EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL 分別對應的是添加新的事件,修改文件描述符上監聽的事件類型,從實例上刪除一個事件。若是 event 的 events 屬性設置了 EPOLLET flag,那麼監聽該事件的方式是邊緣觸發。
events能夠是如下幾個宏的集合:
EPOLLIN:觸發該事件,表示對應的文件描述符上有可讀數據。(包括對端SOCKET正常關閉);
EPOLLOUT:觸發該事件,表示對應的文件描述符上能夠寫數據;
EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這裏應該表示有帶外數據到來);
EPOLLERR:表示對應的文件描述符發生錯誤;
EPOLLHUP:表示對應的文件描述符被掛斷;
EPOLLET:將EPOLL設爲邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來講的。
EPOLLONESHOT:只監聽一次事件,當監聽完此次事件以後,若是還須要繼續監聽這個socket的話,須要再次把這個socket加入到EPOLL隊列裏。
例如:
struct epoll_event ev;
//設置與要處理的事件相關的文件描述符
ev.data.fd=listenfd;
//設置要處理的事件類型
ev.events=EPOLLIN|EPOLLET;
//註冊epoll事件
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
複製代碼
Linux-2.6.19又引入了能夠屏蔽指定信號的epoll_wait: epoll_pwait
接收發生在被偵聽的描述符上的,用戶感興趣的IO事件。簡單點說:經過循環,不斷地監聽暴露的端口,看哪個fd可讀、可寫~
當 timeout 爲 0 時,epoll_wait 永遠會當即返回。而 timeout 爲 -1 時,epoll_wait 會一直阻塞直到任一已註冊的事件變爲就緒。當 timeout 爲一正整數時,epoll 會阻塞直到計時結束或已註冊的事件變爲就緒。由於內核調度延遲,阻塞的時間可能會略微超過 timeout (毫秒級)。
epoll文件描述符用完後,直接用close關閉,而且會自動從被偵聽的文件描述符集合中刪除
說了這麼多原理,腦袋怕嗡嗡的吧,來看看實戰清醒下~
如上知道:每次添加/修改/刪除被偵聽文件描述符都須要調用epoll_ctl,因此要儘可能少地調用epoll_ctl,防止其所引來的開銷抵消其帶來的好處。有的時候,應用中可能存在大量的短鏈接(好比說Web服務器),epoll_ctl將被頻繁地調用,可能成爲這個系統的瓶頸。
傳統的select以及poll的效率會由於在線人數的線形遞增而致使呈二次乃至三次方的降低,這些直接致使了網絡服務器能夠支持的人數有了個比較明顯的限制。這是由於他們有限的文件描述符和遍歷全部的fd所帶來的低效。
當你擁有一個很大的socket集合,不過因爲網絡延時,任一時間只有部分的socket是「活躍」的,可是select/poll每次調用都會線性掃描所有的集合,致使效率呈現線性降低。epoll不存在這個問題,它只會對「活躍」的socket進行操做---這是由於在內核實現中epoll是根據每一個fd上面的callback函數實現的。那麼,只有「活躍」的socket纔會主動的去調用 callback函數,其餘idle(空閒)狀態socket則不會,在這點上,epoll實現了一個「僞」AIO,由於這時候推進力在os內核。在一些 benchmark中,若是全部的socket基本上都是活躍的---好比一個高速LAN環境,epoll並不比select/poll有什麼效率,相反,若是過多使用epoll_ctl,效率相比還有稍微的降低。可是一旦使用idle connections模擬WAN環境,epoll的效率就遠在select/poll之上了。
int epfd = epoll_create(POLL_SIZE);
struct epoll_event ev;
struct epoll_event *events = NULL;
nfds = epoll_wait(epfd, events, 20, 500);
{
for (n = 0; n < nfds; ++n) {
if (events[n].data.fd == listener) {
//若是是主socket的事件的話,則表示
//有新鏈接進入了,進行新鏈接的處理。
client = accept(listener, (structsockaddr *)&local, &addrlen);
if (client < 0) {
perror("accept");
continue;
}
setnonblocking(client); //將新鏈接置於非阻塞模式
ev.events = EPOLLIN | EPOLLET; //而且將新鏈接也加入EPOLL的監聽隊列。
//注意,這裏的參數EPOLLIN|EPOLLET並無設置對寫socket的監聽,
//若是有寫操做的話,這個時候epoll是不會返回事件的,若是要對寫操做
//也監聽的話,應該是EPOLLIN|EPOLLOUT|EPOLLET
ev.data.fd = client;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, client, &ev) < 0) {
//設置好event以後,將這個新的event經過epoll_ctl加入到epoll的監聽隊列裏面,
//這裏用EPOLL_CTL_ADD來加一個新的epoll事件,經過EPOLL_CTL_DEL來減小一個
//epoll事件,經過EPOLL_CTL_MOD來改變一個事件的監聽方式。
fprintf(stderr, "epollsetinsertionerror:fd=%d", client);
return -1;
}
}
else if(event[n].events & EPOLLIN)
{
//若是是已經鏈接的用戶,而且收到數據,
//那麼進行讀入
int sockfd_r;
if ((sockfd_r = event[n].data.fd) < 0)
continue;
read(sockfd_r, buffer, MAXSIZE);
//修改sockfd_r上要處理的事件爲EPOLLOUT
ev.data.fd = sockfd_r;
ev.events = EPOLLOUT | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_r, &ev)
}
else if(event[n].events & EPOLLOUT)
{
//若是有數據發送
int sockfd_w = events[n].data.fd;
write(sockfd_w, buffer, sizeof(buffer));
//修改sockfd_w上要處理的事件爲EPOLLIN
ev.data.fd = sockfd_w;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_w, &ev)
}
do_use_fd(events[n].data.fd);
}
}
複製代碼
簡單說下流程:
監聽到有新鏈接進入了,進行新鏈接的處理;
若是是已經鏈接的用戶,而且收到數據,讀完以後修改sockfd_r上要處理的事件爲EPOLLOUT(可寫);
若是有數據發送,寫完以後,修改sockfd_w上要處理的事件爲EPOLLIN(可讀)
基礎知識:
文件描述符:
(參考《Unix網絡編程》譯者的註釋)
文件描述符是Unix系統標識文件的int,Unix的哲學一切皆文件,因此各自資源(包括常規意義的文件、目錄、管道、POSIX IPC、socket)均可以當作文件。
Java NIO的世界中,Selector是中央控制器,Buffer是承載數據的容器,而Channel能夠說是最基礎的門面,它是本地I/O設備、網絡I/O的通訊橋樑。
網絡I/O設備:
DatagramChannel:讀寫UDP通訊的數據,對應DatagramSocket類
SocketChannel:讀寫TCP通訊的數據,對應Socket類
ServerSocketChannel:監聽新的TCP鏈接,而且會建立一個可讀寫的SocketChannel,對應ServerSocket類
本地I/O設備:
FileChannel:讀寫本地文件的數據,不支持Selector控制,對應File類
ServerSocketChannel與ServerSocket同樣是socket監聽器,其主要區別前者能夠運行在非阻塞模式下運行;
// 建立一個ServerSocketChannel,將會關聯一個未綁定的ServerSocket
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
複製代碼
ServerSocketChannel的建立也是依賴底層操做系統實現,其實現類主要是ServerSocketChannelImpl,咱們來看看其構造方法
ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
super(var1);
// 建立一個文件操做符
this.fd = Net.serverSocket(true);
// 獲得文件操做符是索引
this.fdVal = IOUtil.fdVal(this.fd);
this.state = 0;
}
複製代碼
新建一個ServerSocketChannelImpl其本質是在底層操做系統建立了一個fd(即文件描述符),至關於創建了一個用於網絡通訊的通道,調用socket的bind()方法綁定,經過accept()調用操做系統獲取TCP鏈接
public SocketChannel accept() throws IOException {
// 忽略一些校驗及無關代碼
....
SocketChannelImpl var2 = null;
// var3的做用主要是說明當前的IO狀態,主要有
/** * EOF = -1; * UNAVAILABLE = -2; * INTERRUPTED = -3; * UNSUPPORTED = -4; * THROWN = -5; * UNSUPPORTED_CASE = -6; */
int var3 = 0;
// 這裏本質也是用fd來獲取鏈接
FileDescriptor var4 = new FileDescriptor();
// 用來存儲TCP鏈接的地址信息
InetSocketAddress[] var5 = new InetSocketAddress[1];
try {
// 這裏設置了一箇中斷器,中斷時會將鏈接關閉
this.begin();
// 這裏當IO被中斷時,會從新獲取鏈接
do {
var3 = this.accept(this.fd, var4, var5);
} while(var3 == -3 && this.isOpen());
}finally {
// 當鏈接被關閉且accept失敗時或拋出AsynchronousCloseException
this.end(var3 > 0);
// 驗證鏈接是可用的
assert IOStatus.check(var3);
}
if (var3 < 1) {
return null;
} {
// 默認鏈接是阻塞的
IOUtil.configureBlocking(var4, true);
// 建立一個SocketChannel的引用
var2 = new SocketChannelImpl(this.provider(), var4, var5[0]);
// 下面是是否鏈接成功校驗,這裏忽略...
return var2;
}
}
// 依賴底層操做系統實現的accept0方法
private int accept(FileDescriptor var1, FileDescriptor var2, InetSocketAddress[] var3) throws IOException {
return this.accept0(var1, var2, var3);
}
複製代碼
用於讀寫TCP通訊的數據,至關於客戶端
經過open方法建立SocketChannel,
而後利用connect方法來和服務端發起創建鏈接,還支持了一些判斷鏈接創建狀況的方法;
read和write支持最基本的讀寫操做
open
public static SocketChannel open() throws IOException {
return SelectorProvider.provider().openSocketChannel();
}
public SocketChannel openSocketChannel() throws IOException {
return new SocketChannelImpl(this);
}
// State, increases monotonically
private static final int ST_UNINITIALIZED = -1;
private static final int ST_UNCONNECTED = 0;
private static final int ST_PENDING = 1;
private static final int ST_CONNECTED = 2;
private static final int ST_KILLPENDING = 3;
private static final int ST_KILLED = 4;
private int state = ST_UNINITIALIZED;
SocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
// 建立一個scoket通道,即fd(fd的做用可參考上面的描述)
this.fd = Net.socket(true);
// 獲得該fd的索引
this.fdVal = IOUtil.fdVal(fd);
// 設置爲未鏈接
this.state = ST_UNCONNECTED;
}
複製代碼
// 代碼均來自JDK1.8 部分代碼
public boolean connect(SocketAddress var1) throws IOException {
boolean var2 = false;
// 讀寫都鎖住
synchronized(this.readLock) {
synchronized(this.writeLock) {
/****狀態檢查,channel和address****/
// 判斷channel是否open
this.ensureOpenAndUnconnected();
InetSocketAddress var5 = Net.checkAddress(var1);
SecurityManager var6 = System.getSecurityManager();
if (var6 != null) {
var6.checkConnect(var5.getAddress().getHostAddress(), var5.getPort());
}
boolean var10000;
/****鏈接創建****/
// 阻塞狀態變動的鎖也鎖住
synchronized(this.blockingLock()) {
int var8 = 0;
try {
try {
this.begin();
// 若是當前socket未綁定本地端口,則嘗試着判斷和服務端是否能創建鏈接
synchronized(this.stateLock) {
if (!this.isOpen()) {
boolean var10 = false;
return var10;
}
if (this.localAddress == null) {
// 和遠程創建鏈接後關閉鏈接
NetHooks.beforeTcpConnect(this.fd, var5.getAddress(), var5.getPort());
}
this.readerThread = NativeThread.current();
}
do {
InetAddress var9 = var5.getAddress();
if (var9.isAnyLocalAddress()) {
var9 = InetAddress.getLocalHost();
}
// 創建鏈接
var8 = Net.connect(this.fd, var9, var5.getPort());
} while(var8 == -3 && this.isOpen());
synchronized(this.stateLock) {
this.remoteAddress = var5;
if (var8 <= 0) {
if (!this.isBlocking()) {
this.state = 1;
} else {
assert false;
}
} else {
this.state = 2;// 鏈接成功
if (this.isOpen()) {
this.localAddress = Net.localAddress(this.fd);
}
var10000 = true;
return var10000;
}
}
}
var10000 = false;
return var10000;
}
}
}
複製代碼
在創建在綁定地址以前,咱們須要調用NetHooks.beforeTcpBind,這個方法是將fd轉換爲SDP(Sockets Direct Protocol,Java套接字直接協議) socket。SDP須要網卡支持InfiniBand高速網絡通訊技術,windows不支持該協議。
咱們來看看在openjdk: src\solaris\classes\sun\net下的NetHooks.java
private static final Provider provider = new sun.net.sdp.SdpProvider();
public static void beforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException
{
provider.implBeforeTcpBind(fdObj, address, port);
}
public static void beforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException
{
provider.implBeforeTcpConnect(fdObj, address, port);
}
複製代碼
能夠看到實際是調用的SdpProvider裏的implBeforeTcpBind
@Override
public void implBeforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException {
if (enabled)
convertTcpToSdpIfMatch(fdObj, Action.BIND, address, port);
}
// converts unbound TCP socket to a SDP socket if it matches the rules
private void convertTcpToSdpIfMatch(FileDescriptor fdObj, Action action, InetAddress address, int port) throws IOException {
boolean matched = false;
// 主要是先經過規則校驗器判斷入參是否符合,通常有PortRangeRule校驗器
// 而後再執行將fd轉換爲socket
for (Rule rule: rules) {
if (rule.match(action, address, port)) {
SdpSupport.convertSocket(fdObj);
matched = true;
break;
}
}
}
public static void convertSocket(FileDescriptor fd) throws IOException {
...
//獲取fd索引
int fdVal = fdAccess.get(fd);
convert0(fdVal);
}
// convert0
JNIEXPORT void JNICALL Java_sun_net_sdp_SdpSupport_convert0(JNIEnv *env, jclass cls, int fd) {
// create方法實際是經過socket(AF_INET_SDP, SOCK_STREAM, 0);方法獲得一個socket
int s = create(env);
if (s >= 0) {
socklen_t len;
int arg, res;
struct linger linger;
/* copy socket options that are relevant to SDP */
len = sizeof(arg);
// 重用TIME_WAIT的端口
if (getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, &len) == 0)
setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, len);
len = sizeof(arg);
// 緊急數據放入普通數據流
if (getsockopt(fd, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, &len) == 0)
setsockopt(s, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, len);
len = sizeof(linger);
// 延遲關閉鏈接
if (getsockopt(fd, SOL_SOCKET, SO_LINGER, (void*)&linger, &len) == 0)
setsockopt(s, SOL_SOCKET, SO_LINGER, (char*)&linger, len);
// 將fd也引用到s所持有的通道
RESTARTABLE(dup2(s, fd), res);
if (res < 0)
JNU_ThrowIOExceptionWithLastError(env, "dup2");
// 執行close方法,關閉s這個引用
RESTARTABLE(close(s), res);
}
}
複製代碼
public int read(ByteBuffer var1) throws IOException {
// 省略一些判斷
synchronized(this.readLock) {
this.begin();
synchronized(this.stateLock) {
do {
// 經過IOUtil的讀取fd的數據至buf
// 這裏的nd是SocketDispatcher,用於調用底層的read和write操做
var3 = IOUtil.read(this.fd, var1, -1L, nd);
} while(var3 == -3 && this.isOpen());
// 這個方法主要是將UNAVAILABLE(原爲-2)這個狀態返回0,不然返回n
var4 = IOStatus.normalize(var3);
var20 = false;
break label367;
}
this.readerCleanup();
assert IOStatus.check(var3);
}
}
}
}
static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
if (var1.isReadOnly()) {
throw new IllegalArgumentException("Read-only buffer");
} else if (var1 instanceof DirectBuffer) {
return readIntoNativeBuffer(var0, var1, var2, var4);
} else {
// 臨時緩衝區,大小爲buf的remain(limit - position),堆外內存,使用ByteBuffer.allocateDirect(size)分配
// Notes:這裏分配後後面有個try-finally塊會釋放該部份內存
ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());
int var7;
try {
// 將網絡中的buf讀進direct buffer
int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
var5.flip();// 待讀取
if (var6 > 0) {
var1.put(var5);// 成功時寫入
}
var7 = var6;
} finally {
Util.offerFirstTemporaryDirectBuffer(var5);
}
return var7;
}
}
private static int readIntoNativeBuffer(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
// 忽略變量init
if (var2 != -1L) {
// pread方法只有在同步狀態下才能使用
var9 = var4.pread(var0, ((DirectBuffer)var1).address() + (long)var5, var7, var2);
} else {
// 其調用SocketDispatcher.read方法 -> FileDispatcherImpl.read0方法
var9 = var4.read(var0, ((DirectBuffer)var1).address() + (long)var5, var7);
}
if (var9 > 0) {
var1.position(var5 + var9);
}
return var9;
}
}
// 一樣找到openjdk:src\solaris\native\sun\nio\ch
//FileDispatcherImpl.c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_read0(JNIEnv *env, jclass clazz,
jobject fdo, jlong address, jint len)
{
jint fd = fdval(env, fdo);// 獲取fd索引
void *buf = (void *)jlong_to_ptr(address);
// 調用底層read方法
return convertReturnVal(env, read(fd, buf, len), JNI_TRUE);
}
複製代碼
初始化一個direct buffer,若是自己的buffer就是direct的則不用初始化
調用底層read方法寫入至direct buffer
最終將direct buffer寫到傳入的buffer對象
看完了前面的read,write整個執行流程基本同樣,具體的細節參考以下
public int write(ByteBuffer var1) throws IOException {
if (var1 == null) {
throw new NullPointerException();
} else {
synchronized(this.writeLock) {
this.ensureWriteOpen();
this.begin();
synchronized(this.stateLock) {
if (!this.isOpen()) {
var5 = 0;
var20 = false;
break label310;
}
this.writerThread = NativeThread.current();
}
do {
// 經過IOUtil的讀取fd的數據至buf
// 這裏的nd是SocketDispatcher,用於調用底層的read和write操做
var3 = IOUtil.write(this.fd, var1, -1L, nd);
} while(var3 == -3 && this.isOpen());
var4 = IOStatus.normalize(var3);
var20 = false;
this.writerCleanup();
assert IOStatus.check(var3);
return var4;
}
}
}
}
static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
if (var1 instanceof DirectBuffer) {
return writeFromNativeBuffer(var0, var1, var2, var4);
} else {
ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);
int var10;
try {
// 這裏的pos爲buf初始的position,意思是將buf重置爲最初的狀態;由於目前尚未真實的寫入到channel中
var8.put(var1);
var8.flip();
var1.position(var5);
// 調用
int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
if (var9 > 0) {
var1.position(var5 + var9);
}
var10 = var9;
} finally {
Util.offerFirstTemporaryDirectBuffer(var8);
}
return var10;
}
}
IOUtil.writeFromNativeBuffer(fd , buf , position , nd)
{
// ... 忽略一些獲取buf變量的代碼
int written = 0;
if (position != -1) {
// pread方法只有在同步狀態下才能使用
written = nd.pwrite(fd ,((DirectBuffer)bb).address() + pos,rem, position);
} else {
// 其調用SocketDispatcher.write方法 -> FileDispatcherImpl.write0方法
written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);
}
//....
}
FileDispatcherImpl.write0
{
// 調用底層的write方法寫入
return convertReturnVal(env, write(fd, buf, len), JNI_FALSE);
}
}
複製代碼
若是buf是direct buffer則直接開始寫入,不然須要初始化一個direct buffer,大小是buf的remain
將buf的內容寫入到direct buffer中,並恢復buf的position
調用底層的write方法寫入至channel
更新buf的position,即被direct buffer讀取內容後的position
耐心一點,立刻就到Epoll了
理解了前面的一些基礎知識,接下來的部分就會涉及到Java是怎麼樣來使用epoll的。
Selector的做用是Java NIO中管理一組多路複用的SelectableChannel對象,並可以識別通道是否爲諸如讀寫事件作好準備的組件 --Java doc
Selector的建立過程以下:
// 1.建立Selector
Selector selector = Selector.open();
// 2.將Channel註冊到選擇器中
// ....... new channel的過程 ....
//Notes:channel要註冊到Selector上就必須是非阻塞的,因此FileChannel是不能夠
//使用Selector的,由於FileChannel是阻塞的
channel.configureBlocking(false);
// 第二個參數指定了咱們對 Channel 的什麼類型的事件感興趣
SelectionKey key = channel.register(selector , SelectionKey.OP_READ);
// 也可使用或運算|來組合多個事件,例如
SelectionKey key = channel.register(selector , SelectionKey.OP_READ | SelectionKey.OP_WRITE);
// 不過值得注意的是,一個 Channel 僅僅能夠被註冊到一個 Selector 一次,
// 若是將 Channel 註冊到 Selector 屢次, 那麼其實就是至關於更新 SelectionKey
//的 interest set.
複製代碼
①一個Channel在Selector註冊其表明的是一個SelectionKey事件,SelectionKey的類型包括:
OP_READ:可讀事件;值爲:1<<0
OP_WRITE:可寫事件;值爲:1<<2
OP_CONNECT:客戶端鏈接服務端的事件(tcp鏈接),通常爲建立SocketChannel客戶端channel;值爲:1<<3
OP_ACCEPT:服務端接收客戶端鏈接的事件,通常爲建立ServerSocketChannel服務端channel;值爲:1<<4
②一個Selector內部維護了三組keys:
key set:當前channel註冊在Selector上全部的key;可調用keys()獲取
selected-key set:當前channel就緒的事件;可調用selectedKeys()獲取
cancelled-key:主動觸發SelectionKey#cancel()方法會放在該集合,前提條件是該channel沒有被取消註冊;不可經過外部方法調用
③Selector類中總共包含如下10個方法:
open():建立一個Selector對象
isOpen():是不是open狀態,若是調用了close()方法則會返回false
provider():獲取當前Selector的Provider
keys():如上文所述,獲取當前channel註冊在Selector上全部的key
selectedKeys():獲取當前channel就緒的事件列表
selectNow():獲取當前是否有事件就緒,該方法當即返回結果,不會阻塞;若是返回值>0,則表明存在一個或多個
select(long timeout):selectNow的阻塞超時方法,超時時間內,有事件就緒時纔會返回;不然超過期間也會返回
select():selectNow的阻塞方法,直到有事件就緒時纔會返回
wakeup():調用該方法會時,阻塞在select()處的線程會立馬返回;(ps:下面一句劃重點)即便當前不存在線程阻塞在select()處,那麼下一個執行select()方法的線程也會當即返回結果,至關於執行了一次selectNow()方法
close(): 用完Selector後調用其close()方法會關閉該Selector,且使註冊到該Selector上的全部SelectionKey實例無效。channel自己並不會關閉。
談到Selector就不得不提SelectionKey,二者是緊密關聯,配合使用的;如上文所示,往Channel註冊Selector會返回一個SelectionKey對象, 這個對象包含了以下內容:
interest set,當前Channel感興趣的事件集,即在調用register方法設置的interes set
ready set
channel
selector
attached object,可選的附加對象
// 返回當前感興趣的事件列表
int interestSet = key.interestOps();
// 也可經過interestSet判斷其中包含的事件
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
// 能夠經過interestOps(int ops)方法修改事件列表
key.interestOps(interestSet | SelectionKey.OP_WRITE);
複製代碼
int readySet = key.readyOps();
// 也可經過四個方法來分別判斷不一樣事件是否就緒
key.isReadable(); //讀事件是否就緒
key.isWritable(); //寫事件是否就緒
key.isConnectable(); //客戶端鏈接事件是否就緒
key.isAcceptable(); //服務端鏈接事件是否就緒
複製代碼
// 返回當前事件關聯的通道,可轉換的選項包括:`ServerSocketChannel`和`SocketChannel`
Channel channel = key.channel();
//返回當前事件所關聯的Selector對象
Selector selector = key.selector();
複製代碼
attached object 咱們能夠在selectionKey中附加一個對象,或者在註冊時直接附加:
key.attach(theObject);
Object attachedObj = key.attachment();
// 在註冊時直接附加
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
複製代碼
萬丈高樓平地起,基礎知識差很少了,瞭解了這些,能夠找一些nio demo或者netty demo練練手。接下來說解本節比較重要的~epoll
前面屢次提到了openjdk,seletor的具體實現確定是跟操做系統有關的,咱們一塊兒來看看。
能夠看到Selector的實現是SelectorImpl, 而後SelectorImpl又將職責委託給了具體的平臺,好比圖中的linux2.6 EpollSelectorImpl,windows是WindowsSelectorImpl,MacOSX是KQueueSelectorImpl
根據前面咱們知道,Selector.open()能夠獲得一個Selector實例,怎麼實現的呢?
// Selector.java
public static Selector open() throws IOException {
// 首先找到provider,而後再打開Selector
return SelectorProvider.provider().openSelector();
}
// java.nio.channels.spi.SelectorProvider
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
// 這裏就是打開Selector的真正方法
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
複製代碼
在openjdk中,每一個操做系統都有一個sun.nio.ch.DefaultSelectorProvider實現,以srcsolaris\classes\sun\nio\ch下的DefaultSelectorProvider爲例:
/** * Returns the default SelectorProvider. */
public static SelectorProvider create() {
// 獲取OS名稱
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
// 根據名稱來建立不一樣的Selctor
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
}
複製代碼
打開srcsolaris\classes\sun\nio\ch下的EPollSelectorProvider.java
public class EPollSelectorProvider extends SelectorProviderImpl {
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
public Channel inheritedChannel() throws IOException {
return InheritedChannel.getChannel();
}
}
複製代碼
Linux平臺就獲得了最終的Selector實現:srcsolaris\classes\sun\nio\ch下的EPollSelectorImpl.java
來看看它實現的構造器:
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
// makePipe返回管道的2個文件描述符,編碼在一個long類型的變量中
// 高32位表明讀 低32位表明寫
// 使用pipe爲了實現Selector的wakeup邏輯
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
// 新建一個EPollArrayWrapper
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
}
複製代碼
\src\solaris\native\sun\nio\ch下的EPollArrayWrapper.c
JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this) {
/* * epoll_create expects a size as a hint to the kernel about how to * dimension internal structures. We can't predict the size in advance. */
int epfd = epoll_create(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
}
return epfd;
}
複製代碼
調用Selector.select(返回鍵的數量,多是零)最後會委託給各個實現的doSelect方法,限於篇幅不貼出太詳細的,這裏看下EpollSelectorImpl的doSelect方法
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
//EPollArrayWrapper pollWrapper
pollWrapper.poll(timeout);//重點在這裏
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();// 後面會講到
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
int poll(long timeout) throws IOException {
updateRegistrations();// 這個代碼在下面講,涉及到epoo_ctl
// 這個epollWait是否是有點熟悉呢?
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
複製代碼
看下EPollArrayWrapper.c
JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this, jlong address, jint numfds, jlong timeout, jint epfd) {
struct epoll_event *events = jlong_to_ptr(address);
int res;
if (timeout <= 0) { /* Indefinite or no wait */
//系統調用等待內核事件
RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
} else { /* Bounded wait; bounded restarts */
res = iepoll(epfd, events, numfds, timeout);
}
if (res < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
}
return res;
}
複製代碼
能夠看到在linux中Selector.select()實際上是調用了epoll_wait
JDK中對於註冊到Selector上的IO事件關係是使用SelectionKey來表示,表明了Channel感興趣的事件,如Read,Write,Connect,Accept.
調用Selector.register()時均會將事件存儲到EpollArrayWrapper.java的成員變量eventsLow和eventsHigh中
// events for file descriptors with registration changes pending, indexed
// by file descriptor and stored as bytes for efficiency reasons. For
// file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
// least) then the update is stored in a map.
// 使用數組保存事件變動, 數組的最大長度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
// 超過數組長度的事件會緩存到這個map中,等待下次處理
private Map<Integer,Byte> eventsHigh;
/** * Sets the pending update events for the given file descriptor. This * method has no effect if the update events is already set to KILLED, * unless {@code force} is {@code true}. */
private void setUpdateEvents(int fd, byte events, boolean force) {
// 判斷fd和數組長度
if (fd < MAX_UPDATE_ARRAY_SIZE) {
if ((eventsLow[fd] != KILLED) || force) {
eventsLow[fd] = events;
}
} else {
Integer key = Integer.valueOf(fd);
if (!isEventsHighKilled(key) || force) {
eventsHigh.put(key, Byte.valueOf(events));
}
}
}
/** * Returns the pending update events for the given file descriptor. */
private byte getUpdateEvents(int fd) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
return eventsLow[fd];
} else {
Byte result = eventsHigh.get(Integer.valueOf(fd));
// result should never be null
return result.byteValue();
}
複製代碼
在上面poll代碼中涉及到
int poll(long timeout) throws IOException {
updateRegistrations();/
/**
* Update the pending registrations.
*/
private void updateRegistrations() {
synchronized (updateLock) {
int j = 0;
while (j < updateCount) {
int fd = updateDescriptors[j];
// 從保存的eventsLow和eventsHigh裏取出事件
short events = getUpdateEvents(fd);
boolean isRegistered = registered.get(fd);
int opcode = 0;
if (events != KILLED) {
if (isRegistered) {
// 判斷操做類型以傳給epoll_ctl
// 沒有指定EPOLLET事件類型
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
} else {
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
}
if (opcode != 0) {
// 熟悉的epoll_ctl
epollCtl(epfd, opcode, fd, events);
if (opcode == EPOLL_CTL_ADD) {
registered.set(fd);
} else if (opcode == EPOLL_CTL_DEL) {
registered.clear(fd);
}
}
}
j++;
}
updateCount = 0;
}
private native void epollCtl(int epfd, int opcode, int fd, int events);
複製代碼
能夠看到epollCtl調用的native方法,咱們進入EpollArrayWrapper.c
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
jint opcode, jint fd, jint events)
{
struct epoll_event event;
int res;
event.events = events;
event.data.fd = fd;
// epoll_ctl這裏就不用多說了吧
RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
/*
* A channel may be registered with several Selectors. When each Selector
* is polled a EPOLL_CTL_DEL op will be inserted into its pending update
* list to remove the file descriptor from epoll. The "last" Selector will
* close the file descriptor which automatically unregisters it from each
* epoll descriptor. To avoid costly synchronization between Selectors we
* allow pending updates to be processed, ignoring errors. The errors are
* harmless as the last update for the file descriptor is guaranteed to
* be EPOLL_CTL_DEL.
*/
if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
}
}
複製代碼
在doSelect方法poll執行後,會更新EpollSelectorImpl.java裏的 updateSelectedKeys,就是Selector裏的三個set集合,具體可看前面。
/**
*更新已被epoll選擇fd的鍵。
*將就緒興趣集添加到就緒隊列。
*/
private int updateSelectedKeys() {
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
int nextFD = pollWrapper.getDescriptor(i);
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
// ski is null in the case of an interrupt
if (ski != null) {
int rOps = pollWrapper.getEventOps(i);
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}
複製代碼
總結
經過本文,你應該知道Channel、Selector基本原理和在Java中怎麼使用Epoll的。 (包括更細節的fd與channel和socket之間的轉換關係)掌握這些基礎知識,再去看NIO、netty網絡框架的源碼可能就沒有那麼吃力了。在接下來的文章裏我會跟進關於Netty的文章,畢竟這已成爲分佈式網絡通訊框架的主流了!
感謝
zh.wikipedia.org/wiki/Epoll 維基百科
真心感謝帥逼靚女們能看到這裏,若是這個文章寫得還不錯,以爲有點東西的話
求點贊👍 求關注❤️ 求分享👥 對8塊腹肌的我來講真的 很是有用!!!
若是本篇博客有任何錯誤,請批評指教,不勝感激 !❤️❤️❤️❤️