一個go的迷你網絡庫

go語言完善的基礎設施爲編寫網絡程序提供了極大的便利.只須要少許代碼就能夠編寫一個高性能,穩定的異步網絡程序.
本文介紹一個迷你的,基於事件回調的異步網絡庫.git

首先簡單介紹一下併發模型.github

go提供了基於goroutine的同步網絡接口,因此對每一個網絡鏈接能夠建立一個單獨的goroutine用於接收網絡數據.這個goroutine是執行一個死循環,不斷的recv數據,解包而後將完整的邏輯包發送到一個每鏈接惟一的chan中,供邏輯消費.網絡

除了網絡接收goroutine以外,每一個鏈接還有一個專門的處理邏輯消息的goroutine,它的工做就是不斷的從關聯的chan中提取邏輯包並處理.session

與以前的chuck-lua相似,爲了讓使用者能夠方便定製本身的包結構,我提供了packet和decoder的抽象.併發

packet.go異步

package packet

const (
    RAWPACKET = 1
    RPACKET   = 2
    WPACKET   = 3
    EPACKET   = 4
)

type Packet interface{
    MakeWrite()(*Packet)
    MakeRead() (*Packet)
    Clone()    (*Packet)
    PkLen()    (uint32)
    DataLen()  (uint32)
    Buffer()   (*ByteBuffer)
    GetType()  (byte)
}

decoder.gosocket

package packet

import(
    "net"
    "encoding/binary"
    "fmt"
    "io"
)

var (
    ErrPacketTooLarge     = fmt.Errorf("Packet too Large")
    ErrEOF                = fmt.Errorf("Eof")
)

type Decoder interface{
    DoRecv(Conn net.Conn)(Packet,error)
}

type RPacketDecoder struct{
    maxpacket uint32
}

func NewRPacketDecoder(maxpacket uint32)(RPacketDecoder){
    return RPacketDecoder{maxpacket:maxpacket}
}

func (this RPacketDecoder)DoRecv(Conn net.Conn)(Packet,error){
    header := make([]byte,4)
    n, err := io.ReadFull(Conn, header)
    if n == 0 && err == io.EOF {
        return nil,ErrEOF
    }else if err != nil {
        return nil,err
    }
    size := binary.LittleEndian.Uint32(header)
    if size > this.maxpacket {
        return nil,ErrPacketTooLarge
    }
    buf := make([]byte,size+4)
    copy(buf[:],header[:])
    n, err = io.ReadFull(Conn,buf[4:])
    if n == 0 && err == io.EOF {
        return nil,ErrEOF
    }else if err != nil {
        return nil,err
    }
    return NewRPacket(NewBufferByBytes(buf,(uint32)(len(buf)))),nil
}

type RawDecoder struct{
}

func NewRawDecoder()(RawDecoder){
    return RawDecoder{}
}

func (this RawDecoder)DoRecv(Conn net.Conn)(Packet,error){
    buff  := make([]byte,4096)
    n,err := Conn.Read(buff)
    if n == 0 && err == io.EOF {
        return nil,ErrEOF
    }else if err != nil {
        return nil,err
    }
    return NewRawPacket(NewBufferByBytes(buff,(uint32)(n))),nil     
}

內置了2種包的類型,分別是rawpacket,rpacket/wpacket.tcp

rawpacket其實就是原始二進制數據流,沒有區分邏輯界限.函數

rpacket/wpacket則提供了一種4字節包頭,的二進制流包結構.性能

eventpacket則做爲內部使用,目前用來向邏輯通告鏈接關閉,錯誤等事件.

下面就是整個網絡庫的核心部分tcpsession,它提供了對tcp鏈接的處理.

package tcpsession

import(
       "net"
       packet "kendynet-go/packet"
       "fmt"
   )

var (
    ErrUnPackError     = fmt.Errorf("TcpSession: UnpackError")
    ErrSendClose       = fmt.Errorf("send close")
    ErrSocketClose     = fmt.Errorf("socket close")
)


