熬夜肝了-EPOLL原理分析與java實戰|Java 開發實戰

「 Epoll 是Linux內核的高性能、可擴展的I/O事件通知機制。 圖片來自 Pexelsjava

在linux2.5.44首次引入epoll,它設計的目的旨在取代既有的select、poll系統函數,讓須要大量操做文件描述符的程序得以發揮更優異的性能(wikipedia example: 舊有的系統函數所花費的時間複雜度爲O(n), epoll的時間複雜度O(log n))。epoll實現的功能與poll相似,都是監聽多個文件描述符上的事件。linux

epoll底層是由可配置的操做系統內核對象建構而成,並以文件描述符(file descriptor)的形式呈現於用戶空間(from wikipedia: 在操做系統中,虛擬內存一般會被分紅用戶空間,與核心空間這兩個區段。這是存儲器保護機制中的一環。內核**、核心擴展(kernel extensions)、以及驅動程序,運行在覈心空間**上。而其餘的應用程序,則運行在用戶空間上。全部運行在用戶空間的應用程序,都被統稱爲用戶級(userland))。編程

多說一點關於內核的

它是一個用來管理軟件發出的數據I/O的一個程序,並將數據交由CPU和電腦其餘電子組件處理,可是直接對硬件操做是很是複雜的,一般內核提供一種硬件抽象的方法來完成(由內核決定一個程序在何時對某部分硬件操做多長時間),經過這些方法來完成進程間通訊和系統調用。windows

宏內核:

宏內核簡單來講,首先定義了一個高階的抽象接口,叫系統調用(System call))來實現操做系統的功能,例如進程管理,文件系統,和存儲管理等等,這些功能由多個運行在內核態的程序來完成。數組

微內核:緩存

微內核結構由硬件抽象層和系統調用組成;包括了建立一個系統必需的幾個部分;如線程管理,地址空間和進程間通訊等。微核的目標是將系統服務的實現和系統的基本操做規則分離開來。服務器

linux就是使用的宏內核。由於它可以在運行時將模塊調入執行,使擴充內核的功能變得更簡單。markdown

epoll作了什麼事?網絡

epoll 經過使用紅黑樹(RB-tree)搜索被監視的文件描述符(file descriptor)。app

在 epoll 實例上註冊事件時,epoll 會將該事件添加到 epoll 實例的紅黑樹上並註冊一個回調函數,當事件發生時會將事件添加到就緒鏈表中。

epoll的結構?

int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
複製代碼

①epoll_create

向內核申請空間,建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大。這個參數不一樣於select()中的第一個參數,給出最大監聽的fd+1的值。在最初的實現中,調用者經過 size 參數告知內核須要監聽的文件描述符數量。若是監聽的文件描述符數量超過 size, 則內核會自動擴容。而如今 size 已經沒有這種語義了,可是調用者調用時 size 依然必須大於 0,以保證後向兼容性。須要注意的是,當建立好epoll句柄後,它就是會佔用一個fd值,在linux下若是查看/proc/進程id/fd/,是可以看到這個fd的。

②epoll_ctl

向 epfd 對應的內核epoll 實例添加、修改或刪除對 fd 上事件 event 的監聽。op 能夠爲 EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL 分別對應的是添加新的事件,修改文件描述符上監聽的事件類型,從實例上刪除一個事件。若是 event 的 events 屬性設置了 EPOLLET flag,那麼監聽該事件的方式是邊緣觸發。

events能夠是如下幾個宏的集合:

  • EPOLLIN:觸發該事件,表示對應的文件描述符上有可讀數據。(包括對端SOCKET正常關閉);

  • EPOLLOUT:觸發該事件,表示對應的文件描述符上能夠寫數據;

  • EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這裏應該表示有帶外數據到來);

  • EPOLLERR:表示對應的文件描述符發生錯誤;

  • EPOLLHUP:表示對應的文件描述符被掛斷;

  • EPOLLET:將EPOLL設爲邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來講的。

  • EPOLLONESHOT:只監聽一次事件,當監聽完此次事件以後,若是還須要繼續監聽這個socket的話,須要再次把這個socket加入到EPOLL隊列裏。

例如:

struct epoll_event ev;
//設置與要處理的事件相關的文件描述符
ev.data.fd=listenfd;
//設置要處理的事件類型
ev.events=EPOLLIN|EPOLLET;
//註冊epoll事件
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
複製代碼

③epoll_wait

Linux-2.6.19又引入了能夠屏蔽指定信號的epoll_wait: epoll_pwait

接收發生在被偵聽的描述符上的,用戶感興趣的IO事件。簡單點說:經過循環,不斷地監聽暴露的端口,看哪個fd可讀、可寫~

當 timeout 爲 0 時,epoll_wait 永遠會當即返回。而 timeout 爲 -1 時,epoll_wait 會一直阻塞直到任一已註冊的事件變爲就緒。當 timeout 爲一正整數時,epoll 會阻塞直到計時結束或已註冊的事件變爲就緒。由於內核調度延遲,阻塞的時間可能會略微超過 timeout (毫秒級)。

