OGG: Replicating Oracle to Kafka (Kafka Connect Handler)

Source database: Oracle 11.2.0.4, two-node RAC, service BDEDW
Downstream server: Oracle 11.2.0.4, single node, service BDEDWDS
OGG extract side: 12.2.0.1.1
OGG replicat side: OGG for Big Data 12.3.1.1.1
Target: Kafka 0.10.0.0
Capture mode: OGG integrated capture through a downstream mining server

1、Downstream server setup
1. Install Oracle (omitted)

2. Enable archive log mode:
alter system set log_archive_dest_1='LOCATION=/opt/app/oracle/archivelog VALID_FOR=(ONLINE_LOGFILE,PRIMARY_ROLE)';
alter system set log_archive_dest_state_1=enable;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
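To verify the result from SQL*Plus:
SELECT log_mode FROM v$database;   -- should return ARCHIVELOG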

3. Add standby redo logs
Sizing rules for the standby logs:
1) standby redo log size >= source log file size
2) number of standby log file groups >= number of source online log file groups + 1. So if the source database has "n" threads, each with "m" redo log groups, configure n*(m+1) standby redo log groups at the downstream mining database.
ALTER DATABASE ADD STANDBY LOGFILE THREAD 1 GROUP 13 ('/opt/app/oracle/oradata/BDEDWDS/standby_13.log') SIZE 8192M;
ALTER DATABASE ADD STANDBY LOGFILE THREAD 1 GROUP 14 ('/opt/app/oracle/oradata/BDEDWDS/standby_14.log') SIZE 8192M;
ALTER DATABASE ADD STANDBY LOGFILE THREAD 1 GROUP 15 ('/opt/app/oracle/oradata/BDEDWDS/standby_15.log') SIZE 8192M;
ALTER DATABASE ADD STANDBY LOGFILE THREAD 1 GROUP 16 ('/opt/app/oracle/oradata/BDEDWDS/standby_16.log') SIZE 8192M;
ALTER DATABASE ADD STANDBY LOGFILE THREAD 2 GROUP 17 ('/opt/app/oracle/oradata/BDEDWDS/standby_17.log') SIZE 8192M;
ALTER DATABASE ADD STANDBY LOGFILE THREAD 2 GROUP 18 ('/opt/app/oracle/oradata/BDEDWDS/standby_18.log') SIZE 8192M;
ALTER DATABASE ADD STANDBY LOGFILE THREAD 2 GROUP 19 ('/opt/app/oracle/oradata/BDEDWDS/standby_19.log') SIZE 8192M;
ALTER DATABASE ADD STANDBY LOGFILE THREAD 2 GROUP 20 ('/opt/app/oracle/oradata/BDEDWDS/standby_20.log') SIZE 8192M;
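
The groups just added can be checked with:
SELECT thread#, group#, bytes/1024/1024 AS size_mb, status FROM v$standby_log;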

4. Set an archive destination for the standby logs
alter system set log_archive_dest_2='LOCATION=/opt/app/oracle/standbyarch VALID_FOR=(STANDBY_LOGFILE,PRIMARY_ROLE)';
alter system set log_archive_dest_state_2=enable;

5. Create the OGG user on the downstream server (it will be used to retrieve logical change records from the log mining server at the downstream mining database)
create user ogg identified by xxx;
grant connect,resource,dba to ogg;
exec dbms_goldengate_auth.grant_admin_privilege('OGG'); (this grants privileges such as CREATE RULE, CREATE RULE SET, SELECT ANY TABLE, ALTER ANY TABLE, SELECT ANY TRANSACTION, CREATE JOB, EXECUTE ANY RULE SET, CREATE EVALUATION CONTEXT, ALTER SESSION, DEQUEUE ANY QUEUE, FLASHBACK ANY TABLE, and SELECT_CATALOG_ROLE)
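
To review what the package granted (the DBA_GOLDENGATE_PRIVILEGES view is available in 11.2.0.4):
SELECT username, privilege_type FROM dba_goldengate_privileges;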

6. Add TNS entries for both databases:
BDEDWDS =
(DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = downstream ip)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = BDEDWDS)
)
)

BDEDW =
(DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = source ip)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = BDEDW)
)
)
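
Both entries can be validated from the downstream server with tnsping:
tnsping BDEDW
tnsping BDEDWDS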

7. Set the log_archive_config parameter
ALTER SYSTEM SET LOG_ARCHIVE_CONFIG='DG_CONFIG=(BDEDW,BDEDWDS)' SCOPE=BOTH;

8. Set the enable_goldengate_replication parameter
alter system set enable_goldengate_replication=true;

2、Source database setup
1. Enable archive log mode
alter system set log_archive_dest_1='LOCATION=+DATA_SSD VALID_FOR=(ALL_LOGFILES,ALL_ROLES) DB_UNIQUE_NAME=BDEDW' sid='*';
alter system set log_archive_dest_state_1=enable;
....

2. Copy the source database's password file to the downstream server and rename it there to match the local instance
scp ..
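A sketch of the copy, assuming the default password file location ($ORACLE_HOME/dbs/orapw<SID>), instance BDEDW1 on the first source node, and instance BDEDWDS on the downstream server:
# hypothetical host and paths - adjust to your environment
scp oracle@source-node1:$ORACLE_HOME/dbs/orapwBDEDW1 $ORACLE_HOME/dbs/orapwBDEDWDS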

3. Add TNS entries on the source server:
BDEDW =
(DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = source ip)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = BDEDW)
)
)

BDEDWDS =
(DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = downstream ip)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = BDEDWDS)
)
)

4. Create the OGG user (it will be used to fetch data and metadata from BDEDW)
create user ogg identified by xxx;
grant connect,resource,dba to ogg;
exec dbms_goldengate_auth.grant_admin_privilege('OGG');

5. Set the log_archive_config parameter
ALTER SYSTEM SET LOG_ARCHIVE_CONFIG='DG_CONFIG=(BDEDW,BDEDWDS)' SCOPE=BOTH;

6. Set the archive parameters to ship redo to the downstream server
alter system set log_archive_dest_2='SERVICE=BDEDWDS ASYNC NOREGISTER VALID_FOR=(ONLINE_LOGFILES,PRIMARY_ROLE) DB_UNIQUE_NAME=BDEDWDS' sid='*';
alter system set log_archive_dest_state_2=enable;
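
Redo shipping can be verified by forcing a log switch on the source and checking the destination status:
alter system archive log current;
SELECT dest_id, status, error FROM v$archive_dest_status WHERE dest_id = 2;
The archived standby logs should then appear under /opt/app/oracle/standbyarch on the downstream server.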

3、OGG configuration
Source side:
1. Configure MGR
port 3306
dynamicportlist 9001-9500
autostart er *
autorestart er *,retries 4,waitminutes 4
startupvalidationdelay 5
purgeoldextracts ./dirdat/*,usecheckpoints,minkeephours 12

2. Create the credential store and add the users:
add credentialstore
alter credentialstore add user ogg@BDEDW alias BDEDW
alter credentialstore add user ogg@BDEDWDS alias BDEDWDS
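The stored aliases can be confirmed in GGSCI:
info credentialstore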

3. Register the extract process against the downstream mining database
dblogin useridalias BDEDW
miningdblogin useridalias BDEDWDS
register extract kfk_e01 database
To remove the registration: unregister extract kfk_e01 database
To list capture processes: SELECT CAPTURE_NAME, STATUS FROM DBA_CAPTURE;
To manage the capture process from inside the database:
exec dbms_capture_adm.stop_capture('OGG$CAP_KFK_E01', true);
exec dbms_capture_adm.drop_capture('OGG$CAP_KFK_E01', true);


4. Add the extract process
ADD EXTRACT KFK_E01 INTEGRATED TRANLOG, BEGIN NOW
ADD RMTTRAIL /oggbase/ogg108/OGG_KFK/dirdat/ka, EXTRACT KFK_E01, MEGABYTES 50

EXTRACT KFK_E01
RMTHOST xx.xxx.xxx.xxx, MGRPORT 3326
SETENV (NLS_LANG = "AMERICAN_AMERICA.AL32UTF8")
USERIDALIAS BDEDW
TRANLOGOPTIONS MININGUSERALIAS BDEDWDS
TRANLOGOPTIONS INTEGRATEDPARAMS (downstream_real_time_mine Y)
GETTRUNCATES
LOGALLSUPCOLS
UPDATERECORDFORMAT FULL
RMTTRAIL /oggbase/ogg108/OGG_KFK/dirdat/ka
TABLE NEWSUSER.CRM_CUSTOMER;
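
With the registration and parameter file in place, the extract can be started and checked from GGSCI:
start extract KFK_E01
info extract KFK_E01, detail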

Replicat side:
OGG for Big Data requires Java; this setup uses Java 1.8.0_161 (versions below 1.8 must be upgraded to 1.8).
1. Configure the Java environment variables
Add the following to .bash_profile (restart MGR after changing the environment variables):
export JAVA_HOME=/usr/lib/jvm/jre1.8.0_161
export PATH=$JAVA_HOME/bin:$PATH
export LD_LIBRARY_PATH=$JAVA_HOME/lib/amd64/server:$JAVA_HOME/lib/amd64:$LD_LIBRARY_PATH

2. Configure MGR
port 3326
dynamicportlist 9001-9500
autostart er *
autorestart er *,retries 4,waitminutes 4
startupvalidationdelay 5
purgeoldextracts ./dirdat/*,usecheckpoints,minkeephours 12

3. Copy the sample configuration files to $OGG_HOME/dirprm/ and the Kafka client libraries to $OGG_HOME/kafkalib/
cp $OGG_HOME/AdapterExamples/big-data/kafka_connect/* $OGG_HOME/dirprm/
cp $KAFKA_HOME/libs/* $OGG_HOME/kafkalib/

4. Edit the Kafka Connect handler configuration file $OGG_HOME/dirprm/kc.props:
gg.handlerlist=kafkaconnect
gg.handler.kafkaconnect.type=kafkaconnect
gg.handler.kafkaconnect.kafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkaconnect.mode=tx
gg.handler.kafkaconnect.topicMappingTemplate=${fullyQualifiedTableName}
gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys}
gg.handler.kafkaconnect.messageFormatting=op
gg.handler.kafkaconnect.insertOpKey=I
gg.handler.kafkaconnect.updateOpKey=U
gg.handler.kafkaconnect.deleteOpKey=D
gg.handler.kafkaconnect.truncateOpKey=T
gg.handler.kafkaconnect.treatAllColumnsAsStrings=false
gg.handler.kafkaconnect.iso8601Format=false
gg.handler.kafkaconnect.pkUpdateHandling=update
gg.handler.kafkaconnect.includeTableName=true
gg.handler.kafkaconnect.includeOpType=true
gg.handler.kafkaconnect.includeOpTimestamp=true
gg.handler.kafkaconnect.includeCurrentTimestamp=true
gg.handler.kafkaconnect.includePosition=true
gg.handler.kafkaconnect.includePrimaryKeys=true
gg.handler.kafkaconnect.includeTokens=true
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
gg.classpath=dirprm/:/oggbase/ogg108/OGG_KFK/kafkalib/*
javawriter.bootoptions=-Xmx2048m -Xms32m -Djava.class.path=ggjava/ggjava.jar

5. Edit the producer configuration file $OGG_HOME/dirprm/custom_kafka_producer.properties
bootstrap.servers=<kafka cluster node ips>
acks=1

#JSON Converter Settings
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

buffer.memory=33554432
batch.size=16384

6. Add the replicat process and configure it following the shipped example (a minimal parameter-file sketch follows the command below)
add replicat KFK_R01, exttrail /oggbase/ogg108/OGG_KFK/dirdat/ka, begin now
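The parameter file (dirprm/kfk_r01.prm, matching the group name) can follow the shipped kafka_connect example; a minimal sketch, assuming a one-to-one table mapping:
REPLICAT KFK_R01
-- load the Java adapter and point it at the Kafka Connect handler properties
TARGETDB LIBFILE libggjava.so SET property=dirprm/kc.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 1000
MAP NEWSUSER.CRM_CUSTOMER, TARGET NEWSUSER.CRM_CUSTOMER;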

7. Start the extract and the replicat
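
With topicMappingTemplate=${fullyQualifiedTableName}, each table's changes land in a topic named after the fully qualified table name. After starting both sides, for example in GGSCI on the target:
start replicat KFK_R01
info all
a quick end-to-end check can consume from the topic (the broker port 9092 is an assumption here):
bin/kafka-console-consumer.sh --bootstrap-server <kafka broker ip>:9092 --topic NEWSUSER.CRM_CUSTOMER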

Problems encountered during setup:
1. With OGG extract version 12.3.0.1.0, the error ERROR OGG-00662 OCI Error OCI-22053: overflow error (status = 22053) appeared and could not be resolved; after switching to 12.2 it no longer occurred.
2. With the OGG extract side deployed on ACFS, starting the extract process failed with ERROR OGG-02079 Extract failed to login to the database as user ogg specified in the MININGUSER parameter because of error ORA-12154: TNS:could not resolve the connect identifier specified. The listener was healthy; the error only occurs when OGG is deployed on ACFS.
3. The replicat failed to start with OGG-15051 Java or JNI exception:...nested exception is java.lang.NoSuchMethodError: oracle.goldengate.handler.kafkaconnect.GGConfig.originalsWithPrefix(Ljava/lang/String;)Ljava/util/Map... Cause: the gg.classpath variable in the kafka.props file is set incorrectly, or the Kafka jars are missing from the path that gg.classpath specifies.
4. ERROR OGG-02171 Error reading LCR from data source. Status 509, data source type 0. ERROR OGG-02191 Incompatible record 101 in /oggbase/ogg108/OGG_KFK/dirdat/ka000000000, rba -2 when getting trail header. This was raised when the replicat used the Kafka handler; switching to the Kafka Connect handler made it go away. The root cause was never identified.
5. ERROR OGG-01816 Partial operations are not supported in this release. Cause: the extract did not capture complete LOB objects. When the target is a non-Oracle database, or otherwise needs complete LOB data, the extract should use the TRANLOGOPTIONS FETCHPARTIALLOB parameter.
