乾貨丨DolphinDB文本數據加載教程

DolphinDB提供如下4個函數,將文本數據導入內存或數據庫:html

loadText: 將文本文件導入爲內存表。git

ploadText: 將文本文件並行導入爲分區內存表。與loadText函數相比,速度更快。github

loadTextEx: 將文本文件導入數據庫中,包括分佈式數據庫,本地磁盤數據庫或內存數據庫。數據庫

textChunkDS:將文本文件劃分爲多個小數據源,再經過mr函數進行靈活的數據處理。服務器

DolphinDB的文本數據導入不只靈活,功能豐富,並且速度很是快。DolphinDB與Clickhouse, MemSQL, Druid, Pandas等業界流行的系統相比,單線程導入的速度更快,最多可達一個數量級的優點;多線程並行導入的狀況下,速度優點更加明顯。多線程

本教程介紹文本數據導入時的常見問題,相應的解決方案以及注意事項。併發

1. 自動識別數據格式

大多數其它系統中,導入文本數據時,須要由用戶指定數據的格式。爲了方便用戶,DolphinDB在導入數據時,可以自動識別數據格式。app

自動識別數據格式包括兩部分:字段名稱識別和數據類型識別。若是文件的第一行沒有任何一列以數字開頭,那麼系統認爲第一行是文件頭,包含了字段名稱。DolphinDB會抽取少許部分數據做爲樣本,並自動推斷各列的數據類型。由於是基於部分數據,某些列的數據類型的識別可能有誤。可是對於大多數文本文件,無須手動指定各列的字段名稱和數據類型,就能正確地導入到DolphinDB中。分佈式

請注意:DolphinDB支持自動識別大部分 DolphinDB提供的數據類型,可是目前暫不支持識別UUID和IPADDR類型,在後續版本中會支持。

loadText函數用於將數據導入DolphinDB內存表。下例調用loadText函數導入數據,並查看生成的數據表的結構。例子中涉及到的數據文件請參考附錄。ide

dataFilePath="/home/data/candle_201801.csv"
tmpTB=loadText(filename=dataFilePath);

查看數據表前5行數據:

select top 5 * from tmpTB;

symbol exchange cycle tradingDay date       time     open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- -------- ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 93100000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 93200000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01.02 2018.01.02 93300000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01.02 2018.01.02 93400000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01.02 2018.01.02 93500000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

調用schema函數查看錶結構(字段名稱、數據類型等信息):

tmpTB.schema().colDefs;

name       typeString typeInt comment
---------- ---------- ------- -------
symbol     SYMBOL     17
exchange   SYMBOL     17
cycle      INT        4
tradingDay DATE       6
date       DATE       6
time       INT        4
open       DOUBLE     16
high       DOUBLE     16
low        DOUBLE     16
close      DOUBLE     16
volume     INT        4
turnover   DOUBLE     16
unixTime   LONG       5

2. 指定數據導入格式

本教程講述的4個數據加載函數中,都可用schema參數指定一個表,內含各字段的名稱、類型、格式、須要導入的列等信息。該表可包含如下4列:

  • name:字符串,表示列名
  • type:字符串,表示每列的數據類型
  • format:字符串,表示日期或時間列的格式
  • col:整型,表示要加載的列的下標。該列的值必須是升序。

其中,name和type這兩列是必需的,並且必須是前兩列。format和col這兩列是可選的,且沒有前後關係的要求。

例如,咱們可使用下面的數據表做爲schema參數:

name         type
----------   -------
timestamp    SECOND
ID           INT
qty          INT
price        DOUBLE

2.1 提取文本文件的schema

extractTextSchema函數用於獲取文本文件的schema,包括字段名稱和數據類型等信息。

例如,使用extractTextSchema函數獲得本教程中示例文件的表結構:

dataFilePath="/home/data/candle_201801.csv"
schemaTB=extractTextSchema(dataFilePath)
schemaTB;

name       type
---------- ------
symbol     SYMBOL
exchange   SYMBOL
cycle      INT
tradingDay DATE
date       DATE
time       INT
open       DOUBLE
high       DOUBLE
low        DOUBLE
close      DOUBLE
volume     INT
turnover   DOUBLE
unixTime   LONG