epoll文件描述符用完後,直接用close關閉,而且會自動從被偵聽的文件描述符集合中刪除

epoll實戰

說了這麼多原理,腦袋怕嗡嗡的吧,來看看實戰清醒下~

如上知道:每次添加/修改/刪除被偵聽文件描述符都須要調用epoll_ctl,因此要儘可能少地調用epoll_ctl,防止其所引來的開銷抵消其帶來的好處。有的時候,應用中可能存在大量的短鏈接(好比說Web服務器),epoll_ctl將被頻繁地調用,可能成爲這個系統的瓶頸。

傳統的select以及poll的效率會由於在線人數的線形遞增而致使呈二次乃至三次方的降低,這些直接致使了網絡服務器能夠支持的人數有了個比較明顯的限制。這是由於他們有限的文件描述符和遍歷全部的fd所帶來的低效。

重點哦~

當你擁有一個很大的socket集合,不過因爲網絡延時,任一時間只有部分的socket是「活躍」的,可是select/poll每次調用都會線性掃描所有的集合,致使效率呈現線性降低。epoll不存在這個問題,它只會對「活躍」的socket進行操做---這是由於在內核實現中epoll是根據每一個fd上面的callback函數實現的。那麼,只有「活躍」的socket纔會主動的去調用 callback函數,其餘idle(空閒)狀態socket則不會,在這點上,epoll實現了一個「僞」AIO,由於這時候推進力在os內核。在一些 benchmark中,若是全部的socket基本上都是活躍的---好比一個高速LAN環境,epoll並不比select/poll有什麼效率,相反,若是過多使用epoll_ctl,效率相比還有稍微的降低。可是一旦使用idle connections模擬WAN環境,epoll的效率就遠在select/poll之上了。

int epfd = epoll_create(POLL_SIZE);
    struct epoll_event ev;
    struct epoll_event *events = NULL;
    nfds = epoll_wait(epfd, events, 20, 500);
    {
        for (n = 0; n < nfds; ++n) {
            if (events[n].data.fd == listener) {
                //若是是主socket的事件的話,則表示
                //有新鏈接進入了,進行新鏈接的處理。
                client = accept(listener, (structsockaddr *)&local, &addrlen);
                if (client < 0) {
                    perror("accept");
                    continue;
                }
                setnonblocking(client);        //將新鏈接置於非阻塞模式
                ev.events = EPOLLIN | EPOLLET; //而且將新鏈接也加入EPOLL的監聽隊列。
                //注意,這裏的參數EPOLLIN|EPOLLET並無設置對寫socket的監聽,
                //若是有寫操做的話,這個時候epoll是不會返回事件的,若是要對寫操做
                //也監聽的話,應該是EPOLLIN|EPOLLOUT|EPOLLET
                ev.data.fd = client;
                if (epoll_ctl(epfd, EPOLL_CTL_ADD, client, &ev) < 0) {
                    //設置好event以後,將這個新的event經過epoll_ctl加入到epoll的監聽隊列裏面,
                    //這裏用EPOLL_CTL_ADD來加一個新的epoll事件,經過EPOLL_CTL_DEL來減小一個
                    //epoll事件,經過EPOLL_CTL_MOD來改變一個事件的監聽方式。
                    fprintf(stderr, "epollsetinsertionerror:fd=%d", client);
                    return -1;
                }
            }
            else if(event[n].events & EPOLLIN)
            {
                //若是是已經鏈接的用戶,而且收到數據,
                //那麼進行讀入
                int sockfd_r;
                if ((sockfd_r = event[n].data.fd) < 0)
                    continue;
                read(sockfd_r, buffer, MAXSIZE);
                //修改sockfd_r上要處理的事件爲EPOLLOUT
                ev.data.fd = sockfd_r;
                ev.events = EPOLLOUT | EPOLLET;
                epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_r, &ev)
            }
            else if(event[n].events & EPOLLOUT)
            {
                //若是有數據發送
                int sockfd_w = events[n].data.fd;
                write(sockfd_w, buffer, sizeof(buffer));
                //修改sockfd_w上要處理的事件爲EPOLLIN
                ev.data.fd = sockfd_w;
                ev.events = EPOLLIN | EPOLLET;
                epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_w, &ev)
            }
            do_use_fd(events[n].data.fd);
        }
    }
複製代碼

簡單說下流程:

  • 監聽到有新鏈接進入了,進行新鏈接的處理;

  • 若是是已經鏈接的用戶,而且收到數據,讀完以後修改sockfd_r上要處理的事件爲EPOLLOUT(可寫);

  • 若是有數據發送,寫完以後,修改sockfd_w上要處理的事件爲EPOLLIN(可讀)

epoll在Java中怎麼去調用的?

基礎知識:

文件描述符:

  • (參考《Unix網絡編程》譯者的註釋)

  • 文件描述符是Unix系統標識文件的int,Unix的哲學一切皆文件,因此各自資源(包括常規意義的文件、目錄、管道、POSIX IPC、socket)均可以當作文件。

