1、實現目標java
源數據庫的數據更新或者刪除以後,目標數據庫的數據跟着更新或刪除,總體流程截圖以下:數據庫
1、準備工做spa
源數據庫ORACLE 目標數據庫MongoDB,在源數據庫添加刪除、更新觸發器3d
2、操做步驟code
import java.util.List; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.row.RowDataUtil; import org.pentaho.di.core.row.RowMeta; import org.pentaho.di.core.row.RowMetaInterface; import org.pentaho.di.core.row.ValueMeta; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; private Object[] previousRow;//上一行 private RowSet t1 = null;//業務表步驟 private RowSet t2 = null;//刪除步驟 public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { Object[] r = getRow(); //獲取輸入行 if ( first ) { if ( getInputRowMeta() == null ) { setOutputDone();//設置輸出完成 return false; } } if ( r == null ) { // 若是當前行爲null if ( previousRow != null ) {//若是上一行不爲null //是最後一行 boolean valid=true; previousRow = createOutputRow(previousRow, data.outputRowMeta.size()); Trans trans=getTrans();//獲取轉換實例 if (trans != null){ String sync_val = get(Fields.In, "ID").getString(previousRow);//獲取ID trans.setVariable("LAST_SYNC_VAL", sync_val);//設置變量的值 } String OpType = get(Fields.In, "DATATYPE").getString(previousRow);//獲取操做類型是刪除仍是更新 String keyid= get(Fields.In, "DATAID").getString(previousRow);//獲取操做類型是刪除仍是更新 //Object[] rowData = RowDataUtil.allocateRowData(data.outputRowMeta.size()); //get(Fields.Out, "KEYID").setValue(rowData,keyid); //putRowTo(data.outputRowMeta, previousRow,t2); if(OpType.equals("UPDATE")){//驗證經過 putRowTo(data.outputRowMeta, previousRow,t1); } else { putRowTo(data.outputRowMeta, previousRow,t2); } } setOutputDone();//設置輸出完成 return false;//返回false表示不用再繼續處理processRow } if ( !first ) {//不是第一次執行,由於第一次執行時previousRow必定是Null //不是最後一行 boolean valid=true; String OpType = get(Fields.In, "DATATYPE").getString(previousRow);//獲取操做類型是刪除仍是更新 String keyid= get(Fields.In, "DATAID").getString(previousRow);//獲取操做類型是刪除仍是更新 //Object[] rowData = RowDataUtil.allocateRowData(data.outputRowMeta.size()); //get(Fields.Out, "KEYID").setValue(rowData,keyid); //putRowTo(data.outputRowMeta, previousRow,t2); if(OpType.equals("UPDATE")){ putRowTo(data.outputRowMeta, previousRow,t1); } else { putRowTo(data.outputRowMeta, previousRow,t2); } } previousRow = r;//把當前行設爲下一次執行的上一行 if ( first ) {//若是是首次執行 first = false; t1 = findTargetRowSet("dataupdate");//業務表步驟 t2 = findTargetRowSet("datadelete");//數據刪除步驟 } return true;//返回true表示還要繼續處理processRow }
3.若是跳轉至了MongoDB Delete,則根據ID對目標庫進行刪除。Mongodb delete組件配置以下:blog
JSON query中的{ID:"?{DATAID}"}表示刪除ID等於傳進來的參數DATAID的全部數據,Execute for each row要選擇上,表示執行每一行數據。 文檔
4.若是經過JAVA代碼2判斷爲更新的話,則流程將跳轉至字段選擇組件,只獲取主鍵ID,此步驟很是重要,由於要根據ID去源表中獲取等更新的那條數據。get
5.選擇表輸入組件,該步驟是根據上一步傳入的ID獲取待更新的那一條數據io
PS:獲取SQL查詢語句:此處寫入SQL語句,裏邊的?是變量替換,下邊要勾選上"替換SQL語句裏的變量",從步驟插入數據要選擇上一步,勾選上執行每一行。class
6.下邊的步驟:流查詢、JAVA代碼是對數據進行清洗,字典替換,此處再也不解釋
7.最後一步:Mongodb output輸出須要詳細設置
output options選項卡勾選update modifier update
Mongo文檔字段配置:ID爲主鍵匹配字段,匹配字段更新爲Y 修改器設置爲N/A表示不對主鍵更新