一個TCP長鏈接設備管理後臺工程(六)

前篇

一個TCP長鏈接設備管理後臺工程(一)
一個TCP長鏈接設備管理後臺工程(二)
一個TCP長鏈接設備管理後臺工程(三)
一個TCP長鏈接設備管理後臺工程(四)
一個TCP長鏈接設備管理後臺工程(五)git

Github倉庫地址github

封包器

上面介紹了過濾器,過濾器實際就是一個可以處理粘包和拆包的解析器,和封包器的做用正好相反。可是封包器會很簡單,由於封包沒有粘包和拆包的處理。數據庫

代碼以下:segmentfault

//Packer is proto Packer api
func Packer(msg Message) []byte {
    data := make([]byte, 0)
    tempbytes := codec.Word2Bytes(msg.HEADER.MID)
    data = append(data, tempbytes...)
    datalen := uint16(len(msg.BODY)) & 0x03FF
    datalen = datalen | 0x4000

    tempbytes = utils.Word2Bytes(datalen)
    data = append(data, tempbytes...)

    data = append(data, msg.HEADER.Version)

    if len(msg.HEADER.PhoneNum) < 10 {
        data = append(data, make([]byte, 10-len(msg.HEADER.PhoneNum))...)
        data = append(data, msg.HEADER.PhoneNum...)
    } else {
        data = append(data, msg.HEADER.PhoneNum[:10]...)
    }

    tempbytes = utils.Word2Bytes(msg.HEADER.SeqNum)
    data = append(data, tempbytes...)

    if msg.HEADER.IsMulti() {
        data = append(data, utils.Word2Bytes(msg.HEADER.MutilFlag.MsgSum)...)
        data = append(data, utils.Word2Bytes(msg.HEADER.MutilFlag.MsgIndex)...)
    }

    data = append(data, msg.BODY...)

    csdata := byte(checkSum(data[:]))
    data = append(data, csdata)

    //添加頭尾
    var tmpdata []byte = []byte{0x7e}

    for _, item := range data {
        if item == 0x7d {
            tmpdata = append(tmpdata, 0x7d, 0x01)
        } else if item == 0x7e {
            tmpdata = append(tmpdata, 0x7d, 0x02)
        } else {
            tmpdata = append(tmpdata, item)
        }
    }
    tmpdata = append(tmpdata, 0x7e)

    return tmpdata
}

處理器

處理器用來處理接收到的有效TCP數據包,它應該是比過濾器更上層的一個模塊。由於咱們是用來管理TCP鏈接的,一個tcp鏈接表明着一個終端設備,這個終端設備有各類屬性和操做邏輯,這些東西都是依附於TCP的長鏈接。咱們單獨定義一個包來組織這部份內容:後端

package term

而咱們的處理器就存在於這個包中。因爲這個模塊是tcp數據的實際處理模塊,因此會牽扯到許多相關連的包,好比前面的codec、proto等,還有數據庫的操做。api

這一部分咱們主要只介紹處理器的邏輯。前面咱們說了,咱們要處理的包有:app

  • 平臺通用應答
  • 終端通用應答
  • 終端註冊
  • 終端註冊應答
  • 終端鑑權
  • 心跳
  • 位置上報處理

經過proto的filter咱們獲得了各個Message,而且獲取了其中的幀頭信息,BODY部分尚未處理。而咱們的codec正是用來處理BODY部分的編/解碼器。tcp

因此處理器的基本流程就是根據Message中Header信息,分別處理其Body數據,而後返回處理的結果。這個處理的結果每每就是須要響應的數據流。因此咱們的處理器函數的樣子大概就是這樣的:函數

func (t *Terminal) Handler(msg proto.Message) []byte{
    
}

傳入一個Message,入後輸出須要響應的數據,若是返回nil則代表沒有數據須要響應。ui

其中Terminal這個結構體咱們在後端模型這個裝接中有說起到:

type Terminal struct {
    authkey   string
    imei      string
    iccid     string
    vin       string
    tboxver   string
    loginTime time.Time
    seqNum    uint16
    phoneNum  []byte
    Conn      net.Conn
    Engine    *xorm.Engine
    Ch        chan int
}

同時爲了使用codec的序列化和反序列化,咱們還須要定義以下結構體:

type TermAckBody struct {
    AckSeqNum uint16
    AckID     uint16
    AckResult uint8
}

type PlatAckBody struct {
    AckSeqNum uint16
    AckID     uint16
    AckResult uint8
}

