乾貨丨時序數據庫DolphinDB數據導入教程

企業在使用大數據分析平臺時,首先須要把海量數據從多個數據源遷移到大數據平臺中。node


在導入數據前,咱們須要理解 DolphinDB database 的基本概念和特色。git

DolphinDB數據表按存儲介質分爲3種類型:github

  • 內存表:數據只保存在本節點內存,存取速度最快,可是節點關閉後,數據將會丟失。
  • 本地磁盤表:數據保存在本地磁盤上,即便節點重啓,也能夠方便地經過腳本把數據加載到內存中。
  • 分佈式表:數據在物理上分佈在不一樣的節點,經過DolphinDB的分佈式計算引擎,邏輯上仍然能夠像本地表同樣作統一查詢。

DolphinDB數據表按是否分區分爲2種類型:數據庫

  • 普通表
  • 分區表

在傳統的數據庫中,分區是針對數據表的,即同一個數據庫中的每一個數據表能夠有不一樣的分區方案;而DolphinDB的分區是針對數據庫的,即一個數據庫只能使用一種分區方案。若是兩個表的分區方案不一樣,它們不能放在同一個數據庫中。服務器


DolphinDB提供了3種靈活的數據導入方法:app

  • 經過CSV文本文件導入
  • 經過HDF5文件導入
  • 經過ODBC導入


1.經過CSV文本文件導入

經過CSV文件進行數據中轉是比較通用的數據遷移方式。DolphinDB提供了loadTextploadTextloadTextEx三個函數來導入CSV文件。下面咱們經過一個示例CSV文件candle_201801.csv來講明這3個函數的用法。分佈式

1.1 loadTextide

語法:loadText(filename, [delimiter=','], [schema])函數

參數:性能

filename是文件名。

delimiterschema都是可選參數。

delimiter用於指定不一樣字段的分隔符,默認是「,」。

schema用於數據導入後每一個字段的數據類型,它是一個table類型。DolphinDB提供了字段類型自動識別功能,可是某些狀況下系統自動識別的數據類型不符合需求,好比咱們在導入示例CSVcandle_201801.csv時,volume字段會被識別成INT類型,實際上咱們須要LONG類型,這時就須要使用schema參數。

建立schema table的腳本:

