乾貨丨DolphinDB數據導入教程


DolphinDB提供了多種靈活的數據導入方法,來幫助用戶方便的把海量數據從多個數據源導入。具體有以下4種途徑:node

  • 經過文本文件導入
  • 經過二進制文件導入
  • 經過HDF5接口導入
  • 經過ODBC接口導入

1. DolphinDB數據庫基本概念和特色

本章中多處使用到DolphinDB的數據庫和表的概念,因此這裏首先作一個介紹。數據庫

在DolphinDB裏數據以結構化數據表的方式保存。數據表按存儲介質能夠分爲:服務器

  • 內存表:數據保存在內存中,存取速度最快,可是若節點關閉就會丟失數據。
  • 本地磁盤表:數據保存在本地磁盤上,即便節點關閉,也不會丟失數據。能夠方便的把數據從磁盤加載到內存。
  • 分佈式表:數據分佈在不一樣的節點的磁盤上,經過DolphinDB的分佈式計算引擎,邏輯上仍然能夠像本地表同樣作統一查詢。

按是否分區能夠分爲:app

  • 普通表(未分區表)
  • 分區表

在傳統的數據庫系統,分區是針對數據表定義的,就是同一個數據庫裏的每一個數據表均可以有本身的分區定義;而DolphinDB的分區是針對數據庫定義的,也就是說同一個數據庫下的數據表只能使用同一種分區機制,這也意味着若是兩張表要使用不一樣的分區機制,那麼它們是不能放在一個數據庫下的。分佈式

 

2. 經過文本文件導入

經過文件進行數據中轉是比較通用化的一種數據遷移方式,方式簡單易操做。DolphinDB提供瞭如下三個函數來載入文本文件:ide

  • loadText: 將文本文件以 DolphinDB 數據表的形式讀取到內存中。
  • ploadText: 將數據文件做爲分區表並行加載到內存中。與loadText函數相比,速度更快。
  • loadTextEx: 把數據文件轉換爲DolphinDB數據庫中的分佈式表,而後將表的元數據加載到內存中。

如下爲將candle_201801.csv導入DolphinDB來演示loadText和loadTextEx的用法。函數

2.1 loadText性能

loadText函數有三個參數,第一個參數filename是文件名,第二個參數delimiter用於指定不一樣字段的分隔符,默認是",",第三個參數schema是用來指定導入後表的每一個字段的數據類型,schema參數是一個數據表,格式示例以下:spa

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

首先導入數據:插件

dataFilePath = "/home/data/candle_201801.csv"
tmpTB = loadText(dataFilePath);

DolphinDB在導入數據的同時,隨機提取一部分的行以肯定各列數據類型,因此對大多數文本文件無須手動指定各列的數據類型,很是方便。但有時系統自動識別的數據類型並不符合預期或需求,好比導入數據的volume列被識別爲INT類型, 而須要的volume類型是LONG類型,這時就須要使用一個數據類型表做爲schema參數。例如可以使用以下腳本構建數據類型表:

