ODI Incremental Data Synchronization (Part 1)

Background

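I recently started using ODI on a project. The requirement was to synchronize tables from one database to another, which is routine enough, but a further condition was added:

The synchronization must be incremental: it may not delete all the data first and then reinsert it.

ODI Studio's CDC mechanism could handle this, but CDC requires creating triggers on the tables, which affects table performance, so that option was rejected. CDC based on lastupdatedate partly avoids the performance cost, but it requires that lastupdatedate change on every update to the data, which is hard to guarantee in some legacy systems. IKM SQL Incremental Update, in turn, has no delete operation and so cannot meet the requirement. The chosen approach was therefore to modify an existing knowledge module so that it does.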

1. The following is a Mapping that performs incremental synchronization

clipboard.png

The integration type of the target table is Incremental Update
clipboard.png

The loading knowledge module is LKM SQL to Oracle
clipboard.png

The integration knowledge module is IKM Oracle Incremental Update
clipboard.png

Source table
clipboard.png

Target table

clipboard.png

Modify the data in the source table

clipboard.png

Run the interface
clipboard.png

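As shown, once the interface finishes running, the modified rows are synchronized correctly, but the deleted rows are not propagated. To deal with this, some interfaces delete all the data in the target table at run time and then insert everything again.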

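Setting DELETE ALL to true deletes all the data in the target table before the interface inserts. In theory choosing TRUNCATE should work as well, but it had no effect.
clipboard.png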

Incremental synchronization interface

The execution process is as follows
clipboard.png

clipboard.png

clipboard.png

Step details

1.Drop work table

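When the interface synchronizes data, it creates a work table in the target database whose name starts with C$_0. This command is configured in the knowledge module to ignore errors, so a failure here does not cause the interface to fail.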

drop table TESTUSER.C$_0ODI_WLS_JMS_INC_SOURCE purge

2.Create work table

Create the work table; its columns are based on the source table

create table TESTUSER.C$_0ODI_WLS_JMS_INC_SOURCE
(
    ID    NUMBER NULL,
    HOST    VARCHAR2(200) NULL,
    POST    VARCHAR2(200) NULL,
    JMS_SERVICE_NAME    VARCHAR2(200) NULL,
    JMS_NAME    VARCHAR2(200) NULL,
    JMS_SERVICE_TARGET    VARCHAR2(200) NULL,
    JMS_SERVICE_HEALTH    VARCHAR2(200) NULL,
    MESSAGES_CURRENT_COUNT    NUMBER NULL,
    MESSAGES_PENDING_COUNT    NUMBER NULL,
    CONSUMERS_CURRENT_COUNT    NUMBER NULL,
    CONSUMERS_HIGH_COUNT    NUMBER NULL,
    CONSUMERS_TOTAL_COUNT    NUMBER NULL,
    MESSAGES_HIGH_COUNT    NUMBER NULL,
    MESSAGES_RECEIVED_COUNT    NUMBER NULL,
    OBJECT_VERSION_NUMBER    NUMBER NULL,
    CREATION_DATE    DATE NULL,
    CREATED_BY    VARCHAR2(200) NULL,
    LAST_UPDATE_DATE    DATE NULL,
    LAST_UPDATED_BY    VARCHAR2(200) NULL,
    DATA_STATUS    VARCHAR2(200) NULL
)
NOLOGGING

3.Load data

Read the data from the source table and insert it into the work table

Target code

insert  into TESTUSER.C$_0ODI_WLS_JMS_INC_SOURCE
(
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS
)
values
(
    :ID,
    :HOST,
    :POST,
    :JMS_SERVICE_NAME,
    :JMS_NAME,
    :JMS_SERVICE_TARGET,
    :JMS_SERVICE_HEALTH,
    :MESSAGES_CURRENT_COUNT,
    :MESSAGES_PENDING_COUNT,
    :CONSUMERS_CURRENT_COUNT,
    :CONSUMERS_HIGH_COUNT,
    :CONSUMERS_TOTAL_COUNT,
    :MESSAGES_HIGH_COUNT,
    :MESSAGES_RECEIVED_COUNT,
    :OBJECT_VERSION_NUMBER,
    :CREATION_DATE,
    :CREATED_BY,
    :LAST_UPDATE_DATE,
    :LAST_UPDATED_BY,
    :DATA_STATUS
)

Source code

select    
    ODI_WLS_JMS_INC_SOURCE.ID    AS ID,
    ODI_WLS_JMS_INC_SOURCE.HOST    AS HOST,
    ODI_WLS_JMS_INC_SOURCE.POST    AS POST,
    ODI_WLS_JMS_INC_SOURCE.JMS_SERVICE_NAME    AS JMS_SERVICE_NAME,
    ODI_WLS_JMS_INC_SOURCE.JMS_NAME    AS JMS_NAME,
    ODI_WLS_JMS_INC_SOURCE.JMS_SERVICE_TARGET    AS JMS_SERVICE_TARGET,
    ODI_WLS_JMS_INC_SOURCE.JMS_SERVICE_HEALTH    AS JMS_SERVICE_HEALTH,
    ODI_WLS_JMS_INC_SOURCE.MESSAGES_CURRENT_COUNT    AS MESSAGES_CURRENT_COUNT,
    ODI_WLS_JMS_INC_SOURCE.MESSAGES_PENDING_COUNT    AS MESSAGES_PENDING_COUNT,
    ODI_WLS_JMS_INC_SOURCE.CONSUMERS_CURRENT_COUNT    AS CONSUMERS_CURRENT_COUNT,
    ODI_WLS_JMS_INC_SOURCE.CONSUMERS_HIGH_COUNT    AS CONSUMERS_HIGH_COUNT,
    ODI_WLS_JMS_INC_SOURCE.CONSUMERS_TOTAL_COUNT    AS CONSUMERS_TOTAL_COUNT,
    ODI_WLS_JMS_INC_SOURCE.MESSAGES_HIGH_COUNT    AS MESSAGES_HIGH_COUNT,
    ODI_WLS_JMS_INC_SOURCE.MESSAGES_RECEIVED_COUNT    AS MESSAGES_RECEIVED_COUNT,
    ODI_WLS_JMS_INC_SOURCE.OBJECT_VERSION_NUMBER    AS OBJECT_VERSION_NUMBER,
    ODI_WLS_JMS_INC_SOURCE.CREATION_DATE    AS CREATION_DATE,
    ODI_WLS_JMS_INC_SOURCE.CREATED_BY    AS CREATED_BY,
    ODI_WLS_JMS_INC_SOURCE.LAST_UPDATE_DATE    AS LAST_UPDATE_DATE,
    ODI_WLS_JMS_INC_SOURCE.LAST_UPDATED_BY    AS LAST_UPDATED_BY,
    ODI_WLS_JMS_INC_SOURCE.DATA_STATUS    AS DATA_STATUS
from    ODI.ODI_WLS_JMS_INC_SOURCE ODI_WLS_JMS_INC_SOURCE
where    (1=1)

4.Analyze work table

Gather optimizer statistics on the work table

BEGIN
DBMS_STATS.GATHER_TABLE_STATS (
    ownname =>    'TESTUSER',
    tabname =>    'C$_0ODI_WLS_JMS_INC_SOURCE',
    estimate_percent =>    DBMS_STATS.AUTO_SAMPLE_SIZE
);
END;

5.Drop flow table

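Drop the flow table. The flow table is a second intermediate table used by the synchronization, with a name starting with I$_. It is dropped at the start of every run in case the previous run did not remove it. Because the step is configured to ignore errors, a failure here does not cause the interface to fail.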

drop table TESTUSER.I$_ODI_WLS_JMS_INC_TARGET

6.Create flow table I$

Create the flow table

create table TESTUSER.I$_ODI_WLS_JMS_INC_TARGET
(
    ID        NUMBER NULL,
    HOST        VARCHAR2(200) NULL,
    POST        VARCHAR2(200) NULL,
    JMS_SERVICE_NAME        VARCHAR2(200) NULL,
    JMS_NAME        VARCHAR2(200) NULL,
    JMS_SERVICE_TARGET        VARCHAR2(200) NULL,
    JMS_SERVICE_HEALTH        VARCHAR2(200) NULL,
    MESSAGES_CURRENT_COUNT        NUMBER NULL,
    MESSAGES_PENDING_COUNT        NUMBER NULL,
    CONSUMERS_CURRENT_COUNT        NUMBER NULL,
    CONSUMERS_HIGH_COUNT        NUMBER NULL,
    CONSUMERS_TOTAL_COUNT        NUMBER NULL,
    MESSAGES_HIGH_COUNT        NUMBER NULL,
    MESSAGES_RECEIVED_COUNT        NUMBER NULL,
    OBJECT_VERSION_NUMBER        NUMBER NULL,
    CREATION_DATE        DATE NULL,
    CREATED_BY        VARCHAR2(200) NULL,
    LAST_UPDATE_DATE        DATE NULL,
    LAST_UPDATED_BY        VARCHAR2(200) NULL,
    DATA_STATUS        VARCHAR2(400) NULL,
    IND_UPDATE        CHAR(1)
)
NOLOGGING

7.Delete target table

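Delete the contents of the target table. This step appears when DELETE ALL is enabled as configured above: all rows are deleted before any data is inserted. If DELETE ALL is set to false, this statement is not executed, and the result is that the interface does not synchronize deleted rows.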

delete from TESTUSER.ODI_WLS_JMS_INC_TARGET

8.Insert flow into I$ table

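Insert data into the I$_ table. NOT EXISTS filters out the rows that already exist unchanged in the target table, so every row without an identical match is inserted into the I$_ table, all with the flag I.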

/* DETECTION_STRATEGY = NOT_EXISTS */
insert into    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET
(
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS,
    IND_UPDATE
)
select 
ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS,
    IND_UPDATE
 from (
select      
    ODI_WLS_JMS_INC_SOURCE_A.ID AS ID,
    ODI_WLS_JMS_INC_SOURCE_A.HOST AS HOST,
    ODI_WLS_JMS_INC_SOURCE_A.POST AS POST,
    ODI_WLS_JMS_INC_SOURCE_A.JMS_SERVICE_NAME AS JMS_SERVICE_NAME,
    ODI_WLS_JMS_INC_SOURCE_A.JMS_NAME AS JMS_NAME,
    ODI_WLS_JMS_INC_SOURCE_A.JMS_SERVICE_TARGET AS JMS_SERVICE_TARGET,
    ODI_WLS_JMS_INC_SOURCE_A.JMS_SERVICE_HEALTH AS JMS_SERVICE_HEALTH,
    ODI_WLS_JMS_INC_SOURCE_A.MESSAGES_CURRENT_COUNT AS MESSAGES_CURRENT_COUNT,
    ODI_WLS_JMS_INC_SOURCE_A.MESSAGES_PENDING_COUNT AS MESSAGES_PENDING_COUNT,
    ODI_WLS_JMS_INC_SOURCE_A.CONSUMERS_CURRENT_COUNT AS CONSUMERS_CURRENT_COUNT,
    ODI_WLS_JMS_INC_SOURCE_A.CONSUMERS_HIGH_COUNT AS CONSUMERS_HIGH_COUNT,
    ODI_WLS_JMS_INC_SOURCE_A.CONSUMERS_TOTAL_COUNT AS CONSUMERS_TOTAL_COUNT,
    ODI_WLS_JMS_INC_SOURCE_A.MESSAGES_HIGH_COUNT AS MESSAGES_HIGH_COUNT,
    ODI_WLS_JMS_INC_SOURCE_A.MESSAGES_RECEIVED_COUNT AS MESSAGES_RECEIVED_COUNT,
    ODI_WLS_JMS_INC_SOURCE_A.OBJECT_VERSION_NUMBER AS OBJECT_VERSION_NUMBER,
    ODI_WLS_JMS_INC_SOURCE_A.CREATION_DATE AS CREATION_DATE,
    ODI_WLS_JMS_INC_SOURCE_A.CREATED_BY AS CREATED_BY,
    ODI_WLS_JMS_INC_SOURCE_A.LAST_UPDATE_DATE AS LAST_UPDATE_DATE,
    ODI_WLS_JMS_INC_SOURCE_A.LAST_UPDATED_BY AS LAST_UPDATED_BY,
    ODI_WLS_JMS_INC_SOURCE_A.DATA_STATUS AS DATA_STATUS,
    'I' IND_UPDATE
from    TESTUSER.C$_0ODI_WLS_JMS_INC_SOURCE ODI_WLS_JMS_INC_SOURCE_A
where    (1=1)
) S
where NOT EXISTS 
    ( select 1 from TESTUSER.ODI_WLS_JMS_INC_TARGET T
    where    T.ID    = S.ID
    and    T.HOST    = S.HOST 
         and ((T.POST = S.POST) or (T.POST IS NULL and S.POST IS NULL)) and
        ((T.JMS_SERVICE_NAME = S.JMS_SERVICE_NAME) or (T.JMS_SERVICE_NAME IS NULL and S.JMS_SERVICE_NAME IS NULL)) and
        ((T.JMS_NAME = S.JMS_NAME) or (T.JMS_NAME IS NULL and S.JMS_NAME IS NULL)) and
        ((T.JMS_SERVICE_TARGET = S.JMS_SERVICE_TARGET) or (T.JMS_SERVICE_TARGET IS NULL and S.JMS_SERVICE_TARGET IS NULL)) and
        ((T.JMS_SERVICE_HEALTH = S.JMS_SERVICE_HEALTH) or (T.JMS_SERVICE_HEALTH IS NULL and S.JMS_SERVICE_HEALTH IS NULL)) and
        ((T.MESSAGES_CURRENT_COUNT = S.MESSAGES_CURRENT_COUNT) or (T.MESSAGES_CURRENT_COUNT IS NULL and S.MESSAGES_CURRENT_COUNT IS NULL)) and
        ((T.MESSAGES_PENDING_COUNT = S.MESSAGES_PENDING_COUNT) or (T.MESSAGES_PENDING_COUNT IS NULL and S.MESSAGES_PENDING_COUNT IS NULL)) and
        ((T.CONSUMERS_CURRENT_COUNT = S.CONSUMERS_CURRENT_COUNT) or (T.CONSUMERS_CURRENT_COUNT IS NULL and S.CONSUMERS_CURRENT_COUNT IS NULL)) and
        ((T.CONSUMERS_HIGH_COUNT = S.CONSUMERS_HIGH_COUNT) or (T.CONSUMERS_HIGH_COUNT IS NULL and S.CONSUMERS_HIGH_COUNT IS NULL)) and
        ((T.CONSUMERS_TOTAL_COUNT = S.CONSUMERS_TOTAL_COUNT) or (T.CONSUMERS_TOTAL_COUNT IS NULL and S.CONSUMERS_TOTAL_COUNT IS NULL)) and
        ((T.MESSAGES_HIGH_COUNT = S.MESSAGES_HIGH_COUNT) or (T.MESSAGES_HIGH_COUNT IS NULL and S.MESSAGES_HIGH_COUNT IS NULL)) and
        ((T.MESSAGES_RECEIVED_COUNT = S.MESSAGES_RECEIVED_COUNT) or (T.MESSAGES_RECEIVED_COUNT IS NULL and S.MESSAGES_RECEIVED_COUNT IS NULL)) and
        ((T.OBJECT_VERSION_NUMBER = S.OBJECT_VERSION_NUMBER) or (T.OBJECT_VERSION_NUMBER IS NULL and S.OBJECT_VERSION_NUMBER IS NULL)) and
        ((T.CREATION_DATE = S.CREATION_DATE) or (T.CREATION_DATE IS NULL and S.CREATION_DATE IS NULL)) and
        ((T.CREATED_BY = S.CREATED_BY) or (T.CREATED_BY IS NULL and S.CREATED_BY IS NULL)) and
        ((T.LAST_UPDATE_DATE = S.LAST_UPDATE_DATE) or (T.LAST_UPDATE_DATE IS NULL and S.LAST_UPDATE_DATE IS NULL)) and
        ((T.LAST_UPDATED_BY = S.LAST_UPDATED_BY) or (T.LAST_UPDATED_BY IS NULL and S.LAST_UPDATED_BY IS NULL)) and
        ((T.DATA_STATUS = S.DATA_STATUS) or (T.DATA_STATUS IS NULL and S.DATA_STATUS IS NULL))
        )
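The null-safe NOT EXISTS comparison above can be tried on a small scale. The following is a minimal sketch, not the knowledge module's code: it uses Python's built-in sqlite3 in place of Oracle, and the table names C_WORK and TARGET and the three-column layout are hypothetical stand-ins for the C$_ work table and the target table.

```python
import sqlite3

def changed_or_new_rows(conn):
    """Return work-table rows that have no identical row in the target."""
    return conn.execute(
        """
        select S.ID, S.HOST, S.POST
        from   C_WORK S
        where  not exists (
            select 1 from TARGET T
            where  T.ID = S.ID
            and    T.HOST = S.HOST
            -- null-safe comparison: two NULLs count as equal
            and    ((T.POST = S.POST) or (T.POST is null and S.POST is null))
        )
        order by S.ID
        """
    ).fetchall()

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    create table C_WORK (ID integer, HOST text, POST text);
    create table TARGET (ID integer, HOST text, POST text);
    insert into C_WORK values
        (1, 'h1', '7001'),   -- identical in target
        (2, 'h2', NULL),     -- identical in target, POST is NULL on both sides
        (3, 'h3', '7003'),   -- POST differs in target
        (4, 'h4', '7004');   -- not in target at all
    insert into TARGET values
        (1, 'h1', '7001'), (2, 'h2', NULL), (3, 'h3', '9999');
    """
)
rows = changed_or_new_rows(conn)
print(rows)
```

Only the changed row and the new row pass the filter. Without the IS NULL branch, row 2 would be treated as changed on every run, because NULL = NULL is not true in SQL.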

9.Create Index on flow table

Create an index on the I$_ table

create index     TESTUSER.I$_ODI_WLS_JMS_INC_TARGET_UK
on    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET (ID, HOST)
NOLOGGING

10.Analyze integration table

Gather optimizer statistics on the I$_ table

begin
    dbms_stats.gather_table_stats(
    ownname => 'TESTUSER',
    tabname => 'I$_ODI_WLS_JMS_INC_TARGET',
    estimate_percent => dbms_stats.auto_sample_size
    );
end;

11.create check table

Create the check table, which holds a summary of the rows that failed the checks

create table TESTUSER.SNP_CHECK_TAB
(
    CATALOG_NAME    VARCHAR2(100 CHAR) NULL ,
    SCHEMA_NAME    VARCHAR2(100 CHAR) NULL ,
    RESOURCE_NAME    VARCHAR2(100 CHAR) NULL,
    FULL_RES_NAME    VARCHAR2(100 CHAR) NULL,
    ERR_TYPE        VARCHAR2(1 CHAR) NULL,
    ERR_MESS        VARCHAR2(250 CHAR) NULL ,
    CHECK_DATE    DATE NULL,
    ORIGIN        VARCHAR2(250 CHAR) NULL,
    CONS_NAME    VARCHAR2(128 CHAR) NULL,
    CONS_TYPE        VARCHAR2(2 CHAR) NULL,
    ERR_COUNT        NUMBER(10) NULL
)

12.delete previous check sum

Delete the previous check data

delete from    TESTUSER.SNP_CHECK_TAB
where    SCHEMA_NAME    = 'TESTUSER'
and    ORIGIN         = '(171)mdsProject.My_increment'
and    ERR_TYPE         = 'F'

13.create error table

Create the error table, which records the rows that failed the checks

create table TESTUSER.E$_ODI_WLS_JMS_INC_TARGET
(
    ODI_ROW_ID         UROWID,
    ODI_ERR_TYPE        VARCHAR2(1 CHAR) NULL, 
    ODI_ERR_MESS        VARCHAR2(250 CHAR) NULL,
    ODI_CHECK_DATE    DATE NULL, 
    ID    NUMBER NULL,
    HOST    VARCHAR2(200) NULL,
    POST    VARCHAR2(200) NULL,
    JMS_SERVICE_NAME    VARCHAR2(200) NULL,
    JMS_NAME    VARCHAR2(200) NULL,
    JMS_SERVICE_TARGET    VARCHAR2(200) NULL,
    JMS_SERVICE_HEALTH    VARCHAR2(200) NULL,
    MESSAGES_CURRENT_COUNT    NUMBER NULL,
    MESSAGES_PENDING_COUNT    NUMBER NULL,
    CONSUMERS_CURRENT_COUNT    NUMBER NULL,
    CONSUMERS_HIGH_COUNT    NUMBER NULL,
    CONSUMERS_TOTAL_COUNT    NUMBER NULL,
    MESSAGES_HIGH_COUNT    NUMBER NULL,
    MESSAGES_RECEIVED_COUNT    NUMBER NULL,
    OBJECT_VERSION_NUMBER    NUMBER NULL,
    CREATION_DATE    DATE NULL,
    CREATED_BY    VARCHAR2(200) NULL,
    LAST_UPDATE_DATE    DATE NULL,
    LAST_UPDATED_BY    VARCHAR2(200) NULL,
    DATA_STATUS    VARCHAR2(400) NULL,
    ODI_ORIGIN        VARCHAR2(250 CHAR) NULL,
    ODI_CONS_NAME    VARCHAR2(128 CHAR) NULL,
    ODI_CONS_TYPE        VARCHAR2(2 CHAR) NULL,
    ODI_PK            VARCHAR2(32 CHAR) PRIMARY KEY,
    ODI_SESS_NO        VARCHAR2(36 CHAR)
)

14.delete previous errors

Delete earlier rows from the error table

delete from     TESTUSER.E$_ODI_WLS_JMS_INC_TARGET
where    (ODI_ERR_TYPE = 'S'    and 'F' = 'S')
or    (ODI_ERR_TYPE = 'F'    and ODI_ORIGIN = '(171)mdsProject.My_increment')

15.Create index on PK

Create an index on the primary key columns of the I$_ table

/* FLOW CONTROL CREATE THE INDEX ON I$TABLE */
create index     TESTUSER.I$_ODI_WLS_JMS_INC_TARGET_PK
on    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET (ID, HOST)

16.insert PK errors

Insert the rows whose primary key is not unique

insert into TESTUSER.E$_ODI_WLS_JMS_INC_TARGET
(
    ODI_PK,
    ODI_SESS_NO,
    ODI_ROW_ID,
    ODI_ERR_TYPE,
    ODI_ERR_MESS,
    ODI_ORIGIN,
    ODI_CHECK_DATE,
    ODI_CONS_NAME,
    ODI_CONS_TYPE,
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS
)
select    SYS_GUID(),
    'abb01f39-16b9-41ba-9820-7733e137f237', 
    rowid,
    'F', 
    'ODI-15064: 主鍵 ODI_WLS_JMS_INC_TARGET_PK 不是惟一的。',
    '(171)mdsProject.My_increment',
    sysdate,
    'ODI_WLS_JMS_INC_TARGET_PK',
    'PK',    
    ODI_WLS_JMS_INC_TARGET.ID,
    ODI_WLS_JMS_INC_TARGET.HOST,
    ODI_WLS_JMS_INC_TARGET.POST,
    ODI_WLS_JMS_INC_TARGET.JMS_SERVICE_NAME,
    ODI_WLS_JMS_INC_TARGET.JMS_NAME,
    ODI_WLS_JMS_INC_TARGET.JMS_SERVICE_TARGET,
    ODI_WLS_JMS_INC_TARGET.JMS_SERVICE_HEALTH,
    ODI_WLS_JMS_INC_TARGET.MESSAGES_CURRENT_COUNT,
    ODI_WLS_JMS_INC_TARGET.MESSAGES_PENDING_COUNT,
    ODI_WLS_JMS_INC_TARGET.CONSUMERS_CURRENT_COUNT,
    ODI_WLS_JMS_INC_TARGET.CONSUMERS_HIGH_COUNT,
    ODI_WLS_JMS_INC_TARGET.CONSUMERS_TOTAL_COUNT,
    ODI_WLS_JMS_INC_TARGET.MESSAGES_HIGH_COUNT,
    ODI_WLS_JMS_INC_TARGET.MESSAGES_RECEIVED_COUNT,
    ODI_WLS_JMS_INC_TARGET.OBJECT_VERSION_NUMBER,
    ODI_WLS_JMS_INC_TARGET.CREATION_DATE,
    ODI_WLS_JMS_INC_TARGET.CREATED_BY,
    ODI_WLS_JMS_INC_TARGET.LAST_UPDATE_DATE,
    ODI_WLS_JMS_INC_TARGET.LAST_UPDATED_BY,
    ODI_WLS_JMS_INC_TARGET.DATA_STATUS
from    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET   ODI_WLS_JMS_INC_TARGET
where    exists  (
        select    SUB.ID,
            SUB.HOST
        from     TESTUSER.I$_ODI_WLS_JMS_INC_TARGET SUB
        where     SUB.ID=ODI_WLS_JMS_INC_TARGET.ID
            and SUB.HOST=ODI_WLS_JMS_INC_TARGET.HOST
        group by     SUB.ID,
            SUB.HOST
        having     count(1) > 1
        )

17.insert Not Null errors

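Insert the rows in which a primary key column is null. With a composite primary key, this step runs several times, once per key column.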

insert into TESTUSER.E$_ODI_WLS_JMS_INC_TARGET
(
    ODI_PK,
    ODI_SESS_NO,
    ODI_ROW_ID,
    ODI_ERR_TYPE,
    ODI_ERR_MESS,
    ODI_CHECK_DATE,
    ODI_ORIGIN,
    ODI_CONS_NAME,
    ODI_CONS_TYPE,
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS
)
select
    SYS_GUID(),
    'abb01f39-16b9-41ba-9820-7733e137f237', 
    rowid,
    'F', 
    'ODI-15066: 列ID不能爲空值。',
    sysdate,
    '(171)mdsProject.My_increment',
    'ID',
    'NN',    
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS
from    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET
where    ID is null

18.create index on error table

Create an index on the E$_ table

/* FLOW CONTROL CREATE INDEX ON THE E$TABLE */
create index     TESTUSER.E$_ODI_WLS_JMS_INC_TARGET_IDX 
on    TESTUSER.E$_ODI_WLS_JMS_INC_TARGET (ODI_ROW_ID)

19.delete errors from controlled table

Delete the erroneous rows from the I$_ table

delete from    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET  T
where    exists     (
        select    1
        from    TESTUSER.E$_ODI_WLS_JMS_INC_TARGET E
        where ODI_SESS_NO = 'abb01f39-16b9-41ba-9820-7733e137f237'
        and T.rowid = E.ODI_ROW_ID
        )
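Steps 16 to 19 together form the flow-control check: bad rows are copied to the error table and then removed from the flow table by row id. The following is a minimal sketch of that sequence, using Python's built-in sqlite3 rather than Oracle. The table names I_FLOW and E_ERRORS and the reduced column set are hypothetical, and SQLite's rowid stands in for Oracle's.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    create table I_FLOW (ID integer, HOST text, POST text, IND_UPDATE text);
    create table E_ERRORS (ODI_ROW_ID integer, ODI_ERR_TYPE text,
                           ODI_CONS_TYPE text, ID integer, HOST text, POST text);
    insert into I_FLOW values
        (1,    'h1', '7001', 'I'),
        (2,    'h2', '7002', 'I'),
        (2,    'h2', '7999', 'I'),   -- duplicate key (ID, HOST)
        (NULL, 'h4', '7004', 'I');   -- key column ID is null
    """
)

# Primary key check: every row whose (ID, HOST) occurs more than once.
conn.execute(
    """
    insert into E_ERRORS (ODI_ROW_ID, ODI_ERR_TYPE, ODI_CONS_TYPE, ID, HOST, POST)
    select F.rowid, 'F', 'PK', F.ID, F.HOST, F.POST
    from   I_FLOW F
    where  exists (
        select 1 from I_FLOW SUB
        where  SUB.ID = F.ID and SUB.HOST = F.HOST
        group by SUB.ID, SUB.HOST
        having count(1) > 1
    )
    """
)

# Not-null check on the key column ID.
conn.execute(
    """
    insert into E_ERRORS (ODI_ROW_ID, ODI_ERR_TYPE, ODI_CONS_TYPE, ID, HOST, POST)
    select rowid, 'F', 'NN', ID, HOST, POST
    from   I_FLOW
    where  ID is null
    """
)

# Remove every rejected row from the flow table by its row id.
conn.execute(
    """
    delete from I_FLOW
    where  exists (select 1 from E_ERRORS E where E.ODI_ROW_ID = I_FLOW.rowid)
    """
)

remaining = conn.execute(
    "select ID, HOST, POST from I_FLOW order by ID").fetchall()
error_counts = conn.execute(
    "select ODI_CONS_TYPE, count(*) from E_ERRORS "
    "group by ODI_CONS_TYPE order by ODI_CONS_TYPE").fetchall()
print(remaining, error_counts)
```

Both copies of a duplicated key are rejected, not just the later one, so only rows that pass every check go on to the target table.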

20.insert check sum into check table

Record a summary of the errors in TESTUSER.SNP_CHECK_TAB

insert into TESTUSER.SNP_CHECK_TAB
(
    SCHEMA_NAME,
    RESOURCE_NAME,
    FULL_RES_NAME,
    ERR_TYPE,
    ERR_MESS,
    CHECK_DATE,
    ORIGIN,
    CONS_NAME,
    CONS_TYPE,
    ERR_COUNT
)
select    
    'TESTUSER',
    'ODI_WLS_JMS_INC_TARGET',
    'TESTUSER.ODI_WLS_JMS_INC_TARGET',
    E.ODI_ERR_TYPE,
    E.ODI_ERR_MESS,
    E.ODI_CHECK_DATE,
    E.ODI_ORIGIN,
    E.ODI_CONS_NAME,
    E.ODI_CONS_TYPE,
    count(1) 
from    TESTUSER.E$_ODI_WLS_JMS_INC_TARGET E
where    E.ODI_ERR_TYPE    = 'F'
and    E.ODI_ORIGIN     = '(171)mdsProject.My_increment'
group by    E.ODI_ERR_TYPE,
    E.ODI_ERR_MESS,
    E.ODI_CHECK_DATE,
    E.ODI_ORIGIN,
    E.ODI_CONS_NAME,
    E.ODI_CONS_TYPE

21.Flag rows for update

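For the rows in the I$_ table whose primary key already exists in the target table, set IND_UPDATE to U, meaning the row needs to be updated; I means the row needs to be inserted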

/* DETECTION_STRATEGY = NOT_EXISTS */
update    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET
set    IND_UPDATE = 'U'
where    (ID, HOST)
    in    (
        select    ID,
            HOST
        from    TESTUSER.ODI_WLS_JMS_INC_TARGET
        )

22.Update existing rows

Update the rows that already exist

/* DETECTION_STRATEGY = NOT_EXISTS */
update    TESTUSER.ODI_WLS_JMS_INC_TARGET T
set     (
    T.POST,
    T.JMS_SERVICE_NAME,
    T.JMS_NAME,
    T.JMS_SERVICE_TARGET,
    T.JMS_SERVICE_HEALTH,
    T.MESSAGES_CURRENT_COUNT,
    T.MESSAGES_PENDING_COUNT,
    T.CONSUMERS_CURRENT_COUNT,
    T.CONSUMERS_HIGH_COUNT,
    T.CONSUMERS_TOTAL_COUNT,
    T.MESSAGES_HIGH_COUNT,
    T.MESSAGES_RECEIVED_COUNT,
    T.OBJECT_VERSION_NUMBER,
    T.CREATION_DATE,
    T.CREATED_BY,
    T.LAST_UPDATE_DATE,
    T.LAST_UPDATED_BY,
    T.DATA_STATUS
    ) =
        (
        select    S.POST,
            S.JMS_SERVICE_NAME,
            S.JMS_NAME,
            S.JMS_SERVICE_TARGET,
            S.JMS_SERVICE_HEALTH,
            S.MESSAGES_CURRENT_COUNT,
            S.MESSAGES_PENDING_COUNT,
            S.CONSUMERS_CURRENT_COUNT,
            S.CONSUMERS_HIGH_COUNT,
            S.CONSUMERS_TOTAL_COUNT,
            S.MESSAGES_HIGH_COUNT,
            S.MESSAGES_RECEIVED_COUNT,
            S.OBJECT_VERSION_NUMBER,
            S.CREATION_DATE,
            S.CREATED_BY,
            S.LAST_UPDATE_DATE,
            S.LAST_UPDATED_BY,
            S.DATA_STATUS
        from    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET S
        where    T.ID    =S.ID
        and    T.HOST    =S.HOST
             )
where    (ID, HOST)
    in    (
        select    ID,
            HOST
        from    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET
        where    IND_UPDATE = 'U'
        )

23.Insert new rows

Insert the new rows

/* DETECTION_STRATEGY = NOT_EXISTS */
insert into     TESTUSER.ODI_WLS_JMS_INC_TARGET T
    (
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS
    )
select     ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS
from    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET S
where    IND_UPDATE = 'I'
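Steps 21 to 23 apply the flow table to the target. The following is a minimal sketch of that flag, update, insert sequence, using Python's built-in sqlite3 rather than Oracle. The table names TARGET and I_FLOW and the single non-key column are hypothetical, and EXISTS replaces the row-value IN of the generated SQL only to stay portable across SQLite versions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    create table TARGET (ID integer, HOST text, POST text);
    create table I_FLOW (ID integer, HOST text, POST text, IND_UPDATE text);
    insert into TARGET values (1, 'h1', '7001'), (2, 'h2', '7002'), (3, 'h3', '7003');
    -- At the source: row 1 is unchanged, row 2 was modified,
    -- row 3 was deleted, row 4 is new. Only 2 and 4 reach the flow table.
    insert into I_FLOW values (2, 'h2', '8002', 'I'), (4, 'h4', '7004', 'I');
    """
)

# Flag rows for update: keys that already exist in the target become U.
conn.execute(
    """
    update I_FLOW set IND_UPDATE = 'U'
    where  exists (select 1 from TARGET T
                   where T.ID = I_FLOW.ID and T.HOST = I_FLOW.HOST)
    """
)

# Update existing rows from the flow rows flagged U.
conn.execute(
    """
    update TARGET
    set    POST = (select S.POST from I_FLOW S
                   where S.ID = TARGET.ID and S.HOST = TARGET.HOST)
    where  exists (select 1 from I_FLOW S
                   where S.ID = TARGET.ID and S.HOST = TARGET.HOST
                   and   S.IND_UPDATE = 'U')
    """
)

# Insert new rows: everything still flagged I.
conn.execute(
    """
    insert into TARGET (ID, HOST, POST)
    select ID, HOST, POST from I_FLOW where IND_UPDATE = 'I'
    """
)

target_rows = conn.execute(
    "select ID, HOST, POST from TARGET order by ID").fetchall()
print(target_rows)
```

Row 3 survives in the target even though it no longer exists at the source. This is the gap described earlier: nothing in the flow table ever refers to a row that has disappeared from the source.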

24.Drop work table

Drop the work table

drop table TESTUSER.C$_0ODI_WLS_JMS_INC_SOURCE purge

25.Drop flow table

Drop the I$_ table

drop table TESTUSER.I$_ODI_WLS_JMS_INC_TARGET

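This completes one incremental synchronization run

Modification

From the process above, adding deletion to incremental synchronization only requires inserting into the I$_ table the target-table rows that do not exist in the work table (the C$_ table), flagged D, and then, after the updates and inserts on the target table, deleting the target rows flagged D.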

Without further ado, here is the code

clipboard.png

Before rows are flagged U, insert the rows that need to be deleted into the I$_ table

Insert deleted rows

/* DETECTION_STRATEGY = NOT_EXISTS */ 
insert into  <%=odiRef.getTable("L","INT_NAME","A")%> (  
 <%=odiRef.getColList("", "[COL_NAME]", ", ", "", "UK")%>,   IND_UPDATE ) 
 select    <%=odiRef.getColList("", "[COL_NAME]", ", ", "", "UK")%>,   
 'D' 
 from <%=odiRef.getTable("L", "TARG_NAME", "A")%> T2
 where NOT EXISTS    
 (   select 'X' from  <%=odiRef.getFrom(0)%>
  where  (<%=odiRef.getColList("","[EXPRESSION]\t= T2.[COL_NAME]", "\n\tand\t", "", "UK")%> ) )

Synchronize deletions

Delete from the target table the rows flagged D in the I$_ table

delete from <%=odiRef.getTable("L","TARG_NAME","A")%>
where exists (
        select     'X'
        from    <%=odiRef.getTable("L","INT_NAME","A")%> <%=odiRef.getInfo("DEST_TAB_ALIAS_WORD")%> I
        where    <%=odiRef.getColList("", odiRef.getTable("L","TARG_NAME","A") + ".[COL_NAME] = I.[COL_NAME]", "\n\t\tand \t", "", "UK")%>
        and    IND_UPDATE = 'D'
    )
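The effect of the two added steps can be sketched end to end with Python's built-in sqlite3 rather than Oracle. The table names C_WORK, I_FLOW and TARGET and the reduced column set are hypothetical. One assumption is labeled in the code: the flag step is restricted to rows still flagged I. The flag statement shown in step 21 has no filter on IND_UPDATE, so as written it would also re-flag D rows whose keys exist in the target; how the modified module handles this is not shown in the source.

```python
import sqlite3

def apply_flow_with_deletes(conn):
    """Apply I_FLOW to TARGET, including the two added deletion steps."""
    # Added step: stage target keys missing from the work table, flagged D.
    conn.execute("""
        insert into I_FLOW (ID, HOST, IND_UPDATE)
        select T2.ID, T2.HOST, 'D'
        from   TARGET T2
        where  not exists (select 'X' from C_WORK S
                           where S.ID = T2.ID and S.HOST = T2.HOST)
    """)
    # Flag rows for update. ASSUMPTION: only rows still flagged I are touched,
    # so the D rows staged above keep their flag.
    conn.execute("""
        update I_FLOW set IND_UPDATE = 'U'
        where  IND_UPDATE = 'I'
        and    exists (select 1 from TARGET T
                       where T.ID = I_FLOW.ID and T.HOST = I_FLOW.HOST)
    """)
    # Update existing rows from the flow rows flagged U.
    conn.execute("""
        update TARGET
        set    POST = (select S.POST from I_FLOW S
                       where S.ID = TARGET.ID and S.HOST = TARGET.HOST
                       and   S.IND_UPDATE = 'U')
        where  exists (select 1 from I_FLOW S
                       where S.ID = TARGET.ID and S.HOST = TARGET.HOST
                       and   S.IND_UPDATE = 'U')
    """)
    # Insert new rows.
    conn.execute("""
        insert into TARGET (ID, HOST, POST)
        select ID, HOST, POST from I_FLOW where IND_UPDATE = 'I'
    """)
    # Added step: delete target rows whose key is flagged D.
    conn.execute("""
        delete from TARGET
        where  exists (select 'X' from I_FLOW I
                       where TARGET.ID = I.ID and TARGET.HOST = I.HOST
                       and   I.IND_UPDATE = 'D')
    """)

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table C_WORK (ID integer, HOST text, POST text);
    create table TARGET (ID integer, HOST text, POST text);
    create table I_FLOW (ID integer, HOST text, POST text, IND_UPDATE text);
    -- Source snapshot: row 2 modified, row 3 deleted, row 4 new.
    insert into C_WORK values (1, 'h1', '7001'), (2, 'h2', '8002'), (4, 'h4', '7004');
    insert into TARGET values (1, 'h1', '7001'), (2, 'h2', '7002'), (3, 'h3', '7003');
    -- Flow rows as the change-detection step would leave them.
    insert into I_FLOW values (2, 'h2', '8002', 'I'), (4, 'h4', '7004', 'I');
""")
apply_flow_with_deletes(conn)
target_rows = conn.execute(
    "select ID, HOST, POST from TARGET order by ID").fetchall()
print(target_rows)
```

Row 3 is now removed from the target while rows 1, 2 and 4 end up matching the source, without deleting and reloading the whole table.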

After adding these steps, re-test the interface to confirm that they take effect
