乾貨丨DolphinDB定時做業教程

DolphinDB提供的定時做業(scheduled job)功能,可讓系統在指定的時間以指定的頻率自動執行做業。當咱們須要數據庫定時自動執行一些腳本進行計算分析(譬如每日休市後分鍾級的K線計算、每個月統計報表生成)、數據庫管理(譬如數據庫備份、數據同步)、操做系統管理(譬如過時的日誌文件刪除)等工做時,能夠用這個功能來實現。html

定時做業用一個函數來表示,這給了做業定義極大的靈活性。凡是能用函數來表示的工做,均可以做爲定時任務來運行。定時做業經過scheduleJob函數提交,並按設定時間在後臺運行。做業建立後,做業相關定義信息序列化保存到數據節點的磁盤文件。節點重啓後,系統會反序列化並加載定時做業。定時做業每次運行的結果也會保存到節點磁盤上,咱們可使用getJobMessagegetJobReturn來查看每一個做業的運行日誌和返回值。mysql

1.功能介紹

1.1 建立定時做業git

建立定時做業使用函數scheduleJob。做業建立後,系統會序列化做業定義信息並保存到文件<homeDir>/sysmgmt/jobEditlog.meta。函數語法以下:github

scheduleJob(jobId, jobDesc, jobFunc, scheduledTime, startDate, endDate, frequency, [days])

其中要注意的是:sql

  • 參數jobFunc(做業函數)是一個不帶參數的函數。
  • 參數scheduledTime(預約時間)能夠是minute類型的標量或向量。當它爲向量時,注意相鄰2個時間點的間隔不能小於30分鐘。
  • 函數返回值是定時做業的做業ID。若是輸入的jobId與已有定時做業的做業ID不重複,系統返回輸入的jobId。不然在jobId後面添加當前日期,"000",「001」等做爲後綴,直到產生惟一的做業ID。

衆所周知,執行一個函數必須提供函數須要的全部參數。在函數化編程中,一個提供了全部參數的函數,實際上就是原函數的一個特殊的部分應用(Partial Application),也即一個不帶參數的函數。在DolphinDB中,咱們用花括號{}來表示部分應用。shell

自定義函數、內置函數、插件函數、函數視圖(Function View)和模塊中的函數等各種函數均可以做爲做業函數。所以,定時做業幾乎能作任何事情。好比用自定義函數、插件函數等作計算分析,用內置函數run運行一個腳本文件,用shell函數執行操做系統管理等等。下面例子中的做業調用了一個自定義函數getMaxTemperature,用於計算前一天某個設備溫度指標的最大值,參數是設備編號,建立做業時,用getMaxTemperature{1}給設備編號賦值1,定時做業在天天0點執行。數據庫

def getMaxTemperature(deviceID){
    maxTemp=exec max(temperature) from loadTable("dfs://dolphindb","sensor")
            where ID=deviceID ,ts between (today()-1).datetime():(today().datetime()-1)
    return  maxTemp
}
scheduleJob(`testJob, "getMaxTemperature", getMaxTemperature{1}, 00:00m, today(), today()+30, 'D');

下面的例子執行了一個腳本文件。做業函數用了run函數,並指定腳本文件monthlyJob.dos的完整路徑做爲參數,做業在2020年的每個月1號0點執行。編程

scheduleJob(`monthlyJob, "Monthly Job 1", run{"/home/DolphinDB/script/monthlyJob.dos"}, 00:00m, 2020.01.01, 2020.12.31, 'M', 1);

下面的例子執行了一個刪除日誌文件的操做系統命令。做業函數用了shell函數,並指定具體的命令「rm /home/DolphinDB/server/dolphindb.log」做爲參數。做業在每週的週日1點執行。session

