乾貨丨DolphinDB通用計算教程

DolphinDB不只能夠分佈式地存儲數據,並且對分佈式計算有良好支持。在DolphinDB中,用戶能夠用系統提供的通用分佈式計算框架,經過腳本實現高效的分佈式算法,而不需關注具體的底層實現。本文將對DolphinDB通用計算框架中的重要概念和相關函數做出詳細解釋,並提供豐富的具體使用場景和例子。html


1. 數據源

數據源(Data Source)是DolphinDB的通用計算框架中的基本概念。它是一種特殊類型的數據對象,是對數據的元描述。經過執行數據源,用戶能夠得到諸如表、矩陣、向量等數據實體。在DolphinDB的分佈式計算框架中,輕量級的數據源對象而不是龐大的數據實體被傳輸到遠程結點,以用於後續的計算,這大大減小了網絡流量。算法

在DolphinDB中,用戶經常使用sqlDS函數,基於一個SQL表達式產生數據源。這個函數並不直接對錶進行查詢,而是返回一個或多個SQL子查詢的元語句,即數據源。以後,用戶可使用Map-Reduce框架,傳入數據源和計算函數,將任務分發到每一個數據源對應的結點,並行地完成計算,而後將結果彙總。sql

關於幾種經常使用的得到數據源的方法,本文的3.1, 3.2, 3.3, 3.4節中會詳細介紹。數據庫


2. Map-Reduce框架

Map-Reduce函數是DolphinDB通用分佈式計算框架的核心功能。緩存

2.1 mr函數網絡

DolphinDB的Map-Reduce函數mr的語法是mr(ds, mapFunc, [reduceFunc], [finalFunc], [parallel=true]),它可接受一組數據源和一個mapFunc函數做爲參數。它會將計算任務分發到每一個數據源所在的結點,經過mapFunc對每一個數據源中的數據進行處理。可選參數reduceFunc會將mapFunc的返回值兩兩作計算,獲得的結果再與第三個mapFunc的返回值計算,如此累積計算,將mapFunc的結果彙總。若是有M個map調用,reduce函數將被調用M-1次。可選參數finalFunc對reduceFunc的返回值作進一步處理。app

官方文檔中有一個經過mr執行分佈式最小二乘線性迴歸的例子。本文經過如下例子,展現如何用一個mr調用實現對分佈式表每一個分區中的數據隨機採樣十分之一的功能:框架

// 建立數據庫和DFS表
db = database("dfs://sampleDB", VALUE, `a`b`c`d)
t = db.createPartitionedTable(table(100000:0, `sym`val, [SYMBOL,DOUBLE]), `tb, `sym)
n = 3000000
t.append!(table(rand(`a`b`c`d, n) as sym, rand(100.0, n) as val))

// 定義map函數
def sampleMap(t) {
    sampleRate = 0.1
    rowNum = t.rows()
    sampleIndex = (0..(rowNum - 1)).shuffle()[0:int(rowNum * sampleRate)]
    return t[sampleIndex]
}

ds = sqlDS(<select * from t>)              // 建立數據源
res = mr(ds, sampleMap, , unionAll)        // 執行計算

在以上的例子中,用戶自定義的sampleMap函數接受一個表(即數據源中的數據)做爲參數,隨機返回其中十分之一的行。本例的mr函數沒有reduceFunc參數,所以將各個map函數的返回值放在一個元組中,傳給finalFunc,即unionAll。unionAll將map函數返回的多張表合併成一個順序分區的分佈式表。dom

2.2 imr函數機器學習

DolphinDB database提供了基於Map-Reduce方法的迭代計算函數imr。相比mr它能支持迭代計算,每次迭代使用上一次迭代的結果和輸入數據集,於是能支持更多複雜算法的實現。迭代計算須要模型參數的初始值和終止標準。它的語法是imr(ds, initValue, mapFunc, [reduceFunc], [finalFunc], terminateFunc, [carryover=false])其中initValue參數是第一次迭代的初值,mapFunc參數是一個函數,接受的參數包括數據源實體,和前一次迭代中最終函數的輸出。對於第一次迭代,它是用戶給出的初始值。imr中的參數與mr函數相似。finalFunc函數接受兩個參數,第一個參數是前一次迭代中最終函數的輸出。對於第一次迭代,它是用戶給出的初始值。第二個參數是調用reduce函數後的輸出。terminateFunc參數用於判斷迭代是否停止。它接受兩個參數。第一個是前一次迭代中reduce函數的輸出,第二個是當前迭代中reduce函數的輸出。若是它返回true,迭代將會停止。carryover參數表示map函數調用是否生成一個傳遞給下一次map函數調用的對象。若是carryover爲true,那麼map函數有3個參數而且最後一個參數爲攜帶的對象,同時map函數的輸出結果是一個元組,最後一個元素爲攜帶的對象。在第一次迭代中,攜帶的對象爲NULL。

