作數據倉庫時,須要將業務系統CRM抽取到數據倉庫的緩衝層,業務系統使用的是SqlServer數據庫,數據倉庫的緩衝層使用的是mysql數據庫,爲實現數據庫的遷移,即將SqlServer數據庫中的全部表與數據遷移到Mysql。 java
整套流程分爲:2個job,4個trans。使用到的Trans插件:表輸入、字段選擇、複製記錄到結果、從結果獲取記錄、設置變量、java腳本、表輸出。mysql
一、表數據抽取做業:sql
做用:首先獲取數據庫中全部的表名稱 而後調用子Job進行表的建立、數據抽取數據庫
2.表名稱獲取流程 ide
要遷移的源庫表名稱獲取,並設置到結果集,爲下面的job使用。其中的表輸入使用的是show tables,複製數據庫中全部的表,也能夠從表中或者excel中輸入,實現更加小粒度的控制。
show tables 結果爲Tables_in_數據庫名稱,和具體數據庫有關,故須要更名
函數
三、子做業: 實現單個表格的建立及抽取spa
四、表名稱變量設置插件
上一步的子轉換 設計
五、入庫表結構建立 excel
執行的Java代碼以下
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { // First, get a row from the default input hop Object[] r = getRow(); org.pentaho.di.core.database.DatabaseMeta dbmeta = null; System.out.println(123); System.out.println( getTrans().getRepository()); System.out.println(456); java.util.List list = getTrans().getRepository().readDatabases(); if(list != null && !list.isEmpty()) { for(int i=0;i<list.size();i++) { dbmeta = (org.pentaho.di.core.database.DatabaseMeta)list.get(i); //test1 爲數據庫名稱 if("test1".equalsIgnoreCase(dbmeta.getName())) { break; } } } if(dbmeta!=null) { org.pentaho.di.core.database.Database db=new org.pentaho.di.core.database.Database(dbmeta); try { db.connect(); String tablename = getVariable("TABLENAME"); logBasic("開始建立表:" + tablename); if(tablename!=null && tablename.trim().length()>0) { String sql = db.getDDLCreationTable(tablename, data.inputRowMeta);//${TABLENAME} db.execStatement(sql.replace(";", "")); logBasic(sql); } } catch(Exception e) { logError("建立表出現異常",e); }finally{ db.disconnect(); } } return false; }
六、表數據抽取流程
一、源表若存在有blob的表,會有問題,多是因爲表輸出沒有指定字段的緣由
二、以上的操做使用的是倉庫,kettle repo會報錯
三、將原文中String sql = db.getDDL(tablename, data.inputRowMeta);函數名 getDDL 改成 getDDLCreationTable
四、去除了原文中建立表以前表輸入一個操做,原文當有空表須要複製時候,會報錯
原文地址: 用Kettle的一套流程完成對整個數據庫遷移
data-integration\samples\jobs\process all tables 實現整個數據庫的遷移,
http://pan.baidu.com/s/1nt7LOj3