與其餘語言的網絡IO強調異步非阻塞不一樣,GOLANG裏的網絡IO模型是:建立多個goroutine,每一個goroutine的網絡IO都是阻塞的,這樣的代碼很是直觀php
但低層,全部的網絡IO實際上都是非阻塞的linux
以net.Dial爲例子,其餘的Read/Write機制相似golang
Read的原理:算法
for {編程
fd.read()swift
if err == EAGAIN {數組
pollserver.WaitRead(fd)微信
continue網絡
}架構
break
}
網絡關鍵API的實現,主要包括Listen、Accept、Read、Write等。 另外,爲了突出關鍵流程,咱們選擇忽略全部的錯誤。這樣可使得代碼看起來更爲簡單。 並且咱們只關注tcp協議實現,udp和unix socket不是咱們關心的。
func Listen(net, laddr string) (Listener, error) { la, err := resolveAddr("listen", net, laddr, noDeadline) ...... switch la := la.toAddr().(type) { case *TCPAddr: l, err = ListenTCP(net, la) case *UnixAddr: ...... } ......} // 對於tcp協議,返回的的是TCPListenerfunc ListenTCP(net string, laddr *TCPAddr) (*TCPListener, error) { ...... fd, err := internetSocket(net, laddr, nil, noDeadline, syscall.SOCK_STREAM, 0, "listen") ...... return &TCPListener{fd}, nil}func internetSocket(net string, laddr, raddr sockaddr, deadline time.Time, sotype, proto int, mode string) (fd *netFD, err error) { ...... return socket(net, family, sotype, proto, ipv6only, laddr, raddr, deadline)} func socket(net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, deadline time.Time) (fd *netFD, err error) { // 建立底層socket,設置屬性爲O_NONBLOCK s, err := sysSocket(family, sotype, proto) ...... setDefaultSockopts(s, family, sotype, ipv6only) // 建立新netFD結構 fd, err = newFD(s, family, sotype, net) ...... if laddr != nil && raddr == nil { switch sotype { case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET: // 調用底層listen監聽建立的套接字 fd.listenStream(laddr, listenerBacklog) return fd, nil case syscall.SOCK_DGRAM: ...... } } } // 最終調用該函數來建立一個socket// 而且將socket屬性設置爲O_NONBLOCKfunc sysSocket(family, sotype, proto int) (int, error) { syscall.ForkLock.RLock() s, err := syscall.Socket(family, sotype, proto) if err == nil { syscall.CloseOnExec(s) } syscall.ForkLock.RUnlock() if err != nil { return -1, err } if err = syscall.SetNonblock(s, true); err != nil { syscall.Close(s) return -1, err} return s, nil} func (fd *netFD) listenStream(laddr sockaddr, backlog int) error { if err := setDefaultListenerSockopts(fd.sysfd) if lsa, err := laddr.sockaddr(fd.family); err != nil { return err } else if lsa != nil { // Bind綁定至該socket if err := syscall.Bind(fd.sysfd, lsa); err != nil { return os.NewSyscallError("bind", err) } } // 監聽該socket if err := syscall.Listen(fd.sysfd, backlog); // 這裏很是關鍵:初始化socket與異步IO相關的內容 if err := fd.init(); err != nil { return err } lsa, _ := syscall.Getsockname(fd.sysfd) fd.setAddr(fd.addrFunc()(lsa), nil) return nil}
咱們這裏看到了如何實現Listen。流程基本都很簡單,可是由於咱們使用了異步編程,所以,咱們在Listen完該socket後,還必須將其添加到監聽隊列中,之後該socket有事件到來時可以及時通知到。
對linux有所瞭解的應該都知道epoll,沒錯golang使用的就是epoll機制來實現socket事件通知。那咱們看對一個監聽socket,是如何將其添加到epoll的監聽隊列中呢?
func (fd *netFD) init() error { if err := fd.pd.Init(fd); err != nil { return err } return nil} func (pd *pollDesc) Init(fd *netFD) error { // 利用了Once機制,保證一個進程只會執行一次 // runtime_pollServerInit: // TEXT net·runtime_pollServerInit(SB),NOSPLIT,$0-0 // JMP runtime·netpollServerInit(SB) serverInit.Do(runtime_pollServerInit) // runtime_pollOpen: // TEXT net·runtime_pollOpen(SB),NOSPLIT,$0-0 // JMP runtime·netpollOpen(SB) ctx, errno := runtime_pollOpen(uintptr(fd.sysfd)) if errno != 0 { return syscall.Errno(errno) } pd.runtimeCtx = ctx return nil}
這裏就是socket異步編程的關鍵:
netpollServerInit()初始化異步編程結構,對於epoll,該函數是netpollinit,且使用Once機制保證一個進程 只會初始化一次;
func netpollinit() { epfd = epollcreate1(_EPOLL_CLOEXEC) if epfd >= 0 { return } epfd = epollcreate(1024) if epfd >= 0 { closeonexec(epfd) return } ......}
netpollOpen則在socket被建立出來後將其添加到epoll隊列中,對於epoll,該函數被實例化爲netpollopen。
func netpollopen(fd uintptr, pd *pollDesc) int32 { var ev epollevent ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)}
OK,看到這裏,咱們也就明白了,監聽一個套接字的時候無非就是傳統的socket異步編程,而後將該socket添加到 epoll的事件監聽隊列中。
Accept
既然咱們描述的重點的tcp協議,所以,咱們看看TCPListener的Accept方法是怎麼實現的:
func (l *TCPListener) Accept() (Conn, error) { c, err := l.AcceptTCP() ......} func (l *TCPListener) AcceptTCP() (*TCPConn, error) { ...... fd, err := l.fd.accept() ...... // 返回給調用者一個新的TCPConn return newTCPConn(fd), nil} func (fd *netFD) accept() (netfd *netFD, err error) { // 爲何對該函數加讀鎖? if err := fd.readLock(); err != nil { return nil, err } defer fd.readUnlock() ...... for { // 這個accept是golang包裝的系統調用 // 用來處理跨平臺 s, rsa, err = accept(fd.sysfd) if err != nil { if err == syscall.EAGAIN { // 若是沒有可用鏈接,WaitRead()阻塞該協程 // 後面會詳細分析WaitRead. if err = fd.pd.WaitRead(); err == nil { continue } } else if err == syscall.ECONNABORTED { // 若是鏈接在Listen queue時就已經被對端關閉 continue } } break } netfd, err = newFD(s, fd.family, fd.sotype, fd.net)...... // 這個前面已經分析,將該fd添加到epoll隊列中 err = netfd.init() ...... lsa, _ := syscall.Getsockname(netfd.sysfd) netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa)) return netfd, nil}
從前面的編程事例中咱們知道,通常在主協程中會accept新的connection,使用異步編程咱們知道,若是沒有 新鏈接到來,該協程會一直被阻塞,直到新鏈接到來有人喚醒了該協程。
通常在主協程中調用accept,若是返回值爲EAGAIN,則調用WaitRead來阻塞當前協程,後續在該socket有事件到來時被喚醒,WaitRead以及喚醒過程咱們會在後面仔細分析。
Read
func (c *conn) Read(b []byte) (int, error) { if !c.ok() { return 0, syscall.EINVAL } return c.fd.Read(b)} func (fd *netFD) Read(p []byte) (n int, err error) { // 爲何對函數調用加讀鎖 if err := fd.readLock(); err != nil { return 0, err } defer fd.readUnlock() // 這個又是幹嗎? if err := fd.pd.PrepareRead(); err != nil { return 0, &OpError{"read", fd.net, fd.raddr, err} }
for { n, err = syscall.Read(int(fd.sysfd), p) if err != nil { n = 0 // 若是返回EAGIN,阻塞當前協程直到有數據可讀被喚醒 if err == syscall.EAGAIN { if err = fd.pd.WaitRead(); err == nil { continue } } } // 檢查錯誤,封裝io.EOF err = chkReadErr(n, err, fd) break } if err != nil && err != io.EOF { err = &OpError{"read", fd.net, fd.raddr, err} } return}func chkReadErr(n int, err error, fd *netFD) error { if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM && fd.sotype != syscall.SOCK_RAW { return io.EOF } return err}
每次Read不能保證能夠讀到想讀的那麼多內容,好比緩衝區大小是10,而實際可能只讀到5,應用程序須要可以處理這種狀況。
Write
func (fd *netFD) Write(p []byte) (nn int, err error) { // 爲何這裏加寫鎖 if err := fd.writeLock(); err != nil { return 0, err } defer fd.writeUnlock() // 這個是幹什麼? if err := fd.pd.PrepareWrite(); err != nil { return 0, &OpError{"write", fd.net, fd.raddr, err} } // nn記錄總共寫入的數據量,每次Write可能只能寫入部分數據 for { var n int n, err = syscall.Write(int(fd.sysfd), p[nn:]) if n > 0 { nn += n } // 若是數組數據已經所有寫完,函數返回 if nn == len(p) { break } // 若是寫入數據時被block了,阻塞當前協程 if err == syscall.EAGAIN { if err = fd.pd.WaitWrite(); err == nil { continue } } if err != nil { n = 0 break } // 若是返回值爲0,表明了什麼? if n == 0 { err = io.ErrUnexpectedEOF break } } if err != nil { err = &OpError{"write", fd.net, fd.raddr, err} } return nn, err}
注意Write語義與Read不同的地方:
Write儘可能將用戶緩衝區的內容所有寫入至底層socket,若是遇到socket暫時不可寫入,會阻塞當前協程; Read在某次讀取成功時當即返回,可能會致使讀取的數據量少於用戶緩衝區的大小; 爲何會在實現上有此不一樣,我想可能read的優先級比較高吧,應用程序可能一直在等着,咱們不能等到數據一直讀完才返回,會阻塞用戶。 而寫不同,優先級相對較低,並且用戶通常也不着急寫當即返回,因此能夠將全部的數據所有寫入,並且這樣 也能簡化應用程序的寫法。
當系統調用返回EAGAIN時,會調用WaitRead/WaitWrite來阻塞當前協程,如今咱們接着聊。
WaitRead/WaitWrite
func (pd *pollDesc) Wait(mode int) error {
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res)
}
func (pd *pollDesc) WaitRead() error {
return pd.Wait('r')
}
func (pd *pollDesc) WaitWrite() error {
return pd.Wait('w')
}
最終runtime_pollWait走到下面去了:
TEXT net·runtime_pollWait(SB),NOSPLIT,$0-0
JMP runtime·netpollWait(SB)
咱們仔細考慮應該明白:netpollWait的主要做用是:等待關心的socket是否有事件(其實後面咱們知道只是等待一個標記位是否發生改變),若是沒有事件,那麼就將當前的協程掛起,直到有通知事件發生,咱們接下來看看到底如何實現:
func netpollWait(pd *pollDesc, mode int) int {
// 先檢查該socket是否有error發生(如關閉、超時等)
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// As for now only Solaris uses level-triggered IO.
if GOOS == "solaris" {
onM(func() {
netpollarm(pd, mode)
})
}
// 循環等待netpollblock返回值爲true
// 若是返回值爲false且該socket未出現任何錯誤
// 那該協程可能被意外喚醒,須要從新被掛起
// 還有一種可能:該socket因爲超時而被喚醒
// 此時netpollcheckerr就是用來檢測超時錯誤的
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
}
return 0
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to WAIT
// 首先將輪詢狀態設置爲pdWait
// 爲何要使用for呢?由於casuintptr使用了自旋鎖
// 爲何使用自旋鎖就要加for循環呢?
for {
old := *gpp
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
gothrow("netpollblock: double wait")
}
// 將socket輪詢相關的狀態設置爲pdWait
if casuintptr(gpp, 0, pdWait) {
break
}
}
// 若是未出錯將該協程掛起,解鎖函數是netpollblockcommit
if waitio || netpollcheckerr(pd, mode) == 0 {
f := netpollblockcommit
gopark(**(**unsafe.Pointer)(unsafe.Pointer(&f)), unsafe.Pointer(gpp), "IO wait")
}
// 多是被掛起的協程被喚醒
// 或者因爲某些緣由該協程壓根未被掛起
// 獲取其當前狀態記錄在old中
old := xchguintptr(gpp, 0)
if old > pdWait {
gothrow("netpollblock: corrupted state")
}
return old == pdReady
}
從上面的分析咱們看到,若是沒法讀寫,golang會將當前協程掛起,在協程被喚醒的時候,該標記位應該會被置位。 咱們接下來看看這些掛起的協程什麼時候會被喚醒。
事件通知
golang運行庫在系統運行過程當中存在socket事件檢查點,目前,該檢查點主要位於如下幾個地方:
runtime·startTheWorldWithSema(void):在完成gc後;
findrunnable():這個暫時不知道什麼時候會觸發?
sysmon:golang中的監控協程,會週期性檢查就緒socket
TODO: 爲何是在這些地方檢查socket就緒事件呢?
接下來咱們看看如何檢查socket就緒事件,在socket就緒後又是如何喚醒被掛起的協程?主要調用函數runtime-netpoll()
咱們只關注epoll的實現,對於epoll,上面的方法具體實現是netpoll_epoll.go中的netpoll
func netpoll(block bool) (gp *g) {
if epfd == -1 {
return
}
waitms := int32(-1)
if !block {
// 若是調用者不但願block
// 設置waitsm爲0
waitms = 0
}
var events [128]epollevent
retry:
// 調用epoll_wait獲取就緒事件
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
...
}
goto retry
}
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
// 對每一個事件,調用了netpollready
// pd主要記錄了與該socket關聯的等待協程
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
netpollready((**g)(noescape(unsafe.Pointer(&gp))), pd, mode)
}
}
// 若是調用者同步等待且本次未獲取到就緒socket
// 繼續重試
if block && gp == nil {
goto retry
}
return gp
}
這個函數主要調用epoll_wait(固然,golang封裝了系統調用)來獲取就緒socket fd,對每一個就緒的fd,調用netpollready()做進一步處理。這個函數的最終返回值就是一個已經就緒的協程(g)鏈表。
netpollready主要是將該socket fd標記爲IOReady,並喚醒等待在該fd上的協程g,將其添加到傳入的g鏈表中。
// make pd ready, newly runnable goroutines (if any) are returned in rg/wg
func netpollready(gpp **g, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
// 將就緒協程添加至鏈表中
if rg != nil {
rg.schedlink = *gpp
*gpp = rg
}
if wg != nil {
wg.schedlink = *gpp
*gpp = wg
}
}
// 將pollDesc的狀態置爲pdReady並返回就緒協程
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
return nil
}
var new uintptr
if ioready {
new = pdReady
}
if casuintptr(gpp, old, new) {
if old == pdReady || old == pdWait {
old = 0
}
return (*g)(unsafe.Pointer(old))
}
}
}
疑問:一個fd會被多個協程同時進行IO麼?好比一個協程讀,另一個協程寫?或者多個協程同時讀?此時返回的是哪一個協程就緒呢?
一個socket fd可支持併發讀寫,由於對於tcp協議來講,是全雙工。讀寫操做的是不一樣緩衝區,可是不支持併發讀和併發寫,由於這樣會錯亂的。因此上面的netFD.RWLock()就是幹這個做用的。
runtime中的epoll事件驅動抽象層其實在進入net庫後,又被封裝了一次,這一次封裝從代碼上看主要是爲了方便在純Go語言環境進行操做,net庫中的此次封裝實如今poll/fd_poll_runtime.go文件中,主要是經過pollDesc對象來實現的:
(ps: 這裏對應的版本是go1.9.1 的版本)
注意:此處的pollDesc對象不是上文提到的runtime中的PollDesc,相反此處pollDesc對象的runtimeCtx成員纔是指向的runtime的PollDesc實例。pollDesc對象主要就是將runtime的事件驅動抽象層給再封裝了一次,供網絡fd對象使用。
pollDesc對象最須要關注的就是其Init方法,這個方法經過一個sync.Once變量來調用了runtime_pollServerInit函數,也就是建立epoll實例的函數。
意思就是runtime_pollServerInit函數在整個進程生命週期內只會被調用一次,也就是隻會建立一次epoll實例。epoll實例被建立後,會調用runtime_pollOpen函數將fd添加到epoll中。
網絡編程中的全部socket fd都是經過netFD對象實現的,netFD是對網絡IO操做的抽象,linux的實如今文件net/fd_unix.go中。netFD對象實現有本身的init方法,還有完成基本IO操做的Read和Write方法,固然除了這三個方法之外,還有不少很是有用的方法供用戶使用。
/src/net/fd_unix.go
經過netFD對象的定義能夠看到每一個fd都關聯了一個pollDesc實例,經過上文咱們知道pollDesc對象最終是對epoll的封裝。
netFD對象的init函數僅僅是調用了pollDesc實例的Init函數,做用就是將fd添加到epoll中,若是這個fd是第一個網絡socket fd的話,這一次init還會擔任建立epoll實例的任務。要知道在Go進程裏,只會有一個epoll實例來管理全部的網絡socket fd,這個epoll實例也就是在第一個網絡socket fd被建立的時候所建立。
/src/net/fd_unix.go
Read()函數:
重點關注這個for循環中的syscall.Read調用的錯誤處理。當有錯誤發生的時候,會檢查這個錯誤是不是syscall.EAGAIN,若是是,則調用WaitRead將當前讀這個fd的goroutine給park住,直到這個fd上的讀事件再次發生爲止。
當這個socket上有新數據到來的時候,WaitRead調用返回,繼續for循環的執行。這樣的實現,就讓調用netFD的Read的地方變成了同步「阻塞」方式編程,再也不是異步非阻塞的編程方式了。netFD的Write方法和Read的實現原理是同樣的,都是在碰到EAGAIN錯誤的時候將當前goroutine給park住直到socket再次可寫爲止。
本文只是將網絡庫的底層實現給大致上引導了一遍,知道底層代碼大概實如今什麼地方,方便結合源碼深刻理解。Go語言中的高併發、同步阻塞方式編程的關鍵實際上是」goroutine和調度器」,針對網絡IO的時候,咱們須要知道EAGAIN這個很是關鍵的調度點,掌握了這個調度點,即便沒有調度器,本身也能夠在epoll的基礎上配合協程等用戶態線程實現網絡IO操做的調度,達到同步阻塞編程的目的。
最後,爲何須要同步阻塞的方式編程?只有看多、寫多了異步非阻塞代碼的時候纔可以深切體會到這個問題。真正的高大上絕對不是——「別人不會,我會;別人寫不出來,我寫得出來。」
EAGAIN:
本文分享自微信公衆號 - golang算法架構leetcode技術php(golangLeetcode)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。