企業在使用大數據分析平臺時,首先須要把海量數據從多個數據源遷移到大數據平臺中。node
在導入數據前,咱們須要理解 DolphinDB database 的基本概念和特色。git
DolphinDB數據表按存儲介質分爲3種類型:github
DolphinDB數據表按是否分區分爲2種類型:數據庫
在傳統的數據庫中,分區是針對數據表的,即同一個數據庫中的每一個數據表能夠有不一樣的分區方案;而DolphinDB的分區是針對數據庫的,即一個數據庫只能使用一種分區方案。若是兩個表的分區方案不一樣,它們不能放在同一個數據庫中。服務器
DolphinDB提供了3種靈活的數據導入方法:app
經過CSV文件進行數據中轉是比較通用的數據遷移方式。DolphinDB提供了loadText、ploadText和loadTextEx三個函數來導入CSV文件。下面咱們經過一個示例CSV文件candle_201801.csv來講明這3個函數的用法。分佈式
1.1 loadTextide
語法:loadText(filename, [delimiter=','], [schema])函數
參數:性能
filename是文件名。
delimiter和schema都是可選參數。
delimiter用於指定不一樣字段的分隔符,默認是「,」。
schema用於數據導入後每一個字段的數據類型,它是一個table類型。DolphinDB提供了字段類型自動識別功能,可是某些狀況下系統自動識別的數據類型不符合需求,好比咱們在導入示例CSVcandle_201801.csv時,volume字段會被識別成INT類型,實際上咱們須要LONG類型,這時就須要使用schema參數。
建立schema table的腳本:
nameCol = `symbol`exchange`cycle`tradingDay`date`time`open`high`low`close`volume`turnover`unixTime typeCol = [SYMBOL,SYMBOL,INT,DATE,DATE,INT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,LONG] schemaTb = table(nameCol as name,typeCol as type)
當表的字段很是多時,建立schema table的腳本會十分冗長。爲了不這個問題,DolphinDB提供了extractTextSchema函數,它能夠從文本文件中提取表的結構,咱們只需修改須要指定的字段類型便可。
dataFilePath = "/home/data/candle_201801.csv" schemaTb=extractTextSchema(dataFilePath) update schemaTb set type=`LONG where name=`volume tt=loadText(dataFilePath,,schemaTb)
1.2 ploadText
ploadText把數據文件做爲分區表並行加載到內存中,語法和loadText徹底相同,可是ploadText的速度更快。ploadText主要用於快速載入大文件,它在設計上充分利用了多個core來並行載入文件,並行程度取決於服務器自己core數量和節點的localExecutors配置。
下面咱們對比loadText和ploadText的性能。
首先,經過腳本生成一個4G左右的CSV文件:
filePath = "/home/data/testFile.csv" appendRows = 100000000 dateRange = 2010.01.01..2018.12.30 ints = rand(100, appendRows) symbols = take(string('A'..'Z'), appendRows) dates = take(dateRange, appendRows) floats = rand(float(100), appendRows) times = 00:00:00.000 + rand(86400000, appendRows) t = table(ints as int, symbols as symbol, dates as date, floats as float, times as time) t.saveText(filePath)
分別使用loadText和ploadText來導入文件,該節點是4核8線程的CPU。
timer loadText(filePath); //Time elapsed: 39728.393 ms timer ploadText(filePath); //Time elapsed: 10685.838 ms
結果顯示,ploadText的性能差很少是loadText的4倍。
1.3 loadTextEx
語法:loadTextEx(dbHandle, tableName, [partitionColumns], fileName, [delimiter=','], [schema])
參數:
dbHandle是數據庫句柄。
tableName是保存數據的分佈式表的表名。
partitionColumns、delimiter和schema是可選參數。
當分區方案不是順序分區時,須要指定partitionColumns,表示分區列。
fileName表示導入文件的名稱。
delimiter用於指定不一樣字段的分隔符,默認是「,」。
schema用於數據導入後每一個字段的數據類型,它是一個table類型。
loadText函數老是把數據導入到內存,當數據文件很是龐大時,工做機的內存很容易成爲瓶頸。loadTextEx能夠很好地解決這個問題,它經過邊導入邊保存的方式,把靜態的CSV文件以較爲平緩的數據流的方式「另存爲」DolphinDB的分佈式表,而不是採用所有導入內存再存爲分區表的方式,大大下降了內存的使用需求。
首先建立用於保存數據的分佈式表:
dataFilePath = "/home/data/candle_201801.csv" tb = loadText(dataFilePath) db=database("dfs://dataImportCSVDB",VALUE,2018.01.01..2018.01.31) db.createPartitionedTable(tb, "cycle", "tradingDay")
而後將文件導入分佈式表:
loadTextEx(db, "cycle", "tradingDay", dataFilePath)
當須要使用數據作分析的時候,經過loadTable函數將分區元數據先載入內存,在實際執行查詢的時候,DolphinDB會按需加載數據到內存。
tb = database("dfs://dataImportCSVDB").loadTable("cycle")
HDF5是一種比CSV更高效的二進制數據文件格式,在數據分析領域普遍使用。DolphinDB也支持經過HDF5格式文件導入數據。
DolphinDB經過HDF5插件來訪問HDF5文件,插件提供瞭如下方法:
調用插件方法時須要在方法前面提供namespace,好比調用loadHdf5時hdf5::loadHdf5,若是不想每次調用都使用namespace,可使用use關鍵字:
use hdf5 loadHdf5(filePath,tableName)
要使用DolphinDB的插件,首先須要下載HDF5插件,再將插件部署到節點的plugins目錄下,在使用插件以前須要先加載,使用下面的腳本:
loadPlugin("plugins/hdf5/PluginHdf5.txt")
HDF5文件的導入與CSV文件大同小異,好比咱們要將示例HDF5文件candle_201801.h5導入,它包含一個Dataset:candle_201801,那麼最簡單的導入方式以下:
dataFilePath = "/home/data/candle_201801.h5" datasetName = "candle_201801" tmpTB = hdf5::loadHdf5(dataFilePath,datasetName)
若是須要指定數據類型導入可使用hdf5::extractHdf5Schema,腳本以下:
dataFilePath = "/home/data/candle_201801.h5" datasetName = "candle_201801" schema=hdf5::extractHdf5Schema(dataFilePath,datasetName) update schema set type=`LONG where name=`volume tt=hdf5::loadHdf5(dataFilePath,datasetName,schema)
若是HDF5文件很是龐大,工做機內存沒法支持全量載入,可使用hdf5::loadHdf5Ex方式來載入數據。
首先建立用於保存數據的分佈式表:
dataFilePath = "/home/data/candle_201801.h5" datasetName = "candle_201801" dfsPath = "dfs://dataImportHDF5DB" tb = hdf5::loadHdf5(dataFilePath,datasetName) db=database(dfsPath,VALUE,2018.01.01..2018.01.31) db.createPartitionedTable(tb, "cycle", "tradingDay")
而後將HDF5文件經過hdf5::loadHdf5Ex函數導入:
hdf5::loadHdf5Ex(db, "cycle", "tradingDay", dataFilePath,datasetName)
DolphinDB支持ODBC接口鏈接第三方數據庫,從數據庫中直接將表讀取成DolphinDB的內存數據表。使用DolphinDB提供的ODBC插件能夠方便地從ODBC支持的數據庫中遷移數據至DolphinDB中。
ODBC插件提供瞭如下四個方法用於操做第三方數據源數據:
在使用ODBC插件前,須要先安裝ODBC驅動,請參考ODBC插件使用教程。
下面以鏈接 SQL Server 做爲實例,現有數據庫的具體配置爲:
server:172.18.0.15
默認端口:1433
鏈接用戶名:sa
密碼:123456
數據庫名稱: SZ_TAQ
數據庫表選2016年1月1日的數據,表名candle_201801,字段與CSV文件相同。
要使用ODBC插件鏈接SQL Server數據庫,首先第一步是下載插件解壓並拷貝plugins\odbc目錄下全部文件到DolphinDB Server的plugins/odbc目錄下,經過下面的腳本完成插件初始化:
//載入插件 loadPlugin("plugins/odbc/odbc.cfg") //鏈接 SQL Server conn=odbc::connect("Driver=ODBC Driver 17 for SQL Server;Server=172.18.0.15;Database=SZ_TAQ;Uid=sa; Pwd=123456;")
在導入數據以前,先建立分佈式磁盤數據庫用於保存數據:
//從SQL Server中取到表結構做爲DolphinDB導入表的模板 tb = odbc::query(conn,"select top 1 * from candle_201801") db=database("dfs://dataImportODBC",VALUE,2018.01.01..2018.01.31) db.createPartitionedTable(tb, "cycle", "tradingDay")
從SQL Server中導入數據並保存成DolphinDB分區表:
data = odbc::query(conn,"select * from candle_201801") tb = database("dfs://dataImportODBC").loadTable("cycle") tb.append!(data);
經過ODBC導入數據避免了文件導出導入的過程,並且經過DolphinDB的定時做業機制,它還能夠做爲時序數據定時同步的數據通道。
下面以證券市場日K線圖數據文件導入做爲示例,數據以CSV文件格式保存在磁盤上,共有10年的數據,按年度分目錄保存,一共大約100G的數據,路徑示例以下:
2008
---- 000001.csv
---- 000002.csv
---- 000003.csv
---- 000004.csv
---- ...
2009
...
2018
每一個文件的結構都是一致的,如圖所示:
4.1 分區規劃
要導入數據以前,首先要作好數據的分區規劃,這涉及到兩個方面的考量:
首先根據平常的查詢語句執行頻率,咱們採用trading和symbol兩個字段進行組合範圍(RANGE)分區,經過對經常使用檢索字段分區,能夠極大的提高數據檢索和分析的效率。
接下來要作的是分別定義兩個分區的粒度。
現有數據的時間跨度是從2008-2018年,因此這裏按照年度對數據進行時間上的劃分,在規劃時間分區時要考慮爲後續進入的數據留出足夠的空間,因此這裏把時間範圍設置爲2008-2030年。
yearRange =date(2008.01M + 12*0..22)
這裏股票代碼有幾千個,若是對股票代碼按值(VALUE)分區,那麼每一個分區只是幾兆大小,而分區數量則不少。分佈式系統在執行查詢時,會將查詢語句分紅多個子任務分發到不一樣的分區執行,因此按值分區方式會致使任務數量很是多,而任務執行時間極短,致使系統在管理任務上花費的時間反而大於任務自己的執行時間,這樣的分區方式明顯是不合理的。這裏咱們按照範圍將全部股票代碼均分紅100個區間,每一個區間做爲一個分區,最終分區的大小約100M左右。 考慮到後期有新的股票數據進來,因此增長了一個虛擬的代碼999999,跟最後一個股票代碼組成一個分區,用來保存後續新增股票的數據。
經過下面的腳本獲得 symbol 字段的分區範圍:
//遍歷全部的年度目錄,去重整理出股票代碼清單,並經過cutPoint分紅100個區間 symbols = array(SYMBOL, 0, 100) yearDirs = files(rootDir)[`filename] for(yearDir in yearDirs){ path = rootDir + "/" + yearDir symbols.append!(files(path)[`filename].upper().strReplace(".CSV","")) } //去重並增長擴容空間: symbols = symbols.distinct().sort!().append!("999999"); //均分紅100份 symRanges = symbols.cutPoints(100) 經過下述腳本定義兩個維度組合(COMPO)分區,建立Database和分區表: columns=`symbol`exchange`cycle`tradingDay`date`time`open`high`low`close`volume`turnover`unixTime types = [SYMBOL,SYMBOL,INT,DATE,DATE,TIME,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE,LONG] dbDate=database("", RANGE, yearRange) dbID=database("", RANGE, symRanges) db = database(dbPath, COMPO, [dbDate, dbID]) pt=db.createPartitionedTable(table(1000000:0,columns,types), tableName, `tradingDay`symbol)
須要注意的是,分區是DolphinDB存儲數據的最小單位,DolphinDB對分區的寫入操做是獨佔式的,當任務並行進行的時候,須要避免多任務同時向一個分區寫入數據。本案例中每一年的數據交給一個單獨任務去作,各任務操做的數據邊界沒有重合,因此不可能發生多任務寫入同一分區的狀況。
4.2 導入數據
數據導入腳本的主要思路很簡單,就是經過循環目錄樹,將全部的CSV文件逐個讀取並寫入到分佈式數據庫表dfs://SAMPLE_TRDDB中,可是具體導入過程當中仍是會有不少細節問題。
首先碰到的問題是,CSV文件中保存的數據格式與DolphinDB內部的數據格式存在差別,好比time字段,文件裏是以「9390100000」表示精確到毫秒的時間,若是直接讀入會被識別成數值類型,而不是time類型,因此這裏須要用到數據轉換函數datetimeParse結合格式化函數format在數據導入時進行轉換。 關鍵腳本以下:
datetimeParse(format(time,"000000000"),"HHmmssSSS")
雖然經過循環導入實現起來很是簡單,可是實際上100G的數據是由極多的5M左右的細碎文件組成,若是單線程操做會等待好久,爲了充分利用集羣的資源,因此咱們按照年度把數據導入拆分紅多個子任務,輪流發送到各節點的任務隊列並行執行,提升導入的效率。這個過程分下面兩步實現:
(1)定義一個自定義函數,函數的主要功能是導入指定年度目錄下的全部文件:
//循環處理年度目錄下的全部數據文件 def loadCsvFromYearPath(path, dbPath, tableName){ symbols = files(path)[`filename] for(sym in symbols){ filePath = path + "/" + sym t=loadText(filePath) database(dbPath).loadTable(tableName).append!(select symbol, exchange,cycle, tradingDay,date,datetimeParse(format(time,"000000000"),"HHmmssSSS"),open,high,low,close,volume,turnover,unixTime from t ) } }
(2)經過 rpc 函數結合 submitJob 函數把上面定義的函數提交到各節點去執行:
nodesAlias="NODE" + string(1..4) years= files(rootDir)[`filename] index = 0; for(year in years){ yearPath = rootDir + "/" + year des = "loadCsv_" + year rpc(nodesAlias[index%nodesAlias.size()],submitJob,des,des,loadCsvFromYearPath,yearPath,dbPath,tableName) index=index+1 }
數據導入過程當中,能夠經過pnodeRun(getRecentJobs)來觀察後臺任務的完成狀況。
案例完整腳本