乾貨丨時序數據庫DolphinDB插件開發教程

DolphinDB支持動態加載外部插件,以擴展系統功能。插件用C++編寫,須要編譯成".so"或".dll"共享庫文件。本文着重介紹開發插件的方法和注意事項,並詳細介紹如下幾個具體場景的插件開發流程:html

  • 如何開發支持時間序列數據處理的插件函數
  • 如何開發用於處理分佈式SQL的聚合函數
  • 如何開發支持新的分佈式算法的插件函數
  • 如何開發支持流數據處理的插件函數
  • 如何開發支持外部數據源的插件函數

1. 如何開發插件

1.1 基本概念

DolphinDB的插件實現了能在腳本中調用的函數。一個插件函數多是運算符函數(Operator function),也多是系統函數(System function),它們的區別在於,前者接受的參數個數小於等於2,然後者的函數能夠接受任意個參數,並支持會話的訪問操做。git

開發一個運算符函數,須要編寫一個原型爲ConstantSP (const ConstantSP& a, const ConstantSP& b)的C++函數。當函數參數個數爲2時,ab分別爲插件函數的第一和第二個參數;當參數個數爲1時,b是一個佔位符,沒有實際用途;當沒有參數時,ab均爲佔位符。github

開發一個系統函數,須要編寫一個原型爲ConstantSP (Heap* heap, vector<ConstantSP>& args)的C++函數。用戶在DolphinDB中調用插件函數時傳入的參數,都按順序保存在C++的向量args中。heap參數不須要用戶傳入。算法

函數原型中的ConstantSP能夠表示絕大多數DolphinDB對象(標量、向量、矩陣、表,等等)。其餘經常使用的派生自它的變量類型有VectorSP(向量)、TableSP(表)等。sql

1.2 建立變量

建立標量,能夠直接用new語句建立頭文件ScalarImp.h中聲明的類型對象,並將它賦值給一個ConstantSPConstantSP是一個通過封裝的智能指針,會在變量的引用計數爲0時自動釋放內存,所以,用戶不須要手動delete已經建立的變量:數據庫

ConstantSP i = new Int(1);                 // 至關於1i
ConstantSP d = new Date(2019, 3, 14);      // 至關於2019.03.14
ConstantSP s = new String("DolphinDB");    // 至關於"DolphinDB"
ConstantSP voidConstant = new Void();      // 建立一個void類型變量,經常使用於表示空的函數參數

頭文件Util.h聲明瞭一系列函數,用於快速建立某個類型和格式的變量:編程

VectorSP v = Util::createVector(DT_INT, 10);     // 建立一個初始長度爲10的int類型向量
v->setInt(0, 60);                                // 至關於v[0] = 60

VectorSP t = Util::createVector(DT_ANY, 0);      // 建立一個初始長度爲0的any類型向量(元組)
t->append(new Int(3));                           // 至關於t.append!(3)
t->get(0)->setInt(4);                            // 至關於t[0] = 4
// 這裏不能用t->setInt(0, 4),由於t是一個元組,setInt(0, 4)只對int類型的向量有效

ConstantSP seq = Util::createIndexVector(5, 10); // 至關於5..14
int seq0 = seq->getInt(0);                       // 至關於seq[0]

ConstantSP mat = Util::createDoubleMatrix(5, 10);// 建立一個10行5列的double類型矩陣
mat->setColumn(3, seq);                          // 至關於mat[3] = seq

1.3 異常處理和參數校驗

1.3.1 異常處理數組

插件開發時的異常拋出和處理,和通常C++開發中同樣,都經過throw關鍵字拋出異常,try語句塊處理異常。DolphinDB在頭文件Exceptions.h中聲明瞭異常類型。緩存

插件函數若遇到運行時錯誤,通常拋出RuntimeException網絡

在插件開發時,一般會校驗函數參數,若是參數不符合要求,拋出一個IllegalArgumentException。經常使用的參數校驗函數有:

  • ConstantSP->getType():返回變量的類型(int, char, date等等),DolphinDB的類型定義在頭文件Types.h中。
  • ConstantSP->getCategory():返回變量的類別,經常使用的類別有INTEGRAL(整數類型,包括int, char, short, long等)、FLOATING(浮點數類型,包括float, double等)、TEMPORAL(時間類型,包括time, date, datetime等)、LITERAL(字符串類型,包括string, symbol等),都定義在頭文件Types.h中。
  • ConstantSP->getForm():返回變量的格式(標量、向量、表等等),DolphinDB的格式定義在頭文件Types.h中。
  • ConstantSP->isVector():判斷變量是否爲向量。
  • ConstantSP->isScalar():判斷變量是否爲標量。
  • ConstantSP->isTable():判斷變量是否爲表。
  • ConstantSP->isNumber():判斷變量是否爲數字類型。
  • ConstantSP->isNull():判斷變量是否爲空值。
  • ConstantSP->getInt():得到變量對應的整數值,經常使用於判斷邊界。
  • ConstantSP->getString():得到變量對應的字符串。
  • ConstantSP->size():得到變量的長度。

