type SimpleCanalConnector struct { Address string Port int UserName string PassWord string SoTime int32 IdleTimeOut int32 ClientIdentity pb.ClientIdentity Connected bool Running bool Filter string RollbackOnConnect bool LazyParseEntry bool }
//NewSimpleCanalConnector 建立SimpleCanalConnector實例 func NewSimpleCanalConnector(address string, port int, username string, password string, destination string, soTimeOut int32, idleTimeOut int32) *SimpleCanalConnector { s := &SimpleCanalConnector{ Address: address, Port: port, UserName: username, PassWord: password, ClientIdentity: pb.ClientIdentity{Destination: destination, ClientId: 1001}, SoTime: soTimeOut, IdleTimeOut: idleTimeOut, RollbackOnConnect: true, } return s }
//Connect 鏈接Canal-server func (c *SimpleCanalConnector) Connect() error { if c.Connected { return nil } if c.Running { return nil } err := c.doConnect() if err != nil { return err } if c.Filter != "" { c.Subscribe(c.Filter) } if c.RollbackOnConnect { c.waitClientRunning() c.RollBack(0) } c.Connected = true return nil }
//DisConnection 關閉鏈接 func (c *SimpleCanalConnector) DisConnection() { if c.RollbackOnConnect && c.Connected == true { c.RollBack(0) } c.Connected = false quitelyClose() } //quitelyClose 優雅關閉 func quitelyClose() { if conn != nil { conn.Close() } }
//doConnect 去鏈接Canal-Server func (c SimpleCanalConnector) doConnect() error { address := c.Address + ":" + fmt.Sprintf("%d", c.Port) con, err := net.Dial("tcp", address) if err != nil { return err } conn = con p := new(pb.Packet) data, err := readNextPacket() if err != nil { return err } err = proto.Unmarshal(data, p) if err != nil { return err } if p != nil { if p.GetVersion() != 1 { panic("unsupported version at this client.") } if p.GetType() != pb.PacketType_HANDSHAKE { panic("expect handshake but found other type.") } handshake := &pb.Handshake{} err = proto.Unmarshal(p.GetBody(), handshake) if err != nil { return err } pas := []byte(c.PassWord) ca := &pb.ClientAuth{ Username: c.UserName, Password: pas, NetReadTimeoutPresent: &pb.ClientAuth_NetReadTimeout{NetReadTimeout: c.IdleTimeOut}, NetWriteTimeoutPresent: &pb.ClientAuth_NetWriteTimeout{NetWriteTimeout: c.IdleTimeOut}, } caByteArray, _ := proto.Marshal(ca) packet := &pb.Packet{ Type: pb.PacketType_CLIENTAUTHENTICATION, Body: caByteArray, } packArray, _ := proto.Marshal(packet) WriteWithHeader(packArray) pp, err := readNextPacket() if err != nil { return err } pk := &pb.Packet{} err = proto.Unmarshal(pp, pk) if err != nil { return err } if pk.Type != pb.PacketType_ACK { panic("unexpected packet type when ack is expected") } ackBody := &pb.Ack{} err = proto.Unmarshal(pk.GetBody(), ackBody) if err != nil { return err } if ackBody.GetErrorCode() > 0 { panic(errors.New(fmt.Sprintf("something goes wrong when doing authentication:%s", ackBody.GetErrorMessage()))) } c.Connected = true } return nil }
//GetWithOutAck 獲取數據不Ack func (c *SimpleCanalConnector) GetWithOutAck(batchSize int32, timeOut *int64, units *int32) (*pb.Message, error) { c.waitClientRunning() if !c.Running { return nil, nil } var size int32 if batchSize < 0 { size = 1000 } else { size = batchSize } var time *int64 var t int64 t = -1 if timeOut == nil { time = &t } else { time = timeOut } var i int32 i = -1 if units == nil { units = &i } get := new(pb.Get) get.AutoAckPresent = &pb.Get_AutoAck{AutoAck: false} get.Destination = c.ClientIdentity.Destination get.ClientId = strconv.Itoa(c.ClientIdentity.ClientId) get.FetchSize = size get.TimeoutPresent = &pb.Get_Timeout{Timeout: *time} get.UnitPresent = &pb.Get_Unit{Unit: *units} getBody, err := proto.Marshal(get) if err != nil { return nil, err } packet := new(pb.Packet) packet.Type = pb.PacketType_GET packet.Body = getBody pa, err := proto.Marshal(packet) if err != nil { return nil, err } WriteWithHeader(pa) message, err := c.receiveMessages() if err != nil { return nil, err } return message, nil }
//Get 獲取數據而且Ack數據 func (c *SimpleCanalConnector) Get(batchSize int32, timeOut *int64, units *int32) (*pb.Message, error) { message, err := c.GetWithOutAck(batchSize, timeOut, units) if err != nil { return nil, err } err = c.Ack(message.Id) if err != nil { return nil, err } return message, nil }
//Ack Ack Canal-server的數據(就是昨晚某些邏輯操做後刪除canal-server端的數據) func (c *SimpleCanalConnector) Ack(batchId int64) error { c.waitClientRunning() if !c.Running { return nil } ca := new(pb.ClientAck) ca.Destination = c.ClientIdentity.Destination ca.ClientId = strconv.Itoa(c.ClientIdentity.ClientId) ca.BatchId = batchId clientAck, err := proto.Marshal(ca) if err != nil { return err } pa := new(pb.Packet) pa.Type = pb.PacketType_CLIENTACK pa.Body = clientAck pack, err := proto.Marshal(pa) if err != nil { return err } WriteWithHeader(pack) return nil }
//Subscribe 訂閱 func (c *SimpleCanalConnector) Subscribe(filter string) error { c.waitClientRunning() if !c.Running { return nil } body, _ := proto.Marshal(&pb.Sub{Destination: c.ClientIdentity.Destination, ClientId: strconv.Itoa(c.ClientIdentity.ClientId), Filter: filter}) pack := new(pb.Packet) pack.Type = pb.PacketType_SUBSCRIPTION pack.Body = body packet, _ := proto.Marshal(pack) WriteWithHeader(packet) p := new(pb.Packet) paBytes, err := readNextPacket() if err != nil { return err } err = proto.Unmarshal(paBytes, p) if err != nil { return err } ack := new(pb.Ack) err = proto.Unmarshal(p.Body, ack) if err != nil { return err } if ack.GetErrorCode() > 0 { return fmt.Errorf("failed to subscribe with reason::%s", ack.GetErrorMessage()) } c.Filter = filter return nil }
//UnSubscribe 取消訂閱 func (c *SimpleCanalConnector) UnSubscribe() error { c.waitClientRunning() if c.Running { return nil } us := new(pb.Unsub) us.Destination = c.ClientIdentity.Destination us.ClientId = strconv.Itoa(c.ClientIdentity.ClientId) unSub, err := proto.Marshal(us) if err != nil { return err } pa := new(pb.Packet) pa.Type = pb.PacketType_UNSUBSCRIPTION pa.Body = unSub pack, err := proto.Marshal(pa) WriteWithHeader(pack) p, err := readNextPacket() if err != nil { return err } pa = nil err = proto.Unmarshal(p, pa) if err != nil { return err } ack := new(pb.Ack) err = proto.Unmarshal(pa.Body, ack) if err != nil { return err } if ack.GetErrorCode() > 0 { panic(errors.New(fmt.Sprintf("failed to unSubscribe with reason:%s", ack.GetErrorMessage()))) } return nil }
//RollBack 回滾操做 func (c *SimpleCanalConnector) RollBack(batchId int64) error { c.waitClientRunning() cb := new(pb.ClientRollback) cb.Destination = c.ClientIdentity.Destination cb.ClientId = strconv.Itoa(c.ClientIdentity.ClientId) cb.BatchId = batchId clientBollBack, err := proto.Marshal(cb) if err != nil { return err } pa := new(pb.Packet) pa.Type = pb.PacketType_CLIENTROLLBACK pa.Body = clientBollBack pack, err := proto.Marshal(pa) if err != nil { return err } WriteWithHeader(pack) return nil }