scheduleJob(`weeklyjob, "rm log", shell{"rm /home/DolphinDB/server/dolphindb.log"}, 1:00m, 2020.01.01, 2021.12.31, 'W', 6);

在實際應用中,用函數參數、函數返回值進行輸入輸出有點不太方便,咱們更經常使用的作法是從數據庫中取出數據,計算後把結果再存到數據庫中。下面的例子是在每日休市後,計算分鐘級的K線。自定義函數computeK中,行情數據從分佈式數據庫表trades中取出,計算後存入分佈式數據庫表OHLC中。做業的frequency爲"W"、days爲[1,2,3,4,5],scheduledTime爲15:00m,表示做業在每週一到週五的15點執行。app

def computeK(){
	barMinutes = 7
	sessionsStart=09:30:00.000 13:00:00.000
	OHLC =  select first(price) as open, max(price) as high, min(price) as low,last(price) as close, sum(volume) as volume 
		from loadTable("dfs://stock","trades")
		where time >today() and time < now()
		group by symbol, dailyAlignedBar(timestamp, sessionsStart, barMinutes*60*1000) as barStart
	append!(loadTable("dfs://stock","OHLC"),OHLC)
}
scheduleJob(`kJob, "7 Minutes", computeK, 15:00m, 2020.01.01, 2021.12.31, 'W', [1,2,3,4,5]);

1.2 查詢定時做業

查詢節點中的定時做業定義信息能夠用getScheduledJobs。函數語法以下:

getScheduledJobs([jobIdPattern])

其中參數jobIdPattern是表示做業ID或做業ID模式的字符串。它支持通配符「%」和「?」。函數的返回值是表格形式的定時做業信息。若jobId沒有指定,則返回全部做業。

系統會對每次做業的執行狀況進行保存,包括定時做業的運行日誌和返回值。運行日誌保存在jodId.msg 文件中,定時做業的返回值保存在jobId.object文件中。這些文件都保存在目錄<homeDir>/batchJobs下。咱們能夠分別使用getJobMessagegetJobReturn來查看每一個做業的運行日誌和返回值。但要注意jobID的取值,一是建立做業時,如前所述,若jobId與已有定時做業的做業ID重複,系統返回的不是輸入的jobId;二是對會屢次執行的做業,每次執行定時做業時,做業ID是不同的。所以咱們須要用getRecentJobs來查看已完成的定時做業。好比咱們定義以下定時做業:

def foo(){
	print "test scheduled job at"+ now()
	return now()
}
scheduleJob(`testJob, "foo", foo, 17:00m+0..2*30, today(), today(), 'D');

運行getRecentJobs()後獲得以下信息:

jobId	            jobDesc	startTime	            endTime
------              ------- ----------------------- ----------------------
testJob	            foo1	2020.02.14T17:00:23.636	2020.02.14T17:00:23.639
testJob20200214	    foo1	2020.02.14T17:30:23.908	2020.02.14T17:30:23.910
testJob20200214000  foo1	2020.02.14T18:00:23.148	2020.02.14T18:00:26.749

從中咱們看到,第一次執行的做業ID是「testJob」,第二次是「testJob20200214」...每次都有變化。以下所示,咱們可用getJobMessagegetJobReturn查看了第3次的執行狀況:

>getJobMessage(`testJob20200214000);
2020-02-14 18:00:23.148629 Start the job [testJob20200214000]: foo
2020-02-14 18:00:23.148721 test the scheduled job at 2020.02.14T18:00:23.148
2020-02-14 18:00:26.749111 The job is done.

>getJobReturn(`testJob20200214000);
2020.02.14T18:00:23.148

1.3 刪除定時做業

刪除定時做業用函數deleteScheduledJob。語法以下:

deleteScheduledJob(jobId)

參數jobId是做業ID。刪除前可用getScheduledJobs獲得想要刪除做業的做業ID。

2.定時做業運行時的權限

用戶建立定時做業時以什麼身份登陸,執行定時做業時就以這個身份運行。所以用戶建立定時做業時,須要確保用戶有權限訪問用到的資源。好比登陸用戶不是受權用戶,就不能訪問集羣的分佈式功能,若用到了集羣的分佈式功能,執行時就會出錯。如下例子中用戶guestUser1沒有訪問DFS權限:

def foo1(){
	print "Test scheduled job "+ now()
	cnt=exec count(*) from loadTable("dfs://FuturesContract","tb")
	print "The count of table is "+cnt
	return cnt
}
login("guestUser1","123456")
scheduleJob(`guestGetDfsjob, "dfs read", foo1, [12:00m, 21:03m, 21:45m], 2020.01.01, 2021.12.31, "D");

