goreplay~v1.3.0新增項

--input-file-dry-run:  模擬讀取輸入文件而不實際重放請求,僅預覽預計重放耗時和請求統計信息

--input-file-max-wait:  設置兩個請求之間的最大等待時間,用於跳過已錄製文件中的長暫停(時間間隔,例如 1s)

--input-file-read-depth: 預先讀取並緩衝的請求數量(同時對這些請求排序)。缺省值是 100

源碼解析,plugins.go

// Register one file input plugin for every entry in Settings.InputFile.
// The trailing settings are listed in the same order as NewFileInput's
// parameters: loop, read depth, max wait, dry run.
for _, fileOpt := range Settings.InputFile {
	plugins.registerPlugin(
		NewFileInput,
		fileOpt,
		Settings.InputFileLoop,
		Settings.InputFileReadDepth,
		Settings.InputFileMaxWait,
		Settings.InputFileDryRun,
	)
}

輸入文件構造器

// NewFileInput constructor for FileInput. Accepts file path as argument.
func NewFileInput(path string, loop bool, readDepth int, maxWait time.Duration, dryRun bool) (i *FileInput) {
    i = new(FileInput)
    i.data = make(chan []byte, 1000)
    i.exit = make(chan bool)
    i.path = path
    i.speedFactor = 1
    i.loop = loop
 i.readDepth = readDepth i.stats = expvar.NewMap("file-" + path) i.dryRun = dryRun i.maxWait = maxWait if err := i.init(); err != nil {
        return
    }
    go i.emit()
    return
}

文件初始化

func (i *FileInput) init() (err error) {
    defer i.mu.Unlock()
    i.mu.Lock()

    var matches []string

    if strings.HasPrefix(i.path, "s3://") {
        sess := session.Must(session.NewSession(awsConfig()))
        svc := s3.New(sess)

        bucket, key := parseS3Url(i.path)

        params := &s3.ListObjectsInput{
            Bucket: aws.String(bucket),
            Prefix: aws.String(key),
        }

        resp, err := svc.ListObjects(params)
        if err != nil {
            Debug(0, "[INPUT-FILE] Error while retreiving list of files from S3", i.path, err)
            return err
        }

        for _, c := range resp.Contents {
            matches = append(matches, "s3://"+bucket+"/"+(*c.Key))
        }
    } else if matches, err = filepath.Glob(i.path); err != nil {
        Debug(0, "[INPUT-FILE] Wrong file pattern", i.path, err)
        return
    }

    if len(matches) == 0 {
        Debug(0, "[INPUT-FILE] No files match pattern: ", i.path)
        return errors.New("No matching files")
    }

    i.readers = make([]*fileInputReader, len(matches))

    for idx, p := range matches { i.readers[idx] = newFileInputReader(p, i.readDepth) } i.stats.Add("reader_count", int64(len(matches)))

    return nil
}

內容讀取排序

func (i *FileInput) emit() {
 var lastTime int64 = -1 var maxWait, firstWait, minWait int64  minWait = math.MaxInt64 i.stats.Add("negative_wait", 0) for {
        select {
        case <-i.exit:
            return
        default:
        }

        reader := i.nextReader()

        if reader == nil {
            if i.loop {
                i.init()
                lastTime = -1
                continue
            } else {
                break
            }
        }

 reader.queue.RLock() payload := heap.Pop(&reader.queue).(*filePayload) i.stats.Add("total_counter", 1) i.stats.Add("total_bytes", int64(len(payload.data))) reader.queue.RUnlock() if lastTime != -1 {
            diff := payload.timestamp - lastTime if firstWait == 0 { firstWait = diff } if i.speedFactor != 1 { diff = int64(float64(diff) / i.speedFactor) } if i.maxWait > 0 && diff > int64(i.maxWait) { diff = int64(i.maxWait) } if diff >= 0 { lastTime = payload.timestamp if !i.dryRun { time.Sleep(time.Duration(diff)) } i.stats.Add("total_wait", diff) if diff > maxWait { maxWait = diff } if diff < minWait { minWait = diff } } else { i.stats.Add("negative_wait", 1) }
        } else {
            lastTime = payload.timestamp
        }

        // Recheck if we have exited since last check.
        select {
        case <-i.exit:
            return
        default:
            if !i.dryRun {
                i.data <- payload.data
            }
        }
    }

   i.stats.Set("first_wait", time.Duration(firstWait)) i.stats.Set("max_wait", time.Duration(maxWait)) i.stats.Set("min_wait", time.Duration(minWait))

    Debug(0, fmt.Sprintf("[INPUT-FILE] FileInput: end of file '%s'\n", i.path))

    if i.dryRun { fmt.Printf("Records found: %v\nFiles processed: %v\nBytes processed: %v\nMax wait: %v\nMin wait: %v\nFirst wait: %v\nIt will take `%v` to replay at current speed.\nFound %v records with out of order timestamp\n", i.stats.Get("total_counter"), i.stats.Get("reader_count"), i.stats.Get("total_bytes"), i.stats.Get("max_wait"), i.stats.Get("min_wait"), i.stats.Get("first_wait"), time.Duration(i.stats.Get("total_wait").(*expvar.Int).Value()), i.stats.Get("negative_wait"), ) }
}
相關文章
相關標籤/搜索