type RegisterBody struct {
    ProID         uint16
    CityID        uint16
    ManufID       []byte `len:"11"`
    TermType      []byte `len:"30"`
    TermID        []byte `len:"30"`
    LicPlateColor uint8
    LicPlate      string
}

type RegisterAckBody struct {
    AckSeqNum uint16
    AckResult uint8
    AuthKey   string
}

type AuthBody struct {
    AuthKeyLen uint8
    AuthKey    string
    Imei       []byte `len:"15"`
    Version    []byte `len:"20"`
}

type GPSInfoBody struct {
    WarnFlag uint32
    State    uint32
    Lat      uint32
    Lng      uint32
    Alt      uint16
    Speed    uint16
    Dir      uint16
    Time     []byte `len:"6"`
}

type CtrlBody struct {
    Cmd   uint8
    Param string
}

下面就來正式講解Handler的實現。

首先獲取保存Header中的電話號和流水號到Terminal中:

if t.phoneNum == nil {
    t.phoneNum = make([]byte, 10)
}

copy(t.phoneNum, []byte(msg.HEADER.PhoneNum))
t.seqNum = msg.HEADER.SeqNum

而後經過switch來匹配消息id,並對其body部分作相關處理:

switch msg.HEADER.MID {
    case proto.TermAck:
    //
    case proto.Register:
    //
    case proto.Login:
    //
    case proto.Heartbeat:
    //
    case proto.Gpsinfo:
    //
}
return nil

咱們先說註冊,咱們使用幀頭中的手機號,在數據庫中查找對應的鑑權碼。而後從msg中獲取body部分,經過codec反序列話獲得RegisterBody實例。爲了簡單,咱們此處不作其餘數據驗證,直接作出數據響應便可。生成須要響應的RegisterAckBody實例,而後序列化爲body切片,而後生成響應的Message,再經過封包器封包爲數據流返回:

devinfo := new(DevInfo)

devinfo.PhoneNum = strings.TrimLeft(utils.HexBuffToString(t.phoneNum), "0")

is, _ := t.Engine.Get(devinfo)
if !is {
    return []byte{}
}

var reg RegisterBody
_, err := codec.Unmarshal(msg.BODY, &reg)
if err != nil {
    fmt.Println("err:", err)
}

var body []byte
body, err = codec.Marshal(&RegisterAckBody{
    AckSeqNum: msg.HEADER.SeqNum,
    AckResult: 0,
    AuthKey:   devinfo.Authkey,
})
if err != nil {
    fmt.Println("err:", err)
}

msgAck := proto.Message{
    HEADER: proto.Header{
        MID:      proto.RegisterAck,
        Attr:     proto.MakeAttr(1, false, 0, uint16(len(body))),
        Version:  1,
        PhoneNum: string(t.phoneNum),
        SeqNum:   t.seqNum,
    },
    BODY: body,
}
return proto.Packer(msgAck)

上面有涉及到數據庫的查詢操做,這部分使用了xorm,具體的參考xorm官方文檔:xorm官方文檔

上面涉及一個utils.HexBuffToString函數,這個函數會將字符串轉換爲16進制格式的字符串,自己是基於strconv.FormatUint(uint64(value), 16)完成的,可是這個函數會沒有辦法指定轉換後的填充值,好比0x0A會直接轉換成"A"而不是"0A",因此須要作一點特殊處理:

func HexBuffToString(hex []byte) string {
    var ret string
    for _, value := range hex {
        str := strconv.FormatUint(uint64(value), 16)
        if len([]rune(str)) == 1 {
            ret = ret + "0" + str
        } else {
            ret = ret + str
        }
    }
    return ret
}

Handler其餘部分的流程大致差很少,就不作過多講解了,完整代碼:

//Handler is proto Handler api
func (t *Terminal) Handler(msg proto.Message) []byte {
    if t.phoneNum == nil {
        t.phoneNum = make([]byte, 10)
    }

    copy(t.phoneNum, []byte(msg.HEADER.PhoneNum))
    t.seqNum = msg.HEADER.SeqNum

    switch msg.HEADER.MID {
    case proto.TermAck:
        reqID := codec.Bytes2Word(msg.BODY[2:4])
        if reqID == proto.UpdateReq {
            //ch <- 1
            //升級命令
        }
    case proto.Register:
        devinfo := new(DevInfo)

        devinfo.PhoneNum = strings.TrimLeft(utils.HexBuffToString(t.phoneNum), "0")

        is, _ := t.Engine.Get(devinfo)
        if !is {
            return []byte{}
        }

        var reg RegisterBody
        _, err := codec.Unmarshal(msg.BODY, &reg)
        if err != nil {
            fmt.Println("err:", err)
        }

        var body []byte
        body, err = codec.Marshal(&RegisterAckBody{
            AckSeqNum: msg.HEADER.SeqNum,
            AckResult: 0,
            AuthKey:   devinfo.Authkey,
        })
        if err != nil {
            fmt.Println("err:", err)
        }

        msgAck := proto.Message{
            HEADER: proto.Header{
                MID:      proto.RegisterAck,
                Attr:     proto.MakeAttr(1, false, 0, uint16(len(body))),
                Version:  1,
                PhoneNum: string(t.phoneNum),
                SeqNum:   t.seqNum,
            },
            BODY: body,
        }
        return proto.Packer(msgAck)
    case proto.Login:
        var auth AuthBody
        _, err := codec.Unmarshal(msg.BODY, &auth)
        if err != nil {
            fmt.Println("err:", err)
        }
        t.authkey = auth.AuthKey
        t.imei = string(auth.Imei)
        t.tboxver = string(auth.Version)

        var body []byte
        body, err = codec.Marshal(&PlatAckBody{
            AckSeqNum: msg.HEADER.SeqNum,
            AckID:     msg.HEADER.MID,
            AckResult: 0,
        })
        if err != nil {
            fmt.Println("err:", err)
        }

        msgAck := proto.Message{
            HEADER: proto.Header{
                MID:      proto.PlatAck,
                Attr:     proto.MakeAttr(1, false, 0, uint16(len(body))),
                Version:  1,
                PhoneNum: string(t.phoneNum),
                SeqNum:   t.seqNum,
            },
            BODY: body,
        }
        return proto.Packer(msgAck)
    case proto.Heartbeat:
        var err error
        var body []byte
        body, err = codec.Marshal(&PlatAckBody{
            AckSeqNum: msg.HEADER.SeqNum,
            AckID:     msg.HEADER.MID,
            AckResult: 0,
        })
        if err != nil {
            fmt.Println("err:", err)
        }

        msgAck := proto.Message{
            HEADER: proto.Header{
                MID:      proto.PlatAck,
                Attr:     proto.MakeAttr(1, false, 0, uint16(len(body))),
                Version:  1,
                PhoneNum: string(t.phoneNum),
                SeqNum:   t.seqNum,
            },
            BODY: body,
        }
        return proto.Packer(msgAck)
    case proto.Gpsinfo:
        var gpsInfo GPSInfoBody
        _, err := codec.Unmarshal(msg.BODY, &gpsInfo)
        if err != nil {
            fmt.Println("err:", err)
        }

        gpsdata := new(GPSData)
        gpsdata.Imei = t.imei
        gpsdata.Stamp = time.Now()
        gpsdata.WarnFlag = gpsInfo.WarnFlag
        gpsdata.State = gpsInfo.State
        gpsdata.Latitude = gpsInfo.Lat
        gpsdata.Longitude = gpsInfo.Lng

        gpsdata.Altitude = gpsInfo.Alt
        gpsdata.Speed = gpsInfo.Speed
        gpsdata.Direction = gpsInfo.Dir

        if (gpsdata.State & 0x00000001) > 0 {
            gpsdata.AccState = 1
        } else {
            gpsdata.AccState = 0
        }

        if (gpsdata.State & 0x00000002) > 0 {
            gpsdata.GpsState = 1
        } else {
            gpsdata.GpsState = 0
        }

        _, err = t.Engine.Insert(gpsdata)
        if err != nil {
            fmt.Println("insert gps err:", err)
        }

        var body []byte
        body, err = codec.Marshal(&PlatAckBody{
            AckSeqNum: msg.HEADER.SeqNum,
            AckID:     msg.HEADER.MID,
            AckResult: 0,
        })
        if err != nil {
            fmt.Println("err:", err)
        }

        msgAck := proto.Message{
            HEADER: proto.Header{
                MID:      proto.PlatAck,
                Attr:     proto.MakeAttr(1, false, 0, uint16(len(body))),
                Version:  1,
                PhoneNum: string(t.phoneNum),
                SeqNum:   t.seqNum,
            },
            BODY: body,
        }
        return proto.Packer(msgAck)
    }

    return nil
}
相關文章
相關標籤/搜索