Oracle 增量/全量實時同步 MySQL(可支持多庫):使用 Kettle 實現數據實時增量同步

 

1. 時間戳增量回滾同步

假定源數據表中有一個字段記錄數據的新增或修改時間,可以通過它對數據在時間維度上進行排序。通過中間表記錄每次同步的時間戳,在下一個同步週期時,根據這個時間戳同步該時間戳之後的增量數據。這就是時間戳增量同步。

但是時間戳增量同步無法同步源數據庫中歷史數據的刪除操作,因此這裏使用 Oracle 物化視圖日誌(materialized view log)的方式來同步刪除操作。

說明:

  • 源數據表:需要被同步的數據表
  • 目標數據表:同步至的數據表
  • 中間表:存儲時間戳的表

2. 前期準備

在兩個數據庫中分別創建數據表,並通過腳本在源數據表中插入 500 萬條數據,完成後再以每秒一條的速度插入新數據,以模擬生產環境。

源數據表結構如下:

-- Source table for the sync example: OIM.YG_TQ_FD_BASICINFO, in tablespace OIMTS.
-- Primary key: ID (constraint PK_YG_TQ_FD_BASICINFO). The materialized view log
-- that captures deletes is created WITH PRIMARY KEY, which needs this key.
-- NOTE(review): the column that drives the timestamp-based incremental sync is not
-- named in this DDL. Candidates are UPDATEDATE (VARCHAR2(16)), ENTRYDATE (DATE)
-- and TMSTAMP (RAW(8)); confirm which one the job actually compares.
-- NOTE(review): this statement has no terminating ';' or '/'. Add one when running
-- it as part of a SQL*Plus-style script.
CREATE TABLE "OIM"."YG_TQ_FD_BASICINFO" 
   (    "ID" NUMBER(10,0) NOT NULL ENABLE, 
    "DECLAREDATE" VARCHAR2(16), 
    "UPDATEDATE" VARCHAR2(16), 
    "SECODE" VARCHAR2(40) NOT NULL ENABLE, 
    "FRONTSYMBOL" VARCHAR2(40), 
    "BACKSYMBOL" VARCHAR2(40), 
    "SYMBOLCOMP" VARCHAR2(40), 
    "SNAMECOMP" VARCHAR2(100), 
    "ENABLED" VARCHAR2(20) NOT NULL ENABLE, 
    "FDSNAME" VARCHAR2(100) NOT NULL ENABLE, 
    "FDTYPE" VARCHAR2(100), 
    "FDNAME" VARCHAR2(200), 
    "FDTOTUNIT" NUMBER(16,4), 
    "FDNATURE" VARCHAR2(40), 
    "INVESTSTYLE" VARCHAR2(40), 
    "INVESTGOAL" CLOB, 
    "INVRULE" CLOB, 
    "FDSTYLE" VARCHAR2(20), 
    "FOUNDDATE" VARCHAR2(16), 
    "LISTDATE" VARCHAR2(16), 
    "ENDDATE" VARCHAR2(16), 
    "KEEPERCODE" VARCHAR2(16), 
    "KEEPERNAME" VARCHAR2(200), 
    "TRUSTEECODE" VARCHAR2(16), 
    "TRUSTEENAME" VARCHAR2(200), 
    "MANAGERNAME" VARCHAR2(400), 
    "DECISIONRULE" CLOB, 
    "DECISIONPROC" CLOB, 
    "DISTRIBUTPRIN" CLOB, 
    "INVESTRANGE" CLOB, 
    "INVESTPOLICY" CLOB, 
    "INVESTSTD" CLOB, 
    "RISKTYPE" CLOB, 
    "RISKINDEX" CLOB, 
    "FDINTRO" CLOB, 
    "FDEVOLUTION" CLOB, 
    "DISCLOSUREPEOPLE" VARCHAR2(60), 
    "DISCLOSUREPHONE" VARCHAR2(100), 
    "EXISTBEGDATE" VARCHAR2(16), 
    "EXISTENDDATE" VARCHAR2(16), 
    "PRIEXISTENDDATE" VARCHAR2(16), 
    "EXSITPERIOD" NUMBER(16,2), 
    "EXCHANGE" VARCHAR2(20) NOT NULL ENABLE, 
    "FDSERIESCODE" VARCHAR2(12), 
    "FDSERIESNAME" VARCHAR2(200), 
    "TRASTYPE" VARCHAR2(20), 
    "EMONOVERDATE" VARCHAR2(20), 
    "SUBCONFDATE" VARCHAR2(20), 
    "REDCONFDATE" VARCHAR2(20), 
    "REDPAYDATE" VARCHAR2(20), 
    "OUTSUBBEGDATE" VARCHAR2(16), 
    "OUTSUBENDDATE" VARCHAR2(16), 
    "INSUBBEGDATE" VARCHAR2(16), 
    "INREDENDDATE" VARCHAR2(16), 
    "FDINVCATEGORY" VARCHAR2(20), 
    "FDMETHOD" VARCHAR2(20) NOT NULL ENABLE, 
    "MEMO" CLOB, 
    "TOTSHARE" NUMBER(18,6), 
    "ISSHARESTAT" NUMBER(10,0) NOT NULL ENABLE, 
    "ISSTAT" NUMBER(10,0) NOT NULL ENABLE, 
    "ISVALID" NUMBER(10,0) NOT NULL ENABLE, 
    "TMSTAMP" RAW(8) NOT NULL ENABLE, 
    "ENTRYDATE" DATE, 
    "ENTRYTIME" VARCHAR2(16), 
    "TRADPLACE" VARCHAR2(20), 
    "SECURITYID" VARCHAR2(40) NOT NULL ENABLE, 
    "NAVPUBTYPE" VARCHAR2(20), 
    "PROFITPAYTYPE" VARCHAR2(20), 
    "OPERATEPERIOD" NUMBER(10,0), 
    "OPERATEPERIODUNIT" VARCHAR2(20), 
    "FSYMBOL" VARCHAR2(40) NOT NULL ENABLE, 
    "KEEPERSNAME" VARCHAR2(100) NOT NULL ENABLE, 
    "KEEPERINITIALS" VARCHAR2(20) NOT NULL ENABLE, 
    "LIQUBEGINDATE" VARCHAR2(16), 
    "LIQUENDDATE" VARCHAR2(16), 
    "CREDITLINEID" NUMBER(10,0), 
     -- Primary key on ID; its index shares the OIMTS tablespace with the table.
     CONSTRAINT "PK_YG_TQ_FD_BASICINFO" PRIMARY KEY ("ID")
  USING INDEX PCTFREE 10 INITRANS 2 MAXTRANS 255 COMPUTE STATISTICS 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)
  TABLESPACE "OIMTS"  ENABLE
   ) SEGMENT CREATION IMMEDIATE 
  PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 
 NOCOMPRESS LOGGING
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)
  TABLESPACE "OIMTS" 
 -- Per-CLOB storage: each CLOB column below is a BASICFILE LOB in tablespace
 -- OIMTS with identical attributes (in-row storage enabled, CHUNK 8192, NOCACHE).
 LOB ("INVESTGOAL") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("INVRULE") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("DECISIONRULE") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("DECISIONPROC") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("DISTRIBUTPRIN") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("INVESTRANGE") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("INVESTPOLICY") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("INVESTSTD") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("RISKTYPE") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("RISKINDEX") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("FDINTRO") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("FDEVOLUTION") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)) 
 LOB ("MEMO") STORE AS BASICFILE (
  TABLESPACE "OIMTS" ENABLE STORAGE IN ROW CHUNK 8192 RETENTION 
  NOCACHE LOGGING 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT))