Java NIO的世界中,Selector是中央控制器,Buffer是承載數據的容器,而Channel能夠說是最基礎的門面,它是本地I/O設備、網絡I/O的通訊橋樑。

  • 網絡I/O設備:

  • DatagramChannel:讀寫UDP通訊的數據,對應DatagramSocket類

  • SocketChannel:讀寫TCP通訊的數據,對應Socket類

  • ServerSocketChannel:監聽新的TCP鏈接,而且會建立一個可讀寫的SocketChannel,對應ServerSocket類

  • 本地I/O設備:

  • FileChannel:讀寫本地文件的數據,不支持Selector控制,對應File類

①先從最簡單的ServerSocketChannel看起

ServerSocketChannel與ServerSocket同樣是socket監聽器,其主要區別前者能夠運行在非阻塞模式下運行;

// 建立一個ServerSocketChannel,將會關聯一個未綁定的ServerSocket
    public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
    }
複製代碼

ServerSocketChannel的建立也是依賴底層操做系統實現,其實現類主要是ServerSocketChannelImpl,咱們來看看其構造方法

ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
        super(var1);
        // 建立一個文件操做符
        this.fd = Net.serverSocket(true);
        // 獲得文件操做符是索引
        this.fdVal = IOUtil.fdVal(this.fd);
        this.state = 0;
     }
複製代碼

新建一個ServerSocketChannelImpl其本質是在底層操做系統建立了一個fd(即文件描述符),至關於創建了一個用於網絡通訊的通道,調用socket的bind()方法綁定,經過accept()調用操做系統獲取TCP鏈接

public SocketChannel accept() throws IOException {
    // 忽略一些校驗及無關代碼
    ....

    SocketChannelImpl var2 = null;
    // var3的做用主要是說明當前的IO狀態,主要有
    /** * EOF = -1; * UNAVAILABLE = -2; * INTERRUPTED = -3; * UNSUPPORTED = -4; * THROWN = -5; * UNSUPPORTED_CASE = -6; */
    int var3 = 0;
    // 這裏本質也是用fd來獲取鏈接
    FileDescriptor var4 = new FileDescriptor();
    // 用來存儲TCP鏈接的地址信息
    InetSocketAddress[] var5 = new InetSocketAddress[1];

    try {
        // 這裏設置了一箇中斷器,中斷時會將鏈接關閉
        this.begin();
        // 這裏當IO被中斷時,會從新獲取鏈接
        do {
            var3 = this.accept(this.fd, var4, var5);
        } while(var3 == -3 && this.isOpen());
    }finally {
        // 當鏈接被關閉且accept失敗時或拋出AsynchronousCloseException
        this.end(var3 > 0);
        // 驗證鏈接是可用的
        assert IOStatus.check(var3);
    }

    if (var3 < 1) {
        return null;
    } {
        // 默認鏈接是阻塞的
        IOUtil.configureBlocking(var4, true);
        // 建立一個SocketChannel的引用
        var2 = new SocketChannelImpl(this.provider(), var4, var5[0]);
        // 下面是是否鏈接成功校驗,這裏忽略...

        return var2;
    }
}

// 依賴底層操做系統實現的accept0方法
private int accept(FileDescriptor var1, FileDescriptor var2, InetSocketAddress[] var3) throws IOException {
    return this.accept0(var1, var2, var3);
}
複製代碼

②SocketChannel

用於讀寫TCP通訊的數據,至關於客戶端

  1. 經過open方法建立SocketChannel,

  2. 而後利用connect方法來和服務端發起創建鏈接,還支持了一些判斷鏈接創建狀況的方法;

  3. read和write支持最基本的讀寫操做

open

public static SocketChannel open() throws IOException {
        return SelectorProvider.provider().openSocketChannel();
    }
    public SocketChannel openSocketChannel() throws IOException {
        return new SocketChannelImpl(this);
    }
    // State, increases monotonically
    private static final int ST_UNINITIALIZED = -1;
    private static final int ST_UNCONNECTED = 0;
    private static final int ST_PENDING = 1;
    private static final int ST_CONNECTED = 2;
    private static final int ST_KILLPENDING = 3;
    private static final int ST_KILLED = 4;
    private int state = ST_UNINITIALIZED;    
    SocketChannelImpl(SelectorProvider sp) throws IOException {
        super(sp);
        // 建立一個scoket通道,即fd(fd的做用可參考上面的描述)
        this.fd = Net.socket(true);
        // 獲得該fd的索引
        this.fdVal = IOUtil.fdVal(fd);
        // 設置爲未鏈接
        this.state = ST_UNCONNECTED;
    }
複製代碼

connect創建鏈接

