Golang網絡庫中socket阻塞調度源碼剖析

本文分析了Golang的socket文件描述符和goroutine阻塞調度的原理。代碼中大部分是Go代碼,小部分是彙編代碼。完整理解本文須要Go語言知識,而且用Golang寫過網絡程序。更重要的是,須要提早理解goroutine的調度原理。cookie

1. TCP的鏈接對象:

鏈接對象:

在net.go中有一個名爲Conn的接口,提供了對於鏈接的讀寫和其餘操做:網絡

type Conn interface { Read(b []byte) (n int, err error) Write(b []byte) (n int, err error) Close() error LocalAddr() Addr RemoteAddr() Addr SetReadDeadline(t time.Time) error SetWriteDeadline(t time.Time) error }

這個接口就是對下面的結構體conn的抽象。conn結構體包含了對鏈接的讀寫和其餘操做:數據結構

type conn struct { fd *netFD }

從鏈接讀取數據:

// Read implements the Conn Read method. func (c *conn) Read(b []byte) (int, error) { if !c.ok() { return 0, syscall.EINVAL } return c.fd.Read(b) }

向鏈接寫入數據:

// Write implements the Conn Write method. func (c *conn) Write(b []byte) (int, error) { if !c.ok() { return 0, syscall.EINVAL } return c.fd.Write(b) }

關閉鏈接:

// Close closes the connection. func (c *conn) Close() error { if !c.ok() { return syscall.EINVAL } return c.fd.Close() }

設置讀寫超時:

// SetDeadline implements the Conn SetDeadline method. func (c *conn) SetDeadline(t time.Time) error { if !c.ok() { return syscall.EINVAL } return c.fd.setDeadline(t) } // SetReadDeadline implements the Conn SetReadDeadline method. func (c *conn) SetReadDeadline(t time.Time) error { if !c.ok() { return syscall.EINVAL } return c.fd.setReadDeadline(t) } // SetWriteDeadline implements the Conn SetWriteDeadline method. func (c *conn) SetWriteDeadline(t time.Time) error { if !c.ok() { return syscall.EINVAL } return c.fd.setWriteDeadline(t) }

能夠看到,對鏈接的全部操做,都體如今對*netFD的操做上。咱們繼續跟蹤c.fd.Read()函數.

2.文件描述符

net/fd_unix.go:app

網絡鏈接的文件描述符:

// Network file descriptor. type netFD struct { // locking/lifetime of sysfd + serialize access to Read and Write methods fdmu fdMutex // immutable until Close sysfd int family int sotype int isConnected bool net string laddr Addr raddr Addr // wait server pd pollDesc }

文件描述符讀取數據:

func (fd *netFD) Read(p []byte) (n int, err error) { if err := fd.readLock(); err != nil { return 0, err } defer fd.readUnlock() if err := fd.pd.PrepareRead(); err != nil { return 0, &OpError{"read", fd.net, fd.raddr, err} } // 調用system call,循環從fd.sysfd讀取數據 for { // 系統調用Read讀取數據 n, err = syscall.Read(int(fd.sysfd), p) // 若是發生錯誤,則須要處理 // 而且只處理EAGAIN類型的錯誤,其餘錯誤一概返回給調用者 if err != nil { n = 0 // 對於非阻塞的網絡鏈接的文件描述符,若是錯誤是EAGAIN // 說明Socket的緩衝區爲空,未讀取到任何數據 // 則調用fd.pd.WaitRead, if err == syscall.EAGAIN { if err = fd.pd.WaitRead(); err == nil { continue } } } err = chkReadErr(n, err, fd) break } if err != nil && err != io.EOF { err = &OpError{"read", fd.net, fd.raddr, err} } return }

網絡輪詢器

網絡輪詢器是Golang中針對每一個socket文件描述符創建的輪詢機制。 此處的輪詢並非通常意義上的輪詢,而是Golang的runtime在調度goroutine或者GC完成以後或者指定時間以內,調用epoll_wait獲取全部產生IO事件的socket文件描述符。固然在runtime輪詢以前,須要將socket文件描述符和當前goroutine的相關信息加入epoll維護的數據結構中,並掛起當前goroutine,當IO就緒後,經過epoll返回的文件描述符和其中附帶的goroutine的信息,從新恢復當前goroutine的執行。socket

// Integrated network poller (platform-independent part). // 網絡輪詢器(平臺獨立部分) // A particular implementation (epoll/kqueue) must define the following functions: // 實際的實現(epoll/kqueue)必須定義如下函數: // func netpollinit() // to initialize the poller,初始化輪詢器 // func netpollopen(fd uintptr, pd *pollDesc) int32 // to arm edge-triggered notifications, 爲fd和pd啓動邊緣觸發通知 // and associate fd with pd. // 一個實現必須調用下面的函數,用來指示pd已經準備好 // An implementation must call the following function to denote that the pd is ready. // func netpollready(gpp **g, pd *pollDesc, mode int32) // pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer // goroutines respectively. The semaphore can be in the following states: // pollDesc包含了2個二進制的信號,分別負責讀寫goroutine的暫停. // 信號可能處於下面的狀態: // pdReady - IO就緒通知被掛起; // 一個goroutine將次狀態置爲nil來消費一個通知。 // pdReady - io readiness notification is pending; // a goroutine consumes the notification by changing the state to nil. // pdWait - 一個goroutine準備暫停在信號上,可是尚未完成暫停。 // 這個goroutine經過把這個狀態改變爲G指針去提交這個暫停動做。 // 或者,替代性的,並行的其餘通知將狀態改變爲READY. // 或者,替代性的,並行的超時/關閉會將次狀態變爲nil // pdWait - a goroutine prepares to park on the semaphore, but not yet parked; // the goroutine commits to park by changing the state to G pointer, // or, alternatively, concurrent io notification changes the state to READY, // or, alternatively, concurrent timeout/close changes the state to nil. // G指針 - 阻塞在信號上的goroutine // IO通知或者超時/關閉會分別將此狀態置爲READY或者nil. // G pointer - the goroutine is blocked on the semaphore; // io notification or timeout/close changes the state to READY or nil respectively // and unparks the goroutine. // nil - nothing of the above. const ( pdReady uintptr = 1 pdWait uintptr = 2 )

網絡輪詢器的數據結構以下:函數

// Network poller descriptor. // 網絡輪詢器描述符 type pollDesc struct { link *pollDesc // in pollcache, protected by pollcache.lock // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations. // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime. // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification) // proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated // in a lock-free way by all operations. // NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg), // that will blow up when GC starts moving objects. // // lock鎖對象保護了pollOpen, pollSetDeadline, pollUnblock和deadlineimpl操做。 // 而這些操做又徹底包含了對seq, rt, tw變量。 // fd在PollDesc整個生命過程當中都是一個常量。 // 處理pollReset, pollWait, pollWaitCanceled和runtime.netpollready(IO就緒通知)不須要用到鎖。 // 因此closing, rg, rd, wg和wd的全部操做都是一個無鎖的操做。 lock mutex // protectes the following fields fd uintptr closing bool seq uintptr // protects from stale timers and ready notifications rg uintptr // pdReady, pdWait, G waiting for read or nil rt timer // read deadline timer (set if rt.f != nil) rd int64 // read deadline wg uintptr // pdReady, pdWait, G waiting for write or nil wt timer // write deadline timer wd int64 // write deadline user unsafe.Pointer // user settable cookie }

將當前goroutine設置爲阻塞在fd上:

pd.WaitRead():ui

func (pd *pollDesc) WaitRead() error { return pd.Wait('r') } func (pd *pollDesc) Wait(mode int) error { res := runtime_pollWait(pd.runtimeCtx, mode) return convertErr(res) }

res是runtime_pollWait函數返回的結果,由conevertErr函數包裝後返回:this

func convertErr(res int) error { switch res { case 0: return nil case 1: return errClosing case 2: return errTimeout } println("unreachable: ", res) panic("unreachable") }
  • 函數返回0,表示IO已經準備好,返回nil。
  • 返回1,說明鏈接已關閉,應該放回errClosing。
  • 返回2,說明對IO進行的操做發生超時,應該返回errTimeout。

runtime_pollWait會調用runtime/thunk.s中的函數:spa

TEXT net·runtime_pollWait(SB),NOSPLIT,$0-0 JMP runtime·netpollWait(SB)

這是一個包裝函數,沒有參數,直接跳轉到runtime/netpoll.go中的函數netpollWait:.net

func netpollWait(pd *pollDesc, mode int) int { // 檢查pd的狀態是否異常 err := netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // As for now only Solaris uses level-triggered IO. if GOOS == "solaris" { onM(func() { netpollarm(pd, mode) }) } // 循環中檢查pd的狀態是否是已經被設置爲pdReady // 即檢查IO是否是已經就緒 for !netpollblock(pd, int32(mode), false) { err = netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // Can happen if timeout has fired and unblocked us, // but before we had a chance to run, timeout has been reset. // Pretend it has not happened and retry. } return 0 }

netpollcheckerr函數檢查pd是否出現異常:

 // 檢查pd的異常 func netpollcheckerr(pd *pollDesc, mode int32) int { // 是否已經關閉 if pd.closing { return 1 // errClosing } // 當讀寫狀態下,deadline小於0,表示pd已通過了超時時間 if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) { return 2 // errTimeout } // 正常狀況返回0 return 0 }

netpollblock():

// returns true if IO is ready, or false if timedout or closed // waitio - wait only for completed IO, ignore errors // 這個函數被netpollWait循環調用 // 返回true說明IO已經準備好,返回false說明IO操做已經超時或者已經關閉 func netpollblock(pd *pollDesc, mode int32, waitio bool) bool { // 獲取pd的rg gpp := &pd.rg // 若是模式是w,則獲取pd的wg if mode == 'w' { gpp = &pd.wg } // set the gpp semaphore to WAIT // 在循環中設置pd的gpp爲pdWait // 由於casuintptr是自旋鎖,因此須要在循環中調用 for { // 若是在循環中發現IO已經準備好(pg的rg或者wg爲pdReady狀態) // 則設置rg/wg爲0,返回true old := *gpp if old == pdReady { *gpp = 0 return true } // 每次netpollblock執行完畢以後,gpp重置爲0 // 非0表示重複wait if old != 0 { gothrow("netpollblock: double wait") } // CAS操做改變gpp爲pdWait if casuintptr(gpp, 0, pdWait) { break } } // need to recheck error states after setting gpp to WAIT // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg // // 當設置gpp爲pdWait狀態後,從新檢查gpp的狀態 // 這是必要的,由於runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl會作相反的操做 // 若是狀態正常則掛起當前的goroutine // // 當netpollcheckerr檢查io出現超時或者錯誤,waitio爲true可用於等待ioReady // 不然當waitio爲false, 且io不出現錯誤或者超時纔會掛起當前goroutine if waitio || netpollcheckerr(pd, mode) == 0 { // 解鎖函數,設置gpp爲pdWait,若是設置不成功 // 說明已是發生其餘事件,可讓g繼續運行,而不是掛起當前g f := netpollblockcommit // 嘗試掛起當前g gopark(**(**unsafe.Pointer)(unsafe.Pointer(&f)), unsafe.Pointer(gpp), "IO wait") } // be careful to not lose concurrent READY notification old := xchguintptr(gpp, 0) if old > pdWait { gothrow("netpollblock: corrupted state") } return old == pdReady }

runtime/proc.go: gopark():

// Puts the current goroutine into a waiting state and calls unlockf. // If unlockf returns false, the goroutine is resumed. // 將當前goroutine置爲waiting狀態,而後調用unlockf func gopark(unlockf unsafe.Pointer, lock unsafe.Pointer, reason string) { // 獲取當前M mp := acquirem() // 獲取當前G gp := mp.curg // 獲取G的狀態 status := readgstatus(gp) // 若是不是_Grunning或者_Gscanrunning,則報錯 if status != _Grunning && status != _Gscanrunning { gothrow("gopark: bad g status") } // 設置lock和unlockf mp.waitlock = lock mp.waitunlockf = unlockf gp.waitreason = reason releasem(mp) // can't do anything that might move the G between Ms here. // 在m->g0這個棧上調用park_m,而不是當前g的棧 mcall(park_m) }

mcall函數是一段彙編,在m->g0的棧上調用park_m,而不是在當前goroutine的棧上。mcall的功能分兩部分,第一部分保存當前G的PC/SP到G的gobuf的pc/sp字段,第二部分調用park_m函數:

// func mcall(fn func(*g)) // Switch to m->g0's stack, call fn(g). // Fn must never return. It should gogo(&g->sched) // to keep running g. TEXT runtime·mcall(SB), NOSPLIT, $0-8 // 將須要執行的函數保存在DI MOVQ fn+0(FP), DI // 將M的TLS存放在CX get_tls(CX) // 將G對象存放在AX MOVQ g(CX), AX // save state in g->sched // 將調用者的PC存放在BX MOVQ 0(SP), BX // caller's PC // 將調用者的PC保存到g->sched.pc MOVQ BX, (g_sched+gobuf_pc)(AX) // 第一個參數的地址,即棧頂的地址,保存到BX LEAQ fn+0(FP), BX // caller's SP // 保存SP的地址到g->sched.sp MOVQ BX, (g_sched+gobuf_sp)(AX) // 將g對象保存到g->sched->g MOVQ AX, (g_sched+gobuf_g)(AX) // switch to m->g0 & its stack, call fn // 將g對象指針保存到BX MOVQ g(CX), BX // 將g->m保存到BX MOVQ g_m(BX), BX // 將m->g0保存到SI MOVQ m_g0(BX), SI CMPQ SI, AX // if g == m->g0 call badmcall JNE 3(PC) MOVQ $runtime·badmcall(SB), AX JMP AX // 將m->g0保存到g MOVQ SI, g(CX) // g = m->g0 // 將g->sched.sp恢復到SP寄存器 // 即便用g0的棧 MOVQ (g_sched+gobuf_sp)(SI), SP // sp = m->g0->sched.sp // AX進棧 PUSHQ AX MOVQ DI, DX // 將fn的地址複製到DI MOVQ 0(DI), DI // 調用函數 CALL DI // AX出棧 POPQ AX MOVQ $runtime·badmcall2(SB), AX JMP AX RET

park_m函數的功能分爲三部分,第一部分讓當前G和當前M脫離關係,第二部分是調用解鎖函數,這裏是調用netpoll.go源文件中的netpollblockcommit函數:

// runtime·park continuation on g0. void runtime·park_m(G *gp) { bool ok; // 設置當前g爲Gwaiting狀態 runtime·casgstatus(gp, Grunning, Gwaiting); // 讓當前g和m脫離關係 dropg(); if(g->m->waitunlockf) { ok = g->m->waitunlockf(gp, g->m->waitlock); g->m->waitunlockf = nil; g->m->waitlock = nil; // 返回0爲false,非0爲true // 0說明g->m->waitlock發生了變化,即不是在gopark是設置的(pdWait) // 說明了脫離了WAIT狀態,應該設置爲Grunnable,並執行g if(!ok) { runtime·casgstatus(gp, Gwaiting, Grunnable); execute(gp); // Schedule it back, never returns. } } // 這裏是調度當前m繼續執行其餘g // 而不是上面執行execute schedule(); }

netpollblockcommit函數,設置gpp爲pdWait,設置成功返回1,不然返回0。1爲true,0爲false:

func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool { return casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp))) }