經過extractTextSchema函數獲得數據文件的表結構schemaTB之後,若表中自動解析的數據類型不符合預期,可使用SQL語句對該表進行修改,從而獲得知足要求的表結構。

2.2 指定字段名稱和類型

當系統自動識別的字段名稱或者數據類型不符合預期或需求時,能夠經過設置schema參數爲文本文件中的每列指定字段名稱和數據類型。

例如,若導入數據的volume列被自動識別爲INT類型,而須要的volume類型是LONG類型,就須要經過schema參數指定volumne列類型爲LONG。下面的例子中,首先調用extractTextSchema函數獲得文本文件的表結構,再根據需求修改表中列的數據類型。

dataFilePath="/home/data/candle_201801.csv"
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="LONG" where name="volume";

使用loadText函數導入文本文件,將數據按照schemaTB所規定的字段數據類型導入到數據庫中。

tmpTB=loadText(filename=dataFilePath,schema=schemaTB);

查看錶中前五行的數據,volume列數據以長整型的形式正常顯示:

select top 5 * from tmpTB;

symbol exchange cycle tradingDay date       time     open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- -------- ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 93100000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 93200000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01.02 2018.01.02 93300000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01.02 2018.01.02 93400000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01.02 2018.01.02 93500000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

上例介紹了修改數據類型的狀況,若要修改表中的字段名稱,也能夠經過一樣的方法實現。

請注意,若DolphinDB對日期和時間相關數據類型的解析不符合預期,須要經過本教程第2.3小節的方式解決。

2.3 指定日期和時間類型的格式

對於日期列或時間列的數據,若是DolphinDB識別的數據類型不符合預期,不只須要在schema的type列指定數據類型,還須要在format列中指定格式(用字符串表示),如"MM/dd/yyyy"。如何表示日期和時間格式請參考日期和時間的調整及格式

下面結合例子具體說明對日期和時間列指定數據類型的方法。

在DolphinDB中執行如下腳本,生成本例所需的數據文件。

dataFilePath="/home/data/timeData.csv"
t=table(["20190623 14:54:57","20190623 15:54:23","20190623 16:30:25"] as time,`AAPL`MS`IBM as sym,2200 5400 8670 as qty,54.78 59.64 65.23 as price)
saveText(t,dataFilePath);

加載數據前,使用extractTextSchema函數獲取該數據文件的schema:

schemaTB=extractTextSchema(dataFilePath)
schemaTB;

name  type
----- ------
time  SECOND
sym   SYMBOL
qty   INT
price DOUBLE

顯然,系統識別time列的數據類型不符合預期。若是直接加載該文件,time列的數據將爲空。爲了可以正確加載該文件time列的數據,須要指定time列的數據類型爲DATETIME,而且指定該列的格式爲"yyyyMMdd HH:mm:ss"。

update schemaTB set type="DATETIME" where name="time"
schemaTB[`format]=["yyyyMMdd HH:mm:ss",,,];

導入數據並查看,數據顯示正確:

tmpTB=loadText(dataFilePath,,schemaTB)
tmpTB;

time                sym  qty  price
------------------- ---- ---- -----
2019.06.23T14:54:57 AAPL 2200 54.78
2019.06.23T15:54:23 MS   5400 59.64
2019.06.23T16:30:25 IBM  8670 65.23

2.4 導入指定列

在導入數據時,能夠經過schema參數指定只導入文本文件中的某幾列。

下例中,只需加載文本文件中symbol, date, open, high, close, volume, turnover這7列。

首先,調用extractTextSchema函數獲得目標文本文件的表結構。

dataFilePath="/home/data/candle_201801.csv"
schemaTB=extractTextSchema(dataFilePath);

使用rowNo函數爲各列生成列號,賦值給schema表中的col列,而後修改schema表,僅保留表示須要導入的字段的行。