更多參數校驗函數通常在頭文件CoreConcept.hConstant類方法中。

1.3.2 參數校驗的範例

本節將開發一個插件函數用於求非負整數的階乘,返回一個long類型變量。

DolphinDB中long類型的最大值爲2^63 - 1,能表示的階乘最大爲25!,所以只有0~25範圍內的參數是合法的。

#include "CoreConcept.h"
#include "Exceptions.h"
#include "ScalarImp.h"

ConstantSP factorial(const ConstantSP &n, const ConstantSP &placeholder) {
    string syntax = "Usage: factorial(n). ";
    if (!n->isScalar() || n->getCategory() != INTEGRAL)
        throw IllegalArgumentException("factorial", syntax + "n must be an integral scalar.");
    int nValue = n->getInt();
    if (nValue < 0 || nValue > 25)
        throw IllegalArgumentException("factorial", syntax + "n must be a non-negative integer less than 26.");

    long long fact = 1;
    for (int i = nValue; i > 0; i--)
        fact *= i;
    return new Long(fact);
}

1.4 調用DolphinDB內置函數

有時會須要調用DolphinDB的內置函數對數據進行處理。有些類已經定義了一些經常使用的內置函數做爲方法:

VectorSP v = Util::createIndexVector(1, 100);
ConstantSP avg = v->avg();     // 至關於avg(v)
ConstantSP sum2 = v->sum2();   // 至關於sum2(v)
v->sort(false);                // 至關於sort(v, false)

若是須要調用其它內置函數,插件函數的類型必須是系統函數。經過heap->currentSession()->getFunctionDef函數得到一個內置函數,而後用call方法調用它。若是該內置函數是運算符函數,應調用原型call(Heap, const ConstantSP&, const ConstantSP&);若是是系統函數,應調用原型call(Heap, vector<ConstantSP>&)。如下是調用內置函數cumsum的一個例子:

ConstantSP v = Util::createIndexVector(1, 100);
v->setTemporary(false);                                   // v的值可能在內置函數調用時被修改。若是不但願它被修改,應先調用setTemporary(false)
FunctionDefSP cumsum = heap->currentSession()->getFunctionDef("cumsum");
ConstantSP result = cumsum->call(heap, v, new Void());    // 至關於cumsum(v),這裏的new Void()是一個佔位符,沒有實際用途

2. 如何開發支持時間序列數據處理的插件函數

DolphinDB的特點之一在於它對時間序列有良好支持。

本章以編寫一個msum函數的插件爲例,介紹如何開發插件函數支持時間序列數據處理。

時間序列處理函數一般接受向量做爲參數,並對向量中的每一個元素進行計算處理。在本例中,msum函數接受兩個參數:一個向量和一個窗口大小。它的原型是:

ConstantSP msum(const ConstantSP &X, const ConstantSP &window);

msum函數的返回值是一個和輸入向量一樣長度的向量。本例爲簡便起見,假定返回值是一個double類型的向量。能夠經過Util::createVector函數預先爲返回值分配空間:

int size = X->size();
int windowSize = window->getInt();
ConstantSP result = Util::createVector(DT_DOUBLE, size);

在DolphinDB插件編寫時處理向量,能夠循環使用getDoubleConst,getIntConst等函數,批量得到必定長度的只讀數據,保存在相應類型的緩衝區中,從緩衝區中取得數據進行計算。這樣作的效率比循環使用getDouble,getInt等函數要高。本例爲簡便起見,統一使用getDoubleConst,每次得到長度爲Util::BUF_SIZE的數據。這個函數返回一個const double*,指向緩衝區頭部:

double buf[Util::BUF_SIZE];

INDEX start = 0;
while (start < size) {
    int len = std::min(Util::BUF_SIZE, size - start);
    const double *p = X->getDoubleConst(start, len, buf);
    for (int i = 0; i < len; i++) {
        double val = p[i];
        // ...
    }
    start += len;
}