做業執行後,用getJobMessage(`guestGetDfsjob)查詢,以下所示,定時做業沒有權限去讀取分佈式數據庫:

2020-02-14 21:03:23.193039 Start the job [guestGetDfsjob]: dfs read
2020-02-14 21:03:23.193092 Test the scheduled job at 2020.02.14T21:03:23.193
2020-02-14 21:03:23.194914 Not granted to read table dfs://FuturesContract/tb

所以,若要遠程執行控制節點的某些功能,訪問集羣中的某個分佈式表,須要先以管理員(admin)或其餘受權用戶身份登陸。具體能夠經過login函數來完成。

從所示日誌中也能夠發現,訪問分佈式表後的語句沒有執行,也就是說做業執行過程當中若遇到錯誤,執行會中斷。爲了防止出現異常而中止執行後續的腳本,可以使用try-catch語句俘獲異常。運行過程當中須要輸出運行信息,能夠用print打印,輸出都會記錄在jodId.msg日誌文件中。

3.定時做業的序列化

定時做業在建立後,系統會把建立用戶(userID)、做業的ID、描述信息、起始時間、做業頻率、做業的定義等持久化保存。存儲路徑爲<homeDir>/sysmgmt/jobEditlog.meta。做業用一個DolphinDB的函數來表示。函數的定義包括了一系列語句,這些語句又會調用其餘函數和一些全局類對象,譬如共享變量(shared variable)。共享變量序列化時用名稱來表示。反序列化時,共享變量必須存在,不然會失敗。做業函數或其依賴的函數根據是否通過編譯能夠分兩類:通過編譯的函數包括內置函數和插件函數和腳本函數包括自定義函數、函數視圖和模塊中的函數等。這兩類函數的序列化方法有所不一樣,下面分別進行說明。

3.1 通過編譯的函數的序列化

對通過編譯的函數的序列化,只序列化函數名稱和模塊名稱。反序列化的時候,會在系統中搜索這些模塊及函數,若搜索不到,就會失敗。因此定時做業中若用到了插件函數,就須要在反序列化以前預先加載。系統與定時做業相關組件資源的初始化順序依次是:系統級初始化腳本(dolphindb.dos),函數視圖(function view)、用戶級啓動腳本(startup.dos)和定時做業。定時做業在啓動腳本執行後加載。以下例所示,在做業函數jobDemo中用到了odbc插件:

use odbc
def jobDemo(){
	conn = odbc::connect("dsn=mysql_factorDBURL");
}
scheduleJob("job demo","example of init",jobDemo,15:48m, 2019.01.01, 2020.12.31, 'D')

但odbc插件在系統啓動時沒有加載,因此讀取定時做業的時候,因沒法識別這個函數,輸出下列日誌後退出系統。

<ERROR>:Failed to unmarshall the job [job demo]. Failed to deserialize assign statement.. Invalid message format

在啓動腳本中加入下列代碼加載odbc插件後,系統即啓動成功。

loadPlugin("plugins/odbc/odbc.cfg")

3.2 腳本函數的序列化

腳本函數會序列化函數參數以及函數定義的每個語句。語句中若又包含了依賴的腳本函數,也會序列化這些依賴函數的定義。

建立定時做業後,若這些腳本函數被刪除或被修改了,或它依賴的腳本函數被修改,不影響定時做業運行。若但願定時做業按新的函數執行,就須要先刪除定時做業、而後從新建立定時做業,不然會運行舊的序列化的函數。其中要注意關聯的函數也須要從新定義。下面舉例說明:

  • 例子1,做業函數在建立定時做業後被修改,以下所示,做業函數f在建立scheduleJob後被從新定義:
def f(){
	print "The old function is called " 
}
scheduleJob(`test, "f", f, 11:05m, today(), today(), 'D');
go
def f(){
	print "The new function is called " 
}

定時做業執行後,用getJobMessage(`test)獲得以下信息,從中看到定時做業執行的仍是舊的自定義函數。

2020-02-14 11:05:53.382225 Start the job [test]: f
2020-02-14 11:05:53.382267 The old function is called 
2020-02-14 11:05:53.382277 The job is done.
  • 例子2,做業函數在建立定時做業後依賴的函數被修改,以下所示,做業函數是函數視圖fv,fv調用了函數foo,在scheduleJob後,函數foo從新被定義,函數視圖也從新生成:
def foo(){
	print "The old function is called " 
}
def fv(){
	foo()
}
addFunctionView(fv)  

scheduleJob(`testFvJob, "fv", fv, 11:36m, today(), today(), 'D');
go
def foo(){
	print "The new function is called " 
}
dropFunctionView(`fv)
addFunctionView(fv)

定時做業執行後,而後getJobMessage(`testFvJob)獲得以下信息,從中看到定時做業執行的仍是舊的函數。