update schemaTB set col = rowNo(name)
schemaTB=select * from schemaTB where name in `symbol`date`open`high`close`volume`turnover;
請注意:
1.列號從0開始。上例中第一列symbol列對應的列號是0。
2.導入數據時不能改變各列的前後順序。若是須要調整列的順序,能夠將數據文件加載後,再使用reorderColumns!函數。

最後,使用loadText函數,並配置schema參數,導入文本文件中指定的列。

tmpTB=loadText(filename=dataFilePath,schema=schemaTB);

查看錶中前5行,只導入了所需的列:

select top 5 * from tmpTB

symbol date       open   high  close volume turnover
------ ---------- ------ ----- ----- ------ ----------
000001 2018.01.02 9.31E7 13.35 13.35 13     2.003635E6
000001 2018.01.02 9.32E7 13.37 13.33 13     867181
000001 2018.01.02 9.33E7 13.32 13.32 13     903894
000001 2018.01.02 9.34E7 13.35 13.35 13     1.012E6
000001 2018.01.02 9.35E7 13.35 13.35 13     1.601939E6

2.5 跳過文本數據的前若干行

在數據導入時,若需跳過文件前n行(可能爲文件說明),可指定skipRows參數爲n。因爲描述文件的說明一般不會很是冗長,所以這個參數的取值最大爲1024。本教程講述的4個數據加載函數均支持skipRows參數。

下例中,經過loadText函數導入數據文件,而且查看該文件導入之後表的總行數,以及前5行的內容。

dataFilePath="/home/data/candle_201801.csv"
tmpTB=loadText(filename=dataFilePath)
select count(*) from tmpTB;

count
-----
5040

select top 5 * from tmpTB;

symbol exchange cycle tradingDay date       time     open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- -------- ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 93100000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 93200000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01.02 2018.01.02 93300000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01.02 2018.01.02 93400000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01.02 2018.01.02 93500000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

指定skipRows參數取值爲1000,跳過文本文件的前1000行導入文件:

tmpTB=loadText(filename=dataFilePath,skipRows=1000)
select count(*) from tmpTB;

count
-----
4041

select top 5 * from tmpTB;

col0   col1 col2 col3       col4       col5      col6  col7  col8  col9  col10  col11      col12
------ ---- ---- ---------- ---------- --------- ----- ----- ----- ----- ------ ---------- -------------
000001 SZSE 1    2018.01.08 2018.01.08 101000000 13.13 13.14 13.12 13.14 646912 8.48962E6  1515377400000
000001 SZSE 1    2018.01.08 2018.01.08 101100000 13.13 13.14 13.13 13.14 453647 5.958462E6 1515377460000
000001 SZSE 1    2018.01.08 2018.01.08 101200000 13.13 13.14 13.12 13.13 700853 9.200605E6 1515377520000
000001 SZSE 1    2018.01.08 2018.01.08 101300000 13.13 13.14 13.12 13.12 738920 9.697166E6 1515377580000
000001 SZSE 1    2018.01.08 2018.01.08 101400000 13.13 13.14 13.12 13.13 469800 6.168286E6 1515377640000
請注意:如上例所示,在跳過前n行進行導入時,若數據文件的第一行是列名,改行會做爲第一行被略過。

在上面的例子中,文本文件指定skipRows參數導入之後,因爲表示列名的第一行被跳過,列名變成了默認列名:col1,col2等等。若須要保留列名而又指定跳過前n行,可先經過extractTextSchema函數獲得文本文件的schema,在導入時指定schema參數:

schema=extractTextSchema(dataFilePath)
tmpTB=loadText(filename=dataFilePath,schema=schema,skipRows=1000)
select count(*) from tmpTB;

count
-----
4041

select top 5 * from tmpTB;

symbol exchange cycle tradingDay date       time      open  high  low   close volume turnover   unixTime
------ -------- ----- ---------- ---------- --------- ----- ----- ----- ----- ------ ---------- -------------
000001 SZSE     1     2018.01.08 2018.01.08 101000000 13.13 13.14 13.12 13.14 646912 8.48962E6  1515377400000
000001 SZSE     1     2018.01.08 2018.01.08 101100000 13.13 13.14 13.13 13.14 453647 5.958462E6 1515377460000
000001 SZSE     1     2018.01.08 2018.01.08 101200000 13.13 13.14 13.12 13.13 700853 9.200605E6 1515377520000
000001 SZSE     1     2018.01.08 2018.01.08 101300000 13.13 13.14 13.12 13.12 738920 9.697166E6 1515377580000
000001 SZSE     1     2018.01.08 2018.01.08 101400000 13.13 13.14 13.12 13.13 469800 6.168286E6 1515377640000

3. 並行導入數據

3.1 單個文件多線程載入內存

ploadText函數可將一個文本文件以多線程的方式載入內存。該函數與loadText函數的語法是一致的,區別在於,ploadText函數能夠快速載入大型文件,而且生成內存分區表。它充分利用了多核CPU來並行載入文件,並行程度取決於服務器自己CPU核數量和節點的localExecutors配置。

下面比較loadText函數與ploadText函數導入同一個文件的性能。

首先經過腳本生成一個4GB左右的文本文件:

filePath="/home/data/testFile.csv"
appendRows=100000000
t=table(rand(100,appendRows) as int,take(string('A'..'Z'),appendRows) as symbol,take(2010.01.01..2018.12.30,appendRows) as date,rand(float(100),appendRows) as float,00:00:00.000 + rand(86400000,appendRows) as time)
t.saveText(filePath);

分別經過loadTextploadText來載入文件。本例所用節點是6核12超線程的CPU。

timer loadText(filePath);
Time elapsed: 12629.492 ms

timer ploadText(filePath);
Time elapsed: 2669.702 ms

結果顯示在此配置下,ploadText的性能是loadText的4.5倍左右。

3.2 多文件並行導入

在大數據應用領域,數據導入每每不僅是一個或兩個文件的導入,而是數十個甚至數百個大型文件的批量導入。爲了達到更好的導入性能,建議儘可能以並行方式導入批量的數據文件。

loadTextEx函數可將文本文件導入指定的數據庫中,包括分佈式數據庫,本地磁盤數據庫或內存數據庫。因爲DolphinDB的分區表支持併發讀寫,所以能夠支持多線程導入數據。使用loadTextEx將文本數據導入到分佈式數據庫,具體實現爲將數據先導入到內存,再由內存寫入到數據庫,這兩個步驟由同一個函數完成,以保證高效率。

下例展現如何將磁盤上的多個文件批量寫入到DolphinDB分區表中。首先,在DolphinDB中執行如下腳本,生成100個文件,共約778MB,包括1千萬條記錄。

n=100000
dataFilePath="/home/data/multi/multiImport_"+string(1..100)+".csv"
for (i in 0..99){
    trades=table(sort(take(100*i+1..100,n)) as id,rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,take(2000.01.01..2000.06.30,n) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5)
    trades.saveText(dataFilePath[i])
};

建立數據庫和表:

login(`admin,`123456)
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,1..10000)
tb=db.createPartitionedTable(trades,`tb,`id);

DolphinDB的cut函數可將一個向量中的元素分組。下面調用cut函數將待導入的文件路徑進行分組,再調用submitJob函數,爲每一個線程分配寫入任務,批量導入數據。

def writeData(db,file){
   loop(loadTextEx{db,`tb,`id,},file)
}
parallelLevel=10
for(x in dataFilePath.cut(100/parallelLevel)){
    submitJob("loadData"+parallelLevel,"loadData",writeData{db,x})
};
請注意:DolphinDB的分區表不容許多個線程同時向一個分區寫數據。上例中,每一個文件中的分區列(id列)取值不一樣,所以不會形成多個線程寫入同一個分區的狀況。在設計分區表的併發讀寫時,請確保不會有多個線程同時寫入同一分區。

經過getRecentJobs函數能夠取得當前本地節點上最近n個批處理做業的狀態。使用select語句計算並行導入批量文件所需時間,獲得在6核12超線程的CPU上耗時約1.59秒。

select max(endTime) - min(startTime) from getRecentJobs() where jobId like "loadData"+string(parallelLevel)+"%";

max_endTime_sub
---------------
1590

執行如下腳本,將100個文件單線程順序導入數據庫,記錄所需時間,耗時約8.65秒。

timer writeData(db, dataFilePath);
Time elapsed: 8647.645 ms

結果顯示在此配置下,並行開啓10個線程導入速度是單線程導入的5.5倍左右。

查看數據表中的記錄條數:

select count(*) from loadTable("dfs://DolphinDBdatabase", `tb);