官方文檔中有一個經過imr計算分佈式數據的中位數的例子。本文將提供一個更加複雜的例子,即用牛頓法實現邏輯迴歸(Logistic Regression)的計算,展現imr在機器學習算法中的應用。

def myLrMap(t, lastFinal, yColName, xColNames, intercept) {
	placeholder, placeholder, theta = lastFinal
    if (intercept)
        x = matrix(t[xColNames], take(1.0, t.rows()))
    else
        x = matrix(t[xColNames])
    xt = x.transpose()
    y = t[yColName]
    scores = dot(x, theta)
    p = 1.0 \ (1.0 + exp(-scores))
    err = y - p
    w = p * (1.0 - p)
    logLik = (y * log(p) + (1.0 - y) * log(1.0 - p)).flatten().sum()
    grad = xt.dot(err)                   // 計算梯度向量
    wx = each(mul{w}, x)
    hessian = xt.dot(wx)                 // 計算Hessian矩陣
    return [logLik, grad, hessian]
}

def myLrFinal(lastFinal, reduceRes) {
    placeholder, placeholder, theta = lastFinal
    logLik, grad, hessian = reduceRes
    deltaTheta = solve(hessian, grad)    // deltaTheta等於hessian^-1 * grad,至關於解方程hessian * deltaTheta = grad
    return [logLik, grad, theta + deltaTheta]
}

def myLrTerm(prev, curr, tol) {
	placeholder, grad, placeholder = curr
	return grad.flatten().abs().max() <= tol
}

def myLr(ds, yColName, xColNames, intercept, initTheta, tol) {
    logLik, grad, theta = imr(ds, [0, 0, initTheta], myLrMap{, , yColName, xColNames, intercept}, +, myLrFinal, myLrTerm{, , tol})
    return theta
}

在上述例子中,map函數爲數據源中的數據計算在當前的係數下的梯度向量和Hessian矩陣;reduce函數將map的結果相加,至關於求出整個數據集的梯度向量和Hessian矩陣;final函數經過最終的梯度向量和Hessian矩陣對係數進行優化,完成一輪迭代;terminate函數的判斷標準是本輪迭代中梯度向量中最大份量的絕對值是否大於參數tol。

這個例子還能夠經過數據源轉換操做,進一步優化以提升性能,具體參見3.6節。

做爲常用的分析工具,分佈式邏輯迴歸已經在DolphinDB中做爲內置函數實現。內置版本(logisticRegression)提供更多功能。


3. 數據源相關函數

DolphinDB提供瞭如下經常使用的方法獲取數據源:

3.1 sqlDS函數

sqlDS函數根據輸入的SQL元代碼建立一個數據源列表。 若是SQL查詢中的數據表有n個分區,sqlDS會生成n個數據源。 若是SQL查詢不包含任何分區表,sqlDS將返回只包含一個數據源的元組。

sqlDS是將SQL表達式轉換成數據源的高效方法。用戶只須要提供SQL表達式,而不須要關注具體的數據分佈,就能利用返回的數據源執行分佈式算法。下面提供的例子,展現了利用sqlDS對DFS表中的數據執行olsEx分佈式最小二乘迴歸的方法。

