Canal:同步mysql增量數據工具,一篇詳解核心知識點

老劉是一名即將找工做的研二學生,寫博客一方面是總結大數據開發的知識點,一方面是但願可以幫助夥伴讓自學今後不求人。因爲老劉是自學大數據開發,博客中確定會存在一些不足,還但願你們可以批評指正,讓咱們一塊兒進步!java

背景

大數據領域數據源有業務庫的數據,也有移動端埋點數據、服務器端產生的日誌數據。咱們在對數據進行採集時根據下游對數據的要求不一樣,咱們可使用不一樣的採集工具來進行。今天老劉給你們講的是同步mysql增量數據的工具Canal,本篇文章的大綱以下:mysql

  1. Canal 的概念
  2. mysql 中主備複製實現原理
  3. Canal 如何從 MySQL 中同步數據
  4. Canal 的 HA 機制設計
  5. 各類數據同步解決方法的簡單總結

老劉爭取用這一篇文章讓你們直接上手 Canal 這個工具,再也不花別的時間來學習。web

mysql 主備複製實現原理

因爲 Canal 是用來同步 mysql 中增量數據的,因此老劉先講 mysql 的主備複製原理,以後再講 Canal 的核心知識點。sql

根據這張圖,老劉把 mysql 的主備複製原理分解爲以下流程:數據庫

  1. 主服務器首先必須啓動二進制日誌 binlog,用來記錄任何修改了數據庫數據的事件。
  2. 主服務器將數據的改變記錄到二進制 binlog 日誌。
  3. 從服務器會將主服務器的二進制日誌複製到其本地的中繼日誌(Relaylog)中。這一步細化的說就是首先從服務器會啓動一個工做線程 I/O 線程,I/O 線程會跟主庫創建一個普通的客戶單鏈接,而後在主服務器上啓動一個特殊的二進制轉儲(binlog dump)線程,這個 binlog dump 線程會讀取主服務器上二進制日誌中的事件,而後向 I/O 線程發送二進制事件,並保存到從服務器上的中繼日誌中。
  4. 從服務器啓動 SQL 線程,從中繼日誌中讀取二進制日誌,而且在從服務器本地會再執行一次數據修改操做,從而實現從服務器數據的更新。

那麼 mysql 主備複製實現原理就講完了,你們看完這個流程,能不能猜到 Canal 的工做原理?服務器

Canal 核心知識點

Canal 的工做原理

Canal 的工做原理就是它模擬 MySQL slave 的交互協議,把本身假裝爲 MySQL slave,向 MySQL master 發動 dump 協議。MySQL master 收到 dump 請求後,就會開始推送 binlog 給 Canal。最後 Canal 就會解析 binlog 對象。架構

Canal 概念

Canal,美[kəˈnæl],是這樣讀的,意思是水道/管道/渠道,主要用途就是用來同步 MySQL 中的增量數據(能夠理解爲實時數據),是阿里巴巴旗下的一款純 Java 開發的開源項目。併發

Canal 架構

server 表明一個 canal 運行實例,對應於一個 JVM。 instance 對應於一個數據隊列,1 個 canal server 對應 1..n 個 instance instance 下的子模塊:框架

  1. EventParser:數據源接入,模擬 salve 協議和 master 進行交互,協議解析
  2. EventSink:Parser 和 Store 連接器,進行數據過濾,加工,分發的工做
  3. EventStore:數據存儲
  4. MetaManager: 增量訂閱&消費信息管理器

到如今 Canal 的基本概念就講完了,那接下來就要講 Canal 如何同步 mysql 的增量數據。編輯器

Canal 同步 MySQL 增量數據

開啓 mysql binlog

咱們用 Canal 同步 mysql 增量數據的前提是 mysql 的 binlog 是開啓的,阿里雲的 mysql 數據庫是默認開啓 binlog 的,可是若是咱們是本身安裝的 mysql 須要手動開啓 binlog 日誌功能。

先找到 mysql 的配置文件:

etc/my.cnf

server-id=1
log-bin=mysql-bin
binlog-format=ROW

這裏有一個知識點是關於 binlog 的格式,老劉給你們講講。

binlog 的格式有三種:STATEMENT、ROW、MIXED

  1. ROW 模式(通常就用它)

    日誌會記錄每一行數據被修改的形式,不會記錄執行 SQL 語句的上下文相關信息,只記錄要修改的數據,哪條數據被修改了,修改爲了什麼樣子,只有 value,不會有 SQL 多表關聯的狀況。

    優勢:它僅僅只須要記錄哪條數據被修改了,修改爲什麼樣子了,因此它的日誌內容會很是清楚地記錄下每一行數據修改的細節,很是容易理解。

    缺點:ROW 模式下,特別是數據添加的狀況下,全部執行的語句都會記錄到日誌中,都將以每行記錄的修改來記錄,這樣會產生大量的日誌內容。

  2. STATEMENT 模式

    每條會修改數據的 SQL 語句都會被記錄下來。

    缺點:因爲它是記錄的執行語句,因此,爲了讓這些語句在 slave 端也能正確執行,那他還必須記錄每條語句在執行過程當中的一些相關信息,也就是上下文信息,以保證全部語句在 slave 端被執行的時候可以獲得和在 master 端執行時候相同的結果。

    但目前例如 step()函數在有些版本中就不能被正確複製,在存儲過程當中使用了 last-insert-id()函數,可能會使 slave 和 master 上獲得不一致的 id,就是會出現數據不一致的狀況,ROW 模式下就沒有。

  3. MIXED 模式

    以上兩種模式都使用。

