基於OGG的Oracle與Hadoop集羣準實時同步介紹

版權聲明:本文由王亮原創文章,轉載請註明出處: 
文章原文連接:https://www.qcloud.com/community/article/220html

來源:騰雲閣 https://www.qcloud.com/communityjava

 

Oracle裏存儲的結構化數據導出到Hadoop體系作離線計算是一種常見數據處置手段。近期有場景須要作Oracle到Hadoop體系的實時導入,這裏以此案例作以介紹。
Oracle做爲商業化的數據庫解決方案,自發性的獲取數據庫事務日誌等比較困難,故選擇官方提供的同步工具OGG(Oracle GoldenGate)來解決。sql

安裝與基本配置

環境說明

軟件配置數據庫

角色 數據存儲服務及版本 OGG版本 IP
源服務器 OracleRelease11.2.0.1 Oracle GoldenGate 11.2.1.0 for Oracle on Linux x86-64 10.0.0.25
目標服務器 Hadoop 2.7.2 Oracle GoldenGate for Big Data 12.2.0.1 on Linux x86-64 10.0.0.2

以上源服務器上OGG安裝在Oracle用戶下,目標服務器上OGG安裝在root用戶下。apache

注意

Oracle導出到異構的存儲系統,如MySQL,DB2,PG等以及對應的不一樣平臺,如AIX,Windows,Linux等官方都有提供對應的Oracle GoldenGate版本,可在這裏或者在舊版本查詢下載安裝。json

Oracle源端基礎配置

將下載到的對應OGG版本放在方便的位置並解壓,本示例Oracle源端最終的解壓目錄爲/u01/gg。bootstrap

  1. 配置環境變量
    這裏的環境變量主要是對執行OGG的用戶添加OGG相關的環境變量,本示例爲Oracle用戶添加的環境變量以下:(/home/oracle/.bash_profile文件)vim

    export OGG_HOME=/u01/gg/
    export LD_LIBRARY_PATH=$ORACLE_HOME/lib:$OGG_HOME:/lib:/usr/lib
    export CLASSPATH=$ORACLE_HOME/jdk/jre:$ORACLE_HOME/jlib:$ORACLE_HOME/rdbms/jlib
  2. Oracle打開歸檔模式
    使用以下命令查看當前是否爲歸檔模式(archive)centos

    SQL> archive log list 
    Database log mode              Archive Mode
    Automatic archival             Enabled
    Archive destination            /u01/arch_log
    Oldest online log sequence     6
    Next log sequence to archive   8
    Current log sequence           8

    如非以上狀態,手動調整便可bash

    SQL> conn / as sysdba(以DBA身份鏈接數據庫) 
    SQL> shutdown immediate(當即關閉數據庫)
    SQL> startup mount(啓動實例並加載數據庫,但不打開)
    SQL> alter database archivelog(更改數據庫爲歸檔模式)
    SQL> alter database open(打開數據庫)
    SQL> alter system archive log start(啓用自動歸檔)
  3. Oracle打開日誌相關
    OGG基於輔助日誌等進行實時傳輸,故須要打開相關日誌確保可獲取事務內容。經過一下命令查看當前狀態:

    SQL> select force_logging, supplemental_log_data_min from v$database;
    FOR SUPPLEME--- --------
    YES YES

    若是以上查詢結果非YES,可經過如下命令修改狀態:

    SQL> alter database force logging;
    SQL> alter database add supplemental log data;
  4. Oracle建立複製用戶
    爲了使Oracle裏用戶的複製權限更加單純,故專門建立複製用戶,並賦予dba權限

    SQL> create tablespaceoggtbsdatafile '/u01/app/oracle/oradata/orcl/oggtbs01.dbf' size 1000M autoextend on;
    SQL> create user ggs identified by ggs default tablespaceoggtbs;
    User created.
    SQL> grant dba to ggs;
    Grant succeeded.

    最終這個ggs賬號的權限以下所示:

    SQL> select * from dba_sys_privs where GRANTEE='GGS';
    GRANTEE                        PRIVILEGE                                ADM
    GGS                            DROP ANY DIRECTORY                       NO
    GGS                            ALTER ANY TABLE                          NO
    GGS                            ALTER SESSION                            NO
    GGS                            SELECT ANY DICTIONARY                    NO
    GGS                            CREATE ANY DIRECTORY                     NO
    GGS                            RESTRICTED SESSION                       NO
    GGS                            FLASHBACK ANY TABLE                      NO
    GGS                            UPDATE ANY TABLE                         NO
    GGS                            DELETE ANY TABLE                         NO
    GGS                            CREATE TABLE                             NO
    GGS                            INSERT ANY TABLE                         NO
    GRANTEE                        PRIVILEGE                                ADM
    GGS                            UNLIMITED TABLESPACE                     NO
    GGS                            CREATE SESSION                           NO
    GGS                            SELECT ANY TABLE                         NO
  5. OGG初始化
    進入OGG的主目錄執行./ggsci,進入OGG命令行

    [oracle@VM_0_25_centos gg]$ ./ggsci 
    Oracle GoldenGate Command Interpreter for Oracle
    Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258_FBO
    Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 20:20:21
    Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.
    GGSCI (VM_0_25_centos) 1>
    執行create subdirs進行目錄建立
    GGSCI (VM_0_25_centos) 4> create subdirs
    Creating subdirectories under current directory /u01/gg
    Parameter files                /u01/gg/dirprm: already exists
    Report files                   /u01/gg/dirrpt: already exists
    Checkpoint files               /u01/gg/dirchk: already exists
    Process status files           /u01/gg/dirpcs: already exists
    SQL script files               /u01/gg/dirsql: already exists
    Database definitions files     /u01/gg/dirdef: already exists
    Extract data files             /u01/gg/dirdat: already exists
    Temporary files                /u01/gg/dirtmp: already exists
    Stdout files                   /u01/gg/dirout: already exists
  6. Oracle建立模擬複製庫表
    模擬建一個用戶叫tcloud,密碼tcloud,同時基於這個用戶建一張表,叫t_ogg

    SQL> create user tcloud  identified by tcloud default tablespace users;
    User created.
    SQL> grant dba to tcloud;
    Grant succeeded.
    SQL> conn tcloud/tcloud;
    Connected.
    SQL> create table t_ogg(id int ,text_name varchar(20),primary key(id));
    Table created.

