【備忘】使用Kettle(PDI)進行ETL

Kettle是一款開源的ETL工具,純java編寫,能夠在Window、Linux、Unix上運行,綠色無需安裝,數據抽取高效穩定。java

現有一個需求。將數據庫1中一個表(原表)中的數據抽取出來放到數據庫2中另一個表(目標表)中。並實時更新。數據庫

工具爲kettle 5.0

工具爲工具爲kettle 5.0 (spoon 5.0)工具

Kettle中有兩種腳本文件,transformation(轉換)和job(做業),transformation完成針對數據的基礎轉換,job則完成整個工做流的控制。code

直接拖拽你想要的組件到面板 而後用鼠標中間滾輪或者按住shift鼠標左鍵拖拽就能夠鏈接他們了。orm

下面談談主要的思路:ip

1.先count目標表 獲得目標表的記錄條數。get

表輸入2 3

2.查詢原表中目標表沒有的部分 (這裏用了自增id 即 id比目標表條數大的部分就是目標表不存在的)工作流

表輸入3 2

或者用偏移量以下it

表輸入5

由於有要替換的變量(就是那個問號)因此記得勾選這三個:io

唔

  1. 而後寫原表與目標表相對應的字段:

字段選擇2

4.最後傳給目標表: 表輸出2

來張 轉換1 的全家福 其中標輸入3 2 是用的自增id判斷是否有新加入的行 表輸入5是用的偏移量 這兩個選一個(我的傾向偏移量)。 轉換1

===========================以上的爲轉換1==================================

如下爲 做業:

做業

其中轉換2爲: count下原表獲得原錶行數,而後與目標表比較一下 講結果傳入一個變量 轉換2

變量是這麼設置的 設置變量

在job裏這麼接收比較的:

接收變量

最後 腳本能夠用Java代碼調用kettle 5.0的和3.0的不同 以下

<!-- lang: java -->
package test;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;

public class Test {
	public void runJob(String jobname){
		  try {
		   KettleEnvironment.init();
		   //jobname 是Job腳本的路徑及名稱
		   JobMeta jobMeta = new JobMeta(jobname, null);
		   Job job = new Job(null, jobMeta);
		   //向Job 腳本傳遞參數,腳本中獲取參數值:${參數名}
		   //job.setVariable(paraname, paravalue);
		   job.start();
		   job.waitUntilFinished();
		   if (job.getErrors() > 0) {
		 
		    System.out.println("decompress fail!");
		   }
		  } catch (KettleException e) {
		   System.out.println(e);
		  }
		 }
		 
		   //調用Transformation示例:
		 
		public void runTrans(String filename) throws Exception {
		 
		    KettleEnvironment.init();
		    TransMeta transMeta = new TransMeta(filename);
		    Trans trans = new Trans(transMeta);
		    trans.prepareExecution(null);
		    trans.startThreads();
		    trans.waitUntilFinished();
		    
		    if (trans.getErrors()!=0) {
		      System.out.println("Error");
		    }
		  }


	public static void main(String[] args) {
		try {
			//new Test().runJob("script/j1.kjb");
			new Test().runTrans("script/t1.ktr");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
相關文章
相關標籤/搜索