Canal 實時同步

  1. 首先咱們要配置環境,在 conf/example/instance.properties 下:
 ## mysql serverId
 canal.instance.mysql.slaveId = 1234
 #position info,須要修改爲本身的數據庫信息
 canal.instance.master.address = 127.0.0.1:3306
 canal.instance.master.journal.name =
 canal.instance.master.position =
 canal.instance.master.timestamp =
 #canal.instance.standby.address =
 #canal.instance.standby.journal.name =
 #canal.instance.standby.position =
 #canal.instance.standby.timestamp =
 #username/password,須要修改爲本身的數據庫信息
 canal.instance.dbUsername = canal
 canal.instance.dbPassword = canal
 canal.instance.defaultDatabaseName =
 canal.instance.connectionCharset = UTF-8
 #table regex
 canal.instance.filter.regex = .\*\\\\..\*

其中,canal.instance.connectionCharset 表明數據庫的編碼方式對應到 java 中的編碼類型,好比 UTF-8,GBK,ISO-8859-1。

  1. 配置完後,就要啓動了
 sh bin/startup.sh
 關閉使用 bin/stop.sh
  1. 觀察日誌

    通常使用 cat 查看 canal/canal.log、example/example.log

  2. 啓動客戶端

    在 IDEA 中業務代碼,mysql 中若是有增量數據就拉取過來,在 IDEA 控制檯打印出來

    在 pom.xml 文件中添加:

 <dependency>
   <groupId>com.alibaba.otter</groupId>
   <artifactId>canal.client</artifactId>
   <version>1.0.12</version>
 </dependency>

添加客戶端代碼:

