Oracle Goldengate是如何保證數據有序和確保數據不丟失的?

工做中一直在用Oracle 的中間件Oracle GondenGate 是如何保證消息的有序和不丟失呢?數據庫

Oracle GoldenGate邏輯架構

首先,先看一下Oracle GoldenGate 的邏輯架構:網絡

 

圖中涉及到兩個階段:架構

  1. 初始化階段: extract 進程直接抽取源表信息經網絡傳輸到target 端的 replicat進程,replicat 進程獲取到初始化加載數據將其同步到目標數據源。
  2. 增量數據抓取階段:extract 進程從源表redo log 或其餘增量日誌中解析並獲取增量,而後落地成數據文件;而後pump進程將數據經網絡推送到目標端的collector進程。collector是由manager進程維護的,有新的pump數據過來,它會啓動一個新的collector,這個collector綁定到特定的端口上,經過TCP/IP鏈接,負責接收特定pump進程推送過來的數據並落地到指定的目錄下生成trail文件。replicat 進程實時讀取 trail 文件並將數據推送給kafka。

官方關於 trail文件的說明以下:oracle

To support the continuous extraction and replication of database changes, Oracle GoldenGate stores records of the captured changes temporarily on disk in a series of files called a trail. A trail can exist on the source system, an intermediary system, the target system, or any combination of those systems, depending on how you configure Oracle GoldenGate. On the local system it is known as an extract trail (or local trail). On a remote system it is known as a remote trail.app

By using a trail for storage, Oracle GoldenGate supports data accuracy and fault tolerance (see Section 1.2.6, "Overview of Checkpoints"). The use of a trail also allows extraction and replication activities to occur independently of each other. With these processes separated, you have more choices for how data is processed and delivered. For example, instead of extracting and replicating changes continuously, you could extract changes continuously but store them in the trail for replication to the target later, whenever the target application needs them.spa

即trail 中保存的是數據庫中的變化數據。Oracle GoldenGate用trail 作存儲,確保數據的準確性和容錯性。它也容許extract進程和replicat進程能夠獨立存在,相似於消息中間件的做用。線程

checkpoint保證數據不丟失和有序性

下面看一下官方給出的checkpoint 的案例(原本想用項目的真實checkpoint信息,爲避免沒必要要的麻煩,做罷):3d

注意這個是Oracle RAC模式下checkpoint信息。日誌

查看extract進程checkpoint信息命令:INFO EXTRACT JC108XT,SHOWCHcode

extract 進程checkpoint信息以下:

EXTRACT    JC108XT Last Started 2011-01-01 14:15   Status ABENDED
Checkpoint Lag       00:00:00 (updated 00:00:01 ago) Log Read Checkpoint File /orarac/oradata/racq/redo01.log 2011-01-01 14:16:45 Thread 1, Seqno 47, RBA 68748800 Log Read Checkpoint File /orarac/oradata/racq/redo04.log 2011-01-01 14:16:19 Thread 2, Seqno 24, RBA 65657408 Current Checkpoint Detail: Read Checkpoint #1 Oracle RAC Redo Log Startup Checkpoint (starting position in data source): Thread #: 1 Sequence #: 47 RBA: 68548112 Timestamp: 2011-01-01 13:37:51.000000 SCN: 0.8439720 Redo File: /orarac/oradata/racq/redo01.log Recovery Checkpoint (position of oldest unprocessed transaction in data source): Thread #: 1 Sequence #: 47 RBA: 68748304 Timestamp: 2011-01-01 14:16:45.000000 SCN: 0.8440969 Redo File: /orarac/oradata/racq/redo01.log Current Checkpoint (position of last record read in the data source): Thread #: 1 Sequence #: 47 RBA: 68748800 Timestamp: 2011-01-01 14:16:45.000000 SCN: 0.8440969 Redo File: /orarac/oradata/racq/redo01.log Read Checkpoint #2 Oracle RAC Redo Log Startup Checkpoint(starting position in data source): Sequence #: 24 RBA: 60607504 Timestamp: 2011-01-01 13:37:50.000000 SCN: 0.8439719 Redo File: /orarac/oradata/racq/redo04.log Recovery Checkpoint (position of oldest unprocessed transaction in data source): Thread #: 2 Sequence #: 24 RBA: 65657408 Timestamp: 2011-01-01 14:16:19.000000 SCN: 0.8440613 Redo File: /orarac/oradata/racq/redo04.log Current Checkpoint (position of last record read in the data source): Thread #: 2 Sequence #: 24 RBA: 65657408 Timestamp: 2011-01-01 14:16:19.000000 SCN: 0.8440613 Redo File: /orarac/oradata/racq/redo04.log Write Checkpoint #1 GGS Log Trail Current Checkpoint (current write position): Sequence #: 2 RBA: 2142224 Timestamp: 2011-01-01 14:16:50.567638 Extract Trail: ./dirdat/eh Header: Version = 2 Record Source = A Type = 6 # Input Checkpoints = 2 # Output Checkpoints = 1 File Information: Block Size = 2048 Max Blocks = 100 Record Length = 2048 Current Offset = 0 Configuration: Data Source = 3 Transaction Integrity = 1 Task Type = 0 Status: Start Time = 2011-01-01 14:15:14 Last Update Time = 2011-01-01 14:16:50 Stop Status = A Last Result = 400

 

關於Extract的read幾種checkpoint解釋:

1. extract將read checkpoints放置在數據源中。若是數據源是Oracle,則檢查點是放在Oracle的日誌中。

2. Startup checkpoint:啓動檢查點是進程啓動時在數據源中建立的第一個檢查點。

  • Thread #: 建立檢查點的線程數,只有Oracle的RAC模式纔會有

  • Sequence #: 建立檢查點的事務日誌的序列號

  • RBA: RBA是relative byte address的簡寫,表示建立檢查點的記錄的相對字節地址

  • Timestamp: 表示建立檢查點的記錄的時間戳

  • SCN: SCN是system change number的簡寫,表示系統更改檢查點所在記錄的編號

  • Redo File: 包含建立檢查點的記錄的事務日誌的路徑名

3. Recovery checkpoint:恢復檢查點表示extract未處理的最先的事務日誌的位置信息。

4. Current checkpoint:表示extract在數據源中讀的最近的(注意:此時尚未寫成功)記錄的位置信息。它應該和 Log Read Checkpoint 信息一致。

關於extract的寫的checkpoint解釋:

extract進程將 current checkpoint 放在trail 文件中。current checkpoint 是指extract 正在寫的trail的位置。

  • Sequence #: 寫入檢查點的trail文件的序列號

  • RBA:trail文件中建立檢查點的記錄的相對字節地址

  • Timestamp: 建立檢查點的記錄的時間戳

  • Extract trail: trail文件的相對路徑名稱

  • Trail Type: 其中在相似於NFS服務上的被認爲是local

查看 replicat 進程 checkpoint 信息命令:INFO REPLICAT JC108RP, SHOWCH

replicat 進程checkpoint 信息以下:

REPLICAT   JC108RP   Last Started 2011-01-12 13:10   Status RUNNING
Checkpoint Lag       00:00:00 (updated 111:46:54 ago) Log Read Checkpoint File ./dirdat/eh000000 First Record RBA 3702915 Current Checkpoint Detail: Read Checkpoint #1 GGS Log Trail Startup Checkpoint(starting position in data source): Sequence #: 0 RBA: 3702915 Timestamp: Not Available Extract Trail: ./dirdat/eh Current Checkpoint (position of last record read in the data source): Sequence #: 0 RBA: 3702915 Timestamp: Not Available Extract Trail: ./dirdat/eh Header: Version = 2 Record Source = A Type = 1 # Input Checkpoints = 1 # Output Checkpoints = 0 File Information: Block Size = 2048 Max Blocks = 100 Record Length = 2048 Current Offset = 0 Configuration: Data Source = 0 Transaction Integrity = -1 Task Type = 0 Status: Start Time = 2011-01-12 13:10:13 Last Update Time = 2011-01-12 21:23:31 Stop Status = A Last Result = 400

 

 

1. Startup Checkpoint

當進程啓動時在trail文件中建立的第一個checkpoint

  • Sequence #: 寫入檢查點的trail文件的序列號

  • RBA: trail文件中建立檢查點的記錄的相對字節地址

  • Timestamp: 表示建立檢查點的記錄的時間戳

  • Extract Trail: trail 文件的相對地址

2. Current Checkpoint:current checkpoint 是指replicat 進程讀取trail文件的最近的記錄的位置。

 

抽取的最終的日誌格式使得實現冪等性操做成爲可能

Oracle GoldenGate的日誌格式是snapshot格式的,試想一下,假設我一條記錄的某個字段 作累加操做,Oracle GoldenGate給咱們的數據是增量數據,在at-least-once語義之上,進行屢次傳輸,那麼數據最終會出問題。而snapshot數據,只須要根據主鍵不斷覆蓋便可。這種數據是支持冪等性操做的。

 

總結

1. 關於數據容錯性。
  1. 其一,日誌格式是snapshot的,使得實現冪等性操做成爲可能。
  2. 其二Oracle GoldenGate的checkpoint機制和消費者保存消費的offset的機制是同樣的。都支持at-least-once的語義。由於頗有可能出現已經寫數據成功,但更新checkpoint數據失敗,即kafka中的數據可能會出現重複的現象。因此在處理Oracle GoldenGate的消息時,要確保最終落地的操做是冪等性操做,這樣數據至少不會丟。通常冪等性都須要惟一id做爲標識,通常選用數據的主鍵作惟一id。若是沒有主鍵能夠使用其餘方案,切記生成的id要惟一且可重複生成,即同一條記錄根據id生成規則,永遠是相同的id且在必定時間範圍內不衝突。
2. 關於有序性。
  1. 其一數據在Oracle GoldenGate傳輸過程當中,同一個extract進程落到一個文件中,經網絡傳輸會被響應的collector接收並把數據放到對應的rmtrail中,其中 local trail 和 remote trail 是一一對應的。至關於數據從一個分區傳輸到另一個固定對應的分區,只要數據有序傳輸(它經過讀寫checkpoint來保證),那麼最終remote trail和local trail的順序確定是一致的(可能會有重複)。
  2. 其二,replicat落到kafka的數據是單分區的,保證了放到kafka的數據的有序性。
 
 

參考:

Oracle GoldenGate文檔庫:https://docs.oracle.com/goldengate/1212/gg-winux/GWUAD/wu_about_gg.htm#GWUAD117

Oracle官方對 Checkpoint 的術語的解釋:https://docs.oracle.com/goldengate/1212/gg-winux/GWUAD/wu_ogg_checkpts.htm#GWUAD965

相關文章
相關標籤/搜索