count
------
10000000

4. 導入數據庫前的預處理

在將數據導入數據庫以前,若須要對數據進行復雜的處理,例如日期和時間數據類型的強制轉換,填充空值等,能夠在調用loadTextEx函數時指定transform參數。tansform參數接受一個函數做爲參數,而且要求該函數只能接受一個參數。函數的輸入是一個未分區的內存表,輸出也是一個未分區的內存表。須要注意的是,只有loadTextEx函數提供transform參數。

4.1 指定日期和時間數據的數據類型

4.1.1 將數值類型表示的日期和時間轉化爲指定類型

數據文件中表示時間的數據多是整型或者長整型,而在進行數據分析時,每每又須要將這類數據強制轉化爲時間類型的格式導入並存儲到數據庫中。針對這種場景,可經過loadTextEx函數的transform參數爲文本文件中的日期和時間列指定相應的數據類型。

首先,建立分佈式數據庫和表。

login(`admin,`123456)
dataFilePath="/home/data/candle_201801.csv"
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,2018.01.02..2018.01.30)
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="TIME" where name="time"
tb=table(1:0,schemaTB.name,schemaTB.type)
tb=db.createPartitionedTable(tb,`tb1,`date);

自定義函數foo,用於對數據進行預處理,並返回處理事後的數據表。

