一個job項表明ETL控制流中的一項邏輯任務。Job項將會順序執行,每一個job項會產生一個結果,能做爲別的分支上job項的條件。數據庫
圖 1 job項示例框架
圖 2 Job entry類圖結構函數
JobEntryInterface是Job Entry插件的主要實現接口。主要包含如下功能:工具
1 保存Job Entry設置源碼分析
實現類使用私有變量保存設置的參數,經過get、set方法獲取和設置。Dialog實現類會經過這些方法,保存或設置設置界面上的參數。同時,須要提供一個深度拷貝的方法,由於在一些保存參數且可能修改的地方會調用。this
圖 3 JobEntryTrans配置界面lua
2 序列化插件spa
插件要實現對本插件的序列化,實現兩種方式xml與數據庫。插件
圖 4 轉換插件xml序列化結果線程
3 輸出信息提供
一個job entry支持三種類型的輸出:true、flase和無條件。這三種狀況不是全部的job entry插件都會同時支持的,例如dummy job entry僅支持true和false。因此,插件必須顯現兩類函數,來查看支持哪一種結果。
public boolean evaluates()//是否支持true、false
public boolean isUnconditional()//是否支持無條件執行
4 執行任務
負責工做的執行。
public Result execute()//執行具體的邏輯,須要結果和開始到該項的距離
prev_result.setNrErrors()//設置執行過程當中的異常數
prev_result.setResult()//設置結果,若是不知道true/false,結果不設置
最後返回prev_result。
負責構建和打開參數設置對話框。Spoon經過調用open函數打開該對話框,spoon是使用swt框架的,因此對話框也應使用swt來實現。
每個jobEntryInterface的實現類在完成相應功能時,返回結果的類型。
主要成員變量:
1 private boolean result;執行是否出現異常 2 3 private int exitStatus; 執行結果狀態 4 5 private List<RowMetaAndData> rows;一個jobEntry完成處理後的數據(若存在) 6 7 private Map<String, ResultFile> resultFiles;
圖 5 Job開啓時序圖
Job的開啓與Trans相相似,配置執行的參數,檢查.kjb文件是否發生變化,實例化一個Job對象,開啓該線程。
主要工做是從JobMeta的JobHopMeta找到job入口jobentry信息,根據開始條件調用真正執行jobentry的execute方法2,代碼以下所示:
代碼 4 Job.excute()關鍵代碼
1 startpoint=jobMeta.findJobEntry(JobMeta.STRING_SPECIAL_START, 0, false);// 找到Job開始組件 2 JobEntrySpecial jes = (JobEntrySpecial) startpoint.getEntry(); 3 // JobEntrySpecial是啓動job的job項目 4 Result res = null; 5 while ( (jes.isRepeat() || isFirst) && !isStopped()){ 6 //符合開始條件時,調用execute方法2 7 isFirst = false; 8 res = execute(0, null, startpoint, null, 9 Messages.getString("Job.Reason.Started")); 10 }
execute()方法包含,的參數有執行次數(START不算,從0開始,順序執行)、接一個Entry執行結果、當前Entry的拷貝、前一個Entry拷貝和緣由。
主要功能是根據參數startpoint,提取對應的jobentry,執行對應的jobentry操做,再根據JobMeta的hop信息依次獲得下一個jobentry,遞歸調用。具體的執行步驟以下所示:
圖 6 Job執行步驟
具體每一個組件的執行體對應org.pentaho.di.job.entries包內每一個entry的具體實現。
execute()方法2中調用jobEntry的execute()完成jobEntry的具體功能。
final Result result = cloneJei.execute(prevResult, nr, rep, this);
不一樣的Job項目(JobEntry)實現差異很大。
功能是開啓一個job,只是簡單地對傳遞來的preResult設置它的的result屬性值爲true,(Job項目據此判斷前一結果執行完畢)。返回該對象便可。
功能是判斷一個table是否存在數據庫中。JobEntryTableExit Job項目有屬性tablename和DatabaseMeta(對數據庫的元數據信息描述)根據DatabaseMeta獲得一個Dabase對象db,創建鏈接db.connect(); 調用db.checkTableExists(tablename)根據此返回值設置preResult的result屬性爲否爲true。返回preResult對象。
JobEntryJob和JobEntryTrans是嵌套job或trans的Job項目(JobEntry)。它們是比較複雜的job項目。
做用是執行一個trans。首先實例化一個TransMeta,以後實例化Trans。調用trans.start(),當執行完畢後調用函數trans.getResult(),並把結果加到preResult中,返回該對象便可。
Result中也能夠有處理數據,這些處理數據能夠做爲下一個Job項目(JobEntry)的輸入。可是容量受內存容量限制。
PDI使用數據庫插件來進行數據庫的正確鏈接、執行SQL,同時也考慮現有數據的各類特殊功能和不一樣限制。
在PDI裏面,已經集成了很是多的數據庫插件,大部分的插件都會繼承自BaseDatabaseMeta。下面所示的方法一般都須要被重寫,基類裏面並無相關的實現。要實現的方法主要分紅3大主題:鏈接信息、SQL方言和功能標記。
1 鏈接詳情
當PDI創建數據庫鏈接時將會調用這些函數,或者數據庫設置對話框裏顯示與方言有關的內容時也會調用。
2 SQL Generation
構建有效的SQL數據庫方言時會調用這些方法。
3 功能標記
查詢使用的數據庫是否支持該功能。
PS:Kettle源碼分析算是所有講完了,最後奉送本身作的PPT http://pan.baidu.com/share/link?shareid=3803402535&uk=3792525916。Kettle應該算是比較小衆的軟件,可是在業界仍是很是有名氣的。我看到過好幾個中國的公司,包括上市公司,說本身最新的ETL工具或者數據共享交換工具都是Kettle的改版。