在本例中,msum將計算X中長度爲windowSize的窗口中全部數據的和。能夠用一個臨時變量tmpSum記錄當前窗口的和,每當窗口移動時,只要給tmpSum增長新窗口尾部的值,減去舊窗口頭部的值,就能計算獲得當前窗口中數據的和。爲了將計算值寫入result,能夠循環用result->getDoubleBuffer獲取一個可讀寫的緩衝區,寫完後使用result->setDouble函數將緩衝區寫回數組。setDouble函數會檢查給定的緩衝區地址和變量底層儲存的地址是否一致,若是一致就不會發生數據拷貝。在多數狀況下,用getDoubleBuffer得到的緩衝區就是變量實際的存儲區域,這樣能減小數據拷貝,提升性能。

須要注意的是,DolphinDB用double類型的最小值(已經定義爲宏DBL_NMIN)表示double類型的NULL值,要專門判斷。

返回值的前windowSize - 1個元素爲NULL。能夠對X中的前windowSize個元素和以後的元素用兩個循環分別處理,前一個循環只計算累加,後一個循環執行加和減的操做。最終的實現以下:

ConstantSP msum(const ConstantSP &X, const ConstantSP &window) {
    INDEX size = X->size();
    int windowSize = window->getInt();
    ConstantSP result = Util::createVector(DT_DOUBLE, size);

    double buf[Util::BUF_SIZE];
    double windowHeadBuf[Util::BUF_SIZE];
    double resultBuf[Util::BUF_SIZE];
    double tmpSum = 0.0;

    INDEX start = 0;
    while (start < windowSize) {
        int len = std::min(Util::BUF_SIZE, windowSize - start);
        const double *p = X->getDoubleConst(start, len, buf);
        double *r = result->getDoubleBuffer(start, len, resultBuf);
        for (int i = 0; i < len; i++) {
            if (p[i] != DBL_NMIN)    // p[i] is not NULL
                tmpSum += p[i];
            r[i] = DBL_NMIN;
        }
        result->setDouble(start, len, r);
        start += len;
    }

    result->setDouble(windowSize - 1, tmpSum);    // 上一個循環多設置了一個NULL,填充爲tmpSum

    while (start < size) {
        int len = std::min(Util::BUF_SIZE, size - start);
        const double *p = X->getDoubleConst(start, len, buf);
        const double *q = X->getDoubleConst(start - windowSize, len, windowHeadBuf);
        double *r = result->getDoubleBuffer(start, len, resultBuf);
        for (int i = 0; i < len; i++) {
            if (p[i] != DBL_NMIN)
                tmpSum += p[i];
            if (q[i] != DBL_NMIN)
                tmpSum -= q[i];
            r[i] = tmpSum;
        }
        result->setDouble(start, len, r);
        start += len;
    }

    return result;
}

3. 如何開發用於處理分佈式SQL的聚合函數

在DolphinDB中,SQL的聚合函數一般接受一個或多個向量做爲參數,最終返回一個標量。在開發聚合函數的插件時,須要瞭解如何訪問向量中的元素。

DolphinDB中的向量有兩種存儲方式。一種是常規數組,數據在內存中連續存儲;另外一種是大數組,其中的數據分塊存儲。

本章將以編寫一個求幾何平均數的函數爲例,介紹如何開發聚合函數,重點關注數組中元素的訪問。

3.1 聚合函數範例

幾何平均數geometricMean函數接受一個向量做爲參數。爲了防止溢出,通常採用其對數形式計算,即

geometricMean([x1, x2, ..., xn])
    = exp((log(x1) + log(x2) + log(x3) + ... + log(xn))/n)

爲了實現這個函數的分佈式版本,能夠先開發聚合函數插件logSum,用以計算某個分區上的數據的對數和,而後用defg關鍵字定義一個Reduce函數,用mapr關鍵字定義一個MapReduce函數。

在DolphinDB插件開發中,對數組的操做一般要考慮它是常規數組仍是大數組。能夠用isFastMode函數判斷:

ConstantSP logSum(const ConstantSP &x, const ConstantSP &placeholder) {
    if (((VectorSP) x)->isFastMode()) {
        // ...
    }
    else {
        // ...
    }
}

若是數組是常規數組,它在內存中連續存儲。能夠用getDataArray函數得到它數據的指針。假定數據是以double類型存儲的:

if (((VectorSP) x)->isFastMode()) {
    int size = x->size();
    double *data = (double *) x->getDataArray();
    
    double logSum = 0;
    for (int i = 0; i < size; i++) {
        if (data[i] != DBL_NMIN)    // is not NULL
            logSum += std::log(data[i]);
    }
    return new Double(logSum);
}