// 代碼均來自JDK1.8 部分代碼
    public boolean connect(SocketAddress var1) throws IOException {
        boolean var2 = false;
        // 讀寫都鎖住
        synchronized(this.readLock) {
            synchronized(this.writeLock) {
                 /****狀態檢查,channel和address****/
                // 判斷channel是否open
                this.ensureOpenAndUnconnected();
                InetSocketAddress var5 = Net.checkAddress(var1);
                SecurityManager var6 = System.getSecurityManager();
                if (var6 != null) {
                    var6.checkConnect(var5.getAddress().getHostAddress(), var5.getPort());
                }

                boolean var10000;
                 /****鏈接創建****/
                // 阻塞狀態變動的鎖也鎖住
                synchronized(this.blockingLock()) {
                    int var8 = 0;

                    try {
                        try {
                            this.begin(); 
                            // 若是當前socket未綁定本地端口,則嘗試着判斷和服務端是否能創建鏈接
                            synchronized(this.stateLock) {
                                if (!this.isOpen()) {
                                    boolean var10 = false;
                                    return var10;
                                }

                                if (this.localAddress == null) {
                                  // 和遠程創建鏈接後關閉鏈接
                                   NetHooks.beforeTcpConnect(this.fd, var5.getAddress(), var5.getPort());
                                }

                                this.readerThread = NativeThread.current();
                            }

                            do {
                                InetAddress var9 = var5.getAddress();
                                if (var9.isAnyLocalAddress()) {
                                    var9 = InetAddress.getLocalHost();
                                }
                                // 創建鏈接
                                var8 = Net.connect(this.fd, var9, var5.getPort());
                            } while(var8 == -3 && this.isOpen());
                    synchronized(this.stateLock) {
                        this.remoteAddress = var5;
                        if (var8 <= 0) {
                            if (!this.isBlocking()) {
                                this.state = 1;
                            } else {
                                assert false;
                            }
                        } else {
                            this.state = 2;// 鏈接成功
                            if (this.isOpen()) {
                                this.localAddress = Net.localAddress(this.fd);
                            }

                            var10000 = true;
                            return var10000;
                        }
                    }
                }

                var10000 = false;
                return var10000;
            }
        }
    }
複製代碼

在創建在綁定地址以前,咱們須要調用NetHooks.beforeTcpBind,這個方法是將fd轉換爲SDP(Sockets Direct Protocol,Java套接字直接協議) socket。SDP須要網卡支持InfiniBand高速網絡通訊技術,windows不支持該協議。

咱們來看看在openjdk: src\solaris\classes\sun\net下的NetHooks.java

private static final Provider provider = new sun.net.sdp.SdpProvider();

    public static void beforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException
    {
        provider.implBeforeTcpBind(fdObj, address, port);
    }
    public static void beforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException
    {
        provider.implBeforeTcpConnect(fdObj, address, port);
    }
複製代碼

能夠看到實際是調用的SdpProvider裏的implBeforeTcpBind

@Override
    public void implBeforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException {
        if (enabled)
            convertTcpToSdpIfMatch(fdObj, Action.BIND, address, port);
    }
  // converts unbound TCP socket to a SDP socket if it matches the rules
    private void convertTcpToSdpIfMatch(FileDescriptor fdObj, Action action, InetAddress address, int port) throws IOException {
        boolean matched = false;
        // 主要是先經過規則校驗器判斷入參是否符合,通常有PortRangeRule校驗器
        // 而後再執行將fd轉換爲socket
        for (Rule rule: rules) {
            if (rule.match(action, address, port)) {
                SdpSupport.convertSocket(fdObj);
                matched = true;
                break;
            }
        }

    }
    public static void convertSocket(FileDescriptor fd) throws IOException {
      ...
      //獲取fd索引
      int fdVal = fdAccess.get(fd);
      convert0(fdVal);
    }


    // convert0
   JNIEXPORT void JNICALL Java_sun_net_sdp_SdpSupport_convert0(JNIEnv *env, jclass cls, int fd) {
    // create方法實際是經過socket(AF_INET_SDP, SOCK_STREAM, 0);方法獲得一個socket
    int s = create(env);

    if (s >= 0) {
        socklen_t len;
        int arg, res;
        struct linger linger;

        /* copy socket options that are relevant to SDP */
        len = sizeof(arg);
        // 重用TIME_WAIT的端口
        if (getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, &len) == 0)
            setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, len);
        len = sizeof(arg);
        // 緊急數據放入普通數據流
        if (getsockopt(fd, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, &len) == 0)
            setsockopt(s, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, len);
        len = sizeof(linger);
        // 延遲關閉鏈接
        if (getsockopt(fd, SOL_SOCKET, SO_LINGER, (void*)&linger, &len) == 0)
            setsockopt(s, SOL_SOCKET, SO_LINGER, (char*)&linger, len);

        // 將fd也引用到s所持有的通道
        RESTARTABLE(dup2(s, fd), res);
        if (res < 0)
            JNU_ThrowIOExceptionWithLastError(env, "dup2");
        // 執行close方法,關閉s這個引用
        RESTARTABLE(close(s), res);
    }
  }
複製代碼

read 讀

public int read(ByteBuffer var1) throws IOException {
            // 省略一些判斷
            synchronized(this.readLock) {
                  this.begin();
                  synchronized(this.stateLock) {
                                do {
                                // 經過IOUtil的讀取fd的數據至buf
                                // 這裏的nd是SocketDispatcher,用於調用底層的read和write操做
                                    var3 = IOUtil.read(this.fd, var1, -1L, nd);
                                } while(var3 == -3 && this.isOpen());
                                // 這個方法主要是將UNAVAILABLE(原爲-2)這個狀態返回0,不然返回n
                                var4 = IOStatus.normalize(var3);
                                var20 = false;
                                break label367;
                            }

                             this.readerCleanup();
                             assert IOStatus.check(var3);
                        }    
            }
        }
    }
