Go netpoll I/O 多路複用構建原生網絡模型之源碼深度解析 html
Go 基於 I/O multiplexing 和 goroutine 構建了一個簡潔而高性能的原生網絡模型(基於 Go 的I/O 多路複用 netpoll
),提供了 goroutine-per-connection
這樣簡單的網絡編程模式。在這種模式下,開發者使用的是同步的模式去編寫異步的邏輯,極大地下降了開發者編寫網絡應用時的心智負擔,且藉助於 Go runtime scheduler 對 goroutines 的高效調度,這個原生網絡模型不論從適用性仍是性能上都足以知足絕大部分的應用場景。java
然而,在工程性上能作到如此高的普適性和兼容性,最終暴露給開發者提供接口/模式如此簡潔,其底層必然是基於很是複雜的封裝,作了不少取捨,也有可能放棄了一些『極致』的設計和理念。事實上netpoll
底層就是基於 epoll/kqueue/iocp 這些系統調用來作封裝的,最終暴露出 goroutine-per-connection
這樣的極簡的開發模式給使用者。react
Go netpoll 在不一樣的操做系統,其底層使用的 I/O 多路複用技術也不同,能夠從 Go 源碼目錄結構和對應代碼文件瞭解 Go 在不一樣平臺下的網絡 I/O 模式的實現。好比,在 Linux 系統下基於 epoll,freeBSD 系統下基於 kqueue,以及 Windows 系統下基於 iocp。linux
本文將基於 linux 平臺來解析 Go netpoll 之 I/O 多路複用的底層是如何基於 epoll 封裝實現的,從源碼層層推動,全面而深度地解析 Go netpoll 的設計理念和實現原理,以及 Go 是如何利用netpoll
來構建它的原生網絡模型的。主要涉及到的一些概念:I/O 模式、用戶/內核空間、epoll、linux 源碼、goroutine scheduler 等等,我會盡可能簡單地講解,若是有對相關概念不熟悉的同窗,仍是但願能提早熟悉一下。git
如今操做系統都是採用虛擬存儲器,那麼對 32 位操做系統而言,它的尋址空間(虛擬存儲空間)爲 4G(2 的 32 次方)。操做系統的核心是內核,獨立於普通的應用程序,能夠訪問受保護的內存空間,也有訪問底層硬件設備的全部權限。爲了保證用戶進程不能直接操做內核(kernel),保證內核的安全,操心繫統將虛擬空間劃分爲兩部分,一部分爲內核空間,一部分爲用戶空間。針對 linux 操做系統而言,將最高的 1G 字節(從虛擬地址 0xC0000000 到 0xFFFFFFFF),供內核使用,稱爲內核空間,而將較低的 3G 字節(從虛擬地址 0x00000000 到0xBFFFFFFF),供各個進程使用,稱爲用戶空間。程序員
在神做《UNIX 網絡編程》裏,總結概括了 5 種 I/O 模型,包括同步和異步 I/O:github
操做系統上的 I/O 是用戶空間和內核空間的數據交互,所以 I/O 操做一般包含如下兩個步驟:golang
而斷定一個 I/O 模型是同步仍是異步,主要看第二步:數據在用戶和內核空間之間複製的時候是否是會阻塞當前進程,若是會,則是同步 I/O,不然,就是異步 I/O。基於這個原則,這 5 種 I/O 模型中只有一種異步 I/O 模型:Asynchronous I/O,其他都是同步 I/O 模型。算法
這 5 種 I/O 模型的對好比下:編程
所謂 I/O 多路複用指的就是 select/poll/epoll 這一系列的多路選擇器:支持單一線程同時監聽多個文件描述符(I/O事件),阻塞等待,並在其中某個文件描述符可讀寫時收到通知。 I/O 複用其實複用的不是 I/O 鏈接,而是複用線程,讓一個 thread of control 可以處理多個鏈接(I/O 事件)。
#include <sys/select.h> /* According to earlier standards */ #include <sys/time.h> #include <sys/types.h> #include <unistd.h> int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); // 和 select 緊密結合的四個宏: void FD_CLR(int fd, fd_set *set); int FD_ISSET(int fd, fd_set *set); void FD_SET(int fd, fd_set *set); void FD_ZERO(fd_set *set);
select 是 epoll 以前 linux 使用的 I/O 事件驅動技術。
理解 select 的關鍵在於理解 fd_set,爲說明方便,取 fd_set 長度爲 1 字節,fd_set 中的每一 bit 能夠對應一個文件描述符 fd,則 1 字節長的 fd_set 最大能夠對應 8 個 fd。select 的調用過程以下:
0000,0000
0001,0011
0000,0011
(注意:沒有事件發生的 fd=5 被清空)基於上面的調用過程,能夠得出 select 的特色:
因此,select 有以下的缺點:
poll 的實現和 select 很是類似,只是描述 fd 集合的方式不一樣,poll 使用 pollfd 結構而不是 select 的 fd_set 結構,poll 解決了最大文件描述符數量限制的問題,可是一樣須要從用戶態拷貝全部的 fd 到內核態,也須要線性遍歷全部的 fd 集合,因此它和 select 只是實現細節上的區分,並無本質上的區別。
epoll 是 linux kernel 2.6 以後引入的新 I/O 事件驅動技術,I/O 多路複用的核心設計是 1 個線程處理全部鏈接的等待消息準備好
I/O 事件,這一點上 epoll 和 select&poll 是大同小異的。但 select&poll 預估錯誤了一件事,當數十萬併發鏈接存在時,可能每一毫秒只有數百個活躍的鏈接,同時其他數十萬鏈接在這一毫秒是非活躍的。select&poll 的使用方法是這樣的:返回的活躍鏈接 == select(所有待監控的鏈接)
。
何時會調用 select&poll 呢?在你認爲須要找出有報文到達的活躍鏈接時,就應該調用。因此,select&poll 在高併發時是會被頻繁調用的。這樣,這個頻繁調用的方法就頗有必要看看它是否有效率,由於,它的輕微效率損失都會被高頻
二字所放大。它有效率損失嗎?顯而易見,所有待監控鏈接是數以十萬計的,返回的只是數百個活躍鏈接,這自己就是無效率的表現。被放大後就會發現,處理併發上萬個鏈接時,select&poll 就徹底力不從心了。這個時候就該 epoll 上場了,epoll 經過一些新的設計和優化,基本上解決了 select&poll 的問題。
epoll 的 API 很是簡潔,涉及到的只有 3 個系統調用:
#include <sys/epoll.h> int epoll_create(int size); // int epoll_create1(int flags); int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
其中,epoll_create 建立一個 epoll 實例並返回 epollfd;epoll_ctl 註冊 file descriptor 等待的 I/O 事件(好比 EPOLLIN、EPOLLOUT 等) 到 epoll 實例上;epoll_wait 則是阻塞監聽 epoll 實例上全部的 file descriptor 的 I/O 事件,它接收一個用戶空間上的一塊內存地址 (events 數組),kernel 會在有 I/O 事件發生的時候把文件描述符列表複製到這塊內存地址上,而後 epoll_wait 解除阻塞並返回,最後用戶空間上的程序就能夠對相應的 fd 進行讀寫了:
#include <unistd.h> ssize_t read(int fd, void *buf, size_t count); ssize_t write(int fd, const void *buf, size_t count);
epoll 的工做原理以下:
與 select&poll 相比,epoll 分清了高頻調用和低頻調用。例如,epoll_ctl 相對來講就是不太頻繁被調用的,而 epoll_wait 則是很是頻繁被調用的。因此 epoll 利用 epoll_ctl 來插入或者刪除一個 fd,實現用戶態到內核態的數據拷貝,這確保了每個 fd 在其生命週期只須要被拷貝一次,而不是每次調用 epoll_wait 的時候都拷貝一次。 epoll_wait 則被設計成幾乎沒有入參的調用,相比 select&poll 須要把所有監聽的 fd 集合從用戶態拷貝至內核態的作法,epoll 的效率就高出了一大截。
在實現上 epoll 採用紅黑樹來存儲全部監聽的 fd,而紅黑樹自己插入和刪除性能比較穩定,時間複雜度 O(logN)。經過 epoll_ctl 函數添加進來的 fd 都會被放在紅黑樹的某個節點內,因此,重複添加是沒有用的。當把 fd 添加進來的時候時候會完成關鍵的一步:該 fd 都會與相應的設備(網卡)驅動程序創建回調關係,也就是在內核中斷處理程序爲它註冊一個回調函數,在 fd 相應的事件觸發(中斷)以後(設備就緒了),內核就會調用這個回調函數,該回調函數在內核中被稱爲:ep_poll_callback
,這個回調函數其實就是把這個 fd 添加到 rdllist 這個雙向鏈表(就緒鏈表)中。epoll_wait 實際上就是去檢查 rdlist 雙向鏈表中是否有就緒的 fd,當 rdlist 爲空(無就緒fd)時掛起當前進程,直到 rdlist 非空時進程才被喚醒並返回。
相比於 select&poll 調用時會將所有監聽的 fd 從用戶態空間拷貝至內核態空間併線性掃描一遍找出就緒的 fd 再返回到用戶態,epoll_wait 則是直接返回已就緒 fd,所以 epoll 的 I/O 性能不會像 select&poll 那樣隨着監聽的 fd 數量增長而出現線性衰減,是一個很是高效的 I/O 事件驅動技術。
因爲使用 epoll 的 I/O 多路複用須要用戶進程本身負責 I/O 讀寫,從用戶進程的角度看,讀寫過程是阻塞的,因此 select&poll&epoll 本質上都是同步 I/O 模型,而像 Windows 的 IOCP 這一類的異步 I/O,只須要在調用 WSARecv 或 WSASend 方法讀寫數據的時候把用戶空間的內存 buffer 提交給 kernel,kernel 負責數據在用戶空間和內核空間拷貝,完成以後就會通知用戶進程,整個過程不須要用戶進程參與,因此是真正的異步 I/O。
另外,我看到有些文章說 epoll 之因此性能高是由於利用了 linux 的 mmap 內存映射讓內核和用戶進程共享了一片物理內存,用來存放就緒 fd 列表和它們的數據 buffer,因此用戶進程在 epoll_wait
返回以後用戶進程就能夠直接從共享內存那裏讀取/寫入數據了,這讓我很疑惑,由於首先看epoll_wait
的函數聲明:
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
第二個參數:就緒事件列表,是須要在用戶空間分配內存而後再傳給epoll_wait
的,若是內核會用 mmap 設置共享內存,直接傳遞一個指針進去就好了,根本不須要在用戶態分配內存,畫蛇添足。其次,內核和用戶進程經過 mmap 共享內存是一件極度危險的事情,內核沒法肯定這塊共享內存何時會被回收,並且這樣也會賦予用戶進程直接操做內核數據的權限和入口,很是容易出現大的系統漏洞,所以通常極少會這麼作。因此我很懷疑 epoll 是否是真的在 linux kernel 裏用了 mmap,我就去看了下最新版本(5.3.9)的 linux kernel 源碼:
/* * Implement the event wait interface for the eventpoll file. It is the kernel * part of the user space epoll_wait(2). */ static int do_epoll_wait(int epfd, struct epoll_event __user *events, int maxevents, int timeout) { // ... /* Time to fish for events ... */ error = ep_poll(ep, events, maxevents, timeout); } // 若是 epoll_wait 入參時設定 timeout == 0, 那麼直接經過 ep_events_available 判斷當前是否有用戶感興趣的事件發生,若是有則經過 ep_send_events 進行處理 // 若是設置 timeout > 0,而且當前沒有用戶關注的事件發生,則進行休眠,並添加到 ep->wq 等待隊列的頭部;對等待事件描述符設置 WQ_FLAG_EXCLUSIVE 標誌 // ep_poll 被事件喚醒後會從新檢查是否有關注事件,若是對應的事件已經被搶走,那麼 ep_poll 會繼續休眠等待 static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout) { // ... send_events: /* * Try to transfer events to user space. In case we get 0 events and * there's still timeout left over, we go trying again in search of * more luck. */ // 若是一切正常, 有 event 發生, 就開始準備數據 copy 給用戶空間了 // 若是有就緒的事件發生,那麼就調用 ep_send_events 將就緒的事件 copy 到用戶態內存中, // 而後返回到用戶態,不然判斷是否超時,若是沒有超時就繼續等待就緒事件發生,若是超時就返回用戶態。 // 從 ep_poll 函數的實現能夠看到,若是有就緒事件發生,則調用 ep_send_events 函數作進一步處理 if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) && !timed_out) goto fetch_events; // ... } // ep_send_events 函數是用來向用戶空間拷貝就緒 fd 列表的,它將用戶傳入的就緒 fd 列表內存簡單封裝到 // ep_send_events_data 結構中,而後調用 ep_scan_ready_list 將就緒隊列中的事件寫入用戶空間的內存; // 用戶進程就能夠訪問到這些數據進行處理 static int ep_send_events(struct eventpoll *ep, struct epoll_event __user *events, int maxevents) { struct ep_send_events_data esed; esed.maxevents = maxevents; esed.events = events; // 調用 ep_scan_ready_list 函數檢查 epoll 實例 eventpoll 中的 rdllist 就緒鏈表, // 並註冊一個回調函數 ep_send_events_proc,若是有就緒 fd,則調用 ep_send_events_proc 進行處理 ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false); return esed.res; } // 調用 ep_scan_ready_list 的時候會傳遞指向 ep_send_events_proc 函數的函數指針做爲回調函數, // 一旦有就緒 fd,就會調用 ep_send_events_proc 函數 static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv) { // ... /* * If the event mask intersect the caller-requested one, * deliver the event to userspace. Again, ep_scan_ready_list() * is holding ep->mtx, so no operations coming from userspace * can change the item. */ revents = ep_item_poll(epi, &pt, 1); // 若是 revents 爲 0,說明沒有就緒的事件,跳過,不然就將就緒事件拷貝到用戶態內存中 if (!revents) continue; // 將當前就緒的事件和用戶進程傳入的數據都經過 __put_user 拷貝回用戶空間, // 也就是調用 epoll_wait 之時用戶進程傳入的 fd 列表的內存 if (__put_user(revents, &uevent->events) || __put_user(epi->event.data, &uevent->data)) { list_add(&epi->rdllink, head); ep_pm_stay_awake(epi); if (!esed->res) esed->res = -EFAULT; return 0; } // ... }
從do_epoll_wait
開始層層跳轉,咱們能夠很清楚地看到最後內核是經過__put_user
函數把就緒 fd 列表和事件返回到用戶空間,而__put_user
正是內核用來拷貝數據到用戶空間的標準函數。此外,我並無在 linux kernel 的源碼中和 epoll 相關的代碼裏找到 mmap 系統調用作內存映射的邏輯,因此基本能夠得出結論:epoll 在 linux kernel 裏並無使用 mmap 來作用戶空間和內核空間的內存共享,因此那些說 epoll 使用了 mmap 的文章都是誤解。
什麼叫非阻塞 I/O,顧名思義就是:全部 I/O 操做都是馬上返回而不會阻塞當前用戶進程。I/O 多路複用一般狀況下須要和非阻塞 I/O 搭配使用,不然可能會產生意想不到的問題。好比,epoll 的 ET(邊緣觸發) 模式下,若是不使用非阻塞 I/O,有極大的機率會致使阻塞 event-loop 線程,從而下降吞吐量,甚至致使 bug。
Linux 下,咱們能夠經過 fcntl
系統調用來設置 O_NONBLOCK
標誌位,從而把 socket 設置成 non-blocking。當對一個 non-blocking socket 執行讀操做時,流程是這個樣子:
當用戶進程發出 read 操做時,若是 kernel 中的數據尚未準備好,那麼它並不會 block 用戶進程,而是馬上返回一個 EAGAIN error。從用戶進程角度講 ,它發起一個 read 操做後,並不須要等待,而是立刻就獲得了一個結果。用戶進程判斷結果是一個 error 時,它就知道數據尚未準備好,因而它能夠再次發送 read 操做。一旦 kernel 中的數據準備好了,而且又再次收到了用戶進程的 system call,那麼它立刻就將數據拷貝到了用戶內存,而後返回。
因此,non-blocking I/O 的特色是用戶進程須要不斷的主動詢問 kernel 數據好了沒有。
一個典型的 Go TCP server:
package main import ( "fmt" "net" ) func main() { listen, err := net.Listen("tcp", ":8888") if err != nil { fmt.Println("listen error: ", err) return } for { conn, err := listen.Accept() if err != nil { fmt.Println("accept error: ", err) break } // start a new goroutine to handle the new connection go HandleConn(conn) } } func HandleConn(conn net.Conn) { defer conn.Close() packet := make([]byte, 1024) for { // 若是沒有可讀數據,也就是讀 buffer 爲空,則阻塞 _, _ = conn.Read(packet) // 同理,不可寫則阻塞 _, _ = conn.Write(packet) } }
上面是一個基於 Go 原生網絡模型(基於 netpoll)編寫的一個 TCP server,模式是 goroutine-per-connection
,在這種模式下,開發者使用的是同步的模式去編寫異步的邏輯並且對於開發者來講 I/O 是否阻塞是無感知的,也就是說開發者無需考慮 goroutines 甚至更底層的線程、進程的調度和上下文切換。而 Go netpoll 最底層的事件驅動技術確定是基於 epoll/kqueue/iocp 這一類的 I/O 事件驅動技術,只不過是把這些調度和上下文切換的工做轉移到了 runtime 的 Go scheduler,讓它來負責調度 goroutines,從而極大地下降了程序員的心智負擔!
Go netpoll 核心
Go netpoll 經過在底層對 epoll/kqueue/iocp 的封裝,從而實現了使用同步編程模式達到異步執行的效果。總結來講,全部的網絡操做都以網絡描述符 netFD 爲中心實現。netFD 與底層 PollDesc 結構綁定,當在一個 netFD 上讀寫遇到 EAGAIN 錯誤時,就將當前 goroutine 存儲到這個 netFD 對應的 PollDesc 中,同時調用 gopark 把當前 goroutine 給 park 住,直到這個 netFD 上再次發生讀寫事件,纔將此 goroutine 給 ready 激活從新運行。顯然,在底層通知 goroutine 再次發生讀寫等事件的方式就是 epoll/kqueue/iocp 等事件驅動機制。
接下來咱們經過分析最新的 Go 源碼(v1.13.4),解讀一下整個 netpoll 的運行流程。
上面的示例代碼中相關的在源碼裏的幾個數據結構和方法:
// TCPListener is a TCP network listener. Clients should typically // use variables of type Listener instead of assuming TCP. type TCPListener struct { fd *netFD lc ListenConfig } // Accept implements the Accept method in the Listener interface; it // waits for the next call and returns a generic Conn. func (l *TCPListener) Accept() (Conn, error) { if !l.ok() { return nil, syscall.EINVAL } c, err := l.accept() if err != nil { return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err} } return c, nil } func (ln *TCPListener) accept() (*TCPConn, error) { fd, err := ln.fd.accept() if err != nil { return nil, err } tc := newTCPConn(fd) if ln.lc.KeepAlive >= 0 { setKeepAlive(fd, true) ka := ln.lc.KeepAlive if ln.lc.KeepAlive == 0 { ka = defaultTCPKeepAlive } setKeepAlivePeriod(fd, ka) } return tc, nil } // TCPConn is an implementation of the Conn interface for TCP network // connections. type TCPConn struct { conn } // Conn type conn struct { fd *netFD } type conn struct { fd *netFD } func (c *conn) ok() bool { return c != nil && c.fd != nil } // Implementation of the Conn interface. // Read implements the Conn Read method. func (c *conn) Read(b []byte) (int, error) { if !c.ok() { return 0, syscall.EINVAL } n, err := c.fd.Read(b) if err != nil && err != io.EOF { err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err} } return n, err } // Write implements the Conn Write method. func (c *conn) Write(b []byte) (int, error) { if !c.ok() { return 0, syscall.EINVAL } n, err := c.fd.Write(b) if err != nil { err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err} } return n, err }
net.Listen("tcp", ":8888")
方法返回了一個 TCPListener,它是一個實現了 net.Listener
接口的 struct,而經過 listen.Accept()
接收的新鏈接 TCPConn 則是一個實現了 net.Conn
接口的 struct,它內嵌了 net.conn
struct。仔細閱讀上面的源碼能夠發現,無論是 Listener 的 Accept 仍是 Conn 的 Read/Write 方法,都是基於一個 netFD
的數據結構的操做,netFD
是一個網絡描述符,相似於 Linux 的文件描述符的概念,netFD 中包含一個 poll.FD 數據結構,而 poll.FD 中包含兩個重要的數據結構 Sysfd 和 pollDesc,前者是真正的系統文件描述符,後者對是底層事件驅動的封裝,全部的讀寫超時等操做都是經過調用後者的對應方法實現的。
netFD
和poll.FD
的源碼:
// Network file descriptor. type netFD struct { pfd poll.FD // immutable until Close family int sotype int isConnected bool // handshake completed or use of association with peer net string laddr Addr raddr Addr } // FD is a file descriptor. The net and os packages use this type as a // field of a larger type representing a network connection or OS file. type FD struct { // Lock sysfd and serialize access to Read and Write methods. fdmu fdMutex // System file descriptor. Immutable until Close. Sysfd int // I/O poller. pd pollDesc // Writev cache. iovecs *[]syscall.Iovec // Semaphore signaled when file is closed. csema uint32 // Non-zero if this file has been set to blocking mode. isBlocking uint32 // Whether this is a streaming descriptor, as opposed to a // packet-based descriptor like a UDP socket. Immutable. IsStream bool // Whether a zero byte read indicates EOF. This is false for a // message based socket connection. ZeroReadIsEOF bool // Whether this is a file rather than a network socket. isFile bool }
前面提到了 pollDesc 是底層事件驅動的封裝,netFD 經過它來完成各類 I/O 相關的操做,它的定義以下:
type pollDesc struct { runtimeCtx uintptr }
這裏的 struct 只包含了一個指針,而經過 pollDesc 的 init 方法,咱們能夠找到它具體的定義是在runtime.pollDesc
這裏:
func (pd *pollDesc) init(fd *FD) error { serverInit.Do(runtime_pollServerInit) ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd)) if errno != 0 { if ctx != 0 { runtime_pollUnblock(ctx) runtime_pollClose(ctx) } return syscall.Errno(errno) } pd.runtimeCtx = ctx return nil } // Network poller descriptor. // // No heap pointers. // //go:notinheap type pollDesc struct { link *pollDesc // in pollcache, protected by pollcache.lock // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations. // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime. // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification) // proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated // in a lock-free way by all operations. // NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg), // that will blow up when GC starts moving objects. lock mutex // protects the following fields fd uintptr closing bool everr bool // marks event scanning error happened user uint32 // user settable cookie rseq uintptr // protects from stale read timers rg uintptr // pdReady, pdWait, G waiting for read or nil rt timer // read deadline timer (set if rt.f != nil) rd int64 // read deadline wseq uintptr // protects from stale write timers wg uintptr // pdReady, pdWait, G waiting for write or nil wt timer // write deadline timer wd int64 // write deadline }
runtime.pollDesc
包含自身類型的一個指針,用來保存下一個runtime.pollDesc
的地址,以此來實現鏈表,能夠減小數據結構的大小,全部的runtime.pollDesc
保存在runtime.pollCache
結構中,定義以下:
type pollCache struct { lock mutex first *pollDesc // PollDesc objects must be type-stable, // because we can get ready notification from epoll/kqueue // after the descriptor is closed/reused. // Stale notifications are detected using seq variable, // seq is incremented when deadlines are changed or descriptor is reused. }
調用 net.Listen
以後,底層會經過 Linux 的系統調用socket
方法建立一個 fd 分配給 listener,並用以來初始化 listener 的 netFD
,接着調用 netFD 的listenStream
方法完成對 socket 的 bind&listen 操做以及對 netFD
的初始化(主要是對 netFD 裏的 pollDesc 的初始化),相關源碼以下:
// 調用 linux 系統調用 socket 建立 listener fd 並設置爲爲阻塞 I/O s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto) // On Linux the SOCK_NONBLOCK and SOCK_CLOEXEC flags were // introduced in 2.6.27 kernel and on FreeBSD both flags were // introduced in 10 kernel. If we get an EINVAL error on Linux // or EPROTONOSUPPORT error on FreeBSD, fall back to using // socket without them. socketFunc func(int, int, int) (int, error) = syscall.Socket // 用上面建立的 listener fd 初始化 listener netFD if fd, err = newFD(s, family, sotype, net); err != nil { poll.CloseFunc(s) return nil, err } // 對 listener fd 進行 bind&listen 操做,而且調用 init 方法完成初始化 func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error { // ... // 完成綁定操做 if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil { return os.NewSyscallError("bind", err) } // 完成監聽操做 if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil { return os.NewSyscallError("listen", err) } // 調用 init,內部會調用 poll.FD.Init,最後調用 pollDesc.init if err = fd.init(); err != nil { return err } lsa, _ = syscall.Getsockname(fd.pfd.Sysfd) fd.setAddr(fd.addrFunc()(lsa), nil) return nil } // 使用 sync.Once 來確保一個 listener 只持有一個 epoll 實例 var serverInit sync.Once // netFD.init 會調用 poll.FD.Init 並最終調用到 pollDesc.init, // 它會建立 epoll 實例並把 listener fd 加入監聽隊列 func (pd *pollDesc) init(fd *FD) error { // runtime_pollServerInit 內部調用了 netpollinit 來建立 epoll 實例 serverInit.Do(runtime_pollServerInit) // runtime_pollOpen 內部調用了 netpollopen 來將 listener fd 註冊到 // epoll 實例中,另外,它會初始化一個 pollDesc 並返回 ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd)) if errno != 0 { if ctx != 0 { runtime_pollUnblock(ctx) runtime_pollClose(ctx) } return syscall.Errno(errno) } // 把真正初始化完成的 pollDesc 實例賦值給當前的 pollDesc 表明自身的指針, // 後續使用直接經過該指針操做 pd.runtimeCtx = ctx return nil } // netpollopen 會被 runtime_pollOpen,註冊 fd 到 epoll 實例, // 同時會利用萬能指針把 pollDesc 保存到 epollevent 的一個 8 位的字節數組 data 裏 func netpollopen(fd uintptr, pd *pollDesc) int32 { var ev epollevent ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev) }
咱們前面提到的 epoll 的三個基本調用,Go 在源碼裏實現了對那三個調用的封裝:
#include <sys/epoll.h> int epoll_create(int size); int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout); // Go 對上面三個調用的封裝 func netpollinit() func netpollopen(fd uintptr, pd *pollDesc) int32 func netpoll(block bool) gList
netFD 就是經過這三個封裝來對 epoll 進行建立實例、註冊 fd 和等待事件操做的。
netpoll
accept socket 的工做流程以下:
listen
時會建立 epoll 的實例,並將 listenerFD 加入 epoll 的事件隊列accept
時將返回的 connFD 也加入 epoll 的事件隊列syscall.EAGAIN
錯誤,經過 pollDesc 的 waitRead
方法將當前的 goroutine park 住,直到 ready,從 pollDesc 的waitRead
中返回Listener.Accept()
接收來自客戶端的新鏈接,具體仍是調用netFD.accept
方法來完成這個功能:
// Accept implements the Accept method in the Listener interface; it // waits for the next call and returns a generic Conn. func (l *TCPListener) Accept() (Conn, error) { if !l.ok() { return nil, syscall.EINVAL } c, err := l.accept() if err != nil { return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err} } return c, nil } func (ln *TCPListener) accept() (*TCPConn, error) { fd, err := ln.fd.accept() if err != nil { return nil, err } tc := newTCPConn(fd) if ln.lc.KeepAlive >= 0 { setKeepAlive(fd, true) ka := ln.lc.KeepAlive if ln.lc.KeepAlive == 0 { ka = defaultTCPKeepAlive } setKeepAlivePeriod(fd, ka) } return tc, nil }
而netFD.accept
方法裏再調用poll.FD.Accept
,最後會使用 linux 的系統調用accept
來完成新鏈接的接收,而且會把 accept 的 socket 設置成非阻塞 I/O 模式:
// Accept wraps the accept network call. func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) { if err := fd.readLock(); err != nil { return -1, nil, "", err } defer fd.readUnlock() if err := fd.pd.prepareRead(fd.isFile); err != nil { return -1, nil, "", err } for { // 使用 linux 系統調用 accept 接收新鏈接,建立對應的 socket s, rsa, errcall, err := accept(fd.Sysfd) // 由於 listener fd 在建立的時候已經設置成非阻塞的了, // 因此 accept 方法會直接返回,無論有沒有新鏈接到來;若是 err == nil 則表示正常創建新鏈接,直接返回 if err == nil { return s, rsa, "", err } // 若是 err != nil,則判斷 err == syscall.EAGAIN,符合條件則進入 pollDesc.waitRead 方法 switch err { case syscall.EAGAIN: if fd.pd.pollable() { // 若是當前沒有發生期待的 I/O 事件,那麼 waitRead 會經過 park goroutine 讓邏輯 block 在這裏 if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } case syscall.ECONNABORTED: // This means that a socket on the listen // queue was closed before we Accept()ed it; // it's a silly error, so try again. continue } return -1, nil, errcall, err } } // 使用 linux 的 accept 系統調用接收新鏈接並把這個 socket fd 設置成非阻塞 I/O ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC) // On Linux the accept4 system call was introduced in 2.6.28 // kernel and on FreeBSD it was introduced in 10 kernel. If we // get an ENOSYS error on both Linux and FreeBSD, or EINVAL // error on Linux, fall back to using accept. // Accept4Func is used to hook the accept4 call. var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4
pollDesc.waitRead
方法主要負責檢測當前這個 pollDesc 的上層 netFD 對應的 fd 是否有『期待的』I/O 事件發生,若是有就直接返回,不然就 park 住當前的 goroutine 並持續等待直至對應的 fd 上發生可讀/可寫或者其餘『期待的』I/O 事件爲止,而後它就會返回到外層的 for 循環,讓 goroutine 繼續執行邏輯。
咱們先來看看Conn.Read
方法是如何實現的,原理其實和 Listener.Accept
是同樣的,具體調用鏈仍是首先調用 conn 的netFD.Read
,而後內部再調用 poll.FD.Read
,最後使用 linux 的系統調用 read: syscall.Read
完成數據讀取:
// Implementation of the Conn interface. // Read implements the Conn Read method. func (c *conn) Read(b []byte) (int, error) { if !c.ok() { return 0, syscall.EINVAL } n, err := c.fd.Read(b) if err != nil && err != io.EOF { err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err} } return n, err } func (fd *netFD) Read(p []byte) (n int, err error) { n, err = fd.pfd.Read(p) runtime.KeepAlive(fd) return n, wrapSyscallError("read", err) } // Read implements io.Reader. func (fd *FD) Read(p []byte) (int, error) { if err := fd.readLock(); err != nil { return 0, err } defer fd.readUnlock() if len(p) == 0 { // If the caller wanted a zero byte read, return immediately // without trying (but after acquiring the readLock). // Otherwise syscall.Read returns 0, nil which looks like // io.EOF. // TODO(bradfitz): make it wait for readability? (Issue 15735) return 0, nil } if err := fd.pd.prepareRead(fd.isFile); err != nil { return 0, err } if fd.IsStream && len(p) > maxRW { p = p[:maxRW] } for { // 嘗試從該 socket 讀取數據,由於 socket 在被 listener accept 的時候設置成 // 了非阻塞 I/O,因此這裏一樣也是直接返回,無論有沒有可讀的數據 n, err := syscall.Read(fd.Sysfd, p) if err != nil { n = 0 // err == syscall.EAGAIN 表示當前沒有期待的 I/O 事件發生,也就是 socket 不可讀 if err == syscall.EAGAIN && fd.pd.pollable() { // 若是當前沒有發生期待的 I/O 事件,那麼 waitRead // 會經過 park goroutine 讓邏輯 block 在這裏 if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } // On MacOS we can see EINTR here if the user // pressed ^Z. See issue #22838. if runtime.GOOS == "darwin" && err == syscall.EINTR { continue } } err = fd.eofError(n, err) return n, err } }
conn.Write
和conn.Read
的原理是一致的,它也是經過相似 pollDesc.waitRead
的pollDesc.waitWrite
來 park 住 goroutine 直至期待的 I/O 事件發生才返回,而 pollDesc.waitWrite
的內部實現原理和pollDesc.waitRead
是同樣的,都是基於runtime_pollWait
,這裏就再也不贅述。
pollDesc.waitRead
內部調用了 runtime_pollWait
來達成無 I/O 事件時 park 住 goroutine 的目的:
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait func poll_runtime_pollWait(pd *pollDesc, mode int) int { err := netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // As for now only Solaris, illumos, and AIX use level-triggered IO. if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" { netpollarm(pd, mode) } // 進入 netpollblock 而且判斷是否有期待的 I/O 事件發生, // 這裏的 for 循環是爲了一直等到 io ready for !netpollblock(pd, int32(mode), false) { err = netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // Can happen if timeout has fired and unblocked us, // but before we had a chance to run, timeout has been reset. // Pretend it has not happened and retry. } return 0 } // returns true if IO is ready, or false if timedout or closed // waitio - wait only for completed IO, ignore errors func netpollblock(pd *pollDesc, mode int32, waitio bool) bool { // gpp 保存的是 goroutine 的數據結構 g,這裏會根據 mode 的值決定是 rg 仍是 wg // 後面調用 gopark 以後,會把當前的 goroutine 的抽象數據結構 g 存入 gpp 這個指針 gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } // set the gpp semaphore to WAIT // 這個 for 循環是爲了等待 io ready 或者 io wait for { old := *gpp // gpp == pdReady 表示此時已有期待的 I/O 事件發生, // 能夠直接返回 unblock 當前 goroutine 並執行響應的 I/O 操做 if old == pdReady { *gpp = 0 return true } if old != 0 { throw("runtime: double wait") } // 若是沒有期待的 I/O 事件發生,則經過原子操做把 gpp 的值置爲 pdWait 並退出 for 循環 if atomic.Casuintptr(gpp, 0, pdWait) { break } } // need to recheck error states after setting gpp to WAIT // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg // waitio 此時是 false,netpollcheckerr 方法會檢查當前 pollDesc 對應的 fd 是不是正常的, // 一般來講 netpollcheckerr(pd, mode) == 0 是成立的,因此這裏會執行 gopark // 把當前 goroutine 給 park 住,直至對應的 fd 上發生可讀/可寫或者其餘『期待的』I/O 事件爲止, // 而後 unpark 返回,在 gopark 內部會把當前 goroutine 的抽象數據結構 g 存入 // gpp(pollDesc.rg/pollDesc.wg) 指針裏,以便在後面的 netpoll 函數取出 pollDesc 以後, // 把 g 添加到鏈表裏返回,而後從新調度運行該 goroutine if waitio || netpollcheckerr(pd, mode) == 0 { // 註冊 netpollblockcommit 回調給 gopark,在 gopark 內部會執行它,保存當前 goroutine 到 gpp gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5) } // be careful to not lose concurrent READY notification old := atomic.Xchguintptr(gpp, 0) if old > pdWait { throw("runtime: corrupted polldesc") } return old == pdReady } // gopark 會停住當前的 goroutine 而且調用傳遞進來的回調函數 unlockf,從上面的源碼咱們能夠知道這個函數是 // netpollblockcommit func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) { if reason != waitReasonSleep { checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy } mp := acquirem() gp := mp.curg status := readgstatus(gp) if status != _Grunning && status != _Gscanrunning { throw("gopark: bad g status") } mp.waitlock = lock mp.waitunlockf = unlockf gp.waitreason = reason mp.waittraceev = traceEv mp.waittraceskip = traceskip releasem(mp) // can't do anything that might move the G between Ms here. // gopark 最終會調用 park_m,在這個函數內部會調用 unlockf,也就是 netpollblockcommit, // 而後會把當前的 goroutine,也就是 g 數據結構保存到 pollDesc 的 rg 或者 wg 指針裏 mcall(park_m) } // park continuation on g0. func park_m(gp *g) { _g_ := getg() if trace.enabled { traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip) } casgstatus(gp, _Grunning, _Gwaiting) dropg() if fn := _g_.m.waitunlockf; fn != nil { // 調用 netpollblockcommit,把當前的 goroutine, // 也就是 g 數據結構保存到 pollDesc 的 rg 或者 wg 指針裏 ok := fn(gp, _g_.m.waitlock) _g_.m.waitunlockf = nil _g_.m.waitlock = nil if !ok { if trace.enabled { traceGoUnpark(gp, 2) } casgstatus(gp, _Gwaiting, _Grunnable) execute(gp, true) // Schedule it back, never returns. } } schedule() } // netpollblockcommit 在 gopark 函數裏被調用 func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool { // 經過原子操做把當前 goroutine 抽象的數據結構 g,也就是這裏的參數 gp 存入 gpp 指針, // 此時 gpp 的值是 pollDesc 的 rg 或者 wg 指針 r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp))) if r { // Bump the count of goroutines waiting for the poller. // The scheduler uses this to decide whether to block // waiting for the poller if there is nothing else to do. atomic.Xadd(&netpollWaiters, 1) } return r }
前面已經從源碼的角度分析完了 netpoll 是如何經過 park goroutine 從而達到阻塞 Accept/Read/Write 的效果,而經過調用 gopark,goroutine 會被放置在某個等待隊列中(如 channel 的 waitq ,此時 G 的狀態由_Grunning
爲_Gwaitting
),所以G必須被手動喚醒(經過 goready ),不然會丟失任務,應用層阻塞一般使用這種方式。
因此,最後還有一個很是關鍵的問題是:當 I/O 事件發生以後,netpoll 是經過什麼方式喚醒那些在 I/O wait 的 goroutine 的?答案是經過 epoll_wait
,在 Go 源碼中的 src/runtime/netpoll_epoll.go
文件中有一個 func netpoll(block bool) gList
方法,它會內部調用epoll_wait
獲取就緒的 fd 列表,並將每一個 fd 對應的 goroutine 添加到鏈表返回:
// polls for ready network connections // returns list of goroutines that become runnable func netpoll(block bool) gList { if epfd == -1 { return gList{} } waitms := int32(-1) // 是否以阻塞模式調用 epoll_wait if !block { waitms = 0 } var events [128]epollevent retry: // 獲取就緒的 fd 列表 n := epollwait(epfd, &events[0], int32(len(events)), waitms) if n < 0 { if n != -_EINTR { println("runtime: epollwait on fd", epfd, "failed with", -n) throw("runtime: netpoll failed") } goto retry } // toRun 是一個 g 的鏈表,存儲要恢復的 goroutines,最後返回給調用方 var toRun gList for i := int32(0); i < n; i++ { ev := &events[i] if ev.events == 0 { continue } var mode int32 // 判斷髮生的事件類型,讀類型或者寫類型 if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'r' } if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'w' } if mode != 0 { // 取出保存在 epollevent 裏的 pollDesc pd := *(**pollDesc)(unsafe.Pointer(&ev.data)) pd.everr = false if ev.events == _EPOLLERR { pd.everr = true } // 調用 netpollready,傳入就緒 fd 的 pollDesc,把 fd 對應的 goroutine 添加到鏈表 toRun 中 netpollready(&toRun, pd, mode) } } if block && toRun.empty() { goto retry } return toRun } // netpollready 調用 netpollunblock 返回就緒 fd 對應的 goroutine 的抽象數據結構 g func netpollready(toRun *gList, pd *pollDesc, mode int32) { var rg, wg *g if mode == 'r' || mode == 'r'+'w' { rg = netpollunblock(pd, 'r', true) } if mode == 'w' || mode == 'r'+'w' { wg = netpollunblock(pd, 'w', true) } if rg != nil { toRun.push(rg) } if wg != nil { toRun.push(wg) } } // netpollunblock 會依據傳入的 mode 決定從 pollDesc 的 rg 或者 wg 取出當時 gopark 之時存入的 // goroutine 抽象數據結構 g 並返回 func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g { // mode == 'r' 表明當時 gopark 是爲了等待讀事件,而 mode == 'w' 則表明是等待寫事件 gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } for { // 取出 gpp 存儲的 g old := *gpp if old == pdReady { return nil } if old == 0 && !ioready { // Only set READY for ioready. runtime_pollWait // will check for timeout/cancel before waiting. return nil } var new uintptr if ioready { new = pdReady } // 重置 pollDesc 的 rg 或者 wg if atomic.Casuintptr(gpp, old, new) { if old == pdReady || old == pdWait { old = 0 } // 經過萬能指針還原成 g 並返回 return (*g)(unsafe.Pointer(old)) } } }
而 Go 在多種場景下均可能會調用netpoll
檢查文件描述符狀態。尋找到 I/O 就緒的 socket fd,並找到這些 socket fd 對應的輪詢器中附帶的信息,根據這些信息將以前等待這些 socket fd 就緒的 goroutine 狀態修改成 _Grunnable
。執行完netpoll
以後,會返回一個就緒 fd 列表對應的 goroutine 列表,接下來將就緒的 goroutine 加入到調度隊列中,等待調度運行。
首先,在 Go runtime scheduler 正常調度 goroutine 之時就有可能會調用netpoll
獲取到已就緒的 fd 對應的 goroutine 來調度執行:
// One round of scheduler: find a runnable goroutine and execute it. // Never returns. func schedule() { // ... if gp == nil { gp, inheritTime = findrunnable() // blocks until work is available } // ... } // Finds a runnable goroutine to execute. // Tries to steal from other P's, get g from global queue, poll network. func findrunnable() (gp *g, inheritTime bool) { // ... // Poll network. // This netpoll is only an optimization before we resort to stealing. // We can safely skip it if there are no waiters or a thread is blocked // in netpoll already. If there is any kind of logical race with that // blocked thread (e.g. it has already returned from netpoll, but does // not set lastpoll yet), this thread will do blocking netpoll below // anyway. if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { if list := netpoll(false); !list.empty() { // non-blocking gp := list.pop() injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } } // ... }
Go scheduler 的核心方法schedule
裏會調用一個叫findrunable()
的方法獲取可運行的 goroutine 來執行,而在 findrunable()
方法裏就調用了netpoll
獲取已就緒的 fd 列表對應的 goroutine 列表。
另外, sysmon
監控線程也可能會調用到netpoll
:
// Always runs without a P, so write barriers are not allowed. // //go:nowritebarrierrec func sysmon() { // ... now := nanotime() if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now { atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now)) // 以非阻塞的方式調用 netpoll 獲取就緒 fd 列表 list := netpoll(false) // non-blocking - returns list of goroutines if !list.empty() { // Need to decrement number of idle locked M's // (pretending that one more is running) before injectglist. // Otherwise it can lead to the following situation: // injectglist grabs all P's but before it starts M's to run the P's, // another M returns from syscall, finishes running its G, // observes that there is no work to do and no other running M's // and reports deadlock. incidlelocked(-1) // 將其插入調度器的runnable列表中(全局),等待被調度執行 injectglist(&list) incidlelocked(1) } } // retake P's blocked in syscalls // and preempt long running G's if retake(now) != 0 { idle = 0 } else { idle++ } // check if we need to force a GC if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 { lock(&forcegc.lock) forcegc.idle = 0 var list gList list.push(forcegc.g) injectglist(&list) unlock(&forcegc.lock) } if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now { lasttrace = now schedtrace(debug.scheddetail > 0) } } }
Go runtime 在程序啓動的時候會建立一個獨立的 M 做爲監控線程,叫sysmon
,這個線程爲系統級的 daemon 線程,無需 P 便可運行,sysmon
每 20us~10ms 運行一次。sysmon
中以輪詢的方式執行如下操做(如上面的代碼所示):
runtime.netpoll
,從中找出能從網絡 I/O 中喚醒的 G,並調用injectglist
,將其插入調度器的 runnable 列表中(全局),調度觸發時,有可能從這個全局 runnable 列表獲取 G。而後再循環調用startm
,直到全部 P 都不處於 _Pidle
狀態。retake
,搶佔長時間處於_Psyscall
狀態的 P。綜上,Go 藉助於 epoll/kqueue/iocp 和 runtime scheduler 等的幫助,設計出了本身的 I/O 多路複用 netpoll,成功地讓 Listener.Accept
/conn.Read
/conn.Write
等方法從開發者的角度看來是同步模式。
經過前面對源碼的分析,咱們如今知道 Go netpoll 依託於 runtime scheduler,爲開發者提供了一種強大的同步網絡編程模式;然而,Go netpoll 存在的意義卻遠不止於此,Go netpoll I/O 多路複用搭配 Non-blocking I/O 而打造出來的這個原生網絡模型,它最大的價值是把網絡 I/O 的控制權緊緊掌握在 Go 本身的 runtime 裏,關於這一點咱們須要從 Go 的 runtime scheduler 提及,Go 的 G-P-M 調度模型以下:
G 在運行過程當中若是被阻塞在某個 system call 操做上,那麼不光 G 會阻塞,執行該 G 的 M 也會解綁 P(實質是被 sysmon 搶走了),與 G 一塊兒進入 sleep 狀態。若是此時有 idle 的 M,則 P 與其綁定繼續執行其餘 G;若是沒有 idle M,但仍然有其餘 G 要去執行,那麼就會建立一個新的 M。當阻塞在 system call 上的 G 完成 syscall 調用後,G 會去嘗試獲取一個可用的 P,若是沒有可用的 P,那麼 G 會被標記爲_Grunnable
並把它放入全局的 runqueue 中等待調度,以前的那個 sleep 的 M 將再次進入 sleep。
如今清楚爲何 netpoll 爲何必定要使用非阻塞 I/O 了吧?就是爲了不讓操做網絡 I/O 的 goroutine 陷入到系統調用從而進入內核態,由於一旦進入內核態,整個程序的控制權就會發生轉移(到內核),再也不屬於用戶進程了,那麼也就沒法藉助於 Go 強大的 runtime scheduler 來調度業務程序的併發了;而有了 netpoll 以後,藉助於非阻塞 I/O ,G 就不再會由於系統調用的讀寫而陷入內核態,當 G 被阻塞在某個 network I/O 操做上時,實際上它不是由於陷入內核態被阻塞住了,而是被 Go runtime 調用 gopark 給 park 住了,此時 G 會被放置到某個 wait queue 中,而 M 會嘗試運行下一個_Grunnable
的 G,若是此時沒有_Grunnable
的 G 供 M 運行,那麼 M 將解綁P,並進入 sleep 狀態。當 I/O available,在 wait queue 中的 G 會被喚醒,標記爲_Grunnable
,放入某個可用的 P 的 local 隊列中,綁定一個 M 恢復執行。
Go netpoll 的設計不可謂不精巧、性能也不可謂不高效,配合 goroutine 寫網絡程序是真的爽:簡潔高效。然而,沒有任何一種設計和架構是完美的,goroutine-per-connection
這種模式雖然簡單高效,可是在某些極端的場景下也會暴露出問題:goroutine 雖然很是輕量,它的自定義棧內存初始值僅爲 2KB,後面按需擴容;海量鏈接的業務場景下,goroutine-per-connection
,此時 goroutine 數量以及消耗的資源就會呈線性趨勢暴漲,首先給 Go runtime scheduler 形成極大的壓力和侵佔系統資源,而後資源被侵佔又反過來影響 runtime 的調度,致使性能大幅降低。
目前在 Linux 平臺下構建的高性能網絡程序中,大都使用 Reactor 模式,好比 netty、libevent、libev、ACE,POE(Perl)、Twisted(Python)等。
Reactor 模式本質上指的是使用I/O 多路複用(I/O multiplexing) + 非阻塞 I/O(non-blocking I/O)
的模式。
一般設置一個主線程負責作 event-loop 事件循環和 I/O 讀寫,經過 select/poll/epoll_wait 等系統調用監聽 I/O 事件,業務邏輯提交給其餘工做線程去作。而所謂『非阻塞 I/O』的核心思想是指避免阻塞在 read() 或者 write() 或者其餘的 I/O 系統調用上,這樣能夠最大限度的複用 event-loop 線程,讓一個線程能服務於多個 sockets。在 Reactor 模式中,I/O 線程只能阻塞在 I/O multiplexing 函數上(select/poll/epoll_wait)。
Reactor 模式一般的工做流程以下:
bind&listen
以後,將 listenfd 註冊到epollfd中,最後進入 event-loop事件循環。循環過程當中會調用select/poll/epoll_wait
阻塞等待,如有在 listenfd 上的新鏈接事件則解除阻塞返回,並調用socket.accept
接收新鏈接 connfd,並將 connfd 加入到 epollfd 的 I/O 複用(監聽)隊列。select/poll/epoll_wait
的阻塞等待,而後進行 I/O 讀寫操做,這裏讀寫 I/O 都是非阻塞 I/O,這樣纔不會阻塞 event-loop 的下一個循環。然而,這樣容易割裂業務邏輯,不易理解和維護。read
讀取數據以後進行解碼並放入隊列中,等待工做線程處理。write
把數據寫回 client。accept 鏈接以及 conn 上的讀寫操做如果在主線程完成,則要求是非阻塞 I/O,由於 Reactor 模式一條最重要的原則就是:I/O 操做不能阻塞 event-loop 事件循環。實際上 event loop 可能也能夠是多線程的,只是一個線程裏只有一個 select/poll/epoll_wait。
上面提到了 Go netpoll 在某些場景下可能由於建立太多的 goroutine 而過多地消耗系統資源,而在現實世界的網絡業務中,服務器持有的海量鏈接中在極短的時間窗口內只有極少數是 active 而大多數則是 idle,就像這樣(非真實數據,僅僅是爲了比喻):
那麼爲每個鏈接指派一個 goroutine 就顯得太過奢侈了,而 Reactor 模式這種利用 I/O 多路複用進而只須要使用少許線程便可管理海量鏈接的設計就能夠在這樣網絡業務中大顯身手了:
在絕大部分應用場景下,我推薦你們仍是遵循 Go 的 best practices,以這種 netpoll 模式來構建本身的網絡應用。然而,在某些極度追求性能、壓榨系統資源以及技術棧必須是原生 Go (不考慮 C/C++ 寫中間層而 Go 寫業務層)的業務場景下,咱們能夠考慮本身構建 Reactor 網絡模型。
gnet 是一個基於事件驅動的高性能和輕量級網絡框架,支持多種協議:TCP/UDP/Unix-Socket。它直接使用 epoll 和 kqueue 系統調用而非標準 Golang 網絡包:net 來構建網絡應用,它的工做原理相似兩個開源的網絡庫:netty 和 libuv。
gnet 的亮點在於它是一個高性能、輕量級、非阻塞的純 Go 實現的傳輸層(TCP/UDP/Unix-Socket)網絡框架,開發者可使用 gnet 來實現本身的應用層網絡協議,從而構建出本身的應用層網絡應用:好比在 gnet 上實現 HTTP 協議就能夠建立出一個 HTTP 服務器 或者 Web 開發框架,實現 Redis 協議就能夠建立出本身的 Redis 服務器等等。
gnet,在某些極端的網絡業務場景,好比海量鏈接、高頻建立銷燬鏈接等等場景,gnet 在性能和資源佔用上都遠超 Go 原生的 net 包(基於 netpoll)。
gnet
已經實現了Multi-Reactors
和Multi-Reactors + Goroutine Pool
兩種網絡模型,也得益於這些網絡模型,使得gnet
成爲一個高性能和低損耗的 Go 網絡框架:
gnet
客戶端