public class Demo {
 public static void main(String[] args) {
     //建立鏈接
     CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop03", 11111),
             "example""""");
     connector.connect();
     //訂閱
     connector.subscribe();
     connector.rollback();
     int batchSize = 1000;
     int emptyCount = 0;
     int totalEmptyCount = 100;
     while (totalEmptyCount > emptyCount) {
         Message msg = connector.getWithoutAck(batchSize);
         long id = msg.getId();
         List<CanalEntry.Entry> entries = msg.getEntries();
         if(id == -1 || entries.size() == 0){
             emptyCount++;
             System.out.println("emptyCount : " + emptyCount);
             try {
                 Thread.sleep(3000);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }else{
             emptyCount = 0;
             printEntry(entries);
         }
         connector.ack(id);
     }
 }
 // batch -> entries -> rowchange - rowdata -> cols
 private static void printEntry(List<CanalEntry.Entry> entries) {
     for (CanalEntry.Entry entry : entries){
         if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                 entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){
             continue;
         }
         CanalEntry.RowChange rowChange = null;
         try {
             rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
         } catch (InvalidProtocolBufferException e) {
             e.printStackTrace();
         }
         CanalEntry.EventType eventType = rowChange.getEventType();
         System.out.println(entry.getHeader().getLogfileName()+" __ " +
                 entry.getHeader().getSchemaName() + " __ " + eventType);
         List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
         for(CanalEntry.RowData rowData : rowDatasList){
             for(CanalEntry.Column column: rowData.getAfterColumnsList()){
                 System.out.println(column.getName() + " - " +
                         column.getValue() + " - " +
                         column.getUpdated());
             }
         }
     }
 }
}
  1. 在mysql中寫數據,客戶端就會把增量數據打印到控制檯。

Canal 的 HA 機制設計

在大數據領域不少框架都會有 HA 機制,Canal 的 HA 分爲兩部分,Canal server 和 Canal client 分別有對應的 HA 實現:

  1. canal server:爲了減小對 mysql dump 的請求,不一樣 server 上的 instance 要求同一時間只能有一個處於 running,其餘的處於 standby 狀態。
  2. canal client:爲了保證有序性,一份 instance 同一時間只能由一個 canal client 進行 get/ack/rollback 操做,不然客戶端接收沒法保證有序。

整個 HA 機制的控制主要是依賴了 ZooKeeper 的幾個特性,ZooKeeper 這裏就不講了。

Canal Server:

  1. canal server 要啓動某個 canal instance 時都先向 ZooKeeper 進行一次嘗試啓動判斷(建立 EPHEMERAL 節點,誰建立成功就容許誰啓動)。
  2. 建立 ZooKeeper 節點成功後,對應的 canal server 就啓動對應的 canal instance,沒有建立成功的 canal instance 就會處於 standby 狀態。
  3. 一旦 ZooKeeper 發現 canal server 建立的節點消失後,當即通知其餘的 canal server 再次進行步驟 1 的操做,從新選出一個 canal server 啓動 instance。
  4. canal client 每次進行 connect 時,會首先向 ZooKeeper 詢問當前是誰啓動了 canal instance,而後和其創建鏈接,一旦鏈接不可用,會從新嘗試 connect。
  5. canal client 的方式和 canal server 方式相似,也是利用 ZooKeeper 的搶佔 EPHEMERAL 節點的方式進行控制。

Canal HA 的配置,並把數據實時同步到 kafka 中。

  1. 修改 conf/canal.properties 文件
 canal.zkServers = hadoop02:2181,hadoop03:2181,hadoop04:2181
 canal.serverMode = kafka
 canal.mq.servers = hadoop02:9092,hadoop03:9092,hadoop04:9092
  1. 配置 conf/example/example.instance
  canal.instance.mysql.slaveId = 790 /兩臺canal server的slaveID惟一
  canal.mq.topic = canal_log //指定將數據發送到kafka的topic

數據同步方案總結

講完了 Canal 工具,如今給你們簡單總結下目前常見的數據採集工具,不會涉及架構知識,只是簡單總結,讓你們有個印象。

常見的數據採集工具備:DataX、Flume、Canal、Sqoop、LogStash 等。

DataX (處理離線數據)

DataX 是阿里巴巴開源的一個異構數據源離線同步工具,異構數據源離線同步指的是將源端數據同步到目的端,可是端與端的數據源類型種類繁多,在沒有 DataX 以前,端與端的鏈路將組成一個複雜的網狀結構,很是零散沒法把同步核心邏輯抽象出來。

爲了解決異構數據源同步問題,DataX 將複雜的網狀的同步鏈路變成了星型數據鏈路,DataX 做爲中間傳輸載體負責鏈接各類數據源。

因此,當須要接入一個新的數據源的時候,只須要將此數據源對接到 DataX,就能夠跟已有的數據源作到無縫數據同步。

DataX自己做爲離線數據同步框架,採用Framework+plugin架構構建。將數據源讀取和寫入抽象成爲Reader/Writer插件,歸入到整個同步框架中。

  1. Reader: 它爲數據採集模塊,負責採集數據源的數據,將數據發送給Framework。
  2. Writer: 它爲數據寫入模塊,負責不斷向Framework取數據,並將數據寫入到目的端。
  3. Framework:它用於鏈接Reader和Writer,做爲二者的數據傳輸通道,並處理緩衝、併發、數據轉換等問題。

DataX的核心架構以下圖:

核心模塊介紹:

  1. DataX完成單個數據同步的做業,咱們把它稱之爲Job,DataX接收到一個Job以後,將啓動一個進程來完成整個做業同步過程。
  2. DataX Job啓動後,會根據不一樣的源端切分策略,將Job切分紅多個小的Task(子任務),以便於併發執行。
  3. 切分多個Task以後,DataX Job會調用Scheduler模塊,根據配置的併發數據量,將拆分紅的Task從新組合,組裝成TaskGroup(任務組)。每個TaskGroup負責以必定的併發運行完畢分配好的全部Task,默認單個任務組的併發數量爲5。
  4. 每個Task都由TaskGroup負責啓動,Task啓動後,會固定啓動Reader->Channel->Writer的線程來完成任務同步工做。
  5. DataX做業運行完成以後,Job監控並等待多個TaskGroup模塊任務完成,等待全部TaskGroup任務完成後Job成功退出。不然,異常退出。

Flume(處理實時數據)

Flume主要應用的場景是同步日誌數據,主要包含三個組件:Source、Channel、Sink。

Flume最大的優勢就是官網提供了豐富的Source、Channel、Sink,根據不一樣的業務需求,咱們能夠在官網查找相關配置。另外,Flume還提供了自定義這些組件的接口。

Logstash(處理離線數據)

Logstash就是一根具有實時數據傳輸能力的管道,負責將數據信息從管道的輸入端傳輸到管道的輸出端;與此同時這根管道還可讓你根據本身的需求在中間加上過濾網,Logstash提供了不少功能強大的過濾網來知足各類應用場景。

Logstash是由JRuby編寫,使用基於消息的簡單架構,在JVM上運行。在管道內的數據流稱之爲event,它分爲inputs階段、filters階段、outputs階段。

Sqoop(處理離線數據)

Sqoop是Hadoop和關係型數據庫之間傳送數據的一種工具,它是用來從關係型數據庫如MySQL到Hadoop的HDFS從Hadoop文件系統導出數據到關係型數據庫。Sqoop底層用的仍是MapReducer,用的時候必定要注意數據傾斜。

總結

老劉本篇文章主要講述了Canal工具的核心知識點及其數據採集工具的對比,其中數據採集工具只是大體講了講概念和應用,目的也是讓你們有個印象。老劉敢作保證看完這篇文章基本等於入門,剩下的就是練習了。

好啦,同步mysql增量數據的工具Canal的內容就講完了,儘管當前水平可能不及各位大佬,但老劉會努力變得更加優秀,讓各位小夥伴自學今後不求人!

若是有相關問題,聯繫公衆號:努力的老劉。文章都看到這了,點贊關注支持一波!

相關文章
相關標籤/搜索