static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
        if (var1.isReadOnly()) {
            throw new IllegalArgumentException("Read-only buffer");
        } else if (var1 instanceof DirectBuffer) {
            return readIntoNativeBuffer(var0, var1, var2, var4);
        } else {
    // 臨時緩衝區,大小爲buf的remain(limit - position),堆外內存,使用ByteBuffer.allocateDirect(size)分配
    // Notes:這裏分配後後面有個try-finally塊會釋放該部份內存
            ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());

            int var7;
            try {
                // 將網絡中的buf讀進direct buffer
                int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
                var5.flip();// 待讀取
                if (var6 > 0) {
                    var1.put(var5);// 成功時寫入
                }

                var7 = var6;
            } finally {
                Util.offerFirstTemporaryDirectBuffer(var5);
            }

            return var7;
        }
    }
private static int readIntoNativeBuffer(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
            // 忽略變量init
            if (var2 != -1L) {
                // pread方法只有在同步狀態下才能使用
                var9 = var4.pread(var0, ((DirectBuffer)var1).address() + (long)var5, var7, var2);
            } else {
                // 其調用SocketDispatcher.read方法 -> FileDispatcherImpl.read0方法
                var9 = var4.read(var0, ((DirectBuffer)var1).address() + (long)var5, var7);
            }

            if (var9 > 0) {
                var1.position(var5 + var9);
            }

            return var9;
        }
    }
// 一樣找到openjdk:src\solaris\native\sun\nio\ch 
//FileDispatcherImpl.c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_read0(JNIEnv *env, jclass clazz,
                             jobject fdo, jlong address, jint len)
{
    jint fd = fdval(env, fdo);// 獲取fd索引
    void *buf = (void *)jlong_to_ptr(address);
    // 調用底層read方法
    return convertReturnVal(env, read(fd, buf, len), JNI_TRUE);
}
複製代碼

總結一下讀取的過程

  1. 初始化一個direct buffer,若是自己的buffer就是direct的則不用初始化

  2. 調用底層read方法寫入至direct buffer

  3. 最終將direct buffer寫到傳入的buffer對象

write 寫

看完了前面的read,write整個執行流程基本同樣,具體的細節參考以下

public int write(ByteBuffer var1) throws IOException {
        if (var1 == null) {
            throw new NullPointerException();
        } else {
            synchronized(this.writeLock) {
                this.ensureWriteOpen();
                        this.begin();
                        synchronized(this.stateLock) {
                            if (!this.isOpen()) {
                                var5 = 0;
                                var20 = false;
                                break label310;
                            }
                            this.writerThread = NativeThread.current();
                        }
                        do {
                            // 經過IOUtil的讀取fd的數據至buf
                            // 這裏的nd是SocketDispatcher,用於調用底層的read和write操做
                            var3 = IOUtil.write(this.fd, var1, -1L, nd);
                        } while(var3 == -3 && this.isOpen());

                        var4 = IOStatus.normalize(var3);
                        var20 = false;
                    this.writerCleanup();
                    assert IOStatus.check(var3);
                    return var4;
                }
            }
        }
    }
static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
        if (var1 instanceof DirectBuffer) {
            return writeFromNativeBuffer(var0, var1, var2, var4);
        } else {

            ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);

            int var10;
            try {
                // 這裏的pos爲buf初始的position,意思是將buf重置爲最初的狀態;由於目前尚未真實的寫入到channel中
                var8.put(var1);
                var8.flip();
                var1.position(var5);
                // 調用
                int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
                if (var9 > 0) {
                    var1.position(var5 + var9);
                }

                var10 = var9;
            } finally {
                Util.offerFirstTemporaryDirectBuffer(var8);
            }

            return var10;
        }
    }
IOUtil.writeFromNativeBuffer(fd , buf , position , nd)
{
    // ... 忽略一些獲取buf變量的代碼 
    int written = 0;
    if (position != -1) {
        // pread方法只有在同步狀態下才能使用
        written = nd.pwrite(fd ,((DirectBuffer)bb).address() + pos,rem, position);
    } else {
        // 其調用SocketDispatcher.write方法 -> FileDispatcherImpl.write0方法
        written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);
    }
    //....
}
FileDispatcherImpl.write0
{
    // 調用底層的write方法寫入
    return convertReturnVal(env, write(fd, buf, len), JNI_FALSE);
}
}
複製代碼

總結一下write的過程:

  1. 若是buf是direct buffer則直接開始寫入,不然須要初始化一個direct buffer,大小是buf的remain

  2. 將buf的內容寫入到direct buffer中,並恢復buf的position

  3. 調用底層的write方法寫入至channel

  4. 更新buf的position,即被direct buffer讀取內容後的position

耐心一點,立刻就到Epoll了

理解了前面的一些基礎知識,接下來的部分就會涉及到Java是怎麼樣來使用epoll的。