nameCol = `symbol`exchange`cycle`tradingDay`date`time`open`high`low`close`volume`turnover`unixTime
typeCol = [SYMBOL,SYMBOL,INT,DATE,DATE,INT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,LONG]
schemaTb = table(nameCol as name,typeCol as type)

當表的字段很是多時,建立schema table的腳本會十分冗長。爲了不這個問題,DolphinDB提供了extractTextSchema函數,它能夠從文本文件中提取表的結構,咱們只需修改須要指定的字段類型便可。

dataFilePath = "/home/data/candle_201801.csv"
schemaTb=extractTextSchema(dataFilePath)
update schemaTb set type=`LONG where name=`volume        
tt=loadText(dataFilePath,,schemaTb)


1.2 ploadText

ploadText把數據文件做爲分區表並行加載到內存中,語法和loadText徹底相同,可是ploadText的速度更快。ploadText主要用於快速載入大文件,它在設計上充分利用了多個core來並行載入文件,並行程度取決於服務器自己core數量和節點的localExecutors配置。

下面咱們對比loadText和ploadText的性能。

首先,經過腳本生成一個4G左右的CSV文件:

filePath = "/home/data/testFile.csv"
appendRows = 100000000
dateRange = 2010.01.01..2018.12.30
ints = rand(100, appendRows)
symbols = take(string('A'..'Z'), appendRows)
dates = take(dateRange, appendRows)
floats = rand(float(100), appendRows)
times = 00:00:00.000 + rand(86400000, appendRows)
t = table(ints as int, symbols as symbol, dates as date, floats as float, times as time)
t.saveText(filePath)

分別使用loadText和ploadText來導入文件,該節點是4核8線程的CPU。

timer loadText(filePath);
//Time elapsed: 39728.393 ms
timer ploadText(filePath);
//Time elapsed: 10685.838 ms

結果顯示,ploadText的性能差很少是loadText的4倍。


1.3 loadTextEx

語法:loadTextEx(dbHandle, tableName, [partitionColumns], fileName, [delimiter=','], [schema])

參數:

dbHandle是數據庫句柄。

tableName是保存數據的分佈式表的表名。

partitionColumnsdelimiterschema是可選參數。

當分區方案不是順序分區時,須要指定partitionColumns,表示分區列。

fileName表示導入文件的名稱。

delimiter用於指定不一樣字段的分隔符,默認是「,」。

schema用於數據導入後每一個字段的數據類型,它是一個table類型。

loadText函數老是把數據導入到內存,當數據文件很是龐大時,工做機的內存很容易成爲瓶頸。loadTextEx能夠很好地解決這個問題,它經過邊導入邊保存的方式,把靜態的CSV文件以較爲平緩的數據流的方式「另存爲」DolphinDB的分佈式表,而不是採用所有導入內存再存爲分區表的方式,大大下降了內存的使用需求。

首先建立用於保存數據的分佈式表:

dataFilePath = "/home/data/candle_201801.csv"
tb = loadText(dataFilePath)
db=database("dfs://dataImportCSVDB",VALUE,2018.01.01..2018.01.31)  
db.createPartitionedTable(tb, "cycle", "tradingDay")

而後將文件導入分佈式表:

loadTextEx(db, "cycle", "tradingDay", dataFilePath)

當須要使用數據作分析的時候,經過loadTable函數將分區元數據先載入內存,在實際執行查詢的時候,DolphinDB會按需加載數據到內存。

tb = database("dfs://dataImportCSVDB").loadTable("cycle")


2. 經過HDF5文件導入

HDF5是一種比CSV更高效的二進制數據文件格式,在數據分析領域普遍使用。DolphinDB也支持經過HDF5格式文件導入數據。

DolphinDB經過HDF5插件來訪問HDF5文件,插件提供瞭如下方法:

  • hdf5::ls : 列出h5文件中全部 Group 和 Dataset 對象。
  • hdf5::lsTable :列出h5文件中全部 Dataset 對象。
  • hdf5::hdf5DS :返回h5文件中 Dataset 的元數據。
  • hdf5::loadHdf5 :將h5文件導入內存表。
  • hdf5::loadHdf5Ex :將h5文件導入分區表。
  • hdf5::extractHdf5Schema :從h5文件中提取表結構。

調用插件方法時須要在方法前面提供namespace,好比調用loadHdf5時hdf5::loadHdf5,若是不想每次調用都使用namespace,可使用use關鍵字:

use hdf5
loadHdf5(filePath,tableName)

要使用DolphinDB的插件,首先須要下載HDF5插件,再將插件部署到節點的plugins目錄下,在使用插件以前須要先加載,使用下面的腳本:

loadPlugin("plugins/hdf5/PluginHdf5.txt")

HDF5文件的導入與CSV文件大同小異,好比咱們要將示例HDF5文件candle_201801.h5導入,它包含一個Dataset:candle_201801,那麼最簡單的導入方式以下:

dataFilePath = "/home/data/candle_201801.h5"
datasetName = "candle_201801"
tmpTB = hdf5::loadHdf5(dataFilePath,datasetName)

若是須要指定數據類型導入可使用hdf5::extractHdf5Schema,腳本以下:

dataFilePath = "/home/data/candle_201801.h5"
datasetName = "candle_201801"
schema=hdf5::extractHdf5Schema(dataFilePath,datasetName)
update schema set type=`LONG where name=`volume        
tt=hdf5::loadHdf5(dataFilePath,datasetName,schema)

若是HDF5文件很是龐大,工做機內存沒法支持全量載入,可使用hdf5::loadHdf5Ex方式來載入數據。

首先建立用於保存數據的分佈式表:

dataFilePath = "/home/data/candle_201801.h5"
datasetName = "candle_201801"
dfsPath = "dfs://dataImportHDF5DB"
tb = hdf5::loadHdf5(dataFilePath,datasetName)
db=database(dfsPath,VALUE,2018.01.01..2018.01.31)  
db.createPartitionedTable(tb, "cycle", "tradingDay")

而後將HDF5文件經過hdf5::loadHdf5Ex函數導入:

hdf5::loadHdf5Ex(db, "cycle", "tradingDay", dataFilePath,datasetName)


3. 經過ODBC接口導入

DolphinDB支持ODBC接口鏈接第三方數據庫,從數據庫中直接將表讀取成DolphinDB的內存數據表。使用DolphinDB提供的ODBC插件能夠方便地從ODBC支持的數據庫中遷移數據至DolphinDB中。

ODBC插件提供瞭如下四個方法用於操做第三方數據源數據:

  • odbc::connect : 開啓鏈接。
  • odbc::close : 關閉鏈接。
  • odbc::query : 根據給定的SQL語句查詢數據並返回到DolphinDB的內存表。
  • odbc::execute : 在第三方數據庫內執行給定的SQL語句,不返回數據。

在使用ODBC插件前,須要先安裝ODBC驅動,請參考ODBC插件使用教程

下面以鏈接 SQL Server 做爲實例,現有數據庫的具體配置爲:

server:172.18.0.15

默認端口:1433

鏈接用戶名:sa

密碼:123456

數據庫名稱: SZ_TAQ

數據庫表選2016年1月1日的數據,表名candle_201801,字段與CSV文件相同。

要使用ODBC插件鏈接SQL Server數據庫,首先第一步是下載插件解壓並拷貝plugins\odbc目錄下全部文件到DolphinDB Server的plugins/odbc目錄下,經過下面的腳本完成插件初始化:

//載入插件
loadPlugin("plugins/odbc/odbc.cfg")
//鏈接 SQL Server
conn=odbc::connect("Driver=ODBC Driver 17 for SQL Server;Server=172.18.0.15;Database=SZ_TAQ;Uid=sa;
Pwd=123456;")

在導入數據以前,先建立分佈式磁盤數據庫用於保存數據:

//從SQL Server中取到表結構做爲DolphinDB導入表的模板
tb = odbc::query(conn,"select top 1 * from candle_201801")
db=database("dfs://dataImportODBC",VALUE,2018.01.01..2018.01.31)
db.createPartitionedTable(tb, "cycle", "tradingDay")

從SQL Server中導入數據並保存成DolphinDB分區表:

data = odbc::query(conn,"select * from candle_201801")
tb = database("dfs://dataImportODBC").loadTable("cycle")
tb.append!(data);

經過ODBC導入數據避免了文件導出導入的過程,並且經過DolphinDB的定時做業機制,它還能夠做爲時序數據定時同步的數據通道。


4. 金融數據導入案例

下面以證券市場日K線圖數據文件導入做爲示例,數據以CSV文件格式保存在磁盤上,共有10年的數據,按年度分目錄保存,一共大約100G的數據,路徑示例以下:

2008

---- 000001.csv

---- 000002.csv

---- 000003.csv

---- 000004.csv

---- ...

2009

...

2018

每一個文件的結構都是一致的,如圖所示:

edcac7665c7b5c30c4316c15c573e93e.png


4.1 分區規劃

要導入數據以前,首先要作好數據的分區規劃,這涉及到兩個方面的考量:

  • 肯定分區字段。
  • 肯定分區的粒度。

首先根據平常的查詢語句執行頻率,咱們採用trading和symbol兩個字段進行組合範圍(RANGE)分區,經過對經常使用檢索字段分區,能夠極大的提高數據檢索和分析的效率。

接下來要作的是分別定義兩個分區的粒度。

現有數據的時間跨度是從2008-2018年,因此這裏按照年度對數據進行時間上的劃分,在規劃時間分區時要考慮爲後續進入的數據留出足夠的空間,因此這裏把時間範圍設置爲2008-2030年。

yearRange =date(2008.01M + 12*0..22)

這裏股票代碼有幾千個,若是對股票代碼按值(VALUE)分區,那麼每一個分區只是幾兆大小,而分區數量則不少。分佈式系統在執行查詢時,會將查詢語句分紅多個子任務分發到不一樣的分區執行,因此按值分區方式會致使任務數量很是多,而任務執行時間極短,致使系統在管理任務上花費的時間反而大於任務自己的執行時間,這樣的分區方式明顯是不合理的。這裏咱們按照範圍將全部股票代碼均分紅100個區間,每一個區間做爲一個分區,最終分區的大小約100M左右。 考慮到後期有新的股票數據進來,因此增長了一個虛擬的代碼999999,跟最後一個股票代碼組成一個分區,用來保存後續新增股票的數據。

經過下面的腳本獲得 symbol 字段的分區範圍:

//遍歷全部的年度目錄,去重整理出股票代碼清單,並經過cutPoint分紅100個區間
symbols = array(SYMBOL, 0, 100)
yearDirs = files(rootDir)[`filename]
for(yearDir in yearDirs){
	path = rootDir + "/" + yearDir
	symbols.append!(files(path)[`filename].upper().strReplace(".CSV",""))
}
//去重並增長擴容空間:
symbols = symbols.distinct().sort!().append!("999999");
//均分紅100份
symRanges = symbols.cutPoints(100)
經過下述腳本定義兩個維度組合(COMPO)分區,建立Database和分區表:
columns=`symbol`exchange`cycle`tradingDay`date`time`open`high`low`close`volume`turnover`unixTime
types =  [SYMBOL,SYMBOL,INT,DATE,DATE,TIME,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE,LONG]

dbDate=database("", RANGE, yearRange)
dbID=database("", RANGE, symRanges)
db = database(dbPath, COMPO, [dbDate, dbID])

pt=db.createPartitionedTable(table(1000000:0,columns,types), tableName, `tradingDay`symbol)

須要注意的是,分區是DolphinDB存儲數據的最小單位,DolphinDB對分區的寫入操做是獨佔式的,當任務並行進行的時候,須要避免多任務同時向一個分區寫入數據。本案例中每一年的數據交給一個單獨任務去作,各任務操做的數據邊界沒有重合,因此不可能發生多任務寫入同一分區的狀況。


4.2 導入數據

數據導入腳本的主要思路很簡單,就是經過循環目錄樹,將全部的CSV文件逐個讀取並寫入到分佈式數據庫表dfs://SAMPLE_TRDDB中,可是具體導入過程當中仍是會有不少細節問題。

首先碰到的問題是,CSV文件中保存的數據格式與DolphinDB內部的數據格式存在差別,好比time字段,文件裏是以「9390100000」表示精確到毫秒的時間,若是直接讀入會被識別成數值類型,而不是time類型,因此這裏須要用到數據轉換函數datetimeParse結合格式化函數format在數據導入時進行轉換。 關鍵腳本以下:

datetimeParse(format(time,"000000000"),"HHmmssSSS")

雖然經過循環導入實現起來很是簡單,可是實際上100G的數據是由極多的5M左右的細碎文件組成,若是單線程操做會等待好久,爲了充分利用集羣的資源,因此咱們按照年度把數據導入拆分紅多個子任務,輪流發送到各節點的任務隊列並行執行,提升導入的效率。這個過程分下面兩步實現:

(1)定義一個自定義函數,函數的主要功能是導入指定年度目錄下的全部文件:

//循環處理年度目錄下的全部數據文件
def loadCsvFromYearPath(path, dbPath, tableName){
	symbols = files(path)[`filename]
	for(sym in symbols){
		filePath = path + "/" + sym
		t=loadText(filePath)
		database(dbPath).loadTable(tableName).append!(select symbol, exchange,cycle, tradingDay,date,datetimeParse(format(time,"000000000"),"HHmmssSSS"),open,high,low,close,volume,turnover,unixTime from t )			
	}
}

(2)經過 rpc 函數結合 submitJob 函數把上面定義的函數提交到各節點去執行:

nodesAlias="NODE" + string(1..4)
years= files(rootDir)[`filename]

index = 0;
for(year in years){	
	yearPath = rootDir + "/" + year
	des = "loadCsv_" + year
	rpc(nodesAlias[index%nodesAlias.size()],submitJob,des,des,loadCsvFromYearPath,yearPath,dbPath,tableName)
	index=index+1
}

數據導入過程當中,能夠經過pnodeRun(getRecentJobs)來觀察後臺任務的完成狀況。

案例完整腳本

相關文章
相關標籤/搜索