Kettle 4.2源碼分析第三講--Kettle 轉換機制transformation介紹

轉換機制

  每一個轉換步驟都是ETL數據流裏面的一個任務。轉換步驟包括輸入、處理和輸出。輸入步驟從外部數據源獲取數據,例如文件或者數據庫;處理步驟處理數據流,字段計算,流處理等,例如整合或者過濾。輸出步驟將數據寫會到存儲系統裏面,例如文件或者數據庫。數據庫

 

圖 1 轉換步驟示例安全

1. Step類圖簡介

  Kettle爲擴展插件提供了4個擴展點,這4個擴展點也是每一個步驟的組成。每一個類都有其特定的目的及扮演的角色。以TableInput爲例,下圖說明了這4個類的繼承體系。函數

 

圖 2 StepInterface繼承體系spa

  實現StepInterface接口的類,在轉換運行時,將是數據實際處理的位置。每一個執行線程都表示一個實現StepInterface的實例。插件

  BaseStep實現了StepInterface是各step具體實現類的基類。完成了公用的處理函數,如putRow(),可是對於更具體的processRow()在StepBase的子類中。StepBase的主要成員有線程

  public ArrayList<RowSet>  inputRowSets,outputRowSets;3d

  StepBase的子類每次從inputRowSets中取出一行數據,向outputRowSets中寫入一行數據。調試

 

圖 3 StepDataInterface繼承體系code

  實現StepDataInterface接口的類爲數據類,當插件執行時,對於每一個執行執行的線程都是惟一的。保存於step相關的數據信息,好比行的元數據信息。xml

 

圖 4 StepMetaInterface繼承體系

  實現了StepMetaInterface接口的類爲元數據類。它的職責是保存和序列化特定步驟的實例配置,例如保存步驟的名稱、字段名稱等,如何生成加載xml或者讀寫數據庫。

圖 5 StepDialogInterface繼承體系

  實現了StepDialogInterface接口的類爲對話框類,該類實現了該步驟與用戶交互的界面,它顯示一對話框,經過對話框用戶能夠根據本身的要求進行步驟的設定。該對話框類與元數據類關係很是密切,對話框裏面的配置數據均會保存在元數據類裏面。

2. 步驟間交互通訊類

2.1.    RowSet

圖 6 步驟之間通訊機制

  RowSet的實現類,負責步驟之間的相互通訊,rowset對象便是前一個step的成員也是後一個step的成員,訪問是線程安全的。

 

圖 7 RowSet實現類內存快照

  RowSet類中包含源step,目標step和由源向目標發送的一個rowMeta和一組data。其中data數據是以行爲單位的隊列(queArray)。一個RowSet做爲此源step的outputrowsets的一部分。同時做爲目標step的inputRowsets一部分。源Step每次向隊列中寫一行數據,目標step每次從隊列中讀取一行數據。

圖 8 RowSet實現類

2.2.    行元數據

  全部的data均擦除爲object對象。步驟與步驟之間以行爲單位進行處理,天然須要知道每行的結構,即行元數據。行元數據至少須要包括類型、名稱,固然還可能包括字段長度、精度等常見內容。

  行元數據不只在執行的時候須要,並且在轉換設置的時候一樣須要。每一個步驟的行元數據都會保存在.ktr文件或者數據庫裏面,因此能夠根據步驟名稱從TransMeta對象中獲取行元數據。

  行元數據的UML類圖結構以下所示,主要有單元格元數據組成行元數據。在現有的版本中,支持的數據類型有String、Date、BigNumber、Boolean、SerializableType、Binary、Integer、Numberic。

 

圖 9 行元數據UML類圖

3. Trans配置及開啓

 

圖 10 Trans執行時序圖

  在真正運行trans以前,還須要對運行模式進行一個設置。設置結果,會傳給TransGraph.start(executionConfiguration)。配置界面以下所示:

 

圖 11 執行轉換模式設置

實例化Trans的基本流程以下,Trans類時最後真正執行轉換的類。實例化以前須要配置啓動項,保持.ktr文件同步,而後實例化Trans類。最後,開啓後臺程序,這樣不會影響UI的操做,真正的轉換在後臺執行。

 

圖 12 實例化Trans流程圖

4. Trans執行

  trans類的執行有execute()負責,主要包含兩個步驟:轉換執行前的準備工做和全部線程的開啓。Trans每個步驟都會對應一個獨立的線程,線程之間公國RowSet進行通訊交互。

代碼 Trans執行代碼

1   public void execute(String[] arguments) throws KettleException {
2 
3        prepareExecution(arguments);
4 
5        startThreads();
6 
7 }

4.1.    執行準備(prepareExecution)

該步驟,主要完成對通訊類的初始化,對步驟的包裝初始化。最後啓動各個步驟初始化線程,即調用各個步驟的init()方法。準備結束以後,步驟之間的通訊機制完成了,各個步驟的初始化工做也完成了。具體的流程以下所示:

 

圖 13 準備執行流程圖

1.4.2.    轉換處理執行

Trans轉換執行引擎類,經過startThreads()啓動步驟線程。爲全部步驟添加監聽器,在開啓監聽進程對全部線程進行監聽。具體的步驟以下所示

 

圖 14 啓動全部步驟線程

1.4.3.    步驟執行過程

  實現StepInterface的不一樣的step各個功能個不同,可是它們之間也有必定的規律性。下圖只列舉了兩個step,(TextInput)文本輸入和Uniquerow(去重)。BaseStep封裝了getRow()和putRow()方法,從上一個步驟獲取數據和將數據輸入到下一個步驟。

  基類BaseStep採起了統一的處理方式,調用子類processRow以行爲單位處理,核心代碼以下。

  while (stepInterface.processRow(meta, data)&& !stepInterface.isStopped());

  processRow( )通用過程是:調用基類BaseStep 的getRow( )獲得數據,對一行數據處理,處理以後調用基類putRow( )方法數據保存至outputRowSets(即next step的inputRowSets)

 

 

圖 15 TextInput與Uniquerow

1.4.4.    元數據與數據關係。

  Trans中的ETL過程(每一個step)以行爲單位處理,其中行的元數據信息RowMeta和數據信息統一保存在RowSet對象中。

  在RowSet中RowMeta的成員的調試結果以下。可見rowMeta儲存了每列數據的名稱和類型。第一列列名flag,數據是長度爲1的String;第二列列名id…

 

RowSet的數據信息在queArray隊列中,調試結果以下:能夠看出第一個數據元素是一個Object包含了3列,數據內容爲(N,1,a…)

 

相關文章
相關標籤/搜索