// 建立數據庫和DFS表
db = database("dfs://olsDB", VALUE, `a`b`c`d)
t = db.createPartitionedTable(table(100000:0, `sym`x`y, [SYMBOL,DOUBLE,DOUBLE]), `tb, `sym)
n = 3000000
t.append!(table(rand(`a`b`c`d, n) as sym, 1..n + norm(0.0, 1.0, n) as x, 1..n + norm(0.0, 1.0, n) as y))

ds = sqlDS(<select x, y from t>)    // 建立數據源
olsEx(ds, `y, `x)                   // 執行計算


3.2 repartitionDS函數

sqlDS的數據源是系統自動根據數據的分區而生成的。有時用戶須要對數據源作一些限制,例如,在獲取數據時,從新指定數據的分區以減小計算量,或者,只須要一部分分區的數據。repartitionDS函數就提供了從新劃分數據源的功能。

函數repartitionDS根據輸入的SQL元代碼和列名、分區類型、分區方案等,爲元代碼生成通過從新分區的新數據源。

如下代碼提供了一個repartitionDS的例子。在這個例子中,DFS表t中有字段deviceId, time, temperature,分別爲symbol, datetime和double類型,數據庫採用雙層分區,第一層對time按VALUE分區,一天一個分區;第二層對deviceId按HASH分紅20個區。

現須要按deviceId字段聚合查詢95百分位的temperature。若是直接寫查詢select percentile(temperature,95) from t group by deviceID,因爲percentile函數沒有Map-Reduce實現,這個查詢將沒法完成。

一個方案是將所需字段所有加載到本地,計算95百分位,但當數據量過大時,計算資源可能不足。repartitionDS提供了一個解決方案:將表基於deviceId按其原有分區方案HASH從新分區,每一個新的分區對應原始表中一個HASH分區的全部數據。經過mr函數在每一個新的分區中計算95百分位的temperature,最後將結果合併彙總。

// 建立數據庫
deviceId = "device" + string(1..100000)
db1 = database("", VALUE, 2019.06.01..2019.06.30)
db2 = database("", HASH, INT:20)
db = database("dfs://repartitionExample", COMPO, [db1, db2])

// 建立DFS表
t = db.createPartitionedTable(table(100000:0, `deviceId`time`temperature, [SYMBOL,DATETIME,DOUBLE]), `tb, `deviceId`time)
n = 3000000
t.append!(table(rand(deviceId, n) as deviceId, 2019.06.01T00:00:00 + rand(86400 * 10, n) as time, 60 + norm(0.0, 5.0, n) as temperature))

// 從新分區
ds = repartitionDS(<select deviceId, temperature from t>, `deviceId)
// 執行計算
res = mr(ds, def(t) { return select percentile(temperature, 95) from t group by deviceId}, , unionAll{, false})

這個計算的結果正確性可以保證,由於repartitionDS產生的新分區基於deviceId的原有分區,能確保其中各個數據源的deviceId兩兩不重合,所以只須要將各分區計算結果合併就能取得正確結果。


3.3 textChunkDS函數

textChunkDS函數能夠將一個文本文件分紅若干個數據源,以便對一個文本文件所表示的數據執行分佈式計算。它的語法是:textChunkDS(filename, chunkSize, [delimiter=','], [schema])。其中,filename, delimiter, schema這些參數與loadText函數的參數相同。而chunkSize參數表示每一個數據源中數據的大小,單位爲MB,能夠取1到2047的整數。

如下例子是官方文檔中olsEx例子的另外一種實現。它經過textChunkDS函數從文本文件中生成若干數據源,每一個數據源的大小爲100MB,對生成的數據源通過轉換後,執行olsEx函數,計算最小二乘參數:

