KETTLE實現數據的刪除和更新

1、實現目標java

  源數據庫的數據更新或者刪除以後,目標數據庫的數據跟着更新或刪除,總體流程截圖以下:數據庫

  

1、準備工做spa

源數據庫ORACLE  目標數據庫MongoDB,在源數據庫添加刪除、更新觸發器3d

2、操做步驟code

  1. 添加表輸入組件,鏈接ORACLE觸發器記錄表
  2. 添加JAVA代碼組件,進行步驟跳轉,根據輸入的數據判斷是刪除或者更新,若是是刪除,則跳轉至MongoDB Delete步驟中,若是是更新的話,跳轉至字段選擇步驟中。JAVA代碼中的詳細信息以下:
  3. import java.util.List;
    
    import org.pentaho.di.core.exception.KettleException;
    import org.pentaho.di.core.row.RowDataUtil;
    import org.pentaho.di.core.row.RowMeta;
    import org.pentaho.di.core.row.RowMetaInterface;
    import org.pentaho.di.core.row.ValueMeta;
    import org.pentaho.di.trans.Trans;
    import org.pentaho.di.trans.TransMeta;
    
    private Object[] previousRow;//上一行
    private RowSet t1 = null;//業務表步驟
    private RowSet t2 = null;//刪除步驟
    
    
    
    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
    {
        Object[] r = getRow(); //獲取輸入行
    
        if ( first ) {
          if ( getInputRowMeta() == null ) {
            setOutputDone();//設置輸出完成
            return false;
          }
        }
    
        if ( r == null ) { // 若是當前行爲null
          if ( previousRow != null ) {//若是上一行不爲null
            //是最後一行
    
            boolean valid=true;
            previousRow = createOutputRow(previousRow, data.outputRowMeta.size());
            Trans trans=getTrans();//獲取轉換實例
            if (trans != null){
                String sync_val = get(Fields.In, "ID").getString(previousRow);//獲取ID
                trans.setVariable("LAST_SYNC_VAL", sync_val);//設置變量的值
            }
            String OpType = get(Fields.In, "DATATYPE").getString(previousRow);//獲取操做類型是刪除仍是更新
            String keyid= get(Fields.In, "DATAID").getString(previousRow);//獲取操做類型是刪除仍是更新
            
            //Object[] rowData = RowDataUtil.allocateRowData(data.outputRowMeta.size());
            //get(Fields.Out, "KEYID").setValue(rowData,keyid);
            //putRowTo(data.outputRowMeta, previousRow,t2);
    
            if(OpType.equals("UPDATE")){//驗證經過            
                putRowTo(data.outputRowMeta, previousRow,t1);
            }
            else
            {
                putRowTo(data.outputRowMeta, previousRow,t2);
            }
                
          }
          setOutputDone();//設置輸出完成
          return false;//返回false表示不用再繼續處理processRow
        }
    
        if ( !first ) {//不是第一次執行,由於第一次執行時previousRow必定是Null
            //不是最後一行
            boolean valid=true;    
            String OpType = get(Fields.In, "DATATYPE").getString(previousRow);//獲取操做類型是刪除仍是更新
            String keyid= get(Fields.In, "DATAID").getString(previousRow);//獲取操做類型是刪除仍是更新
    
            //Object[] rowData = RowDataUtil.allocateRowData(data.outputRowMeta.size());
            //get(Fields.Out, "KEYID").setValue(rowData,keyid);
            //putRowTo(data.outputRowMeta, previousRow,t2);
            if(OpType.equals("UPDATE")){
                putRowTo(data.outputRowMeta, previousRow,t1);
            }
            else
            {
                putRowTo(data.outputRowMeta, previousRow,t2);
            }
        }
        previousRow = r;//把當前行設爲下一次執行的上一行
        if ( first ) {//若是是首次執行
              first = false;
            t1 = findTargetRowSet("dataupdate");//業務表步驟
            t2 = findTargetRowSet("datadelete");//數據刪除步驟
        }
    
        return true;//返回true表示還要繼續處理processRow
    }

    3.若是跳轉至了MongoDB Delete,則根據ID對目標庫進行刪除。Mongodb delete組件配置以下:blog

  JSON query中的{ID:"?{DATAID}"}表示刪除ID等於傳進來的參數DATAID的全部數據,Execute for each row要選擇上,表示執行每一行數據。          文檔

  4.若是經過JAVA代碼2判斷爲更新的話,則流程將跳轉至字段選擇組件,只獲取主鍵ID,此步驟很是重要,由於要根據ID去源表中獲取等更新的那條數據。get

5.選擇表輸入組件,該步驟是根據上一步傳入的ID獲取待更新的那一條數據io

PS:獲取SQL查詢語句:此處寫入SQL語句,裏邊的?是變量替換,下邊要勾選上"替換SQL語句裏的變量",從步驟插入數據要選擇上一步,勾選上執行每一行。class

6.下邊的步驟:流查詢、JAVA代碼是對數據進行清洗,字典替換,此處再也不解釋

7.最後一步:Mongodb output輸出須要詳細設置

output options選項卡勾選update  modifier update

Mongo文檔字段配置:ID爲主鍵匹配字段,匹配字段更新爲Y 修改器設置爲N/A表示不對主鍵更新

相關文章
相關標籤/搜索