一個TCP長鏈接設備管理後臺工程(一)
一個TCP長鏈接設備管理後臺工程(二)
一個TCP長鏈接設備管理後臺工程(三)
一個TCP長鏈接設備管理後臺工程(四)
一個TCP長鏈接設備管理後臺工程(五)git
Github倉庫地址github
上面介紹了過濾器,過濾器實際就是一個可以處理粘包和拆包的解析器,和封包器的做用正好相反。可是封包器會很簡單,由於封包沒有粘包和拆包的處理。數據庫
代碼以下:segmentfault
//Packer is proto Packer api func Packer(msg Message) []byte { data := make([]byte, 0) tempbytes := codec.Word2Bytes(msg.HEADER.MID) data = append(data, tempbytes...) datalen := uint16(len(msg.BODY)) & 0x03FF datalen = datalen | 0x4000 tempbytes = utils.Word2Bytes(datalen) data = append(data, tempbytes...) data = append(data, msg.HEADER.Version) if len(msg.HEADER.PhoneNum) < 10 { data = append(data, make([]byte, 10-len(msg.HEADER.PhoneNum))...) data = append(data, msg.HEADER.PhoneNum...) } else { data = append(data, msg.HEADER.PhoneNum[:10]...) } tempbytes = utils.Word2Bytes(msg.HEADER.SeqNum) data = append(data, tempbytes...) if msg.HEADER.IsMulti() { data = append(data, utils.Word2Bytes(msg.HEADER.MutilFlag.MsgSum)...) data = append(data, utils.Word2Bytes(msg.HEADER.MutilFlag.MsgIndex)...) } data = append(data, msg.BODY...) csdata := byte(checkSum(data[:])) data = append(data, csdata) //添加頭尾 var tmpdata []byte = []byte{0x7e} for _, item := range data { if item == 0x7d { tmpdata = append(tmpdata, 0x7d, 0x01) } else if item == 0x7e { tmpdata = append(tmpdata, 0x7d, 0x02) } else { tmpdata = append(tmpdata, item) } } tmpdata = append(tmpdata, 0x7e) return tmpdata }
處理器用來處理接收到的有效TCP數據包,它應該是比過濾器更上層的一個模塊。由於咱們是用來管理TCP鏈接的,一個tcp鏈接表明着一個終端設備,這個終端設備有各類屬性和操做邏輯,這些東西都是依附於TCP的長鏈接。咱們單獨定義一個包來組織這部份內容:後端
package term
而咱們的處理器就存在於這個包中。因爲這個模塊是tcp數據的實際處理模塊,因此會牽扯到許多相關連的包,好比前面的codec、proto等,還有數據庫的操做。api
這一部分咱們主要只介紹處理器的邏輯。前面咱們說了,咱們要處理的包有:app
經過proto的filter咱們獲得了各個Message,而且獲取了其中的幀頭信息,BODY部分尚未處理。而咱們的codec正是用來處理BODY部分的編/解碼器。tcp
因此處理器的基本流程就是根據Message中Header信息,分別處理其Body數據,而後返回處理的結果。這個處理的結果每每就是須要響應的數據流。因此咱們的處理器函數的樣子大概就是這樣的:函數
func (t *Terminal) Handler(msg proto.Message) []byte{ }
傳入一個Message,入後輸出須要響應的數據,若是返回nil則代表沒有數據須要響應。ui
其中Terminal這個結構體咱們在後端模型這個裝接中有說起到:
type Terminal struct { authkey string imei string iccid string vin string tboxver string loginTime time.Time seqNum uint16 phoneNum []byte Conn net.Conn Engine *xorm.Engine Ch chan int }
同時爲了使用codec的序列化和反序列化,咱們還須要定義以下結構體:
type TermAckBody struct { AckSeqNum uint16 AckID uint16 AckResult uint8 } type PlatAckBody struct { AckSeqNum uint16 AckID uint16 AckResult uint8 } type RegisterBody struct { ProID uint16 CityID uint16 ManufID []byte `len:"11"` TermType []byte `len:"30"` TermID []byte `len:"30"` LicPlateColor uint8 LicPlate string } type RegisterAckBody struct { AckSeqNum uint16 AckResult uint8 AuthKey string } type AuthBody struct { AuthKeyLen uint8 AuthKey string Imei []byte `len:"15"` Version []byte `len:"20"` } type GPSInfoBody struct { WarnFlag uint32 State uint32 Lat uint32 Lng uint32 Alt uint16 Speed uint16 Dir uint16 Time []byte `len:"6"` } type CtrlBody struct { Cmd uint8 Param string }
下面就來正式講解Handler的實現。
首先獲取保存Header中的電話號和流水號到Terminal中:
if t.phoneNum == nil { t.phoneNum = make([]byte, 10) } copy(t.phoneNum, []byte(msg.HEADER.PhoneNum)) t.seqNum = msg.HEADER.SeqNum
而後經過switch來匹配消息id,並對其body部分作相關處理:
switch msg.HEADER.MID { case proto.TermAck: // case proto.Register: // case proto.Login: // case proto.Heartbeat: // case proto.Gpsinfo: // } return nil
咱們先說註冊,咱們使用幀頭中的手機號,在數據庫中查找對應的鑑權碼。而後從msg中獲取body部分,經過codec反序列話獲得RegisterBody實例。爲了簡單,咱們此處不作其餘數據驗證,直接作出數據響應便可。生成須要響應的RegisterAckBody實例,而後序列化爲body切片,而後生成響應的Message,再經過封包器封包爲數據流返回:
devinfo := new(DevInfo) devinfo.PhoneNum = strings.TrimLeft(utils.HexBuffToString(t.phoneNum), "0") is, _ := t.Engine.Get(devinfo) if !is { return []byte{} } var reg RegisterBody _, err := codec.Unmarshal(msg.BODY, ®) if err != nil { fmt.Println("err:", err) } var body []byte body, err = codec.Marshal(&RegisterAckBody{ AckSeqNum: msg.HEADER.SeqNum, AckResult: 0, AuthKey: devinfo.Authkey, }) if err != nil { fmt.Println("err:", err) } msgAck := proto.Message{ HEADER: proto.Header{ MID: proto.RegisterAck, Attr: proto.MakeAttr(1, false, 0, uint16(len(body))), Version: 1, PhoneNum: string(t.phoneNum), SeqNum: t.seqNum, }, BODY: body, } return proto.Packer(msgAck)
上面有涉及到數據庫的查詢操做,這部分使用了xorm,具體的參考xorm官方文檔:xorm官方文檔
上面涉及一個utils.HexBuffToString函數,這個函數會將字符串轉換爲16進制格式的字符串,自己是基於strconv.FormatUint(uint64(value), 16)完成的,可是這個函數會沒有辦法指定轉換後的填充值,好比0x0A會直接轉換成"A"而不是"0A",因此須要作一點特殊處理:
func HexBuffToString(hex []byte) string { var ret string for _, value := range hex { str := strconv.FormatUint(uint64(value), 16) if len([]rune(str)) == 1 { ret = ret + "0" + str } else { ret = ret + str } } return ret }
Handler其餘部分的流程大致差很少,就不作過多講解了,完整代碼:
//Handler is proto Handler api func (t *Terminal) Handler(msg proto.Message) []byte { if t.phoneNum == nil { t.phoneNum = make([]byte, 10) } copy(t.phoneNum, []byte(msg.HEADER.PhoneNum)) t.seqNum = msg.HEADER.SeqNum switch msg.HEADER.MID { case proto.TermAck: reqID := codec.Bytes2Word(msg.BODY[2:4]) if reqID == proto.UpdateReq { //ch <- 1 //升級命令 } case proto.Register: devinfo := new(DevInfo) devinfo.PhoneNum = strings.TrimLeft(utils.HexBuffToString(t.phoneNum), "0") is, _ := t.Engine.Get(devinfo) if !is { return []byte{} } var reg RegisterBody _, err := codec.Unmarshal(msg.BODY, ®) if err != nil { fmt.Println("err:", err) } var body []byte body, err = codec.Marshal(&RegisterAckBody{ AckSeqNum: msg.HEADER.SeqNum, AckResult: 0, AuthKey: devinfo.Authkey, }) if err != nil { fmt.Println("err:", err) } msgAck := proto.Message{ HEADER: proto.Header{ MID: proto.RegisterAck, Attr: proto.MakeAttr(1, false, 0, uint16(len(body))), Version: 1, PhoneNum: string(t.phoneNum), SeqNum: t.seqNum, }, BODY: body, } return proto.Packer(msgAck) case proto.Login: var auth AuthBody _, err := codec.Unmarshal(msg.BODY, &auth) if err != nil { fmt.Println("err:", err) } t.authkey = auth.AuthKey t.imei = string(auth.Imei) t.tboxver = string(auth.Version) var body []byte body, err = codec.Marshal(&PlatAckBody{ AckSeqNum: msg.HEADER.SeqNum, AckID: msg.HEADER.MID, AckResult: 0, }) if err != nil { fmt.Println("err:", err) } msgAck := proto.Message{ HEADER: proto.Header{ MID: proto.PlatAck, Attr: proto.MakeAttr(1, false, 0, uint16(len(body))), Version: 1, PhoneNum: string(t.phoneNum), SeqNum: t.seqNum, }, BODY: body, } return proto.Packer(msgAck) case proto.Heartbeat: var err error var body []byte body, err = codec.Marshal(&PlatAckBody{ AckSeqNum: msg.HEADER.SeqNum, AckID: msg.HEADER.MID, AckResult: 0, }) if err != nil { fmt.Println("err:", err) } msgAck := proto.Message{ HEADER: proto.Header{ MID: proto.PlatAck, Attr: proto.MakeAttr(1, false, 0, uint16(len(body))), Version: 1, PhoneNum: string(t.phoneNum), SeqNum: t.seqNum, }, BODY: body, } return proto.Packer(msgAck) case proto.Gpsinfo: var gpsInfo GPSInfoBody _, err := codec.Unmarshal(msg.BODY, &gpsInfo) if err != nil { fmt.Println("err:", err) } gpsdata := new(GPSData) gpsdata.Imei = t.imei gpsdata.Stamp = time.Now() gpsdata.WarnFlag = gpsInfo.WarnFlag gpsdata.State = gpsInfo.State gpsdata.Latitude = gpsInfo.Lat gpsdata.Longitude = gpsInfo.Lng gpsdata.Altitude = gpsInfo.Alt gpsdata.Speed = gpsInfo.Speed gpsdata.Direction = gpsInfo.Dir if (gpsdata.State & 0x00000001) > 0 { gpsdata.AccState = 1 } else { gpsdata.AccState = 0 } if (gpsdata.State & 0x00000002) > 0 { gpsdata.GpsState = 1 } else { gpsdata.GpsState = 0 } _, err = t.Engine.Insert(gpsdata) if err != nil { fmt.Println("insert gps err:", err) } var body []byte body, err = codec.Marshal(&PlatAckBody{ AckSeqNum: msg.HEADER.SeqNum, AckID: msg.HEADER.MID, AckResult: 0, }) if err != nil { fmt.Println("err:", err) } msgAck := proto.Message{ HEADER: proto.Header{ MID: proto.PlatAck, Attr: proto.MakeAttr(1, false, 0, uint16(len(body))), Version: 1, PhoneNum: string(t.phoneNum), SeqNum: t.seqNum, }, BODY: body, } return proto.Packer(msgAck) } return nil }