固定QPS壓測模式探索

在早前跟測試同行在QQ羣聊天的時候,聊過一個固定QPS壓測的問題,最近忽然有需求,想實現一下,豐富一下本身的性能測試框架,最新的代碼請移步個人GitHub,地址:https://github.com/JunManYuanLong/FunTester,gitee地址:https://gitee.com/fanapi/testerjava

思路

  • 有一個多線程的基類,其餘壓測任務類繼承於基類。
  • 併發執行類由 線程池任務發生器補償器組成。
  • 單線程執行 任務發生器將生成的任務對象丟到線程池裏面執行。
  • 另起 補償器線程完成缺失的補償。(因爲多種緣由,真實發生量小於設定值)

整體的思路與如何mock固定QPS的接口moco固定QPS接口升級補償機制這兩票文章一致,可是沒有采起Semaphore的模式,緣由是moco是多線程對單線程,壓測是單線程對多線程。git

  • 語言繼續採用 Java語言。

基類

寫得有點倉促,還未進行大量實踐,因此註釋少一些。這裏依然設計兩種子模式:定量壓測定時壓測,這裏因爲兩種壓測模式,經過一個屬性isTimesMode記錄,在執行類FixedQpsConcurrent中用到,單次壓測任務對象統一isTimesModelimit兩個屬性。github

package com.fun.base.constaint;

import com.fun.base.interfaces.MarkThread;
import com.fun.config.HttpClientConstant;
import com.fun.frame.execute.FixedQpsConcurrent;
import com.fun.frame.httpclient.GCThread;
import com.fun.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class FixedQpsThread<Textends ThreadBase {

    private static Logger logger = LoggerFactory.getLogger(FixedQpsThread.class);

    public int qps;

    public int limit;

    public boolean isTimesMode;

    public FixedQpsThread(T t, int limit, int qps, MarkThread markThread) {
        this.limit = limit;
        this.qps = qps;
        this.mark = markThread;
        this.t = t;
        isTimesMode = limit > 1000 ? true : false;
    }


    protected FixedQpsThread() {
        super();
    }

    @Override
    public void run() {
        try {
            before();
            threadmark = mark == null ? EMPTY : this.mark.mark(this);
            long s = Time.getTimeStamp();
            doing();
            long e = Time.getTimeStamp();
            long diff = e - s;
            FixedQpsConcurrent.allTimes.add(diff);
            FixedQpsConcurrent.executeTimes.getAndIncrement();
            if (diff > HttpClientConstant.MAX_ACCEPT_TIME)
                FixedQpsConcurrent.marks.add(diff + CONNECTOR + threadmark);
        } catch (Exception e) {
            logger.warn("執行任務失敗!", e);
            logger.warn("執行失敗對象的標記:{}", threadmark);
            FixedQpsConcurrent.errorTimes.getAndIncrement();
        } finally {
            after();
        }
    }

    @Override
    public void before() {
        GCThread.starts();
    }

    /**
     * 子類必需實現改方法,否則調用deepclone方法會報錯
     *
     * @return
     */

    public abstract FixedQpsThread clone();


}

執行類

此處補償線程設計還待優化,中間有兩處休眠:一處是循環檢測是否須要補償,一處是單詞補償間隔。還沒有提取配置變量,有待後面實踐以後進行優化調整。測試結果對象依然採用了原來的,數值和計算方式保持一致,後期也會根據實踐結果進行調整,能夠關注個人GitHub及時獲取更新。web

package com.fun.frame.execute;

import com.fun.base.bean.PerformanceResultBean;
import com.fun.base.constaint.FixedQpsThread;
import com.fun.config.Constant;
import com.fun.frame.Save;
import com.fun.frame.SourceCode;
import com.fun.frame.httpclient.GCThread;
import com.fun.utils.Time;
import com.fun.utils.WriteRead;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.stream.Collectors.toList;

/**
 * 併發類,用於啓動壓力腳本
 */

public class FixedQpsConcurrent extends SourceCode {

    private static Logger logger = LoggerFactory.getLogger(FixedQpsConcurrent.class);

    public static boolean key = false;

    public static AtomicInteger executeTimes = new AtomicInteger();

    public static AtomicInteger errorTimes = new AtomicInteger();

    public static Vector<String> marks = new Vector<>();

    /**
     * 用於記錄全部請求時間
     */

    public static Vector<Long> allTimes = new Vector<>();

    /**
     * 開始時間
     */

    public long startTime;

    /**
     * 結束時間
     */

    public long endTime;

    public int queueLength;

    /**
     * 任務描述
     */

    public String desc = DEFAULT_STRING;

    /**
     * 任務集
     */

    public List<FixedQpsThread> threads = new ArrayList<>();

    /**
     * 線程池
     */

    ExecutorService executorService;

    /**
     * @param thread 線程任務
     */

    public FixedQpsConcurrent(FixedQpsThread thread) {
        this(thread, DEFAULT_STRING);
    }

    /**
     * @param threads 線程組
     */