3. 作業流程

  1. 開始組件
  2. 建時間戳中間表
  3. 獲取中間表的時間戳,並設置爲全局變量
  4. 抽取兩個數據表的時間戳及時間戳之後的數據進行比對,並根據比對結果進行刪除、新增或修改操作
  5. 刪除物化視圖中需要刪除的數據
  6. 更新時間戳

4. 創建作業

作業的最終截圖如下:

 

 

 

 

4.1 創建作業和 DB 連接對象

打開 Spoon 工具,新建作業,然後在左側主對象樹的「DB連接」中新建 DB 連接。創建連接並測試通過後,可以在左側 DB 連接上右鍵將其共享。因爲在單個作業或轉換中新建的 DB 連接都是局部數據源,在其他轉換和作業中不能使用(即使是同一個作業下的不同轉換也不行),所以需要把它們共享。這樣 DB 連接就會成爲全局數據源,不必多次編輯。

4.2 建時間戳中間表

這一步是爲了在目標數據庫中創建中間表 ETL_POSITION,並插入初始的時間戳字段。由於該作業在生產環境中是循環調用的,該步驟在每一個同步週期中都會執行,因此建表前需要先判斷該表是否已經存在,如果不存在才建表。

SQL 代碼和組件配置截圖如下:

-- ETL_POSITION: watermark table for the incremental sync job.
-- Each row stores the last synced date (last_position_time) and the last synced
-- position ID (last_position) for one combination of
-- (table_name, schema_name, target_url, target_table_name); the unique index at
-- the end allows only one row per combination.
-- ${SCHEMA_NAME} is a Kettle variable that Kettle substitutes before the SQL runs.
create table ${SCHEMA_NAME}.ETL_POSITION
(
  table_name         VARCHAR2(100),
  last_position_time TIMESTAMP(6),
  last_position      NUMBER(30),
  progress_history   VARCHAR2(200),
  current_progress   VARCHAR2(30),
  target_url         VARCHAR2(400),
  target_table_name  VARCHAR2(100),
  schema_name        VARCHAR2(100)
);
-- Add comments to the table
comment on table ${SCHEMA_NAME}.ETL_POSITION
  is 'ETL事件惟一標示';
