kettle系列-6.kettle實現多字段字典快速翻譯

  在數據清洗轉換中,常見的字典翻譯,如性別在原表中是1(男)、2(女)等,類似的還有很多較大的字典需要翻譯,若同一個表中有很多個字典需要翻譯,採用【數據庫查詢】方式翻譯的話效率就會相當低下。

  這裏採用java代碼來翻譯,初始化時將相關字典加載到內存中,此後就不需要再查詢數據庫了,然後每條記錄進來就翻譯各個字典,其實很簡單,只是【java代碼】這個控件限制較多:不支持泛型、this並不是步驟本身、能使用的方法都列在了左側,使用起來不是很方便。關於字典翻譯這個事,其實寫一個專門的控件也不難,也是很不錯的一個主意,只是沒有真正完整地寫過一個控件(後臺實現和ui部分等),要寫的話比較耗時,暫時就採用java代碼實現,有時間可以考慮寫這麼個控件。

  算了廢話太多,測試轉換如下圖:

  自定義常量就是模擬了幾條數據,你可以直接傳遞要翻譯的數據,寫日誌就是看看翻譯結果,【java代碼】中的代碼如下:

import java.util.Arrays;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import org.pentaho.di.core.database.Database;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.repository.Repository;
import org.pentaho.di.core.Const;

/**
 * Translates the dictionary codes of one input row and forwards the row.
 *
 * <p>The value of input field {@code kkdm} is looked up in {@code kkzdMap} and
 * written to output field {@code kkmc}; the value of input field {@code cllx}
 * is looked up in {@code cxzdMap} and written to output field {@code cxmc}.
 * Whatever the map lookup returns (including {@code null}) is written as-is.
 *
 * @param smi step metadata supplied by the caller (not used here)
 * @param sdi step data supplied by the caller (not used here)
 * @return {@code false} once {@code getRow()} returns {@code null} and the
 *         output has been marked done, {@code true} after a row was forwarded
 * @throws KettleException propagated from the row/field helper calls
 */
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    // No per-run preparation is needed on the first call; only the flag is cleared.
    if (first) {
        first = false;
    }

    Object[] inputRow = getRow();
    if (inputRow == null) {
        // No more input rows: mark the output as finished and stop.
        setOutputDone();
        return false;
    }

    // Read the codes that have to be translated from the incoming row.
    String kkdmCode = get(Fields.In, "kkdm").getString(inputRow);
    String cllxCode = get(Fields.In, "cllx").getString(inputRow);

    // Resize the row so it can also hold the fields this step adds.
    Object[] outputRow = createOutputRow(inputRow, data.outputRowMeta.size());

    // Look the codes up in the cached dictionaries and store the results in the output fields.
    Object kkmcValue = kkzdMap.get(kkdmCode);
    Object cxmcValue = cxzdMap.get(cllxCode);
    get(Fields.Out, "kkmc").setValue(outputRow, kkmcValue);
    get(Fields.Out, "cxmc").setValue(outputRow, cxmcValue);

    // Hand the row over to the next step.
    putRow(data.outputRowMeta, outputRow);
    return true;
}

// Dictionary caches. They are static, so one instance of each map is shared by
// everything that uses this generated class. Raw Map/HashMap types are used
// because the Java-code step does not support generics.

/** Dictionary cache queried by processRow() with the value of field "kkdm". */
public static Map kkzdMap = new HashMap();

/** Dictionary cache queried by processRow() with the value of field "cllx". */
public static Map cxzdMap = new HashMap();
/**
 * Loads the translation dictionaries into the static caches before any row is processed.
 *
 * <p>The database connection named {@code pg_test} is resolved from the
 * transformation's repository, tables {@code t_cxzd} and {@code t_kkzd} are read
 * completely, and the resulting maps are published to {@code cxzdMap} and
 * {@code kkzdMap}. The caches are only replaced after both tables were read
 * successfully, so a failure never leaves them half-filled.
 *
 * <p>Any failure (no repository, unknown connection, connect or query error) is
 * logged, the step's error count is set, and {@code false} is returned so the
 * step does not run with empty dictionaries. The database connection is always
 * released, also on failure.
 *
 * @param stepMetaInterface step metadata, passed through to {@code parent.initImpl}
 * @param stepDataInterface step data, passed through to {@code parent.initImpl}
 * @return the result of {@code parent.initImpl(...)} when the dictionaries were
 *         loaded, {@code false} otherwise
 */