Selector簡述

Selector的做用是Java NIO中管理一組多路複用的SelectableChannel對象,並可以識別通道是否爲諸如讀寫事件作好準備的組件 --Java doc

image.png

Selector的建立過程以下:

// 1.建立Selector
Selector selector = Selector.open();

// 2.將Channel註冊到選擇器中
// ....... new channel的過程 ....

//Notes:channel要註冊到Selector上就必須是非阻塞的,因此FileChannel是不能夠
//使用Selector的,由於FileChannel是阻塞的
channel.configureBlocking(false);

// 第二個參數指定了咱們對 Channel 的什麼類型的事件感興趣
SelectionKey key = channel.register(selector , SelectionKey.OP_READ);

// 也可使用或運算|來組合多個事件,例如
SelectionKey key = channel.register(selector , SelectionKey.OP_READ | SelectionKey.OP_WRITE);

// 不過值得注意的是,一個 Channel 僅僅能夠被註冊到一個 Selector 一次,
// 若是將 Channel 註冊到 Selector 屢次, 那麼其實就是至關於更新 SelectionKey 
//的 interest set.
複製代碼

①一個Channel在Selector註冊其表明的是一個SelectionKey事件,SelectionKey的類型包括:

  • OP_READ:可讀事件;值爲:1<<0

  • OP_WRITE:可寫事件;值爲:1<<2

  • OP_CONNECT:客戶端鏈接服務端的事件(tcp鏈接),通常爲建立SocketChannel客戶端channel;值爲:1<<3

  • OP_ACCEPT:服務端接收客戶端鏈接的事件,通常爲建立ServerSocketChannel服務端channel;值爲:1<<4

②一個Selector內部維護了三組keys:

  1. key set:當前channel註冊在Selector上全部的key;可調用keys()獲取

  2. selected-key set:當前channel就緒的事件;可調用selectedKeys()獲取

  3. cancelled-key:主動觸發SelectionKey#cancel()方法會放在該集合,前提條件是該channel沒有被取消註冊;不可經過外部方法調用

③Selector類中總共包含如下10個方法:

  • open():建立一個Selector對象

  • isOpen():是不是open狀態,若是調用了close()方法則會返回false

  • provider():獲取當前Selector的Provider

  • keys():如上文所述,獲取當前channel註冊在Selector上全部的key

  • selectedKeys():獲取當前channel就緒的事件列表

  • selectNow():獲取當前是否有事件就緒,該方法當即返回結果,不會阻塞;若是返回值>0,則表明存在一個或多個

  • select(long timeout):selectNow的阻塞超時方法,超時時間內,有事件就緒時纔會返回;不然超過期間也會返回

  • select():selectNow的阻塞方法,直到有事件就緒時纔會返回

  • wakeup():調用該方法會時,阻塞在select()處的線程會立馬返回;(ps:下面一句劃重點)即便當前不存在線程阻塞在select()處,那麼下一個執行select()方法的線程也會當即返回結果,至關於執行了一次selectNow()方法

  • close(): 用完Selector後調用其close()方法會關閉該Selector,且使註冊到該Selector上的全部SelectionKey實例無效。channel自己並不會關閉。

關於SelectionKey

談到Selector就不得不提SelectionKey,二者是緊密關聯,配合使用的;如上文所示,往Channel註冊Selector會返回一個SelectionKey對象, 這個對象包含了以下內容:

  • interest set,當前Channel感興趣的事件集,即在調用register方法設置的interes set

  • ready set

  • channel

  • selector

  • attached object,可選的附加對象

①interest set 能夠經過SelectionKey類中的方法來獲取和設置interes set

// 返回當前感興趣的事件列表
int interestSet = key.interestOps();

// 也可經過interestSet判斷其中包含的事件
boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;    

// 能夠經過interestOps(int ops)方法修改事件列表
key.interestOps(interestSet | SelectionKey.OP_WRITE);
複製代碼

②ready set 當前Channel就緒的事件列表

int readySet = key.readyOps();

// 也可經過四個方法來分別判斷不一樣事件是否就緒
key.isReadable();    //讀事件是否就緒
key.isWritable();    //寫事件是否就緒
key.isConnectable(); //客戶端鏈接事件是否就緒
key.isAcceptable();  //服務端鏈接事件是否就緒
複製代碼

③channel和selector 咱們能夠經過SelectionKey來獲取當前的channel和selector

// 返回當前事件關聯的通道,可轉換的選項包括:`ServerSocketChannel`和`SocketChannel`
Channel channel = key.channel();

//返回當前事件所關聯的Selector對象
Selector selector = key.selector();

複製代碼

attached object 咱們能夠在selectionKey中附加一個對象,或者在註冊時直接附加:

key.attach(theObject);
Object attachedObj = key.attachment();
// 在註冊時直接附加
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
複製代碼

萬丈高樓平地起,基礎知識差很少了,瞭解了這些,能夠找一些nio demo或者netty demo練練手。接下來說解本節比較重要的~epoll

前面屢次提到了openjdk,seletor的具體實現確定是跟操做系統有關的,咱們一塊兒來看看。