目標端基礎配置

將下載到的對應OGG版本放在方便的位置並解壓,本示例Oracle目標端最終的解壓目錄爲/data/gg

  1. 配置環境變量
    這裏須要用到HDFS相關的庫,故須要配置java環境變量以及OGG相關,並引入HDFS的相關庫文件,參考配置以下:

    export JAVA_HOME=/usr/java/jdk1.7.0_75/
    export LD_LIBRARY_PATH=/usr/java/jdk1.7.0_75/jre/lib/amd64:/usr/java/jdk1.7.0_75/jre/lib/amd64/server:/usr/java/jdk1.7.0_75/jre/lib/amd64/libjsig.so:/usr/java/jdk1.7.0_75/jre/lib/amd64/server/libjvm.so:$OGG_HOME:/lib
    export OGG_HOME=/data/gg
  2. OGG初始化
    目標端的OGG初始化和源端相似進入OGG的主目錄執行./ggsci,進入OGG命令行

    GGSCI (10.0.0.2) 2> create subdirs
    Creating subdirectories under current directory /data/gg
    Parameter files                /data/gg/dirprm: already exists
    Report files                   /data/gg/dirrpt: already exists
    Checkpoint files               /data/gg/dirchk: already exists
    Process status files           /data/gg/dirpcs: already exists
    SQL script files               /data/gg/dirsql: already exists
    Database definitions files     /data/gg/dirdef: already exists
    Extract data files             /data/gg/dirdat: already exists
    Temporary files                /data/gg/dirtmp: already exists
    Credential store files         /data/gg/dircrd: already exists
    Masterkey wallet files         /data/gg/dirwlt: already exists
    Dump files                     /data/gg/dirdmp: already exists

Oracle源配置

Oracle實時傳輸到Hadoop集羣(HDFS,Hive,Kafka等)的基本原理如圖:

根據如上原理,配置大概分爲以下步驟:源端目標端配置ogg管理器(mgr);源端配置extract進程進行Oracle日誌抓取;源端配置pump進程傳輸抓取內容到目標端;目標端配置replicate進程複製日誌到Hadoop集羣或者複製到用戶自定義的解析器將最終結果落入到Hadoop集羣。

配置全局變量

在源端服務器OGG主目錄下,執行./ggsci到OGG命令行下,執行以下命令:

GGSCI (VM_0_25_centos) 1> dblogin userid ggs password ggs
Successfully logged into database.
GGSCI (VM_0_25_centos) 3> view params ./globals
ggschema ggs

其中./globals變量沒有的話能夠用edit params ./globals來編輯添加便可(編輯器默認使用的vim)

配置管理器mgr

在OGG命令行下執行以下命令:

GGSCI (VM_0_25_centos) 4> edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

說明:PORT即mgr的默認監聽端口;DYNAMICPORTLIST動態端口列表,當指定的mgr端口不可用時,會在這個端口列表中選擇一個,最大指定範圍爲256個;AUTORESTART重啓參數設置表示重啓全部EXTRACT進程,最多5次,每次間隔3分鐘;PURGEOLDEXTRACTS即TRAIL文件的按期清理
在命令行下執行start mgr便可啓動管理進程,經過info mgr可查看mgr狀態

GGSCI (VM_0_25_centos) 5> info mgr
Manager is running (IP port VM_0_25_centos.7809).

添加複製表

在OGG命令行下執行添加須要複製的表的操做,以下:

GGSCI (VM_0_25_centos) 7> add trandata tcloud.t_ogg
Logging of supplemental redo data enabled for table TCLOUD.T_OGG.
GGSCI (VM_0_25_centos) 8> info trandata tcloud.t_ogg
Logging of supplemental redo log data is enabled for table TCLOUD.T_OGG.
Columns supplementally logged for table TCLOUD.T_OGG: ID.

配置extract進程

配置extract進程OGG命令行下執行以下命令:

GGSCI (VM_0_25_centos) 10> edit params ext2hd
extract ext2hd
dynamicresolution
SETENV (ORACLE_SID = "orcl")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ggs,password ggs
exttrail /u01/gg/dirdat/tc
table tcloud.t_ogg;

說明:第一行指定extract進程名稱;dynamicresolution動態解析;SETENV設置環境變量,這裏分別設置了Oracle數據庫以及字符集;userid ggs,password ggs即OGG鏈接Oracle數據庫的賬號密碼,這裏使用2.3.4中特地建立的複製賬號;exttrail定義trail文件的保存位置以及文件名,注意這裏文件名只能是2個字母,其他部分OGG會補齊;table即複製表的代表,支持*通配,必須以;結尾
接下來在OGG命令行執行以下命令添加extract進程:

GGSCI (VM_0_25_centos) 11> add extract ext2hd,tranlog,begin now
EXTRACT added.

最後添加trail文件的定義與extract進程綁定:

GGSCI (VM_0_25_centos) 12> add exttrail /u01/gg/dirdat/tc,extract ext2hd
EXTTRAIL added

可在OGG命令行下經過info命令查看狀態:

GGSCI (VM_0_25_centos) 14> info ext2hd
EXTRACT    EXT2HD    Initialized   2016-11-09 15:37   Status STOPPED
Checkpoint Lag       00:00:00 (updated 00:02:32 ago)
Log Read Checkpoint  Oracle Redo Logs
                     2016-11-09 15:37:14  Seqno 0, RBA 0
                     SCN 0.0 (0)

配置pump進程

pump進程本質上來講也是一個extract,只不過他的做用僅僅是把trail文件傳遞到目標端,配置過程和extract進程相似,只是邏輯上稱之爲pump進程
在OGG命令行下執行:

GGSCI (VM_0_25_centos) 16> edit params push2hd
extract push2hd
passthru
dynamicresolution
userid ggs,password ggs
rmthost 10.0.0.2 mgrport 7809
rmttrail /data/gg/dirdat/tc
table tcloud.t_ogg;

說明:第一行指定extract進程名稱;passthru即禁止OGG與Oracle交互,咱們這裏使用pump邏輯傳輸,故禁止便可;dynamicresolution動態解析;userid ggs,password ggs即OGG鏈接Oracle數據庫的賬號密碼,這裏使用2.3.4中特地建立的複製賬號;rmthost和mgrhost即目標端OGG的mgr服務的地址以及監聽端口;rmttrail即目標端trail文件存儲位置以及名稱
分別將本地trail文件和目標端的trail文件綁定到extract進程:

GGSCI (VM_0_25_centos) 17> add extract push2hd,exttrailsource /u01/gg/dirdat/tc
EXTRACT added.
GGSCI (VM_0_25_centos) 18> add rmttrail /data/gg/dirdat/tc,extract push2hd
RMTTRAIL added.

