因爲工做的需求,後續筆者工做須要和開源的OLAP數據庫ClickHouse打交道。ClickHouse是Yandex在2016年6月15日開源了一個分析型數據庫,以強悍的單機處理能力被稱道。
筆者在實際測試ClickHouse和閱讀ClickHouse的源碼過程之中,對"戰鬥民族"開發的數據庫十分欣賞。ClickHouse不只是一個很好的數據庫學習材料,並且同時應用了大量的CPP17的新特性進行開發,也是一個大型的Modern CPP的教導資料。
筆者接下來會陸續將閱讀ClickHouse的部分心得體會與經過源碼閱讀筆記的方式和你們分享,坦白說,這種源碼閱讀筆記很難寫啊。(多一分繁瑣,少一分就模糊了~~)
第一篇文章,咱們就從聚合函數的實現開始聊起~~ 上車!git
聚合函數: 顧名思義就是對一組數據執行聚合計算並返回結果的函數。
這類函數在數據庫之中很常見,如:count, max, min, sum
等等。github
/** Adds a value into aggregation data on which place points to. * columns points to columns containing arguments of aggregation function. * row_num is number of row which should be added. * Additional parameter arena should be used instead of standard memory allocator if the addition requires memory allocation. */ virtual void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const = 0; /// Merges state (on which place points to) with other state of current aggregation function. virtual void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const = 0; /// Serializes state (to transmit it over the network, for example). virtual void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const = 0; /// Deserializes state. This function is called only for empty (just created) states. virtual void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const = 0; // /** Contains a loop with calls to "add" function. You can collect arguments into array "places" * and do a single call to "addBatch" for devirtualization and inlining. */ virtual void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const = 0;
ColumnUInt八、ColumnArray
等, 都實現了對應的列接口,而且在子類之中具象實現了不一樣的內存佈局。columns
是一個二維數組,經過columns[0]
能夠取到第一列。(這裏只有涉及到一列,爲何columns是二維數組呢?由於處理array等列的時候,也是經過對應的接口,而array就須要應用二維數組了. )IColumn
子類實現的getData方法
獲取對應row_num
行的數據進行add函數調用就完成了一次聚合函數的計算了。void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override { const auto & column = static_cast<const ColVecType &>(*columns[0]); this->data(place).add(column.getData()[row_num]); }
addFree
就實現了我上述所說的過程,可是它是一個private的函數,因此一般咱們都是經過getAddressOfAddFunction
獲取對應的函數地址。這在聚合查詢的過程之中可以提升20%左右的執行效率。template <typename Derived> class IAggregateFunctionHelper : public IAggregateFunction { private: static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) { static_cast<const Derived &>(*that).add(place, columns, row_num, arena); } public: IAggregateFunctionHelper(const DataTypes & argument_types_, const Array & parameters_) : IAggregateFunction(argument_types_, parameters_) {} AddFunc getAddressOfAddFunction() const override { return &addFree; }
class AggregateFunctionFactory final : private boost::noncopyable, public IFactoryWithAliases<AggregateFunctionCreator> { public: static AggregateFunctionFactory & instance(); /// Register a function by its name. /// No locking, you must register all functions before usage of get. void registerFunction( const String & name, Creator creator, CaseSensitiveness case_sensitiveness = CaseSensitive); /// Throws an exception if not found. AggregateFunctionPtr get( const String & name, const DataTypes & argument_types, const Array & parameters = {}, int recursion_level = 0) const;
有了上述的背景知識,咱們接下來舉個栗子。來看看一個聚合函數的實現細節,以及它是如何被使用的。算法
筆者這裏選取了一個很簡單的聚合算子Sum,咱們來看看它實現的代碼細節。
這裏咱們能夠看到AggregateFunctionSum
是個final類,沒法被繼承了。而它繼承了上面提到的IAggregateFunctionHelp
類的子類IAggregateFunctionDataHelper
類。數據庫
這裏咱們就重點看,這個類override了getName
方法,返回了對應的名字sum。而且實現了咱們上文提到的四個核心的方法。數組
template <typename T, typename TResult, typename Data> class AggregateFunctionSum final : public IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>> { public: using ResultDataType = std::conditional_t<IsDecimalNumber<T>, DataTypeDecimal<TResult>, DataTypeNumber<TResult>>; using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>; using ColVecResult = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<TResult>, ColumnVector<TResult>>; String getName() const override { return "sum"; } AggregateFunctionSum(const DataTypes & argument_types_) : IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>(argument_types_, {}) , scale(0) {} AggregateFunctionSum(const IDataType & data_type, const DataTypes & argument_types_) : IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>(argument_types_, {}) , scale(getDecimalScale(data_type)) {} DataTypePtr getReturnType() const override { if constexpr (IsDecimalNumber<T>) return std::make_shared<ResultDataType>(ResultDataType::maxPrecision(), scale); else return std::make_shared<ResultDataType>(); } void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override { const auto & column = static_cast<const ColVecType &>(*columns[0]); this->data(place).add(column.getData()[row_num]); } void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override { this->data(place).merge(this->data(rhs)); } void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override { this->data(place).write(buf); } void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override { this->data(place).read(buf); } void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override { auto & column = static_cast<ColVecResult &>(to); column.getData().push_back(this->data(place).get()); } private: UInt32 scale; };
接下來,ClickHouse實現了兩種聚合計算:AggregateFunctionSumData
和AggregateFunctionSumKahanData
。後者是用Kahan算法避免float類型精度損失的,咱們能夠暫時不細看。直接看SumData的實現。這是個模板類,以前咱們講到AggregateFunction
的函數就是經過AggregateDataPtr
指針來獲取AggregateFunctionSumData
的地址,來調用add實現聚合算子的。咱們能夠看到AggregateFunctionSumData實現了前文提到的add, merge, write,read
四大方法,正好和接口一一對應上了。數據結構
template <typename T> struct AggregateFunctionSumData { T sum{}; void add(T value) { sum += value; } void merge(const AggregateFunctionSumData & rhs) { sum += rhs.sum; } void write(WriteBuffer & buf) const { writeBinary(sum, buf); } void read(ReadBuffer & buf) { readBinary(sum, buf); } T get() const { return sum; } };
ClickHouse在Server啓動時。main函數之中會調用registerAggregateFunction
的初始化函數註冊全部的聚合函數。
而後調用到下面的函數:併發
void registerAggregateFunctionSum(AggregateFunctionFactory & factory) { factory.registerFunction("sum", createAggregateFunctionSum<AggregateFunctionSumSimple>, AggregateFunctionFactory::CaseInsensitive); factory.registerFunction("sumWithOverflow", createAggregateFunctionSum<AggregateFunctionSumWithOverflow>); factory.registerFunction("sumKahan", createAggregateFunctionSum<AggregateFunctionSumKahan>); }
這裏又調用了 factory.registerFunction("sum", createAggregateFunctionSum<AggregateFunctionSumSimple>, AggregateFunctionFactory::CaseInsensitive);
來進行上述咱們看到的聚合函數的註冊。這裏有一點很噁心的模板代碼,筆者這裏簡化了一下,把註冊的部分函數拉出來:分佈式
createAggregateFunctionSum(const std::string & name, const DataTypes & argument_types, const Array & parameters) { AggregateFunctionPtr res; DataTypePtr data_type = argument_types[0]; if (isDecimal(data_type)) res.reset(createWithDecimalType<Function>(*data_type, *data_type, argument_types)); else res.reset(createWithNumericType<Function>(*data_type, argument_types)); return res;
這裏的Function
模板就是上面的AggregateFunctionSumSimple
, 而它又是下面的模板類型:ide
template <typename T> using AggregateFunctionSumSimple = typename SumSimple<T>::Function; template <typename T> struct SumSimple { /// @note It uses slow Decimal128 (cause we need such a variant). sumWithOverflow is faster for Decimal32/64 using ResultType = std::conditional_t<IsDecimalNumber<T>, Decimal128, NearestFieldType<T>>; using AggregateDataType = AggregateFunctionSumData<ResultType>; using Function = AggregateFunctionSum<T, ResultType, AggregateDataType>; };
不知道讀者被繞暈了沒,最終繞回來仍是new出來這個AggregateFunctionSum<T, ResultType, AggregateDataType>
也就是完成了這個求和算子的註冊,後續咱們get出來就能夠愉快的調用啦。(這裏這部分的模板變化比較複雜,若是看不明白能夠回到源碼梳理一下~~~)函數
好了,關於聚合函數的基礎信息,和它是如何實現而且經過工廠方法註冊獲取的流程算是搞明白了。
關於其餘的聚合算子,也是大同小異的方式。筆者就再也不贅述了,感興趣的能夠回到源碼之中繼續一探究竟。講完了聚合函數的實現,下一篇筆者就要繼續給探究聚合函數究竟在ClickHouse之中是如何和列存結合使用,並實現向量化的~~。
筆者是一個ClickHouse的初學者,對ClickHouse有興趣的同窗,也歡迎和筆者多多指教,交流。