假定在源數據表中有一個字段會記錄數據的新增或修改時間,可以通過它對數據在時間維度上進行排序。通過中間表記錄每次更新的時間戳,在下一個同步週期時,通過這個時間戳同步該時間戳之後的增量數據。這就是時間戳增量同步。
但是時間戳增量同步不能對源數據庫中歷史數據的刪除操作進行同步,因此我使用Oracle物化視圖的方式來同步刪除和更新操作。
說明:
在兩個數據庫中分別建立數據表,並通過腳本在源數據表中插入500萬條數據,完成後再以每秒一條的速度插入新數據,模擬生產環境。
源數據表結構如下:
-- Source table used by the sync example (Oracle DDL, as exported with full
-- physical attributes). "ID" is the primary key; every CLOB column gets its own
-- BASICFILE LOB storage clause in tablespace "OIMTS".
CREATE TABLE "OIM"."YG_TQ_FD_BASICINFO"
(
  "ID" NUMBER(10,0) NOT NULL ENABLE,
  "DECLAREDATE" VARCHAR2(16),
  "UPDATEDATE" VARCHAR2(16),
  "SECODE" VARCHAR2(40) NOT NULL ENABLE,
  "FRONTSYMBOL" VARCHAR2(40),
  "BACKSYMBOL" VARCHAR2(40),
  "SYMBOLCOMP" VARCHAR2(40),
  "SNAMECOMP" VARCHAR2(100),
  "ENABLED" VARCHAR2(20) NOT NULL ENABLE,
  "FDSNAME" VARCHAR2(100) NOT NULL ENABLE,
  "FDTYPE" VARCHAR2(100),
  "FDNAME" VARCHAR2(200),
  "FDTOTUNIT" NUMBER(16,4),
  "FDNATURE" VARCHAR2(40),
  "INVESTSTYLE" VARCHAR2(40),
  "INVESTGOAL" CLOB,
  "INVRULE" CLOB,
  "FDSTYLE" VARCHAR2(20),
  "FOUNDDATE" VARCHAR2(16),
  "LISTDATE" VARCHAR2(16),
  "ENDDATE" VARCHAR2(16),
  "KEEPERCODE" VARCHAR2(16),
  "KEEPERNAME" VARCHAR2(200),
  "TRUSTEECODE" VARCHAR2(16),
  "TRUSTEENAME" VARCHAR2(200),
  "MANAGERNAME" VARCHAR2(400),
  "DECISIONRULE" CLOB,
  "DECISIONPROC" CLOB,
  "DISTRIBUTPRIN" CLOB,
  "INVESTRANGE" CLOB,
  "INVESTPOLICY" CLOB,
  "INVESTSTD" CLOB,
  "RISKTYPE" CLOB,
  "RISKINDEX" CLOB,
  "FDINTRO" CLOB,
  "FDEVOLUTION" CLOB,
  "DISCLOSUREPEOPLE" VARCHAR2(60),
  "DISCLOSUREPHONE" VARCHAR2(100),
  "EXISTBEGDATE" VARCHAR2(16),
  "EXISTENDDATE" VARCHAR2(16),
  "PRIEXISTENDDATE" VARCHAR2(16),
  "EXSITPERIOD" NUMBER(16,2),
  "EXCHANGE" VARCHAR2(20) NOT NULL ENABLE,
  "FDSERIESCODE" VARCHAR2(12),
  "FDSERIESNAME" VARCHAR2(200),
  "TRASTYPE" VARCHAR2(20),
  "EMONOVERDATE" VARCHAR2(20),
  "SUBCONFDATE" VARCHAR2(20),
  "REDCONFDATE" VARCHAR2(20),
  "REDPAYDATE" VARCHAR2(20),
  "OUTSUBBEGDATE" VARCHAR2(16),
  "OUTSUBENDDATE" VARCHAR2(16),
  "INSUBBEGDATE" VARCHAR2(16),
  "INREDENDDATE" VARCHAR2(16),
  "FDINVCATEGORY" VARCHAR2(20),
  "FDMETHOD" VARCHAR2(20) NOT NULL ENABLE,
  "MEMO" CLOB,
  "TOTSHARE" NUMBER(18,6),
  "ISSHARESTAT" NUMBER(10,0) NOT NULL ENABLE,
  "ISSTAT" NUMBER(10,0) NOT NULL ENABLE,
  "ISVALID" NUMBER(10,0) NOT NULL ENABLE,
  "TMSTAMP" RAW(8) NOT NULL ENABLE,
  "ENTRYDATE" DATE,
  "ENTRYTIME" VARCHAR2(16),
  "TRADPLACE" VARCHAR2(20),
  "SECURITYID" VARCHAR2(40) NOT NULL ENABLE,
  "NAVPUBTYPE" VARCHAR2(20),
  "PROFITPAYTYPE" VARCHAR2(20),
  "OPERATEPERIOD" NUMBER(10,0),
  "OPERATEPERIODUNIT" VARCHAR2(20),
  "FSYMBOL" VARCHAR2(40) NOT NULL ENABLE,
  "KEEPERSNAME" VARCHAR2(100) NOT NULL ENABLE,
  "KEEPERINITIALS" VARCHAR2(20) NOT NULL ENABLE,
  "LIQUBEGINDATE" VARCHAR2(16),
  "LIQUENDDATE" VARCHAR2(16),
  "CREDITLINEID" NUMBER(10,0),
  -- Primary key on "ID", backed by an index stored in "OIMTS".
  CONSTRAINT "PK_YG_TQ_FD_BASICINFO" PRIMARY KEY ("ID")
    USING INDEX PCTFREE 10 INITRANS 2 MAXTRANS 255 COMPUTE STATISTICS
    STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
      PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
      BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)
    TABLESPACE "OIMTS" ENABLE
)
-- Physical attributes of the table segment.
SEGMENT CREATION IMMEDIATE
PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255
NOCOMPRESS LOGGING
STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)
TABLESPACE "OIMTS"
-- One LOB storage clause per CLOB column; all thirteen use identical settings.
LOB ("INVESTGOAL") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION
  NOCACHE LOGGING
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
    PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
    BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT))
