DolphinDB支持動態加載外部插件,以擴展系統功能。插件用C++編寫,須要編譯成".so"或".dll"共享庫文件。本文着重介紹開發插件的方法和注意事項,並詳細介紹如下幾個具體場景的插件開發流程:html
DolphinDB的插件實現了能在腳本中調用的函數。一個插件函數多是運算符函數(Operator function),也多是系統函數(System function),它們的區別在於,前者接受的參數個數小於等於2,然後者的函數能夠接受任意個參數,並支持會話的訪問操做。git
開發一個運算符函數,須要編寫一個原型爲ConstantSP (const ConstantSP& a, const ConstantSP& b)
的C++函數。當函數參數個數爲2時,a
和b
分別爲插件函數的第一和第二個參數;當參數個數爲1時,b
是一個佔位符,沒有實際用途;當沒有參數時,a
和b
均爲佔位符。github
開發一個系統函數,須要編寫一個原型爲ConstantSP (Heap* heap, vector<ConstantSP>& args)
的C++函數。用戶在DolphinDB中調用插件函數時傳入的參數,都按順序保存在C++的向量args
中。heap
參數不須要用戶傳入。算法
函數原型中的ConstantSP
能夠表示絕大多數DolphinDB對象(標量、向量、矩陣、表,等等)。其餘經常使用的派生自它的變量類型有VectorSP
(向量)、TableSP
(表)等。sql
建立標量,能夠直接用new
語句建立頭文件ScalarImp.h
中聲明的類型對象,並將它賦值給一個ConstantSP
。ConstantSP
是一個通過封裝的智能指針,會在變量的引用計數爲0時自動釋放內存,所以,用戶不須要手動delete
已經建立的變量:數據庫
ConstantSP i = new Int(1); // 至關於1i ConstantSP d = new Date(2019, 3, 14); // 至關於2019.03.14 ConstantSP s = new String("DolphinDB"); // 至關於"DolphinDB" ConstantSP voidConstant = new Void(); // 建立一個void類型變量,經常使用於表示空的函數參數
頭文件Util.h
聲明瞭一系列函數,用於快速建立某個類型和格式的變量:編程
VectorSP v = Util::createVector(DT_INT, 10); // 建立一個初始長度爲10的int類型向量 v->setInt(0, 60); // 至關於v[0] = 60 VectorSP t = Util::createVector(DT_ANY, 0); // 建立一個初始長度爲0的any類型向量(元組) t->append(new Int(3)); // 至關於t.append!(3) t->get(0)->setInt(4); // 至關於t[0] = 4 // 這裏不能用t->setInt(0, 4),由於t是一個元組,setInt(0, 4)只對int類型的向量有效 ConstantSP seq = Util::createIndexVector(5, 10); // 至關於5..14 int seq0 = seq->getInt(0); // 至關於seq[0] ConstantSP mat = Util::createDoubleMatrix(5, 10);// 建立一個10行5列的double類型矩陣 mat->setColumn(3, seq); // 至關於mat[3] = seq
1.3.1 異常處理數組
插件開發時的異常拋出和處理,和通常C++開發中同樣,都經過throw
關鍵字拋出異常,try
語句塊處理異常。DolphinDB在頭文件Exceptions.h
中聲明瞭異常類型。緩存
插件函數若遇到運行時錯誤,通常拋出RuntimeException
。網絡
在插件開發時,一般會校驗函數參數,若是參數不符合要求,拋出一個IllegalArgumentException
。經常使用的參數校驗函數有:
ConstantSP->getType()
:返回變量的類型(int, char, date等等),DolphinDB的類型定義在頭文件Types.h
中。ConstantSP->getCategory()
:返回變量的類別,經常使用的類別有INTEGRAL(整數類型,包括int, char, short, long等)、FLOATING(浮點數類型,包括float, double等)、TEMPORAL(時間類型,包括time, date, datetime等)、LITERAL(字符串類型,包括string, symbol等),都定義在頭文件Types.h
中。ConstantSP->getForm()
:返回變量的格式(標量、向量、表等等),DolphinDB的格式定義在頭文件Types.h
中。ConstantSP->isVector()
:判斷變量是否爲向量。ConstantSP->isScalar()
:判斷變量是否爲標量。ConstantSP->isTable()
:判斷變量是否爲表。ConstantSP->isNumber()
:判斷變量是否爲數字類型。ConstantSP->isNull()
:判斷變量是否爲空值。ConstantSP->getInt()
:得到變量對應的整數值,經常使用於判斷邊界。ConstantSP->getString()
:得到變量對應的字符串。ConstantSP->size()
:得到變量的長度。更多參數校驗函數通常在頭文件CoreConcept.h
的Constant
類方法中。
1.3.2 參數校驗的範例
本節將開發一個插件函數用於求非負整數的階乘,返回一個long類型變量。
DolphinDB中long類型的最大值爲2^63 - 1
,能表示的階乘最大爲25!
,所以只有0~25
範圍內的參數是合法的。
#include "CoreConcept.h" #include "Exceptions.h" #include "ScalarImp.h" ConstantSP factorial(const ConstantSP &n, const ConstantSP &placeholder) { string syntax = "Usage: factorial(n). "; if (!n->isScalar() || n->getCategory() != INTEGRAL) throw IllegalArgumentException("factorial", syntax + "n must be an integral scalar."); int nValue = n->getInt(); if (nValue < 0 || nValue > 25) throw IllegalArgumentException("factorial", syntax + "n must be a non-negative integer less than 26."); long long fact = 1; for (int i = nValue; i > 0; i--) fact *= i; return new Long(fact); }
有時會須要調用DolphinDB的內置函數對數據進行處理。有些類已經定義了一些經常使用的內置函數做爲方法:
VectorSP v = Util::createIndexVector(1, 100); ConstantSP avg = v->avg(); // 至關於avg(v) ConstantSP sum2 = v->sum2(); // 至關於sum2(v) v->sort(false); // 至關於sort(v, false)
若是須要調用其它內置函數,插件函數的類型必須是系統函數。經過heap->currentSession()->getFunctionDef
函數得到一個內置函數,而後用call
方法調用它。若是該內置函數是運算符函數,應調用原型call(Heap, const ConstantSP&, const ConstantSP&)
;若是是系統函數,應調用原型call(Heap, vector<ConstantSP>&)
。如下是調用內置函數cumsum
的一個例子:
ConstantSP v = Util::createIndexVector(1, 100); v->setTemporary(false); // v的值可能在內置函數調用時被修改。若是不但願它被修改,應先調用setTemporary(false) FunctionDefSP cumsum = heap->currentSession()->getFunctionDef("cumsum"); ConstantSP result = cumsum->call(heap, v, new Void()); // 至關於cumsum(v),這裏的new Void()是一個佔位符,沒有實際用途
DolphinDB的特點之一在於它對時間序列有良好支持。
本章以編寫一個msum函數的插件爲例,介紹如何開發插件函數支持時間序列數據處理。
時間序列處理函數一般接受向量做爲參數,並對向量中的每一個元素進行計算處理。在本例中,msum
函數接受兩個參數:一個向量和一個窗口大小。它的原型是:
ConstantSP msum(const ConstantSP &X, const ConstantSP &window);
msum
函數的返回值是一個和輸入向量一樣長度的向量。本例爲簡便起見,假定返回值是一個double
類型的向量。能夠經過Util::createVector
函數預先爲返回值分配空間:
int size = X->size(); int windowSize = window->getInt(); ConstantSP result = Util::createVector(DT_DOUBLE, size);
在DolphinDB插件編寫時處理向量,能夠循環使用getDoubleConst
,getIntConst
等函數,批量得到必定長度的只讀數據,保存在相應類型的緩衝區中,從緩衝區中取得數據進行計算。這樣作的效率比循環使用getDouble
,getInt
等函數要高。本例爲簡便起見,統一使用getDoubleConst
,每次得到長度爲Util::BUF_SIZE
的數據。這個函數返回一個const double*
,指向緩衝區頭部:
double buf[Util::BUF_SIZE]; INDEX start = 0; while (start < size) { int len = std::min(Util::BUF_SIZE, size - start); const double *p = X->getDoubleConst(start, len, buf); for (int i = 0; i < len; i++) { double val = p[i]; // ... } start += len; }
在本例中,msum
將計算X
中長度爲windowSize
的窗口中全部數據的和。能夠用一個臨時變量tmpSum
記錄當前窗口的和,每當窗口移動時,只要給tmpSum
增長新窗口尾部的值,減去舊窗口頭部的值,就能計算獲得當前窗口中數據的和。爲了將計算值寫入result
,能夠循環用result->getDoubleBuffer
獲取一個可讀寫的緩衝區,寫完後使用result->setDouble
函數將緩衝區寫回數組。setDouble
函數會檢查給定的緩衝區地址和變量底層儲存的地址是否一致,若是一致就不會發生數據拷貝。在多數狀況下,用getDoubleBuffer
得到的緩衝區就是變量實際的存儲區域,這樣能減小數據拷貝,提升性能。
須要注意的是,DolphinDB用double
類型的最小值(已經定義爲宏DBL_NMIN
)表示double
類型的NULL
值,要專門判斷。
返回值的前windowSize - 1
個元素爲NULL
。能夠對X
中的前windowSize
個元素和以後的元素用兩個循環分別處理,前一個循環只計算累加,後一個循環執行加和減的操做。最終的實現以下:
ConstantSP msum(const ConstantSP &X, const ConstantSP &window) { INDEX size = X->size(); int windowSize = window->getInt(); ConstantSP result = Util::createVector(DT_DOUBLE, size); double buf[Util::BUF_SIZE]; double windowHeadBuf[Util::BUF_SIZE]; double resultBuf[Util::BUF_SIZE]; double tmpSum = 0.0; INDEX start = 0; while (start < windowSize) { int len = std::min(Util::BUF_SIZE, windowSize - start); const double *p = X->getDoubleConst(start, len, buf); double *r = result->getDoubleBuffer(start, len, resultBuf); for (int i = 0; i < len; i++) { if (p[i] != DBL_NMIN) // p[i] is not NULL tmpSum += p[i]; r[i] = DBL_NMIN; } result->setDouble(start, len, r); start += len; } result->setDouble(windowSize - 1, tmpSum); // 上一個循環多設置了一個NULL,填充爲tmpSum while (start < size) { int len = std::min(Util::BUF_SIZE, size - start); const double *p = X->getDoubleConst(start, len, buf); const double *q = X->getDoubleConst(start - windowSize, len, windowHeadBuf); double *r = result->getDoubleBuffer(start, len, resultBuf); for (int i = 0; i < len; i++) { if (p[i] != DBL_NMIN) tmpSum += p[i]; if (q[i] != DBL_NMIN) tmpSum -= q[i]; r[i] = tmpSum; } result->setDouble(start, len, r); start += len; } return result; }
在DolphinDB中,SQL的聚合函數一般接受一個或多個向量做爲參數,最終返回一個標量。在開發聚合函數的插件時,須要瞭解如何訪問向量中的元素。
DolphinDB中的向量有兩種存儲方式。一種是常規數組,數據在內存中連續存儲;另外一種是大數組,其中的數據分塊存儲。
本章將以編寫一個求幾何平均數的函數爲例,介紹如何開發聚合函數,重點關注數組中元素的訪問。
幾何平均數geometricMean
函數接受一個向量做爲參數。爲了防止溢出,通常採用其對數形式計算,即
geometricMean([x1, x2, ..., xn]) = exp((log(x1) + log(x2) + log(x3) + ... + log(xn))/n)
爲了實現這個函數的分佈式版本,能夠先開發聚合函數插件logSum
,用以計算某個分區上的數據的對數和,而後用defg
關鍵字定義一個Reduce函數,用mapr
關鍵字定義一個MapReduce函數。
在DolphinDB插件開發中,對數組的操做一般要考慮它是常規數組仍是大數組。能夠用isFastMode
函數判斷:
ConstantSP logSum(const ConstantSP &x, const ConstantSP &placeholder) { if (((VectorSP) x)->isFastMode()) { // ... } else { // ... } }
若是數組是常規數組,它在內存中連續存儲。能夠用getDataArray
函數得到它數據的指針。假定數據是以double
類型存儲的:
if (((VectorSP) x)->isFastMode()) { int size = x->size(); double *data = (double *) x->getDataArray(); double logSum = 0; for (int i = 0; i < size; i++) { if (data[i] != DBL_NMIN) // is not NULL logSum += std::log(data[i]); } return new Double(logSum); }
若是數據是大數組,它在內存中分塊存儲。能夠用getSegmentSize
得到每一個塊的大小,用getDataSegment
得到首個塊的地址。它返回一個二級指針,指向一個指針數組,這個數組中的每一個元素指向每一個塊的數據數組:
// ... else { int size = x->size(); int segmentSize = x->getSegmentSize(); double **segments = (double **) x->getDataSegment(); INDEX start = 0; int segmentId = 0; double logSum = 0; while (start < size) { double *block = segments[segmentId]; int blockSize = std::min(segmentSize, size - start); for (int i = 0; i < blockSize; i++) { if (block[i] != DBL_NMIN) // is not NULL logSum += std::log(block[i]); } start += blockSize; segmentId++; } return new Double(logSum); }
在實際開發中,數組的底層存儲不必定是double
類型。用戶須要考慮具體類型。本例採用了泛型編程統一處理不一樣類型,具體代碼參見附件。
一般須要實現一個聚合函數的非分佈式版本和分佈式版本,系統會基於哪一個版本更高效來選擇調用這個版本。
在DolphinDB中定義非分佈式的geometricMean函數:
def geometricMean(x) { return exp(logSum::logSum(x) count(x)) }
而後經過定義Map和Reduce函數,最終用mapr
定義分佈式的版本:
def geometricMeanMap(x) { return logSum::logSum(x) } defg geometricMeanReduce(myLogSum, myCount) { return exp(sum(myLogSum) sum(myCount)) } mapr geometricMean(x) { geometricMeanMap(x), count(x) -> geometricMeanReduce }
這樣就實現了geometricMean
函數。
若是是在單機環境中執行這個函數,只須要在執行的節點上加載插件。若是有數據位於遠程節點,須要在每個遠程節點加載插件。能夠手動在每一個節點執行loadPlugin
函數,也能夠用如下腳本快速在每一個節點上加載插件:
each(rpc{, loadPlugin, pathToPlugin}, getDataNodes())
經過如下腳本建立一個分區表,驗證函數:
db = database("", VALUE, 1 2 3 4) t = table(take(1..4, 100) as id, rand(1.0, 100) as val) t0 = db.createPartitionedTable(t, `tb, `id) t0.append!(t) select geometricMean(val) from t0 group by id
3.3 隨機訪問大數組
能夠對大數組進行隨機訪問,但要通過下標計算。用getSegmentSizeInBit
函數得到塊大小的二進制位數,經過位運算得到塊的偏移量和塊內偏移量:
int segmentSizeInBit = x->getSegmentSizeInBit(); int segmentMask = (1 << segmentSizeInBit) - 1; double **segments = (double **) x->getDataSegment(); int index = 3000000; // 想要訪問的下標 double result = segments[index >> segmentSizeInBit][index & segmentMask]; // ^ 塊的偏移量 ^ 塊內偏移量
3.4 應該選擇哪一種方式訪問向量
上一章介紹了經過getDoubleConst
,getIntConst
等一族方法得到只讀緩衝區,以及經過getDoubleBuffer
,getIntBuffer
等一族方法得到可讀寫緩衝區,這兩種訪問向量的方法。本章介紹了經過getDataArray
和getDataSegment
方法直接訪問向量的底層存儲。在實際開發中,前一種方法更通用,通常應該選擇前一種方法。但在某些特別的場合(例如明確知道數據存儲在大數組中,且知道數據的類型),能夠採用第二種方法。
在DolphinDB中,Map-Reduce是執行分佈式算法的通用計算框架。DolphinDB提供了mr函數和imr函數,使用戶能經過腳本實現分佈式算法。而在編寫分佈式算法的插件時,使用的一樣是這兩個函數。本章主要介紹如何用C++語言編寫自定義的map, reduce等函數,並調用mr和imr兩個函數,最終實現分佈式計算。
本章將以mr
爲例,實現一個函數,求分佈式表中相應列名的全部列平均值,介紹編寫DolphinDB 分佈式算法插件的總體流程,及須要注意的技術細節。
在插件開發中,用戶自定義的map, reduce, final, term函數,能夠是運算符函數,也能夠是系統函數。
本例的map函數,對錶的一個分區內對應列名的列作計算,返回一個長度爲2的元組,分別包含數據的和,及數據非空元素的個數。具體實現以下:
ConstantSP columnAvgMap(Heap *heap, vector<ConstantSP> &args) { TableSP table = args[0]; ConstantSP colNames = args[1]; double sum = 0.0; int count = 0; for (int i = 0; i < colNames->size(); i++) { string colName = colNames->getString(i); VectorSP col = table->getColumn(colName); sum += col->sum()->getDouble(); count += col->count(); } ConstantSP result = Util::createVector(DT_ANY, 2); result->set(0, new Double(sum)); result->set(1, new Int(count)); return result; }
本例的reduce函數,是對map結果的相加。DolphinDB的內置函數add
就提供了這個功能,能夠用heap->currentSession()->getFunctionDef("add")
得到這個函數:
FunctionDefSP reduceFunc = heap->currentSession()->getFunctionDef("add");
本例的final函數,是對reduce結果中的數據總和sum
和非空元素個數count
作除法,求得全部分區中對應列的平均數。具體實現以下:
ConstantSP columnAvgFinal(const ConstantSP &result, const ConstantSP &placeholder) { double sum = result->get(0)->getDouble(); int count = result->get(1)->getInt(); return new Double(sum / count); }
定義了map, reduce, final等函數後,將它們導出爲插件函數(在頭文件的函數聲明前加上extern "C"
,並在加載插件的文本文件中列出這些函數),而後經過heap->currentSession->getFunctionDef
獲取這些函數,就能以這些函數爲參數調用mr
函數。如:
FunctionDefSP mapFunc = Heap->currentSession()->getFunctionDef("columnAvg::columnAvgMap");
在本例中,map函數接受兩個參數table
和colNames
,但mr
只容許map函數有一個參數,所以須要以部分應用的形式調用map函數,能夠用Util::createPartialFunction
將它包裝爲部分應用,實現以下:
vector<ConstantSP> mapWithColNamesArgs {new Void(), colNames}; FunctionDefSP mapWithColNames = Util::createPartitalFunction(mapFunc, mapWithColNamesArgs);
用heap->currentSession()->getFunctionDef("mr")
得到系統內置函數mr
,調用mr->call
方法,就至關於在DolphinDB腳本中調用mr
函數。最後實現的columnAvg函數定義以下:
ConstantSP columnAvg(Heap *heap, vector<ConstantSP> &args) { ConstantSP ds = args[0]; ConstantSP colNames = args[1]; FunctionDefSP mapFunc = heap->currentSession()->getFunctionDef("columnAvg::columnAvgMap"); vector<ConstantSP> mapWithColNamesArgs = {new Void(), colNames}; FunctionDefSP mapWithColNames = Util::createPartialFunction(mapFunc, mapWithColNamesArgs); // columnAvgMap{, colNames} FunctionDefSP reduceFunc = heap->currentSession()->getFunctionDef("add"); FunctionDefSP finalFunc = heap->currentSession()->getFunctionDef("columnAvg::columnAvgFinal"); FunctionDefSP mr = heap->currentSession()->getFunctionDef("mr"); // mr(ds, columnAvgMap{, colNames}, add, columnAvgFinal) vector<ConstantSP> mrArgs = {ds, mapWithColNames, reduceFunc, finalFunc}; return mr->call(heap, mrArgs); }
若是是在單機環境中執行這個函數,只須要在執行的節點上加載插件。但若是有數據位於遠程節點,須要在每個遠程節點加載插件。能夠手動在每一個節點執行loadPlugin
函數,也能夠用如下腳本快速在每一個節點上加載插件:
each(rpc{, loadPlugin, pathToPlugin}, getDataNodes())
加載插件後,用sqlDS
函數生成數據源,並調用函數:
n = 100 db = database("dfs://testColumnAvg", VALUE, 1..4) t = db.createPartitionedTable(table(10:0, `id`v1`v2, [INT,DOUBLE,DOUBLE]), `t, `id) t.append!(table(take(1..4, n) as id, rand(10.0, n) as v1, rand(100.0, n) as v2)) ds = sqlDS(<select * from t>) columnAvg::columnAvg(ds, `v1`v2)
在DolphinDB中,流數據訂閱端能夠經過一個handler函數處理收到的數據。訂閱數據能夠是一個數據表,或一個元組,由subsrciebeTable
函數的msgAsTable
參數決定。一般能夠用handler函數對流數據進行過濾、插入另外一張表等操做。
本章將編寫一個handler函數。它接受的消息類型是元組。另外接受兩個參數:一個是int類型的標量或向量indices
,表示元組中元素的下標,另外一個是一個表table
。它將元組中對應下標的列插入到表中。
向表中添加數據的接口是bool append(vector<ConstantSP>& values, INDEX& insertedRows, string& errMsg)
,若是插入成功,返回true
,並向insertedRows
中寫入插入的行數。不然返回false
,並在errMsg
中寫入出錯信息。插件的實現以下:
ConstantSP handler(Heap *heap, vector<ConstantSP> &args) { ConstantSP indices = args[0]; TableSP table = args[1]; ConstantSP msg = args[2]; vector<ConstantSP> msgToAppend; for (int i = 0; i < indices->size(); i++) { int index = indices->get(i); msgToAppend.push_back(msg->get(index)); } INDEX insertedRows; string errMsg; table->append(msgToAppend, insertedRows, errMsg); return new Void(); }
在實際應用中,可能須要知道插入出錯時的緣由。能夠引入頭文件Logger.h
,將出錯信息寫入日誌中。注意須要在編譯插件時加上宏定義-DLOGGING_LEVEL_2
:
// ... bool success = table->append(msgToAppend, insertedRows, errMsg); if (!success) LOG_ERR("Failed to append to table: ", errMsg);
能夠用如下腳本模擬流數據寫入,驗證handler函數:
loadPlugin("/path/to/PluginHandler.txt") share streamTable(10:0, `id`sym`timestamp, [INT,SYMBOL,TIMESTAMP]) as t0 t1 = table(10:0, `sym`timestamp, [SYMBOL,TIMESTAMP]) subscribeTable(, `t0, , , handler::handler{[1,2], t1}) t0.append!(table(1..100 as id, take(`a`b`c`d, 100) as symbol, now() + 1..100 as timestamp)) select * from t1
在爲第三方數據設計可擴展的接口插件時,有幾個須要關注的問題:
olsEx
, randomForestClassifier
等分佈式計算函數,也能夠調用mr
, imr
或ComputingModel.h
中定義的更底層的計算模型作並行計算。DolphinDB的內置函數sqlDS
就經過SQL表達式獲取數據源。在設計第三方數據接口時,一般須要實現一個獲取數據源的函數,它將大的文件分紅若干個部分,每部分都表示數據的一個子集,最後返回一個數據源的元組。數據源通常用一個Code object表示,是一個函數調用,它的參數是元數據,返回一個表。DataInputStream
和DataOutputStream
,這些接口封裝了數據壓縮,Endianness,IO類型(網絡,磁盤,buffer等)等細節,方便開發。此外還特別實現了針對多線程的IO實現,BlockFileInputStream
和BlockFileOutputStream
。這個實現有兩個優勢:本章將介紹一般須要實現的幾個函數,爲設計第三方數據接口提供一個簡單的範例。
假定本例中的數據儲存在平面文件數據庫,以二進制格式按行存儲,數據從文件頭部直接開始存儲。每行有四列,分別爲id(按有符號64位長整型格式存儲,8字節),symbol(按C字符串格式存儲,8字節),date(按BCD碼格式存儲,8字節),value(按IEEE 754標準的雙精度浮點數格式存儲,8字節),每行共32字節。如下是一行的例子:
這一行的十六進制表示爲:
0x 00 00 00 00 00 00 00 05 0x 49 42 4D 00 00 00 00 00 0x 02 00 01 09 00 03 01 03 0x 40 24 33 33 33 33 33 33
這個函數提取數據文件的表結構。在本例中,表結構是肯定的,不須要實際讀取文件。該函數提供了一個如何生成表結構的範例。它經過Util::createTable
函數建立一張結構表:
ConstantSP extractMyDataSchema(const ConstantSP &placeholderA, const ConstantSP &placeholderB) { ConstantSP colNames = Util::createVector(DT_STRING, 4); ConstantSP colTypes = Util::createVector(DT_STRING, 4); string names[] = {"id", "symbol", "date", "value"}; string types[] = {"LONG", "SYMBOL", "DATE", "DOUBLE"}; colNames->setString(0, 4, names); colTypes->setString(0, 4, types); vector<ConstantSP> schema = {colNames, colTypes}; vector<string> header = {"name", "type"}; return Util::createTable(header, schema); }
在實際開發中,可能須要以讀取文件頭等方式得到表結構。如何讀文件將在後面介紹。
loadMyData
函數讀取文件,並輸出一張DolphinDB表。給定一個文件的路徑,能夠經過Util::createBlockFileInputStream
建立一個輸入流,此後,可對這個流調用readBytes
函數讀取給定長度的字節,readBool
讀取下一個bool
值,readInt
讀取下一個int
值,等等。本例給loadMyData
函數設計的語法爲:loadMyData(path, [start], [length])
。除了接受文件路徑path
,還接受兩個int
類型的參數start
和length
,分別表示開始讀取的行數和須要讀取的總行數。createBlockFileInputStream
函數能夠經過參數決定開始讀取的字節數和須要讀取的總字節數:
ConstantSP loadMyData(Heap *heap, vector<ConstantSP> &args) { ConstantSP path = args[0]; long long fileLength = Util::getFileLength(path->getString()); size_t bytesPerRow = 32; int start = args.size() >= 2 ? args[1]->getInt() : 0; int length = args.size() >= 3 ? args[2]->getInt() : fileLength / bytesPerRow - start; DataInputStreamSP inputStream = Util::createBlockFileInputStream(path->getString(), 0, fileLength, Util::BUF_SIZE, start * bytesPerRow, length * bytesPerRow); char buf[Util::BUF_SIZE]; size_t actualLength; while (true) { inputStream->readBytes(buf, Util::BUF_SIZE, actualLength); if (actualLength <= 0) break; // ... } }
在讀取數據時,一般將數據緩存到數組中,等待緩衝區滿後批量插入。例如,假定要讀取一個內容全爲char
類型字節的二進制文件,將它寫入一個char
類型的DolphinDB向量vec
。最後返回只由vec
一列組成的表:
char buf[Util::BUF_SIZE]; VectorSP vec = Util::createVector(DT_CHAR, 0); size_t actualLength; while (true) { inputStream->readBytes(buf, Util::BUF_SIZE, actualLength); if (actualLength <= 0) break; vec->appendChar(buf, actualLength); } vector<ConstantSP> cols = {vec}; vector<string> colNames = {"col0"}; return Util::createTable(colNames, cols);
本節的完整代碼請參考附件中的代碼。在實際開發中,加載數據的函數可能還會接受表結構參數schema
,按實際須要改變讀取的數據類型。
loadMyDataEx
函數loadMyData
函數老是將數據加載到內存,當數據文件很是龐大時,工做機的內存很容易成爲瓶頸。因此設計loadMyDataEx
函數解決這個問題。它經過邊導入邊保存的方式,把靜態的二進制文件以較爲平緩的數據流的方式保存爲DolphinDB的分佈式表,而不是採用所有導入內存再存爲分區表的方式,從而下降內存的使用需求。
loadMyDataEx
函數的參數能夠參考DolphinDB內置函數loadTextEx
。它的語法是:loadMyDataEx(dbHandle, tableName, partitionColumns, path, [start], [length])
。若是數據庫中的表存在,則將導入的數據添加到已有的表result
中。若是表不存在,則建立一張表result
,而後添加數據。最後返回這張表:
string dbPath = ((SystemHandleSP) db)->getDatabaseDir(); vector<ConstantSP> existsTableArgs = {new String(dbPath), tableName}; bool existsTable = heap->currentSession()->getFunctionDef("existsTable")->call(heap, existsTableArgs)->getBool(); // 至關於existsTable(dbPath, tableName) ConstantSP result; if (existsTable) { // 表存在,直接加載表 vector<ConstantSP> loadTableArgs = {db, tableName}; result = heap->currentSession()->getFunctionDef("loadTable")->call(heap, loadTableArgs); // 至關於loadTable(db, tableName) } else { // 表不存在,建立表 TableSP schema = extractMyDataSchema(new Void(), new Void()); ConstantSP dummyTable = DBFileIO::createEmptyTableFromSchema(schema); vector<ConstantSP> createTableArgs = {db, dummyTable, tableName, partitionColumns}; result = heap->currentSession()->getFunctionDef("createPartitionedTable")->call(heap, createTableArgs); // 至關於createPartitionedTable(db, dummyTable, tableName, partitionColumns) }
讀取數據並添加到表中的代碼實現採用了Pipeline框架。它的初始任務是一系列具備不一樣start
參數的loadMyData
函數調用,pipeline的follower
函數是一個部分應用append!{result}
,至關於把整個讀取數據的任務分紅若干份執行,調用loadMyData
分塊讀取後,將相應的數據經過append!
插入表中。核心部分的代碼以下:
int sizePerPartition = 16 * 1024 * 1024; int partitionNum = fileLength / sizePerPartition; vector<DistributedCallSP> tasks; FunctionDefSP func = Util::createSystemFunction("loadMyData", loadMyData, 1, 3, false); int partitionStart = start; int partitionLength = length / partitionNum; for (int i = 0; i < partitionNum; i++) { if (i == partitionNum - 1) partitionLength = length - partitionLength * i; vector<ConstantSP> partitionArgs = {path, new Int(partitionStart), new Int(partitionLength)}; ObjectSP call = Util::createRegularFunctionCall(func, partitionArgs); // 將會調用loadMyData(path, partitionStart, partitionLength) tasks.push_back(new DistributedCall(call, true)); partitionStart += partitionLength; } vector<ConstantSP> appendToResultArgs = {result}; FunctionDefSP appendToResult = Util::createPartialFunction(heap->currentSession()->getFunctionDef("append!"), appendToResultArgs); // 至關於append!{result} vector<FunctionDefSP> functors = {appendToResult}; PipelineStageExecutor executor(functors, false); executor.execute(heap, tasks);
本節的完整代碼請參考附件中的代碼。用Pipeline框架實現數據的分塊導入,只是一種思路。在具體開發時,能夠採用ComputingModel.h
中聲明的StaticStageExecutor
,也可使用Concurrent.h
中聲明的線程模型Thread
。實現方法有不少種,須要根據實際場景選擇。
myDataDS
函數myDataDS
函數返回一個數據源的元組。每一個數據源都是一個表示函數調用的Code object,能夠經過Util::createRegularFunctionCall
生成。執行這個對象能夠取得對應的數據。如下是基於loadMyData
函數產生數據源的一個例子:
ConstantSP myDataDS(Heap *heap, vector<ConstantSP> &args) { ConstantSP path = args[0]; long long fileLength = Util::getFileLength(path->getString()); size_t bytesPerRow = 32; int start = args.size() >= 2 ? args[1]->getInt() : 0; int length = args.size() >= 3 ? args[2]->getInt() : fileLength / bytesPerRow - start; int sizePerPartition = 16 * 1024 * 1024; int partitionNum = fileLength / sizePerPartition; int partitionStart = start; int partitionLength = length / partitionNum; FunctionDefSP func = Util::createSystemFunction("loadMyData", loadMyData, 1, 3, false); ConstantSP dataSources = Util::createVector(DT_ANY, partitionNum); for (int i = 0; i < partitionNum; i++) { if (i == partitionNum - 1) partitionLength = length - partitionLength * i; vector<ConstantSP> partitionArgs = {path, new Int(partitionStart), new Int(partitionLength)}; ObjectSP code = Util::createRegularFunctionCall(func, partitionArgs); // 將會調用loadMyData(path, partitionStart, partitionLength) dataSources->set(i, new DataSource(code)); } return dataSources; }
教程中的完整代碼見https://github.com/dolphindb/Tu