-- Add comments to the columns
comment on column ${SCHEMA_NAME}.ETL_POSITION.table_name
  is '表的名稱';
comment on column ${SCHEMA_NAME}.ETL_POSITION.last_position_time
  is '最後一次同步的日期';
-- The position-ID comment belongs on last_position. COMMENT ON replaces any
-- existing comment, so aiming it at last_position_time would overwrite the
-- date comment above and leave last_position undocumented.
comment on column ${SCHEMA_NAME}.ETL_POSITION.last_position
  is '最後一次同步的位置ID';
comment on column ${SCHEMA_NAME}.ETL_POSITION.progress_history
  is ' UNKNOW, MARK, FULLING, INCING, CLEAR, FAILED, SUCCESS;';
comment on column ${SCHEMA_NAME}.ETL_POSITION.current_progress
  is '當前狀態';
comment on column ${SCHEMA_NAME}.ETL_POSITION.target_url
  is '目標表的URL(爲了多數據源複製)';
comment on column ${SCHEMA_NAME}.ETL_POSITION.target_table_name
  is '目標表表名(爲了多數據源複製)';
comment on column ${SCHEMA_NAME}.ETL_POSITION.schema_name
  is '表所在空間';
-- Create/Recreate indexes
-- Enforces a single watermark row per source table / target combination.
create unique index ${SCHEMA_NAME}.ETL_POWEIYI on ${SCHEMA_NAME}.ETL_POSITION (TABLE_NAME, SCHEMA_NAME, TARGET_URL, TARGET_TABLE_NAME);

-- Check whether the ETL_POSITION table already exists.
-- The job should create it only when this query returns no rows.
-- CREATE TABLE ${SCHEMA_NAME}.ETL_POSITION uses unquoted names, which Oracle
-- stores in upper case. The owner is therefore upper-cased so the check still
-- matches when SCHEMA_NAME is supplied in lower case; otherwise the job would try
-- to re-create an existing table.
SELECT owner, table_name
FROM all_tables
WHERE table_name = 'ETL_POSITION'
  AND owner = UPPER('${SCHEMA_NAME}')

創建物化視圖日誌(materialized view log)

-- Materialized view log on the source table. Timestamp-based incremental sync
-- cannot see deleted rows, so deletes are picked up from this log instead.
--   WITH PRIMARY KEY: the log records the primary key of each changed row.
--   SEQUENCE:         each log entry also gets a sequence number for ordering.
-- ${SCHEMA_NAME} and ${TABLE_NAME} are Kettle variables.
CREATE MATERIALIZED VIEW LOG
    ON ${SCHEMA_NAME}.${TABLE_NAME}
    WITH PRIMARY KEY, SEQUENCE;

判斷是否已存在物化視圖日誌

-- Check whether a materialized view log already exists on
-- ${SCHEMA_NAME}.${TABLE_NAME}; an empty result means it still needs creating.
-- The log is created with unquoted names, so MASTER and LOG_OWNER hold
-- upper-case values. Both variables are upper-cased so the check still matches
-- when they are supplied in lower case.
SELECT log_owner, master, log_table
FROM all_mview_logs
WHERE master = UPPER('${TABLE_NAME}')
  AND log_owner = UPPER('${SCHEMA_NAME}')

 

 在作業中設置變量

 

 

 

 

 

 具體細節請參考代碼。

相關文章
相關標籤/搜索