image.png

能夠看到Selector的實現是SelectorImpl, 而後SelectorImpl又將職責委託給了具體的平臺,好比圖中的linux2.6 EpollSelectorImpl,windows是WindowsSelectorImpl,MacOSX是KQueueSelectorImpl

根據前面咱們知道,Selector.open()能夠獲得一個Selector實例,怎麼實現的呢?

// Selector.java
public static Selector open() throws IOException {
    // 首先找到provider,而後再打開Selector
    return SelectorProvider.provider().openSelector();
}

// java.nio.channels.spi.SelectorProvider
    public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                            // 這裏就是打開Selector的真正方法
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}
複製代碼

在openjdk中,每一個操做系統都有一個sun.nio.ch.DefaultSelectorProvider實現,以srcsolaris\classes\sun\nio\ch下的DefaultSelectorProvider爲例:

/** * Returns the default SelectorProvider. */
public static SelectorProvider create() {
    // 獲取OS名稱
    String osname = AccessController
        .doPrivileged(new GetPropertyAction("os.name"));
    // 根據名稱來建立不一樣的Selctor
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    if (osname.equals("Linux"))
        return createProvider("sun.nio.ch.EPollSelectorProvider");
    return new sun.nio.ch.PollSelectorProvider();
}
複製代碼

打開srcsolaris\classes\sun\nio\ch下的EPollSelectorProvider.java

public class EPollSelectorProvider extends SelectorProviderImpl {
    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }

    public Channel inheritedChannel() throws IOException {
        return InheritedChannel.getChannel();
    }
}
複製代碼

Linux平臺就獲得了最終的Selector實現:srcsolaris\classes\sun\nio\ch下的EPollSelectorImpl.java

來看看它實現的構造器:

EPollSelectorImpl(SelectorProvider sp) throws IOException {
        super(sp);
        // makePipe返回管道的2個文件描述符,編碼在一個long類型的變量中
        // 高32位表明讀 低32位表明寫
        // 使用pipe爲了實現Selector的wakeup邏輯
        long pipeFds = IOUtil.makePipe(false);
        fd0 = (int) (pipeFds >>> 32);
        fd1 = (int) pipeFds;
        // 新建一個EPollArrayWrapper
        pollWrapper = new EPollArrayWrapper();
        pollWrapper.initInterrupt(fd0, fd1);
        fdToKey = new HashMap<>();
    }
複製代碼

\src\solaris\native\sun\nio\ch下的EPollArrayWrapper.c

JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this) {
    /* * epoll_create expects a size as a hint to the kernel about how to * dimension internal structures. We can't predict the size in advance. */
    int epfd = epoll_create(256);
    if (epfd < 0) {
       JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
    }
    return epfd;
}
複製代碼

①epoll_create在前面已經講過了,這裏就再也不贅述了。

②epoll wait 等待內核IO事件

調用Selector.select(返回鍵的數量,多是零)最後會委託給各個實現的doSelect方法,限於篇幅不貼出太詳細的,這裏看下EpollSelectorImpl的doSelect方法

protected int doSelect(long timeout) throws IOException {
        if (closed)
            throw new ClosedSelectorException();
        processDeregisterQueue();
        try {
            begin();
            //EPollArrayWrapper pollWrapper
            pollWrapper.poll(timeout);//重點在這裏
        } finally {
            end();
        }
        processDeregisterQueue();
        int numKeysUpdated = updateSelectedKeys();// 後面會講到
        if (pollWrapper.interrupted()) {
            // Clear the wakeup pipe
            pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
            synchronized (interruptLock) {
                pollWrapper.clearInterrupted();
                IOUtil.drain(fd0);
                interruptTriggered = false;
            }
        }
        return numKeysUpdated;
    }
int poll(long timeout) throws IOException {
    updateRegistrations();// 這個代碼在下面講,涉及到epoo_ctl
    // 這個epollWait是否是有點熟悉呢?
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    for (int i=0; i<updated; i++) {
        if (getDescriptor(i) == incomingInterruptFD) {
            interruptedIndex = i;
            interrupted = true;
            break;
        }
    }
    return updated;
複製代碼

看下EPollArrayWrapper.c

JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this, jlong address, jint numfds, jlong timeout, jint epfd) {
    struct epoll_event *events = jlong_to_ptr(address);
    int res;

    if (timeout <= 0) {           /* Indefinite or no wait */
        //系統調用等待內核事件
        RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
    } else {                      /* Bounded wait; bounded restarts */
        res = iepoll(epfd, events, numfds, timeout);
    }

    if (res < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
    }
    return res;
}
複製代碼

能夠看到在linux中Selector.select()實際上是調用了epoll_wait

③epoll control以及openjdk對事件管理的封裝

JDK中對於註冊到Selector上的IO事件關係是使用SelectionKey來表示,表明了Channel感興趣的事件,如Read,Write,Connect,Accept.

調用Selector.register()時均會將事件存儲到EpollArrayWrapper.java的成員變量eventsLow和eventsHigh中