若是數據是大數組,它在內存中分塊存儲。能夠用getSegmentSize得到每一個塊的大小,用getDataSegment得到首個塊的地址。它返回一個二級指針,指向一個指針數組,這個數組中的每一個元素指向每一個塊的數據數組:

// ...
else {
    int size = x->size();
    int segmentSize = x->getSegmentSize();
    double **segments = (double **) x->getDataSegment();
    INDEX start = 0;
    int segmentId = 0;
    double logSum = 0;

    while (start < size) {
        double *block = segments[segmentId];
        int blockSize = std::min(segmentSize, size - start);
        for (int i = 0; i < blockSize; i++) {
            if (block[i] != DBL_NMIN)    // is not NULL
                logSum += std::log(block[i]);
        }
        start += blockSize;
        segmentId++;
    }
    return new Double(logSum);
}

在實際開發中,數組的底層存儲不必定是double類型。用戶須要考慮具體類型。本例採用了泛型編程統一處理不一樣類型,具體代碼參見附件。

3.2 在DolphinDB中調用函數

一般須要實現一個聚合函數的非分佈式版本和分佈式版本,系統會基於哪一個版本更高效來選擇調用這個版本。

在DolphinDB中定義非分佈式的geometricMean函數:

def geometricMean(x) {
	return exp(logSum::logSum(x) \ count(x))
}

而後經過定義Map和Reduce函數,最終用mapr定義分佈式的版本:

def geometricMeanMap(x) {
	return logSum::logSum(x)
}

defg geometricMeanReduce(myLogSum, myCount) {
    return exp(sum(myLogSum) \ sum(myCount))
}

mapr geometricMean(x) { geometricMeanMap(x), count(x) -> geometricMeanReduce }

這樣就實現了geometricMean函數。

若是是在單機環境中執行這個函數,只須要在執行的節點上加載插件。若是有數據位於遠程節點,須要在每個遠程節點加載插件。能夠手動在每一個節點執行loadPlugin函數,也能夠用如下腳本快速在每一個節點上加載插件:

each(rpc{, loadPlugin, pathToPlugin}, getDataNodes())

經過如下腳本建立一個分區表,驗證函數:

db = database("", VALUE, 1 2 3 4)
t = table(take(1..4, 100) as id, rand(1.0, 100) as val)
t0 = db.createPartitionedTable(t, `tb, `id)
t0.append!(t)
select geometricMean(val) from t0 group by id

3.3 隨機訪問大數組

能夠對大數組進行隨機訪問,但要通過下標計算。用getSegmentSizeInBit函數得到塊大小的二進制位數,經過位運算得到塊的偏移量和塊內偏移量:

int segmentSizeInBit = x->getSegmentSizeInBit();
int segmentMask = (1 << segmentSizeInBit) - 1;
double **segments = (double **) x->getDataSegment();

int index = 3000000;    // 想要訪問的下標

double result = segments[index >> segmentSizeInBit][index & segmentMask];
//                       ^ 塊的偏移量                ^ 塊內偏移量

3.4 應該選擇哪一種方式訪問向量

上一章介紹了經過getDoubleConst,getIntConst等一族方法得到只讀緩衝區,以及經過getDoubleBuffer,getIntBuffer等一族方法得到可讀寫緩衝區,這兩種訪問向量的方法。本章介紹了經過getDataArraygetDataSegment方法直接訪問向量的底層存儲。在實際開發中,前一種方法更通用,通常應該選擇前一種方法。但在某些特別的場合(例如明確知道數據存儲在大數組中,且知道數據的類型),能夠採用第二種方法。

4. 如何開發支持新的分佈式算法的插件函數

在DolphinDB中,Map-Reduce是執行分佈式算法的通用計算框架。DolphinDB提供了mr函數和imr函數,使用戶能經過腳本實現分佈式算法。而在編寫分佈式算法的插件時,使用的一樣是這兩個函數。本章主要介紹如何用C++語言編寫自定義的map, reduce等函數,並調用mr和imr兩個函數,最終實現分佈式計算。

4.1 分佈式算法範例

本章將以mr爲例,實現一個函數,求分佈式表中相應列名的全部列平均值,介紹編寫DolphinDB 分佈式算法插件的總體流程,及須要注意的技術細節。

在插件開發中,用戶自定義的map, reduce, final, term函數,能夠是運算符函數,也能夠是系統函數。

