Golang 目前的主要應用領域還是後臺微服務,雖然在業務領域也有所應用,但仍然是比較小衆的選擇。大多數的服務運行環境都是 Linux,而在 Windows 中 Golang 的應用更少;作者因爲特殊情況,不得已要在 Windows 環境中用 Golang 去寫本地代理服務。在作者的使用場景中實時性要求非常高(視頻通訊),對 TCP 數據的處理要足夠快,否則會造成 TCP 服務端的 Receive Buffer 溢出,進而造成 Packet loss,影響實時性和數據的完整性。
作者閱讀了 Golang 在 Windows 環境下 TCP 部分 syscall 的實現,最終確認它的底層模型使用的是完成端口(異步 IO 模型)。
但是由於作者本人比較喜歡折騰,因此用 Golang 底層的 syscall 實現了一下 TCP 完成端口服務。
以下為 Windows 環境下用 Golang 實現的 IoCompletion Port Server。
管理指定 Port 上所有 accepted socket:
type IoCompletionRootContext struct { socket windows.Handle socketAddr windows.SockaddrInet4 ioSet []*IoCompletionContext sync.Mutex } func (root *IoCompletionRootContext) NewIoContext() *IoCompletionContext { root.Lock() defer root.Unlock() res := &IoCompletionContext{ data: make([]byte, 65535), overlapped: windows.Overlapped{ Internal: 0, InternalHigh: 0, Offset: 0, OffsetHigh: 0, HEvent: 0, }, } res.wsaBuf.Buf = &res.data[0] res.wsaBuf.Len = uint32(65535) root.ioSet = append(root.ioSet, res) return res } func NewRootContext() *IoCompletionRootContext { return &IoCompletionRootContext{ ioSet: make([]*IoCompletionContext, 0), } }
accepted socket 的上下文:
type IoCompletionContext struct { socket windows.Handle socketAddr windows.SockaddrInet4 wsaBuf windows.WSABuf data []byte opType OP_TYPE overlapped windows.Overlapped }
完成端口服務:
type IoCompletionServer struct { Addr string Port int recvFunc func(data []byte) error rootCtx *IoCompletionRootContext // 爲了防止內存移動,採用此種方式 accepts sync.Map hIOCompletionPort windows.Handle } func (ss *IoCompletionServer) saveIoRootCtx(id uint32, ctx *IoCompletionRootContext) { ss.accepts.Store(id, ctx) } func (ss *IoCompletionServer) loadIoRootCtx(id uint32) *IoCompletionRootContext { if id == uint32(ss.rootCtx.socket) { return ss.rootCtx } if v, isOk := ss.accepts.Load(id); isOk { if res, isOk := v.(*IoCompletionRootContext); isOk { return res } } return nil } func (ss *IoCompletionServer) remove(id uint32) { ss.accepts.Delete(id) } func (ss *IoCompletionServer) RegisterReceiveFunc(rfunc func([]byte) error) { ss.recvFunc = rfunc } func (ss *IoCompletionServer) Listen() { dwBytesTransfered := uint32(0) var ctxId uint32 var overlapped *windows.Overlapped for { err := windows.GetQueuedCompletionStatus(ss.hIOCompletionPort, &dwBytesTransfered, &ctxId, &overlapped, windows.INFINITE) if err != nil { fmt.Printf("syscall.GetQueuedCompletionStatus: %v\n", err) } if overlapped == nil { continue } // 經過位移取得ioCtx ioCtx := (*IoCompletionContext)(unsafe.Pointer(uintptr(unsafe.Pointer(overlapped)) - unsafe.Offsetof(IoCompletionContext{}.overlapped))) switch ioCtx.opType { case ACCEPT_POSTED: { ss.DoAcceptEx(ss.loadIoRootCtx(ctxId), ioCtx) } case RECV_POSTED: { ss.DoReceive(ss.loadIoRootCtx(ctxId), ioCtx) } case SEND_POSTED: case NULL_POSTED: default: } } } func (ss *IoCompletionServer) DoAcceptEx(rootCtx *IoCompletionRootContext, ioCtx *IoCompletionContext) (err error) { nFdCtx := NewRootContext() nFdCtx.socket = ioCtx.socket addrSize := uint32(unsafe.Sizeof(windows.RawSockaddrAny{})) var localAddr, remoteAddr *windows.RawSockaddrAny lrsalen := int32(addrSize) rrsalen := int32(addrSize) // 與windows C++ 不一樣,此處函數無需去函數指針便可使用 windows.GetAcceptExSockaddrs(ioCtx.wsaBuf.Buf, ioCtx.wsaBuf.Len-(addrSize+16)*2, addrSize+16, addrSize+16, &localAddr, &lrsalen, &remoteAddr, &rrsalen) if 
ss.recvFunc != nil { ss.recvFunc(ioCtx.data[:ioCtx.overlapped.InternalHigh]) } // 繼承listen socket的屬性 err = windows.Setsockopt(nFdCtx.socket, windows.SOL_SOCKET, windows.SO_UPDATE_ACCEPT_CONTEXT, (*byte)(unsafe.Pointer(&ss.rootCtx.socket)), int32(unsafe.Sizeof(ss.rootCtx.socket))) if err != nil { return errors.Wrap(err, "syscall.AcceptEx") } err = windows.SetsockoptInt(nFdCtx.socket, windows.SOL_SOCKET, windows.SO_RCVBUF, 65535) if err != nil { return errors.Wrap(err, "windows.SetsockoptInt") } // 綁定到完成端口, 此步驟很關鍵 handle, err := windows.CreateIoCompletionPort(nFdCtx.socket, ss.hIOCompletionPort, uint32(nFdCtx.socket), 0) if err != nil { return errors.Wrap(err, "syscall.CreateIoCompletionPort") } else { fmt.Println(handle, rootCtx.socket) } // 投遞接收請求, 此處能夠自行修改 for i := 0; i < 16; i++ { nFdIoCtx := nFdCtx.NewIoContext() nFdIoCtx.socket = nFdCtx.socket if err = ss.Receive(nFdIoCtx); err != nil { return err } } //投遞接收連接請求 if err = ss.AcceptEx(ioCtx); err != nil { return err } // 保存到context中 ss.saveIoRootCtx(uint32(nFdCtx.socket), nFdCtx) return nil } func (ss *IoCompletionServer) AcceptEx(ctx *IoCompletionContext) (err error) { ctx.socket = windows.Handle(C.mWSASocket()) dwBytes := uint32(0) addrSize := uint32(unsafe.Sizeof(windows.RawSockaddrAny{})) ctx.opType = ACCEPT_POSTED //err = syscall.AcceptEx(ss.rootCtx.socket, ctx.socket, ctx.wsaBuf.Buf, // ctx.wsaBuf.Len-2*(addrSize+16), addrSize+16, // addrSize+16, &dwBytes, &ctx.overlapped) //windows.WSAIoctl(ss.rootCtx.socket, windows.SIO_GET_EXTENSION_FUNCTION_POINTER) err = windows.AcceptEx(ss.rootCtx.socket, ctx.socket, ctx.wsaBuf.Buf, ctx.wsaBuf.Len-2*(addrSize+16), addrSize+16, addrSize+16, &dwBytes, &ctx.overlapped) if err != nil { if err == windows.Errno(997) { // ERROR_IO_PENDING 表示還沒有接收到鏈接 err = nil } else { err = errors.Wrap(err, "syscall.AcceptEx") } } return err } func (ss *IoCompletionServer) DoReceive(rootCtx *IoCompletionRootContext, ctx *IoCompletionContext) { if ctx.overlapped.InternalHigh == 0 { if rootCtx 
!= nil { ss.remove(uint32(rootCtx.socket)) C.mClose(C.int(rootCtx.socket)) } return } if ss.recvFunc != nil { ss.recvFunc(ctx.data[:ctx.overlapped.InternalHigh]) } ss.Receive(ctx) } func (ss *IoCompletionServer) Receive(ioCtx *IoCompletionContext) error { recv := uint32(0) flags := uint32(0) ioCtx.opType = RECV_POSTED err := windows.WSARecv(ioCtx.socket, &ioCtx.wsaBuf, 1, &recv, &flags, &ioCtx.overlapped, nil) if err != nil { if err == windows.Errno(997) { // ERROR_IO_PENDING 表示還沒有接收到數據 err = nil } else { err = errors.Wrap(err, "syscall.AcceptEx") } } return err } func setDefaultSockOpt(handle windows.Handle) error { err := windows.SetsockoptInt(handle, windows.SOL_SOCKET, windows.SO_REUSEADDR, 1) if err != nil { return errors.Wrap(err, "syscall.SetsockoptInt") } //err = windows.SetsockoptInt(handle, windows.SOL_SOCKET, windows.SO, 1) //if err != nil { // return errors.Wrap(err, "syscall.SetsockoptInt") //} return nil } func (ss *IoCompletionServer) Start() error { fmt.Println(windows.WSAStartup(2, &windows.WSAData{})) // 初始創建一個用於綁定的 listen socket 的 IoCompletion 句柄 hIOCompletionPort, err := windows.CreateIoCompletionPort(windows.InvalidHandle, 0, 0, 0) if err != nil { return errors.Wrap(err, "syscall.CreateIoCompletionPort") } ss.hIOCompletionPort = hIOCompletionPort rootCtx := NewRootContext() rootCtx.socket = windows.Handle(C.mWSASocket()) setDefaultSockOpt(rootCtx.socket) ss.rootCtx = rootCtx handle, err := windows.CreateIoCompletionPort(rootCtx.socket, hIOCompletionPort, uint32(ss.rootCtx.socket), 0) if err != nil { return errors.Wrap(err, "syscall.CreateIoCompletionPort") } else { fmt.Println(handle, rootCtx.socket) } sockAddr := windows.SockaddrInet4{} sockAddr.Port = ss.Port if err := windows.Bind(rootCtx.socket, &sockAddr); err != nil { return errors.Wrap(err, "syscall.Bind") } if err := windows.Listen(rootCtx.socket, MAX_POST_ACCEPT); err != nil { return errors.Wrap(err, "windows.Listen") } ss.rootCtx = rootCtx if err := 
ss.AcceptEx(rootCtx.NewIoContext()); err != nil { return err } return nil }
完成端口服務使用示例:
// Example: build the server, register a receive callback, start it and run
// the completion loop.
ss := &IoCompletionServer{
	Addr: "127.0.0.1:10050",
	Port: 10050,
}
ss.RegisterReceiveFunc(func(data []byte) error {
	fmt.Println("receive data len:", len(data))
	return nil
})
// Start must run before Listen: it creates the completion port and the
// listening socket that Listen waits on.
if err := ss.Start(); err != nil {
	panic(err)
}
// Several goroutines may run Listen to receive requests, but note that
// doing so can deliver received packets out of order.
ss.Listen()
以上代碼經過實際測試檢驗,可以正常使用,但尚未與標準庫進行效率/性能對比,也沒有實現 send 功能。此處需要提醒的是,使用 IoCompletion Port 發送數據時要注意時序的把握。
IoCompletion Port 是 Windows 系統中十分優秀的 IO 模型,深入瞭解其工作機制及原理,也有助於我們對操作系統 IO 數據處理的機制有更清晰的認知。