一樣能夠在OGG命令行下使用info查看進程狀態:

GGSCI (VM_0_25_centos) 19> info push2hd

EXTRACT    PUSH2HD   Initialized   2016-11-09 15:52   Status STOPPED
Checkpoint Lag       00:00:00 (updated 00:01:04 ago)
Log Read Checkpoint  File /u01/gg/dirdat/tc000000
                     First Record  RBA 0

配置define文件

Oracle與MySQL,Hadoop集羣(HDFS,Hive,kafka等)等之間數據傳輸能夠定義爲異構數據類型的傳輸,故須要定義表之間的關係映射,在OGG命令行執行:

GGSCI (VM_0_25_centos) 20> edit params tcloud
defsfile /u01/gg/dirdef/tcloud.t_ogg
userid ggs,password ggs
table tcloud.t_ogg;

在OGG主目錄下執行:
./defgen paramfile dirprm/tcloud.prm
完成以後會生成這樣的文件/u01/gg/dirdef/tcloud.t_ogg,將這個文件拷貝到目標端的OGG主目錄下的dirdef目錄便可。

目標端的配置

建立目標表(目錄)

這裏主要是當目標端爲HDFS目錄或者Hive表或者MySQL數據庫時須要手動先在目標端建立好目錄或者表,建立方法都相似,這裏咱們模擬實時傳入到HDFS目錄,故手動建立一個接收目錄便可
hadoop –fs mkdir /gg/replication/hive/

配置管理器mgr

目標端的OGG管理器(mgr)和源端的配置相似,在OGG命令行下執行:

GGSCI (10.0.0.2) 2> edit params mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

配置checkpoint

checkpoint即複製可追溯的一個偏移量記錄,在全局配置裏添加checkpoint表便可

GGSCI (10.0.0.2) 5> edit  params  ./GLOBALS
CHECKPOINTTABLE tcloud.checkpoint

保存便可

配置replicate進程

在OGG的命令行下執行:

GGSCI (10.0.0.2) 8> edit params r2hdfs
REPLICAT r2hdfs
sourcedefs /data/gg/dirdef/tcloud.t_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/hdfs.props
REPORTCOUNT EVERY 1 MINUTES, RATE 
GROUPTRANSOPS 10000
MAP tcloud.t_ogg, TARGET tcloud.t_ogg;

說明:REPLICATE r2hdfs定義rep進程名稱;sourcedefs即在3.6中在源服務器上作的表映射文件;TARGETDB LIBFILE即定義HDFS一些適配性的庫文件以及配置文件,配置文件位於OGG主目錄下的dirprm/hdfs.props;REPORTCOUNT即複製任務的報告生成頻率;GROUPTRANSOPS爲以事務傳輸時,事務合併的單位,減小IO操做;MAP即源端與目標端的映射關係
其中property=dirprm/hdfs.props的配置中,最主要的幾項配置及註釋以下:

gg.handlerlist=hdfs //OGG for Big Data中handle類型
gg.handler.hdfs.type=hdfs //OGG for Big Data中HDFS目標
gg.handler.hdfs.rootFilePath=/gg/replication/hive/ //OGG for Big Data中HDFS存儲主目錄
gg.handler.hdfs.mode=op //OGG for Big Data中傳輸模式,即op爲一次SQL傳輸一次,tx爲一次事務傳輸一次
gg.handler.hdfs.format=delimitedtext //OGG for Big Data中文件傳輸格式
gg.classpath=/usr/hdp/2.2.0.0-2041/hadoop/share/hadoop/common/*:/usr/hdp/2.2.0.0-2041/hadoop/share/hadoop/common/lib/*:/usr/hdp/2.2.0.0-2041/hadoop/share/hadoop/hdfs/*:/usr/hdp/2.2.0.0-2041/hadoop/etc/hadoop/:/data/gg/:/data/gg/lib/*:/usr/hdp/2.2.0.0-2041/hadoop/client/* //OGG for Big Data中使用到的HDFS庫的定義

具體的OGG for Big Data支持參數以及定義可參考地址

最後在OGG的命令行下執行:

GGSCI (10.0.0.2) 9> add replicat r2hdfs exttrail /data/gg/dirdat/tc,checkpointtable tcloud.checkpointtab
REPLICAT added.

將文件與複製進程綁定便可

測試

啓動進程

在源端和目標端的OGG命令行下使用start [進程名]的形式啓動全部進程。
啓動順序按照源mgr——目標mgr——源extract——源pump——目標replicate來完成。

檢查進程狀態

以上啓動完成以後,可在源端與目標端的OGG命令行下使用info [進程名]來查看全部進程狀態,以下:
源端:

GGSCI (VM_0_25_centos) 7> info mgr
Manager is running (IP port VM_0_25_centos.7809).
GGSCI (VM_0_25_centos) 9> info ext2hd
EXTRACT    EXT2HD    Last Started 2016-11-09 16:05   Status RUNNING
Checkpoint Lag       00:00:00 (updated 00:00:09 ago)
Log Read Checkpoint  Oracle Redo Logs
                     2016-11-09 16:45:51  Seqno 8, RBA 132864000
                     SCN 0.1452333 (1452333)
GGSCI (VM_0_25_centos) 10> info push2hd
EXTRACT    PUSH2HD   Last Started 2016-11-09 16:05   Status RUNNING
Checkpoint Lag       00:00:00 (updated 00:00:01 ago)
Log Read Checkpoint  File /u01/gg/dirdat/tc000000
                     First Record  RBA 1043

目標端:

GGSCI (10.0.0.2) 13> info mgr 
Manager is running (IP port 10.0.0.2.7809, Process ID 8242).

GGSCI (10.0.0.2) 14> info r2hdfs
REPLICAT   R2HDFS    Last Started 2016-11-09 16:45   Status RUNNING
Checkpoint Lag       00:00:00 (updated 00:00:02 ago)
Process ID           4733
Log Read Checkpoint  File /data/gg/dirdat/tc000000
                     First Record  RBA 0

全部的狀態均是RUNNING便可。(固然也可使用info all來查看全部進程狀態)

測試同步更新效果

測試方法比較簡單,直接在源端的數據表中insert,update,delete操做便可。因爲Oracle到Hadoop集羣的同步是異構形式,目前尚不支持truncate操做。
源端進行insert操做

SQL> conn tcloud/tcloud
Connected.
SQL> select * from t_ogg;
no rows selected
SQL> desc t_ogg;
 Name                                      Null?    Type
 ----------------------------------------- -------- ----------------------------
 ID                                        NOT NULL NUMBER(38)
 TEXT_NAME                                          VARCHAR2(20)
SQL> insert into t_ogg values(1,'test');
1 row created.
SQL> commit;
Commit complete.

查看源端trail文件狀態

[oracle@VM_0_25_centos dirdat]$ ls -l /u01/gg/dirdat/tc*
-rw-rw-rw- 1 oracle oinstall 1180 Nov  9 17:05 /u01/gg/dirdat/tc000000

查看目標端trail文件狀態

[root@10 dirdat]# ls -l /data/gg/dirdat/tc*      
-rw-r----- 1 root root 1217 Nov  9 17:05 /data/gg/dirdat/tc000000

查看HDFS中是否有寫入

hadoop fs -ls /gg/replication/hive/tcloud.t_ogg
-rw-rw-r--   3 root hdfs        110 2016-11-09 17:05
/gg/replication/hive/tcloud.t_ogg/tcloud.t_ogg_2016-11-09_17-05-30.514.txt

注意:從寫入到HDFS的文件內容看,文件的格式以下:

ITCLOUD.T_OGG2016-11-09 09:05:25.0670822016-11-09T17:05:30.51200000000000000000001080ID1TEXT_NAMEtest

很明顯Oracle的數據已準實時導入到HDFS了。導入的內容實際是一條條的相似流水日誌(具體日誌格式不一樣的傳輸格式,內容略有差別,本例使用的delimitedtext。格式爲操做符 數據庫.表名 操做時間戳(GMT+0) 當前時間戳(GMT+8) 偏移量 字段1名稱 字段1內容 字段2名稱 字段2內容),若是要和Oracle的表內容徹底一致,須要客戶手動實現解析日誌並寫入到Hive的功能,這裏官方並無提供適配器。目前騰訊側已實現該功能的開發。
固然你能夠直接把這個HDFS的路徑經過LOCATION的方式在Hive上建外表(external table)達到實時導入Hive的目的。

總結

OGG for Big Data實現了Oracle實時同步到Hadoop體系的接口,但獲得的日誌目前仍需應用層來解析(關係型數據庫如MySQL時OGG對應版本已實現應用層的解析,無需人工解析)。
OGG的幾個主要進程mgr,extract,pump,replicate配置方便,可快速配置OGG與異構關係存儲結構的實時同步。後續若是有新增表,修改對應的extract,pump和replicate進程便可,固然若是是一整個庫,在配置上述2個進程時,使用通配的方式便可。

附錄

OGG到Hadoop體系的實時同步時,可在源端extract和pump進程配置不變的狀況下,直接在目標端增長replicate進程的方式,增長同步目標,如下簡單介紹本示例中增長同步到Kafka的配置方法。
本示例中extract,pump進程都是現成的,無需再添加。只須要在目標端增長同步到Kafka的replicate進程便可。

在OGG的命令行下執行:

GGSCI (10.0.0.2) 4> edit params r2kafka
REPLICAT r2kafka
sourcedefs /data/gg/dirdef/tcloud.t_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/r2kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP tcloud.t_ogg, TARGET tcloud.t_ogg;

replicate進程和導入到HDFS的配置相似,差別是調用不一樣的配置dirprm/r2kafka.props。這個配置的主要配置以下:

gg.handlerlist = kafkahandler //handler類型
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka相關配置
gg.handler.kafkahandler.TopicName =ggtopic //kafka的topic名稱,無需手動建立
gg.handler.kafkahandler.format =json //傳輸文件的格式,支持json,xml等
gg.handler.kafkahandler.mode =op  //OGG for Big Data中傳輸模式,即op爲一次SQL傳輸一次,tx爲一次事務傳輸一次
gg.classpath=dirprm/:/usr/hdp/2.2.0.0-2041/kafka/libs/*:/data/gg/:/data/gg/lib/* //相關庫文件的引用

r2kafka.props引用的custom_kafka_producer.properties定義了Kafka的相關配置以下:

bootstrap.servers=10.0.0.62:6667 //kafkabroker的地址
acks=1
compression.type=gzip //壓縮類型
reconnect.backoff.ms=1000 //重連延時

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000

以上配置以及其餘可配置項可參考地址

以上配置完成後,在OGG命令行下添加trail文件到replicate進程並啓動導入到Kafka的replicate進程

GGSCI (10.0.0.2) 5> add replicat r2kafka exttrail
/data/gg/dirdat/tc,checkpointtable tcloud.checkpoint
REPLICAT added.
GGSCI (10.0.0.2) 6> start r2kafka
Sending START request to MANAGER ...
REPLICAT R2KAFKA starting
GGSCI (10.0.0.2) 10> info r2kafka

REPLICAT   R2KAFKA   Last Started 2016-11-09 17:59   Status RUNNING
Checkpoint Lag       00:00:00 (updated 00:00:09 ago)
Process ID           5236
Log Read Checkpoint  File /data/gg/dirdat/tc000000
                     2016-11-09 17:05:25.067082  RBA 1217

檢查實時同步到kafka的效果,在Oracle源端更新表的同時,使用kafka客戶端自帶的腳本去查看這裏配置的ggtopic這個kafkatopic下的消息:

SQL> insert into t_ogg values(2,'test2');
1 row created.
SQL> commit;
Commit complete.

目標端Kafka的同步狀況:

[root@10 kafka]# bin/kafka-console-consumer.sh --zookeeper  10.0.0.223:2181  --
from-beginning --topic ggtopic
{"table":"TCLOUD.T_OGG","op_type":"I","op_ts":"2016-11-09
09:05:25.067082","current_ts":"2016-11-
09T17:59:20.943000","pos":"00000000000000001080","after":
{"ID":"1","TEXT_NAME":"test"}}
{"table":"TCLOUD.T_OGG","op_type":"I","op_ts":"2016-11-09 
10:02:06.827204","current_ts":"2016-11-
09T18:02:12.323000","pos":"00000000000000001217","after":
{"ID":"2","TEXT_NAME":"test2"}}

顯然,Oracle的數據已準實時同步到Kafka。從頭開始消費這個topic發現以前的同步信息也存在。架構上能夠直接接Storm,SparkStreaming等直接消費kafka消息進行業務邏輯的處理。
從Oracle實時同步到其餘的Hadoop集羣中,官方最新版本提供了HDFS,HBase,Flume和Kafka,相關配置可參考官網給出的例子配置便可。

參考文檔:
http://docs.oracle.com/goldengate/bd1221/gg-bd/GADBD/toc.htm

相關文章
相關標籤/搜索