本例的map函數,對錶的一個分區內對應列名的列作計算,返回一個長度爲2的元組,分別包含數據的和,及數據非空元素的個數。具體實現以下:

ConstantSP columnAvgMap(Heap *heap, vector<ConstantSP> &args) {
    TableSP table = args[0];
    ConstantSP colNames = args[1];
    double sum = 0.0;
    int count = 0;
    
    for (int i = 0; i < colNames->size(); i++) {
        string colName = colNames->getString(i);
        VectorSP col = table->getColumn(colName);
        sum += col->sum()->getDouble();
        count += col->count();
    }

    ConstantSP result = Util::createVector(DT_ANY, 2);
    result->set(0, new Double(sum));
    result->set(1, new Int(count));
    return result;
}

本例的reduce函數,是對map結果的相加。DolphinDB的內置函數add就提供了這個功能,能夠用heap->currentSession()->getFunctionDef("add")得到這個函數:

FunctionDefSP reduceFunc = heap->currentSession()->getFunctionDef("add");

本例的final函數,是對reduce結果中的數據總和sum和非空元素個數count作除法,求得全部分區中對應列的平均數。具體實現以下:

ConstantSP columnAvgFinal(const ConstantSP &result, const ConstantSP &placeholder) {
    double sum = result->get(0)->getDouble();
    int count = result->get(1)->getInt();
    
    return new Double(sum / count);
}

定義了map, reduce, final等函數後,將它們導出爲插件函數(在頭文件的函數聲明前加上extern "C",並在加載插件的文本文件中列出這些函數),而後經過heap->currentSession->getFunctionDef獲取這些函數,就能以這些函數爲參數調用mr函數。如:

FunctionDefSP mapFunc = Heap->currentSession()->getFunctionDef("columnAvg::columnAvgMap");

在本例中,map函數接受兩個參數tablecolNames,但mr只容許map函數有一個參數,所以須要以部分應用的形式調用map函數,能夠用Util::createPartialFunction將它包裝爲部分應用,實現以下:

vector<ConstantSP> mapWithColNamesArgs {new Void(), colNames};
FunctionDefSP mapWithColNames = Util::createPartitalFunction(mapFunc, mapWithColNamesArgs);

heap->currentSession()->getFunctionDef("mr")得到系統內置函數mr,調用mr->call方法,就至關於在DolphinDB腳本中調用mr函數。最後實現的columnAvg函數定義以下:

ConstantSP columnAvg(Heap *heap, vector<ConstantSP> &args) {
    ConstantSP ds = args[0];
    ConstantSP colNames = args[1];

    FunctionDefSP mapFunc = heap->currentSession()->getFunctionDef("columnAvg::columnAvgMap");
    vector<ConstantSP> mapWithColNamesArgs = {new Void(), colNames};
    FunctionDefSP mapWithColNames = Util::createPartialFunction(mapFunc, mapWithColNamesArgs);    // columnAvgMap{, colNames}
    FunctionDefSP reduceFunc = heap->currentSession()->getFunctionDef("add");
    FunctionDefSP finalFunc = heap->currentSession()->getFunctionDef("columnAvg::columnAvgFinal");

    FunctionDefSP mr = heap->currentSession()->getFunctionDef("mr");    // mr(ds, columnAvgMap{, colNames}, add, columnAvgFinal)
    vector<ConstantSP> mrArgs = {ds, mapWithColNames, reduceFunc, finalFunc};
    return mr->call(heap, mrArgs);
}

4.2 在DolphinDB中調用函數

若是是在單機環境中執行這個函數,只須要在執行的節點上加載插件。但若是有數據位於遠程節點,須要在每個遠程節點加載插件。能夠手動在每一個節點執行loadPlugin函數,也能夠用如下腳本快速在每一個節點上加載插件:

each(rpc{, loadPlugin, pathToPlugin}, getDataNodes())

加載插件後,用sqlDS函數生成數據源,並調用函數:

n = 100
db = database("dfs://testColumnAvg", VALUE, 1..4)
t = db.createPartitionedTable(table(10:0, `id`v1`v2, [INT,DOUBLE,DOUBLE]), `t, `id)
t.append!(table(take(1..4, n) as id, rand(10.0, n) as v1, rand(100.0, n) as v2))

ds = sqlDS(<select * from t>)
columnAvg::columnAvg(ds, `v1`v2)

5.如何開發支持流數據處理的插件函數

在DolphinDB中,流數據訂閱端能夠經過一個handler函數處理收到的數據。訂閱數據能夠是一個數據表,或一個元組,由subsrciebeTable函數的msgAsTable參數決定。一般能夠用handler函數對流數據進行過濾、插入另外一張表等操做。

本章將編寫一個handler函數。它接受的消息類型是元組。另外接受兩個參數:一個是int類型的標量或向量indices,表示元組中元素的下標,另外一個是一個表table。它將元組中對應下標的列插入到表中。

向表中添加數據的接口是bool append(vector<ConstantSP>& values, INDEX& insertedRows, string& errMsg),若是插入成功,返回true,並向insertedRows中寫入插入的行數。不然返回false,並在errMsg中寫入出錯信息。插件的實現以下:

ConstantSP handler(Heap *heap, vector<ConstantSP> &args) {
    ConstantSP indices = args[0];
    TableSP table = args[1];
    ConstantSP msg = args[2];

    vector<ConstantSP> msgToAppend;
    for (int i = 0; i < indices->size(); i++) {
        int index = indices->get(i);
        msgToAppend.push_back(msg->get(index));
    }

    INDEX insertedRows;
    string errMsg;
    table->append(msgToAppend, insertedRows, errMsg);
    return new Void();
}

在實際應用中,可能須要知道插入出錯時的緣由。能夠引入頭文件Logger.h,將出錯信息寫入日誌中。注意須要在編譯插件時加上宏定義-DLOGGING_LEVEL_2

// ...
bool success = table->append(msgToAppend, insertedRows, errMsg);
if (!success)
    LOG_ERR("Failed to append to table: ", errMsg);

能夠用如下腳本模擬流數據寫入,驗證handler函數:

loadPlugin("/path/to/PluginHandler.txt")

share streamTable(10:0, `id`sym`timestamp, [INT,SYMBOL,TIMESTAMP]) as t0
t1 = table(10:0, `sym`timestamp, [SYMBOL,TIMESTAMP])
subscribeTable(, `t0, , , handler::handler{[1,2], t1})

t0.append!(table(1..100 as id, take(`a`b`c`d, 100) as symbol, now() + 1..100 as timestamp))

select * from t1

6.如何開發支持外部數據源的插件函數

在爲第三方數據設計可擴展的接口插件時,有幾個須要關注的問題:

  1. 數據源(Data source)。數據源是一個特殊的數據對象,包含了數據實體的元描述,執行一個數據源能得到數據實體,多是表、矩陣、向量等等。用戶能夠提供數據源調用olsEx, randomForestClassifier等分佈式計算函數,也能夠調用mr, imrComputingModel.h中定義的更底層的計算模型作並行計算。DolphinDB的內置函數sqlDS就經過SQL表達式獲取數據源。在設計第三方數據接口時,一般須要實現一個獲取數據源的函數,它將大的文件分紅若干個部分,每部分都表示數據的一個子集,最後返回一個數據源的元組。數據源通常用一個Code object表示,是一個函數調用,它的參數是元數據,返回一個表。
  2. 結構(Schema)。表的結構描述了表的列數,每一列的列名和數據類型。第三方接口一般須要實現一個函數,快速得到數據的表結構,以便用戶在這個結構的基礎上調整列名和列的數據類型。
  3. IO問題。在多核多CPU的環境中,IO可能成爲瓶頸。DolphinDB提供了抽象的IO接口,DataInputStreamDataOutputStream,這些接口封裝了數據壓縮,Endianness,IO類型(網絡,磁盤,buffer等)等細節,方便開發。此外還特別實現了針對多線程的IO實現,BlockFileInputStreamBlockFileOutputStream。這個實現有兩個優勢:
  • 實現計算和IO並行。A線程在處理數據的時候,後臺線程在異步幫A線程預讀取後面須要的數據。
  • 避免了多線程的磁盤競爭。當線程個數增長的時候,若是並行往同一個磁盤上讀寫,性能會急劇降低。這個實現,會對同一個磁盤的讀寫串行化,從而提升吞吐量。

本章將介紹一般須要實現的幾個函數,爲設計第三方數據接口提供一個簡單的範例。

6.1 數據格式描述

假定本例中的數據儲存在平面文件數據庫,以二進制格式按行存儲,數據從文件頭部直接開始存儲。每行有四列,分別爲id(按有符號64位長整型格式存儲,8字節),symbol(按C字符串格式存儲,8字節),date(按BCD碼格式存儲,8字節),value(按IEEE 754標準的雙精度浮點數格式存儲,8字節),每行共32字節。如下是一行的例子:

這一行的十六進制表示爲:

0x 00 00 00 00 00 00 00 05
0x 49 42 4D 00 00 00 00 00
0x 02 00 01 09 00 03 01 03
0x 40 24 33 33 33 33 33 33

6.2 extractMyDataSchema函數

這個函數提取數據文件的表結構。在本例中,表結構是肯定的,不須要實際讀取文件。該函數提供了一個如何生成表結構的範例。它經過Util::createTable函數建立一張結構表:

ConstantSP extractMyDataSchema(const ConstantSP &placeholderA, const ConstantSP &placeholderB) {
    ConstantSP colNames = Util::createVector(DT_STRING, 4);
    ConstantSP colTypes = Util::createVector(DT_STRING, 4);
    string names[] = {"id", "symbol", "date", "value"};
    string types[] = {"LONG", "SYMBOL", "DATE", "DOUBLE"};
    colNames->setString(0, 4, names);
    colTypes->setString(0, 4, types);

    vector<ConstantSP> schema = {colNames, colTypes};
    vector<string> header = {"name", "type"};

    return Util::createTable(header, schema);
}

在實際開發中,可能須要以讀取文件頭等方式得到表結構。如何讀文件將在後面介紹。

6.3 loadMyData函數

loadMyData函數讀取文件,並輸出一張DolphinDB表。給定一個文件的路徑,能夠經過Util::createBlockFileInputStream建立一個輸入流,此後,可對這個流調用readBytes函數讀取給定長度的字節,readBool讀取下一個bool值,readInt讀取下一個int值,等等。本例給loadMyData函數設計的語法爲:loadMyData(path, [start], [length])。除了接受文件路徑path,還接受兩個int類型的參數startlength,分別表示開始讀取的行數和須要讀取的總行數。createBlockFileInputStream函數能夠經過參數決定開始讀取的字節數和須要讀取的總字節數:

ConstantSP loadMyData(Heap *heap, vector<ConstantSP> &args) {
    ConstantSP path = args[0];
    long long fileLength = Util::getFileLength(path->getString());
    size_t bytesPerRow = 32;

    int start = args.size() >= 2 ? args[1]->getInt() : 0;
    int length = args.size() >= 3 ? args[2]->getInt() : fileLength / bytesPerRow - start;

    DataInputStreamSP inputStream = Util::createBlockFileInputStream(path->getString(), 0, fileLength, Util::BUF_SIZE, start * bytesPerRow, length * bytesPerRow);
    char buf[Util::BUF_SIZE];
    size_t actualLength;
    
    while (true) {
        inputStream->readBytes(buf, Util::BUF_SIZE, actualLength);
        if (actualLength <= 0)
            break;
        // ...
    }
}

在讀取數據時,一般將數據緩存到數組中,等待緩衝區滿後批量插入。例如,假定要讀取一個內容全爲char類型字節的二進制文件,將它寫入一個char類型的DolphinDB向量vec。最後返回只由vec一列組成的表:

char buf[Util::BUF_SIZE];
VectorSP vec = Util::createVector(DT_CHAR, 0);
size_t actualLength;

while (true) {
    inputStream->readBytes(buf, Util::BUF_SIZE, actualLength);
    if (actualLength <= 0)
        break;
    vec->appendChar(buf, actualLength);
}

vector<ConstantSP> cols = {vec};
vector<string> colNames = {"col0"};

return Util::createTable(colNames, cols);

本節的完整代碼請參考附件中的代碼。在實際開發中,加載數據的函數可能還會接受表結構參數schema,按實際須要改變讀取的數據類型。

6.4 loadMyDataEx函數

loadMyData函數老是將數據加載到內存,當數據文件很是龐大時,工做機的內存很容易成爲瓶頸。因此設計loadMyDataEx函數解決這個問題。它經過邊導入邊保存的方式,把靜態的二進制文件以較爲平緩的數據流的方式保存爲DolphinDB的分佈式表,而不是採用所有導入內存再存爲分區表的方式,從而下降內存的使用需求。

loadMyDataEx函數的參數能夠參考DolphinDB內置函數loadTextEx。它的語法是:loadMyDataEx(dbHandle, tableName, partitionColumns, path, [start], [length])。若是數據庫中的表存在,則將導入的數據添加到已有的表result中。若是表不存在,則建立一張表result,而後添加數據。最後返回這張表:

string dbPath = ((SystemHandleSP) db)->getDatabaseDir();
vector<ConstantSP> existsTableArgs = {new String(dbPath), tableName};
bool existsTable = heap->currentSession()->getFunctionDef("existsTable")->call(heap, existsTableArgs)->getBool();    // 至關於existsTable(dbPath, tableName)
ConstantSP result;

if (existsTable) {    // 表存在,直接加載表
    vector<ConstantSP> loadTableArgs = {db, tableName};
    result = heap->currentSession()->getFunctionDef("loadTable")->call(heap, loadTableArgs);    // 至關於loadTable(db, tableName)
}
else {    // 表不存在,建立表
    TableSP schema = extractMyDataSchema(new Void(), new Void());
    ConstantSP dummyTable = DBFileIO::createEmptyTableFromSchema(schema);
    vector<ConstantSP> createTableArgs = {db, dummyTable, tableName, partitionColumns};
    result = heap->currentSession()->getFunctionDef("createPartitionedTable")->call(heap, createTableArgs);    // 至關於createPartitionedTable(db, dummyTable, tableName, partitionColumns)
}

讀取數據並添加到表中的代碼實現採用了Pipeline框架。它的初始任務是一系列具備不一樣start參數的loadMyData函數調用,pipeline的follower函數是一個部分應用append!{result},至關於把整個讀取數據的任務分紅若干份執行,調用loadMyData分塊讀取後,將相應的數據經過append!插入表中。核心部分的代碼以下:

int sizePerPartition = 16 * 1024 * 1024;
int partitionNum = fileLength / sizePerPartition;
vector<DistributedCallSP> tasks;
FunctionDefSP func = Util::createSystemFunction("loadMyData", loadMyData, 1, 3, false);
int partitionStart = start;
int partitionLength = length / partitionNum;
for (int i = 0; i < partitionNum; i++) {
    if (i == partitionNum - 1)
        partitionLength = length - partitionLength * i;
    vector<ConstantSP> partitionArgs = {path, new Int(partitionStart), new Int(partitionLength)};
    ObjectSP call = Util::createRegularFunctionCall(func, partitionArgs);    // 將會調用loadMyData(path, partitionStart, partitionLength)
    tasks.push_back(new DistributedCall(call, true));
    partitionStart += partitionLength;
}

vector<ConstantSP> appendToResultArgs = {result};
FunctionDefSP appendToResult = Util::createPartialFunction(heap->currentSession()->getFunctionDef("append!"), appendToResultArgs);    // 至關於append!{result}
vector<FunctionDefSP> functors = {appendToResult};
PipelineStageExecutor executor(functors, false);
executor.execute(heap, tasks);

本節的完整代碼請參考附件中的代碼。用Pipeline框架實現數據的分塊導入,只是一種思路。在具體開發時,能夠採用ComputingModel.h中聲明的StaticStageExecutor,也可使用Concurrent.h中聲明的線程模型Thread。實現方法有不少種,須要根據實際場景選擇。

6.5 myDataDS函數

myDataDS函數返回一個數據源的元組。每一個數據源都是一個表示函數調用的Code object,能夠經過Util::createRegularFunctionCall生成。執行這個對象能夠取得對應的數據。如下是基於loadMyData函數產生數據源的一個例子:

ConstantSP myDataDS(Heap *heap, vector<ConstantSP> &args) {
    ConstantSP path = args[0];
    long long fileLength = Util::getFileLength(path->getString());
    size_t bytesPerRow = 32;

    int start = args.size() >= 2 ? args[1]->getInt() : 0;
    int length = args.size() >= 3 ? args[2]->getInt() : fileLength / bytesPerRow - start;

    int sizePerPartition = 16 * 1024 * 1024;
    int partitionNum = fileLength / sizePerPartition;

    int partitionStart = start;
    int partitionLength = length / partitionNum;

    FunctionDefSP func = Util::createSystemFunction("loadMyData", loadMyData, 1, 3, false);
    ConstantSP dataSources = Util::createVector(DT_ANY, partitionNum);
    for (int i = 0; i < partitionNum; i++) {
        if (i == partitionNum - 1)
            partitionLength = length - partitionLength * i;
        vector<ConstantSP> partitionArgs = {path, new Int(partitionStart), new Int(partitionLength)};
        ObjectSP code = Util::createRegularFunctionCall(func, partitionArgs);    // 將會調用loadMyData(path, partitionStart, partitionLength)
        dataSources->set(i, new DataSource(code));
    }
    return dataSources;
}

 

教程中的完整代碼見https://github.com/dolphindb/Tu

相關文章
相關標籤/搜索