到這裏當前goroutine對socket文件描述符的等待IO繼續的行爲已經完成。過程當中首先儘早嘗試判斷IO是否已經就緒,若是未就緒則掛起當前goroutine,掛起以後再次判斷IO是否就緒,若是還未就緒則調度當前M運行其餘G。若是是在調度goroutine以前IO已經就緒,則不會使當前goroutine進入調度隊列,會直接運行剛纔掛起的G。不然當前goroutine會進入調度隊列。

接下來是等待runtime將其喚醒。runtime在執行findrunnablequeuestarttheworldsysmon函數時,都會調用netpoll_epoll.go中的netpoll函數,尋找到IO就緒的socket文件描述符,並找到這些socket文件描述符對應的輪詢器中附帶的信息,根據這些信息將以前等待這些socket文件描述符就緒的goroutine狀態修改成Grunnable。在以上函數中,執行完netpoll以後,會找到一個就緒的goroutine列表,接下來將就緒的goroutine加入到調度隊列中,等待調度運行。

在netpoll_epoll.go中的netpoll函數中,epoll_wait函數返回N個發生事件的文件描述符對應的epollevent,接着對於每一個event使用其data屬性,將event.data轉換爲*pollDesc類型,再調用netpoll.go中的netpollready函數,將*pollDesc類型中的G數據類型去除,並附加到netpoll函數的調用者傳遞的G鏈表中:

// 將ev.data轉換爲*pollDesc類型 pd := *(**pollDesc)(unsafe.Pointer(&ev.data)) // 調用netpollready將取出pd中保存的G,並添加到鏈表中 netpollready((**g)(noescape(unsafe.Pointer(&gp))), pd, mode)

因此runtime在執行findrunnablequeuestarttheworldsysmon函數中會執行netpoll函數,並返回N個goroutine。這些goroutine期待的網絡事件已經發生,runtime會將這些goroutine放入到當前P的可運行隊列中,接下來調度它們並運行。

http://ju.outofmemory.cn/entry/168649

相關文章
相關標籤/搜索