如何對消息隊列作性能測試

本人在負責服務壓測的實踐中,遇到了一個需求,就是對消息隊列的dubbo接口性能進行壓測,主要分兩類:一類是往隊列裏面添加,一類是從隊列中取值(等同刪除)。是一個server的兩個不一樣方法。同組其餘人用的jmeter進行的dubbo接口壓測。 隊列的添加規則比較簡單,主要有一個標誌msg,由事件類型+用戶標識符+消息體構成。作此類此類測試的時候遇到的問題就是若是構建消息體,每次都構建不一樣的消息體,這裏我才用了納秒+隨機數的方式,後來發現直接用納秒就行。(這裏相信jmeter也應該有響應的方法)java

在添加隊列的測試不太清楚jmeter如何實現,由於他們直接放棄掉了,我才用的方案是,先構建足夠多數量的消息,而後將消息數據拿出來放到一個線程安全的集合中,多線程去拿,使用的是java的LinkedBlockingQueue<String>消息隊列。沒作完一次測試,重置一次測試數據,防止中途有失敗的狀況。git

public int createQ() {
        String absolutePath = new File("").getAbsolutePath();
        List<String> strings = WriteRead.readTxtFileByLine(absolutePath + "/dubbo");
        new Concurrent(new ThreadBase(SourceCode.changeStringToInt(strings.get(0))) {
            @Override
            protected void before() {

            }

            @Override
            protected void doing() throws Exception {
                CreateQueueRequest createQueueRequest = new CreateQueueRequest();
                createQueueRequest.setReqId(TraceKeyHolder.getTraceKey());
                createQueueRequest.setDelayTime(System.currentTimeMillis() + 3600 * 1000);
                String msg = "wait_for_publish:8888" + "@" + System.nanoTime() + PublishType.ZUOYE;
                createQueueRequest.setMsg(msg);
                createQueueRequest.setTaskTypeEnum(TaskTypeEnum.PUBLISH_PROMU);
                createQueueRequest.setTtl(0L);
                CommonResponse<CreateQueueResultVo> queue = commonDelayQueueService.createQueue(createQueueRequest);
                logger.info("createQueue0  {}", JsonUtil.obj2Json(queue));
            }

            @Override
            protected void after() {

            }
        }, SourceCode.changeStringToInt(strings.get(1))).start();
        return 0;
    }

刪除隊列:api

public int deleteQ() throws InterruptedException {
        if (msgs.size() == 0) {
            logger.info("隊列爲空了");
            msgs = addmsg();
        }
        String absolutePath = new File("").getAbsolutePath();
        List<String> strings = WriteRead.readTxtFileByLine(absolutePath + "/dubbo");

        new Concurrent(new ThreadBase(SourceCode.changeStringToInt(strings.get(0))) {
            @Override
            protected void before() {

            }

            @Override
            protected void doing() throws Exception {
                String msg = msgs.poll(100, TimeUnit.MILLISECONDS);
                logger.info("msg:{}", msg);
                DeleteQueueRequest deleteQueueRequest0 = new DeleteQueueRequest();
                deleteQueueRequest0.setMsg(msg);
                deleteQueueRequest0.setTaskTypeEnum(TaskTypeEnum.PUBLISH_PROMU);
                CommonResponse<String> queue3 = commonDelayQueueService.deleteQueue(deleteQueueRequest0);
                logger.info("deleteQueue2 {}", JsonUtil.obj2Json(queue3));
            }

            @Override
            protected void after() {

            }
        }, SourceCode.changeStringToInt(strings.get(1))).start();

        return 0;
    }

其中msgs的設置以下:安全

public static LinkedBlockingQueue<String> msgs = addmsg();


    public static LinkedBlockingQueue<String> addmsg() {
        String absolutePath = new File("").getAbsolutePath();
        List<String> strings = WriteRead.readTxtFileByLine(absolutePath + "/queue");
        LinkedBlockingQueue<String> ss = new LinkedBlockingQueue<>();
        ss.addAll(strings);
        logger.info("從新讀取隊列值");
        return ss;
    }
  • 這裏會有一個問題:在不斷測試過程當中,addmsg方法可能在測試過程當中被執行。

由於我在作測試的時候,數據量足夠大,因此沒有作處理,若是數據量不足以支撐不少次測試,能夠採用啓動測試前把msgs進行初始化,或者在before()方法裏面爲每個線程進行數據初始化操做。markdown

歡迎有興趣的童鞋一塊兒交流多線程

相關文章
相關標籤/搜索