public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface)
{
    Database zddb = null;
    try {
        // "pg_test" is the name of the database connection defined in the DB connections tree.
        if (this.getTrans().getRepository() == null) {
            logError("No repository is available, cannot look up database connection 'pg_test'");
            setErrors( 1 );
            return false;
        }
        DatabaseMeta dbmeta = DatabaseMeta.findDatabase(this.getTrans().getRepository().readDatabases(), "pg_test");
        if (dbmeta == null) {
            logError("Database connection 'pg_test' was not found");
            setErrors( 1 );
            return false;
        }

        zddb = new Database(this.getTrans(), dbmeta);
        logBasic(zddb.getObjectName());
        zddb.shareVariablesWith( this.getTrans() );
        // 0 = no row limit: a dictionary must be loaded completely, otherwise
        // codes beyond the limit would silently translate to null.
        zddb.setQueryLimit( 0 );

        try {
            if ( getTransMeta().isUsingUniqueConnections() ) {
                synchronized ( getTrans() ) {
                    zddb.connect( getTrans().getTransactionId(), "zdfy" );
                }
            } else {
                zddb.connect( getTrans().getTransactionId(), null );
            }
        } catch ( KettleException e ) {
            logError( "An error occurred, processing will be stopped: " + e.getMessage() );
            setErrors( 1 );
            stopAll();
            // Without a connection the queries below cannot work; give up here.
            return false;
        }
        if ( dbmeta.isRequiringTransactionsOnQueries() ) {
            zddb.setCommit( 100 );
        }

        // Read both dictionaries into local maps first and publish them only
        // when both loads succeeded.
        Map loadedCxzd = loadDictionary(zddb, "SELECT * from t_cxzd");
        Map loadedKkzd = loadDictionary(zddb, "SELECT * from t_kkzd");
        cxzdMap = loadedCxzd;
        kkzdMap = loadedKkzd;
        logBasic("Loaded " + loadedCxzd.size() + " entries from t_cxzd and "
            + loadedKkzd.size() + " entries from t_kkzd");
    } catch (KettleException e1) {
        logError("獲取數據庫失敗", e1);
        setErrors( 1 );
        return false;
    } finally {
        // Release the connection on every path, including failures.
        if (zddb != null) {
            zddb.disconnect();
        }
    }
    return parent.initImpl(stepMetaInterface, stepDataInterface);
}

/**
 * Runs {@code sql} and builds a code-to-name map from the first two columns of every row.
 *
 * <p>Rows whose first column is NULL are skipped; a NULL second column is stored
 * as {@code null}. Both columns are converted with {@code toString()}.
 * NOTE(review): relies on the code being column 1 and the name column 2 of the
 * {@code SELECT *} result — confirm against the table definitions.
 *
 * @param db  connected database to query
 * @param sql query returning at least two columns
 * @return a new map, empty when the query returns no rows
 * @throws KettleException if the query fails
 */
private Map loadDictionary(Database db, String sql) throws KettleException
{
    Map dictionary = new HashMap();
    // A limit of 0 asks for all rows of the result.
    List rows = db.getRows(sql, 0);
    if (rows == null) {
        return dictionary;
    }
    for (int i = 0; i < rows.size(); i++) {
        Object[] columns = (Object[]) rows.get(i);
        if (columns == null || columns.length < 2 || columns[0] == null) {
            continue;
        }
        String name = columns[1] == null ? null : columns[1].toString();
        dictionary.put(columns[0].toString(), name);
    }
    return dictionary;
}
相關文章
相關標籤/搜索