go語言完善的基礎設施爲編寫網絡程序提供了極大的便利.只須要少許代碼就能夠編寫一個高性能,穩定的異步網絡程序.
本文介紹一個迷你的,基於事件回調的異步網絡庫.git
首先簡單介紹一下併發模型.github
go提供了基於goroutine的同步網絡接口,因此對每一個網絡鏈接能夠建立一個單獨的goroutine用於接收網絡數據.這個goroutine是執行一個死循環,不斷的recv數據,解包而後將完整的邏輯包發送到一個每鏈接惟一的chan中,供邏輯消費.網絡
除了網絡接收goroutine以外,每一個鏈接還有一個專門的處理邏輯消息的goroutine,它的工做就是不斷的從關聯的chan中提取邏輯包並處理.session
與以前的chuck-lua相似,爲了讓使用者能夠方便定製本身的包結構,我提供了packet和decoder的抽象.併發
packet.go異步
package packet const ( RAWPACKET = 1 RPACKET = 2 WPACKET = 3 EPACKET = 4 ) type Packet interface{ MakeWrite()(*Packet) MakeRead() (*Packet) Clone() (*Packet) PkLen() (uint32) DataLen() (uint32) Buffer() (*ByteBuffer) GetType() (byte) }
decoder.gosocket
package packet import( "net" "encoding/binary" "fmt" "io" ) var ( ErrPacketTooLarge = fmt.Errorf("Packet too Large") ErrEOF = fmt.Errorf("Eof") ) type Decoder interface{ DoRecv(Conn net.Conn)(Packet,error) } type RPacketDecoder struct{ maxpacket uint32 } func NewRPacketDecoder(maxpacket uint32)(RPacketDecoder){ return RPacketDecoder{maxpacket:maxpacket} } func (this RPacketDecoder)DoRecv(Conn net.Conn)(Packet,error){ header := make([]byte,4) n, err := io.ReadFull(Conn, header) if n == 0 && err == io.EOF { return nil,ErrEOF }else if err != nil { return nil,err } size := binary.LittleEndian.Uint32(header) if size > this.maxpacket { return nil,ErrPacketTooLarge } buf := make([]byte,size+4) copy(buf[:],header[:]) n, err = io.ReadFull(Conn,buf[4:]) if n == 0 && err == io.EOF { return nil,ErrEOF }else if err != nil { return nil,err } return NewRPacket(NewBufferByBytes(buf,(uint32)(len(buf)))),nil } type RawDecoder struct{ } func NewRawDecoder()(RawDecoder){ return RawDecoder{} } func (this RawDecoder)DoRecv(Conn net.Conn)(Packet,error){ buff := make([]byte,4096) n,err := Conn.Read(buff) if n == 0 && err == io.EOF { return nil,ErrEOF }else if err != nil { return nil,err } return NewRawPacket(NewBufferByBytes(buff,(uint32)(n))),nil }
內置了2種包的類型,分別是rawpacket,rpacket/wpacket.tcp
rawpacket其實就是原始二進制數據流,沒有區分邏輯界限.函數
rpacket/wpacket則提供了一種4字節包頭,的二進制流包結構.性能
eventpacket則做爲內部使用,目前用來向邏輯通告鏈接關閉,錯誤等事件.
下面就是整個網絡庫的核心部分tcpsession,它提供了對tcp鏈接的處理.
package tcpsession import( "net" packet "kendynet-go/packet" "fmt" ) var ( ErrUnPackError = fmt.Errorf("TcpSession: UnpackError") ErrSendClose = fmt.Errorf("send close") ErrSocketClose = fmt.Errorf("socket close") ) type Tcpsession struct{ Conn net.Conn Packet_que chan packet.Packet decoder packet.Decoder socket_close bool ud interface{} } func (this *Tcpsession) SetUd(ud interface{}){ this.ud = ud } func (this *Tcpsession) Ud()(interface{}){ return this.ud } func dorecv(session *Tcpsession){ for{ p,err := session.decoder.DoRecv(session.Conn) if session.socket_close{ break } if err != nil { session.Packet_que <- packet.NewEventPacket(err) break } session.Packet_que <- p } close(session.Packet_que) } func ProcessSession(tcpsession *Tcpsession,decoder packet.Decoder, process_packet func (*Tcpsession,packet.Packet,error))(error){ if tcpsession.socket_close{ return ErrSocketClose } tcpsession.decoder = decoder go dorecv(tcpsession) for{ msg,ok := <- tcpsession.Packet_que if !ok { //log error return nil } if packet.EPACKET == msg.GetType(){ process_packet(tcpsession,nil,msg.(packet.EventPacket).GetError()) }else{ process_packet(tcpsession,msg,nil) } if tcpsession.socket_close{ return nil } } } func NewTcpSession(conn net.Conn)(*Tcpsession){ session := new(Tcpsession) session.Conn = conn session.Packet_que = make(chan packet.Packet,1024) session.socket_close = false return session } func (this *Tcpsession)Send(wpk packet.Packet)(error){ if this.socket_close{ return ErrSocketClose } idx := (uint32)(0) for{ buff := wpk.Buffer().Bytes() end := wpk.PkLen() n,err := this.Conn.Write(buff[idx:end]) if err != nil || n < 0 { return ErrSendClose } idx += (uint32)(n) if idx >= (uint32)(end){ break } } return nil } func (this *Tcpsession)Close(){ if this.socket_close{ return } this.socket_close = true this.Conn.Close() }
代碼十分簡短只有一百行出頭點,這裏關鍵地方時,dorecv,Send和ProcessSession三個函數.
dorecv所作的就是不斷調用decoder.DoRecv從網絡中提取網絡包,而後將其寫入到Packet_que中.
Send函數則保證須要發送的數據被寫入到內湖緩衝或出錯纔會返回.
ProcessSession則是整個庫的核心所在,接收到新鏈接以後,用新建鏈接做爲參數調用ProcessSession,當網絡包到達或出錯時將會回調使用者提供的回調函數.從實現看它只是簡單的建立一個goroutine執行dorecv,而後在一個for循環中不斷讀取到達的網絡包而後調用回調函數.
下面是一個使用示例,更多的示例請參考:https://github.com/sniperHW/kendynet-go
package main import( "net" tcpsession "kendynet-go/tcpsession" packet "kendynet-go/packet" "fmt" ) func main(){ service := ":8010" tcpAddr,err := net.ResolveTCPAddr("tcp4", service) if err != nil{ fmt.Printf("ResolveTCPAddr") } listener, err := net.ListenTCP("tcp", tcpAddr) if err != nil{ fmt.Printf("ListenTCP") } for { conn, err := listener.Accept() if err != nil { continue } session := tcpsession.NewTcpSession(conn) fmt.Printf("a client comming\n") go tcpsession.ProcessSession(session,packet.NewRawDecoder(), func (session *tcpsession.Tcpsession,rpk packet.Packet,errno error){ if rpk == nil{ fmt.Printf("%s\n",errno) session.Close() return } session.Send(rpk) }) } }