def foo(mutable t){
    return t.replaceColumn!(`time,time(t.time/10))
}
請注意:在自定義函數體內對數據進行處理時,請儘可能使用本地的修改(帶有!的函數)來提高性能。

調用loadTextEx函數,而且指定transform參數,系統會對文本文件中的數據執行transform參數指定的函數,即foo函數,再將獲得的結果保存到數據庫中。

tmpTB=loadTextEx(dbHandle=db,tableName=`tb1,partitionColumns=`date,filename=dataFilePath,transform=foo);

查看錶內前5行數據。可見time列是以TIME類型存儲,而不是文本文件中的INT類型:

select top 5* from loadTable(dbPath,`tb1);

symbol exchange cycle tradingDay date       time               open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- ------------------ ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 02:35:10.000000000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 02:35:20.000000000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01.02 2018.01.02 02:35:30.000000000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01.02 2018.01.02 02:35:40.000000000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01.02 2018.01.02 02:35:50.000000000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

4.1.2 爲文本文件中的日期和時間相關列指定數據類型

另外一種與日期和時間列相關的處理是,文本文件中日期以DATE類型存儲,在導入數據庫時但願以MONTH的形式存儲。這種狀況也可經過loadTextEx函數的transform參數轉換該日期列的數據類型,步驟與上述過程一致。

login(`admin,`123456)
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,2018.01.02..2018.01.30)
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="MONTH" where name="tradingDay"
tb=table(1:0,schemaTB.name,schemaTB.type)
tb=db.createPartitionedTable(tb,`tb1,`date)
def fee(mutable t){
    return t.replaceColumn!(`tradingDay,month(t.tradingDay))
}
tmpTB=loadTextEx(dbHandle=db,tableName=`tb1,partitionColumns=`date,filename=dataFilePath,transform=fee);

查看錶內前5行數據。可見tradingDay列是以MONTH類型存儲,而不是文本文件中的DATE類型:

select top 5* from loadTable(dbPath,`tb1);

symbol exchange cycle tradingDay date       time     open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- -------- ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01M   2018.01.02 93100000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01M   2018.01.02 93200000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01M   2018.01.02 93300000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01M   2018.01.02 93400000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01M   2018.01.02 93500000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

4.2 對錶內數據填充空值

transform參數支持調用DolphinDB的內置函數,當內置函數要求多個參數時,咱們可使用部分應用將多參數函數轉換爲一個參數的函數。例如,調用nullFill!函數對文本文件中的空值進行填充。

db=database(dbPath,VALUE,2018.01.02..2018.01.30)
tb=db.createPartitionedTable(tb,`tb1,`date)
tmpTB=loadTextEx(dbHandle=db,tableName=`pt,partitionColumns=`date,filename=dataFilePath,transform=nullFill!{,0});

5. 使用Map-Reduce自定義數據導入

DolphinDB支持使用Map-Reduce自定義數據導入,將數據按行進行劃分,並將劃分後的數據經過Map-Reduce導入到DolphinDB。

可以使用textChunkDS函數將文件劃分爲多個小文件數據源,再經過mr函數寫入到數據庫中。在調用mr將數據存入數據庫前,用戶還可進行靈活的數據處理,從而實現更復雜的導入需求。

5.1 將文件中的股票和期貨數據存儲到兩個不一樣的數據表

在DolphinDB中執行如下腳本,生成一個大小約爲1GB的數據文件,其中包括股票數據和期貨數據。

n=10000000
dataFilePath="/home/data/chunkText.csv"
trades=table(rand(`stock`futures,n) as type, rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,take(2000.01.01..2000.06.30,n) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5,10000.0+rand(4000.0,n) as price6,rand(10,n) as qty1,rand(100,n) as qty2,rand(1000,n) as qty3,rand(10000,n) as qty4,rand(10000,n) as qty5,rand(10000,n) as qty6)
trades.saveText(dataFilePath);

分別建立用於存放股票數據和期貨數據的分佈式數據庫和表:

login(`admin,`123456)
dbPath1="dfs://DolphinDBTickDatabase"
dbPath2="dfs://DolphinDBFuturesDatabase"
db1=database(dbPath1,VALUE,`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S)
db2=database(dbPath2,VALUE,2000.01.01..2000.06.30)
tb1=db1.createPartitionedTable(trades,`stock,`sym)
tb2=db2.createPartitionedTable(trades,`futures,`date);

定義函數,用於劃分數據,並將數據寫入到不一樣的數據庫。

def divideImport(tb, mutable stockTB, mutable futuresTB)
{
	tdata1=select * from tb where type="stock"
	tdata2=select * from tb where type="futures"
	append!(stockTB, tdata1)
	append!(futuresTB, tdata2)
}

再經過textChunkDS函數劃分文本文件,以300MB爲單位進行劃分,文件被劃分紅了4部分。

ds=textChunkDS(dataFilePath,300)
ds;

(DataSource<readTableFromFileSegment, DataSource<readTableFromFileSegment, DataSource<readTableFromFileSegment, DataSource<readTableFromFileSegment)

調用mr函數,指定數據源將文件導入到數據庫中。因爲map函數(由mapFunc參數指定)只接受一個表做爲參數,這裏咱們使用部分應用將多參數函數轉換爲一個參數的函數。

mr(ds=ds, mapFunc=divideImport{,tb1,tb2}, parallel=false);
請注意,這裏每一個小文件數據源可能包含相同分區的數據。DolphinDB不容許多個線程同時對相同分區進行寫入,所以要將 mr函數的parallel參數設置爲false,不然會拋出異常。

查看2個數據庫中表的前5行,股票數據庫中均爲股票數據,期貨數據庫中均爲期貨數據。

stock表:

select top 5 * from loadTable("dfs://DolphinDBTickDatabase", `stock);

type  sym  date       price1    price2     price3      price4       price5       price6       qty1 qty2 qty3 qty4 qty5 qty6
----- ---- ---------- --------- ---------- ----------- ------------ ------------ ------------ ---- ---- ---- ---- ---- ----
stock AMZN 2000.02.14 11.224234 112.26763  1160.926836 11661.418403 11902.403305 11636.093467 4    53   450  2072 9116 12
stock AMZN 2000.03.29 10.119057 111.132165 1031.171855 10655.048121 12682.656303 11182.317321 6    21   651  2078 7971 6207
stock AMZN 2000.06.16 11.61637  101.943971 1019.122963 10768.996906 11091.395164 11239.242307 0    91   857  3129 3829 811
stock AMZN 2000.02.20 11.69517  114.607763 1005.724332 10548.273754 12548.185724 12750.524002 1    39   270  4216 8607 6578
stock AMZN 2000.02.23 11.534805 106.040664 1085.913295 11461.783565 12496.932604 12995.461331 4    35   488  4042 6500 4826

futures表:

select top 5 * from loadTable("dfs://DolphinDBFuturesDatabase", `futures);

type    sym  date       price1    price2     price3      price4       price5       price6       qty1 qty2 qty3 qty4 qty5 ...
------- ---- ---------- --------- ---------- ----------- ------------ ------------ ------------ ---- ---- ---- ---- ---- ---
futures MSFT 2000.01.01 11.894442 106.494131 1000.600933 10927.639217 10648.298313 11680.875797 9    10   241  524  8325 ...
futures S    2000.01.01 10.13728  115.907379 1140.10161  11222.057315 10909.352983 13535.931446 3    69   461  4560 2583 ...
futures GM   2000.01.01 10.339581 112.602729 1097.198543 10938.208083 10761.688725 11121.888288 1    1    714  6701 9203 ...
futures IBM  2000.01.01 10.45422  112.229537 1087.366764 10356.28124  11829.206165 11724.680443 0    47   741  7794 5529 ...
futures TSLA 2000.01.01 11.901426 106.127109 1144.022732 10465.529256 12831.721586 10621.111858 4    43   136  9858 8487 ...
n=10000000
dataFilePath="/home/data/chunkText.csv"
trades=table(rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,sort(take(2000.01.01..2000.06.30,n)) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5,10000.0+rand(4000.0,n) as price6,rand(10,n) as qty1,rand(100,n) as qty2,rand(1000,n) as qty3,rand(10000,n) as qty4, rand(10000,n) as qty5, rand(1000,n) as qty6)
trades.saveText(dataFilePath);

5.2 快速加載大文件首尾部分數據

可以使用textChunkDS將大文件劃分紅多個小的數據源(chunk),而後加載首尾兩個數據源。在DolphinDB中執行如下腳本生成數據文件:

n=10000000
dataFilePath="/home/data/chunkText.csv"
trades=table(rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,sort(take(2000.01.01..2000.06.30,n)) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5,10000.0+rand(4000.0,n) as price6,rand(10,n) as qty1,rand(100,n) as qty2,rand(1000,n) as qty3,rand(10000,n) as qty4, rand(10000,n) as qty5, rand(1000,n) as qty6)
trades.saveText(dataFilePath);

再經過textChunkDS函數劃分文本文件,以10MB爲單位進行劃分。

ds=textChunkDS(dataFilePath, 10);

調用mr函數,加載首尾兩個chunk的數據。由於這兩個chunk的數據很是小,加載速度很是快。

head_tail_tb = mr(ds=[ds.head(), ds.tail()], mapFunc=x->x, finalFunc=unionAll{,false});

查看head_tail_tb表中的記錄數以及前5條記錄。由於數據是隨機生成,記錄數可能每次會略有不一樣,前5行的數據也會跟下面顯示的不一樣。

select count(*) from head_tail_tb;

count
------
192262

查看錶的前5行數據:

select top 5 * from head_tail_tb;

sym  date       price1    price2     price3      price4       price5       price6       qty1 qty2 qty3 qty4 qty5 qty6
---- ---------- --------- ---------- ----------- ------------ ------------ ------------ ---- ---- ---- ---- ---- ----
IBM  2000.01.01 10.978551 114.535418 1163.425635 11827.976468 11028.01038  10810.987825 2    51   396  6636 9403 937
MSFT 2000.01.01 11.776656 106.472172 1138.718459 10720.778545 10164.638399 11348.744314 9    79   691  533  5669 72
FB   2000.01.01 11.515097 118.674854 1153.305462 10478.6335   12160.662041 13874.09572  3    29   592  2097 4103 113
MSFT 2000.01.01 11.72034  105.760547 1139.238066 10669.293733 11314.226676 12560.093619 1    99   166  2282 9167 483
TSLA 2000.01.01 10.272615 114.748639 1043.019437 11508.695323 11825.865846 10495.364306 6    43   95   9433 6641 490

6. 其它注意事項

6.1 不一樣編碼的數據的處理

因爲DolphinDB的字符串採用UTF-8編碼,加載的文件必須是UTF-8編碼。若爲其它形式的編碼,能夠在導入之後進行轉化。DolphinDB提供了convertEncodefromUTF8toUTF8函數,用於導入數據後對字符串編碼進行轉換。

例如,使用convertEncode函數轉換表tmpTB中的exchange列的編碼:

dataFilePath="/home/data/candle_201801.csv"
tmpTB=loadText(filename=dataFilePath, skipRows=0)
tmpTB.replaceColumn!(`exchange, convertEncode(tmpTB.exchange,"gbk","utf-8"));

6.2 數值類型的解析

本教程第1節介紹了DolphinDB在導入數據時的數據類型自動解析機制,本節講解數值類型數據的解析。在數據導入時,若指定數據類型爲數值類型(包括CHAR,SHORT,INT,LONG,FLOAT和DOUBLE),則系統可以識別如下幾種形式的數據:

  • 數字表示的數值,例如:123
  • 以逗號分隔的數字表示的數值,例如:100,000
  • 帶有小數點的數字表示的數值,即浮點數,例如:1.231
  • 科學計數法表示的數值,例如:1.23E5

DolphinDB在導入時會會自動忽略數字先後的字母及其餘符號,若是沒有出現任何數字,則解析爲NULL值。下面結合例子具體說明。

首先,執行如下腳本,建立一個文本文件。

dataFilePath="/home/data/testSym.csv"
prices1=["2131","$2,131", "N/A"]
prices2=["213.1","$213.1", "N/A"]
totals=["2.658E7","-2.658e7","2.658e-7"]
tt=table(1..3 as id, prices1 as price1, prices2 as price2, totals as total)
saveText(tt,dataFilePath);

建立的文本文件中,price1和price2列中既有數字,又有字符。若不指定schema參數導入數據,DolphinDB會將price1和price2列均識別爲SYMBOL類型:

tmpTB=loadText(dataFilePath)
tmpTB;

id price1 price2 total
-- ------ ------ --------
1  2131   213.1  2.658E7
2  $2,131 $213.1 -2.658E7
3  N/A    N/A    2.658E-7

tmpTB.schema().colDefs;

name   typeString typeInt comment
------ ---------- ------- -------
id     INT        4
price1 SYMBOL     17
price2 SYMBOL     17
total  DOUBLE     16

若分別指定price1和price2列爲INT和FLOAT類型,DolphinDB在導入時會會自動忽略數字先後的字母及其餘符號。若是沒有出現任何數字,則解析爲NULL值。

schemaTB=table(`id`price1`price2`total as name, `INT`INT`FLOAT`DOUBLE as type)
tmpTB=loadText(dataFilePath,,schemaTB)
tmpTB;

id price1 price2     total
-- ------ ---------- --------
1  2131   213.100006 2.658E7
2  2131   213.100006 -2.658E7
3                    2.658E-7

6.3 自動脫去文本外的雙引號

在CSV文件中,有時候會用雙引號來處理文本和數值中含有的特殊字符(譬如分隔符)的字段。DolphinDB處理這樣的數據時,會自動脫去文本外的雙引號。下面結合例子具體說明。

首先生成示例數據。生成的文件中,num列數據爲使用三位分節法表示的數值。

dataFilePath="/home/data/testSym.csv"
tt=table(1..3 as id,  ["\"500\"","\"3,500\"","\"9,000,000\""] as num)
saveText(tt,dataFilePath);

導入數據並查看錶內數據,DolphinDB database自動脫去了文本外的雙引號。

tmpTB=loadText(dataFilePath,,schemaTB)
tmpTB;

id num
-- -------
1  500
2  3500
3  9000000

附錄

本教程的例子中使用的數據文件: candle_201801.csv

相關文章
相關標籤/搜索