Suppose you have written a program that acts as an HTTP client making webhook requests. You measure how long each HTTP request takes and write it to the log, so that by reading the log you can see how the requests are behaving and monitor both your own program and the remote HTTP server. Once the program is deployed on many machines, logging into each one to read these logs becomes tedious, so you decide to report the measurements to a central system and raise an alert when request times look abnormal. At first you might report every single sample; while the program makes relatively few HTTP requests, the reporting system can handle the load. Later, as the request volume grows, the reporting system can no longer keep up. At that point you have several options. One is to report only the average request time: very little data, but the average is easily distorted by a few extremely slow requests. Another is to report counts per time bucket: if you know the HTTP server well and its behaviour is stable, you can hard code a series of time buckets and report those. But if the server is not under your control and changes often, reporting the 99th and 99.9th percentiles is the better choice. The TDigest code that follows is lightly adapted from folly.
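Before the TDigest code, a quick made-up illustration of why the average is so easily distorted by a single very slow request, while a tail percentile is not:

#include <algorithm>
#include <cstdio>
#include <vector>

int main() {
  // 99 fast requests (10 ms) plus a single very slow one (5000 ms).
  std::vector<double> latencies(99, 10.0);
  latencies.push_back(5000.0);

  double sum = 0;
  for (double v : latencies) sum += v;
  std::sort(latencies.begin(), latencies.end());

  // The one outlier pushes the mean to roughly 60 ms even though 99 of the
  // 100 requests took 10 ms; the 95th percentile is unaffected.
  std::printf("mean = %.1f ms\n", sum / latencies.size());
  std::printf("p95  = %.1f ms\n", latencies[latencies.size() * 95 / 100]);
  return 0;
}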
#include <cassert>
#include <cstddef>
#include <vector>
class Centroid {
 public:
  explicit Centroid(double mean = 0.0, double weight = 1.0)
      : mean_(mean), weight_(weight) {
    assert(weight > 0);
  }

  double mean() const;
  double weight() const;

  // Adds the sum/weight to this centroid, and returns the new sum.
  double add(double sum, double weight);

  bool operator<(const Centroid& other) const;

 private:
  double mean_;
  double weight_;
};

inline double Centroid::mean() const { return mean_; }

inline double Centroid::weight() const { return weight_; }

inline double Centroid::add(double sum, double weight) {
  sum += (mean_ * weight_);
  weight_ += weight;
  mean_ = sum / weight_;
  return sum;
}

inline bool Centroid::operator<(const Centroid& other) const {
  return mean() < other.mean();
}

class TDigest {
 public:
  explicit TDigest(size_t maxSize = 100);

  /*
   * Returns a new TDigest constructed with values merged from the current
   * digest and the given sortedValues.
   */
  TDigest merge_sorted(const std::vector<double>& sortedValues) const;

  /*
   * Returns a new TDigest constructed with values merged from the current
   * digest and the given unsortedValues.
   */
  TDigest merge_unsorted(std::vector<double>* unsortedValues) const;

  /*
   * Estimates the value of the given quantile.
   */
  double estimateQuantile(double q) const;

  double mean() const { return count_ > 0 ? sum_ / count_ : 0; }

  double sum() const { return sum_; }

  double count() const { return count_; }

  double min() const { return min_; }

  double max() const { return max_; }

  bool empty() const { return centroids_.empty(); }

  const std::vector<Centroid>& getCentroids() const { return centroids_; }

  size_t maxSize() const { return maxSize_; }

 private:
  std::vector<Centroid> centroids_;
  size_t maxSize_;
  double sum_;
  double count_;
  double max_;
  double min_;
};
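One detail worth calling out before the implementation: Centroid::add folds an extra (sum, weight) pair into the centroid and returns the combined sum, which merge_sorted uses to keep sum_ up to date cheaply. A tiny standalone example of the arithmetic, assuming the header above is saved as tdigest.h:

#include <cassert>

#include "tdigest.h"

int main() {
  Centroid c(10.0, 1.0);               // one sample with value 10
  double combined = c.add(20.0, 1.0);  // fold in one more sample with value 20
  assert(combined == 30.0);            // returned sum: 10 * 1 + 20
  assert(c.mean() == 15.0);            // new mean: 30 / 2
  assert(c.weight() == 2.0);           // new weight: 1 + 1
  return 0;
}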
#include "tdigest.h"
#include <algorithm>
#include <cmath>
static double k_to_q(double k, double d) {
  double k_div_d = k / d;
  if (k_div_d >= 0.5) {
    double base = 1 - k_div_d;
    return 1 - 2 * base * base;
  } else {
    return 2 * k_div_d * k_div_d;
  }
}

static double clamp(double v, double lo, double hi) {
  if (v > hi) {
    return hi;
  } else if (v < lo) {
    return lo;
  }
  return v;
}

TDigest::TDigest(size_t maxSize)
    : maxSize_(maxSize), sum_(0.0), count_(0.0), max_(NAN), min_(NAN) {}

// Merge unsorted values by first sorting them.
TDigest TDigest::merge_unsorted(std::vector<double>* unsortedValues) const {
  std::sort(unsortedValues->begin(), unsortedValues->end());
  return merge_sorted(*unsortedValues);
}

TDigest TDigest::merge_sorted(const std::vector<double>& sortedValues) const {
  if (sortedValues.empty()) {
    return *this;
  }
  TDigest result(maxSize_);
  result.count_ = count_ + sortedValues.size();

  double maybeMin = *(sortedValues.begin());
  double maybeMax = *(sortedValues.end() - 1);
  if (count_ > 0) {
    // We know that min_ and max_ are numbers.
    result.min_ = (std::min)(min_, maybeMin);
    result.max_ = (std::max)(max_, maybeMax);
  } else {
    // We know that min_ and max_ are NAN.
    result.min_ = maybeMin;
    result.max_ = maybeMax;
  }

  std::vector<Centroid> compressed;
  compressed.reserve(maxSize_);

  double k_limit = 1;
  double q_limit_times_count = k_to_q(k_limit++, maxSize_) * result.count_;

  auto it_centroids = centroids_.begin();
  auto it_sortedValues = sortedValues.begin();

  Centroid cur;
  if (it_centroids != centroids_.end() &&
      it_centroids->mean() < *it_sortedValues) {
    cur = *it_centroids++;
  } else {
    cur = Centroid(*it_sortedValues++, 1.0);
  }

  double weightSoFar = cur.weight();

  // Keep track of sums along the way to reduce expensive floating point ops.
  double sumsToMerge = 0;
  double weightsToMerge = 0;

  while (it_centroids != centroids_.end() ||
         it_sortedValues != sortedValues.end()) {
    Centroid next;
    if (it_centroids != centroids_.end() &&
        (it_sortedValues == sortedValues.end() ||
         it_centroids->mean() < *it_sortedValues)) {
      next = *it_centroids++;
    } else {
      next = Centroid(*it_sortedValues++, 1.0);
    }

    double nextSum = next.mean() * next.weight();
    weightSoFar += next.weight();

    if (weightSoFar <= q_limit_times_count) {
      sumsToMerge += nextSum;
      weightsToMerge += next.weight();
    } else {
      result.sum_ += cur.add(sumsToMerge, weightsToMerge);
      sumsToMerge = 0;
      weightsToMerge = 0;
      compressed.push_back(cur);
      q_limit_times_count = k_to_q(k_limit++, maxSize_) * result.count_;
      cur = next;
    }
  }
  result.sum_ += cur.add(sumsToMerge, weightsToMerge);
  compressed.push_back(cur);
  compressed.shrink_to_fit();

  // Deal with floating point precision.
  std::sort(compressed.begin(), compressed.end());

  result.centroids_ = std::move(compressed);
  return result;
}

double TDigest::estimateQuantile(double q) const {
  if (centroids_.empty()) {
    return 0.0;
  }
  double rank = q * count_;

  size_t pos;
  double t;
  if (q > 0.5) {
    if (q >= 1.0) {
      return max_;
    }
    pos = 0;
    t = count_;
    for (auto rit = centroids_.rbegin(); rit != centroids_.rend(); ++rit) {
      t -= rit->weight();
      if (rank >= t) {
        pos = std::distance(rit, centroids_.rend()) - 1;
        break;
      }
    }
  } else {
    if (q <= 0.0) {
      return min_;
    }
    pos = centroids_.size() - 1;
    t = 0;
    for (auto it = centroids_.begin(); it != centroids_.end(); ++it) {
      if (rank < t + it->weight()) {
        pos = std::distance(centroids_.begin(), it);
        break;
      }
      t += it->weight();
    }
  }

  double delta = 0;
  double min = min_;
  double max = max_;
  if (centroids_.size() > 1) {
    if (pos == 0) {
      delta = centroids_[pos + 1].mean() - centroids_[pos].mean();
      max = centroids_[pos + 1].mean();
    } else if (pos == centroids_.size() - 1) {
      delta = centroids_[pos].mean() - centroids_[pos - 1].mean();
      min = centroids_[pos - 1].mean();
    } else {
      delta = (centroids_[pos + 1].mean() - centroids_[pos - 1].mean()) / 2;
      min = centroids_[pos - 1].mean();
      max = centroids_[pos + 1].mean();
    }
  }
  double value = centroids_[pos].mean() +
                 ((rank - t) / centroids_[pos].weight() - 0.5) * delta;
  return clamp(value, min, max);
}
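As a rough sketch of how the class above could back the webhook scenario from the introduction: buffer each request's duration, fold the buffer into the digest on a timer, and report the tail quantiles. The LatencyReporter name and its methods are made up for illustration; they are not part of folly or of the code above.

#include <vector>

#include "tdigest.h"

// Hypothetical reporter: buffers request durations (in ms) and periodically
// folds them into a TDigest, from which tail quantiles can be read out.
class LatencyReporter {
 public:
  explicit LatencyReporter(size_t digestSize = 100) : digest_(digestSize) {}

  // Called once per finished HTTP request.
  void record(double millis) { buffer_.push_back(millis); }

  // Called on a timer, e.g. once per reporting interval.
  void flush() {
    if (buffer_.empty()) return;
    digest_ = digest_.merge_unsorted(&buffer_);  // sorts the buffer in place
    buffer_.clear();
  }

  double p99() const { return digest_.estimateQuantile(0.99); }
  double p999() const { return digest_.estimateQuantile(0.999); }

 private:
  std::vector<double> buffer_;
  TDigest digest_;
};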
#include <gtest/gtest.h>

#include <algorithm>
#include <iostream>
#include <random>
#include <vector>

#include "tdigest.h"
class UnitTest : public testing::Test {
 public:
  UnitTest() {}
  ~UnitTest() {}
  void SetUp() override {}
  void TearDown() override {}

  void UnitTest_TDigestTest() {
    std::random_device rd{};
    std::mt19937 gen{rd()};
    TDigest td(1000);

    const size_t max_i = 100;
    const size_t max_j = 1000;
    const size_t all_size = max_i * max_j;
    std::vector<double> all_numbers;
    all_numbers.reserve(all_size);

    // Feed 100 batches of 1000 samples drawn from N(50, 2) into the digest,
    // keeping a copy of every sample so the exact quantiles can be computed.
    std::normal_distribution<double> d{50, 2};
    for (size_t i = 0; i != max_i; ++i) {
      std::vector<double> numbers;
      for (size_t j = 0; j != max_j; ++j) {
        auto val = d(gen);
        numbers.push_back(val);
        all_numbers.push_back(val);
      }
      td = td.merge_unsorted(&numbers);
      numbers.clear();
    }

    // Compare the exact 99% / 99.9% quantiles with the t-digest estimates.
    std::sort(all_numbers.begin(), all_numbers.end());
    std::cout << "real 99%: " << all_numbers[(all_size * 99) / 100] << std::endl;
    std::cout << "estimate 99%: " << td.estimateQuantile(0.99) << std::endl;
    std::cout << "real 99.9%: " << all_numbers[(all_size * 999) / 1000] << std::endl;
    std::cout << "estimate 99.9%: " << td.estimateQuantile(0.999) << std::endl;
  }
};

TEST_F(UnitTest, UnitTest_TDigestTest) { UnitTest_TDigestTest(); }
The rough idea of t-digest is this: given a fixed number of slots, say 1000, a particular mapping gives each slot a different weight budget. The slots near the two ends get small weights and the slots in the middle get large ones, which is what makes estimates such as 99% and 99.9% reasonably accurate. The mapping used here is k_to_q; a small numeric illustration of it is given at the end of this section. Regarding the
weightSoFar <= q_limit_times_count
check in merge_sorted, I have a thought: what if we compared the value of weightSoFar before and after
weightSoFar += next.weight();
and kept whichever is closer to q_limit_times_count? Would that give a more accurate result? I have not tested this; if you need it, add a test and try it yourself.
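Here is the small illustration promised above: a standalone sketch (not part of the tdigest sources) that copies the k_to_q mapping and prints the quantile boundaries it produces for d = 20 slots. The boundaries are packed tightly near q = 0 and q = 1 and spread out around the median, so each tail slot is only allowed to absorb a small share of the total weight, which is why the tail quantiles come out accurately.

#include <cstdio>

// Same mapping as in the implementation above: maps slot index k (0..d)
// to a quantile boundary in [0, 1].
static double k_to_q(double k, double d) {
  double k_div_d = k / d;
  if (k_div_d >= 0.5) {
    double base = 1 - k_div_d;
    return 1 - 2 * base * base;
  }
  return 2 * k_div_d * k_div_d;
}

int main() {
  const double d = 20;  // use only 20 slots so the output stays short
  for (int k = 0; k <= static_cast<int>(d); ++k) {
    // Differences between consecutive boundaries are the per-slot weight
    // budgets that q_limit_times_count enforces during the merge.
    std::printf("k=%2d  q=%.4f\n", k, k_to_q(k, d));
  }
  return 0;
}

For d = 20 the first gap is 0.005 while the gap next to the median is close to 0.1, i.e. a middle slot may hold roughly twenty times as much weight as an end slot.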