// events for file descriptors with registration changes pending, indexed
// by file descriptor and stored as bytes for efficiency reasons. For
// file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
// least) then the update is stored in a map.
// 使用數組保存事件變動, 數組的最大長度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
// 超過數組長度的事件會緩存到這個map中,等待下次處理
private Map<Integer,Byte> eventsHigh;


/** * Sets the pending update events for the given file descriptor. This * method has no effect if the update events is already set to KILLED, * unless {@code force} is {@code true}. */
private void setUpdateEvents(int fd, byte events, boolean force) {
    // 判斷fd和數組長度
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        if ((eventsLow[fd] != KILLED) || force) {
            eventsLow[fd] = events;
        }
    } else {
        Integer key = Integer.valueOf(fd);
        if (!isEventsHighKilled(key) || force) {
            eventsHigh.put(key, Byte.valueOf(events));
        }
    }
}
  /** * Returns the pending update events for the given file descriptor. */
    private byte getUpdateEvents(int fd) {
        if (fd < MAX_UPDATE_ARRAY_SIZE) {
            return eventsLow[fd];
        } else {
            Byte result = eventsHigh.get(Integer.valueOf(fd));
            // result should never be null
            return result.byteValue();
        }
  
複製代碼

在上面poll代碼中涉及到

int poll(long timeout) throws IOException {
    updateRegistrations();/

   /**
     * Update the pending registrations.
     */
    private void updateRegistrations() {
        synchronized (updateLock) {
            int j = 0;
            while (j < updateCount) {
                int fd = updateDescriptors[j];
                // 從保存的eventsLow和eventsHigh裏取出事件
                short events = getUpdateEvents(fd);
                boolean isRegistered = registered.get(fd);
                int opcode = 0;

                if (events != KILLED) {
                    if (isRegistered) {
                        // 判斷操做類型以傳給epoll_ctl
                        // 沒有指定EPOLLET事件類型
                        opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                    } else {
                        opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                    }
                    if (opcode != 0) {
                         // 熟悉的epoll_ctl
                        epollCtl(epfd, opcode, fd, events);
                        if (opcode == EPOLL_CTL_ADD) {
                            registered.set(fd);
                        } else if (opcode == EPOLL_CTL_DEL) {
                            registered.clear(fd);
                        }
                    }
                }
                j++;
            }
            updateCount = 0;
        }
  private native void epollCtl(int epfd, int opcode, int fd, int events);
複製代碼

能夠看到epollCtl調用的native方法,咱們進入EpollArrayWrapper.c

JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
                                           jint opcode, jint fd, jint events)
{
    struct epoll_event event;
    int res;

    event.events = events;
    event.data.fd = fd;
    // epoll_ctl這裏就不用多說了吧
    RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);

    /*
     * A channel may be registered with several Selectors. When each Selector
     * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
     * list to remove the file descriptor from epoll. The "last" Selector will
     * close the file descriptor which automatically unregisters it from each
     * epoll descriptor. To avoid costly synchronization between Selectors we
     * allow pending updates to be processed, ignoring errors. The errors are
     * harmless as the last update for the file descriptor is guaranteed to
     * be EPOLL_CTL_DEL.
     */
    if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
    }
}
複製代碼

在doSelect方法poll執行後,會更新EpollSelectorImpl.java裏的 updateSelectedKeys,就是Selector裏的三個set集合,具體可看前面。

/**

*更新已被epoll選擇fd的鍵。

*將就緒興趣集添加到就緒隊列。

*/
private int updateSelectedKeys() {
        int entries = pollWrapper.updated;
        int numKeysUpdated = 0;
        for (int i=0; i<entries; i++) {
            int nextFD = pollWrapper.getDescriptor(i);
            SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
            // ski is null in the case of an interrupt
            if (ski != null) {
                int rOps = pollWrapper.getEventOps(i);
                if (selectedKeys.contains(ski)) {
                    if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                        numKeysUpdated++;
                    }
                } else {
                    ski.channel.translateAndSetReadyOps(rOps, ski);
                    if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                        selectedKeys.add(ski);
                        numKeysUpdated++;
                    }
                }
            }
        }
        return numKeysUpdated;
    }
複製代碼

總結

經過本文,你應該知道Channel、Selector基本原理和在Java中怎麼使用Epoll的。 (包括更細節的fd與channel和socket之間的轉換關係)掌握這些基礎知識,再去看NIO、netty網絡框架的源碼可能就沒有那麼吃力了。在接下來的文章裏我會跟進關於Netty的文章,畢竟這已成爲分佈式網絡通訊框架的主流了!

感謝

zh.wikipedia.org/wiki/Epoll 維基百科

baike.baidu.com/item/epoll/…

juejin.cn/post/684490…

www.jianshu.com/p/f26f1eaa7…

真心感謝帥逼靚女們能看到這裏,若是這個文章寫得還不錯,以爲有點東西的話

求點贊👍 求關注❤️ 求分享👥 對8塊腹肌的我來講真的 很是有用!!!

若是本篇博客有任何錯誤,請批評指教,不勝感激 !❤️❤️❤️❤️

相關文章
相關標籤/搜索