LOB ("INVRULE") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION
  NOCACHE LOGGING
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
    PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
    BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT))
LOB ("DECISIONRULE") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION
  NOCACHE LOGGING
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
    PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
    BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT))
LOB ("DECISIONPROC") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION
  NOCACHE LOGGING
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
    PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
    BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT))
LOB ("DISTRIBUTPRIN") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION
  NOCACHE LOGGING
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
    PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
    BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT))
LOB ("INVESTRANGE") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION
  NOCACHE LOGGING
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
    PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
    BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT))
LOB ("INVESTPOLICY") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION
  NOCACHE LOGGING
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
    PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
    BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT))
LOB ("INVESTSTD") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION
  NOCACHE LOGGING
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
    PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
    BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT))
LOB ("RISKTYPE") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION
  NOCACHE LOGGING
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
    PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
    BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT))
LOB ("RISKINDEX") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION
  NOCACHE LOGGING
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
    PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
    BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT))
LOB ("FDINTRO") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION
  NOCACHE LOGGING
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
    PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
    BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT))
LOB ("FDEVOLUTION") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION
  NOCACHE LOGGING
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
    PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
    BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT))
LOB ("MEMO") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION
  NOCACHE LOGGING
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
    PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
    BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT))
作業的最終截圖如下:
4.1 建立作業和DB連接對象
打開Spoon工具,新建作業,然後在左側主對象樹的DB連接中新建DB連接。創建連接並測試通過後,可以在左側DB連接下右鍵將其共享出來。因爲在單個作業或者轉換中新建的DB連接都是局部數據源,在其他轉換和作業中是不能使用的,即使是屬於同一個作業下的不同轉換也不行,所以需要把它們共享,這樣DB連接就會成爲全局數據源,不用多次編輯。
這一步是爲了在目標數據庫建中間表ETL_POSITION,並插入初始的時間戳字段。因爲該作業在生產環境是循環調用的,該步驟在每一個同步週期中都會調用,所以在建表時需要判斷該表是否已經存在,如果不存在才建表。
SQL代碼和組件配置截圖如下:
-- Intermediate bookkeeping table for the sync job. ${SCHEMA_NAME} is a
-- variable substituted by the job before the script runs.
-- Each statement and each "--" comment sits on its own line: when the script
-- is collapsed onto a single line, the first "--" comments out every statement
-- that follows it.
create table ${SCHEMA_NAME}.ETL_POSITION
(
  table_name         VARCHAR2(100),
  last_position_time TIMESTAMP(6),
  last_position      number(30),
  progress_history   VARCHAR2(200),
  current_progress   VARCHAR2(30),
  target_url         VARCHAR2(400),
  target_table_name  VARCHAR2(100),
  schema_name        VARCHAR2(100)
);

-- Add comments to the table
comment on table ${SCHEMA_NAME}.ETL_POSITION
  is 'ETL事件惟一標示';

-- Add comments to the columns
comment on column ${SCHEMA_NAME}.ETL_POSITION.table_name
  is '表的名稱';
comment on column ${SCHEMA_NAME}.ETL_POSITION.last_position_time
  is '最後一次同步的日期';
-- Fix: this comment previously targeted last_position_time a second time,
-- overwriting the date comment above and leaving last_position undocumented.
comment on column ${SCHEMA_NAME}.ETL_POSITION.last_position
  is '最後一次同步的位置ID';
comment on column ${SCHEMA_NAME}.ETL_POSITION.progress_history
  is ' UNKNOW, MARK, FULLING, INCING, CLEAR, FAILED, SUCCESS;';
comment on column ${SCHEMA_NAME}.ETL_POSITION.current_progress
  is '當前狀態';
comment on column ${SCHEMA_NAME}.ETL_POSITION.target_url
  is '目標表的URL(爲了多數據源複製)';
comment on column ${SCHEMA_NAME}.ETL_POSITION.target_table_name
  is '目標表表名(爲了多數據源複製)';
comment on column ${SCHEMA_NAME}.ETL_POSITION.schema_name
  is '表所在空間';

-- Create/Recreate indexes
-- Unique over (source table, schema, target URL, target table).
create unique index ${SCHEMA_NAME}.ETL_POWEIYI
  on ${SCHEMA_NAME}.ETL_POSITION (TABLE_NAME, SCHEMA_NAME, TARGET_URL, TARGET_TABLE_NAME);
-- Check whether the ETL_POSITION table already exists: the query returns a row
-- from ALL_TABLES when the table is present in ${SCHEMA_NAME}, and no rows otherwise.
SELECT *
FROM all_tables
WHERE owner = '${SCHEMA_NAME}'
  AND table_name = 'ETL_POSITION'
建立物化視圖日誌
-- Create a materialized view log on the source table. The log is declared
-- WITH PRIMARY KEY and SEQUENCE.
CREATE MATERIALIZED VIEW LOG
  ON ${SCHEMA_NAME}.${TABLE_NAME}
  WITH PRIMARY KEY, SEQUENCE;
判斷是否已有物化視圖日誌
-- Check whether a materialized view log already exists for the source table:
-- returns a row from ALL_MVIEW_LOGS when one is present, and no rows otherwise.
SELECT *
FROM all_mview_logs
WHERE log_owner = '${SCHEMA_NAME}'
  AND master = '${TABLE_NAME}'
在作業中設置變量
具體看代碼吧