本文主要研究一下kingbus的binlog_progress.gomysql
kingbus/server/binlog_progress.gogit
//BinlogProgress is the progress of receiving binlog type BinlogProgress struct { currentGtid *atomic.String lastSaveGtid string //for heartbeat event lastBinlogFile *atomic.String lastFilePosition *atomic.Uint32 executedGtidSetStr *atomic.String trxBoundaryParser *mysql.TransactionBoundaryParser persistentTime time.Time persistentAppliedIndex uint64 executedGtidSet gomysql.GTIDSet store storage.Storage }
kingbus/server/binlog_progress.gogithub
func newBinlogProgress(store storage.Storage) (*BinlogProgress, error) { var err error p := new(BinlogProgress) p.trxBoundaryParser = new(mysql.TransactionBoundaryParser) p.trxBoundaryParser.Reset() p.currentGtid = atomic.NewString("") p.lastBinlogFile = atomic.NewString("") p.lastFilePosition = atomic.NewUint32(0) p.persistentAppliedIndex = 0 p.persistentTime = time.Unix(0, 0) //get executed gtid_set //This value may be old, but resetBinlogProgress will update it to the latest p.executedGtidSet, err = store.GetGtidSet(gomysql.MySQLFlavor, storage.ExecutedGtidSetKey) if err != nil { log.Log.Errorf("newBinlogProgress:get executedGtidSet error,err:%s", err) return nil, err } p.executedGtidSetStr = atomic.NewString(p.executedGtidSet.String()) p.store = store return p, nil }
kingbus/server/binlog_progress.gosql
//updateProcess update and save executedGtid set func (s *BinlogProgress) updateProcess(raftIndex uint64, eventRawData []byte) error { var err error //parse event header h := new(replication.EventHeader) err = h.Decode(eventRawData) if err != nil { log.Log.Errorf("Decode error,err:%s,buf:%v", err, eventRawData) return err } //set the heartbeat info s.lastFilePosition.Store(h.LogPos) //remove header eventRawData = eventRawData[replication.EventHeaderSize:] eventLen := int(h.EventSize) - replication.EventHeaderSize if len(eventRawData) != eventLen { return fmt.Errorf("invalid data size %d in event %s, less event length %d", len(eventRawData), h.EventType, eventLen) } //remove crc32 eventRawData = eventRawData[:len(eventRawData)-replication.BinlogChecksumLength] //the eventRawData maybe the first divided packet, but must not be query event //so don't worry eventBoundaryType, err := s.trxBoundaryParser.GetEventBoundaryType(h, eventRawData) if err != nil { log.Log.Errorf("GetEventBoundaryType error,err:%s,header:%v", err, *h) return err } //ignore updateState error, maybe a partial trx err = s.trxBoundaryParser.UpdateState(eventBoundaryType) if err != nil { log.Log.Warnf("trxBoundaryParser UpdateState error,err:%s,header:%v", err, *h) s.trxBoundaryParser.Reset() s.currentGtid.Store("") return nil } currentGtidStr := s.currentGtid.Load() if s.trxBoundaryParser.IsNotInsideTransaction() && len(currentGtidStr) != 0 && s.lastSaveGtid != currentGtidStr { log.Log.Debugf("current gtid is :%s,add into executedGtidSet:%s", currentGtidStr, s.executedGtidSet.String()) //update executedGtidSet err = s.executedGtidSet.Update(currentGtidStr) if err != nil { return err } s.lastSaveGtid = currentGtidStr s.executedGtidSetStr.Store(s.executedGtidSet.String()) //save the raftIndex and executedGtidSet at the same time if raftIndex-s.persistentAppliedIndex > persistentCount || time.Now().Sub(s.persistentTime) > persistentTimeInterval { err = s.store.SetBinlogProgress(raftIndex, s.executedGtidSet) if err != nil { log.Log.Errorf("SetGtidSet error,err:%s,key:%s,value:%s", err, storage.ExecutedGtidSetKey, s.executedGtidSet.String()) return err } s.persistentAppliedIndex = raftIndex s.persistentTime = time.Now() } } return nil }
kingbus的binlog_progress.go提供了newBinlogProgress、updateProcess方法用於存儲binglogProgressless