    public FixedQpsConcurrent(List<FixedQpsThread> threads) {
        this(threads, DEFAULT_STRING);
    }

    /**
     * @param thread 線程任務
     * @param desc   任務描述
     */

    public FixedQpsConcurrent(FixedQpsThread thread, String desc) {
        this();
        this.queueLength = 1;
        threads.add(thread);
        this.desc = desc + Time.getNow();
    }

    /**
     * @param threads 線程組
     * @param desc    任務描述
     */

    public FixedQpsConcurrent(List<FixedQpsThread> threads, String desc) {
        this();
        this.threads = threads;
        this.queueLength = threads.size();
        this.desc = desc + Time.getNow();
    }

    private FixedQpsConcurrent() {
        executorService = ThreadPoolUtil.createPool(202003);
    }

    /**
     * 執行多線程任務
     * 默認取list中thread對象,丟入線程池,完成多線程執行,若是沒有threadname,name默認採用desc+線程數做爲threadname,去除末尾的日期
     */

    public PerformanceResultBean start() {
        key = false;
        FixedQpsThread fixedQpsThread = threads.get(0);
        boolean isTimesMode = fixedQpsThread.isTimesMode;
        int limit = fixedQpsThread.limit;
        int qps = fixedQpsThread.qps;
        long interval = 1_000_000_000 / qps;
        AidThread aidThread = new AidThread();
        new Thread(aidThread).start();
        startTime = Time.getTimeStamp();
        while (true) {
            executorService.execute(threads.get(limit-- % queueLength).clone());
            if (key ? true : isTimesMode ? limit < 1 : Time.getTimeStamp() - startTime > fixedQpsThread.limit) break;
            sleep(interval);
        }
        endTime = Time.getTimeStamp();
        aidThread.stop();
        GCThread.stop();
        try {
            executorService.shutdown();
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            logger.error("線程池等待任務結束失敗!", e);
        }
        logger.info("總計執行 {} ,共用時:{} s,執行總數:{},錯誤數:{}!", fixedQpsThread.isTimesMode ? fixedQpsThread.limit + "次任務" : "秒", Time.getTimeDiffer(startTime, endTime), executeTimes, errorTimes);
        return over();
    }

    private PerformanceResultBean over() {
        key = true;
        Save.saveLongList(allTimes, "data/" + queueLength + desc);
        Save.saveStringListSync(marks, MARK_Path.replace(LONG_Path, EMPTY) + desc);
        allTimes = new Vector<>();
        marks = new Vector<>();
        executeTimes.set(0);
        errorTimes.set(0);
        return countQPS(queueLength, desc, Time.getTimeByTimestamp(startTime), Time.getTimeByTimestamp(endTime));
    }

    /**
     * 計算結果
     * <p>此結果僅供參考</p>
     *
     * @param name 線程數
     */

    public PerformanceResultBean countQPS(int name, String desc, String start, String end) {
        List<String> strings = WriteRead.readTxtFileByLine(Constant.DATA_Path + name + desc);
        int size = strings.size();
        List<Integer> data = strings.stream().map(x -> changeStringToInt(x)).collect(toList());
        int sum = data.stream().mapToInt(x -> x).sum();
        Collections.sort(data);
        String statistics = StatisticsUtil.statistics(data, desc, this.queueLength);
        double qps = 1000.0 * size * name / sum;
        return new PerformanceResultBean(desc, start, end, name, size, sum / size, qps, getPercent(executeTimes.get(), errorTimes.get()), 0, executeTimes.get(), statistics);
    }


    /**
     * 用於作後期的計算
     *
     * @param name
     * @param desc
     * @return
     */

    public PerformanceResultBean countQPS(int name, String desc) {
        return countQPS(name, desc, Time.getDate(), Time.getDate());
    }

    /**
     * 後期計算用
     *
     * @param name
     * @return
     */

    public PerformanceResultBean countQPS(int name) {
        return countQPS(name, EMPTY, Time.getDate(), Time.getDate());
    }


    /**
     * 補償線程
     */

    class AidThread implements Runnable {

        private boolean key = true;

        int i;

        public AidThread() {

        }

        @Override
        public void run() {
            logger.info("補償線程開始!");
            while (key) {
                long expect = (Time.getTimeStamp() - startTime) / 1000 * threads.get(0).qps;
                if (expect > executeTimes.get() + 10) {
                    range((int) expect - executeTimes.get()).forEach(x -> {
                        sleep(100);
                        executorService.execute(threads.get(i++ % queueLength).clone());
                    });
                }
                sleep(3);
            }
            logger.info("補償線程結束!");
        }

        public void stop() {
            key = false;
        }


    }


}

其餘配套的標記類統計類還等待修改,比較簡單,這裏不放代碼了。編程


公衆號FunTester首發,原創分享愛好者,騰訊雲、開源中國和掘金社區首頁推薦,知乎準八級原創做者,歡迎關注、交流,禁止第三方擅自轉載。api

FunTester熱文精選

本文分享自微信公衆號 - FunTester(NuclearTester)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。微信

相關文章
相關標籤/搜索