2020-02-14 11:36:23.069892 Start the job [testFvJob]: fv
2020-02-14 11:36:23.069939 The old function is called 
2020-02-14 11:36:23.069951 The job is done.

用模塊函數也是如此。咱們建立一個模塊printLog.dos,其內容以下:

module printLog
def printLogs(logText){
	writeLog(string(now()) + " : " + logText)
	print "The old function is called"
}

而後建立一個定時做業調用這個printLog::printLogs函數:

use printLog
def f5(){
	printLogs("test my log")
}
scheduleJob(`testModule, "f5", f5, 13:32m, today(), today(), 'D');

在運行定時做業以前修改模塊以下:

module printLog
def printLogs(logText){
	writeLog(string(now()) + " : " + logText)
	print "The new function is called"
}

定時做業執行後,而後getJobMessage(`testModule)獲得以下信息,從中看到定時做業執行的仍是舊的函數。

2020-02-14 13:32:22.870855 Start the job [testModule]: f5
2020-02-14 13:32:22.871097 The old function is called
2020-02-14 13:32:22.871106 The job is done.

4.定時運行腳本文件

在建立定時做業時,若做業函數是run一個腳本文件,由於序列化時只保存了文件名,沒有保存文件內容,因此須要把依賴的自定義函數都放到腳本文件中,不然會由於找不到自定義的函數而執行失敗。好比建立一個腳本文件testjob.dos,文件內容以下:

foo()

而後在DolphinDB GUI中執行下列腳本:

def foo(){
	print ("Hello world!")
}
run "/home/xjqian/testjob.dos"

結果顯示能正常執行:

2020.02.14 13:47:00.992: executing code (line 104-108)...
Hello world!

再建立定時做業run這個腳本文件,代碼以下所示:

scheduleJob(`dailyfoofile1, "Daily Job 1", run {"/home/xjqian/testjob.dos"}, 16:14m, 2020.01.01, 2020.12.31, 'D');

但運行這個做業時卻發生了以下異常:

Exception was raised when running the script [/home/xjqian/testjob.dos]:Syntax Error: [line #3] Cannot recognize the token foo

這是foo函數定義和定時做業執行不在同一個會話(session)中,做業執行時找不到函數定義的緣故。把foo()的定義放到腳本文件中,修改testjob.dos文件內容以下:

def foo(){
	print ("Hello world!")
}
foo()

再從新建立定時做業運行這個腳本文件,就能順利完成。

5.小結和展望

常見故障及排除

  • 做業函數引用了共享變量,可是做業加載前沒有定義該共享變量。通常建議在用戶的啓動腳本中定義該共享變量。
  • 做業函數引用了插件中的函數,可是做業加載前沒有加載該插件。通常建議在用戶的啓動腳本中定義加載該插件。
  • 定時運行一個腳本文件,找不到依賴的函數。腳本文件必須包含依賴的自定義函數。
  • 建立定時做業的用戶沒有訪問分佈式數據庫表的權限。受權該用戶訪問相應數據庫的權限。
  • 在啓動腳本中使用函數scheduleJob、 getScheduledJobsdeleteScheduledJob時拋出異常。節點啓動時,定時做業的初始化在啓動腳本以後,所以不能在啓動腳本中使用跟定時做業相關的功能

在某些罕見的狀況下,可能出如今系統重啓時,發生定時做業加載失敗,甚至致使系統沒法啓動的狀況。尤爲是版本升級的時候,內置函數、插件函數等函數接口可能會有變化從而致使做業沒法加載,或者出現一些兼容性bug致使系統重啓失敗。所以,咱們開發時須要保留定義定時做業的腳本。若因定時任務致使系統沒法啓動,能夠先刪除定時做業的序列化文件<homeDir>/sysmgmt/jobEditlog.meta,在系統重啓後再從新建立這些定時做業。

後續功能開發

  • 增長瀏覽做業函數以及依賴的函數的定義的功能。
  • 定義和實現定時做業之間的依賴關係。
相關文章
相關標籤/搜索