nameCol = `symbol`exchange`cycle`tradingDay`date`time`open`high`low`close`volume`turnover`unixTime
typeCol = `SYMBOL`SYMBOL`INT`DATE`DATE`INT`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE`LONG
schemaTb = table(nameCol as name,typeCol as type);

當表字段很是多的時候,寫這樣一個腳本費時費力,爲了簡化操做,DolphinDB提供了extractTextSchema 函數,可從文本文件中提取表的結構生成數據類型表。只需修改少數指定字段的數據類型,就可獲得理想的數據類型表。

整合上述方法,可以使用以下腳本以導入數據:

dataFilePath = "/home/data/candle_201801.csv"
schemaTb=extractTextSchema(dataFilePath)
update schemaTb set type=`LONG where name=`volume        tt=loadText(dataFilePath,,schemaTb);

2.2 ploadText

ploadText函數的特色能夠快速載入大文件。它在設計中充分利用了多核CPU來並行載入文件,並行程度取決於服務器自己CPU核數量和節點的localExecutors配置。

首先經過腳本生成一個4G左右的CSV文件:

filePath = "/home/data/testFile.csv"
appendRows = 100000000
dateRange = 2010.01.01..2018.12.30
ints = rand(100, appendRows)
symbols = take(string('A'..'Z'), appendRows)
dates = take(dateRange, appendRows)
floats = rand(float(100), appendRows)
times = 00:00:00.000 + rand(86400000, appendRows)
t = table(ints as int, symbols as symbol, dates as date, floats as float, times as time)
t.saveText(filePath)

分別經過loadText和ploadText來載入文件。本例所用節點是4核8超線程的CPU。

timer loadText(filePath);
Time elapsed: 39728.393 ms

timer ploadText(filePath);
Time elapsed: 10685.838 ms

結果顯示在此配置下,ploadText的性能是loadText的4倍左右。

2.3 loadTextEx

loadText函數老是把全部數據導入內存。當數據文件體積很是龐大時,服務器的內存很容易成爲制約因素。DolphinDB提供的;loadTextEx函數能夠較好的解決這個問題。它將一個大的文本文件分割成不少個小塊,逐步加載到分佈式數據表中。

首先建立分佈式數據庫:

db=database("dfs://dataImportCSVDB",VALUE,2018.01.01..2018.01.31)

而後將文本文件導入數據庫中"cycle"表:

dataFilePath = "/home/data/candle_201801.csv"
loadTextEx(db, "cycle", "tradingDay", dataFilePath)

當須要使用數據時,經過loadTable函數將分區元數據先載入內存。

tb = database("dfs://dataImportCSVDB").loadTable("cycle")

在實際執行查詢的時候,會按需加載所需數據到內存。


3. 經過二進制文件導入

對於二進制格式的文件,DolphinDB提供了2個函數用於導入:readRecord!函數和loadRecord函數。兩者的區別是,前者不支持導入字符串類型的數據,後者支持。下面經過2個例子分別介紹這兩個函數的用法。

  • readRecord!函數

readRecord!函數可以導入不含有字符串類型字段的二進制文件,下面介紹如何使用readRecord!函數導入一個二進制文件:binSample.bin。

首先,建立一個內存表tb,用於存放導入的數據,須要爲每一列指定字段名稱和數據類型。

tb=table(1000:0, `id`date`time`last`volume`value`ask1`ask_size1`bid1`bid_size1, [INT,INT,INT,FLOAT,INT,FLOAT,FLOAT,INT,FLOAT,INT])

調用file函數打開文件,並經過readRecord!函數導入二進制文件,數據會被加載到tb表中。

dataFilePath="/home/data/binSample.bin"
f=file(dataFilePath)
f.readRecord!(tb);

查看tb表的數據,數據已經正確導入:

select top 5 * from tb;

id date     time     last volume value ask1  ask_size1 bid1  bid_size1
-- -------- -------- ---- ------ ----- ----- --------- ----- ---------
1  20190902 91804000 0    0      0     11.45 200       11.45 200
2  20190902 92007000 0    0      0     11.45 200       11.45 200
3  20190902 92046000 0    0      0     11.45 1200      11.45 1200
4  20190902 92346000 0    0      0     11.45 1200      11.45 1200
5  20190902 92349000 0    0      0     11.45 5100      11.45 5100

導入之後的數據中,date列和time列的數據以數值形式存儲,爲了更直觀地顯示數據,可使用temporalParse函數進行日期和時間類型數據的格式轉換。再使用replaceColumn!函數替換表中原有的列。具體以下所示。

tb.replaceColumn!(`date, tb.date.string().temporalParse("yyyyMMdd"))
tb.replaceColumn!(`time, tb.time.format("000000000").temporalParse("HHmmssSSS"))
select top 5 * from tb;

id date       time         last volume value ask1  ask_size1 bid1  bid_size1
-- ---------- ------------ ---- ------ ----- ----- --------- ----- ---------
1  2019.09.02 09:18:04.000 0    0      0     11.45 200       11.45 200
2  2019.09.02 09:20:07.000 0    0      0     11.45 200       11.45 200
3  2019.09.02 09:20:46.000 0    0      0     11.45 1200      11.45 1200
4  2019.09.02 09:23:46.000 0    0      0     11.45 1200      11.45 1200
5  2019.09.02 09:23:49.000 0    0      0     11.45 5100      11.45 5100

  • loadRecord函數

loadRecord函數可以處理字符串類型的數據(包括STRING和SYMBOL類型),可是要求字符串在磁盤上的長度必須固定。若是字符串的長度小於固定值,則用ASCII值0填充,加載的時候會把末尾0去掉。下面介紹使用loadRecord函數導入一個帶有字符串類型字段的二進制文件:binStringSample.bin。

首先,指定要導入文件的表結構,包括字段名稱和數據類型。與readRecord!函數不一樣的是,loadRecord函數是經過一個元組來指定schema,而不是直接定義一個內存表。關於表結構的指定,有如下3點要求:

  1. 對於表中的每一個字段,都須要以tuple的形式指定字段名稱和相應的數據類型。
  2. 若類型是字符串,還需指定磁盤上的字符串長度(包括結尾的0)。例如:("name",SYMBOL,24)。
  3. 將全部tuple按照字段順序組成元組,做爲表結構。

針對本例中的數據文件指定表結構,具體以下所示:

schema = [("code", SYMBOL, 32),("date", INT),("time", INT),("last", FLOAT),("volume", INT),("value", FLOAT),("ask1", FLOAT),("ask2", FLOAT),("ask3", FLOAT),("ask4", FLOAT),("ask5", FLOAT),("ask6", FLOAT),("ask7", FLOAT),("ask8", FLOAT),("ask9", FLOAT),("ask10", FLOAT),("ask_size1", INT),("ask_size2", INT),("ask_size3", INT),("ask_size4", INT),("ask_size5", INT),("ask_size6", INT),("ask_size7", INT),("ask_size8", INT),("ask_size9", INT),("ask_size10", INT),("bid1", FLOAT),("bid2", FLOAT),("bid3", FLOAT),("bid4", FLOAT),("bid5", FLOAT),("bid6", FLOAT),("bid7", FLOAT),("bid8", FLOAT),("bid9", FLOAT),("bid10", FLOAT),("bid_size1", INT),("bid_size2", INT),("bid_size3", INT),("bid_size4", INT),("bid_size5", INT),("bid_size6", INT),("bid_size7", INT),("bid_size8", INT),("bid_size9", INT),("bid_size10", INT)]

使用loadRecord函數導入二進制文件,因爲表的列數較多,經過select語句選出幾列有表明性的數據進行後續介紹。

dataFilePath="/home/data/binStringSample.bin"
tmp=loadRecord(dataFilePath, schema)
tb=select code,date,time,last,volume,value,ask1,ask_size1,bid1,bid_size1 from tmp;

查看錶內數據的前5行。

select top 5 * from tb;

code      date     time     last volume value ask1  ask_size1 bid1  bid_size1
--------- -------- -------- ---- ------ ----- ----- --------- ----- ---------
601177.SH 20190902 91804000 0    0      0     11.45 200       11.45 200
601177.SH 20190902 92007000 0    0      0     11.45 200       11.45 200
601177.SH 20190902 92046000 0    0      0     11.45 1200      11.45 1200
601177.SH 20190902 92346000 0    0      0     11.45 1200      11.45 1200
601177.SH 20190902 92349000 0    0      0     11.45 5100      11.45 5100

用一樣的方法處理日期和時間列的數據:

tb.replaceColumn!(`date, tb.date.string().temporalParse("yyyyMMdd"))
tb.replaceColumn!(`time, tb.time.format("000000000").temporalParse("HHmmssSSS"))
select top 5 * from tb;

code      date       time         last volume value ask1  ask_size1 bid1  bid_size1
--------- ---------- ------------ ---- ------ ----- ----- --------- ----- ---------
601177.SH 2019.09.02 09:18:04.000 0    0      0     11.45 200       11.45 200
601177.SH 2019.09.02 09:20:07.000 0    0      0     11.45 200       11.45 200
601177.SH 2019.09.02 09:20:46.000 0    0      0     11.45 1200      11.45 1200
601177.SH 2019.09.02 09:23:46.000 0    0      0     11.45 1200      11.45 1200
601177.SH 2019.09.02 09:23:49.000 0    0      0     11.45 5100      11.45 5100

除了readRecord!和loadRecord函數以外,DolphinDB還提供了一些與二進制文件的處理相關的函數,例如writeRecord函數,用於將DolphinDB對象保存爲二進制文件。具體請參考用戶手冊。

 

4. 經過HDF5接口導入

HDF5是一種高效的二進制數據文件格式,在數據分析領域普遍使用。DolphinDB支持導入HDF5格式數據文件。

DolphinDB經過HDF5插件來訪問HDF5文件,插件提供瞭如下方法:

  • hdf5::ls - 列出h5文件中全部 Group 和 Dataset 對象
  • hdf5::lsTable - 列出h5文件中全部 Dataset 對象
  • hdf5::hdf5DS - 返回h5文件中 Dataset 的元數據
  • hdf5::loadHDF5 - 將h5文件導入內存表
  • hdf5::loadHDF5Ex - 將h5文件導入分區表
  • hdf5::extractHDF5Schema - 從h5文件中提取表結構

DolphinDB 1.00.0版本以後,安裝目錄/server/plugins/hdf5已經包含HDF5插件,使用如下腳本加載插件:

loadPlugin("plugins/hdf5/PluginHdf5.txt")

若用戶使用的是老版本,默認不包含此插件,可先從HDF5插件對應版本分支bin目錄下載,再將插件部署到節點的plugins目錄下。

調用插件方法時須要在方法前面提供namespace,好比調用loadHDF5可使用hdf5::loadHDF5。另外一種寫法是:、

use hdf5
loadHDF5(filePath,tableName)

HDF5文件的導入與CSV文件相似。例如,若要導入包含一個Dataset candle_201801的文件candle_201801.h5,可以使用如下腳本,其中datasetName可經過ls或lsTable得到:

dataFilePath = "/home/data/candle_201801.h5"
datasetName = "candle_201801"
tmpTB = hdf5::loadHDF5(dataFilePath,datasetName)

若是須要指定數據類型導入可使用hdf5::extractHDF5Schema,腳本以下:

dataFilePath = "/home/data/candle_201801.h5"
datasetName = "candle_201801"
schema=hdf5::extractHDF5Schema(dataFilePath,datasetName)
update schema set type=`LONG where name=`volume
tt=hdf5::loadHDF5(dataFilePath,datasetName,schema)

若是HDF5文件超過服務器內存,可使用hdf5::loadHDF5Ex載入數據。

首先建立用於保存數據的分佈式表:

dataFilePath = "/home/data/candle_201801.h5"
datasetName = "candle_201801"
dfsPath = "dfs://dataImportHDF5DB"
db=database(dfsPath,VALUE,2018.01.01..2018.01.31)

而後導入HDF5文件:

hdf5::loadHDF5Ex(db, "cycle", "tradingDay", dataFilePath,datasetName)

5. 經過ODBC接口導入

DolphinDB支持ODBC接口鏈接第三方數據庫,從其中直接將數據表讀取成DolphinDB的內存數據表。

DolphinDB官方提供ODBC插件用於鏈接第三方數據源,使用該插件能夠方便的從ODBC支持的數據庫遷移數據至DolphinDB中。

ODBC插件提供瞭如下四個方法用於操做第三方數據源數據:

  • odbc::connect - 開啓鏈接
  • odbc::close - 關閉鏈接
  • odbc::query - 根據給定的SQL語句查詢數據並將結果返回到DolphinDB的內存表
  • odbc::execute - 在第三方數據庫內執行給定的SQL語句,不返回結果。
  • odbc::append - 把DolphinDB中表的數據寫入第三方數據庫的表中。

在使用ODBC插件以前,須要安裝ODBC驅動程序,請參考ODBC插件使用教程。

下面的例子使用ODBC插件鏈接如下SQL Server:

  • server:172.18.0.15
  • 默認端口:1433
  • 鏈接用戶名:sa
  • 密碼:123456
  • 數據庫名稱: SZ_TAQ

第一步,下載插件解壓並拷貝 plugins\odbc 目錄下全部文件到DolphinDB server的 plugins/odbc 目錄下(有些版本的DolphinDB安裝目錄/server/plugins/odbc已經包含ODBC插件,可略過此步),經過下面的腳本完成插件初始化:

loadPlugin("plugins/odbc/odbc.cfg")
conn=odbc::connect("Driver=ODBC Driver 17 for SQL Server;Server=172.18.0.15;Database=SZ_TAQ;Uid=sa;Pwd=123456;")

第二步,建立分佈式數據庫。使用SQL Server中的數據表結構做爲DolphinDB數據表的模板。

tb = odbc::query(conn,"select top 1 * from candle_201801")
db=database("dfs://dataImportODBC",VALUE,2018.01.01..2018.01.31)
db.createPartitionedTable(tb, "cycle", "tradingDay")

第三步,從SQL Server中導入數據並保存爲DolphinDB分區表:

tb = database("dfs://dataImportODBC").loadTable("cycle")
data = odbc::query(conn,"select * from candle_201801")
tb.append!(data);

經過ODBC導入數據方便快捷。經過DolphinDB的定時做業機制,它還能夠做爲時序數據定時同步的數據通道。


6. 導入數據實例

下面以股票市場日K線圖數據文件導入做爲示例。每一個股票數據存爲一個CSV文件,共約100G,時間範圍爲2008年-2017年,按年度分目錄保存。2008年度路徑示例以下:

2008
    ---- 000001.csv
    ---- 000002.csv
    ---- 000003.csv
    ---- 000004.csv
    ---- ...

每一個文件的結構都是一致的,如圖所示:

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

6.1 分區規劃

要導入數據以前,首先要作好數據的分區規劃,即肯定分區字段以及分區粒度。

肯定分區字段要考慮平常的查詢語句執行頻率。以where, group by或context by中經常使用字段做爲分區字段,能夠極大的提高數據檢索和分析的效率。使用股票數據的查詢常常與交易日期和股票代碼有關,因此咱們建議採用 tradingDay和symbol這兩列進行組合(COMPO)分區。

分區大小應儘可能均勻,同時分區粒度不宜過大或太小。咱們建議一個分區未壓縮前的原始數據大小控制在100M~1G之間。有關爲什麼分區大小應均勻,以及分區最佳粒度的考慮因素,請參考DolphinDB分區數據庫教程第四節。

綜合考慮,咱們能夠在複合(COMPO)分區中,根據交易日期進行範圍分區(每一年一個範圍),並按照股票代碼進行範圍分區(共100個代碼範圍),共產生 10 * 100 = 1000 個分區,最終每一個分區的大小約100M左右。

首先建立交易日期的分區向量。若要爲後續進入的數據預先製做分區,可把時間範圍設置爲2008-2030年。

yearRange = date(2008.01M + 12*0..22);

經過如下腳本獲得symbol字段的分區向量。因爲每隻股票的數據量一致,咱們遍歷全部的年度目錄,整理出股票代碼清單,並經過cutPoint函數分紅100個股票代碼區間。考慮到將來新增的股票代碼可能會大於現有最大股票代碼,咱們增長了一個虛擬的代碼999999,做爲股票代碼的上限值。

symbols = array(SYMBOL, 0, 100)
yearDirs = files(rootDir)[`filename]
for(yearDir in yearDirs){
	path = rootDir + "/" + yearDir
	symbols.append!(files(path)[`filename].upper().strReplace(".CSV",""))
}
symbols = symbols.distinct().sort!().append!("999999");
symRanges = symbols.cutPoints(100)

經過如下腳本建立複合(COMPO)分區數據庫,以及數據庫內的分區表"stockData":

columns=`symbol`exchange`cycle`tradingDay`date`time`open`high`low`close`volume`turnover`unixTime
types =  [SYMBOL,SYMBOL,INT,DATE,DATE,TIME,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE,LONG]

dbDate=database("", RANGE, yearRange)
dbID=database("", RANGE, symRanges)
db = database(dbPath, COMPO, [dbDate, dbID])

pt=db.createPartitionedTable(table(1000000:0,columns,types), `stockData, `tradingDay`symbol);


6.2 導入數據

數據導入的具體過程是經過目錄樹,將全部的CSV文件讀取並寫入到分佈式數據庫表dfs://SAMPLE_TRDDB中。這其中會有一些細節問題。例如,CSV文件中保存的數據格式與DolphinDB內部的數據格式存在差別,好比time字段,原始數據文件裏是以整數例如「9390100000」表示精確到毫秒的時間,若是直接讀入會被識別成整數類型,而不是時間類型,因此這裏須要用到數據轉換函數datetimeParse結合格式化函數format在數據導入時進行轉換。可採用如下腳本:

datetimeParse(format(time,"000000000"),"HHmmssSSS")

若是單線程導入100GB的數據會耗時好久。爲了充分利用集羣的資源,咱們能夠按照年度把數據導入拆分紅多個子任務,發送到各節點的任務隊列並行執行,提升導入的效率。這個過程可分爲如下兩步實現。

首先定義一個函數以導入指定年度目錄下的全部文件:

def loadCsvFromYearPath(path, dbPath, tableName){
	symbols = files(path)[`filename]
	for(sym in symbols){
		filePath = path + "/" + sym
		t=loadText(filePath)
		database(dbPath).loadTable(tableName).append!(select symbol, exchange,cycle, tradingDay,date,datetimeParse(format(time,"000000000"),"HHmmssSSS"),open,high,low,close,volume,turnover,unixTime from t )
    }
}

而後經過rpc函數結合submitJob函數把該函數提交到各節點去執行:

nodesAlias="NODE" + string(1..4)
years= files(rootDir)[`filename]

index = 0;
for(year in years){	
	yearPath = rootDir + "/" + year
	des = "loadCsv_" + year
	rpc(nodesAlias[index%nodesAlias.size()],submitJob,des,des,loadCsvFromYearPath,yearPath,dbPath,`stockData)
	index=index+1
}

數據導入過程當中,可使用pnodeRun(getRecentJobs)來觀察後臺任務的完成狀況。

須要注意的是,分區是 DolphinDB database 存儲數據的最小單位。DolphinDB對分區的寫入操做是獨佔式的,當任務並行進行的時候,請避免多任務同時向一個分區寫入數據。本例中每一年的數據的寫入由一個單獨任務執行,各任務操做的數據範圍沒有重合,因此不可能發生多任務同時寫入同一分區的狀況。

本案例的詳細腳本在附錄提供下載連接。


7. 附錄

  • CSV導入數據文件
  • 二進制導入例1數據文件
  • 二進制導入例2數據文件
  • HDF5導入數據文件
  • 案例完整腳本
相關文章
相關標籤/搜索