ds = textChunkDS("c:/DolphinDB/Data/USPrices.csv", 100)
ds.transDS!(USPrices -> select VOL\SHROUT as VS, abs(RET) as ABS_RET, RET, log(SHROUT*(BID+ASK)\2) as SBA from USPrices where VOL>0)
rs=olsEx(ds, `VS, `ABS_RET`SBA, true, 2)

其中的數據源轉換操做transDS!,能夠參考3.6節。


3.4 第三方數據源提供的數據源接口

一些加載第三方數據的插件,例如HDF5,提供了產生數據源的接口。用戶能夠直接對它們返回的數據源執行分佈式算法,而無需先將第三方數據導入內存或保存爲磁盤或分佈式表。

DolphinDB的HDF5插件提供了hdf5DS函數,用戶能夠經過設置其dsNum參數,指定須要生成的數據源個數。如下例子從HDF5文件中生成10個數據源,並經過Map-Reduce框架對結果的第1列求樣本方差:

ds = hdf5::hdf5DS("large_file.h5", "large_table", , 10)

def varMap(t) {
    column = t.col(0)
    return [column.sum(), column.sum2(), column.count()]
}

def varFinal(result) {
    sum, sum2, count = result
    mu = sum \ count
    populationVar = sum2 \ count - mu * mu
    sampleVar = populationVar * count \ (count - 1)
    return sampleVar
}

sampleVar = mr(ds, varMap, +, varFinal)


3.5 數據源緩存

數據源能夠有0,1或多個位置。位置爲0的數據源是本地數據源。 在多個位置的狀況下,這些位置互爲備份。系統會隨機選擇一個位置執行分佈式計算。當數據源被指示緩存數據對象時,系統會選擇咱們上次成功檢索數據的位置。

用戶能夠指示系統對數據源進行緩存或清理緩存。對於迭代計算算法(例如機器學習算法),數據緩存能夠大大提升計算性能。當系統內存不足時,緩存數據將被清除。若是發生這種狀況,系統能夠恢復數據,由於數據源包含全部元描述和數據轉換函數。

和數據源緩存相關的函數有:

  • cacheDS!:指示系統緩存數據源
  • clearcacheDS!:指示系統在下次執行數據源以後清除緩存
  • cacheDSNow:當即執行並緩存數據源,並返回緩存行的總數
  • clearCacheDSNow:當即清除數據源和緩存


3.6 數據源轉換

一個數據源對象還能夠包含多個數據轉換函數,用以進一步處理所檢索到的數據。系統會依次執行這些數據轉換函數,一個函數的輸出做爲下一個函數的輸入(和惟一的輸入)。

將數據轉換函數包含在數據源中,一般比在覈心計算操做(即map函數)中對數據源進行轉換更有效。若是檢索到的數據僅須要一次計算時,沒有性能差別,但它對於具備緩存數據對象的數據源的迭代計算會形成巨大的差別。若是轉換操做在覈心計算操做中,則每次迭代都須要執行轉換; 若是轉換操做在數據源中,則它們只被執行一次。transDS!函數提供了轉換數據源的功能。

例如,執行迭代機器學習函數randomForestRegressor以前,用戶可能須要手動填充數據的缺失值(固然,DolphinDB的隨機森林算法已經內置了缺失值處理)。此時,能夠用transDS!對數據源進行以下處理:對每個特徵列,用該列的平均值填充缺失值。假設表中的列x0, x1, x2, x3爲自變量,列y爲因變量,如下是實現方法:

ds = sqlDS(<select x0, x1, x2, x3, y from t>)
ds.transDS!(def (mutable t) {
    update t set x0 = nullFill(x0, avg(x0)), x1 = nullFill(x1, avg(x1)), x2 = nullFill(x2, avg(x2)), x3 = nullFill(x3, avg(x3))
    return t
})

randomForestRegressor(ds, `y, `x0`x1`x2`x3)

另外一個轉換數據源的例子是2.2節提到的邏輯迴歸的腳本實現。在2.2節的實現中,map函數調用中包含了從數據源的表中取出對應列,轉換成矩陣的操做,這意味着每一輪迭代都會發生這些操做。而實際上,每輪迭代都會使用一樣的輸入矩陣,這個轉換步驟只須要調用一次。所以,能夠用transDS!將數據源轉換成一個包含x, xt和y矩陣的三元組:

def myLrTrans(t, yColName, xColNames, intercept) {
    if (intercept)
        x = matrix(t[xColNames], take(1.0, t.rows()))
    else
        x = matrix(t[xColNames])
    xt = x.transpose()
    y = t[yColName]
    return [x, xt, y]
}

def myLrMap(input, lastFinal) {
    x, xt, y = input
    placeholder, placeholder, theta = lastFinal
    // 以後的計算和2.2節相同
}

// myLrFinal和myLrTerm函數和2.2節相同

def myLr(mutable ds, yColName, xColNames, intercept, initTheta, tol) {
    ds.transDS!(myLrTrans{, yColName, xColNames, intercept})
    logLik, grad, theta = imr(ds, [0, 0, initTheta], myLrMap, +, myLrFinal, myLrTerm{, , tol})
    return theta
}
相關文章
相關標籤/搜索