百度brpc 壓測工具rpc_press解析

1. 背景

昨天看到一段brpc中的壓測代碼rpc_press, 看着不錯。整理一下。
發壓工具的難點不是發送請求,而是要注意下面的2點:git

  • 保證能發出足夠的qps,好比上萬qps
  • 控制發送合理的qps,好比控制爲5qps,不能夠大量發壓

2. brpc 中的是關鍵實現

2.1 如何確保發送足夠qps

rpc_press 採用多線程發送。
對於上萬qps,多線程來分攤,每一個線程發送qps控制在必定範圍內。github

2.2 如何控制合理qps

這是難點,線程如何控制發送qps。
看下面公式,1000000us(1s)須要發送200請求, 那50請求須要多少時間了?這就是brpc控制壓力的核心原理。
多線程

核心原理: rpc_press中根據目前發送請求的速度與應該的速度進行比較,來判斷是繼續發壓,仍是usleep工具

2.3 rpc_press 代碼

這裏貼一點rpc_press工做線程git代碼(引用自brpc)fetch

void RpcPress::sync_client() {
    double req_rate = _options.test_req_rate / _options.test_thread_num;
    //max make up time is 5 s
    if (_msgs.empty()) {
        LOG(ERROR) << "nothing to send!";
        return;
    }
    const int thread_index = g_thread_count.fetch_add(1, butil::memory_order_relaxed);
    int msg_index = thread_index;
    std::deque<int64_t> timeq;
    size_t MAX_QUEUE_SIZE = (size_t)req_rate;
    if (MAX_QUEUE_SIZE < 100) {
        MAX_QUEUE_SIZE = 100;
    } else if (MAX_QUEUE_SIZE > 2000) {
        MAX_QUEUE_SIZE = 2000;
    }
    timeq.push_back(butil::gettimeofday_us());
    while (!_stop) {
        brpc::Controller* cntl = new brpc::Controller;
        msg_index = (msg_index + _options.test_thread_num) % _msgs.size();
        Message* request = _msgs[msg_index];
        Message* response = _pbrpc_client->get_output_message();
        const int64_t start_time = butil::gettimeofday_us();
        google::protobuf::Closure* done = brpc::NewCallback<
            RpcPress, 
            RpcPress*, 
            brpc::Controller*, 
            Message*, 
            Message*, int64_t>
            (this, &RpcPress::handle_response, cntl, request, response, start_time);
        const brpc::CallId cid1 = cntl->call_id();
        _pbrpc_client->call_method(cntl, request, response, done);
        _sent_count << 1;

        if (_options.test_req_rate <= 0) { 
            brpc::Join(cid1);
        } else {
            int64_t end_time = butil::gettimeofday_us();
            int64_t expected_elp = 0;
            int64_t actual_elp = 0;
            timeq.push_back(end_time);
            if (timeq.size() > MAX_QUEUE_SIZE) {
                actual_elp = end_time - timeq.front();
                timeq.pop_front();
                expected_elp = (int64_t)(1000000 * timeq.size() / req_rate);
            } else {
                actual_elp = end_time - timeq.front();
                expected_elp = (int64_t)(1000000 * (timeq.size() - 1) / req_rate);
            }
            if (actual_elp < expected_elp) {
                usleep(expected_elp - actual_elp);
            }
        }
    }
}

3. 參考

相關文章
相關標籤/搜索