前面介紹了Oracle LogMiner配置使用以及使用LogMiner進行解析日誌文件性能,在這篇文章中將利用LogMiner進行數據同步,實現從源目標數據庫到目標數據庫之間的數據同步。因爲LogMiner支持的版本是8.1及以上,因此進行數據同步的Oracle數據庫版本也必須是8.1及以上。java
固然在本文中介紹的是LogMiner進行數據同步例子,也能夠利用LogMiner進行數據審計、數據操做追蹤等功能,因爲這些從操做原理來講是一致,在本文不作討論。sql
l 配置階段數據庫
一、 控制端:指定源端、目標端數據庫信息、LOGMINER同步時間等配置信息;oracle
l 獲取源端同步數據app
二、 控制檯:經過定時輪詢的方式檢測是否到達數據同步時間,若是是則進行數據同步,不然繼續進行輪詢;框架
三、 源數據庫:定時加載數據庫歸檔日誌文件到動態表v$logmnr_contents中;性能
四、 源數據庫:根據條件讀取指定sql語句;測試
l 目標端數據入庫ui
五、 源數據庫:執行sql語句。spa
在該Demo項目中須要引入Oracle JDBC驅動包,具體項目分爲四個類:
1. Start.java:程序入口方法;
2. SyncTask.java:數據同步Demo核心,生成字典文件和讀取日誌文件、目標數據庫執行SQL語句等;
3. DataBase.java:數據庫操做基礎類;
4. Constants.java:源數據庫、目標數據庫配置、字典文件和歸檔文件路徑。
在該類中設置了數據同步開始SCN號、源數據庫配置、目標數據庫配置以及字典文件/日誌文件路徑。須要注意的是在源數據庫配置中有兩個用戶:一個是調用LogMiner用戶,該用戶須要擁有dbms_logmnr、dbms_logmnr_d兩個過程權限,在該Demo中該用爲爲sync;另一個爲LogMiner讀取該用戶操做SQL語句,在該Demo中該用爲爲LOGMINER。
package com.constants; /** * [Constants]|描述:Logminer配置參數 * @做者: *** * @日期: 2013-1-15 下午01:53:57 * @修改歷史: */ public class Constants { /** 上次數據同步最後SCN號 */ public static String LAST_SCN = "0"; /** 源數據庫配置 */ public static String DATABASE_DRIVER="oracle.jdbc.driver.OracleDriver"; public static String SOURCE_DATABASE_URL="jdbc:oracle:thin:@127.0.0.1:1521:practice"; public static String SOURCE_DATABASE_USERNAME="sync"; public static String SOURCE_DATABASE_PASSWORD="sync"; public static String SOURCE_CLIENT_USERNAME = "LOGMINER"; /** 目標數據庫配置 */ public static String SOURCE_TARGET_URL="jdbc:oracle:thin:@127.0.0.1:1521:target"; public static String SOURCE_TARGET_USERNAME="target"; public static String SOURCE_TARGET_PASSWORD="target"; /** 日誌文件路徑 */ public static String LOG_PATH = "D:\\oracle\\oradata\\practice"; /** 數據字典路徑 */ public static String DATA_DICTIONARY = "D:\\oracle\\oradata\\practice\\LOGMNR"; }
在該類中有兩個方法,第一個方法爲createDictionary,做用爲生成數據字典文件,另一個是startLogmur,該方法是LogMiner分析同步方法。
/** * <p>方法名稱: createDictionary|描述: 調用logminer生成數據字典文件</p> * @param sourceConn 源數據庫鏈接 * @throws Exception 異常信息 */ public void createDictionary(Connection sourceConn) throws Exception{ String createDictSql = "BEGIN dbms_logmnr_d.build(dictionary_filename => 'dictionary.ora', dictionary_location =>'"+Constants.DATA_DICTIONARY+"'); END;"; CallableStatement callableStatement = sourceConn.prepareCall(createDictSql); callableStatement.execute(); }
/** * <p>方法名稱: startLogmur|描述:啓動logminer分析 </p> * @throws Exception */ public void startLogmur() throws Exception{ Connection sourceConn = null; Connection targetConn = null; try { ResultSet resultSet = null; // 獲取源數據庫鏈接 sourceConn = DataBase.getSourceDataBase(); Statement statement = sourceConn.createStatement(); // 添加全部日誌文件,本代碼僅分析聯機日誌 StringBuffer sbSQL = new StringBuffer(); sbSQL.append(" BEGIN"); sbSQL.append(" dbms_logmnr.add_logfile(logfilename=>'"+Constants.LOG_PATH+"\\REDO01.LOG', options=>dbms_logmnr.NEW);"); sbSQL.append(" dbms_logmnr.add_logfile(logfilename=>'"+Constants.LOG_PATH+"\\REDO02.LOG', options=>dbms_logmnr.ADDFILE);"); sbSQL.append(" dbms_logmnr.add_logfile(logfilename=>'"+Constants.LOG_PATH+"\\REDO03.LOG', options=>dbms_logmnr.ADDFILE);"); sbSQL.append(" END;"); CallableStatement callableStatement = sourceConn.prepareCall(sbSQL+""); callableStatement.execute(); // 打印獲分析日誌文件信息 resultSet = statement.executeQuery("SELECT db_name, thread_sqn, filename FROM v$logmnr_logs"); while(resultSet.next()){ System.out.println("已添加日誌文件==>"+resultSet.getObject(3)); } System.out.println("開始分析日誌文件,起始scn號:"+Constants.LAST_SCN); callableStatement = sourceConn.prepareCall("BEGIN dbms_logmnr.start_logmnr(startScn=>'"+Constants.LAST_SCN+"',dictfilename=>'"+Constants.DATA_DICTIONARY+"\\dictionary.ora',OPTIONS =>DBMS_LOGMNR.COMMITTED_DATA_ONLY+dbms_logmnr.NO_ROWID_IN_STMT);END;"); callableStatement.execute(); System.out.println("完成分析日誌文件"); // 查詢獲取分析結果 System.out.println("查詢分析結果"); resultSet = statement.executeQuery("SELECT scn,operation,timestamp,status,sql_redo FROM v$logmnr_contents WHERE seg_owner='"+Constants.SOURCE_CLIENT_USERNAME+"' AND seg_type_name='TABLE' AND operation !='SELECT_FOR_UPDATE'"); // 鏈接到目標數據庫,在目標數據庫執行redo語句 targetConn = DataBase.getTargetDataBase(); Statement targetStatement = targetConn.createStatement(); String lastScn = Constants.LAST_SCN; String operation = null; String sql = null; boolean isCreateDictionary = false; while(resultSet.next()){ lastScn = resultSet.getObject(1)+""; if( lastScn.equals(Constants.LAST_SCN) ){ continue; } operation = resultSet.getObject(2)+""; if( "DDL".equalsIgnoreCase(operation) ){ isCreateDictionary = true; } sql = resultSet.getObject(5)+""; // 替換用戶 sql = sql.replace("\""+Constants.SOURCE_CLIENT_USERNAME+"\".", ""); System.out.println("scn="+lastScn+",自動執行sql=="+sql+""); try { targetStatement.executeUpdate(sql.substring(0, sql.length()-1)); } catch (Exception e) { System.out.println("測試一下,已經執行過了"); } } // 更新scn Constants.LAST_SCN = (Integer.parseInt(lastScn))+""; // DDL發生變化,更新數據字典 if( isCreateDictionary ){ System.out.println("DDL發生變化,更新數據字典"); createDictionary(sourceConn); System.out.println("完成更新數據字典"); isCreateDictionary = false; } System.out.println("完成一個工做單元"); } finally{ if( null != sourceConn ){ sourceConn.close(); } if( null != targetConn ){ targetConn.close(); } sourceConn = null; targetConn = null; } }
一、建立AAAAA表,並插入數據
二、建立EMP1表
在控制檯中輸出以下日誌
建立AAAAA和EMP1表,並在AAAAA插入了數據