type Tcpsession struct{
    Conn         net.Conn
    Packet_que   chan packet.Packet
    decoder      packet.Decoder
    socket_close bool
    ud           interface{}
}

func (this *Tcpsession) SetUd(ud interface{}){
    this.ud = ud
}

func (this *Tcpsession) Ud()(interface{}){
    return this.ud
}

func dorecv(session *Tcpsession){
    for{
        p,err := session.decoder.DoRecv(session.Conn)
        if session.socket_close{
            break
        }
        if err != nil {
            session.Packet_que <- packet.NewEventPacket(err)
            break
        }
        session.Packet_que <- p 
    }
    close(session.Packet_que)
}


func ProcessSession(tcpsession *Tcpsession,decoder packet.Decoder,
                    process_packet func (*Tcpsession,packet.Packet,error))(error){
    if tcpsession.socket_close{
        return ErrSocketClose
    }
    tcpsession.decoder = decoder
    go dorecv(tcpsession)
    for{
        msg,ok := <- tcpsession.Packet_que
        if !ok {
            //log error
            return nil
        }
        if packet.EPACKET == msg.GetType(){
            process_packet(tcpsession,nil,msg.(packet.EventPacket).GetError())
        }else{
            process_packet(tcpsession,msg,nil)
        }
        if tcpsession.socket_close{
            return nil
        }
    }
}

func NewTcpSession(conn net.Conn)(*Tcpsession){
    session := new(Tcpsession)
    session.Conn = conn
    session.Packet_que   = make(chan packet.Packet,1024)
    session.socket_close = false
    return session
}

func (this *Tcpsession)Send(wpk packet.Packet)(error){
    if this.socket_close{
        return ErrSocketClose
    }
    idx := (uint32)(0)
    for{
        buff  := wpk.Buffer().Bytes()
        end   := wpk.PkLen()
        n,err := this.Conn.Write(buff[idx:end])
        if err != nil || n < 0 {
            return ErrSendClose
        }
        idx += (uint32)(n)
        if idx >= (uint32)(end){
            break
        }
    }
    return nil
}

func (this *Tcpsession)Close(){
    if this.socket_close{
        return
    }
    this.socket_close = true
    this.Conn.Close()
}

代碼十分簡短只有一百行出頭點,這裏關鍵地方時,dorecv,Send和ProcessSession三個函數.

dorecv所作的就是不斷調用decoder.DoRecv從網絡中提取網絡包,而後將其寫入到Packet_que中.

Send函數則保證須要發送的數據被寫入到內湖緩衝或出錯纔會返回.

ProcessSession則是整個庫的核心所在,接收到新鏈接以後,用新建鏈接做爲參數調用ProcessSession,當網絡包到達或出錯時將會回調使用者提供的回調函數.從實現看它只是簡單的建立一個goroutine執行dorecv,而後在一個for循環中不斷讀取到達的網絡包而後調用回調函數.

下面是一個使用示例,更多的示例請參考:https://github.com/sniperHW/kendynet-go

package main

import(
    "net"
    tcpsession "kendynet-go/tcpsession"
    packet "kendynet-go/packet"
    "fmt"
)

func main(){
    service := ":8010"
    tcpAddr,err := net.ResolveTCPAddr("tcp4", service)
    if err != nil{
        fmt.Printf("ResolveTCPAddr")
    }
    listener, err := net.ListenTCP("tcp", tcpAddr)
    if err != nil{
        fmt.Printf("ListenTCP")
    }
    for {
        conn, err := listener.Accept()
        if err != nil {
            continue
        }
        session := tcpsession.NewTcpSession(conn)
        fmt.Printf("a client comming\n")
        go tcpsession.ProcessSession(session,packet.NewRawDecoder(),
           func (session *tcpsession.Tcpsession,rpk packet.Packet,errno error){ 
            if rpk == nil{
                fmt.Printf("%s\n",errno)
                session.Close()
                return
            }
            session.Send(rpk)
           })
    }
}
相關文章
相關標籤/搜索